diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index aae2f9748..4bd689804 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -16,6 +16,7 @@ import Foundation import NIO import NIOConcurrencyHelpers import NIOHTTP1 +import NIOHTTPCompression import NIOTLS import NIOTransportServices @@ -28,14 +29,14 @@ final class ConnectionPool { /// The main data structure used by the `ConnectionPool` to retreive and create connections associated /// to a given `Key` . - /// - Warning: This property should be accessed with proper synchronization, see `connectionProvidersLock` - private var connectionProviders: [Key: HTTP1ConnectionProvider] = [:] - - /// The lock used by the connection pool used to ensure correct synchronization of accesses to `_connectionProviders` /// + /// - Warning: This property should be accessed with proper synchronization, see `lock` + private var providers: [Key: HTTP1ConnectionProvider] = [:] + + /// The lock used by the connection pool used to ensure correct synchronization of accesses to `providers` /// - /// - Warning: This lock should always be acquired *before* `HTTP1ConnectionProvider`s `stateLock` if used in combination with it. - private let connectionProvidersLock = Lock() + /// - Warning: This lock should always be acquired *before* `HTTP1ConnectionProvider`s `lock` if used in combination with it. + private let lock = Lock() init(configuration: HTTPClient.Configuration) { self.configuration = configuration @@ -48,8 +49,8 @@ final class ConnectionPool { /// Having a default `EventLoop` shared by the *channel* and the *delegate* avoids /// loss of performance due to `EventLoop` hopping func associatedEventLoop(for key: Key) -> EventLoop? { - return self.connectionProvidersLock.withLock { - self.connectionProviders[key]?.eventLoop + return self.lock.withLock { + self.providers[key]?.eventLoop } } @@ -61,69 +62,47 @@ final class ConnectionPool { /// - Returns: A connection corresponding to the specified parameters /// /// When the pool is asked for a new connection, it creates a `Key` from the url associated to the `request`. This key - /// is used to determine if there already exists an associated `HTTP1ConnectionProvider` in `connectionProviders`. + /// is used to determine if there already exists an associated `HTTP1ConnectionProvider` in `providers`. /// If there is, the connection provider then takes care of leasing a new connection. If a connection provider doesn't exist, it is created. - func getConnection(for request: HTTPClient.Request, preference: HTTPClient.EventLoopPreference, on eventLoop: EventLoop, deadline: NIODeadline?) -> EventLoopFuture { + func getConnection(for request: HTTPClient.Request, preference: HTTPClient.EventLoopPreference, on eventLoop: EventLoop, deadline: NIODeadline?, setupComplete: EventLoopFuture) -> EventLoopFuture { let key = Key(request) - let provider: HTTP1ConnectionProvider = self.connectionProvidersLock.withLock { - if let existing = self.connectionProviders[key] { - existing.stateLock.withLock { - existing.state.pending += 1 - } + let provider: HTTP1ConnectionProvider = self.lock.withLock { + if let existing = self.providers[key], existing.enqueue() { return existing } else { - let http1Provider = HTTP1ConnectionProvider(key: key, eventLoop: eventLoop, configuration: self.configuration, parentPool: self) - self.connectionProviders[key] = http1Provider - http1Provider.stateLock.withLock { - http1Provider.state.pending += 1 - } - return http1Provider + // Connection provider will be created with `pending = 1` + let provider = HTTP1ConnectionProvider(key: key, eventLoop: eventLoop, configuration: self.configuration, pool: self) + self.providers[key] = provider + return provider } } - return provider.getConnection(preference: preference) + return provider.getConnection(preference: preference, setupComplete: setupComplete) } - func release(_ connection: Connection) { - let connectionProvider = self.connectionProvidersLock.withLock { - self.connectionProviders[connection.key] - } - if let connectionProvider = connectionProvider { - connectionProvider.release(connection: connection) + func delete(_ provider: HTTP1ConnectionProvider) { + self.lock.withLockVoid { + self.providers[provider.key] = nil } } - func prepareForClose(on eventLoop: EventLoop) -> EventLoopFuture { - let connectionProviders = self.connectionProvidersLock.withLock { - self.connectionProviders.values + func close(on eventLoop: EventLoop) -> EventLoopFuture { + let providers = self.lock.withLock { + self.providers.values } - return EventLoopFuture.andAllComplete(connectionProviders.map { $0.prepareForClose() }, on: eventLoop) + return EventLoopFuture.reduce(true, providers.map { $0.close() }, on: eventLoop) { $0 && $1 } } - func close(on eventLoop: EventLoop) -> EventLoopFuture { - let connectionProviders = self.connectionProvidersLock.withLock { - self.connectionProviders.values - } - - return EventLoopFuture.andAllComplete(connectionProviders.map { $0.close() }, on: eventLoop).map { - self.connectionProvidersLock.withLock { - assert(self.connectionProviders.count == 0, "left-overs: \(self.connectionProviders)") - } - } - } - - var connectionProviderCount: Int { - return self.connectionProvidersLock.withLock { - self.connectionProviders.count - } + var count: Int { + return self.providers.count } /// Used by the `ConnectionPool` to index its `HTTP1ConnectionProvider`s /// /// A key is initialized from a `URL`, it uses the components to derive a hashed value - /// used by the `connectionProviders` dictionary to allow retrieving and creating + /// used by the `providers` dictionary to allow retrieving and creating /// connection providers associated to a certain request in constant time. struct Key: Hashable { init(_ request: HTTPClient.Request) { @@ -153,567 +132,414 @@ final class ConnectionPool { case unix } } +} - /// A `Connection` represents a `Channel` in the context of the connection pool +/// A `Connection` represents a `Channel` in the context of the connection pool +/// +/// In the `ConnectionPool`, each `Channel` belongs to a given `HTTP1ConnectionProvider` +/// and has a certain "lease state" (see the `inUse` property). +/// The role of `Connection` is to model this by storing a `Channel` alongside its associated properties +/// so that they can be passed around together and correct provider can be identified when connection is released. +class Connection { + /// The provider this `Connection` belongs to. /// - /// In the `ConnectionPool`, each `Channel` belongs to a given `HTTP1ConnectionProvider` - /// and has a certain "lease state" (see the `isLeased` property). - /// The role of `Connection` is to model this by storing a `Channel` alongside its associated properties - /// so that they can be passed around together. + /// This enables calling methods like `release()` directly on a `Connection` instead of + /// calling `provider.release(connection)`. This gives a more object oriented feel to the API + /// and can avoid having to keep explicit references to the pool at call site. + let provider: HTTP1ConnectionProvider + + /// The `Channel` of this `Connection` /// - /// - Warning: `Connection` properties are not thread-safe and should be used with proper synchronization - class Connection: CustomStringConvertible { - init(key: Key, channel: Channel, parentPool: ConnectionPool) { - self.key = key - self.channel = channel - self.parentPool = parentPool - self.closePromise = channel.eventLoop.makePromise(of: Void.self) - self.closeFuture = self.closePromise.futureResult - } + /// - Warning: Requests that lease connections from the `ConnectionPool` are responsible + /// for removing the specific handlers they added to the `Channel` pipeline before releasing it to the pool. + let channel: Channel - /// Release this `Connection` to its associated `HTTP1ConnectionProvider` in the parent `ConnectionPool` - /// - /// This is exactly equivalent to calling `.release(theProvider)` on `ConnectionPool` - /// - /// - Warning: This only releases the connection and doesn't take care of cleaning handlers in the - /// `Channel` pipeline. - func release() { - self.parentPool.release(self) - } + init(channel: Channel, provider: HTTP1ConnectionProvider) { + self.channel = channel + self.provider = provider + } - func close() -> EventLoopFuture { - self.channel.close(promise: nil) - return self.closeFuture - } + /// Convenience property indicating wether the underlying `Channel` is active or not. + var isActiveEstimation: Bool { + return self.channel.isActive + } - var description: String { - return "Connection { channel: \(self.channel) }" - } + /// Release this `Connection` to its associated `HTTP1ConnectionProvider`. + /// + /// - Warning: This only releases the connection and doesn't take care of cleaning handlers in the `Channel` pipeline. + func release(closing: Bool) { + assert(self.channel.eventLoop.inEventLoop) + self.provider.release(connection: self, closing: closing) + } + + /// Called when channel exceeds idle time in pool. + func timeout() { + assert(self.channel.eventLoop.inEventLoop) + self.provider.timeout(connection: self) + } + + /// Called when channel goes inactive while in the pool. + func remoteClosed() { + assert(self.channel.eventLoop.inEventLoop) + self.provider.remoteClosed(connection: self) + } - /// The connection pool this `Connection` belongs to. - /// - /// This enables calling methods like `release()` directly on a `Connection` instead of - /// calling `pool.release(connection)`. This gives a more object oriented feel to the API - /// and can avoid having to keep explicit references to the pool at call site. - let parentPool: ConnectionPool - - /// The `Key` of the `HTTP1ConnectionProvider` this `Connection` belongs to - /// - /// This lets `ConnectionPool` know the relationship between `Connection`s and `HTTP1ConnectionProvider`s - fileprivate let key: Key - - /// The `Channel` of this `Connection` - /// - /// - Warning: Requests that lease connections from the `ConnectionPool` are responsible - /// for removing the specific handlers they added to the `Channel` pipeline before releasing it to the pool. - let channel: Channel - - /// Wether the connection is currently leased or not - var isLeased: Bool = false - - /// Indicates that this connection is about to close - var isClosing: Bool = false - - /// Indicates wether the usual close callback should be run or not, this allows customizing what happens - /// on close in some cases such as for the `.replaceConnection` action - /// - /// - Warning: This should be accessed under the `stateLock` of `HTTP1ConnectionProvider` - fileprivate var mustRunDefaultCloseCallback: Bool = true - - /// Convenience property indicating wether the underlying `Channel` is active or not - var isActiveEstimation: Bool { - return self.channel.isActive + func cancel() -> EventLoopFuture { + return self.channel.triggerUserOutboundEvent(TaskCancelEvent()) + } + + /// Called from `HTTP1ConnectionProvider.close` when client is shutting down. + func close() -> EventLoopFuture { + return self.channel.close() + } + + /// Sets idle timeout handler and channel inactivity listener. + func setIdleTimeout(timeout: TimeAmount?) { + _ = self.channel.pipeline.addHandler(IdleStateHandler(writeTimeout: timeout), position: .first).flatMap { _ in + self.channel.pipeline.addHandler(IdlePoolConnectionHandler(connection: self)) } + } - fileprivate var closePromise: EventLoopPromise - - var closeFuture: EventLoopFuture - - func removeIdleConnectionHandlersForLease() -> EventLoopFuture { - return self.channel.eventLoop.flatSubmit { - self.removeHandler(IdleStateHandler.self).flatMap { () -> EventLoopFuture in - self.channel.pipeline.handler(type: IdlePoolConnectionHandler.self).flatMap { idleHandler in - self.channel.pipeline.removeHandler(idleHandler).flatMapError { _ in - self.channel.eventLoop.makeSucceededFuture(()) - }.map { - idleHandler.hasNotSentClose && self.channel.isActive - } - }.flatMapError { error in - // These handlers are only added on connection release, they are not added - // when a connection is made to be instantly leased, so we ignore this error - if let channelError = error as? ChannelPipelineError, channelError == .notFound { - return self.channel.eventLoop.makeSucceededFuture(self.channel.isActive) - } else { - return self.channel.eventLoop.makeFailedFuture(error) - } - } - }.flatMap { channelIsUsable in - if channelIsUsable { - return self.channel.eventLoop.makeSucceededFuture(self) - } else { - return self.channel.eventLoop.makeFailedFuture(InactiveChannelError()) - } - } - } + /// Removes idle timeout handler and channel inactivity listener + func cancelIdleTimeout() -> EventLoopFuture { + return self.removeHandler(IdleStateHandler.self).flatMap { _ in + self.removeHandler(IdlePoolConnectionHandler.self) } + } +} + +struct ConnectionKey: Hashable { + let connection: Connection - struct InactiveChannelError: Error {} + init(_ connection: Connection) { + self.connection = connection } - /// A connection provider of `HTTP/1.1` connections with a given `Key` (host, scheme, port) + static func == (lhs: ConnectionKey, rhs: ConnectionKey) -> Bool { + return ObjectIdentifier(lhs.connection) == ObjectIdentifier(rhs.connection) + } + + func hash(into hasher: inout Hasher) { + hasher.combine(ObjectIdentifier(self.connection)) + } + + func cancel() -> EventLoopFuture { + return self.connection.cancel() + } +} + +/// A connection provider of `HTTP/1.1` connections with a given `Key` (host, scheme, port) +/// +/// On top of enabling connection reuse this provider it also facilitates the creation +/// of concurrent requests as it has built-in politeness regarding the maximum number +/// of concurrent requests to the server. +class HTTP1ConnectionProvider { + struct ProviderClosedError: Error {} + + /// The client configuration used to bootstrap new requests + private let configuration: HTTPClient.Configuration + + /// The pool this provider belongs to + private let pool: ConnectionPool + + /// The key associated with this provider + let key: ConnectionPool.Key + + /// The default `EventLoop` for this provider /// - /// On top of enabling connection reuse this provider it also facilitates the creation - /// of concurrent requests as it has built-in politeness regarding the maximum number - /// of concurrent requests to the server. - class HTTP1ConnectionProvider: CustomStringConvertible { - /// The default `EventLoop` for this provider - /// - /// The default event loop is used to create futures and is used - /// when creating `Channel`s for requests for which the - /// `EventLoopPreference` is set to `.indifferent` - let eventLoop: EventLoop - - /// The client configuration used to bootstrap new requests - private let configuration: HTTPClient.Configuration - - /// The key associated with this provider - private let key: ConnectionPool.Key - - /// The `State` of this provider - /// - /// This property holds data structures representing the current state of the provider - /// - Warning: This type isn't thread safe and should be accessed with proper - /// synchronization (see the `stateLock` property) - fileprivate var state: State - - /// The lock used to access and modify the `state` property - /// - /// - Warning: This lock should always be acquired *after* `ConnectionPool`s `connectionProvidersLock` if used in combination with it. - fileprivate let stateLock = Lock() - - /// The maximum number of concurrent connections to a given (host, scheme, port) - private let maximumConcurrentConnections: Int = 8 - - /// The pool this provider belongs to - private let parentPool: ConnectionPool - - /// Creates a new `HTTP1ConnectionProvider` - /// - /// - parameters: - /// - key: The `Key` (host, scheme, port) this provider is associated to - /// - configuration: The client configuration used globally by all requests - /// - initialConnection: The initial connection the pool initializes this provider with - /// - parentPool: The pool this provider belongs to - init(key: ConnectionPool.Key, eventLoop: EventLoop, configuration: HTTPClient.Configuration, parentPool: ConnectionPool) { - self.eventLoop = eventLoop - self.configuration = configuration - self.key = key - self.parentPool = parentPool - self.state = State(eventLoop: eventLoop, parentPool: parentPool, key: key) - } + /// The default event loop is used to create futures and is used when creating `Channel`s for requests + /// for which the `EventLoopPreference` is set to `.indifferent` + let eventLoop: EventLoop - deinit { - assert(self.state.activity == .closed, "Non closed on deinit") - assert(self.state.availableConnections.isEmpty, "Available connections should be empty before deinit") - assert(self.state.leased == 0, "All leased connections should have been returned before deinit") - assert(self.state.waiters.count == 0, "Waiters on deinit: \(self.state.waiters)") - } + /// The lock used to access and modify the provider state - `availableConnections`, `waiters` and `openedConnectionsCount`. + /// + /// - Warning: This lock should always be acquired *after* `ConnectionPool`s `lock` if used in combination with it. + private let lock = Lock() - var description: String { - return "HTTP1ConnectionProvider { key: \(self.key), state: \(self.state) }" - } + var closePromise: EventLoopPromise - func getConnection(preference: HTTPClient.EventLoopPreference) -> EventLoopFuture { - self.activityPrecondition(expected: [.opened]) - let action = self.stateLock.withLock { self.state.connectionAction(for: preference) } - switch action { - case .leaseConnection(let connection): - return connection.removeIdleConnectionHandlersForLease().flatMapError { _ in - connection.closeFuture.flatMap { // We ensure close actions are run first - let defaultEventLoop = self.stateLock.withLock { - self.state.defaultEventLoop - } - return self.makeConnection(on: preference.bestEventLoop ?? defaultEventLoop) - } - } - case .makeConnection(let eventLoop): - return self.makeConnection(on: eventLoop) - case .leaseFutureConnection(let futureConnection): - return futureConnection - } - } + var state: ConnectionsState - func release(connection: Connection) { - self.activityPrecondition(expected: [.opened, .closing]) - let action = self.parentPool.connectionProvidersLock.withLock { - self.stateLock.withLock { self.state.releaseAction(for: connection) } - } - switch action { - case .succeed(let promise): - promise.succeed(connection) - - case .makeConnectionAndComplete(let eventLoop, let promise): - self.makeConnection(on: eventLoop).cascade(to: promise) - - case .replaceConnection(let eventLoop, let promise): - connection.close().flatMap { - self.makeConnection(on: eventLoop) - }.whenComplete { result in - switch result { - case .success(let connection): - promise.succeed(connection) - case .failure(let error): - promise.fail(error) - } - } + /// Creates a new `HTTP1ConnectionProvider` + /// + /// - parameters: + /// - key: The `Key` (host, scheme, port) this provider is associated to + /// - configuration: The client configuration used globally by all requests + /// - initialConnection: The initial connection the pool initializes this provider with + /// - pool: The pool this provider belongs to + init(key: ConnectionPool.Key, eventLoop: EventLoop, configuration: HTTPClient.Configuration, pool: ConnectionPool) { + self.eventLoop = eventLoop + self.configuration = configuration + self.key = key + self.pool = pool + self.closePromise = eventLoop.makePromise() + self.state = .init(eventLoop: eventLoop) + } - case .none: - break - } - } + deinit { + self.state.assertInvariants() + } - private func makeConnection(on eventLoop: EventLoop) -> EventLoopFuture { - self.activityPrecondition(expected: [.opened]) - let address = HTTPClient.resolveAddress(host: self.key.host, port: self.key.port, proxy: self.configuration.proxy) - let requiresTLS = self.key.scheme == .https - let bootstrap: NIOClientTCPBootstrap - do { - bootstrap = try NIOClientTCPBootstrap.makeHTTPClientBootstrapBase(on: eventLoop, host: self.key.host, port: self.key.port, requiresTLS: requiresTLS, configuration: self.configuration) - } catch { - return eventLoop.makeFailedFuture(error) + private func execute(_ action: Action) { + switch action { + case .lease(let connection, let waiter): + // if connection is became inactive, we create a new one. + connection.cancelIdleTimeout().whenComplete { _ in + if connection.isActiveEstimation { + waiter.promise.succeed(connection) + } else { + self.makeChannel(preference: waiter.preference).whenComplete { result in + self.connect(result, waiter: waiter, replacing: connection) + } + } } - let handshakePromise = eventLoop.makePromise(of: Void.self) - - let channel: EventLoopFuture - switch self.key.scheme { - case .http, .https: - channel = bootstrap.connect(host: address.host, port: address.port) - case .unix: - channel = bootstrap.connect(unixDomainSocketPath: self.key.unixPath) + case .create(let waiter): + self.makeChannel(preference: waiter.preference).whenComplete { result in + self.connect(result, waiter: waiter) } - - return channel.flatMap { channel -> EventLoopFuture in - let requiresSSLHandler = self.configuration.proxy != nil && self.key.scheme == .https - channel.pipeline.addSSLHandlerIfNeeded(for: self.key, tlsConfiguration: self.configuration.tlsConfiguration, addSSLClient: requiresSSLHandler, handshakePromise: handshakePromise) - return handshakePromise.futureResult.flatMap { - channel.pipeline.addHTTPClientHandlers(leftOverBytesStrategy: .forwardBytes) - }.flatMap { - #if canImport(Network) - if #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *), bootstrap.underlyingBootstrap is NIOTSConnectionBootstrap { - return channel.pipeline.addHandler(HTTPClient.NWErrorHandler(), position: .first) - } - #endif - return eventLoop.makeSucceededFuture(()) - }.map { - let connection = Connection(key: self.key, channel: channel, parentPool: self.parentPool) - connection.isLeased = true - return connection - } - }.map { connection in - self.configureCloseCallback(of: connection) - return connection - }.flatMapError { error in - var error = error - #if canImport(Network) - if #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *), bootstrap.underlyingBootstrap is NIOTSConnectionBootstrap { - error = HTTPClient.NWErrorHandler.translateError(error) - } - #endif - // This promise may not have been completed if we reach this - // so we fail it to avoid any leak - handshakePromise.fail(error) - let action = self.parentPool.connectionProvidersLock.withLock { - self.stateLock.withLock { - self.state.failedConnectionAction() - } + case .replace(let connection, let waiter): + connection.cancelIdleTimeout().flatMap { + connection.close() + }.whenComplete { _ in + self.makeChannel(preference: waiter.preference).whenComplete { result in + self.connect(result, waiter: waiter, replacing: connection) } - switch action { - case .makeConnectionAndComplete(let el, let promise): - self.makeConnection(on: el).cascade(to: promise) - case .none: - break + } + case .park(let connection): + connection.setIdleTimeout(timeout: self.configuration.maximumAllowedIdleTimeInConnectionPool) + case .closeProvider: + self.closeAndDelete() + case .none: + break + case .parkAnd(let connection, let action): + connection.setIdleTimeout(timeout: self.configuration.maximumAllowedIdleTimeInConnectionPool) + self.execute(action) + case .closeAnd(let connection, let action): + connection.channel.close(promise: nil) + self.execute(action) + case .cancel(let connection, let close): + connection.cancel().whenComplete { _ in + if close { + self.closeAndDelete() } - return self.eventLoop.makeFailedFuture(error) } + case .fail(let waiter, let error): + waiter.promise.fail(error) } + } - /// Adds a callback on connection close that asks the `state` what to do about this - /// - /// The callback informs the state about the event, and the state returns a - /// `ClosedConnectionRemoveAction` which instructs it about what it should do. - private func configureCloseCallback(of connection: Connection) { - connection.channel.closeFuture.whenComplete { result in - let action: HTTP1ConnectionProvider.State.ClosedConnectionRemoveAction? = self.parentPool.connectionProvidersLock.withLock { - self.stateLock.withLock { - guard connection.mustRunDefaultCloseCallback else { - return nil - } - switch result { - case .success: - return self.state.removeClosedConnection(connection) - - case .failure(let error): - preconditionFailure("Connection close future failed with error: \(error)") - } - } - } - - if let action = action { - switch action { - case .makeConnectionAndComplete(let el, let promise): - self.makeConnection(on: el).cascade(to: promise) - case .none: - break - } - } - - connection.closePromise.succeed(()) - } + /// This function is needed to ensure that there is no race between getting a provider from map, and shutting it down when there are no requests processed by it. + func enqueue() -> Bool { + return self.lock.withLock { + self.state.enqueue() } + } - /// Removes and fails all `waiters`, remove existing `availableConnections` and sets `state.activity` to `.closing` - func prepareForClose() -> EventLoopFuture { - let (waitersFutures, closeFutures) = self.stateLock.withLock { () -> ([EventLoopFuture], [EventLoopFuture]) in - // Fail waiters - let waitersCopy = self.state.waiters - self.state.waiters.removeAll() - let waitersPromises = waitersCopy.map { $0.promise } - let waitersFutures = waitersPromises.map { $0.futureResult } - waitersPromises.forEach { $0.fail(HTTPClientError.cancelled) } - let closeFutures = self.state.availableConnections.map { $0.close() } - return (waitersFutures, closeFutures) - } + func getConnection(preference: HTTPClient.EventLoopPreference, setupComplete: EventLoopFuture) -> EventLoopFuture { + let waiter = Waiter(promise: self.eventLoop.makePromise(), setupComplete: setupComplete, preference: preference) - return EventLoopFuture.andAllComplete(waitersFutures, on: self.eventLoop) - .flatMap { - EventLoopFuture.andAllComplete(closeFutures, on: self.eventLoop) - } - .map { _ in - self.stateLock.withLock { - if self.state.leased == 0, self.state.availableConnections.isEmpty { - self.state.activity = .closed - } else { - self.state.activity = .closing - } - } - } + let action: Action = self.lock.withLock { + self.state.acquire(waiter: waiter) } - func close() -> EventLoopFuture { - let availableConnections = self.stateLock.withLock { () -> CircularBuffer in - assert(self.state.activity == .closing) - return self.state.availableConnections - } + self.execute(action) - return EventLoopFuture.andAllComplete(availableConnections.map { $0.close() }, on: self.eventLoop) - } + return waiter.promise.futureResult + } - private func activityPrecondition(expected: Set) { - self.stateLock.withLock { - precondition(expected.contains(self.state.activity), "Attempting to use HTTP1ConnectionProvider with unexpected state: \(self.state.activity) (expected: \(expected))") + func connect(_ result: Result, waiter: Waiter, replacing closedConnection: Connection? = nil) { + let action: Action + switch result { + case .success(let channel): + let connection = Connection(channel: channel, provider: self) + action = self.lock.withLock { + if let closedConnection = closedConnection { + self.state.drop(connection: closedConnection) + } + return self.state.offer(connection: connection) } + waiter.promise.succeed(connection) + case .failure(let error): + action = self.lock.withLock { + self.state.connectFailed() + } + waiter.promise.fail(error) } + waiter.setupComplete.whenComplete { _ in + self.execute(action) + } + } - fileprivate struct State { - /// The default `EventLoop` to use for this `HTTP1ConnectionProvider` - let defaultEventLoop: EventLoop - - /// The maximum number of connections to a certain (host, scheme, port) tuple. - private let maximumConcurrentConnections: Int = 8 - - /// Opened connections that are available - fileprivate var availableConnections: CircularBuffer = .init(initialCapacity: 8) + func release(connection: Connection, closing: Bool) { + let action: Action = self.lock.withLock { + self.state.release(connection: connection, closing: closing) + } - /// The number of currently leased connections - fileprivate var leased: Int = 0 { - didSet { - assert((0...self.maximumConcurrentConnections).contains(self.leased), "Invalid number of leased connections (\(self.leased))") - } + switch action { + case .none: + break + case .park, .closeProvider: + // Since both `.park` and `.deleteProvider` are terminal in terms of execution, + // we can execute them immediately + self.execute(action) + case .cancel, .closeAnd, .create, .fail, .lease, .parkAnd, .replace: + // This is needed to start a new stack, otherwise, since this is called on a previous + // future completion handler chain, it will be growing indefinitely until the connection is closed. + // We might revisit this when https://github.com/apple/swift-nio/issues/970 is resolved. + connection.channel.eventLoop.execute { + self.execute(action) } + } + } - /// Consumers that weren't able to get a new connection without exceeding - /// `maximumConcurrentConnections` get a `Future` - /// whose associated promise is stored in `Waiter`. The promise is completed - /// as soon as possible by the provider, in FIFO order. - fileprivate var waiters: CircularBuffer = .init(initialCapacity: 8) + func remoteClosed(connection: Connection) { + let action: Action = self.lock.withLock { + self.state.remoteClosed(connection: connection) + } - fileprivate var activity: Activity = .opened + self.execute(action) + } - fileprivate var pending: Int = 0 { - didSet { - assert(self.pending >= 0) - } - } + func timeout(connection: Connection) { + let action: Action = self.lock.withLock { + self.state.timeout(connection: connection) + } - private let parentPool: ConnectionPool + self.execute(action) + } - private let key: Key + private func closeAndDelete() { + self.pool.delete(self) + self.closePromise.succeed(()) + } - fileprivate init(eventLoop: EventLoop, parentPool: ConnectionPool, key: Key) { - self.defaultEventLoop = eventLoop - self.parentPool = parentPool - self.key = key + func close() -> EventLoopFuture { + if let (waiters, available, leased, clean) = self.lock.withLock({ self.state.close() }) { + waiters.forEach { + $0.promise.fail(HTTPClientError.cancelled) } - fileprivate mutating func connectionAction(for preference: HTTPClient.EventLoopPreference) -> ConnectionGetAction { - self.pending -= 1 - let (channelEL, requiresSpecifiedEL) = self.resolvePreference(preference) - if self.leased < self.maximumConcurrentConnections { - self.leased += 1 - if let connection = availableConnections.swapWithFirstAndRemove(where: { $0.channel.eventLoop === channelEL }) { - connection.isLeased = true - return .leaseConnection(connection) - } else { - if requiresSpecifiedEL { - return .makeConnection(channelEL) - } else if let existingConnection = availableConnections.popFirst() { - return .leaseConnection(existingConnection) - } else { - return .makeConnection(self.defaultEventLoop) - } - } - } else { - let promise = channelEL.makePromise(of: Connection.self) - self.waiters.append(Waiter(promise: promise, preference: preference)) - return .leaseFutureConnection(promise.futureResult) - } + EventLoopFuture.andAllComplete(leased.map { $0.cancel() }, on: self.eventLoop).flatMap { _ in + EventLoopFuture.andAllComplete(available.map { $0.close() }, on: self.eventLoop) + }.whenFailure { error in + self.closePromise.fail(error) } - fileprivate mutating func releaseAction(for connection: Connection) -> ConnectionReleaseAction { - if let firstWaiter = self.waiters.popFirst() { - let (channelEL, requiresSpecifiedEL) = self.resolvePreference(firstWaiter.preference) + return self.closePromise.futureResult.map { clean } + } - guard connection.isActiveEstimation, !connection.isClosing else { - return .makeConnectionAndComplete(channelEL, firstWaiter.promise) - } + return self.closePromise.futureResult.map { true } + } - if connection.channel.eventLoop === channelEL { - return .succeed(firstWaiter.promise) - } else { - if requiresSpecifiedEL { - connection.mustRunDefaultCloseCallback = false - return .replaceConnection(channelEL, firstWaiter.promise) - } else { - return .makeConnectionAndComplete(channelEL, firstWaiter.promise) - } - } + private func makeChannel(preference: HTTPClient.EventLoopPreference) -> EventLoopFuture { + let eventLoop = preference.bestEventLoop ?? self.eventLoop + let requiresTLS = self.key.scheme == .https + let bootstrap: NIOClientTCPBootstrap + do { + bootstrap = try NIOClientTCPBootstrap.makeHTTPClientBootstrapBase(on: eventLoop, host: self.key.host, port: self.key.port, requiresTLS: requiresTLS, configuration: self.configuration) + } catch { + return eventLoop.makeFailedFuture(error) + } - } else { - connection.isLeased = false - self.leased -= 1 - if connection.isActiveEstimation, !connection.isClosing { - self.availableConnections.append(connection) - } + let channel: EventLoopFuture + switch self.key.scheme { + case .http, .https: + let address = HTTPClient.resolveAddress(host: self.key.host, port: self.key.port, proxy: self.configuration.proxy) + channel = bootstrap.connect(host: address.host, port: address.port) + case .unix: + channel = bootstrap.connect(unixDomainSocketPath: self.key.unixPath) + } - if self.providerMustClose() { - self.removeFromPool() - } + return channel.flatMap { channel in + let requiresSSLHandler = self.configuration.proxy != nil && self.key.scheme == .https + let handshakePromise = channel.eventLoop.makePromise(of: Void.self) - return .none - } - } + channel.pipeline.addSSLHandlerIfNeeded(for: self.key, tlsConfiguration: self.configuration.tlsConfiguration, addSSLClient: requiresSSLHandler, handshakePromise: handshakePromise) - fileprivate mutating func removeClosedConnection(_ connection: Connection) -> ClosedConnectionRemoveAction { - if connection.isLeased { - if let firstWaiter = self.waiters.popFirst() { - let (el, _) = self.resolvePreference(firstWaiter.preference) - return .makeConnectionAndComplete(el, firstWaiter.promise) + return handshakePromise.futureResult.flatMap { + channel.pipeline.addHTTPClientHandlers(leftOverBytesStrategy: .forwardBytes) + }.flatMap { + #if canImport(Network) + if #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *), bootstrap.underlyingBootstrap is NIOTSConnectionBootstrap { + return channel.pipeline.addHandler(HTTPClient.NWErrorHandler(), position: .first) } - } else { - self.availableConnections.swapWithFirstAndRemove(where: { $0 === connection }) - } - - if self.providerMustClose() { - self.removeFromPool() + #endif + return channel.eventLoop.makeSucceededFuture(()) + }.flatMap { + switch self.configuration.decompression { + case .disabled: + return channel.eventLoop.makeSucceededFuture(()) + case .enabled(let limit): + let decompressHandler = NIOHTTPResponseDecompressor(limit: limit) + return channel.pipeline.addHandler(decompressHandler) } - - return .none + }.map { + channel } - - fileprivate mutating func failedConnectionAction() -> ClosedConnectionRemoveAction { - if let firstWaiter = self.waiters.popFirst() { - let (el, _) = self.resolvePreference(firstWaiter.preference) - return .makeConnectionAndComplete(el, firstWaiter.promise) - } else { - self.leased -= 1 - if self.providerMustClose() { - self.removeFromPool() - } - return .none + }.flatMapError { error in + #if canImport(Network) + var error = error + if #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *), bootstrap.underlyingBootstrap is NIOTSConnectionBootstrap { + error = HTTPClient.NWErrorHandler.translateError(error) } - } - - private func providerMustClose() -> Bool { - return self.pending == 0 && self.activity != .closed && self.leased == 0 && self.availableConnections.isEmpty && self.waiters.isEmpty - } - - /// - Warning: This should always be called from a critical section protected by `.connectionProvidersLock` - fileprivate mutating func removeFromPool() { - assert(self.parentPool.connectionProviders[self.key] != nil) - self.parentPool.connectionProviders[self.key] = nil - assert(self.activity != .closed) - self.activity = .closed - } + #endif + return self.eventLoop.makeFailedFuture(error) + } + } - private func resolvePreference(_ preference: HTTPClient.EventLoopPreference) -> (EventLoop, Bool) { - switch preference.preference { - case .indifferent: - return (self.defaultEventLoop, false) - case .delegate(let el): - return (el, false) - case .delegateAndChannel(let el), .testOnly_exact(let el, _): - return (el, true) - } - } + /// A `Waiter` represents a request that waits for a connection when none is + /// currently available + /// + /// `Waiter`s are created when `maximumConcurrentConnections` is reached + /// and we cannot create new connections anymore. + struct Waiter { + /// The promise to complete once a connection is available + let promise: EventLoopPromise + + /// Future that will be succeeded when request timeout handler and `TaskHandler` are added to the pipeline. + let setupComplete: EventLoopFuture + + /// The event loop preference associated to this particular request + /// that the provider should respect + let preference: HTTPClient.EventLoopPreference + } +} - fileprivate enum ConnectionGetAction { - case leaseConnection(Connection) - case makeConnection(EventLoop) - case leaseFutureConnection(EventLoopFuture) - } +class IdlePoolConnectionHandler: ChannelInboundHandler, RemovableChannelHandler { + typealias InboundIn = NIOAny - fileprivate enum ConnectionReleaseAction { - case succeed(EventLoopPromise) - case makeConnectionAndComplete(EventLoop, EventLoopPromise) - case replaceConnection(EventLoop, EventLoopPromise) - case none - } + let connection: Connection + var eventSent: Bool - fileprivate enum ClosedConnectionRemoveAction { - case none - case makeConnectionAndComplete(EventLoop, EventLoopPromise) - } + init(connection: Connection) { + self.connection = connection + self.eventSent = false + } - /// A `Waiter` represents a request that waits for a connection when none is - /// currently available - /// - /// `Waiter`s are created when `maximumConcurrentConnections` is reached - /// and we cannot create new connections anymore. - fileprivate struct Waiter { - /// The promise to complete once a connection is available - let promise: EventLoopPromise - - /// The event loop preference associated to this particular request - /// that the provider should respect - let preference: HTTPClient.EventLoopPreference - } + // this is needed to detect when remote end closes connection while connection is in the pool idling + func channelInactive(context: ChannelHandlerContext) { + if !self.eventSent { + self.eventSent = true + self.connection.remoteClosed() + } + } - enum Activity: Hashable, CustomStringConvertible { - case opened - case closing - case closed - - var description: String { - switch self { - case .opened: - return "opened" - case .closing: - return "closing" - case .closed: - return "closed" - } - } + func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) { + if let idleEvent = event as? IdleStateHandler.IdleStateEvent, idleEvent == .write { + if !self.eventSent { + self.eventSent = true + self.connection.timeout() } + } else { + context.fireUserInboundEventTriggered(event) } } } + +extension CircularBuffer { + mutating func swap(at index: Index, with value: Element) -> Element { + let tmp = self[index] + self[index] = value + return tmp + } +} diff --git a/Sources/AsyncHTTPClient/ConnectionsState.swift b/Sources/AsyncHTTPClient/ConnectionsState.swift new file mode 100644 index 000000000..7d82771be --- /dev/null +++ b/Sources/AsyncHTTPClient/ConnectionsState.swift @@ -0,0 +1,335 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2019-2020 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIO + +extension HTTP1ConnectionProvider { + enum Action { + case lease(Connection, Waiter) + case create(Waiter) + case replace(Connection, Waiter) + case closeProvider + case park(Connection) + case none + case fail(Waiter, Error) + case cancel(Connection, Bool) + indirect case closeAnd(Connection, Action) + indirect case parkAnd(Connection, Action) + } + + struct ConnectionsState { + enum State { + case active + case closed + } + + struct Snapshot { + var state: State + var availableConnections: CircularBuffer + var leasedConnections: Set + var waiters: CircularBuffer + var openedConnectionsCount: Int + var pending: Int + } + + let maximumConcurrentConnections: Int + let eventLoop: EventLoop + + private var state: State = .active + + /// Opened connections that are available. + private var availableConnections: CircularBuffer = .init(initialCapacity: 8) + + /// Opened connections that are leased to the user. + private var leasedConnections: Set = .init() + + /// Consumers that weren't able to get a new connection without exceeding + /// `maximumConcurrentConnections` get a `Future` + /// whose associated promise is stored in `Waiter`. The promise is completed + /// as soon as possible by the provider, in FIFO order. + private var waiters: CircularBuffer = .init(initialCapacity: 8) + + /// Number of opened or opening connections, used to keep track of all connections and enforcing `maximumConcurrentConnections` limit. + private var openedConnectionsCount: Int = 0 + + /// Number of enqueued requests, used to track if it is safe to delete the provider. + private var pending: Int = 1 + + init(maximumConcurrentConnections: Int = 8, eventLoop: EventLoop) { + self.maximumConcurrentConnections = maximumConcurrentConnections + self.eventLoop = eventLoop + } + + func testsOnly_getInternalState() -> Snapshot { + return Snapshot(state: self.state, availableConnections: self.availableConnections, leasedConnections: self.leasedConnections, waiters: self.waiters, openedConnectionsCount: self.openedConnectionsCount, pending: self.pending) + } + + mutating func testsOnly_setInternalState(_ snapshot: Snapshot) { + self.state = snapshot.state + self.availableConnections = snapshot.availableConnections + self.leasedConnections = snapshot.leasedConnections + self.waiters = snapshot.waiters + self.openedConnectionsCount = snapshot.openedConnectionsCount + self.pending = snapshot.pending + } + + func assertInvariants() { + assert(self.waiters.isEmpty) + assert(self.availableConnections.isEmpty) + assert(self.leasedConnections.isEmpty) + assert(self.openedConnectionsCount == 0) + assert(self.pending == 0) + } + + mutating func enqueue() -> Bool { + switch self.state { + case .active: + self.pending += 1 + return true + case .closed: + return false + } + } + + private var hasCapacity: Bool { + return self.openedConnectionsCount < self.maximumConcurrentConnections + } + + private var isEmpty: Bool { + return self.openedConnectionsCount == 0 && self.pending == 0 + } + + mutating func acquire(waiter: Waiter) -> Action { + switch self.state { + case .active: + self.pending -= 1 + + let (eventLoop, required) = self.resolvePreference(waiter.preference) + if required { + // If there is an opened connection on the same EL - use it + if let found = self.availableConnections.firstIndex(where: { $0.channel.eventLoop === eventLoop }) { + let connection = self.availableConnections.remove(at: found) + self.leasedConnections.insert(ConnectionKey(connection)) + return .lease(connection, waiter) + } + + // If we can create additional connection, create + if self.hasCapacity { + self.openedConnectionsCount += 1 + return .create(waiter) + } + + // If we cannot create additional connection, but there is one in the pool, replace it + if let connection = self.availableConnections.popFirst() { + return .replace(connection, waiter) + } + + self.waiters.append(waiter) + return .none + } else if let connection = self.availableConnections.popFirst() { + self.leasedConnections.insert(ConnectionKey(connection)) + return .lease(connection, waiter) + } else if self.hasCapacity { + self.openedConnectionsCount += 1 + return .create(waiter) + } else { + self.waiters.append(waiter) + return .none + } + case .closed: + return .fail(waiter, ProviderClosedError()) + } + } + + mutating func release(connection: Connection, closing: Bool) -> Action { + switch self.state { + case .active: + assert(self.leasedConnections.contains(ConnectionKey(connection))) + + if connection.isActiveEstimation, !closing { // If connection is alive, we can offer it to a next waiter + if let waiter = self.waiters.popFirst() { + let (eventLoop, required) = self.resolvePreference(waiter.preference) + + // If returned connection is on same EL or we do not require special EL - lease it + if connection.channel.eventLoop === eventLoop || !required { + return .lease(connection, waiter) + } + + // If there is an opened connection on the same loop, lease it and park returned + if let found = self.availableConnections.firstIndex(where: { $0.channel.eventLoop === eventLoop }) { + self.leasedConnections.remove(ConnectionKey(connection)) + let replacement = self.availableConnections.swap(at: found, with: connection) + self.leasedConnections.insert(ConnectionKey(replacement)) + return .parkAnd(connection, .lease(replacement, waiter)) + } + + // If we can create new connection - do it + if self.hasCapacity { + self.leasedConnections.remove(ConnectionKey(connection)) + self.availableConnections.append(connection) + self.openedConnectionsCount += 1 + return .parkAnd(connection, .create(waiter)) + } + + // If we cannot create new connections, we will have to replace returned connection with a new one on the required loop + return .replace(connection, waiter) + } else { // or park, if there are no waiters + self.leasedConnections.remove(ConnectionKey(connection)) + self.availableConnections.append(connection) + return .park(connection) + } + } else { // if connection is not alive, we delete it and process the next waiter + // this connections is now gone, we will either create new connection or do nothing + self.openedConnectionsCount -= 1 + self.leasedConnections.remove(ConnectionKey(connection)) + + return self.processNextWaiter() + } + case .closed: + self.openedConnectionsCount -= 1 + self.leasedConnections.remove(ConnectionKey(connection)) + + return self.processNextWaiter() + } + } + + mutating func offer(connection: Connection) -> Action { + switch self.state { + case .active: + self.leasedConnections.insert(ConnectionKey(connection)) + return .none + case .closed: // This can happen when we close the client while connections was being estableshed + return .cancel(connection, self.isEmpty) + } + } + + mutating func drop(connection: Connection) { + switch self.state { + case .active: + self.leasedConnections.remove(ConnectionKey(connection)) + case .closed: + assertionFailure("should not happen") + } + } + + mutating func connectFailed() -> Action { + switch self.state { + case .active: + self.openedConnectionsCount -= 1 + return self.processNextWaiter() + case .closed: + assertionFailure("should not happen") + return .none + } + } + + mutating func remoteClosed(connection: Connection) -> Action { + switch self.state { + case .active: + // Connection can be closed remotely while we wait for `.lease` action to complete. + // If this happens when connections is leased, we do not remove it from leased connections, + // it will be done when a new replacement will be ready for it. + if self.leasedConnections.contains(ConnectionKey(connection)) { + return .none + } + + // If this connection is not in use, the have to release it as well + self.openedConnectionsCount -= 1 + self.availableConnections.removeAll { $0 === connection } + + return self.processNextWaiter() + case .closed: + self.openedConnectionsCount -= 1 + return self.processNextWaiter() + } + } + + mutating func timeout(connection: Connection) -> Action { + switch self.state { + case .active: + // We can get timeout and inUse = true when we decided to lease the connection, but this action is not executed yet. + // In this case we can ignore timeout notification. + if self.leasedConnections.contains(ConnectionKey(connection)) { + return .none + } + + // If connection was not in use, we release it from the pool, increasing available capacity + self.openedConnectionsCount -= 1 + self.availableConnections.removeAll { $0 === connection } + + return .closeAnd(connection, self.processNextWaiter()) + case .closed: + return .none + } + } + + mutating func processNextWaiter() -> Action { + if let waiter = self.waiters.popFirst() { + let (eventLoop, required) = self.resolvePreference(waiter.preference) + + // If specific EL is required, we have only two options - find open one or create a new one + if required, let found = self.availableConnections.firstIndex(where: { $0.channel.eventLoop === eventLoop }) { + let connection = self.availableConnections.remove(at: found) + self.leasedConnections.insert(ConnectionKey(connection)) + return .lease(connection, waiter) + } else if !required, let connection = self.availableConnections.popFirst() { + self.leasedConnections.insert(ConnectionKey(connection)) + return .lease(connection, waiter) + } else { + self.openedConnectionsCount += 1 + return .create(waiter) + } + } + + // if capacity is at max and the are no waiters and no in-flight requests for connection, we are closing this provider + if self.isEmpty { + // deactivate and remove + self.state = .closed + return .closeProvider + } + + return .none + } + + mutating func close() -> (CircularBuffer, CircularBuffer, Set, Bool)? { + switch self.state { + case .active: + let waiters = self.waiters + self.waiters.removeAll() + + let available = self.availableConnections + self.availableConnections.removeAll() + + let leased = self.leasedConnections + + self.state = .closed + + return (waiters, available, leased, self.openedConnectionsCount - available.count == 0) + case .closed: + return nil + } + } + + private func resolvePreference(_ preference: HTTPClient.EventLoopPreference) -> (EventLoop, Bool) { + switch preference.preference { + case .indifferent: + return (self.eventLoop, false) + case .delegate(let el): + return (el, false) + case .delegateAndChannel(let el), .testOnly_exact(let el, _): + return (el, true) + } + } + } +} diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index 6c6dfa21a..49619a814 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -52,7 +52,6 @@ public class HTTPClient { let configuration: Configuration let pool: ConnectionPool var state: State - private var tasks = [UUID: TaskProtocol]() private let stateLock = Lock() /// Create an `HTTPClient` with specified `EventLoopGroup` provider and configuration. @@ -82,7 +81,7 @@ public class HTTPClient { } deinit { - assert(self.pool.connectionProviderCount == 0) + assert(self.pool.count == 0) assert(self.state == .shutDown, "Client not shut down before the deinit. Please call client.syncShutdown() when no longer needed.") } @@ -136,14 +135,6 @@ public class HTTPClient { self.shutdown(requiresCleanClose: false, queue: queue, callback) } - private func cancelTasks(_ tasks: Dictionary.Values) -> EventLoopFuture { - for task in tasks { - task.cancel() - } - - return EventLoopFuture.andAllComplete(tasks.map { $0.completion }, on: self.eventLoopGroup.next()) - } - private func shutdownEventLoop(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) { self.stateLock.withLock { switch self.eventLoopGroupProvider { @@ -163,37 +154,34 @@ public class HTTPClient { } private func shutdown(requiresCleanClose: Bool, queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) { - let result: Result.Values, Error> = self.stateLock.withLock { - if self.state != .upAndRunning { - return .failure(HTTPClientError.alreadyShutdown) - } else { + do { + try self.stateLock.withLock { + if self.state != .upAndRunning { + throw HTTPClientError.alreadyShutdown + } self.state = .shuttingDown - return .success(self.tasks.values) } + } catch { + callback(error) + return } - switch result { - case .failure(let error): - callback(error) - case .success(let tasks): - self.pool.prepareForClose(on: self.eventLoopGroup.next()).whenComplete { _ in - var closeError: Error? - if !tasks.isEmpty, requiresCleanClose { + self.pool.close(on: self.eventLoopGroup.next()).whenComplete { result in + var closeError: Error? + switch result { + case .failure(let error): + closeError = error + case .success(let cleanShutdown): + if !cleanShutdown, requiresCleanClose { closeError = HTTPClientError.uncleanShutdown } - // we ignore errors here - self.cancelTasks(tasks).whenComplete { _ in - // we ignore errors here - self.pool.close(on: self.eventLoopGroup.next()).whenComplete { _ in - self.shutdownEventLoop(queue: queue) { eventLoopError in - // we prioritise .uncleanShutdown here - if let error = closeError { - callback(error) - } else { - callback(eventLoopError) - } - } + self.shutdownEventLoop(queue: queue) { eventLoopError in + // we prioritise .uncleanShutdown here + if let error = closeError { + callback(error) + } else { + callback(eventLoopError) } } } @@ -361,38 +349,20 @@ public class HTTPClient { redirectHandler = nil } - let task = Task(eventLoop: taskEL, poolingTimeout: self.configuration.maximumAllowedIdleTimeInConnectionPool) - self.stateLock.withLock { - self.tasks[task.id] = task - } - let promise = task.promise - - promise.futureResult.whenComplete { _ in - self.stateLock.withLock { - self.tasks[task.id] = nil - } - } - - let connection = self.pool.getConnection(for: request, preference: eventLoopPreference, on: taskEL, deadline: deadline) + let task = Task(eventLoop: taskEL) + let setupComplete = taskEL.makePromise(of: Void.self) + let connection = self.pool.getConnection(for: request, preference: eventLoopPreference, on: taskEL, deadline: deadline, setupComplete: setupComplete.futureResult) connection.flatMap { connection -> EventLoopFuture in let channel = connection.channel - let addedFuture: EventLoopFuture - switch self.configuration.decompression { - case .disabled: - addedFuture = channel.eventLoop.makeSucceededFuture(()) - case .enabled(let limit): - let decompressHandler = NIOHTTPResponseDecompressor(limit: limit) - addedFuture = channel.pipeline.addHandler(decompressHandler) + let future: EventLoopFuture + if let timeout = self.resolve(timeout: self.configuration.timeout.read, deadline: deadline) { + future = channel.pipeline.addHandler(IdleStateHandler(readTimeout: timeout)) + } else { + future = channel.eventLoop.makeSucceededFuture(()) } - return addedFuture.flatMap { - if let timeout = self.resolve(timeout: self.configuration.timeout.read, deadline: deadline) { - return channel.pipeline.addHandler(IdleStateHandler(readTimeout: timeout)) - } else { - return channel.eventLoop.makeSucceededFuture(()) - } - }.flatMap { + return future.flatMap { let taskHandler = TaskHandler(task: task, kind: request.kind, delegate: delegate, @@ -416,10 +386,12 @@ public class HTTPClient { return channel.eventLoop.makeSucceededFuture(()) } }.flatMapError { error in - connection.release() + connection.release(closing: true) return channel.eventLoop.makeFailedFuture(error) } - }.cascadeFailure(to: promise) + }.always { _ in + setupComplete.succeed(()) + }.cascadeFailure(to: task.promise) return task } diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index dbe140981..6adb55342 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -485,25 +485,22 @@ extension URL { extension HTTPClient { /// Response execution context. Will be created by the library and could be used for obtaining /// `EventLoopFuture` of the execution or cancellation of the execution. - public final class Task: TaskProtocol { + public final class Task { /// The `EventLoop` the delegate will be executed on. public let eventLoop: EventLoop let promise: EventLoopPromise var completion: EventLoopFuture - var connection: ConnectionPool.Connection? + var connection: Connection? var cancelled: Bool let lock: Lock - let id = UUID() - let poolingTimeout: TimeAmount? - init(eventLoop: EventLoop, poolingTimeout: TimeAmount? = nil) { + init(eventLoop: EventLoop) { self.eventLoop = eventLoop self.promise = eventLoop.makePromise() self.completion = self.promise.futureResult.map { _ in } self.cancelled = false self.lock = Lock() - self.poolingTimeout = poolingTimeout } static func failedTask(eventLoop: EventLoop, error: Error) -> Task { @@ -528,8 +525,8 @@ extension HTTPClient { /// Cancels the request execution. public func cancel() { let channel: Channel? = self.lock.withLock { - if !cancelled { - cancelled = true + if !self.cancelled { + self.cancelled = true return self.connection?.channel } else { return nil @@ -539,7 +536,7 @@ extension HTTPClient { } @discardableResult - func setConnection(_ connection: ConnectionPool.Connection) -> ConnectionPool.Connection { + func setConnection(_ connection: Connection) -> Connection { return self.lock.withLock { self.connection = connection if self.cancelled { @@ -549,47 +546,32 @@ extension HTTPClient { } } - func succeed(promise: EventLoopPromise?, with value: Response, delegateType: Delegate.Type) { - self.releaseAssociatedConnection(delegateType: delegateType).whenSuccess { + func succeed(promise: EventLoopPromise?, with value: Response, delegateType: Delegate.Type, closing: Bool) { + self.releaseAssociatedConnection(delegateType: delegateType, closing: closing).whenSuccess { promise?.succeed(value) } } func fail(with error: Error, delegateType: Delegate.Type) { if let connection = self.connection { - connection.close().whenComplete { _ in - self.releaseAssociatedConnection(delegateType: delegateType).whenComplete { _ in + connection.channel.close(promise: nil) + self.releaseAssociatedConnection(delegateType: delegateType, closing: true) + .whenSuccess { self.promise.fail(error) } - } } } - func releaseAssociatedConnection(delegateType: Delegate.Type) -> EventLoopFuture { + func releaseAssociatedConnection(delegateType: Delegate.Type, closing: Bool) -> EventLoopFuture { if let connection = self.connection { - return connection.removeHandler(NIOHTTPResponseDecompressor.self).flatMap { - connection.removeHandler(IdleStateHandler.self) - }.flatMap { + // remove read timeout handler + return connection.removeHandler(IdleStateHandler.self).flatMap { connection.removeHandler(TaskHandler.self) - }.flatMap { - let idlePoolConnectionHandler = IdlePoolConnectionHandler() - return connection.channel.pipeline.addHandler(idlePoolConnectionHandler, position: .last).flatMap { - connection.channel.pipeline.addHandler(IdleStateHandler(writeTimeout: self.poolingTimeout), position: .before(idlePoolConnectionHandler)) - } - }.flatMapError { error in - if let error = error as? ChannelError, error == .ioOnClosedChannel { - // We may get this error if channel is released because it is - // closed, it is safe to ignore it - return connection.channel.eventLoop.makeSucceededFuture(()) - } else { - return connection.channel.eventLoop.makeFailedFuture(error) - } }.map { - connection.release() + connection.release(closing: closing) }.flatMapError { error in fatalError("Couldn't remove taskHandler: \(error)") } - } else { // TODO: This seems only reached in some internal unit test // Maybe there could be a better handling in the future to make @@ -602,12 +584,6 @@ extension HTTPClient { internal struct TaskCancelEvent {} -internal protocol TaskProtocol { - func cancel() - var id: UUID { get } - var completion: EventLoopFuture { get } -} - // MARK: - TaskHandler internal class TaskHandler: RemovableChannelHandler { @@ -628,6 +604,7 @@ internal class TaskHandler: RemovableChann var state: State = .idle var pendingRead = false var mayRead = true + var closing = false let kind: HTTPClient.Request.Kind init(task: HTTPClient.Task, @@ -695,7 +672,7 @@ extension TaskHandler { do { let result = try body(self.task) - self.task.succeed(promise: promise, with: result, delegateType: Delegate.self) + self.task.succeed(promise: promise, with: result, delegateType: Delegate.self, closing: self.closing) } catch { self.task.fail(with: error, delegateType: Delegate.self) } @@ -727,7 +704,7 @@ extension TaskHandler: ChannelDuplexHandler { func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise?) { self.state = .idle - let request = unwrapOutboundIn(data) + let request = self.unwrapOutboundIn(data) let uri: String switch (self.kind, request.url.baseURL) { @@ -772,8 +749,13 @@ extension TaskHandler: ChannelDuplexHandler { self.callOutToDelegateFireAndForget(self.delegate.didSendRequest) }.flatMapErrorThrowing { error in context.eventLoop.assertInEventLoop() - self.state = .end - self.failTaskAndNotifyDelegate(error: error, self.delegate.didReceiveError) + switch self.state { + case .end: + break + default: + self.state = .end + self.failTaskAndNotifyDelegate(error: error, self.delegate.didReceiveError) + } throw error }.cascade(to: promise) } @@ -805,16 +787,10 @@ extension TaskHandler: ChannelDuplexHandler { switch response { case .head(let head): if !head.isKeepAlive { - self.task.lock.withLock { - if let connection = self.task.connection { - connection.isClosing = true - } else { - preconditionFailure("There should always be a connection at this point") - } - } + self.closing = true } - if let redirectURL = redirectHandler?.redirectTarget(status: head.status, headers: head.headers) { + if let redirectURL = self.redirectHandler?.redirectTarget(status: head.status, headers: head.headers) { self.state = .redirected(head, redirectURL) } else { self.state = .head @@ -840,7 +816,7 @@ extension TaskHandler: ChannelDuplexHandler { switch self.state { case .redirected(let head, let redirectURL): self.state = .end - self.task.releaseAssociatedConnection(delegateType: Delegate.self).whenSuccess { + self.task.releaseAssociatedConnection(delegateType: Delegate.self, closing: self.closing).whenSuccess { self.redirectHandler?.redirect(status: head.status, to: redirectURL, promise: self.task.promise) } default: @@ -1021,21 +997,3 @@ internal struct RedirectHandler { } } } - -class IdlePoolConnectionHandler: ChannelInboundHandler, RemovableChannelHandler { - typealias InboundIn = NIOAny - - let _hasNotSentClose: NIOAtomic = .makeAtomic(value: true) - var hasNotSentClose: Bool { - return self._hasNotSentClose.load() - } - - func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) { - if let idleEvent = event as? IdleStateHandler.IdleStateEvent, idleEvent == .write { - self._hasNotSentClose.store(false) - context.close(promise: nil) - } else { - context.fireUserInboundEventTriggered(event) - } - } -} diff --git a/Sources/AsyncHTTPClient/Utils.swift b/Sources/AsyncHTTPClient/Utils.swift index 95d85c958..9160653e1 100644 --- a/Sources/AsyncHTTPClient/Utils.swift +++ b/Sources/AsyncHTTPClient/Utils.swift @@ -135,29 +135,7 @@ extension NIOClientTCPBootstrap { } } -extension CircularBuffer { - @discardableResult - mutating func swapWithFirstAndRemove(at index: Index) -> Element? { - precondition(index >= self.startIndex && index < self.endIndex) - if !self.isEmpty { - self.swapAt(self.startIndex, index) - return self.removeFirst() - } else { - return nil - } - } - - @discardableResult - mutating func swapWithFirstAndRemove(where predicate: (Element) throws -> Bool) rethrows -> Element? { - if let existingIndex = try self.firstIndex(where: predicate) { - return self.swapWithFirstAndRemove(at: existingIndex) - } else { - return nil - } - } -} - -extension ConnectionPool.Connection { +extension Connection { func removeHandler(_ type: Handler.Type) -> EventLoopFuture { return self.channel.pipeline.handler(type: type).flatMap { handler in self.channel.pipeline.removeHandler(handler) diff --git a/Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift b/Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift new file mode 100644 index 000000000..0350edf72 --- /dev/null +++ b/Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift @@ -0,0 +1,66 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2018-2019 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +// +// ConnectionPoolTests+XCTest.swift +// +import XCTest + +/// +/// NOTE: This file was generated by generate_linux_tests.rb +/// +/// Do NOT edit this file directly as it will be regenerated automatically when needed. +/// + +extension ConnectionPoolTests { + static var allTests: [(String, (ConnectionPoolTests) -> () throws -> Void)] { + return [ + ("testPending", testPending), + ("testAcquireWhenEmpty", testAcquireWhenEmpty), + ("testAcquireWhenAvailable", testAcquireWhenAvailable), + ("testAcquireWhenUnavailable", testAcquireWhenUnavailable), + ("testAcquireWhenEmptySpecificEL", testAcquireWhenEmptySpecificEL), + ("testAcquireWhenAvailableSpecificEL", testAcquireWhenAvailableSpecificEL), + ("testAcquireReplace", testAcquireReplace), + ("testAcquireWhenUnavailableSpecificEL", testAcquireWhenUnavailableSpecificEL), + ("testAcquireWhenClosed", testAcquireWhenClosed), + ("testReleaseAliveConnectionEmptyQueue", testReleaseAliveConnectionEmptyQueue), + ("testReleaseAliveButClosingConnectionEmptyQueue", testReleaseAliveButClosingConnectionEmptyQueue), + ("testReleaseInactiveConnectionEmptyQueue", testReleaseInactiveConnectionEmptyQueue), + ("testReleaseInactiveConnectionEmptyQueueHasConnections", testReleaseInactiveConnectionEmptyQueueHasConnections), + ("testReleaseAliveConnectionHasWaiter", testReleaseAliveConnectionHasWaiter), + ("testReleaseInactiveConnectionHasWaitersNoConnections", testReleaseInactiveConnectionHasWaitersNoConnections), + ("testReleaseInactiveConnectionHasWaitersHasConnections", testReleaseInactiveConnectionHasWaitersHasConnections), + ("testReleaseAliveConnectionSameELHasWaiterSpecificEL", testReleaseAliveConnectionSameELHasWaiterSpecificEL), + ("testReleaseAliveConnectionDifferentELNoSameELConnectionsHasWaiterSpecificEL", testReleaseAliveConnectionDifferentELNoSameELConnectionsHasWaiterSpecificEL), + ("testReleaseAliveConnectionDifferentELHasSameELConnectionsHasWaiterSpecificEL", testReleaseAliveConnectionDifferentELHasSameELConnectionsHasWaiterSpecificEL), + ("testReleaseAliveConnectionDifferentELNoSameELConnectionsOnLimitHasWaiterSpecificEL", testReleaseAliveConnectionDifferentELNoSameELConnectionsOnLimitHasWaiterSpecificEL), + ("testReleaseInactiveConnectionHasWaitersHasSameELConnectionsSpecificEL", testReleaseInactiveConnectionHasWaitersHasSameELConnectionsSpecificEL), + ("testReleaseInactiveConnectionHasWaitersNoSameELConnectionsSpecificEL", testReleaseInactiveConnectionHasWaitersNoSameELConnectionsSpecificEL), + ("testNextWaiterEmptyQueue", testNextWaiterEmptyQueue), + ("testNextWaiterEmptyQueueHasConnections", testNextWaiterEmptyQueueHasConnections), + ("testNextWaiterHasWaitersHasConnections", testNextWaiterHasWaitersHasConnections), + ("testNextWaiterHasWaitersHasSameELConnectionsSpecificEL", testNextWaiterHasWaitersHasSameELConnectionsSpecificEL), + ("testNextWaiterHasWaitersHasDifferentELConnectionsSpecificEL", testNextWaiterHasWaitersHasDifferentELConnectionsSpecificEL), + ("testTimeoutLeasedConnection", testTimeoutLeasedConnection), + ("testTimeoutAvailableConnection", testTimeoutAvailableConnection), + ("testRemoteClosedLeasedConnection", testRemoteClosedLeasedConnection), + ("testRemoteClosedAvailableConnection", testRemoteClosedAvailableConnection), + ("testConnectionReleaseActive", testConnectionReleaseActive), + ("testConnectionReleaseInactive", testConnectionReleaseInactive), + ("testConnectionRemoteCloseRelease", testConnectionRemoteCloseRelease), + ("testConnectionTimeoutRelease", testConnectionTimeoutRelease), + ("testAcquireAvailableBecomesUnavailable", testAcquireAvailableBecomesUnavailable), + ] + } +} diff --git a/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift b/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift new file mode 100644 index 000000000..ef7958b6a --- /dev/null +++ b/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift @@ -0,0 +1,1437 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2018-2019 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@testable import AsyncHTTPClient +@testable import NIO +import NIOConcurrencyHelpers +import NIOFoundationCompat +import NIOHTTP1 +import NIOHTTPCompression +import NIOSSL +import NIOTestUtils +import NIOTransportServices +import XCTest + +class ConnectionPoolTests: XCTestCase { + struct TempError: Error {} + + func testPending() { + let eventLoop = EmbeddedEventLoop() + + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: eventLoop) + + var snapshot = state.testsOnly_getInternalState() + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(0, snapshot.openedConnectionsCount) + + XCTAssertTrue(state.enqueue()) + + snapshot = state.testsOnly_getInternalState() + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(2, snapshot.pending) + XCTAssertEqual(0, snapshot.openedConnectionsCount) + } + + // MARK: - Acquire Tests + + func testAcquireWhenEmpty() { + let eventLoop = EmbeddedEventLoop() + + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: eventLoop) + + var snapshot = state.testsOnly_getInternalState() + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(0, snapshot.openedConnectionsCount) + + let action = state.acquire(waiter: .init(promise: eventLoop.makePromise(), setupComplete: eventLoop.makeSucceededFuture(()), preference: .indifferent)) + switch action { + case .create(let waiter): + waiter.promise.fail(TempError()) + default: + XCTFail("Unexpected action: \(action)") + } + + snapshot = state.testsOnly_getInternalState() + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + } + + func testAcquireWhenAvailable() throws { + let eventLoop = EmbeddedEventLoop() + let channel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + let connection = Connection(channel: channel, provider: provider) + snapshot.availableConnections.append(connection) + snapshot.openedConnectionsCount = 1 + + provider.state.testsOnly_setInternalState(snapshot) + + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertEqual(1, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + let action = provider.state.acquire(waiter: .init(promise: eventLoop.makePromise(), setupComplete: eventLoop.makeSucceededFuture(()), preference: .indifferent)) + switch action { + case .lease(let connection, let waiter): + waiter.promise.succeed(connection) + + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + // cleanup, since we don't call release + snapshot.openedConnectionsCount = 0 + provider.state.testsOnly_setInternalState(snapshot) + provider.closePromise.succeed(()) + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testAcquireWhenUnavailable() throws { + let eventLoop = EmbeddedEventLoop() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + snapshot.openedConnectionsCount = 8 + provider.state.testsOnly_setInternalState(snapshot) + + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(8, snapshot.openedConnectionsCount) + + let action = provider.state.acquire(waiter: .init(promise: eventLoop.makePromise(), setupComplete: eventLoop.makeSucceededFuture(()), preference: .indifferent)) + switch action { + case .none: + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(1, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(8, snapshot.openedConnectionsCount) + default: + XCTFail("Unexpected action: \(action)") + } + + // cleanup + snapshot.openedConnectionsCount = 0 + provider.state.testsOnly_setInternalState(snapshot) + provider.closePromise.succeed(()) + _ = try provider.close().wait() + } + + // MARK: - Acquire on Specific EL Tests + + func testAcquireWhenEmptySpecificEL() { + let eventLoop = EmbeddedEventLoop() + + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: eventLoop) + var snapshot = state.testsOnly_getInternalState() + + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(0, snapshot.openedConnectionsCount) + + let action = state.acquire(waiter: .init(promise: eventLoop.makePromise(), setupComplete: eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: eventLoop))) + switch action { + case .create(let waiter): + waiter.promise.fail(TempError()) + default: + XCTFail("Unexpected action: \(action)") + } + + snapshot = state.testsOnly_getInternalState() + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + } + + func testAcquireWhenAvailableSpecificEL() throws { + let channel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + let connection = Connection(channel: channel, provider: provider) + snapshot.availableConnections.append(connection) + snapshot.openedConnectionsCount = 1 + + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(1, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + let action = provider.state.acquire(waiter: .init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: channel.eventLoop))) + switch action { + case .lease(let connection, let waiter): + waiter.promise.succeed(connection) + + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + // cleanup, since we don't call release + snapshot.openedConnectionsCount = 0 + provider.state.testsOnly_setInternalState(snapshot) + provider.closePromise.succeed(()) + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testAcquireReplace() throws { + let eventLoop = EmbeddedEventLoop() + let channel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + let connection = Connection(channel: channel, provider: provider) + snapshot.availableConnections.append(connection) + snapshot.openedConnectionsCount = 8 + + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(1, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(8, snapshot.openedConnectionsCount) + + let action = provider.state.acquire(waiter: .init(promise: eventLoop.makePromise(), setupComplete: eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: eventLoop))) + switch action { + case .replace(_, let waiter): + waiter.promise.fail(TempError()) + + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(8, snapshot.openedConnectionsCount) + default: + XCTFail("Unexpected action: \(action)") + } + + // cleanup + snapshot.openedConnectionsCount = 0 + provider.state.testsOnly_setInternalState(snapshot) + provider.closePromise.succeed(()) + _ = try provider.close().wait() + } + + func testAcquireWhenUnavailableSpecificEL() throws { + let eventLoop = EmbeddedEventLoop() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + snapshot.openedConnectionsCount = 8 + + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(8, snapshot.openedConnectionsCount) + + let action = provider.state.acquire(waiter: .init(promise: eventLoop.makePromise(), setupComplete: eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: eventLoop))) + switch action { + case .none: + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(1, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(8, snapshot.openedConnectionsCount) + default: + XCTFail("Unexpected action: \(action)") + } + + // cleanup + snapshot.openedConnectionsCount = 0 + provider.state.testsOnly_setInternalState(snapshot) + provider.closePromise.succeed(()) + _ = try provider.close().wait() + } + + // MARK: - Acquire Errors Tests + + func testAcquireWhenClosed() { + let eventLoop = EmbeddedEventLoop() + + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: eventLoop) + var snapshot = state.testsOnly_getInternalState() + snapshot.state = .closed + state.testsOnly_setInternalState(snapshot) + + XCTAssertFalse(state.enqueue()) + + let promise = eventLoop.makePromise(of: Connection.self) + let action = state.acquire(waiter: .init(promise: promise, setupComplete: eventLoop.makeSucceededFuture(()), preference: .indifferent)) + switch action { + case .fail(let waiter, let error): + waiter.promise.fail(error) + default: + XCTFail("Unexpected action: \(action)") + } + } + + // MARK: - Release Tests + + func testReleaseAliveConnectionEmptyQueue() throws { + let channel = ActiveChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + snapshot.pending = 0 + snapshot.openedConnectionsCount = 1 + let connection = Connection(channel: channel, provider: provider) + snapshot.leasedConnections.insert(ConnectionKey(connection)) + + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + let action = provider.state.release(connection: connection, closing: false) + switch action { + case .park: + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertEqual(1, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + // cleanp + provider.closePromise.succeed(()) + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testReleaseAliveButClosingConnectionEmptyQueue() throws { + let channel = ActiveChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + snapshot.pending = 0 + snapshot.openedConnectionsCount = 1 + let connection = Connection(channel: channel, provider: provider) + snapshot.leasedConnections.insert(ConnectionKey(connection)) + + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + let action = provider.state.release(connection: connection, closing: true) + switch action { + case .closeProvider: + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(0, snapshot.openedConnectionsCount) + + // cleanup + provider.closePromise.succeed(()) + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testReleaseInactiveConnectionEmptyQueue() throws { + let channel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + snapshot.pending = 0 + snapshot.openedConnectionsCount = 1 + let connection = Connection(channel: channel, provider: provider) + snapshot.leasedConnections.insert(ConnectionKey(connection)) + + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + let action = provider.state.release(connection: connection, closing: true) + switch action { + case .closeProvider: + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(0, snapshot.openedConnectionsCount) + + // cleanup + provider.closePromise.succeed(()) + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testReleaseInactiveConnectionEmptyQueueHasConnections() throws { + let channel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + snapshot.pending = 0 + snapshot.openedConnectionsCount = 2 + let connection = Connection(channel: channel, provider: provider) + snapshot.leasedConnections.insert(ConnectionKey(connection)) + let available = Connection(channel: channel, provider: provider) + snapshot.availableConnections.append(available) + + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(1, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(2, snapshot.openedConnectionsCount) + + let action = provider.state.release(connection: connection, closing: true) + switch action { + case .none: + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertEqual(1, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + // cleanup + provider.closePromise.succeed(()) + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testReleaseAliveConnectionHasWaiter() throws { + let channel = ActiveChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + snapshot.pending = 0 + snapshot.openedConnectionsCount = 1 + let connection = Connection(channel: channel, provider: provider) + snapshot.leasedConnections.insert(ConnectionKey(connection)) + snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .indifferent)) + + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(1, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + let action = provider.state.release(connection: connection, closing: false) + switch action { + case .lease(let connection, let waiter): + // XCTAssertTrue(connection.isInUse) + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + // cleanup + waiter.promise.succeed(connection) + snapshot.openedConnectionsCount = 0 + provider.state.testsOnly_setInternalState(snapshot) + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testReleaseInactiveConnectionHasWaitersNoConnections() throws { + let channel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + snapshot.pending = 0 + snapshot.openedConnectionsCount = 1 + let connection = Connection(channel: channel, provider: provider) + snapshot.leasedConnections.insert(ConnectionKey(connection)) + snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .indifferent)) + + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(1, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + let action = provider.state.release(connection: connection, closing: true) + switch action { + case .create(let waiter): + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + // cleanup + waiter.promise.fail(TempError()) + snapshot.openedConnectionsCount = 0 + provider.state.testsOnly_setInternalState(snapshot) + provider.closePromise.succeed(()) + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testReleaseInactiveConnectionHasWaitersHasConnections() throws { + let channel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + snapshot.pending = 0 + snapshot.openedConnectionsCount = 2 + let connection = Connection(channel: channel, provider: provider) + snapshot.leasedConnections.insert(ConnectionKey(connection)) + snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .indifferent)) + + let available = Connection(channel: channel, provider: provider) + snapshot.availableConnections.append(available) + + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(1, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(1, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(2, snapshot.openedConnectionsCount) + + let action = provider.state.release(connection: connection, closing: false) + switch action { + case .lease(let connection, let waiter): + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + // cleanup + waiter.promise.succeed(connection) + snapshot.openedConnectionsCount = 0 + provider.state.testsOnly_setInternalState(snapshot) + provider.closePromise.succeed(()) + default: + XCTFail("Unexpected action: \(action)") + } + } + + // MARK: - Release on Specific EL Tests + + func testReleaseAliveConnectionSameELHasWaiterSpecificEL() throws { + let channel = ActiveChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + snapshot.pending = 0 + snapshot.openedConnectionsCount = 1 + let connection = Connection(channel: channel, provider: provider) + snapshot.leasedConnections.insert(ConnectionKey(connection)) + snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: channel.eventLoop))) + + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(1, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + let action = provider.state.release(connection: connection, closing: false) + switch action { + case .lease(let connection, let waiter): + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + // cleanup + waiter.promise.succeed(connection) + snapshot.openedConnectionsCount = 0 + provider.state.testsOnly_setInternalState(snapshot) + provider.closePromise.succeed(()) + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testReleaseAliveConnectionDifferentELNoSameELConnectionsHasWaiterSpecificEL() throws { + let channel = ActiveChannel() + let eventLoop = EmbeddedEventLoop() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + snapshot.pending = 0 + snapshot.openedConnectionsCount = 1 + let connection = Connection(channel: channel, provider: provider) + snapshot.leasedConnections.insert(ConnectionKey(connection)) + snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: eventLoop))) + + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(1, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + let action = provider.state.release(connection: connection, closing: false) + switch action { + case .parkAnd(let connection, .create(let waiter)): + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertFalse(snapshot.leasedConnections.contains(ConnectionKey(connection))) + XCTAssertEqual(1, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(2, snapshot.openedConnectionsCount) + + // cleanup + waiter.promise.succeed(connection) + snapshot.openedConnectionsCount = 0 + provider.state.testsOnly_setInternalState(snapshot) + provider.closePromise.succeed(()) + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testReleaseAliveConnectionDifferentELHasSameELConnectionsHasWaiterSpecificEL() throws { + let channel = ActiveChannel() + let otherChannel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + snapshot.pending = 0 + snapshot.openedConnectionsCount = 2 + let connection = Connection(channel: channel, provider: provider) + snapshot.leasedConnections.insert(ConnectionKey(connection)) + snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: otherChannel.eventLoop))) + + let available = Connection(channel: otherChannel, provider: provider) + snapshot.availableConnections.append(available) + + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(1, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(1, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(2, snapshot.openedConnectionsCount) + + let action = provider.state.release(connection: connection, closing: false) + switch action { + case .parkAnd(let connection, .lease(let replacement, let waiter)): + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertFalse(snapshot.leasedConnections.contains(ConnectionKey(connection))) + XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(replacement))) + XCTAssertEqual(1, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(2, snapshot.openedConnectionsCount) + + // cleanup + waiter.promise.succeed(replacement) + snapshot.openedConnectionsCount = 0 + provider.state.testsOnly_setInternalState(snapshot) + provider.closePromise.succeed(()) + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testReleaseAliveConnectionDifferentELNoSameELConnectionsOnLimitHasWaiterSpecificEL() throws { + let channel = ActiveChannel() + let otherChannel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + snapshot.pending = 0 + snapshot.openedConnectionsCount = 8 + let connection = Connection(channel: channel, provider: provider) + snapshot.leasedConnections.insert(ConnectionKey(connection)) + snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: otherChannel.eventLoop))) + + let available = Connection(channel: channel, provider: provider) + snapshot.availableConnections.append(available) + + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(1, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(1, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(8, snapshot.openedConnectionsCount) + + let action = provider.state.release(connection: connection, closing: false) + switch action { + case .replace(let connection, let waiter): + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) + XCTAssertEqual(1, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(8, snapshot.openedConnectionsCount) + + // cleanup + waiter.promise.fail(TempError()) + snapshot.openedConnectionsCount = 0 + provider.state.testsOnly_setInternalState(snapshot) + provider.closePromise.succeed(()) + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testReleaseInactiveConnectionHasWaitersHasSameELConnectionsSpecificEL() throws { + let channel = EmbeddedChannel() + let otherChannel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + snapshot.pending = 0 + snapshot.openedConnectionsCount = 2 + let connection = Connection(channel: channel, provider: provider) + snapshot.leasedConnections.insert(ConnectionKey(connection)) + snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: otherChannel.eventLoop))) + + let available = Connection(channel: otherChannel, provider: provider) + snapshot.availableConnections.append(available) + + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(1, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(1, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(2, snapshot.openedConnectionsCount) + + let action = provider.state.release(connection: connection, closing: false) + switch action { + case .lease(let connection, let waiter): + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertTrue(connection === available) + XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + // cleanup + waiter.promise.succeed(connection) + snapshot.openedConnectionsCount = 0 + provider.state.testsOnly_setInternalState(snapshot) + provider.closePromise.succeed(()) + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testReleaseInactiveConnectionHasWaitersNoSameELConnectionsSpecificEL() throws { + let channel = EmbeddedChannel() + let otherChannel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + snapshot.pending = 0 + snapshot.openedConnectionsCount = 2 + let connection = Connection(channel: channel, provider: provider) + snapshot.leasedConnections.insert(ConnectionKey(connection)) + snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: otherChannel.eventLoop))) + + let available = Connection(channel: channel, provider: provider) + snapshot.availableConnections.append(available) + + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(1, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(1, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(2, snapshot.openedConnectionsCount) + + let action = provider.state.release(connection: connection, closing: false) + switch action { + case .create(let waiter): + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertEqual(1, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(2, snapshot.openedConnectionsCount) + + // cleanup + waiter.promise.fail(TempError()) + snapshot.openedConnectionsCount = 0 + provider.state.testsOnly_setInternalState(snapshot) + provider.closePromise.succeed(()) + default: + XCTFail("Unexpected action: \(action)") + } + } + + // MARK: - Next Waiter Tests + + func testNextWaiterEmptyQueue() throws { + let channel = ActiveChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + snapshot.pending = 0 + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(0, snapshot.openedConnectionsCount) + + let action = provider.state.processNextWaiter() + switch action { + case .closeProvider: + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(0, snapshot.openedConnectionsCount) + + // cleanup + provider.closePromise.succeed(()) + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testNextWaiterEmptyQueueHasConnections() throws { + let channel = ActiveChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + snapshot.pending = 0 + snapshot.openedConnectionsCount = 1 + + let available = Connection(channel: channel, provider: provider) + snapshot.availableConnections.append(available) + + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(1, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + let action = provider.state.processNextWaiter() + switch action { + case .none: + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertEqual(1, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + // cleanup + provider.closePromise.succeed(()) + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testNextWaiterHasWaitersHasConnections() throws { + let channel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + snapshot.pending = 0 + snapshot.openedConnectionsCount = 1 + snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .indifferent)) + + let available = Connection(channel: channel, provider: provider) + snapshot.availableConnections.append(available) + + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(1, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(1, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + let action = provider.state.processNextWaiter() + switch action { + case .lease(let connection, let waiter): + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + // cleanup + waiter.promise.succeed(connection) + snapshot.openedConnectionsCount = 0 + provider.state.testsOnly_setInternalState(snapshot) + provider.closePromise.succeed(()) + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testNextWaiterHasWaitersHasSameELConnectionsSpecificEL() throws { + let channel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + snapshot.pending = 0 + snapshot.openedConnectionsCount = 1 + snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: channel.eventLoop))) + + let available = Connection(channel: channel, provider: provider) + snapshot.availableConnections.append(available) + + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(1, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(1, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + let action = provider.state.processNextWaiter() + switch action { + case .lease(let connection, let waiter): + var snapshot = provider.state.testsOnly_getInternalState() + XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + // cleanup + waiter.promise.succeed(connection) + snapshot.openedConnectionsCount = 0 + provider.state.testsOnly_setInternalState(snapshot) + provider.closePromise.succeed(()) + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testNextWaiterHasWaitersHasDifferentELConnectionsSpecificEL() throws { + let channel = EmbeddedChannel() + let eventLoop = EmbeddedEventLoop() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + snapshot.pending = 0 + snapshot.openedConnectionsCount = 1 + snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: eventLoop))) + + let available = Connection(channel: channel, provider: provider) + snapshot.availableConnections.append(available) + + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(1, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(1, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + let action = provider.state.processNextWaiter() + switch action { + case .create(let waiter): + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertEqual(1, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(2, snapshot.openedConnectionsCount) + + // cleanup + waiter.promise.fail(TempError()) + snapshot.openedConnectionsCount = 0 + provider.state.testsOnly_setInternalState(snapshot) + provider.closePromise.succeed(()) + default: + XCTFail("Unexpected action: \(action)") + } + } + + // MARK: - Timeout and Remote Close Tests + + func testTimeoutLeasedConnection() throws { + let channel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + let connection = Connection(channel: channel, provider: provider) + snapshot.pending = 0 + snapshot.openedConnectionsCount = 1 + snapshot.leasedConnections.insert(ConnectionKey(connection)) + + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + let action = provider.state.timeout(connection: connection) + switch action { + case .none: + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + // cleanup + provider.closePromise.succeed(()) + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testTimeoutAvailableConnection() throws { + let channel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + let connection = Connection(channel: channel, provider: provider) + snapshot.pending = 0 + snapshot.openedConnectionsCount = 1 + snapshot.availableConnections.append(connection) + + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(1, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + let action = provider.state.timeout(connection: connection) + switch action { + case .closeAnd(_, let after): + switch after { + case .closeProvider: + break + default: + XCTFail("Unexpected action: \(action)") + } + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(0, snapshot.openedConnectionsCount) + + // cleanup + provider.closePromise.succeed(()) + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testRemoteClosedLeasedConnection() throws { + let channel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + let connection = Connection(channel: channel, provider: provider) + snapshot.pending = 0 + snapshot.openedConnectionsCount = 1 + snapshot.leasedConnections.insert(ConnectionKey(connection)) + + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + let action = provider.state.remoteClosed(connection: connection) + switch action { + case .none: + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + // cleanup + provider.closePromise.succeed(()) + default: + XCTFail("Unexpected action: \(action)") + } + } + + func testRemoteClosedAvailableConnection() throws { + let channel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + let connection = Connection(channel: channel, provider: provider) + snapshot.pending = 0 + snapshot.openedConnectionsCount = 1 + snapshot.availableConnections.append(connection) + + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(1, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + let action = provider.state.remoteClosed(connection: connection) + switch action { + case .closeProvider: + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(0, snapshot.openedConnectionsCount) + + // cleanup + provider.closePromise.succeed(()) + default: + XCTFail("Unexpected action: \(action)") + } + } + + // MARK: - Connection Tests + + func testConnectionReleaseActive() throws { + let channel = ActiveChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + let connection = Connection(channel: channel, provider: provider) + snapshot.openedConnectionsCount = 1 + snapshot.leasedConnections.insert(ConnectionKey(connection)) + + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + connection.release(closing: false) + + // XCTAssertFalse(connection.isInUse) + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertEqual(1, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + // cleanup + snapshot.pending = 0 + provider.state.testsOnly_setInternalState(snapshot) + provider.closePromise.succeed(()) + } + + func testConnectionReleaseInactive() throws { + let channel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + let connection = Connection(channel: channel, provider: provider) + snapshot.openedConnectionsCount = 1 + snapshot.leasedConnections.insert(ConnectionKey(connection)) + + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + connection.release(closing: true) + + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(0, snapshot.openedConnectionsCount) + + // cleanup + snapshot.pending = 0 + provider.state.testsOnly_setInternalState(snapshot) + provider.closePromise.succeed(()) + } + + func testConnectionRemoteCloseRelease() throws { + let channel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + let connection = Connection(channel: channel, provider: provider) + snapshot.availableConnections.append(connection) + snapshot.openedConnectionsCount = 1 + + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(1, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + connection.remoteClosed() + + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(0, snapshot.openedConnectionsCount) + + // cleanup + snapshot.pending = 0 + provider.state.testsOnly_setInternalState(snapshot) + provider.closePromise.succeed(()) + } + + func testConnectionTimeoutRelease() throws { + let channel = EmbeddedChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: channel.eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + let connection = Connection(channel: channel, provider: provider) + snapshot.availableConnections.append(connection) + snapshot.openedConnectionsCount = 1 + + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(1, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + connection.timeout() + + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(0, snapshot.openedConnectionsCount) + + // cleanup + snapshot.pending = 0 + provider.state.testsOnly_setInternalState(snapshot) + provider.closePromise.succeed(()) + } + + func testAcquireAvailableBecomesUnavailable() throws { + let eventLoop = EmbeddedEventLoop() + let channel = ActiveChannel() + + let provider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), eventLoop: eventLoop, configuration: .init(), pool: .init(configuration: .init())) + var snapshot = provider.state.testsOnly_getInternalState() + + let connection = Connection(channel: channel, provider: provider) + snapshot.availableConnections.append(connection) + snapshot.openedConnectionsCount = 1 + + provider.state.testsOnly_setInternalState(snapshot) + + XCTAssertEqual(1, snapshot.availableConnections.count) + XCTAssertEqual(0, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(1, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + let action = provider.state.acquire(waiter: .init(promise: eventLoop.makePromise(), setupComplete: eventLoop.makeSucceededFuture(()), preference: .indifferent)) + switch action { + case .lease(let connection, let waiter): + // Since this connection is already in use, this should be a no-op and state should not have changed from normal lease + connection.timeout() + + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertTrue(connection.isActiveEstimation) + XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + // This is unrecoverable, but in this case we create a new connection, so state again should not change, even though release will be called + // This is important to prevent provider deletion since connection is released and there could be 0 waiters + connection.remoteClosed() + + snapshot = provider.state.testsOnly_getInternalState() + XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) + XCTAssertEqual(0, snapshot.availableConnections.count) + XCTAssertEqual(1, snapshot.leasedConnections.count) + XCTAssertEqual(0, snapshot.waiters.count) + XCTAssertEqual(0, snapshot.pending) + XCTAssertEqual(1, snapshot.openedConnectionsCount) + + waiter.promise.succeed(connection) + + // cleanup + snapshot.openedConnectionsCount = 0 + provider.state.testsOnly_setInternalState(snapshot) + provider.closePromise.succeed(()) + default: + XCTFail("Unexpected action: \(action)") + } + } +} + +class ActiveChannel: Channel { + var allocator: ByteBufferAllocator + var closeFuture: EventLoopFuture + var eventLoop: EventLoop + + var localAddress: SocketAddress? + var remoteAddress: SocketAddress? + var parent: Channel? + var isWritable: Bool = true + var isActive: Bool = true + + init() { + self.allocator = ByteBufferAllocator() + self.eventLoop = EmbeddedEventLoop() + self.closeFuture = self.eventLoop.makeSucceededFuture(()) + } + + var _channelCore: ChannelCore { + preconditionFailure("Not implemented") + } + + var pipeline: ChannelPipeline { + return ChannelPipeline(channel: self) + } + + func setOption