Skip to content

Commit 0568b8e

Browse files
simonjbeaumontglbrntt
authored andcommitted
[async-await] Base types for server implementation (grpc#1249)
This commit implements some of the types required by the proposal for async/await support, added in grpc#1231. To aid reviewing, only the types required for the server are included. They have been pulled in from the proof-of-concept implementation linked from the proposal PR. It is a complimentary PR to grpc#1243 ("Async-await: Base types for client implementation"). It provides a unified `AsyncServerHandler` for all of the RPC types which avoids a substantial amount of code duplication that is found in the existing handlers. Wrappers are provided for the four RPC types. Otherwise it is analogous to the existing `BidirectionalStreamingServerHandler`. It's worth calling out that this PR makes use of some placeholder types which are not intended to be final. Specifically: * `AsyncResponseStreamWriter` is expected to be superseded by the `AsyncWriter` from grpc#1245. * `AsyncServerCallContext` conformance has been added to the existing `ServerCallContextBase`. It is expected that we will provide a new implementation of `AsyncServerCallContext` that is independent from the existing call context types.
1 parent 8bbc0ff commit 0568b8e

10 files changed

+1331
-28
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright 2021, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#if compiler(>=5.5)
18+
19+
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
20+
extension CancellationError: GRPCStatusTransformable {
21+
public func makeGRPCStatus() -> GRPCStatus {
22+
return GRPCStatus(code: .unavailable, message: nil)
23+
}
24+
}
25+
26+
#endif
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright 2021, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#if compiler(>=5.5)
18+
19+
/// This is currently a wrapper around AsyncThrowingStream because we want to be
20+
/// able to swap out the implementation for something else in the future.
21+
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
22+
public struct GRPCAsyncRequestStream<Element>: AsyncSequence {
23+
@usableFromInline
24+
internal typealias _WrappedStream = PassthroughMessageSequence<Element, Error>
25+
26+
@usableFromInline
27+
internal let _stream: _WrappedStream
28+
29+
@inlinable
30+
internal init(_ stream: _WrappedStream) {
31+
self._stream = stream
32+
}
33+
34+
@inlinable
35+
public func makeAsyncIterator() -> Iterator {
36+
Self.AsyncIterator(self._stream)
37+
}
38+
39+
public struct Iterator: AsyncIteratorProtocol {
40+
@usableFromInline
41+
internal var iterator: _WrappedStream.AsyncIterator
42+
43+
@usableFromInline
44+
internal init(_ stream: _WrappedStream) {
45+
self.iterator = stream.makeAsyncIterator()
46+
}
47+
48+
@inlinable
49+
public mutating func next() async throws -> Element? {
50+
try await self.iterator.next()
51+
}
52+
}
53+
}
54+
55+
#endif
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright 2021, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#if compiler(>=5.5)
18+
19+
/// Writer for server-streaming RPC handlers to provide responses.
20+
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
21+
public struct GRPCAsyncResponseStreamWriter<Response> {
22+
@usableFromInline
23+
internal typealias Element = (Response, Compression)
24+
25+
@usableFromInline
26+
internal typealias Delegate = AsyncResponseStreamWriterDelegate<Response>
27+
28+
@usableFromInline
29+
internal let asyncWriter: AsyncWriter<Delegate>
30+
31+
@inlinable
32+
internal init(wrapping asyncWriter: AsyncWriter<Delegate>) {
33+
self.asyncWriter = asyncWriter
34+
}
35+
36+
@inlinable
37+
public func send(
38+
_ response: Response,
39+
compression: Compression = .deferToCallDefault
40+
) async throws {
41+
try await self.asyncWriter.write((response, compression))
42+
}
43+
}
44+
45+
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
46+
@usableFromInline
47+
internal final class AsyncResponseStreamWriterDelegate<Response>: AsyncWriterDelegate {
48+
@usableFromInline
49+
internal typealias Element = (Response, Compression)
50+
51+
@usableFromInline
52+
internal typealias End = GRPCStatus
53+
54+
@usableFromInline
55+
internal let _context: GRPCAsyncServerCallContext
56+
57+
@usableFromInline
58+
internal let _send: (Response, MessageMetadata) -> Void
59+
60+
@usableFromInline
61+
internal let _finish: (GRPCStatus) -> Void
62+
63+
@usableFromInline
64+
internal let _compressionEnabledOnServer: Bool
65+
66+
// Create a new AsyncResponseStreamWriterDelegate.
67+
//
68+
// - Important: the `send` and `finish` closures must be thread-safe.
69+
@inlinable
70+
internal init(
71+
context: GRPCAsyncServerCallContext,
72+
compressionIsEnabled: Bool,
73+
send: @escaping (Response, MessageMetadata) -> Void,
74+
finish: @escaping (GRPCStatus) -> Void
75+
) {
76+
self._context = context
77+
self._compressionEnabledOnServer = compressionIsEnabled
78+
self._send = send
79+
self._finish = finish
80+
}
81+
82+
@inlinable
83+
internal func _shouldCompress(_ compression: Compression) -> Bool {
84+
guard self._compressionEnabledOnServer else {
85+
return false
86+
}
87+
return compression.isEnabled(callDefault: self._context.compressionEnabled)
88+
}
89+
90+
@inlinable
91+
internal func _send(
92+
_ response: Response,
93+
compression: Compression = .deferToCallDefault
94+
) {
95+
let compress = self._shouldCompress(compression)
96+
self._send(response, .init(compress: compress, flush: true))
97+
}
98+
99+
// MARK: - AsyncWriterDelegate conformance.
100+
101+
@inlinable
102+
internal func write(_ element: (Response, Compression)) {
103+
self._send(element.0, compression: element.1)
104+
}
105+
106+
@inlinable
107+
internal func writeEnd(_ end: GRPCStatus) {
108+
self._finish(end)
109+
}
110+
}
111+
112+
#endif
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright 2021, gRPC Authors All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#if compiler(>=5.5)
17+
18+
import Logging
19+
import NIOConcurrencyHelpers
20+
import NIOHPACK
21+
22+
// We use a `class` here because we do not want copy-on-write semantics. The instance that the async
23+
// handler holds must not diverge from the instance the implementor of the RPC holds. They hold these
24+
// instances on different threads (EventLoop vs Task).
25+
//
26+
// We considered wrapping this in a `struct` and pass it `inout` to the RPC. This would communicate
27+
// explicitly that it stores mutable state. However, without copy-on-write semantics, this could
28+
// make for a surprising API.
29+
//
30+
// We also considered an `actor` but that felt clunky at the point of use since adopters would need
31+
// to `await` the retrieval of a logger or the updating of the trailers and each would requrie a
32+
// promise to glue the NIO and async-await paradigms in the handler.
33+
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
34+
public final class GRPCAsyncServerCallContext {
35+
private let lock = Lock()
36+
37+
/// Request headers for this request.
38+
public let headers: HPACKHeaders
39+
40+
/// The logger used for this call.
41+
public var logger: Logger {
42+
get { self.lock.withLock {
43+
self._logger
44+
} }
45+
set { self.lock.withLock {
46+
self._logger = newValue
47+
} }
48+
}
49+
50+
@usableFromInline
51+
internal var _logger: Logger
52+
53+
/// Whether compression should be enabled for responses, defaulting to `true`. Note that for
54+
/// this value to take effect compression must have been enabled on the server and a compression
55+
/// algorithm must have been negotiated with the client.
56+
public var compressionEnabled: Bool {
57+
get { self.lock.withLock {
58+
self._compressionEnabled
59+
} }
60+
set { self.lock.withLock {
61+
self._compressionEnabled = newValue
62+
} }
63+
}
64+
65+
private var _compressionEnabled: Bool = true
66+
67+
/// A `UserInfo` dictionary which is shared with the interceptor contexts for this RPC.
68+
///
69+
/// - Important: While `UserInfo` has value-semantics, this property retrieves from, and sets a
70+
/// reference wrapped `UserInfo`. The contexts passed to interceptors provide the same
71+
/// reference. As such this may be used as a mechanism to pass information between interceptors
72+
/// and service providers.
73+
public var userInfo: UserInfo {
74+
get { self.lock.withLock {
75+
self.userInfoRef.value
76+
} }
77+
set { self.lock.withLock {
78+
self.userInfoRef.value = newValue
79+
} }
80+
}
81+
82+
/// A reference to an underlying `UserInfo`. We share this with the interceptors.
83+
@usableFromInline
84+
internal let userInfoRef: Ref<UserInfo>
85+
86+
/// Metadata to return at the end of the RPC. If this is required it should be updated before
87+
/// the `responsePromise` or `statusPromise` is fulfilled.
88+
public var trailers: HPACKHeaders {
89+
get { self.lock.withLock {
90+
return self._trailers
91+
} }
92+
set { self.lock.withLock {
93+
self._trailers = newValue
94+
} }
95+
}
96+
97+
private var _trailers: HPACKHeaders = [:]
98+
99+
@inlinable
100+
internal init(
101+
headers: HPACKHeaders,
102+
logger: Logger,
103+
userInfoRef: Ref<UserInfo>
104+
) {
105+
self.headers = headers
106+
self.userInfoRef = userInfoRef
107+
self._logger = logger
108+
}
109+
}
110+
111+
#endif

0 commit comments

Comments
 (0)