From a561be3f64a6e8315b72b95751d413f2522bc75b Mon Sep 17 00:00:00 2001 From: Johannes Weiss Date: Tue, 25 Feb 2020 18:52:51 +0000 Subject: [PATCH] test that connection close whilst idle works Motivation: I wasn't sure if we actually properly observed an idle connection suddenly closing. Ideally, if an idle connection just goes away, we just want to remove it from the connection pool (which is what happens). Modification: Add a test that verifies idle connections that get closed from the server don't get reused in the pool. Result: Better test coverage for more complicated scenarios. --- .../HTTPClientInternalTests+XCTest.swift | 1 + .../HTTPClientInternalTests.swift | 128 ++++++++++++++++++ 2 files changed, 129 insertions(+) diff --git a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests+XCTest.swift index 8139aff49..06f9b4228 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests+XCTest.swift @@ -34,6 +34,7 @@ extension HTTPClientInternalTests { ("testRequestURITrailingSlash", testRequestURITrailingSlash), ("testChannelAndDelegateOnDifferentEventLoops", testChannelAndDelegateOnDifferentEventLoops), ("testResponseConnectionCloseGet", testResponseConnectionCloseGet), + ("testWeNoticeRemoteClosuresEvenWhenConnectionIsIdleInPool", testWeNoticeRemoteClosuresEvenWhenConnectionIsIdleInPool), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift index e23f9684b..d2124171e 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift @@ -16,6 +16,7 @@ import NIO import NIOConcurrencyHelpers import NIOHTTP1 +import NIOTestUtils import XCTest class HTTPClientInternalTests: XCTestCase { @@ -453,4 +454,131 @@ class HTTPClientInternalTests: XCTestCase { XCTAssertEqual(httpClient.pool.connectionProviderCount, 0) }.futureResult.wait() } + + func testWeNoticeRemoteClosuresEvenWhenConnectionIsIdleInPool() { + final class ServerThatRespondsThenJustCloses: ChannelInboundHandler { + typealias InboundIn = HTTPServerRequestPart + typealias OutboundOut = HTTPServerResponsePart + + let requestNumber: NIOAtomic + let connectionNumber: NIOAtomic + + init(requestNumber: NIOAtomic, connectionNumber: NIOAtomic) { + self.requestNumber = requestNumber + self.connectionNumber = connectionNumber + } + + func channelActive(context: ChannelHandlerContext) { + _ = self.connectionNumber.add(1) + } + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + let req = self.unwrapInboundIn(data) + + switch req { + case .head, .body: + () + case .end: + let last = self.requestNumber.add(1) + switch last { + case 0: + context.write(self.wrapOutboundOut(.head(.init(version: .init(major: 1, minor: 1), status: .ok))), + promise: nil) + context.writeAndFlush(self.wrapOutboundOut(.end(nil))).whenComplete { _ in + context.eventLoop.scheduleTask(in: .milliseconds(10)) { + context.close(promise: nil) + } + } + case 1: + context.write(self.wrapOutboundOut(.head(.init(version: .init(major: 1, minor: 1), status: .ok))), + promise: nil) + context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) + default: + XCTFail("did not expect request \(last + 1)") + } + } + } + } + + final class ObserveWhenClosedHandler: ChannelInboundHandler { + typealias InboundIn = Any + + let channelInactivePromise: EventLoopPromise + + init(channelInactivePromise: EventLoopPromise) { + self.channelInactivePromise = channelInactivePromise + } + + func channelInactive(context: ChannelHandlerContext) { + context.fireChannelInactive() + self.channelInactivePromise.succeed(()) + } + } + + let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { + XCTAssertNoThrow(try group.syncShutdownGracefully()) + } + let requestNumber = NIOAtomic.makeAtomic(value: 0) + let connectionNumber = NIOAtomic.makeAtomic(value: 0) + let sharedStateServerHandler = ServerThatRespondsThenJustCloses(requestNumber: requestNumber, + connectionNumber: connectionNumber) + var maybeServer: Channel? + XCTAssertNoThrow(maybeServer = try ServerBootstrap(group: group) + .serverChannelOption(ChannelOptions.socket(.init(SOL_SOCKET), .init(SO_REUSEADDR)), value: 1) + .childChannelInitializer { channel in + channel.pipeline.configureHTTPServerPipeline().flatMap { + // We're deliberately adding a handler which is shared between multiple channels. This is normally + // very verboten but this handler is specially crafted to tolerate this. + channel.pipeline.addHandler(sharedStateServerHandler) + } + } + .bind(host: "127.0.0.1", port: 0) + .wait()) + guard let server = maybeServer else { + XCTFail("couldn't create server") + return + } + defer { + XCTAssertNoThrow(try server.close().wait()) + } + + let url = "http://127.0.0.1:\(server.localAddress!.port!)" + let client = HTTPClient(eventLoopGroupProvider: .shared(group)) + defer { + XCTAssertNoThrow(try client.syncShutdown()) + } + + var maybeConnection: ConnectionPool.Connection? + // This is pretty evil but we literally just get hold of a connection to get to the channel to be able to + // observe when the server closing the connection is known to the client. + XCTAssertNoThrow(maybeConnection = try client.pool.getConnection(for: .init(url: url), + preference: .indifferent, + on: group.next(), + deadline: nil).wait()) + guard let connection = maybeConnection else { + XCTFail("couldn't get connection") + return + } + + // And let's also give the connection back :). + client.pool.release(connection) + + XCTAssertEqual(0, sharedStateServerHandler.requestNumber.load()) + XCTAssertEqual(1, client.pool.connectionProviderCount) + XCTAssertTrue(connection.channel.isActive) + XCTAssertNoThrow(XCTAssertEqual(.ok, try client.get(url: url).wait().status)) + XCTAssertEqual(1, sharedStateServerHandler.connectionNumber.load()) + XCTAssertEqual(1, sharedStateServerHandler.requestNumber.load()) + + // We have received the first response and we know the remote end will now close the connection. + // Let's wait until we see the closure in the client's channel. + XCTAssertNoThrow(try connection.channel.closeFuture.wait()) + + // Now that we should have learned that the connection is dead, a subsequent request should work and use a new + // connection + XCTAssertNoThrow(XCTAssertEqual(.ok, try client.get(url: url).wait().status)) + XCTAssertEqual(2, sharedStateServerHandler.connectionNumber.load()) + XCTAssertEqual(2, sharedStateServerHandler.requestNumber.load()) + } }