From 18a5bd65cd2e20b3af8016c95e4cb8b865d132f1 Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Fri, 3 Apr 2020 17:38:40 +0100 Subject: [PATCH 01/31] refactor pool --- Sources/AsyncHTTPClient/ConnectionPool.swift | 569 +++++------------- Sources/AsyncHTTPClient/HTTPClient.swift | 31 +- Sources/AsyncHTTPClient/HTTPHandler.swift | 7 +- Sources/AsyncHTTPClient/Utils.swift | 22 - .../HTTPClientTests.swift | 14 + 5 files changed, 182 insertions(+), 461 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index dfb7d5f4f..f8c121701 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -27,14 +27,14 @@ final class ConnectionPool { /// The main data structure used by the `ConnectionPool` to retreive and create connections associated /// to a given `Key` . + /// /// - Warning: This property should be accessed with proper synchronization, see `connectionProvidersLock` - private var connectionProviders: [Key: HTTP1ConnectionProvider] = [:] + private var providers: [Key: HTTP1ConnectionProvider] = [:] /// The lock used by the connection pool used to ensure correct synchronization of accesses to `_connectionProviders` /// - /// /// - Warning: This lock should always be acquired *before* `HTTP1ConnectionProvider`s `stateLock` if used in combination with it. - private let connectionProvidersLock = Lock() + private let lock = Lock() init(configuration: HTTPClient.Configuration) { self.configuration = configuration @@ -47,8 +47,8 @@ final class ConnectionPool { /// Having a default `EventLoop` shared by the *channel* and the *delegate* avoids /// loss of performance due to `EventLoop` hopping func associatedEventLoop(for key: Key) -> EventLoop? { - return self.connectionProvidersLock.withLock { - self.connectionProviders[key]?.eventLoop + return self.lock.withLock { + self.providers[key]?.eventLoop } } @@ -65,18 +65,12 @@ final class ConnectionPool { func getConnection(for request: HTTPClient.Request, preference: HTTPClient.EventLoopPreference, on eventLoop: EventLoop, deadline: NIODeadline?) -> EventLoopFuture { let key = Key(request) - let provider: HTTP1ConnectionProvider = self.connectionProvidersLock.withLock { - if let existing = self.connectionProviders[key] { - existing.stateLock.withLock { - existing.state.pending += 1 - } + let provider: HTTP1ConnectionProvider = self.lock.withLock { + if let existing = self.providers[key] { return existing } else { - let http1Provider = HTTP1ConnectionProvider(key: key, eventLoop: eventLoop, configuration: self.configuration, parentPool: self) - self.connectionProviders[key] = http1Provider - http1Provider.stateLock.withLock { - http1Provider.state.pending += 1 - } + let http1Provider = HTTP1ConnectionProvider(key: key, eventLoop: eventLoop, configuration: self.configuration, pool: self) + self.providers[key] = http1Provider return http1Provider } } @@ -85,37 +79,28 @@ final class ConnectionPool { } func release(_ connection: Connection) { - let connectionProvider = self.connectionProvidersLock.withLock { - self.connectionProviders[connection.key] + let connectionProvider = self.lock.withLock { + self.providers[connection.key] } + if let connectionProvider = connectionProvider { connectionProvider.release(connection: connection) } } - func prepareForClose(on eventLoop: EventLoop) -> EventLoopFuture { - let connectionProviders = self.connectionProvidersLock.withLock { - self.connectionProviders.values + func close(on eventLoop: EventLoop) { + let providers = self.lock.withLock { + self.providers.values } - return EventLoopFuture.andAllComplete(connectionProviders.map { $0.prepareForClose() }, on: eventLoop) - } - - func close(on eventLoop: EventLoop) -> EventLoopFuture { - let connectionProviders = self.connectionProvidersLock.withLock { - self.connectionProviders.values - } - - return EventLoopFuture.andAllComplete(connectionProviders.map { $0.close() }, on: eventLoop).map { - self.connectionProvidersLock.withLock { - assert(self.connectionProviders.count == 0, "left-overs: \(self.connectionProviders)") - } + providers.forEach { + $0.close() } } var connectionProviderCount: Int { - return self.connectionProvidersLock.withLock { - self.connectionProviders.count + return self.lock.withLock { + self.providers.count } } @@ -162,44 +147,17 @@ final class ConnectionPool { /// /// - Warning: `Connection` properties are not thread-safe and should be used with proper synchronization class Connection: CustomStringConvertible { - init(key: Key, channel: Channel, parentPool: ConnectionPool) { - self.key = key - self.channel = channel - self.parentPool = parentPool - self.closePromise = channel.eventLoop.makePromise(of: Void.self) - self.closeFuture = self.closePromise.futureResult - } - - /// Release this `Connection` to its associated `HTTP1ConnectionProvider` in the parent `ConnectionPool` - /// - /// This is exactly equivalent to calling `.release(theProvider)` on `ConnectionPool` - /// - /// - Warning: This only releases the connection and doesn't take care of cleaning handlers in the - /// `Channel` pipeline. - func release() { - self.parentPool.release(self) - } - - func close() -> EventLoopFuture { - self.channel.close(promise: nil) - return self.closeFuture - } - - var description: String { - return "Connection { channel: \(self.channel) }" - } - /// The connection pool this `Connection` belongs to. /// /// This enables calling methods like `release()` directly on a `Connection` instead of /// calling `pool.release(connection)`. This gives a more object oriented feel to the API /// and can avoid having to keep explicit references to the pool at call site. - let parentPool: ConnectionPool + private let pool: ConnectionPool /// The `Key` of the `HTTP1ConnectionProvider` this `Connection` belongs to /// /// This lets `ConnectionPool` know the relationship between `Connection`s and `HTTP1ConnectionProvider`s - fileprivate let key: Key + let key: Key /// The `Channel` of this `Connection` /// @@ -213,22 +171,33 @@ final class ConnectionPool { /// Indicates that this connection is about to close var isClosing: Bool = false - /// Indicates wether the usual close callback should be run or not, this allows customizing what happens - /// on close in some cases such as for the `.replaceConnection` action - /// - /// - Warning: This should be accessed under the `stateLock` of `HTTP1ConnectionProvider` - fileprivate var mustRunDefaultCloseCallback: Bool = true + init(key: Key, channel: Channel, pool: ConnectionPool) { + self.key = key + self.channel = channel + self.pool = pool + } + + var description: String { + return "Connection { channel: \(self.channel) }" + } /// Convenience property indicating wether the underlying `Channel` is active or not var isActiveEstimation: Bool { - return self.channel.isActive + return self.channel.isActive && !self.isClosing } - fileprivate var closePromise: EventLoopPromise + /// Release this `Connection` to its associated `HTTP1ConnectionProvider` in the parent `ConnectionPool` + /// + /// - Warning: This only releases the connection and doesn't take care of cleaning handlers in the `Channel` pipeline. + func release() { + self.pool.release(self) + } - var closeFuture: EventLoopFuture + func close() { + self.channel.close(promise: nil) + } - func removeIdleConnectionHandlersForLease() -> EventLoopFuture { + func cancelIdleTimeout() -> EventLoopFuture { return self.channel.eventLoop.flatSubmit { self.removeHandler(IdleStateHandler.self).flatMap { () -> EventLoopFuture in self.channel.pipeline.handler(type: IdlePoolConnectionHandler.self).flatMap { idleHandler in @@ -265,36 +234,40 @@ final class ConnectionPool { /// of concurrent requests as it has built-in politeness regarding the maximum number /// of concurrent requests to the server. class HTTP1ConnectionProvider: CustomStringConvertible { - /// The default `EventLoop` for this provider - /// - /// The default event loop is used to create futures and is used - /// when creating `Channel`s for requests for which the - /// `EventLoopPreference` is set to `.indifferent` - let eventLoop: EventLoop - /// The client configuration used to bootstrap new requests private let configuration: HTTPClient.Configuration + /// The pool this provider belongs to + private let pool: ConnectionPool + /// The key associated with this provider private let key: ConnectionPool.Key - /// The `State` of this provider + /// The default `EventLoop` for this provider /// - /// This property holds data structures representing the current state of the provider - /// - Warning: This type isn't thread safe and should be accessed with proper - /// synchronization (see the `stateLock` property) - fileprivate var state: State + /// The default event loop is used to create futures and is used when creating `Channel`s for requests + /// for which the `EventLoopPreference` is set to `.indifferent` + let eventLoop: EventLoop /// The lock used to access and modify the `state` property /// /// - Warning: This lock should always be acquired *after* `ConnectionPool`s `connectionProvidersLock` if used in combination with it. - fileprivate let stateLock = Lock() + private let lock = Lock() /// The maximum number of concurrent connections to a given (host, scheme, port) private let maximumConcurrentConnections: Int = 8 - /// The pool this provider belongs to - private let parentPool: ConnectionPool + /// Opened connections that are available + var availableConnections: CircularBuffer = .init(initialCapacity: 8) + + /// Consumers that weren't able to get a new connection without exceeding + /// `maximumConcurrentConnections` get a `Future` + /// whose associated promise is stored in `Waiter`. The promise is completed + /// as soon as possible by the provider, in FIFO order. + var waiters: CircularBuffer = .init(initialCapacity: 8) + + // TODO: description + var openedConnectionsCount: Int = 0 /// Creates a new `HTTP1ConnectionProvider` /// @@ -302,77 +275,91 @@ final class ConnectionPool { /// - key: The `Key` (host, scheme, port) this provider is associated to /// - configuration: The client configuration used globally by all requests /// - initialConnection: The initial connection the pool initializes this provider with - /// - parentPool: The pool this provider belongs to - init(key: ConnectionPool.Key, eventLoop: EventLoop, configuration: HTTPClient.Configuration, parentPool: ConnectionPool) { + /// - pool: The pool this provider belongs to + init(key: ConnectionPool.Key, eventLoop: EventLoop, configuration: HTTPClient.Configuration, pool: ConnectionPool) { self.eventLoop = eventLoop self.configuration = configuration self.key = key - self.parentPool = parentPool - self.state = State(eventLoop: eventLoop, parentPool: parentPool, key: key) + self.pool = pool } deinit { - assert(self.state.activity == .closed, "Non closed on deinit") - assert(self.state.availableConnections.isEmpty, "Available connections should be empty before deinit") - assert(self.state.leased == 0, "All leased connections should have been returned before deinit") - assert(self.state.waiters.count == 0, "Waiters on deinit: \(self.state.waiters)") + // TODO: +// assert(self.state.activity == .closed, "Non closed on deinit") +// assert(self.state.availableConnections.isEmpty, "Available connections should be empty before deinit") +// assert(self.state.leased == 0, "All leased connections should have been returned before deinit") +// assert(self.state.waiters.count == 0, "Waiters on deinit: \(self.state.waiters)") } var description: String { - return "HTTP1ConnectionProvider { key: \(self.key), state: \(self.state) }" + return "HTTP1ConnectionProvider { key: \(self.key) }" } func getConnection(preference: HTTPClient.EventLoopPreference) -> EventLoopFuture { - self.activityPrecondition(expected: [.opened]) - let action = self.stateLock.withLock { self.state.connectionAction(for: preference) } - switch action { - case .leaseConnection(let connection): - return connection.removeIdleConnectionHandlersForLease().flatMapError { _ in - connection.closeFuture.flatMap { // We ensure close actions are run first - let defaultEventLoop = self.stateLock.withLock { - self.state.defaultEventLoop + let promise = self.eventLoop.makePromise(of: Connection.self) + + self.lock.withLockVoid { + if let connection = self.availableConnections.popFirst() { + connection.isLeased = true + // check if we can vend this connection to caller + connection.cancelIdleTimeout().flatMapError { error in + if error is Connection.InactiveChannelError { + return self.makeConnection(on: preference.bestEventLoop ?? self.eventLoop) } - return self.makeConnection(on: preference.bestEventLoop ?? defaultEventLoop) + return connection.channel.eventLoop.makeFailedFuture(error) } + .cascade(to: promise) + } else if self.openedConnectionsCount < self.maximumConcurrentConnections { + self.openedConnectionsCount += 1 + self.makeConnection(on: preference.bestEventLoop ?? self.eventLoop).cascade(to: promise) + } else { + self.waiters.append(.init(promise: promise, preference: preference)) } - case .makeConnection(let eventLoop): - return self.makeConnection(on: eventLoop) - case .leaseFutureConnection(let futureConnection): - return futureConnection } + + return promise.futureResult } func release(connection: Connection) { - self.activityPrecondition(expected: [.opened, .closing]) - let action = self.parentPool.connectionProvidersLock.withLock { - self.stateLock.withLock { self.state.releaseAction(for: connection) } - } - switch action { - case .succeed(let promise): - promise.succeed(connection) - - case .makeConnectionAndComplete(let eventLoop, let promise): - self.makeConnection(on: eventLoop).cascade(to: promise) - - case .replaceConnection(let eventLoop, let promise): - connection.close().flatMap { - self.makeConnection(on: eventLoop) - }.whenComplete { result in - switch result { - case .success(let connection): - promise.succeed(connection) - case .failure(let error): - promise.fail(error) + self.lock.withLock { + if connection.isActiveEstimation { // If connection is alive, we can give to a next waiter + if let waiter = self.waiters.popFirst() { + // TODO: + connection.channel.eventLoop.execute { + connection.cancelIdleTimeout().flatMapError { error in + if error is Connection.InactiveChannelError { + return self.makeConnection(on: waiter.preference.bestEventLoop ?? self.eventLoop) + } + return connection.channel.eventLoop.makeFailedFuture(error) + } + .cascade(to: waiter.promise) + } + } else { + connection.isLeased = false + self.availableConnections.append(connection) + } + } else { + connection.close() + self.openedConnectionsCount -= 1 + + if let waiter = self.waiters.popFirst() { + self.openedConnectionsCount += 1 + self.makeConnection(on: waiter.preference.bestEventLoop ?? self.eventLoop).cascade(to: waiter.promise) } } + } + } - case .none: - break + private func processNextWaiter() { + self.lock.withLock { + if let waiter = self.waiters.popFirst() { + self.openedConnectionsCount += 1 + self.makeConnection(on: waiter.preference.bestEventLoop ?? self.eventLoop).cascade(to: waiter.promise) + } } } private func makeConnection(on eventLoop: EventLoop) -> EventLoopFuture { - self.activityPrecondition(expected: [.opened]) let handshakePromise = eventLoop.makePromise(of: Void.self) let bootstrap = ClientBootstrap.makeHTTPClientBootstrapBase(group: eventLoop, host: self.key.host, port: self.key.port, configuration: self.configuration) let address = HTTPClient.resolveAddress(host: self.key.host, port: self.key.port, proxy: self.configuration.proxy) @@ -385,314 +372,62 @@ final class ConnectionPool { channel = bootstrap.connect(unixDomainSocketPath: self.key.unixPath) } - return channel.flatMap { channel -> EventLoopFuture in + return channel.flatMap { channel -> EventLoopFuture in channel.pipeline.addSSLHandlerIfNeeded(for: self.key, tlsConfiguration: self.configuration.tlsConfiguration, handshakePromise: handshakePromise) return handshakePromise.futureResult.flatMap { channel.pipeline.addHTTPClientHandlers(leftOverBytesStrategy: .forwardBytes) }.map { - let connection = Connection(key: self.key, channel: channel, parentPool: self.parentPool) + let connection = Connection(key: self.key, channel: channel, pool: self.pool) connection.isLeased = true return connection } - }.map { connection in - self.configureCloseCallback(of: connection) - return connection }.flatMapError { error in // This promise may not have been completed if we reach this // so we fail it to avoid any leak handshakePromise.fail(error) - let action = self.parentPool.connectionProvidersLock.withLock { - self.stateLock.withLock { - self.state.failedConnectionAction() - } - } - switch action { - case .makeConnectionAndComplete(let el, let promise): - self.makeConnection(on: el).cascade(to: promise) - case .none: - break - } - return self.eventLoop.makeFailedFuture(error) - } - } - - /// Adds a callback on connection close that asks the `state` what to do about this - /// - /// The callback informs the state about the event, and the state returns a - /// `ClosedConnectionRemoveAction` which instructs it about what it should do. - private func configureCloseCallback(of connection: Connection) { - connection.channel.closeFuture.whenComplete { result in - let action: HTTP1ConnectionProvider.State.ClosedConnectionRemoveAction? = self.parentPool.connectionProvidersLock.withLock { - self.stateLock.withLock { - guard connection.mustRunDefaultCloseCallback else { - return nil - } - switch result { - case .success: - return self.state.removeClosedConnection(connection) - - case .failure(let error): - preconditionFailure("Connection close future failed with error: \(error)") - } - } - } - - if let action = action { - switch action { - case .makeConnectionAndComplete(let el, let promise): - self.makeConnection(on: el).cascade(to: promise) - case .none: - break - } - } - connection.closePromise.succeed(()) + // there is no connection here anymore, we need to bootstrap next waiter + self.openedConnectionsCount -= 1 + self.processNextWaiter() + return self.eventLoop.makeFailedFuture(error) } } - /// Removes and fails all `waiters`, remove existing `availableConnections` and sets `state.activity` to `.closing` - func prepareForClose() -> EventLoopFuture { - let (waitersFutures, closeFutures) = self.stateLock.withLock { () -> ([EventLoopFuture], [EventLoopFuture]) in - // Fail waiters - let waitersCopy = self.state.waiters - self.state.waiters.removeAll() - let waitersPromises = waitersCopy.map { $0.promise } - let waitersFutures = waitersPromises.map { $0.futureResult } - waitersPromises.forEach { $0.fail(HTTPClientError.cancelled) } - let closeFutures = self.state.availableConnections.map { $0.close() } - return (waitersFutures, closeFutures) + func close() { + self.lock.withLockVoid { + self.waiters.forEach { $0.promise.fail(HTTPClientError.cancelled) } + self.waiters.removeAll() } - return EventLoopFuture.andAllComplete(waitersFutures, on: self.eventLoop) - .flatMap { - EventLoopFuture.andAllComplete(closeFutures, on: self.eventLoop) - } - .map { _ in - self.stateLock.withLock { - if self.state.leased == 0, self.state.availableConnections.isEmpty { - self.state.activity = .closed - } else { - self.state.activity = .closing - } - } - } - } - - func close() -> EventLoopFuture { - let availableConnections = self.stateLock.withLock { () -> CircularBuffer in - assert(self.state.activity == .closing) - return self.state.availableConnections + self.lock.withLock { + self.availableConnections.forEach { $0.close() } + self.availableConnections.removeAll() } - - return EventLoopFuture.andAllComplete(availableConnections.map { $0.close() }, on: self.eventLoop) } - private func activityPrecondition(expected: Set) { - self.stateLock.withLock { - precondition(expected.contains(self.state.activity), "Attempting to use HTTP1ConnectionProvider with unexpected state: \(self.state.activity) (expected: \(expected))") + private func resolvePreference(_ preference: HTTPClient.EventLoopPreference) -> (EventLoop, Bool) { + switch preference.preference { + case .indifferent: + return (self.eventLoop, false) + case .delegate(let el): + return (el, false) + case .delegateAndChannel(let el), .testOnly_exact(let el, _): + return (el, true) } } - fileprivate struct State { - /// The default `EventLoop` to use for this `HTTP1ConnectionProvider` - let defaultEventLoop: EventLoop - - /// The maximum number of connections to a certain (host, scheme, port) tuple. - private let maximumConcurrentConnections: Int = 8 - - /// Opened connections that are available - fileprivate var availableConnections: CircularBuffer = .init(initialCapacity: 8) - - /// The number of currently leased connections - fileprivate var leased: Int = 0 { - didSet { - assert((0...self.maximumConcurrentConnections).contains(self.leased), "Invalid number of leased connections (\(self.leased))") - } - } - - /// Consumers that weren't able to get a new connection without exceeding - /// `maximumConcurrentConnections` get a `Future` - /// whose associated promise is stored in `Waiter`. The promise is completed - /// as soon as possible by the provider, in FIFO order. - fileprivate var waiters: CircularBuffer = .init(initialCapacity: 8) - - fileprivate var activity: Activity = .opened - - fileprivate var pending: Int = 0 { - didSet { - assert(self.pending >= 0) - } - } - - private let parentPool: ConnectionPool - - private let key: Key - - fileprivate init(eventLoop: EventLoop, parentPool: ConnectionPool, key: Key) { - self.defaultEventLoop = eventLoop - self.parentPool = parentPool - self.key = key - } - - fileprivate mutating func connectionAction(for preference: HTTPClient.EventLoopPreference) -> ConnectionGetAction { - self.pending -= 1 - let (channelEL, requiresSpecifiedEL) = self.resolvePreference(preference) - if self.leased < self.maximumConcurrentConnections { - self.leased += 1 - if let connection = availableConnections.swapWithFirstAndRemove(where: { $0.channel.eventLoop === channelEL }) { - connection.isLeased = true - return .leaseConnection(connection) - } else { - if requiresSpecifiedEL { - return .makeConnection(channelEL) - } else if let existingConnection = availableConnections.popFirst() { - return .leaseConnection(existingConnection) - } else { - return .makeConnection(self.defaultEventLoop) - } - } - } else { - let promise = channelEL.makePromise(of: Connection.self) - self.waiters.append(Waiter(promise: promise, preference: preference)) - return .leaseFutureConnection(promise.futureResult) - } - } - - fileprivate mutating func releaseAction(for connection: Connection) -> ConnectionReleaseAction { - if let firstWaiter = self.waiters.popFirst() { - let (channelEL, requiresSpecifiedEL) = self.resolvePreference(firstWaiter.preference) - - guard connection.isActiveEstimation, !connection.isClosing else { - return .makeConnectionAndComplete(channelEL, firstWaiter.promise) - } - - if connection.channel.eventLoop === channelEL { - return .succeed(firstWaiter.promise) - } else { - if requiresSpecifiedEL { - connection.mustRunDefaultCloseCallback = false - return .replaceConnection(channelEL, firstWaiter.promise) - } else { - return .makeConnectionAndComplete(channelEL, firstWaiter.promise) - } - } - - } else { - connection.isLeased = false - self.leased -= 1 - if connection.isActiveEstimation, !connection.isClosing { - self.availableConnections.append(connection) - } - - if self.providerMustClose() { - self.removeFromPool() - } - - return .none - } - } - - fileprivate mutating func removeClosedConnection(_ connection: Connection) -> ClosedConnectionRemoveAction { - if connection.isLeased { - if let firstWaiter = self.waiters.popFirst() { - let (el, _) = self.resolvePreference(firstWaiter.preference) - return .makeConnectionAndComplete(el, firstWaiter.promise) - } - } else { - self.availableConnections.swapWithFirstAndRemove(where: { $0 === connection }) - } - - if self.providerMustClose() { - self.removeFromPool() - } - - return .none - } - - fileprivate mutating func failedConnectionAction() -> ClosedConnectionRemoveAction { - if let firstWaiter = self.waiters.popFirst() { - let (el, _) = self.resolvePreference(firstWaiter.preference) - return .makeConnectionAndComplete(el, firstWaiter.promise) - } else { - self.leased -= 1 - if self.providerMustClose() { - self.removeFromPool() - } - return .none - } - } - - private func providerMustClose() -> Bool { - return self.pending == 0 && self.activity != .closed && self.leased == 0 && self.availableConnections.isEmpty && self.waiters.isEmpty - } - - /// - Warning: This should always be called from a critical section protected by `.connectionProvidersLock` - fileprivate mutating func removeFromPool() { - assert(self.parentPool.connectionProviders[self.key] != nil) - self.parentPool.connectionProviders[self.key] = nil - assert(self.activity != .closed) - self.activity = .closed - } - - private func resolvePreference(_ preference: HTTPClient.EventLoopPreference) -> (EventLoop, Bool) { - switch preference.preference { - case .indifferent: - return (self.defaultEventLoop, false) - case .delegate(let el): - return (el, false) - case .delegateAndChannel(let el), .testOnly_exact(let el, _): - return (el, true) - } - } - - fileprivate enum ConnectionGetAction { - case leaseConnection(Connection) - case makeConnection(EventLoop) - case leaseFutureConnection(EventLoopFuture) - } - - fileprivate enum ConnectionReleaseAction { - case succeed(EventLoopPromise) - case makeConnectionAndComplete(EventLoop, EventLoopPromise) - case replaceConnection(EventLoop, EventLoopPromise) - case none - } - - fileprivate enum ClosedConnectionRemoveAction { - case none - case makeConnectionAndComplete(EventLoop, EventLoopPromise) - } - - /// A `Waiter` represents a request that waits for a connection when none is - /// currently available - /// - /// `Waiter`s are created when `maximumConcurrentConnections` is reached - /// and we cannot create new connections anymore. - fileprivate struct Waiter { - /// The promise to complete once a connection is available - let promise: EventLoopPromise - - /// The event loop preference associated to this particular request - /// that the provider should respect - let preference: HTTPClient.EventLoopPreference - } - - enum Activity: Hashable, CustomStringConvertible { - case opened - case closing - case closed - - var description: String { - switch self { - case .opened: - return "opened" - case .closing: - return "closing" - case .closed: - return "closed" - } - } - } + /// A `Waiter` represents a request that waits for a connection when none is + /// currently available + /// + /// `Waiter`s are created when `maximumConcurrentConnections` is reached + /// and we cannot create new connections anymore. + struct Waiter { + /// The promise to complete once a connection is available + let promise: EventLoopPromise + + /// The event loop preference associated to this particular request + /// that the provider should respect + let preference: HTTPClient.EventLoopPreference } } } diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index eb9833774..a2c1acb50 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -73,7 +73,7 @@ public class HTTPClient { } deinit { - assert(self.pool.connectionProviderCount == 0) +// assert(self.pool.connectionProviderCount == 0) assert(self.state == .shutDown, "Client not shut down before the deinit. Please call client.syncShutdown() when no longer needed.") } @@ -167,24 +167,19 @@ public class HTTPClient { case .failure(let error): callback(error) case .success(let tasks): - self.pool.prepareForClose(on: self.eventLoopGroup.next()).whenComplete { _ in - var closeError: Error? - if !tasks.isEmpty, requiresCleanClose { - closeError = HTTPClientError.uncleanShutdown - } + var closeError: Error? + if !tasks.isEmpty, requiresCleanClose { + closeError = HTTPClientError.uncleanShutdown + } - // we ignore errors here - self.cancelTasks(tasks).whenComplete { _ in - // we ignore errors here - self.pool.close(on: self.eventLoopGroup.next()).whenComplete { _ in - self.shutdownEventLoop(queue: queue) { eventLoopError in - // we prioritise .uncleanShutdown here - if let error = closeError { - callback(error) - } else { - callback(eventLoopError) - } - } + self.cancelTasks(tasks).whenComplete { _ in + self.pool.close(on: self.eventLoopGroup.next()) + self.shutdownEventLoop(queue: queue) { eventLoopError in + // we prioritise .uncleanShutdown here + if let error = closeError { + callback(error) + } else { + callback(eventLoopError) } } } diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index b77ab02d1..a7b96ed12 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -579,10 +579,9 @@ extension HTTPClient { func fail(with error: Error, delegateType: Delegate.Type) { if let connection = self.connection { - connection.close().whenComplete { _ in - self.releaseAssociatedConnection(delegateType: delegateType).whenComplete { _ in - self.promise.fail(error) - } + connection.close() + self.releaseAssociatedConnection(delegateType: delegateType).whenComplete { _ in + self.promise.fail(error) } } } diff --git a/Sources/AsyncHTTPClient/Utils.swift b/Sources/AsyncHTTPClient/Utils.swift index 6e2fedf53..21415652f 100644 --- a/Sources/AsyncHTTPClient/Utils.swift +++ b/Sources/AsyncHTTPClient/Utils.swift @@ -66,28 +66,6 @@ extension ClientBootstrap { } } -extension CircularBuffer { - @discardableResult - mutating func swapWithFirstAndRemove(at index: Index) -> Element? { - precondition(index >= self.startIndex && index < self.endIndex) - if !self.isEmpty { - self.swapAt(self.startIndex, index) - return self.removeFirst() - } else { - return nil - } - } - - @discardableResult - mutating func swapWithFirstAndRemove(where predicate: (Element) throws -> Bool) rethrows -> Element? { - if let existingIndex = try self.firstIndex(where: predicate) { - return self.swapWithFirstAndRemove(at: existingIndex) - } else { - return nil - } - } -} - extension ConnectionPool.Connection { func removeHandler(_ type: Handler.Type) -> EventLoopFuture { return self.channel.pipeline.handler(type: type).flatMap { handler in diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift index 986194761..afabc0100 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift @@ -83,6 +83,20 @@ class HTTPClientTests: XCTestCase { } let response = try httpClient.get(url: "http://localhost:\(httpBin.port)/get").wait() + try EventLoopFuture.andAllSucceed([ + httpClient.get(url: "http://localhost:\(httpBin.port)/get"), + httpClient.get(url: "http://localhost:\(httpBin.port)/get"), + httpClient.get(url: "http://localhost:\(httpBin.port)/get"), + httpClient.get(url: "http://localhost:\(httpBin.port)/get"), + httpClient.get(url: "http://localhost:\(httpBin.port)/get"), + httpClient.get(url: "http://localhost:\(httpBin.port)/get"), + httpClient.get(url: "http://localhost:\(httpBin.port)/get"), + httpClient.get(url: "http://localhost:\(httpBin.port)/get"), + httpClient.get(url: "http://localhost:\(httpBin.port)/get"), + httpClient.get(url: "http://localhost:\(httpBin.port)/get"), + httpClient.get(url: "http://localhost:\(httpBin.port)/get"), + httpClient.get(url: "http://localhost:\(httpBin.port)/get") + ], on: httpClient.eventLoopGroup.next()).wait() XCTAssertEqual(.ok, response.status) } From 8da635abcce6205bbd711d32a7a41649036f392a Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Fri, 3 Apr 2020 17:46:47 +0100 Subject: [PATCH 02/31] revert test --- Tests/AsyncHTTPClientTests/HTTPClientTests.swift | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift index afabc0100..986194761 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift @@ -83,20 +83,6 @@ class HTTPClientTests: XCTestCase { } let response = try httpClient.get(url: "http://localhost:\(httpBin.port)/get").wait() - try EventLoopFuture.andAllSucceed([ - httpClient.get(url: "http://localhost:\(httpBin.port)/get"), - httpClient.get(url: "http://localhost:\(httpBin.port)/get"), - httpClient.get(url: "http://localhost:\(httpBin.port)/get"), - httpClient.get(url: "http://localhost:\(httpBin.port)/get"), - httpClient.get(url: "http://localhost:\(httpBin.port)/get"), - httpClient.get(url: "http://localhost:\(httpBin.port)/get"), - httpClient.get(url: "http://localhost:\(httpBin.port)/get"), - httpClient.get(url: "http://localhost:\(httpBin.port)/get"), - httpClient.get(url: "http://localhost:\(httpBin.port)/get"), - httpClient.get(url: "http://localhost:\(httpBin.port)/get"), - httpClient.get(url: "http://localhost:\(httpBin.port)/get"), - httpClient.get(url: "http://localhost:\(httpBin.port)/get") - ], on: httpClient.eventLoopGroup.next()).wait() XCTAssertEqual(.ok, response.status) } From 110dc2a2442adf7f5ce504971ae21c195778e50f Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Fri, 3 Apr 2020 20:41:27 +0100 Subject: [PATCH 03/31] review fixes - move callouts out of locks --- Sources/AsyncHTTPClient/ConnectionPool.swift | 93 ++++++++++++------- .../HTTPClientInternalTests.swift | 16 ++++ 2 files changed, 78 insertions(+), 31 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index f8c121701..7848cc824 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -234,6 +234,12 @@ final class ConnectionPool { /// of concurrent requests as it has built-in politeness regarding the maximum number /// of concurrent requests to the server. class HTTP1ConnectionProvider: CustomStringConvertible { + enum Action { + case lease(Connection, Waiter) + case create(Waiter) + case none + } + /// The client configuration used to bootstrap new requests private let configuration: HTTPClient.Configuration @@ -295,68 +301,86 @@ final class ConnectionPool { return "HTTP1ConnectionProvider { key: \(self.key) }" } + func execute(_ action: Action) { + switch action { + case .lease(let connection, let waiter): + // check if we can vend this connection to caller + connection.cancelIdleTimeout().flatMapError { error in + if error is Connection.InactiveChannelError { + return self.makeConnection(on: waiter.preference.bestEventLoop ?? self.eventLoop) + } + return connection.channel.eventLoop.makeFailedFuture(error) + } + .cascade(to: waiter.promise) + case .create(let waiter): + self.makeConnection(on: waiter.preference.bestEventLoop ?? self.eventLoop).cascade(to: waiter.promise) + case .none: + break + } + } + func getConnection(preference: HTTPClient.EventLoopPreference) -> EventLoopFuture { - let promise = self.eventLoop.makePromise(of: Connection.self) + let waiter = Waiter(promise: self.eventLoop.makePromise(), preference: preference) - self.lock.withLockVoid { + let action: Action = self.lock.withLock { if let connection = self.availableConnections.popFirst() { connection.isLeased = true - // check if we can vend this connection to caller - connection.cancelIdleTimeout().flatMapError { error in - if error is Connection.InactiveChannelError { - return self.makeConnection(on: preference.bestEventLoop ?? self.eventLoop) - } - return connection.channel.eventLoop.makeFailedFuture(error) - } - .cascade(to: promise) + return .lease(connection, waiter) } else if self.openedConnectionsCount < self.maximumConcurrentConnections { self.openedConnectionsCount += 1 - self.makeConnection(on: preference.bestEventLoop ?? self.eventLoop).cascade(to: promise) + return .create(waiter) } else { - self.waiters.append(.init(promise: promise, preference: preference)) + self.waiters.append(waiter) + return .none } } - return promise.futureResult + self.execute(action) + + return waiter.promise.futureResult } func release(connection: Connection) { - self.lock.withLock { + let action: Action = self.lock.withLock { if connection.isActiveEstimation { // If connection is alive, we can give to a next waiter if let waiter = self.waiters.popFirst() { - // TODO: - connection.channel.eventLoop.execute { - connection.cancelIdleTimeout().flatMapError { error in - if error is Connection.InactiveChannelError { - return self.makeConnection(on: waiter.preference.bestEventLoop ?? self.eventLoop) - } - return connection.channel.eventLoop.makeFailedFuture(error) - } - .cascade(to: waiter.promise) - } + connection.isLeased = true + return .lease(connection, waiter) } else { connection.isLeased = false self.availableConnections.append(connection) + return .none } } else { + // TODO: close here is probably not ok connection.close() self.openedConnectionsCount -= 1 if let waiter = self.waiters.popFirst() { self.openedConnectionsCount += 1 - self.makeConnection(on: waiter.preference.bestEventLoop ?? self.eventLoop).cascade(to: waiter.promise) + return .create(waiter) } + + return .none } } + + // TODO: is this correct? + connection.channel.eventLoop.execute { + self.execute(action) + } } private func processNextWaiter() { - self.lock.withLock { + let action: Action = self.lock.withLock { if let waiter = self.waiters.popFirst() { self.openedConnectionsCount += 1 - self.makeConnection(on: waiter.preference.bestEventLoop ?? self.eventLoop).cascade(to: waiter.promise) + return .create(waiter) } + return .none } + + self.execute(action) } private func makeConnection(on eventLoop: EventLoop) -> EventLoopFuture { @@ -387,6 +411,7 @@ final class ConnectionPool { handshakePromise.fail(error) // there is no connection here anymore, we need to bootstrap next waiter + // TODO: this is done out of lock, most likely incorrect self.openedConnectionsCount -= 1 self.processNextWaiter() return self.eventLoop.makeFailedFuture(error) @@ -394,15 +419,21 @@ final class ConnectionPool { } func close() { - self.lock.withLockVoid { - self.waiters.forEach { $0.promise.fail(HTTPClientError.cancelled) } + let waiters: CircularBuffer = self.lock.withLock { + let copy = self.waiters self.waiters.removeAll() + return copy } - self.lock.withLock { - self.availableConnections.forEach { $0.close() } + waiters.forEach { $0.promise.fail(HTTPClientError.cancelled) } + + let connections: CircularBuffer = self.lock.withLock { + let copy = self.availableConnections self.availableConnections.removeAll() + return copy } + + connections.forEach { $0.close() } } private func resolvePreference(_ preference: HTTPClient.EventLoopPreference) -> (EventLoop, Bool) { diff --git a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift index 9288814c6..66c2496c2 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift @@ -455,6 +455,22 @@ class HTTPClientInternalTests: XCTestCase { }.futureResult.wait() } + func _testProviderEmptyAfterIdle() throws { + let httpClient = HTTPClient(eventLoopGroupProvider: .createNew, + configuration: .init(maximumAllowedIdleTimeInConnectionPool: .milliseconds(50))) + let web = NIOHTTP1TestServer(group: httpClient.eventLoopGroup) + + defer { + XCTAssertNoThrow(try web.stop()) + XCTAssertNoThrow(try httpClient.syncShutdown(requiresCleanClose: true)) + } + + //let result = try httpClient.get(url: "http://localhost:\(web.serverPort)/foo").wait() + //Thread.sleep(forTimeInterval: 100) + XCTFail("Not implemented yet") + //XCTAssert(httpClient.pool.) + } + func testWeNoticeRemoteClosuresEvenWhenConnectionIsIdleInPool() { final class ServerThatRespondsThenJustCloses: ChannelInboundHandler { typealias InboundIn = HTTPServerRequestPart From 6c51cc646747b78b950ef5a3a186bb5f9e00db20 Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Fri, 3 Apr 2020 21:13:52 +0100 Subject: [PATCH 04/31] restore some asserts --- Sources/AsyncHTTPClient/ConnectionPool.swift | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index 7848cc824..48e292e21 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -290,11 +290,9 @@ final class ConnectionPool { } deinit { - // TODO: -// assert(self.state.activity == .closed, "Non closed on deinit") -// assert(self.state.availableConnections.isEmpty, "Available connections should be empty before deinit") -// assert(self.state.leased == 0, "All leased connections should have been returned before deinit") -// assert(self.state.waiters.count == 0, "Waiters on deinit: \(self.state.waiters)") + assert(self.waiters.isEmpty) + assert(self.availableConnections.isEmpty) + assert(self.openedConnectionsCount == 0) } var description: String { From d5830a89bb6c9f66951c1ec456c2bbf6b4f69f0a Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Fri, 3 Apr 2020 21:25:26 +0100 Subject: [PATCH 05/31] remove duplicate test --- .../HTTPClientInternalTests.swift | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift index 66c2496c2..9288814c6 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift @@ -455,22 +455,6 @@ class HTTPClientInternalTests: XCTestCase { }.futureResult.wait() } - func _testProviderEmptyAfterIdle() throws { - let httpClient = HTTPClient(eventLoopGroupProvider: .createNew, - configuration: .init(maximumAllowedIdleTimeInConnectionPool: .milliseconds(50))) - let web = NIOHTTP1TestServer(group: httpClient.eventLoopGroup) - - defer { - XCTAssertNoThrow(try web.stop()) - XCTAssertNoThrow(try httpClient.syncShutdown(requiresCleanClose: true)) - } - - //let result = try httpClient.get(url: "http://localhost:\(web.serverPort)/foo").wait() - //Thread.sleep(forTimeInterval: 100) - XCTFail("Not implemented yet") - //XCTAssert(httpClient.pool.) - } - func testWeNoticeRemoteClosuresEvenWhenConnectionIsIdleInPool() { final class ServerThatRespondsThenJustCloses: ChannelInboundHandler { typealias InboundIn = HTTPServerRequestPart From 5bc7c36e60e8d9775f058c11ecdcad6748c2b354 Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Thu, 16 Apr 2020 13:03:02 +0100 Subject: [PATCH 06/31] add empty provider removal --- Sources/AsyncHTTPClient/ConnectionPool.swift | 71 ++++++++++++++------ Sources/AsyncHTTPClient/HTTPClient.swift | 1 - 2 files changed, 50 insertions(+), 22 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index 48e292e21..f0771f8ca 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -66,7 +66,7 @@ final class ConnectionPool { let key = Key(request) let provider: HTTP1ConnectionProvider = self.lock.withLock { - if let existing = self.providers[key] { + if let existing = self.providers[key], existing.isActive { return existing } else { let http1Provider = HTTP1ConnectionProvider(key: key, eventLoop: eventLoop, configuration: self.configuration, pool: self) @@ -88,6 +88,12 @@ final class ConnectionPool { } } + func delete(_ provider: HTTP1ConnectionProvider) { + self.lock.withLockVoid { + self.providers[provider.key] = nil + } + } + func close(on eventLoop: EventLoop) { let providers = self.lock.withLock { self.providers.values @@ -165,9 +171,6 @@ final class ConnectionPool { /// for removing the specific handlers they added to the `Channel` pipeline before releasing it to the pool. let channel: Channel - /// Wether the connection is currently leased or not - var isLeased: Bool = false - /// Indicates that this connection is about to close var isClosing: Bool = false @@ -247,7 +250,7 @@ final class ConnectionPool { private let pool: ConnectionPool /// The key associated with this provider - private let key: ConnectionPool.Key + let key: ConnectionPool.Key /// The default `EventLoop` for this provider /// @@ -255,14 +258,16 @@ final class ConnectionPool { /// for which the `EventLoopPreference` is set to `.indifferent` let eventLoop: EventLoop - /// The lock used to access and modify the `state` property + /// The lock used to access and modify the provider state - `availableConnections`, `waiters` and `openedConnectionsCount`. /// - /// - Warning: This lock should always be acquired *after* `ConnectionPool`s `connectionProvidersLock` if used in combination with it. + /// - Warning: This lock should always be acquired *after* `ConnectionPool`s `lock` if used in combination with it. private let lock = Lock() /// The maximum number of concurrent connections to a given (host, scheme, port) private let maximumConcurrentConnections: Int = 8 + private var active: Bool = true + /// Opened connections that are available var availableConnections: CircularBuffer = .init(initialCapacity: 8) @@ -272,7 +277,7 @@ final class ConnectionPool { /// as soon as possible by the provider, in FIFO order. var waiters: CircularBuffer = .init(initialCapacity: 8) - // TODO: description + /// Number of opened or opening connections, used to keep track of all connections and enforcing `maximumConcurrentConnections` limit. var openedConnectionsCount: Int = 0 /// Creates a new `HTTP1ConnectionProvider` @@ -299,12 +304,20 @@ final class ConnectionPool { return "HTTP1ConnectionProvider { key: \(self.key) }" } + var isActive: Bool { + self.lock.withLock { + self.active + } + } + func execute(_ action: Action) { switch action { case .lease(let connection, let waiter): // check if we can vend this connection to caller connection.cancelIdleTimeout().flatMapError { error in + // if connection is already inactive, we create a new one. if error is Connection.InactiveChannelError { + self.openedConnectionsCount += 1 return self.makeConnection(on: waiter.preference.bestEventLoop ?? self.eventLoop) } return connection.channel.eventLoop.makeFailedFuture(error) @@ -322,7 +335,6 @@ final class ConnectionPool { let action: Action = self.lock.withLock { if let connection = self.availableConnections.popFirst() { - connection.isLeased = true return .lease(connection, waiter) } else if self.openedConnectionsCount < self.maximumConcurrentConnections { self.openedConnectionsCount += 1 @@ -342,18 +354,12 @@ final class ConnectionPool { let action: Action = self.lock.withLock { if connection.isActiveEstimation { // If connection is alive, we can give to a next waiter if let waiter = self.waiters.popFirst() { - connection.isLeased = true return .lease(connection, waiter) } else { - connection.isLeased = false self.availableConnections.append(connection) return .none } } else { - // TODO: close here is probably not ok - connection.close() - self.openedConnectionsCount -= 1 - if let waiter = self.waiters.popFirst() { self.openedConnectionsCount += 1 return .create(waiter) @@ -363,12 +369,29 @@ final class ConnectionPool { } } - // TODO: is this correct? + // This is needed to start a new stack, otherwise, since this is called on a previous + // future completion handler chain, it will be growing indefinitely until the connection is closed. + // We might revisit this when https://github.com/apple/swift-nio/issues/970 is resolved. connection.channel.eventLoop.execute { self.execute(action) } } + func delete(connection: Connection) { + self.lock.withLock { + self.openedConnectionsCount -= 1 + self.availableConnections.removeAll { $0 === connection } + + if self.openedConnectionsCount == 0 { + self.active = false + } + } + + if !self.active { + self.pool.delete(self) + } + } + private func processNextWaiter() { let action: Action = self.lock.withLock { if let waiter = self.waiters.popFirst() { @@ -400,17 +423,23 @@ final class ConnectionPool { channel.pipeline.addHTTPClientHandlers(leftOverBytesStrategy: .forwardBytes) }.map { let connection = Connection(key: self.key, channel: channel, pool: self.pool) - connection.isLeased = true + + channel.closeFuture.whenComplete { _ in + self.delete(connection: connection) + } + return connection } }.flatMapError { error in - // This promise may not have been completed if we reach this - // so we fail it to avoid any leak + // This promise may not have been completed if we reach this so we fail it to avoid any leak handshakePromise.fail(error) + // since we failed to create a connection, we need to decrease opened connection count + self.lock.withLockVoid { + self.openedConnectionsCount -= 1 + } + // there is no connection here anymore, we need to bootstrap next waiter - // TODO: this is done out of lock, most likely incorrect - self.openedConnectionsCount -= 1 self.processNextWaiter() return self.eventLoop.makeFailedFuture(error) } diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index a2c1acb50..cc82dac03 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -73,7 +73,6 @@ public class HTTPClient { } deinit { -// assert(self.pool.connectionProviderCount == 0) assert(self.state == .shutDown, "Client not shut down before the deinit. Please call client.syncShutdown() when no longer needed.") } From 81889025eb0c9a7d06ec99dfb016fffe13ee95de Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Thu, 16 Apr 2020 13:27:09 +0100 Subject: [PATCH 07/31] fix warning and a race --- Sources/AsyncHTTPClient/ConnectionPool.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index f0771f8ca..3c4c1ab6b 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -305,7 +305,7 @@ final class ConnectionPool { } var isActive: Bool { - self.lock.withLock { + return self.lock.withLock { self.active } } @@ -387,7 +387,7 @@ final class ConnectionPool { } } - if !self.active { + if !self.isActive { self.pool.delete(self) } } From 1bf32d60c9f1f47f912938106ab96f45867ef21d Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Sun, 26 Apr 2020 11:25:37 +0100 Subject: [PATCH 08/31] review fix: add missing lock --- Sources/AsyncHTTPClient/ConnectionPool.swift | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index 3c4c1ab6b..80a651624 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -317,7 +317,9 @@ final class ConnectionPool { connection.cancelIdleTimeout().flatMapError { error in // if connection is already inactive, we create a new one. if error is Connection.InactiveChannelError { - self.openedConnectionsCount += 1 + self.lock.withLockVoid { + self.openedConnectionsCount += 1 + } return self.makeConnection(on: waiter.preference.bestEventLoop ?? self.eventLoop) } return connection.channel.eventLoop.makeFailedFuture(error) From e77fdb97f2f2ebff0f96d1b9dd9b645bd5a8c7c8 Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Sun, 26 Apr 2020 13:00:31 +0100 Subject: [PATCH 09/31] add waiter for provider deletion --- Sources/AsyncHTTPClient/ConnectionPool.swift | 25 +++++++++++++------- Sources/AsyncHTTPClient/HTTPClient.swift | 16 +++++++------ 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index 80a651624..59a350ad5 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -94,14 +94,12 @@ final class ConnectionPool { } } - func close(on eventLoop: EventLoop) { + func close(on eventLoop: EventLoop) -> EventLoopFuture { let providers = self.lock.withLock { self.providers.values } - providers.forEach { - $0.close() - } + return EventLoopFuture.andAllComplete(providers.map { $0.close(on: eventLoop) }, on: eventLoop) } var connectionProviderCount: Int { @@ -240,6 +238,7 @@ final class ConnectionPool { enum Action { case lease(Connection, Waiter) case create(Waiter) + case delete case none } @@ -327,6 +326,8 @@ final class ConnectionPool { .cascade(to: waiter.promise) case .create(let waiter): self.makeConnection(on: waiter.preference.bestEventLoop ?? self.eventLoop).cascade(to: waiter.promise) + case .delete: + self.pool.delete(self) case .none: break } @@ -380,18 +381,19 @@ final class ConnectionPool { } func delete(connection: Connection) { - self.lock.withLock { + let action: Action = self.lock.withLock { self.openedConnectionsCount -= 1 self.availableConnections.removeAll { $0 === connection } if self.openedConnectionsCount == 0 { self.active = false + return .delete } - } - if !self.isActive { - self.pool.delete(self) + return .none } + + self.execute(action) } private func processNextWaiter() { @@ -399,6 +401,9 @@ final class ConnectionPool { if let waiter = self.waiters.popFirst() { self.openedConnectionsCount += 1 return .create(waiter) + } else if self.openedConnectionsCount == 0 { + self.active = false + return .delete } return .none } @@ -447,7 +452,7 @@ final class ConnectionPool { } } - func close() { + func close(on eventLoop: EventLoop) -> EventLoopFuture { let waiters: CircularBuffer = self.lock.withLock { let copy = self.waiters self.waiters.removeAll() @@ -463,6 +468,8 @@ final class ConnectionPool { } connections.forEach { $0.close() } + + return EventLoopFuture.andAllComplete(connections.map { $0.channel.closeFuture }, on: eventLoop) } private func resolvePreference(_ preference: HTTPClient.EventLoopPreference) -> (EventLoop, Bool) { diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index cc82dac03..26cf95d2b 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -73,6 +73,7 @@ public class HTTPClient { } deinit { + assert(self.pool.connectionProviderCount == 0) assert(self.state == .shutDown, "Client not shut down before the deinit. Please call client.syncShutdown() when no longer needed.") } @@ -172,13 +173,14 @@ public class HTTPClient { } self.cancelTasks(tasks).whenComplete { _ in - self.pool.close(on: self.eventLoopGroup.next()) - self.shutdownEventLoop(queue: queue) { eventLoopError in - // we prioritise .uncleanShutdown here - if let error = closeError { - callback(error) - } else { - callback(eventLoopError) + self.pool.close(on: self.eventLoopGroup.next()).whenComplete { _ in + self.shutdownEventLoop(queue: queue) { eventLoopError in + // we prioritise .uncleanShutdown here + if let error = closeError { + callback(error) + } else { + callback(eventLoopError) + } } } } From ef484e3eff07f09e8e086a3a6e83cb48e3c75da0 Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Sun, 26 Apr 2020 13:32:46 +0100 Subject: [PATCH 10/31] make provider close more robust --- Sources/AsyncHTTPClient/ConnectionPool.swift | 15 +++++++++------ Sources/AsyncHTTPClient/HTTPHandler.swift | 2 +- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index 59a350ad5..a8da4a34f 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -168,13 +168,15 @@ final class ConnectionPool { /// - Warning: Requests that lease connections from the `ConnectionPool` are responsible /// for removing the specific handlers they added to the `Channel` pipeline before releasing it to the pool. let channel: Channel + let closeFuture: EventLoopFuture /// Indicates that this connection is about to close var isClosing: Bool = false - init(key: Key, channel: Channel, pool: ConnectionPool) { + init(key: Key, channel: Channel, closeFuture: EventLoopFuture, pool: ConnectionPool) { self.key = key self.channel = channel + self.closeFuture = closeFuture self.pool = pool } @@ -194,8 +196,9 @@ final class ConnectionPool { self.pool.release(self) } - func close() { + func close() -> EventLoopFuture { self.channel.close(promise: nil) + return self.closeFuture } func cancelIdleTimeout() -> EventLoopFuture { @@ -429,10 +432,12 @@ final class ConnectionPool { return handshakePromise.futureResult.flatMap { channel.pipeline.addHTTPClientHandlers(leftOverBytesStrategy: .forwardBytes) }.map { - let connection = Connection(key: self.key, channel: channel, pool: self.pool) + let closePromise = channel.eventLoop.makePromise(of: Void.self) + let connection = Connection(key: self.key, channel: channel, closeFuture: closePromise.futureResult, pool: self.pool) channel.closeFuture.whenComplete { _ in self.delete(connection: connection) + closePromise.succeed(()) } return connection @@ -467,9 +472,7 @@ final class ConnectionPool { return copy } - connections.forEach { $0.close() } - - return EventLoopFuture.andAllComplete(connections.map { $0.channel.closeFuture }, on: eventLoop) + return EventLoopFuture.andAllComplete(connections.map { $0.close() }, on: eventLoop) } private func resolvePreference(_ preference: HTTPClient.EventLoopPreference) -> (EventLoop, Bool) { diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index de8328440..b9b4a71c4 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -557,7 +557,7 @@ extension HTTPClient { func fail(with error: Error, delegateType: Delegate.Type) { if let connection = self.connection { - connection.close() + connection.channel.close(promise: nil) self.releaseAssociatedConnection(delegateType: delegateType).whenComplete { _ in self.promise.fail(error) } From 7663dee6a90eb07a7232b84f60221b64b77af74d Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Sun, 26 Apr 2020 17:12:46 +0100 Subject: [PATCH 11/31] small optimization --- Sources/AsyncHTTPClient/ConnectionPool.swift | 4 ++++ Sources/AsyncHTTPClient/HTTPClient.swift | 16 +++++++--------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index 53024f936..d43d7a237 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -376,6 +376,10 @@ final class ConnectionPool { } } + if case .none = action { + return + } + // This is needed to start a new stack, otherwise, since this is called on a previous // future completion handler chain, it will be growing indefinitely until the connection is closed. // We might revisit this when https://github.com/apple/swift-nio/issues/970 is resolved. diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index b3ee21c2d..d0ac53ae3 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -181,15 +181,13 @@ public class HTTPClient { closeError = HTTPClientError.uncleanShutdown } - self.cancelTasks(tasks).whenComplete { _ in - self.pool.close(on: self.eventLoopGroup.next()).whenComplete { _ in - self.shutdownEventLoop(queue: queue) { eventLoopError in - // we prioritise .uncleanShutdown here - if let error = closeError { - callback(error) - } else { - callback(eventLoopError) - } + self.cancelTasks(tasks).and(self.pool.close(on: self.eventLoopGroup.next())).whenComplete { _ in + self.shutdownEventLoop(queue: queue) { eventLoopError in + // we prioritise .uncleanShutdown here + if let error = closeError { + callback(error) + } else { + callback(eventLoopError) } } } From c19705d9e795ad96c10d1353f0c5046bd0923101 Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Sun, 26 Apr 2020 21:23:06 +0100 Subject: [PATCH 12/31] unoptimize --- Sources/AsyncHTTPClient/HTTPClient.swift | 16 +++++++++------- Sources/AsyncHTTPClient/HTTPHandler.swift | 7 +++---- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index d0ac53ae3..b3ee21c2d 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -181,13 +181,15 @@ public class HTTPClient { closeError = HTTPClientError.uncleanShutdown } - self.cancelTasks(tasks).and(self.pool.close(on: self.eventLoopGroup.next())).whenComplete { _ in - self.shutdownEventLoop(queue: queue) { eventLoopError in - // we prioritise .uncleanShutdown here - if let error = closeError { - callback(error) - } else { - callback(eventLoopError) + self.cancelTasks(tasks).whenComplete { _ in + self.pool.close(on: self.eventLoopGroup.next()).whenComplete { _ in + self.shutdownEventLoop(queue: queue) { eventLoopError in + // we prioritise .uncleanShutdown here + if let error = closeError { + callback(error) + } else { + callback(eventLoopError) + } } } } diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index 5421cf94b..c7a48b91e 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -528,8 +528,8 @@ extension HTTPClient { /// Cancels the request execution. public func cancel() { let channel: Channel? = self.lock.withLock { - if !cancelled { - cancelled = true + if !self.cancelled { + self.cancelled = true return self.connection?.channel } else { return nil @@ -557,10 +557,9 @@ extension HTTPClient { func fail(with error: Error, delegateType: Delegate.Type) { if let connection = self.connection { - let closeFuture = connection.closeFuture connection.channel.close(promise: nil) self.releaseAssociatedConnection(delegateType: delegateType).whenComplete { _ in - closeFuture.whenComplete { _ in + connection.closeFuture.whenComplete { _ in self.promise.fail(error) } } From 4e3e884d8538a1c8f3c67d86135f16547ea70cb1 Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Sun, 3 May 2020 15:21:59 +0100 Subject: [PATCH 13/31] refactor pool and add tests --- Sources/AsyncHTTPClient/ConnectionPool.swift | 811 +++++++----- Sources/AsyncHTTPClient/HTTPClient.swift | 2 +- Sources/AsyncHTTPClient/HTTPHandler.swift | 47 +- Sources/AsyncHTTPClient/Utils.swift | 2 +- .../ConnectionPoolTests.swift | 1089 +++++++++++++++++ .../HTTPClientInternalTests.swift | 24 +- .../HTTPClientTestUtils.swift | 4 +- .../HTTPClientTests.swift | 2 +- 8 files changed, 1631 insertions(+), 350 deletions(-) create mode 100644 Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index d43d7a237..00c581e88 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -17,6 +17,7 @@ import NIO import NIOConcurrencyHelpers import NIOHTTP1 import NIOTLS +import NIOHTTPCompression import NIOTransportServices /// A connection pool that manages and creates new connections to hosts respecting the specified preferences @@ -29,12 +30,12 @@ final class ConnectionPool { /// The main data structure used by the `ConnectionPool` to retreive and create connections associated /// to a given `Key` . /// - /// - Warning: This property should be accessed with proper synchronization, see `connectionProvidersLock` + /// - Warning: This property should be accessed with proper synchronization, see `lock` private var providers: [Key: HTTP1ConnectionProvider] = [:] - /// The lock used by the connection pool used to ensure correct synchronization of accesses to `_connectionProviders` + /// The lock used by the connection pool used to ensure correct synchronization of accesses to `providers` /// - /// - Warning: This lock should always be acquired *before* `HTTP1ConnectionProvider`s `stateLock` if used in combination with it. + /// - Warning: This lock should always be acquired *before* `HTTP1ConnectionProvider`s `lock` if used in combination with it. private let lock = Lock() init(configuration: HTTPClient.Configuration) { @@ -61,34 +62,24 @@ final class ConnectionPool { /// - Returns: A connection corresponding to the specified parameters /// /// When the pool is asked for a new connection, it creates a `Key` from the url associated to the `request`. This key - /// is used to determine if there already exists an associated `HTTP1ConnectionProvider` in `connectionProviders`. + /// is used to determine if there already exists an associated `HTTP1ConnectionProvider` in `providers`. /// If there is, the connection provider then takes care of leasing a new connection. If a connection provider doesn't exist, it is created. func getConnection(for request: HTTPClient.Request, preference: HTTPClient.EventLoopPreference, on eventLoop: EventLoop, deadline: NIODeadline?) -> EventLoopFuture { let key = Key(request) let provider: HTTP1ConnectionProvider = self.lock.withLock { - if let existing = self.providers[key], existing.isActive { + if let existing = self.providers[key], existing.enqueue() { return existing } else { - let http1Provider = HTTP1ConnectionProvider(key: key, eventLoop: eventLoop, configuration: self.configuration, pool: self) - self.providers[key] = http1Provider - return http1Provider + let provider = HTTP1ConnectionProvider(key: key, eventLoop: eventLoop, configuration: self.configuration, pool: self) + self.providers[key] = provider + return provider } } return provider.getConnection(preference: preference) } - func release(_ connection: Connection) { - let connectionProvider = self.lock.withLock { - self.providers[connection.key] - } - - if let connectionProvider = connectionProvider { - connectionProvider.release(connection: connection) - } - } - func delete(_ provider: HTTP1ConnectionProvider) { self.lock.withLockVoid { self.providers[provider.key] = nil @@ -112,7 +103,7 @@ final class ConnectionPool { /// Used by the `ConnectionPool` to index its `HTTP1ConnectionProvider`s /// /// A key is initialized from a `URL`, it uses the components to derive a hashed value - /// used by the `connectionProviders` dictionary to allow retrieving and creating + /// used by the `providers` dictionary to allow retrieving and creating /// connection providers associated to a certain request in constant time. struct Key: Hashable { init(_ request: HTTPClient.Request) { @@ -142,134 +133,164 @@ final class ConnectionPool { case unix } } +} - /// A `Connection` represents a `Channel` in the context of the connection pool +/// A `Connection` represents a `Channel` in the context of the connection pool +/// +/// In the `ConnectionPool`, each `Channel` belongs to a given `HTTP1ConnectionProvider` +/// and has a certain "lease state" (see the `inUse` property). +/// The role of `Connection` is to model this by storing a `Channel` alongside its associated properties +/// so that they can be passed around together and correct provider can be identified when connection is released. +class Connection: CustomStringConvertible { + /// The provider this `Connection` belongs to. /// - /// In the `ConnectionPool`, each `Channel` belongs to a given `HTTP1ConnectionProvider` - /// and has a certain "lease state" (see the `isLeased` property). - /// The role of `Connection` is to model this by storing a `Channel` alongside its associated properties - /// so that they can be passed around together. + /// This enables calling methods like `release()` directly on a `Connection` instead of + /// calling `provider.release(connection)`. This gives a more object oriented feel to the API + /// and can avoid having to keep explicit references to the pool at call site. + let provider: HTTP1ConnectionProvider + + /// The `Channel` of this `Connection` /// - /// - Warning: `Connection` properties are not thread-safe and should be used with proper synchronization - class Connection: CustomStringConvertible { - /// The connection pool this `Connection` belongs to. - /// - /// This enables calling methods like `release()` directly on a `Connection` instead of - /// calling `pool.release(connection)`. This gives a more object oriented feel to the API - /// and can avoid having to keep explicit references to the pool at call site. - private let pool: ConnectionPool + /// - Warning: Requests that lease connections from the `ConnectionPool` are responsible + /// for removing the specific handlers they added to the `Channel` pipeline before releasing it to the pool. + let channel: Channel - /// The `Key` of the `HTTP1ConnectionProvider` this `Connection` belongs to - /// - /// This lets `ConnectionPool` know the relationship between `Connection`s and `HTTP1ConnectionProvider`s - let key: Key + /// This indicates if connection is going to be or is used for a request. + private var inUse: NIOAtomic - /// The `Channel` of this `Connection` - /// - /// - Warning: Requests that lease connections from the `ConnectionPool` are responsible - /// for removing the specific handlers they added to the `Channel` pipeline before releasing it to the pool. - let channel: Channel - let closeFuture: EventLoopFuture + /// This indicates that connection is going to be closed. + private var closing: NIOAtomic - /// Indicates that this connection is about to close - var isClosing: Bool = false + init(channel: Channel, provider: HTTP1ConnectionProvider) { + self.channel = channel + self.provider = provider + self.inUse = NIOAtomic.makeAtomic(value: true) + self.closing = NIOAtomic.makeAtomic(value: false) + } - init(key: Key, channel: Channel, closeFuture: EventLoopFuture, pool: ConnectionPool) { - self.key = key - self.channel = channel - self.closeFuture = closeFuture - self.pool = pool - } + deinit { + assert(!self.inUse.load()) + } - var description: String { - return "Connection { channel: \(self.channel) }" - } + var description: String { + return "Connection { channel: \(self.channel) }" + } - /// Convenience property indicating wether the underlying `Channel` is active or not - var isActiveEstimation: Bool { - return self.channel.isActive && !self.isClosing - } + /// Convenience property indicating wether the underlying `Channel` is active or not. + var isActiveEstimation: Bool { + return !self.isClosing && self.channel.isActive + } - /// Release this `Connection` to its associated `HTTP1ConnectionProvider` in the parent `ConnectionPool` - /// - /// - Warning: This only releases the connection and doesn't take care of cleaning handlers in the `Channel` pipeline. - func release() { - self.pool.release(self) + var isClosing: Bool { + get { + return self.closing.load() } - - func close() -> EventLoopFuture { - self.channel.close(promise: nil) - return self.closeFuture + set { + self.closing.store(newValue) } + } - func cancelIdleTimeout() -> EventLoopFuture { - return self.channel.eventLoop.flatSubmit { - self.removeHandler(IdleStateHandler.self).flatMap { () -> EventLoopFuture in - self.channel.pipeline.handler(type: IdlePoolConnectionHandler.self).flatMap { idleHandler in - self.channel.pipeline.removeHandler(idleHandler).flatMapError { _ in - self.channel.eventLoop.makeSucceededFuture(()) - }.map { - idleHandler.hasNotSentClose && self.channel.isActive - } - }.flatMapError { error in - // These handlers are only added on connection release, they are not added - // when a connection is made to be instantly leased, so we ignore this error - if let channelError = error as? ChannelPipelineError, channelError == .notFound { - return self.channel.eventLoop.makeSucceededFuture(self.channel.isActive) - } else { - return self.channel.eventLoop.makeFailedFuture(error) - } - } - }.flatMap { channelIsUsable in - if channelIsUsable { - return self.channel.eventLoop.makeSucceededFuture(self) - } else { - return self.channel.eventLoop.makeFailedFuture(InactiveChannelError()) - } - } - } + var isInUse: Bool { + get { + return self.inUse.load() } + set { + return self.inUse.store(newValue) + } + } - struct InactiveChannelError: Error {} + func lease() { + self.inUse.store(true) } - /// A connection provider of `HTTP/1.1` connections with a given `Key` (host, scheme, port) + /// Release this `Connection` to its associated `HTTP1ConnectionProvider`. /// - /// On top of enabling connection reuse this provider it also facilitates the creation - /// of concurrent requests as it has built-in politeness regarding the maximum number - /// of concurrent requests to the server. - class HTTP1ConnectionProvider: CustomStringConvertible { - enum Action { - case lease(Connection, Waiter) - case create(Waiter) - case delete - case none + /// - Warning: This only releases the connection and doesn't take care of cleaning handlers in the `Channel` pipeline. + func release() { + assert(self.channel.eventLoop.inEventLoop) + assert(self.inUse.load()) + + self.inUse.store(false) + self.provider.release(connection: self, inPool: false) + } + + /// Called when channel exceeds idle time in pool. + func timeout() { + assert(self.channel.eventLoop.inEventLoop) + + // We can get timeout and inUse = true when we decided to lease the connection, but this action is not executed yet. + // In this case we can ignore timeout notification. If connection was not in use, we release it from the pool, increasing + // available capacity + if !self.inUse.load() { + self.closing.store(true) + self.provider.release(connection: self, inPool: true) + self.channel.close(promise: nil) + } + } + + /// Called when channel goes inactive while in the pool. + func remoteClosed() { + assert(self.channel.eventLoop.inEventLoop) + + // Connection can be closed remotely while we wait for `.lease` action to complete. If this + // happens, we have no other choice but to mark connection as `closing`. If this connection is not in use, + // the have to release it as well + self.closing.store(true) + if !self.inUse.load() { + self.provider.release(connection: self, inPool: true) } + } - /// The client configuration used to bootstrap new requests - private let configuration: HTTPClient.Configuration + /// Called from `HTTP1ConnectionProvider.close` when client is shutting down. + func close() -> EventLoopFuture { + assert(!self.inUse.load()) - /// The pool this provider belongs to - private let pool: ConnectionPool + self.closing.store(true) + return self.channel.close() + } - /// The key associated with this provider - let key: ConnectionPool.Key + /// Sets idle timeout handler and channel inactivity listener. + func setIdleTimeout(timeout: TimeAmount?) { + _ = self.channel.pipeline.addHandler(IdleStateHandler(writeTimeout: timeout)).flatMap { _ in + self.channel.pipeline.addHandler(IdlePoolConnectionHandler(connection: self)) + } + } + + /// Removes idle timeout handler and channel inactivity listener + func cancelIdleTimeout() -> EventLoopFuture { + return self.removeHandler(IdleStateHandler.self).flatMap { _ in + self.removeHandler(IdlePoolConnectionHandler.self) + } + } +} - /// The default `EventLoop` for this provider - /// - /// The default event loop is used to create futures and is used when creating `Channel`s for requests - /// for which the `EventLoopPreference` is set to `.indifferent` - let eventLoop: EventLoop +/// A connection provider of `HTTP/1.1` connections with a given `Key` (host, scheme, port) +/// +/// On top of enabling connection reuse this provider it also facilitates the creation +/// of concurrent requests as it has built-in politeness regarding the maximum number +/// of concurrent requests to the server. +class HTTP1ConnectionProvider: CustomStringConvertible { + enum Action { + case lease(Connection, Waiter) + case create(Waiter) + case replace(Connection, Waiter) + case deleteProvider + case park(Connection) + case none + case fail(Waiter, Error) + indirect case parkAnd(Connection, Action) + } - /// The lock used to access and modify the provider state - `availableConnections`, `waiters` and `openedConnectionsCount`. - /// - /// - Warning: This lock should always be acquired *after* `ConnectionPool`s `lock` if used in combination with it. - private let lock = Lock() + struct ConnectionsState { + enum State { + case active + case closed + } - /// The maximum number of concurrent connections to a given (host, scheme, port) - private let maximumConcurrentConnections: Int = 8 + private let maximumConcurrentConnections: Int + private let eventLoop: EventLoop - private var active: Bool = true + var state: State = .active /// Opened connections that are available var availableConnections: CircularBuffer = .init(initialCapacity: 8) @@ -283,65 +304,53 @@ final class ConnectionPool { /// Number of opened or opening connections, used to keep track of all connections and enforcing `maximumConcurrentConnections` limit. var openedConnectionsCount: Int = 0 - /// Creates a new `HTTP1ConnectionProvider` - /// - /// - parameters: - /// - key: The `Key` (host, scheme, port) this provider is associated to - /// - configuration: The client configuration used globally by all requests - /// - initialConnection: The initial connection the pool initializes this provider with - /// - pool: The pool this provider belongs to - init(key: ConnectionPool.Key, eventLoop: EventLoop, configuration: HTTPClient.Configuration, pool: ConnectionPool) { - self.eventLoop = eventLoop - self.configuration = configuration - self.key = key - self.pool = pool - } - - deinit { - assert(self.waiters.isEmpty) - assert(self.availableConnections.isEmpty) - assert(self.openedConnectionsCount == 0) - } + /// Number of enqueued requests, used to track if it is safe to delete the provider. + var pending: Int = 1 - var description: String { - return "HTTP1ConnectionProvider { key: \(self.key) }" + init(maximumConcurrentConnections: Int = 8, eventLoop: EventLoop) { + self.maximumConcurrentConnections = maximumConcurrentConnections + self.eventLoop = eventLoop } - var isActive: Bool { - return self.lock.withLock { - self.active + mutating func enqueue() -> Bool { + switch self.state { + case .active: + self.pending += 1 + return true + case .closed: + return false } } - func execute(_ action: Action) { - switch action { - case .lease(let connection, let waiter): - // check if we can vend this connection to caller - connection.cancelIdleTimeout().flatMapError { error in - // if connection is already inactive, we create a new one. - if error is Connection.InactiveChannelError { - self.lock.withLockVoid { - self.openedConnectionsCount += 1 - } - return self.makeConnection(on: waiter.preference.bestEventLoop ?? self.eventLoop) + mutating func acquire(waiter: Waiter) -> Action { + switch self.state { + case .active: + self.pending -= 1 + + let (eventLoop, required) = self.resolvePreference(waiter.preference) + if required { + // If there is an opened connection on the same EL - use it + if let found = self.availableConnections.firstIndex(where: { $0.channel.eventLoop === eventLoop }) { + let connection = self.availableConnections.remove(at: found) + connection.lease() + return .lease(connection, waiter) + } + + // If we can create additional connection, create + if self.openedConnectionsCount < self.maximumConcurrentConnections { + self.openedConnectionsCount += 1 + return .create(waiter) } - return connection.channel.eventLoop.makeFailedFuture(error) - } - .cascade(to: waiter.promise) - case .create(let waiter): - self.makeConnection(on: waiter.preference.bestEventLoop ?? self.eventLoop).cascade(to: waiter.promise) - case .delete: - self.pool.delete(self) - case .none: - break - } - } - func getConnection(preference: HTTPClient.EventLoopPreference) -> EventLoopFuture { - let waiter = Waiter(promise: self.eventLoop.makePromise(), preference: preference) + // If we cannot create additional connection, but there is one in the pool, replace it + if let connection = self.availableConnections.popFirst() { + return .replace(connection, waiter) + } - let action: Action = self.lock.withLock { - if let connection = self.availableConnections.popFirst() { + self.waiters.append(waiter) + return .none + } else if let connection = self.availableConnections.popFirst() { + connection.lease() return .lease(connection, waiter) } else if self.openedConnectionsCount < self.maximumConcurrentConnections { self.openedConnectionsCount += 1 @@ -350,36 +359,265 @@ final class ConnectionPool { self.waiters.append(waiter) return .none } + case .closed: + return .fail(waiter, ProviderClosedError()) } - - self.execute(action) - - return waiter.promise.futureResult } - func release(connection: Connection) { - let action: Action = self.lock.withLock { - if connection.isActiveEstimation { // If connection is alive, we can give to a next waiter + mutating func release(connection: Connection, inPool: Bool) -> Action { + switch self.state { + case .active: + if connection.isActiveEstimation { // If connection is alive, we can offer it to a next waiter if let waiter = self.waiters.popFirst() { - return .lease(connection, waiter) - } else { + let (eventLoop, required) = self.resolvePreference(waiter.preference) + + // If returned connection is on same EL or we do not require special EL - lease it + if connection.channel.eventLoop === eventLoop || !required { + connection.lease() + return .lease(connection, waiter) + } + + // If there is an opened connection on the same loop, lease it and park returned + if let found = self.availableConnections.firstIndex(where: { $0.channel.eventLoop === eventLoop }) { + let replacement = self.availableConnections.swap(at: found, with: connection) + replacement.lease() + return .parkAnd(connection, .lease(replacement, waiter)) + } + + // If we can create new connection - do it + if self.openedConnectionsCount < self.maximumConcurrentConnections { + self.availableConnections.append(connection) + self.openedConnectionsCount += 1 + return .parkAnd(connection, .create(waiter)) + } + + // If we cannot create new connections, we will have to replace returned connection with a new one on the required loop + return .replace(connection, waiter) + } else { // or park, if there are no waiters self.availableConnections.append(connection) - return .none + return .park(connection) } - } else { + } else { // if connection is not alive, we delete it and process the next waiter + // this connections is now gone, we will either create new connection or do nothing + assert(!connection.isInUse) + + self.openedConnectionsCount -= 1 + if let waiter = self.waiters.popFirst() { + let (eventLoop, required) = self.resolvePreference(waiter.preference) + + // If specific EL is required, we have only two options - find open one or create a new one + if required, let found = self.availableConnections.firstIndex(where: { $0.channel.eventLoop === eventLoop }) { + let connection = self.availableConnections.remove(at: found) + connection.lease() + return .lease(connection, waiter) + } else if !required, let connection = self.availableConnections.popFirst() { + connection.lease() + return .lease(connection, waiter) + } else { + self.openedConnectionsCount += 1 + return .create(waiter) + } + } + + if inPool { + self.availableConnections.removeAll { $0 === connection } + } + + // if capacity is at max and the are no waiters and no in-flight requests for connection, we are closing this provider + if self.openedConnectionsCount == 0, self.pending == 0 { + // deactivate and remove + self.state = .closed + return .deleteProvider + } + + return .none + } + case .closed: + assertionFailure("should not happen") + return .none + } + } + + mutating func processNextWaiter() -> Action { + switch self.state { + case .active: + self.openedConnectionsCount -= 1 + + if let waiter = self.waiters.popFirst() { + let (eventLoop, required) = self.resolvePreference(waiter.preference) + + // If specific EL is required, we have only two options - find open one or create a new one + if required, let found = self.availableConnections.firstIndex(where: { $0.channel.eventLoop === eventLoop }) { + let connection = self.availableConnections.remove(at: found) + connection.lease() + return .lease(connection, waiter) + } else if !required, let connection = self.availableConnections.popFirst() { + connection.lease() + return .lease(connection, waiter) + } else { self.openedConnectionsCount += 1 return .create(waiter) } + } - return .none + // if capacity is at max and the are no waiters and no in-flight requests for connection, we are closing this provider + if self.openedConnectionsCount == 0, self.pending == 0 { + // deactivate and remove + self.state = .closed + return .deleteProvider } + + return .none + case .closed: + assertionFailure("should not happen") + return .none } + } - if case .none = action { - return + mutating func close() -> (CircularBuffer, CircularBuffer)? { + switch self.state { + case .active: + let waiters = self.waiters + self.waiters.removeAll() + + let connections = self.availableConnections + self.availableConnections.removeAll() + + return (waiters, connections) + case .closed: + return nil } + } + private func resolvePreference(_ preference: HTTPClient.EventLoopPreference) -> (EventLoop, Bool) { + switch preference.preference { + case .indifferent: + return (self.eventLoop, false) + case .delegate(let el): + return (el, false) + case .delegateAndChannel(let el), .testOnly_exact(let el, _): + return (el, true) + } + } + } + + struct ProviderClosedError: Error { + } + + /// The client configuration used to bootstrap new requests + private let configuration: HTTPClient.Configuration + + /// The pool this provider belongs to + private let pool: ConnectionPool + + /// The key associated with this provider + let key: ConnectionPool.Key + + /// The default `EventLoop` for this provider + /// + /// The default event loop is used to create futures and is used when creating `Channel`s for requests + /// for which the `EventLoopPreference` is set to `.indifferent` + let eventLoop: EventLoop + + /// The lock used to access and modify the provider state - `availableConnections`, `waiters` and `openedConnectionsCount`. + /// + /// - Warning: This lock should always be acquired *after* `ConnectionPool`s `lock` if used in combination with it. + private let lock = Lock() + + var state: ConnectionsState + + /// Creates a new `HTTP1ConnectionProvider` + /// + /// - parameters: + /// - key: The `Key` (host, scheme, port) this provider is associated to + /// - configuration: The client configuration used globally by all requests + /// - initialConnection: The initial connection the pool initializes this provider with + /// - pool: The pool this provider belongs to + init(key: ConnectionPool.Key, eventLoop: EventLoop, configuration: HTTPClient.Configuration, pool: ConnectionPool) { + self.eventLoop = eventLoop + self.configuration = configuration + self.key = key + self.pool = pool + self.state = .init(eventLoop: eventLoop) + } + + deinit { + assert(self.state.waiters.isEmpty) + assert(self.state.availableConnections.isEmpty) + assert(self.state.openedConnectionsCount == 0) + assert(self.state.pending == 0) + } + + var description: String { + return "HTTP1ConnectionProvider { key: \(self.key) }" + } + + private func execute(_ action: Action) { + switch action { + case .lease(let connection, let waiter): + // if connection is became inactive, we create a new one. + connection.cancelIdleTimeout().whenComplete { _ in + if connection.isActiveEstimation { + waiter.promise.succeed(connection) + } else { + connection.isInUse = false + self.makeConnection(on: waiter.preference.bestEventLoop ?? self.eventLoop).cascade(to: waiter.promise) + } + } + case .create(let waiter): + self.makeConnection(on: waiter.preference.bestEventLoop ?? self.eventLoop).cascade(to: waiter.promise) + case .replace(let connection, let waiter): + connection.cancelIdleTimeout().whenComplete { + _ in connection.channel.close(promise: nil) + } + self.makeConnection(on: waiter.preference.bestEventLoop ?? self.eventLoop).cascade(to: waiter.promise) + case .park(let connection): + connection.setIdleTimeout(timeout: self.configuration.maximumAllowedIdleTimeInConnectionPool) + case .deleteProvider: + self.pool.delete(self) + case .none: + break + case .parkAnd(let connection, let action): + connection.setIdleTimeout(timeout: self.configuration.maximumAllowedIdleTimeInConnectionPool) + self.execute(action) + case .fail(let waiter, let error): + waiter.promise.fail(error) + } + } + + /// This function is needed to ensure that there is no race between getting a provider from map, and shutting it down when there are no requests processed by it. + func enqueue() -> Bool { + return self.lock.withLock { + self.state.enqueue() + } + } + + func getConnection(preference: HTTPClient.EventLoopPreference) -> EventLoopFuture { + let waiter = Waiter(promise: self.eventLoop.makePromise(), preference: preference) + + let action: Action = self.lock.withLock { + self.state.acquire(waiter: waiter) + } + + self.execute(action) + + return waiter.promise.futureResult + } + + func release(connection: Connection, inPool: Bool = false) { + let action: Action = self.lock.withLock { + self.state.release(connection: connection, inPool: inPool) + } + + switch action { + case .none: + break + case .park, .deleteProvider: + // Since both `.park` and `.deleteProvider` are terminal in terms of execution, + // we can execute them immediately + self.execute(action) + default: // This is needed to start a new stack, otherwise, since this is called on a previous // future completion handler chain, it will be growing indefinitely until the connection is closed. // We might revisit this when https://github.com/apple/swift-nio/issues/970 is resolved. @@ -387,143 +625,124 @@ final class ConnectionPool { self.execute(action) } } + } - func delete(connection: Connection) { - let action: Action = self.lock.withLock { - self.openedConnectionsCount -= 1 - self.availableConnections.removeAll { $0 === connection } - - if self.openedConnectionsCount == 0 { - self.active = false - return .delete - } + func processNextWaiter() { + let action: Action = self.lock.withLock { + self.state.processNextWaiter() + } - return .none - } + self.execute(action) + } - self.execute(action) + func close(on eventLoop: EventLoop) -> EventLoopFuture { + if let (waiters, connections) = self.lock.withLock({ self.state.close() }) { + waiters.forEach { $0.promise.fail(HTTPClientError.cancelled) } + return EventLoopFuture.andAllComplete(connections.map { $0.close() }, on: eventLoop) } + return self.eventLoop.makeSucceededFuture(()) + } - private func processNextWaiter() { - let action: Action = self.lock.withLock { - if let waiter = self.waiters.popFirst() { - self.openedConnectionsCount += 1 - return .create(waiter) - } else if self.openedConnectionsCount == 0 { - self.active = false - return .delete - } - return .none - } - - self.execute(action) + private func makeConnection(on eventLoop: EventLoop) -> EventLoopFuture { + let requiresTLS = self.key.scheme == .https + let bootstrap: NIOClientTCPBootstrap + do { + bootstrap = try NIOClientTCPBootstrap.makeHTTPClientBootstrapBase(on: eventLoop, host: self.key.host, port: self.key.port, requiresTLS: requiresTLS, configuration: self.configuration) + } catch { + return eventLoop.makeFailedFuture(error) } - private func makeConnection(on eventLoop: EventLoop) -> EventLoopFuture { + let channel: EventLoopFuture + switch self.key.scheme { + case .http, .https: let address = HTTPClient.resolveAddress(host: self.key.host, port: self.key.port, proxy: self.configuration.proxy) - let requiresTLS = self.key.scheme == .https - let bootstrap: NIOClientTCPBootstrap - do { - bootstrap = try NIOClientTCPBootstrap.makeHTTPClientBootstrapBase(on: eventLoop, host: self.key.host, port: self.key.port, requiresTLS: requiresTLS, configuration: self.configuration) - } catch { - return eventLoop.makeFailedFuture(error) - } - let handshakePromise = eventLoop.makePromise(of: Void.self) + channel = bootstrap.connect(host: address.host, port: address.port) + case .unix: + channel = bootstrap.connect(unixDomainSocketPath: self.key.unixPath) + } - let channel: EventLoopFuture - switch self.key.scheme { - case .http, .https: - channel = bootstrap.connect(host: address.host, port: address.port) - case .unix: - channel = bootstrap.connect(unixDomainSocketPath: self.key.unixPath) - } + return channel.flatMap { channel in + let requiresSSLHandler = self.configuration.proxy != nil && self.key.scheme == .https + let handshakePromise = eventLoop.makePromise(of: Void.self) - return channel.flatMap { channel -> EventLoopFuture in - let requiresSSLHandler = self.configuration.proxy != nil && self.key.scheme == .https - channel.pipeline.addSSLHandlerIfNeeded(for: self.key, tlsConfiguration: self.configuration.tlsConfiguration, addSSLClient: requiresSSLHandler, handshakePromise: handshakePromise) - return handshakePromise.futureResult.flatMap { - channel.pipeline.addHTTPClientHandlers(leftOverBytesStrategy: .forwardBytes) - }.flatMap { - #if canImport(Network) - if #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *), bootstrap.underlyingBootstrap is NIOTSConnectionBootstrap { - return channel.pipeline.addHandler(HTTPClient.NWErrorHandler(), position: .first) - } - #endif - return eventLoop.makeSucceededFuture(()) - }.map { - let closePromise = channel.eventLoop.makePromise(of: Void.self) - let connection = Connection(key: self.key, channel: channel, closeFuture: closePromise.futureResult, pool: self.pool) - - channel.closeFuture.whenComplete { _ in - self.delete(connection: connection) - closePromise.succeed(()) - } + channel.pipeline.addSSLHandlerIfNeeded(for: self.key, tlsConfiguration: self.configuration.tlsConfiguration, addSSLClient: requiresSSLHandler, handshakePromise: handshakePromise) - return connection - } - }.flatMapError { error in + return handshakePromise.futureResult.flatMap { + channel.pipeline.addHTTPClientHandlers(leftOverBytesStrategy: .forwardBytes) + }.flatMap { #if canImport(Network) - var error = error if #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *), bootstrap.underlyingBootstrap is NIOTSConnectionBootstrap { - error = HTTPClient.NWErrorHandler.translateError(error) + return channel.pipeline.addHandler(HTTPClient.NWErrorHandler(), position: .first) } #endif - - // This promise may not have been completed if we reach this so we fail it to avoid any leak - handshakePromise.fail(error) - - // since we failed to create a connection, we need to decrease opened connection count - self.lock.withLockVoid { - self.openedConnectionsCount -= 1 + return eventLoop.makeSucceededFuture(()) + }.map { + Connection(channel: channel, provider: self) + } + }.flatMapError { error in + #if canImport(Network) + var error = error + if #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *), bootstrap.underlyingBootstrap is NIOTSConnectionBootstrap { + error = HTTPClient.NWErrorHandler.translateError(error) } + #endif - // there is no connection here anymore, we need to bootstrap next waiter - self.processNextWaiter() - return self.eventLoop.makeFailedFuture(error) - } + // there is no connection here anymore, we need to bootstrap next waiter + self.processNextWaiter() + return self.eventLoop.makeFailedFuture(error) } + } - func close(on eventLoop: EventLoop) -> EventLoopFuture { - let waiters: CircularBuffer = self.lock.withLock { - let copy = self.waiters - self.waiters.removeAll() - return copy - } + /// A `Waiter` represents a request that waits for a connection when none is + /// currently available + /// + /// `Waiter`s are created when `maximumConcurrentConnections` is reached + /// and we cannot create new connections anymore. + struct Waiter { + /// The promise to complete once a connection is available + let promise: EventLoopPromise + + /// The event loop preference associated to this particular request + /// that the provider should respect + let preference: HTTPClient.EventLoopPreference + } +} - waiters.forEach { $0.promise.fail(HTTPClientError.cancelled) } +class IdlePoolConnectionHandler: ChannelInboundHandler, RemovableChannelHandler { + typealias InboundIn = NIOAny - let connections: CircularBuffer = self.lock.withLock { - let copy = self.availableConnections - self.availableConnections.removeAll() - return copy - } + let connection: Connection + var eventSent: Bool - return EventLoopFuture.andAllComplete(connections.map { $0.close() }, on: eventLoop) + init(connection: Connection) { + self.connection = connection + self.eventSent = false + } + + // this is needed to detect when remote end closes connection while connection is in the pool idling + func channelInactive(context: ChannelHandlerContext) { + if !self.eventSent { + self.eventSent = true + self.connection.remoteClosed() } + } - private func resolvePreference(_ preference: HTTPClient.EventLoopPreference) -> (EventLoop, Bool) { - switch preference.preference { - case .indifferent: - return (self.eventLoop, false) - case .delegate(let el): - return (el, false) - case .delegateAndChannel(let el), .testOnly_exact(let el, _): - return (el, true) + func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) { + if let idleEvent = event as? IdleStateHandler.IdleStateEvent, idleEvent == .write { + if !self.eventSent { + self.eventSent = true + self.connection.timeout() } + } else { + context.fireUserInboundEventTriggered(event) } + } +} - /// A `Waiter` represents a request that waits for a connection when none is - /// currently available - /// - /// `Waiter`s are created when `maximumConcurrentConnections` is reached - /// and we cannot create new connections anymore. - struct Waiter { - /// The promise to complete once a connection is available - let promise: EventLoopPromise - - /// The event loop preference associated to this particular request - /// that the provider should respect - let preference: HTTPClient.EventLoopPreference - } +extension CircularBuffer { + mutating func swap(at index: Index, with value: Element) -> Element { + let tmp = self[index] + self[index] = value + return tmp } } diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index b3ee21c2d..5c3d1cc04 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -357,7 +357,7 @@ public class HTTPClient { redirectHandler = nil } - let task = Task(eventLoop: taskEL, poolingTimeout: self.configuration.maximumAllowedIdleTimeInConnectionPool) + let task = Task(eventLoop: taskEL) self.stateLock.withLock { self.tasks[task.id] = task } diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index c7a48b91e..5993995e7 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -491,19 +491,17 @@ extension HTTPClient { let promise: EventLoopPromise var completion: EventLoopFuture - var connection: ConnectionPool.Connection? + var connection: Connection? var cancelled: Bool let lock: Lock let id = UUID() - let poolingTimeout: TimeAmount? - init(eventLoop: EventLoop, poolingTimeout: TimeAmount? = nil) { + init(eventLoop: EventLoop) { self.eventLoop = eventLoop self.promise = eventLoop.makePromise() self.completion = self.promise.futureResult.map { _ in } self.cancelled = false self.lock = Lock() - self.poolingTimeout = poolingTimeout } static func failedTask(eventLoop: EventLoop, error: Error) -> Task { @@ -539,7 +537,7 @@ extension HTTPClient { } @discardableResult - func setConnection(_ connection: ConnectionPool.Connection) -> ConnectionPool.Connection { + func setConnection(_ connection: Connection) -> Connection { return self.lock.withLock { self.connection = connection if self.cancelled { @@ -558,10 +556,8 @@ extension HTTPClient { func fail(with error: Error, delegateType: Delegate.Type) { if let connection = self.connection { connection.channel.close(promise: nil) - self.releaseAssociatedConnection(delegateType: delegateType).whenComplete { _ in - connection.closeFuture.whenComplete { _ in - self.promise.fail(error) - } + self.releaseAssociatedConnection(delegateType: delegateType).whenSuccess { + self.promise.fail(error) } } } @@ -569,28 +565,15 @@ extension HTTPClient { func releaseAssociatedConnection(delegateType: Delegate.Type) -> EventLoopFuture { if let connection = self.connection { return connection.removeHandler(NIOHTTPResponseDecompressor.self).flatMap { + // remove read timeout handler connection.removeHandler(IdleStateHandler.self) }.flatMap { connection.removeHandler(TaskHandler.self) - }.flatMap { - let idlePoolConnectionHandler = IdlePoolConnectionHandler() - return connection.channel.pipeline.addHandler(idlePoolConnectionHandler, position: .last).flatMap { - connection.channel.pipeline.addHandler(IdleStateHandler(writeTimeout: self.poolingTimeout), position: .before(idlePoolConnectionHandler)) - } - }.flatMapError { error in - if let error = error as? ChannelError, error == .ioOnClosedChannel { - // We may get this error if channel is released because it is - // closed, it is safe to ignore it - return connection.channel.eventLoop.makeSucceededFuture(()) - } else { - return connection.channel.eventLoop.makeFailedFuture(error) - } }.map { connection.release() }.flatMapError { error in fatalError("Couldn't remove taskHandler: \(error)") } - } else { // TODO: This seems only reached in some internal unit test // Maybe there could be a better handling in the future to make @@ -1022,21 +1005,3 @@ internal struct RedirectHandler { } } } - -class IdlePoolConnectionHandler: ChannelInboundHandler, RemovableChannelHandler { - typealias InboundIn = NIOAny - - let _hasNotSentClose: NIOAtomic = .makeAtomic(value: true) - var hasNotSentClose: Bool { - return self._hasNotSentClose.load() - } - - func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) { - if let idleEvent = event as? IdleStateHandler.IdleStateEvent, idleEvent == .write { - self._hasNotSentClose.store(false) - context.close(promise: nil) - } else { - context.fireUserInboundEventTriggered(event) - } - } -} diff --git a/Sources/AsyncHTTPClient/Utils.swift b/Sources/AsyncHTTPClient/Utils.swift index 5500dc813..9160653e1 100644 --- a/Sources/AsyncHTTPClient/Utils.swift +++ b/Sources/AsyncHTTPClient/Utils.swift @@ -135,7 +135,7 @@ extension NIOClientTCPBootstrap { } } -extension ConnectionPool.Connection { +extension Connection { func removeHandler(_ type: Handler.Type) -> EventLoopFuture { return self.channel.pipeline.handler(type: type).flatMap { handler in self.channel.pipeline.removeHandler(handler) diff --git a/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift b/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift new file mode 100644 index 000000000..9ce873a70 --- /dev/null +++ b/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift @@ -0,0 +1,1089 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2018-2019 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@testable import AsyncHTTPClient +@testable import NIO +import NIOConcurrencyHelpers +import NIOFoundationCompat +import NIOHTTP1 +import NIOHTTPCompression +import NIOSSL +import NIOTestUtils +import NIOTransportServices +import XCTest + +class ConnectionPoolTests: XCTestCase { + struct TempError: Error { + } + + func testPending() { + let eventLoop = EmbeddedEventLoop() + + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: eventLoop) + + XCTAssertEqual(0, state.availableConnections.count) + XCTAssertEqual(0, state.waiters.count) + XCTAssertEqual(1, state.pending) + XCTAssertEqual(0, state.openedConnectionsCount) + + XCTAssertTrue(state.enqueue()) + + XCTAssertEqual(0, state.availableConnections.count) + XCTAssertEqual(0, state.waiters.count) + XCTAssertEqual(2, state.pending) + XCTAssertEqual(0, state.openedConnectionsCount) + } + + // MARK: - Acquire Tests + + func testAcquireWhenEmpty() { + let eventLoop = EmbeddedEventLoop() + + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: eventLoop) + + XCTAssertEqual(0, state.availableConnections.count) + XCTAssertEqual(0, state.waiters.count) + XCTAssertEqual(1, state.pending) + XCTAssertEqual(0, state.openedConnectionsCount) + + let action = state.acquire(waiter: .init(promise: eventLoop.makePromise(), preference: .indifferent)) + switch action { + case .create(let waiter): + waiter.promise.fail(TempError()) + default: + XCTFail("Unexpected action: \(action)") + } + + XCTAssertEqual(0, state.availableConnections.count) + XCTAssertEqual(0, state.waiters.count) + XCTAssertEqual(0, state.pending) + XCTAssertEqual(1, state.openedConnectionsCount) + } + + func testAcquireWhenAvailable() throws { + let eventLoop = EmbeddedEventLoop() + let channel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: eventLoop, configuration: .init(), pool: .init(configuration: .init())) + + let connection = Connection(channel: channel, provider: provider) + connection.isInUse = false + provider.state.availableConnections.append(connection) + provider.state.openedConnectionsCount = 1 + + XCTAssertEqual(1, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(1, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + + let action = provider.state.acquire(waiter: .init(promise: eventLoop.makePromise(), preference: .indifferent)) + switch action { + case .lease(let connection, let waiter): + waiter.promise.succeed(connection) + + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + + // cleanup, since we don't call release + connection.isInUse = false + provider.state.openedConnectionsCount = 0 + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testAcquireWhenUnavailable() throws { + let eventLoop = EmbeddedEventLoop() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: eventLoop, configuration: .init(), pool: .init(configuration: .init())) + + provider.state.openedConnectionsCount = 8 + + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(1, provider.state.pending) + XCTAssertEqual(8, provider.state.openedConnectionsCount) + + let action = provider.state.acquire(waiter: .init(promise: eventLoop.makePromise(), preference: .indifferent)) + switch action { + case .none: + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(1, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(8, provider.state.openedConnectionsCount) + default: + XCTFail("Unexpected action: \(action)") + } + + // cleanup + provider.state.openedConnectionsCount = 0 + try provider.close(on: eventLoop).wait() + } + + // MARK: - Acquire on Specific EL Tests + + func testAcquireWhenEmptySpecificEL() { + let eventLoop = EmbeddedEventLoop() + + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: eventLoop) + + XCTAssertEqual(0, state.availableConnections.count) + XCTAssertEqual(0, state.waiters.count) + XCTAssertEqual(1, state.pending) + XCTAssertEqual(0, state.openedConnectionsCount) + + let action = state.acquire(waiter: .init(promise: eventLoop.makePromise(), preference: .delegateAndChannel(on: eventLoop))) + switch action { + case .create(let waiter): + waiter.promise.fail(TempError()) + default: + XCTFail("Unexpected action: \(action)") + } + + XCTAssertEqual(0, state.availableConnections.count) + XCTAssertEqual(0, state.waiters.count) + XCTAssertEqual(0, state.pending) + XCTAssertEqual(1, state.openedConnectionsCount) + } + + func testAcquireWhenAvailableSpecificEL() throws { + let channel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + + let connection = Connection(channel: channel, provider: provider) + connection.isInUse = false + provider.state.availableConnections.append(connection) + provider.state.openedConnectionsCount = 1 + + XCTAssertEqual(1, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(1, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + + let action = provider.state.acquire(waiter: .init(promise: channel.eventLoop.makePromise(), preference: .delegateAndChannel(on: channel.eventLoop))) + switch action { + case .lease(let connection, let waiter): + waiter.promise.succeed(connection) + + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + + // cleanup, since we don't call release + connection.isInUse = false + provider.state.openedConnectionsCount = 0 + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testAcquireReplace() throws { + let eventLoop = EmbeddedEventLoop() + let channel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + + let connection = Connection(channel: channel, provider: provider) + connection.isInUse = false + provider.state.availableConnections.append(connection) + provider.state.openedConnectionsCount = 8 + + XCTAssertEqual(1, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(1, provider.state.pending) + XCTAssertEqual(8, provider.state.openedConnectionsCount) + + let action = provider.state.acquire(waiter: .init(promise: eventLoop.makePromise(), preference: .delegateAndChannel(on: eventLoop))) + switch action { + case .replace(let connection, let waiter): + connection.isInUse = false + waiter.promise.fail(TempError()) + + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(8, provider.state.openedConnectionsCount) + default: + XCTFail("Unexpected action: \(action)") + } + + // cleanup + provider.state.openedConnectionsCount = 0 + try provider.close(on: channel.eventLoop).wait() + } + + func testAcquireWhenUnavailableSpecificEL() throws { + let eventLoop = EmbeddedEventLoop() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: eventLoop, configuration: .init(), pool: .init(configuration: .init())) + + provider.state.openedConnectionsCount = 8 + + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(1, provider.state.pending) + XCTAssertEqual(8, provider.state.openedConnectionsCount) + + let action = provider.state.acquire(waiter: .init(promise: eventLoop.makePromise(), preference: .delegateAndChannel(on: eventLoop))) + switch action { + case .none: + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(1, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(8, provider.state.openedConnectionsCount) + default: + XCTFail("Unexpected action: \(action)") + } + + // cleanup + provider.state.openedConnectionsCount = 0 + try provider.close(on: eventLoop).wait() + } + + // MARK: - Acquire Errors Tests + + func testAcquireWhenClosed() { + let eventLoop = EmbeddedEventLoop() + + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: eventLoop) + state.state = .closed + + XCTAssertFalse(state.enqueue()) + + let promise = eventLoop.makePromise(of: Connection.self) + let action = state.acquire(waiter: .init(promise: promise, preference: .indifferent)) + switch action { + case .fail(let waiter, let error): + waiter.promise.fail(error) + default: + XCTFail("Unexpected action: \(action)") + } + } + + // MARK: - Release Tests + + func testReleaseAliveConnectionEmptyQueue() throws { + let channel = ActiveChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + + provider.state.pending = 0 + provider.state.openedConnectionsCount = 1 + + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + + let connection = Connection(channel: channel, provider: provider) + connection.isInUse = false + + let action = provider.state.release(connection: connection, inPool: false) + switch action { + case .park: + XCTAssertEqual(1, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testReleaseAliveButClosingConnectionEmptyQueue() throws { + let channel = ActiveChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + + provider.state.pending = 0 + provider.state.openedConnectionsCount = 1 + + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + + let connection = Connection(channel: channel, provider: provider) + connection.isInUse = false + connection.isClosing = true + + let action = provider.state.release(connection: connection, inPool: false) + switch action { + case .deleteProvider: + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(0, provider.state.openedConnectionsCount) + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testReleaseInactiveConnectionEmptyQueue() throws { + let channel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + + provider.state.pending = 0 + provider.state.openedConnectionsCount = 1 + + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + + let connection = Connection(channel: channel, provider: provider) + connection.isInUse = false + + let action = provider.state.release(connection: connection, inPool: true) + switch action { + case .deleteProvider: + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(0, provider.state.openedConnectionsCount) + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testReleaseInactiveConnectionEmptyQueueHasConnections() throws { + let channel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + + provider.state.pending = 0 + provider.state.openedConnectionsCount = 2 + + let available = Connection(channel: channel, provider: provider) + available.isInUse = false + provider.state.availableConnections.append(available) + + XCTAssertEqual(1, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(2, provider.state.openedConnectionsCount) + + let connection = Connection(channel: channel, provider: provider) + connection.isInUse = false + + let action = provider.state.release(connection: connection, inPool: false) + switch action { + case .none: + XCTAssertEqual(1, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testReleaseAliveConnectionHasWaiter() throws { + let channel = ActiveChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + + provider.state.pending = 0 + provider.state.openedConnectionsCount = 1 + provider.state.waiters.append(.init(promise: channel.eventLoop.makePromise(), preference: .indifferent)) + + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(1, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + + let connection = Connection(channel: channel, provider: provider) + connection.isInUse = false + + let action = provider.state.release(connection: connection, inPool: false) + switch action { + case .lease(let connection, let waiter): + XCTAssertTrue(connection.isInUse) + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + + // cleanup + waiter.promise.succeed(connection) + connection.isInUse = false + provider.state.openedConnectionsCount = 0 + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testReleaseInactiveConnectionHasWaitersNoConnections() throws { + let channel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + + provider.state.pending = 0 + provider.state.openedConnectionsCount = 1 + provider.state.waiters.append(.init(promise: channel.eventLoop.makePromise(), preference: .indifferent)) + + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(1, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + + let connection = Connection(channel: channel, provider: provider) + connection.isInUse = false + + let action = provider.state.release(connection: connection, inPool: false) + switch action { + case .create(let waiter): + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + + // cleanup + waiter.promise.fail(TempError()) + provider.state.openedConnectionsCount = 0 + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testReleaseInactiveConnectionHasWaitersHasConnections() throws { + let channel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + + provider.state.pending = 0 + provider.state.openedConnectionsCount = 2 + provider.state.waiters.append(.init(promise: channel.eventLoop.makePromise(), preference: .indifferent)) + + let available = Connection(channel: channel, provider: provider) + available.isInUse = false + provider.state.availableConnections.append(available) + + XCTAssertEqual(1, provider.state.availableConnections.count) + XCTAssertEqual(1, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(2, provider.state.openedConnectionsCount) + + let connection = Connection(channel: channel, provider: provider) + connection.isInUse = false + + let action = provider.state.release(connection: connection, inPool: false) + switch action { + case .lease(let connection, let waiter): + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + + // cleanup + waiter.promise.succeed(connection) + connection.isInUse = false + provider.state.openedConnectionsCount = 0 + default: + XCTFail("Unexpected action: \(action)") + } + } + + // MARK: - Release on Specific EL Tests + + func testReleaseAliveConnectionSameELHasWaiterSpecificEL() throws { + let channel = ActiveChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + + provider.state.pending = 0 + provider.state.openedConnectionsCount = 1 + provider.state.waiters.append(.init(promise: channel.eventLoop.makePromise(), preference: .delegateAndChannel(on: channel.eventLoop))) + + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(1, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + + let connection = Connection(channel: channel, provider: provider) + connection.isInUse = false + + let action = provider.state.release(connection: connection, inPool: false) + switch action { + case .lease(let connection, let waiter): + XCTAssertTrue(connection.isInUse) + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + + // cleanup + waiter.promise.succeed(connection) + connection.isInUse = false + provider.state.openedConnectionsCount = 0 + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testReleaseAliveConnectionDifferentELNoSameELConnectionsHasWaiterSpecificEL() throws { + let channel = ActiveChannel() + let eventLoop = EmbeddedEventLoop() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + + provider.state.pending = 0 + provider.state.openedConnectionsCount = 1 + provider.state.waiters.append(.init(promise: channel.eventLoop.makePromise(), preference: .delegateAndChannel(on: eventLoop))) + + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(1, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + + let connection = Connection(channel: channel, provider: provider) + connection.isInUse = false + + let action = provider.state.release(connection: connection, inPool: false) + switch action { + case .parkAnd(let connection, .create(let waiter)): + XCTAssertFalse(connection.isInUse) + XCTAssertEqual(1, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(2, provider.state.openedConnectionsCount) + + // cleanup + waiter.promise.succeed(connection) + provider.state.openedConnectionsCount = 0 + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testReleaseAliveConnectionDifferentELHasSameELConnectionsHasWaiterSpecificEL() throws { + let channel = ActiveChannel() + let otherChannel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + + provider.state.pending = 0 + provider.state.openedConnectionsCount = 2 + provider.state.waiters.append(.init(promise: channel.eventLoop.makePromise(), preference: .delegateAndChannel(on: otherChannel.eventLoop))) + + let available = Connection(channel: otherChannel, provider: provider) + available.isInUse = false + provider.state.availableConnections.append(available) + + XCTAssertEqual(1, provider.state.availableConnections.count) + XCTAssertEqual(1, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(2, provider.state.openedConnectionsCount) + + let connection = Connection(channel: channel, provider: provider) + connection.isInUse = false + + let action = provider.state.release(connection: connection, inPool: false) + switch action { + case .parkAnd(let connection, .lease(let replacement, let waiter)): + XCTAssertFalse(connection.isInUse) + XCTAssertTrue(replacement.isInUse) + XCTAssertEqual(1, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(2, provider.state.openedConnectionsCount) + + // cleanup + waiter.promise.succeed(replacement) + replacement.isInUse = false + provider.state.openedConnectionsCount = 0 + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testReleaseAliveConnectionDifferentELNoSameELConnectionsOnLimitHasWaiterSpecificEL() throws { + let channel = ActiveChannel() + let otherChannel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + + provider.state.pending = 0 + provider.state.openedConnectionsCount = 8 + provider.state.waiters.append(.init(promise: channel.eventLoop.makePromise(), preference: .delegateAndChannel(on: otherChannel.eventLoop))) + + let available = Connection(channel: channel, provider: provider) + available.isInUse = false + provider.state.availableConnections.append(available) + + XCTAssertEqual(1, provider.state.availableConnections.count) + XCTAssertEqual(1, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(8, provider.state.openedConnectionsCount) + + let connection = Connection(channel: channel, provider: provider) + connection.isInUse = false + + let action = provider.state.release(connection: connection, inPool: false) + switch action { + case .replace(let connection, let waiter): + XCTAssertFalse(connection.isInUse) + + XCTAssertEqual(1, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(8, provider.state.openedConnectionsCount) + + // cleanup + waiter.promise.fail(TempError()) + provider.state.openedConnectionsCount = 0 + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testReleaseInactiveConnectionHasWaitersHasSameELConnectionsSpecificEL() throws { + let channel = EmbeddedChannel() + let otherChannel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + + provider.state.pending = 0 + provider.state.openedConnectionsCount = 2 + provider.state.waiters.append(.init(promise: channel.eventLoop.makePromise(), preference: .delegateAndChannel(on: otherChannel.eventLoop))) + + let available = Connection(channel: otherChannel, provider: provider) + available.isInUse = false + provider.state.availableConnections.append(available) + + XCTAssertEqual(1, provider.state.availableConnections.count) + XCTAssertEqual(1, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(2, provider.state.openedConnectionsCount) + + let connection = Connection(channel: channel, provider: provider) + connection.isInUse = false + + let action = provider.state.release(connection: connection, inPool: false) + switch action { + case .lease(let connection, let waiter): + XCTAssertTrue(connection.isInUse) + XCTAssertTrue(connection === available) + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + + // cleanup + waiter.promise.succeed(connection) + connection.isInUse = false + provider.state.openedConnectionsCount = 0 + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testReleaseInactiveConnectionHasWaitersNoSameELConnectionsSpecificEL() throws { + let channel = EmbeddedChannel() + let otherChannel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + + provider.state.pending = 0 + provider.state.openedConnectionsCount = 2 + provider.state.waiters.append(.init(promise: channel.eventLoop.makePromise(), preference: .delegateAndChannel(on: otherChannel.eventLoop))) + + let available = Connection(channel: channel, provider: provider) + available.isInUse = false + provider.state.availableConnections.append(available) + + XCTAssertEqual(1, provider.state.availableConnections.count) + XCTAssertEqual(1, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(2, provider.state.openedConnectionsCount) + + let connection = Connection(channel: channel, provider: provider) + connection.isInUse = false + + let action = provider.state.release(connection: connection, inPool: false) + switch action { + case .create(let waiter): + XCTAssertEqual(1, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(2, provider.state.openedConnectionsCount) + + // cleanup + waiter.promise.fail(TempError()) + provider.state.openedConnectionsCount = 0 + default: + XCTFail("Unexpected action: \(action)") + } + } + + // MARK: - Next Waiter Tests + + func testNextWaiterEmptyQueue() throws { + let channel = ActiveChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + + provider.state.pending = 0 + provider.state.openedConnectionsCount = 1 + + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + + let action = provider.state.processNextWaiter() + switch action { + case .deleteProvider: + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(0, provider.state.openedConnectionsCount) + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testNextWaiterEmptyQueueHasConnections() throws { + let channel = ActiveChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + + provider.state.pending = 0 + provider.state.openedConnectionsCount = 2 + + let available = Connection(channel: channel, provider: provider) + available.isInUse = false + provider.state.availableConnections.append(available) + + XCTAssertEqual(1, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(2, provider.state.openedConnectionsCount) + + let action = provider.state.processNextWaiter() + switch action { + case .none: + XCTAssertEqual(1, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testNextWaiterHasWaitersHasConnections() throws { + let channel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + + provider.state.pending = 0 + provider.state.openedConnectionsCount = 2 + provider.state.waiters.append(.init(promise: channel.eventLoop.makePromise(), preference: .indifferent)) + + let available = Connection(channel: channel, provider: provider) + available.isInUse = false + provider.state.availableConnections.append(available) + + XCTAssertEqual(1, provider.state.availableConnections.count) + XCTAssertEqual(1, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(2, provider.state.openedConnectionsCount) + + let action = provider.state.processNextWaiter() + switch action { + case .lease(let connection, let waiter): + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + + // cleanup + waiter.promise.succeed(connection) + connection.isInUse = false + provider.state.openedConnectionsCount = 0 + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testNextWaiterHasWaitersHasSameELConnectionsSpecificEL() throws { + let channel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + + provider.state.pending = 0 + provider.state.openedConnectionsCount = 2 + provider.state.waiters.append(.init(promise: channel.eventLoop.makePromise(), preference: .delegateAndChannel(on: channel.eventLoop))) + + let available = Connection(channel: channel, provider: provider) + available.isInUse = false + provider.state.availableConnections.append(available) + + XCTAssertEqual(1, provider.state.availableConnections.count) + XCTAssertEqual(1, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(2, provider.state.openedConnectionsCount) + + let action = provider.state.processNextWaiter() + switch action { + case .lease(let connection, let waiter): + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + + // cleanup + waiter.promise.succeed(connection) + connection.isInUse = false + provider.state.openedConnectionsCount = 0 + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testNextWaiterHasWaitersHasDifferentELConnectionsSpecificEL() throws { + let channel = EmbeddedChannel() + let eventLoop = EmbeddedEventLoop() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + + provider.state.pending = 0 + provider.state.openedConnectionsCount = 2 + provider.state.waiters.append(.init(promise: channel.eventLoop.makePromise(), preference: .delegateAndChannel(on: eventLoop))) + + let available = Connection(channel: channel, provider: provider) + available.isInUse = false + provider.state.availableConnections.append(available) + + XCTAssertEqual(1, provider.state.availableConnections.count) + XCTAssertEqual(1, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(2, provider.state.openedConnectionsCount) + + let action = provider.state.processNextWaiter() + switch action { + case .create(let waiter): + XCTAssertEqual(1, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(2, provider.state.openedConnectionsCount) + + // cleanup + waiter.promise.fail(TempError()) + provider.state.openedConnectionsCount = 0 + default: + XCTFail("Unexpected action: \(action)") + } + } + + // MARK: - Connection Tests + + func testConnectionReleaseActive() throws { + let channel = ActiveChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + + provider.state.openedConnectionsCount = 1 + + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(1, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + + let connection = Connection(channel: channel, provider: provider) + connection.isInUse = true + connection.isClosing = false + + connection.release() + + XCTAssertFalse(connection.isInUse) + XCTAssertEqual(1, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(1, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + + // cleanup + provider.state.pending = 0 + } + + func testConnectionReleaseInactive() throws { + let channel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + + provider.state.openedConnectionsCount = 1 + + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(1, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + + let connection = Connection(channel: channel, provider: provider) + connection.isInUse = true + connection.isClosing = true + + connection.release() + + XCTAssertFalse(connection.isInUse) + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(1, provider.state.pending) + XCTAssertEqual(0, provider.state.openedConnectionsCount) + + // cleanup + provider.state.pending = 0 + } + + func testConnectionRemoteCloseRelease() throws { + let channel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + + let connection = Connection(channel: channel, provider: provider) + connection.isInUse = false + provider.state.availableConnections.append(connection) + provider.state.openedConnectionsCount = 1 + + XCTAssertEqual(1, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(1, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + + connection.remoteClosed() + + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(1, provider.state.pending) + XCTAssertEqual(0, provider.state.openedConnectionsCount) + + // cleanup + provider.state.pending = 0 + } + + func testConnectionTimeoutRelease() throws { + let channel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + + let connection = Connection(channel: channel, provider: provider) + connection.isInUse = false + provider.state.availableConnections.append(connection) + provider.state.openedConnectionsCount = 1 + + XCTAssertEqual(1, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(1, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + + connection.timeout() + + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(1, provider.state.pending) + XCTAssertEqual(0, provider.state.openedConnectionsCount) + + // cleanup + provider.state.pending = 0 + } + + func testAcquireAvailableBecomesUnavailable() throws { + let eventLoop = EmbeddedEventLoop() + let channel = ActiveChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: eventLoop, configuration: .init(), pool: .init(configuration: .init())) + + let connection = Connection(channel: channel, provider: provider) + connection.isInUse = false + provider.state.availableConnections.append(connection) + provider.state.openedConnectionsCount = 1 + + XCTAssertEqual(1, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(1, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + + let action = provider.state.acquire(waiter: .init(promise: eventLoop.makePromise(), preference: .indifferent)) + switch action { + case .lease(let connection, let waiter): + // Since this connection is already in use, this should be a no-op and state should not have changed from normal lease + connection.timeout() + + XCTAssertTrue(connection.isActiveEstimation) + XCTAssertTrue(connection.isInUse) + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + + // This is unrecoverable, but in this case we create a new connection, so state again should not change, even though release will be called + // This is important to prevent provider deletion since connection is released and there could be 0 waiters + connection.remoteClosed() + + XCTAssertTrue(connection.isClosing) + XCTAssertFalse(connection.isActiveEstimation) + XCTAssertEqual(0, provider.state.availableConnections.count) + XCTAssertEqual(0, provider.state.waiters.count) + XCTAssertEqual(0, provider.state.pending) + XCTAssertEqual(1, provider.state.openedConnectionsCount) + + waiter.promise.succeed(connection) + + // cleanup + connection.isInUse = false + provider.state.openedConnectionsCount = 0 + default: + XCTFail("Unexpected action: \(action)") + } + } +} + +class ActiveChannel: Channel { + var allocator: ByteBufferAllocator + var closeFuture: EventLoopFuture + var eventLoop: EventLoop + + var localAddress: SocketAddress? = nil + var remoteAddress: SocketAddress? = nil + var parent: Channel? = nil + var isWritable: Bool = true + var isActive: Bool = true + + init() { + self.allocator = ByteBufferAllocator() + self.eventLoop = EmbeddedEventLoop() + self.closeFuture = self.eventLoop.makeSucceededFuture(()) + } + + var _channelCore: ChannelCore { + preconditionFailure("Not implemented") + } + + var pipeline: ChannelPipeline { + return ChannelPipeline(channel: self) + } + + func setOption