From bf8050604dae7da8062cb0e74691cb05ab083d32 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Thu, 21 Apr 2022 10:35:06 +0200 Subject: [PATCH 1/5] use helper class to sync access --- .../TransactionTests.swift | 94 +++++++++++++++---- 1 file changed, 74 insertions(+), 20 deletions(-) diff --git a/Tests/AsyncHTTPClientTests/TransactionTests.swift b/Tests/AsyncHTTPClientTests/TransactionTests.swift index 7e2c62a0d..3fbb27dde 100644 --- a/Tests/AsyncHTTPClientTests/TransactionTests.swift +++ b/Tests/AsyncHTTPClientTests/TransactionTests.swift @@ -41,7 +41,7 @@ final class TransactionTests: XCTestCase { guard let preparedRequest = maybePreparedRequest else { return XCTFail("Expected to have a request here.") } - let (transaction, responseTask) = Transaction.makeWithResultTask( + let (transaction, responseTask) = await Transaction.makeWithResultTask( request: preparedRequest, preferredEventLoop: embeddedEventLoop ) @@ -78,7 +78,7 @@ final class TransactionTests: XCTestCase { guard let preparedRequest = maybePreparedRequest else { return } - let (transaction, responseTask) = Transaction.makeWithResultTask( + let (transaction, responseTask) = await Transaction.makeWithResultTask( request: preparedRequest, preferredEventLoop: embeddedEventLoop ) @@ -141,7 +141,7 @@ final class TransactionTests: XCTestCase { guard let preparedRequest = maybePreparedRequest else { return } - var tuple: (Transaction, Task)! = Transaction.makeWithResultTask( + var tuple: (Transaction, Task)! = await Transaction.makeWithResultTask( request: preparedRequest, preferredEventLoop: embeddedEventLoop ) @@ -196,7 +196,7 @@ final class TransactionTests: XCTestCase { guard let preparedRequest = maybePreparedRequest else { return XCTFail("Expected to have a request here.") } - let (transaction, responseTask) = Transaction.makeWithResultTask( + let (transaction, responseTask) = await Transaction.makeWithResultTask( request: preparedRequest, preferredEventLoop: embeddedEventLoop ) @@ -282,7 +282,7 @@ final class TransactionTests: XCTestCase { guard let preparedRequest = maybePreparedRequest else { return XCTFail("Expected to have a request here.") } - let (transaction, responseTask) = Transaction.makeWithResultTask( + let (transaction, responseTask) = await Transaction.makeWithResultTask( request: preparedRequest, preferredEventLoop: eventLoopGroup.next() ) @@ -324,7 +324,7 @@ final class TransactionTests: XCTestCase { guard let preparedRequest = maybePreparedRequest else { return XCTFail("Expected to have a request here.") } - let (transaction, responseTask) = Transaction.makeWithResultTask( + let (transaction, responseTask) = await Transaction.makeWithResultTask( request: preparedRequest, preferredEventLoop: embeddedEventLoop ) @@ -366,7 +366,7 @@ final class TransactionTests: XCTestCase { guard let preparedRequest = maybePreparedRequest else { return XCTFail("Expected to have a request here.") } - let (transaction, responseTask) = Transaction.makeWithResultTask( + let (transaction, responseTask) = await Transaction.makeWithResultTask( request: preparedRequest, preferredEventLoop: embeddedEventLoop ) @@ -397,9 +397,13 @@ final class TransactionTests: XCTestCase { func testResponseStreamFails() { #if compiler(>=5.5.2) && canImport(_Concurrency) guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { return } - XCTAsyncTest { + XCTAsyncTest(timeout: 30) { let embeddedEventLoop = EmbeddedEventLoop() - defer { XCTAssertNoThrow(try embeddedEventLoop.syncShutdownGracefully()) } + defer { + print("7", terminator: "") + XCTAssertNoThrow(try embeddedEventLoop.syncShutdownGracefully()) + print("8", terminator: "\n") + } var request = HTTPClientRequest(url: "https://localhost/") request.method = .GET @@ -409,10 +413,12 @@ final class TransactionTests: XCTestCase { guard let preparedRequest = maybePreparedRequest else { return } - let (transaction, responseTask) = Transaction.makeWithResultTask( + print("0", terminator: "") + let (transaction, responseTask) = await Transaction.makeWithResultTask( request: preparedRequest, preferredEventLoop: embeddedEventLoop ) + print("1", terminator: "") let executor = MockRequestExecutor( pauseRequestBodyPartStreamAfterASingleWrite: true, @@ -425,8 +431,9 @@ final class TransactionTests: XCTestCase { let responseHead = HTTPResponseHead(version: .http1_1, status: .ok, headers: ["foo": "bar"]) XCTAssertFalse(executor.signalledDemandForResponseBody) transaction.receiveResponseHead(responseHead) - + print("2", terminator: "") let response = try await responseTask.value + print("3", terminator: "") XCTAssertEqual(response.status, responseHead.status) XCTAssertEqual(response.headers, responseHead.headers) XCTAssertEqual(response.version, responseHead.version) @@ -438,6 +445,7 @@ final class TransactionTests: XCTestCase { XCTAssertNoThrow(try executor.receiveResponseDemand()) executor.resetResponseStreamDemandSignal() transaction.receiveResponseBodyParts([ByteBuffer(integer: 123)]) + print("4", terminator: "") let result = try await part1 XCTAssertEqual(result, ByteBuffer(integer: 123)) @@ -450,9 +458,11 @@ final class TransactionTests: XCTestCase { // can't use XCTAssertThrowsError() here, since capturing async let variables is // not allowed. + print("5", terminator: "") await XCTAssertThrowsError(try await responsePartTask.value) { XCTAssertEqual($0 as? HTTPClientError, .readTimeout) } + print("6", terminator: "") } #endif } @@ -493,7 +503,7 @@ final class TransactionTests: XCTestCase { guard let preparedRequest = maybePreparedRequest else { return } - let (transaction, responseTask) = Transaction.makeWithResultTask( + let (transaction, responseTask) = await Transaction.makeWithResultTask( request: preparedRequest, preferredEventLoop: eventLoopGroup.next() ) @@ -561,9 +571,55 @@ extension Transaction { logger: Logger = Logger(label: "test"), connectionDeadline: NIODeadline = .distantFuture, preferredEventLoop: EventLoop - ) -> (Transaction, _Concurrency.Task) { - let transactionPromise = preferredEventLoop.makePromise(of: Transaction.self) - let result = Task { + ) async -> (Transaction, _Concurrency.Task) { + final class ResultAccumulator: @unchecked Sendable { + var task: _Concurrency.Task? + var transaction: Transaction? + var continuation: CheckedContinuation<(Transaction, _Concurrency.Task), Never>? + var lock = Lock() + init() {} + + func result() async -> (Transaction, _Concurrency.Task) { + self.lock.lock() + precondition(self.continuation == nil) + guard let transaction = self.transaction, + let task = self.task + else { + return await withCheckedContinuation { (continuation: CheckedContinuation<(Transaction, _Concurrency.Task), Never>) in + self.continuation = continuation + self.lock.unlock() + } + } + self.lock.unlock() + return (transaction, task) + } + + func setTransaction(_ transaction: Transaction) { + lock.withLock { + precondition(self.transaction == nil) + self.transaction = transaction + self.resumeContinuationIfNeeded() + } + } + func setTask(_ task: _Concurrency.Task) { + lock.withLock { + precondition(self.task == nil) + self.task = task + self.resumeContinuationIfNeeded() + } + } + private func resumeContinuationIfNeeded() { + guard let continuation = continuation, + let transaction = transaction, + let task = task + else { + return + } + continuation.resume(returning: (transaction, task)) + } + } + let resultAccumulator = ResultAccumulator() + let task = Task { try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in let transaction = Transaction( request: request, @@ -573,13 +629,11 @@ extension Transaction { preferredEventLoop: preferredEventLoop, responseContinuation: continuation ) - transactionPromise.succeed(transaction) + resultAccumulator.setTransaction(transaction) } } - // the promise can never fail and it is therefore safe to force unwrap - let transaction = try! transactionPromise.futureResult.wait() - - return (transaction, result) + resultAccumulator.setTask(task) + return await resultAccumulator.result() } } #endif From e4060cf8c870c2691fc0d4bf965ff2f94d8a1232 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Thu, 21 Apr 2022 11:16:16 +0200 Subject: [PATCH 2/5] use swift concurrency friendly primitives instead of a lock --- .../TransactionTests.swift | 95 +++++++++---------- 1 file changed, 45 insertions(+), 50 deletions(-) diff --git a/Tests/AsyncHTTPClientTests/TransactionTests.swift b/Tests/AsyncHTTPClientTests/TransactionTests.swift index 3fbb27dde..647728ca8 100644 --- a/Tests/AsyncHTTPClientTests/TransactionTests.swift +++ b/Tests/AsyncHTTPClientTests/TransactionTests.swift @@ -563,6 +563,45 @@ actor SharedIterator { } } +/// non fail-able promise that only support one observer +@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) +fileprivate actor Promise { + private enum State { + case initialised + case fulfilled(Value) + } + + private var state: State = .initialised + + private var observer: CheckedContinuation? + + init() {} + + func fulfil(_ value: Value) { + switch state { + case .initialised: + self.state = .fulfilled(value) + observer?.resume(returning: value) + case .fulfilled: + preconditionFailure("\(Self.self) over fulfilled") + } + } + + var value: Value { + get async { + switch state { + case .initialised: + return await withCheckedContinuation { (continuation: CheckedContinuation) in + precondition(observer == nil, "\(Self.self) supports only one observer") + observer = continuation + } + case .fulfilled(let value): + return value + } + } + } +} + @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) extension Transaction { fileprivate static func makeWithResultTask( @@ -572,53 +611,7 @@ extension Transaction { connectionDeadline: NIODeadline = .distantFuture, preferredEventLoop: EventLoop ) async -> (Transaction, _Concurrency.Task) { - final class ResultAccumulator: @unchecked Sendable { - var task: _Concurrency.Task? - var transaction: Transaction? - var continuation: CheckedContinuation<(Transaction, _Concurrency.Task), Never>? - var lock = Lock() - init() {} - - func result() async -> (Transaction, _Concurrency.Task) { - self.lock.lock() - precondition(self.continuation == nil) - guard let transaction = self.transaction, - let task = self.task - else { - return await withCheckedContinuation { (continuation: CheckedContinuation<(Transaction, _Concurrency.Task), Never>) in - self.continuation = continuation - self.lock.unlock() - } - } - self.lock.unlock() - return (transaction, task) - } - - func setTransaction(_ transaction: Transaction) { - lock.withLock { - precondition(self.transaction == nil) - self.transaction = transaction - self.resumeContinuationIfNeeded() - } - } - func setTask(_ task: _Concurrency.Task) { - lock.withLock { - precondition(self.task == nil) - self.task = task - self.resumeContinuationIfNeeded() - } - } - private func resumeContinuationIfNeeded() { - guard let continuation = continuation, - let transaction = transaction, - let task = task - else { - return - } - continuation.resume(returning: (transaction, task)) - } - } - let resultAccumulator = ResultAccumulator() + let transactionPromise = Promise() let task = Task { try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in let transaction = Transaction( @@ -629,11 +622,13 @@ extension Transaction { preferredEventLoop: preferredEventLoop, responseContinuation: continuation ) - resultAccumulator.setTransaction(transaction) + Task { + await transactionPromise.fulfil(transaction) + } } } - resultAccumulator.setTask(task) - return await resultAccumulator.result() + + return (await transactionPromise.value, task) } } #endif From 9f2c8ca0bc87d78acf8c9b3ad5551d1fa24fab16 Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Thu, 21 Apr 2022 11:30:59 +0200 Subject: [PATCH 3/5] remove debug prints --- .../AsyncHTTPClientTests/TransactionTests.swift | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/Tests/AsyncHTTPClientTests/TransactionTests.swift b/Tests/AsyncHTTPClientTests/TransactionTests.swift index 647728ca8..76d742400 100644 --- a/Tests/AsyncHTTPClientTests/TransactionTests.swift +++ b/Tests/AsyncHTTPClientTests/TransactionTests.swift @@ -399,11 +399,7 @@ final class TransactionTests: XCTestCase { guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { return } XCTAsyncTest(timeout: 30) { let embeddedEventLoop = EmbeddedEventLoop() - defer { - print("7", terminator: "") - XCTAssertNoThrow(try embeddedEventLoop.syncShutdownGracefully()) - print("8", terminator: "\n") - } + defer { XCTAssertNoThrow(try embeddedEventLoop.syncShutdownGracefully()) } var request = HTTPClientRequest(url: "https://localhost/") request.method = .GET @@ -413,12 +409,10 @@ final class TransactionTests: XCTestCase { guard let preparedRequest = maybePreparedRequest else { return } - print("0", terminator: "") let (transaction, responseTask) = await Transaction.makeWithResultTask( request: preparedRequest, preferredEventLoop: embeddedEventLoop ) - print("1", terminator: "") let executor = MockRequestExecutor( pauseRequestBodyPartStreamAfterASingleWrite: true, @@ -431,9 +425,9 @@ final class TransactionTests: XCTestCase { let responseHead = HTTPResponseHead(version: .http1_1, status: .ok, headers: ["foo": "bar"]) XCTAssertFalse(executor.signalledDemandForResponseBody) transaction.receiveResponseHead(responseHead) - print("2", terminator: "") + let response = try await responseTask.value - print("3", terminator: "") + XCTAssertEqual(response.status, responseHead.status) XCTAssertEqual(response.headers, responseHead.headers) XCTAssertEqual(response.version, responseHead.version) @@ -445,7 +439,7 @@ final class TransactionTests: XCTestCase { XCTAssertNoThrow(try executor.receiveResponseDemand()) executor.resetResponseStreamDemandSignal() transaction.receiveResponseBodyParts([ByteBuffer(integer: 123)]) - print("4", terminator: "") + let result = try await part1 XCTAssertEqual(result, ByteBuffer(integer: 123)) @@ -458,11 +452,9 @@ final class TransactionTests: XCTestCase { // can't use XCTAssertThrowsError() here, since capturing async let variables is // not allowed. - print("5", terminator: "") await XCTAssertThrowsError(try await responsePartTask.value) { XCTAssertEqual($0 as? HTTPClientError, .readTimeout) } - print("6", terminator: "") } #endif } From 4eadf8a609962e6e0fdc600709552c50e7433a2b Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Thu, 21 Apr 2022 11:43:08 +0200 Subject: [PATCH 4/5] add missing s --- Tests/AsyncHTTPClientTests/TransactionTests.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/AsyncHTTPClientTests/TransactionTests.swift b/Tests/AsyncHTTPClientTests/TransactionTests.swift index 76d742400..349b40714 100644 --- a/Tests/AsyncHTTPClientTests/TransactionTests.swift +++ b/Tests/AsyncHTTPClientTests/TransactionTests.swift @@ -555,7 +555,7 @@ actor SharedIterator { } } -/// non fail-able promise that only support one observer +/// non fail-able promise that only supports one observer @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) fileprivate actor Promise { private enum State { From 597bf61913048aa7d1a83d0693bcd23f09575c9a Mon Sep 17 00:00:00 2001 From: David Nadoba Date: Thu, 21 Apr 2022 11:50:57 +0200 Subject: [PATCH 5/5] SwiftFormat --- .../TransactionTests.swift | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/Tests/AsyncHTTPClientTests/TransactionTests.swift b/Tests/AsyncHTTPClientTests/TransactionTests.swift index 349b40714..d124eb089 100644 --- a/Tests/AsyncHTTPClientTests/TransactionTests.swift +++ b/Tests/AsyncHTTPClientTests/TransactionTests.swift @@ -425,9 +425,9 @@ final class TransactionTests: XCTestCase { let responseHead = HTTPResponseHead(version: .http1_1, status: .ok, headers: ["foo": "bar"]) XCTAssertFalse(executor.signalledDemandForResponseBody) transaction.receiveResponseHead(responseHead) - + let response = try await responseTask.value - + XCTAssertEqual(response.status, responseHead.status) XCTAssertEqual(response.headers, responseHead.headers) XCTAssertEqual(response.version, responseHead.version) @@ -439,7 +439,7 @@ final class TransactionTests: XCTestCase { XCTAssertNoThrow(try executor.receiveResponseDemand()) executor.resetResponseStreamDemandSignal() transaction.receiveResponseBodyParts([ByteBuffer(integer: 123)]) - + let result = try await part1 XCTAssertEqual(result, ByteBuffer(integer: 123)) @@ -562,30 +562,30 @@ fileprivate actor Promise { case initialised case fulfilled(Value) } - + private var state: State = .initialised - + private var observer: CheckedContinuation? - + init() {} func fulfil(_ value: Value) { - switch state { + switch self.state { case .initialised: self.state = .fulfilled(value) - observer?.resume(returning: value) + self.observer?.resume(returning: value) case .fulfilled: preconditionFailure("\(Self.self) over fulfilled") } } - + var value: Value { get async { - switch state { + switch self.state { case .initialised: return await withCheckedContinuation { (continuation: CheckedContinuation) in - precondition(observer == nil, "\(Self.self) supports only one observer") - observer = continuation + precondition(self.observer == nil, "\(Self.self) supports only one observer") + self.observer = continuation } case .fulfilled(let value): return value @@ -619,7 +619,7 @@ extension Transaction { } } } - + return (await transactionPromise.value, task) } }