Skip to content

Commit e226f94

Browse files
committed
Update collect method with cancellation handling
The `collect()` function in the `AsyncSequence` extension has been modified to handle cancellation. 1. The return type of `collect()` has changed from `[Element]` to `[Element]?`. 2. New functionality added: - Returns `nil` if the task is cancelled before any elements are emitted. - Returns the collected elements even if the sequence is cancelled after emitting some elements.
1 parent e331563 commit e226f94

File tree

3 files changed

+104
-6
lines changed

3 files changed

+104
-6
lines changed

Sources/TestingSupport/AsyncSequence+Collect.swift

Lines changed: 74 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,79 @@ import AsyncAlgorithms
33
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
44
extension AsyncSequence {
55

6-
/// Returns the array of the elements of the asynchronous sequence.
7-
@inlinable package func collect() async rethrows -> [Element] {
8-
try await reduce(into: []) { $0.append($1) }
6+
/// Collects all elements of an `AsyncSequence` into an array.
7+
///
8+
/// This method consumes the entire asynchronous sequence and accumulates its elements into a single array.
9+
/// It returns `nil` if the task is cancelled before any elements are emitted. If the task is cancelled
10+
/// after emitting elements, the collected elements are still returned. If the sequence completes without
11+
/// producing any elements, an empty array is returned (unless cancelled).
12+
///
13+
/// Rethrows any error thrown by the underlying asynchronous sequence.
14+
///
15+
/// - Returns: An array of elements from the sequence, or `nil` if the task was cancelled before producing any elements.
16+
///
17+
/// ### Usage Examples
18+
///
19+
/// Collecting elements from a regular sequence:
20+
/// ```swift
21+
/// let sequence = [1, 2, 3].async
22+
/// let result = await sequence.collect()
23+
/// // result == [1, 2, 3]
24+
/// ```
25+
///
26+
/// Collecting from an empty sequence:
27+
/// ```swift
28+
/// let stream = AsyncStream<Int> { $0.finish() }
29+
/// let result = await stream.collect()
30+
/// // result == []
31+
/// ```
32+
///
33+
/// Handling cancellation:
34+
/// ```swift
35+
/// let task = Task {
36+
/// let stream = AsyncStream<Int> { _ in }
37+
/// return await stream.collect()
38+
/// }
39+
/// task.cancel()
40+
/// let result = await task.value
41+
/// // result == nil
42+
/// ```
43+
///
44+
/// Cancellation after yielding some elements:
45+
/// ```swift
46+
/// let task = Task {
47+
/// let stream = AsyncStream<Int> {
48+
/// for i in 1...3 {
49+
/// $0.yield(i)
50+
/// }
51+
/// }
52+
/// return await stream.collect()
53+
/// }
54+
/// task.cancel()
55+
/// let result = await task.value
56+
/// // result == [1, 2, 3]
57+
/// ```
58+
///
59+
/// Handling errors:
60+
/// ```swift
61+
/// struct MyError: Error {}
62+
///
63+
/// let stream = AsyncThrowingStream<Int, Error> {
64+
/// throw MyError()
65+
/// }
66+
///
67+
/// do {
68+
/// _ = try await stream.collect()
69+
/// } catch {
70+
/// print("Caught error: \(error)") // Caught error: MyError
71+
/// }
72+
/// ```
73+
@inlinable package func collect() async rethrows -> [Element]? {
74+
let elements = try await reduce(into: []) { $0.append($1) }
75+
return if elements.isEmpty {
76+
Task.isCancelled ? nil : []
77+
} else {
78+
elements
79+
}
980
}
1081
}

Tests/AsyncMaterializedSequenceTests/AsyncMaterializedSequenceTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ struct AsyncMaterializedSequenceTests {
5555
}
5656

5757
let sequence = source.materialize()
58-
let events = await sequence.collect()
58+
let events = await sequence.collect() ?? []
5959

6060
#expect(events.count == 1)
6161

Tests/TestingSupportTests/AsyncSequence+CollectTests.swift

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,34 @@ struct AsyncSequenceCollectTests {
1717

1818
@Test func collect_produces_empty_array_when_source_sequence_is_completed_without_elements() async throws {
1919
let stream = AsyncStream<Int> { $0.finish() }
20-
await #expect(stream.collect().isEmpty == true)
20+
await #expect(stream.collect()?.isEmpty == true)
21+
}
22+
23+
@Test func collect_produces_nil_when_source_sequence_is_cancelled() async throws {
24+
let task = Task {
25+
let stream = AsyncStream<Int> { _ in }
26+
return await stream.collect()
27+
}
28+
29+
task.cancel()
30+
31+
await #expect(task.value == nil)
32+
}
33+
34+
@Test func collect_produces_array_of_elements_when_source_sequence_is_cancelled_and_produced_some_elements() async throws {
35+
let source = [1, 2, 3]
36+
let task = Task {
37+
let stream = AsyncStream<Int> {
38+
for value in source {
39+
$0.yield(value)
40+
}
41+
}
42+
return await stream.collect()
43+
}
44+
45+
task.cancel()
46+
47+
await #expect(task.value == source)
2148
}
2249

2350
@Test func collect_rethrows_error_when_source_sequence_is_failed() async throws {
@@ -29,4 +56,4 @@ struct AsyncSequenceCollectTests {
2956
_ = try await stream.collect()
3057
}
3158
}
32-
}
59+
}

0 commit comments

Comments
 (0)