diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift index 0cad92cfe..2c3c3cc0a 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift @@ -81,6 +81,7 @@ final class HTTP2Connection { private var openStreams = Set() let id: HTTPConnectionPool.Connection.ID let decompression: HTTPClient.Decompression + let maximumConnectionUses: Int? var closeFuture: EventLoopFuture { self.channel.closeFuture @@ -89,11 +90,13 @@ final class HTTP2Connection { init(channel: Channel, connectionID: HTTPConnectionPool.Connection.ID, decompression: HTTPClient.Decompression, + maximumConnectionUses: Int?, delegate: HTTP2ConnectionDelegate, logger: Logger) { self.channel = channel self.id = connectionID self.decompression = decompression + self.maximumConnectionUses = maximumConnectionUses self.logger = logger self.multiplexer = HTTP2StreamMultiplexer( mode: .client, @@ -120,12 +123,14 @@ final class HTTP2Connection { connectionID: HTTPConnectionPool.Connection.ID, delegate: HTTP2ConnectionDelegate, decompression: HTTPClient.Decompression, + maximumConnectionUses: Int?, logger: Logger ) -> EventLoopFuture<(HTTP2Connection, Int)> { let connection = HTTP2Connection( channel: channel, connectionID: connectionID, decompression: decompression, + maximumConnectionUses: maximumConnectionUses, delegate: delegate, logger: logger ) @@ -192,7 +197,7 @@ final class HTTP2Connection { let sync = self.channel.pipeline.syncOperations let http2Handler = NIOHTTP2Handler(mode: .client, initialSettings: nioDefaultSettings) - let idleHandler = HTTP2IdleHandler(delegate: self, logger: self.logger) + let idleHandler = HTTP2IdleHandler(delegate: self, logger: self.logger, maximumConnectionUses: self.maximumConnectionUses) try sync.addHandler(http2Handler, position: .last) try sync.addHandler(idleHandler, position: .last) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2IdleHandler.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2IdleHandler.swift index c522b2425..06458cb7e 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2IdleHandler.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2IdleHandler.swift @@ -35,9 +35,10 @@ final class HTTP2IdleHandler: ChannelDuplexH let logger: Logger let delegate: Delegate - private var state: StateMachine = .init() + private var state: StateMachine - init(delegate: Delegate, logger: Logger) { + init(delegate: Delegate, logger: Logger, maximumConnectionUses: Int? = nil) { + self.state = StateMachine(maximumUses: maximumConnectionUses) self.delegate = delegate self.logger = logger } @@ -140,19 +141,23 @@ extension HTTP2IdleHandler { } enum State { - case initialized - case connected - case active(openStreams: Int, maxStreams: Int) + case initialized(maximumUses: Int?) + case connected(remainingUses: Int?) + case active(openStreams: Int, maxStreams: Int, remainingUses: Int?) case closing(openStreams: Int, maxStreams: Int) case closed } - var state: State = .initialized + var state: State + + init(maximumUses: Int?) { + self.state = .initialized(maximumUses: maximumUses) + } mutating func channelActive() { switch self.state { - case .initialized: - self.state = .connected + case .initialized(let maximumUses): + self.state = .connected(remainingUses: maximumUses) case .connected, .active, .closing, .closed: break @@ -171,17 +176,17 @@ extension HTTP2IdleHandler { case .initialized: preconditionFailure("Invalid state: \(self.state)") - case .connected: + case .connected(let remainingUses): // a settings frame might have multiple entries for `maxConcurrentStreams`. We are // only interested in the last value! If no `maxConcurrentStreams` is set, we assume // the http/2 default of 100. let maxStreams = settings.last(where: { $0.parameter == .maxConcurrentStreams })?.value ?? 100 - self.state = .active(openStreams: 0, maxStreams: maxStreams) + self.state = .active(openStreams: 0, maxStreams: maxStreams, remainingUses: remainingUses) return .notifyConnectionNewMaxStreamsSettings(maxStreams) - case .active(openStreams: let openStreams, maxStreams: let maxStreams): + case .active(openStreams: let openStreams, maxStreams: let maxStreams, remainingUses: let remainingUses): if let newMaxStreams = settings.last(where: { $0.parameter == .maxConcurrentStreams })?.value, newMaxStreams != maxStreams { - self.state = .active(openStreams: openStreams, maxStreams: newMaxStreams) + self.state = .active(openStreams: openStreams, maxStreams: newMaxStreams, remainingUses: remainingUses) return .notifyConnectionNewMaxStreamsSettings(newMaxStreams) } return .nothing @@ -205,7 +210,7 @@ extension HTTP2IdleHandler { self.state = .closing(openStreams: 0, maxStreams: 0) return .notifyConnectionGoAwayReceived(close: true) - case .active(let openStreams, let maxStreams): + case .active(let openStreams, let maxStreams, _): self.state = .closing(openStreams: openStreams, maxStreams: maxStreams) return .notifyConnectionGoAwayReceived(close: openStreams == 0) @@ -228,7 +233,7 @@ extension HTTP2IdleHandler { self.state = .closing(openStreams: 0, maxStreams: 0) return .close - case .active(let openStreams, let maxStreams): + case .active(let openStreams, let maxStreams, _): if openStreams == 0 { self.state = .closed return .close @@ -247,10 +252,19 @@ extension HTTP2IdleHandler { case .initialized, .connected: preconditionFailure("Invalid state: \(self.state)") - case .active(var openStreams, let maxStreams): + case .active(var openStreams, let maxStreams, let remainingUses): openStreams += 1 - self.state = .active(openStreams: openStreams, maxStreams: maxStreams) - return .nothing + let remainingUses = remainingUses.map { $0 - 1 } + self.state = .active(openStreams: openStreams, maxStreams: maxStreams, remainingUses: remainingUses) + + if remainingUses == 0 { + // Treat running out of connection uses as if we received a GOAWAY frame. This + // will notify the delegate (i.e. connection pool) that the connection can no + // longer be used. + return self.goAwayReceived() + } else { + return .nothing + } case .closing(var openStreams, let maxStreams): // A stream might be opened, while we are closing because of race conditions. For @@ -271,10 +285,10 @@ extension HTTP2IdleHandler { case .initialized, .connected: preconditionFailure("Invalid state: \(self.state)") - case .active(var openStreams, let maxStreams): + case .active(var openStreams, let maxStreams, let remainingUses): openStreams -= 1 assert(openStreams >= 0) - self.state = .active(openStreams: openStreams, maxStreams: maxStreams) + self.state = .active(openStreams: openStreams, maxStreams: maxStreams, remainingUses: remainingUses) return .notifyConnectionStreamClosed(currentlyAvailable: maxStreams - openStreams) case .closing(var openStreams, let maxStreams): diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+Factory.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+Factory.swift index 48aedfd8e..1461a6620 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+Factory.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+Factory.swift @@ -84,6 +84,7 @@ extension HTTPConnectionPool.ConnectionFactory { connectionID: connectionID, delegate: http2ConnectionDelegate, decompression: self.clientConfiguration.decompression, + maximumConnectionUses: self.clientConfiguration.maximumUsesPerConnection, logger: logger ).whenComplete { result in switch result { diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift index 593802a58..eac4cc21f 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift @@ -71,7 +71,8 @@ final class HTTPConnectionPool { self._state = StateMachine( idGenerator: idGenerator, maximumConcurrentHTTP1Connections: clientConfiguration.connectionPool.concurrentHTTP1ConnectionsPerHostSoftLimit, - retryConnectionEstablishment: clientConfiguration.connectionPool.retryConnectionEstablishment + retryConnectionEstablishment: clientConfiguration.connectionPool.retryConnectionEstablishment, + maximumConnectionUses: clientConfiguration.maximumUsesPerConnection ) } diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift index cdbf02394..935cdb2f6 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1Connections.swift @@ -19,15 +19,15 @@ extension HTTPConnectionPool { private struct HTTP1ConnectionState { enum State { /// the connection is creating a connection. Valid transitions are to: .backingOff, .idle, and .closed - case starting + case starting(maximumUses: Int?) /// the connection is waiting to retry the establishing a connection. Valid transitions are to: .closed. /// This means, the connection can be removed from the connections without cancelling external /// state. The connection state can then be replaced by a new one. case backingOff /// the connection is idle for a new request. Valid transitions to: .leased and .closed - case idle(Connection, since: NIODeadline) + case idle(Connection, since: NIODeadline, remainingUses: Int?) /// the connection is leased and running for a request. Valid transitions to: .idle and .closed - case leased(Connection) + case leased(Connection, remainingUses: Int?) /// the connection is closed. final state. case closed } @@ -36,10 +36,10 @@ extension HTTPConnectionPool { let connectionID: Connection.ID let eventLoop: EventLoop - init(connectionID: Connection.ID, eventLoop: EventLoop) { + init(connectionID: Connection.ID, eventLoop: EventLoop, maximumUses: Int?) { self.connectionID = connectionID self.eventLoop = eventLoop - self.state = .starting + self.state = .starting(maximumUses: maximumUses) } var isConnecting: Bool { @@ -69,6 +69,19 @@ extension HTTPConnectionPool { } } + var idleAndNoRemainingUses: Bool { + switch self.state { + case .idle(_, since: _, remainingUses: let remainingUses): + if let remainingUses = remainingUses { + return remainingUses <= 0 + } else { + return false + } + case .backingOff, .starting, .leased, .closed: + return false + } + } + var canOrWillBeAbleToExecuteRequests: Bool { switch self.state { case .leased, .backingOff, .idle, .starting: @@ -89,7 +102,7 @@ extension HTTPConnectionPool { var idleSince: NIODeadline? { switch self.state { - case .idle(_, since: let idleSince): + case .idle(_, since: let idleSince, _): return idleSince case .backingOff, .starting, .leased, .closed: return nil @@ -107,8 +120,8 @@ extension HTTPConnectionPool { mutating func connected(_ connection: Connection) { switch self.state { - case .starting: - self.state = .idle(connection, since: .now()) + case .starting(maximumUses: let maxUses): + self.state = .idle(connection, since: .now(), remainingUses: maxUses) case .backingOff, .idle, .leased, .closed: preconditionFailure("Invalid state: \(self.state)") } @@ -126,8 +139,8 @@ extension HTTPConnectionPool { mutating func lease() -> Connection { switch self.state { - case .idle(let connection, since: _): - self.state = .leased(connection) + case .idle(let connection, since: _, remainingUses: let remainingUses): + self.state = .leased(connection, remainingUses: remainingUses.map { $0 - 1 }) return connection case .backingOff, .starting, .leased, .closed: preconditionFailure("Invalid state: \(self.state)") @@ -136,8 +149,8 @@ extension HTTPConnectionPool { mutating func release() { switch self.state { - case .leased(let connection): - self.state = .idle(connection, since: .now()) + case .leased(let connection, let remainingUses): + self.state = .idle(connection, since: .now(), remainingUses: remainingUses) case .backingOff, .starting, .idle, .closed: preconditionFailure("Invalid state: \(self.state)") } @@ -145,7 +158,7 @@ extension HTTPConnectionPool { mutating func close() -> Connection { switch self.state { - case .idle(let connection, since: _): + case .idle(let connection, since: _, remainingUses: _): self.state = .closed return connection case .backingOff, .starting, .leased, .closed: @@ -188,10 +201,10 @@ extension HTTPConnectionPool { return .removeConnection case .starting: return .keepConnection - case .idle(let connection, since: _): + case .idle(let connection, since: _, remainingUses: _): context.close.append(connection) return .removeConnection - case .leased(let connection): + case .leased(let connection, remainingUses: _): context.cancel.append(connection) return .keepConnection case .closed: @@ -212,7 +225,7 @@ extension HTTPConnectionPool { case .backingOff: context.backingOff.append((self.connectionID, self.eventLoop)) return .removeConnection - case .idle(let connection, since: _): + case .idle(let connection, since: _, remainingUses: _): // Idle connections can be removed right away context.close.append(connection) return .removeConnection @@ -243,13 +256,16 @@ extension HTTPConnectionPool { /// The index after which you will find the connections for requests with `EventLoop` /// requirements in `connections`. private var overflowIndex: Array.Index + /// The number of times each connection can be used before it is closed and replaced. + private let maximumConnectionUses: Int? - init(maximumConcurrentConnections: Int, generator: Connection.ID.Generator) { + init(maximumConcurrentConnections: Int, generator: Connection.ID.Generator, maximumConnectionUses: Int?) { self.connections = [] self.connections.reserveCapacity(maximumConcurrentConnections) self.overflowIndex = self.connections.endIndex self.maximumConcurrentConnections = maximumConcurrentConnections self.generator = generator + self.maximumConnectionUses = maximumConnectionUses } var stats: Stats { @@ -323,6 +339,8 @@ extension HTTPConnectionPool { /// The connection's use. Either general purpose or for requests with `EventLoop` /// requirements. var use: ConnectionUse + /// Whether the connection should be closed. + var shouldBeClosed: Bool } /// Information around the failed/closed connection. @@ -345,14 +363,22 @@ extension HTTPConnectionPool { mutating func createNewConnection(on eventLoop: EventLoop) -> Connection.ID { precondition(self.canGrow) - let connection = HTTP1ConnectionState(connectionID: self.generator.next(), eventLoop: eventLoop) + let connection = HTTP1ConnectionState( + connectionID: self.generator.next(), + eventLoop: eventLoop, + maximumUses: self.maximumConnectionUses + ) self.connections.insert(connection, at: self.overflowIndex) self.overflowIndex = self.connections.index(after: self.overflowIndex) return connection.connectionID } mutating func createNewOverflowConnection(on eventLoop: EventLoop) -> Connection.ID { - let connection = HTTP1ConnectionState(connectionID: self.generator.next(), eventLoop: eventLoop) + let connection = HTTP1ConnectionState( + connectionID: self.generator.next(), + eventLoop: eventLoop, + maximumUses: self.maximumConnectionUses + ) self.connections.append(connection) return connection.connectionID } @@ -484,7 +510,8 @@ extension HTTPConnectionPool { precondition(self.connections[index].isClosed) let newConnection = HTTP1ConnectionState( connectionID: self.generator.next(), - eventLoop: self.connections[index].eventLoop + eventLoop: self.connections[index].eventLoop, + maximumUses: self.maximumConnectionUses ) self.connections[index] = newConnection @@ -562,7 +589,11 @@ extension HTTPConnectionPool { backingOff: [(Connection.ID, EventLoop)] ) { for (connectionID, eventLoop) in starting { - let newConnection = HTTP1ConnectionState(connectionID: connectionID, eventLoop: eventLoop) + let newConnection = HTTP1ConnectionState( + connectionID: connectionID, + eventLoop: eventLoop, + maximumUses: self.maximumConnectionUses + ) self.connections.insert(newConnection, at: self.overflowIndex) /// If we can grow, we mark the connection as a general purpose connection. /// Otherwise, it will be an overflow connection which is only used once for requests with a required event loop @@ -572,7 +603,11 @@ extension HTTPConnectionPool { } for (connectionID, eventLoop) in backingOff { - var backingOffConnection = HTTP1ConnectionState(connectionID: connectionID, eventLoop: eventLoop) + var backingOffConnection = HTTP1ConnectionState( + connectionID: connectionID, + eventLoop: eventLoop, + maximumUses: self.maximumConnectionUses + ) // TODO: Maybe we want to add a static init for backing off connections to HTTP1ConnectionState backingOffConnection.failedToConnect() self.connections.insert(backingOffConnection, at: self.overflowIndex) @@ -690,7 +725,8 @@ extension HTTPConnectionPool { } else { use = .eventLoop(eventLoop) } - return IdleConnectionContext(eventLoop: eventLoop, use: use) + let hasNoRemainingUses = self.connections[index].idleAndNoRemainingUses + return IdleConnectionContext(eventLoop: eventLoop, use: use, shouldBeClosed: hasNoRemainingUses) } private func findIdleConnection(onPreferred preferredEL: EventLoop) -> Int? { diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift index 669e43f13..2629b0ea2 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP1StateMachine.swift @@ -38,11 +38,13 @@ extension HTTPConnectionPool { idGenerator: Connection.ID.Generator, maximumConcurrentConnections: Int, retryConnectionEstablishment: Bool, + maximumConnectionUses: Int?, lifecycleState: StateMachine.LifecycleState ) { self.connections = HTTP1Connections( maximumConcurrentConnections: maximumConcurrentConnections, - generator: idGenerator + generator: idGenerator, + maximumConnectionUses: maximumConnectionUses ) self.retryConnectionEstablishment = retryConnectionEstablishment @@ -397,11 +399,20 @@ extension HTTPConnectionPool { ) -> EstablishedAction { switch self.lifecycleState { case .running: - switch context.use { - case .generalPurpose: - return self.nextActionForIdleGeneralPurposeConnection(at: index, context: context) - case .eventLoop: - return self.nextActionForIdleEventLoopConnection(at: index, context: context) + // Close the connection if it's expired. + if context.shouldBeClosed { + let connection = self.connections.closeConnection(at: index) + return .init( + request: .none, + connection: .closeConnection(connection, isShutdown: .no) + ) + } else { + switch context.use { + case .generalPurpose: + return self.nextActionForIdleGeneralPurposeConnection(at: index, context: context) + case .eventLoop: + return self.nextActionForIdleEventLoopConnection(at: index, context: context) + } } case .shuttingDown(let unclean): assert(self.requests.isEmpty) diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift index 7aa504d03..01d68b8e4 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2Connections.swift @@ -18,12 +18,12 @@ extension HTTPConnectionPool { private struct HTTP2ConnectionState { private enum State { /// the pool is establishing a connection. Valid transitions are to: .backingOff, .active and .closed - case starting + case starting(maximumUses: Int?) /// the connection is waiting to retry to establish a connection. Valid transitions are to .closed. /// From .closed a new connection state must be created for a retry. case backingOff /// the connection is active and is able to run requests. Valid transitions are to: .draining and .closed - case active(Connection, maxStreams: Int, usedStreams: Int, lastIdle: NIODeadline) + case active(Connection, maxStreams: Int, usedStreams: Int, lastIdle: NIODeadline, remainingUses: Int?) /// the connection is active and is running requests. No new requests must be scheduled. /// Valid transitions to: .draining and .closed case draining(Connection, maxStreams: Int, usedStreams: Int) @@ -71,8 +71,12 @@ extension HTTPConnectionPool { /// A request can be scheduled on the connection var isAvailable: Bool { switch self.state { - case .active(_, let maxStreams, let usedStreams, _): - return usedStreams < maxStreams + case .active(_, let maxStreams, let usedStreams, _, let remainingUses): + if let remainingUses = remainingUses { + return usedStreams < maxStreams && remainingUses > 0 + } else { + return usedStreams < maxStreams + } case .starting, .backingOff, .draining, .closed: return false } @@ -82,7 +86,7 @@ extension HTTPConnectionPool { /// Every idle connection is available, but not every available connection is idle. var isIdle: Bool { switch self.state { - case .active(_, _, let usedStreams, _): + case .active(_, _, let usedStreams, _, _): return usedStreams == 0 case .starting, .backingOff, .draining, .closed: return false @@ -112,9 +116,13 @@ extension HTTPConnectionPool { case .active, .draining, .backingOff, .closed: preconditionFailure("Invalid state: \(self.state)") - case .starting: - self.state = .active(conn, maxStreams: maxStreams, usedStreams: 0, lastIdle: .now()) - return maxStreams + case .starting(let maxUses): + self.state = .active(conn, maxStreams: maxStreams, usedStreams: 0, lastIdle: .now(), remainingUses: maxUses) + if let maxUses = maxUses { + return min(maxStreams, maxUses) + } else { + return maxStreams + } } } @@ -127,9 +135,14 @@ extension HTTPConnectionPool { case .starting, .backingOff, .closed: preconditionFailure("Invalid state for updating max concurrent streams: \(self.state)") - case .active(let conn, _, let usedStreams, let lastIdle): - self.state = .active(conn, maxStreams: maxStreams, usedStreams: usedStreams, lastIdle: lastIdle) - return max(maxStreams - usedStreams, 0) + case .active(let conn, _, let usedStreams, let lastIdle, let remainingUses): + self.state = .active(conn, maxStreams: maxStreams, usedStreams: usedStreams, lastIdle: lastIdle, remainingUses: remainingUses) + let availableStreams = max(maxStreams - usedStreams, 0) + if let remainingUses = remainingUses { + return min(remainingUses, availableStreams) + } else { + return availableStreams + } case .draining(let conn, _, let usedStreams): self.state = .draining(conn, maxStreams: maxStreams, usedStreams: usedStreams) @@ -142,7 +155,7 @@ extension HTTPConnectionPool { case .starting, .backingOff, .closed: preconditionFailure("Invalid state for draining a connection: \(self.state)") - case .active(let conn, let maxStreams, let usedStreams, _): + case .active(let conn, let maxStreams, let usedStreams, _, _): self.state = .draining(conn, maxStreams: maxStreams, usedStreams: usedStreams) return conn.eventLoop @@ -176,10 +189,11 @@ extension HTTPConnectionPool { case .starting, .backingOff, .draining, .closed: preconditionFailure("Invalid state for leasing a stream: \(self.state)") - case .active(let conn, let maxStreams, var usedStreams, let lastIdle): + case .active(let conn, let maxStreams, var usedStreams, let lastIdle, let remainingUses): usedStreams += count precondition(usedStreams <= maxStreams, "tried to lease a connection which is not available") - self.state = .active(conn, maxStreams: maxStreams, usedStreams: usedStreams, lastIdle: lastIdle) + precondition(remainingUses.map { $0 >= count } ?? true, "tried to lease streams from a connection which does not have enough remaining streams") + self.state = .active(conn, maxStreams: maxStreams, usedStreams: usedStreams, lastIdle: lastIdle, remainingUses: remainingUses.map { $0 - count }) return conn } } @@ -191,14 +205,20 @@ extension HTTPConnectionPool { case .starting, .backingOff, .closed: preconditionFailure("Invalid state: \(self.state)") - case .active(let conn, let maxStreams, var usedStreams, var lastIdle): + case .active(let conn, let maxStreams, var usedStreams, var lastIdle, let remainingUses): precondition(usedStreams > 0, "we cannot release more streams than we have leased") usedStreams &-= 1 if usedStreams == 0 { lastIdle = .now() } - self.state = .active(conn, maxStreams: maxStreams, usedStreams: usedStreams, lastIdle: lastIdle) - return max(maxStreams &- usedStreams, 0) + + self.state = .active(conn, maxStreams: maxStreams, usedStreams: usedStreams, lastIdle: lastIdle, remainingUses: remainingUses) + let availableStreams = max(maxStreams &- usedStreams, 0) + if let remainingUses = remainingUses { + return min(availableStreams, remainingUses) + } else { + return availableStreams + } case .draining(let conn, let maxStreams, var usedStreams): precondition(usedStreams > 0, "we cannot release more streams than we have leased") @@ -210,7 +230,7 @@ extension HTTPConnectionPool { mutating func close() -> Connection { switch self.state { - case .active(let conn, _, 0, _): + case .active(let conn, _, 0, _, _): self.state = .closed return conn @@ -247,7 +267,7 @@ extension HTTPConnectionPool { context.connectBackoff.append(self.connectionID) return .removeConnection - case .active(let connection, _, let usedStreams, _): + case .active(let connection, _, let usedStreams, _, _): precondition(usedStreams >= 0) if usedStreams == 0 { context.close.append(connection) @@ -274,7 +294,7 @@ extension HTTPConnectionPool { case .backingOff: stats.backingOffConnections &+= 1 - case .active(_, let maxStreams, let usedStreams, _): + case .active(_, let maxStreams, let usedStreams, _, _): stats.availableStreams += max(maxStreams - usedStreams, 0) stats.leasedStreams += usedStreams stats.availableConnections &+= 1 @@ -304,7 +324,7 @@ extension HTTPConnectionPool { context.starting.append((self.connectionID, self.eventLoop)) return .removeConnection - case .active(let connection, _, let usedStreams, _): + case .active(let connection, _, let usedStreams, _, _): precondition(usedStreams >= 0) if usedStreams == 0 { context.close.append(connection) @@ -325,10 +345,10 @@ extension HTTPConnectionPool { } } - init(connectionID: Connection.ID, eventLoop: EventLoop) { + init(connectionID: Connection.ID, eventLoop: EventLoop, maximumUses: Int?) { self.connectionID = connectionID self.eventLoop = eventLoop - self.state = .starting + self.state = .starting(maximumUses: maximumUses) } } @@ -337,6 +357,8 @@ extension HTTPConnectionPool { private let generator: Connection.ID.Generator /// The connections states private var connections: [HTTP2ConnectionState] + /// The number of times each connection can be used before it is closed and replaced. + private let maximumConnectionUses: Int? var isEmpty: Bool { self.connections.isEmpty @@ -348,9 +370,10 @@ extension HTTPConnectionPool { } } - init(generator: Connection.ID.Generator) { + init(generator: Connection.ID.Generator, maximumConnectionUses: Int?) { self.generator = generator self.connections = [] + self.maximumConnectionUses = maximumConnectionUses } // MARK: Migration @@ -365,12 +388,16 @@ extension HTTPConnectionPool { backingOff: [(Connection.ID, EventLoop)] ) { for (connectionID, eventLoop) in starting { - let newConnection = HTTP2ConnectionState(connectionID: connectionID, eventLoop: eventLoop) + let newConnection = HTTP2ConnectionState(connectionID: connectionID, + eventLoop: eventLoop, + maximumUses: self.maximumConnectionUses) self.connections.append(newConnection) } for (connectionID, eventLoop) in backingOff { - var backingOffConnection = HTTP2ConnectionState(connectionID: connectionID, eventLoop: eventLoop) + var backingOffConnection = HTTP2ConnectionState(connectionID: connectionID, + eventLoop: eventLoop, + maximumUses: self.maximumConnectionUses) // TODO: Maybe we want to add a static init for backing off connections to HTTP2ConnectionState backingOffConnection.failedToConnect() self.connections.append(backingOffConnection) @@ -476,7 +503,9 @@ extension HTTPConnectionPool { "we should not create more than one connection per event loop" ) - let connection = HTTP2ConnectionState(connectionID: self.generator.next(), eventLoop: eventLoop) + let connection = HTTP2ConnectionState(connectionID: self.generator.next(), + eventLoop: eventLoop, + maximumUses: self.maximumConnectionUses) self.connections.append(connection) return connection.connectionID } @@ -661,7 +690,8 @@ extension HTTPConnectionPool { precondition(self.connections[index].isClosed) let newConnection = HTTP2ConnectionState( connectionID: self.generator.next(), - eventLoop: self.connections[index].eventLoop + eventLoop: self.connections[index].eventLoop, + maximumUses: self.maximumConnectionUses ) self.connections[index] = newConnection diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift index 9964ccd05..83a7647f4 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+HTTP2StateMachine.swift @@ -41,12 +41,14 @@ extension HTTPConnectionPool { init( idGenerator: Connection.ID.Generator, retryConnectionEstablishment: Bool, - lifecycleState: StateMachine.LifecycleState + lifecycleState: StateMachine.LifecycleState, + maximumConnectionUses: Int? ) { self.idGenerator = idGenerator self.requests = RequestQueue() - self.connections = HTTP2Connections(generator: idGenerator) + self.connections = HTTP2Connections(generator: idGenerator, + maximumConnectionUses: maximumConnectionUses) self.lifecycleState = lifecycleState self.retryConnectionEstablishment = retryConnectionEstablishment } diff --git a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift index 0460849cc..a61471a69 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/State Machine/HTTPConnectionPool+StateMachine.swift @@ -99,19 +99,23 @@ extension HTTPConnectionPool { /// The property was introduced to fail fast during testing. /// Otherwise this should always be true and not turned off. private let retryConnectionEstablishment: Bool + let maximumConnectionUses: Int? init( idGenerator: Connection.ID.Generator, maximumConcurrentHTTP1Connections: Int, - retryConnectionEstablishment: Bool + retryConnectionEstablishment: Bool, + maximumConnectionUses: Int? ) { self.maximumConcurrentHTTP1Connections = maximumConcurrentHTTP1Connections self.retryConnectionEstablishment = retryConnectionEstablishment self.idGenerator = idGenerator + self.maximumConnectionUses = maximumConnectionUses let http1State = HTTP1StateMachine( idGenerator: idGenerator, maximumConcurrentConnections: maximumConcurrentHTTP1Connections, retryConnectionEstablishment: retryConnectionEstablishment, + maximumConnectionUses: maximumConnectionUses, lifecycleState: .running ) self.state = .http1(http1State) @@ -137,6 +141,7 @@ extension HTTPConnectionPool { idGenerator: self.idGenerator, maximumConcurrentConnections: self.maximumConcurrentHTTP1Connections, retryConnectionEstablishment: self.retryConnectionEstablishment, + maximumConnectionUses: self.maximumConnectionUses, lifecycleState: http2StateMachine.lifecycleState ) @@ -158,7 +163,8 @@ extension HTTPConnectionPool { var http2StateMachine = HTTP2StateMachine( idGenerator: self.idGenerator, retryConnectionEstablishment: self.retryConnectionEstablishment, - lifecycleState: http1StateMachine.lifecycleState + lifecycleState: http1StateMachine.lifecycleState, + maximumConnectionUses: self.maximumConnectionUses ) let migrationAction = http2StateMachine.migrateFromHTTP1( http1Connections: http1StateMachine.connections, diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index a116ea495..a916d3ade 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -758,6 +758,18 @@ public class HTTPClient { /// which is the recommended setting. Only set this to `false` when attempting to trigger a particular error path. public var networkFrameworkWaitForConnectivity: Bool + /// The maximum number of times each connection can be used before it is replaced with a new one. Use `nil` (the default) + /// if no limit should be applied to each connection. + /// + /// - Precondition: The value must be greater than zero. + public var maximumUsesPerConnection: Int? { + willSet { + if let newValue = newValue, newValue <= 0 { + fatalError("maximumUsesPerConnection must be greater than zero or nil") + } + } + } + public init( tlsConfiguration: TLSConfiguration? = nil, redirectConfiguration: RedirectConfiguration? = nil, diff --git a/Tests/AsyncHTTPClientTests/HTTP2ConnectionTests.swift b/Tests/AsyncHTTPClientTests/HTTP2ConnectionTests.swift index 951a64494..15e5cdff2 100644 --- a/Tests/AsyncHTTPClientTests/HTTP2ConnectionTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTP2ConnectionTests.swift @@ -38,6 +38,7 @@ class HTTP2ConnectionTests: XCTestCase { connectionID: 0, delegate: TestHTTP2ConnectionDelegate(), decompression: .disabled, + maximumConnectionUses: nil, logger: logger ).wait()) } @@ -51,6 +52,7 @@ class HTTP2ConnectionTests: XCTestCase { channel: embedded, connectionID: 0, decompression: .disabled, + maximumConnectionUses: nil, delegate: TestHTTP2ConnectionDelegate(), logger: logger ) diff --git a/Tests/AsyncHTTPClientTests/HTTP2IdleHandlerTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTP2IdleHandlerTests+XCTest.swift index 1b9558105..a69530597 100644 --- a/Tests/AsyncHTTPClientTests/HTTP2IdleHandlerTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTP2IdleHandlerTests+XCTest.swift @@ -36,6 +36,7 @@ extension HTTP2IdleHandlerTests { ("testCloseEventWhileThereAreOpenStreams", testCloseEventWhileThereAreOpenStreams), ("testGoAwayWhileThereAreOpenStreams", testGoAwayWhileThereAreOpenStreams), ("testReceiveSettingsAndGoAwayAfterClientSideClose", testReceiveSettingsAndGoAwayAfterClientSideClose), + ("testConnectionUseLimitTriggersGoAway", testConnectionUseLimitTriggersGoAway), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTP2IdleHandlerTests.swift b/Tests/AsyncHTTPClientTests/HTTP2IdleHandlerTests.swift index 355969c6a..611e31457 100644 --- a/Tests/AsyncHTTPClientTests/HTTP2IdleHandlerTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTP2IdleHandlerTests.swift @@ -251,6 +251,40 @@ class HTTP2IdleHandlerTests: XCTestCase { XCTAssertNoThrow(try embedded.writeInbound(goAwayFrame)) XCTAssertEqual(delegate.goAwayReceived, false, "Expected go away to not be forwarded.") } + + func testConnectionUseLimitTriggersGoAway() { + let delegate = MockHTTP2IdleHandlerDelegate() + let idleHandler = HTTP2IdleHandler(delegate: delegate, logger: Logger(label: "test"), maximumConnectionUses: 5) + let embedded = EmbeddedChannel(handlers: [idleHandler]) + XCTAssertNoThrow(try embedded.connect(to: .makeAddressResolvingHost("localhost", port: 0)).wait()) + + let settingsFrame = HTTP2Frame(streamID: 0, payload: .settings(.settings([.init(parameter: .maxConcurrentStreams, value: 100)]))) + XCTAssertEqual(delegate.maxStreams, nil) + XCTAssertNoThrow(try embedded.writeInbound(settingsFrame)) + XCTAssertEqual(delegate.maxStreams, 100) + + for streamID in HTTP2StreamID(1)...Mode, maximumUses: Int, requests: Int) throws { + let bin = HTTPBin(mode) + defer { XCTAssertNoThrow(try bin.shutdown()) } + + var configuration = HTTPClient.Configuration(certificateVerification: .none) + // Limit each connection to two uses before discarding them. The test will verify that the + // connection number indicated by the server increments every two requests. + configuration.maximumUsesPerConnection = maximumUses + + let client = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup), configuration: configuration) + defer { XCTAssertNoThrow(try client.syncShutdown()) } + + let request = try HTTPClient.Request(url: bin.baseURL + "stats") + let decoder = JSONDecoder() + + // Do two requests per batch. Both should report the same connection number. + for requestNumber in stride(from: 0, to: requests, by: maximumUses) { + var responses = [RequestInfo]() + + for _ in 0..