Skip to content

Commit 747c766

Browse files
committed
Setup task after first next
1 parent 8da789b commit 747c766

File tree

5 files changed

+336
-391
lines changed

5 files changed

+336
-391
lines changed

Sources/AsyncAlgorithms/Merge/AsyncMerge2Sequence.swift

-1
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ extension AsyncMerge2Sequence {
7070

7171
fileprivate init(storage: MergeStorage<Base1, Base2, Base1>) {
7272
self.storage = storage
73-
self.storage.iteratorInitialized()
7473
}
7574

7675
deinit {

Sources/AsyncAlgorithms/Merge/AsyncMerge3Sequence.swift

-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@ public extension AsyncMerge3Sequence {
8181

8282
fileprivate init(storage: MergeStorage<Base1, Base2, Base3>) {
8383
self.storage = storage
84-
self.storage.iteratorInitialized()
8584
}
8685

8786
deinit {

Sources/AsyncAlgorithms/Merge/MergeStateMachine.swift

+8-25
Original file line numberDiff line numberDiff line change
@@ -82,27 +82,6 @@ struct MergeStateMachine<
8282
}
8383
}
8484

85-
/// Actions returned by `iteratorInitialized()`.
86-
enum IteratorInitializedAction {
87-
/// Indicates that a new `Task` should be created that consumed the sequences.
88-
case startTask(Base1, Base2, Base3?)
89-
}
90-
91-
mutating func iteratorInitialized() -> IteratorInitializedAction {
92-
switch state {
93-
case let .initial(base1, base2, base3):
94-
// This is the first iterator being created and we need to create our `Task`
95-
// that is consuming the upstream sequences.
96-
return .startTask(base1, base2, base3)
97-
98-
case .merging, .upstreamFailure, .finished:
99-
fatalError("merge allows only a single AsyncIterator to be created")
100-
101-
case .modifying:
102-
preconditionFailure("Invalid state")
103-
}
104-
}
105-
10685
/// Actions returned by `iteratorDeinitialized()`.
10786
enum IteratorDeinitializedAction {
10887
/// Indicates that the `Task` needs to be cancelled and
@@ -118,8 +97,8 @@ struct MergeStateMachine<
11897
mutating func iteratorDeinitialized() -> IteratorDeinitializedAction {
11998
switch state {
12099
case .initial:
121-
// An iterator needs to be initialized before it can be deinitialized.
122-
preconditionFailure("Internal inconsistency current state \(self.state) and received iteratorDeinitialized()")
100+
// Nothing to do here. No demand was signalled until now
101+
return .none
123102

124103
case .merging(_, _, _, _, .some):
125104
// An iterator was deinitialized while we have a suspended continuation.
@@ -531,6 +510,8 @@ struct MergeStateMachine<
531510

532511
/// Actions returned by `next()`.
533512
enum NextAction {
513+
/// Indicates that a new `Task` should be created that consumes the sequence and the downstream must be supsended
514+
case startTaskAndSuspendDownstreamTask(Base1, Base2, Base3?)
534515
/// Indicates that the `element` should be returned.
535516
case returnElement(Result<Element, Error>)
536517
/// Indicates that `nil` should be returned.
@@ -543,8 +524,10 @@ struct MergeStateMachine<
543524

544525
mutating func next() -> NextAction {
545526
switch state {
546-
case .initial:
547-
preconditionFailure("Internal inconsistency current state \(self.state) and received next()")
527+
case .initial(let base1, let base2, let base3):
528+
// This is the first time we got demand signalled. We need to start the task now
529+
// We are transitioning to merging in the taskStarted method.
530+
return .startTaskAndSuspendDownstreamTask(base1, base2, base3)
548531

549532
case .merging(_, _, _, _, .some):
550533
// We have multiple AsyncIterators iterating the sequence

0 commit comments

Comments
 (0)