From 01b7fa212f335696a7d38d6314199fbb66cdde56 Mon Sep 17 00:00:00 2001 From: Si Beaumont Date: Sun, 12 Sep 2021 19:58:32 +0100 Subject: [PATCH 1/7] Support for setting response headers via context Signed-off-by: Si Beaumont --- .../GRPCAsyncServerCallContext.swift | 32 ++++++--- .../GRPCAsyncServerHandler.swift | 71 ++++++++++++++----- .../GRPCAsyncServerHandlerTests.swift | 34 ++++++++- 3 files changed, 111 insertions(+), 26 deletions(-) diff --git a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerCallContext.swift b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerCallContext.swift index 2fe2aa2cc..a3cc6673c 100644 --- a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerCallContext.swift +++ b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerCallContext.swift @@ -35,7 +35,7 @@ public final class GRPCAsyncServerCallContext { private let lock = Lock() /// Request headers for this request. - public let headers: HPACKHeaders + public let requestHeaders: HPACKHeaders /// The logger used for this call. public var logger: Logger { @@ -83,18 +83,34 @@ public final class GRPCAsyncServerCallContext { @usableFromInline internal let userInfoRef: Ref - /// Metadata to return at the end of the RPC. If this is required it should be updated before - /// the `responsePromise` or `statusPromise` is fulfilled. - public var trailers: HPACKHeaders { + /// Metadata to return at the start of the RPC. + /// + /// If this is required it should be updated before the first response is sent via the response + /// stream writer. + public var responseHeaders: HPACKHeaders { + get { self.lock.withLock { + return self._responseHeaders + } } + set { self.lock.withLock { + self._responseHeaders = newValue + } } + } + + private var _responseHeaders: HPACKHeaders = [:] + + /// Metadata to return at the end of the RPC. + /// + /// If this is required it should be updated before returning from the handler. + public var responseTrailers: HPACKHeaders { get { self.lock.withLock { - return self._trailers + return self._responseTrailers } } set { self.lock.withLock { - self._trailers = newValue + self._responseTrailers = newValue } } } - private var _trailers: HPACKHeaders = [:] + private var _responseTrailers: HPACKHeaders = [:] @inlinable internal init( @@ -102,7 +118,7 @@ public final class GRPCAsyncServerCallContext { logger: Logger, userInfoRef: Ref ) { - self.headers = headers + self.requestHeaders = headers self.userInfoRef = userInfoRef self._logger = logger } diff --git a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift index a23dc64d1..5e29b2fd5 100644 --- a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift +++ b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift @@ -230,7 +230,18 @@ internal final class AsyncServerHandler< /// enum value is accessed. However, if we do not store them here then the tests periodically /// segfault. This appears to be an bug in Swift and/or NIO since these should both have been /// captured by `completeWithTask(_:)`. - case active( + case activeAwaitingFirstResponse( + PassthroughMessageSource, + GRPCAsyncServerCallContext, + GRPCAsyncResponseStreamWriter, + EventLoopPromise + ) + + /// We have received the first response from the user handler via the writer and have sent the + /// response headers back to the client via the interceptors. + /// + /// The associated data in this state is the same as in `.activeAwaitingFirstResponse`. + case activeAwaitingSubsequentResponses( PassthroughMessageSource, GRPCAsyncServerCallContext, GRPCAsyncResponseStreamWriter, @@ -309,7 +320,7 @@ internal final class AsyncServerHandler< self.interceptors = nil self.state = .completed - case .active: + case .activeAwaitingFirstResponse, .activeAwaitingSubsequentResponses: self.userHandlerTask?.cancel() case .completed: @@ -363,15 +374,16 @@ internal final class AsyncServerHandler< ) // Set the state to active and bundle in all the associated data. - self.state = .active(requestStreamSource, context, responseStreamWriter, userHandlerPromise) + self.state = .activeAwaitingFirstResponse( + requestStreamSource, + context, + responseStreamWriter, + userHandlerPromise + ) // Register callback for the completion of the user handler. userHandlerPromise.futureResult.whenComplete(self.userHandlerCompleted(_:)) - // Send response headers back via the interceptors. - // TODO: In future we may want to defer this until the first response is available from the user handler which will allow the user to set the response headers via the context. - self.interceptors.send(.metadata([:]), promise: nil) - // Spin up a task to call the async user handler. self.userHandlerTask = userHandlerPromise.completeWithTask { return try await withTaskCancellationHandler { @@ -427,7 +439,7 @@ internal final class AsyncServerHandler< } } - case .active: + case .activeAwaitingFirstResponse, .activeAwaitingSubsequentResponses: self.handleError(GRPCError.ProtocolViolation("Multiple header blocks received on RPC")) case .completed: @@ -443,7 +455,9 @@ internal final class AsyncServerHandler< switch self.state { case .idle: self.handleError(GRPCError.ProtocolViolation("Message received before headers")) - case let .active(requestStreamSource, _, _, _): + case + let .activeAwaitingFirstResponse(requestStreamSource, _, _, _), + let .activeAwaitingSubsequentResponses(requestStreamSource, _, _, _): switch requestStreamSource.yield(request) { case .accepted(queueDepth: _): // TODO: In future we will potentially issue a read request to the channel based on the value of `queueDepth`. @@ -467,7 +481,9 @@ internal final class AsyncServerHandler< switch self.state { case .idle: self.handleError(GRPCError.ProtocolViolation("End of stream received before headers")) - case let .active(requestStreamSource, _, _, _): + case + let .activeAwaitingFirstResponse(requestStreamSource, _, _, _), + let .activeAwaitingSubsequentResponses(requestStreamSource, _, _, _): switch requestStreamSource.finish() { case .accepted(queueDepth: _): break @@ -495,7 +511,24 @@ internal final class AsyncServerHandler< // The user handler cannot send responses before it has been invoked. preconditionFailure() - case .active: + case let .activeAwaitingFirstResponse( + requestStreamSource, + context, + responseStreamWriter, + promise + ): + self.state = .activeAwaitingSubsequentResponses( + requestStreamSource, + context, + responseStreamWriter, + promise + ) + // Send response headers back via the interceptors. + self.interceptors.send(.metadata(context.responseHeaders), promise: nil) + fallthrough + + case .activeAwaitingSubsequentResponses: + // Send the response back via the interceptors. self.interceptors.send(.message(response, metadata), promise: nil) case .completed: @@ -522,7 +555,7 @@ internal final class AsyncServerHandler< // The user handler cannot complete before it is invoked. preconditionFailure() - case .active: + case .activeAwaitingFirstResponse, .activeAwaitingSubsequentResponses: switch result { case .success: /// The user handler has completed successfully. @@ -547,10 +580,12 @@ internal final class AsyncServerHandler< case .idle: preconditionFailure() - case let .active(_, context, _, _): + case + let .activeAwaitingFirstResponse(_, context, _, _), + let .activeAwaitingSubsequentResponses(_, context, _, _): // Now we have drained the response stream writer from the user handler we can send end. self.state = .completed - self.interceptors.send(.end(status, context.trailers), promise: nil) + self.interceptors.send(.end(status, context.responseTrailers), promise: nil) case .completed: () @@ -580,7 +615,9 @@ internal final class AsyncServerHandler< ) self.interceptors.send(.end(status, trailers), promise: nil) - case let .active(_, context, _, _): + case + let .activeAwaitingFirstResponse(_, context, _, _), + let .activeAwaitingSubsequentResponses(_, context, _, _): self.state = .completed // If we have an async task, then cancel it, which will terminate the request stream from @@ -593,8 +630,8 @@ internal final class AsyncServerHandler< if isHandlerError { (status, trailers) = ServerErrorProcessor.processObserverError( error, - headers: context.headers, - trailers: context.trailers, + headers: context.requestHeaders, + trailers: context.responseTrailers, delegate: self.context.errorDelegate ) } else { diff --git a/Tests/GRPCTests/GRPCAsyncServerHandlerTests.swift b/Tests/GRPCTests/GRPCAsyncServerHandlerTests.swift index 9eab2dc6c..5eae05dad 100644 --- a/Tests/GRPCTests/GRPCAsyncServerHandlerTests.swift +++ b/Tests/GRPCTests/GRPCAsyncServerHandlerTests.swift @@ -145,6 +145,38 @@ class AsyncServerHandlerTests: ServerHandlerTestCaseBase { await assertThat(self.recorder.messageMetadata.map { $0.compress }, .is([false, false, false])) } } + func testResponseHeadersAndTrailersSentFromContext() { XCTAsyncTest { + let handler = self.makeHandler { _, responseStreamWriter, context in + context.responseHeaders = ["pontiac": "bandit"] + try await responseStreamWriter.send("1") + context.responseTrailers = ["disco": "strangler"] + } + handler.receiveMetadata([:]) + handler.receiveEnd() + + // Wait for tasks to finish. + await handler.userHandlerTask?.value + + await assertThat(self.recorder.metadata, .is(["pontiac": "bandit"])) + await assertThat(self.recorder.trailers, .is(["disco": "strangler"])) + } } + + func testResponseHeadersDroppedIfSetAfterFirstResponse() { XCTAsyncTest { + let handler = self.makeHandler { _, responseStreamWriter, context in + try await responseStreamWriter.send("1") + context.responseHeaders = ["pontiac": "bandit"] + context.responseTrailers = ["disco": "strangler"] + } + handler.receiveMetadata([:]) + handler.receiveEnd() + + // Wait for tasks to finish. + await handler.userHandlerTask?.value + + await assertThat(self.recorder.metadata, .is([:])) + await assertThat(self.recorder.trailers, .is(["disco": "strangler"])) + } } + func testTaskOnlyCreatedAfterHeaders() { XCTAsyncTest { let handler = self.makeHandler(observer: self.echo(requests:responseStreamWriter:context:)) @@ -317,7 +349,7 @@ class AsyncServerHandlerTests: ServerHandlerTestCaseBase { // Send two requests and end, pausing the writer in the middle. switch handler.state { - case let .active(_, _, responseStreamWriter, promise): + case let .activeAwaitingFirstResponse(_, _, responseStreamWriter, promise): handler.receiveMessage(ByteBuffer(string: "diaz")) await responseStreamWriter.asyncWriter.toggleWritability() handler.receiveMessage(ByteBuffer(string: "santiago")) From f9f6d15af71d608f87f2c4956452ec607c882038 Mon Sep 17 00:00:00 2001 From: Si Beaumont Date: Mon, 13 Sep 2021 09:07:20 +0100 Subject: [PATCH 2/7] fixup: Update tests for new response header semantics Signed-off-by: Si Beaumont --- .../GRPCAsyncServerHandlerTests.swift | 46 ++++++------------- 1 file changed, 14 insertions(+), 32 deletions(-) diff --git a/Tests/GRPCTests/GRPCAsyncServerHandlerTests.swift b/Tests/GRPCTests/GRPCAsyncServerHandlerTests.swift index 5eae05dad..d9b69349c 100644 --- a/Tests/GRPCTests/GRPCAsyncServerHandlerTests.swift +++ b/Tests/GRPCTests/GRPCAsyncServerHandlerTests.swift @@ -74,8 +74,6 @@ class AsyncServerHandlerTests: ServerHandlerTestCaseBase { ) handler.receiveMetadata([:]) - await assertThat(self.recorder.metadata, .is([:])) - handler.receiveMessage(ByteBuffer(string: "1")) handler.receiveMessage(ByteBuffer(string: "2")) handler.receiveMessage(ByteBuffer(string: "3")) @@ -86,6 +84,7 @@ class AsyncServerHandlerTests: ServerHandlerTestCaseBase { handler.finish() + await assertThat(self.recorder.metadata, .is([:])) await assertThat( self.recorder.messages, .is([ByteBuffer(string: "1"), ByteBuffer(string: "2"), ByteBuffer(string: "3")]) @@ -180,11 +179,11 @@ class AsyncServerHandlerTests: ServerHandlerTestCaseBase { func testTaskOnlyCreatedAfterHeaders() { XCTAsyncTest { let handler = self.makeHandler(observer: self.echo(requests:responseStreamWriter:context:)) - await assertThat(handler.userHandlerTask, .is(.nil())) + await assertThat(handler.userHandlerTask, .nil()) handler.receiveMetadata([:]) - await assertThat(handler.userHandlerTask, .is(.notNil())) + await assertThat(handler.userHandlerTask, .notNil()) } } func testThrowingDeserializer() { XCTAsyncTest { @@ -197,18 +196,12 @@ class AsyncServerHandlerTests: ServerHandlerTestCaseBase { ) handler.receiveMetadata([:]) - - // Wait for the async user function to have processed the metadata. - try self.recorder.recordedMetadataPromise.futureResult.wait() - - await assertThat(self.recorder.metadata, .is([:])) - - let buffer = ByteBuffer(string: "hello") - handler.receiveMessage(buffer) + handler.receiveMessage(ByteBuffer(string: "hello")) // Wait for tasks to finish. await handler.userHandlerTask?.value + await assertThat(self.recorder.metadata, .nil()) await assertThat(self.recorder.messages, .isEmpty()) await assertThat(self.recorder.status, .notNil(.hasCode(.internalError))) } } @@ -223,15 +216,13 @@ class AsyncServerHandlerTests: ServerHandlerTestCaseBase { ) handler.receiveMetadata([:]) - await assertThat(self.recorder.metadata, .is([:])) - - let buffer = ByteBuffer(string: "hello") - handler.receiveMessage(buffer) + handler.receiveMessage(ByteBuffer(string: "hello")) handler.receiveEnd() // Wait for tasks to finish. await handler.userHandlerTask?.value + await assertThat(self.recorder.metadata, .is([:])) await assertThat(self.recorder.messages, .isEmpty()) await assertThat(self.recorder.status, .notNil(.hasCode(.internalError))) } } @@ -245,7 +236,7 @@ class AsyncServerHandlerTests: ServerHandlerTestCaseBase { // Wait for tasks to finish. await handler.userHandlerTask?.value - await assertThat(self.recorder.metadata, .is(.nil())) + await assertThat(self.recorder.metadata, .nil()) await assertThat(self.recorder.messages, .isEmpty()) await assertThat(self.recorder.status, .notNil(.hasCode(.internalError))) } } @@ -256,17 +247,12 @@ class AsyncServerHandlerTests: ServerHandlerTestCaseBase { .makeHandler(observer: self.neverReceivesMessage(requests:responseStreamWriter:context:)) handler.receiveMetadata([:]) - - // Wait for the async user function to have processed the metadata. - try self.recorder.recordedMetadataPromise.futureResult.wait() - - await assertThat(self.recorder.metadata, .is([:])) - handler.receiveMetadata([:]) // Wait for tasks to finish. await handler.userHandlerTask?.value + await assertThat(self.recorder.metadata, .nil()) await assertThat(self.recorder.messages, .isEmpty()) await assertThat(self.recorder.status, .notNil(.hasCode(.internalError))) } } @@ -276,26 +262,22 @@ class AsyncServerHandlerTests: ServerHandlerTestCaseBase { .makeHandler(observer: self.neverCalled(requests:responseStreamWriter:context:)) handler.finish() - await assertThat(self.recorder.metadata, .is(.nil())) + await assertThat(self.recorder.metadata, .nil()) await assertThat(self.recorder.messages, .isEmpty()) - await assertThat(self.recorder.status, .is(.nil())) - await assertThat(self.recorder.trailers, .is(.nil())) + await assertThat(self.recorder.status, .nil()) + await assertThat(self.recorder.trailers, .nil()) } } func testFinishAfterHeaders() { XCTAsyncTest { let handler = self.makeHandler(observer: self.echo(requests:responseStreamWriter:context:)) - handler.receiveMetadata([:]) - - // Wait for the async user function to have processed the metadata. - try self.recorder.recordedMetadataPromise.futureResult.wait() - - await assertThat(self.recorder.metadata, .is([:])) + handler.receiveMetadata([:]) handler.finish() // Wait for tasks to finish. await handler.userHandlerTask?.value + await assertThat(self.recorder.metadata, .nil()) await assertThat(self.recorder.messages, .isEmpty()) await assertThat(self.recorder.status, .notNil(.hasCode(.unavailable))) await assertThat(self.recorder.trailers, .is([:])) From 976cc7880839b6f36af4a60f2c566ef0a33ea163 Mon Sep 17 00:00:00 2001 From: Si Beaumont Date: Mon, 13 Sep 2021 09:07:32 +0100 Subject: [PATCH 3/7] fixup: Remove stale comments in tests Signed-off-by: Si Beaumont --- Tests/GRPCTests/GRPCAsyncServerHandlerTests.swift | 3 --- 1 file changed, 3 deletions(-) diff --git a/Tests/GRPCTests/GRPCAsyncServerHandlerTests.swift b/Tests/GRPCTests/GRPCAsyncServerHandlerTests.swift index d9b69349c..ac742d355 100644 --- a/Tests/GRPCTests/GRPCAsyncServerHandlerTests.swift +++ b/Tests/GRPCTests/GRPCAsyncServerHandlerTests.swift @@ -241,7 +241,6 @@ class AsyncServerHandlerTests: ServerHandlerTestCaseBase { await assertThat(self.recorder.status, .notNil(.hasCode(.internalError))) } } - // TODO: Running this 1000 times shows up a segfault in NIO event loop group. func testReceiveMultipleHeaders() { XCTAsyncTest { let handler = self .makeHandler(observer: self.neverReceivesMessage(requests:responseStreamWriter:context:)) @@ -318,8 +317,6 @@ class AsyncServerHandlerTests: ServerHandlerTestCaseBase { await assertThat(self.recorder.status, .notNil(.hasCode(.unknown))) } } - // TODO: We should be consistent about where we put the tasks... maybe even use a task group to simplify cancellation (unless they both go in the enum state which might be better). - func testResponseStreamDrain() { XCTAsyncTest { // Set up echo handler. let handler = self.makeHandler( From d35f39c6e772ac4981896fe72800ac598e41e240 Mon Sep 17 00:00:00 2001 From: Si Beaumont Date: Mon, 13 Sep 2021 10:44:36 +0100 Subject: [PATCH 4/7] fixup: Factor out ActiveState from State.active associated data Signed-off-by: Si Beaumont --- .../GRPCAsyncServerHandler.swift | 148 +++++++++--------- 1 file changed, 76 insertions(+), 72 deletions(-) diff --git a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift index 5e29b2fd5..31c8edefe 100644 --- a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift +++ b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift @@ -204,49 +204,58 @@ internal final class AsyncServerHandler< /// No headers have been received. case idle - /// Headers have been received, and an async `Task` has been created to execute the user - /// handler. - /// - /// The inputs to the user handler are held in the associated data of this enum value: - /// - /// - The `PassthroughMessageSource` is the source backing the request stream that is being - /// consumed by the user handler. - /// - /// - The `GRPCAsyncServerContext` is a reference to the context that was passed to the user - /// handler. - /// - /// - The `GRPCAsyncResponseStreamWriter` is the response stream writer that is being written to - /// by the user handler. Because this is pausable, it may contain responses after the user - /// handler has completed that have yet to be written. However we will remain in the `.active` - /// state until the response stream writer has completed. - /// - /// - The `EventLoopPromise` bridges the NIO and async-await worlds. It is the mechanism that we - /// use to run a callback when the user handler has completed. The promise is not passed to the - /// user handler directly. Instead it is fulfilled with the result of the async `Task` executing - /// the user handler using `completeWithTask(_:)`. - /// - /// - TODO: It shouldn't really be necessary to stash the `GRPCAsyncResponseStreamWriter` or the - /// `EventLoopPromise` in this enum value. Specifically they are never used anywhere when this - /// enum value is accessed. However, if we do not store them here then the tests periodically - /// segfault. This appears to be an bug in Swift and/or NIO since these should both have been - /// captured by `completeWithTask(_:)`. - case activeAwaitingFirstResponse( - PassthroughMessageSource, - GRPCAsyncServerCallContext, - GRPCAsyncResponseStreamWriter, - EventLoopPromise - ) + @usableFromInline + internal struct ActiveState { + /// The source backing the request stream that is being consumed by the user handler. + @usableFromInline + let requestStreamSource: PassthroughMessageSource + + /// The call context that was passed to the user handler. + @usableFromInline + let context: GRPCAsyncServerCallContext + + /// The response stream writer that is being used by the user handler. + /// + /// Because this is pausable, it may contain responses after the user handler has completed + /// that have yet to be written. However we will remain in the `.active` state until the + /// response stream writer has completed. + @usableFromInline + let responseStreamWriter: GRPCAsyncResponseStreamWriter + + /// The promise we are using to bridge the NIO and async-await worlds. + /// + /// It is the mechanism that we use to run a callback when the user handler has completed. + /// The promise is not passed to the user handler directly. Instead it is fulfilled with the + /// result of the async `Task` executing the user handler using `completeWithTask(_:)`. + /// + /// - TODO: It shouldn't really be necessary to stash this promise here. Specifically it is + /// never used anywhere when the `.active` enum value is accessed. However, if we do not store + /// it here then the tests periodically segfault. This appears to be a reference counting bug + /// in Swift and/or NIO since it should have been captured by `completeWithTask(_:)`. + /// TODO: remove usableFromInline + @usableFromInline + let _userHandlerPromise: EventLoopPromise + + @usableFromInline + internal init( + requestStreamSource: PassthroughMessageSource, + context: GRPCAsyncServerCallContext, + responseStreamWriter: GRPCAsyncResponseStreamWriter, + userHandlerPromise: EventLoopPromise + ) { + self.requestStreamSource = requestStreamSource + self.context = context + self.responseStreamWriter = responseStreamWriter + self._userHandlerPromise = userHandlerPromise + } + } + + /// Headers have been received and an async `Task` has been created to execute the user handler. + case activeAwaitingFirstResponse(ActiveState) /// We have received the first response from the user handler via the writer and have sent the /// response headers back to the client via the interceptors. - /// - /// The associated data in this state is the same as in `.activeAwaitingFirstResponse`. - case activeAwaitingSubsequentResponses( - PassthroughMessageSource, - GRPCAsyncServerCallContext, - GRPCAsyncResponseStreamWriter, - EventLoopPromise - ) + case activeAwaitingSubsequentResponses(ActiveState) /// The handler has completed. case completed @@ -374,12 +383,12 @@ internal final class AsyncServerHandler< ) // Set the state to active and bundle in all the associated data. - self.state = .activeAwaitingFirstResponse( - requestStreamSource, - context, - responseStreamWriter, - userHandlerPromise - ) + self.state = .activeAwaitingFirstResponse(.init( + requestStreamSource: requestStreamSource, + context: context, + responseStreamWriter: responseStreamWriter, + userHandlerPromise: userHandlerPromise + )) // Register callback for the completion of the user handler. userHandlerPromise.futureResult.whenComplete(self.userHandlerCompleted(_:)) @@ -456,9 +465,9 @@ internal final class AsyncServerHandler< case .idle: self.handleError(GRPCError.ProtocolViolation("Message received before headers")) case - let .activeAwaitingFirstResponse(requestStreamSource, _, _, _), - let .activeAwaitingSubsequentResponses(requestStreamSource, _, _, _): - switch requestStreamSource.yield(request) { + let .activeAwaitingFirstResponse(state), + let .activeAwaitingSubsequentResponses(state): + switch state.requestStreamSource.yield(request) { case .accepted(queueDepth: _): // TODO: In future we will potentially issue a read request to the channel based on the value of `queueDepth`. break @@ -482,9 +491,9 @@ internal final class AsyncServerHandler< case .idle: self.handleError(GRPCError.ProtocolViolation("End of stream received before headers")) case - let .activeAwaitingFirstResponse(requestStreamSource, _, _, _), - let .activeAwaitingSubsequentResponses(requestStreamSource, _, _, _): - switch requestStreamSource.finish() { + let .activeAwaitingFirstResponse(state), + let .activeAwaitingSubsequentResponses(state): + switch state.requestStreamSource.finish() { case .accepted(queueDepth: _): break case .dropped: @@ -511,20 +520,15 @@ internal final class AsyncServerHandler< // The user handler cannot send responses before it has been invoked. preconditionFailure() - case let .activeAwaitingFirstResponse( - requestStreamSource, - context, - responseStreamWriter, - promise - ): - self.state = .activeAwaitingSubsequentResponses( - requestStreamSource, - context, - responseStreamWriter, - promise - ) + case let .activeAwaitingFirstResponse(state): + self.state = .activeAwaitingSubsequentResponses(.init( + requestStreamSource: state.requestStreamSource, + context: state.context, + responseStreamWriter: state.responseStreamWriter, + userHandlerPromise: state._userHandlerPromise + )) // Send response headers back via the interceptors. - self.interceptors.send(.metadata(context.responseHeaders), promise: nil) + self.interceptors.send(.metadata(state.context.responseHeaders), promise: nil) fallthrough case .activeAwaitingSubsequentResponses: @@ -581,11 +585,11 @@ internal final class AsyncServerHandler< preconditionFailure() case - let .activeAwaitingFirstResponse(_, context, _, _), - let .activeAwaitingSubsequentResponses(_, context, _, _): + let .activeAwaitingFirstResponse(state), + let .activeAwaitingSubsequentResponses(state): // Now we have drained the response stream writer from the user handler we can send end. self.state = .completed - self.interceptors.send(.end(status, context.responseTrailers), promise: nil) + self.interceptors.send(.end(status, state.context.responseTrailers), promise: nil) case .completed: () @@ -616,8 +620,8 @@ internal final class AsyncServerHandler< self.interceptors.send(.end(status, trailers), promise: nil) case - let .activeAwaitingFirstResponse(_, context, _, _), - let .activeAwaitingSubsequentResponses(_, context, _, _): + let .activeAwaitingFirstResponse(state), + let .activeAwaitingSubsequentResponses(state): self.state = .completed // If we have an async task, then cancel it, which will terminate the request stream from @@ -630,8 +634,8 @@ internal final class AsyncServerHandler< if isHandlerError { (status, trailers) = ServerErrorProcessor.processObserverError( error, - headers: context.requestHeaders, - trailers: context.responseTrailers, + headers: state.context.requestHeaders, + trailers: state.context.responseTrailers, delegate: self.context.errorDelegate ) } else { From 8275bb359945d068cfd1e019bb8e8913dde969e1 Mon Sep 17 00:00:00 2001 From: Si Beaumont Date: Mon, 13 Sep 2021 12:04:59 +0100 Subject: [PATCH 5/7] fixup: Merge active states and add boolean flag for sent headers Signed-off-by: Si Beaumont --- .../GRPCAsyncServerHandler.swift | 65 +++++++------------ .../GRPCAsyncServerHandlerTests.swift | 8 +-- 2 files changed, 29 insertions(+), 44 deletions(-) diff --git a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift index 31c8edefe..8e464832f 100644 --- a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift +++ b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift @@ -222,6 +222,10 @@ internal final class AsyncServerHandler< @usableFromInline let responseStreamWriter: GRPCAsyncResponseStreamWriter + /// The response headers have been sent back to the client via the interceptors. + @usableFromInline + var haveSentResponseHeaders: Bool = false + /// The promise we are using to bridge the NIO and async-await worlds. /// /// It is the mechanism that we use to run a callback when the user handler has completed. @@ -232,8 +236,6 @@ internal final class AsyncServerHandler< /// never used anywhere when the `.active` enum value is accessed. However, if we do not store /// it here then the tests periodically segfault. This appears to be a reference counting bug /// in Swift and/or NIO since it should have been captured by `completeWithTask(_:)`. - /// TODO: remove usableFromInline - @usableFromInline let _userHandlerPromise: EventLoopPromise @usableFromInline @@ -251,11 +253,7 @@ internal final class AsyncServerHandler< } /// Headers have been received and an async `Task` has been created to execute the user handler. - case activeAwaitingFirstResponse(ActiveState) - - /// We have received the first response from the user handler via the writer and have sent the - /// response headers back to the client via the interceptors. - case activeAwaitingSubsequentResponses(ActiveState) + case active(ActiveState) /// The handler has completed. case completed @@ -329,7 +327,7 @@ internal final class AsyncServerHandler< self.interceptors = nil self.state = .completed - case .activeAwaitingFirstResponse, .activeAwaitingSubsequentResponses: + case .active: self.userHandlerTask?.cancel() case .completed: @@ -383,7 +381,7 @@ internal final class AsyncServerHandler< ) // Set the state to active and bundle in all the associated data. - self.state = .activeAwaitingFirstResponse(.init( + self.state = .active(.init( requestStreamSource: requestStreamSource, context: context, responseStreamWriter: responseStreamWriter, @@ -448,7 +446,7 @@ internal final class AsyncServerHandler< } } - case .activeAwaitingFirstResponse, .activeAwaitingSubsequentResponses: + case .active: self.handleError(GRPCError.ProtocolViolation("Multiple header blocks received on RPC")) case .completed: @@ -464,10 +462,8 @@ internal final class AsyncServerHandler< switch self.state { case .idle: self.handleError(GRPCError.ProtocolViolation("Message received before headers")) - case - let .activeAwaitingFirstResponse(state), - let .activeAwaitingSubsequentResponses(state): - switch state.requestStreamSource.yield(request) { + case let .active(activeState): + switch activeState.requestStreamSource.yield(request) { case .accepted(queueDepth: _): // TODO: In future we will potentially issue a read request to the channel based on the value of `queueDepth`. break @@ -490,10 +486,8 @@ internal final class AsyncServerHandler< switch self.state { case .idle: self.handleError(GRPCError.ProtocolViolation("End of stream received before headers")) - case - let .activeAwaitingFirstResponse(state), - let .activeAwaitingSubsequentResponses(state): - switch state.requestStreamSource.finish() { + case let .active(activeState): + switch activeState.requestStreamSource.finish() { case .accepted(queueDepth: _): break case .dropped: @@ -520,18 +514,13 @@ internal final class AsyncServerHandler< // The user handler cannot send responses before it has been invoked. preconditionFailure() - case let .activeAwaitingFirstResponse(state): - self.state = .activeAwaitingSubsequentResponses(.init( - requestStreamSource: state.requestStreamSource, - context: state.context, - responseStreamWriter: state.responseStreamWriter, - userHandlerPromise: state._userHandlerPromise - )) - // Send response headers back via the interceptors. - self.interceptors.send(.metadata(state.context.responseHeaders), promise: nil) - fallthrough - - case .activeAwaitingSubsequentResponses: + case var .active(activeState): + if !activeState.haveSentResponseHeaders { + activeState.haveSentResponseHeaders = true + self.state = .active(activeState) + // Send response headers back via the interceptors. + self.interceptors.send(.metadata(activeState.context.responseHeaders), promise: nil) + } // Send the response back via the interceptors. self.interceptors.send(.message(response, metadata), promise: nil) @@ -559,7 +548,7 @@ internal final class AsyncServerHandler< // The user handler cannot complete before it is invoked. preconditionFailure() - case .activeAwaitingFirstResponse, .activeAwaitingSubsequentResponses: + case .active: switch result { case .success: /// The user handler has completed successfully. @@ -584,12 +573,10 @@ internal final class AsyncServerHandler< case .idle: preconditionFailure() - case - let .activeAwaitingFirstResponse(state), - let .activeAwaitingSubsequentResponses(state): + case let .active(activeState): // Now we have drained the response stream writer from the user handler we can send end. self.state = .completed - self.interceptors.send(.end(status, state.context.responseTrailers), promise: nil) + self.interceptors.send(.end(status, activeState.context.responseTrailers), promise: nil) case .completed: () @@ -619,9 +606,7 @@ internal final class AsyncServerHandler< ) self.interceptors.send(.end(status, trailers), promise: nil) - case - let .activeAwaitingFirstResponse(state), - let .activeAwaitingSubsequentResponses(state): + case let .active(activeState): self.state = .completed // If we have an async task, then cancel it, which will terminate the request stream from @@ -634,8 +619,8 @@ internal final class AsyncServerHandler< if isHandlerError { (status, trailers) = ServerErrorProcessor.processObserverError( error, - headers: state.context.requestHeaders, - trailers: state.context.responseTrailers, + headers: activeState.context.requestHeaders, + trailers: activeState.context.responseTrailers, delegate: self.context.errorDelegate ) } else { diff --git a/Tests/GRPCTests/GRPCAsyncServerHandlerTests.swift b/Tests/GRPCTests/GRPCAsyncServerHandlerTests.swift index ac742d355..516006972 100644 --- a/Tests/GRPCTests/GRPCAsyncServerHandlerTests.swift +++ b/Tests/GRPCTests/GRPCAsyncServerHandlerTests.swift @@ -328,14 +328,14 @@ class AsyncServerHandlerTests: ServerHandlerTestCaseBase { // Send two requests and end, pausing the writer in the middle. switch handler.state { - case let .activeAwaitingFirstResponse(_, _, responseStreamWriter, promise): + case let .active(activeState): handler.receiveMessage(ByteBuffer(string: "diaz")) - await responseStreamWriter.asyncWriter.toggleWritability() + await activeState.responseStreamWriter.asyncWriter.toggleWritability() handler.receiveMessage(ByteBuffer(string: "santiago")) handler.receiveEnd() - await responseStreamWriter.asyncWriter.toggleWritability() + await activeState.responseStreamWriter.asyncWriter.toggleWritability() await handler.userHandlerTask?.value - _ = try await promise.futureResult.get() + _ = try await activeState._userHandlerPromise.futureResult.get() default: XCTFail("Unexpected handler state: \(handler.state)") } From c5cfcb1bddd7050aae7ce6dbe1f534a08ed2312b Mon Sep 17 00:00:00 2001 From: Si Beaumont Date: Mon, 13 Sep 2021 14:38:31 +0100 Subject: [PATCH 6/7] fixup: Rename and docs to context and clients about metadata semantics Signed-off-by: Si Beaumont --- .../GRPCAsyncBidirectionalStreamingCall.swift | 4 +++ .../GRPCAsyncClientStreamingCall.swift | 2 ++ .../GRPCAsyncServerCallContext.swift | 26 +++++++++---------- .../GRPCAsyncServerHandler.swift | 8 +++--- .../GRPCAsyncServerStreamingCall.swift | 4 +++ .../GRPCAsyncUnaryCall.swift | 2 ++ .../GRPCAsyncServerHandlerTests.swift | 8 +++--- 7 files changed, 33 insertions(+), 21 deletions(-) diff --git a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncBidirectionalStreamingCall.swift b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncBidirectionalStreamingCall.swift index 1092bbd9c..3450cfcc8 100644 --- a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncBidirectionalStreamingCall.swift +++ b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncBidirectionalStreamingCall.swift @@ -40,6 +40,10 @@ public struct GRPCAsyncBidirectionalStreamingCall { // MARK: - Response Parts /// The initial metadata returned from the server. + /// + /// - Important: The initial metadata will only be available when the first response has been + /// received. However, it is not necessary for the response to have been consumed before reading + /// this property. public var initialMetadata: HPACKHeaders { // swiftformat:disable:next redundantGet get async throws { diff --git a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncClientStreamingCall.swift b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncClientStreamingCall.swift index c755cf557..b6b33b74e 100644 --- a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncClientStreamingCall.swift +++ b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncClientStreamingCall.swift @@ -36,6 +36,8 @@ public struct GRPCAsyncClientStreamingCall { // MARK: - Response Parts /// The initial metadata returned from the server. + /// + /// - Important: The initial metadata will only be available when the response has been received. public var initialMetadata: HPACKHeaders { // swiftformat:disable:next redundantGet get async throws { diff --git a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerCallContext.swift b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerCallContext.swift index a3cc6673c..048b3eb16 100644 --- a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerCallContext.swift +++ b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerCallContext.swift @@ -34,8 +34,8 @@ import NIOHPACK public final class GRPCAsyncServerCallContext { private let lock = Lock() - /// Request headers for this request. - public let requestHeaders: HPACKHeaders + /// Metadata for this request. + public let requestMetadata: HPACKHeaders /// The logger used for this call. public var logger: Logger { @@ -85,32 +85,32 @@ public final class GRPCAsyncServerCallContext { /// Metadata to return at the start of the RPC. /// - /// If this is required it should be updated before the first response is sent via the response - /// stream writer. - public var responseHeaders: HPACKHeaders { + /// - Important: If this is required it should be updated _before_ the first response is sent via + /// the response stream writer. Any updates made after the first response will be ignored. + public var initialResponseMetadata: HPACKHeaders { get { self.lock.withLock { - return self._responseHeaders + return self._initialResponseMetadata } } set { self.lock.withLock { - self._responseHeaders = newValue + self._initialResponseMetadata = newValue } } } - private var _responseHeaders: HPACKHeaders = [:] + private var _initialResponseMetadata: HPACKHeaders = [:] /// Metadata to return at the end of the RPC. /// /// If this is required it should be updated before returning from the handler. - public var responseTrailers: HPACKHeaders { + public var trailingResponseMetadata: HPACKHeaders { get { self.lock.withLock { - return self._responseTrailers + return self._trailingResponseMetadata } } set { self.lock.withLock { - self._responseTrailers = newValue + self._trailingResponseMetadata = newValue } } } - private var _responseTrailers: HPACKHeaders = [:] + private var _trailingResponseMetadata: HPACKHeaders = [:] @inlinable internal init( @@ -118,7 +118,7 @@ public final class GRPCAsyncServerCallContext { logger: Logger, userInfoRef: Ref ) { - self.requestHeaders = headers + self.requestMetadata = headers self.userInfoRef = userInfoRef self._logger = logger } diff --git a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift index 8e464832f..b8d23c779 100644 --- a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift +++ b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift @@ -519,7 +519,7 @@ internal final class AsyncServerHandler< activeState.haveSentResponseHeaders = true self.state = .active(activeState) // Send response headers back via the interceptors. - self.interceptors.send(.metadata(activeState.context.responseHeaders), promise: nil) + self.interceptors.send(.metadata(activeState.context.initialResponseMetadata), promise: nil) } // Send the response back via the interceptors. self.interceptors.send(.message(response, metadata), promise: nil) @@ -576,7 +576,7 @@ internal final class AsyncServerHandler< case let .active(activeState): // Now we have drained the response stream writer from the user handler we can send end. self.state = .completed - self.interceptors.send(.end(status, activeState.context.responseTrailers), promise: nil) + self.interceptors.send(.end(status, activeState.context.trailingResponseMetadata), promise: nil) case .completed: () @@ -619,8 +619,8 @@ internal final class AsyncServerHandler< if isHandlerError { (status, trailers) = ServerErrorProcessor.processObserverError( error, - headers: activeState.context.requestHeaders, - trailers: activeState.context.responseTrailers, + headers: activeState.context.requestMetadata, + trailers: activeState.context.trailingResponseMetadata, delegate: self.context.errorDelegate ) } else { diff --git a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerStreamingCall.swift b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerStreamingCall.swift index 93e8a2a9e..671410689 100644 --- a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerStreamingCall.swift +++ b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerStreamingCall.swift @@ -39,6 +39,10 @@ public struct GRPCAsyncServerStreamingCall { // MARK: - Response Parts /// The initial metadata returned from the server. + /// + /// - Important: The initial metadata will only be available when the first response has been + /// received. However, it is not necessary for the response to have been consumed before reading + /// this property. public var initialMetadata: HPACKHeaders { // swiftformat:disable:next redundantGet get async throws { diff --git a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncUnaryCall.swift b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncUnaryCall.swift index 92141f59c..71efaabf1 100644 --- a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncUnaryCall.swift +++ b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncUnaryCall.swift @@ -39,6 +39,8 @@ public struct GRPCAsyncUnaryCall { // MARK: - Response Parts /// The initial metadata returned from the server. + /// + /// - Important: The initial metadata will only be available when the response has been received. public var initialMetadata: HPACKHeaders { // swiftformat:disable:next redundantGet get async throws { diff --git a/Tests/GRPCTests/GRPCAsyncServerHandlerTests.swift b/Tests/GRPCTests/GRPCAsyncServerHandlerTests.swift index 516006972..52ce2bc4d 100644 --- a/Tests/GRPCTests/GRPCAsyncServerHandlerTests.swift +++ b/Tests/GRPCTests/GRPCAsyncServerHandlerTests.swift @@ -146,9 +146,9 @@ class AsyncServerHandlerTests: ServerHandlerTestCaseBase { func testResponseHeadersAndTrailersSentFromContext() { XCTAsyncTest { let handler = self.makeHandler { _, responseStreamWriter, context in - context.responseHeaders = ["pontiac": "bandit"] + context.initialResponseMetadata = ["pontiac": "bandit"] try await responseStreamWriter.send("1") - context.responseTrailers = ["disco": "strangler"] + context.trailingResponseMetadata = ["disco": "strangler"] } handler.receiveMetadata([:]) handler.receiveEnd() @@ -163,8 +163,8 @@ class AsyncServerHandlerTests: ServerHandlerTestCaseBase { func testResponseHeadersDroppedIfSetAfterFirstResponse() { XCTAsyncTest { let handler = self.makeHandler { _, responseStreamWriter, context in try await responseStreamWriter.send("1") - context.responseHeaders = ["pontiac": "bandit"] - context.responseTrailers = ["disco": "strangler"] + context.initialResponseMetadata = ["pontiac": "bandit"] + context.trailingResponseMetadata = ["disco": "strangler"] } handler.receiveMetadata([:]) handler.receiveEnd() From 95c16842fa8a5ed012df6cc1e131229069351fe1 Mon Sep 17 00:00:00 2001 From: Si Beaumont Date: Mon, 13 Sep 2021 14:41:07 +0100 Subject: [PATCH 7/7] fixup: swiftformat Signed-off-by: Si Beaumont --- Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift index b8d23c779..2de59b807 100644 --- a/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift +++ b/Sources/GRPC/AsyncAwaitSupport/GRPCAsyncServerHandler.swift @@ -576,7 +576,10 @@ internal final class AsyncServerHandler< case let .active(activeState): // Now we have drained the response stream writer from the user handler we can send end. self.state = .completed - self.interceptors.send(.end(status, activeState.context.trailingResponseMetadata), promise: nil) + self.interceptors.send( + .end(status, activeState.context.trailingResponseMetadata), + promise: nil + ) case .completed: ()