@@ -28,14 +28,13 @@ final class HTTPConnectionPool {
28
28
private var _idleTimer = [ Connection . ID : Scheduled < Void > ] ( )
29
29
/// The connection backoff timeout timers. Protected by the stateLock
30
30
private var _backoffTimer = [ Connection . ID : Scheduled < Void > ] ( )
31
+ /// The request connection timeout timers. Protected by the stateLock
32
+ private var _requestTimer = [ Request . ID : Scheduled < Void > ] ( )
31
33
32
34
private static let fallbackConnectTimeout : TimeAmount = . seconds( 30 )
33
35
34
36
let key : ConnectionPool . Key
35
37
36
- private let timerLock = Lock ( )
37
- private var _requestTimer = [ Request . ID : Scheduled < Void > ] ( )
38
-
39
38
private var logger : Logger
40
39
41
40
private let eventLoopGroup : EventLoopGroup
@@ -110,21 +109,64 @@ final class HTTPConnectionPool {
110
109
}
111
110
}
112
111
112
+ enum RequestAction {
113
+ enum Unlocked {
114
+ case executeRequest( Request , Connection )
115
+ case executeRequests( [ Request ] , Connection )
116
+ case failRequest( Request , Error )
117
+ case failRequests( [ Request ] , Error )
118
+ case none
119
+ }
120
+
121
+ enum Locked {
122
+ case scheduleRequestTimeout( for: Request , on: EventLoop )
123
+ case cancelRequestTimeout( Request . ID )
124
+ case cancelRequestTimeouts( [ Request ] )
125
+ case none
126
+ }
127
+ }
128
+
113
129
struct Locked {
114
130
var connection : ConnectionAction . Locked
131
+ var request : RequestAction . Locked
115
132
}
116
133
117
134
struct Unlocked {
118
135
var connection : ConnectionAction . Unlocked
119
- var request : StateMachine . RequestAction
136
+ var request : RequestAction . Unlocked
120
137
}
121
138
122
139
var locked : Locked
123
140
var unlocked : Unlocked
124
141
125
142
init ( from stateMachineAction: StateMachine . Action ) {
126
- self . locked = Locked ( connection: . none)
127
- self . unlocked = Unlocked ( connection: . none, request: stateMachineAction. request)
143
+ self . locked = Locked ( connection: . none, request: . none)
144
+ self . unlocked = Unlocked ( connection: . none, request: . none)
145
+
146
+ switch stateMachineAction. request {
147
+ case . cancelRequestTimeout( let requestID) :
148
+ self . locked. request = . cancelRequestTimeout( requestID)
149
+ case . executeRequest( let request, let connection, cancelTimeout: let cancelTimeout) :
150
+ if cancelTimeout {
151
+ self . locked. request = . cancelRequestTimeout( request. id)
152
+ }
153
+ self . unlocked. request = . executeRequest( request, connection)
154
+ case . executeRequestsAndCancelTimeouts( let requests, let connection) :
155
+ self . locked. request = . cancelRequestTimeouts( requests)
156
+ self . unlocked. request = . executeRequests( requests, connection)
157
+ case . failRequest( let request, let error, cancelTimeout: let cancelTimeout) :
158
+ if cancelTimeout {
159
+ self . locked. request = . cancelRequestTimeout( request. id)
160
+ }
161
+ self . unlocked. request = . failRequest( request, error)
162
+ case . failRequestsAndCancelTimeouts( let requests, let error) :
163
+ self . locked. request = . cancelRequestTimeouts( requests)
164
+ self . unlocked. request = . failRequests( requests, error)
165
+ case . scheduleRequestTimeout( for: let request, on: let eventLoop) :
166
+ self . locked. request = . scheduleRequestTimeout( for: request, on: eventLoop)
167
+ case . none:
168
+ break
169
+ }
128
170
129
171
switch stateMachineAction. connection {
130
172
case . createConnection( let connectionID, on: let eventLoop) :
@@ -154,14 +196,15 @@ final class HTTPConnectionPool {
154
196
let unlockedActions = self . stateLock. withLock { ( ) -> Actions . Unlocked in
155
197
let stateMachineAction = closure ( & self . _state)
156
198
let poolAction = Actions ( from: stateMachineAction)
157
- self . runLockedActions ( poolAction. locked)
199
+ self . runLockedConnectionAction ( poolAction. locked. connection)
200
+ self . runLockedRequestAction ( poolAction. locked. request)
158
201
return poolAction. unlocked
159
202
}
160
203
self . runUnlockedActions ( unlockedActions)
161
204
}
162
205
163
- private func runLockedActions ( _ actions : Actions . Locked ) {
164
- switch actions . connection {
206
+ private func runLockedConnectionAction ( _ action : Actions . ConnectionAction . Locked ) {
207
+ switch action {
165
208
case . scheduleBackoffTimer( let connectionID, backoff: let backoff, on: let eventLoop) :
166
209
self . scheduleConnectionStartBackoffTimer ( connectionID, backoff, on: eventLoop)
167
210
@@ -181,6 +224,22 @@ final class HTTPConnectionPool {
181
224
}
182
225
}
183
226
227
+ private func runLockedRequestAction( _ action: Actions . RequestAction . Locked ) {
228
+ switch action {
229
+ case . scheduleRequestTimeout( for: let request, on: let eventLoop) :
230
+ self . scheduleRequestTimeout ( request, on: eventLoop)
231
+
232
+ case . cancelRequestTimeout( let requestID) :
233
+ self . cancelRequestTimeout ( requestID)
234
+
235
+ case . cancelRequestTimeouts( let requests) :
236
+ requests. forEach { self . cancelRequestTimeout ( $0. id) }
237
+
238
+ case . none:
239
+ break
240
+ }
241
+ }
242
+
184
243
private func runUnlockedActions( _ actions: Actions . Unlocked ) {
185
244
self . runUnlockedConnectionAction ( actions. connection)
186
245
self . runUnlockedRequestAction ( actions. request)
@@ -225,38 +284,20 @@ final class HTTPConnectionPool {
225
284
}
226
285
}
227
286
228
- private func runUnlockedRequestAction( _ action: StateMachine . RequestAction ) {
229
- // The order of execution fail/execute request vs cancelling the request timeout timer does
230
- // not matter in the actions here. The actions don't cause any side effects that will be
231
- // reported back to the state machine and are not dependent on each other.
232
-
287
+ private func runUnlockedRequestAction( _ action: Actions . RequestAction . Unlocked ) {
233
288
switch action {
234
- case . executeRequest( let request, let connection, cancelTimeout: let cancelTimeout) :
235
- if cancelTimeout {
236
- self . cancelRequestTimeout ( request. id)
237
- }
289
+ case . executeRequest( let request, let connection) :
238
290
connection. executeRequest ( request. req)
239
291
240
- case . executeRequestsAndCancelTimeouts( let requests, let connection) :
241
- self . cancelRequestTimeouts ( requests)
292
+ case . executeRequests( let requests, let connection) :
242
293
requests. forEach { connection. executeRequest ( $0. req) }
243
294
244
- case . failRequest( let request, let error, cancelTimeout: let cancelTimeout) :
245
- if cancelTimeout {
246
- self . cancelRequestTimeout ( request. id)
247
- }
295
+ case . failRequest( let request, let error) :
248
296
request. req. fail ( error)
249
297
250
- case . failRequestsAndCancelTimeouts( let requests, let error) :
251
- self . cancelRequestTimeouts ( requests)
298
+ case . failRequests( let requests, let error) :
252
299
requests. forEach { $0. req. fail ( error) }
253
300
254
- case . scheduleRequestTimeout( let request, on: let eventLoop) :
255
- self . scheduleRequestTimeout ( request, on: eventLoop)
256
-
257
- case . cancelRequestTimeout( let requestID) :
258
- self . cancelRequestTimeout ( requestID)
259
-
260
301
case . none:
261
302
break
262
303
}
@@ -282,49 +323,29 @@ final class HTTPConnectionPool {
282
323
private func scheduleRequestTimeout( _ request: Request , on eventLoop: EventLoop ) {
283
324
let requestID = request. id
284
325
let scheduled = eventLoop. scheduleTask ( deadline: request. connectionDeadline) {
285
- // The timer has fired. Now we need to do a couple of things:
286
- //
287
- // 1. Remove ourselves from the timer dictionary to not leak any data. If our
288
- // waiter entry still exists, we need to tell the state machine, that we want
289
- // to fail the request.
290
- let timeoutFired = self . timerLock. withLock {
291
- self . _requestTimer. removeValue ( forKey: requestID) != nil
292
- }
293
-
294
- // 2. If the entry did not exists anymore, we can assume that the request was
295
- // scheduled on another connection. The timer still fired anyhow because of a
296
- // race. In such a situation we don't need to do anything.
297
- guard timeoutFired else { return }
298
-
299
- // 3. Tell the state machine about the timeout
300
- self . modifyStateAndRunActions {
301
- $0. timeoutRequest ( requestID)
326
+ // there might be a race between a the timeout timer and the pool scheduling the
327
+ // request on another thread.
328
+ self . modifyStateAndRunActions { stateMachine in
329
+ if self . _requestTimer. removeValue ( forKey: requestID) != nil {
330
+ // The timer still exists. State Machines assumes it is alive. Inform state
331
+ // machine.
332
+ return stateMachine. timeoutRequest ( requestID)
333
+ }
334
+ return . none
302
335
}
303
336
}
304
337
305
- self . timerLock. withLockVoid {
306
- assert ( self . _requestTimer [ requestID] == nil )
307
- self . _requestTimer [ requestID] = scheduled
308
- }
338
+ assert ( self . _requestTimer [ requestID] == nil )
339
+ self . _requestTimer [ requestID] = scheduled
309
340
310
341
request. req. requestWasQueued ( self )
311
342
}
312
343
313
344
private func cancelRequestTimeout( _ id: Request . ID ) {
314
- let scheduled = self . timerLock . withLock {
315
- self . _requestTimer . removeValue ( forKey : id )
345
+ guard let cancelTimer = self . _requestTimer . removeValue ( forKey : id ) else {
346
+ preconditionFailure ( " Expected to have a timer for request \( id ) at this point. " )
316
347
}
317
-
318
- scheduled? . cancel ( )
319
- }
320
-
321
- private func cancelRequestTimeouts( _ requests: [ Request ] ) {
322
- let scheduled = self . timerLock. withLock {
323
- requests. compactMap {
324
- self . _requestTimer. removeValue ( forKey: $0. id)
325
- }
326
- }
327
- scheduled. forEach { $0. cancel ( ) }
348
+ cancelTimer. cancel ( )
328
349
}
329
350
330
351
private func scheduleIdleTimerForConnection( _ connectionID: Connection . ID , on eventLoop: EventLoop ) {
0 commit comments