Skip to content

Crash fix: HTTP2Connections emit events after the pool has closed them. #481

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ extension HTTPConnectionPool {
)
}

// MARK: - Events
// MARK: - Events -

mutating func executeRequest(_ request: Request) -> Action {
switch self.state {
Expand Down Expand Up @@ -519,16 +519,20 @@ extension HTTPConnectionPool {
// MARK: HTTP2

mutating func newHTTP2MaxConcurrentStreamsReceived(_ connectionID: Connection.ID, newMaxStreams: Int) -> Action {
// It is save to bang the http2Connections here. If we get this callback but we don't have
// http2 connections something has gone terribly wrong.
_ = self.http2Connections!.newHTTP2MaxConcurrentStreamsReceived(connectionID, newMaxStreams: newMaxStreams)
// The `http2Connections` are optional here:
// Connections report events back to us, if they are in a shutdown that was
// initiated by the state machine. For this reason this callback might be invoked
Comment on lines +523 to +524
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't quite make sense to me. I think you're saying that we can still receive events after shutdown was initiated by the state machine, and when that happens http2Connections will be nil and we can safely ignore the event?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it'd be worth adding a comment where http2Connections is declared on L31 saying when we expect it to be nil.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also wonder if http2Connections ever needs to be nil -- presumably we can just remove all connections from it?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the only reason why we made it optional was to free some memory early. I think it would make the code clearer if we make it non-optional e.g.: after we have closed a connection we set it to nil if it is empty.

if self.http2Connections?.isEmpty == true {
self.http2Connections = nil
}

Instead we could just check if it is empty in all places where we currently check that it is nil e.g. during shutdown and make http2Connections non-optional.

The same is true in the HTTP2StateMachine for http1Connections.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dnadoba will fix this in a follow up PR.

// even though all references to HTTP2Connections have already been cleared.
_ = self.http2Connections?.newHTTP2MaxConcurrentStreamsReceived(connectionID, newMaxStreams: newMaxStreams)
return .none
}

mutating func http2ConnectionGoAwayReceived(_ connectionID: Connection.ID) -> Action {
// It is save to bang the http2Connections here. If we get this callback but we don't have
// http2 connections something has gone terribly wrong.
_ = self.http2Connections!.goAwayReceived(connectionID)
// The `http2Connections` are optional here:
// Connections report events back to us, if they are in a shutdown that was
// initiated by the state machine. For this reason this callback might be invoked
// even though all references to HTTP2Connections have already been cleared.
_ = self.http2Connections?.goAwayReceived(connectionID)
return .none
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,9 +523,14 @@ extension HTTPConnectionPool {
/// Sets the connection with the given `connectionId` to the draining state.
/// - Returns: the `EventLoop` to create a new connection on if applicable
/// - Precondition: connection with given `connectionId` must be either `.active` or already in the `.draining` state
mutating func goAwayReceived(_ connectionID: Connection.ID) -> GoAwayContext {
mutating func goAwayReceived(_ connectionID: Connection.ID) -> GoAwayContext? {
guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else {
preconditionFailure("go away recieved for a connection that does not exists")
// When a connection close is initiated by the connection pool (e.g. because the
// connection was idle for too long), the connection will still report further
// events to the state machine even though we don't care about its state anymore.
//
// This is because the HTTP2Connection has a strong let reference to its delegate.
return nil
}
let eventLoop = self.connections[index].goAwayReceived()
return GoAwayContext(eventLoop: eventLoop)
Expand All @@ -540,9 +545,13 @@ extension HTTPConnectionPool {
mutating func newHTTP2MaxConcurrentStreamsReceived(
_ connectionID: Connection.ID,
newMaxStreams: Int
) -> (Int, EstablishedConnectionContext) {
) -> (Int, EstablishedConnectionContext)? {
guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else {
preconditionFailure("We tried to update the maximum number of concurrent streams for a connection that does not exists")
// When a connection close is initiated by the connection pool (e.g. because the
// connection was idle for too long), the connection will still report its events to
// the state machine and hence to this `HTTP2Connections` struct. In those cases we
// must ignore the event.
return nil
}
let availableStreams = self.connections[index].newMaxConcurrentStreams(newMaxStreams)
let context = EstablishedConnectionContext(
Expand Down Expand Up @@ -661,8 +670,10 @@ extension HTTPConnectionPool {

mutating func failConnection(_ connectionID: Connection.ID) -> (Int, FailedConnectionContext)? {
guard let index = self.connections.firstIndex(where: { $0.connectionID == connectionID }) else {
/// When a connection close is initiated by the connection pool (e.g. because the connection was idle for too long), the connection will
/// still report its close to the state machine and then to us. In those cases we must ignore the event.
// When a connection close is initiated by the connection pool (e.g. because the
// connection was idle for too long), the connection will still report its close to
// the state machine and then to this `HTTP2Connections` struct. In those cases we
// must ignore the event.
return nil
}
self.connections[index].fail()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,22 @@ extension HTTPConnectionPool {
}

mutating func newHTTP2MaxConcurrentStreamsReceived(_ connectionID: Connection.ID, newMaxStreams: Int) -> Action {
let (index, context) = self.connections.newHTTP2MaxConcurrentStreamsReceived(connectionID, newMaxStreams: newMaxStreams)
guard let (index, context) = self.connections.newHTTP2MaxConcurrentStreamsReceived(connectionID, newMaxStreams: newMaxStreams) else {
// When a connection close is initiated by the connection pool, the connection will
// still report further events (like newMaxConcurrentStreamsReceived) to the state
// machine. In those cases we must ignore the event.
return .none
}
return .init(self.nextActionForAvailableConnection(at: index, context: context))
}

mutating func http2ConnectionGoAwayReceived(_ connectionID: Connection.ID) -> Action {
let context = self.connections.goAwayReceived(connectionID)
guard let context = self.connections.goAwayReceived(connectionID) else {
// When a connection close is initiated by the connection pool, the connection will
// still report further events (like GOAWAY received) to the state machine. In those
// cases we must ignore the event.
return .none
}
return self.nextActionForClosingConnection(on: context.eventLoop)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ extension HTTPConnectionPool_HTTP2ConnectionsTests {
("testLeasingAllConnections", testLeasingAllConnections),
("testGoAway", testGoAway),
("testNewMaxConcurrentStreamsSetting", testNewMaxConcurrentStreamsSetting),
("testEventsAfterConnectionIsClosed", testEventsAfterConnectionIsClosed),
("testLeaseOnPreferredEventLoopWithoutAnyAvailable", testLeaseOnPreferredEventLoopWithoutAnyAvailable),
("testMigrationFromHTTP1", testMigrationFromHTTP1),
("testMigrationToHTTP1", testMigrationToHTTP1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase {
XCTAssertEqual(leasedConn1, conn1)
XCTAssertEqual(leasdConnContext1.wasIdle, true)

XCTAssertTrue(connections.goAwayReceived(conn1ID).eventLoop === el1)
XCTAssertTrue(connections.goAwayReceived(conn1ID)?.eventLoop === el1)

XCTAssertEqual(
connections.stats,
Expand All @@ -389,7 +389,7 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase {
XCTAssertNil(connections.leaseStream(onRequired: el1), "we should not be able to lease a stream because the connection is draining")

// a server can potentially send more than one connection go away and we should not crash
XCTAssertTrue(connections.goAwayReceived(conn1ID).eventLoop === el1)
XCTAssertTrue(connections.goAwayReceived(conn1ID)?.eventLoop === el1)
XCTAssertEqual(
connections.stats,
.init(
Expand Down Expand Up @@ -454,7 +454,9 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase {

XCTAssertNil(connections.leaseStream(onRequired: el1), "all streams are in use")

let (_, newSettingsContext1) = connections.newHTTP2MaxConcurrentStreamsReceived(conn1ID, newMaxStreams: 2)
guard let (_, newSettingsContext1) = connections.newHTTP2MaxConcurrentStreamsReceived(conn1ID, newMaxStreams: 2) else {
return XCTFail("Expected to get a new settings context")
}
XCTAssertEqual(newSettingsContext1.availableStreams, 1)
XCTAssertTrue(newSettingsContext1.eventLoop === el1)
XCTAssertFalse(newSettingsContext1.isIdle)
Expand All @@ -465,7 +467,9 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase {
XCTAssertEqual(leasedConn2, conn1)
XCTAssertEqual(leaseContext2.wasIdle, false)

let (_, newSettingsContext2) = connections.newHTTP2MaxConcurrentStreamsReceived(conn1ID, newMaxStreams: 1)
guard let (_, newSettingsContext2) = connections.newHTTP2MaxConcurrentStreamsReceived(conn1ID, newMaxStreams: 1) else {
return XCTFail("Expected to get a new settings context")
}
XCTAssertEqual(newSettingsContext2.availableStreams, 0)
XCTAssertTrue(newSettingsContext2.eventLoop === el1)
XCTAssertFalse(newSettingsContext2.isIdle)
Expand All @@ -489,6 +493,41 @@ class HTTPConnectionPool_HTTP2ConnectionsTests: XCTestCase {
XCTAssertEqual(leaseContext3.wasIdle, true)
}

func testEventsAfterConnectionIsClosed() {
let elg = EmbeddedEventLoopGroup(loops: 2)
var connections = HTTPConnectionPool.HTTP2Connections(generator: .init())
let el1 = elg.next()

let conn1ID = connections.createNewConnection(on: el1)
let conn1: HTTPConnectionPool.Connection = .__testOnly_connection(id: conn1ID, eventLoop: el1)
let (conn1Index, conn1CreatedContext) = connections.newHTTP2ConnectionEstablished(conn1, maxConcurrentStreams: 1)
XCTAssertEqual(conn1CreatedContext.availableStreams, 1)

let (leasedConn1, leasdConnContext1) = connections.leaseStreams(at: conn1Index, count: 1)
XCTAssertEqual(leasedConn1, conn1)
XCTAssertEqual(leasdConnContext1.wasIdle, true)

XCTAssertNil(connections.leaseStream(onRequired: el1), "all streams are in use")

let (_, releaseContext) = connections.releaseStream(conn1ID)
XCTAssertTrue(releaseContext.eventLoop === el1)
XCTAssertEqual(releaseContext.availableStreams, 1)
XCTAssertEqual(releaseContext.connectionID, conn1ID)
XCTAssertEqual(releaseContext.isIdle, true)

// schedule timeout... this should remove the connection from http2Connections

XCTAssertEqual(connections.closeConnectionIfIdle(conn1ID), conn1)

// events race with the complete shutdown

XCTAssertNil(connections.newHTTP2MaxConcurrentStreamsReceived(conn1ID, newMaxStreams: 2))
XCTAssertNil(connections.goAwayReceived(conn1ID))

// finally close event
XCTAssertNil(connections.failConnection(conn1ID))
}

func testLeaseOnPreferredEventLoopWithoutAnyAvailable() {
let elg = EmbeddedEventLoopGroup(loops: 4)
var connections = HTTPConnectionPool.HTTP2Connections(generator: .init())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ extension HTTPConnectionPool_HTTP2StateMachineTests {
("testHTTP2toHTTP1Migration", testHTTP2toHTTP1Migration),
("testConnectionIsImmediatelyCreatedAfterBackoffTimerFires", testConnectionIsImmediatelyCreatedAfterBackoffTimerFires),
("testMaxConcurrentStreamsIsRespected", testMaxConcurrentStreamsIsRespected),
("testEventsAfterConnectionIsClosed", testEventsAfterConnectionIsClosed),
]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1088,4 +1088,23 @@ class HTTPConnectionPool_HTTP2StateMachineTests: XCTestCase {
XCTAssertNotNil(connections.randomParkedConnection())
XCTAssertEqual(connections.count, 1)
}

func testEventsAfterConnectionIsClosed() {
let elg = EmbeddedEventLoopGroup(loops: 2)
guard var (connections, state) = try? MockConnectionPool.http2(elg: elg, maxConcurrentStreams: 100) else {
return XCTFail("Test setup failed")
}

let connection = connections.randomParkedConnection()!
XCTAssertNoThrow(try connections.closeConnection(connection))

let idleTimeoutAction = state.connectionIdleTimeout(connection.id)
XCTAssertEqual(idleTimeoutAction.connection, .closeConnection(connection, isShutdown: .no))
XCTAssertEqual(idleTimeoutAction.request, .none)

XCTAssertEqual(state.newHTTP2MaxConcurrentStreamsReceived(connection.id, newMaxStreams: 50), .none)
XCTAssertEqual(state.http2ConnectionGoAwayReceived(connection.id), .none)

XCTAssertEqual(state.http2ConnectionClosed(connection.id), .none)
}
}