Skip to content

Fix bi directional streaming test #405

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 129 additions & 8 deletions Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1149,23 +1149,144 @@ struct CollectEverythingLogHandler: LogHandler {
}
}

class StreamDelegate: HTTPClientResponseDelegate {
typealias Response = Void

enum State {
case idle
case waitingForBytes(EventLoopPromise<ByteBuffer?>)
case buffering(ByteBuffer, done: Bool)
case failed(Error)
case finished
}

let eventLoop: EventLoop
private var state: State = .idle

init(eventLoop: EventLoop) {
self.eventLoop = eventLoop
}

func next() -> EventLoopFuture<ByteBuffer?> {
if self.eventLoop.inEventLoop {
return self.next0()
} else {
return self.eventLoop.flatSubmit {
self.next0()
}
}
}

private func next0() -> EventLoopFuture<ByteBuffer?> {
switch self.state {
case .idle:
let promise = self.eventLoop.makePromise(of: ByteBuffer?.self)
self.state = .waitingForBytes(promise)
return promise.futureResult

case .buffering(let byteBuffer, done: false):
self.state = .idle
return self.eventLoop.makeSucceededFuture(byteBuffer)

case .buffering(let byteBuffer, done: true):
self.state = .finished
return self.eventLoop.makeSucceededFuture(byteBuffer)

case .waitingForBytes:
preconditionFailure("Don't call `.next` twice")

case .failed(let error):
self.state = .finished
return self.eventLoop.makeFailedFuture(error)

case .finished:
return self.eventLoop.makeSucceededFuture(nil)
}
}

// MARK: HTTPClientResponseDelegate

func didSendRequestHead(task: HTTPClient.Task<Response>, _ head: HTTPRequestHead) {
XCTAssert(self.eventLoop.inEventLoop)
}

func didSendRequestPart(task: HTTPClient.Task<Response>, _ part: IOData) {
XCTAssert(self.eventLoop.inEventLoop)
}

func didSendRequest(task: HTTPClient.Task<Response>) {
XCTAssert(self.eventLoop.inEventLoop)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove this assertion? inEventLoop is validly allowed to return false negatives, so we shouldn't assert that it won't. We can add preconditionInEventLoop instead if you want the check.

}

func didReceiveHead(task: HTTPClient.Task<Response>, _ head: HTTPResponseHead) -> EventLoopFuture<Void> {
XCTAssert(self.eventLoop.inEventLoop)
return task.eventLoop.makeSucceededVoidFuture()
}

func didReceiveBodyPart(task: HTTPClient.Task<Response>, _ buffer: ByteBuffer) -> EventLoopFuture<Void> {
XCTAssert(self.eventLoop.inEventLoop)

switch self.state {
case .idle:
self.state = .buffering(buffer, done: false)
case .waitingForBytes(let promise):
self.state = .idle
promise.succeed(buffer)
case .buffering(var byteBuffer, done: false):
var buffer = buffer
byteBuffer.writeBuffer(&buffer)
self.state = .buffering(byteBuffer, done: false)
case .buffering(_, done: true), .finished, .failed:
preconditionFailure("Invalid state: \(self.state)")
}

return task.eventLoop.makeSucceededVoidFuture()
}

func didReceiveError(task: HTTPClient.Task<Response>, _ error: Error) {
XCTAssert(self.eventLoop.inEventLoop)

switch self.state {
case .idle:
self.state = .failed(error)
case .waitingForBytes(let promise):
self.state = .finished
promise.fail(error)
case .buffering(_, done: false):
self.state = .failed(error)
case .buffering(_, done: true), .finished, .failed:
preconditionFailure("Invalid state: \(self.state)")
}
}

func didFinishRequest(task: HTTPClient.Task<Response>) throws {
XCTAssert(self.eventLoop.inEventLoop)

switch self.state {
case .idle:
self.state = .finished
case .waitingForBytes(let promise):
self.state = .finished
promise.succeed(nil)
case .buffering(let byteBuffer, done: false):
self.state = .buffering(byteBuffer, done: true)
case .buffering(_, done: true), .finished, .failed:
preconditionFailure("Invalid state: \(self.state)")
}
}
}

