Skip to content

Fixes issue #234 : ConnectionsState exposes a setter into internal state #311

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 6 additions & 64 deletions Sources/AsyncHTTPClient/ConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ class HTTP1ConnectionProvider {
logger.trace("opening fresh connection (found matching but inactive connection)",
metadata: ["ahc-dead-connection": "\(connection)"])
self.makeChannel(preference: waiter.preference).whenComplete { result in
self.connect(result, waiter: waiter, replacing: connection, logger: logger)
self.connect(result, waiter: waiter, logger: logger)
}
}
}
Expand All @@ -252,7 +252,7 @@ class HTTP1ConnectionProvider {
metadata: ["ahc-old-connection": "\(connection)",
"ahc-waiter": "\(waiter)"])
self.makeChannel(preference: waiter.preference).whenComplete { result in
self.connect(result, waiter: waiter, replacing: connection, logger: logger)
self.connect(result, waiter: waiter, logger: logger)
}
}
case .park(let connection):
Expand Down Expand Up @@ -308,21 +308,15 @@ class HTTP1ConnectionProvider {
return waiter.promise.futureResult
}

func connect(_ result: Result<Channel, Error>,
waiter: Waiter<Connection>,
replacing closedConnection: Connection? = nil,
logger: Logger) {
func connect(_ result: Result<Channel, Error>, waiter: Waiter<Connection>, logger: Logger) {
let action: Action<Connection>
switch result {
case .success(let channel):
logger.trace("successfully created connection",
metadata: ["ahc-connection": "\(channel)"])
let connection = Connection(channel: channel, provider: self)
action = self.lock.withLock {
if let closedConnection = closedConnection {
self.state.drop(connection: closedConnection)
}
return self.state.offer(connection: connection)
self.state.offer(connection: connection)
}

switch action {
Expand Down Expand Up @@ -367,7 +361,7 @@ class HTTP1ConnectionProvider {
// This is needed to start a new stack, otherwise, since this is called on a previous
// future completion handler chain, it will be growing indefinitely until the connection is closed.
// We might revisit this when https://github.com/apple/swift-nio/issues/970 is resolved.
connection.channel.eventLoop.execute {
connection.eventLoop.execute {
self.execute(action, logger: logger)
}
}
Expand Down Expand Up @@ -418,59 +412,7 @@ class HTTP1ConnectionProvider {
}

private func makeChannel(preference: HTTPClient.EventLoopPreference) -> EventLoopFuture<Channel> {
let channelEventLoop = preference.bestEventLoop ?? self.eventLoop
let requiresTLS = self.key.scheme.requiresTLS
let bootstrap: NIOClientTCPBootstrap
do {
bootstrap = try NIOClientTCPBootstrap.makeHTTPClientBootstrapBase(on: channelEventLoop, host: self.key.host, port: self.key.port, requiresTLS: requiresTLS, configuration: self.configuration)
} catch {
return channelEventLoop.makeFailedFuture(error)
}

let channel: EventLoopFuture<Channel>
switch self.key.scheme {
case .http, .https:
let address = HTTPClient.resolveAddress(host: self.key.host, port: self.key.port, proxy: self.configuration.proxy)
channel = bootstrap.connect(host: address.host, port: address.port)
case .unix, .http_unix, .https_unix:
channel = bootstrap.connect(unixDomainSocketPath: self.key.unixPath)
}

return channel.flatMap { channel in
let requiresSSLHandler = self.configuration.proxy != nil && self.key.scheme.requiresTLS
let handshakePromise = channel.eventLoop.makePromise(of: Void.self)

channel.pipeline.addSSLHandlerIfNeeded(for: self.key, tlsConfiguration: self.configuration.tlsConfiguration, addSSLClient: requiresSSLHandler, handshakePromise: handshakePromise)

return handshakePromise.futureResult.flatMap {
channel.pipeline.addHTTPClientHandlers(leftOverBytesStrategy: .forwardBytes)
}.flatMap {
#if canImport(Network)
if #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *), bootstrap.underlyingBootstrap is NIOTSConnectionBootstrap {
return channel.pipeline.addHandler(HTTPClient.NWErrorHandler(), position: .first)
}
#endif
return channel.eventLoop.makeSucceededFuture(())
}.flatMap {
switch self.configuration.decompression {
case .disabled:
return channel.eventLoop.makeSucceededFuture(())
case .enabled(let limit):
let decompressHandler = NIOHTTPResponseDecompressor(limit: limit)
return channel.pipeline.addHandler(decompressHandler)
}
}.map {
channel
}
}.flatMapError { error in
#if canImport(Network)
var error = error
if #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *), bootstrap.underlyingBootstrap is NIOTSConnectionBootstrap {
error = HTTPClient.NWErrorHandler.translateError(error)
}
#endif
return channelEventLoop.makeFailedFuture(error)
}
return NIOClientTCPBootstrap.makeHTTP1Channel(destination: self.key, eventLoop: self.eventLoop, configuration: self.configuration, preference: preference)
}

/// A `Waiter` represents a request that waits for a connection when none is
Expand Down
75 changes: 27 additions & 48 deletions Sources/AsyncHTTPClient/ConnectionsState.swift
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,6 @@ extension HTTP1ConnectionProvider {
return Snapshot(state: self.state, availableConnections: self.availableConnections, leasedConnections: self.leasedConnections, waiters: self.waiters, openedConnectionsCount: self.openedConnectionsCount, pending: self.pending)
}

mutating func testsOnly_setInternalState(_ snapshot: Snapshot<ConnectionType>) {
self.state = snapshot.state
self.availableConnections = snapshot.availableConnections
self.leasedConnections = snapshot.leasedConnections
self.waiters = snapshot.waiters
self.openedConnectionsCount = snapshot.openedConnectionsCount
self.pending = snapshot.pending
}

func assertInvariants() {
assert(self.waiters.isEmpty)
assert(self.availableConnections.isEmpty)
Expand Down Expand Up @@ -158,30 +149,22 @@ extension HTTP1ConnectionProvider {

if connection.isActiveEstimation, !closing { // If connection is alive, we can offer it to a next waiter
if let waiter = self.waiters.popFirst() {
// There should be no case where we have both capacity and a waiter here.
// Waiter can only exists if there was no capacity at aquire. If some connection
// is released when we have waiter it can only indicate that we should lease (if EL are the same),
// or replace (if they are different). But we cannot increase connection count here.
assert(!self.hasCapacity)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this code behave gracefully if this invariant is not true?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. Worst case scenario - we will replace the connection with a new one without checking for potentially available connections first. Though if this happens, it means that there is a bug in our code... What do you think we should do if for some reason we fail this?


let (eventLoop, required) = self.resolvePreference(waiter.preference)

// If returned connection is on same EL or we do not require special EL - lease it
if connection.eventLoop === eventLoop || !required {
return .lease(connection, waiter)
}

// If there is an opened connection on the same loop, lease it and park returned
if let found = self.availableConnections.firstIndex(where: { $0.eventLoop === eventLoop }) {
self.leasedConnections.remove(ConnectionKey(connection))
let replacement = self.availableConnections.swap(at: found, with: connection)
self.leasedConnections.insert(ConnectionKey(replacement))
return .parkAnd(connection, .lease(replacement, waiter))
}

// If we can create new connection - do it
if self.hasCapacity {
self.leasedConnections.remove(ConnectionKey(connection))
self.availableConnections.append(connection)
self.openedConnectionsCount += 1
return .parkAnd(connection, .create(waiter))
}

// If we cannot create new connections, we will have to replace returned connection with a new one on the required loop
// We will keep the `openedConnectionCount`, since .replace === .create, so we decrease and increase the `openedConnectionCount`
self.leasedConnections.remove(ConnectionKey(connection))
return .replace(connection, waiter)
} else { // or park, if there are no waiters
self.leasedConnections.remove(ConnectionKey(connection))
Expand Down Expand Up @@ -214,15 +197,6 @@ extension HTTP1ConnectionProvider {
}
}

mutating func drop(connection: ConnectionType) {
switch self.state {
case .active:
self.leasedConnections.remove(ConnectionKey(connection))
case .closed:
assertionFailure("should not happen")
}
}

mutating func connectFailed() -> Action<ConnectionType> {
switch self.state {
case .active:
Expand Down Expand Up @@ -287,20 +261,25 @@ extension HTTP1ConnectionProvider {

mutating func processNextWaiter() -> Action<ConnectionType> {
if let waiter = self.waiters.popFirst() {
let (eventLoop, required) = self.resolvePreference(waiter.preference)

// If specific EL is required, we have only two options - find open one or create a new one
if required, let found = self.availableConnections.firstIndex(where: { $0.eventLoop === eventLoop }) {
let connection = self.availableConnections.remove(at: found)
self.leasedConnections.insert(ConnectionKey(connection))
return .lease(connection, waiter)
} else if !required, let connection = self.availableConnections.popFirst() {
self.leasedConnections.insert(ConnectionKey(connection))
return .lease(connection, waiter)
} else {
self.openedConnectionsCount += 1
return .create(waiter)
}
// There should be no case where we have waiters and available connections at the same time.
//
// This method is called in following cases:
//
// 1. from `release` when connection is inactive and cannot be re-used
// 2. from `connectFailed` when we failed to establish a new connection
// 3. from `remoteClose` when connection was closed by the remote side and cannot be re-used
// 4. from `timeout` when connection was closed due to idle timeout and cannot be re-used.
//
// In all cases connection, which triggered the transition, will not be in `available` state.
//
// Given that the waiter can only be present in the pool if there were no available connections
// (otherwise it had been leased a connection immediately on getting the connection), we do not
// see a situation when we can lease another available connection, therefore the only course
// of action is to create a new connection for the waiter.
assert(self.availableConnections.isEmpty)

self.openedConnectionsCount += 1
return .create(waiter)
}

// if capacity is at max and the are no waiters and no in-flight requests for connection, we are closing this provider
Expand Down
59 changes: 59 additions & 0 deletions Sources/AsyncHTTPClient/Utils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,65 @@ extension NIOClientTCPBootstrap {
return channelAddedFuture
}
}

static func makeHTTP1Channel(destination: ConnectionPool.Key, eventLoop: EventLoop, configuration: HTTPClient.Configuration, preference: HTTPClient.EventLoopPreference) -> EventLoopFuture<Channel> {
let channelEventLoop = preference.bestEventLoop ?? eventLoop

let key = destination

let requiresTLS = key.scheme.requiresTLS
let bootstrap: NIOClientTCPBootstrap
do {
bootstrap = try NIOClientTCPBootstrap.makeHTTPClientBootstrapBase(on: channelEventLoop, host: key.host, port: key.port, requiresTLS: requiresTLS, configuration: configuration)
} catch {
return channelEventLoop.makeFailedFuture(error)
}

let channel: EventLoopFuture<Channel>
switch key.scheme {
case .http, .https:
let address = HTTPClient.resolveAddress(host: key.host, port: key.port, proxy: configuration.proxy)
channel = bootstrap.connect(host: address.host, port: address.port)
case .unix, .http_unix, .https_unix:
channel = bootstrap.connect(unixDomainSocketPath: key.unixPath)
}

return channel.flatMap { channel in
let requiresSSLHandler = configuration.proxy != nil && key.scheme.requiresTLS
let handshakePromise = channel.eventLoop.makePromise(of: Void.self)

channel.pipeline.addSSLHandlerIfNeeded(for: key, tlsConfiguration: configuration.tlsConfiguration, addSSLClient: requiresSSLHandler, handshakePromise: handshakePromise)

return handshakePromise.futureResult.flatMap {
channel.pipeline.addHTTPClientHandlers(leftOverBytesStrategy: .forwardBytes)
}.flatMap {
#if canImport(Network)
if #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *), bootstrap.underlyingBootstrap is NIOTSConnectionBootstrap {
return channel.pipeline.addHandler(HTTPClient.NWErrorHandler(), position: .first)
}
#endif
return channel.eventLoop.makeSucceededFuture(())
}.flatMap {
switch configuration.decompression {
case .disabled:
return channel.eventLoop.makeSucceededFuture(())
case .enabled(let limit):
let decompressHandler = NIOHTTPResponseDecompressor(limit: limit)
return channel.pipeline.addHandler(decompressHandler)
}
}.map {
channel
}
}.flatMapError { error in
#if canImport(Network)
var error = error
if #available(OSX 10.14, iOS 12.0, tvOS 12.0, watchOS 6.0, *), bootstrap.underlyingBootstrap is NIOTSConnectionBootstrap {
error = HTTPClient.NWErrorHandler.translateError(error)
}
#endif
return channelEventLoop.makeFailedFuture(error)
}
}
}

extension Connection {
Expand Down
8 changes: 0 additions & 8 deletions Tests/AsyncHTTPClientTests/ConnectionPoolTests+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,10 @@ extension ConnectionPoolTests {
("testReleaseInactiveConnectionEmptyQueueHasConnections", testReleaseInactiveConnectionEmptyQueueHasConnections),
("testReleaseAliveConnectionHasWaiter", testReleaseAliveConnectionHasWaiter),
("testReleaseInactiveConnectionHasWaitersNoConnections", testReleaseInactiveConnectionHasWaitersNoConnections),
("testReleaseInactiveConnectionHasWaitersHasConnections", testReleaseInactiveConnectionHasWaitersHasConnections),
("testReleaseAliveConnectionSameELHasWaiterSpecificEL", testReleaseAliveConnectionSameELHasWaiterSpecificEL),
("testReleaseAliveConnectionDifferentELNoSameELConnectionsHasWaiterSpecificEL", testReleaseAliveConnectionDifferentELNoSameELConnectionsHasWaiterSpecificEL),
("testReleaseAliveConnectionDifferentELHasSameELConnectionsHasWaiterSpecificEL", testReleaseAliveConnectionDifferentELHasSameELConnectionsHasWaiterSpecificEL),
("testReleaseAliveConnectionDifferentELNoSameELConnectionsOnLimitHasWaiterSpecificEL", testReleaseAliveConnectionDifferentELNoSameELConnectionsOnLimitHasWaiterSpecificEL),
("testReleaseInactiveConnectionHasWaitersHasSameELConnectionsSpecificEL", testReleaseInactiveConnectionHasWaitersHasSameELConnectionsSpecificEL),
("testReleaseInactiveConnectionHasWaitersNoSameELConnectionsSpecificEL", testReleaseInactiveConnectionHasWaitersNoSameELConnectionsSpecificEL),
("testNextWaiterEmptyQueue", testNextWaiterEmptyQueue),
("testNextWaiterEmptyQueueHasConnections", testNextWaiterEmptyQueueHasConnections),
("testNextWaiterHasWaitersHasConnections", testNextWaiterHasWaitersHasConnections),
("testNextWaiterHasWaitersHasSameELConnectionsSpecificEL", testNextWaiterHasWaitersHasSameELConnectionsSpecificEL),
("testNextWaiterHasWaitersHasDifferentELConnectionsSpecificEL", testNextWaiterHasWaitersHasDifferentELConnectionsSpecificEL),
("testTimeoutLeasedConnection", testTimeoutLeasedConnection),
("testTimeoutAvailableConnection", testTimeoutAvailableConnection),
("testRemoteClosedLeasedConnection", testRemoteClosedLeasedConnection),
Expand Down
Loading