Skip to content

Reduce copy and paste within MergeStorage #275

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 11, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
348 changes: 94 additions & 254 deletions Sources/AsyncAlgorithms/Merge/MergeStorage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -147,262 +147,12 @@ final class MergeStorage<
// sequences. We must store it to cancel it at the right times.
let task = Task {
await withThrowingTaskGroup(of: Void.self) { group in
// For each upstream sequence we are adding a child task that
// is consuming the upstream sequence
group.addTask {
var iterator1 = base1.makeAsyncIterator()

// This is our upstream consumption loop
loop: while true {
// We are creating a continuation before requesting the next
// element from upstream. This continuation is only resumed
// if the downstream consumer called `next` to signal his demand.
try await withUnsafeThrowingContinuation { continuation in
let action = self.lock.withLock {
self.stateMachine.childTaskSuspended(continuation)
}

switch action {
case let .resumeContinuation(continuation):
// This happens if there is outstanding demand
// and we need to demand from upstream right away
continuation.resume(returning: ())

case let .resumeContinuationWithError(continuation, error):
// This happens if another upstream already failed or if
// the task got cancelled.
continuation.resume(throwing: error)

case .none:
break
}
}

// We got signalled from the downstream that we have demand so let's
// request a new element from the upstream
if let element1 = try await iterator1.next() {
let action = self.lock.withLock {
self.stateMachine.elementProduced(element1)
}

switch action {
case let .resumeContinuation(continuation, element):
// We had an outstanding demand and where the first
// upstream to produce an element so we can forward it to
// the downstream
continuation.resume(returning: element)

case .none:
break
}

} else {
// The upstream returned `nil` which indicates that it finished
let action = self.lock.withLock {
self.stateMachine.upstreamFinished()
}

// All of this is mostly cleanup around the Task and the outstanding
// continuations used for signalling.
switch action {
case let .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations(
downstreamContinuation,
task,
upstreamContinuations
):
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
task.cancel()

downstreamContinuation.resume(returning: nil)

break loop

case let .cancelTaskAndUpstreamContinuations(
task,
upstreamContinuations
):
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
task.cancel()

break loop
case .none:

break loop
}
}
}
}

// Copy from the above just using the base2 sequence
group.addTask {
var iterator2 = base2.makeAsyncIterator()

// This is our upstream consumption loop
loop: while true {
// We are creating a continuation before requesting the next
// element from upstream. This continuation is only resumed
// if the downstream consumer called `next` to signal his demand.
try await withUnsafeThrowingContinuation { continuation in
let action = self.lock.withLock {
self.stateMachine.childTaskSuspended(continuation)
}

switch action {
case let .resumeContinuation(continuation):
// This happens if there is outstanding demand
// and we need to demand from upstream right away
continuation.resume(returning: ())

case let .resumeContinuationWithError(continuation, error):
// This happens if another upstream already failed or if
// the task got cancelled.
continuation.resume(throwing: error)

case .none:
break
}
}

// We got signalled from the downstream that we have demand so let's
// request a new element from the upstream
if let element2 = try await iterator2.next() {
let action = self.lock.withLock {
self.stateMachine.elementProduced(element2)
}

switch action {
case let .resumeContinuation(continuation, element):
// We had an outstanding demand and where the first
// upstream to produce an element so we can forward it to
// the downstream
continuation.resume(returning: element)

case .none:
break
}

} else {
// The upstream returned `nil` which indicates that it finished
let action = self.lock.withLock {
self.stateMachine.upstreamFinished()
}

// All of this is mostly cleanup around the Task and the outstanding
// continuations used for signalling.
switch action {
case let .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations(
downstreamContinuation,
task,
upstreamContinuations
):
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
task.cancel()

downstreamContinuation.resume(returning: nil)

break loop

case let .cancelTaskAndUpstreamContinuations(
task,
upstreamContinuations
):
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
task.cancel()

break loop
case .none:

break loop
}
}
}
}
self.iterateAsyncSequence(base1, in: &group)
self.iterateAsyncSequence(base2, in: &group)

// Copy from the above just using the base3 sequence
if let base3 = base3 {
group.addTask {
var iterator3 = base3.makeAsyncIterator()

// This is our upstream consumption loop
loop: while true {
// We are creating a continuation before requesting the next
// element from upstream. This continuation is only resumed
// if the downstream consumer called `next` to signal his demand.
try await withUnsafeThrowingContinuation { continuation in
let action = self.lock.withLock {
self.stateMachine.childTaskSuspended(continuation)
}

switch action {
case let .resumeContinuation(continuation):
// This happens if there is outstanding demand
// and we need to demand from upstream right away
continuation.resume(returning: ())

case let .resumeContinuationWithError(continuation, error):
// This happens if another upstream already failed or if
// the task got cancelled.
continuation.resume(throwing: error)

case .none:
break
}
}

// We got signalled from the downstream that we have demand so let's
// request a new element from the upstream
if let element3 = try await iterator3.next() {
let action = self.lock.withLock {
self.stateMachine.elementProduced(element3)
}

switch action {
case let .resumeContinuation(continuation, element):
// We had an outstanding demand and where the first
// upstream to produce an element so we can forward it to
// the downstream
continuation.resume(returning: element)

case .none:
break
}

} else {
// The upstream returned `nil` which indicates that it finished
let action = self.lock.withLock {
self.stateMachine.upstreamFinished()
}

// All of this is mostly cleanup around the Task and the outstanding
// continuations used for signalling.
switch action {
case let .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations(
downstreamContinuation,
task,
upstreamContinuations
):
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
task.cancel()

downstreamContinuation.resume(returning: nil)

break loop

case let .cancelTaskAndUpstreamContinuations(
task,
upstreamContinuations
):
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
task.cancel()

break loop
case .none:

break loop
}
}
}
}
self.iterateAsyncSequence(base3, in: &group)
}

while !group.isEmpty {
Expand Down Expand Up @@ -444,5 +194,95 @@ final class MergeStorage<
// We need to inform our state machine that we started the Task
stateMachine.taskStarted(task)
}
}

private func iterateAsyncSequence<AsyncSequence: _Concurrency.AsyncSequence>(
_ base: AsyncSequence,
in taskGroup: inout ThrowingTaskGroup<Void, Error>
) where AsyncSequence.Element == Base1.Element {
// For each upstream sequence we are adding a child task that
// is consuming the upstream sequence
taskGroup.addTask {
var iterator = base.makeAsyncIterator()

// This is our upstream consumption loop
loop: while true {
// We are creating a continuation before requesting the next
// element from upstream. This continuation is only resumed
// if the downstream consumer called `next` to signal his demand.
try await withUnsafeThrowingContinuation { continuation in
let action = self.lock.withLock {
self.stateMachine.childTaskSuspended(continuation)
}

switch action {
case let .resumeContinuation(continuation):
// This happens if there is outstanding demand
// and we need to demand from upstream right away
continuation.resume(returning: ())

case let .resumeContinuationWithError(continuation, error):
// This happens if another upstream already failed or if
// the task got cancelled.
continuation.resume(throwing: error)

case .none:
break
}
}

// We got signalled from the downstream that we have demand so let's
// request a new element from the upstream
if let element1 = try await iterator.next() {
let action = self.lock.withLock {
self.stateMachine.elementProduced(element1)
}

switch action {
case let .resumeContinuation(continuation, element):
// We had an outstanding demand and where the first
// upstream to produce an element so we can forward it to
// the downstream
continuation.resume(returning: element)

case .none:
break
}

} else {
// The upstream returned `nil` which indicates that it finished
let action = self.lock.withLock {
self.stateMachine.upstreamFinished()
}

// All of this is mostly cleanup around the Task and the outstanding
// continuations used for signalling.
switch action {
case let .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations(
downstreamContinuation,
task,
upstreamContinuations
):
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
task.cancel()

downstreamContinuation.resume(returning: nil)

break loop

case let .cancelTaskAndUpstreamContinuations(
task,
upstreamContinuations
):
upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) }
task.cancel()

break loop
case .none:

break loop
}
}
}
}
}
}