Skip to content

Commit 926e482

Browse files
committed
Call didSendRequestPart at the right time
# Motivation Right now, we call `didSendRequestPart` after passing the write to the executor. However, this does not mean that the write hit the socket. To implement proper backpressure using the delegate, we should only call this method once the write was successful. # Modification Pass a promise to the actual channel write and only call the delegate once that promise succeeds. # Result The delegate method `didSendRequestPart` is only called after the write was successful.
1 parent 89b0da2 commit 926e482

11 files changed

+327
-193
lines changed

Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift

+4-4
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ final class Transaction: @unchecked Sendable {
6363

6464
switch writeAction {
6565
case .writeAndWait(let executor), .writeAndContinue(let executor):
66-
executor.writeRequestBodyPart(.byteBuffer(byteBuffer), request: self)
66+
executor.writeRequestBodyPart(.byteBuffer(byteBuffer), request: self, promise: nil)
6767

6868
case .fail:
6969
// an error/cancellation has happened. we don't need to continue here
@@ -105,14 +105,14 @@ final class Transaction: @unchecked Sendable {
105105
switch self.state.writeNextRequestPart() {
106106
case .writeAndContinue(let executor):
107107
self.stateLock.unlock()
108-
executor.writeRequestBodyPart(.byteBuffer(part), request: self)
108+
executor.writeRequestBodyPart(.byteBuffer(part), request: self, promise: nil)
109109

110110
case .writeAndWait(let executor):
111111
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
112112
self.state.waitForRequestBodyDemand(continuation: continuation)
113113
self.stateLock.unlock()
114114

115-
executor.writeRequestBodyPart(.byteBuffer(part), request: self)
115+
executor.writeRequestBodyPart(.byteBuffer(part), request: self, promise: nil)
116116
}
117117

118118
case .fail:
@@ -132,7 +132,7 @@ final class Transaction: @unchecked Sendable {
132132
break
133133

134134
case .forwardStreamFinished(let executor, let succeedContinuation):
135-
executor.finishRequestBodyStream(self)
135+
executor.finishRequestBodyStream(self, promise: nil)
136136
succeedContinuation?.resume(returning: nil)
137137
}
138138
return

Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift

+44-27
Original file line numberDiff line numberDiff line change
@@ -185,11 +185,11 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
185185
case .sendRequestHead(let head, startBody: let startBody):
186186
self.sendRequestHead(head, startBody: startBody, context: context)
187187

188-
case .sendBodyPart(let part):
189-
context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: nil)
188+
case .sendBodyPart(let part, let writePromise):
189+
context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: writePromise)
190190

191-
case .sendRequestEnd:
192-
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
191+
case .sendRequestEnd(let writePromise):
192+
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writePromise)
193193

194194
if let timeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
195195
self.runTimeoutAction(timeoutAction, context: context)
@@ -258,36 +258,52 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
258258
self.runTimeoutAction(.clearIdleReadTimeoutTimer, context: context)
259259

260260
switch finalAction {
261-
case .close:
262-
context.close(promise: nil)
263-
case .sendRequestEnd:
264-
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
265-
case .informConnectionIsIdle:
261+
case .close(let writePromise):
262+
let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self)
263+
// We need to defer succeeding the old request to avoid ordering issues
264+
writePromise.futureResult.whenComplete { _ in
265+
oldRequest.succeedRequest(buffer)
266+
}
267+
268+
context.close(promise: writePromise)
269+
case .sendRequestEnd(let writePromise):
270+
let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self)
271+
// We need to defer succeeding the old request to avoid ordering issues
272+
writePromise.futureResult.whenComplete { _ in
273+
oldRequest.succeedRequest(buffer)
274+
}
275+
276+
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writePromise)
277+
case .informConnectionIsIdle(let writePromise):
266278
self.connection.taskCompleted()
279+
writePromise?.succeed(())
280+
oldRequest.succeedRequest(buffer)
267281
case .none:
268-
break
282+
oldRequest.succeedRequest(buffer)
269283
}
270284

