@@ -27,7 +27,7 @@ extension HTTPConnectionPool {
27
27
private var connections : HTTP1Connections
28
28
private var failedConsecutiveConnectionAttempts : Int = 0
29
29
30
- private var queue : RequestQueue
30
+ private var requests : RequestQueue
31
31
private var state : State = . running
32
32
33
33
init ( idGenerator: Connection . ID . Generator , maximumConcurrentConnections: Int ) {
@@ -36,7 +36,7 @@ extension HTTPConnectionPool {
36
36
generator: idGenerator
37
37
)
38
38
39
- self . queue = RequestQueue ( )
39
+ self . requests = RequestQueue ( )
40
40
}
41
41
42
42
// MARK: - Events -
@@ -72,7 +72,7 @@ extension HTTPConnectionPool {
72
72
}
73
73
74
74
// No matter what we do now, the request will need to wait!
75
- self . queue . push ( request)
75
+ self . requests . push ( request)
76
76
let requestAction : StateMachine . RequestAction = . scheduleRequestTimeout(
77
77
for: request,
78
78
on: eventLoop
@@ -84,7 +84,7 @@ extension HTTPConnectionPool {
84
84
}
85
85
86
86
// if we are not at max connections, we may want to create a new connection
87
- if self . connections. startingGeneralPurposeConnections >= self . queue . count {
87
+ if self . connections. startingGeneralPurposeConnections >= self . requests . generalPurposeCount {
88
88
// If there are at least as many connections starting as we have request queued, we
89
89
// don't need to create a new connection. we just need to wait.
90
90
return . init( request: requestAction, connection: . none)
@@ -109,14 +109,14 @@ extension HTTPConnectionPool {
109
109
}
110
110
111
111
// No matter what we do now, the request will need to wait!
112
- self . queue . push ( request)
112
+ self . requests . push ( request)
113
113
let requestAction : StateMachine . RequestAction = . scheduleRequestTimeout(
114
114
for: request,
115
115
on: eventLoop
116
116
)
117
117
118
118
let starting = self . connections. startingEventLoopConnections ( on: eventLoop)
119
- let waiting = self . queue . count ( for: eventLoop)
119
+ let waiting = self . requests . count ( for: eventLoop)
120
120
121
121
if starting >= waiting {
122
122
// There are already as many connections starting as we need for the waiting
@@ -141,11 +141,7 @@ extension HTTPConnectionPool {
141
141
}
142
142
143
143
mutating func failedToCreateNewConnection( _ error: Error , connectionID: Connection . ID ) -> Action {
144
- defer {
145
- // After we have processed this call, we have one more failed connection attempt,
146
- // that we need to consider when computing the jitter.
147
- self . failedConsecutiveConnectionAttempts += 1
148
- }
144
+ self . failedConsecutiveConnectionAttempts += 1
149
145
150
146
switch self . state {
151
147
case . running:
@@ -154,14 +150,16 @@ extension HTTPConnectionPool {
154
150
// decision about the retry will be made in `connectionCreationBackoffDone(_:)`
155
151
let eventLoop = self . connections. backoffNextConnectionAttempt ( connectionID)
156
152
157
- let backoff = self . calculateBackoff ( for : self . failedConsecutiveConnectionAttempts)
153
+ let backoff = self . calculateBackoff ( failedAttempt : self . failedConsecutiveConnectionAttempts)
158
154
return . init(
159
155
request: . none,
160
156
connection: . scheduleBackoffTimer( connectionID, backoff: backoff, on: eventLoop)
161
157
)
162
158
163
159
case . shuttingDown:
164
- let ( index, context) = self . connections. failConnection ( connectionID)
160
+ guard let ( index, context) = self . connections. failConnection ( connectionID) else {
161
+ preconditionFailure ( " Failed to create a connection that is unknown to us? " )
162
+ }
165
163
return self . nextActionForFailedConnection ( at: index, context: context)
166
164
167
165
case . shutDown:
@@ -174,7 +172,9 @@ extension HTTPConnectionPool {
174
172
// The naming of `failConnection` is a little confusing here. All it does is moving the
175
173
// connection state from `.backingOff` to `.closed` here. It also returns the
176
174
// connection's index.
177
- let ( index, context) = self . connections. failConnection ( connectionID)
175
+ guard let ( index, context) = self . connections. failConnection ( connectionID) else {
176
+ preconditionFailure ( " Backing off a connection that is unknown to us? " )
177
+ }
178
178
// In `nextActionForFailedConnection` a decision will be made whether the failed
179
179
// connection should be replaced or removed.
180
180
return self . nextActionForFailedConnection ( at: index, context: context)
@@ -202,13 +202,18 @@ extension HTTPConnectionPool {
202
202
203
203
/// A connection has been unexpectedly closed
204
204
mutating func connectionClosed( _ connectionID: Connection . ID ) -> Action {
205
- let ( index, context) = self . connections. failConnection ( connectionID)
205
+ guard let ( index, context) = self . connections. failConnection ( connectionID) else {
206
+ // When a connection close is initiated by the connection pool, the connection will
207
+ // still report its close to the state machine. In those cases we must ignore the
208
+ // event.
209
+ return . none
210
+ }
206
211
return self . nextActionForFailedConnection ( at: index, context: context)
207
212
}
208
213
209
214
mutating func timeoutRequest( _ requestID: Request . ID ) -> Action {
210
215
// 1. check requests in queue
211
- if let request = self . queue . remove ( requestID) {
216
+ if let request = self . requests . remove ( requestID) {
212
217
return . init(
213
218
request: . failRequest( request, HTTPClientError . getConnectionFromPoolTimeout, cancelTimeout: false ) ,
214
219
connection: . none
@@ -223,7 +228,7 @@ extension HTTPConnectionPool {
223
228
224
229
mutating func cancelRequest( _ requestID: Request . ID ) -> Action {
225
230
// 1. check requests in queue
226
- if self . queue . remove ( requestID) != nil {
231
+ if self . requests . remove ( requestID) != nil {
227
232
return . init(
228
233
request: . cancelRequestTimeout( requestID) ,
229
234
connection: . none
@@ -241,7 +246,7 @@ extension HTTPConnectionPool {
241
246
242
247
// If we have remaining request queued, we should fail all of them with a cancelled
243
248
// error.
244
- let waitingRequests = self . queue . removeAll ( )
249
+ let waitingRequests = self . requests . removeAll ( )
245
250
246
251
var requestAction : StateMachine . RequestAction = . none
247
252
if !waitingRequests. isEmpty {
@@ -285,7 +290,7 @@ extension HTTPConnectionPool {
285
290
return self . nextActionForIdleEventLoopConnection ( at: index, context: context)
286
291
}
287
292
case . shuttingDown( let unclean) :
288
- assert ( self . queue . isEmpty)
293
+ assert ( self . requests . isEmpty)
289
294
let connection = self . connections. closeConnection ( at: index)
290
295
if self . connections. isEmpty {
291
296
return . init(
@@ -308,15 +313,15 @@ extension HTTPConnectionPool {
308
313
context: HTTP1Connections . IdleConnectionContext
309
314
) -> Action {
310
315
// 1. Check if there are waiting requests in the general purpose queue
311
- if let request = self . queue . popFirst ( for: nil ) {
316
+ if let request = self . requests . popFirst ( for: nil ) {
312
317
return . init(
313
318
request: . executeRequest( request, self . connections. leaseConnection ( at: index) , cancelTimeout: true ) ,
314
319
connection: . none
315
320
)
316
321
}
317
322
318
323
// 2. Check if there are waiting requests in the matching eventLoop queue
319
- if let request = self . queue . popFirst ( for: context. eventLoop) {
324
+ if let request = self . requests . popFirst ( for: context. eventLoop) {
320
325
return . init(
321
326
request: . executeRequest( request, self . connections. leaseConnection ( at: index) , cancelTimeout: true ) ,
322
327
connection: . none
@@ -337,7 +342,7 @@ extension HTTPConnectionPool {
337
342
context: HTTP1Connections . IdleConnectionContext
338
343
) -> Action {
339
344
// Check if there are waiting requests in the matching eventLoop queue
340
- if let request = self . queue . popFirst ( for: context. eventLoop) {
345
+ if let request = self . requests . popFirst ( for: context. eventLoop) {
341
346
return . init(
342
347
request: . executeRequest( request, self . connections. leaseConnection ( at: index) , cancelTimeout: true ) ,
343
348
connection: . none
@@ -372,7 +377,7 @@ extension HTTPConnectionPool {
372
377
}
373
378
374
379
case . shuttingDown( let unclean) :
375
- assert ( self . queue . isEmpty)
380
+ assert ( self . requests . isEmpty)
376
381
self . connections. removeConnection ( at: index)
377
382
if self . connections. isEmpty {
378
383
return . init(
@@ -391,7 +396,7 @@ extension HTTPConnectionPool {
391
396
at index: Int ,
392
397
context: HTTP1Connections . FailedConnectionContext
393
398
) -> Action {
394
- if context. connectionsStartingForUseCase < self . queue . count ( for : nil ) {
399
+ if context. connectionsStartingForUseCase < self . requests . generalPurposeCount {
395
400
// if we have more requests queued up, than we have starting connections, we should
396
401
// create a new connection
397
402
let ( newConnectionID, newEventLoop) = self . connections. replaceConnection ( at: index)
@@ -408,7 +413,7 @@ extension HTTPConnectionPool {
408
413
at index: Int ,
409
414
context: HTTP1Connections . FailedConnectionContext
410
415
) -> Action {
411
- if context. connectionsStartingForUseCase < self . queue . count ( for: context. eventLoop) {
416
+ if context. connectionsStartingForUseCase < self . requests . count ( for: context. eventLoop) {
412
417
// if we have more requests queued up, than we have starting connections, we should
413
418
// create a new connection
414
419
let ( newConnectionID, newEventLoop) = self . connections. replaceConnection ( at: index)
@@ -421,8 +426,8 @@ extension HTTPConnectionPool {
421
426
return . none
422
427
}
423
428
424
- private func calculateBackoff( for attempt : Int ) -> TimeAmount {
425
- // Our backoff formula is: 100ms * 1.25^attempt that is capped of at 1minute
429
+ private func calculateBackoff( failedAttempt attempts : Int ) -> TimeAmount {
430
+ // Our backoff formula is: 100ms * 1.25^(attempts - 1) that is capped of at 1minute
426
431
// This means for:
427
432
// - 1 failed attempt : 100ms
428
433
// - 5 failed attempts: ~300ms
@@ -433,7 +438,7 @@ extension HTTPConnectionPool {
433
438
// - 29 failed attempts: ~60s (max out)
434
439
435
440
let start = Double ( TimeAmount . milliseconds ( 100 ) . nanoseconds)
436
- let backoffNanoseconds = Int64 ( start * pow( 1.25 , Double ( attempt ) ) )
441
+ let backoffNanoseconds = Int64 ( start * pow( 1.25 , Double ( attempts - 1 ) ) )
437
442
438
443
let backoff : TimeAmount = min ( . nanoseconds( backoffNanoseconds) , . seconds( 60 ) )
439
444
@@ -450,7 +455,7 @@ extension HTTPConnectionPool {
450
455
extension HTTPConnectionPool . HTTP1StateMachine : CustomStringConvertible {
451
456
var description : String {
452
457
let stats = self . connections. stats
453
- let queued = self . queue . count
458
+ let queued = self . requests . count
454
459
455
460
return " connections: [connecting: \( stats. connecting) | backoff: \( stats. backingOff) | leased: \( stats. leased) | idle: \( stats. idle) ], queued: \( queued) "
456
461
}
0 commit comments