From 3ded7e8303b4897a51ba4855798334ab56705848 Mon Sep 17 00:00:00 2001 From: Fabian Fett Date: Mon, 10 Jul 2023 18:03:14 +0200 Subject: [PATCH] Remove copy and paste within MergeStorage --- .../AsyncAlgorithms/Merge/MergeStorage.swift | 348 +++++------------- 1 file changed, 94 insertions(+), 254 deletions(-) diff --git a/Sources/AsyncAlgorithms/Merge/MergeStorage.swift b/Sources/AsyncAlgorithms/Merge/MergeStorage.swift index 7a83ad8b..42712cae 100644 --- a/Sources/AsyncAlgorithms/Merge/MergeStorage.swift +++ b/Sources/AsyncAlgorithms/Merge/MergeStorage.swift @@ -147,262 +147,12 @@ final class MergeStorage< // sequences. We must store it to cancel it at the right times. let task = Task { await withThrowingTaskGroup(of: Void.self) { group in - // For each upstream sequence we are adding a child task that - // is consuming the upstream sequence - group.addTask { - var iterator1 = base1.makeAsyncIterator() - - // This is our upstream consumption loop - loop: while true { - // We are creating a continuation before requesting the next - // element from upstream. This continuation is only resumed - // if the downstream consumer called `next` to signal his demand. - try await withUnsafeThrowingContinuation { continuation in - let action = self.lock.withLock { - self.stateMachine.childTaskSuspended(continuation) - } - - switch action { - case let .resumeContinuation(continuation): - // This happens if there is outstanding demand - // and we need to demand from upstream right away - continuation.resume(returning: ()) - - case let .resumeContinuationWithError(continuation, error): - // This happens if another upstream already failed or if - // the task got cancelled. - continuation.resume(throwing: error) - - case .none: - break - } - } - - // We got signalled from the downstream that we have demand so let's - // request a new element from the upstream - if let element1 = try await iterator1.next() { - let action = self.lock.withLock { - self.stateMachine.elementProduced(element1) - } - - switch action { - case let .resumeContinuation(continuation, element): - // We had an outstanding demand and where the first - // upstream to produce an element so we can forward it to - // the downstream - continuation.resume(returning: element) - - case .none: - break - } - - } else { - // The upstream returned `nil` which indicates that it finished - let action = self.lock.withLock { - self.stateMachine.upstreamFinished() - } - - // All of this is mostly cleanup around the Task and the outstanding - // continuations used for signalling. - switch action { - case let .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations( - downstreamContinuation, - task, - upstreamContinuations - ): - upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } - task.cancel() - - downstreamContinuation.resume(returning: nil) - - break loop - - case let .cancelTaskAndUpstreamContinuations( - task, - upstreamContinuations - ): - upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } - task.cancel() - - break loop - case .none: - - break loop - } - } - } - } - - // Copy from the above just using the base2 sequence - group.addTask { - var iterator2 = base2.makeAsyncIterator() - - // This is our upstream consumption loop - loop: while true { - // We are creating a continuation before requesting the next - // element from upstream. This continuation is only resumed - // if the downstream consumer called `next` to signal his demand. - try await withUnsafeThrowingContinuation { continuation in - let action = self.lock.withLock { - self.stateMachine.childTaskSuspended(continuation) - } - - switch action { - case let .resumeContinuation(continuation): - // This happens if there is outstanding demand - // and we need to demand from upstream right away - continuation.resume(returning: ()) - - case let .resumeContinuationWithError(continuation, error): - // This happens if another upstream already failed or if - // the task got cancelled. - continuation.resume(throwing: error) - - case .none: - break - } - } - - // We got signalled from the downstream that we have demand so let's - // request a new element from the upstream - if let element2 = try await iterator2.next() { - let action = self.lock.withLock { - self.stateMachine.elementProduced(element2) - } - - switch action { - case let .resumeContinuation(continuation, element): - // We had an outstanding demand and where the first - // upstream to produce an element so we can forward it to - // the downstream - continuation.resume(returning: element) - - case .none: - break - } - - } else { - // The upstream returned `nil` which indicates that it finished - let action = self.lock.withLock { - self.stateMachine.upstreamFinished() - } - - // All of this is mostly cleanup around the Task and the outstanding - // continuations used for signalling. - switch action { - case let .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations( - downstreamContinuation, - task, - upstreamContinuations - ): - upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } - task.cancel() - - downstreamContinuation.resume(returning: nil) - - break loop - - case let .cancelTaskAndUpstreamContinuations( - task, - upstreamContinuations - ): - upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } - task.cancel() - - break loop - case .none: - - break loop - } - } - } - } + self.iterateAsyncSequence(base1, in: &group) + self.iterateAsyncSequence(base2, in: &group) // Copy from the above just using the base3 sequence if let base3 = base3 { - group.addTask { - var iterator3 = base3.makeAsyncIterator() - - // This is our upstream consumption loop - loop: while true { - // We are creating a continuation before requesting the next - // element from upstream. This continuation is only resumed - // if the downstream consumer called `next` to signal his demand. - try await withUnsafeThrowingContinuation { continuation in - let action = self.lock.withLock { - self.stateMachine.childTaskSuspended(continuation) - } - - switch action { - case let .resumeContinuation(continuation): - // This happens if there is outstanding demand - // and we need to demand from upstream right away - continuation.resume(returning: ()) - - case let .resumeContinuationWithError(continuation, error): - // This happens if another upstream already failed or if - // the task got cancelled. - continuation.resume(throwing: error) - - case .none: - break - } - } - - // We got signalled from the downstream that we have demand so let's - // request a new element from the upstream - if let element3 = try await iterator3.next() { - let action = self.lock.withLock { - self.stateMachine.elementProduced(element3) - } - - switch action { - case let .resumeContinuation(continuation, element): - // We had an outstanding demand and where the first - // upstream to produce an element so we can forward it to - // the downstream - continuation.resume(returning: element) - - case .none: - break - } - - } else { - // The upstream returned `nil` which indicates that it finished - let action = self.lock.withLock { - self.stateMachine.upstreamFinished() - } - - // All of this is mostly cleanup around the Task and the outstanding - // continuations used for signalling. - switch action { - case let .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations( - downstreamContinuation, - task, - upstreamContinuations - ): - upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } - task.cancel() - - downstreamContinuation.resume(returning: nil) - - break loop - - case let .cancelTaskAndUpstreamContinuations( - task, - upstreamContinuations - ): - upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } - task.cancel() - - break loop - case .none: - - break loop - } - } - } - } + self.iterateAsyncSequence(base3, in: &group) } while !group.isEmpty { @@ -444,5 +194,95 @@ final class MergeStorage< // We need to inform our state machine that we started the Task stateMachine.taskStarted(task) } -} + private func iterateAsyncSequence( + _ base: AsyncSequence, + in taskGroup: inout ThrowingTaskGroup + ) where AsyncSequence.Element == Base1.Element { + // For each upstream sequence we are adding a child task that + // is consuming the upstream sequence + taskGroup.addTask { + var iterator = base.makeAsyncIterator() + + // This is our upstream consumption loop + loop: while true { + // We are creating a continuation before requesting the next + // element from upstream. This continuation is only resumed + // if the downstream consumer called `next` to signal his demand. + try await withUnsafeThrowingContinuation { continuation in + let action = self.lock.withLock { + self.stateMachine.childTaskSuspended(continuation) + } + + switch action { + case let .resumeContinuation(continuation): + // This happens if there is outstanding demand + // and we need to demand from upstream right away + continuation.resume(returning: ()) + + case let .resumeContinuationWithError(continuation, error): + // This happens if another upstream already failed or if + // the task got cancelled. + continuation.resume(throwing: error) + + case .none: + break + } + } + + // We got signalled from the downstream that we have demand so let's + // request a new element from the upstream + if let element1 = try await iterator.next() { + let action = self.lock.withLock { + self.stateMachine.elementProduced(element1) + } + + switch action { + case let .resumeContinuation(continuation, element): + // We had an outstanding demand and where the first + // upstream to produce an element so we can forward it to + // the downstream + continuation.resume(returning: element) + + case .none: + break + } + + } else { + // The upstream returned `nil` which indicates that it finished + let action = self.lock.withLock { + self.stateMachine.upstreamFinished() + } + + // All of this is mostly cleanup around the Task and the outstanding + // continuations used for signalling. + switch action { + case let .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations( + downstreamContinuation, + task, + upstreamContinuations + ): + upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } + task.cancel() + + downstreamContinuation.resume(returning: nil) + + break loop + + case let .cancelTaskAndUpstreamContinuations( + task, + upstreamContinuations + ): + upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } + task.cancel() + + break loop + case .none: + + break loop + } + } + } + } + } +}