Skip to content

Commit 47c3087

Browse files
committed
Audit promise handling
1 parent 5780c10 commit 47c3087

File tree

6 files changed

+64
-1
lines changed

6 files changed

+64
-1
lines changed

Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift

+1
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
337337
// the request has been popped by the state machine or the ChannelHandler has been
338338
// removed from the Channel pipeline. This is a normal threading issue, noone has
339339
// screwed up.
340+
promise?.fail(ChannelError.eof)
340341
return
341342
}
342343

Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift

+1
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,7 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
288288
// the request has been popped by the state machine or the ChannelHandler has been
289289
// removed from the Channel pipeline. This is a normal threading issue, noone has
290290
// screwed up.
291+
promise?.fail(ChannelError.eof)
291292
return
292293
}
293294

Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift

+4
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,7 @@ struct HTTPRequestStateMachine {
274274
// won't be interested. We expect that the producer has been informed to pause
275275
// producing.
276276
assert(producerState == .paused)
277+
promise?.fail(HTTPClientError.requestStreamCancelled)
277278
return .wait
278279

279280
case .running(.streaming(let expectedBodyLength, var sentBodyBytes, let producerState), let responseState):
@@ -290,6 +291,7 @@ struct HTTPRequestStateMachine {
290291
if let expected = expectedBodyLength, sentBodyBytes + part.readableBytes > expected {
291292
let error = HTTPClientError.bodyLengthMismatch
292293
self.state = .failed(error)
294+
promise?.fail(error)
293295
return .failRequest(error, .close)
294296
}
295297

@@ -306,6 +308,7 @@ struct HTTPRequestStateMachine {
306308
return .sendBodyPart(part, promise)
307309

308310
case .failed:
311+
promise?.fail(HTTPClientError.failed)
309312
return .wait
310313

311314
case .finished:
@@ -318,6 +321,7 @@ struct HTTPRequestStateMachine {
318321

319322
// We may still receive something, here because of potential race conditions with the
320323
// producing thread.
324+
promise?.fail(ChannelError.eof)
321325
return .wait
322326

323327
case .modifying:

Sources/AsyncHTTPClient/HTTPClient.swift

+3
Original file line numberDiff line numberDiff line change
@@ -912,6 +912,7 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
912912
case unsupportedScheme(String)
913913
case readTimeout
914914
case remoteConnectionClosed
915+
case failed
915916
case cancelled
916917
case identityCodingIncorrectlyPresent
917918
@available(*, deprecated, message: "AsyncHTTPClient now silently corrects this invalid header.")
@@ -965,6 +966,8 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
965966
public static let readTimeout = HTTPClientError(code: .readTimeout)
966967
/// Remote connection was closed unexpectedly.
967968
public static let remoteConnectionClosed = HTTPClientError(code: .remoteConnectionClosed)
969+
/// Request failed.
970+
public static let failed = HTTPClientError(code: .failed)
968971
/// Request was cancelled.
969972
public static let cancelled = HTTPClientError(code: .cancelled)
970973
/// Request contains invalid identity encoding.

Tests/AsyncHTTPClientTests/HTTP2ClientRequestHandlerTests.swift

+33
Original file line numberDiff line numberDiff line change
@@ -345,4 +345,37 @@ class HTTP2ClientRequestHandlerTests: XCTestCase {
345345
XCTAssertEqual(embedded.isActive, false)
346346
}
347347
}
348+
349+
func test() throws {
350+
let embeddedEventLoop = EmbeddedEventLoop()
351+
let requestHandler = HTTP2ClientRequestHandler(eventLoop: embeddedEventLoop)
352+
353+
let logger = Logger(label: "test")
354+
355+
var maybeRequest: HTTPClient.Request?
356+
XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "http://localhost/", method: .POST))
357+
guard let request = maybeRequest else { return XCTFail("Expected to be able to create a request") }
358+
359+
let delegate = ResponseAccumulator(request: request)
360+
var maybeRequestBag: RequestBag<ResponseAccumulator>?
361+
XCTAssertNoThrow(maybeRequestBag = try RequestBag(
362+
request: request,
363+
eventLoopPreference: .delegate(on: embeddedEventLoop),
364+
task: .init(eventLoop: embeddedEventLoop, logger: logger),
365+
redirectHandler: nil,
366+
connectionDeadline: .now() + .seconds(30),
367+
requestOptions: .forTests(idleReadTimeout: .milliseconds(200)),
368+
delegate: delegate
369+
))
370+
guard let requestBag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag") }
371+
372+
let promise = embeddedEventLoop.makePromise(of: Void.self)
373+
requestHandler.writeRequestBodyPart(
374+
.byteBuffer(.init()),
375+
request: requestBag,
376+
promise: promise
377+
)
378+
379+
XCTAssertThrowsError(try promise.futureResult.wait())
380+
}
348381
}

