diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift index caed79945..3eb18ef35 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift @@ -28,14 +28,13 @@ final class HTTPConnectionPool { private var _idleTimer = [Connection.ID: Scheduled]() /// The connection backoff timeout timers. Protected by the stateLock private var _backoffTimer = [Connection.ID: Scheduled]() + /// The request connection timeout timers. Protected by the stateLock + private var _requestTimer = [Request.ID: Scheduled]() private static let fallbackConnectTimeout: TimeAmount = .seconds(30) let key: ConnectionPool.Key - private let timerLock = Lock() - private var _requestTimer = [Request.ID: Scheduled]() - private var logger: Logger private let eventLoopGroup: EventLoopGroup @@ -110,21 +109,64 @@ final class HTTPConnectionPool { } } + enum RequestAction { + enum Unlocked { + case executeRequest(Request, Connection) + case executeRequests([Request], Connection) + case failRequest(Request, Error) + case failRequests([Request], Error) + case none + } + + enum Locked { + case scheduleRequestTimeout(for: Request, on: EventLoop) + case cancelRequestTimeout(Request.ID) + case cancelRequestTimeouts([Request]) + case none + } + } + struct Locked { var connection: ConnectionAction.Locked + var request: RequestAction.Locked } struct Unlocked { var connection: ConnectionAction.Unlocked - var request: StateMachine.RequestAction + var request: RequestAction.Unlocked } var locked: Locked var unlocked: Unlocked init(from stateMachineAction: StateMachine.Action) { - self.locked = Locked(connection: .none) - self.unlocked = Unlocked(connection: .none, request: stateMachineAction.request) + self.locked = Locked(connection: .none, request: .none) + self.unlocked = Unlocked(connection: .none, request: .none) + + switch stateMachineAction.request { + case .cancelRequestTimeout(let requestID): + self.locked.request = .cancelRequestTimeout(requestID) + case .executeRequest(let request, let connection, cancelTimeout: let cancelTimeout): + if cancelTimeout { + self.locked.request = .cancelRequestTimeout(request.id) + } + self.unlocked.request = .executeRequest(request, connection) + case .executeRequestsAndCancelTimeouts(let requests, let connection): + self.locked.request = .cancelRequestTimeouts(requests) + self.unlocked.request = .executeRequests(requests, connection) + case .failRequest(let request, let error, cancelTimeout: let cancelTimeout): + if cancelTimeout { + self.locked.request = .cancelRequestTimeout(request.id) + } + self.unlocked.request = .failRequest(request, error) + case .failRequestsAndCancelTimeouts(let requests, let error): + self.locked.request = .cancelRequestTimeouts(requests) + self.unlocked.request = .failRequests(requests, error) + case .scheduleRequestTimeout(for: let request, on: let eventLoop): + self.locked.request = .scheduleRequestTimeout(for: request, on: eventLoop) + case .none: + break + } switch stateMachineAction.connection { case .createConnection(let connectionID, on: let eventLoop): @@ -154,14 +196,15 @@ final class HTTPConnectionPool { let unlockedActions = self.stateLock.withLock { () -> Actions.Unlocked in let stateMachineAction = closure(&self._state) let poolAction = Actions(from: stateMachineAction) - self.runLockedActions(poolAction.locked) + self.runLockedConnectionAction(poolAction.locked.connection) + self.runLockedRequestAction(poolAction.locked.request) return poolAction.unlocked } self.runUnlockedActions(unlockedActions) } - private func runLockedActions(_ actions: Actions.Locked) { - switch actions.connection { + private func runLockedConnectionAction(_ action: Actions.ConnectionAction.Locked) { + switch action { case .scheduleBackoffTimer(let connectionID, backoff: let backoff, on: let eventLoop): self.scheduleConnectionStartBackoffTimer(connectionID, backoff, on: eventLoop) @@ -181,6 +224,22 @@ final class HTTPConnectionPool { } } + private func runLockedRequestAction(_ action: Actions.RequestAction.Locked) { + switch action { + case .scheduleRequestTimeout(for: let request, on: let eventLoop): + self.scheduleRequestTimeout(request, on: eventLoop) + + case .cancelRequestTimeout(let requestID): + self.cancelRequestTimeout(requestID) + + case .cancelRequestTimeouts(let requests): + requests.forEach { self.cancelRequestTimeout($0.id) } + + case .none: + break + } + } + private func runUnlockedActions(_ actions: Actions.Unlocked) { self.runUnlockedConnectionAction(actions.connection) self.runUnlockedRequestAction(actions.request) @@ -225,38 +284,20 @@ final class HTTPConnectionPool { } } - private func runUnlockedRequestAction(_ action: StateMachine.RequestAction) { - // The order of execution fail/execute request vs cancelling the request timeout timer does - // not matter in the actions here. The actions don't cause any side effects that will be - // reported back to the state machine and are not dependent on each other. - + private func runUnlockedRequestAction(_ action: Actions.RequestAction.Unlocked) { switch action { - case .executeRequest(let request, let connection, cancelTimeout: let cancelTimeout): - if cancelTimeout { - self.cancelRequestTimeout(request.id) - } + case .executeRequest(let request, let connection): connection.executeRequest(request.req) - case .executeRequestsAndCancelTimeouts(let requests, let connection): - self.cancelRequestTimeouts(requests) + case .executeRequests(let requests, let connection): requests.forEach { connection.executeRequest($0.req) } - case .failRequest(let request, let error, cancelTimeout: let cancelTimeout): - if cancelTimeout { - self.cancelRequestTimeout(request.id) - } + case .failRequest(let request, let error): request.req.fail(error) - case .failRequestsAndCancelTimeouts(let requests, let error): - self.cancelRequestTimeouts(requests) + case .failRequests(let requests, let error): requests.forEach { $0.req.fail(error) } - case .scheduleRequestTimeout(let request, on: let eventLoop): - self.scheduleRequestTimeout(request, on: eventLoop) - - case .cancelRequestTimeout(let requestID): - self.cancelRequestTimeout(requestID) - case .none: break } @@ -282,49 +323,29 @@ final class HTTPConnectionPool { private func scheduleRequestTimeout(_ request: Request, on eventLoop: EventLoop) { let requestID = request.id let scheduled = eventLoop.scheduleTask(deadline: request.connectionDeadline) { - // The timer has fired. Now we need to do a couple of things: - // - // 1. Remove ourselves from the timer dictionary to not leak any data. If our - // waiter entry still exists, we need to tell the state machine, that we want - // to fail the request. - let timeoutFired = self.timerLock.withLock { - self._requestTimer.removeValue(forKey: requestID) != nil - } - - // 2. If the entry did not exists anymore, we can assume that the request was - // scheduled on another connection. The timer still fired anyhow because of a - // race. In such a situation we don't need to do anything. - guard timeoutFired else { return } - - // 3. Tell the state machine about the timeout - self.modifyStateAndRunActions { - $0.timeoutRequest(requestID) + // there might be a race between a the timeout timer and the pool scheduling the + // request on another thread. + self.modifyStateAndRunActions { stateMachine in + if self._requestTimer.removeValue(forKey: requestID) != nil { + // The timer still exists. State Machines assumes it is alive. Inform state + // machine. + return stateMachine.timeoutRequest(requestID) + } + return .none } } - self.timerLock.withLockVoid { - assert(self._requestTimer[requestID] == nil) - self._requestTimer[requestID] = scheduled - } + assert(self._requestTimer[requestID] == nil) + self._requestTimer[requestID] = scheduled request.req.requestWasQueued(self) } private func cancelRequestTimeout(_ id: Request.ID) { - let scheduled = self.timerLock.withLock { - self._requestTimer.removeValue(forKey: id) + guard let cancelTimer = self._requestTimer.removeValue(forKey: id) else { + preconditionFailure("Expected to have a timer for request \(id) at this point.") } - - scheduled?.cancel() - } - - private func cancelRequestTimeouts(_ requests: [Request]) { - let scheduled = self.timerLock.withLock { - requests.compactMap { - self._requestTimer.removeValue(forKey: $0.id) - } - } - scheduled.forEach { $0.cancel() } + cancelTimer.cancel() } private func scheduleIdleTimerForConnection(_ connectionID: Connection.ID, on eventLoop: EventLoop) {