Skip to content

Commit 6c5058e

Browse files
authored
Add a control to limit connection reuses (#678)
Motivation: Sometimes it can be helpful to limit the number of times a connection can be used before discarding it. AHC has no such support for this at the moment. Modifications: - Add a `maximumUsesPerConnection` configuration option which defaults to `nil` (i.e. no limit). - For HTTP1 we count down uses in the state machine and close the connection if it hits zero. - For HTTP2, each use maps to a stream so we count down remaining uses in the state machine which we combine with max concurrent streams to limit how many streams are available per connection. We also count remaining uses in the HTTP2 idle handler: we treat no remaining uses as receiving a GOAWAY frame and notify the pool which then drains the streams and replaces the connection. Result: Users can control how many times each connection can be used.
1 parent 343cdf4 commit 6c5058e

20 files changed

+394
-154
lines changed

Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift

+6-1
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ final class HTTP2Connection {
8181
private var openStreams = Set<ChannelBox>()
8282
let id: HTTPConnectionPool.Connection.ID
8383
let decompression: HTTPClient.Decompression
84+
let maximumConnectionUses: Int?
8485

8586
var closeFuture: EventLoopFuture<Void> {
8687
self.channel.closeFuture
@@ -89,11 +90,13 @@ final class HTTP2Connection {
8990
init(channel: Channel,
9091
connectionID: HTTPConnectionPool.Connection.ID,
9192
decompression: HTTPClient.Decompression,
93+
maximumConnectionUses: Int?,
9294
delegate: HTTP2ConnectionDelegate,
9395
logger: Logger) {
9496
self.channel = channel
9597
self.id = connectionID
9698
self.decompression = decompression
99+
self.maximumConnectionUses = maximumConnectionUses
97100
self.logger = logger
98101
self.multiplexer = HTTP2StreamMultiplexer(
99102
mode: .client,
@@ -120,12 +123,14 @@ final class HTTP2Connection {
120123
connectionID: HTTPConnectionPool.Connection.ID,
121124
delegate: HTTP2ConnectionDelegate,
122125
decompression: HTTPClient.Decompression,
126+
maximumConnectionUses: Int?,
123127
logger: Logger
124128
) -> EventLoopFuture<(HTTP2Connection, Int)> {
125129
let connection = HTTP2Connection(
126130
channel: channel,
127131
connectionID: connectionID,
128132
decompression: decompression,
133+
maximumConnectionUses: maximumConnectionUses,
129134
delegate: delegate,
130135
logger: logger
131136
)
@@ -192,7 +197,7 @@ final class HTTP2Connection {
192197
let sync = self.channel.pipeline.syncOperations
193198

194199
let http2Handler = NIOHTTP2Handler(mode: .client, initialSettings: nioDefaultSettings)
195-
let idleHandler = HTTP2IdleHandler(delegate: self, logger: self.logger)
200+
let idleHandler = HTTP2IdleHandler(delegate: self, logger: self.logger, maximumConnectionUses: self.maximumConnectionUses)
196201

197202
try sync.addHandler(http2Handler, position: .last)
198203
try sync.addHandler(idleHandler, position: .last)

Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2IdleHandler.swift

+33-19
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,10 @@ final class HTTP2IdleHandler<Delegate: HTTP2IdleHandlerDelegate>: ChannelDuplexH
3535
let logger: Logger
3636
let delegate: Delegate
3737

38-
private var state: StateMachine = .init()
38+
private var state: StateMachine
3939

40-
init(delegate: Delegate, logger: Logger) {
40+
init(delegate: Delegate, logger: Logger, maximumConnectionUses: Int? = nil) {
41+
self.state = StateMachine(maximumUses: maximumConnectionUses)
4142
self.delegate = delegate
4243
self.logger = logger
4344
}
@@ -140,19 +141,23 @@ extension HTTP2IdleHandler {
140141
}
141142

142143
enum State {
143-
case initialized
144-
case connected
145-
case active(openStreams: Int, maxStreams: Int)
144+
case initialized(maximumUses: Int?)
145+
case connected(remainingUses: Int?)
146+
case active(openStreams: Int, maxStreams: Int, remainingUses: Int?)
146147
case closing(openStreams: Int, maxStreams: Int)
147148
case closed
148149
}
149150

150-
var state: State = .initialized
151+
var state: State
152+
153+
init(maximumUses: Int?) {
154+
self.state = .initialized(maximumUses: maximumUses)
155+
}
151156

152157
mutating func channelActive() {
153158
switch self.state {
154-
case .initialized:
155-
self.state = .connected
159+
case .initialized(let maximumUses):
160+
self.state = .connected(remainingUses: maximumUses)
156161

157162
case .connected, .active, .closing, .closed:
158163
break
@@ -171,17 +176,17 @@ extension HTTP2IdleHandler {
171176
case .initialized:
172177
preconditionFailure("Invalid state: \(self.state)")
173178

174-
case .connected:
179+
case .connected(let remainingUses):
175180
// a settings frame might have multiple entries for `maxConcurrentStreams`. We are
176181
// only interested in the last value! If no `maxConcurrentStreams` is set, we assume
177182
// the http/2 default of 100.
178183
let maxStreams = settings.last(where: { $0.parameter == .maxConcurrentStreams })?.value ?? 100
179-
self.state = .active(openStreams: 0, maxStreams: maxStreams)
184+
self.state = .active(openStreams: 0, maxStreams: maxStreams, remainingUses: remainingUses)
180185
return .notifyConnectionNewMaxStreamsSettings(maxStreams)
181186

182-
case .active(openStreams: let openStreams, maxStreams: let maxStreams):
187+
case .active(openStreams: let openStreams, maxStreams: let maxStreams, remainingUses: let remainingUses):
183188
if let newMaxStreams = settings.last(where: { $0.parameter == .maxConcurrentStreams })?.value, newMaxStreams != maxStreams {
184-
self.state = .active(openStreams: openStreams, maxStreams: newMaxStreams)
189+
self.state = .active(openStreams: openStreams, maxStreams: newMaxStreams, remainingUses: remainingUses)
185190
return .notifyConnectionNewMaxStreamsSettings(newMaxStreams)
186191
}
187192
return .nothing
@@ -205,7 +210,7 @@ extension HTTP2IdleHandler {
205210
self.state = .closing(openStreams: 0, maxStreams: 0)
206211
return .notifyConnectionGoAwayReceived(close: true)
207212

208-
case .active(let openStreams, let maxStreams):
213+
case .active(let openStreams, let maxStreams, _):
209214
self.state = .closing(openStreams: openStreams, maxStreams: maxStreams)
210215
return .notifyConnectionGoAwayReceived(close: openStreams == 0)
211216

@@ -228,7 +233,7 @@ extension HTTP2IdleHandler {
228233
self.state = .closing(openStreams: 0, maxStreams: 0)
229234
return .close
230235

231-
case .active(let openStreams, let maxStreams):
236+
case .active(let openStreams, let maxStreams, _):
232237
if openStreams == 0 {
233238
self.state = .closed
234239
return .close
@@ -247,10 +252,19 @@ extension HTTP2IdleHandler {
247252
case .initialized, .connected:
248253
preconditionFailure("Invalid state: \(self.state)")
249254

250-
case .active(var openStreams, let maxStreams):
255+
case .active(var openStreams, let maxStreams, let remainingUses):
251256
openStreams += 1
252-
self.state = .active(openStreams: openStreams, maxStreams: maxStreams)
253-
return .nothing
257+
let remainingUses = remainingUses.map { $0 - 1 }
258+
self.state = .active(openStreams: openStreams, maxStreams: maxStreams, remainingUses: remainingUses)
259+
260+
if remainingUses == 0 {
261+
// Treat running out of connection uses as if we received a GOAWAY frame. This
262+
// will notify the delegate (i.e. connection pool) that the connection can no
263+
// longer be used.
264+
return self.goAwayReceived()
265+
} else {
266+
return .nothing
267+
}
254268

255269
case .closing(var openStreams, let maxStreams):
256270
// A stream might be opened, while we are closing because of race conditions. For
@@ -271,10 +285,10 @@ extension HTTP2IdleHandler {
271285
case .initialized, .connected:
272286
preconditionFailure("Invalid state: \(self.state)")
273287

274-
case .active(var openStreams, let maxStreams):
288+
case .active(var openStreams, let maxStreams, let remainingUses):
275289
openStreams -= 1
276290
assert(openStreams >= 0)
277-
self.state = .active(openStreams: openStreams, maxStreams: maxStreams)
291+
self.state = .active(openStreams: openStreams, maxStreams: maxStreams, remainingUses: remainingUses)
278292
return .notifyConnectionStreamClosed(currentlyAvailable: maxStreams - openStreams)
279293

280294
case .closing(var openStreams, let maxStreams):

Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+Factory.swift

+1
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ extension HTTPConnectionPool.ConnectionFactory {
8484
connectionID: connectionID,
8585
delegate: http2ConnectionDelegate,
8686
decompression: self.clientConfiguration.decompression,
87+
maximumConnectionUses: self.clientConfiguration.maximumUsesPerConnection,
8788
logger: logger
8889
).whenComplete { result in
8990
switch result {

Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift

+2-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ final class HTTPConnectionPool {
7171
self._state = StateMachine(
7272
idGenerator: idGenerator,
7373
maximumConcurrentHTTP1Connections: clientConfiguration.connectionPool.concurrentHTTP1ConnectionsPerHostSoftLimit,
74-
retryConnectionEstablishment: clientConfiguration.connectionPool.retryConnectionEstablishment
74+
retryConnectionEstablishment: clientConfiguration.connectionPool.retryConnectionEstablishment,
75+
maximumConnectionUses: clientConfiguration.maximumUsesPerConnection
7576
)
7677
}
7778

0 commit comments

Comments
 (0)