Skip to content

Commit e2920bb

Browse files
committed
asyncChannel: introduce ChannelToken to model Pending and Awaiting
1 parent 57072e6 commit e2920bb

File tree

2 files changed

+26
-21
lines changed

2 files changed

+26
-21
lines changed

Sources/AsyncAlgorithms/AsyncChannel.swift

+21-16
Original file line numberDiff line numberDiff line change
@@ -50,41 +50,44 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
5050
}
5151
}
5252

53-
struct Awaiting: Hashable {
53+
typealias Pending = ChannelToken<UnsafeContinuation<UnsafeContinuation<Element?, Never>?, Never>>
54+
typealias Awaiting = ChannelToken<UnsafeContinuation<Element?, Never>>
55+
56+
struct ChannelToken<Continuation>: Hashable {
5457
var generation: Int
55-
var continuation: UnsafeContinuation<Element?, Never>?
58+
var continuation: Continuation?
5659
let cancelled: Bool
57-
58-
init(generation: Int, continuation: UnsafeContinuation<Element?, Never>) {
60+
61+
init(generation: Int, continuation: Continuation) {
5962
self.generation = generation
6063
self.continuation = continuation
6164
cancelled = false
6265
}
63-
66+
6467
init(placeholder generation: Int) {
6568
self.generation = generation
6669
self.continuation = nil
6770
cancelled = false
6871
}
69-
72+
7073
init(cancelled generation: Int) {
7174
self.generation = generation
7275
self.continuation = nil
7376
cancelled = true
7477
}
75-
78+
7679
func hash(into hasher: inout Hasher) {
7780
hasher.combine(generation)
7881
}
79-
80-
static func == (_ lhs: Awaiting, _ rhs: Awaiting) -> Bool {
82+
83+
static func == (_ lhs: ChannelToken, _ rhs: ChannelToken) -> Bool {
8184
return lhs.generation == rhs.generation
8285
}
8386
}
8487

8588
enum Emission {
8689
case idle
87-
case pending([UnsafeContinuation<UnsafeContinuation<Element?, Never>?, Never>])
90+
case pending([Pending])
8891
case awaiting(Set<Awaiting>)
8992

9093
mutating func cancel(_ generation: Int) -> UnsafeContinuation<Element?, Never>? {
@@ -131,7 +134,7 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
131134
}
132135

133136
func next(_ generation: Int) async -> Element? {
134-
return await withUnsafeContinuation { continuation in
137+
return await withUnsafeContinuation { (continuation: UnsafeContinuation<Element?, Never>) in
135138
var cancelled = false
136139
var terminal = false
137140
state.withCriticalRegion { state -> UnsafeResumption<UnsafeContinuation<Element?, Never>?, Never>? in
@@ -150,7 +153,7 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
150153
} else {
151154
state.emission = .pending(sends)
152155
}
153-
return UnsafeResumption(continuation: send, success: continuation)
156+
return UnsafeResumption(continuation: send.continuation, success: continuation)
154157
case .awaiting(var nexts):
155158
if nexts.update(with: Awaiting(generation: generation, continuation: continuation)) != nil {
156159
nexts.remove(Awaiting(placeholder: generation))
@@ -171,7 +174,7 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
171174
}
172175

173176
func terminateAll() {
174-
let (sends, nexts) = state.withCriticalRegion { state -> ([UnsafeContinuation<UnsafeContinuation<Element?, Never>?, Never>], Set<Awaiting>) in
177+
let (sends, nexts) = state.withCriticalRegion { state -> ([Pending], Set<Awaiting>) in
175178
if state.terminal {
176179
return ([], [])
177180
}
@@ -188,14 +191,16 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
188191
}
189192
}
190193
for send in sends {
191-
send.resume(returning: nil)
194+
send.continuation?.resume(returning: nil)
192195
}
193196
for next in nexts {
194197
next.continuation?.resume(returning: nil)
195198
}
196199
}
197200

198201
func _send(_ element: Element) async {
202+
let generation = establish()
203+
199204
await withTaskCancellationHandler {
200205
terminateAll()
201206
} operation: {
@@ -206,10 +211,10 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
206211
}
207212
switch state.emission {
208213
case .idle:
209-
state.emission = .pending([continuation])
214+
state.emission = .pending([Pending(generation: generation, continuation: continuation)])
210215
return nil
211216
case .pending(var sends):
212-
sends.append(continuation)
217+
sends.append(Pending(generation: generation, continuation: continuation))
213218
state.emission = .pending(sends)
214219
return nil
215220
case .awaiting(var nexts):

Sources/AsyncAlgorithms/UnsafeResumption.swift

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
11
struct UnsafeResumption<Success, Failure: Error> {
2-
let continuation: UnsafeContinuation<Success, Failure>
2+
let continuation: UnsafeContinuation<Success, Failure>?
33
let result: Result<Success, Failure>
44

5-
init(continuation: UnsafeContinuation<Success, Failure>, result: Result<Success, Failure>) {
5+
init(continuation: UnsafeContinuation<Success, Failure>?, result: Result<Success, Failure>) {
66
self.continuation = continuation
77
self.result = result
88
}
99

10-
init(continuation: UnsafeContinuation<Success, Failure>, success: Success) {
10+
init(continuation: UnsafeContinuation<Success, Failure>?, success: Success) {
1111
self.init(continuation: continuation, result: .success(success))
1212
}
1313

14-
init(continuation: UnsafeContinuation<Success, Failure>, failure: Failure) {
14+
init(continuation: UnsafeContinuation<Success, Failure>?, failure: Failure) {
1515
self.init(continuation: continuation, result: .failure(failure))
1616
}
1717

1818
func resume() {
19-
continuation.resume(with: result)
19+
continuation?.resume(with: result)
2020
}
2121
}
2222

0 commit comments

Comments
 (0)