Skip to content

Commit 82d51cb

Browse files
committed
Call didSendRequestPart at the right time
# Motivation Right now, we call `didSendRequestPart` after passing the write to the executor. However, this does not mean that the write hit the socket. To implement proper backpressure using the delegate, we should only call this method once the write was successful. # Modification Pass a promise to the actual channel write and only call the delegate once that promise succeeds. # Result The delegate method `didSendRequestPart` is only called after the write was successful.
1 parent 9d8cd95 commit 82d51cb

11 files changed

+379
-343
lines changed

Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift

+4-4
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ final class Transaction: @unchecked Sendable {
6363

6464
switch writeAction {
6565
case .writeAndWait(let executor), .writeAndContinue(let executor):
66-
executor.writeRequestBodyPart(.byteBuffer(byteBuffer), request: self)
66+
executor.writeRequestBodyPart(.byteBuffer(byteBuffer), request: self, promise: nil)
6767

6868
case .fail:
6969
// an error/cancellation has happened. we don't need to continue here
@@ -105,14 +105,14 @@ final class Transaction: @unchecked Sendable {
105105
switch self.state.writeNextRequestPart() {
106106
case .writeAndContinue(let executor):
107107
self.stateLock.unlock()
108-
executor.writeRequestBodyPart(.byteBuffer(part), request: self)
108+
executor.writeRequestBodyPart(.byteBuffer(part), request: self, promise: nil)
109109

110110
case .writeAndWait(let executor):
111111
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
112112
self.state.waitForRequestBodyDemand(continuation: continuation)
113113
self.stateLock.unlock()
114114

115-
executor.writeRequestBodyPart(.byteBuffer(part), request: self)
115+
executor.writeRequestBodyPart(.byteBuffer(part), request: self, promise: nil)
116116
}
117117

118118
case .fail:
@@ -132,7 +132,7 @@ final class Transaction: @unchecked Sendable {
132132
break
133133

134134
case .forwardStreamFinished(let executor, let succeedContinuation):
135-
executor.finishRequestBodyStream(self)
135+
executor.finishRequestBodyStream(self, promise: nil)
136136
succeedContinuation?.resume(returning: nil)
137137
}
138138
return

Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift

+21-18
Original file line numberDiff line numberDiff line change
@@ -185,11 +185,11 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
185185
case .sendRequestHead(let head, startBody: let startBody):
186186
self.sendRequestHead(head, startBody: startBody, context: context)
187187

188-
case .sendBodyPart(let part):
189-
context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: nil)
188+
case .sendBodyPart(let part, let promise):
189+
context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: promise)
190190

191-
case .sendRequestEnd:
192-
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
191+
case .sendRequestEnd(let promise):
192+
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: promise)
193193

194194
if let timeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
195195
self.runTimeoutAction(timeoutAction, context: context)
@@ -223,8 +223,8 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
223223
case .close:
224224
context.close(promise: nil)
225225

226-
case .wait:
227-
break
226+
case .wait(let promise):
227+
promise?.fail(ChannelError.eof)
228228

229229
case .forwardResponseHead(let head, let pauseRequestBodyStream):
230230
// We can force unwrap the request here, as we have just validated in the state machine,
@@ -242,7 +242,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
242242
// that the request is neither failed nor finished yet
243243
self.request!.receiveResponseBodyParts(buffer)
244244

245-
case .succeedRequest(let finalAction, let buffer):
245+
case .succeedRequest(let finalAction, let buffer, let promise):
246246
// We can force unwrap the request here, as we have just validated in the state machine,
247247
// that the request is neither failed nor finished yet
248248

@@ -269,8 +269,9 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
269269
}
270270

271271
oldRequest.succeedRequest(buffer)
272+
promise?.succeed(())
272273

273-
case .failRequest(let error, let finalAction):
274+
case .failRequest(let error, let finalAction, let promise):
274275
// see comment in the `succeedRequest` case.
275276
let oldRequest = self.request!
276277
self.request = nil
@@ -288,6 +289,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
288289
}
289290

290291
oldRequest.fail(error)
292+
promise?.fail(error)
291293
}
292294
}
293295

@@ -355,27 +357,28 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
355357

356358
// MARK: Private HTTPRequestExecutor
357359

