From 206df4050f929257e96ce4c9200ca2d72547df2e Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Thu, 30 Sep 2021 17:10:27 +0200 Subject: [PATCH 01/15] HTTP2StateMachine --- .../HTTPConnectionPool+HTTP1Connections.swift | 57 ++- .../HTTPConnectionPool+HTTP2Connections.swift | 15 + ...HTTPConnectionPool+HTTP2StateMachine.swift | 342 ++++++++++++++++++ .../HTTPConnectionPool+RequestQueue.swift | 36 ++ ...onPool+HTTP2StateMachineTests+XCTest.swift | 31 ++ ...onnectionPool+HTTP2StateMachineTests.swift | 117 ++++++ .../Mocks/MockConnectionPool.swift | 21 ++ Tests/LinuxMain.swift | 1 + 8 files changed, 614 insertions(+), 6 deletions(-) create mode 100644 Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift create mode 100644 Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift create mode 100644 Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift index c38479778..018447ba7 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift @@ -189,6 +189,30 @@ extension HTTPConnectionPool { preconditionFailure("Unexpected state: Did not expect to have connections with this state in the state machine: \(self.state)") } } + + enum MigrateAction { + case removeConnection + case keepConnection + } + + func migrateToHTTP2(_ context: inout HTTP1Connections.HTTP2ToHTTP1MigrationContext) -> MigrateAction { + switch self.state { + case .starting: + context.starting.append((self.connectionID, self.eventLoop)) + return .removeConnection + case .backingOff: + context.backingOff.append((self.connectionID, self.eventLoop)) + return .removeConnection + case .idle(let connection, since: _): + // Idle connections can be removed right away + context.close.append(connection) + return .removeConnection + case .leased: + return .keepConnection + case .closed: + preconditionFailure("Unexpected state: Did not expect to have connections with this state in the state machine: \(self.state)") + } + } } /// A structure to hold the currently active HTTP/1.1 connections. @@ -298,6 +322,12 @@ extension HTTPConnectionPool { var connectionsStartingForUseCase: Int } + struct HTTP2ToHTTP1MigrationContext { + var backingOff: [(Connection.ID, EventLoop)] = [] + var starting: [(Connection.ID, EventLoop)] = [] + var close: [Connection] = [] + } + // MARK: Connection creation mutating func createNewConnection(on eventLoop: EventLoop) -> Connection.ID { @@ -485,6 +515,21 @@ extension HTTPConnectionPool { return (index, context) } + // MARK: Migration + + mutating func migrateToHTTP2() -> HTTP2ToHTTP1MigrationContext { + var migrationContext = HTTP2ToHTTP1MigrationContext() + self.connections.removeAll { connection in + switch connection.migrateToHTTP2(&migrationContext) { + case .removeConnection: + return true + case .keepConnection: + return false + } + } + return migrationContext + } + // MARK: Shutdown mutating func shutdown() -> CleanupContext { @@ -610,12 +655,12 @@ extension HTTPConnectionPool { return nil } - } - struct Stats { - var idle: Int = 0 - var leased: Int = 0 - var connecting: Int = 0 - var backingOff: Int = 0 + struct Stats { + var idle: Int = 0 + var leased: Int = 0 + var connecting: Int = 0 + var backingOff: Int = 0 + } } } diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift index 6573ea368..6e82773ab 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift @@ -49,6 +49,16 @@ extension HTTPConnectionPool { } } + /// A connection is established and can potentially execute requests if not all streams are leased + var isActive: Bool { + switch self.state { + case .active: + return true + case .starting, .backingOff, .draining, .closed: + return false + } + } + /// A request can be scheduled on the connection var isAvailable: Bool { switch self.state { @@ -326,6 +336,11 @@ extension HTTPConnectionPool { // MARK: Connection creation + /// true if one ore more connections are active + var hasActiveConnections: Bool { + self.connections.contains { $0.isActive } + } + /// used in general purpose connection scenarios to check if at least one connection exist, or if should we create a new one var hasConnectionThatCanOrWillBeAbleToExecuteRequests: Bool { self.connections.contains { $0.canOrWillBeAbleToExecuteRequests } diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift new file mode 100644 index 000000000..6387e3b29 --- /dev/null +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift @@ -0,0 +1,342 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOCore +import NIOHTTP2 + +extension HTTPConnectionPool { + struct HTTP2StateMaschine { + typealias Action = HTTPConnectionPool.StateMachine.Action + typealias RequestAction = HTTPConnectionPool.StateMachine.RequestAction + typealias ConnectionAction = HTTPConnectionPool.StateMachine.ConnectionAction + + private var lastConnectFailure: Error? + private var failedConsecutiveConnectionAttempts = 0 + + private var connections: HTTP2Connections + private var http1Connections: HTTP1Connections? + + private var requests: RequestQueue + + private let idGenerator: Connection.ID.Generator + + init( + idGenerator: Connection.ID.Generator + ) { + self.idGenerator = idGenerator + self.requests = RequestQueue() + + self.connections = HTTP2Connections(generator: idGenerator) + } + + mutating func migrateConnectionsFromHTTP1( + connections http1Connections: HTTP1Connections, + requests: RequestQueue + ) -> Action { + precondition(self.http1Connections == nil) + precondition(self.connections.isEmpty) + precondition(self.requests.isEmpty) + + var http1Connections = http1Connections // make http1Connections mutable + let context = http1Connections.migrateToHTTP2() + self.connections.migrateConnections( + starting: context.starting, + backingOff: context.backingOff + ) + + if !http1Connections.isEmpty { + self.http1Connections = http1Connections + } + + self.requests = requests + + // TODO: Close all idle connections from context.close + // TODO: Potentially cancel unneeded bootstraps (Needs cancellable ClientBootstrap) + + return .none + } + + mutating func executeRequest(_ request: Request) -> Action { + if let eventLoop = request.requiredEventLoop { + return self.executeRequest(request, onRequired: eventLoop) + } else { + return self.executeRequest(request, onPreferred: request.preferredEventLoop) + } + } + + private mutating func executeRequest( + _ request: Request, + onRequired eventLoop: EventLoop + ) -> Action { + if let connection = self.connections.leaseStream(onRequired: eventLoop) { + /// 1. we have a stream available and can execute the request immediately + return .init( + request: .executeRequest(request, connection, cancelTimeout: false), + connection: .cancelTimeoutTimer(connection.id) + ) + } + /// 2. No available stream so we definitely need to wait until we have one + self.requests.push(request) + + if self.connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests(for: eventLoop) { + /// 3. we already have a connection, we just need to wait until until it becomes available + return .init( + request: .scheduleRequestTimeout(for: request, on: eventLoop), + connection: .none + ) + } else { + /// 4. we do *not* have a connection, need to create a new one and wait until it is connected. + let connectionId = self.connections.createNewConnection(on: eventLoop) + return .init( + request: .scheduleRequestTimeout(for: request, on: eventLoop), + connection: .createConnection(connectionId, on: eventLoop) + ) + } + } + + private mutating func executeRequest( + _ request: Request, + onPreferred eventLoop: EventLoop + ) -> Action { + if let connection = self.connections.leaseStream(onPreferred: eventLoop) { + /// 1. we have a stream available and can execute the request immediately + return .init( + request: .executeRequest(request, connection, cancelTimeout: false), + connection: .cancelTimeoutTimer(connection.id) + ) + } + /// 2. No available stream so we definitely need to wait until we have one + self.requests.push(request) + + if self.connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests { + /// 3. we already have a connection, we just need to wait until until it becomes available + return .init( + request: .scheduleRequestTimeout(for: request, on: eventLoop), + connection: .none + ) + } else { + /// 4. we do *not* have a connection, need to create a new one and wait until it is connected. + let connectionId = self.connections.createNewConnection(on: eventLoop) + return .init( + request: .scheduleRequestTimeout(for: request, on: eventLoop), + connection: .createConnection(connectionId, on: eventLoop) + ) + } + } + + mutating func newHTTP2ConnectionEstablished(_ connection: Connection, maxConcurrentStreams: Int) -> Action { + self.failedConsecutiveConnectionAttempts = 0 + self.lastConnectFailure = nil + let (index, context) = self.connections.newHTTP2ConnectionEstablished( + connection, + maxConcurrentStreams: maxConcurrentStreams + ) + return self.nextActionForAvailableConnection(at: index, context: context) + } + + private mutating func nextActionForAvailableConnection( + at index: Int, + context: HTTP2Connections.AvailableConnectionContext + ) -> Action { + // We prioritise requests with a required event loop over those without a requirement. + // This can cause starvation for request without a required event loop. + // We should come up with a better algorithm in the future. + + var requestsToExecute = self.requests.popFirst(max: context.availableStreams, for: context.eventLoop) + let remainingAvailableStreams = context.availableStreams - requestsToExecute.count + // use the remaining available streams for requests without a required event loop + requestsToExecute += self.requests.popFirst(max: remainingAvailableStreams, for: nil) + let connection = self.connections.leaseStreams(at: index, count: requestsToExecute.count) + + let requestAction = { () -> RequestAction in + if requestsToExecute.isEmpty { + return .none + } else { + return .executeRequestsAndCancelTimeouts(requestsToExecute, connection) + } + }() + + let connectionAction = { () -> ConnectionAction in + if context.isIdle, requestsToExecute.isEmpty { + return .scheduleTimeoutTimer(connection.id, on: context.eventLoop) + } else { + return .none + } + }() + + return .init( + request: requestAction, + connection: connectionAction + ) + } + + mutating func newHTTP2MaxConcurrentStreamsReceived(_ connectionID: Connection.ID, newMaxStreams: Int) -> Action { + let (index, context) = self.connections.newHTTP2MaxConcurrentStreamsReceived(connectionID, newMaxStreams: newMaxStreams) + return self.nextActionForAvailableConnection(at: index, context: context) + } + + mutating func http2ConnectionGoAwayReceived(_ connectionID: Connection.ID) -> Action { + let context = self.connections.goAwayReceived(connectionID) + return self.nextActionForClosingConnection(on: context.eventLoop) + } + + mutating func http2ConnectionClosed(_ connectionID: Connection.ID) -> Action { + guard let (index, context) = self.connections.failConnection(connectionID) else { + return .none + } + return self.nextActionForFailedConnection(at: index, on: context.eventLoop) + } + + private mutating func nextActionForFailedConnection(at index: Int, on eventLoop: EventLoop) -> Action { + let hasPendingRequest = !self.requests.isEmpty(for: eventLoop) || !self.requests.isEmpty(for: nil) + guard hasPendingRequest else { + return .none + } + + let (newConnectionID, previousEventLoop) = self.connections.createNewConnectionByReplacingClosedConnection(at: index) + precondition(previousEventLoop === eventLoop) + + return .init( + request: .none, + connection: .createConnection(newConnectionID, on: eventLoop) + ) + } + + private mutating func nextActionForClosingConnection(on eventLoop: EventLoop) -> Action { + let hasPendingRequest = !self.requests.isEmpty(for: eventLoop) || !self.requests.isEmpty(for: nil) + guard hasPendingRequest else { + return .none + } + + let newConnectionID = self.connections.createNewConnection(on: eventLoop) + + return .init( + request: .none, + connection: .createConnection(newConnectionID, on: eventLoop) + ) + } + + mutating func http2ConnectionStreamClosed(_ connectionID: Connection.ID) -> Action { + let (index, context) = self.connections.releaseStream(connectionID) + return self.nextActionForAvailableConnection(at: index, context: context) + } + + mutating func failedToCreateNewConnection(_ error: Error, connectionID: Connection.ID) -> Action { + self.failedConsecutiveConnectionAttempts += 1 + self.lastConnectFailure = error + + let eventLoop = self.connections.backoffNextConnectionAttempt(connectionID) + let backoff = calculateBackoff(failedAttempt: self.failedConsecutiveConnectionAttempts) + return .init(request: .none, connection: .scheduleBackoffTimer(connectionID, backoff: backoff, on: eventLoop)) + } + + mutating func connectionCreationBackoffDone(_ connectionID: Connection.ID) -> Action { + // The naming of `failConnection` is a little confusing here. All it does is moving the + // connection state from `.backingOff` to `.closed` here. It also returns the + // connection's index. + guard let (index, _) = self.connections.failConnection(connectionID) else { + preconditionFailure("Backing off a connection that is unknown to us?") + } + let (newConnectionID, eventLoop) = self.connections.createNewConnectionByReplacingClosedConnection(at: index) + return .init(request: .none, connection: .createConnection(newConnectionID, on: eventLoop)) + } + + mutating func timeoutRequest(_ requestID: Request.ID) -> Action { + // 1. check requests in queue + if let request = self.requests.remove(requestID) { + var error: Error = HTTPClientError.getConnectionFromPoolTimeout + if let lastError = self.lastConnectFailure { + error = lastError + } else if !self.connections.hasActiveConnections { + error = HTTPClientError.connectTimeout + } + return .init( + request: .failRequest(request, error, cancelTimeout: false), + connection: .none + ) + } + + // 2. This point is reached, because the request may have already been scheduled. A + // connection might have become available shortly before the request timeout timer + // fired. + return .none + } + + mutating func cancelRequest(_ requestID: Request.ID) -> Action { + // 1. check requests in queue + if self.requests.remove(requestID) != nil { + return .init( + request: .cancelRequestTimeout(requestID), + connection: .none + ) + } + + // 2. This is point is reached, because the request may already have been forwarded to + // an idle connection. In this case the connection will need to handle the + // cancellation. + return .none + } + + mutating func connectionIdleTimeout(_ connectionID: Connection.ID) -> Action { + guard let connection = connections.closeConnectionIfIdle(connectionID) else { + return .none + } + return .init( + request: .none, + connection: .closeConnection(connection, isShutdown: .no) + ) + } + + mutating func connectionClosed(_ connectionID: Connection.ID) -> Action { + guard let (index, context) = self.connections.failConnection(connectionID) else { + // When a connection close is initiated by the connection pool, the connection will + // still report its close to the state machine. In those cases we must ignore the + // event. + return .none + } + return self.nextActionForFailedConnection(at: index, on: context.eventLoop) + } + + mutating func http1ConnectionReleased(_: Connection.ID) -> Action { + fatalError("TODO: implement \(#function)") + } + + mutating func shutdown() -> Action { + // If we have remaining request queued, we should fail all of them with a cancelled + // error. + let waitingRequests = self.requests.removeAll() + + var requestAction: StateMachine.RequestAction = .none + if !waitingRequests.isEmpty { + requestAction = .failRequestsAndCancelTimeouts(waitingRequests, HTTPClientError.cancelled) + } + + // clean up the connections, we can cleanup now! + let cleanupContext = self.connections.shutdown() + + // If there aren't any more connections, everything is shutdown + let isShutdown: StateMachine.ConnectionAction.IsShutdown + let unclean = !(cleanupContext.cancel.isEmpty && waitingRequests.isEmpty) + if self.connections.isEmpty { + isShutdown = .yes(unclean: unclean) + } else { + isShutdown = .no + } + return .init( + request: requestAction, + connection: .cleanupConnections(cleanupContext, isShutdown: isShutdown) + ) + } + } +} diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift index c58e18818..5756dedb8 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift @@ -70,6 +70,21 @@ extension HTTPConnectionPool { } } + /// removes up to `max` requests from the queue for the given `eventLoop` and returns them. + /// - Parameters: + /// - max: maximum number of requests to pop + /// - eventLoop: required event loop of the request + /// - Returns: requests for the given `eventLoop` + mutating func popFirst(max: Int, for eventLoop: EventLoop? = nil) -> [Request] { + if let eventLoop = eventLoop { + return self.withEventLoopQueue(for: eventLoop.id) { queue in + queue.popFirst(max: max) + } + } else { + return self.generalPurposeQueue.popFirst(max: max) + } + } + mutating func remove(_ requestID: Request.ID) -> Request? { if let eventLoopID = requestID.eventLoopID { return self.withEventLoopQueue(for: eventLoopID) { queue in @@ -118,3 +133,24 @@ extension HTTPConnectionPool { } } } + +extension CircularBuffer { + /// Removes up to `max` elements from the beginning of the + /// `CircularBuffer` and returns them. + /// + /// Calling this method may invalidate any existing indices for use with this + /// `CircularBuffer`. + /// + /// - Parameter max: The number of elements to remove. + /// `max` must be greater than or equal to zero. + /// - Returns: removed elements + /// + /// - Complexity: O(*k*), where *k* is the number of elements removed. + fileprivate mutating func popFirst(max: Int) -> [Element] { + precondition(max >= 0, "") + let elementCountToRemove = Swift.min(max, self.count) + let array = Array(self[self.startIndex.. () throws -> Void)] { + return [ + ("testCreatingOfConnection", testCreatingOfConnection), + ] + } +} diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift new file mode 100644 index 000000000..316aa081d --- /dev/null +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -0,0 +1,117 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@testable import AsyncHTTPClient +import NIOCore +import NIOEmbedded +import NIOHTTP1 +import NIOPosix +import XCTest + +private typealias Action = HTTPConnectionPool.StateMachine.Action +private typealias ConnectionAction = HTTPConnectionPool.StateMachine.ConnectionAction +private typealias RequestAction = HTTPConnectionPool.StateMachine.RequestAction + +class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { + func testCreatingOfConnection() { + let elg = EmbeddedEventLoopGroup(loops: 1) + let el1 = elg.next() + var connections = MockConnectionPool() + var queuer = MockRequestQueuer() + var state = HTTPConnectionPool.HTTP2StateMaschine(idGenerator: .init()) + + /// first request should create a new connection + let mockRequest = MockHTTPRequest(eventLoop: el1) + let request = HTTPConnectionPool.Request(mockRequest) + let executeAction = state.executeRequest(request) + + guard case .createConnection(let connID, let eventLoop) = executeAction.connection else { + return XCTFail("Unexpected connection action \(executeAction.connection)") + } + XCTAssertTrue(eventLoop === el1) + + XCTAssertEqual(executeAction.request, .scheduleRequestTimeout(for: request, on: mockRequest.eventLoop)) + + XCTAssertNoThrow(try connections.createConnection(connID, on: el1)) + XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id)) + + /// subsequent requests should not create a connection + for _ in 0..<9 { + let mockRequest = MockHTTPRequest(eventLoop: el1) + let request = HTTPConnectionPool.Request(mockRequest) + let action = state.executeRequest(request) + + XCTAssertEqual(action.connection, .none) + XCTAssertEqual(action.request, .scheduleRequestTimeout(for: request, on: mockRequest.eventLoop)) + + XCTAssertNoThrow(try queuer.queue(mockRequest, id: request.id)) + } + + /// connection establishment should result in 5 request executions because we set max concurrent streams to 5 + var maybeConn: HTTPConnectionPool.Connection? + XCTAssertNoThrow(maybeConn = try connections.succeedConnectionCreationHTTP2(connID, maxConcurrentStreams: 5)) + guard let conn = maybeConn else { + return XCTFail("unexpected throw") + } + let action = state.newHTTP2ConnectionEstablished(conn, maxConcurrentStreams: 5) + + XCTAssertEqual(action.connection, .none) + guard case .executeRequestsAndCancelTimeouts(let requests, conn) = action.request else { + return XCTFail("Unexpected request action \(action.request)") + } + XCTAssertEqual(requests.count, 5) + + for request in requests { + XCTAssertNoThrow(try queuer.get(request.id, request: request.__testOnly_wrapped_request())) + } + + /// closing a stream while we have requests queued should result in one request execution action + for _ in 0..<5 { + let action = state.http2ConnectionStreamClosed(connID) + XCTAssertEqual(action.connection, .none) + guard case .executeRequestsAndCancelTimeouts(let requests, conn) = action.request else { + return XCTFail("Unexpected request action \(action.request)") + } + XCTAssertEqual(requests.count, 1) + for request in requests { + XCTAssertNoThrow(try queuer.cancel(request.id)) + } + } + XCTAssertTrue(queuer.isEmpty) + + /// closing streams without any queued requests shouldn't do anything if it's *not* the last stream + for _ in 0..<4 { + let action = state.http2ConnectionStreamClosed(connID) + XCTAssertEqual(action.request, .none) + XCTAssertEqual(action.connection, .none) + } + + /// closing the last stream should schedule a idle timeout + let streamCloseAction = state.http2ConnectionStreamClosed(connID) + XCTAssertEqual(streamCloseAction.request, .none) + XCTAssertEqual(streamCloseAction.connection, .scheduleTimeoutTimer(connID, on: el1)) + + /// shutdown should only close one connection + let shutdownAction = state.shutdown() + XCTAssertEqual(shutdownAction.request, .none) + XCTAssertEqual(shutdownAction.connection, .cleanupConnections( + .init( + close: [conn], + cancel: [], + connectBackoff: [] + ), + isShutdown: .yes(unclean: false) + )) + } +} diff --git a/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift b/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift index 513cb42b6..009caa922 100644 --- a/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift +++ b/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift @@ -162,6 +162,14 @@ struct MockConnectionPool { self.state = .http1(.idle(parked: false, idleSince: .now())) } + mutating func http2Started(maxConcurrentStreams: Int) throws { + guard case .starting = self.state else { + throw Errors.connectionIsNotStarting + } + + self.state = .http2(.idle(maxConcurrentStreams: maxConcurrentStreams, parked: false, lastIdle: .now())) + } + mutating func park() throws { switch self.state { case .starting, .closed, .http1(.inUse), .http2(.inUse): @@ -333,6 +341,19 @@ struct MockConnectionPool { return .__testOnly_connection(id: connection.id, eventLoop: connection.eventLoop) } + mutating func succeedConnectionCreationHTTP2( + _ connectionID: Connection.ID, + maxConcurrentStreams: Int + ) throws -> Connection { + guard var connection = self.connections[connectionID] else { + throw Errors.connectionNotFound + } + + try connection.http2Started(maxConcurrentStreams: maxConcurrentStreams) + self.connections[connection.id] = connection + return .__testOnly_connection(id: connection.id, eventLoop: connection.eventLoop) + } + mutating func failConnectionCreation(_ connectionID: Connection.ID) throws { guard let connection = self.connections[connectionID] else { throw Errors.connectionNotFound diff --git a/Tests/LinuxMain.swift b/Tests/LinuxMain.swift index 988f6b194..1adb04801 100644 --- a/Tests/LinuxMain.swift +++ b/Tests/LinuxMain.swift @@ -43,6 +43,7 @@ import XCTest testCase(HTTPConnectionPool_HTTP1ConnectionsTests.allTests), testCase(HTTPConnectionPool_HTTP1StateMachineTests.allTests), testCase(HTTPConnectionPool_HTTP2ConnectionsTests.allTests), + testCase(HTTPConnectionPool_HTTP2StateMachineTests.allTests), testCase(HTTPConnectionPool_ManagerTests.allTests), testCase(HTTPConnectionPool_RequestQueueTests.allTests), testCase(HTTPRequestStateMachineTests.allTests), From 805e1f15629d93b1e1814b246b079f2a399f8a0f Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Thu, 30 Sep 2021 18:08:06 +0200 Subject: [PATCH 02/15] add `testConnectionFailureBackoff` and fix behaviour --- ...HTTPConnectionPool+HTTP2StateMachine.swift | 5 +- ...onnectionPool+HTTP2StateMachineTests.swift | 56 +++++++++++++++++++ 2 files changed, 58 insertions(+), 3 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift index 6387e3b29..c73c2e492 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift @@ -245,11 +245,10 @@ extension HTTPConnectionPool { // The naming of `failConnection` is a little confusing here. All it does is moving the // connection state from `.backingOff` to `.closed` here. It also returns the // connection's index. - guard let (index, _) = self.connections.failConnection(connectionID) else { + guard let (index, context) = self.connections.failConnection(connectionID) else { preconditionFailure("Backing off a connection that is unknown to us?") } - let (newConnectionID, eventLoop) = self.connections.createNewConnectionByReplacingClosedConnection(at: index) - return .init(request: .none, connection: .createConnection(newConnectionID, on: eventLoop)) + return nextActionForFailedConnection(at: index, on: context.eventLoop) } mutating func timeoutRequest(_ requestID: Request.ID) -> Action { diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift index 316aa081d..b0d89e450 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -114,4 +114,60 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { isShutdown: .yes(unclean: false) )) } + + func testConnectionFailureBackoff() { + let elg = EmbeddedEventLoopGroup(loops: 4) + defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } + + var state = HTTPConnectionPool.HTTP2StateMaschine( + idGenerator: .init() + ) + + let mockRequest = MockHTTPRequest(eventLoop: elg.next()) + let request = HTTPConnectionPool.Request(mockRequest) + + let action = state.executeRequest(request) + XCTAssertEqual(.scheduleRequestTimeout(for: request, on: mockRequest.eventLoop), action.request) + + // 1. connection attempt + guard case .createConnection(let connectionID, on: let connectionEL) = action.connection else { + return XCTFail("Unexpected connection action: \(action.connection)") + } + XCTAssert(connectionEL === mockRequest.eventLoop) // XCTAssertIdentical not available on Linux + + let failedConnect1 = state.failedToCreateNewConnection(HTTPClientError.connectTimeout, connectionID: connectionID) + XCTAssertEqual(failedConnect1.request, .none) + guard case .scheduleBackoffTimer(connectionID, let backoffTimeAmount1, _) = failedConnect1.connection else { + return XCTFail("Unexpected connection action: \(failedConnect1.connection)") + } + + // 2. connection attempt + let backoffDoneAction = state.connectionCreationBackoffDone(connectionID) + XCTAssertEqual(backoffDoneAction.request, .none) + guard case .createConnection(let newConnectionID, on: let newEventLoop) = backoffDoneAction.connection else { + return XCTFail("Unexpected connection action: \(backoffDoneAction.connection)") + } + XCTAssertGreaterThan(newConnectionID, connectionID) + XCTAssert(connectionEL === newEventLoop) // XCTAssertIdentical not available on Linux + + let failedConnect2 = state.failedToCreateNewConnection(HTTPClientError.connectTimeout, connectionID: newConnectionID) + XCTAssertEqual(failedConnect2.request, .none) + guard case .scheduleBackoffTimer(newConnectionID, let backoffTimeAmount2, _) = failedConnect2.connection else { + return XCTFail("Unexpected connection action: \(failedConnect2.connection)") + } + + XCTAssertNotEqual(backoffTimeAmount2, backoffTimeAmount1) + + // 3. request times out + let failRequest = state.timeoutRequest(request.id) + guard case .failRequest(let requestToFail, let requestError, cancelTimeout: false) = failRequest.request else { + return XCTFail("Unexpected request action: \(action.request)") + } + XCTAssert(requestToFail.__testOnly_wrapped_request() === mockRequest) // XCTAssertIdentical not available on Linux + XCTAssertEqual(requestError as? HTTPClientError, .connectTimeout) + XCTAssertEqual(failRequest.connection, .none) + + // 4. retry connection, but no more queued requests. + XCTAssertEqual(state.connectionCreationBackoffDone(newConnectionID), .none) + } } From c6244314d627fb2daf2501f2ef81cc1dea14e8ed Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Thu, 30 Sep 2021 18:29:01 +0200 Subject: [PATCH 03/15] add `testConnectionFailureBackoff` and `testCancelRequestWorks` --- ...onnectionPool+HTTP2StateMachineTests.swift | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift index b0d89e450..afc3e106c 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -170,4 +170,41 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { // 4. retry connection, but no more queued requests. XCTAssertEqual(state.connectionCreationBackoffDone(newConnectionID), .none) } + + func testCancelRequestWorks() { + let elg = EmbeddedEventLoopGroup(loops: 4) + defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } + + var state = HTTPConnectionPool.HTTP2StateMaschine( + idGenerator: .init() + ) + + let mockRequest = MockHTTPRequest(eventLoop: elg.next()) + let request = HTTPConnectionPool.Request(mockRequest) + + let executeAction = state.executeRequest(request) + XCTAssertEqual(.scheduleRequestTimeout(for: request, on: mockRequest.eventLoop), executeAction.request) + + // 1. connection attempt + guard case .createConnection(let connectionID, on: let connectionEL) = executeAction.connection else { + return XCTFail("Unexpected connection action: \(executeAction.connection)") + } + XCTAssert(connectionEL === mockRequest.eventLoop) // XCTAssertIdentical not available on Linux + + // 2. cancel request + let cancelAction = state.cancelRequest(request.id) + XCTAssertEqual(cancelAction.request, .cancelRequestTimeout(request.id)) + XCTAssertEqual(cancelAction.connection, .none) + + // 3. request timeout triggers to late + XCTAssertEqual(state.timeoutRequest(request.id), .none, "To late timeout is ignored") + + // 4. succeed connection attempt + let connectedAction = state.newHTTP2ConnectionEstablished( + .__testOnly_connection(id: connectionID, eventLoop: connectionEL), + maxConcurrentStreams: 100 + ) + XCTAssertEqual(connectedAction.request, .none, "Request must not be executed") + XCTAssertEqual(connectedAction.connection, .scheduleTimeoutTimer(connectionID, on: connectionEL)) + } } From 902ed8b2dd8b1330230b2cf9f5e7f1ab2144cc50 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Thu, 30 Sep 2021 19:13:00 +0200 Subject: [PATCH 04/15] add `state` to `HTTP2StateMaschine` and add `testExecuteOnShuttingDownPool` --- ...HTTPConnectionPool+HTTP2StateMachine.swift | 171 +++++++++++++----- ...onPool+HTTP2StateMachineTests+XCTest.swift | 3 + ...onnectionPool+HTTP2StateMachineTests.swift | 58 +++++- 3 files changed, 180 insertions(+), 52 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift index c73c2e492..949ec915e 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift @@ -21,6 +21,12 @@ extension HTTPConnectionPool { typealias RequestAction = HTTPConnectionPool.StateMachine.RequestAction typealias ConnectionAction = HTTPConnectionPool.StateMachine.ConnectionAction + private enum State: Equatable { + case running + case shuttingDown(unclean: Bool) + case shutDown + } + private var lastConnectFailure: Error? private var failedConsecutiveConnectionAttempts = 0 @@ -31,6 +37,8 @@ extension HTTPConnectionPool { private let idGenerator: Connection.ID.Generator + private var state: State = .running + init( idGenerator: Connection.ID.Generator ) { @@ -68,10 +76,24 @@ extension HTTPConnectionPool { } mutating func executeRequest(_ request: Request) -> Action { - if let eventLoop = request.requiredEventLoop { - return self.executeRequest(request, onRequired: eventLoop) - } else { - return self.executeRequest(request, onPreferred: request.preferredEventLoop) + switch self.state { + case .running: + if let eventLoop = request.requiredEventLoop { + return self.executeRequest(request, onRequired: eventLoop) + } else { + return self.executeRequest(request, onPreferred: request.preferredEventLoop) + } + case .shutDown, .shuttingDown: + // it is fairly unlikely that this condition is met, since the ConnectionPoolManager + // also fails new requests immediately, if it is shutting down. However there might + // be race conditions in which a request passes through a running connection pool + // manager, but hits a connection pool that is already shutting down. + // + // (Order in one lock does not guarantee order in the next lock!) + return .init( + request: .failRequest(request, HTTPClientError.alreadyShutdown, cancelTimeout: false), + connection: .none + ) } } @@ -149,36 +171,57 @@ extension HTTPConnectionPool { at index: Int, context: HTTP2Connections.AvailableConnectionContext ) -> Action { - // We prioritise requests with a required event loop over those without a requirement. - // This can cause starvation for request without a required event loop. - // We should come up with a better algorithm in the future. - - var requestsToExecute = self.requests.popFirst(max: context.availableStreams, for: context.eventLoop) - let remainingAvailableStreams = context.availableStreams - requestsToExecute.count - // use the remaining available streams for requests without a required event loop - requestsToExecute += self.requests.popFirst(max: remainingAvailableStreams, for: nil) - let connection = self.connections.leaseStreams(at: index, count: requestsToExecute.count) - - let requestAction = { () -> RequestAction in - if requestsToExecute.isEmpty { - return .none - } else { - return .executeRequestsAndCancelTimeouts(requestsToExecute, connection) - } - }() + switch self.state { + case .running: + // We prioritise requests with a required event loop over those without a requirement. + // This can cause starvation for request without a required event loop. + // We should come up with a better algorithm in the future. + + var requestsToExecute = self.requests.popFirst(max: context.availableStreams, for: context.eventLoop) + let remainingAvailableStreams = context.availableStreams - requestsToExecute.count + // use the remaining available streams for requests without a required event loop + requestsToExecute += self.requests.popFirst(max: remainingAvailableStreams, for: nil) + let connection = self.connections.leaseStreams(at: index, count: requestsToExecute.count) + + let requestAction = { () -> RequestAction in + if requestsToExecute.isEmpty { + return .none + } else { + return .executeRequestsAndCancelTimeouts(requestsToExecute, connection) + } + }() + + let connectionAction = { () -> ConnectionAction in + if context.isIdle, requestsToExecute.isEmpty { + return .scheduleTimeoutTimer(connection.id, on: context.eventLoop) + } else { + return .none + } + }() - let connectionAction = { () -> ConnectionAction in - if context.isIdle, requestsToExecute.isEmpty { - return .scheduleTimeoutTimer(connection.id, on: context.eventLoop) - } else { + return .init( + request: requestAction, + connection: connectionAction + ) + case .shuttingDown(let unclean): + guard context.isIdle else { return .none } - }() - return .init( - request: requestAction, - connection: connectionAction - ) + let connection = self.connections.closeConnection(at: index) + if self.connections.isEmpty { + return .init( + request: .none, + connection: .closeConnection(connection, isShutdown: .yes(unclean: unclean)) + ) + } + return .init( + request: .none, + connection: .closeConnection(connection, isShutdown: .no) + ) + case .shutDown: + preconditionFailure("It the pool is already shutdown, all connections must have been torn down.") + } } mutating func newHTTP2MaxConcurrentStreamsReceived(_ connectionID: Connection.ID, newMaxStreams: Int) -> Action { @@ -199,32 +242,53 @@ extension HTTPConnectionPool { } private mutating func nextActionForFailedConnection(at index: Int, on eventLoop: EventLoop) -> Action { - let hasPendingRequest = !self.requests.isEmpty(for: eventLoop) || !self.requests.isEmpty(for: nil) - guard hasPendingRequest else { - return .none - } + switch self.state { + case .running: + let hasPendingRequest = !self.requests.isEmpty(for: eventLoop) || !self.requests.isEmpty(for: nil) + guard hasPendingRequest else { + return .none + } - let (newConnectionID, previousEventLoop) = self.connections.createNewConnectionByReplacingClosedConnection(at: index) - precondition(previousEventLoop === eventLoop) + let (newConnectionID, previousEventLoop) = self.connections.createNewConnectionByReplacingClosedConnection(at: index) + precondition(previousEventLoop === eventLoop) - return .init( - request: .none, - connection: .createConnection(newConnectionID, on: eventLoop) - ) + return .init( + request: .none, + connection: .createConnection(newConnectionID, on: eventLoop) + ) + case .shuttingDown(let unclean): + assert(self.requests.isEmpty) + self.connections.removeConnection(at: index) + if self.connections.isEmpty { + return .init( + request: .none, + connection: .cleanupConnections(.init(), isShutdown: .yes(unclean: unclean)) + ) + } + return .none + + case .shutDown: + preconditionFailure("If the pool is already shutdown, all connections must have been torn down.") + } } private mutating func nextActionForClosingConnection(on eventLoop: EventLoop) -> Action { - let hasPendingRequest = !self.requests.isEmpty(for: eventLoop) || !self.requests.isEmpty(for: nil) - guard hasPendingRequest else { - return .none - } + switch self.state { + case .running: + let hasPendingRequest = !self.requests.isEmpty(for: eventLoop) || !self.requests.isEmpty(for: nil) + guard hasPendingRequest else { + return .none + } - let newConnectionID = self.connections.createNewConnection(on: eventLoop) + let newConnectionID = self.connections.createNewConnection(on: eventLoop) - return .init( - request: .none, - connection: .createConnection(newConnectionID, on: eventLoop) - ) + return .init( + request: .none, + connection: .createConnection(newConnectionID, on: eventLoop) + ) + case .shutDown, .shuttingDown: + return .none + } } mutating func http2ConnectionStreamClosed(_ connectionID: Connection.ID) -> Action { @@ -248,7 +312,7 @@ extension HTTPConnectionPool { guard let (index, context) = self.connections.failConnection(connectionID) else { preconditionFailure("Backing off a connection that is unknown to us?") } - return nextActionForFailedConnection(at: index, on: context.eventLoop) + return self.nextActionForFailedConnection(at: index, on: context.eventLoop) } mutating func timeoutRequest(_ requestID: Request.ID) -> Action { @@ -289,8 +353,13 @@ extension HTTPConnectionPool { mutating func connectionIdleTimeout(_ connectionID: Connection.ID) -> Action { guard let connection = connections.closeConnectionIfIdle(connectionID) else { + // because of a race this connection (connection close runs against trigger of timeout) + // was already removed from the state machine. return .none } + + precondition(self.state == .running, "If we are shutting down, we must not have any idle connections") + return .init( request: .none, connection: .closeConnection(connection, isShutdown: .no) @@ -329,8 +398,10 @@ extension HTTPConnectionPool { let unclean = !(cleanupContext.cancel.isEmpty && waitingRequests.isEmpty) if self.connections.isEmpty { isShutdown = .yes(unclean: unclean) + self.state = .shutDown } else { isShutdown = .no + self.state = .shuttingDown(unclean: unclean) } return .init( request: requestAction, diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift index 572859cde..3ec9bc22b 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift @@ -26,6 +26,9 @@ extension HTTPConnectionPool_HTTP2StateMachineTests { static var allTests: [(String, (HTTPConnectionPool_HTTP2StateMachineTests) -> () throws -> Void)] { return [ ("testCreatingOfConnection", testCreatingOfConnection), + ("testConnectionFailureBackoff", testConnectionFailureBackoff), + ("testCancelRequestWorks", testCancelRequestWorks), + ("testExecuteOnShuttingDownPool", testExecuteOnShuttingDownPool), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift index afc3e106c..1b36ebe37 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -114,7 +114,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { isShutdown: .yes(unclean: false) )) } - + func testConnectionFailureBackoff() { let elg = EmbeddedEventLoopGroup(loops: 4) defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } @@ -170,7 +170,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { // 4. retry connection, but no more queued requests. XCTAssertEqual(state.connectionCreationBackoffDone(newConnectionID), .none) } - + func testCancelRequestWorks() { let elg = EmbeddedEventLoopGroup(loops: 4) defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } @@ -207,4 +207,58 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { XCTAssertEqual(connectedAction.request, .none, "Request must not be executed") XCTAssertEqual(connectedAction.connection, .scheduleTimeoutTimer(connectionID, on: connectionEL)) } + + func testExecuteOnShuttingDownPool() { + let elg = EmbeddedEventLoopGroup(loops: 4) + defer { XCTAssertNoThrow(try elg.syncShutdownGracefully()) } + + var state = HTTPConnectionPool.HTTP2StateMaschine( + idGenerator: .init() + ) + + let mockRequest = MockHTTPRequest(eventLoop: elg.next()) + let request = HTTPConnectionPool.Request(mockRequest) + + let executeAction = state.executeRequest(request) + XCTAssertEqual(.scheduleRequestTimeout(for: request, on: mockRequest.eventLoop), executeAction.request) + + // 1. connection attempt + guard case .createConnection(let connectionID, on: let connectionEL) = executeAction.connection else { + return XCTFail("Unexpected connection action: \(executeAction.connection)") + } + XCTAssert(connectionEL === mockRequest.eventLoop) // XCTAssertIdentical not available on Linux + + // 2. connection succeeds + let connection: HTTPConnectionPool.Connection = .__testOnly_connection(id: connectionID, eventLoop: connectionEL) + let connectedAction = state.newHTTP2ConnectionEstablished(connection, maxConcurrentStreams: 100) + guard case .executeRequestsAndCancelTimeouts([request], connection) = connectedAction.request else { + return XCTFail("Unexpected request action: \(connectedAction.request)") + } + XCTAssert(request.__testOnly_wrapped_request() === mockRequest) // XCTAssertIdentical not available on Linux + XCTAssertEqual(connectedAction.connection, .none) + + // 3. shutdown + let shutdownAction = state.shutdown() + XCTAssertEqual(.none, shutdownAction.request) + guard case .cleanupConnections(let cleanupContext, isShutdown: .no) = shutdownAction.connection else { + return XCTFail("Unexpected connection action: \(shutdownAction.connection)") + } + + XCTAssertEqual(cleanupContext.cancel.count, 1) + XCTAssertEqual(cleanupContext.cancel.first?.id, connectionID) + XCTAssertEqual(cleanupContext.close, []) + XCTAssertEqual(cleanupContext.connectBackoff, []) + + // 4. execute another request + let finalMockRequest = MockHTTPRequest(eventLoop: elg.next()) + let finalRequest = HTTPConnectionPool.Request(finalMockRequest) + let failAction = state.executeRequest(finalRequest) + XCTAssertEqual(failAction.connection, .none) + XCTAssertEqual(failAction.request, .failRequest(finalRequest, HTTPClientError.alreadyShutdown, cancelTimeout: false)) + + // 5. close open connection + let closeAction = state.connectionClosed(connectionID) + XCTAssertEqual(closeAction.connection, .cleanupConnections(.init(), isShutdown: .yes(unclean: true))) + XCTAssertEqual(closeAction.request, .none) + } } From 7540c0be13322a791fcb053980a5ee9d5131570c Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Fri, 1 Oct 2021 13:00:06 +0200 Subject: [PATCH 05/15] handle http1 connection release and close --- ...HTTPConnectionPool+HTTP2StateMachine.swift | 52 ++++++++++++++++--- 1 file changed, 45 insertions(+), 7 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift index 949ec915e..ac25bc114 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift @@ -236,6 +236,9 @@ extension HTTPConnectionPool { mutating func http2ConnectionClosed(_ connectionID: Connection.ID) -> Action { guard let (index, context) = self.connections.failConnection(connectionID) else { + // When a connection close is initiated by the connection pool, the connection will + // still report its close to the state machine. In those cases we must ignore the + // event. return .none } return self.nextActionForFailedConnection(at: index, on: context.eventLoop) @@ -367,17 +370,52 @@ extension HTTPConnectionPool { } mutating func connectionClosed(_ connectionID: Connection.ID) -> Action { - guard let (index, context) = self.connections.failConnection(connectionID) else { - // When a connection close is initiated by the connection pool, the connection will - // still report its close to the state machine. In those cases we must ignore the - // event. + guard let index = self.http1Connections?.failConnection(connectionID)?.0 else { return .none } - return self.nextActionForFailedConnection(at: index, on: context.eventLoop) + self.http1Connections!.removeConnection(at: index) + if self.http1Connections!.isEmpty { + self.http1Connections = nil + } + switch state { + case .running: + return .none + case .shuttingDown(let unclean): + if self.http1Connections == nil && self.connections.isEmpty { + return .init( + request: .none, + connection: .cleanupConnections(.init(), isShutdown: .yes(unclean: unclean)) + ) + } else { + return .none + } + case .shutDown: + preconditionFailure("If the pool is already shutdown, all connections must have been torn down.") + } } - mutating func http1ConnectionReleased(_: Connection.ID) -> Action { - fatalError("TODO: implement \(#function)") + mutating func http1ConnectionReleased(_ connectionID: Connection.ID) -> Action { + // It is save to bang the http1Connections here. If we get this callback but we don't have + // http1 connections something has gone terribly wrong. + let (index, _) = self.http1Connections!.releaseConnection(connectionID) + // Any http1 connection that becomes idle should be closed right away after the transition + // to http2. + let connection = self.http1Connections!.closeConnection(at: index) + if self.http1Connections!.isEmpty { + self.http1Connections = nil + } + switch state { + case .running: + return .init(request: .none, connection: .closeConnection(connection, isShutdown: .no)) + case .shuttingDown(let unclean): + if self.http1Connections == nil && self.connections.isEmpty { + return .init(request: .none, connection: .closeConnection(connection, isShutdown: .yes(unclean: unclean))) + } else { + return .init(request: .none, connection: .closeConnection(connection, isShutdown: .no)) + } + case .shutDown: + preconditionFailure("If the pool is already shutdown, all connections must have been torn down.") + } } mutating func shutdown() -> Action { From 0148d16f04652a67abeebcd8284494e3e4b8550a Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Fri, 1 Oct 2021 13:15:06 +0200 Subject: [PATCH 06/15] Renamed generic function connectionClosed to specific http1ConnectionClose --- .../State Machine/HTTPConnectionPool+HTTP2StateMachine.swift | 2 +- .../HTTPConnectionPool+HTTP2StateMachineTests.swift | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift index ac25bc114..6277ea82d 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift @@ -369,7 +369,7 @@ extension HTTPConnectionPool { ) } - mutating func connectionClosed(_ connectionID: Connection.ID) -> Action { + mutating func http1ConnectionClose(_ connectionID: Connection.ID) -> Action { guard let index = self.http1Connections?.failConnection(connectionID)?.0 else { return .none } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift index 1b36ebe37..b68479d06 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -257,7 +257,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { XCTAssertEqual(failAction.request, .failRequest(finalRequest, HTTPClientError.alreadyShutdown, cancelTimeout: false)) // 5. close open connection - let closeAction = state.connectionClosed(connectionID) + let closeAction = state.http2ConnectionClosed(connectionID) XCTAssertEqual(closeAction.connection, .cleanupConnections(.init(), isShutdown: .yes(unclean: true))) XCTAssertEqual(closeAction.request, .none) } From 65d24d030020912f7bb26f16016629e58208b524 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Fri, 1 Oct 2021 16:40:01 +0200 Subject: [PATCH 07/15] add testHTTP1ToHTTP2MigrationAndShutdownIfFirstConnectionIsHTTP1 --- ...HTTPConnectionPool+HTTP1StateMachine.swift | 6 +- ...HTTPConnectionPool+HTTP2StateMachine.swift | 6 +- ...onnectionPool+HTTP2StateMachineTests.swift | 65 +++++++++++++++++++ 3 files changed, 71 insertions(+), 6 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift index bd1eaeff1..0790f70db 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift @@ -24,12 +24,12 @@ extension HTTPConnectionPool { typealias Action = HTTPConnectionPool.StateMachine.Action - private var connections: HTTP1Connections + private(set) var connections: HTTP1Connections private var failedConsecutiveConnectionAttempts: Int = 0 /// the error from the last connection creation private var lastConnectFailure: Error? - private var requests: RequestQueue + private(set) var requests: RequestQueue private var state: State = .running init(idGenerator: Connection.ID.Generator, maximumConcurrentConnections: Int) { @@ -41,7 +41,7 @@ extension HTTPConnectionPool { self.requests = RequestQueue() } - // MARK: - Events - + // MARK: - Events mutating func executeRequest(_ request: Request) -> Action { switch self.state { diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift index 6277ea82d..4c30daddf 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift @@ -369,7 +369,7 @@ extension HTTPConnectionPool { ) } - mutating func http1ConnectionClose(_ connectionID: Connection.ID) -> Action { + mutating func http1ConnectionClosed(_ connectionID: Connection.ID) -> Action { guard let index = self.http1Connections?.failConnection(connectionID)?.0 else { return .none } @@ -433,8 +433,8 @@ extension HTTPConnectionPool { // If there aren't any more connections, everything is shutdown let isShutdown: StateMachine.ConnectionAction.IsShutdown - let unclean = !(cleanupContext.cancel.isEmpty && waitingRequests.isEmpty) - if self.connections.isEmpty { + let unclean = !(cleanupContext.cancel.isEmpty && waitingRequests.isEmpty && self.http1Connections == nil) + if self.connections.isEmpty && self.http1Connections == nil { isShutdown = .yes(unclean: unclean) self.state = .shutDown } else { diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift index b68479d06..4416adb35 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -261,4 +261,69 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { XCTAssertEqual(closeAction.connection, .cleanupConnections(.init(), isShutdown: .yes(unclean: true))) XCTAssertEqual(closeAction.request, .none) } + + func testHTTP1ToHTTP2MigrationAndShutdownIfFirstConnectionIsHTTP1() { + let elg = EmbeddedEventLoopGroup(loops: 4) + let el1 = elg.next() + + let idGenerator = HTTPConnectionPool.Connection.ID.Generator() + var http1State = HTTPConnectionPool.HTTP1StateMachine(idGenerator: idGenerator, maximumConcurrentConnections: 8) + + let mockRequest1 = MockHTTPRequest(eventLoop: el1) + let request1 = HTTPConnectionPool.Request(mockRequest1) + let mockRequest2 = MockHTTPRequest(eventLoop: el1) + let request2 = HTTPConnectionPool.Request(mockRequest2) + + let executeAction1 = http1State.executeRequest(request1) + XCTAssertEqual(executeAction1.request, .scheduleRequestTimeout(for: request1, on: el1)) + guard case .createConnection(let conn1ID, _) = executeAction1.connection else { + return XCTFail("unexpected connection action \(executeAction1.connection)") + } + let executeAction2 = http1State.executeRequest(request2) + XCTAssertEqual(executeAction2.request, .scheduleRequestTimeout(for: request2, on: el1)) + guard case .createConnection(let conn2ID, _) = executeAction2.connection else { + return XCTFail("unexpected connection action \(executeAction2.connection)") + } + + // first connection is a HTTP1 connection + let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1) + let conn1Action = http1State.newHTTP1ConnectionEstablished(conn1) + XCTAssertEqual(conn1Action.connection, .none) + XCTAssertEqual(conn1Action.request, .executeRequest(request1, conn1, cancelTimeout: true)) + + + // second connection is a HTTP2 connection and we need to migrate + let conn2: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn2ID, eventLoop: el1) + var http2State = HTTPConnectionPool.HTTP2StateMaschine(idGenerator: idGenerator) + + let migrationAction = http2State.migrateConnectionsFromHTTP1( + connections: http1State.connections, + requests: http1State.requests + ) + XCTAssertEqual(migrationAction, .none) + + let http2ConnectAction = http2State.newHTTP2ConnectionEstablished(conn2, maxConcurrentStreams: 100) + XCTAssertEqual(http2ConnectAction.connection, .none) + guard case .executeRequestsAndCancelTimeouts([request2], conn2) = http2ConnectAction.request else { + return XCTFail("Unexpected request action \(http2ConnectAction.request)") + } + + // second request is done first + let closeAction = http2State.http2ConnectionStreamClosed(conn2ID) + XCTAssertEqual(closeAction.request, .none) + XCTAssertEqual(closeAction.connection, .scheduleTimeoutTimer(conn2ID, on: el1)) + + + let shutdownAction = http2State.shutdown() + XCTAssertEqual(shutdownAction.request, .none) + XCTAssertEqual(shutdownAction.connection, .cleanupConnections(.init( + close: [conn2], + cancel: [], + connectBackoff: [] + ), isShutdown: .no)) + + let releaseAction = http2State.http1ConnectionReleased(conn1ID) + XCTAssertEqual(releaseAction.request, .none) + XCTAssertEqual(releaseAction.connection, .closeConnection(conn1, isShutdown: .yes(unclean: true))) + } } From 5dedf32b396e925fd5e1015b68e042fb82984d40 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Fri, 1 Oct 2021 18:43:35 +0200 Subject: [PATCH 08/15] fix review comments --- .../HTTPConnectionPool+HTTP1Connections.swift | 8 ++++---- .../HTTPConnectionPool+HTTP2StateMachine.swift | 14 ++++++++++---- .../HTTPConnectionPool+RequestQueue.swift | 2 +- ...HTTPConnectionPool+HTTP2StateMachineTests.swift | 10 +++++----- 4 files changed, 20 insertions(+), 14 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift index 018447ba7..b0317eef4 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift @@ -195,7 +195,7 @@ extension HTTPConnectionPool { case keepConnection } - func migrateToHTTP2(_ context: inout HTTP1Connections.HTTP2ToHTTP1MigrationContext) -> MigrateAction { + func migrateToHTTP2(_ context: inout HTTP1Connections.HTTP1ToHTTP2MigrationContext) -> MigrateAction { switch self.state { case .starting: context.starting.append((self.connectionID, self.eventLoop)) @@ -322,7 +322,7 @@ extension HTTPConnectionPool { var connectionsStartingForUseCase: Int } - struct HTTP2ToHTTP1MigrationContext { + struct HTTP1ToHTTP2MigrationContext { var backingOff: [(Connection.ID, EventLoop)] = [] var starting: [(Connection.ID, EventLoop)] = [] var close: [Connection] = [] @@ -517,8 +517,8 @@ extension HTTPConnectionPool { // MARK: Migration - mutating func migrateToHTTP2() -> HTTP2ToHTTP1MigrationContext { - var migrationContext = HTTP2ToHTTP1MigrationContext() + mutating func migrateToHTTP2() -> HTTP1ToHTTP2MigrationContext { + var migrationContext = HTTP1ToHTTP2MigrationContext() self.connections.removeAll { connection in switch connection.migrateToHTTP2(&migrationContext) { case .removeConnection: diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift index 4c30daddf..c60811595 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift @@ -16,7 +16,7 @@ import NIOCore import NIOHTTP2 extension HTTPConnectionPool { - struct HTTP2StateMaschine { + struct HTTP2StateMachine { typealias Action = HTTPConnectionPool.StateMachine.Action typealias RequestAction = HTTPConnectionPool.StateMachine.RequestAction typealias ConnectionAction = HTTPConnectionPool.StateMachine.ConnectionAction @@ -70,6 +70,7 @@ extension HTTPConnectionPool { self.requests = requests // TODO: Close all idle connections from context.close + // TODO: Start new http2 connections for // TODO: Potentially cancel unneeded bootstraps (Needs cancellable ClientBootstrap) return .none @@ -401,14 +402,19 @@ extension HTTPConnectionPool { // Any http1 connection that becomes idle should be closed right away after the transition // to http2. let connection = self.http1Connections!.closeConnection(at: index) - if self.http1Connections!.isEmpty { - self.http1Connections = nil + guard self.http1Connections!.isEmpty else { + return .init(request: .none, connection: .closeConnection(connection, isShutdown: .no)) } + // if there are no more http1Connections, we can remove the struct. + self.http1Connections = nil + + // we must also check, if we are shutting down. Was this maybe out last connection? switch state { case .running: return .init(request: .none, connection: .closeConnection(connection, isShutdown: .no)) case .shuttingDown(let unclean): - if self.http1Connections == nil && self.connections.isEmpty { + if self.connections.isEmpty { + // if the http2connections are empty as well, there are no more connections. Shutdown completed. return .init(request: .none, connection: .closeConnection(connection, isShutdown: .yes(unclean: unclean))) } else { return .init(request: .none, connection: .closeConnection(connection, isShutdown: .no)) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift index 5756dedb8..74707e3f3 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+RequestQueue.swift @@ -147,7 +147,7 @@ extension CircularBuffer { /// /// - Complexity: O(*k*), where *k* is the number of elements removed. fileprivate mutating func popFirst(max: Int) -> [Element] { - precondition(max >= 0, "") + precondition(max >= 0) let elementCountToRemove = Swift.min(max, self.count) let array = Array(self[self.startIndex.. Date: Fri, 1 Oct 2021 18:44:15 +0200 Subject: [PATCH 09/15] format and linux XCTest --- ...HTTPConnectionPool+HTTP2StateMachine.swift | 10 ++++----- ...onPool+HTTP2StateMachineTests+XCTest.swift | 1 + ...onnectionPool+HTTP2StateMachineTests.swift | 22 +++++++++---------- 3 files changed, 16 insertions(+), 17 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift index c60811595..3298ef99d 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift @@ -378,11 +378,11 @@ extension HTTPConnectionPool { if self.http1Connections!.isEmpty { self.http1Connections = nil } - switch state { + switch self.state { case .running: return .none case .shuttingDown(let unclean): - if self.http1Connections == nil && self.connections.isEmpty { + if self.http1Connections == nil, self.connections.isEmpty { return .init( request: .none, connection: .cleanupConnections(.init(), isShutdown: .yes(unclean: unclean)) @@ -407,9 +407,9 @@ extension HTTPConnectionPool { } // if there are no more http1Connections, we can remove the struct. self.http1Connections = nil - + // we must also check, if we are shutting down. Was this maybe out last connection? - switch state { + switch self.state { case .running: return .init(request: .none, connection: .closeConnection(connection, isShutdown: .no)) case .shuttingDown(let unclean): @@ -440,7 +440,7 @@ extension HTTPConnectionPool { // If there aren't any more connections, everything is shutdown let isShutdown: StateMachine.ConnectionAction.IsShutdown let unclean = !(cleanupContext.cancel.isEmpty && waitingRequests.isEmpty && self.http1Connections == nil) - if self.connections.isEmpty && self.http1Connections == nil { + if self.connections.isEmpty, self.http1Connections == nil { isShutdown = .yes(unclean: unclean) self.state = .shutDown } else { diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift index 3ec9bc22b..aca5387bd 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift @@ -29,6 +29,7 @@ extension HTTPConnectionPool_HTTP2StateMachineTests { ("testConnectionFailureBackoff", testConnectionFailureBackoff), ("testCancelRequestWorks", testCancelRequestWorks), ("testExecuteOnShuttingDownPool", testExecuteOnShuttingDownPool), + ("testHTTP1ToHTTP2MigrationAndShutdownIfFirstConnectionIsHTTP1", testHTTP1ToHTTP2MigrationAndShutdownIfFirstConnectionIsHTTP1), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift index a0b5462b7..7cd6f5b7c 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -261,19 +261,19 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { XCTAssertEqual(closeAction.connection, .cleanupConnections(.init(), isShutdown: .yes(unclean: true))) XCTAssertEqual(closeAction.request, .none) } - + func testHTTP1ToHTTP2MigrationAndShutdownIfFirstConnectionIsHTTP1() { let elg = EmbeddedEventLoopGroup(loops: 4) let el1 = elg.next() - + let idGenerator = HTTPConnectionPool.Connection.ID.Generator() var http1State = HTTPConnectionPool.HTTP1StateMachine(idGenerator: idGenerator, maximumConcurrentConnections: 8) - + let mockRequest1 = MockHTTPRequest(eventLoop: el1) let request1 = HTTPConnectionPool.Request(mockRequest1) let mockRequest2 = MockHTTPRequest(eventLoop: el1) let request2 = HTTPConnectionPool.Request(mockRequest2) - + let executeAction1 = http1State.executeRequest(request1) XCTAssertEqual(executeAction1.request, .scheduleRequestTimeout(for: request1, on: el1)) guard case .createConnection(let conn1ID, _) = executeAction1.connection else { @@ -284,36 +284,34 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { guard case .createConnection(let conn2ID, _) = executeAction2.connection else { return XCTFail("unexpected connection action \(executeAction2.connection)") } - + // first connection is a HTTP1 connection let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1) let conn1Action = http1State.newHTTP1ConnectionEstablished(conn1) XCTAssertEqual(conn1Action.connection, .none) XCTAssertEqual(conn1Action.request, .executeRequest(request1, conn1, cancelTimeout: true)) - // second connection is a HTTP2 connection and we need to migrate let conn2: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn2ID, eventLoop: el1) var http2State = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) - + let migrationAction = http2State.migrateConnectionsFromHTTP1( connections: http1State.connections, requests: http1State.requests ) XCTAssertEqual(migrationAction, .none) - + let http2ConnectAction = http2State.newHTTP2ConnectionEstablished(conn2, maxConcurrentStreams: 100) XCTAssertEqual(http2ConnectAction.connection, .none) guard case .executeRequestsAndCancelTimeouts([request2], conn2) = http2ConnectAction.request else { return XCTFail("Unexpected request action \(http2ConnectAction.request)") } - + // second request is done first let closeAction = http2State.http2ConnectionStreamClosed(conn2ID) XCTAssertEqual(closeAction.request, .none) XCTAssertEqual(closeAction.connection, .scheduleTimeoutTimer(conn2ID, on: el1)) - - + let shutdownAction = http2State.shutdown() XCTAssertEqual(shutdownAction.request, .none) XCTAssertEqual(shutdownAction.connection, .cleanupConnections(.init( @@ -321,7 +319,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { cancel: [], connectBackoff: [] ), isShutdown: .no)) - + let releaseAction = http2State.http1ConnectionReleased(conn1ID) XCTAssertEqual(releaseAction.request, .none) XCTAssertEqual(releaseAction.connection, .closeConnection(conn1, isShutdown: .yes(unclean: true))) From 64ae9ff397dc5af2e77b406f93a48940ba596037 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Mon, 4 Oct 2021 11:19:27 +0200 Subject: [PATCH 10/15] test immediat request execution on required event loop --- .../HTTPConnectionPool+HTTP2Connections.swift | 5 +++ ...HTTPConnectionPool+HTTP2StateMachine.swift | 36 +++++++++++++------ ...onnectionPool+HTTP2StateMachineTests.swift | 17 +++++++++ 3 files changed, 47 insertions(+), 11 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift index 6e82773ab..ff2ffbbf8 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift @@ -576,6 +576,11 @@ extension HTTPConnectionPool { /// true if no stream is leased var isIdle: Bool } + + struct LeasedStreamContext { + /// true if the connection was idle before leasing the stream + var wasIdle: Bool + } struct LeasedStreamContext { /// true if the connection was idle before leasing the stream diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift index 3298ef99d..5a79f93c5 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift @@ -102,12 +102,19 @@ extension HTTPConnectionPool { _ request: Request, onRequired eventLoop: EventLoop ) -> Action { - if let connection = self.connections.leaseStream(onRequired: eventLoop) { + if let (connection, context) = self.connections.leaseStream(onRequired: eventLoop) { /// 1. we have a stream available and can execute the request immediately - return .init( - request: .executeRequest(request, connection, cancelTimeout: false), - connection: .cancelTimeoutTimer(connection.id) - ) + if context.wasIdle { + return .init( + request: .executeRequest(request, connection, cancelTimeout: false), + connection: .cancelTimeoutTimer(connection.id) + ) + } else { + return .init( + request: .executeRequest(request, connection, cancelTimeout: false), + connection: .none + ) + } } /// 2. No available stream so we definitely need to wait until we have one self.requests.push(request) @@ -132,12 +139,19 @@ extension HTTPConnectionPool { _ request: Request, onPreferred eventLoop: EventLoop ) -> Action { - if let connection = self.connections.leaseStream(onPreferred: eventLoop) { + if let (connection, context) = self.connections.leaseStream(onPreferred: eventLoop) { /// 1. we have a stream available and can execute the request immediately - return .init( - request: .executeRequest(request, connection, cancelTimeout: false), - connection: .cancelTimeoutTimer(connection.id) - ) + if context.wasIdle { + return .init( + request: .executeRequest(request, connection, cancelTimeout: false), + connection: .cancelTimeoutTimer(connection.id) + ) + } else { + return .init( + request: .executeRequest(request, connection, cancelTimeout: false), + connection: .none + ) + } } /// 2. No available stream so we definitely need to wait until we have one self.requests.push(request) @@ -182,7 +196,7 @@ extension HTTPConnectionPool { let remainingAvailableStreams = context.availableStreams - requestsToExecute.count // use the remaining available streams for requests without a required event loop requestsToExecute += self.requests.popFirst(max: remainingAvailableStreams, for: nil) - let connection = self.connections.leaseStreams(at: index, count: requestsToExecute.count) + let (connection, _) = self.connections.leaseStreams(at: index, count: requestsToExecute.count) let requestAction = { () -> RequestAction in if requestsToExecute.isEmpty { diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift index 7cd6f5b7c..8648ff1be 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -90,6 +90,23 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { } XCTAssertTrue(queuer.isEmpty) + /// closing streams without any queued requests shouldn't do anything if it's *not* the last stream + for _ in 0..<4 { + let action = state.http2ConnectionStreamClosed(connID) + XCTAssertEqual(action.request, .none) + XCTAssertEqual(action.connection, .none) + } + + /// 4 streams are available and therefore request should be executed immediately + for _ in 0..<4 { + let mockRequest = MockHTTPRequest(eventLoop: el1, requiresEventLoopForChannel: true) + let request = HTTPConnectionPool.Request(mockRequest) + let action = state.executeRequest(request) + + XCTAssertEqual(action.connection, .none) + XCTAssertEqual(action.request, .executeRequest(request, conn, cancelTimeout: false)) + } + /// closing streams without any queued requests shouldn't do anything if it's *not* the last stream for _ in 0..<4 { let action = state.http2ConnectionStreamClosed(connID) From a1fc238e6044aa96d0f7ef947b9d2b5d9392bda1 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Mon, 4 Oct 2021 12:47:00 +0200 Subject: [PATCH 11/15] fix merge conflicts --- .../State Machine/HTTPConnectionPool+HTTP2Connections.swift | 5 ----- 1 file changed, 5 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift index ff2ffbbf8..6e82773ab 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift @@ -576,11 +576,6 @@ extension HTTPConnectionPool { /// true if no stream is leased var isIdle: Bool } - - struct LeasedStreamContext { - /// true if the connection was idle before leasing the stream - var wasIdle: Bool - } struct LeasedStreamContext { /// true if the connection was idle before leasing the stream From e793d7a5ad9714844d79e2464da45883a40b8bd6 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Mon, 4 Oct 2021 19:17:57 +0200 Subject: [PATCH 12/15] add more tests --- ...onnectionPool+HTTP2StateMachineTests.swift | 95 +++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift index 8648ff1be..58d116890 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -341,4 +341,99 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { XCTAssertEqual(releaseAction.request, .none) XCTAssertEqual(releaseAction.connection, .closeConnection(conn1, isShutdown: .yes(unclean: true))) } + + func testSchedulingAndCancelingOfIdleTimeout() { + let elg = EmbeddedEventLoopGroup(loops: 1) + let el1 = elg.next() + + // establish one idle http2 connection + let idGenerator = HTTPConnectionPool.Connection.ID.Generator() + var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) + let conn1ID = http1Conns.createNewConnection(on: el1) + var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) + let migrationAction = state.migrateConnectionsFromHTTP1( + connections: http1Conns, + requests: HTTPConnectionPool.RequestQueue() + ) + XCTAssertEqual(migrationAction, .none) + let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1) + let connectAction = state.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) + XCTAssertEqual(connectAction.request, .none) + XCTAssertEqual(connectAction.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) + + // execute request on idle connection + let mockRequest1 = MockHTTPRequest(eventLoop: el1) + let request1 = HTTPConnectionPool.Request(mockRequest1) + let request1Action = state.executeRequest(request1) + XCTAssertEqual(request1Action.request, .executeRequest(request1, conn1, cancelTimeout: false)) + XCTAssertEqual(request1Action.connection, .cancelTimeoutTimer(conn1ID)) + + // close stream + let closeStream1Action = state.http2ConnectionStreamClosed(conn1ID) + XCTAssertEqual(closeStream1Action.request, .none) + XCTAssertEqual(closeStream1Action.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) + + // execute request on idle connection with required event loop + let mockRequest2 = MockHTTPRequest(eventLoop: el1, requiresEventLoopForChannel: true) + let request2 = HTTPConnectionPool.Request(mockRequest2) + let request2Action = state.executeRequest(request2) + XCTAssertEqual(request2Action.request, .executeRequest(request2, conn1, cancelTimeout: false)) + XCTAssertEqual(request2Action.connection, .cancelTimeoutTimer(conn1ID)) + + // close stream + let closeStream2Action = state.http2ConnectionStreamClosed(conn1ID) + XCTAssertEqual(closeStream2Action.request, .none) + XCTAssertEqual(closeStream2Action.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) + } + + func testConnectionTimeout() { + let elg = EmbeddedEventLoopGroup(loops: 1) + let el1 = elg.next() + + // establish one idle http2 connection + let idGenerator = HTTPConnectionPool.Connection.ID.Generator() + var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) + let conn1ID = http1Conns.createNewConnection(on: el1) + var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) + let migrationAction = state.migrateConnectionsFromHTTP1( + connections: http1Conns, + requests: HTTPConnectionPool.RequestQueue() + ) + XCTAssertEqual(migrationAction, .none) + let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1) + let connectAction = state.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) + XCTAssertEqual(connectAction.request, .none) + XCTAssertEqual(connectAction.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) + + + // let the connection timeout + let timeoutAction = state.connectionIdleTimeout(conn1ID) + XCTAssertEqual(timeoutAction.request, .none) + XCTAssertEqual(timeoutAction.connection, .closeConnection(conn1, isShutdown: .no)) + } + + func testConnectionEstablishmentFailure() { + struct SomeError: Error, Equatable {} + + let elg = EmbeddedEventLoopGroup(loops: 1) + let el1 = elg.next() + + // establish one idle http2 connection + let idGenerator = HTTPConnectionPool.Connection.ID.Generator() + var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) + let conn1ID = http1Conns.createNewConnection(on: el1) + var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) + let migrationAction = state.migrateConnectionsFromHTTP1( + connections: http1Conns, + requests: HTTPConnectionPool.RequestQueue() + ) + XCTAssertEqual(migrationAction, .none) + + let action = state.failedToCreateNewConnection(SomeError(), connectionID: conn1ID) + XCTAssertEqual(action.request, .none) + guard case .scheduleBackoffTimer(conn1ID, _, let eventLoop) = action.connection else { + return XCTFail("unexpected connection action \(action.connection)") + } + XCTAssertEqual(eventLoop.id, el1.id) + } } From b00e4757052ceb344f63611154efe3ce2024dda9 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Mon, 4 Oct 2021 19:19:40 +0200 Subject: [PATCH 13/15] run swift-format and generate_linux_tests.rb --- ...onPool+HTTP2StateMachineTests+XCTest.swift | 4 ++ ...onnectionPool+HTTP2StateMachineTests.swift | 67 ++++++++++++++----- 2 files changed, 55 insertions(+), 16 deletions(-) diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift index aca5387bd..85834e5de 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift @@ -30,6 +30,10 @@ extension HTTPConnectionPool_HTTP2StateMachineTests { ("testCancelRequestWorks", testCancelRequestWorks), ("testExecuteOnShuttingDownPool", testExecuteOnShuttingDownPool), ("testHTTP1ToHTTP2MigrationAndShutdownIfFirstConnectionIsHTTP1", testHTTP1ToHTTP2MigrationAndShutdownIfFirstConnectionIsHTTP1), + ("testSchedulingAndCancelingOfIdleTimeout", testSchedulingAndCancelingOfIdleTimeout), + ("testConnectionTimeout", testConnectionTimeout), + ("testConnectionEstablishmentFailure", testConnectionEstablishmentFailure), + ("testGoAway", testGoAway), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift index 58d116890..97a29e9cb 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -96,7 +96,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { XCTAssertEqual(action.request, .none) XCTAssertEqual(action.connection, .none) } - + /// 4 streams are available and therefore request should be executed immediately for _ in 0..<4 { let mockRequest = MockHTTPRequest(eventLoop: el1, requiresEventLoopForChannel: true) @@ -106,7 +106,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { XCTAssertEqual(action.connection, .none) XCTAssertEqual(action.request, .executeRequest(request, conn, cancelTimeout: false)) } - + /// closing streams without any queued requests shouldn't do anything if it's *not* the last stream for _ in 0..<4 { let action = state.http2ConnectionStreamClosed(connID) @@ -341,11 +341,11 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { XCTAssertEqual(releaseAction.request, .none) XCTAssertEqual(releaseAction.connection, .closeConnection(conn1, isShutdown: .yes(unclean: true))) } - + func testSchedulingAndCancelingOfIdleTimeout() { let elg = EmbeddedEventLoopGroup(loops: 1) let el1 = elg.next() - + // establish one idle http2 connection let idGenerator = HTTPConnectionPool.Connection.ID.Generator() var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) @@ -360,36 +360,36 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { let connectAction = state.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) XCTAssertEqual(connectAction.request, .none) XCTAssertEqual(connectAction.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) - + // execute request on idle connection let mockRequest1 = MockHTTPRequest(eventLoop: el1) let request1 = HTTPConnectionPool.Request(mockRequest1) let request1Action = state.executeRequest(request1) XCTAssertEqual(request1Action.request, .executeRequest(request1, conn1, cancelTimeout: false)) XCTAssertEqual(request1Action.connection, .cancelTimeoutTimer(conn1ID)) - + // close stream let closeStream1Action = state.http2ConnectionStreamClosed(conn1ID) XCTAssertEqual(closeStream1Action.request, .none) XCTAssertEqual(closeStream1Action.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) - + // execute request on idle connection with required event loop let mockRequest2 = MockHTTPRequest(eventLoop: el1, requiresEventLoopForChannel: true) let request2 = HTTPConnectionPool.Request(mockRequest2) let request2Action = state.executeRequest(request2) XCTAssertEqual(request2Action.request, .executeRequest(request2, conn1, cancelTimeout: false)) XCTAssertEqual(request2Action.connection, .cancelTimeoutTimer(conn1ID)) - + // close stream let closeStream2Action = state.http2ConnectionStreamClosed(conn1ID) XCTAssertEqual(closeStream2Action.request, .none) XCTAssertEqual(closeStream2Action.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) } - + func testConnectionTimeout() { let elg = EmbeddedEventLoopGroup(loops: 1) let el1 = elg.next() - + // establish one idle http2 connection let idGenerator = HTTPConnectionPool.Connection.ID.Generator() var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) @@ -404,20 +404,19 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { let connectAction = state.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) XCTAssertEqual(connectAction.request, .none) XCTAssertEqual(connectAction.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) - - + // let the connection timeout let timeoutAction = state.connectionIdleTimeout(conn1ID) XCTAssertEqual(timeoutAction.request, .none) XCTAssertEqual(timeoutAction.connection, .closeConnection(conn1, isShutdown: .no)) } - + func testConnectionEstablishmentFailure() { struct SomeError: Error, Equatable {} - + let elg = EmbeddedEventLoopGroup(loops: 1) let el1 = elg.next() - + // establish one idle http2 connection let idGenerator = HTTPConnectionPool.Connection.ID.Generator() var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) @@ -428,7 +427,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { requests: HTTPConnectionPool.RequestQueue() ) XCTAssertEqual(migrationAction, .none) - + let action = state.failedToCreateNewConnection(SomeError(), connectionID: conn1ID) XCTAssertEqual(action.request, .none) guard case .scheduleBackoffTimer(conn1ID, _, let eventLoop) = action.connection else { @@ -436,4 +435,40 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { } XCTAssertEqual(eventLoop.id, el1.id) } + + func testGoAway() { + let elg = EmbeddedEventLoopGroup(loops: 1) + let el1 = elg.next() + + // establish one idle http2 connection + let idGenerator = HTTPConnectionPool.Connection.ID.Generator() + var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) + let conn1ID = http1Conns.createNewConnection(on: el1) + var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) + let migrationAction = state.migrateConnectionsFromHTTP1( + connections: http1Conns, + requests: HTTPConnectionPool.RequestQueue() + ) + XCTAssertEqual(migrationAction, .none) + let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1) + let connectAction = state.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) + XCTAssertEqual(connectAction.request, .none) + XCTAssertEqual(connectAction.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) + + // execute request on idle connection + let mockRequest1 = MockHTTPRequest(eventLoop: el1) + let request1 = HTTPConnectionPool.Request(mockRequest1) + let request1Action = state.executeRequest(request1) + XCTAssertEqual(request1Action.request, .executeRequest(request1, conn1, cancelTimeout: false)) + XCTAssertEqual(request1Action.connection, .cancelTimeoutTimer(conn1ID)) + + let goAwayAction = state.http2ConnectionGoAwayReceived(conn1ID) + XCTAssertEqual(goAwayAction.request, .none) + XCTAssertEqual(goAwayAction.connection, .none) + + // close stream + let closeStream1Action = state.http2ConnectionStreamClosed(conn1ID) + XCTAssertEqual(closeStream1Action.request, .none) + XCTAssertEqual(closeStream1Action.connection, .closeConnection(conn1, isShutdown: .no)) + } } From 2e15b5871dbe4c268cb45074bfc32196cd79e91e Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Mon, 4 Oct 2021 19:34:12 +0200 Subject: [PATCH 14/15] Revert "run swift-format and generate_linux_tests.rb" This reverts commit b00e4757052ceb344f63611154efe3ce2024dda9. --- ...onnectionPool+HTTP2StateMachineTests.swift | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift index 97a29e9cb..9d5d4718e 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -96,7 +96,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { XCTAssertEqual(action.request, .none) XCTAssertEqual(action.connection, .none) } - + /// 4 streams are available and therefore request should be executed immediately for _ in 0..<4 { let mockRequest = MockHTTPRequest(eventLoop: el1, requiresEventLoopForChannel: true) @@ -106,7 +106,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { XCTAssertEqual(action.connection, .none) XCTAssertEqual(action.request, .executeRequest(request, conn, cancelTimeout: false)) } - + /// closing streams without any queued requests shouldn't do anything if it's *not* the last stream for _ in 0..<4 { let action = state.http2ConnectionStreamClosed(connID) @@ -341,11 +341,11 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { XCTAssertEqual(releaseAction.request, .none) XCTAssertEqual(releaseAction.connection, .closeConnection(conn1, isShutdown: .yes(unclean: true))) } - + func testSchedulingAndCancelingOfIdleTimeout() { let elg = EmbeddedEventLoopGroup(loops: 1) let el1 = elg.next() - + // establish one idle http2 connection let idGenerator = HTTPConnectionPool.Connection.ID.Generator() var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) @@ -360,36 +360,36 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { let connectAction = state.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) XCTAssertEqual(connectAction.request, .none) XCTAssertEqual(connectAction.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) - + // execute request on idle connection let mockRequest1 = MockHTTPRequest(eventLoop: el1) let request1 = HTTPConnectionPool.Request(mockRequest1) let request1Action = state.executeRequest(request1) XCTAssertEqual(request1Action.request, .executeRequest(request1, conn1, cancelTimeout: false)) XCTAssertEqual(request1Action.connection, .cancelTimeoutTimer(conn1ID)) - + // close stream let closeStream1Action = state.http2ConnectionStreamClosed(conn1ID) XCTAssertEqual(closeStream1Action.request, .none) XCTAssertEqual(closeStream1Action.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) - + // execute request on idle connection with required event loop let mockRequest2 = MockHTTPRequest(eventLoop: el1, requiresEventLoopForChannel: true) let request2 = HTTPConnectionPool.Request(mockRequest2) let request2Action = state.executeRequest(request2) XCTAssertEqual(request2Action.request, .executeRequest(request2, conn1, cancelTimeout: false)) XCTAssertEqual(request2Action.connection, .cancelTimeoutTimer(conn1ID)) - + // close stream let closeStream2Action = state.http2ConnectionStreamClosed(conn1ID) XCTAssertEqual(closeStream2Action.request, .none) XCTAssertEqual(closeStream2Action.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) } - + func testConnectionTimeout() { let elg = EmbeddedEventLoopGroup(loops: 1) let el1 = elg.next() - + // establish one idle http2 connection let idGenerator = HTTPConnectionPool.Connection.ID.Generator() var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) @@ -404,19 +404,20 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { let connectAction = state.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) XCTAssertEqual(connectAction.request, .none) XCTAssertEqual(connectAction.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) - + + // let the connection timeout let timeoutAction = state.connectionIdleTimeout(conn1ID) XCTAssertEqual(timeoutAction.request, .none) XCTAssertEqual(timeoutAction.connection, .closeConnection(conn1, isShutdown: .no)) } - + func testConnectionEstablishmentFailure() { struct SomeError: Error, Equatable {} - + let elg = EmbeddedEventLoopGroup(loops: 1) let el1 = elg.next() - + // establish one idle http2 connection let idGenerator = HTTPConnectionPool.Connection.ID.Generator() var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) @@ -427,7 +428,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { requests: HTTPConnectionPool.RequestQueue() ) XCTAssertEqual(migrationAction, .none) - + let action = state.failedToCreateNewConnection(SomeError(), connectionID: conn1ID) XCTAssertEqual(action.request, .none) guard case .scheduleBackoffTimer(conn1ID, _, let eventLoop) = action.connection else { From 48d4be4b04346142f282ff709d0388e0be8f8585 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Tue, 5 Oct 2021 10:39:48 +0200 Subject: [PATCH 15/15] remove testGoAway for now, will add & fix it in another PR --- ...onPool+HTTP2StateMachineTests+XCTest.swift | 1 - ...onnectionPool+HTTP2StateMachineTests.swift | 67 +++++-------------- 2 files changed, 15 insertions(+), 53 deletions(-) diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift index 85834e5de..de1ca4667 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests+XCTest.swift @@ -33,7 +33,6 @@ extension HTTPConnectionPool_HTTP2StateMachineTests { ("testSchedulingAndCancelingOfIdleTimeout", testSchedulingAndCancelingOfIdleTimeout), ("testConnectionTimeout", testConnectionTimeout), ("testConnectionEstablishmentFailure", testConnectionEstablishmentFailure), - ("testGoAway", testGoAway), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift index 9d5d4718e..bbef727e0 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2StateMachineTests.swift @@ -96,7 +96,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { XCTAssertEqual(action.request, .none) XCTAssertEqual(action.connection, .none) } - + /// 4 streams are available and therefore request should be executed immediately for _ in 0..<4 { let mockRequest = MockHTTPRequest(eventLoop: el1, requiresEventLoopForChannel: true) @@ -106,7 +106,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { XCTAssertEqual(action.connection, .none) XCTAssertEqual(action.request, .executeRequest(request, conn, cancelTimeout: false)) } - + /// closing streams without any queued requests shouldn't do anything if it's *not* the last stream for _ in 0..<4 { let action = state.http2ConnectionStreamClosed(connID) @@ -341,11 +341,11 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { XCTAssertEqual(releaseAction.request, .none) XCTAssertEqual(releaseAction.connection, .closeConnection(conn1, isShutdown: .yes(unclean: true))) } - + func testSchedulingAndCancelingOfIdleTimeout() { let elg = EmbeddedEventLoopGroup(loops: 1) let el1 = elg.next() - + // establish one idle http2 connection let idGenerator = HTTPConnectionPool.Connection.ID.Generator() var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) @@ -360,36 +360,36 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { let connectAction = state.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) XCTAssertEqual(connectAction.request, .none) XCTAssertEqual(connectAction.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) - + // execute request on idle connection let mockRequest1 = MockHTTPRequest(eventLoop: el1) let request1 = HTTPConnectionPool.Request(mockRequest1) let request1Action = state.executeRequest(request1) XCTAssertEqual(request1Action.request, .executeRequest(request1, conn1, cancelTimeout: false)) XCTAssertEqual(request1Action.connection, .cancelTimeoutTimer(conn1ID)) - + // close stream let closeStream1Action = state.http2ConnectionStreamClosed(conn1ID) XCTAssertEqual(closeStream1Action.request, .none) XCTAssertEqual(closeStream1Action.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) - + // execute request on idle connection with required event loop let mockRequest2 = MockHTTPRequest(eventLoop: el1, requiresEventLoopForChannel: true) let request2 = HTTPConnectionPool.Request(mockRequest2) let request2Action = state.executeRequest(request2) XCTAssertEqual(request2Action.request, .executeRequest(request2, conn1, cancelTimeout: false)) XCTAssertEqual(request2Action.connection, .cancelTimeoutTimer(conn1ID)) - + // close stream let closeStream2Action = state.http2ConnectionStreamClosed(conn1ID) XCTAssertEqual(closeStream2Action.request, .none) XCTAssertEqual(closeStream2Action.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) } - + func testConnectionTimeout() { let elg = EmbeddedEventLoopGroup(loops: 1) let el1 = elg.next() - + // establish one idle http2 connection let idGenerator = HTTPConnectionPool.Connection.ID.Generator() var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) @@ -404,20 +404,19 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { let connectAction = state.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) XCTAssertEqual(connectAction.request, .none) XCTAssertEqual(connectAction.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) - - + // let the connection timeout let timeoutAction = state.connectionIdleTimeout(conn1ID) XCTAssertEqual(timeoutAction.request, .none) XCTAssertEqual(timeoutAction.connection, .closeConnection(conn1, isShutdown: .no)) } - + func testConnectionEstablishmentFailure() { struct SomeError: Error, Equatable {} - + let elg = EmbeddedEventLoopGroup(loops: 1) let el1 = elg.next() - + // establish one idle http2 connection let idGenerator = HTTPConnectionPool.Connection.ID.Generator() var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) @@ -428,7 +427,7 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { requests: HTTPConnectionPool.RequestQueue() ) XCTAssertEqual(migrationAction, .none) - + let action = state.failedToCreateNewConnection(SomeError(), connectionID: conn1ID) XCTAssertEqual(action.request, .none) guard case .scheduleBackoffTimer(conn1ID, _, let eventLoop) = action.connection else { @@ -436,40 +435,4 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase { } XCTAssertEqual(eventLoop.id, el1.id) } - - func testGoAway() { - let elg = EmbeddedEventLoopGroup(loops: 1) - let el1 = elg.next() - - // establish one idle http2 connection - let idGenerator = HTTPConnectionPool.Connection.ID.Generator() - var http1Conns = HTTPConnectionPool.HTTP1Connections(maximumConcurrentConnections: 8, generator: idGenerator) - let conn1ID = http1Conns.createNewConnection(on: el1) - var state = HTTPConnectionPool.HTTP2StateMachine(idGenerator: idGenerator) - let migrationAction = state.migrateConnectionsFromHTTP1( - connections: http1Conns, - requests: HTTPConnectionPool.RequestQueue() - ) - XCTAssertEqual(migrationAction, .none) - let conn1 = HTTPConnectionPool.Connection.__testOnly_connection(id: conn1ID, eventLoop: el1) - let connectAction = state.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) - XCTAssertEqual(connectAction.request, .none) - XCTAssertEqual(connectAction.connection, .scheduleTimeoutTimer(conn1ID, on: el1)) - - // execute request on idle connection - let mockRequest1 = MockHTTPRequest(eventLoop: el1) - let request1 = HTTPConnectionPool.Request(mockRequest1) - let request1Action = state.executeRequest(request1) - XCTAssertEqual(request1Action.request, .executeRequest(request1, conn1, cancelTimeout: false)) - XCTAssertEqual(request1Action.connection, .cancelTimeoutTimer(conn1ID)) - - let goAwayAction = state.http2ConnectionGoAwayReceived(conn1ID) - XCTAssertEqual(goAwayAction.request, .none) - XCTAssertEqual(goAwayAction.connection, .none) - - // close stream - let closeStream1Action = state.http2ConnectionStreamClosed(conn1ID) - XCTAssertEqual(closeStream1Action.request, .none) - XCTAssertEqual(closeStream1Action.connection, .closeConnection(conn1, isShutdown: .no)) - } }