-
Notifications
You must be signed in to change notification settings - Fork 428
Add an async throwing source #1252
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
@@ -0,0 +1,59 @@ | |||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was copied from #1245
cc @Davidde94 |
/// Provide an element to the sequence. | ||
/// | ||
/// - Parameter element: The element to provide to the sequence. | ||
/// - Returns: A `YieldResult` indicating whether the sequence accepted the element or not. | ||
@discardableResult | ||
internal func yield(_ element: Element) -> YieldResult { | ||
return self.storage.yield(element) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Who would use the yield
method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a ChannelHandler
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this is called by a channel handler, this might become interesting with the generic element here.
fileprivate init(storage: AsyncThrowingSourceStorage<Element, Failure>) { | ||
self.storage = storage | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are missing a deinit here, I'm afraid. If the consumer is not interested in your messages anymore, they might want to cancel the stream.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The best way to achieve this, is to back the iterator by a class that references the storage.
internal func makeAsyncIterator() -> Iterator { | ||
return Iterator(storage: self.storage) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what happens, if the consumer never calls makeAsyncIterator
and the stream goes out of scope?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then any buffered elements would be dropped; but that's somewhat expected if they never call makeAsyncIterator
? Feels like I'm missing your point here...
XCTAssertEqual(elements, 0) | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add tests:
- user creates iterator but never consumes iterator
- user never creates iterator
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you elaborate? Not sure I understand what we would actually be asserting in each case.
Motivation: `AsyncThrowingStream` provides an implementation of `AsyncSequence` which allows the holder to provide new values to that sequence from within a closure provided to the initializer. This API doesn't fit our needs: we must be able to provide the values 'from the outside' rather than during initialization. Modifications: - Add an `AsyncThrowingSource`, an implementation of `AsyncSequence` which may have values provided to it via `yield(_:)` and terminated with `finish()` or `finish(throwing:)`. - Add tests and a few `AsyncSequence` helpers. Result: We have an `AsyncSequence` implementation which can have values provided to it.
050f570
to
91661a4
Compare
internal func consumeNextElement() async throws -> Element? { | ||
return try await withCheckedThrowingContinuation { | ||
self.consumeNextElement(continuation: $0) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume this should be @inlinable
?
internal func next() async throws -> Element? { | ||
return try await self.storage.consumeNextElement() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be @inlinable
?
5b636c4
to
eabd681
Compare
Motivation: `AsyncThrowingStream` provides an implementation of `AsyncSequence` which allows the holder to provide new values to that sequence from within a closure provided to the initializer. This API doesn't fit our needs: we must be able to provide the values 'from the outside' rather than during initialization. Modifications: - Add a `PassthroughMessageSequence`, an implementation of `AsyncSequence` which consumes messages from a `PassthroughMessagesSource`. - The source may have values provided to it via `yield(_:)` and terminated with `finish()` or `finish(throwing:)`. - Add tests and a few `AsyncSequence` helpers. Result: We have an `AsyncSequence` implementation which can have values provided to it.
Motivation: `AsyncThrowingStream` provides an implementation of `AsyncSequence` which allows the holder to provide new values to that sequence from within a closure provided to the initializer. This API doesn't fit our needs: we must be able to provide the values 'from the outside' rather than during initialization. Modifications: - Add a `PassthroughMessageSequence`, an implementation of `AsyncSequence` which consumes messages from a `PassthroughMessagesSource`. - The source may have values provided to it via `yield(_:)` and terminated with `finish()` or `finish(throwing:)`. - Add tests and a few `AsyncSequence` helpers. Result: We have an `AsyncSequence` implementation which can have values provided to it.
Motivation: `AsyncThrowingStream` provides an implementation of `AsyncSequence` which allows the holder to provide new values to that sequence from within a closure provided to the initializer. This API doesn't fit our needs: we must be able to provide the values 'from the outside' rather than during initialization. Modifications: - Add a `PassthroughMessageSequence`, an implementation of `AsyncSequence` which consumes messages from a `PassthroughMessagesSource`. - The source may have values provided to it via `yield(_:)` and terminated with `finish()` or `finish(throwing:)`. - Add tests and a few `AsyncSequence` helpers. Result: We have an `AsyncSequence` implementation which can have values provided to it.
Motivation: `AsyncThrowingStream` provides an implementation of `AsyncSequence` which allows the holder to provide new values to that sequence from within a closure provided to the initializer. This API doesn't fit our needs: we must be able to provide the values 'from the outside' rather than during initialization. Modifications: - Add a `PassthroughMessageSequence`, an implementation of `AsyncSequence` which consumes messages from a `PassthroughMessagesSource`. - The source may have values provided to it via `yield(_:)` and terminated with `finish()` or `finish(throwing:)`. - Add tests and a few `AsyncSequence` helpers. Result: We have an `AsyncSequence` implementation which can have values provided to it.
Motivation: `AsyncThrowingStream` provides an implementation of `AsyncSequence` which allows the holder to provide new values to that sequence from within a closure provided to the initializer. This API doesn't fit our needs: we must be able to provide the values 'from the outside' rather than during initialization. Modifications: - Add a `PassthroughMessageSequence`, an implementation of `AsyncSequence` which consumes messages from a `PassthroughMessagesSource`. - The source may have values provided to it via `yield(_:)` and terminated with `finish()` or `finish(throwing:)`. - Add tests and a few `AsyncSequence` helpers. Result: We have an `AsyncSequence` implementation which can have values provided to it.
Motivation:
AsyncThrowingStream
provides an implementation ofAsyncSequence
which allows the holder to provide new values to that sequence from
within a closure provided to the initializer. This API doesn't fit our
needs: we must be able to provide the values 'from the outside' rather
than during initialization.
Modifications:
AsyncThrowingSource
, an implementation ofAsyncSequence
which may have values provided to it via
yield(_:)
and terminatedwith
finish()
orfinish(throwing:)
.AsyncSequence
helpers.Result:
We have an
AsyncSequence
implementation which can have values providedto it.