271-
oldRequest.succeedRequest(buffer)
272-
273285
case .failRequest(let error, let finalAction):
274286
// see comment in the `succeedRequest` case.
275287
let oldRequest = self.request!
276288
self.request = nil
277289
self.runTimeoutAction(.clearIdleReadTimeoutTimer, context: context)
278290

279291
switch finalAction {
280-
case .close:
281-
context.close(promise: nil)
282-
case .sendRequestEnd:
283-
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
284-
case .informConnectionIsIdle:
292+
case .close(let writePromise):
293+
context.close(promise: writePromise)
294+
case .sendRequestEnd(let writerPromise):
295+
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writerPromise)
296+
case .informConnectionIsIdle(let writePromise):
285297
self.connection.taskCompleted()
298+
writePromise?.fail(error)
286299
case .none:
287300
break
288301
}
289302

290303
oldRequest.fail(error)
304+
305+
case .failSendBodyPart(let error, let writePromise), .failSendStreamFinished(let error, let writePromise):
306+
writePromise?.fail(error)
291307
}
292308
}
293309

@@ -355,27 +371,28 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
355371

356372
// MARK: Private HTTPRequestExecutor
357373

358-
private func writeRequestBodyPart0(_ data: IOData, request: HTTPExecutableRequest) {
374+
private func writeRequestBodyPart0(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
359375
guard self.request === request, let context = self.channelContext else {
360376
// Because the HTTPExecutableRequest may run in a different thread to our eventLoop,
361377
// calls from the HTTPExecutableRequest to our ChannelHandler may arrive here after
362378
// the request has been popped by the state machine or the ChannelHandler has been
363379
// removed from the Channel pipeline. This is a normal threading issue, noone has
364380
// screwed up.
381+
promise?.fail(HTTPClientError.requestStreamCancelled)
365382
return
366383
}
367384

368-
let action = self.state.requestStreamPartReceived(data)
385+
let action = self.state.requestStreamPartReceived(data, promise: promise)
369386
self.run(action, context: context)
370387
}
371388

372-
private func finishRequestBodyStream0(_ request: HTTPExecutableRequest) {
389+
private func finishRequestBodyStream0(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
373390
guard self.request === request, let context = self.channelContext else {
374391
// See code comment in `writeRequestBodyPart0`
375392
return
376393
}
377394

378-
let action = self.state.requestStreamFinished()
395+
let action = self.state.requestStreamFinished(promise: promise)
379396
self.run(action, context: context)
380397
}
381398

@@ -405,22 +422,22 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
405422
}
406423

407424
extension HTTP1ClientChannelHandler: HTTPRequestExecutor {
408-
func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest) {
425+
func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
409426
if self.eventLoop.inEventLoop {
410-
self.writeRequestBodyPart0(data, request: request)
427+
self.writeRequestBodyPart0(data, request: request, promise: promise)
411428
} else {
412429
self.eventLoop.execute {
413-
self.writeRequestBodyPart0(data, request: request)
430+
self.writeRequestBodyPart0(data, request: request, promise: promise)
414431
}
415432
}
416433
}
417434

418-
func finishRequestBodyStream(_ request: HTTPExecutableRequest) {
435+
func finishRequestBodyStream(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
419436
if self.eventLoop.inEventLoop {
420-
self.finishRequestBodyStream0(request)
437+
self.finishRequestBodyStream0(request, promise: promise)
421438
} else {
422439
self.eventLoop.execute {
423-
self.finishRequestBodyStream0(request)
440+
self.finishRequestBodyStream0(request, promise: promise)
424441
}
425442
}
426443
}

Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ConnectionStateMachine.swift

+76-36
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,29 @@ struct HTTP1ConnectionStateMachine {
3030
/// A action to execute, when we consider a request "done".
3131
enum FinalStreamAction {
3232
/// Close the connection
33-
case close
33+
///
34+
/// The promise is an optional write promise.
35+
case close(EventLoopPromise<Void>?)
3436
/// If the server has replied, with a status of 200...300 before all data was sent, a request is considered succeeded,
3537
/// as soon as we wrote the request end onto the wire.
36-
case sendRequestEnd
38+
///
39+
/// The promise is an optional write promise.
40+
case sendRequestEnd(EventLoopPromise<Void>?)
3741
/// Inform an observer that the connection has become idle
38-
case informConnectionIsIdle
42+
///
43+
/// The promise is an optional write promise.
44+
case informConnectionIsIdle(EventLoopPromise<Void>?)
3945
/// Do nothing.
40-
case none
46+
///
47+
/// The promise is an optional write promise.
48+
case none(EventLoopPromise<Void>?)
4149
}
4250

4351
case sendRequestHead(HTTPRequestHead, startBody: Bool)
44-
case sendBodyPart(IOData)
45-
case sendRequestEnd
52+
case sendBodyPart(IOData, EventLoopPromise<Void>?)
53+
case sendRequestEnd(EventLoopPromise<Void>?)
54+
case failSendBodyPart(Error, EventLoopPromise<Void>?)
55+
case failSendStreamFinished(Error, EventLoopPromise<Void>?)
4656

4757
case pauseRequestBodyStream
4858
case resumeRequestBodyStream
@@ -173,7 +183,7 @@ struct HTTP1ConnectionStateMachine {
173183
// as closed.
174184
//
175185
// TODO: AHC should support a fast rescheduling mechanism here.
176-
return .failRequest(HTTPClientError.remoteConnectionClosed, .none)
186+
return .failRequest(HTTPClientError.remoteConnectionClosed, .none(nil))
177187

178188
case .idle:
179189
var requestStateMachine = HTTPRequestStateMachine(isChannelWritable: self.isChannelWritable)
@@ -189,25 +199,25 @@ struct HTTP1ConnectionStateMachine {
189199
}
190200
}
191201

192-
mutating func requestStreamPartReceived(_ part: IOData) -> Action {
202+
mutating func requestStreamPartReceived(_ part: IOData, promise: EventLoopPromise<Void>?) -> Action {
193203
guard case .inRequest(var requestStateMachine, let close) = self.state else {
194204
preconditionFailure("Invalid state: \(self.state)")
195205
}
196206

197207
return self.avoidingStateMachineCoW { state -> Action in
198-
let action = requestStateMachine.requestStreamPartReceived(part)
208+
let action = requestStateMachine.requestStreamPartReceived(part, promise: promise)
199209
state = .inRequest(requestStateMachine, close: close)
200210
return state.modify(with: action)
201211
}
202212
}
203213

204-
mutating func requestStreamFinished() -> Action {
214+
mutating func requestStreamFinished(promise: EventLoopPromise<Void>?) -> Action {
205215
guard case .inRequest(var requestStateMachine, let close) = self.state else {
206216
preconditionFailure("Invalid state: \(self.state)")
207217
}
208218

209219
return self.avoidingStateMachineCoW { state -> Action in
210-
let action = requestStateMachine.requestStreamFinished()
220+
let action = requestStateMachine.requestStreamFinished(promise: promise)
211221
state = .inRequest(requestStateMachine, close: close)
212222
return state.modify(with: action)
213223
}
@@ -377,10 +387,10 @@ extension HTTP1ConnectionStateMachine.State {
377387
return .pauseRequestBodyStream
378388
case .resumeRequestBodyStream:
379389
return .resumeRequestBodyStream
380-
case .sendBodyPart(let part):
381-
return .sendBodyPart(part)
382-
case .sendRequestEnd:
383-
return .sendRequestEnd
390+
case .sendBodyPart(let part, let writePromise):
391+
return .sendBodyPart(part, writePromise)
392+
case .sendRequestEnd(let writePromise):
393+
return .sendRequestEnd(writePromise)
384394
case .forwardResponseHead(let head, let pauseRequestBodyStream):
385395
return .forwardResponseHead(head, pauseRequestBodyStream: pauseRequestBodyStream)
386396
case .forwardResponseBodyParts(let parts):
@@ -392,39 +402,63 @@ extension HTTP1ConnectionStateMachine.State {
392402

393403
let newFinalAction: HTTP1ConnectionStateMachine.Action.FinalStreamAction
394404
switch finalAction {
395-
case .close:
405+
case .close(let writePromise):
396406
self = .closing
397-
newFinalAction = .close
398-
case .sendRequestEnd:
399-
newFinalAction = .sendRequestEnd
407+
newFinalAction = .close(writePromise)
408+
case .sendRequestEnd(let writePromise):
409+
newFinalAction = .sendRequestEnd(writePromise)
400410
case .none:
401411
self = .idle
402-
newFinalAction = close ? .close : .informConnectionIsIdle
412+
newFinalAction = close ? .close(nil) : .informConnectionIsIdle(nil)
403413
}
404414
return .succeedRequest(newFinalAction, finalParts)
405415

406416
case .failRequest(let error, let finalAction):
407-
switch self {
408-
case .initialized:
417+
switch (self, finalAction) {
418+
case (.initialized, _):
409419
preconditionFailure("Invalid state: \(self)")
410-
case .idle:
420+
421+
case (.idle, _):
411422
preconditionFailure("How can we fail a task, if we are idle")
412-
case .inRequest(_, close: let close):
413-
if close || finalAction == .close {
414-
self = .closing
415-
return .failRequest(error, .close)
416-
} else {
417-
self = .idle
418-
return .failRequest(error, .informConnectionIsIdle)
419-
}
420423

421-
case .closing:
422-
return .failRequest(error, .none)
423-
case .closed:
424+
// If we are either in .inRequest(_, close: true) or the final action is .close
425+
// we have to fail the request with .close()
426+
case (.inRequest(_, let close), .sendRequestEnd(let writePromise)) where close:
427+
self = .closing
428+
return .failRequest(error, .close(writePromise))
429+
430+
case (.inRequest(_, let close), .none) where close:
431+
self = .closing
432+
return .failRequest(error, .close(nil))
433+
434+
case (.inRequest(_, _), .close(let writePromise)):
435+
self = .closing
436+
return .failRequest(error, .close(writePromise))
437+
438+
// otherwise we fail with .informConnectionIsIdle
439+
case (.inRequest(_, _), .sendRequestEnd(let writePromise)):
440+
self = .idle
441+
return .failRequest(error, .informConnectionIsIdle(writePromise))
442+
443+
case (.inRequest(_, _), .none):
444+
self = .idle
445+
return .failRequest(error, .informConnectionIsIdle(nil))
446+
447+
case (.closing, .close(let writePromise)), (.closing, .sendRequestEnd(let writePromise)):
448+
return .failRequest(error, .none(writePromise))
449+
450+
case (.closing, .none):
451+
return .failRequest(error, .none(nil))
452+
453+
case (.closed, .close(let writePromise)), (.closed, .sendRequestEnd(let writePromise)):
424454
// this state can be reached, if the connection was unexpectedly closed by remote
425-
return .failRequest(error, .none)
455+
return .failRequest(error, .none(writePromise))
426456

427-
case .modifying:
457+
case (.closed, .none):
458+
// this state can be reached, if the connection was unexpectedly closed by remote
459+
return .failRequest(error, .none(nil))
460+
461+
case (.modifying, _):
428462
preconditionFailure("Invalid state: \(self)")
429463
}
430464

@@ -433,6 +467,12 @@ extension HTTP1ConnectionStateMachine.State {
433467

434468
case .wait:
435469
return .wait
470+
471+
case .failSendBodyPart(let error, let writePromise):
472+
return .failSendBodyPart(error, writePromise)
473+
474+
case .failSendStreamFinished(let error, let writePromise):
475+
return .failSendStreamFinished(error, writePromise)
436476
}
437477
}
438478
}

0 commit comments

Comments
 (0)