From 964c59efa58f6aac90e847d1698048ddb78c39b5 Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Thu, 16 Jun 2022 15:53:33 +0100 Subject: [PATCH 1/2] Correctly handle Connection: close with streaming Motivation When users stream their bodies they may still want to send Connection: close headers and terminate the connection early. This should work properly. Unfortunately it became clear that we didn't correctly pass the information that the connection needed to be closed. As a result, we'd inappropriately re-use the connection, potentially causing unnecessary HTTP errors. Modifications Signal whether the connection needs to be closed when the final connection action is to send .end. Results We behave better with streaming uploads. --- .../HTTP1/HTTP1ClientChannelHandler.swift | 14 +++-- .../HTTP1/HTTP1ConnectionStateMachine.swift | 7 ++- .../HTTP1ConnectionStateMachineTests.swift | 4 +- .../HTTPClientTestUtils.swift | 26 +++++++++ .../HTTPClientTests+XCTest.swift | 1 + .../HTTPClientTests.swift | 55 +++++++++++++++++++ 6 files changed, 99 insertions(+), 8 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift index 97f850c33..affe4770c 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift @@ -261,16 +261,22 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { case .close: context.close(promise: nil) oldRequest.succeedRequest(buffer) - case .sendRequestEnd(let writePromise): + case .sendRequestEnd(let writePromise, let shouldClose): let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self) // We need to defer succeeding the old request to avoid ordering issues - writePromise.futureResult.whenComplete { result in + writePromise.futureResult.hop(to: context.eventLoop).whenComplete { result in switch result { case .success: // If our final action was `sendRequestEnd`, that means we've already received // the complete response. As a result, once we've uploaded all the body parts - // we need to tell the pool that the connection is idle. - self.connection.taskCompleted() + // we need to tell the pool that the connection is idle or, if we were asked to + // close when we're done, send the close. Either way, we then succeed the request + if shouldClose { + context.close(promise: nil) + } else { + self.connection.taskCompleted() + } + oldRequest.succeedRequest(buffer) case .failure(let error): oldRequest.fail(error) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ConnectionStateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ConnectionStateMachine.swift index f0aff762c..e7258611c 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ConnectionStateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ConnectionStateMachine.swift @@ -35,7 +35,10 @@ struct HTTP1ConnectionStateMachine { /// as soon as we wrote the request end onto the wire. /// /// The promise is an optional write promise. - case sendRequestEnd(EventLoopPromise?) + /// + /// `shouldClose` records whether we have attached a Connection: close header to this request, and so the connection should + /// be terminated + case sendRequestEnd(EventLoopPromise?, shouldClose: Bool) /// Inform an observer that the connection has become idle case informConnectionIsIdle } @@ -413,7 +416,7 @@ extension HTTP1ConnectionStateMachine.State { newFinalAction = .close case .sendRequestEnd(let writePromise): self = .idle - newFinalAction = .sendRequestEnd(writePromise) + newFinalAction = .sendRequestEnd(writePromise, shouldClose: close) case .none: self = .idle newFinalAction = close ? .close : .informConnectionIsIdle diff --git a/Tests/AsyncHTTPClientTests/HTTP1ConnectionStateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTP1ConnectionStateMachineTests.swift index 55014f8c6..fd771aca0 100644 --- a/Tests/AsyncHTTPClientTests/HTTP1ConnectionStateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTP1ConnectionStateMachineTests.swift @@ -338,8 +338,8 @@ extension HTTP1ConnectionStateMachine.Action.FinalSuccessfulStreamAction: Equata switch (lhs, rhs) { case (.close, .close): return true - case (sendRequestEnd(let lhsPromise), sendRequestEnd(let rhsPromise)): - return lhsPromise?.futureResult == rhsPromise?.futureResult + case (sendRequestEnd(let lhsPromise, let lhsShouldClose), sendRequestEnd(let rhsPromise, let rhsShouldClose)): + return lhsPromise?.futureResult == rhsPromise?.futureResult && lhsShouldClose == rhsShouldClose case (informConnectionIsIdle, informConnectionIsIdle): return true default: diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift index f2cc7b1d8..c99facc3f 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift @@ -1017,6 +1017,32 @@ internal final class CloseWithoutClosingServerHandler: ChannelInboundHandler { } } +final class ExpectClosureServerHandler: ChannelInboundHandler { + typealias InboundIn = HTTPServerRequestPart + typealias OutboundOut = HTTPServerResponsePart + + private let onClosePromise: EventLoopPromise + + init(onClosePromise: EventLoopPromise) { + self.onClosePromise = onClosePromise + } + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + switch self.unwrapInboundIn(data) { + case .head: + let head = HTTPResponseHead(version: .http1_1, status: .ok, headers: ["Content-Length": "0"]) + context.write(self.wrapOutboundOut(.head(head)), promise: nil) + context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) + case .body, .end: + () + } + } + + func channelInactive(context: ChannelHandlerContext) { + self.onClosePromise.succeed(()) + } +} + struct EventLoopFutureTimeoutError: Error {} extension EventLoopFuture { diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift index a709cf2d6..603c1aa9c 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift @@ -131,6 +131,7 @@ extension HTTPClientTests { ("testBiDirectionalStreaming", testBiDirectionalStreaming), ("testBiDirectionalStreamingEarly200", testBiDirectionalStreamingEarly200), ("testBiDirectionalStreamingEarly200DoesntPreventUsFromSendingMoreRequests", testBiDirectionalStreamingEarly200DoesntPreventUsFromSendingMoreRequests), + ("testCloseConnectionAfterEarly2XXWhenStreaming", testCloseConnectionAfterEarly2XXWhenStreaming), ("testSynchronousHandshakeErrorReporting", testSynchronousHandshakeErrorReporting), ("testFileDownloadChunked", testFileDownloadChunked), ("testCloseWhileBackpressureIsExertedIsFine", testCloseWhileBackpressureIsExertedIsFine), diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift index e5d935fb9..523b07a6e 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift @@ -3075,6 +3075,61 @@ class HTTPClientTests: XCTestCase { XCTAssertNoThrow(try future2.wait()) } + // This test validates that we correctly close the connection after our body completes when we've streamed a + // body and received the 2XX response _before_ we finished our stream. + func testCloseConnectionAfterEarly2XXWhenStreaming() { + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2) + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + let onClosePromise = eventLoopGroup.next().makePromise(of: Void.self) + let httpBin = HTTPBin(.http1_1(ssl: false, compress: false)) { _ in ExpectClosureServerHandler(onClosePromise: onClosePromise) } + defer { XCTAssertNoThrow(try httpBin.shutdown()) } + + let writeEL = eventLoopGroup.next() + + let httpClient = HTTPClient(eventLoopGroupProvider: .shared(eventLoopGroup)) + defer { XCTAssertNoThrow(try httpClient.syncShutdown()) } + + let body: HTTPClient.Body = .stream { writer in + let finalPromise = writeEL.makePromise(of: Void.self) + + func writeLoop(_ writer: HTTPClient.Body.StreamWriter, index: Int) { + // always invoke from the wrong el to test thread safety + writeEL.preconditionInEventLoop() + + if index >= 30 { + return finalPromise.succeed(()) + } + + let sent = ByteBuffer(integer: index) + writer.write(.byteBuffer(sent)).whenComplete { result in + switch result { + case .success: + writeEL.execute { + writeLoop(writer, index: index + 1) + } + + case .failure(let error): + finalPromise.fail(error) + } + } + } + + writeEL.execute { + writeLoop(writer, index: 0) + } + + return finalPromise.futureResult + } + + let headers = HTTPHeaders([("Connection", "close")]) + let request = try! HTTPClient.Request(url: "http://localhost:\(httpBin.port)", headers: headers, body: body) + let future = httpClient.execute(request: request) + XCTAssertNoThrow(try future.wait()) + XCTAssertNoThrow(try onClosePromise.futureResult.wait()) + } + + func testSynchronousHandshakeErrorReporting() throws { // This only affects cases where we use NIOSSL. guard !isTestingNIOTS() else { return } From 33ce2ec289b96a8d4cb25f0bfaf2df2afc777496 Mon Sep 17 00:00:00 2001 From: Cory Benfield Date: Fri, 17 Jun 2022 12:23:53 +0100 Subject: [PATCH 2/2] Remove whitespace --- Tests/AsyncHTTPClientTests/HTTPClientTests.swift | 1 - 1 file changed, 1 deletion(-) diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift index 523b07a6e..8f2c7c1aa 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift @@ -3129,7 +3129,6 @@ class HTTPClientTests: XCTestCase { XCTAssertNoThrow(try onClosePromise.futureResult.wait()) } - func testSynchronousHandshakeErrorReporting() throws { // This only affects cases where we use NIOSSL. guard !isTestingNIOTS() else { return }