From 2f8e25275dff259f7568559b16b833d6fe04090e Mon Sep 17 00:00:00 2001 From: adtrevor Date: Wed, 26 Feb 2020 18:51:12 +0100 Subject: [PATCH 01/14] Close idle pool connections Motivation: Pooled connections should close at some point (see #168) Changes: - Add new poolingTimeout property to HTTPClient.Configuration, it's default value is .seconds(60), it can be set to nil if one wishes to disable this timeout. - Add relevant unit test --- Sources/AsyncHTTPClient/HTTPClient.swift | 63 +++++++++++++++---- Sources/AsyncHTTPClient/HTTPHandler.swift | 26 +++++++- .../HTTPClientTestUtils.swift | 33 +++++++++- .../HTTPClientTests+XCTest.swift | 1 + .../HTTPClientTests.swift | 13 ++++ 5 files changed, 123 insertions(+), 13 deletions(-) diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index 99ec37f2e..a3d7bbc39 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -305,7 +305,7 @@ public class HTTPClient { redirectHandler = nil } - let task = Task(eventLoop: taskEL) + let task = Task(eventLoop: taskEL, poolingTimeout: self.configuration.poolingTimeout) self.stateLock.withLock { self.tasks[task.id] = task } @@ -321,17 +321,18 @@ public class HTTPClient { connection.flatMap { connection -> EventLoopFuture in let channel = connection.channel - let addedFuture: EventLoopFuture - - switch self.configuration.decompression { - case .disabled: - addedFuture = channel.eventLoop.makeSucceededFuture(()) - case .enabled(let limit): - let decompressHandler = NIOHTTPResponseDecompressor(limit: limit) - addedFuture = channel.pipeline.addHandler(decompressHandler) - } - return addedFuture.flatMap { + return connection.removeHandler(IdleStateHandler.self).flatMap { + connection.removeHandler(IdlePoolConnectionHandler.self) + }.flatMap { + switch self.configuration.decompression { + case .disabled: + return channel.eventLoop.makeSucceededFuture(()) + case .enabled(let limit): + let decompressHandler = NIOHTTPResponseDecompressor(limit: limit) + return channel.pipeline.addHandler(decompressHandler) + } + }.flatMap { if let timeout = self.resolve(timeout: self.configuration.timeout.read, deadline: deadline) { return channel.pipeline.addHandler(IdleStateHandler(readTimeout: timeout)) } else { @@ -408,6 +409,8 @@ public class HTTPClient { public var redirectConfiguration: RedirectConfiguration /// Default client timeout, defaults to no timeouts. public var timeout: Timeout + /// Timeout of pooled connections + public var poolingTimeout: TimeAmount? /// Upstream proxy, defaults to no proxy. public var proxy: Proxy? /// Enables automatic body decompression. Supported algorithms are gzip and deflate. @@ -418,30 +421,68 @@ public class HTTPClient { public init(tlsConfiguration: TLSConfiguration? = nil, redirectConfiguration: RedirectConfiguration? = nil, timeout: Timeout = Timeout(), + poolingTimeout: TimeAmount, proxy: Proxy? = nil, ignoreUncleanSSLShutdown: Bool = false, decompression: Decompression = .disabled) { self.tlsConfiguration = tlsConfiguration self.redirectConfiguration = redirectConfiguration ?? RedirectConfiguration() self.timeout = timeout + self.poolingTimeout = poolingTimeout self.proxy = proxy self.ignoreUncleanSSLShutdown = ignoreUncleanSSLShutdown self.decompression = decompression } + public init(tlsConfiguration: TLSConfiguration? = nil, + redirectConfiguration: RedirectConfiguration? = nil, + timeout: Timeout = Timeout(), + proxy: Proxy? = nil, + ignoreUncleanSSLShutdown: Bool = false, + decompression: Decompression = .disabled) { + self.init( + tlsConfiguration: tlsConfiguration, + redirectConfiguration: redirectConfiguration, + timeout: timeout, + poolingTimeout: .seconds(60), + proxy: proxy, + ignoreUncleanSSLShutdown: ignoreUncleanSSLShutdown, + decompression: decompression + ) + } + public init(certificateVerification: CertificateVerification, redirectConfiguration: RedirectConfiguration? = nil, timeout: Timeout = Timeout(), + poolingTimeout: TimeAmount = .seconds(60), proxy: Proxy? = nil, ignoreUncleanSSLShutdown: Bool = false, decompression: Decompression = .disabled) { self.tlsConfiguration = TLSConfiguration.forClient(certificateVerification: certificateVerification) self.redirectConfiguration = redirectConfiguration ?? RedirectConfiguration() self.timeout = timeout + self.poolingTimeout = poolingTimeout self.proxy = proxy self.ignoreUncleanSSLShutdown = ignoreUncleanSSLShutdown self.decompression = decompression } + + public init(certificateVerification: CertificateVerification, + redirectConfiguration: RedirectConfiguration? = nil, + timeout: Timeout = Timeout(), + proxy: Proxy? = nil, + ignoreUncleanSSLShutdown: Bool = false, + decompression: Decompression = .disabled) { + self.init( + certificateVerification: certificateVerification, + redirectConfiguration: redirectConfiguration, + timeout: timeout, + poolingTimeout: .seconds(60), + proxy: proxy, + ignoreUncleanSSLShutdown: ignoreUncleanSSLShutdown, + decompression: decompression + ) + } } /// Specifies how `EventLoopGroup` will be created and establishes lifecycle ownership. diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index 6320266c9..8f88b0077 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -497,13 +497,15 @@ extension HTTPClient { var cancelled: Bool let lock: Lock let id = UUID() + let poolingTimeout: TimeAmount? - init(eventLoop: EventLoop) { + init(eventLoop: EventLoop, poolingTimeout: TimeAmount? = nil) { self.eventLoop = eventLoop self.promise = eventLoop.makePromise() self.completion = self.promise.futureResult.map { _ in } self.cancelled = false self.lock = Lock() + self.poolingTimeout = poolingTimeout } static func failedTask(eventLoop: EventLoop, error: Error) -> Task { @@ -571,6 +573,19 @@ extension HTTPClient { connection.removeHandler(IdleStateHandler.self) }.flatMap { connection.removeHandler(TaskHandler.self) + }.flatMap { + connection.channel.pipeline.addHandlers([ + IdleStateHandler(writeTimeout: self.poolingTimeout), + IdlePoolConnectionHandler(), + ]) + }.flatMapError { error in + if let error = error as? ChannelError, error == .ioOnClosedChannel { + // We may get this error if channel is released because it is + // closed, it is safe to ignore it + return connection.channel.eventLoop.makeSucceededFuture(()) + } else { + return connection.channel.eventLoop.makeFailedFuture(error) + } }.map { connection.release() }.flatMapError { error in @@ -1008,3 +1023,12 @@ internal struct RedirectHandler { } } } + +class IdlePoolConnectionHandler: ChannelInboundHandler, RemovableChannelHandler { + typealias InboundIn = NIOAny + func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) { + if let idleEvent = event as? IdleStateHandler.IdleStateEvent, idleEvent == .write { + context.close(promise: nil) + } + } +} diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift index e0c843ca4..62e5fe96e 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift @@ -167,6 +167,11 @@ internal final class HTTPBin { let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) let serverChannel: Channel let isShutdown: NIOAtomic = .makeAtomic(value: false) + var connectionCount: NIOAtomic = .makeAtomic(value: 0) + private let activeConnCounterHandler: CountActiveConnectionsHandler + var activeConnections: Int { + return self.activeConnCounterHandler.currentlyActiveConnections + } enum BindTarget { case unixDomainSocket(String) @@ -204,10 +209,15 @@ internal final class HTTPBin { socketAddress = try! SocketAddress(unixDomainSocketPath: path) } + let activeConnCounterHandler = CountActiveConnectionsHandler() + self.activeConnCounterHandler = activeConnCounterHandler + self.serverChannel = try! ServerBootstrap(group: self.group) .serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1) .childChannelOption(ChannelOptions.socket(IPPROTO_TCP, TCP_NODELAY), value: 1) - .childChannelInitializer { channel in + .serverChannelInitializer { channel in + channel.pipeline.addHandler(activeConnCounterHandler) + }.childChannelInitializer { channel in guard !refusesConnections else { return channel.eventLoop.makeFailedFuture(HTTPBinError.refusedConnection) } @@ -537,6 +547,27 @@ internal final class HttpBinHandler: ChannelInboundHandler { } } +final class CountActiveConnectionsHandler: ChannelInboundHandler { + typealias InboundIn = Channel + + private let activeConns = NIOAtomic.makeAtomic(value: 0) + + public var currentlyActiveConnections: Int { + return self.activeConns.load() + } + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + let channel = self.unwrapInboundIn(data) + + _ = self.activeConns.add(1) + channel.closeFuture.whenComplete { _ in + _ = self.activeConns.sub(1) + } + + context.fireChannelRead(data) + } +} + internal class HttpBinForSSLUncleanShutdown { let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) let serverChannel: Channel diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift index f03604e80..92b46bf7c 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift @@ -93,6 +93,7 @@ extension HTTPClientTests { ("testUDSSocketAndPath", testUDSSocketAndPath), ("testUseExistingConnectionOnDifferentEL", testUseExistingConnectionOnDifferentEL), ("testWeRecoverFromServerThatClosesTheConnectionOnUs", testWeRecoverFromServerThatClosesTheConnectionOnUs), + ("testPoolClosesIdleConnections", testPoolClosesIdleConnections), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift index 2bf3f8909..61f43ea73 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift @@ -1631,4 +1631,17 @@ class HTTPClientTests: XCTestCase { XCTAssertEqual(2, sharedStateServerHandler.connectionNumber.load()) XCTAssertEqual(3, sharedStateServerHandler.requestNumber.load()) } + + func testPoolClosesIdleConnections() { + let httpBin = HTTPBin() + let httpClient = HTTPClient(eventLoopGroupProvider: .createNew, configuration: .init(poolingTimeout: .seconds(1))) + defer { + XCTAssertNoThrow(try httpBin.shutdown()) + XCTAssertNoThrow(try httpClient.syncShutdown(requiresCleanClose: true)) + } + XCTAssertNoThrow(try httpClient.get(url: "http://localhost:\(httpBin.port)/get").wait()) + XCTAssertEqual(httpBin.activeConnections, 1) + Thread.sleep(forTimeInterval: 2) + XCTAssertEqual(httpBin.activeConnections, 0) + } } From abbd05738e2023d31c27c7dee9e4cc2edcd72ac0 Mon Sep 17 00:00:00 2001 From: adtrevor Date: Wed, 26 Feb 2020 19:19:23 +0100 Subject: [PATCH 02/14] Transmit inbound events in IdlePoolConnectionHandler --- Sources/AsyncHTTPClient/HTTPHandler.swift | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index 8f88b0077..ee92f35e6 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -1029,6 +1029,8 @@ class IdlePoolConnectionHandler: ChannelInboundHandler, RemovableChannelHandler func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) { if let idleEvent = event as? IdleStateHandler.IdleStateEvent, idleEvent == .write { context.close(promise: nil) + } else { + context.fireUserInboundEventTriggered(event) } } } From 6ccd28a5a48faa93f453038ecd532f904bcf902a Mon Sep 17 00:00:00 2001 From: adtrevor Date: Wed, 26 Feb 2020 19:21:29 +0100 Subject: [PATCH 03/14] Fix testPoolClosesIdleConnections --- Tests/AsyncHTTPClientTests/HTTPClientTests.swift | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift index 61f43ea73..03f74b389 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift @@ -1634,14 +1634,13 @@ class HTTPClientTests: XCTestCase { func testPoolClosesIdleConnections() { let httpBin = HTTPBin() - let httpClient = HTTPClient(eventLoopGroupProvider: .createNew, configuration: .init(poolingTimeout: .seconds(1))) + let httpClient = HTTPClient(eventLoopGroupProvider: .createNew, configuration: .init(poolingTimeout: .milliseconds(100))) defer { XCTAssertNoThrow(try httpBin.shutdown()) XCTAssertNoThrow(try httpClient.syncShutdown(requiresCleanClose: true)) } XCTAssertNoThrow(try httpClient.get(url: "http://localhost:\(httpBin.port)/get").wait()) - XCTAssertEqual(httpBin.activeConnections, 1) - Thread.sleep(forTimeInterval: 2) + Thread.sleep(forTimeInterval: 0.2) XCTAssertEqual(httpBin.activeConnections, 0) } } From 2044699c3b2d0e6713b687eee6ab6f4da71d5a88 Mon Sep 17 00:00:00 2001 From: adtrevor Date: Wed, 26 Feb 2020 21:15:30 +0100 Subject: [PATCH 04/14] Fix idle handle racing condition Fixes the race condition and adds a new test for it --- Sources/AsyncHTTPClient/ConnectionPool.swift | 33 +++++++++++++++++-- Sources/AsyncHTTPClient/HTTPClient.swift | 13 ++++++++ .../HTTPClientTests+XCTest.swift | 1 + .../HTTPClientTests.swift | 13 ++++++++ 4 files changed, 57 insertions(+), 3 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index 8e01efc52..9e43ee7de 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -224,6 +224,22 @@ final class ConnectionPool { fileprivate var closePromise: EventLoopPromise var closeFuture: EventLoopFuture + + func removeIdleConnectionHandlersForLease() -> EventLoopFuture { + return self.channel.eventLoop.flatSubmit { + self.removeHandler(IdleStateHandler.self).flatMap { + self.removeHandler(IdlePoolConnectionHandler.self) + }.flatMap { + if self.channel.isActive { + return self.channel.eventLoop.makeSucceededFuture(self) + } else { + return self.channel.eventLoop.makeFailedFuture(IdleCloseError()) + } + } + } + } + + struct IdleCloseError: Error {} } /// A connection provider of `HTTP/1.1` connections with a given `Key` (host, scheme, port) @@ -294,7 +310,14 @@ final class ConnectionPool { let action = self.stateLock.withLock { self.state.connectionAction(for: preference) } switch action { case .leaseConnection(let connection): - return connection.channel.eventLoop.makeSucceededFuture(connection) + return connection.removeIdleConnectionHandlersForLease().flatMapError { _ in + connection.closeFuture.flatMap { // We ensure close actions are run first + let defaultEventLoop = self.stateLock.withLock { + self.state.defaultEventLoop + } + return self.makeConnection(on: preference.bestEventLoop ?? defaultEventLoop) + } + } case .makeConnection(let eventLoop): return self.makeConnection(on: eventLoop) case .leaseFutureConnection(let futureConnection): @@ -454,7 +477,7 @@ final class ConnectionPool { fileprivate struct State { /// The default `EventLoop` to use for this `HTTP1ConnectionProvider` - private let defaultEventLoop: EventLoop + let defaultEventLoop: EventLoop /// The maximum number of connections to a certain (host, scheme, port) tuple. private let maximumConcurrentConnections: Int = 8 @@ -477,7 +500,11 @@ final class ConnectionPool { fileprivate var activity: Activity = .opened - fileprivate var pending: Int = 0 + fileprivate var pending: Int = 0 { + didSet { + assert(self.pending >= 0) + } + } private let parentPool: ConnectionPool diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index a3d7bbc39..6cf25b71e 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -531,6 +531,19 @@ public class HTTPClient { public static func delegateAndChannel(on eventLoop: EventLoop) -> EventLoopPreference { return EventLoopPreference(.delegateAndChannel(on: eventLoop)) } + + var bestEventLoop: EventLoop? { + switch self.preference { + case .delegate(on: let el): + return el + case .delegateAndChannel(on: let el): + return el + case .testOnly_exact(channelOn: let el, delegateOn: _): + return el + case .indifferent: + return nil + } + } } /// Specifies decompression settings. diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift index 92b46bf7c..07cca50d7 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift @@ -94,6 +94,7 @@ extension HTTPClientTests { ("testUseExistingConnectionOnDifferentEL", testUseExistingConnectionOnDifferentEL), ("testWeRecoverFromServerThatClosesTheConnectionOnUs", testWeRecoverFromServerThatClosesTheConnectionOnUs), ("testPoolClosesIdleConnections", testPoolClosesIdleConnections), + ("testRacePoolIdleConnectionsAndGet", testRacePoolIdleConnectionsAndGet), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift index 03f74b389..d5137384d 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift @@ -1643,4 +1643,17 @@ class HTTPClientTests: XCTestCase { Thread.sleep(forTimeInterval: 0.2) XCTAssertEqual(httpBin.activeConnections, 0) } + + func testRacePoolIdleConnectionsAndGet() { + let httpBin = HTTPBin() + let httpClient = HTTPClient(eventLoopGroupProvider: .createNew, configuration: .init(poolingTimeout: .milliseconds(10))) + defer { + XCTAssertNoThrow(try httpBin.shutdown()) + XCTAssertNoThrow(try httpClient.syncShutdown(requiresCleanClose: true)) + } + for _ in 1...500 { + XCTAssertNoThrow(try httpClient.get(url: "http://localhost:\(httpBin.port)/get").wait()) + Thread.sleep(forTimeInterval: 0.01 + .random(in: -0.05...0.05)) + } + } } From 060c277547e2fd403495420fbed8762a3e33f218 Mon Sep 17 00:00:00 2001 From: adtrevor Date: Wed, 26 Feb 2020 21:19:16 +0100 Subject: [PATCH 05/14] Configuration property rename pollingTimeout to maximumAllowedIdleTimeInConnectionPool --- Sources/AsyncHTTPClient/HTTPClient.swift | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index 6cf25b71e..48e29fb41 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -305,7 +305,7 @@ public class HTTPClient { redirectHandler = nil } - let task = Task(eventLoop: taskEL, poolingTimeout: self.configuration.poolingTimeout) + let task = Task(eventLoop: taskEL, poolingTimeout: self.configuration.maximumAllowedIdleTimeInConnectionPool) self.stateLock.withLock { self.tasks[task.id] = task } @@ -410,7 +410,7 @@ public class HTTPClient { /// Default client timeout, defaults to no timeouts. public var timeout: Timeout /// Timeout of pooled connections - public var poolingTimeout: TimeAmount? + public var maximumAllowedIdleTimeInConnectionPool: TimeAmount? /// Upstream proxy, defaults to no proxy. public var proxy: Proxy? /// Enables automatic body decompression. Supported algorithms are gzip and deflate. @@ -421,14 +421,14 @@ public class HTTPClient { public init(tlsConfiguration: TLSConfiguration? = nil, redirectConfiguration: RedirectConfiguration? = nil, timeout: Timeout = Timeout(), - poolingTimeout: TimeAmount, + maximumAllowedIdleTimeInConnectionPool: TimeAmount, proxy: Proxy? = nil, ignoreUncleanSSLShutdown: Bool = false, decompression: Decompression = .disabled) { self.tlsConfiguration = tlsConfiguration self.redirectConfiguration = redirectConfiguration ?? RedirectConfiguration() self.timeout = timeout - self.poolingTimeout = poolingTimeout + self.maximumAllowedIdleTimeInConnectionPool = maximumAllowedIdleTimeInConnectionPool self.proxy = proxy self.ignoreUncleanSSLShutdown = ignoreUncleanSSLShutdown self.decompression = decompression @@ -444,7 +444,7 @@ public class HTTPClient { tlsConfiguration: tlsConfiguration, redirectConfiguration: redirectConfiguration, timeout: timeout, - poolingTimeout: .seconds(60), + maximumAllowedIdleTimeInConnectionPool: .seconds(60), proxy: proxy, ignoreUncleanSSLShutdown: ignoreUncleanSSLShutdown, decompression: decompression @@ -454,14 +454,14 @@ public class HTTPClient { public init(certificateVerification: CertificateVerification, redirectConfiguration: RedirectConfiguration? = nil, timeout: Timeout = Timeout(), - poolingTimeout: TimeAmount = .seconds(60), + maximumAllowedIdleTimeInConnectionPool: TimeAmount = .seconds(60), proxy: Proxy? = nil, ignoreUncleanSSLShutdown: Bool = false, decompression: Decompression = .disabled) { self.tlsConfiguration = TLSConfiguration.forClient(certificateVerification: certificateVerification) self.redirectConfiguration = redirectConfiguration ?? RedirectConfiguration() self.timeout = timeout - self.poolingTimeout = poolingTimeout + self.maximumAllowedIdleTimeInConnectionPool = maximumAllowedIdleTimeInConnectionPool self.proxy = proxy self.ignoreUncleanSSLShutdown = ignoreUncleanSSLShutdown self.decompression = decompression @@ -477,7 +477,7 @@ public class HTTPClient { certificateVerification: certificateVerification, redirectConfiguration: redirectConfiguration, timeout: timeout, - poolingTimeout: .seconds(60), + maximumAllowedIdleTimeInConnectionPool: .seconds(60), proxy: proxy, ignoreUncleanSSLShutdown: ignoreUncleanSSLShutdown, decompression: decompression From fdd9648023602f374cbd752d89b652cbaf51866e Mon Sep 17 00:00:00 2001 From: adtrevor Date: Wed, 26 Feb 2020 21:26:34 +0100 Subject: [PATCH 06/14] Fix config property name in tests --- Tests/AsyncHTTPClientTests/HTTPClientTests.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift index d5137384d..049ce78d4 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift @@ -1634,7 +1634,7 @@ class HTTPClientTests: XCTestCase { func testPoolClosesIdleConnections() { let httpBin = HTTPBin() - let httpClient = HTTPClient(eventLoopGroupProvider: .createNew, configuration: .init(poolingTimeout: .milliseconds(100))) + let httpClient = HTTPClient(eventLoopGroupProvider: .createNew, configuration: .init(maximumAllowedIdleTimeInConnectionPool: .milliseconds(100))) defer { XCTAssertNoThrow(try httpBin.shutdown()) XCTAssertNoThrow(try httpClient.syncShutdown(requiresCleanClose: true)) @@ -1646,7 +1646,7 @@ class HTTPClientTests: XCTestCase { func testRacePoolIdleConnectionsAndGet() { let httpBin = HTTPBin() - let httpClient = HTTPClient(eventLoopGroupProvider: .createNew, configuration: .init(poolingTimeout: .milliseconds(10))) + let httpClient = HTTPClient(eventLoopGroupProvider: .createNew, configuration: .init(maximumAllowedIdleTimeInConnectionPool: .milliseconds(10))) defer { XCTAssertNoThrow(try httpBin.shutdown()) XCTAssertNoThrow(try httpClient.syncShutdown(requiresCleanClose: true)) From 9559b3fcc0fc57ba546f2b53b9821db800ccdb0a Mon Sep 17 00:00:00 2001 From: adtrevor Date: Thu, 27 Feb 2020 00:19:32 +0100 Subject: [PATCH 07/14] Fix redundant handler removal --- Sources/AsyncHTTPClient/HTTPClient.swift | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index 48e29fb41..da6ca3952 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -321,18 +321,16 @@ public class HTTPClient { connection.flatMap { connection -> EventLoopFuture in let channel = connection.channel + let addedFuture: EventLoopFuture + switch self.configuration.decompression { + case .disabled: + addedFuture = channel.eventLoop.makeSucceededFuture(()) + case .enabled(let limit): + let decompressHandler = NIOHTTPResponseDecompressor(limit: limit) + addedFuture = channel.pipeline.addHandler(decompressHandler) + } - return connection.removeHandler(IdleStateHandler.self).flatMap { - connection.removeHandler(IdlePoolConnectionHandler.self) - }.flatMap { - switch self.configuration.decompression { - case .disabled: - return channel.eventLoop.makeSucceededFuture(()) - case .enabled(let limit): - let decompressHandler = NIOHTTPResponseDecompressor(limit: limit) - return channel.pipeline.addHandler(decompressHandler) - } - }.flatMap { + return addedFuture.flatMap { if let timeout = self.resolve(timeout: self.configuration.timeout.read, deadline: deadline) { return channel.pipeline.addHandler(IdleStateHandler(readTimeout: timeout)) } else { From 6c23cf51532e1e94410409683e420f9611fd36e6 Mon Sep 17 00:00:00 2001 From: Johannes Weiss Date: Thu, 27 Feb 2020 18:46:50 +0000 Subject: [PATCH 08/14] test delayed close --- .../HTTPClientInternalTests.swift | 100 ++++++++++++++++++ 1 file changed, 100 insertions(+) diff --git a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift index a016b2b31..57d7bacbc 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift @@ -623,4 +623,104 @@ class HTTPClientInternalTests: XCTestCase { } XCTAssertNoThrow(try client.syncShutdown()) } + + func testRaceBetweenAsynchronousCloseAndChannelUsabilityDetection() { + final class DelayChannelCloseUntilToldHandler: ChannelOutboundHandler { + typealias OutboundIn = Any + + enum State { + case idling + case delayedClose + case closeDone + } + + var state: State = .idling + let doTheCloseNowFuture: EventLoopFuture + let sawTheClosePromise: EventLoopPromise + + init(doTheCloseNowFuture: EventLoopFuture, + sawTheClosePromise: EventLoopPromise) { + self.doTheCloseNowFuture = doTheCloseNowFuture + self.sawTheClosePromise = sawTheClosePromise + } + + func handlerRemoved(context: ChannelHandlerContext) { + XCTAssertEqual(.closeDone, self.state) + } + + func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise?) { + XCTAssertEqual(.idling, self.state) + self.state = .delayedClose + self.sawTheClosePromise.succeed(()) + // let's hold the close until the future's complete + self.doTheCloseNowFuture.whenSuccess { + context.close(mode: mode).map { + XCTAssertEqual(.delayedClose, self.state) + self.state = .closeDone + }.cascade(to: promise) + } + } + } + + let web = HTTPBin() + defer { + XCTAssertNoThrow(try web.shutdown()) + } + + let client = HTTPClient(eventLoopGroupProvider: .createNew) + defer { + XCTAssertNoThrow(try client.syncShutdown()) + } + + let req = try! HTTPClient.Request(url: "http://localhost:\(web.serverChannel.localAddress!.port!)/get", + method: .GET, + body: nil) + + // Let's start by getting a connection so we can mess with the Channel :). + var maybeConnection: ConnectionPool.Connection? + XCTAssertNoThrow(try maybeConnection = client.pool.getConnection(for: req, + preference: .indifferent, + on: client.eventLoopGroup.next(), + deadline: nil).wait()) + guard let connection = maybeConnection else { + XCTFail("couldn't make connection") + return + } + + let channel = connection.channel + let doActualCloseNowPromise = channel.eventLoop.makePromise(of: Void.self) + let sawTheClosePromise = channel.eventLoop.makePromise(of: Void.self) + + XCTAssertNoThrow(try channel.pipeline.addHandler(DelayChannelCloseUntilToldHandler(doTheCloseNowFuture: doActualCloseNowPromise.futureResult, + sawTheClosePromise: sawTheClosePromise), + position: .first).wait()) + client.pool.release(connection) + + XCTAssertNoThrow(try client.execute(request: req).wait()) + + // Now, let's pretend the timeout happened + channel.pipeline.fireUserInboundEventTriggered(IdleStateHandler.IdleStateEvent.write) + + // The Channel's closure should have already been initialised now but still, let's make sure the close + // was initiated + XCTAssertNoThrow(try sawTheClosePromise.futureResult.wait()) + // The Channel should still be active though because we delayed the close through our handler above. + XCTAssertTrue(channel.isActive) + + // When asking for a connection again, we should _not_ get the same one back because we did most of the close, + // similar to what the SSLHandler would do. + XCTAssertNoThrow(try maybeConnection = client.pool.getConnection(for: req, + preference: .indifferent, + on: client.eventLoopGroup.next(), + deadline: nil).wait()) + doActualCloseNowPromise.succeed(()) + guard let connection2 = maybeConnection else { + XCTFail("couldn't get second connection") + return + } + + XCTAssert(connection !== connection2) + client.pool.release(connection2) + XCTAssertTrue(connection2.channel.isActive) + } } From 44079a8a64f404e82c8c490fe99a02d8b604946a Mon Sep 17 00:00:00 2001 From: adtrevor Date: Sun, 1 Mar 2020 15:11:26 +0100 Subject: [PATCH 09/14] Fix delayed close race This should fix the race. Deterministic tests are not possible with the current implementation --- Sources/AsyncHTTPClient/ConnectionPool.swift | 22 +++++++++++++++---- Sources/AsyncHTTPClient/HTTPHandler.swift | 10 +++++---- .../HTTPClientInternalTests+XCTest.swift | 1 + .../HTTPClientInternalTests.swift | 3 ++- 4 files changed, 27 insertions(+), 9 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index df4f53cd0..11f5c4829 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -227,10 +227,24 @@ final class ConnectionPool { func removeIdleConnectionHandlersForLease() -> EventLoopFuture { return self.channel.eventLoop.flatSubmit { - self.removeHandler(IdleStateHandler.self).flatMap { - self.removeHandler(IdlePoolConnectionHandler.self) - }.flatMap { - if self.channel.isActive { + self.removeHandler(IdleStateHandler.self).flatMap { () -> EventLoopFuture in + self.channel.pipeline.handler(type: IdlePoolConnectionHandler.self).flatMap { idleHandler -> EventLoopFuture in + + self.channel.pipeline.removeHandler(idleHandler).flatMapError { error in + fatalError("\(error)") + return self.channel.eventLoop.makeSucceededFuture(()) + }.map { + idleHandler.timeoutClosed.load() + } + }.flatMapError { error in + if let channelError = error as? ChannelPipelineError, channelError == .notFound { + return self.channel.eventLoop.makeSucceededFuture(!self.channel.isActive) + } else { + return self.channel.eventLoop.makeFailedFuture(error) + } + } + }.flatMap { timeoutClosed in + if timeoutClosed == false { return self.channel.eventLoop.makeSucceededFuture(self) } else { return self.channel.eventLoop.makeFailedFuture(IdleCloseError()) diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index ee92f35e6..ddc4837d5 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -574,10 +574,10 @@ extension HTTPClient { }.flatMap { connection.removeHandler(TaskHandler.self) }.flatMap { - connection.channel.pipeline.addHandlers([ - IdleStateHandler(writeTimeout: self.poolingTimeout), - IdlePoolConnectionHandler(), - ]) + let idlePoolConnectionHandler = IdlePoolConnectionHandler() + return connection.channel.pipeline.addHandler(idlePoolConnectionHandler, position: .last).flatMap { + connection.channel.pipeline.addHandler(IdleStateHandler(writeTimeout: self.poolingTimeout), position: .before(idlePoolConnectionHandler)) + } }.flatMapError { error in if let error = error as? ChannelError, error == .ioOnClosedChannel { // We may get this error if channel is released because it is @@ -1026,8 +1026,10 @@ internal struct RedirectHandler { class IdlePoolConnectionHandler: ChannelInboundHandler, RemovableChannelHandler { typealias InboundIn = NIOAny + let timeoutClosed: NIOAtomic = .makeAtomic(value: false) func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) { if let idleEvent = event as? IdleStateHandler.IdleStateEvent, idleEvent == .write { + self.timeoutClosed.store(true) context.close(promise: nil) } else { context.fireUserInboundEventTriggered(event) diff --git a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests+XCTest.swift index a66cfcbf2..870ddfc37 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests+XCTest.swift @@ -36,6 +36,7 @@ extension HTTPClientInternalTests { ("testResponseConnectionCloseGet", testResponseConnectionCloseGet), ("testWeNoticeRemoteClosuresEvenWhenConnectionIsIdleInPool", testWeNoticeRemoteClosuresEvenWhenConnectionIsIdleInPool), ("testWeTolerateConnectionsGoingAwayWhilstPoolIsShuttingDown", testWeTolerateConnectionsGoingAwayWhilstPoolIsShuttingDown), + ("testRaceBetweenAsynchronousCloseAndChannelUsabilityDetection", testRaceBetweenAsynchronousCloseAndChannelUsabilityDetection), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift index 57d7bacbc..80a6e83bc 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift @@ -707,13 +707,14 @@ class HTTPClientInternalTests: XCTestCase { // The Channel should still be active though because we delayed the close through our handler above. XCTAssertTrue(channel.isActive) + doActualCloseNowPromise.succeed(()) // When asking for a connection again, we should _not_ get the same one back because we did most of the close, // similar to what the SSLHandler would do. XCTAssertNoThrow(try maybeConnection = client.pool.getConnection(for: req, preference: .indifferent, on: client.eventLoopGroup.next(), deadline: nil).wait()) - doActualCloseNowPromise.succeed(()) + guard let connection2 = maybeConnection else { XCTFail("couldn't get second connection") return From e6f2584ff0cdf019431426876c08a19bd7d1d6fa Mon Sep 17 00:00:00 2001 From: adtrevor Date: Sun, 1 Mar 2020 15:27:07 +0100 Subject: [PATCH 10/14] Fix warning --- Sources/AsyncHTTPClient/ConnectionPool.swift | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index 11f5c4829..9a717c31e 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -228,11 +228,9 @@ final class ConnectionPool { func removeIdleConnectionHandlersForLease() -> EventLoopFuture { return self.channel.eventLoop.flatSubmit { self.removeHandler(IdleStateHandler.self).flatMap { () -> EventLoopFuture in - self.channel.pipeline.handler(type: IdlePoolConnectionHandler.self).flatMap { idleHandler -> EventLoopFuture in - - self.channel.pipeline.removeHandler(idleHandler).flatMapError { error in - fatalError("\(error)") - return self.channel.eventLoop.makeSucceededFuture(()) + self.channel.pipeline.handler(type: IdlePoolConnectionHandler.self).flatMap { idleHandler in + self.channel.pipeline.removeHandler(idleHandler).flatMapError { _ in + self.channel.eventLoop.makeSucceededFuture(()) }.map { idleHandler.timeoutClosed.load() } From 30e2e309e573e38e94dbe144e6228382bce3ee67 Mon Sep 17 00:00:00 2001 From: Johannes Weiss Date: Tue, 3 Mar 2020 19:48:40 +0000 Subject: [PATCH 11/14] fix test --- .../HTTPClientInternalTests.swift | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift index 80a6e83bc..9288814c6 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift @@ -707,14 +707,15 @@ class HTTPClientInternalTests: XCTestCase { // The Channel should still be active though because we delayed the close through our handler above. XCTAssertTrue(channel.isActive) - doActualCloseNowPromise.succeed(()) // When asking for a connection again, we should _not_ get the same one back because we did most of the close, // similar to what the SSLHandler would do. - XCTAssertNoThrow(try maybeConnection = client.pool.getConnection(for: req, - preference: .indifferent, - on: client.eventLoopGroup.next(), - deadline: nil).wait()) + let connection2Future = client.pool.getConnection(for: req, + preference: .indifferent, + on: client.eventLoopGroup.next(), + deadline: nil) + doActualCloseNowPromise.succeed(()) + XCTAssertNoThrow(try maybeConnection = connection2Future.wait()) guard let connection2 = maybeConnection else { XCTFail("couldn't get second connection") return From 98e4886a1107d02ae0bd2a8baf92b69ef12dd74f Mon Sep 17 00:00:00 2001 From: adtrevor Date: Tue, 3 Mar 2020 21:04:38 +0100 Subject: [PATCH 12/14] Explain why we ignore .notFound error for the timeout handlers --- Sources/AsyncHTTPClient/ConnectionPool.swift | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index 9a717c31e..64c9906ee 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -235,6 +235,8 @@ final class ConnectionPool { idleHandler.timeoutClosed.load() } }.flatMapError { error in + // These handlers are only added on connection release, they are not added + // when a connection is made to be instantly leased, so we ignore this error if let channelError = error as? ChannelPipelineError, channelError == .notFound { return self.channel.eventLoop.makeSucceededFuture(!self.channel.isActive) } else { From 87b6b5a36a5a9e4dcf91788e5bdb46b7b5446248 Mon Sep 17 00:00:00 2001 From: adtrevor Date: Tue, 3 Mar 2020 21:10:17 +0100 Subject: [PATCH 13/14] Add additional activity checks --- Sources/AsyncHTTPClient/ConnectionPool.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index 64c9906ee..931124a54 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -232,7 +232,7 @@ final class ConnectionPool { self.channel.pipeline.removeHandler(idleHandler).flatMapError { _ in self.channel.eventLoop.makeSucceededFuture(()) }.map { - idleHandler.timeoutClosed.load() + idleHandler.timeoutClosed.load() || !self.channel.isActive } }.flatMapError { error in // These handlers are only added on connection release, they are not added From 9e9d3b7a99c38e88c971ddacde24360088a8b057 Mon Sep 17 00:00:00 2001 From: adtrevor Date: Tue, 3 Mar 2020 22:13:01 +0100 Subject: [PATCH 14/14] Rename and invert timeoutClosed --- Sources/AsyncHTTPClient/ConnectionPool.swift | 12 ++++++------ Sources/AsyncHTTPClient/HTTPHandler.swift | 9 +++++++-- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index 931124a54..d17612c38 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -232,28 +232,28 @@ final class ConnectionPool { self.channel.pipeline.removeHandler(idleHandler).flatMapError { _ in self.channel.eventLoop.makeSucceededFuture(()) }.map { - idleHandler.timeoutClosed.load() || !self.channel.isActive + idleHandler.hasNotSentClose && self.channel.isActive } }.flatMapError { error in // These handlers are only added on connection release, they are not added // when a connection is made to be instantly leased, so we ignore this error if let channelError = error as? ChannelPipelineError, channelError == .notFound { - return self.channel.eventLoop.makeSucceededFuture(!self.channel.isActive) + return self.channel.eventLoop.makeSucceededFuture(self.channel.isActive) } else { return self.channel.eventLoop.makeFailedFuture(error) } } - }.flatMap { timeoutClosed in - if timeoutClosed == false { + }.flatMap { channelIsUsable in + if channelIsUsable { return self.channel.eventLoop.makeSucceededFuture(self) } else { - return self.channel.eventLoop.makeFailedFuture(IdleCloseError()) + return self.channel.eventLoop.makeFailedFuture(InactiveChannelError()) } } } } - struct IdleCloseError: Error {} + struct InactiveChannelError: Error {} } /// A connection provider of `HTTP/1.1` connections with a given `Key` (host, scheme, port) diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index ddc4837d5..04c8cb7af 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -1026,10 +1026,15 @@ internal struct RedirectHandler { class IdlePoolConnectionHandler: ChannelInboundHandler, RemovableChannelHandler { typealias InboundIn = NIOAny - let timeoutClosed: NIOAtomic = .makeAtomic(value: false) + + let _hasNotSentClose: NIOAtomic = .makeAtomic(value: true) + var hasNotSentClose: Bool { + return self._hasNotSentClose.load() + } + func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) { if let idleEvent = event as? IdleStateHandler.IdleStateEvent, idleEvent == .write { - self.timeoutClosed.store(true) + self._hasNotSentClose.store(false) context.close(promise: nil) } else { context.fireUserInboundEventTriggered(event)