Skip to content

Commit b549617

Browse files
committed
Call didSendRequestPart at the right time
# Motivation Right now, we call `didSendRequestPart` after passing the write to the executor. However, this does not mean that the write hit the socket. To implement proper backpressure using the delegate, we should only call this method once the write was successful. # Modification Pass a promise to the actual channel write and only call the delegate once that promise succeeds. # Result The delegate method `didSendRequestPart` is only called after the write was successful.
1 parent 9d8cd95 commit b549617

15 files changed

+218
-101
lines changed

Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift

+4-4
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ final class Transaction: @unchecked Sendable {
6363

6464
switch writeAction {
6565
case .writeAndWait(let executor), .writeAndContinue(let executor):
66-
executor.writeRequestBodyPart(.byteBuffer(byteBuffer), request: self)
66+
executor.writeRequestBodyPart(.byteBuffer(byteBuffer), request: self, promise: nil)
6767

6868
case .fail:
6969
// an error/cancellation has happened. we don't need to continue here
@@ -105,14 +105,14 @@ final class Transaction: @unchecked Sendable {
105105
switch self.state.writeNextRequestPart() {
106106
case .writeAndContinue(let executor):
107107
self.stateLock.unlock()
108-
executor.writeRequestBodyPart(.byteBuffer(part), request: self)
108+
executor.writeRequestBodyPart(.byteBuffer(part), request: self, promise: nil)
109109

110110
case .writeAndWait(let executor):
111111
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
112112
self.state.waitForRequestBodyDemand(continuation: continuation)
113113
self.stateLock.unlock()
114114

115-
executor.writeRequestBodyPart(.byteBuffer(part), request: self)
115+
executor.writeRequestBodyPart(.byteBuffer(part), request: self, promise: nil)
116116
}
117117

118118
case .fail:
@@ -132,7 +132,7 @@ final class Transaction: @unchecked Sendable {
132132
break
133133

134134
case .forwardStreamFinished(let executor, let succeedContinuation):
135-
executor.finishRequestBodyStream(self)
135+
executor.finishRequestBodyStream(self, promise: nil)
136136
succeedContinuation?.resume(returning: nil)
137137
}
138138
return

Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift

+15-14
Original file line numberDiff line numberDiff line change
@@ -185,11 +185,11 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
185185
case .sendRequestHead(let head, startBody: let startBody):
186186
self.sendRequestHead(head, startBody: startBody, context: context)
187187

188-
case .sendBodyPart(let part):
189-
context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: nil)
188+
case .sendBodyPart(let part, let promise):
189+
context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: promise)
190190

191-
case .sendRequestEnd:
192-
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
191+
case .sendRequestEnd(let promise):
192+
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: promise)
193193

194194
if let timeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
195195
self.runTimeoutAction(timeoutAction, context: context)
@@ -355,27 +355,28 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
355355

356356
// MARK: Private HTTPRequestExecutor
357357

