Skip to content

[async-await] Base types for server implementation #1249

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 37 commits into from
Sep 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
1713de2
Async-await: Base types for server implementation
simonjbeaumont Aug 25, 2021
3d100a7
fixup: Add GRPC prefix and remove Payload suffix from types
simonjbeaumont Aug 26, 2021
49e633f
fixup: Remove comment
simonjbeaumont Aug 26, 2021
a09de1a
fixup: swiftformat now some type names are longer
simonjbeaumont Aug 26, 2021
1967797
fixup: Implement separate context and move status promise to handler
simonjbeaumont Aug 31, 2021
cb80654
fixup: Rename GRPCAsyncResponseStreamWriter.sendResponse to just .send
simonjbeaumont Aug 31, 2021
587259c
fixup: Rename AsyncServerHandlerTests.swift -> GRPCAsyncServerHandler…
simonjbeaumont Aug 31, 2021
9f306b5
fixup: Move #if compiler guards before imports
simonjbeaumont Aug 31, 2021
b4aa358
fixup: Remove unused imports
simonjbeaumont Aug 31, 2021
1d89bae
fixup: Add testHandlerThrowsGRPCStatusOK
simonjbeaumont Aug 31, 2021
2acfcb5
fixup: Validate just one request in unary and server-streaming handle…
simonjbeaumont Aug 31, 2021
04341f2
fixup: Rename GRPCAsyncStream to GRPCAsyncRequestStream
simonjbeaumont Aug 31, 2021
d8c0a2f
fixup: Remove __consuming from GRPCAsyncRequestStream.makeAsyncIterat…
simonjbeaumont Aug 31, 2021
c99c0bd
fixup: Rename State.observing to active and observer to userHandler
simonjbeaumont Aug 31, 2021
db2620f
fixup: Run swiftformat
simonjbeaumont Aug 31, 2021
9da3672
fixup: Make handler class internal and wrap in public struct
simonjbeaumont Aug 31, 2021
e77bc1f
fixup: @inlinable and @usableFromInline
simonjbeaumont Aug 31, 2021
b7b60d9
fixup: Prepend _ to @inlinable/@usableFromInline internal properties …
simonjbeaumont Sep 1, 2021
fd80c6b
fixup: Remove public init from context
simonjbeaumont Sep 1, 2021
c85eef3
fixup: Check for exactly one request in unary and server-streaming wr…
simonjbeaumont Sep 1, 2021
97cd688
fixup: Rename internal type from _GRPCAsyncServerHandler to AsyncServ…
simonjbeaumont Sep 1, 2021
0ea704a
fixup: Make GRPCAsyncServerCallContext.logger mutable
simonjbeaumont Sep 1, 2021
cdd1428
fixup: Treat the user handler throwing GRPCStatus.ok as an error
simonjbeaumont Sep 2, 2021
d668315
Make use of new stream source and pausable writer
simonjbeaumont Sep 2, 2021
c658d7e
fixup: Move tasks out of enum assoc data to avoid race
simonjbeaumont Sep 3, 2021
16e880c
fixup: swiftformat
simonjbeaumont Sep 3, 2021
96bc859
fixup: Remove .finishingSuccessfully state and use writer end state
simonjbeaumont Sep 6, 2021
9211bb8
fixup: Some @inlinable to request stream type
simonjbeaumont Sep 6, 2021
209c6fe
fixup: For consistency fix comments to refer to user handler (cf. fun…
simonjbeaumont Sep 6, 2021
32a35e5
fixup: Add TODO for supporting response headers
simonjbeaumont Sep 6, 2021
6443b08
fixup: Use a void promise now we are using the writer to carry the st…
simonjbeaumont Sep 6, 2021
73dfaae
fixup: await responseStreamWriter.cancel() in catch blocks
simonjbeaumont Sep 6, 2021
0c819a5
fixup: Eventloop dances for delegate callbacks
simonjbeaumont Sep 6, 2021
33b6582
fixup: Update docs for State.active
simonjbeaumont Sep 6, 2021
d2904bc
fixup: Remove unused promise param to interceptResponse
simonjbeaumont Sep 6, 2021
5a45fd0
fixup: Remove stale comment and move task cancellation higher in hand…
simonjbeaumont Sep 6, 2021
732f864
fixup: Add comments explaining use of class for context
simonjbeaumont Sep 6, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2021, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#if compiler(>=5.5)

@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
extension CancellationError: GRPCStatusTransformable {
public func makeGRPCStatus() -> GRPCStatus {
return GRPCStatus(code: .unavailable, message: nil)
}
}

#endif
55 changes: 55 additions & 0 deletions Sources/GRPC/AsyncAwaitSupport/GRPCAsyncRequestStream.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2021, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#if compiler(>=5.5)

/// This is currently a wrapper around AsyncThrowingStream because we want to be
/// able to swap out the implementation for something else in the future.
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
public struct GRPCAsyncRequestStream<Element>: AsyncSequence {
@usableFromInline
internal typealias _WrappedStream = PassthroughMessageSequence<Element, Error>

@usableFromInline
internal let _stream: _WrappedStream

@inlinable
internal init(_ stream: _WrappedStream) {
self._stream = stream
}

@inlinable
public func makeAsyncIterator() -> Iterator {
Self.AsyncIterator(self._stream)
}

public struct Iterator: AsyncIteratorProtocol {
@usableFromInline
internal var iterator: _WrappedStream.AsyncIterator

@usableFromInline
internal init(_ stream: _WrappedStream) {
self.iterator = stream.makeAsyncIterator()
}

@inlinable
public mutating func next() async throws -> Element? {
try await self.iterator.next()
}
}
}

#endif
112 changes: 112 additions & 0 deletions Sources/GRPC/AsyncAwaitSupport/GRPCAsyncResponseStreamWriter.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright 2021, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#if compiler(>=5.5)

/// Writer for server-streaming RPC handlers to provide responses.
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
public struct GRPCAsyncResponseStreamWriter<Response> {
@usableFromInline
internal typealias Element = (Response, Compression)

@usableFromInline
internal typealias Delegate = AsyncResponseStreamWriterDelegate<Response>

@usableFromInline
internal let asyncWriter: AsyncWriter<Delegate>

@inlinable
internal init(wrapping asyncWriter: AsyncWriter<Delegate>) {
self.asyncWriter = asyncWriter
}

@inlinable
public func send(
_ response: Response,
compression: Compression = .deferToCallDefault
) async throws {
try await self.asyncWriter.write((response, compression))
}
}

@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
@usableFromInline
internal final class AsyncResponseStreamWriterDelegate<Response>: AsyncWriterDelegate {
@usableFromInline
internal typealias Element = (Response, Compression)

@usableFromInline
internal typealias End = GRPCStatus

@usableFromInline
internal let _context: GRPCAsyncServerCallContext

@usableFromInline
internal let _send: (Response, MessageMetadata) -> Void

@usableFromInline
internal let _finish: (GRPCStatus) -> Void

@usableFromInline
internal let _compressionEnabledOnServer: Bool

// Create a new AsyncResponseStreamWriterDelegate.
//
// - Important: the `send` and `finish` closures must be thread-safe.
@inlinable
internal init(
context: GRPCAsyncServerCallContext,
compressionIsEnabled: Bool,
send: @escaping (Response, MessageMetadata) -> Void,
finish: @escaping (GRPCStatus) -> Void
) {
self._context = context
self._compressionEnabledOnServer = compressionIsEnabled
self._send = send
self._finish = finish
}

@inlinable
internal func _shouldCompress(_ compression: Compression) -> Bool {
guard self._compressionEnabledOnServer else {
return false
}
return compression.isEnabled(callDefault: self._context.compressionEnabled)
}

@inlinable
internal func _send(
_ response: Response,
compression: Compression = .deferToCallDefault
) {
let compress = self._shouldCompress(compression)
self._send(response, .init(compress: compress, flush: true))
}

// MARK: - AsyncWriterDelegate conformance.

@inlinable
internal func write(_ element: (Response, Compression)) {
self._send(element.0, compression: element.1)
}

@inlinable
internal func writeEnd(_ end: GRPCStatus) {
self._finish(end)
}
}

#endif
111 changes: 111 additions & 0 deletions Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerCallContext.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright 2021, gRPC Authors All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if compiler(>=5.5)

import Logging
import NIOConcurrencyHelpers
import NIOHPACK

// We use a `class` here because we do not want copy-on-write semantics. The instance that the async
// handler holds must not diverge from the instance the implementor of the RPC holds. They hold these
// instances on different threads (EventLoop vs Task).
//
// We considered wrapping this in a `struct` and pass it `inout` to the RPC. This would communicate
// explicitly that it stores mutable state. However, without copy-on-write semantics, this could
// make for a surprising API.
//
// We also considered an `actor` but that felt clunky at the point of use since adopters would need
// to `await` the retrieval of a logger or the updating of the trailers and each would requrie a
// promise to glue the NIO and async-await paradigms in the handler.
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
public final class GRPCAsyncServerCallContext {
private let lock = Lock()

/// Request headers for this request.
public let headers: HPACKHeaders

/// The logger used for this call.
public var logger: Logger {
get { self.lock.withLock {
self._logger
} }
set { self.lock.withLock {
self._logger = newValue
} }
}

@usableFromInline
internal var _logger: Logger

/// Whether compression should be enabled for responses, defaulting to `true`. Note that for
/// this value to take effect compression must have been enabled on the server and a compression
/// algorithm must have been negotiated with the client.
public var compressionEnabled: Bool {
get { self.lock.withLock {
self._compressionEnabled
} }
set { self.lock.withLock {
self._compressionEnabled = newValue
} }
}

private var _compressionEnabled: Bool = true

/// A `UserInfo` dictionary which is shared with the interceptor contexts for this RPC.
///
/// - Important: While `UserInfo` has value-semantics, this property retrieves from, and sets a
/// reference wrapped `UserInfo`. The contexts passed to interceptors provide the same
/// reference. As such this may be used as a mechanism to pass information between interceptors
/// and service providers.
public var userInfo: UserInfo {
get { self.lock.withLock {
self.userInfoRef.value
} }
set { self.lock.withLock {
self.userInfoRef.value = newValue
} }
}

/// A reference to an underlying `UserInfo`. We share this with the interceptors.
@usableFromInline
internal let userInfoRef: Ref<UserInfo>

/// Metadata to return at the end of the RPC. If this is required it should be updated before
/// the `responsePromise` or `statusPromise` is fulfilled.
public var trailers: HPACKHeaders {
get { self.lock.withLock {
return self._trailers
} }
set { self.lock.withLock {
self._trailers = newValue
} }
}

private var _trailers: HPACKHeaders = [:]

@inlinable
internal init(
headers: HPACKHeaders,
logger: Logger,
userInfoRef: Ref<UserInfo>
) {
self.headers = headers
self.userInfoRef = userInfoRef
self._logger = logger
}
}

#endif
Loading