diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index ed9bcbd8c..8d8fda20e 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -235,7 +235,7 @@ class HTTP1ConnectionProvider { logger.trace("opening fresh connection (found matching but inactive connection)", metadata: ["ahc-dead-connection": "\(connection)"]) self.makeChannel(preference: waiter.preference).whenComplete { result in - self.connect(result, waiter: waiter, replacing: connection, logger: logger) + self.connect(result, waiter: waiter, logger: logger) } } } @@ -252,7 +252,7 @@ class HTTP1ConnectionProvider { metadata: ["ahc-old-connection": "\(connection)", "ahc-waiter": "\(waiter)"]) self.makeChannel(preference: waiter.preference).whenComplete { result in - self.connect(result, waiter: waiter, replacing: connection, logger: logger) + self.connect(result, waiter: waiter, logger: logger) } } case .park(let connection): @@ -308,10 +308,7 @@ class HTTP1ConnectionProvider { return waiter.promise.futureResult } - func connect(_ result: Result, - waiter: Waiter, - replacing closedConnection: Connection? = nil, - logger: Logger) { + func connect(_ result: Result, waiter: Waiter, logger: Logger) { let action: Action switch result { case .success(let channel): @@ -319,10 +316,7 @@ class HTTP1ConnectionProvider { metadata: ["ahc-connection": "\(channel)"]) let connection = Connection(channel: channel, provider: self) action = self.lock.withLock { - if let closedConnection = closedConnection { - self.state.drop(connection: closedConnection) - } - return self.state.offer(connection: connection) + self.state.offer(connection: connection) } switch action { @@ -367,7 +361,7 @@ class HTTP1ConnectionProvider { // This is needed to start a new stack, otherwise, since this is called on a previous // future completion handler chain, it will be growing indefinitely until the connection is closed. // We might revisit this when https://github.com/apple/swift-nio/issues/970 is resolved. - connection.channel.eventLoop.execute { + connection.eventLoop.execute { self.execute(action, logger: logger) } } @@ -418,59 +412,7 @@ class HTTP1ConnectionProvider { } private func makeChannel(preference: HTTPClient.EventLoopPreference) -> EventLoopFuture { - let channelEventLoop = preference.bestEventLoop ?? self.eventLoop - let requiresTLS = self.key.scheme.requiresTLS - let bootstrap: NIOClientTCPBootstrap - do { - bootstrap = try NIOClientTCPBootstrap.makeHTTPClientBootstrapBase(on: channelEventLoop, host: self.key.host, port: self.key.port, requiresTLS: requiresTLS, configuration: self.configuration) - } catch { - return channelEventLoop.makeFailedFuture(error) - } - - let channel: EventLoopFuture - switch self.key.scheme { - case .http, .https: - let address = HTTPClient.resolveAddress(host: self.key.host, port: self.key.port, proxy: self.configuration.proxy) - channel = bootstrap.connect(host: address.host, port: address.port) - case .unix, .http_unix, .https_unix: - channel = bootstrap.connect(unixDomainSocketPath: self.key.unixPath) - } - - return channel.flatMap { channel in - let requiresSSLHandler = self.configuration.proxy != nil && self.key.scheme.requiresTLS - let handshakePromise = channel.eventLoop.makePromise(of: Void.self) - - channel.pipeline.addSSLHandlerIfNeeded(for: self.key, tlsConfiguration: self.configuration.tlsConfiguration, addSSLClient: requiresSSLHandler, handshakePromise: handshakePromise) - - return handshakePromise.futureResult.flatMap { - channel.pipeline.addHTTPClientHandlers(leftOverBytesStrategy: .forwardBytes) - }.flatMap { - #if canImport(Network) - if #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *), bootstrap.underlyingBootstrap is NIOTSConnectionBootstrap { - return channel.pipeline.addHandler(HTTPClient.NWErrorHandler(), position: .first) - } - #endif - return channel.eventLoop.makeSucceededFuture(()) - }.flatMap { - switch self.configuration.decompression { - case .disabled: - return channel.eventLoop.makeSucceededFuture(()) - case .enabled(let limit): - let decompressHandler = NIOHTTPResponseDecompressor(limit: limit) - return channel.pipeline.addHandler(decompressHandler) - } - }.map { - channel - } - }.flatMapError { error in - #if canImport(Network) - var error = error - if #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *), bootstrap.underlyingBootstrap is NIOTSConnectionBootstrap { - error = HTTPClient.NWErrorHandler.translateError(error) - } - #endif - return channelEventLoop.makeFailedFuture(error) - } + return NIOClientTCPBootstrap.makeHTTP1Channel(destination: self.key, eventLoop: self.eventLoop, configuration: self.configuration, preference: preference) } /// A `Waiter` represents a request that waits for a connection when none is diff --git a/Sources/AsyncHTTPClient/ConnectionsState.swift b/Sources/AsyncHTTPClient/ConnectionsState.swift index 2ceace465..3f593baca 100644 --- a/Sources/AsyncHTTPClient/ConnectionsState.swift +++ b/Sources/AsyncHTTPClient/ConnectionsState.swift @@ -74,15 +74,6 @@ extension HTTP1ConnectionProvider { return Snapshot(state: self.state, availableConnections: self.availableConnections, leasedConnections: self.leasedConnections, waiters: self.waiters, openedConnectionsCount: self.openedConnectionsCount, pending: self.pending) } - mutating func testsOnly_setInternalState(_ snapshot: Snapshot) { - self.state = snapshot.state - self.availableConnections = snapshot.availableConnections - self.leasedConnections = snapshot.leasedConnections - self.waiters = snapshot.waiters - self.openedConnectionsCount = snapshot.openedConnectionsCount - self.pending = snapshot.pending - } - func assertInvariants() { assert(self.waiters.isEmpty) assert(self.availableConnections.isEmpty) @@ -158,6 +149,12 @@ extension HTTP1ConnectionProvider { if connection.isActiveEstimation, !closing { // If connection is alive, we can offer it to a next waiter if let waiter = self.waiters.popFirst() { + // There should be no case where we have both capacity and a waiter here. + // Waiter can only exists if there was no capacity at aquire. If some connection + // is released when we have waiter it can only indicate that we should lease (if EL are the same), + // or replace (if they are different). But we cannot increase connection count here. + assert(!self.hasCapacity) + let (eventLoop, required) = self.resolvePreference(waiter.preference) // If returned connection is on same EL or we do not require special EL - lease it @@ -165,23 +162,9 @@ extension HTTP1ConnectionProvider { return .lease(connection, waiter) } - // If there is an opened connection on the same loop, lease it and park returned - if let found = self.availableConnections.firstIndex(where: { $0.eventLoop === eventLoop }) { - self.leasedConnections.remove(ConnectionKey(connection)) - let replacement = self.availableConnections.swap(at: found, with: connection) - self.leasedConnections.insert(ConnectionKey(replacement)) - return .parkAnd(connection, .lease(replacement, waiter)) - } - - // If we can create new connection - do it - if self.hasCapacity { - self.leasedConnections.remove(ConnectionKey(connection)) - self.availableConnections.append(connection) - self.openedConnectionsCount += 1 - return .parkAnd(connection, .create(waiter)) - } - // If we cannot create new connections, we will have to replace returned connection with a new one on the required loop + // We will keep the `openedConnectionCount`, since .replace === .create, so we decrease and increase the `openedConnectionCount` + self.leasedConnections.remove(ConnectionKey(connection)) return .replace(connection, waiter) } else { // or park, if there are no waiters self.leasedConnections.remove(ConnectionKey(connection)) @@ -214,15 +197,6 @@ extension HTTP1ConnectionProvider { } } - mutating func drop(connection: ConnectionType) { - switch self.state { - case .active: - self.leasedConnections.remove(ConnectionKey(connection)) - case .closed: - assertionFailure("should not happen") - } - } - mutating func connectFailed() -> Action { switch self.state { case .active: @@ -287,20 +261,25 @@ extension HTTP1ConnectionProvider { mutating func processNextWaiter() -> Action { if let waiter = self.waiters.popFirst() { - let (eventLoop, required) = self.resolvePreference(waiter.preference) - - // If specific EL is required, we have only two options - find open one or create a new one - if required, let found = self.availableConnections.firstIndex(where: { $0.eventLoop === eventLoop }) { - let connection = self.availableConnections.remove(at: found) - self.leasedConnections.insert(ConnectionKey(connection)) - return .lease(connection, waiter) - } else if !required, let connection = self.availableConnections.popFirst() { - self.leasedConnections.insert(ConnectionKey(connection)) - return .lease(connection, waiter) - } else { - self.openedConnectionsCount += 1 - return .create(waiter) - } + // There should be no case where we have waiters and available connections at the same time. + // + // This method is called in following cases: + // + // 1. from `release` when connection is inactive and cannot be re-used + // 2. from `connectFailed` when we failed to establish a new connection + // 3. from `remoteClose` when connection was closed by the remote side and cannot be re-used + // 4. from `timeout` when connection was closed due to idle timeout and cannot be re-used. + // + // In all cases connection, which triggered the transition, will not be in `available` state. + // + // Given that the waiter can only be present in the pool if there were no available connections + // (otherwise it had been leased a connection immediately on getting the connection), we do not + // see a situation when we can lease another available connection, therefore the only course + // of action is to create a new connection for the waiter. + assert(self.availableConnections.isEmpty) + + self.openedConnectionsCount += 1 + return .create(waiter) } // if capacity is at max and the are no waiters and no in-flight requests for connection, we are closing this provider diff --git a/Sources/AsyncHTTPClient/Utils.swift b/Sources/AsyncHTTPClient/Utils.swift index dd30e0393..30fbd8107 100644 --- a/Sources/AsyncHTTPClient/Utils.swift +++ b/Sources/AsyncHTTPClient/Utils.swift @@ -138,6 +138,65 @@ extension NIOClientTCPBootstrap { return channelAddedFuture } } + + static func makeHTTP1Channel(destination: ConnectionPool.Key, eventLoop: EventLoop, configuration: HTTPClient.Configuration, preference: HTTPClient.EventLoopPreference) -> EventLoopFuture { + let channelEventLoop = preference.bestEventLoop ?? eventLoop + + let key = destination + + let requiresTLS = key.scheme.requiresTLS + let bootstrap: NIOClientTCPBootstrap + do { + bootstrap = try NIOClientTCPBootstrap.makeHTTPClientBootstrapBase(on: channelEventLoop, host: key.host, port: key.port, requiresTLS: requiresTLS, configuration: configuration) + } catch { + return channelEventLoop.makeFailedFuture(error) + } + + let channel: EventLoopFuture + switch key.scheme { + case .http, .https: + let address = HTTPClient.resolveAddress(host: key.host, port: key.port, proxy: configuration.proxy) + channel = bootstrap.connect(host: address.host, port: address.port) + case .unix, .http_unix, .https_unix: + channel = bootstrap.connect(unixDomainSocketPath: key.unixPath) + } + + return channel.flatMap { channel in + let requiresSSLHandler = configuration.proxy != nil && key.scheme.requiresTLS + let handshakePromise = channel.eventLoop.makePromise(of: Void.self) + + channel.pipeline.addSSLHandlerIfNeeded(for: key, tlsConfiguration: configuration.tlsConfiguration, addSSLClient: requiresSSLHandler, handshakePromise: handshakePromise) + + return handshakePromise.futureResult.flatMap { + channel.pipeline.addHTTPClientHandlers(leftOverBytesStrategy: .forwardBytes) + }.flatMap { + #if canImport(Network) + if #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *), bootstrap.underlyingBootstrap is NIOTSConnectionBootstrap { + return channel.pipeline.addHandler(HTTPClient.NWErrorHandler(), position: .first) + } + #endif + return channel.eventLoop.makeSucceededFuture(()) + }.flatMap { + switch configuration.decompression { + case .disabled: + return channel.eventLoop.makeSucceededFuture(()) + case .enabled(let limit): + let decompressHandler = NIOHTTPResponseDecompressor(limit: limit) + return channel.pipeline.addHandler(decompressHandler) + } + }.map { + channel + } + }.flatMapError { error in + #if canImport(Network) + var error = error + if #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *), bootstrap.underlyingBootstrap is NIOTSConnectionBootstrap { + error = HTTPClient.NWErrorHandler.translateError(error) + } + #endif + return channelEventLoop.makeFailedFuture(error) + } + } } extension Connection { diff --git a/Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift b/Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift index 0fea77411..25ef44ff7 100644 --- a/Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift @@ -41,18 +41,10 @@ extension ConnectionPoolTests { ("testReleaseInactiveConnectionEmptyQueueHasConnections", testReleaseInactiveConnectionEmptyQueueHasConnections), ("testReleaseAliveConnectionHasWaiter", testReleaseAliveConnectionHasWaiter), ("testReleaseInactiveConnectionHasWaitersNoConnections", testReleaseInactiveConnectionHasWaitersNoConnections), - ("testReleaseInactiveConnectionHasWaitersHasConnections", testReleaseInactiveConnectionHasWaitersHasConnections), ("testReleaseAliveConnectionSameELHasWaiterSpecificEL", testReleaseAliveConnectionSameELHasWaiterSpecificEL), - ("testReleaseAliveConnectionDifferentELNoSameELConnectionsHasWaiterSpecificEL", testReleaseAliveConnectionDifferentELNoSameELConnectionsHasWaiterSpecificEL), - ("testReleaseAliveConnectionDifferentELHasSameELConnectionsHasWaiterSpecificEL", testReleaseAliveConnectionDifferentELHasSameELConnectionsHasWaiterSpecificEL), ("testReleaseAliveConnectionDifferentELNoSameELConnectionsOnLimitHasWaiterSpecificEL", testReleaseAliveConnectionDifferentELNoSameELConnectionsOnLimitHasWaiterSpecificEL), - ("testReleaseInactiveConnectionHasWaitersHasSameELConnectionsSpecificEL", testReleaseInactiveConnectionHasWaitersHasSameELConnectionsSpecificEL), - ("testReleaseInactiveConnectionHasWaitersNoSameELConnectionsSpecificEL", testReleaseInactiveConnectionHasWaitersNoSameELConnectionsSpecificEL), ("testNextWaiterEmptyQueue", testNextWaiterEmptyQueue), ("testNextWaiterEmptyQueueHasConnections", testNextWaiterEmptyQueueHasConnections), - ("testNextWaiterHasWaitersHasConnections", testNextWaiterHasWaitersHasConnections), - ("testNextWaiterHasWaitersHasSameELConnectionsSpecificEL", testNextWaiterHasWaitersHasSameELConnectionsSpecificEL), - ("testNextWaiterHasWaitersHasDifferentELConnectionsSpecificEL", testNextWaiterHasWaitersHasDifferentELConnectionsSpecificEL), ("testTimeoutLeasedConnection", testTimeoutLeasedConnection), ("testTimeoutAvailableConnection", testTimeoutAvailableConnection), ("testRemoteClosedLeasedConnection", testRemoteClosedLeasedConnection), diff --git a/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift b/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift index 62a8d1db0..7dec834c5 100644 --- a/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift +++ b/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift @@ -25,39 +25,19 @@ import XCTest class ConnectionPoolTests: XCTestCase { var eventLoop: EmbeddedEventLoop! - var http1ConnectionProvider: HTTP1ConnectionProvider! func testPending() { - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) - - var snapshot = state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(0, snapshot.openedConnectionsCount) - + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) + XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) XCTAssertTrue(state.enqueue()) - - snapshot = state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(1, snapshot.pending) - XCTAssertEqual(0, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 1, opened: 0) } // MARK: - Acquire Tests func testAcquireWhenEmpty() { - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) - - var snapshot = state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(0, snapshot.openedConnectionsCount) + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) + XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) XCTAssertTrue(state.enqueue()) let action = state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) @@ -68,105 +48,60 @@ class ConnectionPoolTests: XCTestCase { XCTFail("Unexpected action: \(action)") } - snapshot = state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 1) } func testAcquireWhenAvailable() throws { - let channel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(connection) - snapshot.openedConnectionsCount = 1 + var (state, _) = self.buildState(count: 1) - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + // Validate that the pool has one available connection and it's internal state is correct + XCTAssertState(state, available: 1, leased: 0, waiters: 0, pending: 0, opened: 1) - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - XCTAssertTrue(self.http1ConnectionProvider.enqueue()) + XCTAssertTrue(state.enqueue()) - let action = self.http1ConnectionProvider.state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) + let action = state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) switch action { case .lease(let connection, let waiter): - waiter.promise.succeed(connection) - - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1) // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) + waiter.promise.succeed(connection) default: XCTFail("Unexpected action: \(action)") } + + // cleanup + try XCTAssertStateClose(state, available: 0, leased: 1, waiters: 0, clean: false) } func testAcquireWhenUnavailable() throws { - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() + var (state, _) = self.buildState(count: 8, release: false) + XCTAssertState(state, available: 0, leased: 8, waiters: 0, pending: 0, opened: 8) - snapshot.openedConnectionsCount = 8 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(8, snapshot.openedConnectionsCount) - - XCTAssertTrue(self.http1ConnectionProvider.enqueue()) - - let action = self.http1ConnectionProvider.state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) + XCTAssertTrue(state.enqueue()) + let action = state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) switch action { case .none: - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(1, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(8, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 0, leased: 8, waiters: 1, pending: 0, opened: 8) default: XCTFail("Unexpected action: \(action)") } // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - snapshot.openedConnectionsCount = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + try XCTAssertStateClose(state, available: 0, leased: 8, waiters: 1, clean: false) } // MARK: - Acquire on Specific EL Tests func testAcquireWhenEmptySpecificEL() { - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) - var snapshot = state.testsOnly_getInternalState() + let el: EventLoop = self.eventLoop + let preference: HTTPClient.EventLoopPreference = .delegateAndChannel(on: el) - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(0, snapshot.openedConnectionsCount) + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: el) + XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) XCTAssertTrue(state.enqueue()) - - let action = state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: self.eventLoop))) + let action = state.acquire(waiter: .init(promise: el.makePromise(), setupComplete: el.makeSucceededFuture(()), preference: preference)) switch action { case .create(let waiter): waiter.promise.fail(TempError()) @@ -174,128 +109,69 @@ class ConnectionPoolTests: XCTestCase { XCTFail("Unexpected action: \(action)") } - snapshot = state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 1) } func testAcquireWhenAvailableSpecificEL() throws { - let channel = EmbeddedChannel() + let el: EventLoop = self.eventLoop + let preference: HTTPClient.EventLoopPreference = .delegateAndChannel(on: el) + var (state, _) = self.buildState(count: 1, eventLoop: el) + XCTAssertState(state, available: 1, leased: 0, waiters: 0, pending: 0, opened: 1) - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(connection) - snapshot.openedConnectionsCount = 1 - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - XCTAssertTrue(self.http1ConnectionProvider.enqueue()) - - let action = self.http1ConnectionProvider.state.acquire(waiter: .init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: channel.eventLoop))) + XCTAssertTrue(state.enqueue()) + let action = state.acquire(waiter: .init(promise: el.makePromise(), setupComplete: el.makeSucceededFuture(()), preference: preference)) switch action { case .lease(let connection, let waiter): waiter.promise.succeed(connection) - - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) + XCTAssertState(state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1) default: XCTFail("Unexpected action: \(action)") } + + // cleanup + try XCTAssertStateClose(state, available: 0, leased: 1, waiters: 0, clean: false) } func testAcquireReplace() throws { - let channel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() + let el: EventLoop = self.eventLoop + var (state, connections) = self.buildState(count: 8, release: false, eventLoop: el) - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(connection) - snapshot.openedConnectionsCount = 8 + // release a connection + _ = state.release(connection: connections.first!, closing: false) + XCTAssertState(state, available: 1, leased: 7, waiters: 0, pending: 0, opened: 8) - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + // other eventLoop + let preference: HTTPClient.EventLoopPreference = .delegateAndChannel(on: EmbeddedEventLoop()) - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(8, snapshot.openedConnectionsCount) - - XCTAssertTrue(self.http1ConnectionProvider.enqueue()) - - let action = self.http1ConnectionProvider.state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: self.eventLoop))) + XCTAssertTrue(state.enqueue()) + let action = state.acquire(waiter: .init(promise: el.makePromise(), setupComplete: el.makeSucceededFuture(()), preference: preference)) switch action { - case .replace(_, let waiter): + case .replace(let connection, let waiter): waiter.promise.fail(TempError()) - - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(8, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 0, leased: 7, waiters: 0, pending: 0, opened: 8, isNotLeased: connection) default: XCTFail("Unexpected action: \(action)") } // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - snapshot.openedConnectionsCount = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + try XCTAssertStateClose(state, available: 0, leased: 7, waiters: 0, clean: false) } func testAcquireWhenUnavailableSpecificEL() throws { - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.openedConnectionsCount = 8 - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(8, snapshot.openedConnectionsCount) + var (state, _) = self.buildState(count: 8, release: false, eventLoop: self.eventLoop) + XCTAssertState(state, available: 0, leased: 8, waiters: 0, pending: 0, opened: 8) - XCTAssertTrue(self.http1ConnectionProvider.enqueue()) - - let action = self.http1ConnectionProvider.state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: self.eventLoop))) + XCTAssertTrue(state.enqueue()) + let action = state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: self.eventLoop))) switch action { case .none: - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(1, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(8, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 0, leased: 8, waiters: 1, pending: 0, opened: 8) default: XCTFail("Unexpected action: \(action)") } // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - snapshot.openedConnectionsCount = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + try XCTAssertStateClose(state, available: 0, leased: 8, waiters: 1, clean: false) } // MARK: - Acquire Errors Tests @@ -303,6 +179,7 @@ class ConnectionPoolTests: XCTestCase { func testAcquireWhenClosed() { var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) _ = state.close() + XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) XCTAssertFalse(state.enqueue()) @@ -319,6 +196,7 @@ class ConnectionPoolTests: XCTestCase { func testConnectFailedWhenClosed() { var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) _ = state.close() + XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) let action = state.connectFailed() switch action { @@ -332,732 +210,201 @@ class ConnectionPoolTests: XCTestCase { // MARK: - Release Tests func testReleaseAliveConnectionEmptyQueue() throws { - let channel = ActiveChannel(eventLoop: self.eventLoop) - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.leasedConnections.insert(ConnectionKey(connection)) + var (state, connections) = self.buildState(count: 1, release: false) + XCTAssertState(state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1) - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.release(connection: connection, closing: false) + let connection = try XCTUnwrap(connections.first) + let action = state.release(connection: connection, closing: false) switch action { case .park: - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 1, leased: 0, waiters: 0, pending: 0, opened: 1) default: XCTFail("Unexpected action: \(action)") } // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - connection.remoteClosed(logger: HTTPClient.loggingDisabled) + try XCTAssertStateClose(state, available: 1, leased: 0, waiters: 0, clean: true) } func testReleaseAliveButClosingConnectionEmptyQueue() throws { - let channel = ActiveChannel(eventLoop: self.eventLoop) - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.leasedConnections.insert(ConnectionKey(connection)) + var (state, connections) = self.buildState(count: 1, release: false) + XCTAssertState(state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1) - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.release(connection: connection, closing: true) + let connection = try XCTUnwrap(connections.first) + // closing should be true to test that we can discard connection that is still active, but caller indicated that it will be closed soon + let action = state.release(connection: connection, closing: true) switch action { case .closeProvider: - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(0, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) default: XCTFail("Unexpected action: \(action)") } // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - self.http1ConnectionProvider.execute(action, logger: HTTPClient.loggingDisabled) + XCTAssertNil(state.close()) } func testReleaseInactiveConnectionEmptyQueue() throws { - let channel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.leasedConnections.insert(ConnectionKey(connection)) + var (state, connections) = self.buildState(count: 1, release: false) + XCTAssertState(state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1) - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.release(connection: connection, closing: true) + let connection = try XCTUnwrap(connections.first) + connection.isActiveEstimation = false + // closing should be false to test that we check connection state in order to decided if we need to discard the connection + let action = state.release(connection: connection, closing: false) switch action { case .closeProvider: - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(0, snapshot.openedConnectionsCount) - + XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) default: XCTFail("Unexpected action: \(action)") } // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - self.http1ConnectionProvider.execute(action, logger: HTTPClient.loggingDisabled) + XCTAssertNil(state.close()) } func testReleaseInactiveConnectionEmptyQueueHasConnections() throws { - let channel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() + var (state, connections) = self.buildState(count: 2, release: false) + XCTAssertState(state, available: 0, leased: 2, waiters: 0, pending: 0, opened: 2) - snapshot.pending = 0 - snapshot.openedConnectionsCount = 2 - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.leasedConnections.insert(ConnectionKey(connection)) - let available = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(available) + let connection = try XCTUnwrap(connections.first) - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + // Return a connection to the pool + _ = state.release(connection: try XCTUnwrap(connections.dropFirst().first), closing: false) + XCTAssertState(state, available: 1, leased: 1, waiters: 0, pending: 0, opened: 2) - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(2, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.release(connection: connection, closing: true) + let action = state.release(connection: connection, closing: true) switch action { case .none: - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 1, leased: 0, waiters: 0, pending: 0, opened: 1) default: XCTFail("Unexpected action: \(action)") } // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - connection.remoteClosed(logger: HTTPClient.loggingDisabled) + try XCTAssertStateClose(state, available: 1, leased: 0, waiters: 0, clean: true) } func testReleaseAliveConnectionHasWaiter() throws { - let channel = ActiveChannel(eventLoop: self.eventLoop) - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.leasedConnections.insert(ConnectionKey(connection)) - snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .indifferent)) + var (state, connections) = self.buildState(count: 8, release: false) - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + // Add one waiter to the pool + XCTAssertTrue(state.enqueue()) + _ = state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(1, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 0, leased: 8, waiters: 1, pending: 0, opened: 8) - let action = self.http1ConnectionProvider.state.release(connection: connection, closing: false) + let connection = try XCTUnwrap(connections.first) + let action = state.release(connection: connection, closing: false) switch action { case .lease(let connection, let waiter): - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - + XCTAssertState(state, available: 0, leased: 8, waiters: 0, pending: 0, opened: 8, isLeased: connection) // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) waiter.promise.succeed(connection) - self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) default: XCTFail("Unexpected action: \(action)") } + + // cleanup + try XCTAssertStateClose(state, available: 0, leased: 8, waiters: 0, clean: false) } func testReleaseInactiveConnectionHasWaitersNoConnections() throws { - let channel = EmbeddedChannel() + var (state, connections) = self.buildState(count: 8, release: false) + XCTAssertState(state, available: 0, leased: 8, waiters: 0, pending: 0, opened: 8) - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.leasedConnections.insert(ConnectionKey(connection)) - snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .indifferent)) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(1, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) + // Add one waiter to the pool + XCTAssertTrue(state.enqueue()) + _ = state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) + XCTAssertState(state, available: 0, leased: 8, waiters: 1, pending: 0, opened: 8) - let action = self.http1ConnectionProvider.state.release(connection: connection, closing: true) + let connection = try XCTUnwrap(connections.first) + let action = state.release(connection: connection, closing: true) switch action { case .create(let waiter): - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - + XCTAssertState(state, available: 0, leased: 7, waiters: 0, pending: 0, opened: 8) // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - // simulate create -> use -> release cycle - self.http1ConnectionProvider.connect(.failure(TempError()), waiter: waiter, logger: HTTPClient.loggingDisabled) + waiter.promise.fail(TempError()) default: XCTFail("Unexpected action: \(action)") } - } - - func testReleaseInactiveConnectionHasWaitersHasConnections() throws { - let channel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 2 - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.leasedConnections.insert(ConnectionKey(connection)) - snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .indifferent)) - - let available = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(available) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(1, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(2, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.release(connection: connection, closing: false) - switch action { - case .lease(let connection, let waiter): - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - waiter.promise.succeed(connection) - self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) - default: - XCTFail("Unexpected action: \(action)") - } + // cleanup + try XCTAssertStateClose(state, available: 0, leased: 7, waiters: 0, clean: false) } // MARK: - Release on Specific EL Tests func testReleaseAliveConnectionSameELHasWaiterSpecificEL() throws { - let channel = ActiveChannel(eventLoop: self.eventLoop) - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.leasedConnections.insert(ConnectionKey(connection)) - snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: channel.eventLoop))) + var (state, connections) = self.buildState(count: 8, release: false) + XCTAssertState(state, available: 0, leased: 8, waiters: 0, pending: 0, opened: 8) - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(1, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) + // Add one waiter to the pool + XCTAssertTrue(state.enqueue()) + _ = state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: self.eventLoop))) + XCTAssertState(state, available: 0, leased: 8, waiters: 1, pending: 0, opened: 8) - let action = self.http1ConnectionProvider.state.release(connection: connection, closing: false) + let connection = try XCTUnwrap(connections.first) + let action = state.release(connection: connection, closing: false) switch action { case .lease(let connection, let waiter): - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - + XCTAssertState(state, available: 0, leased: 8, waiters: 0, pending: 0, opened: 8, isLeased: connection) // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) waiter.promise.succeed(connection) - self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) default: XCTFail("Unexpected action: \(action)") } - } - - func testReleaseAliveConnectionDifferentELNoSameELConnectionsHasWaiterSpecificEL() throws { - let differentEL = EmbeddedEventLoop() - defer { - XCTAssertNoThrow(try differentEL.syncShutdownGracefully()) - } - let channel = ActiveChannel(eventLoop: differentEL) // Channel on different EL, that's important for the test. - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.leasedConnections.insert(ConnectionKey(connection)) - snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(of: Connection.self), - setupComplete: self.eventLoop.makeSucceededFuture(()), - preference: .delegateAndChannel(on: self.eventLoop))) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(1, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.release(connection: connection, closing: false) - switch action { - case .parkAnd(let connection, .create(let waiter)): - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertFalse(snapshot.leasedConnections.contains(ConnectionKey(connection))) - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(2, snapshot.openedConnectionsCount) - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - // simulate create -> use -> release cycle - self.http1ConnectionProvider.connect(.failure(TempError()), waiter: waiter, logger: HTTPClient.loggingDisabled) - connection.remoteClosed(logger: HTTPClient.loggingDisabled) - default: - XCTFail("Unexpected action: \(action)") - } - } - - func testReleaseAliveConnectionDifferentELHasSameELConnectionsHasWaiterSpecificEL() throws { - let channel = ActiveChannel(eventLoop: self.eventLoop) - let otherChannel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 2 - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.leasedConnections.insert(ConnectionKey(connection)) - snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: otherChannel.eventLoop))) - - let available = Connection(channel: otherChannel, provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(available) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(1, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(2, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.release(connection: connection, closing: false) - switch action { - case .parkAnd(let connection, .lease(let replacement, let waiter)): - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertFalse(snapshot.leasedConnections.contains(ConnectionKey(connection))) - XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(replacement))) - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(2, snapshot.openedConnectionsCount) - - // cleanup - waiter.promise.succeed(replacement) - connection.remoteClosed(logger: HTTPClient.loggingDisabled) - self.http1ConnectionProvider.release(connection: replacement, closing: true, logger: HTTPClient.loggingDisabled) - default: - XCTFail("Unexpected action: \(action)") - } + try XCTAssertStateClose(state, available: 0, leased: 8, waiters: 0, clean: false) } func testReleaseAliveConnectionDifferentELNoSameELConnectionsOnLimitHasWaiterSpecificEL() throws { - let channel = ActiveChannel(eventLoop: self.eventLoop) - let otherChannel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() + var (state, connections) = self.buildState(count: 8, release: false) + XCTAssertState(state, available: 0, leased: 8, waiters: 0, pending: 0, opened: 8) - snapshot.pending = 0 - snapshot.openedConnectionsCount = 8 - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.leasedConnections.insert(ConnectionKey(connection)) - snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: otherChannel.eventLoop))) - - let available = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(available) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(1, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(8, snapshot.openedConnectionsCount) + let differentEL = EmbeddedEventLoop() + // Add one waiter to the pool + XCTAssertTrue(state.enqueue()) + _ = state.acquire(waiter: .init(promise: differentEL.makePromise(), setupComplete: differentEL.makeSucceededFuture(()), preference: .delegateAndChannel(on: differentEL))) + XCTAssertState(state, available: 0, leased: 8, waiters: 1, pending: 0, opened: 8) - let action = self.http1ConnectionProvider.state.release(connection: connection, closing: false) + let connection = try XCTUnwrap(connections.first) + let action = state.release(connection: connection, closing: false) switch action { case .replace(let connection, let waiter): - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(8, snapshot.openedConnectionsCount) - + XCTAssertState(state, available: 0, leased: 7, waiters: 0, pending: 0, opened: 8, isNotLeased: connection) // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) waiter.promise.fail(TempError()) - snapshot.openedConnectionsCount = 2 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - snapshot.availableConnections.forEach { $0.remoteClosed(logger: HTTPClient.loggingDisabled) } - self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) default: XCTFail("Unexpected action: \(action)") } - } - - func testReleaseInactiveConnectionHasWaitersHasSameELConnectionsSpecificEL() throws { - let channel = EmbeddedChannel() - let otherChannel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 2 - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.leasedConnections.insert(ConnectionKey(connection)) - snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: otherChannel.eventLoop))) - - let available = Connection(channel: otherChannel, provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(available) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(1, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(2, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.release(connection: connection, closing: false) - switch action { - case .lease(let connection, let waiter): - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertTrue(connection === available) - XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - waiter.promise.succeed(connection) - self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) - default: - XCTFail("Unexpected action: \(action)") - } - } - - func testReleaseInactiveConnectionHasWaitersNoSameELConnectionsSpecificEL() throws { - let channel = EmbeddedChannel() - let otherChannel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 2 - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.leasedConnections.insert(ConnectionKey(connection)) - snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: otherChannel.eventLoop))) - - let available = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(available) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(1, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(2, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.release(connection: connection, closing: false) - switch action { - case .create(let waiter): - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(2, snapshot.openedConnectionsCount) - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234)s - self.http1ConnectionProvider.connect(.failure(TempError()), waiter: waiter, logger: HTTPClient.loggingDisabled) - snapshot.availableConnections.forEach { $0.remoteClosed(logger: HTTPClient.loggingDisabled) } - default: - XCTFail("Unexpected action: \(action)") - } + try XCTAssertStateClose(state, available: 0, leased: 7, waiters: 0, clean: false) } // MARK: - Next Waiter Tests func testNextWaiterEmptyQueue() throws { - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + var (state, _) = self.buildState(count: 0) + XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(0, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.processNextWaiter() + let action = state.processNextWaiter() switch action { case .closeProvider: - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(0, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) default: XCTFail("Unexpected action: \(action)") } - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - self.http1ConnectionProvider.execute(action, logger: HTTPClient.loggingDisabled) } func testNextWaiterEmptyQueueHasConnections() throws { - let channel = ActiveChannel(eventLoop: self.eventLoop) - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 + var (state, _) = self.buildState(count: 1, release: true) + XCTAssertState(state, available: 1, leased: 0, waiters: 0, pending: 0, opened: 1) - let available = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(available) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.processNextWaiter() + let action = state.processNextWaiter() switch action { case .none: - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - available.remoteClosed(logger: HTTPClient.loggingDisabled) - default: - XCTFail("Unexpected action: \(action)") - } - } - - func testNextWaiterHasWaitersHasConnections() throws { - let channel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 - snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .indifferent)) - - let available = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(available) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(1, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.processNextWaiter() - switch action { - case .lease(let connection, let waiter): - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - waiter.promise.fail(TempError()) - self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) - default: - XCTFail("Unexpected action: \(action)") - } - } - - func testNextWaiterHasWaitersHasSameELConnectionsSpecificEL() throws { - let channel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 - snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: channel.eventLoop))) - - let available = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(available) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(1, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.processNextWaiter() - switch action { - case .lease(let connection, let waiter): - let snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - waiter.promise.fail(TempError()) - self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) - default: - XCTFail("Unexpected action: \(action)") - } - } - - func testNextWaiterHasWaitersHasDifferentELConnectionsSpecificEL() throws { - let channel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 - snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: self.eventLoop))) - - let available = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(available) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(1, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.processNextWaiter() - switch action { - case .create(let waiter): - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(2, snapshot.openedConnectionsCount) - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - // simulate create -> use -> release cycle - self.http1ConnectionProvider.connect(.failure(TempError()), waiter: waiter, logger: HTTPClient.loggingDisabled) - available.remoteClosed(logger: HTTPClient.loggingDisabled) + XCTAssertState(state, available: 1, leased: 0, waiters: 0, pending: 0, opened: 1) default: XCTFail("Unexpected action: \(action)") } @@ -1066,61 +413,25 @@ class ConnectionPoolTests: XCTestCase { // MARK: - Timeout and Remote Close Tests func testTimeoutLeasedConnection() throws { - let channel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 - snapshot.leasedConnections.insert(ConnectionKey(connection)) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + var (state, connections) = self.buildState(count: 1, release: false) + XCTAssertState(state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1) - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.timeout(connection: connection) + let connection = try XCTUnwrap(connections.first) + let action = state.timeout(connection: connection) switch action { case .none: - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1) default: XCTFail("Unexpected action: \(action)") } - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) } func testTimeoutAvailableConnection() throws { - let channel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 - snapshot.availableConnections.append(connection) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + var (state, connections) = self.buildState(count: 1) + XCTAssertState(state, available: 1, leased: 0, waiters: 0, pending: 0, opened: 1) - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.timeout(connection: connection) + let connection = try XCTUnwrap(connections.first) + let action = state.timeout(connection: connection) switch action { case .closeAnd(_, let after): switch after { @@ -1129,104 +440,51 @@ class ConnectionPoolTests: XCTestCase { default: XCTFail("Unexpected action: \(action)") } - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(0, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) default: XCTFail("Unexpected action: \(action)") } - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - self.http1ConnectionProvider.execute(action, logger: HTTPClient.loggingDisabled) } func testRemoteClosedLeasedConnection() throws { - let channel = EmbeddedChannel() + var (state, connections) = self.buildState(count: 1, release: false) - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() + XCTAssertState(state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1) - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 - snapshot.leasedConnections.insert(ConnectionKey(connection)) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.remoteClosed(connection: connection) + // This can happen when just leased connection is closed before TaskHandler is added to pipeline + let connection = try XCTUnwrap(connections.first) + let action = state.remoteClosed(connection: connection) switch action { case .none: - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1) default: XCTFail("Unexpected action: \(action)") } - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) } func testRemoteClosedAvailableConnection() throws { - let channel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 - snapshot.availableConnections.append(connection) + var (state, connections) = self.buildState(count: 1) - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + XCTAssertState(state, available: 1, leased: 0, waiters: 0, pending: 0, opened: 1) - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.remoteClosed(connection: connection) + let connection = try XCTUnwrap(connections.first) + let action = state.remoteClosed(connection: connection) switch action { case .closeProvider: - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(0, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) default: XCTFail("Unexpected action: \(action)") } - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - self.http1ConnectionProvider.execute(action, logger: HTTPClient.loggingDisabled) } // MARK: - Shutdown tests func testShutdownOnPendingAndSuccess() { - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) XCTAssertTrue(state.enqueue()) - let connectionPromise = self.eventLoop.makePromise(of: Connection.self) + let connectionPromise = self.eventLoop.makePromise(of: ConnectionForTests.self) let setupPromise = self.eventLoop.makePromise(of: Void.self) let waiter = HTTP1ConnectionProvider.Waiter(promise: connectionPromise, setupComplete: setupPromise.futureResult, preference: .indifferent) var action = state.acquire(waiter: waiter) @@ -1248,7 +506,7 @@ class ConnectionPoolTests: XCTestCase { XCTFail("Expecting snapshot") } - let connection = Connection(channel: EmbeddedChannel(), provider: self.http1ConnectionProvider) + let connection = ConnectionForTests(eventLoop: self.eventLoop) action = state.offer(connection: connection) guard case .closeAnd(_, .closeProvider) = action else { @@ -1261,11 +519,11 @@ class ConnectionPoolTests: XCTestCase { } func testShutdownOnPendingAndError() { - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) XCTAssertTrue(state.enqueue()) - let connectionPromise = self.eventLoop.makePromise(of: Connection.self) + let connectionPromise = self.eventLoop.makePromise(of: ConnectionForTests.self) let setupPromise = self.eventLoop.makePromise(of: Void.self) let waiter = HTTP1ConnectionProvider.Waiter(promise: connectionPromise, setupComplete: setupPromise.futureResult, preference: .indifferent) var action = state.acquire(waiter: waiter) @@ -1297,16 +555,8 @@ class ConnectionPoolTests: XCTestCase { setupPromise.succeed(()) } - func testShutdownTimeout() { - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - let connection = Connection(channel: EmbeddedChannel(), provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(connection) - snapshot.openedConnectionsCount = 1 - - state.testsOnly_setInternalState(snapshot) + func testShutdownTimeout() throws { + var (state, connections) = self.buildState(count: 1) if let (waiters, available, leased, clean) = state.close() { XCTAssertTrue(waiters.isEmpty) @@ -1317,6 +567,7 @@ class ConnectionPoolTests: XCTestCase { XCTFail("Expecting snapshot") } + let connection = try XCTUnwrap(connections.first) let action = state.timeout(connection: connection) switch action { case .closeAnd(_, let next): @@ -1332,16 +583,8 @@ class ConnectionPoolTests: XCTestCase { } } - func testShutdownRemoteClosed() { - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - let connection = Connection(channel: EmbeddedChannel(), provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(connection) - snapshot.openedConnectionsCount = 1 - - state.testsOnly_setInternalState(snapshot) + func testShutdownRemoteClosed() throws { + var (state, connections) = self.buildState(count: 1) if let (waiters, available, leased, clean) = state.close() { XCTAssertTrue(waiters.isEmpty) @@ -1352,6 +595,7 @@ class ConnectionPoolTests: XCTestCase { XCTFail("Expecting snapshot") } + let connection = try XCTUnwrap(connections.first) let action = state.remoteClosed(connection: connection) switch action { case .closeProvider: @@ -1362,26 +606,14 @@ class ConnectionPoolTests: XCTestCase { } } - // MARK: - Helpers - override func setUp() { XCTAssertNil(self.eventLoop) - XCTAssertNil(self.http1ConnectionProvider) self.eventLoop = EmbeddedEventLoop() - XCTAssertNoThrow(self.http1ConnectionProvider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), - eventLoop: self.eventLoop, - configuration: .init(), - pool: .init(configuration: .init(), - backgroundActivityLogger: HTTPClient.loggingDisabled), - backgroundActivityLogger: HTTPClient.loggingDisabled)) } override func tearDown() { XCTAssertNotNil(self.eventLoop) - XCTAssertNotNil(self.http1ConnectionProvider) - XCTAssertNoThrow(try self.http1ConnectionProvider.close().wait()) XCTAssertNoThrow(try self.eventLoop.syncShutdownGracefully()) self.eventLoop = nil - self.http1ConnectionProvider = nil } } diff --git a/Tests/AsyncHTTPClientTests/ConnectionPoolTestsSupport.swift b/Tests/AsyncHTTPClientTests/ConnectionPoolTestsSupport.swift index 70edd1426..91f8f4fc7 100644 --- a/Tests/AsyncHTTPClientTests/ConnectionPoolTestsSupport.swift +++ b/Tests/AsyncHTTPClientTests/ConnectionPoolTestsSupport.swift @@ -16,23 +16,35 @@ import NIO import XCTest +class ConnectionForTests: PoolManageableConnection { + var eventLoop: EventLoop + var isActiveEstimation: Bool + + init(eventLoop: EventLoop) { + self.eventLoop = eventLoop + self.isActiveEstimation = true + } + + func cancel() -> EventLoopFuture { + return self.eventLoop.makeSucceededFuture(()) + } +} + extension ConnectionPoolTests { - func buildState(count: Int, release: Bool = true, eventLoop: EventLoop? = nil) -> (HTTP1ConnectionProvider.ConnectionsState, [Connection]) { + func buildState(count: Int, release: Bool = true, eventLoop: EventLoop? = nil) -> (HTTP1ConnectionProvider.ConnectionsState, [ConnectionForTests]) { let eventLoop = eventLoop ?? self.eventLoop! - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: eventLoop) - var items: [Connection] = [] + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: eventLoop) + var items: [ConnectionForTests] = [] if count == 0 { return (state, items) } - let channel = ActiveChannel(eventLoop: self.eventLoop) - for _ in 1...count { // Set up connection pool to have one available connection do { - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) + let connection = ConnectionForTests(eventLoop: eventLoop) items.append(connection) // First, we ask the empty pool for a connection, triggering connection creation XCTAssertTrue(state.enqueue())