Skip to content

Implement asynchronous shutdown #183

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Mar 30, 2020
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 29 additions & 25 deletions Sources/AsyncHTTPClient/ConnectionPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -93,20 +93,23 @@ final class ConnectionPool {
}
}

func prepareForClose() {
let connectionProviders = self.connectionProvidersLock.withLock { self.connectionProviders.values }
for connectionProvider in connectionProviders {
connectionProvider.prepareForClose()
func prepareForClose(on eventLoop: EventLoop) -> EventLoopFuture<Void> {
let connectionProviders = self.connectionProvidersLock.withLock {
self.connectionProviders.values
}

return EventLoopFuture<Void>.andAllComplete(connectionProviders.map { $0.prepareForClose() }, on: eventLoop)
}

func syncClose() {
let connectionProviders = self.connectionProvidersLock.withLock { self.connectionProviders.values }
for connectionProvider in connectionProviders {
connectionProvider.syncClose()
func close(on eventLoop: EventLoop) -> EventLoopFuture<Void> {
let connectionProviders = self.connectionProvidersLock.withLock {
self.connectionProviders.values
}
self.connectionProvidersLock.withLock {
assert(self.connectionProviders.count == 0, "left-overs: \(self.connectionProviders)")

return EventLoopFuture.andAllComplete(connectionProviders.map { $0.close() }, on: eventLoop).map {
self.connectionProvidersLock.withLock {
assert(self.connectionProviders.count == 0, "left-overs: \(self.connectionProviders)")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not 100% sure: if the check is delayed there might be cases where the assertion would have triggered but does not anymore. Even if true, I don't know how much important it is because the order/synchronization of the close actions will need to be improved anyway as we are already aware of issues linked to this in #175 and #176

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I'll look into that when I'll get to those issues, thanks!

}
}
}

Expand Down Expand Up @@ -448,9 +451,7 @@ final class ConnectionPool {
}

/// Removes and fails all `waiters`, remove existing `availableConnections` and sets `state.activity` to `.closing`
func prepareForClose() {
assert(MultiThreadedEventLoopGroup.currentEventLoop == nil,
"HTTPClient shutdown on EventLoop unsupported") // calls .wait() so it would crash later anyway
func prepareForClose() -> EventLoopFuture<Void> {
let (waitersFutures, closeFutures) = self.stateLock.withLock { () -> ([EventLoopFuture<Connection>], [EventLoopFuture<Void>]) in
// Fail waiters
let waitersCopy = self.state.waiters
Expand All @@ -461,26 +462,29 @@ final class ConnectionPool {
let closeFutures = self.state.availableConnections.map { $0.close() }
return (waitersFutures, closeFutures)
}
try? EventLoopFuture<Connection>.andAllComplete(waitersFutures, on: self.eventLoop).wait()
try? EventLoopFuture<Void>.andAllComplete(closeFutures, on: self.eventLoop).wait()

self.stateLock.withLock {
if self.state.leased == 0, self.state.availableConnections.isEmpty {
self.state.activity = .closed
} else {
self.state.activity = .closing
return EventLoopFuture<Connection>.andAllComplete(waitersFutures, on: self.eventLoop)
.flatMap {
EventLoopFuture<Void>.andAllComplete(closeFutures, on: self.eventLoop)
}
.map { _ in
self.stateLock.withLock {
if self.state.leased == 0, self.state.availableConnections.isEmpty {
self.state.activity = .closed
} else {
self.state.activity = .closing
}
}
}
}
}

func syncClose() {
assert(MultiThreadedEventLoopGroup.currentEventLoop == nil,
"HTTPClient shutdown on EventLoop unsupported") // calls .wait() so it would crash later anyway
func close() -> EventLoopFuture<Void> {
let availableConnections = self.stateLock.withLock { () -> CircularBuffer<ConnectionPool.Connection> in
assert(self.state.activity == .closing)
return self.state.availableConnections
}
try? EventLoopFuture<Void>.andAllComplete(availableConnections.map { $0.close() }, on: self.eventLoop).wait()

return EventLoopFuture<Void>.andAllComplete(availableConnections.map { $0.close() }, on: self.eventLoop)
}

private func activityPrecondition(expected: Set<State.Activity>) {
Expand Down
115 changes: 83 additions & 32 deletions Sources/AsyncHTTPClient/HTTPClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -93,54 +93,105 @@ public class HTTPClient {
/// this indicate shutdown was called too early before tasks were completed or explicitly canceled.
/// In general, setting this parameter to `true` should make it easier and faster to catch related programming errors.
internal func syncShutdown(requiresCleanClose: Bool) throws {
var closeError: Error?

let tasks = try self.stateLock.withLock { () -> Dictionary<UUID, TaskProtocol>.Values in
if self.state != .upAndRunning {
throw HTTPClientError.alreadyShutdown
if let eventLoop = MultiThreadedEventLoopGroup.currentEventLoop {
preconditionFailure("""
BUG DETECTED: syncShutdown() must not be called when on an EventLoop.
Calling syncShutdown() on any EventLoop can lead to deadlocks.
Current eventLoop: \(eventLoop)
""")
}
let errorStorageLock = Lock()
var errorStorage: Error?
let continuation = DispatchWorkItem {}
self.shutdown(requiresCleanClose: requiresCleanClose, queue: .global()) { error in
if let error = error {
errorStorageLock.withLock {
errorStorage = error
}
}
self.state = .shuttingDown
return self.tasks.values
continuation.perform()
}
continuation.wait()
try errorStorageLock.withLock {
if let error = errorStorage {
throw error
}
}
}

self.pool.prepareForClose()
/// Shuts down the client and event loop gracefully. This function is clearly an outlier in that it uses a completion
/// callback instead of an EventLoopFuture. The reason for that is that NIO's EventLoopFutures will call back on an event loop.
/// The virtue of this function is to shut the event loop down. To work around that we call back on a DispatchQueue
/// instead.
public func shutdown(_ callback: @escaping (Error?) -> Void) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don’t default to .global and take a DispatchQueue instead. A user who wants async shutdown will know what queue they want to be called on and it’s very likely not going to be `.global

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, one question is though: should EventLoopGroup do the same?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(done)

self.shutdown(requiresCleanClose: false, queue: .global(), callback)
}

if !tasks.isEmpty, requiresCleanClose {
closeError = HTTPClientError.uncleanShutdown
}
public func shutdown(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
self.shutdown(requiresCleanClose: false, queue: queue, callback)
}

private func cancelTasks(_ tasks: Dictionary<UUID, TaskProtocol>.Values) -> EventLoopFuture<Void> {
for task in tasks {
task.cancel()
}

try? EventLoopFuture.andAllComplete((tasks.map { $0.completion }), on: self.eventLoopGroup.next()).wait()

self.pool.syncClose()
return EventLoopFuture.andAllComplete(tasks.map { $0.completion }, on: self.eventLoopGroup.next())
}

do {
try self.stateLock.withLock {
switch self.eventLoopGroupProvider {
case .shared:
private func shutdownEventLoop(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
self.stateLock.withLock {
switch self.eventLoopGroupProvider {
case .shared:
self.state = .shutDown
callback(nil)
case .createNew:
switch self.state {
case .shuttingDown:
self.state = .shutDown
return
case .createNew:
switch self.state {
case .shuttingDown:
self.state = .shutDown
try self.eventLoopGroup.syncShutdownGracefully()
case .shutDown, .upAndRunning:
assertionFailure("The only valid state at this point is \(State.shutDown)")
}
self.eventLoopGroup.shutdownGracefully(queue: queue, callback)
case .shutDown, .upAndRunning:
assertionFailure("The only valid state at this point is \(State.shutDown)")
}
}
} catch {
if closeError == nil {
closeError = error
}
}

private func shutdown(requiresCleanClose: Bool, queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
let result: Result<Dictionary<UUID, TaskProtocol>.Values, Error> = self.stateLock.withLock {
if self.state != .upAndRunning {
return .failure(HTTPClientError.alreadyShutdown)
} else {
self.state = .shuttingDown
return .success(self.tasks.values)
}
}

if let closeError = closeError {
throw closeError
switch result {
case .failure(let error):
callback(error)
case .success(let tasks):
self.pool.prepareForClose(on: self.eventLoopGroup.next()).whenComplete { _ in
var closeError: Error?
if !tasks.isEmpty, requiresCleanClose {
closeError = HTTPClientError.uncleanShutdown
}

// we ignore errors here
self.cancelTasks(tasks).whenComplete { _ in
// we ignore errors here
self.pool.close(on: self.eventLoopGroup.next()).whenComplete { _ in
self.shutdownEventLoop(queue: queue) { eventLoopError in
// we prioritise .uncleanShutdown here
if let error = closeError {
callback(error)
} else {
callback(eventLoopError)
}
}
}
}
}
}
}

Expand Down
1 change: 1 addition & 0 deletions Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ extension HTTPClientTests {
("testPoolClosesIdleConnections", testPoolClosesIdleConnections),
("testRacePoolIdleConnectionsAndGet", testRacePoolIdleConnectionsAndGet),
("testAvoidLeakingTLSHandshakeCompletionPromise", testAvoidLeakingTLSHandshakeCompletionPromise),
("testAsyncShutdown", testAsyncShutdown),
]
}
}
13 changes: 13 additions & 0 deletions Tests/AsyncHTTPClientTests/HTTPClientTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1673,4 +1673,17 @@ class HTTPClientTests: XCTestCase {
}
}
}

func testAsyncShutdown() {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
let httpClient = HTTPClient(eventLoopGroupProvider: .shared(eventLoopGroup))
let promise = eventLoopGroup.next().makePromise(of: Void.self)
eventLoopGroup.next().execute {
httpClient.shutdown { error in
XCTAssertNil(error)
promise.succeed(())
}
}
XCTAssertNoThrow(try promise.futureResult.wait())
}
}