Skip to content

Commit be52a2d

Browse files
committed
channel: update documentation for cancellation
1 parent bdd7bc7 commit be52a2d

File tree

4 files changed

+23
-14
lines changed

4 files changed

+23
-14
lines changed

Package.swift

+5-2
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@ let package = Package(
1616
.library(name: "_CAsyncSequenceValidationSupport", type: .static, targets: ["AsyncSequenceValidation"]),
1717
.library(name: "AsyncAlgorithms_XCTest", targets: ["AsyncAlgorithms_XCTest"]),
1818
],
19-
dependencies: [],
19+
dependencies: [.package(url: "https://github.com/apple/swift-collections.git", .upToNextMajor(from: "1.0.3"))],
2020
targets: [
21-
.target(name: "AsyncAlgorithms"),
21+
.target(
22+
name: "AsyncAlgorithms",
23+
dependencies: [.product(name: "Collections", package: "swift-collections")]
24+
),
2225
.target(
2326
name: "AsyncSequenceValidation",
2427
dependencies: ["_CAsyncSequenceValidationSupport", "AsyncAlgorithms"]),

Sources/AsyncAlgorithms/AsyncAlgorithms.docc/Guides/Channel.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public final class AsyncThrowingChannel<Element: Sendable, Failure: Error>: Asyn
5151
}
5252
```
5353

54-
Channels are intended to be used as communication types between tasks. Particularly when one task produces values and another task consumes said values. On the one hand, the back pressure applied by `send(_:)` via the suspension/resume ensures that the production of values does not exceed the consumption of values from iteration. This method suspends after enqueuing the event and is resumed when the next call to `next()` on the `Iterator` is made. On the other hand, the call to `finish()` or `fail(_:)` immediately resumes all the pending operations for every producers and consumers. Thus, every suspended `send(_:)` operations instantly resume, so as every suspended `next()` operations by producing a nil value, or by throwing an error, indicating the termination of the iterations. Further calls to `send(_:)` will immediately resume.
54+
Channels are intended to be used as communication types between tasks. Particularly when one task produces values and another task consumes said values. On the one hand, the back pressure applied by `send(_:)` via the suspension/resume ensures that the production of values does not exceed the consumption of values from iteration. This method suspends after enqueuing the event and is resumed when the next call to `next()` on the `Iterator` is made. On the other hand, the call to `finish()` or `fail(_:)` immediately resumes all the pending operations for every producers and consumers. Thus, every suspended `send(_:)` operations instantly resume, so as every suspended `next()` operations by producing a nil value, or by throwing an error, indicating the termination of the iterations. Further calls to `send(_:)` will immediately resume. The calls to `send(:)` and `next()` will immediately resume when their supporting task is cancelled, other operations from other tasks will remain active.
5555

5656
```swift
5757
let channel = AsyncChannel<String>()

Sources/AsyncAlgorithms/AsyncChannel.swift

+9-6
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
//
1010
//===----------------------------------------------------------------------===//
1111

12+
import OrderedCollections
13+
1214
/// A channel for sending elements from one task to another with back pressure.
1315
///
1416
/// The `AsyncChannel` class is intended to be used as a communication type between tasks,
@@ -86,8 +88,8 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
8688

8789
enum Emission {
8890
case idle
89-
case pending(Set<Pending>)
90-
case awaiting(Set<Awaiting>)
91+
case pending(OrderedSet<Pending>)
92+
case awaiting(OrderedSet<Awaiting>)
9193
case finished
9294
}
9395

