15
15
import NIO
16
16
17
17
extension HTTP1ConnectionProvider {
18
- enum Action {
19
- case lease( Connection , Waiter )
20
- case create( Waiter )
21
- case replace( Connection , Waiter )
18
+ enum Action < ConnectionType : PoolManageableConnection > {
19
+ case lease( ConnectionType , Waiter < ConnectionType > )
20
+ case create( Waiter < ConnectionType > )
21
+ case replace( ConnectionType , Waiter < ConnectionType > )
22
22
case closeProvider
23
- case park( Connection )
23
+ case park( ConnectionType )
24
24
case none
25
- case fail( Waiter , Error )
26
- indirect case closeAnd( Connection , Action )
27
- indirect case parkAnd( Connection , Action )
25
+ case fail( Waiter < ConnectionType > , Error )
26
+ indirect case closeAnd( ConnectionType , Action < ConnectionType > )
27
+ indirect case parkAnd( ConnectionType , Action < ConnectionType > )
28
28
}
29
29
30
- struct ConnectionsState {
30
+ struct ConnectionsState < ConnectionType : PoolManageableConnection > {
31
31
enum State {
32
32
case active
33
33
case closed
34
34
}
35
35
36
- struct Snapshot {
36
+ struct Snapshot < ConnectionType : PoolManageableConnection > {
37
37
var state : State
38
- var availableConnections : CircularBuffer < Connection >
39
- var leasedConnections : Set < ConnectionKey >
40
- var waiters : CircularBuffer < Waiter >
38
+ var availableConnections : CircularBuffer < ConnectionType >
39
+ var leasedConnections : Set < ConnectionKey < ConnectionType > >
40
+ var waiters : CircularBuffer < Waiter < ConnectionType > >
41
41
var openedConnectionsCount : Int
42
42
var pending : Int
43
43
}
@@ -48,16 +48,16 @@ extension HTTP1ConnectionProvider {
48
48
private var state : State = . active
49
49
50
50
/// Opened connections that are available.
51
- private var availableConnections : CircularBuffer < Connection > = . init( initialCapacity: 8 )
51
+ private var availableConnections : CircularBuffer < ConnectionType > = . init( initialCapacity: 8 )
52
52
53
53
/// Opened connections that are leased to the user.
54
- private var leasedConnections : Set < ConnectionKey > = . init( )
54
+ private var leasedConnections : Set < ConnectionKey < ConnectionType > > = . init( )
55
55
56
56
/// Consumers that weren't able to get a new connection without exceeding
57
57
/// `maximumConcurrentConnections` get a `Future<Connection>`
58
58
/// whose associated promise is stored in `Waiter`. The promise is completed
59
59
/// as soon as possible by the provider, in FIFO order.
60
- private var waiters : CircularBuffer < Waiter > = . init( initialCapacity: 8 )
60
+ private var waiters : CircularBuffer < Waiter < ConnectionType > > = . init( initialCapacity: 8 )
61
61
62
62
/// Number of opened or opening connections, used to keep track of all connections and enforcing `maximumConcurrentConnections` limit.
63
63
private var openedConnectionsCount : Int = 0
@@ -70,11 +70,11 @@ extension HTTP1ConnectionProvider {
70
70
self . eventLoop = eventLoop
71
71
}
72
72
73
- func testsOnly_getInternalState( ) -> Snapshot {
73
+ func testsOnly_getInternalState( ) -> Snapshot < ConnectionType > {
74
74
return Snapshot ( state: self . state, availableConnections: self . availableConnections, leasedConnections: self . leasedConnections, waiters: self . waiters, openedConnectionsCount: self . openedConnectionsCount, pending: self . pending)
75
75
}
76
76
77
- mutating func testsOnly_setInternalState( _ snapshot: Snapshot ) {
77
+ mutating func testsOnly_setInternalState( _ snapshot: Snapshot < ConnectionType > ) {
78
78
self . state = snapshot. state
79
79
self . availableConnections = snapshot. availableConnections
80
80
self . leasedConnections = snapshot. leasedConnections
@@ -109,15 +109,15 @@ extension HTTP1ConnectionProvider {
109
109
return self . openedConnectionsCount == 0 && self . pending == 0
110
110
}
111
111
112
- mutating func acquire( waiter: Waiter ) -> Action {
112
+ mutating func acquire( waiter: Waiter < ConnectionType > ) -> Action < ConnectionType > {
113
113
switch self . state {
114
114
case . active:
115
115
self . pending -= 1
116
116
117
117
let ( eventLoop, required) = self . resolvePreference ( waiter. preference)
118
118
if required {
119
119
// If there is an opened connection on the same EL - use it
120
- if let found = self . availableConnections. firstIndex ( where: { $0. channel . eventLoop === eventLoop } ) {
120
+ if let found = self . availableConnections. firstIndex ( where: { $0. eventLoop === eventLoop } ) {
121
121
let connection = self . availableConnections. remove ( at: found)
122
122
self . leasedConnections. insert ( ConnectionKey ( connection) )
123
123
return . lease( connection, waiter)
@@ -151,7 +151,7 @@ extension HTTP1ConnectionProvider {
151
151
}
152
152
}
153
153
154
- mutating func release( connection: Connection , closing: Bool ) -> Action {
154
+ mutating func release( connection: ConnectionType , closing: Bool ) -> Action < ConnectionType > {
155
155
switch self . state {
156
156
case . active:
157
157
assert ( self . leasedConnections. contains ( ConnectionKey ( connection) ) )
@@ -161,12 +161,12 @@ extension HTTP1ConnectionProvider {
161
161
let ( eventLoop, required) = self . resolvePreference ( waiter. preference)
162
162
163
163
// If returned connection is on same EL or we do not require special EL - lease it
164
- if connection. channel . eventLoop === eventLoop || !required {
164
+ if connection. eventLoop === eventLoop || !required {
165
165
return . lease( connection, waiter)
166
166
}
167
167
168
168
// If there is an opened connection on the same loop, lease it and park returned
169
- if let found = self . availableConnections. firstIndex ( where: { $0. channel . eventLoop === eventLoop } ) {
169
+ if let found = self . availableConnections. firstIndex ( where: { $0. eventLoop === eventLoop } ) {
170
170
self . leasedConnections. remove ( ConnectionKey ( connection) )
171
171
let replacement = self . availableConnections. swap ( at: found, with: connection)
172
172
self . leasedConnections. insert ( ConnectionKey ( replacement) )
@@ -203,7 +203,7 @@ extension HTTP1ConnectionProvider {
203
203
}
204
204
}
205
205
206
- mutating func offer( connection: Connection ) -> Action {
206
+ mutating func offer( connection: ConnectionType ) -> Action < ConnectionType > {
207
207
switch self . state {
208
208
case . active:
209
209
self . leasedConnections. insert ( ConnectionKey ( connection) )
@@ -214,7 +214,7 @@ extension HTTP1ConnectionProvider {
214
214
}
215
215
}
216
216
217
- mutating func drop( connection: Connection ) {
217
+ mutating func drop( connection: ConnectionType ) {
218
218
switch self . state {
219
219
case . active:
220
220
self . leasedConnections. remove ( ConnectionKey ( connection) )
@@ -223,7 +223,7 @@ extension HTTP1ConnectionProvider {
223
223
}
224
224
}
225
225
226
- mutating func connectFailed( ) -> Action {
226
+ mutating func connectFailed( ) -> Action < ConnectionType > {
227
227
switch self . state {
228
228
case . active:
229
229
self . openedConnectionsCount -= 1
@@ -239,7 +239,7 @@ extension HTTP1ConnectionProvider {
239
239
}
240
240
}
241
241
242
- mutating func remoteClosed( connection: Connection ) -> Action {
242
+ mutating func remoteClosed( connection: ConnectionType ) -> Action < ConnectionType > {
243
243
switch self . state {
244
244
case . active:
245
245
// Connection can be closed remotely while we wait for `.lease` action to complete.
@@ -260,7 +260,7 @@ extension HTTP1ConnectionProvider {
260
260
}
261
261
}
262
262
263
- mutating func timeout( connection: Connection ) -> Action {
263
+ mutating func timeout( connection: ConnectionType ) -> Action < ConnectionType > {
264
264
switch self . state {
265
265
case . active:
266
266
// We can get timeout and inUse = true when we decided to lease the connection, but this action is not executed yet.
@@ -285,12 +285,12 @@ extension HTTP1ConnectionProvider {
285
285
}
286
286
}
287
287
288
- mutating func processNextWaiter( ) -> Action {
288
+ mutating func processNextWaiter( ) -> Action < ConnectionType > {
289
289
if let waiter = self . waiters. popFirst ( ) {
290
290
let ( eventLoop, required) = self . resolvePreference ( waiter. preference)
291
291
292
292
// If specific EL is required, we have only two options - find open one or create a new one
293
- if required, let found = self . availableConnections. firstIndex ( where: { $0. channel . eventLoop === eventLoop } ) {
293
+ if required, let found = self . availableConnections. firstIndex ( where: { $0. eventLoop === eventLoop } ) {
294
294
let connection = self . availableConnections. remove ( at: found)
295
295
self . leasedConnections. insert ( ConnectionKey ( connection) )
296
296
return . lease( connection, waiter)
@@ -313,7 +313,7 @@ extension HTTP1ConnectionProvider {
313
313
return . none
314
314
}
315
315
316
- mutating func close( ) -> ( CircularBuffer < Waiter > , CircularBuffer < Connection > , Set < ConnectionKey > , Bool ) ? {
316
+ mutating func close( ) -> ( CircularBuffer < Waiter < ConnectionType > > , CircularBuffer < ConnectionType > , Set < ConnectionKey < ConnectionType > > , Bool ) ? {
317
317
switch self . state {
318
318
case . active:
319
319
let waiters = self . waiters
0 commit comments