Tests/AsyncHTTPClientTests/HTTPRequestStateMachineTests.swift

+22-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
@testable import AsyncHTTPClient
1616
import NIOCore
17+
import NIOEmbedded
1718
import NIOHTTP1
1819
import NIOSSL
1920
import XCTest
@@ -94,6 +95,8 @@ class HTTPRequestStateMachineTests: XCTestCase {
9495
}
9596

9697
func testRequestBodyStreamIsCancelledIfServerRespondsWith301() {
98+
let eventLoop = EmbeddedEventLoop()
99+
let promise = eventLoop.makePromise(of: Void.self)
97100
var state = HTTPRequestStateMachine(isChannelWritable: true)
98101
let requestHead = HTTPRequestHead(version: .http1_1, method: .POST, uri: "/", headers: HTTPHeaders([("content-length", "12")]))
99102
let metadata = RequestFramingMetadata(connectionClose: false, body: .fixedSize(12))
@@ -106,8 +109,9 @@ class HTTPRequestStateMachineTests: XCTestCase {
106109
XCTAssertEqual(state.channelRead(.head(responseHead)), .forwardResponseHead(responseHead, pauseRequestBodyStream: true))
107110
XCTAssertEqual(state.writabilityChanged(writable: false), .wait)
108111
XCTAssertEqual(state.writabilityChanged(writable: true), .wait)
109-
XCTAssertEqual(state.requestStreamPartReceived(part, promise: nil), .wait,
112+
XCTAssertEqual(state.requestStreamPartReceived(part, promise: promise), .wait,
110113
"Expected to drop all stream data after having received a response head, with status >= 300")
114+
XCTAssertThrowsError(try promise.futureResult.wait())
111115

112116
XCTAssertEqual(state.channelRead(.end(nil)), .succeedRequest(.close, .init()))
113117

@@ -118,6 +122,18 @@ class HTTPRequestStateMachineTests: XCTestCase {
118122
"Expected to drop all stream data after having received a response head, with status >= 300")
119123
}
120124

125+
func testStreamPartReceived_whenCanceleld() {
126+
let eventLoop = EmbeddedEventLoop()
127+
let promise = eventLoop.makePromise(of: Void.self)
128+
var state = HTTPRequestStateMachine(isChannelWritable: false)
129+
let part = IOData.byteBuffer(ByteBuffer(bytes: [0, 1, 2, 3]))
130+
131+
XCTAssertEqual(state.requestCancelled(), .failRequest(HTTPClientError.cancelled, .none))
132+
XCTAssertEqual(state.requestStreamPartReceived(part, promise: promise), .wait,
133+
"Expected to drop all stream data after having received a response head, with status >= 300")
134+
XCTAssertThrowsError(try promise.futureResult.wait())
135+
}
136+
121137
func testRequestBodyStreamIsCancelledIfServerRespondsWith301WhileWriteBackpressure() {
122138
var state = HTTPRequestStateMachine(isChannelWritable: true)
123139
let requestHead = HTTPRequestHead(version: .http1_1, method: .POST, uri: "/", headers: HTTPHeaders([("content-length", "12")]))
@@ -144,6 +160,8 @@ class HTTPRequestStateMachineTests: XCTestCase {
144160
}
145161

146162
func testRequestBodyStreamIsContinuedIfServerRespondsWith200() {
163+
let eventLoop = EmbeddedEventLoop()
164+
let promise = eventLoop.makePromise(of: Void.self)
147165
var state = HTTPRequestStateMachine(isChannelWritable: true)
148166
let requestHead = HTTPRequestHead(version: .http1_1, method: .POST, uri: "/", headers: HTTPHeaders([("content-length", "12")]))
149167
let metadata = RequestFramingMetadata(connectionClose: false, body: .fixedSize(12))
@@ -161,6 +179,9 @@ class HTTPRequestStateMachineTests: XCTestCase {
161179
let part2 = IOData.byteBuffer(ByteBuffer(bytes: 8...11))
162180
XCTAssertEqual(state.requestStreamPartReceived(part2, promise: nil), .sendBodyPart(part2, nil))
163181
XCTAssertEqual(state.requestStreamFinished(), .succeedRequest(.sendRequestEnd, .init()))
182+
183+
XCTAssertEqual(state.requestStreamPartReceived(part2, promise: promise), .wait)
184+
XCTAssertThrowsError(try promise.futureResult.wait())
164185
}
165186

166187
func testRequestBodyStreamIsContinuedIfServerSendHeadWithStatus200() {

0 commit comments

Comments
 (0)