358-
private func writeRequestBodyPart0(_ data: IOData, request: HTTPExecutableRequest) {
360+
private func writeRequestBodyPart0(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
359361
guard self.request === request, let context = self.channelContext else {
360362
// Because the HTTPExecutableRequest may run in a different thread to our eventLoop,
361363
// calls from the HTTPExecutableRequest to our ChannelHandler may arrive here after
362364
// the request has been popped by the state machine or the ChannelHandler has been
363365
// removed from the Channel pipeline. This is a normal threading issue, noone has
364366
// screwed up.
367+
promise?.fail(ChannelError.eof)
365368
return
366369
}
367370

368-
let action = self.state.requestStreamPartReceived(data)
371+
let action = self.state.requestStreamPartReceived(data, promise: promise)
369372
self.run(action, context: context)
370373
}
371374

372-
private func finishRequestBodyStream0(_ request: HTTPExecutableRequest) {
375+
private func finishRequestBodyStream0(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
373376
guard self.request === request, let context = self.channelContext else {
374377
// See code comment in `writeRequestBodyPart0`
375378
return
376379
}
377380

378-
let action = self.state.requestStreamFinished()
381+
let action = self.state.requestStreamFinished(promise: promise)
379382
self.run(action, context: context)
380383
}
381384

@@ -405,22 +408,22 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
405408
}
406409

407410
extension HTTP1ClientChannelHandler: HTTPRequestExecutor {
408-
func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest) {
411+
func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
409412
if self.eventLoop.inEventLoop {
410-
self.writeRequestBodyPart0(data, request: request)
413+
self.writeRequestBodyPart0(data, request: request, promise: promise)
411414
} else {
412415
self.eventLoop.execute {
413-
self.writeRequestBodyPart0(data, request: request)
416+
self.writeRequestBodyPart0(data, request: request, promise: promise)
414417
}
415418
}
416419
}
417420

418-
func finishRequestBodyStream(_ request: HTTPExecutableRequest) {
421+
func finishRequestBodyStream(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
419422
if self.eventLoop.inEventLoop {
420-
self.finishRequestBodyStream0(request)
423+
self.finishRequestBodyStream0(request, promise: promise)
421424
} else {
422425
self.eventLoop.execute {
423-
self.finishRequestBodyStream0(request)
426+
self.finishRequestBodyStream0(request, promise: promise)
424427
}
425428
}
426429
}

Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ConnectionStateMachine.swift

+30-30
Original file line numberDiff line numberDiff line change
@@ -41,21 +41,21 @@ struct HTTP1ConnectionStateMachine {
4141
}
4242

4343
case sendRequestHead(HTTPRequestHead, startBody: Bool)
44-
case sendBodyPart(IOData)
45-
case sendRequestEnd
44+
case sendBodyPart(IOData, EventLoopPromise<Void>?)
45+
case sendRequestEnd(EventLoopPromise<Void>?)
4646

4747
case pauseRequestBodyStream
4848
case resumeRequestBodyStream
4949

5050
case forwardResponseHead(HTTPResponseHead, pauseRequestBodyStream: Bool)
5151
case forwardResponseBodyParts(CircularBuffer<ByteBuffer>)
5252

53-
case failRequest(Error, FinalStreamAction)
54-
case succeedRequest(FinalStreamAction, CircularBuffer<ByteBuffer>)
53+
case failRequest(Error, FinalStreamAction, EventLoopPromise<Void>?)
54+
case succeedRequest(FinalStreamAction, CircularBuffer<ByteBuffer>, EventLoopPromise<Void>?)
5555

5656
case read
5757
case close
58-
case wait
58+
case wait(EventLoopPromise<Void>?)
5959

6060
case fireChannelActive
6161
case fireChannelInactive
@@ -80,7 +80,7 @@ struct HTTP1ConnectionStateMachine {
8080
// Since NIO triggers promise before pipeline, the handler might have been added to the
8181
// pipeline, before the channelActive callback was triggered. For this reason, we might
8282
// get the channelActive call twice
83-
return .wait
83+
return .wait(nil)
8484

8585
case .modifying:
8686
preconditionFailure("Invalid state: \(self.state)")
@@ -104,7 +104,7 @@ struct HTTP1ConnectionStateMachine {
104104
return .fireChannelInactive
105105

106106
case .closed:
107-
return .wait
107+
return .wait(nil)
108108

109109
case .modifying:
110110
preconditionFailure("Invalid state: \(self.state)")
@@ -141,7 +141,7 @@ struct HTTP1ConnectionStateMachine {
141141

142142
switch self.state {
143143
case .initialized, .idle, .closing, .closed:
144-
return .wait
144+
return .wait(nil)
145145
case .inRequest(var requestStateMachine, let close):
146146
return self.avoidingStateMachineCoW { state -> Action in
147147
let action = requestStateMachine.writabilityChanged(writable: writable)
@@ -173,7 +173,7 @@ struct HTTP1ConnectionStateMachine {
173173
// as closed.
174174
//
175175
// TODO: AHC should support a fast rescheduling mechanism here.
176-
return .failRequest(HTTPClientError.remoteConnectionClosed, .none)
176+
return .failRequest(HTTPClientError.remoteConnectionClosed, .none, nil)
177177

178178
case .idle:
179179
var requestStateMachine = HTTPRequestStateMachine(isChannelWritable: self.isChannelWritable)
@@ -189,25 +189,25 @@ struct HTTP1ConnectionStateMachine {
189189
}
190190
}
191191

192-
mutating func requestStreamPartReceived(_ part: IOData) -> Action {
192+
mutating func requestStreamPartReceived(_ part: IOData, promise: EventLoopPromise<Void>?) -> Action {
193193
guard case .inRequest(var requestStateMachine, let close) = self.state else {
194194
preconditionFailure("Invalid state: \(self.state)")
195195
}
196196

197197
return self.avoidingStateMachineCoW { state -> Action in
198-
let action = requestStateMachine.requestStreamPartReceived(part)
198+
let action = requestStateMachine.requestStreamPartReceived(part, promise: promise)
199199
state = .inRequest(requestStateMachine, close: close)
200200
return state.modify(with: action)
201201
}
202202
}
203203

204-
mutating func requestStreamFinished() -> Action {
204+
mutating func requestStreamFinished(promise: EventLoopPromise<Void>?) -> Action {
205205
guard case .inRequest(var requestStateMachine, let close) = self.state else {
206206
preconditionFailure("Invalid state: \(self.state)")
207207
}
208208

209209
return self.avoidingStateMachineCoW { state -> Action in
210-
let action = requestStateMachine.requestStreamFinished()
210+
let action = requestStateMachine.requestStreamFinished(promise: promise)
211211
state = .inRequest(requestStateMachine, close: close)
212212
return state.modify(with: action)
213213
}
@@ -223,7 +223,7 @@ struct HTTP1ConnectionStateMachine {
223223
self.state = .closing
224224
return .close
225225
} else {
226-
return .wait
226+
return .wait(nil)
227227
}
228228

229229
case .inRequest(var requestStateMachine, close: let close):
@@ -234,7 +234,7 @@ struct HTTP1ConnectionStateMachine {
234234
}
235235

236236
case .closing, .closed:
237-
return .wait
237+
return .wait(nil)
238238

239239
case .modifying:
240240
preconditionFailure("Invalid state: \(self.state)")
@@ -284,7 +284,7 @@ struct HTTP1ConnectionStateMachine {
284284
}
285285

286286
case .closing, .closed:
287-
return .wait
287+
return .wait(nil)
288288

289289
case .modifying:
290290
preconditionFailure("Invalid state: \(self.state)")
@@ -294,7 +294,7 @@ struct HTTP1ConnectionStateMachine {
294294
mutating func channelReadComplete() -> Action {
295295
switch self.state {
296296
case .initialized, .idle, .closing, .closed:
297-
return .wait
297+
return .wait(nil)
298298

299299
case .inRequest(var requestStateMachine, let close):
300300
return self.avoidingStateMachineCoW { state -> Action in
@@ -377,15 +377,15 @@ extension HTTP1ConnectionStateMachine.State {
377377
return .pauseRequestBodyStream
378378
case .resumeRequestBodyStream:
379379
return .resumeRequestBodyStream
380-
case .sendBodyPart(let part):
381-
return .sendBodyPart(part)
382-
case .sendRequestEnd:
383-
return .sendRequestEnd
380+
case .sendBodyPart(let part, let promise):
381+
return .sendBodyPart(part, promise)
382+
case .sendRequestEnd(let promise):
383+
return .sendRequestEnd(promise)
384384
case .forwardResponseHead(let head, let pauseRequestBodyStream):
385385
return .forwardResponseHead(head, pauseRequestBodyStream: pauseRequestBodyStream)
386386
case .forwardResponseBodyParts(let parts):
387387
return .forwardResponseBodyParts(parts)
388-
case .succeedRequest(let finalAction, let finalParts):
388+
case .succeedRequest(let finalAction, let finalParts, let promise):
389389
guard case .inRequest(_, close: let close) = self else {
390390
preconditionFailure("Invalid state: \(self)")
391391
}
@@ -401,9 +401,9 @@ extension HTTP1ConnectionStateMachine.State {
401401
self = .idle
402402
newFinalAction = close ? .close : .informConnectionIsIdle
403403
}
404-
return .succeedRequest(newFinalAction, finalParts)
404+
return .succeedRequest(newFinalAction, finalParts, promise)
405405

406-
case .failRequest(let error, let finalAction):
406+
case .failRequest(let error, let finalAction, let promise):
407407
switch self {
408408
case .initialized:
409409
preconditionFailure("Invalid state: \(self)")
@@ -412,17 +412,17 @@ extension HTTP1ConnectionStateMachine.State {
412412
case .inRequest(_, close: let close):
413413
if close || finalAction == .close {
414414
self = .closing
415-
return .failRequest(error, .close)
415+
return .failRequest(error, .close, promise)
416416
} else {
417417
self = .idle
418-
return .failRequest(error, .informConnectionIsIdle)
418+
return .failRequest(error, .informConnectionIsIdle, promise)
419419
}
420420

421421
case .closing:
422-
return .failRequest(error, .none)
422+
return .failRequest(error, .none, nil)
423423
case .closed:
424424
// this state can be reached, if the connection was unexpectedly closed by remote
425-
return .failRequest(error, .none)
425+
return .failRequest(error, .none, nil)
426426

427427
case .modifying:
428428
preconditionFailure("Invalid state: \(self)")
@@ -431,8 +431,8 @@ extension HTTP1ConnectionStateMachine.State {
431431
case .read:
432432
return .read
433433

434-
case .wait:
435-
return .wait
434+
case .wait(let promise):
435+
return .wait(promise)
436436
}
437437
}
438438
}

0 commit comments

Comments
 (0)