Skip to content

[async-await] Support for sending response headers via context #1262

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ public struct GRPCAsyncBidirectionalStreamingCall<Request, Response> {
// MARK: - Response Parts

/// The initial metadata returned from the server.
///
/// - Important: The initial metadata will only be available when the first response has been
/// received. However, it is not necessary for the response to have been consumed before reading
/// this property.
public var initialMetadata: HPACKHeaders {
// swiftformat:disable:next redundantGet
get async throws {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public struct GRPCAsyncClientStreamingCall<Request, Response> {
// MARK: - Response Parts

/// The initial metadata returned from the server.
///
/// - Important: The initial metadata will only be available when the response has been received.
public var initialMetadata: HPACKHeaders {
// swiftformat:disable:next redundantGet
get async throws {
Expand Down
34 changes: 25 additions & 9 deletions Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerCallContext.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import NIOHPACK
public final class GRPCAsyncServerCallContext {
private let lock = Lock()

/// Request headers for this request.
public let headers: HPACKHeaders
/// Metadata for this request.
public let requestMetadata: HPACKHeaders

/// The logger used for this call.
public var logger: Logger {
Expand Down Expand Up @@ -83,26 +83,42 @@ public final class GRPCAsyncServerCallContext {
@usableFromInline
internal let userInfoRef: Ref<UserInfo>

/// Metadata to return at the end of the RPC. If this is required it should be updated before
/// the `responsePromise` or `statusPromise` is fulfilled.
public var trailers: HPACKHeaders {
/// Metadata to return at the start of the RPC.
///
/// - Important: If this is required it should be updated _before_ the first response is sent via
/// the response stream writer. Any updates made after the first response will be ignored.
public var initialResponseMetadata: HPACKHeaders {
get { self.lock.withLock {
return self._initialResponseMetadata
} }
set { self.lock.withLock {
self._initialResponseMetadata = newValue
} }
}

private var _initialResponseMetadata: HPACKHeaders = [:]

/// Metadata to return at the end of the RPC.
///
/// If this is required it should be updated before returning from the handler.
public var trailingResponseMetadata: HPACKHeaders {
get { self.lock.withLock {
return self._trailers
return self._trailingResponseMetadata
} }
set { self.lock.withLock {
self._trailers = newValue
self._trailingResponseMetadata = newValue
} }
}

private var _trailers: HPACKHeaders = [:]
private var _trailingResponseMetadata: HPACKHeaders = [:]

@inlinable
internal init(
headers: HPACKHeaders,
logger: Logger,
userInfoRef: Ref<UserInfo>
) {
self.headers = headers
self.requestMetadata = headers
self.userInfoRef = userInfoRef
self._logger = logger
}
Expand Down
123 changes: 76 additions & 47 deletions Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -204,38 +204,56 @@ internal final class AsyncServerHandler<
/// No headers have been received.
case idle

/// Headers have been received, and an async `Task` has been created to execute the user
/// handler.
///
/// The inputs to the user handler are held in the associated data of this enum value:
///
/// - The `PassthroughMessageSource` is the source backing the request stream that is being
/// consumed by the user handler.
///
/// - The `GRPCAsyncServerContext` is a reference to the context that was passed to the user
/// handler.
///
/// - The `GRPCAsyncResponseStreamWriter` is the response stream writer that is being written to
/// by the user handler. Because this is pausable, it may contain responses after the user
/// handler has completed that have yet to be written. However we will remain in the `.active`
/// state until the response stream writer has completed.
///
/// - The `EventLoopPromise` bridges the NIO and async-await worlds. It is the mechanism that we
/// use to run a callback when the user handler has completed. The promise is not passed to the
/// user handler directly. Instead it is fulfilled with the result of the async `Task` executing
/// the user handler using `completeWithTask(_:)`.
///
/// - TODO: It shouldn't really be necessary to stash the `GRPCAsyncResponseStreamWriter` or the
/// `EventLoopPromise` in this enum value. Specifically they are never used anywhere when this
/// enum value is accessed. However, if we do not store them here then the tests periodically
/// segfault. This appears to be an bug in Swift and/or NIO since these should both have been
/// captured by `completeWithTask(_:)`.
case active(
PassthroughMessageSource<Request, Error>,
GRPCAsyncServerCallContext,
GRPCAsyncResponseStreamWriter<Response>,
EventLoopPromise<Void>
)
@usableFromInline
internal struct ActiveState {
/// The source backing the request stream that is being consumed by the user handler.
@usableFromInline
let requestStreamSource: PassthroughMessageSource<Request, Error>

/// The call context that was passed to the user handler.
@usableFromInline
let context: GRPCAsyncServerCallContext

/// The response stream writer that is being used by the user handler.
///
/// Because this is pausable, it may contain responses after the user handler has completed
/// that have yet to be written. However we will remain in the `.active` state until the
/// response stream writer has completed.
@usableFromInline
let responseStreamWriter: GRPCAsyncResponseStreamWriter<Response>

/// The response headers have been sent back to the client via the interceptors.
@usableFromInline
var haveSentResponseHeaders: Bool = false

/// The promise we are using to bridge the NIO and async-await worlds.
///
/// It is the mechanism that we use to run a callback when the user handler has completed.
/// The promise is not passed to the user handler directly. Instead it is fulfilled with the
/// result of the async `Task` executing the user handler using `completeWithTask(_:)`.
///
/// - TODO: It shouldn't really be necessary to stash this promise here. Specifically it is
/// never used anywhere when the `.active` enum value is accessed. However, if we do not store
/// it here then the tests periodically segfault. This appears to be a reference counting bug
/// in Swift and/or NIO since it should have been captured by `completeWithTask(_:)`.
let _userHandlerPromise: EventLoopPromise<Void>

@usableFromInline
internal init(
requestStreamSource: PassthroughMessageSource<Request, Error>,
context: GRPCAsyncServerCallContext,
responseStreamWriter: GRPCAsyncResponseStreamWriter<Response>,
userHandlerPromise: EventLoopPromise<Void>
) {
self.requestStreamSource = requestStreamSource
self.context = context
self.responseStreamWriter = responseStreamWriter
self._userHandlerPromise = userHandlerPromise
}
}

/// Headers have been received and an async `Task` has been created to execute the user handler.
case active(ActiveState)

/// The handler has completed.
case completed
Expand Down Expand Up @@ -363,15 +381,16 @@ internal final class AsyncServerHandler<
)

// Set the state to active and bundle in all the associated data.
self.state = .active(requestStreamSource, context, responseStreamWriter, userHandlerPromise)
self.state = .active(.init(
requestStreamSource: requestStreamSource,
context: context,
responseStreamWriter: responseStreamWriter,
userHandlerPromise: userHandlerPromise
))

// Register callback for the completion of the user handler.
userHandlerPromise.futureResult.whenComplete(self.userHandlerCompleted(_:))

// Send response headers back via the interceptors.
// TODO: In future we may want to defer this until the first response is available from the user handler which will allow the user to set the response headers via the context.
self.interceptors.send(.metadata([:]), promise: nil)

// Spin up a task to call the async user handler.
self.userHandlerTask = userHandlerPromise.completeWithTask {
return try await withTaskCancellationHandler {
Expand Down Expand Up @@ -443,8 +462,8 @@ internal final class AsyncServerHandler<
switch self.state {
case .idle:
self.handleError(GRPCError.ProtocolViolation("Message received before headers"))
case let .active(requestStreamSource, _, _, _):
switch requestStreamSource.yield(request) {
case let .active(activeState):
switch activeState.requestStreamSource.yield(request) {
case .accepted(queueDepth: _):
// TODO: In future we will potentially issue a read request to the channel based on the value of `queueDepth`.
break
Expand All @@ -467,8 +486,8 @@ internal final class AsyncServerHandler<
switch self.state {
case .idle:
self.handleError(GRPCError.ProtocolViolation("End of stream received before headers"))
case let .active(requestStreamSource, _, _, _):
switch requestStreamSource.finish() {
case let .active(activeState):
switch activeState.requestStreamSource.finish() {
case .accepted(queueDepth: _):
break
case .dropped:
Expand All @@ -495,7 +514,14 @@ internal final class AsyncServerHandler<
// The user handler cannot send responses before it has been invoked.
preconditionFailure()

case .active:
case var .active(activeState):
if !activeState.haveSentResponseHeaders {
activeState.haveSentResponseHeaders = true
self.state = .active(activeState)
// Send response headers back via the interceptors.
self.interceptors.send(.metadata(activeState.context.initialResponseMetadata), promise: nil)
}
// Send the response back via the interceptors.
self.interceptors.send(.message(response, metadata), promise: nil)

case .completed:
Expand Down Expand Up @@ -547,10 +573,13 @@ internal final class AsyncServerHandler<
case .idle:
preconditionFailure()

case let .active(_, context, _, _):
case let .active(activeState):
// Now we have drained the response stream writer from the user handler we can send end.
self.state = .completed
self.interceptors.send(.end(status, context.trailers), promise: nil)
self.interceptors.send(
.end(status, activeState.context.trailingResponseMetadata),
promise: nil
)

case .completed:
()
Expand Down Expand Up @@ -580,7 +609,7 @@ internal final class AsyncServerHandler<
)
self.interceptors.send(.end(status, trailers), promise: nil)

case let .active(_, context, _, _):
case let .active(activeState):
self.state = .completed

// If we have an async task, then cancel it, which will terminate the request stream from
Expand All @@ -593,8 +622,8 @@ internal final class AsyncServerHandler<
if isHandlerError {
(status, trailers) = ServerErrorProcessor.processObserverError(
error,
headers: context.headers,
trailers: context.trailers,
headers: activeState.context.requestMetadata,
trailers: activeState.context.trailingResponseMetadata,
delegate: self.context.errorDelegate
)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public struct GRPCAsyncServerStreamingCall<Request, Response> {
// MARK: - Response Parts

/// The initial metadata returned from the server.
///
/// - Important: The initial metadata will only be available when the first response has been
/// received. However, it is not necessary for the response to have been consumed before reading
/// this property.
public var initialMetadata: HPACKHeaders {
// swiftformat:disable:next redundantGet
get async throws {
Expand Down
2 changes: 2 additions & 0 deletions Sources/GRPC/AsyncAwaitSupport/GRPCAsyncUnaryCall.swift
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ public struct GRPCAsyncUnaryCall<Request, Response> {
// MARK: - Response Parts

/// The initial metadata returned from the server.
///
/// - Important: The initial metadata will only be available when the response has been received.
public var initialMetadata: HPACKHeaders {
// swiftformat:disable:next redundantGet
get async throws {
Expand Down
Loading