@@ -49,6 +49,15 @@ extension HTTPConnectionPool {
49
49
}
50
50
}
51
51
52
+ var canOrWillBeAbleToExecuteRequests : Bool {
53
+ switch self . state {
54
+ case . starting, . backingOff, . active:
55
+ return true
56
+ case . draining, . closed:
57
+ return false
58
+ }
59
+ }
60
+
52
61
/// A request can be scheduled on the connection
53
62
var isAvailable : Bool {
54
63
switch self . state {
@@ -92,6 +101,7 @@ extension HTTPConnectionPool {
92
101
switch self . state {
93
102
case . active, . draining, . backingOff, . closed:
94
103
preconditionFailure ( " Invalid state: \( self . state) " )
104
+
95
105
case . starting:
96
106
self . state = . active( conn, maxStreams: maxStreams, usedStreams: 0 , lastIdle: . now( ) )
97
107
return maxStreams
@@ -106,25 +116,29 @@ extension HTTPConnectionPool {
106
116
switch self . state {
107
117
case . starting, . backingOff, . closed:
108
118
preconditionFailure ( " Invalid state for updating max concurrent streams: \( self . state) " )
109
- case . draining( let conn, _, let usedStreams) :
110
- self . state = . draining( conn, maxStreams: maxStreams, usedStreams: usedStreams)
111
- return 0
119
+
112
120
case . active( let conn, _, let usedStreams, let lastIdle) :
113
121
self . state = . active( conn, maxStreams: maxStreams, usedStreams: usedStreams, lastIdle: lastIdle)
114
122
return max ( maxStreams - usedStreams, 0 )
123
+
124
+ case . draining( let conn, _, let usedStreams) :
125
+ self . state = . draining( conn, maxStreams: maxStreams, usedStreams: usedStreams)
126
+ return 0
115
127
}
116
128
}
117
129
118
130
mutating func goAwayReceived( ) -> EventLoop {
119
131
switch self . state {
120
132
case . starting, . backingOff, . closed:
121
133
preconditionFailure ( " Invalid state for draining a connection: \( self . state) " )
122
- case . draining( let conn, _, _) :
123
- // we could potentially receive another go away while we drain all active streams and we just ignore it
124
- return conn. eventLoop
134
+
125
135
case . active( let conn, let maxStreams, let usedStreams, _) :
126
136
self . state = . draining( conn, maxStreams: maxStreams, usedStreams: usedStreams)
127
137
return conn. eventLoop
138
+
139
+ case . draining( let conn, _, _) :
140
+ // we could potentially receive another go away while we drain all active streams and we just ignore it
141
+ return conn. eventLoop
128
142
}
129
143
}
130
144
@@ -151,6 +165,7 @@ extension HTTPConnectionPool {
151
165
switch self . state {
152
166
case . starting, . backingOff, . draining, . closed:
153
167
preconditionFailure ( " Invalid state for leasing a stream: \( self . state) " )
168
+
154
169
case . active( let conn, let maxStreams, var usedStreams, let lastIdle) :
155
170
usedStreams += count
156
171
precondition ( usedStreams <= maxStreams, " tried to lease a connection which is not available " )
@@ -165,11 +180,7 @@ extension HTTPConnectionPool {
165
180
switch self . state {
166
181
case . starting, . backingOff, . closed:
167
182
preconditionFailure ( " Invalid state: \( self . state) " )
168
- case . draining( let conn, let maxStreams, var usedStreams) :
169
- usedStreams -= 1
170
- assert ( usedStreams >= 0 )
171
- self . state = . draining( conn, maxStreams: maxStreams, usedStreams: usedStreams)
172
- return 0
183
+
173
184
case . active( let conn, let maxStreams, var usedStreams, var lastIdle) :
174
185
usedStreams -= 1
175
186
assert ( usedStreams >= 0 )
@@ -178,6 +189,12 @@ extension HTTPConnectionPool {
178
189
}
179
190
self . state = . active( conn, maxStreams: maxStreams, usedStreams: usedStreams, lastIdle: lastIdle)
180
191
return max ( maxStreams - usedStreams, 0 )
192
+
193
+ case . draining( let conn, let maxStreams, var usedStreams) :
194
+ usedStreams -= 1
195
+ assert ( usedStreams >= 0 )
196
+ self . state = . draining( conn, maxStreams: maxStreams, usedStreams: usedStreams)
197
+ return 0
181
198
}
182
199
}
183
200
@@ -186,6 +203,7 @@ extension HTTPConnectionPool {
186
203
case . active( let conn, _, 0 , _) :
187
204
self . state = . closed
188
205
return conn
206
+
189
207
case . starting, . backingOff, . draining, . closed, . active:
190
208
preconditionFailure ( " Invalid state for closing a connection: \( self . state) " )
191
209
}
@@ -214,9 +232,11 @@ extension HTTPConnectionPool {
214
232
switch self . state {
215
233
case . starting:
216
234
return . keepConnection
235
+
217
236
case . backingOff:
218
237
context. connectBackoff. append ( self . connectionID)
219
238
return . removeConnection
239
+
220
240
case . active( let connection, _, let usedStreams, _) :
221
241
if usedStreams <= 0 {
222
242
context. close. append ( connection)
@@ -225,9 +245,11 @@ extension HTTPConnectionPool {
225
245
context. cancel. append ( connection)
226
246
return . keepConnection
227
247
}
248
+
228
249
case . draining( let connection, _, _) :
229
250
context. cancel. append ( connection)
230
251
return . keepConnection
252
+
231
253
case . closed:
232
254
preconditionFailure ( " Unexpected state for cleanup: Did not expect to have closed connections in the state machine. " )
233
255
}
@@ -240,15 +262,19 @@ extension HTTPConnectionPool {
240
262
switch self . state {
241
263
case . starting:
242
264
stats. startingConnections &+= 1
265
+
243
266
case . backingOff:
244
267
stats. backingOffConnections &+= 1
268
+
245
269
case . active( _, let maxStreams, let usedStreams, _) :
246
270
stats. availableStreams += max ( maxStreams - usedStreams, 0 )
247
271
stats. leasedStreams += usedStreams
248
272
stats. availableConnections &+= 1
273
+
249
274
case . draining( _, _, let usedStreams) :
250
275
stats. drainingConnections &+= 1
251
276
stats. leasedStreams += usedStreams
277
+
252
278
case . closed:
253
279
break
254
280
}
@@ -310,21 +336,23 @@ extension HTTPConnectionPool {
310
336
311
337
/// used in general purpose connection scenarios to check if at least one connection exist, or if should we create a new one
312
338
var hasConnectionThatCanOrWillBeAbleToExecuteRequests : Bool {
313
- self . connections. contains { $0. isStartingOrBackingOff || $0 . isActive }
339
+ self . connections. contains { $0. canOrWillBeAbleToExecuteRequests }
314
340
}
315
341
316
342
/// used in eventLoop scenarios. does at least one connection exist for this eventLoop, or should we create a new one?
317
343
/// - Parameter eventLoop: connection `EventLoop` to search for
318
344
/// - Returns: true if at least one connection is starting or active for the given `eventLoop`
319
345
func hasConnectionThatCanOrWillBeAbleToExecuteRequests( for eventLoop: EventLoop ) -> Bool {
320
346
self . connections. contains {
321
- $0. eventLoop === eventLoop && ( $0. isStartingOrBackingOff || $0 . isActive )
347
+ $0. eventLoop === eventLoop && $0. canOrWillBeAbleToExecuteRequests
322
348
}
323
349
}
324
350
325
351
mutating func createNewConnection( on eventLoop: EventLoop ) -> Connection . ID {
326
- // assert no active connection exists on the requested eventLoop
327
- assert ( self . connections. allSatisfy { $0. eventLoop !== eventLoop || !$0. isActive } )
352
+ assert (
353
+ !self . hasConnectionThatCanOrWillBeAbleToExecuteRequests ( for: eventLoop) ,
354
+ " we should not create more than one connection per event loop "
355
+ )
328
356
329
357
let connection = HTTP2ConnectionState ( connectionID: self . generator. next ( ) , eventLoop: eventLoop)
330
358
self . connections. append ( connection)
@@ -353,10 +381,11 @@ extension HTTPConnectionPool {
353
381
return ( index, context)
354
382
}
355
383
356
- /// Move the HTTP1ConnectionState to backingOff.
384
+ /// Move the connection state to backingOff.
357
385
///
358
386
/// - Parameter connectionID: The connectionID of the failed connection attempt
359
387
/// - Returns: The eventLoop on which to schedule the backoff timer
388
+ /// - Precondition: connection needs to be in the `.starting` state
360
389
mutating func backoffNextConnectionAttempt( _ connectionID: Connection . ID ) -> EventLoop {
361
390
guard let index = self . connections. firstIndex ( where: { $0. connectionID == connectionID } ) else {
362
391
preconditionFailure ( " We tried to create a new connection that we know nothing about? " )
@@ -368,6 +397,9 @@ extension HTTPConnectionPool {
368
397
369
398
// MARK: Connection lifecycle events
370
399
400
+ /// Sets the connection with the given `connectionId` to the draining state.
401
+ /// - Returns: the `EventLoop` to create a new connection on if applicable
402
+ /// - Precondition: connection with given `connectionId` must be either `.active` or already in the `.draining` state
371
403
mutating func goAwayReceived( _ connectionID: Connection . ID ) -> GoAwayContext {
372
404
guard let index = self . connections. firstIndex ( where: { $0. connectionID == connectionID } ) else {
373
405
preconditionFailure ( " go away recieved for a connection that does not exists " )
@@ -376,12 +408,18 @@ extension HTTPConnectionPool {
376
408
return GoAwayContext ( eventLoop: eventLoop)
377
409
}
378
410
411
+ /// Update the maximum number of concurrent streams for the given connection.
412
+ /// - Parameters:
413
+ /// - connectionID: The connectionID for which we received new settings
414
+ /// - newMaxStreams: new maximum concurrent streams
415
+ /// - Returns: index of the connection and new number of available streams in the `AvailableConnectionContext`
416
+ /// - Precondition: Connections must be in the `.active` or `.draining` state.
379
417
mutating func newHTTP2MaxConcurrentStreamsReceived(
380
418
_ connectionID: Connection . ID ,
381
419
newMaxStreams: Int
382
420
) -> ( Int , AvailableConnectionContext ) {
383
421
guard let index = self . connections. firstIndex ( where: { $0. connectionID == connectionID } ) else {
384
- preconditionFailure ( " We tried to update the maximum concurren streams number for a connection that does not exists " )
422
+ preconditionFailure ( " We tried to update the maximum number of concurrent streams for a connection that does not exists " )
385
423
}
386
424
let availableStreams = self . connections [ index] . newMaxConcurrentStreams ( newMaxStreams)
387
425
let context = AvailableConnectionContext (
@@ -413,7 +451,7 @@ extension HTTPConnectionPool {
413
451
return availableConnection
414
452
}
415
453
416
- mutating func leaseStreams ( onRequired eventLoop: EventLoop ) -> Connection ? {
454
+ mutating func leaseStream ( onRequired eventLoop: EventLoop ) -> Connection ? {
417
455
guard let index = self . findAvailableConnection ( onRequired: eventLoop) else { return nil }
418
456
return self . leaseStreams ( at: index, count: 1 )
419
457
}
@@ -453,7 +491,7 @@ extension HTTPConnectionPool {
453
491
/// - Returns: closed and removed connection
454
492
mutating func closeConnection( at index: Int ) -> Connection {
455
493
let connection = self . connections [ index] . close ( )
456
- self . connections . remove ( at: index)
494
+ self . removeConnection ( at: index)
457
495
return connection
458
496
}
459
497
@@ -479,7 +517,7 @@ extension HTTPConnectionPool {
479
517
}
480
518
481
519
/// replaces a closed connection by creating a new starting connection.
482
- /// - Parameter index: index of the connection which we get from `failConnection(_:)`
520
+ /// - Parameter index: index of the connection which we got from `failConnection(_:)`
483
521
/// - Precondition: connection must be closed
484
522
mutating func createNewConnectionByReplacingClosedConnection( at index: Int ) -> ( Connection . ID , EventLoop ) {
485
523
precondition ( self . connections [ index] . isClosed)
0 commit comments