class HTTPEchoHandler: ChannelInboundHandler {
typealias InboundIn = HTTPServerRequestPart
typealias OutboundOut = HTTPServerResponsePart

var promises: CircularBuffer<EventLoopPromise<Void>> = CircularBuffer()

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let request = self.unwrapInboundIn(data)
switch request {
case .head:
context.writeAndFlush(self.wrapOutboundOut(.head(.init(version: .init(major: 1, minor: 1), status: .ok))), promise: nil)
context.writeAndFlush(self.wrapOutboundOut(.head(.init(version: .http1_1, status: .ok))), promise: nil)
case .body(let bytes):
context.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(bytes)))).whenSuccess {
if let promise = self.promises.popFirst() {
promise.succeed(())
}
}
context.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(bytes))), promise: nil)
case .end:
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
context.close(promise: nil)
Expand Down
73 changes: 49 additions & 24 deletions Tests/AsyncHTTPClientTests/HTTPClientTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2815,40 +2815,65 @@ class HTTPClientTests: XCTestCase {
XCTAssertEqual(result, .success, "we never closed the connection!")
}

func testBiDirectionalStreaming() throws {
let handler = HTTPEchoHandler()
// In this test, we test that a request can continue to stream its body after the response head,
// was received. The client sends a number to the server and waits for the server to echo the
// number. Once the client receives the echoed number, it will continue with the next number.
// The client and server ping/pong 30 times.
func testBiDirectionalStreaming() {
let httpBin = HTTPBin(.http1_1(ssl: false, compress: false)) { _ in HTTPEchoHandler() }
defer { XCTAssertNoThrow(try httpBin.shutdown()) }

let server = try ServerBootstrap(group: self.serverGroup)
.serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1)
.childChannelInitializer { channel in
channel.pipeline.configureHTTPServerPipeline().flatMap {
channel.pipeline.addHandler(handler)
}
}
.bind(host: "localhost", port: 0)
.wait()
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 2)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
let writeEL = eventLoopGroup.next()
let delegateEL = eventLoopGroup.next()

defer {
server.close(promise: nil)
}
let httpClient = HTTPClient(eventLoopGroupProvider: .shared(eventLoopGroup))
defer { XCTAssertNoThrow(try httpClient.syncShutdown()) }

let delegate = StreamDelegate(eventLoop: delegateEL)

let body: HTTPClient.Body = .stream { writer in
let promise = self.clientGroup.next().makePromise(of: Void.self)
handler.promises.append(promise)
return writer.write(.byteBuffer(ByteBuffer(string: "hello"))).flatMap {
promise.futureResult
}.flatMap {
let promise = self.clientGroup.next().makePromise(of: Void.self)
handler.promises.append(promise)
return writer.write(.byteBuffer(ByteBuffer(string: "hello2"))).flatMap {
promise.futureResult
let finalPromise = writeEL.makePromise(of: Void.self)

func writeLoop(_ writer: HTTPClient.Body.StreamWriter, index: Int) {
XCTAssert(writeEL.inEventLoop, "Always write from unexpected el")

if index >= 30 {
return finalPromise.succeed(())
}

let sent = ByteBuffer(integer: index)
writer.write(.byteBuffer(sent)).flatMap { () -> EventLoopFuture<ByteBuffer?> in
XCTAssert(delegateEL.inEventLoop, "Always dispatch back to delegate el")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same note here about false positives.

return delegate.next()
}.whenComplete { result in
switch result {
case .success(let returned):
XCTAssertEqual(returned, sent)

writeEL.execute {
writeLoop(writer, index: index + 1)
}

case .failure(let error):
finalPromise.fail(error)
}
}
}

writeEL.execute {
writeLoop(writer, index: 0)
}

return finalPromise.futureResult
}

let future = self.defaultClient.execute(url: "http://localhost:\(server.localAddress!.port!)", body: body)
let request = try! HTTPClient.Request(url: "http://localhost:\(httpBin.port)", body: body)
let future = httpClient.execute(request: request, delegate: delegate, eventLoop: .delegate(on: delegateEL))

XCTAssertNoThrow(try future.wait())
XCTAssertNil(try delegate.next().wait())
}

func testSynchronousHandshakeErrorReporting() throws {
Expand Down