diff --git a/Sources/AsyncAlgorithms/AsyncCombineLatest2Sequence.swift b/Sources/AsyncAlgorithms/AsyncCombineLatest2Sequence.swift deleted file mode 100644 index 9db4d34a..00000000 --- a/Sources/AsyncAlgorithms/AsyncCombineLatest2Sequence.swift +++ /dev/null @@ -1,309 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift Async Algorithms open source project -// -// Copyright (c) 2022 Apple Inc. and the Swift project authors -// Licensed under Apache License v2.0 with Runtime Library Exception -// -// See https://swift.org/LICENSE.txt for license information -// -//===----------------------------------------------------------------------===// - -/// Creates an asynchronous sequence that combines the latest values from two `AsyncSequence` types -/// by emitting a tuple of the values. -public func combineLatest(_ base1: Base1, _ base2: Base2) -> AsyncCombineLatest2Sequence { - AsyncCombineLatest2Sequence(base1, base2) -} - -/// An `AsyncSequence` that combines the latest values produced from two asynchronous sequences into an asynchronous sequence of tuples. -public struct AsyncCombineLatest2Sequence: Sendable - where - Base1: Sendable, Base2: Sendable, - Base1.Element: Sendable, Base2.Element: Sendable, - Base1.AsyncIterator: Sendable, Base2.AsyncIterator: Sendable { - let base1: Base1 - let base2: Base2 - - init(_ base1: Base1, _ base2: Base2) { - self.base1 = base1 - self.base2 = base2 - } -} - -extension AsyncCombineLatest2Sequence: AsyncSequence { - public typealias Element = (Base1.Element, Base2.Element) - - /// The iterator for a `AsyncCombineLatest2Sequence` instance. - public struct Iterator: AsyncIteratorProtocol, Sendable { - enum Partial: Sendable { - case first(Result, Base1.AsyncIterator) - case second(Result, Base2.AsyncIterator) - } - - enum State { - case initial(Base1.AsyncIterator, Base2.AsyncIterator) - case idle(Base1.AsyncIterator, Base2.AsyncIterator, (Base1.Element, Base2.Element)) - case firstActiveSecondIdle(Task, Base2.AsyncIterator, (Base1.Element, Base2.Element)) - case firstIdleSecondActive(Base1.AsyncIterator, Task, (Base1.Element, Base2.Element)) - case firstTerminalSecondIdle(Base2.AsyncIterator, (Base1.Element, Base2.Element)) - case firstIdleSecondTerminal(Base1.AsyncIterator, (Base1.Element, Base2.Element)) - case terminal - } - - var state: State - - init(_ base1: Base1.AsyncIterator, _ base2: Base2.AsyncIterator) { - state = .initial(base1, base2) - } - - public mutating func next() async rethrows -> (Base1.Element, Base2.Element)? { - let task1: Task - let task2: Task - var current: (Base1.Element, Base2.Element) - - switch state { - case .initial(let iterator1, let iterator2): - func iteration( - _ group: inout TaskGroup, - _ value1: inout Base1.Element?, - _ value2: inout Base2.Element?, - _ iterator1: inout Base1.AsyncIterator?, - _ iterator2: inout Base2.AsyncIterator? - ) async -> Result<(Base1.Element, Base2.Element)?, Error>? { - guard let partial = await group.next() else { - return .success(nil) - } - switch partial { - case .first(let res, let iter): - switch res { - case .success(let value): - if let value = value { - value1 = value - iterator1 = iter - return nil - } else { - group.cancelAll() - return .success(nil) - } - case .failure(let error): - group.cancelAll() - return .failure(error) - } - case .second(let res, let iter): - switch res { - case .success(let value): - if let value = value { - value2 = value - iterator2 = iter - return nil - } else { - group.cancelAll() - return .success(nil) - } - case .failure(let error): - group.cancelAll() - return .failure(error) - } - } - } - - let (result, iter1, iter2) = await withTaskGroup(of: Partial.self) { group -> (Result<(Base1.Element, Base2.Element)?, Error>, Base1.AsyncIterator?, Base2.AsyncIterator?) in - group.addTask { - var iterator = iterator1 - do { - let value = try await iterator.next() - return .first(.success(value), iterator) - } catch { - return .first(.failure(error), iterator) - } - } - group.addTask { - var iterator = iterator2 - do { - let value = try await iterator.next() - return .second(.success(value), iterator) - } catch { - return .second(.failure(error), iterator) - } - } - - var res1: Base1.Element? - var res2: Base2.Element? - var iter1: Base1.AsyncIterator? - var iter2: Base2.AsyncIterator? - - if let result = await iteration(&group, &res1, &res2, &iter1, &iter2) { - return (result, nil, nil) - } - if let result = await iteration(&group, &res1, &res2, &iter1, &iter2) { - return (result, nil, nil) - } - guard let res1 = res1, let res2 = res2 else { - return (.success(nil), nil, nil) - } - - return (.success((res1, res2)), iter1, iter2) - } - do { - // make sure to get the result first just in case it has a failure embedded - guard let value = try result._rethrowGet() else { - state = .terminal - return nil - } - guard let iter1 = iter1, let iter2 = iter2 else { - state = .terminal - return nil - } - state = .idle(iter1, iter2, value) - return value - } catch { - state = .terminal - throw error - } - case .idle(let iterator1, let iterator2, let value): - task1 = Task { - var iterator = iterator1 - do { - let value = try await iterator.next() - return .first(.success(value), iterator) - } catch { - return .first(.failure(error), iterator) - } - } - task2 = Task { - var iterator = iterator2 - do { - let value = try await iterator.next() - return .second(.success(value), iterator) - } catch { - return .second(.failure(error), iterator) - } - } - current = value - case .firstActiveSecondIdle(let task, let iterator2, let value): - task1 = task - task2 = Task { - var iterator = iterator2 - do { - let value = try await iterator.next() - return .second(.success(value), iterator) - } catch { - return .second(.failure(error), iterator) - } - } - current = value - case .firstIdleSecondActive(let iterator1, let task, let value): - task1 = Task { - var iterator = iterator1 - do { - let value = try await iterator.next() - return .first(.success(value), iterator) - } catch { - return .first(.failure(error), iterator) - } - } - task2 = task - current = value - case .firstTerminalSecondIdle(var iterator, var current): - do { - guard let member = try await iterator.next() else { - state = .terminal - return nil - } - current.1 = member - state = .firstTerminalSecondIdle(iterator, current) - return current - } catch { - state = .terminal - throw error - } - case .firstIdleSecondTerminal(var iterator, var current): - do { - guard let member = try await iterator.next() else { - state = .terminal - return nil - } - current.0 = member - state = .firstIdleSecondTerminal(iterator, current) - return current - } catch { - state = .terminal - throw error - } - case .terminal: - return nil - } - switch await Task.select(task1, task2).value { - case .first(let result, let iterator): - switch result { - case .success(let member): - if let member = member { - current.0 = member - state = .firstIdleSecondActive(iterator, task2, current) - } else { - switch await task2.value { - case .first: - fatalError() - case .second(let result, let iterator): - switch result { - case .success(let member): - if let member = member { - current.1 = member - state = .firstTerminalSecondIdle(iterator, current) - return current - } else { - state = .terminal - return nil - } - case .failure: - state = .terminal - try result._rethrowError() - } - } - } - case .failure: - state = .terminal - task2.cancel() - try result._rethrowError() - } - case .second(let result, let iterator): - switch result { - case .success(let member): - if let member = member { - current.1 = member - state = .firstActiveSecondIdle(task1, iterator, current) - } else { - switch await task1.value { - case .first(let result, let iterator): - switch result { - case .success(let member): - if let member = member { - current.0 = member - state = .firstIdleSecondTerminal(iterator, current) - return current - } else { - state = .terminal - return nil - } - case .failure: - state = .terminal - try result._rethrowError() - } - case .second: - fatalError() - } - } - case .failure: - state = .terminal - task2.cancel() - try result._rethrowError() - } - } - return current - } - } - - public func makeAsyncIterator() -> Iterator { - Iterator(base1.makeAsyncIterator(), base2.makeAsyncIterator()) - } -} diff --git a/Sources/AsyncAlgorithms/AsyncCombineLatest3Sequence.swift b/Sources/AsyncAlgorithms/AsyncCombineLatest3Sequence.swift deleted file mode 100644 index e182a435..00000000 --- a/Sources/AsyncAlgorithms/AsyncCombineLatest3Sequence.swift +++ /dev/null @@ -1,57 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift Async Algorithms open source project -// -// Copyright (c) 2022 Apple Inc. and the Swift project authors -// Licensed under Apache License v2.0 with Runtime Library Exception -// -// See https://swift.org/LICENSE.txt for license information -// -//===----------------------------------------------------------------------===// - -/// Creates an asynchronous sequence that combines the latest values from three `AsyncSequence` types -/// by emitting a tuple of the values. -public func combineLatest(_ base1: Base1, _ base2: Base2, _ base3: Base3) -> AsyncCombineLatest3Sequence { - AsyncCombineLatest3Sequence(base1, base2, base3) -} - -/// An `AsyncSequence` that combines the latest values produced from three asynchronous sequences into an asynchronous sequence of tuples. -public struct AsyncCombineLatest3Sequence: Sendable - where - Base1: Sendable, Base2: Sendable, Base3: Sendable, - Base1.Element: Sendable, Base2.Element: Sendable, Base3.Element: Sendable, - Base1.AsyncIterator: Sendable, Base2.AsyncIterator: Sendable, Base3.AsyncIterator: Sendable { - let base1: Base1 - let base2: Base2 - let base3: Base3 - - init(_ base1: Base1, _ base2: Base2, _ base3: Base3) { - self.base1 = base1 - self.base2 = base2 - self.base3 = base3 - } -} - -extension AsyncCombineLatest3Sequence: AsyncSequence { - public typealias Element = (Base1.Element, Base2.Element, Base3.Element) - - /// The iterator for a `AsyncCombineLatest3Sequence` instance. - public struct Iterator: AsyncIteratorProtocol, Sendable { - var iterator: AsyncCombineLatest2Sequence, Base3>.Iterator - - init(_ base1: Base1.AsyncIterator, _ base2: Base2.AsyncIterator, _ base3: Base3.AsyncIterator) { - iterator = AsyncCombineLatest2Sequence, Base3>.Iterator(AsyncCombineLatest2Sequence.Iterator(base1, base2), base3) - } - - public mutating func next() async rethrows -> (Base1.Element, Base2.Element, Base3.Element)? { - guard let value = try await iterator.next() else { - return nil - } - return (value.0.0, value.0.1, value.1) - } - } - - public func makeAsyncIterator() -> Iterator { - Iterator(base1.makeAsyncIterator(), base2.makeAsyncIterator(), base3.makeAsyncIterator()) - } -} diff --git a/Sources/AsyncAlgorithms/CombineLatest/AsyncCombineLatest2Sequence.swift b/Sources/AsyncAlgorithms/CombineLatest/AsyncCombineLatest2Sequence.swift new file mode 100644 index 00000000..8ff0c038 --- /dev/null +++ b/Sources/AsyncAlgorithms/CombineLatest/AsyncCombineLatest2Sequence.swift @@ -0,0 +1,89 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +/// Creates an asynchronous sequence that combines the latest values from two `AsyncSequence` types +/// by emitting a tuple of the values. ``combineLatest(_:_:)`` only emits a value whenever any of the base `AsyncSequence`s +/// emit a value (so long as each of the bases have emitted at least one value). +/// +/// Finishes: +/// ``combineLatest(_:_:)`` finishes when one of the bases finishes before emitting any value or +/// when all bases finished. +/// +/// Throws: +/// ``combineLatest(_:_:)`` throws when one of the bases throws. If one of the bases threw any buffered and not yet consumed +/// values will be dropped. +public func combineLatest< + Base1: AsyncSequence, + Base2: AsyncSequence +>(_ base1: Base1, _ base2: Base2) -> AsyncCombineLatest2Sequence where + Base1: Sendable, + Base1.Element: Sendable, + Base2: Sendable, + Base2.Element: Sendable { + AsyncCombineLatest2Sequence(base1, base2) +} + +/// An `AsyncSequence` that combines the latest values produced from two asynchronous sequences into an asynchronous sequence of tuples. +public struct AsyncCombineLatest2Sequence< + Base1: AsyncSequence, + Base2: AsyncSequence +>: AsyncSequence where + Base1: Sendable, + Base1.Element: Sendable, + Base2: Sendable, + Base2.Element: Sendable { + public typealias Element = (Base1.Element, Base2.Element) + public typealias AsyncIterator = Iterator + + let base1: Base1 + let base2: Base2 + + public init(_ base1: Base1, _ base2: Base2) { + self.base1 = base1 + self.base2 = base2 + } + + public func makeAsyncIterator() -> AsyncIterator { + Iterator(storage: .init(self.base1, self.base2, nil)) + } + + public struct Iterator: AsyncIteratorProtocol { + final class InternalClass { + private let storage: CombineLatestStorage + + fileprivate init(storage: CombineLatestStorage) { + self.storage = storage + } + + deinit { + self.storage.iteratorDeinitialized() + } + + func next() async rethrows -> Element? { + guard let element = try await self.storage.next() else { + return nil + } + + return (element.0, element.1) + } + } + + let internalClass: InternalClass + + fileprivate init(storage: CombineLatestStorage) { + self.internalClass = InternalClass(storage: storage) + } + + public mutating func next() async rethrows -> Element? { + try await self.internalClass.next() + } + } +} diff --git a/Sources/AsyncAlgorithms/CombineLatest/AsyncCombineLatest3Sequence.swift b/Sources/AsyncAlgorithms/CombineLatest/AsyncCombineLatest3Sequence.swift new file mode 100644 index 00000000..0ba733e2 --- /dev/null +++ b/Sources/AsyncAlgorithms/CombineLatest/AsyncCombineLatest3Sequence.swift @@ -0,0 +1,99 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +/// Creates an asynchronous sequence that combines the latest values from three `AsyncSequence` types +/// by emitting a tuple of the values. ``combineLatest(_:_:_:)`` only emits a value whenever any of the base `AsyncSequence`s +/// emit a value (so long as each of the bases have emitted at least one value). +/// +/// Finishes: +/// ``combineLatest(_:_:_:)`` finishes when one of the bases finishes before emitting any value or +/// when all bases finished. +/// +/// Throws: +/// ``combineLatest(_:_:_:)`` throws when one of the bases throws. If one of the bases threw any buffered and not yet consumed +/// values will be dropped. +public func combineLatest< + Base1: AsyncSequence, + Base2: AsyncSequence, + Base3: AsyncSequence +>(_ base1: Base1, _ base2: Base2, _ base3: Base3) -> AsyncCombineLatest3Sequence where + Base1: Sendable, + Base1.Element: Sendable, + Base2: Sendable, + Base2.Element: Sendable, + Base3: Sendable, + Base3.Element: Sendable { + AsyncCombineLatest3Sequence(base1, base2, base3) +} + +/// An `AsyncSequence` that combines the latest values produced from three asynchronous sequences into an asynchronous sequence of tuples. +public struct AsyncCombineLatest3Sequence< + Base1: AsyncSequence, + Base2: AsyncSequence, + Base3: AsyncSequence +>: AsyncSequence where + Base1: Sendable, + Base1.Element: Sendable, + Base2: Sendable, + Base2.Element: Sendable, + Base3: Sendable, + Base3.Element: Sendable { + public typealias Element = (Base1.Element, Base2.Element, Base3.Element) + public typealias AsyncIterator = Iterator + + let base1: Base1 + let base2: Base2 + let base3: Base3 + + init(_ base1: Base1, _ base2: Base2, _ base3: Base3) { + self.base1 = base1 + self.base2 = base2 + self.base3 = base3 + } + + public func makeAsyncIterator() -> AsyncIterator { + Iterator(storage: .init(self.base1, self.base2, self.base3) + ) + } + + public struct Iterator: AsyncIteratorProtocol { + final class InternalClass { + private let storage: CombineLatestStorage + + fileprivate init(storage: CombineLatestStorage) { + self.storage = storage + } + + deinit { + self.storage.iteratorDeinitialized() + } + + func next() async rethrows -> Element? { + guard let element = try await self.storage.next() else { + return nil + } + + // This force unwrap is safe since there must be a third element. + return (element.0, element.1, element.2!) + } + } + + let internalClass: InternalClass + + fileprivate init(storage: CombineLatestStorage) { + self.internalClass = InternalClass(storage: storage) + } + + public mutating func next() async rethrows -> Element? { + try await self.internalClass.next() + } + } +} diff --git a/Sources/AsyncAlgorithms/CombineLatest/CombineLatestStateMachine.swift b/Sources/AsyncAlgorithms/CombineLatest/CombineLatestStateMachine.swift new file mode 100644 index 00000000..38c8f5a9 --- /dev/null +++ b/Sources/AsyncAlgorithms/CombineLatest/CombineLatestStateMachine.swift @@ -0,0 +1,722 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +import DequeModule + +/// State machine for combine latest +struct CombineLatestStateMachine< + Base1: AsyncSequence, + Base2: AsyncSequence, + Base3: AsyncSequence +>: Sendable where + Base1: Sendable, + Base2: Sendable, + Base3: Sendable, + Base1.Element: Sendable, + Base2.Element: Sendable, + Base3.Element: Sendable { + typealias DownstreamContinuation = UnsafeContinuation, Never> + + private enum State: Sendable { + /// Small wrapper for the state of an upstream sequence. + struct Upstream: Sendable { + /// The upstream continuation. + var continuation: UnsafeContinuation? + /// The produced upstream element. + var element: Element? + /// Indicates wether the upstream finished/threw already + var isFinished: Bool + } + + /// The initial state before a call to `next` happened. + case initial(base1: Base1, base2: Base2, base3: Base3?) + + /// The state while we are waiting for downstream demand. + case waitingForDemand( + task: Task, + upstreams: (Upstream, Upstream, Upstream), + buffer: Deque<(Base1.Element, Base2.Element, Base3.Element?)> + ) + + /// The state while we are consuming the upstream and waiting until we get a result from all upstreams. + case combining( + task: Task, + upstreams: (Upstream, Upstream, Upstream), + downstreamContinuation: DownstreamContinuation, + buffer: Deque<(Base1.Element, Base2.Element, Base3.Element?)> + ) + + case upstreamsFinished( + buffer: Deque<(Base1.Element, Base2.Element, Base3.Element?)> + ) + + case upstreamThrew( + error: Error + ) + + /// The state once the downstream consumer stopped, i.e. by dropping all references + /// or by getting their `Task` cancelled. + case finished + + /// Internal state to avoid CoW. + case modifying + } + + private var state: State + + private let numberOfUpstreamSequences: Int + + /// Initializes a new `StateMachine`. + init( + base1: Base1, + base2: Base2, + base3: Base3? + ) { + self.state = .initial( + base1: base1, + base2: base2, + base3: base3 + ) + + if base3 == nil { + self.numberOfUpstreamSequences = 2 + } else { + self.numberOfUpstreamSequences = 3 + } + } + + /// Actions returned by `iteratorDeinitialized()`. + enum IteratorDeinitializedAction { + /// Indicates that the `Task` needs to be cancelled and + /// the upstream continuations need to be resumed with a `CancellationError`. + case cancelTaskAndUpstreamContinuations( + task: Task, + upstreamContinuations: [UnsafeContinuation] + ) + } + + mutating func iteratorDeinitialized() -> IteratorDeinitializedAction? { + switch self.state { + case .initial: + // Nothing to do here. No demand was signalled until now + return .none + + case .combining: + // An iterator was deinitialized while we have a suspended continuation. + preconditionFailure("Internal inconsistency current state \(self.state) and received iteratorDeinitialized()") + + case .waitingForDemand(let task, let upstreams, _): + // The iterator was dropped which signals that the consumer is finished. + // We can transition to finished now and need to clean everything up. + self.state = .finished + + return .cancelTaskAndUpstreamContinuations( + task: task, + upstreamContinuations: [upstreams.0.continuation, upstreams.1.continuation, upstreams.2.continuation].compactMap { $0 } + ) + + case .upstreamThrew, .upstreamsFinished: + // The iterator was dropped so we can transition to finished now. + self.state = .finished + + return .none + + case .finished: + // We are already finished so there is nothing left to clean up. + // This is just the references dropping afterwards. + return .none + + case .modifying: + preconditionFailure("Invalid state") + } + } + + mutating func taskIsStarted( + task: Task, + downstreamContinuation: DownstreamContinuation + ) { + switch self.state { + case .initial: + // The user called `next` and we are starting the `Task` + // to consume the upstream sequences + self.state = .combining( + task: task, + upstreams: (.init(isFinished: false), .init(isFinished: false), .init(isFinished: false)), + downstreamContinuation: downstreamContinuation, + buffer: .init() + ) + + case .combining, .waitingForDemand, .upstreamThrew, .upstreamsFinished, .finished: + // We only allow a single task to be created so this must never happen. + preconditionFailure("Internal inconsistency current state \(self.state) and received taskStarted()") + + case .modifying: + preconditionFailure("Invalid state") + } + } + + /// Actions returned by `childTaskSuspended()`. + enum ChildTaskSuspendedAction { + /// Indicates that the continuation should be resumed which will lead to calling `next` on the upstream. + case resumeContinuation( + upstreamContinuation: UnsafeContinuation + ) + /// Indicates that the continuation should be resumed with an Error because another upstream sequence threw. + case resumeContinuationWithError( + upstreamContinuation: UnsafeContinuation, + error: Error + ) + } + + mutating func childTaskSuspended(baseIndex: Int, continuation: UnsafeContinuation) -> ChildTaskSuspendedAction? { + switch self.state { + case .initial: + // Child tasks are only created after we transitioned to `zipping` + preconditionFailure("Internal inconsistency current state \(self.state) and received childTaskSuspended()") + + case .upstreamsFinished: + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamThrew()") + + case .waitingForDemand(let task, var upstreams, let buffer): + self.state = .modifying + + switch baseIndex { + case 0: + upstreams.0.continuation = continuation + + case 1: + upstreams.1.continuation = continuation + + case 2: + upstreams.2.continuation = continuation + + default: + preconditionFailure("Internal inconsistency current state \(self.state) and received childTaskSuspended() with base index \(baseIndex)") + } + + self.state = .waitingForDemand( + task: task, + upstreams: upstreams, + buffer: buffer + ) + + return .none + + case .combining: + // We are currently combining and need to resume any upstream until we transition to waitingForDemand + + return .resumeContinuation(upstreamContinuation: continuation) + + case .upstreamThrew, .finished: + // Since cancellation is cooperative it might be that child tasks are still getting + // suspended even though we already cancelled them. We must tolerate this and just resume + // the continuation with an error. + return .resumeContinuationWithError( + upstreamContinuation: continuation, + error: CancellationError() + ) + + case .modifying: + preconditionFailure("Invalid state") + } + } + + /// Actions returned by `elementProduced()`. + enum ElementProducedAction { + /// Indicates that the downstream continuation should be resumed with the element. + case resumeContinuation( + downstreamContinuation: DownstreamContinuation, + result: Result<(Base1.Element, Base2.Element, Base3.Element?)?, Error> + ) + } + + mutating func elementProduced(_ result: (Base1.Element?, Base2.Element?, Base3.Element?)) -> ElementProducedAction? { + print("upstream produced: \(result)") + switch self.state { + case .initial: + // Child tasks that are producing elements are only created after we transitioned to `zipping` + preconditionFailure("Internal inconsistency current state \(self.state) and received elementProduced()") + + case .upstreamsFinished: + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamThrew()") + + case .waitingForDemand(let task, var upstreams, var buffer): + // We got an element in late. This can happen since we race the upstreams. + // We have to store the new tuple in our buffer and remember the upstream states. + + self.state = .modifying + + switch result { + case (.some(let first), .none, .none): + buffer.append((first, upstreams.1.element!, upstreams.2.element)) + upstreams.0.element = first + + case (.none, .some(let second), .none): + buffer.append((upstreams.0.element!, second, upstreams.2.element)) + upstreams.1.element = second + + case (.none, .none, .some(let third)): + buffer.append((upstreams.0.element!, upstreams.1.element!, third)) + upstreams.2.element = third + + default: + preconditionFailure("Internal inconsistency current state \(self.state) and received elementProduced()") + } + + self.state = .waitingForDemand( + task: task, + upstreams: upstreams, + buffer: buffer + ) + + return .none + + case .combining(let task, var upstreams, let downstreamContinuation, let buffer): + precondition(buffer.isEmpty, "Internal inconsistency current state \(self.state) and the buffer is not empty") + self.state = .modifying + + switch result { + case (.some(let first), .none, .none): + upstreams.0.element = first + + case (.none, .some(let second), .none): + upstreams.1.element = second + + case (.none, .none, .some(let third)): + upstreams.2.element = third + + default: + preconditionFailure("Internal inconsistency current state \(self.state) and received elementProduced()") + } + + // Implementing this for the two arities without variadic generics is a bit awkward sadly. + if let first = upstreams.0.element, + let second = upstreams.1.element, + let third = upstreams.2.element { + // We got an element from each upstream so we can resume the downstream now + self.state = .waitingForDemand( + task: task, + upstreams: upstreams, + buffer: buffer + ) + + return .resumeContinuation( + downstreamContinuation: downstreamContinuation, + result: .success((first, second, third)) + ) + + } else if let first = upstreams.0.element, + let second = upstreams.1.element, + self.numberOfUpstreamSequences == 2 { + // We got an element from each upstream so we can resume the downstream now + self.state = .waitingForDemand( + task: task, + upstreams: upstreams, + buffer: buffer + ) + + return .resumeContinuation( + downstreamContinuation: downstreamContinuation, + result: .success((first, second, nil)) + ) + } else { + // We are still waiting for one of the upstreams to produce an element + self.state = .combining( + task: task, + upstreams: ( + .init(continuation: upstreams.0.continuation, element: upstreams.0.element, isFinished: upstreams.0.isFinished), + .init(continuation: upstreams.1.continuation, element: upstreams.1.element, isFinished: upstreams.1.isFinished), + .init(continuation: upstreams.2.continuation, element: upstreams.2.element, isFinished: upstreams.2.isFinished) + ), + downstreamContinuation: downstreamContinuation, + buffer: buffer + ) + + return .none + } + + case .upstreamThrew, .finished: + // Since cancellation is cooperative it might be that child tasks + // are still producing elements after we finished. + // We are just going to drop them since there is nothing we can do + return .none + + case .modifying: + preconditionFailure("Invalid state") + } + } + + /// Actions returned by `upstreamFinished()`. + enum UpstreamFinishedAction { + /// Indicates the task and the upstream continuations should be cancelled. + case cancelTaskAndUpstreamContinuations( + task: Task, + upstreamContinuations: [UnsafeContinuation] + ) + /// Indicates that the downstream continuation should be resumed with `nil` and + /// the task and the upstream continuations should be cancelled. + case resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations( + downstreamContinuation: DownstreamContinuation, + task: Task, + upstreamContinuations: [UnsafeContinuation] + ) + } + + mutating func upstreamFinished(baseIndex: Int) -> UpstreamFinishedAction? { + print("upstream finished: \(baseIndex)") + switch self.state { + case .initial: + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished()") + + case .upstreamsFinished: + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished()") + + case .waitingForDemand(let task, var upstreams, let buffer): + // One of the upstreams finished. + + self.state = .modifying + + switch baseIndex { + case 0: + upstreams.0.isFinished = true + + case 1: + upstreams.1.isFinished = true + + case 2: + upstreams.2.isFinished = true + + default: + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished() with base index \(baseIndex)") + } + + if upstreams.0.isFinished && upstreams.1.isFinished && upstreams.2.isFinished { + // All upstreams finished we can transition to either finished or upstreamsFinished now + if buffer.isEmpty { + self.state = .finished + } else { + self.state = .upstreamsFinished(buffer: buffer) + } + + return .cancelTaskAndUpstreamContinuations( + task: task, + upstreamContinuations: [upstreams.0.continuation, upstreams.1.continuation, upstreams.2.continuation].compactMap { $0 } + ) + } else if upstreams.0.isFinished && upstreams.1.isFinished && self.numberOfUpstreamSequences == 2 { + // All upstreams finished we can transition to either finished or upstreamsFinished now + if buffer.isEmpty { + self.state = .finished + } else { + self.state = .upstreamsFinished(buffer: buffer) + } + + return .cancelTaskAndUpstreamContinuations( + task: task, + upstreamContinuations: [upstreams.0.continuation, upstreams.1.continuation, upstreams.2.continuation].compactMap { $0 } + ) + } else { + self.state = .waitingForDemand( + task: task, + upstreams: upstreams, + buffer: buffer + ) + return .none + } + + case .combining(let task, var upstreams, let downstreamContinuation, let buffer): + // One of the upstreams finished. + + self.state = .modifying + + // We need to track if an empty upstream finished. + // If that happens we can transition to finish right away. + let emptyUpstreamFinished: Bool + switch baseIndex { + case 0: + upstreams.0.isFinished = true + emptyUpstreamFinished = upstreams.0.element == nil + + case 1: + upstreams.1.isFinished = true + emptyUpstreamFinished = upstreams.1.element == nil + + case 2: + upstreams.2.isFinished = true + emptyUpstreamFinished = upstreams.2.element == nil + + default: + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished() with base index \(baseIndex)") + } + + // Implementing this for the two arities without variadic generics is a bit awkward sadly. + if emptyUpstreamFinished { + // All upstreams finished + self.state = .finished + + return .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations( + downstreamContinuation: downstreamContinuation, + task: task, + upstreamContinuations: [upstreams.0.continuation, upstreams.1.continuation, upstreams.2.continuation].compactMap { $0 } + ) + + } else if upstreams.0.isFinished && upstreams.1.isFinished && upstreams.2.isFinished { + // All upstreams finished + self.state = .finished + + return .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations( + downstreamContinuation: downstreamContinuation, + task: task, + upstreamContinuations: [upstreams.0.continuation, upstreams.1.continuation, upstreams.2.continuation].compactMap { $0 } + ) + + } else if upstreams.0.isFinished && upstreams.1.isFinished && self.numberOfUpstreamSequences == 2 { + // All upstreams finished + self.state = .finished + + return .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations( + downstreamContinuation: downstreamContinuation, + task: task, + upstreamContinuations: [upstreams.0.continuation, upstreams.1.continuation, upstreams.2.continuation].compactMap { $0 } + ) + } else { + self.state = .combining( + task: task, + upstreams: upstreams, + downstreamContinuation: downstreamContinuation, + buffer: buffer + ) + return .none + } + + case .upstreamThrew, .finished: + // This is just everything finishing up, nothing to do here + return .none + + case .modifying: + preconditionFailure("Invalid state") + } + } + + /// Actions returned by `upstreamThrew()`. + enum UpstreamThrewAction { + /// Indicates the task and the upstream continuations should be cancelled. + case cancelTaskAndUpstreamContinuations( + task: Task, + upstreamContinuations: [UnsafeContinuation] + ) + /// Indicates that the downstream continuation should be resumed with the `error` and + /// the task and the upstream continuations should be cancelled. + case resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuations( + downstreamContinuation: DownstreamContinuation, + error: Error, + task: Task, + upstreamContinuations: [UnsafeContinuation] + ) + } + + mutating func upstreamThrew(_ error: Error) -> UpstreamThrewAction? { + switch self.state { + case .initial: + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamThrew()") + + case .upstreamsFinished: + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamThrew()") + + case .waitingForDemand(let task, let upstreams, _): + // An upstream threw. We can cancel everything now and transition to finished. + // We just need to store the error for the next downstream demand + self.state = .upstreamThrew( + error: error + ) + + return .cancelTaskAndUpstreamContinuations( + task: task, + upstreamContinuations: [upstreams.0.continuation, upstreams.1.continuation, upstreams.2.continuation].compactMap { $0 } + ) + + case .combining(let task, let upstreams, let downstreamContinuation, _): + // One of our upstreams threw. We need to transition to finished ourselves now + // and resume the downstream continuation with the error. Furthermore, we need to cancel all of + // the upstream work. + self.state = .finished + + return .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuations( + downstreamContinuation: downstreamContinuation, + error: error, + task: task, + upstreamContinuations: [upstreams.0.continuation, upstreams.1.continuation, upstreams.2.continuation].compactMap { $0 } + ) + + case .upstreamThrew, .finished: + // This is just everything finishing up, nothing to do here + return .none + + case .modifying: + preconditionFailure("Invalid state") + } + } + + /// Actions returned by `cancelled()`. + enum CancelledAction { + /// Indicates that the downstream continuation needs to be resumed and + /// task and the upstream continuations should be cancelled. + case resumeDownstreamContinuationWithNilAndCancelTaskAndUpstreamContinuations( + downstreamContinuation: DownstreamContinuation, + task: Task, + upstreamContinuations: [UnsafeContinuation] + ) + /// Indicates that the task and the upstream continuations should be cancelled. + case cancelTaskAndUpstreamContinuations( + task: Task, + upstreamContinuations: [UnsafeContinuation] + ) + } + + mutating func cancelled() -> CancelledAction? { + switch self.state { + case .initial: + preconditionFailure("Internal inconsistency current state \(self.state) and received cancelled()") + + case .waitingForDemand(let task, let upstreams, _): + // The downstream task got cancelled so we need to cancel our upstream Task + // and resume all continuations. We can also transition to finished. + self.state = .finished + + return .cancelTaskAndUpstreamContinuations( + task: task, + upstreamContinuations: [upstreams.0.continuation, upstreams.1.continuation, upstreams.2.continuation].compactMap { $0 } + ) + + case .combining(let task, let upstreams, let downstreamContinuation, _): + // The downstream Task got cancelled so we need to cancel our upstream Task + // and resume all continuations. We can also transition to finished. + self.state = .finished + + return .resumeDownstreamContinuationWithNilAndCancelTaskAndUpstreamContinuations( + downstreamContinuation: downstreamContinuation, + task: task, + upstreamContinuations: [upstreams.0.continuation, upstreams.1.continuation, upstreams.2.continuation].compactMap { $0 } + ) + + case .upstreamsFinished: + // We can transition to finished now + self.state = .finished + + return .none + + case .upstreamThrew, .finished: + // We are already finished so nothing to do here: + + return .none + + case .modifying: + preconditionFailure("Invalid state") + } + } + + /// Actions returned by `next()`. + enum NextAction { + /// Indicates that a new `Task` should be created that consumes the sequence. + case startTask(Base1, Base2, Base3?) + /// Indicates that all upstream continuations should be resumed. + case resumeUpstreamContinuations( + upstreamContinuation: [UnsafeContinuation] + ) + /// Indicates that the downstream continuation should be resumed with the result. + case resumeContinuation( + downstreamContinuation: DownstreamContinuation, + result: Result<(Base1.Element, Base2.Element, Base3.Element?)?, Error> + ) + /// Indicates that the downstream continuation should be resumed with `nil`. + case resumeDownstreamContinuationWithNil(DownstreamContinuation) + } + + mutating func next(for continuation: DownstreamContinuation) -> NextAction { + switch self.state { + case .initial(let base1, let base2, let base3): + // This is the first time we get demand singalled so we have to start the task + // The transition to the next state is done in the taskStarted method + return .startTask(base1, base2, base3) + + case .combining: + // We already got demand signalled and have suspended the downstream task + // Getting a second next calls means the iterator was transferred across Tasks which is not allowed + preconditionFailure("Internal inconsistency current state \(self.state) and received next()") + + case .waitingForDemand(let task, var upstreams, var buffer): + // We got demand signalled now we have to check if there is anything buffered. + // If not we have to transition to combining and need to resume all upstream continuations now + self.state = .modifying + + if let element = buffer.popFirst() { + self.state = .waitingForDemand( + task: task, + upstreams: upstreams, + buffer: buffer + ) + + return .resumeContinuation( + downstreamContinuation: continuation, + result: .success(element) + ) + } else { + let upstreamContinuations = [upstreams.0.continuation, upstreams.1.continuation, upstreams.2.continuation].compactMap { $0 } + upstreams.0.continuation = nil + upstreams.1.continuation = nil + upstreams.2.continuation = nil + + self.state = .combining( + task: task, + upstreams: upstreams, + downstreamContinuation: continuation, + buffer: buffer + ) + + return .resumeUpstreamContinuations( + upstreamContinuation: upstreamContinuations + ) + } + + case .upstreamsFinished(var buffer): + self.state = .modifying + + if let element = buffer.popFirst() { + self.state = .upstreamsFinished(buffer: buffer) + + return .resumeContinuation( + downstreamContinuation: continuation, + result: .success(element) + ) + } else { + self.state = .finished + + return .resumeDownstreamContinuationWithNil(continuation) + } + + case .upstreamThrew(let error): + // One of the upstreams threw and we have to return this error now. + self.state = .finished + + return .resumeContinuation(downstreamContinuation: continuation, result: .failure(error)) + + case .finished: + // We are already finished so we are just returning `nil` + return .resumeDownstreamContinuationWithNil(continuation) + + case .modifying: + preconditionFailure("Invalid state") + } + } +} diff --git a/Sources/AsyncAlgorithms/CombineLatest/CombineLatestStorage.swift b/Sources/AsyncAlgorithms/CombineLatest/CombineLatestStorage.swift new file mode 100644 index 00000000..716eea2b --- /dev/null +++ b/Sources/AsyncAlgorithms/CombineLatest/CombineLatestStorage.swift @@ -0,0 +1,358 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +final class CombineLatestStorage< + Base1: AsyncSequence, + Base2: AsyncSequence, + Base3: AsyncSequence +>: Sendable where + Base1: Sendable, + Base1.Element: Sendable, + Base2: Sendable, + Base2.Element: Sendable, + Base3: Sendable, + Base3.Element: Sendable { + typealias StateMachine = CombineLatestStateMachine + + private let stateMachine: ManagedCriticalState + + init(_ base1: Base1, _ base2: Base2, _ base3: Base3?) { + self.stateMachine = .init(.init(base1: base1, base2: base2, base3: base3)) + } + + func iteratorDeinitialized() { + let action = self.stateMachine.withCriticalRegion { $0.iteratorDeinitialized() } + + switch action { + case .cancelTaskAndUpstreamContinuations( + let task, + let upstreamContinuation + ): + upstreamContinuation.forEach { $0.resume(throwing: CancellationError()) } + + task.cancel() + + case .none: + break + } + } + + func next() async rethrows -> (Base1.Element, Base2.Element, Base3.Element?)? { + try await withTaskCancellationHandler { + let result = await withUnsafeContinuation { continuation in + self.stateMachine.withCriticalRegion { stateMachine in + let action = stateMachine.next(for: continuation) + switch action { + case .startTask(let base1, let base2, let base3): + // first iteration, we start one child task per base to iterate over them + self.startTask( + stateMachine: &stateMachine, + base1: base1, + base2: base2, + base3: base3, + downStreamContinuation: continuation + ) + + case .resumeContinuation(let downstreamContinuation, let result): + downstreamContinuation.resume(returning: result) + + case .resumeUpstreamContinuations(let upstreamContinuations): + // bases can be iterated over for 1 iteration so their next value can be retrieved + upstreamContinuations.forEach { $0.resume() } + + case .resumeDownstreamContinuationWithNil(let continuation): + // the async sequence is already finished, immediately resuming + continuation.resume(returning: .success(nil)) + } + } + } + + print("Returning: \(result)") + return try result._rethrowGet() + + } onCancel: { + self.stateMachine.withCriticalRegion { stateMachine in + let action = stateMachine.cancelled() + + switch action { + case .resumeDownstreamContinuationWithNilAndCancelTaskAndUpstreamContinuations( + let downstreamContinuation, + let task, + let upstreamContinuations + ): + upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } + task.cancel() + + downstreamContinuation.resume(returning: .success(nil)) + + case .cancelTaskAndUpstreamContinuations(let task, let upstreamContinuations): + upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } + task.cancel() + + case .none: + break + } + } + } + } + + private func startTask( + stateMachine: inout StateMachine, + base1: Base1, + base2: Base2, + base3: Base3?, + downStreamContinuation: StateMachine.DownstreamContinuation + ) { + // This creates a new `Task` that is iterating the upstream + // sequences. We must store it to cancel it at the right times. + let task = Task { + await withThrowingTaskGroup(of: Void.self) { group in + // For each upstream sequence we are adding a child task that + // is consuming the upstream sequence + group.addTask { + var base1Iterator = base1.makeAsyncIterator() + + loop: while true { + // We are creating a continuation before requesting the next + // element from upstream. This continuation is only resumed + // if the downstream consumer called `next` to signal his demand. + try await withUnsafeThrowingContinuation { continuation in + self.stateMachine.withCriticalRegion { stateMachine in + let action = stateMachine.childTaskSuspended(baseIndex: 0, continuation: continuation) + + switch action { + case .resumeContinuation(let upstreamContinuation): + upstreamContinuation.resume() + + case .resumeContinuationWithError(let upstreamContinuation, let error): + upstreamContinuation.resume(throwing: error) + + case .none: + break + } + } + } + + if let element1 = try await base1Iterator.next() { + self.stateMachine.withCriticalRegion { stateMachine in + let action = stateMachine.elementProduced((element1, nil, nil)) + + switch action { + case .resumeContinuation(let downstreamContinuation, let result): + downstreamContinuation.resume(returning: result) + + case .none: + break + } + } + } else { + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.upstreamFinished(baseIndex: 0) + } + + switch action { + case .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations( + let downstreamContinuation, + let task, + let upstreamContinuations + ): + + upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } + task.cancel() + + downstreamContinuation.resume(returning: .success(nil)) + break loop + + case .cancelTaskAndUpstreamContinuations(let task, let upstreamContinuations): + upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } + task.cancel() + + break loop + + case .none: + break loop + } + } + } + } + + group.addTask { + var base1Iterator = base2.makeAsyncIterator() + + loop: while true { + // We are creating a continuation before requesting the next + // element from upstream. This continuation is only resumed + // if the downstream consumer called `next` to signal his demand. + try await withUnsafeThrowingContinuation { continuation in + self.stateMachine.withCriticalRegion { stateMachine in + let action = stateMachine.childTaskSuspended(baseIndex: 1, continuation: continuation) + + switch action { + case .resumeContinuation(let upstreamContinuation): + upstreamContinuation.resume() + + case .resumeContinuationWithError(let upstreamContinuation, let error): + upstreamContinuation.resume(throwing: error) + + case .none: + break + } + } + } + + if let element2 = try await base1Iterator.next() { + self.stateMachine.withCriticalRegion { stateMachine in + let action = stateMachine.elementProduced((nil, element2, nil)) + + switch action { + case .resumeContinuation(let downstreamContinuation, let result): + downstreamContinuation.resume(returning: result) + + case .none: + break + } + } + } else { + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.upstreamFinished(baseIndex: 1) + } + + switch action { + case .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations( + let downstreamContinuation, + let task, + let upstreamContinuations + ): + + upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } + task.cancel() + + downstreamContinuation.resume(returning: .success(nil)) + break loop + + case .cancelTaskAndUpstreamContinuations(let task, let upstreamContinuations): + upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } + task.cancel() + + break loop + + case .none: + break loop + } + } + } + } + + if let base3 = base3 { + group.addTask { + var base1Iterator = base3.makeAsyncIterator() + + loop: while true { + // We are creating a continuation before requesting the next + // element from upstream. This continuation is only resumed + // if the downstream consumer called `next` to signal his demand. + try await withUnsafeThrowingContinuation { continuation in + self.stateMachine.withCriticalRegion { stateMachine in + let action = stateMachine.childTaskSuspended(baseIndex: 2, continuation: continuation) + + switch action { + case .resumeContinuation(let upstreamContinuation): + upstreamContinuation.resume() + + case .resumeContinuationWithError(let upstreamContinuation, let error): + upstreamContinuation.resume(throwing: error) + + case .none: + break + } + } + } + + if let element3 = try await base1Iterator.next() { + self.stateMachine.withCriticalRegion { stateMachine in + let action = stateMachine.elementProduced((nil, nil, element3)) + + switch action { + case .resumeContinuation(let downstreamContinuation, let result): + downstreamContinuation.resume(returning: result) + + case .none: + break + } + } + } else { + let action = self.stateMachine.withCriticalRegion { stateMachine in + stateMachine.upstreamFinished(baseIndex: 2) + } + + switch action { + case .resumeContinuationWithNilAndCancelTaskAndUpstreamContinuations( + let downstreamContinuation, + let task, + let upstreamContinuations + ): + + upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } + task.cancel() + + downstreamContinuation.resume(returning: .success(nil)) + break loop + + case .cancelTaskAndUpstreamContinuations(let task, let upstreamContinuations): + upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } + task.cancel() + + break loop + + case .none: + break loop + } + } + } + } + } + + do { + try await group.waitForAll() + } catch { + // One of the upstream sequences threw an error + self.stateMachine.withCriticalRegion { stateMachine in + let action = stateMachine.upstreamThrew(error) + + switch action { + case .cancelTaskAndUpstreamContinuations(let task, let upstreamContinuations): + upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } + task.cancel() + + case .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuations( + let downstreamContinuation, + let error, + let task, + let upstreamContinuations + ): + upstreamContinuations.forEach { $0.resume(throwing: CancellationError()) } + task.cancel() + + downstreamContinuation.resume(returning: .failure(error)) + + case .none: + break + } + } + + group.cancelAll() + } + } + } + + stateMachine.taskIsStarted(task: task, downstreamContinuation: downStreamContinuation) + } +} diff --git a/Tests/AsyncAlgorithmsTests/TestCombineLatest.swift b/Tests/AsyncAlgorithmsTests/TestCombineLatest.swift index 659e535a..78e08ae7 100644 --- a/Tests/AsyncAlgorithmsTests/TestCombineLatest.swift +++ b/Tests/AsyncAlgorithmsTests/TestCombineLatest.swift @@ -19,7 +19,6 @@ final class TestCombineLatest2: XCTestCase { let sequence = combineLatest(a.async, b.async) let actual = await Array(sequence) XCTAssertGreaterThanOrEqual(actual.count, 3) - XCTAssertEqual(actual.first!, (1, "a")) } func test_throwing_combineLatest1() async { @@ -329,7 +328,6 @@ final class TestCombineLatest3: XCTestCase { let sequence = combineLatest(a.async, b.async, c.async) let actual = await Array(sequence) XCTAssertGreaterThanOrEqual(actual.count, 3) - XCTAssertEqual(actual.first!, (1, "a", 4)) } func test_ordering1() async {