Skip to content

Commit ebafa1e

Browse files
committed
Call didSendRequestPart at the right time
# Motivation Right now, we call `didSendRequestPart` after passing the write to the executor. However, this does not mean that the write hit the socket. To implement proper backpressure using the delegate, we should only call this method once the write was successful. # Modification Pass a promise to the actual channel write and only call the delegate once that promise succeeds. # Result The delegate method `didSendRequestPart` is only called after the write was successful.
1 parent 71af9c7 commit ebafa1e

10 files changed

+65
-62
lines changed

Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift

+3-3
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ final class Transaction: @unchecked Sendable {
6363

6464
switch writeAction {
6565
case .writeAndWait(let executor), .writeAndContinue(let executor):
66-
executor.writeRequestBodyPart(.byteBuffer(byteBuffer), request: self)
66+
executor.writeRequestBodyPart(.byteBuffer(byteBuffer), request: self, promise: nil)
6767

6868
case .fail:
6969
// an error/cancellation has happened. we don't need to continue here
@@ -105,14 +105,14 @@ final class Transaction: @unchecked Sendable {
105105
switch self.state.writeNextRequestPart() {
106106
case .writeAndContinue(let executor):
107107
self.stateLock.unlock()
108-
executor.writeRequestBodyPart(.byteBuffer(part), request: self)
108+
executor.writeRequestBodyPart(.byteBuffer(part), request: self, promise: nil)
109109

110110
case .writeAndWait(let executor):
111111
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
112112
self.state.waitForRequestBodyDemand(continuation: continuation)
113113
self.stateLock.unlock()
114114

115-
executor.writeRequestBodyPart(.byteBuffer(part), request: self)
115+
executor.writeRequestBodyPart(.byteBuffer(part), request: self, promise: nil)
116116
}
117117

118118
case .fail:

Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift

+7-7
Original file line numberDiff line numberDiff line change
@@ -201,8 +201,8 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
201201
}
202202
}
203203

204-
case .sendBodyPart(let part):
205-
context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: nil)
204+
case .sendBodyPart(let part, let promise):
205+
context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: promise)
206206

207207
case .sendRequestEnd:
208208
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
@@ -330,7 +330,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
330330

331331
// MARK: Private HTTPRequestExecutor
332332

333-
private func writeRequestBodyPart0(_ data: IOData, request: HTTPExecutableRequest) {
333+
private func writeRequestBodyPart0(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
334334
guard self.request === request, let context = self.channelContext else {
335335
// Because the HTTPExecutableRequest may run in a different thread to our eventLoop,
336336
// calls from the HTTPExecutableRequest to our ChannelHandler may arrive here after
@@ -340,7 +340,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
340340
return
341341
}
342342

343-
let action = self.state.requestStreamPartReceived(data)
343+
let action = self.state.requestStreamPartReceived(data, promise: promise)
344344
self.run(action, context: context)
345345
}
346346

@@ -380,12 +380,12 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
380380
}
381381

382382
extension HTTP1ClientChannelHandler: HTTPRequestExecutor {
383-
func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest) {
383+
func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
384384
if self.eventLoop.inEventLoop {
385-
self.writeRequestBodyPart0(data, request: request)
385+
self.writeRequestBodyPart0(data, request: request, promise: promise)
386386
} else {
387387
self.eventLoop.execute {
388-
self.writeRequestBodyPart0(data, request: request)
388+
self.writeRequestBodyPart0(data, request: request, promise: promise)
389389
}
390390
}
391391
}

Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ConnectionStateMachine.swift

+5-5
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ struct HTTP1ConnectionStateMachine {
4141
}
4242

4343
case sendRequestHead(HTTPRequestHead, startBody: Bool)
44-
case sendBodyPart(IOData)
44+
case sendBodyPart(IOData, EventLoopPromise<Void>?)
4545
case sendRequestEnd
4646

4747
case pauseRequestBodyStream
@@ -189,13 +189,13 @@ struct HTTP1ConnectionStateMachine {
189189
}
190190
}
191191

