From f8869804d9aba3dac3cb41837a30263932bec2c6 Mon Sep 17 00:00:00 2001 From: Ilya Teterin Date: Sat, 26 Sep 2020 13:50:22 +0100 Subject: [PATCH] Fixes #234 by removing setter on internal ConnectionsState so modification allowed only using exposed API. Motivation: Having a setter for internal state of ConnectionsState led to a subset of test testing invalid invariants, for example when we have at the same time an available connecion and a waiter, which is an invalid state of the system. Modifications: * test of ConnectionsState * ConnectionsState * ConnectionPool are modified so the state under tests is achieved only by a sequence of modifications invoked by state API. During modification some tests are eliminated as they were testing artificial state, which can not be achieved by exposed APIs. ConnectionsState is pruned from "replace" as in no valid state we can have a situation when we can "replace" a connection. Invalid invariants and tests are removed. Result: We do not have a way to modify state of the ConnectionsState by direct interaction with private state of the object. Getter on the state is considered harmless and used for tests only. --- Sources/AsyncHTTPClient/ConnectionPool.swift | 70 +- .../AsyncHTTPClient/ConnectionsState.swift | 75 +- Sources/AsyncHTTPClient/Utils.swift | 59 + .../ConnectionPoolTests+XCTest.swift | 8 - .../ConnectionPoolTests.swift | 1112 +++-------------- .../ConnectionPoolTestsSupport.swift | 24 +- 6 files changed, 282 insertions(+), 1066 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool.swift index ed9bcbd8c..8d8fda20e 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool.swift @@ -235,7 +235,7 @@ class HTTP1ConnectionProvider { logger.trace("opening fresh connection (found matching but inactive connection)", metadata: ["ahc-dead-connection": "\(connection)"]) self.makeChannel(preference: waiter.preference).whenComplete { result in - self.connect(result, waiter: waiter, replacing: connection, logger: logger) + self.connect(result, waiter: waiter, logger: logger) } } } @@ -252,7 +252,7 @@ class HTTP1ConnectionProvider { metadata: ["ahc-old-connection": "\(connection)", "ahc-waiter": "\(waiter)"]) self.makeChannel(preference: waiter.preference).whenComplete { result in - self.connect(result, waiter: waiter, replacing: connection, logger: logger) + self.connect(result, waiter: waiter, logger: logger) } } case .park(let connection): @@ -308,10 +308,7 @@ class HTTP1ConnectionProvider { return waiter.promise.futureResult } - func connect(_ result: Result, - waiter: Waiter, - replacing closedConnection: Connection? = nil, - logger: Logger) { + func connect(_ result: Result, waiter: Waiter, logger: Logger) { let action: Action switch result { case .success(let channel): @@ -319,10 +316,7 @@ class HTTP1ConnectionProvider { metadata: ["ahc-connection": "\(channel)"]) let connection = Connection(channel: channel, provider: self) action = self.lock.withLock { - if let closedConnection = closedConnection { - self.state.drop(connection: closedConnection) - } - return self.state.offer(connection: connection) + self.state.offer(connection: connection) } switch action { @@ -367,7 +361,7 @@ class HTTP1ConnectionProvider { // This is needed to start a new stack, otherwise, since this is called on a previous // future completion handler chain, it will be growing indefinitely until the connection is closed. // We might revisit this when https://github.com/apple/swift-nio/issues/970 is resolved. - connection.channel.eventLoop.execute { + connection.eventLoop.execute { self.execute(action, logger: logger) } } @@ -418,59 +412,7 @@ class HTTP1ConnectionProvider { } private func makeChannel(preference: HTTPClient.EventLoopPreference) -> EventLoopFuture { - let channelEventLoop = preference.bestEventLoop ?? self.eventLoop - let requiresTLS = self.key.scheme.requiresTLS - let bootstrap: NIOClientTCPBootstrap - do { - bootstrap = try NIOClientTCPBootstrap.makeHTTPClientBootstrapBase(on: channelEventLoop, host: self.key.host, port: self.key.port, requiresTLS: requiresTLS, configuration: self.configuration) - } catch { - return channelEventLoop.makeFailedFuture(error) - } - - let channel: EventLoopFuture - switch self.key.scheme { - case .http, .https: - let address = HTTPClient.resolveAddress(host: self.key.host, port: self.key.port, proxy: self.configuration.proxy) - channel = bootstrap.connect(host: address.host, port: address.port) - case .unix, .http_unix, .https_unix: - channel = bootstrap.connect(unixDomainSocketPath: self.key.unixPath) - } - - return channel.flatMap { channel in - let requiresSSLHandler = self.configuration.proxy != nil && self.key.scheme.requiresTLS - let handshakePromise = channel.eventLoop.makePromise(of: Void.self) - - channel.pipeline.addSSLHandlerIfNeeded(for: self.key, tlsConfiguration: self.configuration.tlsConfiguration, addSSLClient: requiresSSLHandler, handshakePromise: handshakePromise) - - return handshakePromise.futureResult.flatMap { - channel.pipeline.addHTTPClientHandlers(leftOverBytesStrategy: .forwardBytes) - }.flatMap { - #if canImport(Network) - if #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *), bootstrap.underlyingBootstrap is NIOTSConnectionBootstrap { - return channel.pipeline.addHandler(HTTPClient.NWErrorHandler(), position: .first) - } - #endif - return channel.eventLoop.makeSucceededFuture(()) - }.flatMap { - switch self.configuration.decompression { - case .disabled: - return channel.eventLoop.makeSucceededFuture(()) - case .enabled(let limit): - let decompressHandler = NIOHTTPResponseDecompressor(limit: limit) - return channel.pipeline.addHandler(decompressHandler) - } - }.map { - channel - } - }.flatMapError { error in - #if canImport(Network) - var error = error - if #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *), bootstrap.underlyingBootstrap is NIOTSConnectionBootstrap { - error = HTTPClient.NWErrorHandler.translateError(error) - } - #endif - return channelEventLoop.makeFailedFuture(error) - } + return NIOClientTCPBootstrap.makeHTTP1Channel(destination: self.key, eventLoop: self.eventLoop, configuration: self.configuration, preference: preference) } /// A `Waiter` represents a request that waits for a connection when none is diff --git a/Sources/AsyncHTTPClient/ConnectionsState.swift b/Sources/AsyncHTTPClient/ConnectionsState.swift index 2ceace465..3f593baca 100644 --- a/Sources/AsyncHTTPClient/ConnectionsState.swift +++ b/Sources/AsyncHTTPClient/ConnectionsState.swift @@ -74,15 +74,6 @@ extension HTTP1ConnectionProvider { return Snapshot(state: self.state, availableConnections: self.availableConnections, leasedConnections: self.leasedConnections, waiters: self.waiters, openedConnectionsCount: self.openedConnectionsCount, pending: self.pending) } - mutating func testsOnly_setInternalState(_ snapshot: Snapshot) { - self.state = snapshot.state - self.availableConnections = snapshot.availableConnections - self.leasedConnections = snapshot.leasedConnections - self.waiters = snapshot.waiters - self.openedConnectionsCount = snapshot.openedConnectionsCount - self.pending = snapshot.pending - } - func assertInvariants() { assert(self.waiters.isEmpty) assert(self.availableConnections.isEmpty) @@ -158,6 +149,12 @@ extension HTTP1ConnectionProvider { if connection.isActiveEstimation, !closing { // If connection is alive, we can offer it to a next waiter if let waiter = self.waiters.popFirst() { + // There should be no case where we have both capacity and a waiter here. + // Waiter can only exists if there was no capacity at aquire. If some connection + // is released when we have waiter it can only indicate that we should lease (if EL are the same), + // or replace (if they are different). But we cannot increase connection count here. + assert(!self.hasCapacity) + let (eventLoop, required) = self.resolvePreference(waiter.preference) // If returned connection is on same EL or we do not require special EL - lease it @@ -165,23 +162,9 @@ extension HTTP1ConnectionProvider { return .lease(connection, waiter) } - // If there is an opened connection on the same loop, lease it and park returned - if let found = self.availableConnections.firstIndex(where: { $0.eventLoop === eventLoop }) { - self.leasedConnections.remove(ConnectionKey(connection)) - let replacement = self.availableConnections.swap(at: found, with: connection) - self.leasedConnections.insert(ConnectionKey(replacement)) - return .parkAnd(connection, .lease(replacement, waiter)) - } - - // If we can create new connection - do it - if self.hasCapacity { - self.leasedConnections.remove(ConnectionKey(connection)) - self.availableConnections.append(connection) - self.openedConnectionsCount += 1 - return .parkAnd(connection, .create(waiter)) - } - // If we cannot create new connections, we will have to replace returned connection with a new one on the required loop + // We will keep the `openedConnectionCount`, since .replace === .create, so we decrease and increase the `openedConnectionCount` + self.leasedConnections.remove(ConnectionKey(connection)) return .replace(connection, waiter) } else { // or park, if there are no waiters self.leasedConnections.remove(ConnectionKey(connection)) @@ -214,15 +197,6 @@ extension HTTP1ConnectionProvider { } } - mutating func drop(connection: ConnectionType) { - switch self.state { - case .active: - self.leasedConnections.remove(ConnectionKey(connection)) - case .closed: - assertionFailure("should not happen") - } - } - mutating func connectFailed() -> Action { switch self.state { case .active: @@ -287,20 +261,25 @@ extension HTTP1ConnectionProvider { mutating func processNextWaiter() -> Action { if let waiter = self.waiters.popFirst() { - let (eventLoop, required) = self.resolvePreference(waiter.preference) - - // If specific EL is required, we have only two options - find open one or create a new one - if required, let found = self.availableConnections.firstIndex(where: { $0.eventLoop === eventLoop }) { - let connection = self.availableConnections.remove(at: found) - self.leasedConnections.insert(ConnectionKey(connection)) - return .lease(connection, waiter) - } else if !required, let connection = self.availableConnections.popFirst() { - self.leasedConnections.insert(ConnectionKey(connection)) - return .lease(connection, waiter) - } else { - self.openedConnectionsCount += 1 - return .create(waiter) - } + // There should be no case where we have waiters and available connections at the same time. + // + // This method is called in following cases: + // + // 1. from `release` when connection is inactive and cannot be re-used + // 2. from `connectFailed` when we failed to establish a new connection + // 3. from `remoteClose` when connection was closed by the remote side and cannot be re-used + // 4. from `timeout` when connection was closed due to idle timeout and cannot be re-used. + // + // In all cases connection, which triggered the transition, will not be in `available` state. + // + // Given that the waiter can only be present in the pool if there were no available connections + // (otherwise it had been leased a connection immediately on getting the connection), we do not + // see a situation when we can lease another available connection, therefore the only course + // of action is to create a new connection for the waiter. + assert(self.availableConnections.isEmpty) + + self.openedConnectionsCount += 1 + return .create(waiter) } // if capacity is at max and the are no waiters and no in-flight requests for connection, we are closing this provider diff --git a/Sources/AsyncHTTPClient/Utils.swift b/Sources/AsyncHTTPClient/Utils.swift index dd30e0393..30fbd8107 100644 --- a/Sources/AsyncHTTPClient/Utils.swift +++ b/Sources/AsyncHTTPClient/Utils.swift @@ -138,6 +138,65 @@ extension NIOClientTCPBootstrap { return channelAddedFuture } } + + static func makeHTTP1Channel(destination: ConnectionPool.Key, eventLoop: EventLoop, configuration: HTTPClient.Configuration, preference: HTTPClient.EventLoopPreference) -> EventLoopFuture { + let channelEventLoop = preference.bestEventLoop ?? eventLoop + + let key = destination + + let requiresTLS = key.scheme.requiresTLS + let bootstrap: NIOClientTCPBootstrap + do { + bootstrap = try NIOClientTCPBootstrap.makeHTTPClientBootstrapBase(on: channelEventLoop, host: key.host, port: key.port, requiresTLS: requiresTLS, configuration: configuration) + } catch { + return channelEventLoop.makeFailedFuture(error) + } + + let channel: EventLoopFuture + switch key.scheme { + case .http, .https: + let address = HTTPClient.resolveAddress(host: key.host, port: key.port, proxy: configuration.proxy) + channel = bootstrap.connect(host: address.host, port: address.port) + case .unix, .http_unix, .https_unix: + channel = bootstrap.connect(unixDomainSocketPath: key.unixPath) + } + + return channel.flatMap { channel in + let requiresSSLHandler = configuration.proxy != nil && key.scheme.requiresTLS + let handshakePromise = channel.eventLoop.makePromise(of: Void.self) + + channel.pipeline.addSSLHandlerIfNeeded(for: key, tlsConfiguration: configuration.tlsConfiguration, addSSLClient: requiresSSLHandler, handshakePromise: handshakePromise) + + return handshakePromise.futureResult.flatMap { + channel.pipeline.addHTTPClientHandlers(leftOverBytesStrategy: .forwardBytes) + }.flatMap { + #if canImport(Network) + if #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *), bootstrap.underlyingBootstrap is NIOTSConnectionBootstrap { + return channel.pipeline.addHandler(HTTPClient.NWErrorHandler(), position: .first) + } + #endif + return channel.eventLoop.makeSucceededFuture(()) + }.flatMap { + switch configuration.decompression { + case .disabled: + return channel.eventLoop.makeSucceededFuture(()) + case .enabled(let limit): + let decompressHandler = NIOHTTPResponseDecompressor(limit: limit) + return channel.pipeline.addHandler(decompressHandler) + } + }.map { + channel + } + }.flatMapError { error in + #if canImport(Network) + var error = error + if #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *), bootstrap.underlyingBootstrap is NIOTSConnectionBootstrap { + error = HTTPClient.NWErrorHandler.translateError(error) + } + #endif + return channelEventLoop.makeFailedFuture(error) + } + } } extension Connection { diff --git a/Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift b/Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift index 0fea77411..25ef44ff7 100644 --- a/Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift @@ -41,18 +41,10 @@ extension ConnectionPoolTests { ("testReleaseInactiveConnectionEmptyQueueHasConnections", testReleaseInactiveConnectionEmptyQueueHasConnections), ("testReleaseAliveConnectionHasWaiter", testReleaseAliveConnectionHasWaiter), ("testReleaseInactiveConnectionHasWaitersNoConnections", testReleaseInactiveConnectionHasWaitersNoConnections), - ("testReleaseInactiveConnectionHasWaitersHasConnections", testReleaseInactiveConnectionHasWaitersHasConnections), ("testReleaseAliveConnectionSameELHasWaiterSpecificEL", testReleaseAliveConnectionSameELHasWaiterSpecificEL), - ("testReleaseAliveConnectionDifferentELNoSameELConnectionsHasWaiterSpecificEL", testReleaseAliveConnectionDifferentELNoSameELConnectionsHasWaiterSpecificEL), - ("testReleaseAliveConnectionDifferentELHasSameELConnectionsHasWaiterSpecificEL", testReleaseAliveConnectionDifferentELHasSameELConnectionsHasWaiterSpecificEL), ("testReleaseAliveConnectionDifferentELNoSameELConnectionsOnLimitHasWaiterSpecificEL", testReleaseAliveConnectionDifferentELNoSameELConnectionsOnLimitHasWaiterSpecificEL), - ("testReleaseInactiveConnectionHasWaitersHasSameELConnectionsSpecificEL", testReleaseInactiveConnectionHasWaitersHasSameELConnectionsSpecificEL), - ("testReleaseInactiveConnectionHasWaitersNoSameELConnectionsSpecificEL", testReleaseInactiveConnectionHasWaitersNoSameELConnectionsSpecificEL), ("testNextWaiterEmptyQueue", testNextWaiterEmptyQueue), ("testNextWaiterEmptyQueueHasConnections", testNextWaiterEmptyQueueHasConnections), - ("testNextWaiterHasWaitersHasConnections", testNextWaiterHasWaitersHasConnections), - ("testNextWaiterHasWaitersHasSameELConnectionsSpecificEL", testNextWaiterHasWaitersHasSameELConnectionsSpecificEL), - ("testNextWaiterHasWaitersHasDifferentELConnectionsSpecificEL", testNextWaiterHasWaitersHasDifferentELConnectionsSpecificEL), ("testTimeoutLeasedConnection", testTimeoutLeasedConnection), ("testTimeoutAvailableConnection", testTimeoutAvailableConnection), ("testRemoteClosedLeasedConnection", testRemoteClosedLeasedConnection), diff --git a/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift b/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift index 62a8d1db0..7dec834c5 100644 --- a/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift +++ b/Tests/AsyncHTTPClientTests/ConnectionPoolTests.swift @@ -25,39 +25,19 @@ import XCTest class ConnectionPoolTests: XCTestCase { var eventLoop: EmbeddedEventLoop! - var http1ConnectionProvider: HTTP1ConnectionProvider! func testPending() { - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) - - var snapshot = state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(0, snapshot.openedConnectionsCount) - + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) + XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) XCTAssertTrue(state.enqueue()) - - snapshot = state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(1, snapshot.pending) - XCTAssertEqual(0, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 1, opened: 0) } // MARK: - Acquire Tests func testAcquireWhenEmpty() { - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) - - var snapshot = state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(0, snapshot.openedConnectionsCount) + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) + XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) XCTAssertTrue(state.enqueue()) let action = state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) @@ -68,105 +48,60 @@ class ConnectionPoolTests: XCTestCase { XCTFail("Unexpected action: \(action)") } - snapshot = state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 1) } func testAcquireWhenAvailable() throws { - let channel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(connection) - snapshot.openedConnectionsCount = 1 + var (state, _) = self.buildState(count: 1) - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + // Validate that the pool has one available connection and it's internal state is correct + XCTAssertState(state, available: 1, leased: 0, waiters: 0, pending: 0, opened: 1) - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - XCTAssertTrue(self.http1ConnectionProvider.enqueue()) + XCTAssertTrue(state.enqueue()) - let action = self.http1ConnectionProvider.state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) + let action = state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) switch action { case .lease(let connection, let waiter): - waiter.promise.succeed(connection) - - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1) // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) + waiter.promise.succeed(connection) default: XCTFail("Unexpected action: \(action)") } + + // cleanup + try XCTAssertStateClose(state, available: 0, leased: 1, waiters: 0, clean: false) } func testAcquireWhenUnavailable() throws { - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() + var (state, _) = self.buildState(count: 8, release: false) + XCTAssertState(state, available: 0, leased: 8, waiters: 0, pending: 0, opened: 8) - snapshot.openedConnectionsCount = 8 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(8, snapshot.openedConnectionsCount) - - XCTAssertTrue(self.http1ConnectionProvider.enqueue()) - - let action = self.http1ConnectionProvider.state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) + XCTAssertTrue(state.enqueue()) + let action = state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) switch action { case .none: - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(1, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(8, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 0, leased: 8, waiters: 1, pending: 0, opened: 8) default: XCTFail("Unexpected action: \(action)") } // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - snapshot.openedConnectionsCount = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + try XCTAssertStateClose(state, available: 0, leased: 8, waiters: 1, clean: false) } // MARK: - Acquire on Specific EL Tests func testAcquireWhenEmptySpecificEL() { - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) - var snapshot = state.testsOnly_getInternalState() + let el: EventLoop = self.eventLoop + let preference: HTTPClient.EventLoopPreference = .delegateAndChannel(on: el) - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(0, snapshot.openedConnectionsCount) + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: el) + XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) XCTAssertTrue(state.enqueue()) - - let action = state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: self.eventLoop))) + let action = state.acquire(waiter: .init(promise: el.makePromise(), setupComplete: el.makeSucceededFuture(()), preference: preference)) switch action { case .create(let waiter): waiter.promise.fail(TempError()) @@ -174,128 +109,69 @@ class ConnectionPoolTests: XCTestCase { XCTFail("Unexpected action: \(action)") } - snapshot = state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 1) } func testAcquireWhenAvailableSpecificEL() throws { - let channel = EmbeddedChannel() + let el: EventLoop = self.eventLoop + let preference: HTTPClient.EventLoopPreference = .delegateAndChannel(on: el) + var (state, _) = self.buildState(count: 1, eventLoop: el) + XCTAssertState(state, available: 1, leased: 0, waiters: 0, pending: 0, opened: 1) - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(connection) - snapshot.openedConnectionsCount = 1 - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - XCTAssertTrue(self.http1ConnectionProvider.enqueue()) - - let action = self.http1ConnectionProvider.state.acquire(waiter: .init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: channel.eventLoop))) + XCTAssertTrue(state.enqueue()) + let action = state.acquire(waiter: .init(promise: el.makePromise(), setupComplete: el.makeSucceededFuture(()), preference: preference)) switch action { case .lease(let connection, let waiter): waiter.promise.succeed(connection) - - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) + XCTAssertState(state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1) default: XCTFail("Unexpected action: \(action)") } + + // cleanup + try XCTAssertStateClose(state, available: 0, leased: 1, waiters: 0, clean: false) } func testAcquireReplace() throws { - let channel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() + let el: EventLoop = self.eventLoop + var (state, connections) = self.buildState(count: 8, release: false, eventLoop: el) - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(connection) - snapshot.openedConnectionsCount = 8 + // release a connection + _ = state.release(connection: connections.first!, closing: false) + XCTAssertState(state, available: 1, leased: 7, waiters: 0, pending: 0, opened: 8) - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + // other eventLoop + let preference: HTTPClient.EventLoopPreference = .delegateAndChannel(on: EmbeddedEventLoop()) - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(8, snapshot.openedConnectionsCount) - - XCTAssertTrue(self.http1ConnectionProvider.enqueue()) - - let action = self.http1ConnectionProvider.state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: self.eventLoop))) + XCTAssertTrue(state.enqueue()) + let action = state.acquire(waiter: .init(promise: el.makePromise(), setupComplete: el.makeSucceededFuture(()), preference: preference)) switch action { - case .replace(_, let waiter): + case .replace(let connection, let waiter): waiter.promise.fail(TempError()) - - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(8, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 0, leased: 7, waiters: 0, pending: 0, opened: 8, isNotLeased: connection) default: XCTFail("Unexpected action: \(action)") } // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - snapshot.openedConnectionsCount = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + try XCTAssertStateClose(state, available: 0, leased: 7, waiters: 0, clean: false) } func testAcquireWhenUnavailableSpecificEL() throws { - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.openedConnectionsCount = 8 - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(8, snapshot.openedConnectionsCount) + var (state, _) = self.buildState(count: 8, release: false, eventLoop: self.eventLoop) + XCTAssertState(state, available: 0, leased: 8, waiters: 0, pending: 0, opened: 8) - XCTAssertTrue(self.http1ConnectionProvider.enqueue()) - - let action = self.http1ConnectionProvider.state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: self.eventLoop))) + XCTAssertTrue(state.enqueue()) + let action = state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: self.eventLoop))) switch action { case .none: - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(1, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(8, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 0, leased: 8, waiters: 1, pending: 0, opened: 8) default: XCTFail("Unexpected action: \(action)") } // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - snapshot.openedConnectionsCount = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + try XCTAssertStateClose(state, available: 0, leased: 8, waiters: 1, clean: false) } // MARK: - Acquire Errors Tests @@ -303,6 +179,7 @@ class ConnectionPoolTests: XCTestCase { func testAcquireWhenClosed() { var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) _ = state.close() + XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) XCTAssertFalse(state.enqueue()) @@ -319,6 +196,7 @@ class ConnectionPoolTests: XCTestCase { func testConnectFailedWhenClosed() { var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) _ = state.close() + XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) let action = state.connectFailed() switch action { @@ -332,732 +210,201 @@ class ConnectionPoolTests: XCTestCase { // MARK: - Release Tests func testReleaseAliveConnectionEmptyQueue() throws { - let channel = ActiveChannel(eventLoop: self.eventLoop) - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.leasedConnections.insert(ConnectionKey(connection)) + var (state, connections) = self.buildState(count: 1, release: false) + XCTAssertState(state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1) - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.release(connection: connection, closing: false) + let connection = try XCTUnwrap(connections.first) + let action = state.release(connection: connection, closing: false) switch action { case .park: - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 1, leased: 0, waiters: 0, pending: 0, opened: 1) default: XCTFail("Unexpected action: \(action)") } // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - connection.remoteClosed(logger: HTTPClient.loggingDisabled) + try XCTAssertStateClose(state, available: 1, leased: 0, waiters: 0, clean: true) } func testReleaseAliveButClosingConnectionEmptyQueue() throws { - let channel = ActiveChannel(eventLoop: self.eventLoop) - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.leasedConnections.insert(ConnectionKey(connection)) + var (state, connections) = self.buildState(count: 1, release: false) + XCTAssertState(state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1) - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.release(connection: connection, closing: true) + let connection = try XCTUnwrap(connections.first) + // closing should be true to test that we can discard connection that is still active, but caller indicated that it will be closed soon + let action = state.release(connection: connection, closing: true) switch action { case .closeProvider: - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(0, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) default: XCTFail("Unexpected action: \(action)") } // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - self.http1ConnectionProvider.execute(action, logger: HTTPClient.loggingDisabled) + XCTAssertNil(state.close()) } func testReleaseInactiveConnectionEmptyQueue() throws { - let channel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.leasedConnections.insert(ConnectionKey(connection)) + var (state, connections) = self.buildState(count: 1, release: false) + XCTAssertState(state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1) - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.release(connection: connection, closing: true) + let connection = try XCTUnwrap(connections.first) + connection.isActiveEstimation = false + // closing should be false to test that we check connection state in order to decided if we need to discard the connection + let action = state.release(connection: connection, closing: false) switch action { case .closeProvider: - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(0, snapshot.openedConnectionsCount) - + XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) default: XCTFail("Unexpected action: \(action)") } // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - self.http1ConnectionProvider.execute(action, logger: HTTPClient.loggingDisabled) + XCTAssertNil(state.close()) } func testReleaseInactiveConnectionEmptyQueueHasConnections() throws { - let channel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() + var (state, connections) = self.buildState(count: 2, release: false) + XCTAssertState(state, available: 0, leased: 2, waiters: 0, pending: 0, opened: 2) - snapshot.pending = 0 - snapshot.openedConnectionsCount = 2 - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.leasedConnections.insert(ConnectionKey(connection)) - let available = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(available) + let connection = try XCTUnwrap(connections.first) - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + // Return a connection to the pool + _ = state.release(connection: try XCTUnwrap(connections.dropFirst().first), closing: false) + XCTAssertState(state, available: 1, leased: 1, waiters: 0, pending: 0, opened: 2) - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(2, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.release(connection: connection, closing: true) + let action = state.release(connection: connection, closing: true) switch action { case .none: - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 1, leased: 0, waiters: 0, pending: 0, opened: 1) default: XCTFail("Unexpected action: \(action)") } // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - connection.remoteClosed(logger: HTTPClient.loggingDisabled) + try XCTAssertStateClose(state, available: 1, leased: 0, waiters: 0, clean: true) } func testReleaseAliveConnectionHasWaiter() throws { - let channel = ActiveChannel(eventLoop: self.eventLoop) - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.leasedConnections.insert(ConnectionKey(connection)) - snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .indifferent)) + var (state, connections) = self.buildState(count: 8, release: false) - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + // Add one waiter to the pool + XCTAssertTrue(state.enqueue()) + _ = state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(1, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 0, leased: 8, waiters: 1, pending: 0, opened: 8) - let action = self.http1ConnectionProvider.state.release(connection: connection, closing: false) + let connection = try XCTUnwrap(connections.first) + let action = state.release(connection: connection, closing: false) switch action { case .lease(let connection, let waiter): - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - + XCTAssertState(state, available: 0, leased: 8, waiters: 0, pending: 0, opened: 8, isLeased: connection) // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) waiter.promise.succeed(connection) - self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) default: XCTFail("Unexpected action: \(action)") } + + // cleanup + try XCTAssertStateClose(state, available: 0, leased: 8, waiters: 0, clean: false) } func testReleaseInactiveConnectionHasWaitersNoConnections() throws { - let channel = EmbeddedChannel() + var (state, connections) = self.buildState(count: 8, release: false) + XCTAssertState(state, available: 0, leased: 8, waiters: 0, pending: 0, opened: 8) - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.leasedConnections.insert(ConnectionKey(connection)) - snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .indifferent)) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(1, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) + // Add one waiter to the pool + XCTAssertTrue(state.enqueue()) + _ = state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .indifferent)) + XCTAssertState(state, available: 0, leased: 8, waiters: 1, pending: 0, opened: 8) - let action = self.http1ConnectionProvider.state.release(connection: connection, closing: true) + let connection = try XCTUnwrap(connections.first) + let action = state.release(connection: connection, closing: true) switch action { case .create(let waiter): - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - + XCTAssertState(state, available: 0, leased: 7, waiters: 0, pending: 0, opened: 8) // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - // simulate create -> use -> release cycle - self.http1ConnectionProvider.connect(.failure(TempError()), waiter: waiter, logger: HTTPClient.loggingDisabled) + waiter.promise.fail(TempError()) default: XCTFail("Unexpected action: \(action)") } - } - - func testReleaseInactiveConnectionHasWaitersHasConnections() throws { - let channel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 2 - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.leasedConnections.insert(ConnectionKey(connection)) - snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .indifferent)) - - let available = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(available) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(1, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(2, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.release(connection: connection, closing: false) - switch action { - case .lease(let connection, let waiter): - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - waiter.promise.succeed(connection) - self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) - default: - XCTFail("Unexpected action: \(action)") - } + // cleanup + try XCTAssertStateClose(state, available: 0, leased: 7, waiters: 0, clean: false) } // MARK: - Release on Specific EL Tests func testReleaseAliveConnectionSameELHasWaiterSpecificEL() throws { - let channel = ActiveChannel(eventLoop: self.eventLoop) - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.leasedConnections.insert(ConnectionKey(connection)) - snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: channel.eventLoop))) + var (state, connections) = self.buildState(count: 8, release: false) + XCTAssertState(state, available: 0, leased: 8, waiters: 0, pending: 0, opened: 8) - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(1, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) + // Add one waiter to the pool + XCTAssertTrue(state.enqueue()) + _ = state.acquire(waiter: .init(promise: self.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: self.eventLoop))) + XCTAssertState(state, available: 0, leased: 8, waiters: 1, pending: 0, opened: 8) - let action = self.http1ConnectionProvider.state.release(connection: connection, closing: false) + let connection = try XCTUnwrap(connections.first) + let action = state.release(connection: connection, closing: false) switch action { case .lease(let connection, let waiter): - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - + XCTAssertState(state, available: 0, leased: 8, waiters: 0, pending: 0, opened: 8, isLeased: connection) // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) waiter.promise.succeed(connection) - self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) default: XCTFail("Unexpected action: \(action)") } - } - - func testReleaseAliveConnectionDifferentELNoSameELConnectionsHasWaiterSpecificEL() throws { - let differentEL = EmbeddedEventLoop() - defer { - XCTAssertNoThrow(try differentEL.syncShutdownGracefully()) - } - let channel = ActiveChannel(eventLoop: differentEL) // Channel on different EL, that's important for the test. - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.leasedConnections.insert(ConnectionKey(connection)) - snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(of: Connection.self), - setupComplete: self.eventLoop.makeSucceededFuture(()), - preference: .delegateAndChannel(on: self.eventLoop))) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(1, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.release(connection: connection, closing: false) - switch action { - case .parkAnd(let connection, .create(let waiter)): - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertFalse(snapshot.leasedConnections.contains(ConnectionKey(connection))) - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(2, snapshot.openedConnectionsCount) - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - // simulate create -> use -> release cycle - self.http1ConnectionProvider.connect(.failure(TempError()), waiter: waiter, logger: HTTPClient.loggingDisabled) - connection.remoteClosed(logger: HTTPClient.loggingDisabled) - default: - XCTFail("Unexpected action: \(action)") - } - } - - func testReleaseAliveConnectionDifferentELHasSameELConnectionsHasWaiterSpecificEL() throws { - let channel = ActiveChannel(eventLoop: self.eventLoop) - let otherChannel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 2 - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.leasedConnections.insert(ConnectionKey(connection)) - snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: otherChannel.eventLoop))) - - let available = Connection(channel: otherChannel, provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(available) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(1, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(2, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.release(connection: connection, closing: false) - switch action { - case .parkAnd(let connection, .lease(let replacement, let waiter)): - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertFalse(snapshot.leasedConnections.contains(ConnectionKey(connection))) - XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(replacement))) - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(2, snapshot.openedConnectionsCount) - - // cleanup - waiter.promise.succeed(replacement) - connection.remoteClosed(logger: HTTPClient.loggingDisabled) - self.http1ConnectionProvider.release(connection: replacement, closing: true, logger: HTTPClient.loggingDisabled) - default: - XCTFail("Unexpected action: \(action)") - } + try XCTAssertStateClose(state, available: 0, leased: 8, waiters: 0, clean: false) } func testReleaseAliveConnectionDifferentELNoSameELConnectionsOnLimitHasWaiterSpecificEL() throws { - let channel = ActiveChannel(eventLoop: self.eventLoop) - let otherChannel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() + var (state, connections) = self.buildState(count: 8, release: false) + XCTAssertState(state, available: 0, leased: 8, waiters: 0, pending: 0, opened: 8) - snapshot.pending = 0 - snapshot.openedConnectionsCount = 8 - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.leasedConnections.insert(ConnectionKey(connection)) - snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: otherChannel.eventLoop))) - - let available = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(available) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(1, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(8, snapshot.openedConnectionsCount) + let differentEL = EmbeddedEventLoop() + // Add one waiter to the pool + XCTAssertTrue(state.enqueue()) + _ = state.acquire(waiter: .init(promise: differentEL.makePromise(), setupComplete: differentEL.makeSucceededFuture(()), preference: .delegateAndChannel(on: differentEL))) + XCTAssertState(state, available: 0, leased: 8, waiters: 1, pending: 0, opened: 8) - let action = self.http1ConnectionProvider.state.release(connection: connection, closing: false) + let connection = try XCTUnwrap(connections.first) + let action = state.release(connection: connection, closing: false) switch action { case .replace(let connection, let waiter): - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(8, snapshot.openedConnectionsCount) - + XCTAssertState(state, available: 0, leased: 7, waiters: 0, pending: 0, opened: 8, isNotLeased: connection) // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) waiter.promise.fail(TempError()) - snapshot.openedConnectionsCount = 2 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - snapshot.availableConnections.forEach { $0.remoteClosed(logger: HTTPClient.loggingDisabled) } - self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) default: XCTFail("Unexpected action: \(action)") } - } - - func testReleaseInactiveConnectionHasWaitersHasSameELConnectionsSpecificEL() throws { - let channel = EmbeddedChannel() - let otherChannel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 2 - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.leasedConnections.insert(ConnectionKey(connection)) - snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: otherChannel.eventLoop))) - - let available = Connection(channel: otherChannel, provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(available) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(1, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(2, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.release(connection: connection, closing: false) - switch action { - case .lease(let connection, let waiter): - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertTrue(connection === available) - XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - waiter.promise.succeed(connection) - self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) - default: - XCTFail("Unexpected action: \(action)") - } - } - - func testReleaseInactiveConnectionHasWaitersNoSameELConnectionsSpecificEL() throws { - let channel = EmbeddedChannel() - let otherChannel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 2 - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.leasedConnections.insert(ConnectionKey(connection)) - snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: otherChannel.eventLoop))) - - let available = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(available) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(1, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(2, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.release(connection: connection, closing: false) - switch action { - case .create(let waiter): - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(2, snapshot.openedConnectionsCount) - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234)s - self.http1ConnectionProvider.connect(.failure(TempError()), waiter: waiter, logger: HTTPClient.loggingDisabled) - snapshot.availableConnections.forEach { $0.remoteClosed(logger: HTTPClient.loggingDisabled) } - default: - XCTFail("Unexpected action: \(action)") - } + try XCTAssertStateClose(state, available: 0, leased: 7, waiters: 0, clean: false) } // MARK: - Next Waiter Tests func testNextWaiterEmptyQueue() throws { - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + var (state, _) = self.buildState(count: 0) + XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(0, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.processNextWaiter() + let action = state.processNextWaiter() switch action { case .closeProvider: - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(0, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) default: XCTFail("Unexpected action: \(action)") } - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - self.http1ConnectionProvider.execute(action, logger: HTTPClient.loggingDisabled) } func testNextWaiterEmptyQueueHasConnections() throws { - let channel = ActiveChannel(eventLoop: self.eventLoop) - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 + var (state, _) = self.buildState(count: 1, release: true) + XCTAssertState(state, available: 1, leased: 0, waiters: 0, pending: 0, opened: 1) - let available = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(available) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.processNextWaiter() + let action = state.processNextWaiter() switch action { case .none: - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - available.remoteClosed(logger: HTTPClient.loggingDisabled) - default: - XCTFail("Unexpected action: \(action)") - } - } - - func testNextWaiterHasWaitersHasConnections() throws { - let channel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 - snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .indifferent)) - - let available = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(available) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(1, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.processNextWaiter() - switch action { - case .lease(let connection, let waiter): - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - waiter.promise.fail(TempError()) - self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) - default: - XCTFail("Unexpected action: \(action)") - } - } - - func testNextWaiterHasWaitersHasSameELConnectionsSpecificEL() throws { - let channel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 - snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: channel.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: channel.eventLoop))) - - let available = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(available) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(1, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.processNextWaiter() - switch action { - case .lease(let connection, let waiter): - let snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertTrue(snapshot.leasedConnections.contains(ConnectionKey(connection))) - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - waiter.promise.fail(TempError()) - self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) - default: - XCTFail("Unexpected action: \(action)") - } - } - - func testNextWaiterHasWaitersHasDifferentELConnectionsSpecificEL() throws { - let channel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 - snapshot.waiters.append(.init(promise: channel.eventLoop.makePromise(), setupComplete: self.eventLoop.makeSucceededFuture(()), preference: .delegateAndChannel(on: self.eventLoop))) - - let available = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(available) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(1, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.processNextWaiter() - switch action { - case .create(let waiter): - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(2, snapshot.openedConnectionsCount) - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - // simulate create -> use -> release cycle - self.http1ConnectionProvider.connect(.failure(TempError()), waiter: waiter, logger: HTTPClient.loggingDisabled) - available.remoteClosed(logger: HTTPClient.loggingDisabled) + XCTAssertState(state, available: 1, leased: 0, waiters: 0, pending: 0, opened: 1) default: XCTFail("Unexpected action: \(action)") } @@ -1066,61 +413,25 @@ class ConnectionPoolTests: XCTestCase { // MARK: - Timeout and Remote Close Tests func testTimeoutLeasedConnection() throws { - let channel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 - snapshot.leasedConnections.insert(ConnectionKey(connection)) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + var (state, connections) = self.buildState(count: 1, release: false) + XCTAssertState(state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1) - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.timeout(connection: connection) + let connection = try XCTUnwrap(connections.first) + let action = state.timeout(connection: connection) switch action { case .none: - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1) default: XCTFail("Unexpected action: \(action)") } - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) } func testTimeoutAvailableConnection() throws { - let channel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 - snapshot.availableConnections.append(connection) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + var (state, connections) = self.buildState(count: 1) + XCTAssertState(state, available: 1, leased: 0, waiters: 0, pending: 0, opened: 1) - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.timeout(connection: connection) + let connection = try XCTUnwrap(connections.first) + let action = state.timeout(connection: connection) switch action { case .closeAnd(_, let after): switch after { @@ -1129,104 +440,51 @@ class ConnectionPoolTests: XCTestCase { default: XCTFail("Unexpected action: \(action)") } - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(0, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) default: XCTFail("Unexpected action: \(action)") } - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - self.http1ConnectionProvider.execute(action, logger: HTTPClient.loggingDisabled) } func testRemoteClosedLeasedConnection() throws { - let channel = EmbeddedChannel() + var (state, connections) = self.buildState(count: 1, release: false) - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() + XCTAssertState(state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1) - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 - snapshot.leasedConnections.insert(ConnectionKey(connection)) - - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) - - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.remoteClosed(connection: connection) + // This can happen when just leased connection is closed before TaskHandler is added to pipeline + let connection = try XCTUnwrap(connections.first) + let action = state.remoteClosed(connection: connection) switch action { case .none: - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(1, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 0, leased: 1, waiters: 0, pending: 0, opened: 1) default: XCTFail("Unexpected action: \(action)") } - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - self.http1ConnectionProvider.release(connection: connection, closing: true, logger: HTTPClient.loggingDisabled) } func testRemoteClosedAvailableConnection() throws { - let channel = EmbeddedChannel() - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) - snapshot.pending = 0 - snapshot.openedConnectionsCount = 1 - snapshot.availableConnections.append(connection) + var (state, connections) = self.buildState(count: 1) - self.http1ConnectionProvider.state.testsOnly_setInternalState(snapshot) + XCTAssertState(state, available: 1, leased: 0, waiters: 0, pending: 0, opened: 1) - XCTAssertEqual(1, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(1, snapshot.openedConnectionsCount) - - let action = self.http1ConnectionProvider.state.remoteClosed(connection: connection) + let connection = try XCTUnwrap(connections.first) + let action = state.remoteClosed(connection: connection) switch action { case .closeProvider: - snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - XCTAssertEqual(0, snapshot.availableConnections.count) - XCTAssertEqual(0, snapshot.leasedConnections.count) - XCTAssertEqual(0, snapshot.waiters.count) - XCTAssertEqual(0, snapshot.pending) - XCTAssertEqual(0, snapshot.openedConnectionsCount) + XCTAssertState(state, available: 0, leased: 0, waiters: 0, pending: 0, opened: 0) default: XCTFail("Unexpected action: \(action)") } - - // cleanup - // this cleanup code needs to go and use HTTP1ConnectionProvider's API instead - // (https://github.com/swift-server/async-http-client/issues/234) - self.http1ConnectionProvider.execute(action, logger: HTTPClient.loggingDisabled) } // MARK: - Shutdown tests func testShutdownOnPendingAndSuccess() { - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) XCTAssertTrue(state.enqueue()) - let connectionPromise = self.eventLoop.makePromise(of: Connection.self) + let connectionPromise = self.eventLoop.makePromise(of: ConnectionForTests.self) let setupPromise = self.eventLoop.makePromise(of: Void.self) let waiter = HTTP1ConnectionProvider.Waiter(promise: connectionPromise, setupComplete: setupPromise.futureResult, preference: .indifferent) var action = state.acquire(waiter: waiter) @@ -1248,7 +506,7 @@ class ConnectionPoolTests: XCTestCase { XCTFail("Expecting snapshot") } - let connection = Connection(channel: EmbeddedChannel(), provider: self.http1ConnectionProvider) + let connection = ConnectionForTests(eventLoop: self.eventLoop) action = state.offer(connection: connection) guard case .closeAnd(_, .closeProvider) = action else { @@ -1261,11 +519,11 @@ class ConnectionPoolTests: XCTestCase { } func testShutdownOnPendingAndError() { - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) XCTAssertTrue(state.enqueue()) - let connectionPromise = self.eventLoop.makePromise(of: Connection.self) + let connectionPromise = self.eventLoop.makePromise(of: ConnectionForTests.self) let setupPromise = self.eventLoop.makePromise(of: Void.self) let waiter = HTTP1ConnectionProvider.Waiter(promise: connectionPromise, setupComplete: setupPromise.futureResult, preference: .indifferent) var action = state.acquire(waiter: waiter) @@ -1297,16 +555,8 @@ class ConnectionPoolTests: XCTestCase { setupPromise.succeed(()) } - func testShutdownTimeout() { - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - let connection = Connection(channel: EmbeddedChannel(), provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(connection) - snapshot.openedConnectionsCount = 1 - - state.testsOnly_setInternalState(snapshot) + func testShutdownTimeout() throws { + var (state, connections) = self.buildState(count: 1) if let (waiters, available, leased, clean) = state.close() { XCTAssertTrue(waiters.isEmpty) @@ -1317,6 +567,7 @@ class ConnectionPoolTests: XCTestCase { XCTFail("Expecting snapshot") } + let connection = try XCTUnwrap(connections.first) let action = state.timeout(connection: connection) switch action { case .closeAnd(_, let next): @@ -1332,16 +583,8 @@ class ConnectionPoolTests: XCTestCase { } } - func testShutdownRemoteClosed() { - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: self.eventLoop) - - var snapshot = self.http1ConnectionProvider.state.testsOnly_getInternalState() - - let connection = Connection(channel: EmbeddedChannel(), provider: self.http1ConnectionProvider) - snapshot.availableConnections.append(connection) - snapshot.openedConnectionsCount = 1 - - state.testsOnly_setInternalState(snapshot) + func testShutdownRemoteClosed() throws { + var (state, connections) = self.buildState(count: 1) if let (waiters, available, leased, clean) = state.close() { XCTAssertTrue(waiters.isEmpty) @@ -1352,6 +595,7 @@ class ConnectionPoolTests: XCTestCase { XCTFail("Expecting snapshot") } + let connection = try XCTUnwrap(connections.first) let action = state.remoteClosed(connection: connection) switch action { case .closeProvider: @@ -1362,26 +606,14 @@ class ConnectionPoolTests: XCTestCase { } } - // MARK: - Helpers - override func setUp() { XCTAssertNil(self.eventLoop) - XCTAssertNil(self.http1ConnectionProvider) self.eventLoop = EmbeddedEventLoop() - XCTAssertNoThrow(self.http1ConnectionProvider = try HTTP1ConnectionProvider(key: .init(.init(url: "http://some.test")), - eventLoop: self.eventLoop, - configuration: .init(), - pool: .init(configuration: .init(), - backgroundActivityLogger: HTTPClient.loggingDisabled), - backgroundActivityLogger: HTTPClient.loggingDisabled)) } override func tearDown() { XCTAssertNotNil(self.eventLoop) - XCTAssertNotNil(self.http1ConnectionProvider) - XCTAssertNoThrow(try self.http1ConnectionProvider.close().wait()) XCTAssertNoThrow(try self.eventLoop.syncShutdownGracefully()) self.eventLoop = nil - self.http1ConnectionProvider = nil } } diff --git a/Tests/AsyncHTTPClientTests/ConnectionPoolTestsSupport.swift b/Tests/AsyncHTTPClientTests/ConnectionPoolTestsSupport.swift index 70edd1426..91f8f4fc7 100644 --- a/Tests/AsyncHTTPClientTests/ConnectionPoolTestsSupport.swift +++ b/Tests/AsyncHTTPClientTests/ConnectionPoolTestsSupport.swift @@ -16,23 +16,35 @@ import NIO import XCTest +class ConnectionForTests: PoolManageableConnection { + var eventLoop: EventLoop + var isActiveEstimation: Bool + + init(eventLoop: EventLoop) { + self.eventLoop = eventLoop + self.isActiveEstimation = true + } + + func cancel() -> EventLoopFuture { + return self.eventLoop.makeSucceededFuture(()) + } +} + extension ConnectionPoolTests { - func buildState(count: Int, release: Bool = true, eventLoop: EventLoop? = nil) -> (HTTP1ConnectionProvider.ConnectionsState, [Connection]) { + func buildState(count: Int, release: Bool = true, eventLoop: EventLoop? = nil) -> (HTTP1ConnectionProvider.ConnectionsState, [ConnectionForTests]) { let eventLoop = eventLoop ?? self.eventLoop! - var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: eventLoop) - var items: [Connection] = [] + var state = HTTP1ConnectionProvider.ConnectionsState(eventLoop: eventLoop) + var items: [ConnectionForTests] = [] if count == 0 { return (state, items) } - let channel = ActiveChannel(eventLoop: self.eventLoop) - for _ in 1...count { // Set up connection pool to have one available connection do { - let connection = Connection(channel: channel, provider: self.http1ConnectionProvider) + let connection = ConnectionForTests(eventLoop: eventLoop) items.append(connection) // First, we ask the empty pool for a connection, triggering connection creation XCTAssertTrue(state.enqueue())