Skip to content

Commit 2f577e5

Browse files
authored
Replace the placeholder client request and response mechanisms (#1264)
Motivation: The client code for async-await has some placeholder implementations for receiving responses and sending requests. Now that we have the necessary types we can replace the placeholders. Modifications: - Replace `AsyncStream` usage with a `PassthroughMessageSequence` - Add a `GRPCAsyncRequestStreamWriter` which request streaming RPCs may use to write requests and finish the request stream. - This writer depends on a delegate which uses callbacks to send messages on the underlying `Call`. - Request streaming RPCs now have a `requestStream` on which to send requests. - Wrap up `AsyncWriterError` as a `public` `struct`. Result: Clients may send requests on a `requestStream`.
1 parent 6c5c289 commit 2f577e5

10 files changed

+294
-200
lines changed

Sources/GRPC/AsyncAwaitSupport/AsyncWriter.swift

+33-8
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ internal final actor AsyncWriter<Delegate: AsyncWriterDelegate> {
239239
///
240240
/// The call may be suspend if the writer is paused.
241241
///
242-
/// Throws: ``AsyncWriterError`` if the writer has already been finished or too many write tasks
242+
/// Throws: ``GRPCAsyncWriterError`` if the writer has already been finished or too many write tasks
243243
/// have been suspended.
244244
@inlinable
245245
internal func write(_ element: Element) async throws {
@@ -256,15 +256,15 @@ internal final actor AsyncWriter<Delegate: AsyncWriterDelegate> {
256256
// - error (the writer is complete or the queue is full).
257257

258258
if self._completionState.isPendingOrCompleted {
259-
continuation.resume(throwing: AsyncWriterError.alreadyFinished)
259+
continuation.resume(throwing: GRPCAsyncWriterError.alreadyFinished)
260260
} else if !self._isPaused, self._pendingElements.isEmpty {
261261
self._delegate.write(element)
262262
continuation.resume()
263263
} else if self._pendingElements.count < self._maxPendingElements {
264264
// The continuation will be resumed later.
265265
self._pendingElements.append(PendingElement(element, continuation: continuation))
266266
} else {
267-
continuation.resume(throwing: AsyncWriterError.tooManyPendingWrites)
267+
continuation.resume(throwing: GRPCAsyncWriterError.tooManyPendingWrites)
268268
}
269269
}
270270

@@ -279,7 +279,7 @@ internal final actor AsyncWriter<Delegate: AsyncWriterDelegate> {
279279
@inlinable
280280
internal func _finish(_ end: End, continuation: CheckedContinuation<Void, Error>) {
281281
if self._completionState.isPendingOrCompleted {
282-
continuation.resume(throwing: AsyncWriterError.alreadyFinished)
282+
continuation.resume(throwing: GRPCAsyncWriterError.alreadyFinished)
283283
} else if !self._isPaused, self._pendingElements.isEmpty {
284284
self._completionState = .completed
285285
self._delegate.writeEnd(end)
@@ -291,10 +291,35 @@ internal final actor AsyncWriter<Delegate: AsyncWriterDelegate> {
291291
}
292292
}
293293

294-
@usableFromInline
295-
internal enum AsyncWriterError: Error, Hashable {
296-
case tooManyPendingWrites
297-
case alreadyFinished
294+
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
295+
extension AsyncWriter where End == Void {
296+
@inlinable
297+
internal func finish() async throws {
298+
try await self.finish(())
299+
}
300+
}
301+
302+
public struct GRPCAsyncWriterError: Error, Hashable {
303+
private let wrapped: Wrapped
304+
305+
@usableFromInline
306+
internal enum Wrapped {
307+
case tooManyPendingWrites
308+
case alreadyFinished
309+
}
310+
311+
@usableFromInline
312+
internal init(_ wrapped: Wrapped) {
313+
self.wrapped = wrapped
314+
}
315+
316+
/// There are too many writes pending. This may occur when too many Tasks are writing
317+
/// concurrently.
318+
public static let tooManyPendingWrites = Self(.tooManyPendingWrites)
319+
320+
/// The writer has already finished. This may occur when the RPC completes prematurely, or when
321+
/// a user calls finish more than once.
322+
public static let alreadyFinished = Self(.alreadyFinished)
298323
}
299324

300325
@usableFromInline
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright 2021, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#if compiler(>=5.5)
17+
18+
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
19+
extension Call {
20+
internal func makeRequestStreamWriter() -> GRPCAsyncRequestStreamWriter<Request> {
21+
let delegate = GRPCAsyncRequestStreamWriter<Request>.Delegate(
22+
compressionEnabled: self.options.messageEncoding.enabledForRequests
23+
) { request, metadata in
24+
self.send(.message(request, metadata), promise: nil)
25+
} finish: {
26+
self.send(.end, promise: nil)
27+
}
28+
29+
return GRPCAsyncRequestStreamWriter(asyncWriter: .init(delegate: delegate))
30+
}
31+
}
32+
33+
#endif // compiler(>=5.5)

Sources/GRPC/AsyncAwaitSupport/GRPCAsyncBidirectionalStreamingCall.swift

+49-80
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,17 @@
1515
*/
1616
#if compiler(>=5.5)
1717

18-
import _NIOConcurrency
1918
import NIOHPACK
2019

2120
/// Async-await variant of BidirectionalStreamingCall.
2221
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
2322
public struct GRPCAsyncBidirectionalStreamingCall<Request, Response> {
2423
private let call: Call<Request, Response>
2524
private let responseParts: StreamingResponseParts<Response>
25+
private let responseSource: PassthroughMessageSource<Response, Error>
26+
27+
/// A request stream writer for sending messages to the server.
28+
public let requestStream: GRPCAsyncRequestStreamWriter<Request>
2629

2730
/// The stream of responses from the server.
2831
public let responses: GRPCAsyncResponseStream<Response>
@@ -74,93 +77,59 @@ public struct GRPCAsyncBidirectionalStreamingCall<Request, Response> {
7477

7578
private init(call: Call<Request, Response>) {
7679
self.call = call
77-
// Initialise `responseParts` with an empty response handler because we
78-
// provide the responses as an AsyncSequence in `responseStream`.
7980
self.responseParts = StreamingResponseParts(on: call.eventLoop) { _ in }
80-
81-
// Call and StreamingResponseParts are reference types so we grab a
82-
// referecence to them here to avoid capturing mutable self in the closure
83-
// passed to the AsyncThrowingStream initializer.
84-
//
85-
// The alternative would be to declare the responseStream as:
86-
// ```
87-
// public private(set) var responseStream: AsyncThrowingStream<ResponsePayload>!
88-
// ```
89-
//
90-
// UPDATE: Additionally we expect to replace this soon with an AsyncSequence
91-
// implementation that supports yielding values from outside the closure.
92-
let call = self.call
93-
let responseParts = self.responseParts
94-
let responseStream = AsyncThrowingStream(Response.self) { continuation in
95-
call.invokeStreamingRequests { error in
96-
responseParts.handleError(error)
97-
continuation.finish(throwing: error)
98-
} onResponsePart: { responsePart in
99-
responseParts.handle(responsePart)
100-
switch responsePart {
101-
case let .message(response):
102-
continuation.yield(response)
103-
case .metadata:
104-
break
105-
case .end:
106-
continuation.finish()
107-
}
108-
}
109-
}
110-
self.responses = .init(responseStream)
81+
self.responseSource = PassthroughMessageSource<Response, Error>()
82+
self.responses = .init(PassthroughMessageSequence(consuming: self.responseSource))
83+
self.requestStream = call.makeRequestStreamWriter()
11184
}
11285

11386
/// We expose this as the only non-private initializer so that the caller
11487
/// knows that invocation is part of initialisation.
11588
internal static func makeAndInvoke(call: Call<Request, Response>) -> Self {
116-
Self(call: call)
117-
}
118-
119-
// MARK: - Requests
120-
121-
/// Sends a message to the service.
122-
///
123-
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()`.
124-
///
125-
/// - Parameters:
126-
/// - message: The message to send.
127-
/// - compression: Whether compression should be used for this message. Ignored if compression
128-
/// was not enabled for the RPC.
129-
public func sendMessage(
130-
_ message: Request,
131-
compression: Compression = .deferToCallDefault
132-
) async throws {
133-
let compress = self.call.compress(compression)
134-
let promise = self.call.eventLoop.makePromise(of: Void.self)
135-
self.call.send(.message(message, .init(compress: compress, flush: true)), promise: promise)
136-
// TODO: This waits for the message to be written to the socket. We should probably just wait for it to be written to the channel?
137-
try await promise.futureResult.get()
138-
}
139-
140-
/// Sends a sequence of messages to the service.
141-
///
142-
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()`.
143-
///
144-
/// - Parameters:
145-
/// - messages: The sequence of messages to send.
146-
/// - compression: Whether compression should be used for this message. Ignored if compression
147-
/// was not enabled for the RPC.
148-
public func sendMessages<S>(
149-
_ messages: S,
150-
compression: Compression = .deferToCallDefault
151-
) async throws where S: Sequence, S.Element == Request {
152-
let promise = self.call.eventLoop.makePromise(of: Void.self)
153-
self.call.sendMessages(messages, compression: compression, promise: promise)
154-
try await promise.futureResult.get()
89+
let asyncCall = Self(call: call)
90+
91+
asyncCall.call.invokeStreamingRequests(
92+
onError: { error in
93+
asyncCall.responseParts.handleError(error)
94+
asyncCall.responseSource.finish(throwing: error)
95+
},
96+
onResponsePart: AsyncCall.makeResponsePartHandler(
97+
responseParts: asyncCall.responseParts,
98+
responseSource: asyncCall.responseSource
99+
)
100+
)
101+
102+
return asyncCall
155103
}
104+
}
156105

157-
/// Terminates a stream of messages sent to the service.
158-
///
159-
/// - Important: This should only ever be called once.
160-
public func sendEnd() async throws {
161-
let promise = self.call.eventLoop.makePromise(of: Void.self)
162-
self.call.send(.end, promise: promise)
163-
try await promise.futureResult.get()
106+
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
107+
internal enum AsyncCall {
108+
internal static func makeResponsePartHandler<Response>(
109+
responseParts: StreamingResponseParts<Response>,
110+
responseSource: PassthroughMessageSource<Response, Error>
111+
) -> (GRPCClientResponsePart<Response>) -> Void {
112+
return { responsePart in
113+
// Handle the metadata, trailers and status.
114+
responseParts.handle(responsePart)
115+
116+
// Handle the response messages and status.
117+
switch responsePart {
118+
case .metadata:
119+
()
120+
121+
case let .message(response):
122+
// TODO: when we support backpressure we will need to stop ignoring the return value.
123+
_ = responseSource.yield(response)
124+
125+
case let .end(status, _):
126+
if status.isOk {
127+
responseSource.finish()
128+
} else {
129+
responseSource.finish(throwing: status)
130+
}
131+
}
132+
}
164133
}
165134
}
166135

Sources/GRPC/AsyncAwaitSupport/GRPCAsyncClientStreamingCall.swift

+4-47
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ public struct GRPCAsyncClientStreamingCall<Request, Response> {
2323
private let call: Call<Request, Response>
2424
private let responseParts: UnaryResponseParts<Response>
2525

26+
/// A request stream writer for sending messages to the server.
27+
public let requestStream: GRPCAsyncRequestStreamWriter<Request>
28+
2629
/// The options used to make the RPC.
2730
public var options: CallOptions {
2831
return self.call.options
@@ -81,60 +84,14 @@ public struct GRPCAsyncClientStreamingCall<Request, Response> {
8184
onError: self.responseParts.handleError(_:),
8285
onResponsePart: self.responseParts.handle(_:)
8386
)
87+
self.requestStream = call.makeRequestStreamWriter()
8488
}
8589

8690
/// We expose this as the only non-private initializer so that the caller
8791
/// knows that invocation is part of initialisation.
8892
internal static func makeAndInvoke(call: Call<Request, Response>) -> Self {
8993
Self(call: call)
9094
}
91-
92-
// MARK: - Requests
93-
94-
/// Sends a message to the service.
95-
///
96-
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()`.
97-
///
98-
/// - Parameters:
99-
/// - message: The message to send.
100-
/// - compression: Whether compression should be used for this message. Ignored if compression
101-
/// was not enabled for the RPC.
102-
public func sendMessage(
103-
_ message: Request,
104-
compression: Compression = .deferToCallDefault
105-
) async throws {
106-
let compress = self.call.compress(compression)
107-
let promise = self.call.eventLoop.makePromise(of: Void.self)
108-
self.call.send(.message(message, .init(compress: compress, flush: true)), promise: promise)
109-
// TODO: This waits for the message to be written to the socket. We should probably just wait for it to be written to the channel?
110-
try await promise.futureResult.get()
111-
}
112-
113-
/// Sends a sequence of messages to the service.
114-
///
115-
/// - Important: Callers must terminate the stream of messages by calling `sendEnd()`.
116-
///
117-
/// - Parameters:
118-
/// - messages: The sequence of messages to send.
119-
/// - compression: Whether compression should be used for this message. Ignored if compression
120-
/// was not enabled for the RPC.
121-
public func sendMessages<S>(
122-
_ messages: S,
123-
compression: Compression = .deferToCallDefault
124-
) async throws where S: Sequence, S.Element == Request {
125-
let promise = self.call.eventLoop.makePromise(of: Void.self)
126-
self.call.sendMessages(messages, compression: compression, promise: promise)
127-
try await promise.futureResult.get()
128-
}
129-
130-
/// Terminates a stream of messages sent to the service.
131-
///
132-
/// - Important: This should only ever be called once.
133-
public func sendEnd() async throws {
134-
let promise = self.call.eventLoop.makePromise(of: Void.self)
135-
self.call.send(.end, promise: promise)
136-
try await promise.futureResult.get()
137-
}
13895
}
13996

14097
#endif

0 commit comments

Comments
 (0)