-
Notifications
You must be signed in to change notification settings - Fork 428
Add an internal pausable writer #1245
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) | ||
fileprivate final class AsyncWriterStorage< | ||
Element, | ||
End, | ||
Delegate: AsyncWriterDelegate | ||
> where Delegate.Element == Element, Delegate.End == End { | ||
/// A value pending a write. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a hill I will die on, but if we put the Storage
into the AsyncWriter
, we don't need to redefine the types here, since they would be inherited from their enclosing struct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It'd rather have this than the extra level of indentation tbh!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, In this case we should still remove the Element
and End
from the generic parameters. They are implicit in the Delegate
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agh, yes, missed this one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW I think this change wouldn't be great for public APIs since Xcode suggests the "full" type name rather than any type aliases, e.g.: "AsyncWriter<DelegateImplType<String, Int>>.Element"
rather than "Element"
.
} | ||
|
||
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) | ||
fileprivate final class AsyncWriterStorage< |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you consider making this an actor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No -- pause
and resume
won't be called from asynchronous contexts.
} | ||
|
||
/// A lock which must be help when accessing or modifying mutable state held by this storage. | ||
private var lock: Lock |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this a var?
case let .element(pendingElement): | ||
self.delegate.write(pendingElement.value) | ||
pendingElement.continuation.resume() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure about this. Let's consider we have a buffer full with 16 writes waiting. After we have written the first element, do we want to allow more writes from the users side? There are still 15 writes pending...
Further this is in a non detached loop. This means we will def. write all 16 items, and we will signal to everyone waiting, you can write again...
If this was an actor, we could consider using await Task.yield
to check after a number of writes if the target is still writable and only in those cases to start succeed the continuations.
#if compiler(>=5.5) | ||
|
||
extension Result where Failure == Error { | ||
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) | ||
internal init(catching operation: @escaping () async throws -> Success) async { | ||
do { | ||
let success = try await operation() | ||
self = .success(success) | ||
} catch { | ||
self = .failure(error) | ||
} | ||
} | ||
} | ||
|
||
#endif // compiler(>=5.5) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this used anywhere? CMD+F doesn't show me a use.
writer.resume() | ||
try await written1 | ||
try await written2 | ||
XCTAssertEqual(delegate.elements.sorted(), ["wunch", "meat"].sorted()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we have sorted()
here? I would expect that we check that the elements arrive in order here.
writer.pause() | ||
|
||
async let w1: Void = writer.write("hitchcock") | ||
await Task.yield() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we have the Task.yield()
here?
} | ||
|
||
writer.resume() | ||
try await writer.finish(0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe check delegate.element.isEmpty
try await writer.write("cheddar") | ||
} verify: { error in | ||
XCTAssertEqual(error as? AsyncWriterError, .alreadyFinished) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe check that write was not forwarded to delegate
|
||
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) | ||
internal func XCTAssertThrowsError<T>( | ||
_ expression: @escaping () async throws -> T, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just out of interest for AHC, can we make this an @autoclosure
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It wasn't supported when I implemented this but seems to work now!
|
||
// `writes` will never exceed `maxWritesBeforeYield` so unchecked arithmetic is okay here. | ||
writes &+= 1 | ||
if writes == self.maxWritesBeforeYield { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think, we also need to check, if the queue is empty? You want to reset your counter to 0 once you have written all elements.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
writes
is local to this function so doesn't need to be reset?
Although I suppose if writes == self.maxWritesBeforeYield
and self.pendingElements.isEmpty
then we'll yield even though we're about to exit the function anyway. Maybe that doesn't matter so much though.
// Still 0. | ||
XCTAssertEqual(delegate.end, 0) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
needs
testCallToFinishWhilePending()
testTooManyCallsToFinishWhilePending()
Motivation: Async RPCs which stream outbound messages (i.e. requests on the client, responses on the server) will be provided with a means to asynchronously send messages. This object should suspend if the underlying channel is not currently writable. Modifications: Add an 'AsyncWriter' as the underlying type for these objects which suspends if the writer is 'paused' and resumes when the writer is resumed. Result: We have a writer capable of suspending writes.
0aaff3b
to
7ad0403
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great! Really like how it turned out! Thanks!
Motivation: Async RPCs which stream outbound messages (i.e. requests on the client, responses on the server) will be provided with a means to asynchronously send messages. This object should suspend if the underlying channel is not currently writable. Modifications: Add an 'AsyncWriter' as the underlying type for these objects which suspends if the writer is 'paused' and resumes when the writer is resumed. Result: We have a writer capable of suspending writes.
This commit implements some of the types required by the proposal for async/await support, added in #1231. To aid reviewing, only the types required for the server are included. They have been pulled in from the proof-of-concept implementation linked from the proposal PR. It is a complimentary PR to #1243 ("Async-await: Base types for client implementation"). It provides a unified `AsyncServerHandler` for all of the RPC types which avoids a substantial amount of code duplication that is found in the existing handlers. Wrappers are provided for the four RPC types. Otherwise it is analogous to the existing `BidirectionalStreamingServerHandler`. It's worth calling out that this PR makes use of some placeholder types which are not intended to be final. Specifically: * `AsyncResponseStreamWriter` is expected to be superseded by the `AsyncWriter` from #1245. * `AsyncServerCallContext` conformance has been added to the existing `ServerCallContextBase`. It is expected that we will provide a new implementation of `AsyncServerCallContext` that is independent from the existing call context types.
Motivation: Async RPCs which stream outbound messages (i.e. requests on the client, responses on the server) will be provided with a means to asynchronously send messages. This object should suspend if the underlying channel is not currently writable. Modifications: Add an 'AsyncWriter' as the underlying type for these objects which suspends if the writer is 'paused' and resumes when the writer is resumed. Result: We have a writer capable of suspending writes.
This commit implements some of the types required by the proposal for async/await support, added in grpc#1231. To aid reviewing, only the types required for the server are included. They have been pulled in from the proof-of-concept implementation linked from the proposal PR. It is a complimentary PR to grpc#1243 ("Async-await: Base types for client implementation"). It provides a unified `AsyncServerHandler` for all of the RPC types which avoids a substantial amount of code duplication that is found in the existing handlers. Wrappers are provided for the four RPC types. Otherwise it is analogous to the existing `BidirectionalStreamingServerHandler`. It's worth calling out that this PR makes use of some placeholder types which are not intended to be final. Specifically: * `AsyncResponseStreamWriter` is expected to be superseded by the `AsyncWriter` from grpc#1245. * `AsyncServerCallContext` conformance has been added to the existing `ServerCallContextBase`. It is expected that we will provide a new implementation of `AsyncServerCallContext` that is independent from the existing call context types.
Motivation: Async RPCs which stream outbound messages (i.e. requests on the client, responses on the server) will be provided with a means to asynchronously send messages. This object should suspend if the underlying channel is not currently writable. Modifications: Add an 'AsyncWriter' as the underlying type for these objects which suspends if the writer is 'paused' and resumes when the writer is resumed. Result: We have a writer capable of suspending writes.
This commit implements some of the types required by the proposal for async/await support, added in grpc#1231. To aid reviewing, only the types required for the server are included. They have been pulled in from the proof-of-concept implementation linked from the proposal PR. It is a complimentary PR to grpc#1243 ("Async-await: Base types for client implementation"). It provides a unified `AsyncServerHandler` for all of the RPC types which avoids a substantial amount of code duplication that is found in the existing handlers. Wrappers are provided for the four RPC types. Otherwise it is analogous to the existing `BidirectionalStreamingServerHandler`. It's worth calling out that this PR makes use of some placeholder types which are not intended to be final. Specifically: * `AsyncResponseStreamWriter` is expected to be superseded by the `AsyncWriter` from grpc#1245. * `AsyncServerCallContext` conformance has been added to the existing `ServerCallContextBase`. It is expected that we will provide a new implementation of `AsyncServerCallContext` that is independent from the existing call context types.
Motivation: Async RPCs which stream outbound messages (i.e. requests on the client, responses on the server) will be provided with a means to asynchronously send messages. This object should suspend if the underlying channel is not currently writable. Modifications: Add an 'AsyncWriter' as the underlying type for these objects which suspends if the writer is 'paused' and resumes when the writer is resumed. Result: We have a writer capable of suspending writes.
This commit implements some of the types required by the proposal for async/await support, added in #1231. To aid reviewing, only the types required for the server are included. They have been pulled in from the proof-of-concept implementation linked from the proposal PR. It is a complimentary PR to #1243 ("Async-await: Base types for client implementation"). It provides a unified `AsyncServerHandler` for all of the RPC types which avoids a substantial amount of code duplication that is found in the existing handlers. Wrappers are provided for the four RPC types. Otherwise it is analogous to the existing `BidirectionalStreamingServerHandler`. It's worth calling out that this PR makes use of some placeholder types which are not intended to be final. Specifically: * `AsyncResponseStreamWriter` is expected to be superseded by the `AsyncWriter` from #1245. * `AsyncServerCallContext` conformance has been added to the existing `ServerCallContextBase`. It is expected that we will provide a new implementation of `AsyncServerCallContext` that is independent from the existing call context types.
Motivation: Async RPCs which stream outbound messages (i.e. requests on the client, responses on the server) will be provided with a means to asynchronously send messages. This object should suspend if the underlying channel is not currently writable. Modifications: Add an 'AsyncWriter' as the underlying type for these objects which suspends if the writer is 'paused' and resumes when the writer is resumed. Result: We have a writer capable of suspending writes.
This commit implements some of the types required by the proposal for async/await support, added in #1231. To aid reviewing, only the types required for the server are included. They have been pulled in from the proof-of-concept implementation linked from the proposal PR. It is a complimentary PR to #1243 ("Async-await: Base types for client implementation"). It provides a unified `AsyncServerHandler` for all of the RPC types which avoids a substantial amount of code duplication that is found in the existing handlers. Wrappers are provided for the four RPC types. Otherwise it is analogous to the existing `BidirectionalStreamingServerHandler`. It's worth calling out that this PR makes use of some placeholder types which are not intended to be final. Specifically: * `AsyncResponseStreamWriter` is expected to be superseded by the `AsyncWriter` from #1245. * `AsyncServerCallContext` conformance has been added to the existing `ServerCallContextBase`. It is expected that we will provide a new implementation of `AsyncServerCallContext` that is independent from the existing call context types.
Motivation:
Async RPCs which stream outbound messages (i.e. requests on the client,
responses on the server) will be provided with a means to asynchronously
send messages. This object should suspend if the underlying channel is
not currently writable.
Modifications:
Add an 'AsyncWriter' as the underlying type for these objects which
suspends if the writer is 'paused' and resumes when the writer is
resumed.
Result:
We have a writer capable of suspending writes.