diff --git a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests+XCTest.swift index 8139aff49..06f9b4228 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests+XCTest.swift @@ -34,6 +34,7 @@ extension HTTPClientInternalTests { ("testRequestURITrailingSlash", testRequestURITrailingSlash), ("testChannelAndDelegateOnDifferentEventLoops", testChannelAndDelegateOnDifferentEventLoops), ("testResponseConnectionCloseGet", testResponseConnectionCloseGet), + ("testWeNoticeRemoteClosuresEvenWhenConnectionIsIdleInPool", testWeNoticeRemoteClosuresEvenWhenConnectionIsIdleInPool), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift index e23f9684b..d2124171e 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift @@ -16,6 +16,7 @@ import NIO import NIOConcurrencyHelpers import NIOHTTP1 +import NIOTestUtils import XCTest class HTTPClientInternalTests: XCTestCase { @@ -453,4 +454,131 @@ class HTTPClientInternalTests: XCTestCase { XCTAssertEqual(httpClient.pool.connectionProviderCount, 0) }.futureResult.wait() } + + func testWeNoticeRemoteClosuresEvenWhenConnectionIsIdleInPool() { + final class ServerThatRespondsThenJustCloses: ChannelInboundHandler { + typealias InboundIn = HTTPServerRequestPart + typealias OutboundOut = HTTPServerResponsePart + + let requestNumber: NIOAtomic + let connectionNumber: NIOAtomic + + init(requestNumber: NIOAtomic, connectionNumber: NIOAtomic) { + self.requestNumber = requestNumber + self.connectionNumber = connectionNumber + } + + func channelActive(context: ChannelHandlerContext) { + _ = self.connectionNumber.add(1) + } + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + let req = self.unwrapInboundIn(data) + + switch req { + case .head, .body: + () + case .end: + let last = self.requestNumber.add(1) + switch last { + case 0: + context.write(self.wrapOutboundOut(.head(.init(version: .init(major: 1, minor: 1), status: .ok))), + promise: nil) + context.writeAndFlush(self.wrapOutboundOut(.end(nil))).whenComplete { _ in + context.eventLoop.scheduleTask(in: .milliseconds(10)) { + context.close(promise: nil) + } + } + case 1: + context.write(self.wrapOutboundOut(.head(.init(version: .init(major: 1, minor: 1), status: .ok))), + promise: nil) + context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) + default: + XCTFail("did not expect request \(last + 1)") + } + } + } + } + + final class ObserveWhenClosedHandler: ChannelInboundHandler { + typealias InboundIn = Any + + let channelInactivePromise: EventLoopPromise + + init(channelInactivePromise: EventLoopPromise) { + self.channelInactivePromise = channelInactivePromise + } + + func channelInactive(context: ChannelHandlerContext) { + context.fireChannelInactive() + self.channelInactivePromise.succeed(()) + } + } + + let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { + XCTAssertNoThrow(try group.syncShutdownGracefully()) + } + let requestNumber = NIOAtomic.makeAtomic(value: 0) + let connectionNumber = NIOAtomic.makeAtomic(value: 0) + let sharedStateServerHandler = ServerThatRespondsThenJustCloses(requestNumber: requestNumber, + connectionNumber: connectionNumber) + var maybeServer: Channel? + XCTAssertNoThrow(maybeServer = try ServerBootstrap(group: group) + .serverChannelOption(ChannelOptions.socket(.init(SOL_SOCKET), .init(SO_REUSEADDR)), value: 1) + .childChannelInitializer { channel in + channel.pipeline.configureHTTPServerPipeline().flatMap { + // We're deliberately adding a handler which is shared between multiple channels. This is normally + // very verboten but this handler is specially crafted to tolerate this. + channel.pipeline.addHandler(sharedStateServerHandler) + } + } + .bind(host: "127.0.0.1", port: 0) + .wait()) + guard let server = maybeServer else { + XCTFail("couldn't create server") + return + } + defer { + XCTAssertNoThrow(try server.close().wait()) + } + + let url = "http://127.0.0.1:\(server.localAddress!.port!)" + let client = HTTPClient(eventLoopGroupProvider: .shared(group)) + defer { + XCTAssertNoThrow(try client.syncShutdown()) + } + + var maybeConnection: ConnectionPool.Connection? + // This is pretty evil but we literally just get hold of a connection to get to the channel to be able to + // observe when the server closing the connection is known to the client. + XCTAssertNoThrow(maybeConnection = try client.pool.getConnection(for: .init(url: url), + preference: .indifferent, + on: group.next(), + deadline: nil).wait()) + guard let connection = maybeConnection else { + XCTFail("couldn't get connection") + return + } + + // And let's also give the connection back :). + client.pool.release(connection) + + XCTAssertEqual(0, sharedStateServerHandler.requestNumber.load()) + XCTAssertEqual(1, client.pool.connectionProviderCount) + XCTAssertTrue(connection.channel.isActive) + XCTAssertNoThrow(XCTAssertEqual(.ok, try client.get(url: url).wait().status)) + XCTAssertEqual(1, sharedStateServerHandler.connectionNumber.load()) + XCTAssertEqual(1, sharedStateServerHandler.requestNumber.load()) + + // We have received the first response and we know the remote end will now close the connection. + // Let's wait until we see the closure in the client's channel. + XCTAssertNoThrow(try connection.channel.closeFuture.wait()) + + // Now that we should have learned that the connection is dead, a subsequent request should work and use a new + // connection + XCTAssertNoThrow(XCTAssertEqual(.ok, try client.get(url: url).wait().status)) + XCTAssertEqual(2, sharedStateServerHandler.connectionNumber.load()) + XCTAssertEqual(2, sharedStateServerHandler.requestNumber.load()) + } }