Skip to content

Commit 3a6adcd

Browse files
committed
merge: optimize task creation and remove sendable constraint
1 parent 68c8dc2 commit 3a6adcd

11 files changed

+670
-461
lines changed

Package.swift

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,14 @@ let package = Package(
1616
.library(name: "_CAsyncSequenceValidationSupport", type: .static, targets: ["AsyncSequenceValidation"]),
1717
.library(name: "AsyncAlgorithms_XCTest", targets: ["AsyncAlgorithms_XCTest"]),
1818
],
19-
dependencies: [],
19+
dependencies: [
20+
.package(url: "https://github.com/apple/swift-collections.git", .upToNextMajor(from: "1.0.3"))
21+
],
2022
targets: [
21-
.target(name: "AsyncAlgorithms"),
23+
.target(
24+
name: "AsyncAlgorithms",
25+
dependencies: [.product(name: "Collections", package: "swift-collections")]
26+
),
2227
.target(
2328
name: "AsyncSequenceValidation",
2429
dependencies: ["_CAsyncSequenceValidationSupport", "AsyncAlgorithms"]),

Sources/AsyncAlgorithms/AsyncChannel.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
2828
init(_ channel: AsyncChannel<Element>) {
2929
self.channel = channel
3030
}
31-
31+
3232
/// Await the next sent element or finish.
3333
public mutating func next() async -> Element? {
3434
guard active else {
@@ -116,7 +116,7 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
116116

117117
/// Create a new `AsyncChannel` given an element type.
118118
public init(element elementType: Element.Type = Element.self) { }
119-
119+
120120
func establish() -> Int {
121121
state.withCriticalRegion { state in
122122
defer { state.generation &+= 1 }
@@ -152,7 +152,7 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
152152
}
153153
return UnsafeResumption(continuation: send, success: continuation)
154154
case .awaiting(var nexts):
155-
if nexts.update(with: Awaiting(generation: generation, continuation: continuation)) != nil {
155+
if nexts.update(with: Awaiting(generation: generation, continuation: continuation)) != nil {
156156
nexts.remove(Awaiting(placeholder: generation))
157157
cancelled = true
158158
}

Sources/AsyncAlgorithms/AsyncChunksOfCountOrSignalSequence.swift

Lines changed: 72 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -55,39 +55,89 @@ extension AsyncSequence {
5555
}
5656
}
5757

58+
struct AsyncEitherSequence<Base: AsyncSequence>: AsyncSequence {
59+
typealias Element = Either
60+
typealias AsyncIterator = Iterator
61+
62+
enum Either {
63+
case first(Base.Element)
64+
case second(Base.Element)
65+
}
66+
67+
let base: Base
68+
let factory: (Base.Element) -> Either
69+
70+
func makeAsyncIterator() -> AsyncIterator {
71+
Iterator(
72+
base: self.base.makeAsyncIterator(),
73+
factory: self.factory
74+
)
75+
}
76+
77+
struct Iterator: AsyncIteratorProtocol {
78+
79+
var base: Base.AsyncIterator
80+
let factory: (Base.Element) -> Either
81+
82+
mutating func next() async rethrows -> Element? {
83+
guard let element = try await self.base.next() else {
84+
return nil
85+
}
86+
return self.factory(element)
87+
}
88+
}
89+
90+
}
91+
5892
/// An `AsyncSequence` that chunks elements into collected `RangeReplaceableCollection` instances by either count or a signal from another `AsyncSequence`.
5993
public struct AsyncChunksOfCountOrSignalSequence<Base: AsyncSequence, Collected: RangeReplaceableCollection, Signal: AsyncSequence>: AsyncSequence, Sendable where Collected.Element == Base.Element, Base: Sendable, Signal: Sendable, Base.AsyncIterator: Sendable, Signal.AsyncIterator: Sendable, Base.Element: Sendable, Signal.Element: Sendable {
6094

6195
public typealias Element = Collected
6296

97+
enum Either {
98+
case first(Base.Element)
99+
case second(Signal.Element)
100+
}
101+
63102
/// The iterator for a `AsyncChunksOfCountOrSignalSequence` instance.
64103
public struct Iterator: AsyncIteratorProtocol, Sendable {
65104
let count: Int?
66-
var state: Merge2StateMachine<Base, Signal>
67-
init(base: Base.AsyncIterator, count: Int?, signal: Signal.AsyncIterator) {
105+
let state: MergeStateMachine<Either>
106+
init(base: Base, count: Int?, signal: Signal) {
68107
self.count = count
69-
self.state = Merge2StateMachine(base, terminatesOnNil: true, signal)
108+
let eitherBase = base.map { Either.first($0) }
109+
let eitherSignal = signal.map { Either.second($0) }
110+
self.state = MergeStateMachine(eitherBase, terminatesOnNil: true, eitherSignal)
70111
}
71-
112+
72113
public mutating func next() async rethrows -> Collected? {
73-
var result : Collected?
74-
while let next = try await state.next() {
75-
switch next {
76-
case .first(let element):
77-
if result == nil {
78-
result = Collected()
79-
}
80-
result!.append(element)
81-
if result?.count == count {
82-
return result
83-
}
84-
case .second(_):
85-
if result != nil {
86-
return result
87-
}
88-
}
114+
var collected: Collected?
115+
116+
loop: while true {
117+
let next = await state.next()
118+
119+
switch next {
120+
case .termination:
121+
break loop
122+
case .element(let result):
123+
let element = try result._rethrowGet()
124+
switch element {
125+
case .first(let element):
126+
if collected == nil {
127+
collected = Collected()
128+
}
129+
collected!.append(element)
130+
if collected?.count == count {
131+
return collected
132+
}
133+
case .second(_):
134+
if collected != nil {
135+
return collected
136+
}
137+
}
89138
}
90-
return result
139+
}
140+
return collected
91141
}
92142
}
93143

@@ -105,6 +155,6 @@ public struct AsyncChunksOfCountOrSignalSequence<Base: AsyncSequence, Collected:
105155
}
106156

107157
public func makeAsyncIterator() -> Iterator {
108-
return Iterator(base: base.makeAsyncIterator(), count: count, signal: signal.makeAsyncIterator())
158+
return Iterator(base: base, count: count, signal: signal)
109159
}
110160
}

0 commit comments

Comments
 (0)