358-
private func writeRequestBodyPart0(_ data: IOData, request: HTTPExecutableRequest) {
358+
private func writeRequestBodyPart0(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
359359
guard self.request === request, let context = self.channelContext else {
360360
// Because the HTTPExecutableRequest may run in a different thread to our eventLoop,
361361
// calls from the HTTPExecutableRequest to our ChannelHandler may arrive here after
362362
// the request has been popped by the state machine or the ChannelHandler has been
363363
// removed from the Channel pipeline. This is a normal threading issue, noone has
364364
// screwed up.
365+
promise?.fail(ChannelError.eof)
365366
return
366367
}
367368

368-
let action = self.state.requestStreamPartReceived(data)
369+
let action = self.state.requestStreamPartReceived(data, promise: promise)
369370
self.run(action, context: context)
370371
}
371372

372-
private func finishRequestBodyStream0(_ request: HTTPExecutableRequest) {
373+
private func finishRequestBodyStream0(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
373374
guard self.request === request, let context = self.channelContext else {
374375
// See code comment in `writeRequestBodyPart0`
375376
return
376377
}
377378

378-
let action = self.state.requestStreamFinished()
379+
let action = self.state.requestStreamFinished(promise: promise)
379380
self.run(action, context: context)
380381
}
381382

@@ -405,22 +406,22 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
405406
}
406407

407408
extension HTTP1ClientChannelHandler: HTTPRequestExecutor {
408-
func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest) {
409+
func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
409410
if self.eventLoop.inEventLoop {
410-
self.writeRequestBodyPart0(data, request: request)
411+
self.writeRequestBodyPart0(data, request: request, promise: promise)
411412
} else {
412413
self.eventLoop.execute {
413-
self.writeRequestBodyPart0(data, request: request)
414+
self.writeRequestBodyPart0(data, request: request, promise: promise)
414415
}
415416
}
416417
}
417418

418-
func finishRequestBodyStream(_ request: HTTPExecutableRequest) {
419+
func finishRequestBodyStream(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
419420
if self.eventLoop.inEventLoop {
420-
self.finishRequestBodyStream0(request)
421+
self.finishRequestBodyStream0(request, promise: promise)
421422
} else {
422423
self.eventLoop.execute {
423-
self.finishRequestBodyStream0(request)
424+
self.finishRequestBodyStream0(request, promise: promise)
424425
}
425426
}
426427
}

Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ConnectionStateMachine.swift

+10-10
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ struct HTTP1ConnectionStateMachine {
4141
}
4242

4343
case sendRequestHead(HTTPRequestHead, startBody: Bool)
44-
case sendBodyPart(IOData)
45-
case sendRequestEnd
44+
case sendBodyPart(IOData, EventLoopPromise<Void>?)
45+
case sendRequestEnd(EventLoopPromise<Void>?)
4646

4747
case pauseRequestBodyStream
4848
case resumeRequestBodyStream
@@ -189,25 +189,25 @@ struct HTTP1ConnectionStateMachine {
189189
}
190190
}
191191

192-
mutating func requestStreamPartReceived(_ part: IOData) -> Action {
192+
mutating func requestStreamPartReceived(_ part: IOData, promise: EventLoopPromise<Void>?) -> Action {
193193
guard case .inRequest(var requestStateMachine, let close) = self.state else {
194194
preconditionFailure("Invalid state: \(self.state)")
195195
}
196196

197197
return self.avoidingStateMachineCoW { state -> Action in
198-
let action = requestStateMachine.requestStreamPartReceived(part)
198+
let action = requestStateMachine.requestStreamPartReceived(part, promise: promise)
199199
state = .inRequest(requestStateMachine, close: close)
200200
return state.modify(with: action)
201201
}
202202
}
203203

204-
mutating func requestStreamFinished() -> Action {
204+
mutating func requestStreamFinished(promise: EventLoopPromise<Void>?) -> Action {
205205
guard case .inRequest(var requestStateMachine, let close) = self.state else {
206206
preconditionFailure("Invalid state: \(self.state)")
207207
}
208208

209209
return self.avoidingStateMachineCoW { state -> Action in
210-
let action = requestStateMachine.requestStreamFinished()
210+
let action = requestStateMachine.requestStreamFinished(promise: promise)
211211
state = .inRequest(requestStateMachine, close: close)
212212
return state.modify(with: action)
213213
}
@@ -377,10 +377,10 @@ extension HTTP1ConnectionStateMachine.State {
377377
return .pauseRequestBodyStream
378378
case .resumeRequestBodyStream:
379379
return .resumeRequestBodyStream
380-
case .sendBodyPart(let part):
381-
return .sendBodyPart(part)
382-
case .sendRequestEnd:
383-
return .sendRequestEnd
380+
case .sendBodyPart(let part, let promise):
381+
return .sendBodyPart(part, promise)
382+
case .sendRequestEnd(let promise):
383+
return .sendRequestEnd(promise)
384384
case .forwardResponseHead(let head, let pauseRequestBodyStream):
385385
return .forwardResponseHead(head, pauseRequestBodyStream: pauseRequestBodyStream)
386386
case .forwardResponseBodyParts(let parts):

Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift

+13-12
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,8 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
148148
// that the request is neither failed nor finished yet
149149
self.request!.pauseRequestBodyStream()
150150

151-
case .sendBodyPart(let data):
152-
context.writeAndFlush(self.wrapOutboundOut(.body(data)), promise: nil)
151+
case .sendBodyPart(let data, let promise):
152+
context.writeAndFlush(self.wrapOutboundOut(.body(data)), promise: promise)
153153

154154
case .sendRequestEnd:
155155
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
@@ -281,27 +281,28 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
281281

282282
// MARK: Private HTTPRequestExecutor
283283

284-
private func writeRequestBodyPart0(_ data: IOData, request: HTTPExecutableRequest) {
284+
private func writeRequestBodyPart0(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
285285
guard self.request === request, let context = self.channelContext else {
286286
// Because the HTTPExecutableRequest may run in a different thread to our eventLoop,
287287
// calls from the HTTPExecutableRequest to our ChannelHandler may arrive here after
288288
// the request has been popped by the state machine or the ChannelHandler has been
289289
// removed from the Channel pipeline. This is a normal threading issue, noone has
290290
// screwed up.
291+
promise?.fail(ChannelError.ioOnClosedChannel)
291292
return
292293
}
293294

294-
let action = self.state.requestStreamPartReceived(data)
295+
let action = self.state.requestStreamPartReceived(data, promise: promise)
295296
self.run(action, context: context)
296297
}
297298

298-
private func finishRequestBodyStream0(_ request: HTTPExecutableRequest) {
299+
private func finishRequestBodyStream0(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
299300
guard self.request === request, let context = self.channelContext else {
300301
// See code comment in `writeRequestBodyPart0`
301302
return
302303
}
303304

304-
let action = self.state.requestStreamFinished()
305+
let action = self.state.requestStreamFinished(promise: promise)
305306
self.run(action, context: context)
306307
}
307308

@@ -327,22 +328,22 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
327328
}
328329

329330
extension HTTP2ClientRequestHandler: HTTPRequestExecutor {
330-
func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest) {
331+
func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
331332
if self.eventLoop.inEventLoop {
332-
self.writeRequestBodyPart0(data, request: request)
333+
self.writeRequestBodyPart0(data, request: request, promise: promise)
333334
} else {
334335
self.eventLoop.execute {
335-
self.writeRequestBodyPart0(data, request: request)
336+
self.writeRequestBodyPart0(data, request: request, promise: promise)
336337
}
337338
}
338339
}
339340

340-
func finishRequestBodyStream(_ request: HTTPExecutableRequest) {
341+
func finishRequestBodyStream(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
341342
if self.eventLoop.inEventLoop {
342-
self.finishRequestBodyStream0(request)
343+
self.finishRequestBodyStream0(request, promise: promise)
343344
} else {
344345
self.eventLoop.execute {
345-
self.finishRequestBodyStream0(request)
346+
self.finishRequestBodyStream0(request, promise: promise)
346347
}
347348
}
348349
}

Sources/AsyncHTTPClient/ConnectionPool/HTTPExecutableRequest.swift

+2-2
Original file line numberDiff line numberDiff line change
@@ -180,12 +180,12 @@ protocol HTTPRequestExecutor {
180180
/// Writes a body part into the channel pipeline
181181
///
182182
/// This method may be **called on any thread**. The executor needs to ensure thread safety.
183-
func writeRequestBodyPart(_: IOData, request: HTTPExecutableRequest)
183+
func writeRequestBodyPart(_: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?)
184184

185185
/// Signals that the request body stream has finished
186186
///
187187
/// This method may be **called on any thread**. The executor needs to ensure thread safety.
188-
func finishRequestBodyStream(_ task: HTTPExecutableRequest)
188+
func finishRequestBodyStream(_ task: HTTPExecutableRequest, promise: EventLoopPromise<Void>?)
189189

190190
/// Signals that more bytes from response body stream can be consumed.
191191
///

Sources/AsyncHTTPClient/ConnectionPool/HTTPRequestStateMachine.swift

+17-7
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ struct HTTPRequestStateMachine {
8383
}
8484

8585
case sendRequestHead(HTTPRequestHead, startBody: Bool)
86-
case sendBodyPart(IOData)
87-
case sendRequestEnd
86+
case sendBodyPart(IOData, EventLoopPromise<Void>?)
87+
case sendRequestEnd(EventLoopPromise<Void>?)
8888

8989
case pauseRequestBodyStream
9090
case resumeRequestBodyStream
@@ -261,7 +261,7 @@ struct HTTPRequestStateMachine {
261261
}
262262
}
263263

264-
mutating func requestStreamPartReceived(_ part: IOData) -> Action {
264+
mutating func requestStreamPartReceived(_ part: IOData, promise: EventLoopPromise<Void>?) -> Action {
265265
switch self.state {
266266
case .initialized,
267267
.waitForChannelToBecomeWritable,
@@ -274,6 +274,7 @@ struct HTTPRequestStateMachine {
274274
// won't be interested. We expect that the producer has been informed to pause
275275
// producing.
276276
assert(producerState == .paused)
277+
promise?.fail(HTTPClientError.requestStreamCancelled)
277278
return .wait
278279

279280
case .running(.streaming(let expectedBodyLength, var sentBodyBytes, let producerState), let responseState):
@@ -290,6 +291,7 @@ struct HTTPRequestStateMachine {
290291
if let expected = expectedBodyLength, sentBodyBytes + part.readableBytes > expected {
291292
let error = HTTPClientError.bodyLengthMismatch
292293
self.state = .failed(error)
294+
promise?.fail(error)
293295
return .failRequest(error, .close)
294296
}
295297

@@ -303,9 +305,10 @@ struct HTTPRequestStateMachine {
303305

304306
self.state = .running(requestState, responseState)
305307

306-
return .sendBodyPart(part)
308+
return .sendBodyPart(part, promise)
307309

308310
case .failed:
311+
promise?.fail(HTTPClientError.failed)
309312
return .wait
310313

311314
case .finished:
@@ -318,14 +321,15 @@ struct HTTPRequestStateMachine {
318321

319322
// We may still receive something, here because of potential race conditions with the
320323
// producing thread.
324+
promise?.fail(ChannelError.eof)
321325
return .wait
322326

323327
case .modifying:
324328
preconditionFailure("Invalid state: \(self.state)")
325329
}
326330
}
327331

328-
mutating func requestStreamFinished() -> Action {
332+
mutating func requestStreamFinished(promise: EventLoopPromise<Void>?) -> Action {
329333
switch self.state {
330334
case .initialized,
331335
.waitForChannelToBecomeWritable,
@@ -336,35 +340,40 @@ struct HTTPRequestStateMachine {
336340
if let expected = expectedBodyLength, expected != sentBodyBytes {
337341
let error = HTTPClientError.bodyLengthMismatch
338342
self.state = .failed(error)
343+
promise?.fail(error)
339344
return .failRequest(error, .close)
340345
}
341346

342347
self.state = .running(.endSent, .waitingForHead)
343-
return .sendRequestEnd
348+
return .sendRequestEnd(promise)
344349

345350
case .running(.streaming(let expectedBodyLength, let sentBodyBytes, _), .receivingBody(let head, let streamState)):
346351
assert(head.status.code < 300)
347352

348353
if let expected = expectedBodyLength, expected != sentBodyBytes {
349354
let error = HTTPClientError.bodyLengthMismatch
350355
self.state = .failed(error)
356+
promise?.fail(error)
351357
return .failRequest(error, .close)
352358
}
353359

354360
self.state = .running(.endSent, .receivingBody(head, streamState))
355-
return .sendRequestEnd
361+
return .sendRequestEnd(promise)
356362

357363
case .running(.streaming(let expectedBodyLength, let sentBodyBytes, _), .endReceived):
358364
if let expected = expectedBodyLength, expected != sentBodyBytes {
359365
let error = HTTPClientError.bodyLengthMismatch
360366
self.state = .failed(error)
367+
promise?.fail(error)
361368
return .failRequest(error, .close)
362369
}
363370

364371
self.state = .finished
372+
promise?.succeed(())
365373
return .succeedRequest(.sendRequestEnd, .init())
366374

367375
case .failed:
376+
promise?.fail(HTTPClientError.failed)
368377
return .wait
369378

370379
case .finished:
@@ -377,6 +386,7 @@ struct HTTPRequestStateMachine {
377386

378387
// We may still receive something, here because of potential race conditions with the
379388
// producing thread.
389+
promise?.fail(ChannelError.eof)
380390
return .wait
381391

382392
case .modifying:

Sources/AsyncHTTPClient/HTTPClient.swift

+3
Original file line numberDiff line numberDiff line change
@@ -912,6 +912,7 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
912912
case unsupportedScheme(String)
913913
case readTimeout
914914
case remoteConnectionClosed
915+
case failed
915916
case cancelled
916917
case identityCodingIncorrectlyPresent
917918
@available(*, deprecated, message: "AsyncHTTPClient now silently corrects this invalid header.")
@@ -965,6 +966,8 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible {
965966
public static let readTimeout = HTTPClientError(code: .readTimeout)
966967
/// Remote connection was closed unexpectedly.
967968
public static let remoteConnectionClosed = HTTPClientError(code: .remoteConnectionClosed)
969+
/// Request failed.
970+
public static let failed = HTTPClientError(code: .failed)
968971
/// Request was cancelled.
969972
public static let cancelled = HTTPClientError(code: .cancelled)
970973
/// Request contains invalid identity encoding.

0 commit comments

Comments
 (0)