Skip to content

Commit 85bb24d

Browse files
committed
Call didSendRequestPart at the right time
# Motivation Right now, we call `didSendRequestPart` after passing the write to the executor. However, this does not mean that the write hit the socket. To implement proper backpressure using the delegate, we should only call this method once the write was successful. # Modification Pass a promise to the actual channel write and only call the delegate once that promise succeeds. # Result The delegate method `didSendRequestPart` is only called after the write was successful.
1 parent 89b0da2 commit 85bb24d

11 files changed

+358
-181
lines changed

Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift

+4-4
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ final class Transaction: @unchecked Sendable {
6363

6464
switch writeAction {
6565
case .writeAndWait(let executor), .writeAndContinue(let executor):
66-
executor.writeRequestBodyPart(.byteBuffer(byteBuffer), request: self)
66+
executor.writeRequestBodyPart(.byteBuffer(byteBuffer), request: self, promise: nil)
6767

6868
case .fail:
6969
// an error/cancellation has happened. we don't need to continue here
@@ -105,14 +105,14 @@ final class Transaction: @unchecked Sendable {
105105
switch self.state.writeNextRequestPart() {
106106
case .writeAndContinue(let executor):
107107
self.stateLock.unlock()
108-
executor.writeRequestBodyPart(.byteBuffer(part), request: self)
108+
executor.writeRequestBodyPart(.byteBuffer(part), request: self, promise: nil)
109109

110110
case .writeAndWait(let executor):
111111
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
112112
self.state.waitForRequestBodyDemand(continuation: continuation)
113113
self.stateLock.unlock()
114114

115-
executor.writeRequestBodyPart(.byteBuffer(part), request: self)
115+
executor.writeRequestBodyPart(.byteBuffer(part), request: self, promise: nil)
116116
}
117117

118118
case .fail:
@@ -132,7 +132,7 @@ final class Transaction: @unchecked Sendable {
132132
break
133133

134134
case .forwardStreamFinished(let executor, let succeedContinuation):
135-
executor.finishRequestBodyStream(self)
135+
executor.finishRequestBodyStream(self, promise: nil)
136136
succeedContinuation?.resume(returning: nil)
137137
}
138138
return

Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift

+45-26
Original file line numberDiff line numberDiff line change
@@ -185,11 +185,11 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
185185
case .sendRequestHead(let head, startBody: let startBody):
186186
self.sendRequestHead(head, startBody: startBody, context: context)
187187

188-
case .sendBodyPart(let part):
189-
context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: nil)
188+
case .sendBodyPart(let part, let writePromise):
189+
context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: writePromise)
190190

191-
case .sendRequestEnd:
192-
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
191+
case .sendRequestEnd(let writePromise):
192+
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writePromise)
193193

194194
if let timeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
195195
self.runTimeoutAction(timeoutAction, context: context)
@@ -260,34 +260,52 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
260260
switch finalAction {
261261
case .close:
262262
context.close(promise: nil)
263-
case .sendRequestEnd:
264-
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
263+
oldRequest.succeedRequest(buffer)
264+
case .sendRequestEnd(let writePromise):
265+
let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self)
266+
// We need to defer succeeding the old request to avoid ordering issues
267+
writePromise.futureResult.whenComplete { result in
268+
switch result {
269+
case .success:
270+
oldRequest.succeedRequest(buffer)
271+
case .failure(let error):
272+
oldRequest.fail(error)
273+
}
274+
}
275+
276+
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: writePromise)
265277
case .informConnectionIsIdle:
266278
self.connection.taskCompleted()
267-
case .none:
268-
break
279+
oldRequest.succeedRequest(buffer)
269280
}
270281

271-
oldRequest.succeedRequest(buffer)
272-
273282
case .failRequest(let error, let finalAction):
274283
// see comment in the `succeedRequest` case.
275284
let oldRequest = self.request!
276285
self.request = nil
277286
self.runTimeoutAction(.clearIdleReadTimeoutTimer, context: context)
278287

