|
16 | 16 | import NIO
|
17 | 17 | import NIOConcurrencyHelpers
|
18 | 18 | import NIOHTTP1
|
| 19 | +import NIOTestUtils |
19 | 20 | import XCTest
|
20 | 21 |
|
21 | 22 | class HTTPClientInternalTests: XCTestCase {
|
@@ -453,4 +454,131 @@ class HTTPClientInternalTests: XCTestCase {
|
453 | 454 | XCTAssertEqual(httpClient.pool.connectionProviderCount, 0)
|
454 | 455 | }.futureResult.wait()
|
455 | 456 | }
|
| 457 | + |
| 458 | + func testWeNoticeRemoteClosuresEvenWhenConnectionIsIdleInPool() { |
| 459 | + final class ServerThatRespondsThenJustCloses: ChannelInboundHandler { |
| 460 | + typealias InboundIn = HTTPServerRequestPart |
| 461 | + typealias OutboundOut = HTTPServerResponsePart |
| 462 | + |
| 463 | + let requestNumber: NIOAtomic<Int> |
| 464 | + let connectionNumber: NIOAtomic<Int> |
| 465 | + |
| 466 | + init(requestNumber: NIOAtomic<Int>, connectionNumber: NIOAtomic<Int>) { |
| 467 | + self.requestNumber = requestNumber |
| 468 | + self.connectionNumber = connectionNumber |
| 469 | + } |
| 470 | + |
| 471 | + func channelActive(context: ChannelHandlerContext) { |
| 472 | + _ = self.connectionNumber.add(1) |
| 473 | + } |
| 474 | + |
| 475 | + func channelRead(context: ChannelHandlerContext, data: NIOAny) { |
| 476 | + let req = self.unwrapInboundIn(data) |
| 477 | + |
| 478 | + switch req { |
| 479 | + case .head, .body: |
| 480 | + () |
| 481 | + case .end: |
| 482 | + let last = self.requestNumber.add(1) |
| 483 | + switch last { |
| 484 | + case 0: |
| 485 | + context.write(self.wrapOutboundOut(.head(.init(version: .init(major: 1, minor: 1), status: .ok))), |
| 486 | + promise: nil) |
| 487 | + context.writeAndFlush(self.wrapOutboundOut(.end(nil))).whenComplete { _ in |
| 488 | + context.eventLoop.scheduleTask(in: .milliseconds(10)) { |
| 489 | + context.close(promise: nil) |
| 490 | + } |
| 491 | + } |
| 492 | + case 1: |
| 493 | + context.write(self.wrapOutboundOut(.head(.init(version: .init(major: 1, minor: 1), status: .ok))), |
| 494 | + promise: nil) |
| 495 | + context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) |
| 496 | + default: |
| 497 | + XCTFail("did not expect request \(last + 1)") |
| 498 | + } |
| 499 | + } |
| 500 | + } |
| 501 | + } |
| 502 | + |
| 503 | + final class ObserveWhenClosedHandler: ChannelInboundHandler { |
| 504 | + typealias InboundIn = Any |
| 505 | + |
| 506 | + let channelInactivePromise: EventLoopPromise<Void> |
| 507 | + |
| 508 | + init(channelInactivePromise: EventLoopPromise<Void>) { |
| 509 | + self.channelInactivePromise = channelInactivePromise |
| 510 | + } |
| 511 | + |
| 512 | + func channelInactive(context: ChannelHandlerContext) { |
| 513 | + context.fireChannelInactive() |
| 514 | + self.channelInactivePromise.succeed(()) |
| 515 | + } |
| 516 | + } |
| 517 | + |
| 518 | + let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) |
| 519 | + defer { |
| 520 | + XCTAssertNoThrow(try group.syncShutdownGracefully()) |
| 521 | + } |
| 522 | + let requestNumber = NIOAtomic<Int>.makeAtomic(value: 0) |
| 523 | + let connectionNumber = NIOAtomic<Int>.makeAtomic(value: 0) |
| 524 | + let sharedStateServerHandler = ServerThatRespondsThenJustCloses(requestNumber: requestNumber, |
| 525 | + connectionNumber: connectionNumber) |
| 526 | + var maybeServer: Channel? |
| 527 | + XCTAssertNoThrow(maybeServer = try ServerBootstrap(group: group) |
| 528 | + .serverChannelOption(ChannelOptions.socket(.init(SOL_SOCKET), .init(SO_REUSEADDR)), value: 1) |
| 529 | + .childChannelInitializer { channel in |
| 530 | + channel.pipeline.configureHTTPServerPipeline().flatMap { |
| 531 | + // We're deliberately adding a handler which is shared between multiple channels. This is normally |
| 532 | + // very verboten but this handler is specially crafted to tolerate this. |
| 533 | + channel.pipeline.addHandler(sharedStateServerHandler) |
| 534 | + } |
| 535 | + } |
| 536 | + .bind(host: "127.0.0.1", port: 0) |
| 537 | + .wait()) |
| 538 | + guard let server = maybeServer else { |
| 539 | + XCTFail("couldn't create server") |
| 540 | + return |
| 541 | + } |
| 542 | + defer { |
| 543 | + XCTAssertNoThrow(try server.close().wait()) |
| 544 | + } |
| 545 | + |
| 546 | + let url = "http://127.0.0.1:\(server.localAddress!.port!)" |
| 547 | + let client = HTTPClient(eventLoopGroupProvider: .shared(group)) |
| 548 | + defer { |
| 549 | + XCTAssertNoThrow(try client.syncShutdown()) |
| 550 | + } |
| 551 | + |
| 552 | + var maybeConnection: ConnectionPool.Connection? |
| 553 | + // This is pretty evil but we literally just get hold of a connection to get to the channel to be able to |
| 554 | + // observe when the server closing the connection is known to the client. |
| 555 | + XCTAssertNoThrow(maybeConnection = try client.pool.getConnection(for: .init(url: url), |
| 556 | + preference: .indifferent, |
| 557 | + on: group.next(), |
| 558 | + deadline: nil).wait()) |
| 559 | + guard let connection = maybeConnection else { |
| 560 | + XCTFail("couldn't get connection") |
| 561 | + return |
| 562 | + } |
| 563 | + |
| 564 | + // And let's also give the connection back :). |
| 565 | + client.pool.release(connection) |
| 566 | + |
| 567 | + XCTAssertEqual(0, sharedStateServerHandler.requestNumber.load()) |
| 568 | + XCTAssertEqual(1, client.pool.connectionProviderCount) |
| 569 | + XCTAssertTrue(connection.channel.isActive) |
| 570 | + XCTAssertNoThrow(XCTAssertEqual(.ok, try client.get(url: url).wait().status)) |
| 571 | + XCTAssertEqual(1, sharedStateServerHandler.connectionNumber.load()) |
| 572 | + XCTAssertEqual(1, sharedStateServerHandler.requestNumber.load()) |
| 573 | + |
| 574 | + // We have received the first response and we know the remote end will now close the connection. |
| 575 | + // Let's wait until we see the closure in the client's channel. |
| 576 | + XCTAssertNoThrow(try connection.channel.closeFuture.wait()) |
| 577 | + |
| 578 | + // Now that we should have learned that the connection is dead, a subsequent request should work and use a new |
| 579 | + // connection |
| 580 | + XCTAssertNoThrow(XCTAssertEqual(.ok, try client.get(url: url).wait().status)) |
| 581 | + XCTAssertEqual(2, sharedStateServerHandler.connectionNumber.load()) |
| 582 | + XCTAssertEqual(2, sharedStateServerHandler.requestNumber.load()) |
| 583 | + } |
456 | 584 | }
|
0 commit comments