@@ -158,7 +160,7 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
158160
}
159161
return UnsafeResumption(continuation: send.continuation, success: continuation)
160162
case .awaiting(var nexts):
161-
nexts.update(with: Awaiting(generation: generation, continuation: continuation))
163+
nexts.updateOrAppend(Awaiting(generation: generation, continuation: continuation))
162164
state.emission = .awaiting(nexts)
163165
return nil
164166
case .finished:
@@ -213,7 +215,7 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
213215
state.emission = .pending([Pending(generation: generation, continuation: continuation)])
214216
return nil
215217
case .pending(var sends):
216-
sends.update(with: Pending(generation: generation, continuation: continuation))
218+
sends.updateOrAppend(Pending(generation: generation, continuation: continuation))
217219
state.emission = .pending(sends)
218220
return nil
219221
case .awaiting(var nexts):
@@ -235,6 +237,7 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
235237
/// Send an element to an awaiting iteration. This function will resume when the next call to `next()` is made
236238
/// or when a call to `finish()` is made from another Task.
237239
/// If the channel is already finished then this returns immediately
240+
/// If the task is cancelled, this function will resume. Other sending operations from other tasks will remain active.
238241
public func send(_ element: Element) async {
239242
let generation = establish()
240243
let sendTokenStatus = ManagedCriticalState<ChannelTokenStatus>(.new)
@@ -249,8 +252,8 @@ public final class AsyncChannel<Element: Sendable>: AsyncSequence, Sendable {
249252
/// Send a finish to all awaiting iterations.
250253
/// All subsequent calls to `next(_:)` will resume immediately.
251254
public func finish() {
252-
let (sends, nexts) = state.withCriticalRegion { state -> (Set<Pending>, Set<Awaiting>) in
253-
let result: (Set<Pending>, Set<Awaiting>)
255+
let (sends, nexts) = state.withCriticalRegion { state -> (OrderedSet<Pending>, OrderedSet<Awaiting>) in
256+
let result: (OrderedSet<Pending>, OrderedSet<Awaiting>)
254257

255258
switch state.emission {
256259
case .idle:

Sources/AsyncAlgorithms/AsyncThrowingChannel.swift

+8-5
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
//
1010
//===----------------------------------------------------------------------===//
1111

12+
import OrderedCollections
13+
1214
/// An error-throwing channel for sending elements from on task to another with back pressure.
1315
///
1416
/// The `AsyncThrowingChannel` class is intended to be used as a communication types between tasks,
@@ -95,8 +97,8 @@ public final class AsyncThrowingChannel<Element: Sendable, Failure: Error>: Asyn
9597

9698
enum Emission {
9799
case idle
98-
case pending(Set<Pending>)
99-
case awaiting(Set<Awaiting>)
100+
case pending(OrderedSet<Pending>)
101+
case awaiting(OrderedSet<Awaiting>)
100102
case terminated(Termination)
101103
}
102104

@@ -167,7 +169,7 @@ public final class AsyncThrowingChannel<Element: Sendable, Failure: Error>: Asyn
167169
}
168170
return UnsafeResumption(continuation: send.continuation, success: continuation)
169171
case .awaiting(var nexts):
170-
nexts.update(with: Awaiting(generation: generation, continuation: continuation))
172+
nexts.updateOrAppend(Awaiting(generation: generation, continuation: continuation))
171173
state.emission = .awaiting(nexts)
172174
return nil
173175
case .terminated(let termination):
@@ -235,7 +237,7 @@ public final class AsyncThrowingChannel<Element: Sendable, Failure: Error>: Asyn
235237
state.emission = .pending([Pending(generation: generation, continuation: continuation)])
236238
return nil
237239
case .pending(var sends):
238-
sends.update(with: Pending(generation: generation, continuation: continuation))
240+
sends.updateOrAppend(Pending(generation: generation, continuation: continuation))
239241
state.emission = .pending(sends)
240242
return nil
241243
case .awaiting(var nexts):
@@ -255,7 +257,7 @@ public final class AsyncThrowingChannel<Element: Sendable, Failure: Error>: Asyn
255257
}
256258

257259
func terminateAll(error: Failure? = nil) {
258-
let (sends, nexts) = state.withCriticalRegion { state -> (Set<Pending>, Set<Awaiting>) in
260+
let (sends, nexts) = state.withCriticalRegion { state -> (OrderedSet<Pending>, OrderedSet<Awaiting>) in
259261

260262
let nextState: Emission
261263
if let error = error {
@@ -297,6 +299,7 @@ public final class AsyncThrowingChannel<Element: Sendable, Failure: Error>: Asyn
297299
/// Send an element to an awaiting iteration. This function will resume when the next call to `next()` is made
298300
/// or when a call to `finish()`/`fail(_:)` is made from another Task.
299301
/// If the channel is already finished then this returns immediately
302+
/// If the task is cancelled, this function will resume. Other sending operations from other tasks will remain active.
300303
public func send(_ element: Element) async {
301304
let generation = establish()
302305
let sendTokenStatus = ManagedCriticalState<ChannelTokenStatus>(.new)

0 commit comments

Comments
 (0)