Skip to content

Commit 2483e08

Browse files
authored
Fix crash when receiving 2xx response before stream is complete. (#591)
Motivation It's totally acceptable for a HTTP server to respond before a request upload has completed. If the response is an error, we should abort the upload (and we do), but if the response is a 2xx we should probably just finish the upload. In this case it turns out we'll actually hit a crash when we attempt to deliver an empty body message. his is no good! Once that bug was fixed it revealed another: while we'd attempted to account for this case, we hadn't tested it, and so it turns out that shutdown would hang. As a result, I've also cleaned that up. Modifications - Tolerate empty circular buffers of bytes when streaming an upload. - Notify the connection that the task is complete when we're done. Result Fewer crashes and hangs.
1 parent 3fcd670 commit 2483e08

File tree

5 files changed

+98
-4
lines changed

5 files changed

+98
-4
lines changed

Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift

+4
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,10 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
267267
writePromise.futureResult.whenComplete { result in
268268
switch result {
269269
case .success:
270+
// If our final action was `sendRequestEnd`, that means we've already received
271+
// the complete response. As a result, once we've uploaded all the body parts
272+
// we need to tell the pool that the connection is idle.
273+
self.connection.taskCompleted()
270274
oldRequest.succeedRequest(buffer)
271275
case .failure(let error):
272276
oldRequest.fail(error)

Sources/AsyncHTTPClient/RequestBag+StateMachine.swift

+8-4
Original file line numberDiff line numberDiff line change
@@ -347,10 +347,14 @@ extension RequestBag.StateMachine {
347347
self.state = .executing(executor, requestState, .buffering(currentBuffer, next: next))
348348
return .none
349349
case .executing(let executor, let requestState, .waitingForRemote):
350-
var buffer = buffer
351-
let first = buffer.removeFirst()
352-
self.state = .executing(executor, requestState, .buffering(buffer, next: .askExecutorForMore))
353-
return .forwardResponsePart(first)
350+
if buffer.count > 0 {
351+
var buffer = buffer
352+
let first = buffer.removeFirst()
353+
self.state = .executing(executor, requestState, .buffering(buffer, next: .askExecutorForMore))
354+
return .forwardResponsePart(first)
355+
} else {
356+
return .none
357+
}
354358
case .redirected(let executor, var receivedBytes, let head, let redirectURL):
355359
let partsLength = buffer.reduce(into: 0) { $0 += $1.readableBytes }
356360
receivedBytes += partsLength

Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift

+31
Original file line numberDiff line numberDiff line change
@@ -1253,6 +1253,37 @@ class HTTPEchoHandler: ChannelInboundHandler {
12531253
}
12541254
}
12551255

1256+
final class HTTP200DelayedHandler: ChannelInboundHandler {
1257+
typealias InboundIn = HTTPServerRequestPart
1258+
typealias OutboundOut = HTTPServerResponsePart
1259+
1260+
var pendingBodyParts: Int?
1261+
1262+
init(bodyPartsBeforeResponse: Int) {
1263+
self.pendingBodyParts = bodyPartsBeforeResponse
1264+
}
1265+
1266+
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
1267+
let request = self.unwrapInboundIn(data)
1268+
switch request {
1269+
case .head:
1270+
break
1271+
case .body:
1272+
if let pendingBodyParts = self.pendingBodyParts {
1273+
if pendingBodyParts > 0 {
1274+
self.pendingBodyParts = pendingBodyParts - 1
1275+
} else {
1276+
self.pendingBodyParts = nil
1277+
context.writeAndFlush(self.wrapOutboundOut(.head(.init(version: .http1_1, status: .ok))), promise: nil)
1278+
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
1279+
}
1280+
}
1281+
case .end:
1282+
break
1283+
}
1284+
}
1285+
}
1286+
12561287
private let cert = """
12571288
-----BEGIN CERTIFICATE-----
12581289
MIICmDCCAYACCQCPC8JDqMh1zzANBgkqhkiG9w0BAQsFADANMQswCQYDVQQGEwJ1

Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift

+1
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ extension HTTPClientTests {
129129
("testSSLHandshakeErrorPropagationDelayedClose", testSSLHandshakeErrorPropagationDelayedClose),
130130
("testWeCloseConnectionsWhenConnectionCloseSetByServer", testWeCloseConnectionsWhenConnectionCloseSetByServer),
131131
("testBiDirectionalStreaming", testBiDirectionalStreaming),
132+
("testBiDirectionalStreamingEarly200", testBiDirectionalStreamingEarly200),
132133
("testSynchronousHandshakeErrorReporting", testSynchronousHandshakeErrorReporting),
133134
("testFileDownloadChunked", testFileDownloadChunked),
134135
("testCloseWhileBackpressureIsExertedIsFine", testCloseWhileBackpressureIsExertedIsFine),

Tests/AsyncHTTPClientTests/HTTPClientTests.swift

+54
Original file line numberDiff line numberDiff line change
@@ -2940,6 +2940,60 @@ class HTTPClientTests: XCTestCase {
29402940
XCTAssertNil(try delegate.next().wait())
29412941
}
29422942

2943+
// In this test, we test that a request can continue to stream its body after the response head and end
2944+
// was received where the end is a 200.
2945+
func testBiDirectionalStreamingEarly200() {
2946+
let httpBin = HTTPBin(.http1_1(ssl: false, compress: false)) { _ in HTTP200DelayedHandler(bodyPartsBeforeResponse: 1) }
2947+
defer { XCTAssertNoThrow(try httpBin.shutdown()) }
2948+
2949+
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2)
2950+
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
2951+
let writeEL = eventLoopGroup.next()
2952+
let delegateEL = eventLoopGroup.next()
2953+
2954+
let httpClient = HTTPClient(eventLoopGroupProvider: .shared(eventLoopGroup))
2955+
defer { XCTAssertNoThrow(try httpClient.syncShutdown()) }
2956+
2957+
let delegate = ResponseStreamDelegate(eventLoop: delegateEL)
2958+
2959+
let body: HTTPClient.Body = .stream { writer in
2960+
let finalPromise = writeEL.makePromise(of: Void.self)
2961+
2962+
func writeLoop(_ writer: HTTPClient.Body.StreamWriter, index: Int) {
2963+
// always invoke from the wrong el to test thread safety
2964+
writeEL.preconditionInEventLoop()
2965+
2966+
if index >= 30 {
2967+
return finalPromise.succeed(())
2968+
}
2969+
2970+
let sent = ByteBuffer(integer: index)
2971+
writer.write(.byteBuffer(sent)).whenComplete { result in
2972+
switch result {
2973+
case .success:
2974+
writeEL.execute {
2975+
writeLoop(writer, index: index + 1)
2976+
}
2977+
2978+
case .failure(let error):
2979+
finalPromise.fail(error)
2980+
}
2981+
}
2982+
}
2983+
2984+
writeEL.execute {
2985+
writeLoop(writer, index: 0)
2986+
}
2987+
2988+
return finalPromise.futureResult
2989+
}
2990+
2991+
let request = try! HTTPClient.Request(url: "http://localhost:\(httpBin.port)", body: body)
2992+
let future = httpClient.execute(request: request, delegate: delegate, eventLoop: .delegate(on: delegateEL))
2993+
XCTAssertNoThrow(try future.wait())
2994+
XCTAssertNil(try delegate.next().wait())
2995+
}
2996+
29432997
func testSynchronousHandshakeErrorReporting() throws {
29442998
// This only affects cases where we use NIOSSL.
29452999
guard !isTestingNIOTS() else { return }

0 commit comments

Comments
 (0)