Skip to content

Commit bfb149c

Browse files
committed
Fix HTTP2StreamChannel leak
1 parent 817d9aa commit bfb149c

File tree

4 files changed

+149
-9
lines changed

4 files changed

+149
-9
lines changed

Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift

+14-6
Original file line numberDiff line numberDiff line change
@@ -239,21 +239,29 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
239239

240240
private func runSuccessfulFinalAction(_ action: HTTPRequestStateMachine.Action.FinalSuccessfulRequestAction, context: ChannelHandlerContext) {
241241
switch action {
242-
case .close:
242+
case .close, .none:
243+
// We must close the http2 stream after the request has finished. This breaks a reference
244+
// cycle in HTTP2Connection.
243245
context.close(promise: nil)
244246

245247
case .sendRequestEnd(let writePromise):
246-
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writePromise)
247-
248-
case .none:
249-
break
248+
let promise = writePromise ?? context.eventLoop.makePromise(of: Void.self)
249+
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: promise)
250+
promise.futureResult.whenComplete { _ in
251+
// Once we have written the last message we must close the http2 stream, since this
252+
// break a reference cycle in HTTP2Connection.
253+
context.close(promise: nil)
254+
}
250255
}
251256
}
252257

253258
private func runFailedFinalAction(_ action: HTTPRequestStateMachine.Action.FinalFailedRequestAction, context: ChannelHandlerContext, error: Error) {
259+
// We must close the http2 stream after the request has finished. This breaks a reference
260+
// cycle in HTTP2Connection.
261+
context.close(promise: nil)
262+
254263
switch action {
255264
case .close(let writePromise):
256-
context.close(promise: nil)
257265
writePromise?.fail(error)
258266

259267
case .none:

Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift

+2-2
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ final class HTTP2Connection {
7777

7878
/// We use this channel set to remember, which open streams we need to inform that
7979
/// we want to close the connection. The channels shall than cancel their currently running
80-
/// request.
80+
/// request. This property must only be accessed from the connections `EventLoop`.
8181
private var openStreams = Set<ChannelBox>()
8282
let id: HTTPConnectionPool.Connection.ID
8383
let decompression: HTTPClient.Decompression
@@ -241,7 +241,7 @@ final class HTTP2Connection {
241241
// before.
242242
let box = ChannelBox(channel)
243243
self.openStreams.insert(box)
244-
self.channel.closeFuture.whenComplete { _ in
244+
channel.closeFuture.whenComplete { _ in
245245
self.openStreams.remove(box)
246246
}
247247

Tests/AsyncHTTPClientTests/HTTP2ClientRequestHandlerTests+XCTest.swift

+2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ extension HTTP2ClientRequestHandlerTests {
3030
("testIdleReadTimeout", testIdleReadTimeout),
3131
("testIdleReadTimeoutIsCanceledIfRequestIsCanceled", testIdleReadTimeoutIsCanceledIfRequestIsCanceled),
3232
("testWriteHTTPHeadFails", testWriteHTTPHeadFails),
33+
("testChannelIsClosedAfterRequest", testChannelIsClosedAfterRequest),
34+
("testRequestFinishesAfterResponseWasReceived", testRequestFinishesAfterResponseWasReceived),
3335
]
3436
}
3537
}

Tests/AsyncHTTPClientTests/HTTP2ClientRequestHandlerTests.swift

+131-1
Original file line numberDiff line numberDiff line change
@@ -275,8 +275,10 @@ class HTTP2ClientRequestHandlerTests: XCTestCase {
275275
XCTAssertEqual(readEventHandler.readHitCounter, 1)
276276
XCTAssertNoThrow(try embedded.writeInbound(HTTPClientResponsePart.head(responseHead)))
277277

278+
XCTAssertTrue(embedded.isActive)
278279
// canceling the request
279280
requestBag.cancel()
281+
XCTAssertFalse(embedded.isActive, "The stream is closed")
280282
XCTAssertThrowsError(try requestBag.task.futureResult.wait()) {
281283
XCTAssertEqual($0 as? HTTPClientError, .cancelled)
282284
}
@@ -335,14 +337,142 @@ class HTTP2ClientRequestHandlerTests: XCTestCase {
335337

336338
// the handler only writes once the channel is writable
337339
XCTAssertEqual(try embedded.readOutbound(as: HTTPClientRequestPart.self), .none)
340+
XCTAssertTrue(embedded.isActive)
338341
embedded.isWritable = true
339342
embedded.pipeline.fireChannelWritabilityChanged()
340343

341344
XCTAssertThrowsError(try requestBag.task.futureResult.wait()) {
342345
XCTAssertEqual($0 as? WriteError, WriteError())
343346
}
344347

345-
XCTAssertEqual(embedded.isActive, false)
348+
XCTAssertFalse(embedded.isActive)
346349
}
347350
}
351+
352+
func testChannelIsClosedAfterRequest() {
353+
let embedded = EmbeddedChannel()
354+
let requestHandler = HTTP2ClientRequestHandler(eventLoop: embedded.eventLoop)
355+
let logger = Logger(label: "test")
356+
357+
XCTAssertNoThrow(try embedded.pipeline.syncOperations.addHandlers([requestHandler]))
358+
359+
var maybeRequest: HTTPClient.Request?
360+
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/"))
361+
guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") }
362+
363+
let delegate = ResponseAccumulator(request: request)
364+
var maybeRequestBag: RequestBag<ResponseAccumulator>?
365+
XCTAssertNoThrow(maybeRequestBag = try RequestBag(
366+
request: request,
367+
eventLoopPreference: .delegate(on: embedded.eventLoop),
368+
task: .init(eventLoop: embedded.eventLoop, logger: logger),
369+
redirectHandler: nil,
370+
connectionDeadline: .now() + .seconds(30),
371+
requestOptions: .forTests(),
372+
delegate: delegate
373+
))
374+
guard let requestBag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag") }
375+
376+
embedded.write(requestBag, promise: nil)
377+
XCTAssertNoThrow(try embedded.connect(to: .makeAddressResolvingHost("localhost", port: 0)).wait())
378+
379+
XCTAssertNoThrow(try embedded.receiveHeadAndVerify {
380+
XCTAssertEqual($0.method, .GET)
381+
XCTAssertEqual($0.uri, "/")
382+
XCTAssertEqual($0.headers.first(name: "host"), "localhost")
383+
})
384+
XCTAssertEqual(try embedded.readOutbound(as: HTTPClientRequestPart.self), .end(nil))
385+
386+
let responseHead = HTTPResponseHead(version: .http1_1, status: .ok, headers: HTTPHeaders([("content-length", "4")]))
387+
388+
XCTAssertNoThrow(try embedded.writeInbound(HTTPClientResponsePart.head(responseHead)))
389+
XCTAssertNoThrow(try embedded.writeInbound(HTTPClientResponsePart.body(ByteBuffer(bytes: 0...3))))
390+
XCTAssertTrue(embedded.isActive)
391+
XCTAssertNoThrow(try embedded.writeInbound(HTTPClientResponsePart.end(nil)))
392+
XCTAssertFalse(embedded.isActive)
393+
394+
XCTAssertNoThrow(try requestBag.task.futureResult.wait())
395+
}
396+
397+
func testRequestFinishesAfterResponseWasReceived() {
398+
let embedded = EmbeddedChannel()
399+
let requestHandler = HTTP2ClientRequestHandler(eventLoop: embedded.eventLoop)
400+
XCTAssertNoThrow(try embedded.pipeline.syncOperations.addHandler(requestHandler))
401+
let logger = Logger(label: "test")
402+
403+
let testWriter = TestBackpressureWriter(eventLoop: embedded.eventLoop, parts: 50)
404+
405+
var maybeRequest: HTTPClient.Request?
406+
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST, body: .stream(length: 100) { writer in
407+
testWriter.start(writer: writer)
408+
}))
409+
guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") }
410+
411+
let delegate = ResponseAccumulator(request: request)
412+
var maybeRequestBag: RequestBag<ResponseAccumulator>?
413+
XCTAssertNoThrow(maybeRequestBag = try RequestBag(
414+
request: request,
415+
eventLoopPreference: .delegate(on: embedded.eventLoop),
416+
task: .init(eventLoop: embedded.eventLoop, logger: logger),
417+
redirectHandler: nil,
418+
connectionDeadline: .now() + .seconds(30),
419+
requestOptions: .forTests(idleReadTimeout: .milliseconds(200)),
420+
delegate: delegate
421+
))
422+
guard let requestBag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag") }
423+
424+
embedded.isWritable = false
425+
XCTAssertNoThrow(try embedded.connect(to: .makeAddressResolvingHost("localhost", port: 0)).wait())
426+
embedded.write(requestBag, promise: nil)
427+
428+
// the handler only writes once the channel is writable
429+
XCTAssertEqual(try embedded.readOutbound(as: HTTPClientRequestPart.self), .none)
430+
embedded.isWritable = true
431+
testWriter.writabilityChanged(true)
432+
embedded.pipeline.fireChannelWritabilityChanged()
433+
434+
// receive the head
435+
XCTAssertNoThrow(try embedded.receiveHeadAndVerify {
436+
XCTAssertEqual($0.method, .POST)
437+
XCTAssertEqual($0.uri, "/")
438+
XCTAssertEqual($0.headers.first(name: "host"), "localhost")
439+
XCTAssertEqual($0.headers.first(name: "content-length"), "100")
440+
})
441+
442+
// send full response right away
443+
let responseHead = HTTPResponseHead(version: .http1_1, status: .ok)
444+
XCTAssertNoThrow(try embedded.writeInbound(HTTPClientResponsePart.head(responseHead)))
445+
embedded.read()
446+
XCTAssertNoThrow(try embedded.writeInbound(HTTPClientResponsePart.end(nil)))
447+
XCTAssertTrue(embedded.isActive)
448+
449+
// the next body write will be executed once we tick the el. before we make the channel
450+
// unwritable
451+
452+
for index in 0..<50 {
453+
embedded.isWritable = false
454+
testWriter.writabilityChanged(false)
455+
embedded.pipeline.fireChannelWritabilityChanged()
456+
457+
XCTAssertEqual(testWriter.written, index)
458+
459+
embedded.embeddedEventLoop.run()
460+
461+
XCTAssertNoThrow(try embedded.receiveBodyAndVerify {
462+
XCTAssertEqual($0.readableBytes, 2)
463+
})
464+
465+
XCTAssertEqual(testWriter.written, index + 1)
466+
467+
embedded.isWritable = true
468+
testWriter.writabilityChanged(true)
469+
embedded.pipeline.fireChannelWritabilityChanged()
470+
}
471+
472+
embedded.embeddedEventLoop.run()
473+
XCTAssertNoThrow(try embedded.receiveEnd())
474+
XCTAssertFalse(embedded.isActive, "Expected the channel to be closed after last write")
475+
476+
XCTAssertNoThrow(try requestBag.task.futureResult.wait())
477+
}
348478
}

0 commit comments

Comments
 (0)