From 259d1882027075a69be704fb9a2db986ebc0bba2 Mon Sep 17 00:00:00 2001 From: Franz Busch Date: Mon, 27 Feb 2023 14:16:24 +0000 Subject: [PATCH] Switch from `group.waitForAll()` to `group.next()` # Motivation Swift 5.8 is including a change to how `group.waitForAll()` is working. It now properly waits for all tasks to finish even if one of the tasks throws. We have used `group.waitForAll()` in multiple places and need to change this code accordingly. # Modification Switch code from `group.waitForAll()` to `group.next()`. # Result This fixes a few stuck tests that we have seen when running against development snapshots. --- .../CombineLatestStateMachine.swift | 3 +- .../CombineLatest/CombineLatestStorage.swift | 54 ++++++++-------- .../Debounce/DebounceStateMachine.swift | 4 +- .../Debounce/DebounceStorage.swift | 64 ++++++++++--------- .../AsyncAlgorithms/Merge/MergeStorage.swift | 63 +++++++++--------- Sources/AsyncAlgorithms/Zip/ZipStorage.swift | 48 +++++++------- 6 files changed, 118 insertions(+), 118 deletions(-) diff --git a/Sources/AsyncAlgorithms/CombineLatest/CombineLatestStateMachine.swift b/Sources/AsyncAlgorithms/CombineLatest/CombineLatestStateMachine.swift index 34f4dead..71d0507a 100644 --- a/Sources/AsyncAlgorithms/CombineLatest/CombineLatestStateMachine.swift +++ b/Sources/AsyncAlgorithms/CombineLatest/CombineLatestStateMachine.swift @@ -530,7 +530,8 @@ struct CombineLatestStateMachine< preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamThrew()") case .upstreamsFinished: - preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamThrew()") + // We need to tolerate multiple upstreams failing + return .none case .waitingForDemand(let task, let upstreams, _): // An upstream threw. We can cancel everything now and transition to finished. diff --git a/Sources/AsyncAlgorithms/CombineLatest/CombineLatestStorage.swift b/Sources/AsyncAlgorithms/CombineLatest/CombineLatestStorage.swift index 31245579..bc32cee8 100644 --- a/Sources/AsyncAlgorithms/CombineLatest/CombineLatestStorage.swift +++ b/Sources/AsyncAlgorithms/CombineLatest/CombineLatestStorage.swift @@ -319,35 +319,33 @@ final class CombineLatestStorage< } } - do { - try await group.waitForAll() - } catch { - // One of the upstream sequences threw an error - self.stateMachine.withCriticalRegion { stateMachine in - let action = stateMachine.upstreamThrew(error) - - switch action { - case .cancelTaskAndUpstreamContinuations(let task, let upstreamContinuations): - upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } - task.cancel() - - case .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuations( - let downstreamContinuation, - let error, - let task, - let upstreamContinuations - ): - upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } - task.cancel() - - downstreamContinuation.resume(returning: .failure(error)) - - case .none: - break - } - } + while !group.isEmpty { + do { + try await group.next() + } catch { + // One of the upstream sequences threw an error + self.stateMachine.withCriticalRegion { stateMachine in + let action = stateMachine.upstreamThrew(error) + switch action { + case .cancelTaskAndUpstreamContinuations(let task, let upstreamContinuations): + upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } + task.cancel() + case .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuations( + let downstreamContinuation, + let error, + let task, + let upstreamContinuations + ): + upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } + task.cancel() + downstreamContinuation.resume(returning: .failure(error)) + case .none: + break + } + } - group.cancelAll() + group.cancelAll() + } } } } diff --git a/Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift b/Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift index bd111ab1..98c23287 100644 --- a/Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift +++ b/Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift @@ -390,8 +390,8 @@ struct DebounceStateMachine { preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished()") case .upstreamFailure: - // The upstream already failed so it should never have throw again. - preconditionFailure("Internal inconsistency current state \(self.state) and received childTaskSuspended()") + // We need to tolerate multiple upstreams failing + return .none case .waitingForDemand(let task, .none, let clockContinuation, .none): // We don't have any buffered element so we can just go ahead diff --git a/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift b/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift index 21e1ddf6..cd1bb515 100644 --- a/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift +++ b/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift @@ -254,39 +254,41 @@ final class DebounceStorage: @unchecked Sendable } } - do { - try await group.waitForAll() - } catch { - // The upstream sequence threw an error - let action = self.stateMachine.withCriticalRegion { $0.upstreamThrew(error) } + while !group.isEmpty { + do { + try await group.next() + } catch { + // One of the upstream sequences threw an error + self.stateMachine.withCriticalRegion { stateMachine in + let action = stateMachine.upstreamThrew(error) + switch action { + case .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuation( + let downstreamContinuation, + let error, + let task, + let upstreamContinuation, + let clockContinuation + ): + upstreamContinuation?.resume(throwing: CancellationError()) + clockContinuation?.resume(throwing: CancellationError()) - switch action { - case .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuation( - let downstreamContinuation, - let error, - let task, - let upstreamContinuation, - let clockContinuation - ): - upstreamContinuation?.resume(throwing: CancellationError()) - clockContinuation?.resume(throwing: CancellationError()) - - task.cancel() - - downstreamContinuation.resume(returning: .failure(error)) - - case .cancelTaskAndClockContinuation( - let task, - let clockContinuation - ): - clockContinuation?.resume(throwing: CancellationError()) - task.cancel() - - case .none: - break - } + task.cancel() + + downstreamContinuation.resume(returning: .failure(error)) - group.cancelAll() + case .cancelTaskAndClockContinuation( + let task, + let clockContinuation + ): + clockContinuation?.resume(throwing: CancellationError()) + task.cancel() + case .none: + break + } + } + + group.cancelAll() + } } } } diff --git a/Sources/AsyncAlgorithms/Merge/MergeStorage.swift b/Sources/AsyncAlgorithms/Merge/MergeStorage.swift index de4c72b8..7a83ad8b 100644 --- a/Sources/AsyncAlgorithms/Merge/MergeStorage.swift +++ b/Sources/AsyncAlgorithms/Merge/MergeStorage.swift @@ -404,40 +404,39 @@ final class MergeStorage< } } } - - do { - try await group.waitForAll() - } catch { + + while !group.isEmpty { + do { + try await group.next() + } catch { // One of the upstream sequences threw an error - let action = self.lock.withLock { - self.stateMachine.upstreamThrew(error) - } - - switch action { - case let .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuations( - downstreamContinuation, - error, - task, - upstreamContinuations - ): - upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } - - task.cancel() - - downstreamContinuation.resume(throwing: error) - case let .cancelTaskAndUpstreamContinuations( - task, - upstreamContinuations - ): - upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } - - task.cancel() - - case .none: - break + let action = self.lock.withLock { + self.stateMachine.upstreamThrew(error) + } + switch action { + case let .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuations( + downstreamContinuation, + error, + task, + upstreamContinuations + ): + upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } + + task.cancel() + + downstreamContinuation.resume(throwing: error) + case let .cancelTaskAndUpstreamContinuations( + task, + upstreamContinuations + ): + upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } + + task.cancel() + case .none: + break + } + group.cancelAll() } - - group.cancelAll() } } } diff --git a/Sources/AsyncAlgorithms/Zip/ZipStorage.swift b/Sources/AsyncAlgorithms/Zip/ZipStorage.swift index d9fb3a82..f997e681 100644 --- a/Sources/AsyncAlgorithms/Zip/ZipStorage.swift +++ b/Sources/AsyncAlgorithms/Zip/ZipStorage.swift @@ -286,31 +286,31 @@ final class ZipStorage