diff --git a/Sources/AsyncHTTPClient/Connection.swift b/Sources/AsyncHTTPClient/Connection.swift new file mode 100644 index 000000000..3923603ee --- /dev/null +++ b/Sources/AsyncHTTPClient/Connection.swift @@ -0,0 +1,174 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2019-2020 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Foundation +import Logging +import NIO +import NIOConcurrencyHelpers +import NIOHTTP1 +import NIOHTTPCompression +import NIOTLS +import NIOTransportServices + +/// A `Connection` represents a `Channel` in the context of the connection pool +/// +/// In the `ConnectionPool`, each `Channel` belongs to a given `HTTP1ConnectionProvider` +/// and has a certain "lease state" (see the `inUse` property). +/// The role of `Connection` is to model this by storing a `Channel` alongside its associated properties +/// so that they can be passed around together and correct provider can be identified when connection is released. +class Connection { + /// The provider this `Connection` belongs to. + /// + /// This enables calling methods like `release()` directly on a `Connection` instead of + /// calling `provider.release(connection)`. This gives a more object oriented feel to the API + /// and can avoid having to keep explicit references to the pool at call site. + private let provider: HTTP1ConnectionProvider + + /// The `Channel` of this `Connection` + /// + /// - Warning: Requests that lease connections from the `ConnectionPool` are responsible + /// for removing the specific handlers they added to the `Channel` pipeline before releasing it to the pool. + let channel: Channel + + init(channel: Channel, provider: HTTP1ConnectionProvider) { + self.channel = channel + self.provider = provider + } +} + +extension Connection { + /// Release this `Connection` to its associated `HTTP1ConnectionProvider`. + /// + /// - Warning: This only releases the connection and doesn't take care of cleaning handlers in the `Channel` pipeline. + func release(closing: Bool, logger: Logger) { + self.channel.eventLoop.assertInEventLoop() + self.provider.release(connection: self, closing: closing, logger: logger) + } + + /// Called when channel exceeds idle time in pool. + func timeout(logger: Logger) { + self.channel.eventLoop.assertInEventLoop() + self.provider.timeout(connection: self, logger: logger) + } + + /// Called when channel goes inactive while in the pool. + func remoteClosed(logger: Logger) { + self.channel.eventLoop.assertInEventLoop() + self.provider.remoteClosed(connection: self, logger: logger) + } + + /// Called from `HTTP1ConnectionProvider.close` when client is shutting down. + func close() -> EventLoopFuture { + return self.channel.close() + } +} + +/// Methods of Connection which are used in ConnectionsState extracted as protocol +/// to facilitate test of ConnectionsState. +protocol PoolManageableConnection: AnyObject { + func cancel() -> EventLoopFuture + var eventLoop: EventLoop { get } + var isActiveEstimation: Bool { get } +} + +/// Implementation of methods used by ConnectionsState and its tests to manage Connection +extension Connection: PoolManageableConnection { + /// Convenience property indicating whether the underlying `Channel` is active or not. + var isActiveEstimation: Bool { + return self.channel.isActive + } + + var eventLoop: EventLoop { + return self.channel.eventLoop + } + + func cancel() -> EventLoopFuture { + return self.channel.triggerUserOutboundEvent(TaskCancelEvent()) + } +} + +extension Connection { + /// Sets idle timeout handler and channel inactivity listener. + func setIdleTimeout(timeout: TimeAmount?, logger: Logger) { + _ = self.channel.pipeline.addHandler(IdleStateHandler(writeTimeout: timeout), position: .first).flatMap { _ in + self.channel.pipeline.addHandler(IdlePoolConnectionHandler(connection: self, logger: logger)) + } + } + + /// Removes idle timeout handler and channel inactivity listener + func cancelIdleTimeout() -> EventLoopFuture { + return self.removeHandler(IdleStateHandler.self).flatMap { _ in + self.removeHandler(IdlePoolConnectionHandler.self) + } + } +} + +class IdlePoolConnectionHandler: ChannelInboundHandler, RemovableChannelHandler { + typealias InboundIn = NIOAny + + let connection: Connection + var eventSent: Bool + let logger: Logger + + init(connection: Connection, logger: Logger) { + self.connection = connection + self.eventSent = false + self.logger = logger + } + + // this is needed to detect when remote end closes connection while connection is in the pool idling + func channelInactive(context: ChannelHandlerContext) { + if !self.eventSent { + self.eventSent = true + self.connection.remoteClosed(logger: self.logger) + } + } + + func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) { + if let idleEvent = event as? IdleStateHandler.IdleStateEvent, idleEvent == .write { + if !self.eventSent { + self.eventSent = true + self.connection.timeout(logger: self.logger) + } + } else { + context.fireUserInboundEventTriggered(event) + } + } +} + +extension Connection: CustomStringConvertible { + var description: String { + return "\(self.channel)" + } +} + +struct ConnectionKey: Hashable where ConnectionType: PoolManageableConnection { + let connection: ConnectionType + + init(_ connection: ConnectionType) { + self.connection = connection + } + + static func == (lhs: ConnectionKey, rhs: ConnectionKey) -> Bool { + return ObjectIdentifier(lhs.connection) == ObjectIdentifier(rhs.connection) + } + + func hash(into hasher: inout Hasher) { + hasher.combine(ObjectIdentifier(self.connection)) + } + + func cancel() -> EventLoopFuture { + return self.connection.cancel() + } +} diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index e482ceba6..ed9bcbd8c 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -163,101 +163,6 @@ final class ConnectionPool { } } -/// A `Connection` represents a `Channel` in the context of the connection pool -/// -/// In the `ConnectionPool`, each `Channel` belongs to a given `HTTP1ConnectionProvider` -/// and has a certain "lease state" (see the `inUse` property). -/// The role of `Connection` is to model this by storing a `Channel` alongside its associated properties -/// so that they can be passed around together and correct provider can be identified when connection is released. -class Connection { - /// The provider this `Connection` belongs to. - /// - /// This enables calling methods like `release()` directly on a `Connection` instead of - /// calling `provider.release(connection)`. This gives a more object oriented feel to the API - /// and can avoid having to keep explicit references to the pool at call site. - let provider: HTTP1ConnectionProvider - - /// The `Channel` of this `Connection` - /// - /// - Warning: Requests that lease connections from the `ConnectionPool` are responsible - /// for removing the specific handlers they added to the `Channel` pipeline before releasing it to the pool. - let channel: Channel - - init(channel: Channel, provider: HTTP1ConnectionProvider) { - self.channel = channel - self.provider = provider - } - - /// Convenience property indicating wether the underlying `Channel` is active or not. - var isActiveEstimation: Bool { - return self.channel.isActive - } - - /// Release this `Connection` to its associated `HTTP1ConnectionProvider`. - /// - /// - Warning: This only releases the connection and doesn't take care of cleaning handlers in the `Channel` pipeline. - func release(closing: Bool, logger: Logger) { - self.channel.eventLoop.assertInEventLoop() - self.provider.release(connection: self, closing: closing, logger: logger) - } - - /// Called when channel exceeds idle time in pool. - func timeout(logger: Logger) { - self.channel.eventLoop.assertInEventLoop() - self.provider.timeout(connection: self, logger: logger) - } - - /// Called when channel goes inactive while in the pool. - func remoteClosed(logger: Logger) { - self.channel.eventLoop.assertInEventLoop() - self.provider.remoteClosed(connection: self, logger: logger) - } - - func cancel() -> EventLoopFuture { - return self.channel.triggerUserOutboundEvent(TaskCancelEvent()) - } - - /// Called from `HTTP1ConnectionProvider.close` when client is shutting down. - func close() -> EventLoopFuture { - return self.channel.close() - } - - /// Sets idle timeout handler and channel inactivity listener. - func setIdleTimeout(timeout: TimeAmount?, logger: Logger) { - _ = self.channel.pipeline.addHandler(IdleStateHandler(writeTimeout: timeout), position: .first).flatMap { _ in - self.channel.pipeline.addHandler(IdlePoolConnectionHandler(connection: self, - logger: logger)) - } - } - - /// Removes idle timeout handler and channel inactivity listener - func cancelIdleTimeout() -> EventLoopFuture { - return self.removeHandler(IdleStateHandler.self).flatMap { _ in - self.removeHandler(IdlePoolConnectionHandler.self) - } - } -} - -struct ConnectionKey: Hashable { - let connection: Connection - - init(_ connection: Connection) { - self.connection = connection - } - - static func == (lhs: ConnectionKey, rhs: ConnectionKey) -> Bool { - return ObjectIdentifier(lhs.connection) == ObjectIdentifier(rhs.connection) - } - - func hash(into hasher: inout Hasher) { - hasher.combine(ObjectIdentifier(self.connection)) - } - - func cancel() -> EventLoopFuture { - return self.connection.cancel() - } -} - /// A connection provider of `HTTP/1.1` connections with a given `Key` (host, scheme, port) /// /// On top of enabling connection reuse this provider it also facilitates the creation @@ -286,7 +191,7 @@ class HTTP1ConnectionProvider { var closePromise: EventLoopPromise - var state: ConnectionsState + var state: ConnectionsState private let backgroundActivityLogger: Logger @@ -317,7 +222,7 @@ class HTTP1ConnectionProvider { self.state.assertInvariants() } - func execute(_ action: Action, logger: Logger) { + func execute(_ action: Action, logger: Logger) { switch action { case .lease(let connection, let waiter): // if connection is became inactive, we create a new one. @@ -392,7 +297,7 @@ class HTTP1ConnectionProvider { func getConnection(preference: HTTPClient.EventLoopPreference, setupComplete: EventLoopFuture, logger: Logger) -> EventLoopFuture { - let waiter = Waiter(promise: self.eventLoop.makePromise(), setupComplete: setupComplete, preference: preference) + let waiter = Waiter(promise: self.eventLoop.makePromise(), setupComplete: setupComplete, preference: preference) let action: Action = self.lock.withLock { self.state.acquire(waiter: waiter) @@ -404,10 +309,10 @@ class HTTP1ConnectionProvider { } func connect(_ result: Result, - waiter: Waiter, + waiter: Waiter, replacing closedConnection: Connection? = nil, logger: Logger) { - let action: Action + let action: Action switch result { case .success(let channel): logger.trace("successfully created connection", @@ -573,9 +478,9 @@ class HTTP1ConnectionProvider { /// /// `Waiter`s are created when `maximumConcurrentConnections` is reached /// and we cannot create new connections anymore. - struct Waiter { + struct Waiter { /// The promise to complete once a connection is available - let promise: EventLoopPromise + let promise: EventLoopPromise /// Future that will be succeeded when request timeout handler and `TaskHandler` are added to the pipeline. let setupComplete: EventLoopFuture @@ -586,39 +491,6 @@ class HTTP1ConnectionProvider { } } -class IdlePoolConnectionHandler: ChannelInboundHandler, RemovableChannelHandler { - typealias InboundIn = NIOAny - - let connection: Connection - var eventSent: Bool - let logger: Logger - - init(connection: Connection, logger: Logger) { - self.connection = connection - self.eventSent = false - self.logger = logger - } - - // this is needed to detect when remote end closes connection while connection is in the pool idling - func channelInactive(context: ChannelHandlerContext) { - if !self.eventSent { - self.eventSent = true - self.connection.remoteClosed(logger: self.logger) - } - } - - func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) { - if let idleEvent = event as? IdleStateHandler.IdleStateEvent, idleEvent == .write { - if !self.eventSent { - self.eventSent = true - self.connection.timeout(logger: self.logger) - } - } else { - context.fireUserInboundEventTriggered(event) - } - } -} - extension CircularBuffer { mutating func swap(at index: Index, with value: Element) -> Element { let tmp = self[index] diff --git a/Sources/AsyncHTTPClient/ConnectionsState.swift b/Sources/AsyncHTTPClient/ConnectionsState.swift index d99416a49..2ceace465 100644 --- a/Sources/AsyncHTTPClient/ConnectionsState.swift +++ b/Sources/AsyncHTTPClient/ConnectionsState.swift @@ -15,29 +15,29 @@ import NIO extension HTTP1ConnectionProvider { - enum Action { - case lease(Connection, Waiter) - case create(Waiter) - case replace(Connection, Waiter) + enum Action { + case lease(ConnectionType, Waiter) + case create(Waiter) + case replace(ConnectionType, Waiter) case closeProvider - case park(Connection) + case park(ConnectionType) case none - case fail(Waiter, Error) - indirect case closeAnd(Connection, Action) - indirect case parkAnd(Connection, Action) + case fail(Waiter, Error) + indirect case closeAnd(ConnectionType, Action) + indirect case parkAnd(ConnectionType, Action) } - struct ConnectionsState { + struct ConnectionsState { enum State { case active case closed } - struct Snapshot { + struct Snapshot { var state: State - var availableConnections: CircularBuffer - var leasedConnections: Set - var waiters: CircularBuffer + var availableConnections: CircularBuffer + var leasedConnections: Set> + var waiters: CircularBuffer> var openedConnectionsCount: Int var pending: Int } @@ -48,16 +48,16 @@ extension HTTP1ConnectionProvider { private var state: State = .active /// Opened connections that are available. - private var availableConnections: CircularBuffer = .init(initialCapacity: 8) + private var availableConnections: CircularBuffer = .init(initialCapacity: 8) /// Opened connections that are leased to the user. - private var leasedConnections: Set = .init() + private var leasedConnections: Set> = .init() /// Consumers that weren't able to get a new connection without exceeding /// `maximumConcurrentConnections` get a `Future` /// whose associated promise is stored in `Waiter`. The promise is completed /// as soon as possible by the provider, in FIFO order. - private var waiters: CircularBuffer = .init(initialCapacity: 8) + private var waiters: CircularBuffer> = .init(initialCapacity: 8) /// Number of opened or opening connections, used to keep track of all connections and enforcing `maximumConcurrentConnections` limit. private var openedConnectionsCount: Int = 0 @@ -70,11 +70,11 @@ extension HTTP1ConnectionProvider { self.eventLoop = eventLoop } - func testsOnly_getInternalState() -> Snapshot { + func testsOnly_getInternalState() -> Snapshot { return Snapshot(state: self.state, availableConnections: self.availableConnections, leasedConnections: self.leasedConnections, waiters: self.waiters, openedConnectionsCount: self.openedConnectionsCount, pending: self.pending) } - mutating func testsOnly_setInternalState(_ snapshot: Snapshot) { + mutating func testsOnly_setInternalState(_ snapshot: Snapshot) { self.state = snapshot.state self.availableConnections = snapshot.availableConnections self.leasedConnections = snapshot.leasedConnections @@ -109,7 +109,7 @@ extension HTTP1ConnectionProvider { return self.openedConnectionsCount == 0 && self.pending == 0 } - mutating func acquire(waiter: Waiter) -> Action { + mutating func acquire(waiter: Waiter) -> Action { switch self.state { case .active: self.pending -= 1 @@ -117,7 +117,7 @@ extension HTTP1ConnectionProvider { let (eventLoop, required) = self.resolvePreference(waiter.preference) if required { // If there is an opened connection on the same EL - use it - if let found = self.availableConnections.firstIndex(where: { $0.channel.eventLoop === eventLoop }) { + if let found = self.availableConnections.firstIndex(where: { $0.eventLoop === eventLoop }) { let connection = self.availableConnections.remove(at: found) self.leasedConnections.insert(ConnectionKey(connection)) return .lease(connection, waiter) @@ -151,7 +151,7 @@ extension HTTP1ConnectionProvider { } } - mutating func release(connection: Connection, closing: Bool) -> Action { + mutating func release(connection: ConnectionType, closing: Bool) -> Action { switch self.state { case .active: assert(self.leasedConnections.contains(ConnectionKey(connection))) @@ -161,12 +161,12 @@ extension HTTP1ConnectionProvider { let (eventLoop, required) = self.resolvePreference(waiter.preference) // If returned connection is on same EL or we do not require special EL - lease it - if connection.channel.eventLoop === eventLoop || !required { + if connection.eventLoop === eventLoop || !required { return .lease(connection, waiter) } // If there is an opened connection on the same loop, lease it and park returned - if let found = self.availableConnections.firstIndex(where: { $0.channel.eventLoop === eventLoop }) { + if let found = self.availableConnections.firstIndex(where: { $0.eventLoop === eventLoop }) { self.leasedConnections.remove(ConnectionKey(connection)) let replacement = self.availableConnections.swap(at: found, with: connection) self.leasedConnections.insert(ConnectionKey(replacement)) @@ -203,7 +203,7 @@ extension HTTP1ConnectionProvider { } } - mutating func offer(connection: Connection) -> Action { + mutating func offer(connection: ConnectionType) -> Action { switch self.state { case .active: self.leasedConnections.insert(ConnectionKey(connection)) @@ -214,7 +214,7 @@ extension HTTP1ConnectionProvider { } } - mutating func drop(connection: Connection) { + mutating func drop(connection: ConnectionType) { switch self.state { case .active: self.leasedConnections.remove(ConnectionKey(connection)) @@ -223,7 +223,7 @@ extension HTTP1ConnectionProvider { } } - mutating func connectFailed() -> Action { + mutating func connectFailed() -> Action { switch self.state { case .active: self.openedConnectionsCount -= 1 @@ -239,7 +239,7 @@ extension HTTP1ConnectionProvider { } } - mutating func remoteClosed(connection: Connection) -> Action { + mutating func remoteClosed(connection: ConnectionType) -> Action { switch self.state { case .active: // Connection can be closed remotely while we wait for `.lease` action to complete. @@ -260,7 +260,7 @@ extension HTTP1ConnectionProvider { } } - mutating func timeout(connection: Connection) -> Action { + mutating func timeout(connection: ConnectionType) -> Action { switch self.state { case .active: // We can get timeout and inUse = true when we decided to lease the connection, but this action is not executed yet. @@ -285,12 +285,12 @@ extension HTTP1ConnectionProvider { } } - mutating func processNextWaiter() -> Action { + mutating func processNextWaiter() -> Action { if let waiter = self.waiters.popFirst() { let (eventLoop, required) = self.resolvePreference(waiter.preference) // If specific EL is required, we have only two options - find open one or create a new one - if required, let found = self.availableConnections.firstIndex(where: { $0.channel.eventLoop === eventLoop }) { + if required, let found = self.availableConnections.firstIndex(where: { $0.eventLoop === eventLoop }) { let connection = self.availableConnections.remove(at: found) self.leasedConnections.insert(ConnectionKey(connection)) return .lease(connection, waiter) @@ -313,7 +313,7 @@ extension HTTP1ConnectionProvider { return .none } - mutating func close() -> (CircularBuffer, CircularBuffer, Set, Bool)? { + mutating func close() -> (CircularBuffer>, CircularBuffer, Set>, Bool)? { switch self.state { case .active: let waiters = self.waiters diff --git a/Sources/AsyncHTTPClient/StringConvertibleInstances.swift b/Sources/AsyncHTTPClient/StringConvertibleInstances.swift index 1039dc3f1..9cbb0afad 100644 --- a/Sources/AsyncHTTPClient/StringConvertibleInstances.swift +++ b/Sources/AsyncHTTPClient/StringConvertibleInstances.swift @@ -12,12 +12,6 @@ // //===----------------------------------------------------------------------===// -extension Connection: CustomStringConvertible { - var description: String { - return "\(self.channel)" - } -} - extension HTTP1ConnectionProvider.Waiter: CustomStringConvertible { var description: String { return "HTTP1ConnectionProvider.Waiter(\(self.preference))" diff --git a/Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift b/Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift index 04cfa6421..0fea77411 100644 --- a/Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift @@ -57,11 +57,6 @@ extension ConnectionPoolTests { ("testTimeoutAvailableConnection", testTimeoutAvailableConnection), ("testRemoteClosedLeasedConnection", testRemoteClosedLeasedConnection), ("testRemoteClosedAvailableConnection", testRemoteClosedAvailableConnection), - ("testConnectionReleaseActive", testConnectionReleaseActive), - ("testConnectionReleaseInactive", testConnectionReleaseInactive), - ("testConnectionRemoteCloseRelease", testConnectionRemoteCloseRelease), - ("testConnectionTimeoutRelease", testConnectionTimeoutRelease), - ("testAcquireAvailableBecomesUnavailable", testAcquireAvailableBecomesUnavailable), ("testShutdownOnPendingAndSuccess", testShutdownOnPendingAndSuccess), ("testShutdownOnPendingAndError", testShutdownOnPendingAndError), ("testShutdownTimeout", testShutdownTimeout), diff --git a/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift b/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift index aee407b28..62a8d1db0 100644 --- a/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift +++ b/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift @@ -27,10 +27,8 @@ class ConnectionPoolTests: XCTestCase { var eventLoop: EmbeddedEventLoop! var http1ConnectionProvider: HTTP1ConnectionProvider! - struct TempError: Error {} - func testPending() { - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) var snapshot = state.testsOnly_getInternalState() XCTAssertEqual(0, snapshot.availableConnections.count) @@ -52,7 +50,7 @@ class ConnectionPoolTests: XCTestCase { // MARK: - Acquire Tests func testAcquireWhenEmpty() { - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) var snapshot = state.testsOnly_getInternalState() XCTAssertEqual(0, snapshot.availableConnections.count) @@ -157,7 +155,7 @@ class ConnectionPoolTests: XCTestCase { // MARK: - Acquire on Specific EL Tests func testAcquireWhenEmptySpecificEL() { - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) var snapshot = state.testsOnly_getInternalState() XCTAssertEqual(0, snapshot.availableConnections.count) @@ -303,7 +301,7 @@ class ConnectionPoolTests: XCTestCase { // MARK: - Acquire Errors Tests func testAcquireWhenClosed() { - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) _ = state.close() XCTAssertFalse(state.enqueue()) @@ -319,7 +317,7 @@ class ConnectionPoolTests: XCTestCase { } func testConnectFailedWhenClosed() { - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) _ = state.close() let action = state.connectFailed() @@ -1221,180 +1219,10 @@ class ConnectionPoolTests: XCTestCase { self.http1ConnectionProvider.execute(action, logger: HTTPClient.loggingDisabled) } - // MARK: - Connection Tests - - func testConnectionReleaseActive() throws { - let channel = ActiveChannel(eventLoop: self.eventLoop) - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.openedConnectionsCount = 1 - snapshot.leasedConnections.insert(ConnectionKey(connection)) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - connection.release(closing: false, logger: HTTPClient.loggingDisabled) - - // XCTAssertFalse(connection.isInUse) - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - connection.remoteClosed(logger: HTTPClient.loggingDisabled) - } - - func testConnectionReleaseInactive() throws { - let channel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.openedConnectionsCount = 1 - snapshot.leasedConnections.insert(ConnectionKey(connection)) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - connection.release(closing: true, logger: HTTPClient.loggingDisabled) - - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(0, snapshot.openedConnectionsCount) - } - - func testConnectionRemoteCloseRelease() throws { - let channel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(connection) - snapshot.openedConnectionsCount = 1 - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - connection.remoteClosed(logger: HTTPClient.loggingDisabled) - - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(0, snapshot.openedConnectionsCount) - } - - func testConnectionTimeoutRelease() throws { - let channel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(connection) - snapshot.openedConnectionsCount = 1 - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - connection.timeout(logger: HTTPClient.loggingDisabled) - - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(0, snapshot.openedConnectionsCount) - } - - func testAcquireAvailableBecomesUnavailable() throws { - let channel = ActiveChannel(eventLoop: self.eventLoop) - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(connection) - snapshot.openedConnectionsCount = 1 - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - XCTAssertTrue(self.http1ConnectionProvider.enqueue()) - - let action = self.http1ConnectionProvider.state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) - switch action { - case .lease(let connection, let waiter): - // Since this connection is already in use, this should be a no-op and state should not have changed from normal lease - connection.timeout(logger: HTTPClient.loggingDisabled) - - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertTrue(connection.isActiveEstimation) - XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - // This is unrecoverable, but in this case we create a new connection, so state again should not change, even though release will be called - // This is important to preventself.http1ConnectionProvider deletion since connection is released and there could be 0 waiters - connection.remoteClosed(logger: HTTPClient.loggingDisabled) - - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - waiter.promise.fail(TempError()) - self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) - default: - XCTFail("Unexpected action: \(action)") - } - } - // MARK: - Shutdown tests func testShutdownOnPendingAndSuccess() { - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) XCTAssertTrue(state.enqueue()) @@ -1433,7 +1261,7 @@ class ConnectionPoolTests: XCTestCase { } func testShutdownOnPendingAndError() { - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) XCTAssertTrue(state.enqueue()) @@ -1470,7 +1298,7 @@ class ConnectionPoolTests: XCTestCase { } func testShutdownTimeout() { - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() @@ -1505,7 +1333,7 @@ class ConnectionPoolTests: XCTestCase { } func testShutdownRemoteClosed() { - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() @@ -1557,79 +1385,3 @@ class ConnectionPoolTests: XCTestCase { self.http1ConnectionProvider = nil } } - -class ActiveChannel: Channel, ChannelCore { - struct NotImplementedError: Error {} - - func localAddress0() throws -> SocketAddress { - throw NotImplementedError() - } - - func remoteAddress0() throws -> SocketAddress { - throw NotImplementedError() - } - - func register0(promise: EventLoopPromise?) { - promise?.fail(NotImplementedError()) - } - - func bind0(to: SocketAddress, promise: EventLoopPromise?) { - promise?.fail(NotImplementedError()) - } - - func connect0(to: SocketAddress, promise: EventLoopPromise?) { - promise?.fail(NotImplementedError()) - } - - func write0(_ data: NIOAny, promise: EventLoopPromise?) { - promise?.fail(NotImplementedError()) - } - - func flush0() {} - - func read0() {} - - func close0(error: Error, mode: CloseMode, promise: EventLoopPromise?) { - promise?.succeed(()) - } - - func triggerUserOutboundEvent0(_ event: Any, promise: EventLoopPromise?) { - promise?.fail(NotImplementedError()) - } - - func channelRead0(_: NIOAny) {} - - func errorCaught0(error: Error) {} - - var allocator: ByteBufferAllocator - var closeFuture: EventLoopFuture - var eventLoop: EventLoop - - var localAddress: SocketAddress? - var remoteAddress: SocketAddress? - var parent: Channel? - var isWritable: Bool = true - var isActive: Bool = true - - init(eventLoop: EmbeddedEventLoop) { - self.allocator = ByteBufferAllocator() - self.eventLoop = eventLoop - self.closeFuture = self.eventLoop.makeSucceededFuture(()) - } - - var _channelCore: ChannelCore { - return self - } - - var pipeline: ChannelPipeline { - return ChannelPipeline(channel: self) - } - - func setOption