Skip to content

Commit 62b7fcc

Browse files
committed
Tests
1 parent 4af1b1a commit 62b7fcc

14 files changed

+1309
-129
lines changed

Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift

+12
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
106106
let request = self.unwrapOutboundIn(data)
107107
self.request = request
108108

109+
request.willExecuteRequest(self)
110+
109111
let action = self.state.startRequest(
110112
head: request.requestHead,
111113
metadata: request.requestFramingMetadata
@@ -118,6 +120,16 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
118120
self.run(action, context: context)
119121
}
120122

123+
func triggerUserOutboundEvent(context: ChannelHandlerContext, event: Any, promise: EventLoopPromise<Void>?) {
124+
switch event {
125+
case HTTPConnectionEvent.cancelRequest:
126+
let action = self.state.requestCancelled()
127+
self.run(action, context: context)
128+
default:
129+
context.fireUserInboundEventTriggered(event)
130+
}
131+
}
132+
121133
// MARK: - Private Methods -
122134

123135
// MARK: Run Actions

Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift

+53-59
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import NIO
1717
import NIOHTTP2
1818

1919
protocol HTTP2ConnectionDelegate {
20+
func http2Connection(_: HTTP2Connection, newMaxStreamSetting: Int)
2021
func http2ConnectionStreamClosed(_: HTTP2Connection, availableStreams: Int)
2122
func http2ConnectionGoAwayReceived(_: HTTP2Connection)
2223
func http2ConnectionClosed(_: HTTP2Connection)
@@ -37,7 +38,7 @@ final class HTTP2Connection {
3738
enum State {
3839
case initialized
3940
case starting(EventLoopPromise<Void>)
40-
case active(HTTP2Settings)
41+
case active(maxStreams: Int)
4142
case closing
4243
case closed
4344
}
@@ -71,16 +72,6 @@ final class HTTP2Connection {
7172
}
7273
}
7374

74-
var settings: HTTP2Settings? {
75-
self.channel.eventLoop.assertInEventLoop()
76-
switch self.state {
77-
case .initialized, .starting, .closing, .closed:
78-
return nil
79-
case .active(let settings):
80-
return settings
81-
}
82-
}
83-
8475
private var state: State
8576

8677
/// We use this channel set to remember, which open streams we need to inform that
@@ -93,9 +84,6 @@ final class HTTP2Connection {
9384
connectionID: HTTPConnectionPool.Connection.ID,
9485
delegate: HTTP2ConnectionDelegate,
9586
logger: Logger) {
96-
precondition(channel.isActive)
97-
channel.eventLoop.preconditionInEventLoop()
98-
9987
self.channel = channel
10088
self.id = connectionID
10189
self.logger = logger
@@ -178,50 +166,6 @@ final class HTTP2Connection {
178166
self.channel.close()
179167
}
180168

181-
func http2SettingsReceived(_ settings: HTTP2Settings) {
182-
self.channel.eventLoop.assertInEventLoop()
183-
184-
switch self.state {
185-
case .initialized, .closed:
186-
preconditionFailure("Invalid state: \(self.state)")
187-
case .starting(let promise):
188-
self.state = .active(settings)
189-
promise.succeed(())
190-
case .active:
191-
self.state = .active(settings)
192-
case .closing:
193-
// ignore. we only wait for all connections to be closed anyway.
194-
break
195-
}
196-
}
197-
198-
func http2GoAwayReceived() {
199-
self.channel.eventLoop.assertInEventLoop()
200-
201-
switch self.state {
202-
case .initialized, .closed:
203-
preconditionFailure("Invalid state: \(self.state)")
204-
205-
case .starting(let promise):
206-
self.state = .closing
207-
promise.fail(HTTP2ReceivedGoAwayBeforeSettingsError())
208-
209-
case .active:
210-
self.state = .closing
211-
self.delegate.http2ConnectionGoAwayReceived(self)
212-
213-
case .closing:
214-
// we are already closing. Nothing new
215-
break
216-
}
217-
}
218-
219-
func http2StreamClosed(availableStreams: Int) {
220-
self.channel.eventLoop.assertInEventLoop()
221-
222-
self.delegate.http2ConnectionStreamClosed(self, availableStreams: availableStreams)
223-
}
224-
225169
private func start() -> EventLoopFuture<Void> {
226170
self.channel.eventLoop.assertInEventLoop()
227171

@@ -242,7 +186,7 @@ final class HTTP2Connection {
242186
let sync = self.channel.pipeline.syncOperations
243187

244188
let http2Handler = NIOHTTP2Handler(mode: .client, initialSettings: nioDefaultSettings)
245-
let idleHandler = HTTP2IdleHandler(connection: self, logger: self.logger)
189+
let idleHandler = HTTP2IdleHandler(delegate: self, logger: self.logger)
246190

247191
try sync.addHandler(http2Handler, position: .last)
248192
try sync.addHandler(idleHandler, position: .last)
@@ -270,3 +214,53 @@ final class HTTP2Connection {
270214
self.channel.triggerUserOutboundEvent(HTTPConnectionEvent.closeConnection, promise: nil)
271215
}
272216
}
217+
218+
extension HTTP2Connection: HTTP2IdleHandlerDelegate {
219+
func http2SettingsReceived(maxStreams: Int) {
220+
self.channel.eventLoop.assertInEventLoop()
221+
222+
switch self.state {
223+
case .initialized:
224+
preconditionFailure("Invalid state: \(self.state)")
225+
226+
case .starting(let promise):
227+
self.state = .active(maxStreams: maxStreams)
228+
promise.succeed(())
229+
230+
case .active:
231+
self.state = .active(maxStreams: maxStreams)
232+
self.delegate.http2Connection(self, newMaxStreamSetting: maxStreams)
233+
234+
case .closing, .closed:
235+
// ignore. we only wait for all connections to be closed anyway.
236+
break
237+
}
238+
}
239+
240+
func http2GoAwayReceived() {
241+
self.channel.eventLoop.assertInEventLoop()
242+
243+
switch self.state {
244+
case .initialized:
245+
preconditionFailure("Invalid state: \(self.state)")
246+
247+
case .starting(let promise):
248+
self.state = .closing
249+
promise.fail(HTTP2ReceivedGoAwayBeforeSettingsError())
250+
251+
case .active:
252+
self.state = .closing
253+
self.delegate.http2ConnectionGoAwayReceived(self)
254+
255+
case .closing, .closed:
256+
// we are already closing. Nothing new
257+
break
258+
}
259+
}
260+
261+
func http2StreamClosed(availableStreams: Int) {
262+
self.channel.eventLoop.assertInEventLoop()
263+
264+
self.delegate.http2ConnectionStreamClosed(self, availableStreams: availableStreams)
265+
}
266+
}

Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2IdleHandler.swift

+47-15
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,29 @@ import Logging
1616
import NIO
1717
import NIOHTTP2
1818

19-
// This is a `ChannelDuplexHandler` since we need to intercept outgoing user events.
20-
final class HTTP2IdleHandler: ChannelDuplexHandler {
19+
protocol HTTP2IdleHandlerDelegate {
20+
func http2SettingsReceived(maxStreams: Int)
21+
22+
func http2GoAwayReceived()
23+
24+
func http2StreamClosed(availableStreams: Int)
25+
}
26+
27+
// This is a `ChannelDuplexHandler` since we need to intercept outgoing user events. It is generic
28+
// over its delegate to allow for specialization.
29+
final class HTTP2IdleHandler<Delegate: HTTP2IdleHandlerDelegate>: ChannelDuplexHandler {
2130
typealias InboundIn = HTTP2Frame
2231
typealias InboundOut = HTTP2Frame
2332
typealias OutboundIn = HTTP2Frame
2433
typealias OutboundOut = HTTP2Frame
2534

2635
let logger: Logger
27-
let connection: HTTP2Connection
36+
let delegate: Delegate
2837

2938
private var state: StateMachine = .init()
3039

31-
init(connection: HTTP2Connection, logger: Logger) {
32-
self.connection = connection
40+
init(delegate: Delegate, logger: Logger) {
41+
self.delegate = delegate
3342
self.logger = logger
3443
}
3544

@@ -56,17 +65,39 @@ final class HTTP2IdleHandler: ChannelDuplexHandler {
5665
case .goAway:
5766
let action = self.state.goAwayReceived()
5867
self.run(action, context: context)
68+
5969
case .settings(.settings(let settings)):
6070
let action = self.state.settingsReceived(settings)
6171
self.run(action, context: context)
72+
6273
default:
6374
// We're not interested in other events.
64-
()
75+
break
6576
}
6677

6778
context.fireChannelRead(data)
6879
}
6980

81+
func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) {
82+
// We intercept calls between the `NIOHTTP2ChannelHandler` and the `HTTP2StreamMultiplexer`
83+
// to learn, how many open streams we have.
84+
switch event {
85+
case is StreamClosedEvent:
86+
let action = self.state.streamClosed()
87+
self.run(action, context: context)
88+
89+
case is NIOHTTP2StreamCreatedEvent:
90+
let action = self.state.streamCreated()
91+
self.run(action, context: context)
92+
93+
default:
94+
// We're not interested in other events.
95+
break
96+
}
97+
98+
context.fireUserInboundEventTriggered(event)
99+
}
100+
70101
func triggerUserOutboundEvent(context: ChannelHandlerContext, event: Any, promise: EventLoopPromise<Void>?) {
71102
switch event {
72103
case HTTPConnectionEvent.closeConnection:
@@ -83,14 +114,14 @@ final class HTTP2IdleHandler: ChannelDuplexHandler {
83114
case .nothing:
84115
break
85116

86-
case .notifyConnectionNewSettings(let settings):
87-
self.connection.http2SettingsReceived(settings)
117+
case .notifyConnectionNewMaxStreamsSettings(let maxStreams):
118+
self.delegate.http2SettingsReceived(maxStreams: maxStreams)
88119

89120
case .notifyConnectionStreamClosed(let currentlyAvailable):
90-
self.connection.http2StreamClosed(availableStreams: currentlyAvailable)
121+
self.delegate.http2StreamClosed(availableStreams: currentlyAvailable)
91122

92123
case .notifyConnectionGoAwayReceived:
93-
self.connection.http2GoAwayReceived()
124+
self.delegate.http2GoAwayReceived()
94125

95126
case .close:
96127
context.close(mode: .all, promise: nil)
@@ -101,7 +132,7 @@ final class HTTP2IdleHandler: ChannelDuplexHandler {
101132
extension HTTP2IdleHandler {
102133
struct StateMachine {
103134
enum Action {
104-
case notifyConnectionNewSettings(HTTP2Settings)
135+
case notifyConnectionNewMaxStreamsSettings(Int)
105136
case notifyConnectionGoAwayReceived(close: Bool)
106137
case notifyConnectionStreamClosed(currentlyAvailable: Int)
107138
case nothing
@@ -146,12 +177,12 @@ extension HTTP2IdleHandler {
146177
// the http/2 default of 100.
147178
let maxStreams = settings.last(where: { $0.parameter == .maxConcurrentStreams })?.value ?? 100
148179
self.state = .active(openStreams: 0, maxStreams: maxStreams)
149-
return .notifyConnectionNewSettings(settings)
180+
return .notifyConnectionNewMaxStreamsSettings(maxStreams)
150181

151182
case .active(openStreams: let openStreams, maxStreams: let maxStreams):
152183
if let newMaxStreams = settings.last(where: { $0.parameter == .maxConcurrentStreams })?.value, newMaxStreams != maxStreams {
153184
self.state = .active(openStreams: openStreams, maxStreams: newMaxStreams)
154-
return .notifyConnectionNewSettings(settings)
185+
return .notifyConnectionNewMaxStreamsSettings(newMaxStreams)
155186
}
156187
return .nothing
157188

@@ -163,7 +194,8 @@ extension HTTP2IdleHandler {
163194
mutating func goAwayReceived() -> Action {
164195
switch self.state {
165196
case .initialized, .closed:
166-
preconditionFailure("Invalid state")
197+
preconditionFailure("Invalid state: \(self.state)")
198+
167199
case .connected:
168200
self.state = .closing(openStreams: 0, maxStreams: 0)
169201
return .notifyConnectionGoAwayReceived(close: true)
@@ -180,7 +212,7 @@ extension HTTP2IdleHandler {
180212
mutating func closeEventReceived() -> Action {
181213
switch self.state {
182214
case .initialized:
183-
preconditionFailure("")
215+
preconditionFailure("Invalid state: \(self.state)")
184216

185217
case .connected:
186218
self.state = .closing(openStreams: 0, maxStreams: 0)

0 commit comments

Comments
 (0)