192-
mutating func requestStreamPartReceived(_ part: IOData) -> Action {
192+
mutating func requestStreamPartReceived(_ part: IOData, promise: EventLoopPromise<Void>?) -> Action {
193193
guard case .inRequest(var requestStateMachine, let close) = self.state else {
194194
preconditionFailure("Invalid state: \(self.state)")
195195
}
196196

197197
return self.avoidingStateMachineCoW { state -> Action in
198-
let action = requestStateMachine.requestStreamPartReceived(part)
198+
let action = requestStateMachine.requestStreamPartReceived(part, promise: promise)
199199
state = .inRequest(requestStateMachine, close: close)
200200
return state.modify(with: action)
201201
}
@@ -377,8 +377,8 @@ extension HTTP1ConnectionStateMachine.State {
377377
return .pauseRequestBodyStream
378378
case .resumeRequestBodyStream:
379379
return .resumeRequestBodyStream
380-
case .sendBodyPart(let part):
381-
return .sendBodyPart(part)
380+
case .sendBodyPart(let part, let promise):
381+
return .sendBodyPart(part, promise)
382382
case .sendRequestEnd:
383383
return .sendRequestEnd
384384
case .forwardResponseHead(let head, let pauseRequestBodyStream):

Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift

+7-7
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,8 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
148148
// that the request is neither failed nor finished yet
149149
self.request!.pauseRequestBodyStream()
150150

151-
case .sendBodyPart(let data):
152-
context.writeAndFlush(self.wrapOutboundOut(.body(data)), promise: nil)
151+
case .sendBodyPart(let data, let promise):
152+
context.writeAndFlush(self.wrapOutboundOut(.body(data)), promise: promise)
153153

154154
case .sendRequestEnd:
155155
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
@@ -281,7 +281,7 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
281281

282282
// MARK: Private HTTPRequestExecutor
283283

284-
private func writeRequestBodyPart0(_ data: IOData, request: HTTPExecutableRequest) {
284+
private func writeRequestBodyPart0(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
285285
guard self.request === request, let context = self.channelContext else {
286286
// Because the HTTPExecutableRequest may run in a different thread to our eventLoop,
287287
// calls from the HTTPExecutableRequest to our ChannelHandler may arrive here after
@@ -291,7 +291,7 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
291291
return
292292
}
293293

294-
let action = self.state.requestStreamPartReceived(data)
294+
let action = self.state.requestStreamPartReceived(data, promise: promise)
295295
self.run(action, context: context)
296296
}
297297

@@ -327,12 +327,12 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
327327
}
328328

329329
extension HTTP2ClientRequestHandler: HTTPRequestExecutor {
330-
func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest) {
330+
func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
331331
if self.eventLoop.inEventLoop {
332-
self.writeRequestBodyPart0(data, request: request)
332+
self.writeRequestBodyPart0(data, request: request, promise: promise)
333333
} else {
334334
self.eventLoop.execute {
335-
self.writeRequestBodyPart0(data, request: request)
335+
self.writeRequestBodyPart0(data, request: request, promise: promise)
336336
}
337337
}
338338
}

Sources/AsyncHTTPClient/ConnectionPool/HTTPExecutableRequest.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ protocol HTTPRequestExecutor {
180180
/// Writes a body part into the channel pipeline
181181
///
182182
/// This method may be **called on any thread**. The executor needs to ensure thread safety.
183-
func writeRequestBodyPart(_: IOData, request: HTTPExecutableRequest)
183+
func writeRequestBodyPart(_: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?)
184184

185185
/// Signals that the request body stream has finished
186186
///

Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift

+3-3
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ struct HTTPRequestStateMachine {
8383
}
8484

8585
case sendRequestHead(HTTPRequestHead, startBody: Bool)
86-
case sendBodyPart(IOData)
86+
case sendBodyPart(IOData, EventLoopPromise<Void>?)
8787
case sendRequestEnd
8888

8989
case pauseRequestBodyStream
@@ -261,7 +261,7 @@ struct HTTPRequestStateMachine {
261261
}
262262
}
263263

264-
mutating func requestStreamPartReceived(_ part: IOData) -> Action {
264+
mutating func requestStreamPartReceived(_ part: IOData, promise: EventLoopPromise<Void>?) -> Action {
265265
switch self.state {
266266
case .initialized,
267267
.waitForChannelToBecomeWritable,
@@ -303,7 +303,7 @@ struct HTTPRequestStateMachine {
303303

304304
self.state = .running(requestState, responseState)
305305

306-
return .sendBodyPart(part)
306+
return .sendBodyPart(part, promise)
307307

308308
case .failed:
309309
return .wait

Sources/AsyncHTTPClient/RequestBag.swift

+5-2
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,11 @@ final class RequestBag<Delegate: HTTPClientResponseDelegate> {
154154
return self.task.eventLoop.makeFailedFuture(error)
155155

156156
case .write(let part, let writer, let future):
157-
writer.writeRequestBodyPart(part, request: self)
158-
self.delegate.didSendRequestPart(task: self.task, part)
157+
let promise = self.task.eventLoop.makePromise(of: Void.self)
158+
promise.futureResult.whenSuccess {
159+
self.delegate.didSendRequestPart(task: self.task, part)
160+
}
161+
writer.writeRequestBodyPart(part, request: self, promise: promise)
159162
return future
160163
}
161164
}

Tests/AsyncHTTPClientTests/HTTP1ConnectionStateMachineTests.swift

+7-7
Original file line numberDiff line numberDiff line change
@@ -32,21 +32,21 @@ class HTTP1ConnectionStateMachineTests: XCTestCase {
3232
let part1 = IOData.byteBuffer(ByteBuffer(bytes: [1]))
3333
let part2 = IOData.byteBuffer(ByteBuffer(bytes: [2]))
3434
let part3 = IOData.byteBuffer(ByteBuffer(bytes: [3]))
35-
XCTAssertEqual(state.requestStreamPartReceived(part0), .sendBodyPart(part0))
36-
XCTAssertEqual(state.requestStreamPartReceived(part1), .sendBodyPart(part1))
35+
XCTAssertEqual(state.requestStreamPartReceived(part0, promise: nil), .sendBodyPart(part0, nil))
36+
XCTAssertEqual(state.requestStreamPartReceived(part1, promise: nil), .sendBodyPart(part1, nil))
3737

3838
// oh the channel reports... we should slow down producing...
3939
XCTAssertEqual(state.writabilityChanged(writable: false), .pauseRequestBodyStream)
4040

4141
// but we issued a .produceMoreRequestBodyData before... Thus, we must accept more produced
4242
// data
43-
XCTAssertEqual(state.requestStreamPartReceived(part2), .sendBodyPart(part2))
43+
XCTAssertEqual(state.requestStreamPartReceived(part2, promise: nil), .sendBodyPart(part2, nil))
4444
// however when we have put the data on the channel, we should not issue further
4545
// .produceMoreRequestBodyData events
4646

4747
// once we receive a writable event again, we can allow the producer to produce more data
4848
XCTAssertEqual(state.writabilityChanged(writable: true), .resumeRequestBodyStream)
49-
XCTAssertEqual(state.requestStreamPartReceived(part3), .sendBodyPart(part3))
49+
XCTAssertEqual(state.requestStreamPartReceived(part3, promise: nil), .sendBodyPart(part3, nil))
5050
XCTAssertEqual(state.requestStreamFinished(), .sendRequestEnd)
5151

5252
let responseHead = HTTPResponseHead(version: .http1_1, status: .ok)
@@ -186,8 +186,8 @@ class HTTP1ConnectionStateMachineTests: XCTestCase {
186186

187187
let part0 = IOData.byteBuffer(ByteBuffer(bytes: [0]))
188188
let part1 = IOData.byteBuffer(ByteBuffer(bytes: [1]))
189-
XCTAssertEqual(state.requestStreamPartReceived(part0), .sendBodyPart(part0))
190-
XCTAssertEqual(state.requestStreamPartReceived(part1), .sendBodyPart(part1))
189+
XCTAssertEqual(state.requestStreamPartReceived(part0, promise: nil), .sendBodyPart(part0, nil))
190+
XCTAssertEqual(state.requestStreamPartReceived(part1, promise: nil), .sendBodyPart(part1, nil))
191191
XCTAssertEqual(state.requestCancelled(closeConnection: false), .failRequest(HTTPClientError.cancelled, .close))
192192
}
193193

@@ -295,7 +295,7 @@ extension HTTP1ConnectionStateMachine.Action: Equatable {
295295
case (.sendRequestHead(let lhsHead, let lhsStartBody), .sendRequestHead(let rhsHead, let rhsStartBody)):
296296
return lhsHead == rhsHead && lhsStartBody == rhsStartBody
297297

298-
case (.sendBodyPart(let lhsData), .sendBodyPart(let rhsData)):
298+
case (.sendBodyPart(let lhsData, _), .sendBodyPart(let rhsData, _)):
299299
return lhsData == rhsData
300300

301301
case (.sendRequestEnd, .sendRequestEnd):

0 commit comments

Comments
 (0)