Skip to content

Commit ac34f6d

Browse files
authored
Correctly handle Connection: close with streaming (#598)
Motivation When users stream their bodies they may still want to send Connection: close headers and terminate the connection early. This should work properly. Unfortunately it became clear that we didn't correctly pass the information that the connection needed to be closed. As a result, we'd inappropriately re-use the connection, potentially causing unnecessary HTTP errors. Modifications Signal whether the connection needs to be closed when the final connection action is to send .end. Results We behave better with streaming uploads.
1 parent 062989e commit ac34f6d

File tree

6 files changed

+98
-8
lines changed

6 files changed

+98
-8
lines changed

Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift

+10-4
Original file line numberDiff line numberDiff line change
@@ -261,16 +261,22 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
261261
case .close:
262262
context.close(promise: nil)
263263
oldRequest.succeedRequest(buffer)
264-
case .sendRequestEnd(let writePromise):
264+
case .sendRequestEnd(let writePromise, let shouldClose):
265265
let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self)
266266
// We need to defer succeeding the old request to avoid ordering issues
267-
writePromise.futureResult.whenComplete { result in
267+
writePromise.futureResult.hop(to: context.eventLoop).whenComplete { result in
268268
switch result {
269269
case .success:
270270
// If our final action was `sendRequestEnd`, that means we've already received
271271
// the complete response. As a result, once we've uploaded all the body parts
272-
// we need to tell the pool that the connection is idle.
273-
self.connection.taskCompleted()
272+
// we need to tell the pool that the connection is idle or, if we were asked to
273+
// close when we're done, send the close. Either way, we then succeed the request
274+
if shouldClose {
275+
context.close(promise: nil)
276+
} else {
277+
self.connection.taskCompleted()
278+
}
279+
274280
oldRequest.succeedRequest(buffer)
275281
case .failure(let error):
276282
oldRequest.fail(error)

Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ConnectionStateMachine.swift

+5-2
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@ struct HTTP1ConnectionStateMachine {
3535
/// as soon as we wrote the request end onto the wire.
3636
///
3737
/// The promise is an optional write promise.
38-
case sendRequestEnd(EventLoopPromise<Void>?)
38+
///
39+
/// `shouldClose` records whether we have attached a Connection: close header to this request, and so the connection should
40+
/// be terminated
41+
case sendRequestEnd(EventLoopPromise<Void>?, shouldClose: Bool)
3942
/// Inform an observer that the connection has become idle
4043
case informConnectionIsIdle
4144
}
@@ -413,7 +416,7 @@ extension HTTP1ConnectionStateMachine.State {
413416
newFinalAction = .close
414417
case .sendRequestEnd(let writePromise):
415418
self = .idle
416-
newFinalAction = .sendRequestEnd(writePromise)
419+
newFinalAction = .sendRequestEnd(writePromise, shouldClose: close)
417420
case .none:
418421
self = .idle
419422
newFinalAction = close ? .close : .informConnectionIsIdle

Tests/AsyncHTTPClientTests/HTTP1ConnectionStateMachineTests.swift

+2-2
Original file line numberDiff line numberDiff line change
@@ -338,8 +338,8 @@ extension HTTP1ConnectionStateMachine.Action.FinalSuccessfulStreamAction: Equata
338338
switch (lhs, rhs) {
339339
case (.close, .close):
340340
return true
341-
case (sendRequestEnd(let lhsPromise), sendRequestEnd(let rhsPromise)):
342-
return lhsPromise?.futureResult == rhsPromise?.futureResult
341+
case (sendRequestEnd(let lhsPromise, let lhsShouldClose), sendRequestEnd(let rhsPromise, let rhsShouldClose)):
342+
return lhsPromise?.futureResult == rhsPromise?.futureResult && lhsShouldClose == rhsShouldClose
343343
case (informConnectionIsIdle, informConnectionIsIdle):
344344
return true
345345
default:

Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift

+26
Original file line numberDiff line numberDiff line change
@@ -1017,6 +1017,32 @@ internal final class CloseWithoutClosingServerHandler: ChannelInboundHandler {
10171017
}
10181018
}
10191019

1020+
final class ExpectClosureServerHandler: ChannelInboundHandler {
1021+
typealias InboundIn = HTTPServerRequestPart
1022+
typealias OutboundOut = HTTPServerResponsePart
1023+
1024+
private let onClosePromise: EventLoopPromise<Void>
1025+
1026+
init(onClosePromise: EventLoopPromise<Void>) {
1027+
self.onClosePromise = onClosePromise
1028+
}
1029+
1030+
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
1031+
switch self.unwrapInboundIn(data) {
1032+
case .head:
1033+
let head = HTTPResponseHead(version: .http1_1, status: .ok, headers: ["Content-Length": "0"])
1034+
context.write(self.wrapOutboundOut(.head(head)), promise: nil)
1035+
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
1036+
case .body, .end:
1037+
()
1038+
}
1039+
}
1040+
1041+
func channelInactive(context: ChannelHandlerContext) {
1042+
self.onClosePromise.succeed(())
1043+
}
1044+
}
1045+
10201046
struct EventLoopFutureTimeoutError: Error {}
10211047

10221048
extension EventLoopFuture {

Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift

+1
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ extension HTTPClientTests {
131131
("testBiDirectionalStreaming", testBiDirectionalStreaming),
132132
("testBiDirectionalStreamingEarly200", testBiDirectionalStreamingEarly200),
133133
("testBiDirectionalStreamingEarly200DoesntPreventUsFromSendingMoreRequests", testBiDirectionalStreamingEarly200DoesntPreventUsFromSendingMoreRequests),
134+
("testCloseConnectionAfterEarly2XXWhenStreaming", testCloseConnectionAfterEarly2XXWhenStreaming),
134135
("testSynchronousHandshakeErrorReporting", testSynchronousHandshakeErrorReporting),
135136
("testFileDownloadChunked", testFileDownloadChunked),
136137
("testCloseWhileBackpressureIsExertedIsFine", testCloseWhileBackpressureIsExertedIsFine),

Tests/AsyncHTTPClientTests/HTTPClientTests.swift

+54
Original file line numberDiff line numberDiff line change
@@ -3075,6 +3075,60 @@ class HTTPClientTests: XCTestCase {
30753075
XCTAssertNoThrow(try future2.wait())
30763076
}
30773077

3078+
// This test validates that we correctly close the connection after our body completes when we've streamed a
3079+
// body and received the 2XX response _before_ we finished our stream.
3080+
func testCloseConnectionAfterEarly2XXWhenStreaming() {
3081+
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2)
3082+
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
3083+
3084+
let onClosePromise = eventLoopGroup.next().makePromise(of: Void.self)
3085+
let httpBin = HTTPBin(.http1_1(ssl: false, compress: false)) { _ in ExpectClosureServerHandler(onClosePromise: onClosePromise) }
3086+
defer { XCTAssertNoThrow(try httpBin.shutdown()) }
3087+
3088+
let writeEL = eventLoopGroup.next()
3089+
3090+
let httpClient = HTTPClient(eventLoopGroupProvider: .shared(eventLoopGroup))
3091+
defer { XCTAssertNoThrow(try httpClient.syncShutdown()) }
3092+
3093+
let body: HTTPClient.Body = .stream { writer in
3094+
let finalPromise = writeEL.makePromise(of: Void.self)
3095+
3096+
func writeLoop(_ writer: HTTPClient.Body.StreamWriter, index: Int) {
3097+
// always invoke from the wrong el to test thread safety
3098+
writeEL.preconditionInEventLoop()
3099+
3100+
if index >= 30 {
3101+
return finalPromise.succeed(())
3102+
}
3103+
3104+
let sent = ByteBuffer(integer: index)
3105+
writer.write(.byteBuffer(sent)).whenComplete { result in
3106+
switch result {
3107+
case .success:
3108+
writeEL.execute {
3109+
writeLoop(writer, index: index + 1)
3110+
}
3111+
3112+
case .failure(let error):
3113+
finalPromise.fail(error)
3114+
}
3115+
}
3116+
}
3117+
3118+
writeEL.execute {
3119+
writeLoop(writer, index: 0)
3120+
}
3121+
3122+
return finalPromise.futureResult
3123+
}
3124+
3125+
let headers = HTTPHeaders([("Connection", "close")])
3126+
let request = try! HTTPClient.Request(url: "http://localhost:\(httpBin.port)", headers: headers, body: body)
3127+
let future = httpClient.execute(request: request)
3128+
XCTAssertNoThrow(try future.wait())
3129+
XCTAssertNoThrow(try onClosePromise.futureResult.wait())
3130+
}
3131+
30783132
func testSynchronousHandshakeErrorReporting() throws {
30793133
// This only affects cases where we use NIOSSL.
30803134
guard !isTestingNIOTS() else { return }

0 commit comments

Comments
 (0)