Skip to content

Commit 0aaa813

Browse files
committed
asyncChannel: add .finished as an emission state
1 parent d578c06 commit 0aaa813

File tree

1 file changed

+18
-15
lines changed

1 file changed

+18
-15
lines changed

Sources/AsyncAlgorithms/AsyncChannel.swift

+18-15
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,12 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
8888
case idle
8989
case pending(Set<Pending>)
9090
case awaiting(Set<Awaiting>)
91+
case finished
9192
}
9293

9394
struct State {
9495
var emission: Emission = .idle
9596
var generation = 0
96-
var terminal = false
9797
}
9898

9999
let state = ManagedCriticalState(State())
@@ -145,10 +145,6 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
145145
return nil
146146
}
147147

148-
if state.terminal {
149-
terminal = true
150-
return nil
151-
}
152148
switch state.emission {
153149
case .idle:
154150
state.emission = .awaiting([Awaiting(generation: generation, continuation: continuation)])
@@ -165,6 +161,9 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
165161
nexts.update(with: Awaiting(generation: generation, continuation: continuation))
166162
state.emission = .awaiting(nexts)
167163
return nil
164+
case .finished:
165+
terminal = true
166+
return nil
168167
}
169168
}?.resume()
170169

@@ -205,7 +204,7 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
205204
let continuation = await withUnsafeContinuation { continuation in
206205
state.withCriticalRegion { state -> UnsafeResumption<UnsafeContinuation<Element?, Never>?, Never>? in
207206

208-
if sendTokenStatus.withCriticalRegion({ $0 }) == .cancelled || state.terminal {
207+
if sendTokenStatus.withCriticalRegion({ $0 }) == .cancelled {
209208
return UnsafeResumption(continuation: continuation, success: nil)
210209
}
211210

@@ -225,6 +224,8 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
225224
state.emission = .awaiting(nexts)
226225
}
227226
return UnsafeResumption(continuation: continuation, success: next)
227+
case .finished:
228+
return UnsafeResumption(continuation: continuation, success: nil)
228229
}
229230
}?.resume()
230231
}
@@ -249,20 +250,22 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
249250
/// All subsequent calls to `next(_:)` will resume immediately.
250251
public func finish() {
251252
let (sends, nexts) = state.withCriticalRegion { state -> (Set<Pending>, Set<Awaiting>) in
252-
if state.terminal {
253-
return ([], [])
254-
}
255-
state.terminal = true
253+
let result: (Set<Pending>, Set<Awaiting>)
254+
256255
switch state.emission {
257256
case .idle:
258-
return ([], [])
257+
result = ([], [])
259258
case .pending(let nexts):
260-
state.emission = .idle
261-
return (nexts, [])
259+
result = (nexts, [])
262260
case .awaiting(let nexts):
263-
state.emission = .idle
264-
return ([], nexts)
261+
result = ([], nexts)
262+
case .finished:
263+
result = ([], [])
265264
}
265+
266+
state.emission = .finished
267+
268+
return result
266269
}
267270
for send in sends {
268271
send.continuation?.resume(returning: nil)

0 commit comments

Comments
 (0)