Skip to content

Commit c9a2814

Browse files
committed
Fix crash for large HTTP request headers
Fix crash for when sending HTTP request headers result in a channel writability change event
1 parent 8ad3d8a commit c9a2814

9 files changed

+190
-121
lines changed

Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift

Lines changed: 18 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -183,9 +183,19 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
183183

184184
private func run(_ action: HTTP1ConnectionStateMachine.Action, context: ChannelHandlerContext) {
185185
switch action {
186-
case .sendRequestHead(let head, startBody: let startBody):
187-
self.sendRequestHead(head, startBody: startBody, context: context)
188-
186+
case .sendRequestHead(let head, let sendEnd):
187+
self.sendRequestHead(head, sendEnd: sendEnd, context: context)
188+
case .notifyRequestHeadSendSuccessfully(let resumeRequestBodyStream, let startIdleTimer):
189+
190+
request!.requestHeadSent()
191+
if resumeRequestBodyStream {
192+
request!.resumeRequestBodyStream()
193+
}
194+
if startIdleTimer {
195+
if let timeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
196+
self.runTimeoutAction(timeoutAction, context: context)
197+
}
198+
}
189199
case .sendBodyPart(let part, let writePromise):
190200
context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: writePromise)
191201

@@ -320,32 +330,15 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
320330
}
321331
}
322332

323-
private func sendRequestHead(_ head: HTTPRequestHead, startBody: Bool, context: ChannelHandlerContext) {
324-
if startBody {
325-
context.writeAndFlush(self.wrapOutboundOut(.head(head)), promise: nil)
326-
327-
// The above write might trigger an error, which may lead to a call to `errorCaught`,
328-
// which in turn, may fail the request and pop it from the handler. For this reason
329-
// we must check if the request is still present here.
330-
guard let request = self.request else { return }
331-
request.requestHeadSent()
332-
333-
request.resumeRequestBodyStream()
334-
} else {
333+
private func sendRequestHead(_ head: HTTPRequestHead, sendEnd: Bool, context: ChannelHandlerContext) {
334+
if sendEnd {
335335
context.write(self.wrapOutboundOut(.head(head)), promise: nil)
336336
context.write(self.wrapOutboundOut(.end(nil)), promise: nil)
337337
context.flush()
338-
339-
// The above write might trigger an error, which may lead to a call to `errorCaught`,
340-
// which in turn, may fail the request and pop it from the handler. For this reason
341-
// we must check if the request is still present here.
342-
guard let request = self.request else { return }
343-
request.requestHeadSent()
344-
345-
if let timeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
346-
self.runTimeoutAction(timeoutAction, context: context)
347-
}
338+
} else {
339+
context.writeAndFlush(self.wrapOutboundOut(.head(head)), promise: nil)
348340
}
341+
self.run(self.state.headSent(), context: context)
349342
}
350343

351344
private func runTimeoutAction(_ action: IdleReadStateMachine.Action, context: ChannelHandlerContext) {

Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ConnectionStateMachine.swift

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,11 @@ struct HTTP1ConnectionStateMachine {
5757
case none
5858
}
5959

60-
case sendRequestHead(HTTPRequestHead, startBody: Bool)
60+
case sendRequestHead(HTTPRequestHead, sendEnd: Bool)
61+
case notifyRequestHeadSendSuccessfully(
62+
resumeRequestBodyStream: Bool,
63+
startIdleTimer: Bool
64+
)
6165
case sendBodyPart(IOData, EventLoopPromise<Void>?)
6266
case sendRequestEnd(EventLoopPromise<Void>?)
6367
case failSendBodyPart(Error, EventLoopPromise<Void>?)
@@ -350,6 +354,17 @@ struct HTTP1ConnectionStateMachine {
350354
return state.modify(with: action)
351355
}
352356
}
357+
358+
mutating func headSent() -> Action {
359+
guard case .inRequest(var requestStateMachine, let close) = state else {
360+
return .wait
361+
}
362+
return self.avoidingStateMachineCoW { state in
363+
let action = requestStateMachine.headSent()
364+
state = .inRequest(requestStateMachine, close: close)
365+
return state.modify(with: action)
366+
}
367+
}
353368
}
354369

