From 609e533f14d2db1071e67712122a1db6b466bf08 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Mon, 27 Sep 2021 09:29:59 +0200 Subject: [PATCH 1/5] [HTTP2ConnectionPool] added `HTTP2Connections` struct --- .../HTTPConnectionPool+HTTP2Connections.swift | 553 ++++++++++++++++++ ...tionPool+HTTP2ConnectionsTest+XCTest.swift | 45 ++ ...PConnectionPool+HTTP2ConnectionsTest.swift | 511 ++++++++++++++++ Tests/LinuxMain.swift | 1 + 4 files changed, 1110 insertions(+) create mode 100644 Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift create mode 100644 Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest+XCTest.swift create mode 100644 Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift new file mode 100644 index 000000000..fb8629135 --- /dev/null +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift @@ -0,0 +1,553 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import NIOCore + +extension HTTPConnectionPool { + private struct HTTP2ConnectionState { + private enum State { + /// the pool is establishing a connection. Valid transitions are to: .backingOff, .active and .closed + case starting + /// the connection is waiting to retry to establish a connection. Transition to .closed. From .closed + /// a new connection state must be created for a retry. + case backingOff + /// the connection is active and is able to run requests. Valid transitions to: .draining and .closed + case active(Connection, maxStreams: Int, usedStreams: Int, lastIdle: NIODeadline) + /// the connection is active and is running requests. No new requests must be scheduled. + /// Valid transitions to: .draining and .closed + case draining(Connection, maxStreams: Int, usedStreams: Int) + /// the connection is closed + case closed + } + + var isActive: Bool { + switch self.state { + case .starting, .backingOff, .draining, .closed: + return false + case .active: + return true + } + } + + var isStartingOrBackingOff: Bool { + switch self.state { + case .starting, .backingOff: + return true + case .active, .draining, .closed: + return false + } + } + + /// A request can be scheduled on the connection + var isAvailable: Bool { + switch self.state { + case .active(_, let maxStreams, let usedStreams, _): + return usedStreams < maxStreams + case .starting, .backingOff, .draining, .closed: + return false + } + } + + /// The connection is active, but there are no running requests on the connection. + /// Every idle connection is available, but not every available connection is idle. + var isIdle: Bool { + switch self.state { + case .active(_, _, let usedStreams, _): + return usedStreams == 0 + case .starting, .backingOff, .draining, .closed: + return false + } + } + + var isClosed: Bool { + switch self.state { + case .starting, .backingOff, .draining, .active: + return false + case .closed: + return true + } + } + + private var state: State + let eventLoop: EventLoop + let connectionID: Connection.ID + + /// should be called after the connection was successfully established + /// - Parameters: + /// - conn: HTTP2 connection + /// - maxStreams: max streams settings from the server + /// - Returns: number of available streams which can be leased + mutating func connected(_ conn: Connection, maxStreams: Int) -> Int { + switch self.state { + case .active, .draining, .backingOff, .closed: + preconditionFailure("Invalid state: \(self.state)") + case .starting: + self.state = .active(conn, maxStreams: maxStreams, usedStreams: 0, lastIdle: .now()) + return maxStreams + } + } + + /// should be called after receiving new http2 settings from the server + /// - Parameters: + /// - maxStreams: max streams settings from the server + /// - Returns: number of available streams which can be leased + mutating func newMaxConcurrentStreams(_ maxStreams: Int) -> Int { + switch self.state { + case .starting, .backingOff, .closed: + preconditionFailure("Invalid state for updating max concurrent streams: \(self.state)") + case .draining(let conn, _, let usedStreams): + self.state = .draining(conn, maxStreams: maxStreams, usedStreams: usedStreams) + return 0 + case .active(let conn, _, let usedStreams, let lastIdle): + self.state = .active(conn, maxStreams: maxStreams, usedStreams: usedStreams, lastIdle: lastIdle) + return max(maxStreams - usedStreams, 0) + } + } + + mutating func goAwayReceived() -> EventLoop { + switch self.state { + case .starting, .backingOff, .closed: + preconditionFailure("Invalid state for draining a connection: \(self.state)") + case .draining(let conn, _, _): + // we could potentially receive another go away while we drain all active streams and we just ignore it + return conn.eventLoop + case .active(let conn, let maxStreams, let usedStreams, _): + self.state = .draining(conn, maxStreams: maxStreams, usedStreams: usedStreams) + return conn.eventLoop + } + } + + /// The connection failed to start + mutating func failedToConnect() { + switch self.state { + case .starting: + self.state = .backingOff + case .backingOff, .active, .draining, .closed: + preconditionFailure("Invalid state: \(self.state)") + } + } + + mutating func fail() { + switch self.state { + case .starting, .active, .backingOff, .draining: + self.state = .closed + case .closed: + preconditionFailure("Invalid state: \(self.state)") + } + } + + mutating func lease(_ count: Int) -> Connection { + switch self.state { + case .starting, .backingOff, .draining, .closed: + preconditionFailure("Invalid state for leasing a stream: \(self.state)") + case .active(let conn, let maxStreams, var usedStreams, let lastIdle): + usedStreams += count + precondition(usedStreams <= maxStreams, "tried to lease a connection which is not available") + self.state = .active(conn, maxStreams: maxStreams, usedStreams: usedStreams, lastIdle: lastIdle) + return conn + } + } + + /// should be called after a request has finished and the stream can be used again for a new request + /// - Returns: number of available streams which can be leased + mutating func release() -> Int { + switch self.state { + case .starting, .backingOff, .closed: + preconditionFailure("Invalid state: \(self.state)") + case .draining(let conn, let maxStreams, var usedStreams): + usedStreams -= 1 + assert(usedStreams >= 0) + self.state = .draining(conn, maxStreams: maxStreams, usedStreams: usedStreams) + return 0 + case .active(let conn, let maxStreams, var usedStreams, var lastIdle): + usedStreams -= 1 + assert(usedStreams >= 0) + if usedStreams == 0 { + lastIdle = .now() + } + self.state = .active(conn, maxStreams: maxStreams, usedStreams: usedStreams, lastIdle: lastIdle) + return max(maxStreams - usedStreams, 0) + } + } + + mutating func close() -> Connection { + switch self.state { + case .active(let conn, _, 0, _): + self.state = .closed + return conn + case .starting, .backingOff, .draining, .closed, .active: + preconditionFailure("Invalid state for closing a connection: \(self.state)") + } + } + + enum CleanupAction { + case removeConnection + case keepConnection + } + + /// Cleanup the current connection for shutdown. + /// + /// This method is called, when the connections shall shutdown. Depending on the state + /// the connection is in, it adds itself to one of the arrays that are used to signal shutdown + /// intent to the underlying connections. Connections that are backing off can be easily + /// dropped (since, we only need to cancel the backoff timer), connections that are leased + /// need to be cancelled (notifying the `ChannelHandler` that we want to cancel the + /// running request), connections that are idle can be closed right away. Sadly we can't + /// cancel connection starts right now. For this reason we need to wait for them to succeed + /// or fail until we finalize the shutdown. + /// + /// - Parameter context: A cleanup context to add the connection to based on its state. + /// - Returns: A cleanup action indicating if the connection can be removed from the + /// connection list. + func cleanup(_ context: inout CleanupContext) -> CleanupAction { + switch self.state { + case .starting: + return .keepConnection + case .backingOff: + context.connectBackoff.append(self.connectionID) + return .removeConnection + case .active(let connection, _, let usedStreams, _): + if usedStreams <= 0 { + context.close.append(connection) + return .removeConnection + } else { + context.cancel.append(connection) + return .keepConnection + } + case .draining(let connection, _, _): + context.cancel.append(connection) + return .keepConnection + case .closed: + preconditionFailure("Unexpected state for cleanup: Did not expect to have closed connections in the state machine.") + } + } + + func addStats(into stats: inout HTTP2Connections.Stats) { + if self.isIdle { + stats.idleConnections &+= 1 + } + switch self.state { + case .starting: + stats.startingConnections &+= 1 + case .backingOff: + stats.backingOffConnections &+= 1 + case .active(_, let maxStreams, let usedStreams, _): + stats.availableStreams += max(maxStreams - usedStreams, 0) + stats.leasedStreams += usedStreams + stats.availableConnections &+= 1 + case .draining(_, _, let usedStreams): + stats.drainingConnections &+= 1 + stats.leasedStreams += usedStreams + case .closed: + break + } + } + + init(connectionID: Connection.ID, eventLoop: EventLoop) { + self.connectionID = connectionID + self.eventLoop = eventLoop + self.state = .starting + } + } + + struct HTTP2Connections { + /// A connectionID generator. + private let generator: Connection.ID.Generator + /// The connections states + private var connections: [HTTP2ConnectionState] + + var isEmpty: Bool { + self.connections.isEmpty + } + + var stats: Stats { + self.connections.reduce(into: Stats()) { stats, connection in + connection.addStats(into: &stats) + } + } + + init(generator: Connection.ID.Generator) { + self.generator = generator + self.connections = [] + } + + // MARK: Migration + + /// we only handle starting and backing off connection here. + /// All running connections must be handled by the enclosing state machine + /// - Parameters: + /// - starting: starting HTTP connections from previous state machine + /// - backingOff: backing off HTTP connections from previous state machine + mutating func migrateConnections( + starting: [(Connection.ID, EventLoop)], + backingOff: [(Connection.ID, EventLoop)] + ) { + for (connectionID, eventLoop) in starting { + let newConnection = HTTP2ConnectionState(connectionID: connectionID, eventLoop: eventLoop) + self.connections.append(newConnection) + } + + for (connectionID, eventLoop) in backingOff { + var backingOffConnection = HTTP2ConnectionState(connectionID: connectionID, eventLoop: eventLoop) + // TODO: Maybe we want to add a static init for backing off connections to HTTP2ConnectionState + backingOffConnection.failedToConnect() + self.connections.append(backingOffConnection) + } + } + + // MARK: Connection creation + + /// used in general purpose connection scenarios to check if at least one connection exist, or if should we create a new one + var hasConnectionThatCanOrWillBeAbleToExecuteRequests: Bool { + self.connections.contains { $0.isStartingOrBackingOff || $0.isActive } + } + + /// used in eventLoop scenarios. does at least one connection exist for this eventLoop, or should we create a new one? + /// - Parameter eventLoop: connection `EventLoop` to search for + /// - Returns: true if at least one connection is starting or active for the given `eventLoop` + func hasConnectionThatCanOrWillBeAbleToExecuteRequests(for eventLoop: EventLoop) -> Bool { + self.connections.contains { + $0.eventLoop === eventLoop && ($0.isStartingOrBackingOff || $0.isActive) + } + } + + mutating func createNewConnection(on eventLoop: EventLoop) -> Connection.ID { + // assert no active connection exists on the requested eventLoop + assert(self.connections.allSatisfy { $0.eventLoop !== eventLoop || !$0.isActive }) + + let connection = HTTP2ConnectionState(connectionID: self.generator.next(), eventLoop: eventLoop) + self.connections.append(connection) + return connection.connectionID + } + + /// A new HTTP/2.0 connection was established. + /// + /// This will put the connection into the idle state. + /// + /// - Parameter connection: The new established connection. + /// - Returns: An index and an ``AvailableConnectionContext`` to determine the next action for the now idle connection. + /// Call ``leaseStreams(at:count:)`` or ``closeConnection(at:)`` with the supplied index after + /// this. + mutating func newHTTP2ConnectionEstablished(_ connection: Connection, maxConcurrentStreams: Int) -> (Int, AvailableConnectionContext) { + guard let index = self.connections.firstIndex(where: { $0.connectionID == connection.id }) else { + preconditionFailure("There is a new connection that we didn't request!") + } + precondition(connection.eventLoop === self.connections[index].eventLoop, "Expected the new connection to be on EL") + let availableStreams = self.connections[index].connected(connection, maxStreams: maxConcurrentStreams) + let context = AvailableConnectionContext( + availableStreams: availableStreams, + eventLoop: connection.eventLoop, + isIdle: self.connections[index].isIdle + ) + return (index, context) + } + + /// Move the HTTP1ConnectionState to backingOff. + /// + /// - Parameter connectionID: The connectionID of the failed connection attempt + /// - Returns: The eventLoop on which to schedule the backoff timer + mutating func backoffNextConnectionAttempt(_ connectionID: Connection.ID) -> EventLoop { + guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else { + preconditionFailure("We tried to create a new connection that we know nothing about?") + } + + self.connections[index].failedToConnect() + return self.connections[index].eventLoop + } + + // MARK: Connection lifecycle events + + mutating func goAwayReceived(_ connectionID: Connection.ID) -> GoAwayContext { + guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else { + preconditionFailure("go away recieved for a connection that does not exists") + } + let eventLoop = self.connections[index].goAwayReceived() + return GoAwayContext(eventLoop: eventLoop) + } + + mutating func newHTTP2MaxConcurrentStreamsReceived( + _ connectionID: Connection.ID, + newMaxStreams: Int + ) -> (Int, AvailableConnectionContext) { + guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else { + preconditionFailure("We tried to update the maximum concurren streams number for a connection that does not exists") + } + let availableStreams = self.connections[index].newMaxConcurrentStreams(newMaxStreams) + let context = AvailableConnectionContext( + availableStreams: availableStreams, + eventLoop: self.connections[index].eventLoop, + isIdle: self.connections[index].isIdle + ) + return (index, context) + } + + // MARK: Leasing and releasing + + mutating func leaseStream(onPreferred eventLoop: EventLoop) -> Connection? { + guard let index = self.findAvailableConnection(onPreferred: eventLoop) else { return nil } + return self.leaseStreams(at: index, count: 1) + } + + /// tries to find an available connection on the prefered `eventLoop`. If it can't find one with the given `eventLoop`, it returns the first available connection + private func findAvailableConnection(onPreferred eventLoop: EventLoop) -> Int? { + var availableConnection: Int? + for (index, connection) in self.connections.enumerated() { + guard connection.isAvailable else { continue } + if connection.eventLoop === eventLoop { + return index + } else if availableConnection == nil { + availableConnection = index + } + } + return availableConnection + } + + mutating func leaseStreams(onRequired eventLoop: EventLoop) -> Connection? { + guard let index = self.findAvailableConnection(onRequired: eventLoop) else { return nil } + return self.leaseStreams(at: index, count: 1) + } + + /// tries to find an available connection on the required `eventLoop` + private func findAvailableConnection(onRequired eventLoop: EventLoop) -> Int? { + self.connections.firstIndex(where: { $0.eventLoop === eventLoop && $0.isAvailable }) + } + + /// lease `count` streams after connections establishment + /// - Parameters: + /// - index: index of the connection you got by calling `newHTTP2ConnectionEstablished(_:maxConcurrentStreams:)` + /// - count: number of streams you want to lease. You get the current available streams from the `AvailableConnectionContext` which `newHTTP2ConnectionEstablished(_:maxConcurrentStreams:)` returns + /// - Returns: connection to execute `count` requests on + /// - precondition: `index` needs to be valid. `count` must be greater than or equal to *0* and not execeed the number of available streams. + mutating func leaseStreams(at index: Int, count: Int) -> Connection { + self.connections[index].lease(count) + } + + mutating func releaseStream(_ connectionID: Connection.ID) -> (Int, AvailableConnectionContext) { + guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else { + preconditionFailure("We tried to release a connection we do not know anything about") + } + let availableStreams = self.connections[index].release() + let context = AvailableConnectionContext( + availableStreams: availableStreams, + eventLoop: self.connections[index].eventLoop, + isIdle: self.connections[index].isIdle + ) + return (index, context) + } + + // MARK: Connection close/removal + + /// Closes the connection at the given index. This will also remove the connection right away. + /// - Parameter index: index of the connection which we get from `releaseStream(_:)` + /// - Returns: closed and removed connection + mutating func closeConnection(at index: Int) -> Connection { + let connection = self.connections[index].close() + self.connections.remove(at: index) + return connection + } + + /// removes a closed connection. + /// - Parameter index: index of the connection which we get from `failConnection(_:)` + /// - Precondition: connection must be closed + mutating func removeConnection(at index: Int) { + precondition(self.connections[index].isClosed, "We tried to remove a connection which is not closed") + self.connections.remove(at: index) + } + + mutating func closeConnectionIfIdle(_ connectionID: Connection.ID) -> Connection? { + guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else { + // because of a race this connection (connection close runs against trigger of timeout) + // was already removed from the state machine. + return nil + } + guard self.connections[index].isIdle else { + // connection is not idle anymore, we may have just leased it for a request + return nil + } + return self.closeConnection(at: index) + } + + /// replaces a closed connection by creating a new starting connection. + /// - Parameter index: index of the connection which we get from `failConnection(_:)` + /// - Precondition: connection must be closed + mutating func createNewConnectionByReplacingClosedConnection(at index: Int) -> (Connection.ID, EventLoop) { + precondition(self.connections[index].isClosed) + let newConnection = HTTP2ConnectionState( + connectionID: self.generator.next(), + eventLoop: self.connections[index].eventLoop + ) + + self.connections[index] = newConnection + return (newConnection.connectionID, newConnection.eventLoop) + } + + mutating func failConnection(_ connectionID: Connection.ID) -> (Int, FailedConnectionContext)? { + guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else { + /// When a connection close is initiated by the connection pool (e.g. because the connection was idle for too long), the connection will + /// still report its close to the state machine and then to us. In those cases we must ignore the event. + return nil + } + self.connections[index].fail() + let eventLoop = self.connections[index].eventLoop + let context = FailedConnectionContext(eventLoop: eventLoop) + return (index, context) + } + + mutating func shutdown() -> CleanupContext { + var cleanupContext = CleanupContext() + self.connections.removeAll(where: { connectionState in + switch connectionState.cleanup(&cleanupContext) { + case .removeConnection: + return true + case .keepConnection: + return false + } + }) + return cleanupContext + } + + // MARK: Result structs + + /// Information around an available connection + struct AvailableConnectionContext { + /// number of streams which can be leased + var availableStreams: Int + /// The eventLoop the connection is running on. + var eventLoop: EventLoop + /// true if no stream is leased + var isIdle: Bool + } + + struct GoAwayContext { + /// The eventLoop the connection is running on. + var eventLoop: EventLoop + } + + /// Information around the failed/closed connection. + struct FailedConnectionContext { + /// The eventLoop the connection ran on. + var eventLoop: EventLoop + } + + struct Stats: Equatable { + var startingConnections: Int = 0 + var backingOffConnections: Int = 0 + var idleConnections: Int = 0 + var availableConnections: Int = 0 + var drainingConnections: Int = 0 + var leasedStreams: Int = 0 + var availableStreams: Int = 0 + } + } +} diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest+XCTest.swift new file mode 100644 index 000000000..953da7222 --- /dev/null +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest+XCTest.swift @@ -0,0 +1,45 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2018-2019 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +// +// HTTPConnectionPool+HTTP2ConnectionsTest+XCTest.swift +// +import XCTest + +/// +/// NOTE: This file was generated by generate_linux_tests.rb +/// +/// Do NOT edit this file directly as it will be regenerated automatically when needed. +/// + +extension HTTPConnectionPool_HTTP2ConnectionsTests { + static var allTests: [(String, (HTTPConnectionPool_HTTP2ConnectionsTests) -> () throws -> Void)] { + return [ + ("testCreatingConnections", testCreatingConnections), + ("testCreatingConnectionAndFailing", testCreatingConnectionAndFailing), + ("testFailConnectionRace", testFailConnectionRace), + ("testLeaseConnectionOfPreferredButUnavailableEL", testLeaseConnectionOfPreferredButUnavailableEL), + ("testLeaseConnectionOnRequiredButUnavailableEL", testLeaseConnectionOnRequiredButUnavailableEL), + ("testCloseConnectionIfIdle", testCloseConnectionIfIdle), + ("testCloseConnectionIfIdleButLeasedRaceCondition", testCloseConnectionIfIdleButLeasedRaceCondition), + ("testCloseConnectionIfIdleButClosedRaceCondition", testCloseConnectionIfIdleButClosedRaceCondition), + ("testCloseConnectionIfIdleRace", testCloseConnectionIfIdleRace), + ("testShutdown", testShutdown), + ("testLeasingAllConnections", testLeasingAllConnections), + ("testGoAway", testGoAway), + ("testNewMaxConcurrentStreamsSetting", testNewMaxConcurrentStreamsSetting), + ("testLeaseOnPreferredEventLoopWithoutAnyAvailable", testLeaseOnPreferredEventLoopWithoutAnyAvailable), + ("testMigrationFromHTTP1", testMigrationFromHTTP1), + ] + } +} diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift new file mode 100644 index 000000000..ec8c3ea04 --- /dev/null +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift @@ -0,0 +1,511 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the AsyncHTTPClient open source project +// +// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@testable import AsyncHTTPClient +import NIOCore +import NIOEmbedded +import XCTest + +class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase { + func testCreatingConnections() { + let elg = EmbeddedEventLoopGroup(loops: 4) + var connections = HTTPConnectionPool.HTTP2Connections(generator: .init()) + + let el1 = elg.next() + let el2 = elg.next() + + // general purpose connection + XCTAssertFalse(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests) + XCTAssertFalse(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests(for: el1)) + let conn1ID = connections.createNewConnection(on: el1) + XCTAssertTrue(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests) + XCTAssertTrue(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests(for: el1)) + let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1) + let (conn1Index, conn1CreatedContext) = connections.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) + XCTAssertEqual(conn1CreatedContext.availableStreams, 100) + XCTAssertEqual(conn1CreatedContext.isIdle, true) + XCTAssert(conn1CreatedContext.eventLoop === el1) + XCTAssertEqual(connections.leaseStreams(at: conn1Index, count: 1), conn1) + + // eventLoop connection + XCTAssertTrue(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests) + XCTAssertFalse(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests(for: el2)) + let conn2ID = connections.createNewConnection(on: el2) + XCTAssertTrue(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests(for: el2)) + let conn2: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn2ID, eventLoop: el2) + let (conn2Index, conn2CreatedContext) = connections.newHTTP2ConnectionEstablished(conn2, maxConcurrentStreams: 100) + XCTAssertEqual(conn1CreatedContext.availableStreams, 100) + XCTAssertTrue(conn1CreatedContext.isIdle) + XCTAssert(conn2CreatedContext.eventLoop === el2) + XCTAssertEqual(connections.leaseStreams(at: conn2Index, count: 1), conn2) + XCTAssertTrue(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests(for: el2)) + } + + func testCreatingConnectionAndFailing() { + let elg = EmbeddedEventLoopGroup(loops: 4) + var connections = HTTPConnectionPool.HTTP2Connections(generator: .init()) + + let el1 = elg.next() + let el2 = elg.next() + + // general purpose connection + XCTAssertFalse(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests) + XCTAssertFalse(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests(for: el1)) + let conn1ID = connections.createNewConnection(on: el1) + XCTAssertEqual(conn1ID, 0) + XCTAssertTrue(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests) + XCTAssertTrue(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests(for: el1)) + + // connection failed to start. 1. backoff + let backoff1EL = connections.backoffNextConnectionAttempt(conn1ID) + XCTAssert(backoff1EL === el1) + // backoff done. 2. decide what's next + guard let (conn1FailIndex, conn1FailContext) = connections.failConnection(conn1ID) else { + return XCTFail("Expected that the connection is remembered") + } + + XCTAssert(conn1FailContext.eventLoop === el1) + XCTAssertFalse(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests) + XCTAssertFalse(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests(for: el1)) + let (replaceConn1ID, replaceConn1EL) = connections.createNewConnectionByReplacingClosedConnection(at: conn1FailIndex) + XCTAssert(replaceConn1EL === el1) + XCTAssertEqual(replaceConn1ID, 1) + XCTAssertTrue(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests) + XCTAssertTrue(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests(for: el1)) + + // eventLoop connection + let conn2ID = connections.createNewConnection(on: el2) + XCTAssertTrue(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests) + XCTAssertTrue(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests(for: el2)) + let backoff2EL = connections.backoffNextConnectionAttempt(conn2ID) + XCTAssert(backoff2EL === el2) + guard let (conn2FailIndex, conn2FailContext) = connections.failConnection(conn2ID) else { + return XCTFail("Expected that the connection is remembered") + } + XCTAssert(conn2FailContext.eventLoop === el2) + connections.removeConnection(at: conn2FailIndex) + XCTAssertFalse(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests(for: el2)) + } + + func testFailConnectionRace() { + let elg = EmbeddedEventLoopGroup(loops: 5) + + let el1 = elg.next() + + var connections = HTTPConnectionPool.HTTP2Connections(generator: .init()) + + // connection is idle + let conn1ID = connections.createNewConnection(on: el1) + let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1) + _ = connections.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) + + // connection close is initiated from the pool + XCTAssertEqual(connections.closeConnectionIfIdle(conn1ID), conn1) + + // connection will report close event to us even if we have initialled it an we need to tolerate it + XCTAssertNil(connections.failConnection(conn1ID)) + } + + func testLeaseConnectionOfPreferredButUnavailableEL() { + let elg = EmbeddedEventLoopGroup(loops: 5) + let el1 = elg.next() + let el2 = elg.next() + let el3 = elg.next() + let el4 = elg.next() + let el5 = elg.next() + + var connections = HTTPConnectionPool.HTTP2Connections(generator: .init()) + XCTAssertFalse(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests) + for el in [el1, el2, el3, el4] { + XCTAssertFalse(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests(for: el)) + let connID = connections.createNewConnection(on: el) + XCTAssertTrue(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests) + XCTAssertTrue(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests(for: el)) + let conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: connID, eventLoop: el) + let (_, conn1CreatedContext) = connections.newHTTP2ConnectionEstablished(conn, maxConcurrentStreams: 100) + XCTAssertEqual(conn1CreatedContext.availableStreams, 100) + XCTAssertEqual(conn1CreatedContext.isIdle, true) + XCTAssert(conn1CreatedContext.eventLoop === el) + } + + XCTAssertNotNil(connections.leaseStream(onPreferred: el5)) + } + + func testLeaseConnectionOnRequiredButUnavailableEL() { + let elg = EmbeddedEventLoopGroup(loops: 5) + let el1 = elg.next() + let el2 = elg.next() + let el3 = elg.next() + let el4 = elg.next() + let el5 = elg.next() + + var connections = HTTPConnectionPool.HTTP2Connections(generator: .init()) + XCTAssertFalse(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests) + for el in [el1, el2, el3, el4] { + XCTAssertFalse(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests(for: el)) + let connID = connections.createNewConnection(on: el) + XCTAssertTrue(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests) + XCTAssertTrue(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests(for: el)) + let conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: connID, eventLoop: el) + let (_, conn1CreatedContext) = connections.newHTTP2ConnectionEstablished(conn, maxConcurrentStreams: 100) + XCTAssertEqual(conn1CreatedContext.availableStreams, 100) + XCTAssertEqual(conn1CreatedContext.isIdle, true) + XCTAssert(conn1CreatedContext.eventLoop === el) + } + + XCTAssertNil(connections.leaseStreams(onRequired: el5)) + } + + func testCloseConnectionIfIdle() { + let elg = EmbeddedEventLoopGroup(loops: 5) + + let el1 = elg.next() + + var connections = HTTPConnectionPool.HTTP2Connections(generator: .init()) + + // connection is idle + let conn1ID = connections.createNewConnection(on: el1) + let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1) + _ = connections.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) + XCTAssertEqual(connections.closeConnectionIfIdle(conn1ID), conn1) + + // connection is not idle + let conn2ID = connections.createNewConnection(on: el1) + let conn2: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn2ID, eventLoop: el1) + let (conn2Index, _) = connections.newHTTP2ConnectionEstablished(conn2, maxConcurrentStreams: 100) + XCTAssertEqual(connections.leaseStreams(at: conn2Index, count: 1), conn2) + XCTAssertNil(connections.closeConnectionIfIdle(conn2ID)) + } + + func testCloseConnectionIfIdleButLeasedRaceCondition() { + let elg = EmbeddedEventLoopGroup(loops: 5) + + let el1 = elg.next() + + var connections = HTTPConnectionPool.HTTP2Connections(generator: .init()) + + // connection is idle + let conn1ID = connections.createNewConnection(on: el1) + let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1) + _ = connections.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) + + // connection is leased + let lease = connections.leaseStream(onPreferred: el1) + XCTAssertEqual(lease, conn1) + + // timeout arrives minimal to late + XCTAssertEqual(connections.closeConnectionIfIdle(conn1ID), nil) + } + + func testCloseConnectionIfIdleButClosedRaceCondition() { + let elg = EmbeddedEventLoopGroup(loops: 5) + + let el1 = elg.next() + + var connections = HTTPConnectionPool.HTTP2Connections(generator: .init()) + + // connection is idle + let conn1ID = connections.createNewConnection(on: el1) + let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1) + _ = connections.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) + _ = connections.failConnection(conn1ID) + + // timeout arrives minimal to late + XCTAssertEqual(connections.closeConnectionIfIdle(conn1ID), nil) + } + + func testCloseConnectionIfIdleRace() { + let elg = EmbeddedEventLoopGroup(loops: 5) + + let el1 = elg.next() + + var connections = HTTPConnectionPool.HTTP2Connections(generator: .init()) + + // connection is idle + let conn1ID = connections.createNewConnection(on: el1) + let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1) + _ = connections.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) + + // we lease it just before timeout + XCTAssertEqual(connections.leaseStreams(onRequired: el1), conn1) + + // timeout arrives minimal to late + XCTAssertEqual(connections.closeConnectionIfIdle(conn1ID), nil) + } + + func testShutdown() { + let elg = EmbeddedEventLoopGroup(loops: 6) + let el1 = elg.next() + let el2 = elg.next() + let el3 = elg.next() + let el4 = elg.next() + let el5 = elg.next() + let el6 = elg.next() + + var connections = HTTPConnectionPool.HTTP2Connections(generator: .init()) + XCTAssertFalse(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests) + for el in [el1, el2, el3, el4] { + XCTAssertFalse(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests(for: el)) + let connID = connections.createNewConnection(on: el) + XCTAssertTrue(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests) + XCTAssertTrue(connections.hasConnectionThatCanOrWillBeAbleToExecuteRequests(for: el)) + let conn: HTTPConnectionPool.Connection = .__testOnly_connection(id: connID, eventLoop: el) + let (_, conn1CreatedContext) = connections.newHTTP2ConnectionEstablished(conn, maxConcurrentStreams: 100) + XCTAssertEqual(conn1CreatedContext.availableStreams, 100) + XCTAssertEqual(conn1CreatedContext.isIdle, true) + XCTAssert(conn1CreatedContext.eventLoop === el) + } + + XCTAssertEqual(connections.stats.backingOffConnections, 0) + XCTAssertEqual(connections.stats.leasedStreams, 0) + XCTAssertEqual(connections.stats.availableStreams, 400) + XCTAssertEqual(connections.stats.idleConnections, 4) + + // connection is leased + guard let lease = connections.leaseStream(onPreferred: el1) else { + return XCTFail("Expected to be able to lease a connection") + } + XCTAssertEqual(lease, .__testOnly_connection(id: 0, eventLoop: el1)) + + XCTAssertEqual(connections.stats.backingOffConnections, 0) + XCTAssertEqual(connections.stats.leasedStreams, 1) + XCTAssertEqual(connections.stats.availableStreams, 399) + XCTAssertEqual(connections.stats.idleConnections, 3) + + // start another connection that fails + let backingOffID = connections.createNewConnection(on: el5) + XCTAssert(connections.backoffNextConnectionAttempt(backingOffID) === el5) + + // start another connection + let startingID = connections.createNewConnection(on: el6) + + let context = connections.shutdown() + XCTAssertEqual(context.close.count, 3) + XCTAssertEqual(context.cancel, [lease]) + XCTAssertEqual(context.connectBackoff, [backingOffID]) + + XCTAssertEqual(connections.stats.idleConnections, 0) + XCTAssertEqual(connections.stats.backingOffConnections, 0) + XCTAssertEqual(connections.stats.leasedStreams, 1) + XCTAssertEqual(connections.stats.availableStreams, 99) + XCTAssertEqual(connections.stats.startingConnections, 1) + XCTAssertFalse(connections.isEmpty) + + let (releaseIndex, _) = connections.releaseStream(lease.id) + XCTAssertEqual(connections.closeConnection(at: releaseIndex), lease) + XCTAssertFalse(connections.isEmpty) + + guard let (failIndex, _) = connections.failConnection(startingID) else { + return XCTFail("Expected that the connection is remembered") + } + connections.removeConnection(at: failIndex) + XCTAssertTrue(connections.isEmpty) + } + + func testLeasingAllConnections() { + let elg = EmbeddedEventLoopGroup(loops: 4) + var connections = HTTPConnectionPool.HTTP2Connections(generator: .init()) + let el1 = elg.next() + + let conn1ID = connections.createNewConnection(on: el1) + let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1) + let (conn1Index, conn1CreatedContext) = connections.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) + XCTAssertEqual(conn1CreatedContext.availableStreams, 100) + XCTAssertEqual(connections.leaseStreams(at: conn1Index, count: 100), conn1) + + XCTAssertNil(connections.leaseStreams(onRequired: el1), "should not be able to lease stream because they are all already leased") + + let (_, releaseContext) = connections.releaseStream(conn1ID) + XCTAssertFalse(releaseContext.isIdle) + XCTAssertEqual(releaseContext.availableStreams, 1) + + XCTAssertEqual(connections.leaseStreams(onRequired: el1), conn1) + XCTAssertNil(connections.leaseStreams(onRequired: el1), "should not be able to lease stream because they are all already leased") + } + + func testGoAway() { + let elg = EmbeddedEventLoopGroup(loops: 4) + var connections = HTTPConnectionPool.HTTP2Connections(generator: .init()) + let el1 = elg.next() + + let conn1ID = connections.createNewConnection(on: el1) + let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1) + let (conn1Index, conn1CreatedContext) = connections.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 10) + XCTAssertEqual(conn1CreatedContext.availableStreams, 10) + XCTAssertEqual(connections.leaseStreams(at: conn1Index, count: 2), conn1) + + XCTAssertTrue(connections.goAwayReceived(conn1ID).eventLoop === el1) + + XCTAssertEqual( + connections.stats, + .init( + startingConnections: 0, + backingOffConnections: 0, + idleConnections: 0, + availableConnections: 0, + drainingConnections: 1, + leasedStreams: 2, + availableStreams: 0 + ) + ) + + XCTAssertNil(connections.leaseStreams(onRequired: el1), "we should not be able to lease a stream because the connection is draining") + + // a server can potentially send more than one connection go away and we should not crash + XCTAssertTrue(connections.goAwayReceived(conn1ID).eventLoop === el1) + XCTAssertEqual( + connections.stats, + .init( + startingConnections: 0, + backingOffConnections: 0, + idleConnections: 0, + availableConnections: 0, + drainingConnections: 1, + leasedStreams: 2, + availableStreams: 0 + ) + ) + + // release a connection + let (_, release1Context) = connections.releaseStream(conn1ID) + XCTAssertFalse(release1Context.isIdle) + XCTAssertEqual(release1Context.availableStreams, 0) + XCTAssertEqual( + connections.stats, + .init( + startingConnections: 0, + backingOffConnections: 0, + idleConnections: 0, + availableConnections: 0, + drainingConnections: 1, + leasedStreams: 1, + availableStreams: 0 + ) + ) + + // release last connection + let (_, release2Context) = connections.releaseStream(conn1ID) + XCTAssertFalse(release2Context.isIdle) + XCTAssertEqual(release2Context.availableStreams, 0) + XCTAssertEqual( + connections.stats, + .init( + startingConnections: 0, + backingOffConnections: 0, + idleConnections: 0, + availableConnections: 0, + drainingConnections: 1, + leasedStreams: 0, + availableStreams: 0 + ) + ) + } + + func testNewMaxConcurrentStreamsSetting() { + let elg = EmbeddedEventLoopGroup(loops: 4) + var connections = HTTPConnectionPool.HTTP2Connections(generator: .init()) + let el1 = elg.next() + + let conn1ID = connections.createNewConnection(on: el1) + let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1) + let (conn1Index, conn1CreatedContext) = connections.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 1) + XCTAssertEqual(conn1CreatedContext.availableStreams, 1) + XCTAssertEqual(connections.leaseStreams(at: conn1Index, count: 1), conn1) + + XCTAssertNil(connections.leaseStreams(onRequired: el1), "all streams are in use") + + let (_, newSettingsContext1) = connections.newHTTP2MaxConcurrentStreamsReceived(conn1ID, newMaxStreams: 2) + XCTAssertEqual(newSettingsContext1.availableStreams, 1) + XCTAssertTrue(newSettingsContext1.eventLoop === el1) + XCTAssertFalse(newSettingsContext1.isIdle) + + XCTAssertEqual(connections.leaseStreams(onRequired: el1), conn1) + + let (_, newSettingsContext2) = connections.newHTTP2MaxConcurrentStreamsReceived(conn1ID, newMaxStreams: 1) + XCTAssertEqual(newSettingsContext2.availableStreams, 0) + XCTAssertTrue(newSettingsContext2.eventLoop === el1) + XCTAssertFalse(newSettingsContext2.isIdle) + + // release a connection + let (_, release1Context) = connections.releaseStream(conn1ID) + XCTAssertFalse(release1Context.isIdle) + XCTAssertEqual(release1Context.availableStreams, 0) + + XCTAssertNil(connections.leaseStreams(onRequired: el1), "all streams are in use") + + // release a connection + let (_, release2Context) = connections.releaseStream(conn1ID) + XCTAssertTrue(release2Context.isIdle) + XCTAssertEqual(release2Context.availableStreams, 1) + + XCTAssertEqual(connections.leaseStreams(onRequired: el1), conn1) + } + + func testLeaseOnPreferredEventLoopWithoutAnyAvailable() { + let elg = EmbeddedEventLoopGroup(loops: 4) + var connections = HTTPConnectionPool.HTTP2Connections(generator: .init()) + let el1 = elg.next() + + let conn1ID = connections.createNewConnection(on: el1) + let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1) + let (conn1Index, conn1CreatedContext) = connections.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 1) + XCTAssertEqual(conn1CreatedContext.availableStreams, 1) + XCTAssertEqual(connections.leaseStreams(at: conn1Index, count: 1), conn1) + + XCTAssertNil(connections.leaseStream(onPreferred: el1), "all streams are in use") + } + + func testMigrationFromHTTP1() { + let elg = EmbeddedEventLoopGroup(loops: 4) + var connections = HTTPConnectionPool.HTTP2Connections(generator: .init()) + let el1 = elg.next() + let el2 = elg.next() + let conn1ID: HTTPConnectionPool.Connection.ID = 1 + let conn2ID: HTTPConnectionPool.Connection.ID = 2 + + connections.migrateConnections( + starting: [(conn1ID, el1)], + backingOff: [(conn2ID, el2)] + ) + XCTAssertEqual( + connections.stats, + .init( + startingConnections: 1, + backingOffConnections: 1, + idleConnections: 0, + availableConnections: 0, + drainingConnections: 0, + leasedStreams: 0, + availableStreams: 0 + ) + ) + + let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1) + let (conn1Index, conn1CreatedContext) = connections.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) + XCTAssertEqual(conn1CreatedContext.availableStreams, 100) + XCTAssertEqual(connections.leaseStreams(at: conn1Index, count: 2), conn1) + XCTAssertEqual( + connections.stats, + .init( + startingConnections: 0, + backingOffConnections: 1, + idleConnections: 0, + availableConnections: 1, + drainingConnections: 0, + leasedStreams: 2, + availableStreams: 98 + ) + ) + } +} diff --git a/Tests/LinuxMain.swift b/Tests/LinuxMain.swift index e42154c06..3f0de1080 100644 --- a/Tests/LinuxMain.swift +++ b/Tests/LinuxMain.swift @@ -44,6 +44,7 @@ import XCTest testCase(HTTPConnectionPool_FactoryTests.allTests), testCase(HTTPConnectionPool_HTTP1ConnectionsTests.allTests), testCase(HTTPConnectionPool_HTTP1StateMachineTests.allTests), + testCase(HTTPConnectionPool_HTTP2ConnectionsTests.allTests), testCase(HTTPConnectionPool_ManagerTests.allTests), testCase(HTTPConnectionPool_RequestQueueTests.allTests), testCase(HTTPRequestStateMachineTests.allTests), From 4d1759c102cc14bbd00e80f3d747dc6867d4645b Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Mon, 27 Sep 2021 14:42:34 +0200 Subject: [PATCH 2/5] fix review comments from @fabianfett --- .../HTTPConnectionPool+HTTP2Connections.swift | 78 ++++++++++++++----- ...PConnectionPool+HTTP2ConnectionsTest.swift | 20 ++--- 2 files changed, 68 insertions(+), 30 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift index fb8629135..28d3792ad 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift @@ -49,6 +49,15 @@ extension HTTPConnectionPool { } } + var canOrWillBeAbleToExecuteRequests: Bool { + switch self.state { + case .starting, .backingOff, .active: + return true + case .draining, .closed: + return false + } + } + /// A request can be scheduled on the connection var isAvailable: Bool { switch self.state { @@ -92,6 +101,7 @@ extension HTTPConnectionPool { switch self.state { case .active, .draining, .backingOff, .closed: preconditionFailure("Invalid state: \(self.state)") + case .starting: self.state = .active(conn, maxStreams: maxStreams, usedStreams: 0, lastIdle: .now()) return maxStreams @@ -106,12 +116,14 @@ extension HTTPConnectionPool { switch self.state { case .starting, .backingOff, .closed: preconditionFailure("Invalid state for updating max concurrent streams: \(self.state)") - case .draining(let conn, _, let usedStreams): - self.state = .draining(conn, maxStreams: maxStreams, usedStreams: usedStreams) - return 0 + case .active(let conn, _, let usedStreams, let lastIdle): self.state = .active(conn, maxStreams: maxStreams, usedStreams: usedStreams, lastIdle: lastIdle) return max(maxStreams - usedStreams, 0) + + case .draining(let conn, _, let usedStreams): + self.state = .draining(conn, maxStreams: maxStreams, usedStreams: usedStreams) + return 0 } } @@ -119,12 +131,14 @@ extension HTTPConnectionPool { switch self.state { case .starting, .backingOff, .closed: preconditionFailure("Invalid state for draining a connection: \(self.state)") - case .draining(let conn, _, _): - // we could potentially receive another go away while we drain all active streams and we just ignore it - return conn.eventLoop + case .active(let conn, let maxStreams, let usedStreams, _): self.state = .draining(conn, maxStreams: maxStreams, usedStreams: usedStreams) return conn.eventLoop + + case .draining(let conn, _, _): + // we could potentially receive another go away while we drain all active streams and we just ignore it + return conn.eventLoop } } @@ -151,6 +165,7 @@ extension HTTPConnectionPool { switch self.state { case .starting, .backingOff, .draining, .closed: preconditionFailure("Invalid state for leasing a stream: \(self.state)") + case .active(let conn, let maxStreams, var usedStreams, let lastIdle): usedStreams += count precondition(usedStreams <= maxStreams, "tried to lease a connection which is not available") @@ -165,11 +180,7 @@ extension HTTPConnectionPool { switch self.state { case .starting, .backingOff, .closed: preconditionFailure("Invalid state: \(self.state)") - case .draining(let conn, let maxStreams, var usedStreams): - usedStreams -= 1 - assert(usedStreams >= 0) - self.state = .draining(conn, maxStreams: maxStreams, usedStreams: usedStreams) - return 0 + case .active(let conn, let maxStreams, var usedStreams, var lastIdle): usedStreams -= 1 assert(usedStreams >= 0) @@ -178,6 +189,12 @@ extension HTTPConnectionPool { } self.state = .active(conn, maxStreams: maxStreams, usedStreams: usedStreams, lastIdle: lastIdle) return max(maxStreams - usedStreams, 0) + + case .draining(let conn, let maxStreams, var usedStreams): + usedStreams -= 1 + assert(usedStreams >= 0) + self.state = .draining(conn, maxStreams: maxStreams, usedStreams: usedStreams) + return 0 } } @@ -186,6 +203,7 @@ extension HTTPConnectionPool { case .active(let conn, _, 0, _): self.state = .closed return conn + case .starting, .backingOff, .draining, .closed, .active: preconditionFailure("Invalid state for closing a connection: \(self.state)") } @@ -214,9 +232,11 @@ extension HTTPConnectionPool { switch self.state { case .starting: return .keepConnection + case .backingOff: context.connectBackoff.append(self.connectionID) return .removeConnection + case .active(let connection, _, let usedStreams, _): if usedStreams <= 0 { context.close.append(connection) @@ -225,9 +245,11 @@ extension HTTPConnectionPool { context.cancel.append(connection) return .keepConnection } + case .draining(let connection, _, _): context.cancel.append(connection) return .keepConnection + case .closed: preconditionFailure("Unexpected state for cleanup: Did not expect to have closed connections in the state machine.") } @@ -240,15 +262,19 @@ extension HTTPConnectionPool { switch self.state { case .starting: stats.startingConnections &+= 1 + case .backingOff: stats.backingOffConnections &+= 1 + case .active(_, let maxStreams, let usedStreams, _): stats.availableStreams += max(maxStreams - usedStreams, 0) stats.leasedStreams += usedStreams stats.availableConnections &+= 1 + case .draining(_, _, let usedStreams): stats.drainingConnections &+= 1 stats.leasedStreams += usedStreams + case .closed: break } @@ -310,7 +336,7 @@ extension HTTPConnectionPool { /// used in general purpose connection scenarios to check if at least one connection exist, or if should we create a new one var hasConnectionThatCanOrWillBeAbleToExecuteRequests: Bool { - self.connections.contains { $0.isStartingOrBackingOff || $0.isActive } + self.connections.contains { $0.canOrWillBeAbleToExecuteRequests } } /// used in eventLoop scenarios. does at least one connection exist for this eventLoop, or should we create a new one? @@ -318,13 +344,15 @@ extension HTTPConnectionPool { /// - Returns: true if at least one connection is starting or active for the given `eventLoop` func hasConnectionThatCanOrWillBeAbleToExecuteRequests(for eventLoop: EventLoop) -> Bool { self.connections.contains { - $0.eventLoop === eventLoop && ($0.isStartingOrBackingOff || $0.isActive) + $0.eventLoop === eventLoop && $0.canOrWillBeAbleToExecuteRequests } } mutating func createNewConnection(on eventLoop: EventLoop) -> Connection.ID { - // assert no active connection exists on the requested eventLoop - assert(self.connections.allSatisfy { $0.eventLoop !== eventLoop || !$0.isActive }) + assert( + !self.hasConnectionThatCanOrWillBeAbleToExecuteRequests(for: eventLoop), + "we should not create more than one connection per event loop" + ) let connection = HTTP2ConnectionState(connectionID: self.generator.next(), eventLoop: eventLoop) self.connections.append(connection) @@ -353,10 +381,11 @@ extension HTTPConnectionPool { return (index, context) } - /// Move the HTTP1ConnectionState to backingOff. + /// Move the connection state to backingOff. /// /// - Parameter connectionID: The connectionID of the failed connection attempt /// - Returns: The eventLoop on which to schedule the backoff timer + /// - Precondition: connection needs to be in the `.starting` state mutating func backoffNextConnectionAttempt(_ connectionID: Connection.ID) -> EventLoop { guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else { preconditionFailure("We tried to create a new connection that we know nothing about?") @@ -368,6 +397,9 @@ extension HTTPConnectionPool { // MARK: Connection lifecycle events + /// Sets the connection with the given `connectionId` to the draining state. + /// - Returns: the `EventLoop` to create a new connection on if applicable + /// - Precondition: connection with given `connectionId` must be either `.active` or already in the `.draining` state mutating func goAwayReceived(_ connectionID: Connection.ID) -> GoAwayContext { guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else { preconditionFailure("go away recieved for a connection that does not exists") @@ -376,12 +408,18 @@ extension HTTPConnectionPool { return GoAwayContext(eventLoop: eventLoop) } + /// Update the maximum number of concurrent streams for the given connection. + /// - Parameters: + /// - connectionID: The connectionID for which we received new settings + /// - newMaxStreams: new maximum concurrent streams + /// - Returns: index of the connection and new number of available streams in the `AvailableConnectionContext` + /// - Precondition: Connections must be in the `.active` or `.draining` state. mutating func newHTTP2MaxConcurrentStreamsReceived( _ connectionID: Connection.ID, newMaxStreams: Int ) -> (Int, AvailableConnectionContext) { guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else { - preconditionFailure("We tried to update the maximum concurren streams number for a connection that does not exists") + preconditionFailure("We tried to update the maximum number of concurrent streams for a connection that does not exists") } let availableStreams = self.connections[index].newMaxConcurrentStreams(newMaxStreams) let context = AvailableConnectionContext( @@ -413,7 +451,7 @@ extension HTTPConnectionPool { return availableConnection } - mutating func leaseStreams(onRequired eventLoop: EventLoop) -> Connection? { + mutating func leaseStream(onRequired eventLoop: EventLoop) -> Connection? { guard let index = self.findAvailableConnection(onRequired: eventLoop) else { return nil } return self.leaseStreams(at: index, count: 1) } @@ -453,7 +491,7 @@ extension HTTPConnectionPool { /// - Returns: closed and removed connection mutating func closeConnection(at index: Int) -> Connection { let connection = self.connections[index].close() - self.connections.remove(at: index) + self.removeConnection(at: index) return connection } @@ -479,7 +517,7 @@ extension HTTPConnectionPool { } /// replaces a closed connection by creating a new starting connection. - /// - Parameter index: index of the connection which we get from `failConnection(_:)` + /// - Parameter index: index of the connection which we got from `failConnection(_:)` /// - Precondition: connection must be closed mutating func createNewConnectionByReplacingClosedConnection(at index: Int) -> (Connection.ID, EventLoop) { precondition(self.connections[index].isClosed) diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift index ec8c3ea04..eae4c73c0 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+HTTP2ConnectionsTest.swift @@ -164,7 +164,7 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase { XCTAssert(conn1CreatedContext.eventLoop === el) } - XCTAssertNil(connections.leaseStreams(onRequired: el5)) + XCTAssertNil(connections.leaseStream(onRequired: el5)) } func testCloseConnectionIfIdle() { @@ -238,7 +238,7 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase { _ = connections.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 100) // we lease it just before timeout - XCTAssertEqual(connections.leaseStreams(onRequired: el1), conn1) + XCTAssertEqual(connections.leaseStream(onRequired: el1), conn1) // timeout arrives minimal to late XCTAssertEqual(connections.closeConnectionIfIdle(conn1ID), nil) @@ -324,14 +324,14 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase { XCTAssertEqual(conn1CreatedContext.availableStreams, 100) XCTAssertEqual(connections.leaseStreams(at: conn1Index, count: 100), conn1) - XCTAssertNil(connections.leaseStreams(onRequired: el1), "should not be able to lease stream because they are all already leased") + XCTAssertNil(connections.leaseStream(onRequired: el1), "should not be able to lease stream because they are all already leased") let (_, releaseContext) = connections.releaseStream(conn1ID) XCTAssertFalse(releaseContext.isIdle) XCTAssertEqual(releaseContext.availableStreams, 1) - XCTAssertEqual(connections.leaseStreams(onRequired: el1), conn1) - XCTAssertNil(connections.leaseStreams(onRequired: el1), "should not be able to lease stream because they are all already leased") + XCTAssertEqual(connections.leaseStream(onRequired: el1), conn1) + XCTAssertNil(connections.leaseStream(onRequired: el1), "should not be able to lease stream because they are all already leased") } func testGoAway() { @@ -360,7 +360,7 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase { ) ) - XCTAssertNil(connections.leaseStreams(onRequired: el1), "we should not be able to lease a stream because the connection is draining") + XCTAssertNil(connections.leaseStream(onRequired: el1), "we should not be able to lease a stream because the connection is draining") // a server can potentially send more than one connection go away and we should not crash XCTAssertTrue(connections.goAwayReceived(conn1ID).eventLoop === el1) @@ -423,14 +423,14 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase { XCTAssertEqual(conn1CreatedContext.availableStreams, 1) XCTAssertEqual(connections.leaseStreams(at: conn1Index, count: 1), conn1) - XCTAssertNil(connections.leaseStreams(onRequired: el1), "all streams are in use") + XCTAssertNil(connections.leaseStream(onRequired: el1), "all streams are in use") let (_, newSettingsContext1) = connections.newHTTP2MaxConcurrentStreamsReceived(conn1ID, newMaxStreams: 2) XCTAssertEqual(newSettingsContext1.availableStreams, 1) XCTAssertTrue(newSettingsContext1.eventLoop === el1) XCTAssertFalse(newSettingsContext1.isIdle) - XCTAssertEqual(connections.leaseStreams(onRequired: el1), conn1) + XCTAssertEqual(connections.leaseStream(onRequired: el1), conn1) let (_, newSettingsContext2) = connections.newHTTP2MaxConcurrentStreamsReceived(conn1ID, newMaxStreams: 1) XCTAssertEqual(newSettingsContext2.availableStreams, 0) @@ -442,14 +442,14 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase { XCTAssertFalse(release1Context.isIdle) XCTAssertEqual(release1Context.availableStreams, 0) - XCTAssertNil(connections.leaseStreams(onRequired: el1), "all streams are in use") + XCTAssertNil(connections.leaseStream(onRequired: el1), "all streams are in use") // release a connection let (_, release2Context) = connections.releaseStream(conn1ID) XCTAssertTrue(release2Context.isIdle) XCTAssertEqual(release2Context.availableStreams, 1) - XCTAssertEqual(connections.leaseStreams(onRequired: el1), conn1) + XCTAssertEqual(connections.leaseStream(onRequired: el1), conn1) } func testLeaseOnPreferredEventLoopWithoutAnyAvailable() { From 38154d2851672d653eeded327cc6bee4c22b5a5b Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Mon, 27 Sep 2021 18:35:56 +0200 Subject: [PATCH 3/5] fix review comments from @lukasa --- .../HTTPConnectionPool+HTTP2Connections.swift | 43 ++++++++----------- 1 file changed, 19 insertions(+), 24 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift index 28d3792ad..3180133ff 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift @@ -19,10 +19,10 @@ extension HTTPConnectionPool { private enum State { /// the pool is establishing a connection. Valid transitions are to: .backingOff, .active and .closed case starting - /// the connection is waiting to retry to establish a connection. Transition to .closed. From .closed - /// a new connection state must be created for a retry. + /// the connection is waiting to retry to establish a connection. Valid transitions are to .closed. + /// From .closed a new connection state must be created for a retry. case backingOff - /// the connection is active and is able to run requests. Valid transitions to: .draining and .closed + /// the connection is active and is able to run requests. Valid transitions are to: .draining and .closed case active(Connection, maxStreams: Int, usedStreams: Int, lastIdle: NIODeadline) /// the connection is active and is running requests. No new requests must be scheduled. /// Valid transitions to: .draining and .closed @@ -31,15 +31,6 @@ extension HTTPConnectionPool { case closed } - var isActive: Bool { - switch self.state { - case .starting, .backingOff, .draining, .closed: - return false - case .active: - return true - } - } - var isStartingOrBackingOff: Bool { switch self.state { case .starting, .backingOff: @@ -182,17 +173,17 @@ extension HTTPConnectionPool { preconditionFailure("Invalid state: \(self.state)") case .active(let conn, let maxStreams, var usedStreams, var lastIdle): - usedStreams -= 1 - assert(usedStreams >= 0) + precondition(usedStreams > 0, "we cannot release more streams than we have leased") + usedStreams &-= 1 if usedStreams == 0 { lastIdle = .now() } self.state = .active(conn, maxStreams: maxStreams, usedStreams: usedStreams, lastIdle: lastIdle) - return max(maxStreams - usedStreams, 0) + return max(maxStreams &- usedStreams, 0) case .draining(let conn, let maxStreams, var usedStreams): - usedStreams -= 1 - assert(usedStreams >= 0) + precondition(usedStreams > 0, "we cannot release more streams than we have leased") + usedStreams &-= 1 self.state = .draining(conn, maxStreams: maxStreams, usedStreams: usedStreams) return 0 } @@ -238,7 +229,8 @@ extension HTTPConnectionPool { return .removeConnection case .active(let connection, _, let usedStreams, _): - if usedStreams <= 0 { + precondition(usedStreams >= 0) + if usedStreams == 0 { context.close.append(connection) return .removeConnection } else { @@ -256,9 +248,6 @@ extension HTTPConnectionPool { } func addStats(into stats: inout HTTP2Connections.Stats) { - if self.isIdle { - stats.idleConnections &+= 1 - } switch self.state { case .starting: stats.startingConnections &+= 1 @@ -270,11 +259,17 @@ extension HTTPConnectionPool { stats.availableStreams += max(maxStreams - usedStreams, 0) stats.leasedStreams += usedStreams stats.availableConnections &+= 1 - + precondition(usedStreams >= 0) + if usedStreams == 0 { + stats.idleConnections &+= 1 + } case .draining(_, _, let usedStreams): stats.drainingConnections &+= 1 stats.leasedStreams += usedStreams - + precondition(usedStreams >= 0) + if usedStreams == 0 { + stats.idleConnections &+= 1 + } case .closed: break } @@ -359,7 +354,7 @@ extension HTTPConnectionPool { return connection.connectionID } - /// A new HTTP/2.0 connection was established. + /// A new HTTP/2 connection was established. /// /// This will put the connection into the idle state. /// From 237494ed3e06fb95d7a37e7e1b1ca84040f6126c Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Mon, 27 Sep 2021 20:10:07 +0200 Subject: [PATCH 4/5] use `index(_:offsetBy:)` to convert from offset to index --- .../HTTPConnectionPool+HTTP2Connections.swift | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift index 3180133ff..6d52bcbbb 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift @@ -432,18 +432,18 @@ extension HTTPConnectionPool { return self.leaseStreams(at: index, count: 1) } - /// tries to find an available connection on the prefered `eventLoop`. If it can't find one with the given `eventLoop`, it returns the first available connection + /// tries to find an available connection on the preferred `eventLoop`. If it can't find one with the given `eventLoop`, it returns the first available connection private func findAvailableConnection(onPreferred eventLoop: EventLoop) -> Int? { - var availableConnection: Int? - for (index, connection) in self.connections.enumerated() { + var availableConnectionIndex: Int? + for (offset, connection) in self.connections.enumerated() { guard connection.isAvailable else { continue } if connection.eventLoop === eventLoop { - return index - } else if availableConnection == nil { - availableConnection = index + return self.connections.index(self.connections.startIndex, offsetBy: offset) + } else if availableConnectionIndex == nil { + availableConnectionIndex = self.connections.index(self.connections.startIndex, offsetBy: offset) } } - return availableConnection + return availableConnectionIndex } mutating func leaseStream(onRequired eventLoop: EventLoop) -> Connection? { From 88f3606f97383dfb8a4b89e3073ed0084ab3988b Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Tue, 28 Sep 2021 10:09:01 +0200 Subject: [PATCH 5/5] draining connections do not count as idle connections --- .../State Machine/HTTPConnectionPool+HTTP2Connections.swift | 3 --- 1 file changed, 3 deletions(-) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift index 6d52bcbbb..8a22c543e 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift @@ -267,9 +267,6 @@ extension HTTPConnectionPool { stats.drainingConnections &+= 1 stats.leasedStreams += usedStreams precondition(usedStreams >= 0) - if usedStreams == 0 { - stats.idleConnections &+= 1 - } case .closed: break }