Skip to content

[HTTP2] Prepare migration actions #456

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Oct 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .swiftformat
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@
--patternlet inline
--stripunusedargs unnamed-only
--ranges nospace
--disable typeSugar # https://github.com/nicklockwood/SwiftFormat/issues/636
--disable typeSugar, andOperator # typeSugar: https://github.com/nicklockwood/SwiftFormat/issues/636

# rules
26 changes: 25 additions & 1 deletion Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ final class HTTPConnectionPool {
self.idleConnectionTimeout = clientConfiguration.connectionPool.idleTimeout

self._state = StateMachine(
eventLoopGroup: eventLoopGroup,
idGenerator: idGenerator,
maximumConcurrentHTTP1Connections: clientConfiguration.connectionPool.concurrentHTTP1ConnectionsPerHostSoftLimit
)
Expand Down Expand Up @@ -97,6 +96,10 @@ final class HTTPConnectionPool {
case createConnection(Connection.ID, on: EventLoop)
case closeConnection(Connection, isShutdown: StateMachine.ConnectionAction.IsShutdown)
case cleanupConnections(CleanupContext, isShutdown: StateMachine.ConnectionAction.IsShutdown)
case migration(
createConnections: [(Connection.ID, EventLoop)],
closeConnections: [Connection]
)
case none
}

Expand Down Expand Up @@ -184,6 +187,18 @@ final class HTTPConnectionPool {
self.locked.connection = .cancelBackoffTimers(cleanupContext.connectBackoff)
cleanupContext.connectBackoff = []
self.unlocked.connection = .cleanupConnections(cleanupContext, isShutdown: isShutdown)
case .migration(
let createConnections,
let closeConnections,
let scheduleTimeout
):
if let (connectionID, eventLoop) = scheduleTimeout {
self.locked.connection = .scheduleTimeoutTimer(connectionID, on: eventLoop)
}
self.unlocked.connection = .migration(
createConnections: createConnections,
closeConnections: closeConnections
)
case .none:
break
}
Expand Down Expand Up @@ -279,6 +294,15 @@ final class HTTPConnectionPool {
self.delegate.connectionPoolDidShutdown(self, unclean: unclean)
}

case .migration(let createConnections, let closeConnections):
for connection in closeConnections {
connection.close(promise: nil)
}

for (connectionID, eventLoop) in createConnections {
self.createConnection(connectionID, on: eventLoop)
}

case .none:
break
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,28 @@ extension HTTPConnectionPool {
return migrationContext
}

/// we only handle starting and backing off connection here.
/// All running connections must be handled by the enclosing state machine
/// - Parameters:
/// - starting: starting HTTP connections from previous state machine
/// - backingOff: backing off HTTP connections from previous state machine
mutating func migrateFromHTTP2(
starting: [(Connection.ID, EventLoop)],
backingOff: [(Connection.ID, EventLoop)]
) {
for (connectionID, eventLoop) in starting {
let newConnection = HTTP1ConnectionState(connectionID: connectionID, eventLoop: eventLoop)
self.connections.append(newConnection)
}

for (connectionID, eventLoop) in backingOff {
var backingOffConnection = HTTP1ConnectionState(connectionID: connectionID, eventLoop: eventLoop)
// TODO: Maybe we want to add a static init for backing off connections to HTTP1ConnectionState
backingOffConnection.failedToConnect()
self.connections.append(backingOffConnection)
}
}

// MARK: Shutdown

mutating func shutdown() -> CleanupContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ extension HTTPConnectionPool {
}

typealias Action = HTTPConnectionPool.StateMachine.Action
typealias ConnectionMigrationAction = HTTPConnectionPool.StateMachine.ConnectionMigrationAction
typealias EstablishedAction = HTTPConnectionPool.StateMachine.EstablishedAction
typealias EstablishedConnectionAction = HTTPConnectionPool.StateMachine.EstablishedConnectionAction

private(set) var connections: HTTP1Connections
private(set) var http2Connections: HTTP2Connections?
private var failedConsecutiveConnectionAttempts: Int = 0
/// the error from the last connection creation
private var lastConnectFailure: Error?
Expand All @@ -41,6 +45,73 @@ extension HTTPConnectionPool {
self.requests = RequestQueue()
}

mutating func migrateFromHTTP2(
http2State: HTTP2StateMachine,
newHTTP1Connection: Connection
) -> Action {
self.migrateFromHTTP2(
http1Connections: http2State.http1Connections,
http2Connections: http2State.connections,
requests: http2State.requests,
newHTTP1Connection: newHTTP1Connection
)
}

mutating func migrateFromHTTP2(
http1Connections: HTTP1Connections? = nil,
http2Connections: HTTP2Connections,
requests: RequestQueue,
newHTTP1Connection: Connection
) -> Action {
let migrationAction = self.migrateConnectionsAndRequestsFromHTTP2(
http1Connections: http1Connections,
http2Connections: http2Connections,
requests: requests
)

let newConnectionAction = self._newHTTP1ConnectionEstablished(
newHTTP1Connection
)

return .init(
request: newConnectionAction.request,
connection: .combined(migrationAction, newConnectionAction.connection)
)
}

private mutating func migrateConnectionsAndRequestsFromHTTP2(
http1Connections: HTTP1Connections?,
http2Connections: HTTP2Connections,
requests: RequestQueue
) -> ConnectionMigrationAction {
precondition(self.connections.isEmpty)
precondition(self.http2Connections == nil)
precondition(self.requests.isEmpty)

if let http1Connections = http1Connections {
self.connections = http1Connections
}

var http2Connections = http2Connections
let migration = http2Connections.migrateToHTTP1()
self.connections.migrateFromHTTP2(
starting: migration.starting,
backingOff: migration.backingOff
)

if !http2Connections.isEmpty {
self.http2Connections = http2Connections
}

// TODO: Close all idle connections from context.close
// TODO: Start new http1 connections for pending requests
// TODO: Potentially cancel unneeded bootstraps (Needs cancellable ClientBootstrap)

self.requests = requests

return .init(closeConnections: [], createConnections: [])
}

// MARK: - Events

mutating func executeRequest(_ request: Request) -> Action {
Expand Down Expand Up @@ -137,6 +208,10 @@ extension HTTPConnectionPool {
}

mutating func newHTTP1ConnectionEstablished(_ connection: Connection) -> Action {
.init(self._newHTTP1ConnectionEstablished(connection))
}

private mutating func _newHTTP1ConnectionEstablished(_ connection: Connection) -> EstablishedAction {
self.failedConsecutiveConnectionAttempts = 0
self.lastConnectFailure = nil
let (index, context) = self.connections.newHTTP1ConnectionEstablished(connection)
Expand Down Expand Up @@ -210,7 +285,7 @@ extension HTTPConnectionPool {

mutating func http1ConnectionReleased(_ connectionID: Connection.ID) -> Action {
let (index, context) = self.connections.releaseConnection(connectionID)
return self.nextActionForIdleConnection(at: index, context: context)
return .init(self.nextActionForIdleConnection(at: index, context: context))
}

/// A connection has been unexpectedly closed
Expand Down Expand Up @@ -278,7 +353,7 @@ extension HTTPConnectionPool {
// If there aren't any more connections, everything is shutdown
let isShutdown: StateMachine.ConnectionAction.IsShutdown
let unclean = !(cleanupContext.cancel.isEmpty && waitingRequests.isEmpty)
if self.connections.isEmpty {
if self.connections.isEmpty && self.http2Connections == nil {
self.state = .shutDown
isShutdown = .yes(unclean: unclean)
} else {
Expand All @@ -299,7 +374,7 @@ extension HTTPConnectionPool {
private mutating func nextActionForIdleConnection(
at index: Int,
context: HTTP1Connections.IdleConnectionContext
) -> Action {
) -> EstablishedAction {
switch self.state {
case .running:
switch context.use {
Expand All @@ -311,7 +386,7 @@ extension HTTPConnectionPool {
case .shuttingDown(let unclean):
assert(self.requests.isEmpty)
let connection = self.connections.closeConnection(at: index)
if self.connections.isEmpty {
if self.connections.isEmpty && self.http2Connections == nil {
return .init(
request: .none,
connection: .closeConnection(connection, isShutdown: .yes(unclean: unclean))
Expand All @@ -330,7 +405,7 @@ extension HTTPConnectionPool {
private mutating func nextActionForIdleGeneralPurposeConnection(
at index: Int,
context: HTTP1Connections.IdleConnectionContext
) -> Action {
) -> EstablishedAction {
// 1. Check if there are waiting requests in the general purpose queue
if let request = self.requests.popFirst(for: nil) {
return .init(
Expand Down Expand Up @@ -359,7 +434,7 @@ extension HTTPConnectionPool {
private mutating func nextActionForIdleEventLoopConnection(
at index: Int,
context: HTTP1Connections.IdleConnectionContext
) -> Action {
) -> EstablishedAction {
// Check if there are waiting requests in the matching eventLoop queue
if let request = self.requests.popFirst(for: context.eventLoop) {
return .init(
Expand Down Expand Up @@ -398,7 +473,7 @@ extension HTTPConnectionPool {
case .shuttingDown(let unclean):
assert(self.requests.isEmpty)
self.connections.removeConnection(at: index)
if self.connections.isEmpty {
if self.connections.isEmpty && self.http2Connections == nil {
return .init(
request: .none,
connection: .cleanupConnections(.init(), isShutdown: .yes(unclean: unclean))
Expand Down Expand Up @@ -444,6 +519,99 @@ extension HTTPConnectionPool {
self.connections.removeConnection(at: index)
return .none
}

// MARK: HTTP2

mutating func newHTTP2MaxConcurrentStreamsReceived(_ connectionID: Connection.ID, newMaxStreams: Int) -> Action {
// It is save to bang the http2Connections here. If we get this callback but we don't have
// http2 connections something has gone terribly wrong.
_ = self.http2Connections!.newHTTP2MaxConcurrentStreamsReceived(connectionID, newMaxStreams: newMaxStreams)
return .none
}

mutating func http2ConnectionGoAwayReceived(_ connectionID: Connection.ID) -> Action {
// It is save to bang the http2Connections here. If we get this callback but we don't have
// http2 connections something has gone terribly wrong.
_ = self.http2Connections!.goAwayReceived(connectionID)
return .none
}

mutating func http2ConnectionClosed(_ connectionID: Connection.ID) -> Action {
switch self.state {
case .running:
_ = self.http2Connections?.failConnection(connectionID)
if self.http2Connections?.isEmpty == true {
self.http2Connections = nil
}
return .none

case .shuttingDown(let unclean):
assert(self.requests.isEmpty)
_ = self.http2Connections?.failConnection(connectionID)
if self.http2Connections?.isEmpty == true {
self.http2Connections = nil
}
if self.connections.isEmpty && self.http2Connections == nil {
return .init(
request: .none,
connection: .cleanupConnections(.init(), isShutdown: .yes(unclean: unclean))
)
}
return .init(
request: .none,
connection: .none
)

case .shutDown:
preconditionFailure("It the pool is already shutdown, all connections must have been torn down.")
}
}

mutating func http2ConnectionStreamClosed(_ connectionID: Connection.ID) -> Action {
// It is save to bang the http2Connections here. If we get this callback but we don't have
// http2 connections something has gone terribly wrong.
switch self.state {
case .running:
let (index, context) = self.http2Connections!.releaseStream(connectionID)
guard context.isIdle else {
return .none
}

let connection = self.http2Connections!.closeConnection(at: index)
if self.http2Connections!.isEmpty {
self.http2Connections = nil
}
return .init(
request: .none,
connection: .closeConnection(connection, isShutdown: .no)
)

case .shuttingDown(let unclean):
assert(self.requests.isEmpty)
let (index, context) = self.http2Connections!.releaseStream(connectionID)
guard context.isIdle else {
return .none
}

let connection = self.http2Connections!.closeConnection(at: index)
if self.http2Connections!.isEmpty {
self.http2Connections = nil
}
if self.connections.isEmpty && self.http2Connections == nil {
return .init(
request: .none,
connection: .closeConnection(connection, isShutdown: .yes(unclean: unclean))
)
}
return .init(
request: .none,
connection: .closeConnection(connection, isShutdown: .no)
)

case .shutDown:
preconditionFailure("It the pool is already shutdown, all connections must have been torn down.")
}
}
}
}

Expand Down
Loading