Skip to content

test that connection close whilst idle works #167

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ extension HTTPClientInternalTests {
("testRequestURITrailingSlash", testRequestURITrailingSlash),
("testChannelAndDelegateOnDifferentEventLoops", testChannelAndDelegateOnDifferentEventLoops),
("testResponseConnectionCloseGet", testResponseConnectionCloseGet),
("testWeNoticeRemoteClosuresEvenWhenConnectionIsIdleInPool", testWeNoticeRemoteClosuresEvenWhenConnectionIsIdleInPool),
]
}
}
128 changes: 128 additions & 0 deletions Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import NIO
import NIOConcurrencyHelpers
import NIOHTTP1
import NIOTestUtils
import XCTest

class HTTPClientInternalTests: XCTestCase {
Expand Down Expand Up @@ -453,4 +454,131 @@ class HTTPClientInternalTests: XCTestCase {
XCTAssertEqual(httpClient.pool.connectionProviderCount, 0)
}.futureResult.wait()
}

func testWeNoticeRemoteClosuresEvenWhenConnectionIsIdleInPool() {
final class ServerThatRespondsThenJustCloses: ChannelInboundHandler {
typealias InboundIn = HTTPServerRequestPart
typealias OutboundOut = HTTPServerResponsePart

let requestNumber: NIOAtomic<Int>
let connectionNumber: NIOAtomic<Int>

init(requestNumber: NIOAtomic<Int>, connectionNumber: NIOAtomic<Int>) {
self.requestNumber = requestNumber
self.connectionNumber = connectionNumber
}

func channelActive(context: ChannelHandlerContext) {
_ = self.connectionNumber.add(1)
}

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let req = self.unwrapInboundIn(data)

switch req {
case .head, .body:
()
case .end:
let last = self.requestNumber.add(1)
switch last {
case 0:
context.write(self.wrapOutboundOut(.head(.init(version: .init(major: 1, minor: 1), status: .ok))),
promise: nil)
context.writeAndFlush(self.wrapOutboundOut(.end(nil))).whenComplete { _ in
context.eventLoop.scheduleTask(in: .milliseconds(10)) {
context.close(promise: nil)
}
}
case 1:
context.write(self.wrapOutboundOut(.head(.init(version: .init(major: 1, minor: 1), status: .ok))),
promise: nil)
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
default:
XCTFail("did not expect request \(last + 1)")
}
}
}
}

final class ObserveWhenClosedHandler: ChannelInboundHandler {
typealias InboundIn = Any

let channelInactivePromise: EventLoopPromise<Void>

init(channelInactivePromise: EventLoopPromise<Void>) {
self.channelInactivePromise = channelInactivePromise
}

func channelInactive(context: ChannelHandlerContext) {
context.fireChannelInactive()
self.channelInactivePromise.succeed(())
}
}

let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}
let requestNumber = NIOAtomic<Int>.makeAtomic(value: 0)
let connectionNumber = NIOAtomic<Int>.makeAtomic(value: 0)
let sharedStateServerHandler = ServerThatRespondsThenJustCloses(requestNumber: requestNumber,
connectionNumber: connectionNumber)
var maybeServer: Channel?
XCTAssertNoThrow(maybeServer = try ServerBootstrap(group: group)
.serverChannelOption(ChannelOptions.socket(.init(SOL_SOCKET), .init(SO_REUSEADDR)), value: 1)
.childChannelInitializer { channel in
channel.pipeline.configureHTTPServerPipeline().flatMap {
// We're deliberately adding a handler which is shared between multiple channels. This is normally
// very verboten but this handler is specially crafted to tolerate this.
channel.pipeline.addHandler(sharedStateServerHandler)
}
}
.bind(host: "127.0.0.1", port: 0)
.wait())
guard let server = maybeServer else {
XCTFail("couldn't create server")
return
}
defer {
XCTAssertNoThrow(try server.close().wait())
}

let url = "http://127.0.0.1:\(server.localAddress!.port!)"
let client = HTTPClient(eventLoopGroupProvider: .shared(group))
defer {
XCTAssertNoThrow(try client.syncShutdown())
}

var maybeConnection: ConnectionPool.Connection?
// This is pretty evil but we literally just get hold of a connection to get to the channel to be able to
// observe when the server closing the connection is known to the client.
XCTAssertNoThrow(maybeConnection = try client.pool.getConnection(for: .init(url: url),
preference: .indifferent,
on: group.next(),
deadline: nil).wait())
guard let connection = maybeConnection else {
XCTFail("couldn't get connection")
return
}

// And let's also give the connection back :).
client.pool.release(connection)

XCTAssertEqual(0, sharedStateServerHandler.requestNumber.load())
XCTAssertEqual(1, client.pool.connectionProviderCount)
XCTAssertTrue(connection.channel.isActive)
XCTAssertNoThrow(XCTAssertEqual(.ok, try client.get(url: url).wait().status))
XCTAssertEqual(1, sharedStateServerHandler.connectionNumber.load())
XCTAssertEqual(1, sharedStateServerHandler.requestNumber.load())

// We have received the first response and we know the remote end will now close the connection.
// Let's wait until we see the closure in the client's channel.
XCTAssertNoThrow(try connection.channel.closeFuture.wait())

// Now that we should have learned that the connection is dead, a subsequent request should work and use a new
// connection
XCTAssertNoThrow(XCTAssertEqual(.ok, try client.get(url: url).wait().status))
XCTAssertEqual(2, sharedStateServerHandler.connectionNumber.load())
XCTAssertEqual(2, sharedStateServerHandler.requestNumber.load())
}
}