Skip to content

Make http1Connections and http2Connections non-optional #482

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -532,12 +532,21 @@ extension HTTPConnectionPool {

mutating func migrateToHTTP2() -> HTTP1ToHTTP2MigrationContext {
var migrationContext = HTTP1ToHTTP2MigrationContext()
self.connections.removeAll { connection in
switch connection.migrateToHTTP2(&migrationContext) {
let initialOverflowIndex = self.overflowIndex

self.connections = self.connections.enumerated().compactMap { index, connectionState in
switch connectionState.migrateToHTTP2(&migrationContext) {
case .removeConnection:
return true
// If the connection has an index smaller than the previous overflow index,
// we deal with a general purpose connection.
// For this reason we need to decrement the overflow index.
if index < initialOverflowIndex {
self.overflowIndex = self.connections.index(before: self.overflowIndex)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously we didn't update self.overflowIndex correctly which did hide some bugs because we rarely reused HTTP1Connections ever again after migration.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would love to have a comment like this here:

// If the connection has an index smaller than the previous overflow index, we deal with
// a general purpose connection. For this reason we need to decrement the overflow index.

Great catch!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also added to .shutdown() from where I have borrowed this code snippet.

}
return nil

case .keepConnection:
return false
return connectionState
}
}
return migrationContext
Expand Down Expand Up @@ -654,6 +663,9 @@ extension HTTPConnectionPool {
self.connections = self.connections.enumerated().compactMap { index, connectionState in
switch connectionState.cleanup(&cleanupContext) {
case .removeConnection:
// If the connection has an index smaller than the previous overflow index,
// we deal with a general purpose connection.
// For this reason we need to decrement the overflow index.
if index < initialOverflowIndex {
self.overflowIndex = self.connections.index(before: self.overflowIndex)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ extension HTTPConnectionPool {
typealias EstablishedConnectionAction = HTTPConnectionPool.StateMachine.EstablishedConnectionAction

private(set) var connections: HTTP1Connections
private(set) var http2Connections: HTTP2Connections?
private(set) var http2Connections: HTTP2Connections
private var failedConsecutiveConnectionAttempts: Int = 0
/// the error from the last connection creation
private var lastConnectFailure: Error?
Expand All @@ -41,12 +41,13 @@ extension HTTPConnectionPool {
maximumConcurrentConnections: maximumConcurrentConnections,
generator: idGenerator
)
self.http2Connections = HTTP2Connections(generator: idGenerator)

self.requests = RequestQueue()
}

mutating func migrateFromHTTP2(
http1Connections: HTTP1Connections? = nil,
http1Connections: HTTP1Connections,
http2Connections: HTTP2Connections,
requests: RequestQueue,
newHTTP1Connection: Connection
Expand All @@ -68,23 +69,19 @@ extension HTTPConnectionPool {
}

private mutating func migrateConnectionsAndRequestsFromHTTP2(
http1Connections: HTTP1Connections?,
http1Connections: HTTP1Connections,
http2Connections: HTTP2Connections,
requests: RequestQueue
) -> ConnectionMigrationAction {
precondition(self.connections.isEmpty, "expected an empty state machine but connections are not empty")
precondition(self.http2Connections == nil, "expected an empty state machine but http2Connections are not nil")
precondition(self.http2Connections.isEmpty, "expected an empty state machine but http2Connections are not nil")
precondition(self.requests.isEmpty, "expected an empty state machine but requests are not empty")

self.requests = requests
self.connections = http1Connections
self.http2Connections = http2Connections

// we may have remaining open http1 connections from a pervious migration to http2
if let http1Connections = http1Connections {
self.connections = http1Connections
}

var http2Connections = http2Connections
let migration = http2Connections.migrateToHTTP1()
let migration = self.http2Connections.migrateToHTTP1()

self.connections.migrateFromHTTP2(
starting: migration.starting,
Expand All @@ -96,12 +93,7 @@ extension HTTPConnectionPool {
generalPurposeRequestCountGroupedByPreferredEventLoop: requests.generalPurposeRequestCountGroupedByPreferredEventLoop()
)

if !http2Connections.isEmpty {
self.http2Connections = http2Connections
}

// TODO: Potentially cancel unneeded bootstraps (Needs cancellable ClientBootstrap)

return .init(
closeConnections: migration.close,
createConnections: createConnections
Expand Down Expand Up @@ -349,7 +341,7 @@ extension HTTPConnectionPool {
// If there aren't any more connections, everything is shutdown
let isShutdown: StateMachine.ConnectionAction.IsShutdown
let unclean = !(cleanupContext.cancel.isEmpty && waitingRequests.isEmpty)
if self.connections.isEmpty && self.http2Connections == nil {
if self.connections.isEmpty && self.http2Connections.isEmpty {
self.state = .shutDown
isShutdown = .yes(unclean: unclean)
} else {
Expand Down Expand Up @@ -382,7 +374,7 @@ extension HTTPConnectionPool {
case .shuttingDown(let unclean):
assert(self.requests.isEmpty)
let connection = self.connections.closeConnection(at: index)
if self.connections.isEmpty && self.http2Connections == nil {
if self.connections.isEmpty && self.http2Connections.isEmpty {
return .init(
request: .none,
connection: .closeConnection(connection, isShutdown: .yes(unclean: unclean))
Expand Down Expand Up @@ -469,7 +461,7 @@ extension HTTPConnectionPool {
case .shuttingDown(let unclean):
assert(self.requests.isEmpty)
self.connections.removeConnection(at: index)
if self.connections.isEmpty && self.http2Connections == nil {
if self.connections.isEmpty && self.http2Connections.isEmpty {
return .init(
request: .none,
connection: .cleanupConnections(.init(), isShutdown: .yes(unclean: unclean))
Expand Down Expand Up @@ -519,39 +511,25 @@ extension HTTPConnectionPool {
// MARK: HTTP2

mutating func newHTTP2MaxConcurrentStreamsReceived(_ connectionID: Connection.ID, newMaxStreams: Int) -> Action {
// The `http2Connections` are optional here:
// Connections report events back to us, if they are in a shutdown that was
// initiated by the state machine. For this reason this callback might be invoked
// even though all references to HTTP2Connections have already been cleared.
_ = self.http2Connections?.newHTTP2MaxConcurrentStreamsReceived(connectionID, newMaxStreams: newMaxStreams)
_ = self.http2Connections.newHTTP2MaxConcurrentStreamsReceived(connectionID, newMaxStreams: newMaxStreams)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove comment above.

return .none
}

mutating func http2ConnectionGoAwayReceived(_ connectionID: Connection.ID) -> Action {
// The `http2Connections` are optional here:
// Connections report events back to us, if they are in a shutdown that was
// initiated by the state machine. For this reason this callback might be invoked
// even though all references to HTTP2Connections have already been cleared.
_ = self.http2Connections?.goAwayReceived(connectionID)
_ = self.http2Connections.goAwayReceived(connectionID)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove comment above.

return .none
}

mutating func http2ConnectionClosed(_ connectionID: Connection.ID) -> Action {
switch self.state {
case .running:
_ = self.http2Connections?.failConnection(connectionID)
if self.http2Connections?.isEmpty == true {
self.http2Connections = nil
}
_ = self.http2Connections.failConnection(connectionID)
return .none

case .shuttingDown(let unclean):
assert(self.requests.isEmpty)
_ = self.http2Connections?.failConnection(connectionID)
if self.http2Connections?.isEmpty == true {
self.http2Connections = nil
}
if self.connections.isEmpty && self.http2Connections == nil {
_ = self.http2Connections.failConnection(connectionID)
if self.connections.isEmpty && self.http2Connections.isEmpty {
return .init(
request: .none,
connection: .cleanupConnections(.init(), isShutdown: .yes(unclean: unclean))
Expand All @@ -568,36 +546,29 @@ extension HTTPConnectionPool {
}

mutating func http2ConnectionStreamClosed(_ connectionID: Connection.ID) -> Action {
// It is save to bang the http2Connections here. If we get this callback but we don't have
// http2 connections something has gone terribly wrong.
switch self.state {
case .running:
let (index, context) = self.http2Connections!.releaseStream(connectionID)
let (index, context) = self.http2Connections.releaseStream(connectionID)
guard context.isIdle else {
return .none
}

let connection = self.http2Connections!.closeConnection(at: index)
if self.http2Connections!.isEmpty {
self.http2Connections = nil
}
let connection = self.http2Connections.closeConnection(at: index)
return .init(
request: .none,
connection: .closeConnection(connection, isShutdown: .no)
)

case .shuttingDown(let unclean):
assert(self.requests.isEmpty)
let (index, context) = self.http2Connections!.releaseStream(connectionID)
let (index, context) = self.http2Connections.releaseStream(connectionID)
guard context.isIdle else {
return .none
}

let connection = self.http2Connections!.closeConnection(at: index)
if self.http2Connections!.isEmpty {
self.http2Connections = nil
}
if self.connections.isEmpty && self.http2Connections == nil {
let connection = self.http2Connections.closeConnection(at: index)

if self.connections.isEmpty && self.http2Connections.isEmpty {
return .init(
request: .none,
connection: .closeConnection(connection, isShutdown: .yes(unclean: unclean))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,36 @@ extension HTTPConnectionPool {
case shutDown
}

static func migrateFromHTTP1(
idGenerator: Connection.ID.Generator,
http1Connections: HTTP1Connections,
http2Connections: HTTP2Connections,
requests: RequestQueue,
newHTTP2Connection: Connection,
maxConcurrentStreams: Int
) -> (Self, Action) {
var http2StateMachine = HTTP2StateMachine(
idGenerator: idGenerator,
// To reduce allocations we set `maximumConcurrentHTTP1Connections` to `0`, because
// `.migrateFromHTTP1(...)` will replace the `http1Connections` struct anyway
maximumConcurrentHTTP1Connections: 0
)
let migrationAction = http2StateMachine.migrateFromHTTP1(
http1Connections: http1Connections,
http2Connections: http2Connections,
requests: requests,
newHTTP2Connection: newHTTP2Connection,
maxConcurrentStreams: maxConcurrentStreams
)

return (http2StateMachine, migrationAction)
}

private var lastConnectFailure: Error?
private var failedConsecutiveConnectionAttempts = 0

private(set) var connections: HTTP2Connections
private(set) var http1Connections: HTTP1Connections?
private(set) var http1Connections: HTTP1Connections

private(set) var requests: RequestQueue

Expand All @@ -41,17 +66,22 @@ extension HTTPConnectionPool {
private var state: State = .running

init(
idGenerator: Connection.ID.Generator
idGenerator: Connection.ID.Generator,
maximumConcurrentHTTP1Connections: Int
) {
self.idGenerator = idGenerator
self.requests = RequestQueue()

self.connections = HTTP2Connections(generator: idGenerator)
self.http1Connections = HTTP1Connections(
maximumConcurrentConnections: maximumConcurrentHTTP1Connections,
generator: idGenerator
)
Comment on lines +76 to +79
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I bet we reserve an array within this initializer. Maybe we want to add another initializer here, that really creates an empty HTTP1Connections. The struct is replaced in the migration anyway.

The constructor should set maximumConcurrentConnections to a default value. Then we don't need to pass it in the HTTP2 constructor.

}

mutating func migrateFromHTTP1(
http1Connections: HTTP1Connections,
http2Connections: HTTP2Connections? = nil,
http2Connections: HTTP2Connections,
requests: RequestQueue,
newHTTP2Connection: Connection,
maxConcurrentStreams: Int
Expand Down Expand Up @@ -79,18 +109,18 @@ extension HTTPConnectionPool {
requests: RequestQueue
) -> ConnectionMigrationAction {
precondition(self.connections.isEmpty, "expected an empty state machine but connections are not empty")
precondition(self.http1Connections == nil, "expected an empty state machine but http1Connections are not nil")
precondition(self.http1Connections.isEmpty, "expected an empty state machine but http1Connections is not empty")
precondition(self.requests.isEmpty, "expected an empty state machine but requests are not empty")

self.requests = requests
self.http1Connections = http1Connections

// we may have remaining open http2 connections from a pervious migration to http1
if let http2Connections = http2Connections {
self.connections = http2Connections
}

var http1Connections = http1Connections // make http1Connections mutable
let context = http1Connections.migrateToHTTP2()
let context = self.http1Connections.migrateToHTTP2()
self.connections.migrateFromHTTP1(
starting: context.starting,
backingOff: context.backingOff
Expand All @@ -100,10 +130,6 @@ extension HTTPConnectionPool {
requiredEventLoopsOfPendingRequests: requests.eventLoopsWithPendingRequests()
)

if !http1Connections.isEmpty {
self.http1Connections = http1Connections
}

// TODO: Potentially cancel unneeded bootstraps (Needs cancellable ClientBootstrap)
return .init(
closeConnections: context.close,
Expand Down Expand Up @@ -277,7 +303,7 @@ extension HTTPConnectionPool {
}

let connection = self.connections.closeConnection(at: index)
if self.http1Connections == nil, self.connections.isEmpty {
if self.http1Connections.isEmpty, self.connections.isEmpty {
return .init(
request: .none,
connection: .closeConnection(connection, isShutdown: .yes(unclean: unclean))
Expand Down Expand Up @@ -472,18 +498,16 @@ extension HTTPConnectionPool {
}

mutating func http1ConnectionClosed(_ connectionID: Connection.ID) -> Action {
guard let index = self.http1Connections?.failConnection(connectionID)?.0 else {
guard let index = self.http1Connections.failConnection(connectionID)?.0 else {
return .none
}
self.http1Connections!.removeConnection(at: index)
if self.http1Connections!.isEmpty {
self.http1Connections = nil
}
self.http1Connections.removeConnection(at: index)

switch self.state {
case .running:
return .none
case .shuttingDown(let unclean):
if self.http1Connections == nil, self.connections.isEmpty {
if self.http1Connections.isEmpty && self.connections.isEmpty {
return .init(
request: .none,
connection: .cleanupConnections(.init(), isShutdown: .yes(unclean: unclean))
Expand All @@ -497,17 +521,13 @@ extension HTTPConnectionPool {
}

mutating func http1ConnectionReleased(_ connectionID: Connection.ID) -> Action {
// It is save to bang the http1Connections here. If we get this callback but we don't have
// http1 connections something has gone terribly wrong.
let (index, _) = self.http1Connections!.releaseConnection(connectionID)
let (index, _) = self.http1Connections.releaseConnection(connectionID)
// Any http1 connection that becomes idle should be closed right away after the transition
// to http2.
let connection = self.http1Connections!.closeConnection(at: index)
guard self.http1Connections!.isEmpty else {
let connection = self.http1Connections.closeConnection(at: index)
guard self.http1Connections.isEmpty else {
return .init(request: .none, connection: .closeConnection(connection, isShutdown: .no))
}
// if there are no more http1Connections, we can remove the struct.
self.http1Connections = nil

// we must also check, if we are shutting down. Was this maybe out last connection?
switch self.state {
Expand Down Expand Up @@ -540,8 +560,8 @@ extension HTTPConnectionPool {

// If there aren't any more connections, everything is shutdown
let isShutdown: StateMachine.ConnectionAction.IsShutdown
let unclean = !(cleanupContext.cancel.isEmpty && waitingRequests.isEmpty && self.http1Connections == nil)
if self.connections.isEmpty && self.http1Connections == nil {
let unclean = !(cleanupContext.cancel.isEmpty && waitingRequests.isEmpty && self.http1Connections.isEmpty)
if self.connections.isEmpty && self.http1Connections.isEmpty {
isShutdown = .yes(unclean: unclean)
self.state = .shutDown
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,8 @@ extension HTTPConnectionPool {
switch self.state {
case .http1(let http1StateMachine):

var http2StateMachine = HTTP2StateMachine(
idGenerator: self.idGenerator
)
let migrationAction = http2StateMachine.migrateFromHTTP1(
let (http2StateMachine, migrationAction) = HTTP2StateMachine.migrateFromHTTP1(
idGenerator: self.idGenerator,
http1Connections: http1StateMachine.connections,
http2Connections: http1StateMachine.http2Connections,
requests: http1StateMachine.requests,
Expand Down
Loading