279288
switch finalAction {
280-
case .close:
289+
case .close(let writePromise):
281290
context.close(promise: nil)
282-
case .sendRequestEnd:
283-
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
284-
case .informConnectionIsIdle:
291+
writePromise?.fail(error)
292+
oldRequest.fail(error)
293+
294+
case .informConnectionIsIdle(let writePromise):
285295
self.connection.taskCompleted()
296+
writePromise?.fail(error)
297+
oldRequest.fail(error)
298+
299+
case .failWritePromise(let writePromise):
300+
writePromise?.fail(error)
301+
oldRequest.fail(error)
302+
286303
case .none:
287-
break
304+
oldRequest.fail(error)
288305
}
289306

290-
oldRequest.fail(error)
307+
case .failSendBodyPart(let error, let writePromise), .failSendStreamFinished(let error, let writePromise):
308+
writePromise?.fail(error)
291309
}
292310
}
293311

@@ -355,27 +373,28 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
355373

356374
// MARK: Private HTTPRequestExecutor
357375

358-
private func writeRequestBodyPart0(_ data: IOData, request: HTTPExecutableRequest) {
376+
private func writeRequestBodyPart0(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
359377
guard self.request === request, let context = self.channelContext else {
360378
// Because the HTTPExecutableRequest may run in a different thread to our eventLoop,
361379
// calls from the HTTPExecutableRequest to our ChannelHandler may arrive here after
362380
// the request has been popped by the state machine or the ChannelHandler has been
363381
// removed from the Channel pipeline. This is a normal threading issue, noone has
364382
// screwed up.
383+
promise?.fail(HTTPClientError.requestStreamCancelled)
365384
return
366385
}
367386

368-
let action = self.state.requestStreamPartReceived(data)
387+
let action = self.state.requestStreamPartReceived(data, promise: promise)
369388
self.run(action, context: context)
370389
}
371390

372-
private func finishRequestBodyStream0(_ request: HTTPExecutableRequest) {
391+
private func finishRequestBodyStream0(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
373392
guard self.request === request, let context = self.channelContext else {
374393
// See code comment in `writeRequestBodyPart0`
375394
return
376395
}
377396

378-
let action = self.state.requestStreamFinished()
397+
let action = self.state.requestStreamFinished(promise: promise)
379398
self.run(action, context: context)
380399
}
381400

@@ -405,22 +424,22 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
405424
}
406425

407426
extension HTTP1ClientChannelHandler: HTTPRequestExecutor {
408-
func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest) {
427+
func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
409428
if self.eventLoop.inEventLoop {
410-
self.writeRequestBodyPart0(data, request: request)
429+
self.writeRequestBodyPart0(data, request: request, promise: promise)
411430
} else {
412431
self.eventLoop.execute {
413-
self.writeRequestBodyPart0(data, request: request)
432+
self.writeRequestBodyPart0(data, request: request, promise: promise)
414433
}
415434
}
416435
}
417436

418-
func finishRequestBodyStream(_ request: HTTPExecutableRequest) {
437+
func finishRequestBodyStream(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
419438
if self.eventLoop.inEventLoop {
420-
self.finishRequestBodyStream0(request)
439+
self.finishRequestBodyStream0(request, promise: promise)
421440
} else {
422441
self.eventLoop.execute {
423-
self.finishRequestBodyStream0(request)
442+
self.finishRequestBodyStream0(request, promise: promise)
424443
}
425444
}
426445
}

Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ConnectionStateMachine.swift

+72-32
Original file line numberDiff line numberDiff line change
@@ -28,30 +28,48 @@ struct HTTP1ConnectionStateMachine {
2828

2929
enum Action {
3030
/// A action to execute, when we consider a request "done".
31-
enum FinalStreamAction {
31+
enum FinalSuccessfulStreamAction {
3232
/// Close the connection
3333
case close
3434
/// If the server has replied, with a status of 200...300 before all data was sent, a request is considered succeeded,
3535
/// as soon as we wrote the request end onto the wire.
36-
case sendRequestEnd
36+
///
37+
/// The promise is an optional write promise.
38+
case sendRequestEnd(EventLoopPromise<Void>?)
3739
/// Inform an observer that the connection has become idle
3840
case informConnectionIsIdle
41+
}
42+
43+
/// A action to execute, when we consider a request "done".
44+
enum FinalFailedStreamAction {
45+
/// Close the connection
46+
///
47+
/// The promise is an optional write promise.
48+
case close(EventLoopPromise<Void>?)
49+
/// Inform an observer that the connection has become idle
50+
///
51+
/// The promise is an optional write promise.
52+
case informConnectionIsIdle(EventLoopPromise<Void>?)
53+
/// Fail the write promise
54+
case failWritePromise(EventLoopPromise<Void>?)
3955
/// Do nothing.
4056
case none
4157
}
4258

4359
case sendRequestHead(HTTPRequestHead, startBody: Bool)
44-
case sendBodyPart(IOData)
45-
case sendRequestEnd
60+
case sendBodyPart(IOData, EventLoopPromise<Void>?)
61+
case sendRequestEnd(EventLoopPromise<Void>?)
62+
case failSendBodyPart(Error, EventLoopPromise<Void>?)
63+
case failSendStreamFinished(Error, EventLoopPromise<Void>?)
4664

4765
case pauseRequestBodyStream
4866
case resumeRequestBodyStream
4967

5068
case forwardResponseHead(HTTPResponseHead, pauseRequestBodyStream: Bool)
5169
case forwardResponseBodyParts(CircularBuffer<ByteBuffer>)
5270

53-
case failRequest(Error, FinalStreamAction)
54-
case succeedRequest(FinalStreamAction, CircularBuffer<ByteBuffer>)
71+
case failRequest(Error, FinalFailedStreamAction)
72+
case succeedRequest(FinalSuccessfulStreamAction, CircularBuffer<ByteBuffer>)
5573

5674
case read
5775
case close
@@ -173,7 +191,7 @@ struct HTTP1ConnectionStateMachine {
173191
// as closed.
174192
//
175193
// TODO: AHC should support a fast rescheduling mechanism here.
176-
return .failRequest(HTTPClientError.remoteConnectionClosed, .none)
194+
return .failRequest(HTTPClientError.remoteConnectionClosed, .failWritePromise(nil))
177195

178196
case .idle:
179197
var requestStateMachine = HTTPRequestStateMachine(isChannelWritable: self.isChannelWritable)
@@ -189,25 +207,25 @@ struct HTTP1ConnectionStateMachine {
189207
}
190208
}
191209

192-
mutating func requestStreamPartReceived(_ part: IOData) -> Action {
210+
mutating func requestStreamPartReceived(_ part: IOData, promise: EventLoopPromise<Void>?) -> Action {
193211
guard case .inRequest(var requestStateMachine, let close) = self.state else {
194212
preconditionFailure("Invalid state: \(self.state)")
195213
}
196214

197215
return self.avoidingStateMachineCoW { state -> Action in
198-
let action = requestStateMachine.requestStreamPartReceived(part)
216+
let action = requestStateMachine.requestStreamPartReceived(part, promise: promise)
199217
state = .inRequest(requestStateMachine, close: close)
200218
return state.modify(with: action)
201219
}
202220
}
203221

204-
mutating func requestStreamFinished() -> Action {
222+
mutating func requestStreamFinished(promise: EventLoopPromise<Void>?) -> Action {
205223
guard case .inRequest(var requestStateMachine, let close) = self.state else {
206224
preconditionFailure("Invalid state: \(self.state)")
207225
}
208226

209227
return self.avoidingStateMachineCoW { state -> Action in
210-
let action = requestStateMachine.requestStreamFinished()
228+
let action = requestStateMachine.requestStreamFinished(promise: promise)
211229
state = .inRequest(requestStateMachine, close: close)
212230
return state.modify(with: action)
213231
}
@@ -377,10 +395,10 @@ extension HTTP1ConnectionStateMachine.State {
377395
return .pauseRequestBodyStream
378396
case .resumeRequestBodyStream:
379397
return .resumeRequestBodyStream
380-
case .sendBodyPart(let part):
381-
return .sendBodyPart(part)
382-
case .sendRequestEnd:
383-
return .sendRequestEnd
398+
case .sendBodyPart(let part, let writePromise):
399+
return .sendBodyPart(part, writePromise)
400+
case .sendRequestEnd(let writePromise):
401+
return .sendRequestEnd(writePromise)
384402
case .forwardResponseHead(let head, let pauseRequestBodyStream):
385403
return .forwardResponseHead(head, pauseRequestBodyStream: pauseRequestBodyStream)
386404
case .forwardResponseBodyParts(let parts):
@@ -390,41 +408,57 @@ extension HTTP1ConnectionStateMachine.State {
390408
preconditionFailure("Invalid state: \(self)")
391409
}
392410

393-
let newFinalAction: HTTP1ConnectionStateMachine.Action.FinalStreamAction
411+
let newFinalAction: HTTP1ConnectionStateMachine.Action.FinalSuccessfulStreamAction
394412
switch finalAction {
395413
case .close:
396414
self = .closing
397415
newFinalAction = .close
398-
case .sendRequestEnd:
399-
newFinalAction = .sendRequestEnd
416+
case .sendRequestEnd(let writePromise):
417+
newFinalAction = .sendRequestEnd(writePromise)
400418
case .none:
401419
self = .idle
402420
newFinalAction = close ? .close : .informConnectionIsIdle
403421
}
404422
return .succeedRequest(newFinalAction, finalParts)
405423

406424
case .failRequest(let error, let finalAction):
407-
switch self {
408-
case .initialized:
425+
switch (self, finalAction) {
426+
case (.initialized, _):
409427
preconditionFailure("Invalid state: \(self)")
410-
case .idle:
428+
429+
case (.idle, _):
411430
preconditionFailure("How can we fail a task, if we are idle")
412-
case .inRequest(_, close: let close):
413-
if close || finalAction == .close {
414-
self = .closing
415-
return .failRequest(error, .close)
416-
} else {
417-
self = .idle
418-
return .failRequest(error, .informConnectionIsIdle)
419-
}
420431

421-
case .closing:
432+
// If we are either in .inRequest(_, close: true) or the final action is .close
433+
// we have to fail the request with .close()
434+
case (.inRequest(_, let close), .none) where close:
435+
self = .closing
436+
return .failRequest(error, .close(nil))
437+
438+
case (.inRequest(_, _), .close(let writePromise)):
439+
self = .closing
440+
return .failRequest(error, .close(writePromise))
441+
442+
// otherwise we fail with .informConnectionIsIdle
443+
case (.inRequest(_, _), .none):
444+
self = .idle
445+
return .failRequest(error, .informConnectionIsIdle(nil))
446+
447+
case (.closing, .close(let writePromise)):
448+
return .failRequest(error, .failWritePromise(writePromise))
449+
450+
case (.closing, .none):
422451
return .failRequest(error, .none)
423-
case .closed:
452+
453+
case (.closed, .close(let writePromise)):
454+
// this state can be reached, if the connection was unexpectedly closed by remote
455+
return .failRequest(error, .failWritePromise(writePromise))
456+
457+
case (.closed, .none):
424458
// this state can be reached, if the connection was unexpectedly closed by remote
425459
return .failRequest(error, .none)
426460

427-
case .modifying:
461+
case (.modifying, _):
428462
preconditionFailure("Invalid state: \(self)")
429463
}
430464

@@ -433,6 +467,12 @@ extension HTTP1ConnectionStateMachine.State {
433467

434468
case .wait:
435469
return .wait
470+
471+
case .failSendBodyPart(let error, let writePromise):
472+
return .failSendBodyPart(error, writePromise)
473+
474+
case .failSendStreamFinished(let error, let writePromise):
475+
return .failSendStreamFinished(error, writePromise)
436476
}
437477
}
438478
}

0 commit comments

Comments
 (0)