Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ extension Transaction {
case none
}

mutating func succeedRequest(_ newChunks: CircularBuffer<ByteBuffer>?) -> ReceiveResponseEndAction {
mutating func receiveResponseEnd(_ newChunks: CircularBuffer<ByteBuffer>?) -> ReceiveResponseEndAction {
switch self.state {
case .initialized,
.queued,
Expand All @@ -479,8 +479,14 @@ extension Transaction {
)

case .executing(let context, let requestState, .streamingBody(let source)):
self.state = .executing(context, requestState, .finished)
switch requestState {
case .finished:
self.state = .finished(error: nil)
case .paused, .producing, .requestHeadSent:
self.state = .executing(context, requestState, .finished)
}
return .finishResponseStream(source, finalBody: newChunks)

case .finished:
// the request failed or was cancelled before, we can ignore all events
return .none
Expand Down
8 changes: 4 additions & 4 deletions Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift
Original file line number Diff line number Diff line change
Expand Up @@ -286,11 +286,11 @@ extension Transaction: HTTPExecutableRequest {
}
}

func succeedRequest(_ buffer: CircularBuffer<ByteBuffer>?) {
let succeedAction = self.state.withLockedValue { state in
state.succeedRequest(buffer)
func receiveResponseEnd(_ buffer: CircularBuffer<ByteBuffer>?, trailers: HTTPHeaders?) {
let receiveResponseEndAction = self.state.withLockedValue { state in
state.receiveResponseEnd(buffer)
}
switch succeedAction {
switch receiveResponseEndAction {
case .finishResponseStream(let source, let finalResponse):
if let finalResponse = finalResponse {
_ = source.yield(contentsOf: finalResponse)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
switch finalAction {
case .close:
context.close(promise: nil)
oldRequest.succeedRequest(buffer)
oldRequest.receiveResponseEnd(buffer, trailers: nil)
case .sendRequestEnd(let writePromise, let shouldClose):
let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self)
// We need to defer succeeding the old request to avoid ordering issues
Expand All @@ -336,7 +336,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
self.onConnectionIdle()
}

oldRequest.succeedRequest(buffer)
oldRequest.receiveResponseEnd(buffer, trailers: nil)
case .failure(let error):
context.close(promise: nil)
oldRequest.fail(error)
Expand All @@ -346,7 +346,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writePromise)
case .informConnectionIsIdle:
self.onConnectionIdle()
oldRequest.succeedRequest(buffer)
oldRequest.receiveResponseEnd(buffer, trailers: nil)
}

case .failRequest(let error, let finalAction):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
case .succeedRequest(let finalAction, let finalParts):
// We can force unwrap the request here, as we have just validated in the state machine,
// that the request object is still present.
self.request!.succeedRequest(finalParts)
self.request!.receiveResponseEnd(finalParts, trailers: nil)
self.request = nil
self.runTimeoutAction(.clearIdleReadTimeoutTimer, context: context)
self.runTimeoutAction(.clearIdleWriteTimeoutTimer, context: context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,12 @@ protocol HTTPExecutableRequest: AnyObject, Sendable {
/// to ask for more data.
func receiveResponseBodyParts(_ buffer: CircularBuffer<ByteBuffer>)

/// Succeeds the executing request. The executor will not call any further methods on the request after this method.
/// Finishes the server response.
///
/// - Parameter buffer: The remaining response body parts, that were received before the request end
func succeedRequest(_ buffer: CircularBuffer<ByteBuffer>?)
/// - Parameters:
/// - buffer: The remaining response body parts, that were received before the response end
/// - trailers: The response trailers if any where received. Nil means no trailers were received.
func receiveResponseEnd(_ buffer: CircularBuffer<ByteBuffer>?, trailers: HTTPHeaders?)

/// Fails the executing request, with an error.
func fail(_ error: Error)
Expand Down
2 changes: 1 addition & 1 deletion Sources/AsyncHTTPClient/RequestBag+StateMachine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ extension RequestBag.StateMachine {
case none
}

mutating func succeedRequest(_ newChunks: CircularBuffer<ByteBuffer>?) -> ReceiveResponseEndAction {
mutating func receiveResponseEnd(_ newChunks: CircularBuffer<ByteBuffer>?) -> ReceiveResponseEndAction {
switch self.state {
case .initialized, .queued, .deadlineExceededWhileQueued:
preconditionFailure("How can we receive a response body part, if the request hasn't started yet.")
Expand Down
10 changes: 5 additions & 5 deletions Sources/AsyncHTTPClient/RequestBag.swift
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,8 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate & Sendable>: Sendabl
}
}

private func succeedRequest0(_ buffer: CircularBuffer<ByteBuffer>?) {
let action = self.loopBoundState.value.state.succeedRequest(buffer)
private func receiveResponseEnd0(_ buffer: CircularBuffer<ByteBuffer>?) {
let action = self.loopBoundState.value.state.receiveResponseEnd(buffer)

switch action {
case .none:
Expand Down Expand Up @@ -538,12 +538,12 @@ extension RequestBag: HTTPExecutableRequest {
}
}

func succeedRequest(_ buffer: CircularBuffer<ByteBuffer>?) {
func receiveResponseEnd(_ buffer: CircularBuffer<ByteBuffer>?, trailers: HTTPHeaders?) {
if self.task.eventLoop.inEventLoop {
self.succeedRequest0(buffer)
self.receiveResponseEnd0(buffer)
} else {
self.task.eventLoop.execute {
self.succeedRequest0(buffer)
self.receiveResponseEnd0(buffer)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ final private class MockScheduledRequest: HTTPSchedulableRequest {
preconditionFailure("Unimplemented")
}

func succeedRequest(_: CircularBuffer<ByteBuffer>?) {
func receiveResponseEnd(_ buffer: CircularBuffer<ByteBuffer>?, trailers: HTTPHeaders?) {
preconditionFailure("Unimplemented")
}
}
2 changes: 1 addition & 1 deletion Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ final class MockHTTPScheduableRequest: HTTPSchedulableRequest {
preconditionFailure("Unimplemented")
}

func succeedRequest(_: CircularBuffer<ByteBuffer>?) {
func receiveResponseEnd(_ buffer: CircularBuffer<ByteBuffer>?, trailers: HTTPHeaders?) {
preconditionFailure("Unimplemented")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ final class MockHTTPExecutableRequest: HTTPExecutableRequest {
receiveResponseBodyPartsCallback(buffer)
}

func succeedRequest(_ buffer: CircularBuffer<NIOCore.ByteBuffer>?) {
func receiveResponseEnd(_ buffer: CircularBuffer<ByteBuffer>?, trailers: HTTPHeaders?) {
self.events.append(.succeedRequest(buffer))
guard let succeedRequestCallback = succeedRequestCallback else {
return self.calledUnimplementedMethod(#function)
Expand Down
10 changes: 5 additions & 5 deletions Tests/AsyncHTTPClientTests/RequestBagTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ final class RequestBagTests: XCTestCase {
}

XCTAssertEqual(delegate.hitDidReceiveResponse, 0)
bag.succeedRequest(nil)
bag.receiveResponseEnd(nil, trailers: nil)
XCTAssertEqual(delegate.hitDidReceiveResponse, 1)

XCTAssertNoThrow(try bag.task.futureResult.wait(), "The request has succeeded")
Expand Down Expand Up @@ -568,7 +568,7 @@ final class RequestBagTests: XCTestCase {

bag.receiveResponseHead(.init(version: .http1_1, status: .ok))

bag.succeedRequest([ByteBuffer([1])])
bag.receiveResponseEnd([ByteBuffer([1])], trailers: nil)

XCTAssertThrowsError(try delegate.didFinishPromise.futureResult.wait()) { error in
XCTAssertEqualTypeAndValue(error, MyError())
Expand Down Expand Up @@ -640,7 +640,7 @@ final class RequestBagTests: XCTestCase {
// receive a 301 response immediately.
bag.receiveResponseHead(.init(version: .http1_1, status: .movedPermanently))
XCTAssertNoThrow(try XCTUnwrap(delegate.backpressurePromise).succeed(()))
bag.succeedRequest(.init())
bag.receiveResponseEnd([], trailers: nil)

// if we now write our second part of the response this should fail the backpressure promise
writeSecondPartPromise.succeed(())
Expand Down Expand Up @@ -695,7 +695,7 @@ final class RequestBagTests: XCTestCase {
XCTAssertEqual(delegate.hitDidReceiveBodyPart, 2)

// the remote closes the connection, which leads to more data and a succeed of the request
bag.succeedRequest([ByteBuffer(string: "baz")])
bag.receiveResponseEnd([ByteBuffer(string: "baz")], trailers: nil)
XCTAssertEqual(delegate.hitDidReceiveBodyPart, 2)

XCTAssertNoThrow(try XCTUnwrap(delegate.backpressurePromise).succeed(()))
Expand Down Expand Up @@ -785,7 +785,7 @@ final class RequestBagTests: XCTestCase {

XCTAssertEqual(delegate.hitDidReceiveBodyPart, 0)
XCTAssertFalse(executor.signalledDemandForResponseBody)
bag.succeedRequest([ByteBuffer(repeating: 2, count: 1024)])
bag.receiveResponseEnd([ByteBuffer(repeating: 2, count: 1024)], trailers: nil)
XCTAssertFalse(executor.signalledDemandForResponseBody)
XCTAssertEqual(delegate.hitDidReceiveResponse, 0)
XCTAssertNil(delegate.backpressurePromise)
Expand Down
8 changes: 4 additions & 4 deletions Tests/AsyncHTTPClientTests/TransactionTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ final class TransactionTests: XCTestCase {
async let part = iterator.next()
XCTAssertNoThrow(try executor.receiveResponseDemand())
executor.resetResponseStreamDemandSignal()
transaction.succeedRequest([])
transaction.receiveResponseEnd([], trailers: nil)
let result = try await part
XCTAssertNil(result)
}
Expand Down Expand Up @@ -225,7 +225,7 @@ final class TransactionTests: XCTestCase {

// doesn't crash if receives more data because of race
transaction.receiveResponseBodyParts([ByteBuffer(string: "foo bar")])
transaction.succeedRequest(nil)
transaction.receiveResponseEnd(nil, trailers: nil)
}
}

Expand Down Expand Up @@ -297,7 +297,7 @@ final class TransactionTests: XCTestCase {

XCTAssertNoThrow(try executor.receiveResponseDemand())
executor.resetResponseStreamDemandSignal()
transaction.succeedRequest([])
transaction.receiveResponseEnd([], trailers: nil)
let result = try await part
XCTAssertNil(result)
}
Expand Down Expand Up @@ -391,7 +391,7 @@ final class TransactionTests: XCTestCase {

let responseHead = HTTPResponseHead(version: .http1_1, status: .ok, headers: ["foo": "bar"])
transaction.receiveResponseHead(responseHead)
transaction.succeedRequest(nil)
transaction.receiveResponseEnd(nil, trailers: nil)

let response = try await responseTask.value
XCTAssertEqual(response.status, .ok)
Expand Down
Loading