Skip to content

Commit ddc0233

Browse files
committed
Add a RequestQueue
1 parent eab2a84 commit ddc0233

7 files changed

+329
-188
lines changed

Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool+Waiter.swift

-57
This file was deleted.

Sources/AsyncHTTPClient/ConnectionPool/HTTPConnectionPool.swift

+40
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,46 @@ enum HTTPConnectionPool {
123123
}
124124
}
125125

126+
extension HTTPConnectionPool {
127+
/// This is a wrapper that we use inside the connection pool state machine to ensure that
128+
/// the actual request can not be accessed at any time. Further it exposes all that is needed within
129+
/// the state machine. A request ID and the `EventLoop` requirement.
130+
struct Request {
131+
struct ID: Hashable {
132+
let objectIdentifier: ObjectIdentifier
133+
let eventLoopID: EventLoopID?
134+
135+
fileprivate init(_ request: HTTPSchedulableRequest, eventLoopRequirement eventLoopID: EventLoopID?) {
136+
self.objectIdentifier = ObjectIdentifier(request)
137+
self.eventLoopID = eventLoopID
138+
}
139+
}
140+
141+
fileprivate let req: HTTPSchedulableRequest
142+
143+
init(_ request: HTTPSchedulableRequest) {
144+
self.req = request
145+
}
146+
147+
var id: HTTPConnectionPool.Request.ID {
148+
HTTPConnectionPool.Request.ID(self.req, eventLoopRequirement: self.requiredEventLoop?.id)
149+
}
150+
151+
var requiredEventLoop: EventLoop? {
152+
switch self.req.eventLoopPreference.preference {
153+
case .indifferent, .delegate:
154+
return nil
155+
case .delegateAndChannel(on: let eventLoop), .testOnly_exact(channelOn: let eventLoop, delegateOn: _):
156+
return eventLoop
157+
}
158+
}
159+
160+
func __testOnly_internal_value() -> HTTPSchedulableRequest {
161+
self.req
162+
}
163+
}
164+
}
165+
126166
struct EventLoopID: Hashable {
127167
private var id: Identifier
128168

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIOCore
16+
17+
extension HTTPConnectionPool {
18+
/// A struct to store all queued requests.
19+
struct RequestQueue {
20+
private var generalPurposeQueue: CircularBuffer<Request>
21+
private var eventLoopQueues: [EventLoopID: CircularBuffer<Request>]
22+
23+
init() {
24+
self.generalPurposeQueue = CircularBuffer(initialCapacity: 32)
25+
self.eventLoopQueues = [:]
26+
}
27+
28+
var count: Int {
29+
self.generalPurposeQueue.count
30+
}
31+
32+
var isEmpty: Bool {
33+
self.generalPurposeQueue.isEmpty
34+
}
35+
36+
mutating func count(for eventLoop: EventLoop?) -> Int {
37+
if let eventLoop = eventLoop {
38+
return self.withEventLoopQueue(for: eventLoop.id) { $0.count }
39+
}
40+
return self.generalPurposeQueue.count
41+
}
42+
43+
mutating func isEmpty(for eventLoop: EventLoop?) -> Bool {
44+
if let eventLoop = eventLoop {
45+
return self.withEventLoopQueue(for: eventLoop.id) { $0.isEmpty }
46+
}
47+
return self.generalPurposeQueue.isEmpty
48+
}
49+
50+
@discardableResult
51+
mutating func push(_ request: Request) -> Request.ID {
52+
if let eventLoop = request.requiredEventLoop {
53+
self.withEventLoopQueue(for: eventLoop.id) { queue in
54+
queue.append(request)
55+
}
56+
} else {
57+
self.generalPurposeQueue.append(request)
58+
}
59+
return request.id
60+
}
61+
62+
mutating func popFirst(for eventLoop: EventLoop? = nil) -> Request? {
63+
if let eventLoop = eventLoop {
64+
return self.withEventLoopQueue(for: eventLoop.id) { queue in
65+
queue.popFirst()
66+
}
67+
}
68+
return self.generalPurposeQueue.popFirst()
69+
}
70+
71+
mutating func remove(_ requestID: Request.ID) -> Request? {
72+
if let eventLoopID = requestID.eventLoopID {
73+
return self.withEventLoopQueue(for: eventLoopID) { queue in
74+
guard let index = queue.firstIndex(where: { $0.id == requestID }) else {
75+
return nil
76+
}
77+
return queue.remove(at: index)
78+
}
79+
} else {
80+
if let index = self.generalPurposeQueue.firstIndex(where: { $0.id == requestID }) {
81+
// TBD: This is slow. Do we maybe want something more sophisticated here?
82+
return self.generalPurposeQueue.remove(at: index)
83+
}
84+
return nil
85+
}
86+
}
87+
88+
mutating func removeAll() -> [Request] {
89+
var result = [Request]()
90+
result = self.eventLoopQueues.reduce(into: result) { partialResult, element in
91+
element.value.forEach { request in
92+
partialResult.append(request)
93+
}
94+
}
95+
96+
self.generalPurposeQueue.forEach { request in
97+
result.append(request)
98+
}
99+
100+
self.eventLoopQueues.removeAll()
101+
self.generalPurposeQueue.removeAll()
102+
return result
103+
}
104+
105+
private mutating func withEventLoopQueue<Result>(
106+
for eventLoopID: EventLoopID,
107+
_ closure: (inout CircularBuffer<Request>) -> Result
108+
) -> Result {
109+
if self.eventLoopQueues[eventLoopID] == nil {
110+
self.eventLoopQueues[eventLoopID] = CircularBuffer(initialCapacity: 32)
111+
}
112+
return closure(&self.eventLoopQueues[eventLoopID]!)
113+
}
114+
}
115+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
@testable import AsyncHTTPClient
16+
import Logging
17+
import NIOCore
18+
import NIOEmbedded
19+
import NIOHTTP1
20+
import XCTest
21+
22+
class HTTPConnectionPool_RequestQueueTests: XCTestCase {
23+
func testCountAndIsEmptyWorks() {
24+
var queue = HTTPConnectionPool.RequestQueue()
25+
XCTAssertTrue(queue.isEmpty)
26+
XCTAssertEqual(queue.count, 0)
27+
let req1 = MockScheduledRequest(eventLoopPreference: .indifferent)
28+
let req1ID = queue.push(.init(req1))
29+
XCTAssertFalse(queue.isEmpty)
30+
XCTAssertFalse(queue.isEmpty(for: nil))
31+
XCTAssertEqual(queue.count, 1)
32+
XCTAssertEqual(queue.count(for: nil), 1)
33+
34+
let req2 = MockScheduledRequest(eventLoopPreference: .indifferent)
35+
let req2ID = queue.push(.init(req2))
36+
XCTAssertEqual(queue.count, 2)
37+
38+
XCTAssert(queue.popFirst()?.__testOnly_internal_value() === req1)
39+
XCTAssertEqual(queue.count, 1)
40+
XCTAssert(queue.remove(req2ID)?.__testOnly_internal_value() === req2)
41+
XCTAssertNil(queue.remove(req1ID))
42+
43+
let eventLoop = EmbeddedEventLoop()
44+
45+
XCTAssertTrue(queue.isEmpty(for: eventLoop))
46+
XCTAssertEqual(queue.count(for: eventLoop), 0)
47+
let req3 = MockScheduledRequest(eventLoopPreference: .delegateAndChannel(on: eventLoop))
48+
let req3ID = queue.push(.init(req3))
49+
XCTAssertFalse(queue.isEmpty(for: eventLoop))
50+
XCTAssertEqual(queue.count(for: eventLoop), 1)
51+
XCTAssert(queue.popFirst(for: eventLoop)?.__testOnly_internal_value() === req3)
52+
XCTAssertNil(queue.remove(req3ID))
53+
54+
let req4 = MockScheduledRequest(eventLoopPreference: .delegateAndChannel(on: eventLoop))
55+
let req4ID = queue.push(.init(req4))
56+
XCTAssert(queue.remove(req4ID)?.__testOnly_internal_value() === req4)
57+
58+
let req5 = MockScheduledRequest(eventLoopPreference: .indifferent)
59+
queue.push(.init(req5))
60+
let req6 = MockScheduledRequest(eventLoopPreference: .delegateAndChannel(on: eventLoop))
61+
queue.push(.init(req6))
62+
let all = queue.removeAll()
63+
let testSet = all.map { $0.__testOnly_internal_value() }
64+
XCTAssertEqual(testSet.count, 2)
65+
XCTAssertTrue(testSet.contains(where: { $0 === req5 }))
66+
XCTAssertTrue(testSet.contains(where: { $0 === req6 }))
67+
XCTAssertFalse(testSet.contains(where: { $0 === req4 }))
68+
}
69+
}
70+
71+
private class MockScheduledRequest: HTTPSchedulableRequest {
72+
init(eventLoopPreference: HTTPClient.EventLoopPreference) {
73+
self.eventLoopPreference = eventLoopPreference
74+
}
75+
76+
var logger: Logger { preconditionFailure("Unimplemented") }
77+
var connectionDeadline: NIODeadline { preconditionFailure("Unimplemented") }
78+
let eventLoopPreference: HTTPClient.EventLoopPreference
79+
80+
func requestWasQueued(_: HTTPRequestScheduler) {
81+
preconditionFailure("Unimplemented")
82+
}
83+
84+
func fail(_: Error) {
85+
preconditionFailure("Unimplemented")
86+
}
87+
88+
// MARK: HTTPExecutableRequest
89+
90+
var requestHead: HTTPRequestHead { preconditionFailure("Unimplemented") }
91+
var requestFramingMetadata: RequestFramingMetadata { preconditionFailure("Unimplemented") }
92+
var idleReadTimeout: TimeAmount? { preconditionFailure("Unimplemented") }
93+
94+
func willExecuteRequest(_: HTTPRequestExecutor) {
95+
preconditionFailure("Unimplemented")
96+
}
97+
98+
func requestHeadSent() {
99+
preconditionFailure("Unimplemented")
100+
}
101+
102+
func resumeRequestBodyStream() {
103+
preconditionFailure("Unimplemented")
104+
}
105+
106+
func pauseRequestBodyStream() {
107+
preconditionFailure("Unimplemented")
108+
}
109+
110+
func receiveResponseHead(_: HTTPResponseHead) {
111+
preconditionFailure("Unimplemented")
112+
}
113+
114+
func receiveResponseBodyParts(_: CircularBuffer<ByteBuffer>) {
115+
preconditionFailure("Unimplemented")
116+
}
117+
118+
func succeedRequest(_: CircularBuffer<ByteBuffer>?) {
119+
preconditionFailure("Unimplemented")
120+
}
121+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Dispatch
16+
import NIOConcurrencyHelpers
17+
import NIOCore
18+
import NIOEmbedded
19+
20+
/// An `EventLoopGroup` of `EmbeddedEventLoop`s.
21+
final class EmbeddedEventLoopGroup: EventLoopGroup {
22+
private let loops: [EmbeddedEventLoop]
23+
private let index = NIOAtomic<Int>.makeAtomic(value: 0)
24+
25+
internal init(loops: Int) {
26+
self.loops = (0..<loops).map { _ in EmbeddedEventLoop() }
27+
}
28+
29+
internal func next() -> EventLoop {
30+
let index: Int = self.index.add(1)
31+
return self.loops[index % self.loops.count]
32+
}
33+
34+
internal func makeIterator() -> EventLoopIterator {
35+
return EventLoopIterator(self.loops)
36+
}
37+
38+
internal func shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
39+
var shutdownError: Error?
40+
41+
for loop in self.loops {
42+
loop.shutdownGracefully(queue: queue) { error in
43+
if let error = error {
44+
shutdownError = error
45+
}
46+
}
47+
}
48+
49+
queue.sync {
50+
callback(shutdownError)
51+
}
52+
}
53+
}

0 commit comments

Comments
 (0)