diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift index 2a3bc9c27..97f850c33 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift @@ -267,6 +267,10 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler { writePromise.futureResult.whenComplete { result in switch result { case .success: + // If our final action was `sendRequestEnd`, that means we've already received + // the complete response. As a result, once we've uploaded all the body parts + // we need to tell the pool that the connection is idle. + self.connection.taskCompleted() oldRequest.succeedRequest(buffer) case .failure(let error): oldRequest.fail(error) diff --git a/Sources/AsyncHTTPClient/RequestBag+StateMachine.swift b/Sources/AsyncHTTPClient/RequestBag+StateMachine.swift index 557af2af1..63cb15758 100644 --- a/Sources/AsyncHTTPClient/RequestBag+StateMachine.swift +++ b/Sources/AsyncHTTPClient/RequestBag+StateMachine.swift @@ -347,10 +347,14 @@ extension RequestBag.StateMachine { self.state = .executing(executor, requestState, .buffering(currentBuffer, next: next)) return .none case .executing(let executor, let requestState, .waitingForRemote): - var buffer = buffer - let first = buffer.removeFirst() - self.state = .executing(executor, requestState, .buffering(buffer, next: .askExecutorForMore)) - return .forwardResponsePart(first) + if buffer.count > 0 { + var buffer = buffer + let first = buffer.removeFirst() + self.state = .executing(executor, requestState, .buffering(buffer, next: .askExecutorForMore)) + return .forwardResponsePart(first) + } else { + return .none + } case .redirected(let executor, var receivedBytes, let head, let redirectURL): let partsLength = buffer.reduce(into: 0) { $0 += $1.readableBytes } receivedBytes += partsLength diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift index 230c91a2b..63a1cf540 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift @@ -1253,6 +1253,37 @@ class HTTPEchoHandler: ChannelInboundHandler { } } +final class HTTP200DelayedHandler: ChannelInboundHandler { + typealias InboundIn = HTTPServerRequestPart + typealias OutboundOut = HTTPServerResponsePart + + var pendingBodyParts: Int? + + init(bodyPartsBeforeResponse: Int) { + self.pendingBodyParts = bodyPartsBeforeResponse + } + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + let request = self.unwrapInboundIn(data) + switch request { + case .head: + break + case .body: + if let pendingBodyParts = self.pendingBodyParts { + if pendingBodyParts > 0 { + self.pendingBodyParts = pendingBodyParts - 1 + } else { + self.pendingBodyParts = nil + context.writeAndFlush(self.wrapOutboundOut(.head(.init(version: .http1_1, status: .ok))), promise: nil) + context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) + } + } + case .end: + break + } + } +} + private let cert = """ -----BEGIN CERTIFICATE----- MIICmDCCAYACCQCPC8JDqMh1zzANBgkqhkiG9w0BAQsFADANMQswCQYDVQQGEwJ1 diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift index 7eb532cf9..d6fe77e47 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift @@ -129,6 +129,7 @@ extension HTTPClientTests { ("testSSLHandshakeErrorPropagationDelayedClose", testSSLHandshakeErrorPropagationDelayedClose), ("testWeCloseConnectionsWhenConnectionCloseSetByServer", testWeCloseConnectionsWhenConnectionCloseSetByServer), ("testBiDirectionalStreaming", testBiDirectionalStreaming), + ("testBiDirectionalStreamingEarly200", testBiDirectionalStreamingEarly200), ("testSynchronousHandshakeErrorReporting", testSynchronousHandshakeErrorReporting), ("testFileDownloadChunked", testFileDownloadChunked), ("testCloseWhileBackpressureIsExertedIsFine", testCloseWhileBackpressureIsExertedIsFine), diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift index de110fdb5..0e5ccf63f 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift @@ -2940,6 +2940,60 @@ class HTTPClientTests: XCTestCase { XCTAssertNil(try delegate.next().wait()) } + // In this test, we test that a request can continue to stream its body after the response head and end + // was received where the end is a 200. + func testBiDirectionalStreamingEarly200() { + let httpBin = HTTPBin(.http1_1(ssl: false, compress: false)) { _ in HTTP200DelayedHandler(bodyPartsBeforeResponse: 1) } + defer { XCTAssertNoThrow(try httpBin.shutdown()) } + + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2) + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + let writeEL = eventLoopGroup.next() + let delegateEL = eventLoopGroup.next() + + let httpClient = HTTPClient(eventLoopGroupProvider: .shared(eventLoopGroup)) + defer { XCTAssertNoThrow(try httpClient.syncShutdown()) } + + let delegate = ResponseStreamDelegate(eventLoop: delegateEL) + + let body: HTTPClient.Body = .stream { writer in + let finalPromise = writeEL.makePromise(of: Void.self) + + func writeLoop(_ writer: HTTPClient.Body.StreamWriter, index: Int) { + // always invoke from the wrong el to test thread safety + writeEL.preconditionInEventLoop() + + if index >= 30 { + return finalPromise.succeed(()) + } + + let sent = ByteBuffer(integer: index) + writer.write(.byteBuffer(sent)).whenComplete { result in + switch result { + case .success: + writeEL.execute { + writeLoop(writer, index: index + 1) + } + + case .failure(let error): + finalPromise.fail(error) + } + } + } + + writeEL.execute { + writeLoop(writer, index: 0) + } + + return finalPromise.futureResult + } + + let request = try! HTTPClient.Request(url: "http://localhost:\(httpBin.port)", body: body) + let future = httpClient.execute(request: request, delegate: delegate, eventLoop: .delegate(on: delegateEL)) + XCTAssertNoThrow(try future.wait()) + XCTAssertNil(try delegate.next().wait()) + } + func testSynchronousHandshakeErrorReporting() throws { // This only affects cases where we use NIOSSL. guard !isTestingNIOTS() else { return }