355370
extension HTTP1ConnectionStateMachine {
@@ -390,8 +405,10 @@ extension HTTP1ConnectionStateMachine {
390405
extension HTTP1ConnectionStateMachine.State {
391406
fileprivate mutating func modify(with action: HTTPRequestStateMachine.Action) -> HTTP1ConnectionStateMachine.Action {
392407
switch action {
393-
case .sendRequestHead(let head, let startBody):
394-
return .sendRequestHead(head, startBody: startBody)
408+
case .sendRequestHead(let head, let sendEnd):
409+
return .sendRequestHead(head, sendEnd: sendEnd)
410+
case .notifyRequestHeadSendSuccessfully(let resumeRequestBodyStream, let startIdleTimer):
411+
return .notifyRequestHeadSendSuccessfully(resumeRequestBodyStream: resumeRequestBodyStream, startIdleTimer: startIdleTimer)
395412
case .pauseRequestBodyStream:
396413
return .pauseRequestBodyStream
397414
case .resumeRequestBodyStream:
@@ -462,6 +479,7 @@ extension HTTP1ConnectionStateMachine.State {
462479

463480
case .failSendStreamFinished(let error, let writePromise):
464481
return .failSendStreamFinished(error, writePromise)
482+
465483
}
466484
}
467485
}

Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift

Lines changed: 17 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -140,9 +140,18 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
140140

141141
private func run(_ action: HTTPRequestStateMachine.Action, context: ChannelHandlerContext) {
142142
switch action {
143-
case .sendRequestHead(let head, let startBody):
144-
self.sendRequestHead(head, startBody: startBody, context: context)
145-
143+
case .sendRequestHead(let head, let sendEnd):
144+
self.sendRequestHead(head, sendEnd: sendEnd, context: context)
145+
case .notifyRequestHeadSendSuccessfully(let resumeRequestBodyStream, let startIdleTimer):
146+
request!.requestHeadSent()
147+
if resumeRequestBodyStream {
148+
request!.resumeRequestBodyStream()
149+
}
150+
if startIdleTimer {
151+
if let timeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
152+
self.runTimeoutAction(timeoutAction, context: context)
153+
}
154+
}
146155
case .pauseRequestBodyStream:
147156
// We can force unwrap the request here, as we have just validated in the state machine,
148157
// that the request is neither failed nor finished yet
@@ -210,31 +219,15 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
210219
}
211220
}
212221

213-
private func sendRequestHead(_ head: HTTPRequestHead, startBody: Bool, context: ChannelHandlerContext) {
214-
if startBody {
215-
context.writeAndFlush(self.wrapOutboundOut(.head(head)), promise: nil)
216-
217-
// The above write might trigger an error, which may lead to a call to `errorCaught`,
218-
// which in turn, may fail the request and pop it from the handler. For this reason
219-
// we must check if the request is still present here.
220-
guard let request = self.request else { return }
221-
request.requestHeadSent()
222-
request.resumeRequestBodyStream()
223-
} else {
222+
private func sendRequestHead(_ head: HTTPRequestHead, sendEnd: Bool, context: ChannelHandlerContext) {
223+
if sendEnd {
224224
context.write(self.wrapOutboundOut(.head(head)), promise: nil)
225225
context.write(self.wrapOutboundOut(.end(nil)), promise: nil)
226226
context.flush()
227-
228-
// The above write might trigger an error, which may lead to a call to `errorCaught`,
229-
// which in turn, may fail the request and pop it from the handler. For this reason
230-
// we must check if the request is still present here.
231-
guard let request = self.request else { return }
232-
request.requestHeadSent()
233-
234-
if let timeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
235-
self.runTimeoutAction(timeoutAction, context: context)
236-
}
227+
} else {
228+
context.writeAndFlush(self.wrapOutboundOut(.head(head)), promise: nil)
237229
}
230+
self.run(self.state.headSent(), context: context)
238231
}
239232

240233
private func runSuccessfulFinalAction(_ action: HTTPRequestStateMachine.Action.FinalSuccessfulRequestAction, context: ChannelHandlerContext) {

Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift

Lines changed: 62 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,24 @@ struct HTTPRequestStateMachine {
2020
fileprivate enum State {
2121
/// The initial state machine state. The only valid mutation is `start()`. The state will
2222
/// transitions to:
23-
/// - `.waitForChannelToBecomeWritable`
24-
/// - `.running(.streaming, .initialized)` (if the Channel is writable and if a request body is expected)
25-
/// - `.running(.endSent, .initialized)` (if the Channel is writable and no request body is expected)
23+
/// - `.waitForChannelToBecomeWritable` (if the channel becomes non writable while sending the header)
24+
/// - `.sendingHead` if the channel is writable
2625
case initialized
26+
2727
/// Waiting for the channel to be writable. Valid transitions are:
28-
/// - `.running(.streaming, .initialized)` (once the Channel is writable again and if a request body is expected)
29-
/// - `.running(.endSent, .initialized)` (once the Channel is writable again and no request body is expected)
28+
/// - `.running(.streaming, .waitingForHead)` (once the Channel is writable again and if a request body is expected)
29+
/// - `.running(.endSent, .waitingForHead)` (once the Channel is writable again and no request body is expected)
3030
/// - `.failed` (if a connection error occurred)
3131
case waitForChannelToBecomeWritable(HTTPRequestHead, RequestFramingMetadata)
32+
3233
/// A request is on the wire. Valid transitions are:
3334
/// - `.finished`
3435
/// - `.failed`
3536
case running(RequestState, ResponseState)
37+
3638
/// The request has completed successfully
3739
case finished
40+
3841
/// The request has failed
3942
case failed(Error)
4043

@@ -93,7 +96,11 @@ struct HTTPRequestStateMachine {
9396
case none
9497
}
9598

96-
case sendRequestHead(HTTPRequestHead, startBody: Bool)
99+
case sendRequestHead(HTTPRequestHead, sendEnd: Bool)
100+
case notifyRequestHeadSendSuccessfully(
101+
resumeRequestBodyStream: Bool,
102+
startIdleTimer: Bool
103+
)
97104
case sendBodyPart(IOData, EventLoopPromise<Void>?)
98105
case sendRequestEnd(EventLoopPromise<Void>?)
99106
case failSendBodyPart(Error, EventLoopPromise<Void>?)
@@ -223,6 +230,7 @@ struct HTTPRequestStateMachine {
223230
// the request failed, before it was sent onto the wire.
224231
self.state = .failed(error)
225232
return .failRequest(error, .none)
233+
226234
case .running:
227235
self.state = .failed(error)
228236
return .failRequest(error, .close(nil))
@@ -520,7 +528,7 @@ struct HTTPRequestStateMachine {
520528

521529
switch self.state {
522530
case .initialized, .waitForChannelToBecomeWritable:
523-
preconditionFailure("How can we receive a response head before sending a request head ourselves")
531+
preconditionFailure("How can we receive a response head before sending a request head ourselves \(self.state)")
524532

525533
case .running(.streaming(let expectedBodyLength, let sentBodyBytes, producer: .paused), .waitingForHead):
526534
self.state = .running(
@@ -561,7 +569,7 @@ struct HTTPRequestStateMachine {
561569
mutating func receivedHTTPResponseBodyPart(_ body: ByteBuffer) -> Action {
562570
switch self.state {
563571
case .initialized, .waitForChannelToBecomeWritable:
564-
preconditionFailure("How can we receive a response head before sending a request head ourselves. Invalid state: \(self.state)")
572+
preconditionFailure("How can we receive a response head before completely sending a request head ourselves. Invalid state: \(self.state)")
565573

566574
case .running(_, .waitingForHead):
567575
preconditionFailure("How can we receive a response body, if we haven't received a head. Invalid state: \(self.state)")
@@ -587,7 +595,7 @@ struct HTTPRequestStateMachine {
587595
private mutating func receivedHTTPResponseEnd() -> Action {
588596
switch self.state {
589597
case .initialized, .waitForChannelToBecomeWritable:
590-
preconditionFailure("How can we receive a response head before sending a request head ourselves. Invalid state: \(self.state)")
598+
preconditionFailure("How can we receive a response end before completely sending a request head ourselves. Invalid state: \(self.state)")
591599

592600
case .running(_, .waitingForHead):
593601
preconditionFailure("How can we receive a response end, if we haven't a received a head. Invalid state: \(self.state)")
@@ -654,7 +662,7 @@ struct HTTPRequestStateMachine {
654662
case .initialized,
655663
.running(_, .waitingForHead),
656664
.waitForChannelToBecomeWritable:
657-
preconditionFailure("The response is expected to only ask for more data after the response head was forwarded")
665+
preconditionFailure("The response is expected to only ask for more data after the response head was forwarded \(self.state)")
658666

659667
case .running(let requestState, .receivingBody(let head, var responseStreamState)):
660668
return self.avoidingStateMachineCoW { state -> Action in
@@ -697,18 +705,52 @@ struct HTTPRequestStateMachine {
697705
}
698706

699707
private mutating func startSendingRequest(head: HTTPRequestHead, metadata: RequestFramingMetadata) -> Action {
700-
switch metadata.body {
701-
case .stream:
702-
self.state = .running(.streaming(expectedBodyLength: nil, sentBodyBytes: 0, producer: .producing), .waitingForHead)
703-
return .sendRequestHead(head, startBody: true)
704-
case .fixedSize(0):
708+
let length = metadata.body.expectedLength
709+
if length == 0 {
705710
// no body
706711
self.state = .running(.endSent, .waitingForHead)
707-
return .sendRequestHead(head, startBody: false)
708-
case .fixedSize(let length):
709-
// length is greater than zero and we therefore have a body to send
710-
self.state = .running(.streaming(expectedBodyLength: length, sentBodyBytes: 0, producer: .producing), .waitingForHead)
711-
return .sendRequestHead(head, startBody: true)
712+
return .sendRequestHead(head, sendEnd: true)
713+
} else {
714+
self.state = .running(.streaming(expectedBodyLength: length, sentBodyBytes: 0, producer: .paused), .waitingForHead)
715+
return .sendRequestHead(head, sendEnd: false)
716+
}
717+
}
718+
719+
mutating func headSent() -> Action {
720+
switch self.state {
721+
case .initialized, .waitForChannelToBecomeWritable, .finished:
722+
preconditionFailure("Not a valid transition after `.sendingHeader`: \(self.state)")
723+
724+
case .running(.streaming(let expectedBodyLength, let sentBodyBytes, producer: .paused), let responseState):
725+
let startProducing = self.isChannelWritable && expectedBodyLength != sentBodyBytes
726+
self.state = .running(.streaming(
727+
expectedBodyLength: expectedBodyLength,
728+
sentBodyBytes: sentBodyBytes,
729+
producer: startProducing ? .producing : .paused
730+
), responseState)
731+
return .notifyRequestHeadSendSuccessfully(
732+
resumeRequestBodyStream: startProducing,
733+
startIdleTimer: false
734+
)
735+
case .running(.endSent, _):
736+
return .notifyRequestHeadSendSuccessfully(resumeRequestBodyStream: false, startIdleTimer: true)
737+
case .running(.streaming(_, _, producer: .producing), _):
738+
preconditionFailure("request body producing can not start before we have successfully send the header \(self.state)")
739+
case .failed:
740+
return .wait
741+
742+
case .modifying:
743+
preconditionFailure("Invalid state: \(self.state)")
744+
745+
}
746+
}
747+
}
748+
749+
extension RequestFramingMetadata.Body {
750+
var expectedLength: Int? {
751+
switch self {
752+
case .fixedSize(let length): return length
753+
case .stream: return nil
712754
}
713755
}
714756
}

Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,6 @@ class HTTP1ClientChannelHandlerTests: XCTestCase {
528528
}
529529

530530
func testChannelBecomesNonWritableDuringHeaderWrite() throws {
531-
try XCTSkipIf(true, "this currently fails and will be fixed in follow up PR")
532531
final class ChangeWritabilityOnFlush: ChannelOutboundHandler {
533532
typealias OutboundIn = Any
534533
func flush(context: ChannelHandlerContext) {

0 commit comments

Comments
 (0)