diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..84b0df3 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,15 @@ +[*] +insert_final_newline = true +indent_size = 4 # We can remove this line if anyone has strong opinions. + +# swift +[*.swift] +indent_style = tab + +# sh +[*.sh] +indent_style = tab + +# documentation, utils +[*.{md,mdx,diff}] +trim_trailing_whitespace = false diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 146ede4..2cbd141 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -100,3 +100,12 @@ jobs: uses: actions/checkout@v4 - name: Link Checker uses: AlexanderDokuchaev/md-dead-link-check@v1.0.1 + lint-swift: + name: Lint Swift + runs-on: ubuntu-latest + container: swift:6.0 + steps: + - name: Checkout Repo + uses: actions/checkout@v4 + - name: Lint Swift + run: swift run --package-path CLI swiftformat . --lint diff --git a/.swiftformat b/.swiftformat new file mode 100644 index 0000000..7cd7eb1 --- /dev/null +++ b/.swiftformat @@ -0,0 +1,20 @@ +# format options +--indent tab +--modifierorder nonisolated,open,public,internal,fileprivate,private,private(set),final,override,required,convenience +--ranges no-space +--extensionacl on-declarations +--funcattributes prev-line +--typeattributes prev-line +--storedvarattrs same-line +--hexgrouping none +--decimalgrouping 3 + +# rules +--enable isEmpty +--enable wrapEnumCases +--enable wrapMultilineStatementBraces +--disable consistentSwitchCaseSpacing +--disable blankLineAfterSwitchCase + +# global +--swiftversion 6.0 diff --git a/CLI/Package.resolved b/CLI/Package.resolved new file mode 100644 index 0000000..c7450ae --- /dev/null +++ b/CLI/Package.resolved @@ -0,0 +1,15 @@ +{ + "originHash" : "b043e9190ddd3045375d40ccb608edf659c48e3b9f8e2ee40cc489c8ec3f496d", + "pins" : [ + { + "identity" : "swiftformat", + "kind" : "remoteSourceControl", + "location" : "https://github.com/nicklockwood/SwiftFormat", + "state" : { + "revision" : "d35227722eb590b34a3ccaf8b40759e8910bc870", + "version" : "0.56.1" + } + } + ], + "version" : 3 +} diff --git a/CLI/Package.swift b/CLI/Package.swift new file mode 100644 index 0000000..f536f52 --- /dev/null +++ b/CLI/Package.swift @@ -0,0 +1,16 @@ +// swift-tools-version: 6.0 +// The swift-tools-version declares the minimum version of Swift required to build this package. + +import PackageDescription + +let package = Package( + name: "CLI", + platforms: [ + .macOS(.v14), + ], + products: [], + dependencies: [ + .package(url: "https://github.com/nicklockwood/SwiftFormat", from: "0.56.1"), + ], + targets: [] +) diff --git a/Package.swift b/Package.swift index 67342b8..d3ef00f 100644 --- a/Package.swift +++ b/Package.swift @@ -4,35 +4,35 @@ import PackageDescription let package = Package( - name: "swift-async-queue", - platforms: [ - .macOS(.v10_15), - .iOS(.v13), - .tvOS(.v13), - .watchOS(.v6), - .macCatalyst(.v13), - .visionOS(.v1), - ], - products: [ - .library( - name: "AsyncQueue", - targets: ["AsyncQueue"] - ), - ], - targets: [ - .target( - name: "AsyncQueue", - dependencies: [], - swiftSettings: [ - .swiftLanguageMode(.v6), - ] - ), - .testTarget( - name: "AsyncQueueTests", - dependencies: ["AsyncQueue"], - swiftSettings: [ - .swiftLanguageMode(.v6), - ] - ), - ] + name: "swift-async-queue", + platforms: [ + .macOS(.v10_15), + .iOS(.v13), + .tvOS(.v13), + .watchOS(.v6), + .macCatalyst(.v13), + .visionOS(.v1), + ], + products: [ + .library( + name: "AsyncQueue", + targets: ["AsyncQueue"] + ), + ], + targets: [ + .target( + name: "AsyncQueue", + dependencies: [], + swiftSettings: [ + .swiftLanguageMode(.v6), + ] + ), + .testTarget( + name: "AsyncQueueTests", + dependencies: ["AsyncQueue"], + swiftSettings: [ + .swiftLanguageMode(.v6), + ] + ), + ] ) diff --git a/Scripts/build.swift b/Scripts/build.swift index 38d76c8..31c1d72 100755 --- a/Scripts/build.swift +++ b/Scripts/build.swift @@ -5,111 +5,111 @@ import Foundation // Usage: build.swift platforms func execute(commandPath: String, arguments: [String]) throws { - let task = Process() - task.executableURL = .init(filePath: commandPath) - task.arguments = arguments - print("Launching command: \(commandPath) \(arguments.joined(separator: " "))") - try task.run() - task.waitUntilExit() - guard task.terminationStatus == 0 else { - throw TaskError.code(task.terminationStatus) - } + let task = Process() + task.executableURL = .init(filePath: commandPath) + task.arguments = arguments + print("Launching command: \(commandPath) \(arguments.joined(separator: " "))") + try task.run() + task.waitUntilExit() + guard task.terminationStatus == 0 else { + throw TaskError.code(task.terminationStatus) + } } enum TaskError: Error { - case code(Int32) + case code(Int32) } enum Platform: String, CaseIterable, CustomStringConvertible { - case iOS_18 - case tvOS_18 - case macOS_15 - case macCatalyst_15 - case watchOS_11 - case visionOS_2 - - var destination: String { - switch self { - case .iOS_18: - "platform=iOS Simulator,OS=18.0,name=iPad (10th generation)" - - case .tvOS_18: - "platform=tvOS Simulator,OS=18.0,name=Apple TV" - - case .macOS_15, - .macCatalyst_15: - "platform=OS X" - - case .watchOS_11: - "OS=11.0,name=Apple Watch Series 10 (46mm)" - - case .visionOS_2: - "OS=2.0,name=Apple Vision Pro" - } - } - - var sdk: String { - switch self { - case .iOS_18: - "iphonesimulator" - - case .tvOS_18: - "appletvsimulator" - - case .macOS_15, - .macCatalyst_15: - "macosx15.0" - - case .watchOS_11: - "watchsimulator" - - case .visionOS_2: - "xrsimulator" - } - } - - var derivedDataPath: String { - ".build/derivedData/" + description - } - - var description: String { - rawValue - } + case iOS_18 + case tvOS_18 + case macOS_15 + case macCatalyst_15 + case watchOS_11 + case visionOS_2 + + var destination: String { + switch self { + case .iOS_18: + "platform=iOS Simulator,OS=18.0,name=iPad (10th generation)" + + case .tvOS_18: + "platform=tvOS Simulator,OS=18.0,name=Apple TV" + + case .macOS_15, + .macCatalyst_15: + "platform=OS X" + + case .watchOS_11: + "OS=11.0,name=Apple Watch Series 10 (46mm)" + + case .visionOS_2: + "OS=2.0,name=Apple Vision Pro" + } + } + + var sdk: String { + switch self { + case .iOS_18: + "iphonesimulator" + + case .tvOS_18: + "appletvsimulator" + + case .macOS_15, + .macCatalyst_15: + "macosx15.0" + + case .watchOS_11: + "watchsimulator" + + case .visionOS_2: + "xrsimulator" + } + } + + var derivedDataPath: String { + ".build/derivedData/" + description + } + + var description: String { + rawValue + } } guard CommandLine.arguments.count > 1 else { - print("Usage: build.swift platforms") - throw TaskError.code(1) + print("Usage: build.swift platforms") + throw TaskError.code(1) } let rawPlatforms = CommandLine.arguments[1].components(separatedBy: ",") for rawPlatform in rawPlatforms { - guard let platform = Platform(rawValue: rawPlatform) else { - print("Received unknown platform type \(rawPlatform)") - print("Possible platform types are: \(Platform.allCases)") - throw TaskError.code(1) - } - - var xcodeBuildArguments = [ - "-scheme", "swift-async-queue", - "-sdk", platform.sdk, - "-derivedDataPath", platform.derivedDataPath, - "-PBXBuildsContinueAfterErrors=0", - "OTHER_SWIFT_FLAGS=-warnings-as-errors", - ] - - if !platform.destination.isEmpty { - xcodeBuildArguments.append("-destination") - xcodeBuildArguments.append(platform.destination) - } - xcodeBuildArguments.append("-enableCodeCoverage") - xcodeBuildArguments.append("YES") - xcodeBuildArguments.append("build") - xcodeBuildArguments.append("test") - xcodeBuildArguments.append("-test-iterations") - xcodeBuildArguments.append("100") - xcodeBuildArguments.append("-run-tests-until-failure") - - try execute(commandPath: "/usr/bin/xcodebuild", arguments: xcodeBuildArguments) + guard let platform = Platform(rawValue: rawPlatform) else { + print("Received unknown platform type \(rawPlatform)") + print("Possible platform types are: \(Platform.allCases)") + throw TaskError.code(1) + } + + var xcodeBuildArguments = [ + "-scheme", "swift-async-queue", + "-sdk", platform.sdk, + "-derivedDataPath", platform.derivedDataPath, + "-PBXBuildsContinueAfterErrors=0", + "OTHER_SWIFT_FLAGS=-warnings-as-errors", + ] + + if !platform.destination.isEmpty { + xcodeBuildArguments.append("-destination") + xcodeBuildArguments.append(platform.destination) + } + xcodeBuildArguments.append("-enableCodeCoverage") + xcodeBuildArguments.append("YES") + xcodeBuildArguments.append("build") + xcodeBuildArguments.append("test") + xcodeBuildArguments.append("-test-iterations") + xcodeBuildArguments.append("100") + xcodeBuildArguments.append("-run-tests-until-failure") + + try execute(commandPath: "/usr/bin/xcodebuild", arguments: xcodeBuildArguments) } diff --git a/Scripts/prepare-coverage-reports.sh b/Scripts/prepare-coverage-reports.sh index 5ecdeff..9cf62b4 100755 --- a/Scripts/prepare-coverage-reports.sh +++ b/Scripts/prepare-coverage-reports.sh @@ -2,33 +2,33 @@ set -e function exportlcov() { - build_type=$1 - executable_name=$2 + build_type=$1 + executable_name=$2 - executable=$(find "${directory}" -type f -name $executable_name) - profile=$(find "${directory}" -type f -name 'Coverage.profdata') - output_file_name="$executable_name.lcov" + executable=$(find "${directory}" -type f -name $executable_name) + profile=$(find "${directory}" -type f -name 'Coverage.profdata') + output_file_name="$executable_name.lcov" - can_proceed=true - if [[ $build_type == watchOS* ]]; then - echo "\tAborting creation of $output_file_name – watchOS not supported." - elif [[ -z $profile ]]; then - echo "\tAborting creation of $output_file_name – no profile found." - elif [[ -z $executable ]]; then - echo "\tAborting creation of $output_file_name – no executable found." - else - output_dir=".build/artifacts/$build_type" - mkdir -p $output_dir + can_proceed=true + if [[ $build_type == watchOS* ]]; then + echo "\tAborting creation of $output_file_name – watchOS not supported." + elif [[ -z $profile ]]; then + echo "\tAborting creation of $output_file_name – no profile found." + elif [[ -z $executable ]]; then + echo "\tAborting creation of $output_file_name – no executable found." + else + output_dir=".build/artifacts/$build_type" + mkdir -p $output_dir - output_file="$output_dir/$output_file_name" - echo "\tExporting $output_file" - xcrun llvm-cov export -format="lcov" $executable -instr-profile $profile > $output_file - fi + output_file="$output_dir/$output_file_name" + echo "\tExporting $output_file" + xcrun llvm-cov export -format="lcov" $executable -instr-profile $profile >$output_file + fi } for directory in $(git rev-parse --show-toplevel)/.build/derivedData/*/; do - build_type=$(basename $directory) - echo "Finding coverage information for $build_type" + build_type=$(basename $directory) + echo "Finding coverage information for $build_type" - exportlcov $build_type 'AsyncQueueTests' + exportlcov $build_type 'AsyncQueueTests' done diff --git a/Sources/AsyncQueue/ActorQueue.swift b/Sources/AsyncQueue/ActorQueue.swift index afd3f5e..d34fa69 100644 --- a/Sources/AsyncQueue/ActorQueue.swift +++ b/Sources/AsyncQueue/ActorQueue.swift @@ -52,315 +52,314 @@ /// - Warning: The `ActorQueue`'s conformance to `@unchecked Sendable` is safe if and only if `adoptExecutionContext(of:)` is called only from the adopted actor's `init` method. /// - Precondition: The lifecycle of an `ActorQueue` must not exceed that of the adopted actor. public final class ActorQueue: @unchecked Sendable { + // MARK: Initialization - // MARK: Initialization + /// Instantiates an actor queue. + public init() { + let (taskStream, taskStreamContinuation) = AsyncStream.makeStream() + self.taskStreamContinuation = taskStreamContinuation - /// Instantiates an actor queue. - public init() { - let (taskStream, taskStreamContinuation) = AsyncStream.makeStream() - self.taskStreamContinuation = taskStreamContinuation + Task { + // In an ideal world, we would isolate this `for await` loop to the `ActorType`. + // However, there's no good way to do that without retaining the actor and creating a cycle. + for await actorTask in taskStream { + // Await switching to the ActorType context. + await actorTask.task(actorTask.executionContext) + } + } + } - Task { - // In an ideal world, we would isolate this `for await` loop to the `ActorType`. - // However, there's no good way to do that without retaining the actor and creating a cycle. - for await actorTask in taskStream { - // Await switching to the ActorType context. - await actorTask.task(actorTask.executionContext) - } - } - } + deinit { + taskStreamContinuation.finish() + } - deinit { - taskStreamContinuation.finish() - } + // MARK: Public - // MARK: Public + /// Sets the actor context within which each `enqueue` and `enqueueAndWait`ed task will execute. + /// It is recommended that this method be called in the adopted actor’s `init` method. + /// **Must be called prior to enqueuing any work on the receiver.** + /// + /// - Parameter actor: The actor on which the queue's task will execute. This parameter is not retained by the receiver. + /// - Warning: Calling this method more than once will result in an assertion failure. + public func adoptExecutionContext(of actor: ActorType) { + assert(weakExecutionContext == nil) // Adopting multiple executionContexts on the same queue is API abuse. + weakExecutionContext = actor + } - /// Sets the actor context within which each `enqueue` and `enqueueAndWait`ed task will execute. - /// It is recommended that this method be called in the adopted actor’s `init` method. - /// **Must be called prior to enqueuing any work on the receiver.** - /// - /// - Parameter actor: The actor on which the queue's task will execute. This parameter is not retained by the receiver. - /// - Warning: Calling this method more than once will result in an assertion failure. - public func adoptExecutionContext(of actor: ActorType) { - assert(weakExecutionContext == nil) // Adopting multiple executionContexts on the same queue is API abuse. - weakExecutionContext = actor - } + // MARK: Fileprivate - // MARK: Fileprivate + fileprivate let taskStreamContinuation: AsyncStream.Continuation - fileprivate let taskStreamContinuation: AsyncStream.Continuation + /// The actor on whose isolated context our tasks run, force-unwrapped. + /// Utilize this accessor to retrieve the weak execution context in order to avoid repeating the below comment. + fileprivate var executionContext: ActorType { + // Crashing here means that this queue is being sent tasks either before an execution context has been set, or + // after the execution context has deallocated. An ActorQueue's execution context should be set in the adopted + // actor's `init` method, and the ActorQueue should not exceed the lifecycle of the adopted actor. + weakExecutionContext! + } - /// The actor on whose isolated context our tasks run, force-unwrapped. - /// Utilize this accessor to retrieve the weak execution context in order to avoid repeating the below comment. - fileprivate var executionContext: ActorType { - // Crashing here means that this queue is being sent tasks either before an execution context has been set, or - // after the execution context has deallocated. An ActorQueue's execution context should be set in the adopted - // actor's `init` method, and the ActorQueue should not exceed the lifecycle of the adopted actor. - weakExecutionContext! - } + fileprivate struct ActorTask: Sendable { + init( + executionContext: ActorType, + task: @escaping @Sendable (isolated ActorType) async -> Void + ) { + self.executionContext = executionContext + self.task = task + } - fileprivate struct ActorTask: Sendable { - init( - executionContext: ActorType, - task: @escaping @Sendable (isolated ActorType) async -> Void - ) { - self.executionContext = executionContext - self.task = task - } + let executionContext: ActorType + let task: @Sendable (isolated ActorType) async -> Void + } - let executionContext: ActorType - let task: @Sendable (isolated ActorType) async -> Void - } + // MARK: Private - // MARK: Private - - /// The actor on whose isolated context our tasks run. - /// We must use`weak` here to avoid creating a retain cycle between the adopted actor and this actor queue. - /// - /// We will assume this execution context always exists for the lifecycle of the queue because: - /// 1. The lifecycle of any `ActorQueue` must not exceed the lifecycle of its adopted `actor`. - /// 2. The adopted `actor` must set itself as the execution context for this queue within its `init` method. - private weak var weakExecutionContext: ActorType? + /// The actor on whose isolated context our tasks run. + /// We must use`weak` here to avoid creating a retain cycle between the adopted actor and this actor queue. + /// + /// We will assume this execution context always exists for the lifecycle of the queue because: + /// 1. The lifecycle of any `ActorQueue` must not exceed the lifecycle of its adopted `actor`. + /// 2. The adopted `actor` must set itself as the execution context for this queue within its `init` method. + private weak var weakExecutionContext: ActorType? } extension Task { - /// Runs the given nonthrowing operation asynchronously - /// as part of a new top-level task on behalf of the current actor. - /// The operation will not execute until all prior tasks have - /// completed or suspended. - /// - /// Use this function when creating asynchronous work - /// that operates on behalf of the synchronous function that calls it. - /// Like `Task.detached(priority:operation:)`, - /// this function creates a separate, top-level task. - /// Unlike `Task.detached(priority:operation:)`, - /// the task created by `Task.init(priority:operation:)` - /// inherits the priority and actor context of the caller, - /// so the operation is treated more like an asynchronous extension - /// to the synchronous operation. - /// - /// You need to keep a reference to the task - /// if you want to cancel it by calling the `Task.cancel()` method. - /// Discarding your reference to a detached task - /// doesn't implicitly cancel that task, - /// it only makes it impossible for you to explicitly cancel the task. - /// - /// - Parameters: - /// - actorQueue: The queue on which to enqueue the task. - /// - operation: The operation to perform. - @discardableResult - public init( - priority: TaskPriority? = nil, - on actorQueue: ActorQueue, - operation: @Sendable @escaping (isolated ActorType) async -> Success - ) where Failure == Never { - let delivery = Delivery() - let semaphore = Semaphore() - let task = ActorQueue.ActorTask( - executionContext: actorQueue.executionContext, - task: { executionContext in - await semaphore.wait() - delivery.execute({ @Sendable executionContext in - await delivery.sendValue(operation(executionContext)) - }, in: executionContext, priority: priority) - } - ) - actorQueue.taskStreamContinuation.yield(task) - self.init(priority: priority) { - await withTaskCancellationHandler( - operation: { - await semaphore.signal() - return await delivery.getValue() - }, - onCancel: delivery.cancel - ) - } - } + /// Runs the given nonthrowing operation asynchronously + /// as part of a new top-level task on behalf of the current actor. + /// The operation will not execute until all prior tasks have + /// completed or suspended. + /// + /// Use this function when creating asynchronous work + /// that operates on behalf of the synchronous function that calls it. + /// Like `Task.detached(priority:operation:)`, + /// this function creates a separate, top-level task. + /// Unlike `Task.detached(priority:operation:)`, + /// the task created by `Task.init(priority:operation:)` + /// inherits the priority and actor context of the caller, + /// so the operation is treated more like an asynchronous extension + /// to the synchronous operation. + /// + /// You need to keep a reference to the task + /// if you want to cancel it by calling the `Task.cancel()` method. + /// Discarding your reference to a detached task + /// doesn't implicitly cancel that task, + /// it only makes it impossible for you to explicitly cancel the task. + /// + /// - Parameters: + /// - actorQueue: The queue on which to enqueue the task. + /// - operation: The operation to perform. + @discardableResult + public init( + priority: TaskPriority? = nil, + on actorQueue: ActorQueue, + operation: @Sendable @escaping (isolated ActorType) async -> Success + ) where Failure == Never { + let delivery = Delivery() + let semaphore = Semaphore() + let task = ActorQueue.ActorTask( + executionContext: actorQueue.executionContext, + task: { executionContext in + await semaphore.wait() + delivery.execute({ @Sendable executionContext in + await delivery.sendValue(operation(executionContext)) + }, in: executionContext, priority: priority) + } + ) + actorQueue.taskStreamContinuation.yield(task) + self.init(priority: priority) { + await withTaskCancellationHandler( + operation: { + await semaphore.signal() + return await delivery.getValue() + }, + onCancel: delivery.cancel + ) + } + } - /// Runs the given throwing operation asynchronously - /// as part of a new top-level task on behalf of the current actor. - /// The operation will not execute until all prior tasks have - /// completed or suspended. - /// - /// Use this function when creating asynchronous work - /// that operates on behalf of the synchronous function that calls it. - /// Like `Task.detached(priority:operation:)`, - /// this function creates a separate, top-level task. - /// Unlike `Task.detached(priority:operation:)`, - /// the task created by `Task.init(priority:operation:)` - /// inherits the priority and actor context of the caller, - /// so the operation is treated more like an asynchronous extension - /// to the synchronous operation. - /// - /// You need to keep a reference to the task - /// if you want to cancel it by calling the `Task.cancel()` method. - /// Discarding your reference to a detached task - /// doesn't implicitly cancel that task, - /// it only makes it impossible for you to explicitly cancel the task. - /// - /// - Parameters: - /// - priority: The priority of the task. - /// Pass `nil` to use the priority from `Task.currentPriority`. - /// - actorQueue: The queue on which to enqueue the task. - /// - operation: The operation to perform. - @discardableResult - public init( - priority: TaskPriority? = nil, - on actorQueue: ActorQueue, - operation: @escaping @Sendable (isolated ActorType) async throws -> Success - ) where Failure == any Error { - let delivery = Delivery() - let semaphore = Semaphore() - let task = ActorQueue.ActorTask( - executionContext: actorQueue.executionContext, - task: { executionContext in - await semaphore.wait() - delivery.execute({ @Sendable executionContext in - do { - try await delivery.sendValue(operation(executionContext)) - } catch { - await delivery.sendFailure(error) - } - }, in: executionContext, priority: priority) - } - ) - actorQueue.taskStreamContinuation.yield(task) - self.init(priority: priority) { - try await withTaskCancellationHandler( - operation: { - await semaphore.signal() - return try await delivery.getValue() - }, - onCancel: delivery.cancel - ) - } - } + /// Runs the given throwing operation asynchronously + /// as part of a new top-level task on behalf of the current actor. + /// The operation will not execute until all prior tasks have + /// completed or suspended. + /// + /// Use this function when creating asynchronous work + /// that operates on behalf of the synchronous function that calls it. + /// Like `Task.detached(priority:operation:)`, + /// this function creates a separate, top-level task. + /// Unlike `Task.detached(priority:operation:)`, + /// the task created by `Task.init(priority:operation:)` + /// inherits the priority and actor context of the caller, + /// so the operation is treated more like an asynchronous extension + /// to the synchronous operation. + /// + /// You need to keep a reference to the task + /// if you want to cancel it by calling the `Task.cancel()` method. + /// Discarding your reference to a detached task + /// doesn't implicitly cancel that task, + /// it only makes it impossible for you to explicitly cancel the task. + /// + /// - Parameters: + /// - priority: The priority of the task. + /// Pass `nil` to use the priority from `Task.currentPriority`. + /// - actorQueue: The queue on which to enqueue the task. + /// - operation: The operation to perform. + @discardableResult + public init( + priority: TaskPriority? = nil, + on actorQueue: ActorQueue, + operation: @escaping @Sendable (isolated ActorType) async throws -> Success + ) where Failure == any Error { + let delivery = Delivery() + let semaphore = Semaphore() + let task = ActorQueue.ActorTask( + executionContext: actorQueue.executionContext, + task: { executionContext in + await semaphore.wait() + delivery.execute({ @Sendable executionContext in + do { + try await delivery.sendValue(operation(executionContext)) + } catch { + await delivery.sendFailure(error) + } + }, in: executionContext, priority: priority) + } + ) + actorQueue.taskStreamContinuation.yield(task) + self.init(priority: priority) { + try await withTaskCancellationHandler( + operation: { + await semaphore.signal() + return try await delivery.getValue() + }, + onCancel: delivery.cancel + ) + } + } - /// Runs the given nonthrowing operation asynchronously - /// as part of a new top-level task on behalf of the current actor. - /// The operation will not execute until all prior tasks have - /// completed or suspended. - /// - /// Use this function when creating asynchronous work - /// that operates on behalf of the synchronous function that calls it. - /// Like `Task.detached(priority:operation:)`, - /// this function creates a separate, top-level task. - /// Unlike `Task.detached(priority:operation:)`, - /// the task created by `Task.init(priority:operation:)` - /// inherits the priority and actor context of the caller, - /// so the operation is treated more like an asynchronous extension - /// to the synchronous operation. - /// - /// You need to keep a reference to the task - /// if you want to cancel it by calling the `Task.cancel()` method. - /// Discarding your reference to a detached task - /// doesn't implicitly cancel that task, - /// it only makes it impossible for you to explicitly cancel the task. - /// - /// - Parameters: - /// - priority: The priority of the task. - /// Pass `nil` to use the priority from `Task.currentPriority`. - /// - actorQueue: The queue on which to enqueue the task. - /// - operation: The operation to perform. - @discardableResult - public init( - priority: TaskPriority? = nil, - on actorQueue: ActorQueue, - operation: @MainActor @escaping () async -> Success - ) where Failure == Never { - let delivery = Delivery() - let semaphore = Semaphore() - let task = ActorQueue.ActorTask( - executionContext: actorQueue.executionContext, - task: { executionContext in - await semaphore.wait() - delivery.execute({ @Sendable executionContext in - await delivery.sendValue(operation()) - }, in: executionContext, priority: priority) - } - ) - actorQueue.taskStreamContinuation.yield(task) - self.init(priority: priority) { - return await withTaskCancellationHandler( - operation: { - await semaphore.signal() - return await delivery.getValue() - }, - onCancel: delivery.cancel - ) - } - } + /// Runs the given nonthrowing operation asynchronously + /// as part of a new top-level task on behalf of the current actor. + /// The operation will not execute until all prior tasks have + /// completed or suspended. + /// + /// Use this function when creating asynchronous work + /// that operates on behalf of the synchronous function that calls it. + /// Like `Task.detached(priority:operation:)`, + /// this function creates a separate, top-level task. + /// Unlike `Task.detached(priority:operation:)`, + /// the task created by `Task.init(priority:operation:)` + /// inherits the priority and actor context of the caller, + /// so the operation is treated more like an asynchronous extension + /// to the synchronous operation. + /// + /// You need to keep a reference to the task + /// if you want to cancel it by calling the `Task.cancel()` method. + /// Discarding your reference to a detached task + /// doesn't implicitly cancel that task, + /// it only makes it impossible for you to explicitly cancel the task. + /// + /// - Parameters: + /// - priority: The priority of the task. + /// Pass `nil` to use the priority from `Task.currentPriority`. + /// - actorQueue: The queue on which to enqueue the task. + /// - operation: The operation to perform. + @discardableResult + public init( + priority: TaskPriority? = nil, + on actorQueue: ActorQueue, + operation: @MainActor @escaping () async -> Success + ) where Failure == Never { + let delivery = Delivery() + let semaphore = Semaphore() + let task = ActorQueue.ActorTask( + executionContext: actorQueue.executionContext, + task: { executionContext in + await semaphore.wait() + delivery.execute({ @Sendable executionContext in + await delivery.sendValue(operation()) + }, in: executionContext, priority: priority) + } + ) + actorQueue.taskStreamContinuation.yield(task) + self.init(priority: priority) { + await withTaskCancellationHandler( + operation: { + await semaphore.signal() + return await delivery.getValue() + }, + onCancel: delivery.cancel + ) + } + } - /// Runs the given throwing operation asynchronously - /// as part of a new top-level task on behalf of the current actor. - /// The operation will not execute until all prior tasks have - /// completed or suspended. - /// - /// Use this function when creating asynchronous work - /// that operates on behalf of the synchronous function that calls it. - /// Like `Task.detached(priority:operation:)`, - /// this function creates a separate, top-level task. - /// Unlike `Task.detached(priority:operation:)`, - /// the task created by `Task.init(priority:operation:)` - /// inherits the priority and actor context of the caller, - /// so the operation is treated more like an asynchronous extension - /// to the synchronous operation. - /// - /// You need to keep a reference to the task - /// if you want to cancel it by calling the `Task.cancel()` method. - /// Discarding your reference to a detached task - /// doesn't implicitly cancel that task, - /// it only makes it impossible for you to explicitly cancel the task. - /// - /// - Parameters: - /// - priority: The priority of the task. - /// Pass `nil` to use the priority from `Task.currentPriority`. - /// - actorQueue: The queue on which to enqueue the task. - /// - operation: The operation to perform. - @discardableResult - public init( - priority: TaskPriority? = nil, - on actorQueue: ActorQueue, - operation: @escaping @MainActor () async throws -> Success - ) where Failure == any Error { - let delivery = Delivery() - let semaphore = Semaphore() - let task = ActorQueue.ActorTask( - executionContext: actorQueue.executionContext, - task: { executionContext in - await semaphore.wait() - delivery.execute({ @Sendable executionContext in - do { - try await delivery.sendValue(operation()) - } catch { - await delivery.sendFailure(error) - } - }, in: executionContext, priority: priority) - } - ) - actorQueue.taskStreamContinuation.yield(task) - self.init(priority: priority) { - try await withTaskCancellationHandler( - operation: { - await semaphore.signal() - return try await delivery.getValue() - }, - onCancel: delivery.cancel - ) - } - } + /// Runs the given throwing operation asynchronously + /// as part of a new top-level task on behalf of the current actor. + /// The operation will not execute until all prior tasks have + /// completed or suspended. + /// + /// Use this function when creating asynchronous work + /// that operates on behalf of the synchronous function that calls it. + /// Like `Task.detached(priority:operation:)`, + /// this function creates a separate, top-level task. + /// Unlike `Task.detached(priority:operation:)`, + /// the task created by `Task.init(priority:operation:)` + /// inherits the priority and actor context of the caller, + /// so the operation is treated more like an asynchronous extension + /// to the synchronous operation. + /// + /// You need to keep a reference to the task + /// if you want to cancel it by calling the `Task.cancel()` method. + /// Discarding your reference to a detached task + /// doesn't implicitly cancel that task, + /// it only makes it impossible for you to explicitly cancel the task. + /// + /// - Parameters: + /// - priority: The priority of the task. + /// Pass `nil` to use the priority from `Task.currentPriority`. + /// - actorQueue: The queue on which to enqueue the task. + /// - operation: The operation to perform. + @discardableResult + public init( + priority: TaskPriority? = nil, + on actorQueue: ActorQueue, + operation: @escaping @MainActor () async throws -> Success + ) where Failure == any Error { + let delivery = Delivery() + let semaphore = Semaphore() + let task = ActorQueue.ActorTask( + executionContext: actorQueue.executionContext, + task: { executionContext in + await semaphore.wait() + delivery.execute({ @Sendable executionContext in + do { + try await delivery.sendValue(operation()) + } catch { + await delivery.sendFailure(error) + } + }, in: executionContext, priority: priority) + } + ) + actorQueue.taskStreamContinuation.yield(task) + self.init(priority: priority) { + try await withTaskCancellationHandler( + operation: { + await semaphore.signal() + return try await delivery.getValue() + }, + onCancel: delivery.cancel + ) + } + } } extension MainActor { - /// A global instance of an `ActorQueue`. - public static var queue: ActorQueue { - mainActorQueue - } + /// A global instance of an `ActorQueue`. + public static var queue: ActorQueue { + mainActorQueue + } } private let mainActorQueue = { - let queue = ActorQueue() - queue.adoptExecutionContext(of: MainActor.shared) - return queue + let queue = ActorQueue() + queue.adoptExecutionContext(of: MainActor.shared) + return queue }() diff --git a/Sources/AsyncQueue/FIFOQueue.swift b/Sources/AsyncQueue/FIFOQueue.swift index 3ebb448..c6c85d0 100644 --- a/Sources/AsyncQueue/FIFOQueue.swift +++ b/Sources/AsyncQueue/FIFOQueue.swift @@ -24,267 +24,266 @@ /// Tasks are guaranteed to begin _and end_ executing in the order in which they are enqueued. /// Asynchronous tasks sent to this queue work as they would in a `DispatchQueue` type. Attempting to `enqueueAndWait` this queue from a task executing on this queue will result in a deadlock. public final class FIFOQueue: Sendable { + // MARK: Initialization - // MARK: Initialization + /// Instantiates a FIFO queue. + /// - Parameter priority: The baseline priority of the tasks added to the asynchronous queue. + public init(priority: TaskPriority? = nil) { + let (taskStream, taskStreamContinuation) = AsyncStream.makeStream() + self.taskStreamContinuation = taskStreamContinuation - /// Instantiates a FIFO queue. - /// - Parameter priority: The baseline priority of the tasks added to the asynchronous queue. - public init(priority: TaskPriority? = nil) { - let (taskStream, taskStreamContinuation) = AsyncStream.makeStream() - self.taskStreamContinuation = taskStreamContinuation + Task.detached(priority: priority) { + for await fifoTask in taskStream { + await fifoTask.task() + } + } + } - Task.detached(priority: priority) { - for await fifoTask in taskStream { - await fifoTask.task() - } - } - } + deinit { + taskStreamContinuation.finish() + } - deinit { - taskStreamContinuation.finish() - } + // MARK: Fileprivate - // MARK: Fileprivate + fileprivate struct FIFOTask: Sendable { + init(task: @escaping @Sendable () async -> Void) { + self.task = task + } - fileprivate struct FIFOTask: Sendable { - init(task: @escaping @Sendable () async -> Void) { - self.task = task - } + let task: @Sendable () async -> Void + } - let task: @Sendable () async -> Void - } - - fileprivate let taskStreamContinuation: AsyncStream.Continuation + fileprivate let taskStreamContinuation: AsyncStream.Continuation } extension Task { - /// Runs the given nonthrowing operation asynchronously - /// as part of a new top-level task on behalf of the current actor. - /// The operation will not execute until all prior tasks – including - /// suspended tasks – have completed. - /// - /// Use this function when creating asynchronous work - /// that operates on behalf of the synchronous function that calls it. - /// Like `Task.detached(priority:operation:)`, - /// this function creates a separate, top-level task. - /// Unlike `Task.detached(priority:operation:)`, - /// the task created by `Task.init(priority:operation:)` - /// inherits the priority and actor context of the caller, - /// so the operation is treated more like an asynchronous extension - /// to the synchronous operation. - /// - /// You need to keep a reference to the task - /// if you want to cancel it by calling the `Task.cancel()` method. - /// Discarding your reference to a detached task - /// doesn't implicitly cancel that task, - /// it only makes it impossible for you to explicitly cancel the task. - /// - /// - Parameters: - /// - fifoQueue: The queue on which to enqueue the task. - /// - operation: The operation to perform. - @discardableResult - public init( - on fifoQueue: FIFOQueue, - @_inheritActorContext @_implicitSelfCapture operation: sending @escaping @isolated(any) () async -> Success - ) where Failure == Never { - let delivery = Delivery() - let semaphore = Semaphore() - let executeOnce = UnsafeClosureHolder(operation: operation) - let task = FIFOQueue.FIFOTask { - await semaphore.wait() - await delivery.execute({ @Sendable delivery in - await delivery.sendValue(executeOnce.operation()) - }, in: delivery).value - } - fifoQueue.taskStreamContinuation.yield(task) - self.init { - await withTaskCancellationHandler( - operation: { - await semaphore.signal() - return await delivery.getValue() - }, - onCancel: delivery.cancel - ) - } - } + /// Runs the given nonthrowing operation asynchronously + /// as part of a new top-level task on behalf of the current actor. + /// The operation will not execute until all prior tasks – including + /// suspended tasks – have completed. + /// + /// Use this function when creating asynchronous work + /// that operates on behalf of the synchronous function that calls it. + /// Like `Task.detached(priority:operation:)`, + /// this function creates a separate, top-level task. + /// Unlike `Task.detached(priority:operation:)`, + /// the task created by `Task.init(priority:operation:)` + /// inherits the priority and actor context of the caller, + /// so the operation is treated more like an asynchronous extension + /// to the synchronous operation. + /// + /// You need to keep a reference to the task + /// if you want to cancel it by calling the `Task.cancel()` method. + /// Discarding your reference to a detached task + /// doesn't implicitly cancel that task, + /// it only makes it impossible for you to explicitly cancel the task. + /// + /// - Parameters: + /// - fifoQueue: The queue on which to enqueue the task. + /// - operation: The operation to perform. + @discardableResult + public init( + on fifoQueue: FIFOQueue, + @_inheritActorContext @_implicitSelfCapture operation: sending @escaping @isolated(any) () async -> Success + ) where Failure == Never { + let delivery = Delivery() + let semaphore = Semaphore() + let executeOnce = UnsafeClosureHolder(operation: operation) + let task = FIFOQueue.FIFOTask { + await semaphore.wait() + await delivery.execute({ @Sendable delivery in + await delivery.sendValue(executeOnce.operation()) + }, in: delivery).value + } + fifoQueue.taskStreamContinuation.yield(task) + self.init { + await withTaskCancellationHandler( + operation: { + await semaphore.signal() + return await delivery.getValue() + }, + onCancel: delivery.cancel + ) + } + } - /// Runs the given throwing operation asynchronously - /// as part of a new top-level task on behalf of the current actor. - /// The operation will not execute until all prior tasks – including - /// suspended tasks – have completed. - /// - /// Use this function when creating asynchronous work - /// that operates on behalf of the synchronous function that calls it. - /// Like `Task.detached(priority:operation:)`, - /// this function creates a separate, top-level task. - /// Unlike `Task.detached(priority:operation:)`, - /// the task created by `Task.init(priority:operation:)` - /// inherits the priority and actor context of the caller, - /// so the operation is treated more like an asynchronous extension - /// to the synchronous operation. - /// - /// You need to keep a reference to the task - /// if you want to cancel it by calling the `Task.cancel()` method. - /// Discarding your reference to a detached task - /// doesn't implicitly cancel that task, - /// it only makes it impossible for you to explicitly cancel the task. - /// - /// - Parameters: - /// - fifoQueue: The queue on which to enqueue the task. - /// - operation: The operation to perform. - @discardableResult - public init( - on fifoQueue: FIFOQueue, - @_inheritActorContext @_implicitSelfCapture operation: sending @escaping @isolated(any) () async throws -> Success - ) where Failure == any Error { - let delivery = Delivery() - let semaphore = Semaphore() - let executeOnce = UnsafeThrowingClosureHolder(operation: operation) - let task = FIFOQueue.FIFOTask { - await semaphore.wait() - await delivery.execute({ @Sendable delivery in - do { - try await delivery.sendValue(executeOnce.operation()) - } catch { - delivery.sendFailure(error) - } - }, in: delivery).value - } - fifoQueue.taskStreamContinuation.yield(task) - self.init { - try await withTaskCancellationHandler( - operation: { - await semaphore.signal() - return try await delivery.getValue() - }, - onCancel: delivery.cancel - ) - } - } + /// Runs the given throwing operation asynchronously + /// as part of a new top-level task on behalf of the current actor. + /// The operation will not execute until all prior tasks – including + /// suspended tasks – have completed. + /// + /// Use this function when creating asynchronous work + /// that operates on behalf of the synchronous function that calls it. + /// Like `Task.detached(priority:operation:)`, + /// this function creates a separate, top-level task. + /// Unlike `Task.detached(priority:operation:)`, + /// the task created by `Task.init(priority:operation:)` + /// inherits the priority and actor context of the caller, + /// so the operation is treated more like an asynchronous extension + /// to the synchronous operation. + /// + /// You need to keep a reference to the task + /// if you want to cancel it by calling the `Task.cancel()` method. + /// Discarding your reference to a detached task + /// doesn't implicitly cancel that task, + /// it only makes it impossible for you to explicitly cancel the task. + /// + /// - Parameters: + /// - fifoQueue: The queue on which to enqueue the task. + /// - operation: The operation to perform. + @discardableResult + public init( + on fifoQueue: FIFOQueue, + @_inheritActorContext @_implicitSelfCapture operation: sending @escaping @isolated(any) () async throws -> Success + ) where Failure == any Error { + let delivery = Delivery() + let semaphore = Semaphore() + let executeOnce = UnsafeThrowingClosureHolder(operation: operation) + let task = FIFOQueue.FIFOTask { + await semaphore.wait() + await delivery.execute({ @Sendable delivery in + do { + try await delivery.sendValue(executeOnce.operation()) + } catch { + delivery.sendFailure(error) + } + }, in: delivery).value + } + fifoQueue.taskStreamContinuation.yield(task) + self.init { + try await withTaskCancellationHandler( + operation: { + await semaphore.signal() + return try await delivery.getValue() + }, + onCancel: delivery.cancel + ) + } + } - /// Runs the given nonthrowing operation asynchronously - /// as part of a new top-level task on behalf of the current actor. - /// The operation will not execute until all prior tasks – including - /// suspended tasks – have completed. - /// - /// Use this function when creating asynchronous work - /// that operates on behalf of the synchronous function that calls it. - /// Like `Task.detached(priority:operation:)`, - /// this function creates a separate, top-level task. - /// Unlike `Task.detached(priority:operation:)`, - /// the task created by `Task.init(priority:operation:)` - /// inherits the priority and actor context of the caller, - /// so the operation is treated more like an asynchronous extension - /// to the synchronous operation. - /// - /// You need to keep a reference to the task - /// if you want to cancel it by calling the `Task.cancel()` method. - /// Discarding your reference to a detached task - /// doesn't implicitly cancel that task, - /// it only makes it impossible for you to explicitly cancel the task. - /// - /// - Parameters: - /// - fifoQueue: The queue on which to enqueue the task. - /// - isolatedActor: The actor to which the operation is isolated. - /// - operation: The operation to perform. - @discardableResult - public init( - priority: TaskPriority? = nil, - on fifoQueue: FIFOQueue, - isolatedTo isolatedActor: ActorType, - operation: @Sendable @escaping (isolated ActorType) async -> Success - ) where Failure == Never { - let delivery = Delivery() - let semaphore = Semaphore() - let task = FIFOQueue.FIFOTask { - await semaphore.wait() - await delivery.execute({ @Sendable isolatedActor in - await delivery.sendValue(operation(isolatedActor)) - }, in: isolatedActor, priority: priority).value - } - fifoQueue.taskStreamContinuation.yield(task) - self.init { - await withTaskCancellationHandler( - operation: { - await semaphore.signal() - return await delivery.getValue() - }, - onCancel: delivery.cancel - ) - } - } + /// Runs the given nonthrowing operation asynchronously + /// as part of a new top-level task on behalf of the current actor. + /// The operation will not execute until all prior tasks – including + /// suspended tasks – have completed. + /// + /// Use this function when creating asynchronous work + /// that operates on behalf of the synchronous function that calls it. + /// Like `Task.detached(priority:operation:)`, + /// this function creates a separate, top-level task. + /// Unlike `Task.detached(priority:operation:)`, + /// the task created by `Task.init(priority:operation:)` + /// inherits the priority and actor context of the caller, + /// so the operation is treated more like an asynchronous extension + /// to the synchronous operation. + /// + /// You need to keep a reference to the task + /// if you want to cancel it by calling the `Task.cancel()` method. + /// Discarding your reference to a detached task + /// doesn't implicitly cancel that task, + /// it only makes it impossible for you to explicitly cancel the task. + /// + /// - Parameters: + /// - fifoQueue: The queue on which to enqueue the task. + /// - isolatedActor: The actor to which the operation is isolated. + /// - operation: The operation to perform. + @discardableResult + public init( + priority: TaskPriority? = nil, + on fifoQueue: FIFOQueue, + isolatedTo isolatedActor: ActorType, + operation: @Sendable @escaping (isolated ActorType) async -> Success + ) where Failure == Never { + let delivery = Delivery() + let semaphore = Semaphore() + let task = FIFOQueue.FIFOTask { + await semaphore.wait() + await delivery.execute({ @Sendable isolatedActor in + await delivery.sendValue(operation(isolatedActor)) + }, in: isolatedActor, priority: priority).value + } + fifoQueue.taskStreamContinuation.yield(task) + self.init { + await withTaskCancellationHandler( + operation: { + await semaphore.signal() + return await delivery.getValue() + }, + onCancel: delivery.cancel + ) + } + } - /// Runs the given throwing operation asynchronously - /// as part of a new top-level task on behalf of the current actor. - /// The operation will not execute until all prior tasks – including - /// suspended tasks – have completed. - /// - /// Use this function when creating asynchronous work - /// that operates on behalf of the synchronous function that calls it. - /// Like `Task.detached(priority:operation:)`, - /// this function creates a separate, top-level task. - /// Unlike `Task.detached(priority:operation:)`, - /// the task created by `Task.init(priority:operation:)` - /// inherits the priority and actor context of the caller, - /// so the operation is treated more like an asynchronous extension - /// to the synchronous operation. - /// - /// You need to keep a reference to the task - /// if you want to cancel it by calling the `Task.cancel()` method. - /// Discarding your reference to a detached task - /// doesn't implicitly cancel that task, - /// it only makes it impossible for you to explicitly cancel the task. - /// - /// - Parameters: - /// - priority: The priority of the queue. - /// Pass `nil` to use the priority from `Task.currentPriority`. - /// - fifoQueue: The queue on which to enqueue the task. - /// - isolatedActor: The actor to which the operation is isolated. - /// - operation: The operation to perform. - @discardableResult - public init( - priority: TaskPriority? = nil, - on fifoQueue: FIFOQueue, - isolatedTo isolatedActor: ActorType, - operation: @Sendable @escaping (isolated ActorType) async throws -> Success - ) where Failure == any Error { - let delivery = Delivery() - let semaphore = Semaphore() - let task = FIFOQueue.FIFOTask { - await semaphore.wait() - await delivery.execute({ @Sendable isolatedActor in - do { - try await delivery.sendValue(operation(isolatedActor)) - } catch { - await delivery.sendFailure(error) - } - }, in: isolatedActor, priority: priority).value - } - fifoQueue.taskStreamContinuation.yield(task) - self.init(priority: priority) { - try await withTaskCancellationHandler( - operation: { - await semaphore.signal() - return try await delivery.getValue() - }, - onCancel: delivery.cancel - ) - } - } + /// Runs the given throwing operation asynchronously + /// as part of a new top-level task on behalf of the current actor. + /// The operation will not execute until all prior tasks – including + /// suspended tasks – have completed. + /// + /// Use this function when creating asynchronous work + /// that operates on behalf of the synchronous function that calls it. + /// Like `Task.detached(priority:operation:)`, + /// this function creates a separate, top-level task. + /// Unlike `Task.detached(priority:operation:)`, + /// the task created by `Task.init(priority:operation:)` + /// inherits the priority and actor context of the caller, + /// so the operation is treated more like an asynchronous extension + /// to the synchronous operation. + /// + /// You need to keep a reference to the task + /// if you want to cancel it by calling the `Task.cancel()` method. + /// Discarding your reference to a detached task + /// doesn't implicitly cancel that task, + /// it only makes it impossible for you to explicitly cancel the task. + /// + /// - Parameters: + /// - priority: The priority of the queue. + /// Pass `nil` to use the priority from `Task.currentPriority`. + /// - fifoQueue: The queue on which to enqueue the task. + /// - isolatedActor: The actor to which the operation is isolated. + /// - operation: The operation to perform. + @discardableResult + public init( + priority: TaskPriority? = nil, + on fifoQueue: FIFOQueue, + isolatedTo isolatedActor: ActorType, + operation: @Sendable @escaping (isolated ActorType) async throws -> Success + ) where Failure == any Error { + let delivery = Delivery() + let semaphore = Semaphore() + let task = FIFOQueue.FIFOTask { + await semaphore.wait() + await delivery.execute({ @Sendable isolatedActor in + do { + try await delivery.sendValue(operation(isolatedActor)) + } catch { + await delivery.sendFailure(error) + } + }, in: isolatedActor, priority: priority).value + } + fifoQueue.taskStreamContinuation.yield(task) + self.init(priority: priority) { + try await withTaskCancellationHandler( + operation: { + await semaphore.signal() + return try await delivery.getValue() + }, + onCancel: delivery.cancel + ) + } + } } private struct UnsafeClosureHolder: @unchecked Sendable { - init(operation: sending @escaping @isolated(any) () async -> Success) { - self.operation = operation - } + init(operation: sending @escaping @isolated(any) () async -> Success) { + self.operation = operation + } - let operation: @isolated(any) () async -> Success + let operation: @isolated(any) () async -> Success } private struct UnsafeThrowingClosureHolder: @unchecked Sendable { - init(operation: sending @escaping @isolated(any) () async throws -> Success) { - self.operation = operation - } + init(operation: sending @escaping @isolated(any) () async throws -> Success) { + self.operation = operation + } - let operation: @isolated(any) () async throws -> Success + let operation: @isolated(any) () async throws -> Success } diff --git a/Sources/AsyncQueue/Utilities/Delivery.swift b/Sources/AsyncQueue/Utilities/Delivery.swift index 013d336..3ce6d4a 100644 --- a/Sources/AsyncQueue/Utilities/Delivery.swift +++ b/Sources/AsyncQueue/Utilities/Delivery.swift @@ -23,121 +23,121 @@ import Dispatch actor Delivery { - func sendValue(_ value: Success) { - self.value = value - } - - func sendFailure(_ failure: Failure) { - self.failure = failure - } - - nonisolated - func cancel() { - taskContainer.withLock { - $0.isCancelled = true - $0.task?.cancel() - } - } - - @discardableResult - func execute( - _ operation: sending @escaping (isolated ActorType) async -> Void, - in context: isolated ActorType, - priority: TaskPriority? = nil - ) -> Task { - // In Swift 6, a `Task` enqueued from an actor begins executing immediately on that actor. - // Since we're running on our actor's context already, we can just dispatch a Task to get first-enqueued-first-start task execution. - let task = Task(priority: priority) { - await operation(context) - } - taskContainer.withLock { - if $0.isCancelled { - task.cancel() - } - $0.task = task - } - return task - } - - private var value: Success? { - didSet { - if let value { - valueContinuations.forEach { $0.resume(returning: value) } - valueContinuations.removeAll() - } - } - } - - private var failure: Failure? { - didSet { - if let failure { - valueContinuations.forEach { $0.resume(throwing: failure) } - valueContinuations.removeAll() - } - } - } - - private var valueContinuations: [UnsafeContinuation] = [] - private let taskContainer = Locked(value: TaskContainer()) - - struct TaskContainer { - var task: Task? - var isCancelled = false - } + func sendValue(_ value: Success) { + self.value = value + } + + func sendFailure(_ failure: Failure) { + self.failure = failure + } + + nonisolated + func cancel() { + taskContainer.withLock { + $0.isCancelled = true + $0.task?.cancel() + } + } + + @discardableResult + func execute( + _ operation: sending @escaping (isolated ActorType) async -> Void, + in context: isolated ActorType, + priority: TaskPriority? = nil + ) -> Task { + // In Swift 6, a `Task` enqueued from an actor begins executing immediately on that actor. + // Since we're running on our actor's context already, we can just dispatch a Task to get first-enqueued-first-start task execution. + let task = Task(priority: priority) { + await operation(context) + } + taskContainer.withLock { + if $0.isCancelled { + task.cancel() + } + $0.task = task + } + return task + } + + private var value: Success? { + didSet { + if let value { + valueContinuations.forEach { $0.resume(returning: value) } + valueContinuations.removeAll() + } + } + } + + private var failure: Failure? { + didSet { + if let failure { + valueContinuations.forEach { $0.resume(throwing: failure) } + valueContinuations.removeAll() + } + } + } + + private var valueContinuations: [UnsafeContinuation] = [] + private let taskContainer = Locked(value: TaskContainer()) + + struct TaskContainer { + var task: Task? + var isCancelled = false + } } extension Delivery where Failure == Never { - func getValue() async -> Success { - if let value { - value - } else { - await withUnsafeContinuation { continuation in - valueContinuations.append(continuation) - } - } - } + func getValue() async -> Success { + if let value { + value + } else { + await withUnsafeContinuation { continuation in + valueContinuations.append(continuation) + } + } + } } extension Delivery where Failure == any Error { - func getValue() async throws -> Success { - if let value { - value - } else if let failure { - throw failure - } else { - try await withUnsafeThrowingContinuation { continuation in - valueContinuations.append(continuation) - } - } - } + func getValue() async throws -> Success { + if let value { + value + } else if let failure { + throw failure + } else { + try await withUnsafeThrowingContinuation { continuation in + valueContinuations.append(continuation) + } + } + } } // MARK: - Locked // We'd use `OSAllocatedUnfairLock` or `Mutex` but the minimum supported version is much higher than what we support. private struct Locked: @unchecked Sendable { - init(value: State) { - container = .init(value: value) - } - - func withLock(_ body: @Sendable (inout State) throws -> R) rethrows -> R where R: Sendable { - try lockQueue.sync { - var value = container.unsafeValue - let returnValue = try body(&value) - container.unsafeValue = value - return returnValue - } - } - - private let container: UnsafeContainer - - private final class UnsafeContainer: @unchecked Sendable { - init(value: State) { - unsafeValue = value - } - - var unsafeValue: State - } + init(value: State) { + container = .init(value: value) + } + + func withLock(_ body: @Sendable (inout State) throws -> R) rethrows -> R where R: Sendable { + try lockQueue.sync { + var value = container.unsafeValue + let returnValue = try body(&value) + container.unsafeValue = value + return returnValue + } + } + + private let container: UnsafeContainer + + private final class UnsafeContainer: @unchecked Sendable { + init(value: State) { + unsafeValue = value + } + + var unsafeValue: State + } } private let lockQueue = DispatchQueue(label: "LockedValue.lockQueue", target: DispatchQueue.global()) diff --git a/Sources/AsyncQueue/Utilities/Semaphore.swift b/Sources/AsyncQueue/Utilities/Semaphore.swift index 7c16b77..7b3034b 100644 --- a/Sources/AsyncQueue/Utilities/Semaphore.swift +++ b/Sources/AsyncQueue/Utilities/Semaphore.swift @@ -22,49 +22,49 @@ /// A thread-safe semaphore implementation. actor Semaphore { - // MARK: Initialization + // MARK: Initialization - init() {} + init() {} - // MARK: Public + // MARK: Public - /// Decrement the counting semaphore. If the resulting value is less than zero, this function waits for a signal to occur before returning. - /// - Returns: Whether the call triggered a suspension - @discardableResult - func wait() async -> Bool { - count -= 1 - guard count < 0 else { - // We don't need to wait because count is greater than or equal to zero. - return false - } + /// Decrement the counting semaphore. If the resulting value is less than zero, this function waits for a signal to occur before returning. + /// - Returns: Whether the call triggered a suspension + @discardableResult + func wait() async -> Bool { + count -= 1 + guard count < 0 else { + // We don't need to wait because count is greater than or equal to zero. + return false + } - await withUnsafeContinuation { continuation in - continuations.append(continuation) - } - return true - } + await withUnsafeContinuation { continuation in + continuations.append(continuation) + } + return true + } - /// Increment the counting semaphore. If the previous value was less than zero, this function resumes a waiting thread before returning. - func signal() { - count += 1 - guard !isWaiting else { - // Continue waiting. - return - } + /// Increment the counting semaphore. If the previous value was less than zero, this function resumes a waiting thread before returning. + func signal() { + count += 1 + guard !isWaiting else { + // Continue waiting. + return + } - for continuation in continuations { - continuation.resume() - } + for continuation in continuations { + continuation.resume() + } - continuations.removeAll() - } + continuations.removeAll() + } - var isWaiting: Bool { - count < 0 - } + var isWaiting: Bool { + count < 0 + } - // MARK: Private + // MARK: Private - private var continuations = [UnsafeContinuation]() - private var count = 0 + private var continuations = [UnsafeContinuation]() + private var count = 0 } diff --git a/Tests/AsyncQueueTests/ActorQueueTests.swift b/Tests/AsyncQueueTests/ActorQueueTests.swift index 9a5fe03..646bd88 100644 --- a/Tests/AsyncQueueTests/ActorQueueTests.swift +++ b/Tests/AsyncQueueTests/ActorQueueTests.swift @@ -26,379 +26,380 @@ import Testing @testable import AsyncQueue struct ActorQueueTests { - - // MARK: Initialization - - init() { - systemUnderTest = ActorQueue() - counter = Counter() - systemUnderTest.adoptExecutionContext(of: counter) - } - - // MARK: Behavior Tests - - @Test - func adoptExecutionContext_doesNotRetainActor() { - let systemUnderTest = ActorQueue() - var counter: Counter? = Counter() - weak var weakCounter = counter - systemUnderTest.adoptExecutionContext(of: counter!) - counter = nil - #expect(weakCounter == nil) - } - - @Test - func task_retainsAdoptedActorUntilEnqueuedTasksComplete() async { - let systemUnderTest = ActorQueue() - var counter: Counter? = Counter() - weak var weakCounter = counter - systemUnderTest.adoptExecutionContext(of: counter!) - - let semaphore = Semaphore() - Task(on: systemUnderTest) { counter in - await semaphore.wait() - } - - counter = nil - #expect(weakCounter != nil) - await semaphore.signal() - } - - @Test - func throwingTask_retainsAdoptedActorUntilEnqueuedTasksComplete() async { - let systemUnderTest = ActorQueue() - var counter: Counter? = Counter() - weak var weakCounter = counter - systemUnderTest.adoptExecutionContext(of: counter!) - - let semaphore = Semaphore() - Task(on: systemUnderTest) { counter in - await semaphore.wait() - try doWork() - } - - counter = nil - #expect(weakCounter != nil) - await semaphore.signal() - } - - @Test - func task_taskParameterIsAdoptedActor() async { - let semaphore = Semaphore() - Task(on: systemUnderTest) { [storedCounter = counter] counter in - #expect(counter === storedCounter) - await semaphore.signal() - } - - await semaphore.wait() - } - - @Test - func throwingTask_taskParameterIsAdoptedActor() async { - let semaphore = Semaphore() - Task(on: systemUnderTest) { [storedCounter = counter] counter in - #expect(counter === storedCounter) - await semaphore.signal() - try doWork() - } - - await semaphore.wait() - } - - @Test - func task_sendsEventsInOrder() async throws { - var lastTask: Task? - (1...1_000).forEach { iteration in - lastTask = Task(on: systemUnderTest) { counter in - counter.incrementAndExpectCount(equals: iteration) - } - } - // Drain the queue - try await #require(lastTask).value - } - - @Test - func throwingTask_sendsEventsInOrder() async throws { - var lastTask: Task? - (1...1_000).forEach { iteration in - lastTask = Task(on: systemUnderTest) { counter in - counter.incrementAndExpectCount(equals: iteration) - try doWork() - } - } - // Drain the queue - try await #require(lastTask).value - } - - @Test - func mainTask_sendsEventsInOrder() async throws { - var lastTask: Task? - (1...1_000).forEach { iteration in - lastTask = Task(on: MainActor.queue) { - await counter.incrementAndExpectCount(equals: iteration) - } - } - // Drain the queue - try await #require(lastTask).value - } - - @Test - func mainThrowingTask_sendsEventsInOrder() async throws { - var lastTask: Task? - (1...1_000).forEach { iteration in - lastTask = Task(on: MainActor.queue) { - await counter.incrementAndExpectCount(equals: iteration) - try doWork() - } - } - // Drain the queue - try await #require(lastTask).value - } - - @Test - func task_startsExecutionOfNextTaskAfterSuspension() async { - let systemUnderTest = ActorQueue() - let semaphore = AsyncQueue.Semaphore() - systemUnderTest.adoptExecutionContext(of: semaphore) - - let firstTask = Task(on: systemUnderTest) { semaphore in - await semaphore.wait() - } - let secondTask = Task(on: systemUnderTest) { semaphore in - // Signal the semaphore from the actor queue. - // If the actor queue were FIFO, this test would hang since this code would never execute: - // we'd still be waiting for the prior `wait()` tasks to finish. - semaphore.signal() - } - (_, _) = await (firstTask.value, secondTask.value) - } - - @Test - func throwingTask_startsExecutionOfNextTaskAfterSuspension() async throws { - let systemUnderTest = ActorQueue() - let semaphore = AsyncQueue.Semaphore() - systemUnderTest.adoptExecutionContext(of: semaphore) - - let firstTask = Task(on: systemUnderTest) { semaphore in - await semaphore.wait() - try doWork() - } - let secondTask = Task(on: systemUnderTest) { semaphore in - // Signal the semaphore from the actor queue. - // If the actor queue were FIFO, this test would hang since this code would never execute: - // we'd still be waiting for the prior `wait()` tasks to finish. - semaphore.signal() - try doWork() - } - (_, _) = try await (firstTask.value, secondTask.value) - } - - @Test - func task_allowsReentrancy() async { - await Task(on: systemUnderTest) { [systemUnderTest] counter in - await Task(on: systemUnderTest) { counter in - counter.incrementAndExpectCount(equals: 1) - }.value - counter.incrementAndExpectCount(equals: 2) - }.value - } - - @Test - func throwingTask_allowsReentrancy() async throws { - try await Task(on: systemUnderTest) { [systemUnderTest] counter in - try doWork() - try await Task(on: systemUnderTest) { counter in - try doWork() - counter.incrementAndExpectCount(equals: 1) - }.value - try doWork() - counter.incrementAndExpectCount(equals: 2) - }.value - } - - @Test - func mainTask_allowsReentrancy() async { - await Task(on: MainActor.queue) { [counter] in - await Task(on: MainActor.queue) { - await counter.incrementAndExpectCount(equals: 1) - }.value - await counter.incrementAndExpectCount(equals: 2) - }.value - } - - @Test - func mainThrowingTask_allowsReentrancy() async throws { - try await Task(on: MainActor.queue) { [counter] in - try doWork() - try await Task(on: MainActor.queue) { - try doWork() - await counter.incrementAndExpectCount(equals: 1) - }.value - try doWork() - await counter.incrementAndExpectCount(equals: 2) - }.value - } - - @Test - func task_executesEnqueuedTasksAfterQueueIsDeallocated() async throws { - var systemUnderTest: ActorQueue? = ActorQueue() - systemUnderTest?.adoptExecutionContext(of: counter) - - let expectation = Expectation() - let semaphore = AsyncQueue.Semaphore() - Task(on: try #require(systemUnderTest)) { counter in - // Make the task wait. - await semaphore.wait() - counter.incrementAndExpectCount(equals: 1) - expectation.fulfill() - } - weak var queue = systemUnderTest - // Nil out our reference to the queue to show that the enqueued tasks will still complete - systemUnderTest = nil - #expect(queue == nil) - // Signal the semaphore to unlock the enqueued tasks. - await semaphore.signal() - await expectation.fulfillment(withinSeconds: 30) - } - - @Test - func throwingTask_executesEnqueuedTasksAfterQueueIsDeallocated() async throws { - var systemUnderTest: ActorQueue? = ActorQueue() - systemUnderTest?.adoptExecutionContext(of: counter) - - let expectation = Expectation() - let semaphore = AsyncQueue.Semaphore() - Task(on: try #require(systemUnderTest)) { counter in - try doWork() - - // Make the task wait. - await semaphore.wait() - counter.incrementAndExpectCount(equals: 1) - expectation.fulfill() - } - weak var queue = systemUnderTest - // Nil out our reference to the queue to show that the enqueued tasks will still complete - systemUnderTest = nil - #expect(queue == nil) - // Signal the semaphore to unlock the enqueued tasks. - await semaphore.signal() - await expectation.fulfillment(withinSeconds: 30) - } - - @Test - func task_canBeCancelled() async { - let semaphore = Semaphore() - let task = Task(on: systemUnderTest) { _ in - await semaphore.wait() - #expect(Task.isCancelled) - } - task.cancel() - await semaphore.signal() - await task.value - } - - @Test - func throwingTask_canBeCancelled() async { - let semaphore = Semaphore() - let task = Task(on: systemUnderTest) { _ in - await semaphore.wait() - #expect(Task.isCancelled) - throw CancellationError() // This is wonky, but we can't `try` if we want 100% code coverage. - } - task.cancel() - await semaphore.signal() - try? await task.value - } - - @Test - func mainTask_canBeCancelled() async { - let semaphore = Semaphore() - let task = Task(on: MainActor.queue) { _ in - await semaphore.wait() - #expect(Task.isCancelled) - } - task.cancel() - await semaphore.signal() - await task.value - } - - @Test - func mainThrowingTask_canBeCancelled() async { - let semaphore = Semaphore() - let task = Task(on: MainActor.queue) { _ in - await semaphore.wait() - #expect(Task.isCancelled) - throw CancellationError() // This is wonky, but we can't `try` if we want 100% code coverage. - } - task.cancel() - await semaphore.signal() - try? await task.value - } - - @Test - func task_canReturn() async { - let expectedValue = UUID() - let returnedValue = await Task(on: systemUnderTest) { _ in expectedValue }.value - #expect(expectedValue == returnedValue) - } - - @Test - func throwingTask_canReturn() async throws { - let expectedValue = UUID() - @Sendable func generateValue() throws -> UUID { - expectedValue - } - #expect(try await Task(on: systemUnderTest) { _ in try generateValue() }.value == expectedValue) - } - - @Test - func throwingTask_canThrow() async { - struct TestError: Error, Equatable { - private let identifier = UUID() - } - let expectedError = TestError() - do { - try await Task(on: systemUnderTest) { _ in throw expectedError }.value - } catch { - #expect(error as? TestError == expectedError) - } - } - - @Test - func mainThrowingTask_canThrow() async { - struct TestError: Error, Equatable { - private let identifier = UUID() - } - let expectedError = TestError() - do { - try await Task(on: MainActor.queue) { throw expectedError }.value - } catch { - #expect(error as? TestError == expectedError) - } - } - - @Test - func mainTask_executesOnMainActor() async { - @MainActor - func executesOnMainActor() {} - await Task(on: MainActor.queue) { - executesOnMainActor() - }.value - } - - @Test - func mainThrowingTask_executesOnMainActor() async throws { - @MainActor - func executesOnMainActor() throws {} - try await Task(on: MainActor.queue) { - try executesOnMainActor() - }.value - } - - // MARK: Private - - private let systemUnderTest: ActorQueue - private let counter: Counter - - @Sendable private func doWork() throws -> Void {} + // MARK: Initialization + + init() { + systemUnderTest = ActorQueue() + counter = Counter() + systemUnderTest.adoptExecutionContext(of: counter) + } + + // MARK: Behavior Tests + + @Test + func adoptExecutionContext_doesNotRetainActor() { + let systemUnderTest = ActorQueue() + var counter: Counter? = Counter() + weak var weakCounter = counter + systemUnderTest.adoptExecutionContext(of: counter!) + counter = nil + #expect(weakCounter == nil) + } + + @Test + func task_retainsAdoptedActorUntilEnqueuedTasksComplete() async { + let systemUnderTest = ActorQueue() + var counter: Counter? = Counter() + weak var weakCounter = counter + systemUnderTest.adoptExecutionContext(of: counter!) + + let semaphore = Semaphore() + Task(on: systemUnderTest) { _ in + await semaphore.wait() + } + + counter = nil + #expect(weakCounter != nil) + await semaphore.signal() + } + + @Test + func throwingTask_retainsAdoptedActorUntilEnqueuedTasksComplete() async { + let systemUnderTest = ActorQueue() + var counter: Counter? = Counter() + weak var weakCounter = counter + systemUnderTest.adoptExecutionContext(of: counter!) + + let semaphore = Semaphore() + Task(on: systemUnderTest) { _ in + await semaphore.wait() + try doWork() + } + + counter = nil + #expect(weakCounter != nil) + await semaphore.signal() + } + + @Test + func task_taskParameterIsAdoptedActor() async { + let semaphore = Semaphore() + Task(on: systemUnderTest) { [storedCounter = counter] counter in + #expect(counter === storedCounter) + await semaphore.signal() + } + + await semaphore.wait() + } + + @Test + func throwingTask_taskParameterIsAdoptedActor() async { + let semaphore = Semaphore() + Task(on: systemUnderTest) { [storedCounter = counter] counter in + #expect(counter === storedCounter) + await semaphore.signal() + try doWork() + } + + await semaphore.wait() + } + + @Test + func task_sendsEventsInOrder() async throws { + var lastTask: Task? + for iteration in 1...1_000 { + lastTask = Task(on: systemUnderTest) { counter in + counter.incrementAndExpectCount(equals: iteration) + } + } + // Drain the queue + try await #require(lastTask).value + } + + @Test + func throwingTask_sendsEventsInOrder() async throws { + var lastTask: Task? + for iteration in 1...1_000 { + lastTask = Task(on: systemUnderTest) { counter in + counter.incrementAndExpectCount(equals: iteration) + try doWork() + } + } + // Drain the queue + try await #require(lastTask).value + } + + @Test + func mainTask_sendsEventsInOrder() async throws { + var lastTask: Task? + for iteration in 1...1_000 { + lastTask = Task(on: MainActor.queue) { + await counter.incrementAndExpectCount(equals: iteration) + } + } + // Drain the queue + try await #require(lastTask).value + } + + @Test + func mainThrowingTask_sendsEventsInOrder() async throws { + var lastTask: Task? + for iteration in 1...1_000 { + lastTask = Task(on: MainActor.queue) { + await counter.incrementAndExpectCount(equals: iteration) + try doWork() + } + } + // Drain the queue + try await #require(lastTask).value + } + + @Test + func task_startsExecutionOfNextTaskAfterSuspension() async { + let systemUnderTest = ActorQueue() + let semaphore = AsyncQueue.Semaphore() + systemUnderTest.adoptExecutionContext(of: semaphore) + + let firstTask = Task(on: systemUnderTest) { semaphore in + await semaphore.wait() + } + let secondTask = Task(on: systemUnderTest) { semaphore in + // Signal the semaphore from the actor queue. + // If the actor queue were FIFO, this test would hang since this code would never execute: + // we'd still be waiting for the prior `wait()` tasks to finish. + semaphore.signal() + } + _ = await (firstTask.value, secondTask.value) + } + + @Test + func throwingTask_startsExecutionOfNextTaskAfterSuspension() async throws { + let systemUnderTest = ActorQueue() + let semaphore = AsyncQueue.Semaphore() + systemUnderTest.adoptExecutionContext(of: semaphore) + + let firstTask = Task(on: systemUnderTest) { semaphore in + await semaphore.wait() + try doWork() + } + let secondTask = Task(on: systemUnderTest) { semaphore in + // Signal the semaphore from the actor queue. + // If the actor queue were FIFO, this test would hang since this code would never execute: + // we'd still be waiting for the prior `wait()` tasks to finish. + semaphore.signal() + try doWork() + } + _ = try await (firstTask.value, secondTask.value) + } + + @Test + func task_allowsReentrancy() async { + await Task(on: systemUnderTest) { [systemUnderTest] counter in + await Task(on: systemUnderTest) { counter in + counter.incrementAndExpectCount(equals: 1) + }.value + counter.incrementAndExpectCount(equals: 2) + }.value + } + + @Test + func throwingTask_allowsReentrancy() async throws { + try await Task(on: systemUnderTest) { [systemUnderTest] counter in + try doWork() + try await Task(on: systemUnderTest) { counter in + try doWork() + counter.incrementAndExpectCount(equals: 1) + }.value + try doWork() + counter.incrementAndExpectCount(equals: 2) + }.value + } + + @Test + func mainTask_allowsReentrancy() async { + await Task(on: MainActor.queue) { [counter] in + await Task(on: MainActor.queue) { + await counter.incrementAndExpectCount(equals: 1) + }.value + await counter.incrementAndExpectCount(equals: 2) + }.value + } + + @Test + func mainThrowingTask_allowsReentrancy() async throws { + try await Task(on: MainActor.queue) { [counter] in + try doWork() + try await Task(on: MainActor.queue) { + try doWork() + await counter.incrementAndExpectCount(equals: 1) + }.value + try doWork() + await counter.incrementAndExpectCount(equals: 2) + }.value + } + + @Test + func task_executesEnqueuedTasksAfterQueueIsDeallocated() async throws { + var systemUnderTest: ActorQueue? = ActorQueue() + systemUnderTest?.adoptExecutionContext(of: counter) + + let expectation = Expectation() + let semaphore = AsyncQueue.Semaphore() + try Task(on: #require(systemUnderTest)) { counter in + // Make the task wait. + await semaphore.wait() + counter.incrementAndExpectCount(equals: 1) + expectation.fulfill() + } + weak var queue = systemUnderTest + // Nil out our reference to the queue to show that the enqueued tasks will still complete + systemUnderTest = nil + #expect(queue == nil) + // Signal the semaphore to unlock the enqueued tasks. + await semaphore.signal() + await expectation.fulfillment(withinSeconds: 30) + } + + @Test + func throwingTask_executesEnqueuedTasksAfterQueueIsDeallocated() async throws { + var systemUnderTest: ActorQueue? = ActorQueue() + systemUnderTest?.adoptExecutionContext(of: counter) + + let expectation = Expectation() + let semaphore = AsyncQueue.Semaphore() + try Task(on: #require(systemUnderTest)) { counter in + try doWork() + + // Make the task wait. + await semaphore.wait() + counter.incrementAndExpectCount(equals: 1) + expectation.fulfill() + } + weak var queue = systemUnderTest + // Nil out our reference to the queue to show that the enqueued tasks will still complete + systemUnderTest = nil + #expect(queue == nil) + // Signal the semaphore to unlock the enqueued tasks. + await semaphore.signal() + await expectation.fulfillment(withinSeconds: 30) + } + + @Test + func task_canBeCancelled() async { + let semaphore = Semaphore() + let task = Task(on: systemUnderTest) { _ in + await semaphore.wait() + #expect(Task.isCancelled) + } + task.cancel() + await semaphore.signal() + await task.value + } + + @Test + func throwingTask_canBeCancelled() async { + let semaphore = Semaphore() + let task = Task(on: systemUnderTest) { _ in + await semaphore.wait() + #expect(Task.isCancelled) + throw CancellationError() // This is wonky, but we can't `try` if we want 100% code coverage. + } + task.cancel() + await semaphore.signal() + try? await task.value + } + + @Test + func mainTask_canBeCancelled() async { + let semaphore = Semaphore() + let task = Task(on: MainActor.queue) { _ in + await semaphore.wait() + #expect(Task.isCancelled) + } + task.cancel() + await semaphore.signal() + await task.value + } + + @Test + func mainThrowingTask_canBeCancelled() async { + let semaphore = Semaphore() + let task = Task(on: MainActor.queue) { _ in + await semaphore.wait() + #expect(Task.isCancelled) + throw CancellationError() // This is wonky, but we can't `try` if we want 100% code coverage. + } + task.cancel() + await semaphore.signal() + try? await task.value + } + + @Test + func task_canReturn() async { + let expectedValue = UUID() + let returnedValue = await Task(on: systemUnderTest) { _ in expectedValue }.value + #expect(expectedValue == returnedValue) + } + + @Test + func throwingTask_canReturn() async throws { + let expectedValue = UUID() + @Sendable + func generateValue() throws -> UUID { + expectedValue + } + #expect(try await Task(on: systemUnderTest) { _ in try generateValue() }.value == expectedValue) + } + + @Test + func throwingTask_canThrow() async { + struct TestError: Error, Equatable { + private let identifier = UUID() + } + let expectedError = TestError() + do { + try await Task(on: systemUnderTest) { _ in throw expectedError }.value + } catch { + #expect(error as? TestError == expectedError) + } + } + + @Test + func mainThrowingTask_canThrow() async { + struct TestError: Error, Equatable { + private let identifier = UUID() + } + let expectedError = TestError() + do { + try await Task(on: MainActor.queue) { throw expectedError }.value + } catch { + #expect(error as? TestError == expectedError) + } + } + + @Test + func mainTask_executesOnMainActor() async { + @MainActor + func executesOnMainActor() {} + await Task(on: MainActor.queue) { + executesOnMainActor() + }.value + } + + @Test + func mainThrowingTask_executesOnMainActor() async throws { + @MainActor + func executesOnMainActor() throws {} + try await Task(on: MainActor.queue) { + try executesOnMainActor() + }.value + } + + // MARK: Private + + private let systemUnderTest: ActorQueue + private let counter: Counter + + @Sendable + private func doWork() throws {} } diff --git a/Tests/AsyncQueueTests/DeliveryTests.swift b/Tests/AsyncQueueTests/DeliveryTests.swift index 23c1850..a124e6b 100644 --- a/Tests/AsyncQueueTests/DeliveryTests.swift +++ b/Tests/AsyncQueueTests/DeliveryTests.swift @@ -26,29 +26,28 @@ import Testing @testable import AsyncQueue struct DeliveryTests { + // MARK: Behavior Tests - // MARK: Behavior Tests + @Test + func getValue_whenValueExists() async { + let systemUnderTest = Delivery() + await systemUnderTest.sendValue(true) + await #expect(systemUnderTest.getValue() == true) + } - @Test - func getValue_whenValueExists() async { - let systemUnderTest = Delivery() - await systemUnderTest.sendValue(true) - await #expect(systemUnderTest.getValue() == true) - } + @Test + func getValueThrowing_whenValueExists() async throws { + let systemUnderTest = Delivery() + await systemUnderTest.sendValue(true) + try await #expect(systemUnderTest.getValue() == true) + } - @Test - func getValueThrowing_whenValueExists() async throws { - let systemUnderTest = Delivery() - await systemUnderTest.sendValue(true) - try await #expect(systemUnderTest.getValue() == true) - } - - @Test - func getValueThrowing_whenFailureExists() async throws { - let systemUnderTest = Delivery() - await systemUnderTest.sendFailure(CancellationError()) - await #expect(throws: CancellationError.self, performing: { - try await systemUnderTest.getValue() - }) - } + @Test + func getValueThrowing_whenFailureExists() async throws { + let systemUnderTest = Delivery() + await systemUnderTest.sendFailure(CancellationError()) + await #expect(throws: CancellationError.self, performing: { + try await systemUnderTest.getValue() + }) + } } diff --git a/Tests/AsyncQueueTests/ExpectationTests.swift b/Tests/AsyncQueueTests/ExpectationTests.swift index 6fad874..376f58f 100644 --- a/Tests/AsyncQueueTests/ExpectationTests.swift +++ b/Tests/AsyncQueueTests/ExpectationTests.swift @@ -23,85 +23,84 @@ import Testing struct ExpectationTests { + // MARK: Behavior Tests - // MARK: Behavior Tests + @Test + func fulfill_triggersExpectation() async { + await confirmation { confirmation in + let systemUnderTest = Expectation( + expectedCount: 1, + expect: { expectation, _, _ in + #expect(expectation) + confirmation() + } + ) + await systemUnderTest.fulfill().value + } + } - @Test - func fulfill_triggersExpectation() async { - await confirmation { confirmation in - let systemUnderTest = Expectation( - expectedCount: 1, - expect: { expectation, _, _ in - #expect(expectation) - confirmation() - } - ) - await systemUnderTest.fulfill().value - } - } + @Test + func fulfill_triggersExpectationOnceWhenCalledTwiceAndExpectedCountIsTwo() async { + await confirmation { confirmation in + let systemUnderTest = Expectation( + expectedCount: 2, + expect: { expectation, _, _ in + #expect(expectation) + confirmation() + } + ) + await systemUnderTest.fulfill().value + await systemUnderTest.fulfill().value + } + } - @Test - func fulfill_triggersExpectationOnceWhenCalledTwiceAndExpectedCountIsTwo() async { - await confirmation { confirmation in - let systemUnderTest = Expectation( - expectedCount: 2, - expect: { expectation, _, _ in - #expect(expectation) - confirmation() - } - ) - await systemUnderTest.fulfill().value - await systemUnderTest.fulfill().value - } - } + @Test + func fulfill_triggersExpectationWhenExpectedCountIsZero() async { + await confirmation { confirmation in + let systemUnderTest = Expectation( + expectedCount: 0, + expect: { expectation, _, _ in + #expect(!expectation) + confirmation() + } + ) + await systemUnderTest.fulfill().value + } + } - @Test - func fulfill_triggersExpectationWhenExpectedCountIsZero() async { - await confirmation { confirmation in - let systemUnderTest = Expectation( - expectedCount: 0, - expect: { expectation, _, _ in - #expect(!expectation) - confirmation() - } - ) - await systemUnderTest.fulfill().value - } - } + @Test + func fulfillment_doesNotWaitIfAlreadyFulfilled() async { + let systemUnderTest = Expectation(expectedCount: 0) + await systemUnderTest.fulfillment(withinSeconds: 30) + } - @Test - func fulfillment_doesNotWaitIfAlreadyFulfilled() async { - let systemUnderTest = Expectation(expectedCount: 0) - await systemUnderTest.fulfillment(withinSeconds: 30) - } + @MainActor // Global actor ensures Task ordering. + @Test + func fulfillment_waitsForFulfillment() async { + let systemUnderTest = Expectation(expectedCount: 1) + var hasFulfilled = false + let wait = Task { + await systemUnderTest.fulfillment(withinSeconds: 30) + #expect(hasFulfilled) + } + Task { + systemUnderTest.fulfill() + hasFulfilled = true + } + await wait.value + } - @MainActor // Global actor ensures Task ordering. - @Test - func fulfillment_waitsForFulfillment() async { - let systemUnderTest = Expectation(expectedCount: 1) - var hasFulfilled = false - let wait = Task { - await systemUnderTest.fulfillment(withinSeconds: 30) - #expect(hasFulfilled) - } - Task { - systemUnderTest.fulfill() - hasFulfilled = true - } - await wait.value - } - - @Test - func fulfillment_triggersFalseExpectationWhenItTimesOut() async { - await confirmation { confirmation in - let systemUnderTest = Expectation( - expectedCount: 1, - expect: { expectation, _, _ in - #expect(!expectation) - confirmation() - } - ) - await systemUnderTest.fulfillment(withinSeconds: 0) - } - } + @Test + func fulfillment_triggersFalseExpectationWhenItTimesOut() async { + await confirmation { confirmation in + let systemUnderTest = Expectation( + expectedCount: 1, + expect: { expectation, _, _ in + #expect(!expectation) + confirmation() + } + ) + await systemUnderTest.fulfillment(withinSeconds: 0) + } + } } diff --git a/Tests/AsyncQueueTests/FIFOQueueTests.swift b/Tests/AsyncQueueTests/FIFOQueueTests.swift index e514da7..24fa9f1 100644 --- a/Tests/AsyncQueueTests/FIFOQueueTests.swift +++ b/Tests/AsyncQueueTests/FIFOQueueTests.swift @@ -26,461 +26,463 @@ import Testing @testable import AsyncQueue struct FIFOQueueTests { - - // MARK: Behavior Tests - - @Test - func task_sendsEventsInOrder() async { - let counter = Counter() - for iteration in 1...1_000 { - Task(on: systemUnderTest) { - await counter.incrementAndExpectCount(equals: iteration) - } - } - await Task(on: systemUnderTest) { /* Drain the queue */ }.value - } - - @MainActor - @Test - func task_sendsEventsInOrderInLocalContext() async { - var count = 0 - for iteration in 1...1_000 { - Task(on: systemUnderTest) { - count += 1 - #expect(iteration == count) - } - } - await Task(on: systemUnderTest) { /* Drain the queue */ }.value - } - - @Test - func taskIsolatedTo_sendsEventsInOrder() async { - let counter = Counter() - for iteration in 1...1_000 { - Task(on: systemUnderTest, isolatedTo: counter) { counter in - counter.incrementAndExpectCount(equals: iteration) - } - } - await Task(on: systemUnderTest) { /* Drain the queue */ }.value - } - - @Test - func throwingTask_sendsEventsInOrder() async { - let counter = Counter() - for iteration in 1...1_000 { - Task(on: systemUnderTest) { - await counter.incrementAndExpectCount(equals: iteration) - try doWork() - } - } - await Task(on: systemUnderTest) { /* Drain the queue */ }.value - } - - @Test - func throwingTaskIsolatedTo_sendsEventsInOrder() async { - let counter = Counter() - for iteration in 1...1_000 { - Task(on: systemUnderTest, isolatedTo: counter) { counter in - counter.incrementAndExpectCount(equals: iteration) - try doWork() - } - } - await Task(on: systemUnderTest) { /* Drain the queue */ }.value - } - - @Test - func task_interleavedWithTaskIsolatedTo_andThrowing_sendsEventsInOrder() async { - let counter = Counter() - for iteration in 1...1_000 { - let mod = iteration % 4 - if mod == 0 { - Task(on: systemUnderTest) { - await counter.incrementAndExpectCount(equals: iteration) - } - } else if mod == 1 { - Task(on: systemUnderTest, isolatedTo: counter) { counter in - counter.incrementAndExpectCount(equals: iteration) - } - } else if mod == 2 { - Task(on: systemUnderTest) { - await counter.incrementAndExpectCount(equals: iteration) - try doWork() - } - } else { - Task(on: systemUnderTest, isolatedTo: counter) { counter in - counter.incrementAndExpectCount(equals: iteration) - try doWork() - } - } - } - await Task(on: systemUnderTest) { /* Drain the queue */ }.value - } - - @Test - func task_executesAsyncBlocksAtomically() async { - let semaphore = Semaphore() - for _ in 1...1_000 { - Task(on: systemUnderTest) { - let isWaiting = await semaphore.isWaiting - // This test will fail occasionally if we aren't executing atomically. - // You can prove this to yourself by deleting `on: systemUnderTest` above. - #expect(!isWaiting) - // Signal the semaphore before or after we wait – let the scheduler decide. - Task { - await semaphore.signal() - } - // Wait for the concurrent task to complete. - await semaphore.wait() - } - } - await Task(on: systemUnderTest) { /* Drain the queue */ }.value - } - - @Test - func taskIsolatedTo_executesAsyncBlocksAtomically() async { - let semaphore = Semaphore() - for _ in 1...1_000 { - Task(on: systemUnderTest, isolatedTo: semaphore) { semaphore in - let isWaiting = semaphore.isWaiting - // This test will fail occasionally if we aren't executing atomically. - // You can prove this to yourself by deleting `on: systemUnderTest` above. - #expect(!isWaiting) - // Signal the semaphore before or after we wait – let the scheduler decide. - Task { - semaphore.signal() - } - // Wait for the concurrent task to complete. - await semaphore.wait() - } - } - await Task(on: systemUnderTest) { /* Drain the queue */ }.value - } - - @Test - func throwingTask_executesAsyncBlocksAtomically() async { - let semaphore = Semaphore() - for _ in 1...1_000 { - Task(on: systemUnderTest) { - let isWaiting = await semaphore.isWaiting - // This test will fail occasionally if we aren't executing atomically. - // You can prove this to yourself by deleting `on: systemUnderTest` above. - #expect(!isWaiting) - // Signal the semaphore before or after we wait – let the scheduler decide. - Task { - await semaphore.signal() - } - // Wait for the concurrent task to complete. - await semaphore.wait() - try doWork() - } - } - await Task(on: systemUnderTest) { /* Drain the queue */ }.value - } - - @Test - func throwingTaskIsolatedTo_executesAsyncBlocksAtomically() async { - let semaphore = Semaphore() - for _ in 1...1_000 { - Task(on: systemUnderTest, isolatedTo: semaphore) { semaphore in - let isWaiting = semaphore.isWaiting - // This test will fail occasionally if we aren't executing atomically. - // You can prove this to yourself by deleting `on: systemUnderTest` above. - #expect(!isWaiting) - // Signal the semaphore before or after we wait – let the scheduler decide. - Task { - semaphore.signal() - } - // Wait for the concurrent task to complete. - await semaphore.wait() - try doWork() - } - } - await Task(on: systemUnderTest) { /* Drain the queue */ }.value - } - - @Test - func task_isNotReentrant() async { - let counter = Counter() - Task(on: systemUnderTest) { [systemUnderTest] in - Task(on: systemUnderTest) { - await counter.incrementAndExpectCount(equals: 2) - } - await counter.incrementAndExpectCount(equals: 1) - Task(on: systemUnderTest) { - await counter.incrementAndExpectCount(equals: 3) - } - } - await Task(on: systemUnderTest) { /* Drain the queue */ }.value - } - - @Test - func taskIsolatedTo_isNotReentrant() async { - let counter = Counter() - Task(on: systemUnderTest, isolatedTo: counter) { [systemUnderTest] counter in - Task(on: systemUnderTest, isolatedTo: counter) { counter in - counter.incrementAndExpectCount(equals: 2) - } - counter.incrementAndExpectCount(equals: 1) - Task(on: systemUnderTest, isolatedTo: counter) { counter in - counter.incrementAndExpectCount(equals: 3) - } - } - await Task(on: systemUnderTest) { /* Drain the queue */ }.value - } - - @Test - func throwingTask_isNotReentrant() async { - let counter = Counter() - Task(on: systemUnderTest) { [systemUnderTest] in - Task(on: systemUnderTest) { - await counter.incrementAndExpectCount(equals: 2) - try doWork() - } - await counter.incrementAndExpectCount(equals: 1) - Task(on: systemUnderTest) { - await counter.incrementAndExpectCount(equals: 3) - try doWork() - } - } - await Task(on: systemUnderTest) { /* Drain the queue */ }.value - } - - @Test - func throwingTaskIsolatedTo_isNotReentrant() async throws { - let counter = Counter() - Task(on: systemUnderTest, isolatedTo: counter) { [systemUnderTest] counter in - Task(on: systemUnderTest, isolatedTo: counter) { counter in - counter.incrementAndExpectCount(equals: 2) - try doWork() - } - counter.incrementAndExpectCount(equals: 1) - Task(on: systemUnderTest, isolatedTo: counter) { counter in - counter.incrementAndExpectCount(equals: 3) - try doWork() - } - } - await Task(on: systemUnderTest) { /* Drain the queue */ }.value - } - - @Test - func task_executesAfterQueueIsDeallocated() async throws { - var systemUnderTest: FIFOQueue? = FIFOQueue() - let counter = Counter() - let expectation = Expectation() - let semaphore = Semaphore() - Task(on: try #require(systemUnderTest)) { - // Make the queue wait. - await semaphore.wait() - await counter.incrementAndExpectCount(equals: 1) - } - Task(on: try #require(systemUnderTest)) { - // This async task should not execute until the semaphore is released. - await counter.incrementAndExpectCount(equals: 2) - expectation.fulfill() - } - weak var queue = systemUnderTest - // Nil out our reference to the queue to show that the enqueued tasks will still complete - systemUnderTest = nil - #expect(queue == nil) - // Signal the semaphore to unlock the remaining enqueued tasks. - await semaphore.signal() - - await expectation.fulfillment(withinSeconds: 30) - } - - @Test - func taskIsolatedTo_executesAfterQueueIsDeallocated() async throws { - var systemUnderTest: FIFOQueue? = FIFOQueue() - let counter = Counter() - let expectation = Expectation() - let semaphore = Semaphore() - Task(on: try #require(systemUnderTest), isolatedTo: counter) { counter in - // Make the queue wait. - await semaphore.wait() - counter.incrementAndExpectCount(equals: 1) - } - Task(on: try #require(systemUnderTest), isolatedTo: counter) { counter in - // This async task should not execute until the semaphore is released. - counter.incrementAndExpectCount(equals: 2) - expectation.fulfill() - } - weak var queue = systemUnderTest - // Nil out our reference to the queue to show that the enqueued tasks will still complete - systemUnderTest = nil - #expect(queue == nil) - // Signal the semaphore to unlock the remaining enqueued tasks. - await semaphore.signal() - - await expectation.fulfillment(withinSeconds: 30) - } - - @Test - func throwingTask_executesAfterQueueIsDeallocated() async throws { - var systemUnderTest: FIFOQueue? = FIFOQueue() - let counter = Counter() - let expectation = Expectation() - let semaphore = Semaphore() - Task(on: try #require(systemUnderTest)) { - // Make the queue wait. - await semaphore.wait() - await counter.incrementAndExpectCount(equals: 1) - try doWork() - } - Task(on: try #require(systemUnderTest)) { - // This async task should not execute until the semaphore is released. - await counter.incrementAndExpectCount(equals: 2) - expectation.fulfill() - try doWork() - } - weak var queue = systemUnderTest - // Nil out our reference to the queue to show that the enqueued tasks will still complete - systemUnderTest = nil - #expect(queue == nil) - // Signal the semaphore to unlock the remaining enqueued tasks. - await semaphore.signal() - - await expectation.fulfillment(withinSeconds: 30) - } - - @Test - func throwingTaskIsolatedTo_executesAfterQueueIsDeallocated() async throws { - var systemUnderTest: FIFOQueue? = FIFOQueue() - let counter = Counter() - let expectation = Expectation() - let semaphore = Semaphore() - Task(on: try #require(systemUnderTest), isolatedTo: counter) { counter in - // Make the queue wait. - await semaphore.wait() - counter.incrementAndExpectCount(equals: 1) - try doWork() - } - Task(on: try #require(systemUnderTest), isolatedTo: counter) { counter in - // This async task should not execute until the semaphore is released. - counter.incrementAndExpectCount(equals: 2) - expectation.fulfill() - try doWork() - } - weak var queue = systemUnderTest - // Nil out our reference to the queue to show that the enqueued tasks will still complete - systemUnderTest = nil - #expect(queue == nil) - // Signal the semaphore to unlock the remaining enqueued tasks. - await semaphore.signal() - - await expectation.fulfillment(withinSeconds: 30) - } - - @Test - func task_canBeCancelled() async { - let semaphore = Semaphore() - let task = Task(on: systemUnderTest) { - await semaphore.wait() - #expect(Task.isCancelled) - } - task.cancel() - await semaphore.signal() - await task.value - } - - @Test - func taskIsolatedTo_canBeCancelled() async { - let semaphore = Semaphore() - let task = Task(on: systemUnderTest, isolatedTo: Semaphore()) { _ in - await semaphore.wait() - #expect(Task.isCancelled) - } - task.cancel() - await semaphore.signal() - await task.value - } - - @Test - func throwingTask_canBeCancelled() async { - let semaphore = Semaphore() - let task = Task(on: systemUnderTest) { - await semaphore.wait() - #expect(Task.isCancelled) - throw CancellationError() // This is wonky, but we can't `try` if we want 100% code coverage. - } - task.cancel() - await semaphore.signal() - try? await task.value - } - - @Test - func throwingTaskIsolatedTo_canBeCancelled() async { - let semaphore = Semaphore() - let task = Task(on: systemUnderTest, isolatedTo: Semaphore()) { _ in - await semaphore.wait() - #expect(Task.isCancelled) - throw CancellationError() // This is wonky, but we can't `try` if we want 100% code coverage. - } - task.cancel() - await semaphore.signal() - try? await task.value - } - - @Test - func task_canReturn() async { - let expectedValue = UUID() - let returnedValue = await Task(on: systemUnderTest) { expectedValue }.value - #expect(expectedValue == returnedValue) - } - - @Test - func taskIsolatedTo_canReturn() async { - let expectedValue = UUID() - let returnedValue = await Task(on: systemUnderTest, isolatedTo: Semaphore()) { _ in expectedValue }.value - #expect(expectedValue == returnedValue) - } - - @Test - func throwingTask_canReturn() async throws { - let expectedValue = UUID() - @Sendable func generateValue() throws -> UUID { - expectedValue - } - #expect(try await Task(on: systemUnderTest) { try generateValue() }.value == expectedValue) - } - - @Test - func throwingTaskIsolatedTo_canReturn() async throws { - let expectedValue = UUID() - @Sendable func generateValue() throws -> UUID { - expectedValue - } - #expect(try await Task(on: systemUnderTest, isolatedTo: Semaphore()) { _ in try generateValue() }.value == expectedValue) - } - - @Test - func throwingTask_canThrow() async { - struct TestError: Error, Equatable { - private let identifier = UUID() - } - let expectedError = TestError() - do { - try await Task(on: systemUnderTest) { throw expectedError }.value - } catch { - #expect(error as? TestError == expectedError) - } - } - - @Test - func throwingTaskIsolatedTo_canThrow() async { - struct TestError: Error, Equatable { - private let identifier = UUID() - } - let expectedError = TestError() - do { - try await Task(on: systemUnderTest, isolatedTo: Semaphore()) { _ in throw expectedError }.value - } catch { - #expect(error as? TestError == expectedError) - } - } - - // MARK: Private - - private let systemUnderTest = FIFOQueue() - - @Sendable private func doWork() throws -> Void {} + // MARK: Behavior Tests + + @Test + func task_sendsEventsInOrder() async { + let counter = Counter() + for iteration in 1...1_000 { + Task(on: systemUnderTest) { + await counter.incrementAndExpectCount(equals: iteration) + } + } + await Task(on: systemUnderTest) { /* Drain the queue */ }.value + } + + @MainActor + @Test + func task_sendsEventsInOrderInLocalContext() async { + var count = 0 + for iteration in 1...1_000 { + Task(on: systemUnderTest) { + count += 1 + #expect(iteration == count) + } + } + await Task(on: systemUnderTest) { /* Drain the queue */ }.value + } + + @Test + func taskIsolatedTo_sendsEventsInOrder() async { + let counter = Counter() + for iteration in 1...1_000 { + Task(on: systemUnderTest, isolatedTo: counter) { counter in + counter.incrementAndExpectCount(equals: iteration) + } + } + await Task(on: systemUnderTest) { /* Drain the queue */ }.value + } + + @Test + func throwingTask_sendsEventsInOrder() async { + let counter = Counter() + for iteration in 1...1_000 { + Task(on: systemUnderTest) { + await counter.incrementAndExpectCount(equals: iteration) + try doWork() + } + } + await Task(on: systemUnderTest) { /* Drain the queue */ }.value + } + + @Test + func throwingTaskIsolatedTo_sendsEventsInOrder() async { + let counter = Counter() + for iteration in 1...1_000 { + Task(on: systemUnderTest, isolatedTo: counter) { counter in + counter.incrementAndExpectCount(equals: iteration) + try doWork() + } + } + await Task(on: systemUnderTest) { /* Drain the queue */ }.value + } + + @Test + func task_interleavedWithTaskIsolatedTo_andThrowing_sendsEventsInOrder() async { + let counter = Counter() + for iteration in 1...1_000 { + let mod = iteration % 4 + if mod == 0 { + Task(on: systemUnderTest) { + await counter.incrementAndExpectCount(equals: iteration) + } + } else if mod == 1 { + Task(on: systemUnderTest, isolatedTo: counter) { counter in + counter.incrementAndExpectCount(equals: iteration) + } + } else if mod == 2 { + Task(on: systemUnderTest) { + await counter.incrementAndExpectCount(equals: iteration) + try doWork() + } + } else { + Task(on: systemUnderTest, isolatedTo: counter) { counter in + counter.incrementAndExpectCount(equals: iteration) + try doWork() + } + } + } + await Task(on: systemUnderTest) { /* Drain the queue */ }.value + } + + @Test + func task_executesAsyncBlocksAtomically() async { + let semaphore = Semaphore() + for _ in 1...1_000 { + Task(on: systemUnderTest) { + let isWaiting = await semaphore.isWaiting + // This test will fail occasionally if we aren't executing atomically. + // You can prove this to yourself by deleting `on: systemUnderTest` above. + #expect(!isWaiting) + // Signal the semaphore before or after we wait – let the scheduler decide. + Task { + await semaphore.signal() + } + // Wait for the concurrent task to complete. + await semaphore.wait() + } + } + await Task(on: systemUnderTest) { /* Drain the queue */ }.value + } + + @Test + func taskIsolatedTo_executesAsyncBlocksAtomically() async { + let semaphore = Semaphore() + for _ in 1...1_000 { + Task(on: systemUnderTest, isolatedTo: semaphore) { semaphore in + let isWaiting = semaphore.isWaiting + // This test will fail occasionally if we aren't executing atomically. + // You can prove this to yourself by deleting `on: systemUnderTest` above. + #expect(!isWaiting) + // Signal the semaphore before or after we wait – let the scheduler decide. + Task { + semaphore.signal() + } + // Wait for the concurrent task to complete. + await semaphore.wait() + } + } + await Task(on: systemUnderTest) { /* Drain the queue */ }.value + } + + @Test + func throwingTask_executesAsyncBlocksAtomically() async { + let semaphore = Semaphore() + for _ in 1...1_000 { + Task(on: systemUnderTest) { + let isWaiting = await semaphore.isWaiting + // This test will fail occasionally if we aren't executing atomically. + // You can prove this to yourself by deleting `on: systemUnderTest` above. + #expect(!isWaiting) + // Signal the semaphore before or after we wait – let the scheduler decide. + Task { + await semaphore.signal() + } + // Wait for the concurrent task to complete. + await semaphore.wait() + try doWork() + } + } + await Task(on: systemUnderTest) { /* Drain the queue */ }.value + } + + @Test + func throwingTaskIsolatedTo_executesAsyncBlocksAtomically() async { + let semaphore = Semaphore() + for _ in 1...1_000 { + Task(on: systemUnderTest, isolatedTo: semaphore) { semaphore in + let isWaiting = semaphore.isWaiting + // This test will fail occasionally if we aren't executing atomically. + // You can prove this to yourself by deleting `on: systemUnderTest` above. + #expect(!isWaiting) + // Signal the semaphore before or after we wait – let the scheduler decide. + Task { + semaphore.signal() + } + // Wait for the concurrent task to complete. + await semaphore.wait() + try doWork() + } + } + await Task(on: systemUnderTest) { /* Drain the queue */ }.value + } + + @Test + func task_isNotReentrant() async { + let counter = Counter() + Task(on: systemUnderTest) { [systemUnderTest] in + Task(on: systemUnderTest) { + await counter.incrementAndExpectCount(equals: 2) + } + await counter.incrementAndExpectCount(equals: 1) + Task(on: systemUnderTest) { + await counter.incrementAndExpectCount(equals: 3) + } + } + await Task(on: systemUnderTest) { /* Drain the queue */ }.value + } + + @Test + func taskIsolatedTo_isNotReentrant() async { + let counter = Counter() + Task(on: systemUnderTest, isolatedTo: counter) { [systemUnderTest] counter in + Task(on: systemUnderTest, isolatedTo: counter) { counter in + counter.incrementAndExpectCount(equals: 2) + } + counter.incrementAndExpectCount(equals: 1) + Task(on: systemUnderTest, isolatedTo: counter) { counter in + counter.incrementAndExpectCount(equals: 3) + } + } + await Task(on: systemUnderTest) { /* Drain the queue */ }.value + } + + @Test + func throwingTask_isNotReentrant() async { + let counter = Counter() + Task(on: systemUnderTest) { [systemUnderTest] in + Task(on: systemUnderTest) { + await counter.incrementAndExpectCount(equals: 2) + try doWork() + } + await counter.incrementAndExpectCount(equals: 1) + Task(on: systemUnderTest) { + await counter.incrementAndExpectCount(equals: 3) + try doWork() + } + } + await Task(on: systemUnderTest) { /* Drain the queue */ }.value + } + + @Test + func throwingTaskIsolatedTo_isNotReentrant() async throws { + let counter = Counter() + Task(on: systemUnderTest, isolatedTo: counter) { [systemUnderTest] counter in + Task(on: systemUnderTest, isolatedTo: counter) { counter in + counter.incrementAndExpectCount(equals: 2) + try doWork() + } + counter.incrementAndExpectCount(equals: 1) + Task(on: systemUnderTest, isolatedTo: counter) { counter in + counter.incrementAndExpectCount(equals: 3) + try doWork() + } + } + await Task(on: systemUnderTest) { /* Drain the queue */ }.value + } + + @Test + func task_executesAfterQueueIsDeallocated() async throws { + var systemUnderTest: FIFOQueue? = FIFOQueue() + let counter = Counter() + let expectation = Expectation() + let semaphore = Semaphore() + try Task(on: #require(systemUnderTest)) { + // Make the queue wait. + await semaphore.wait() + await counter.incrementAndExpectCount(equals: 1) + } + try Task(on: #require(systemUnderTest)) { + // This async task should not execute until the semaphore is released. + await counter.incrementAndExpectCount(equals: 2) + expectation.fulfill() + } + weak var queue = systemUnderTest + // Nil out our reference to the queue to show that the enqueued tasks will still complete + systemUnderTest = nil + #expect(queue == nil) + // Signal the semaphore to unlock the remaining enqueued tasks. + await semaphore.signal() + + await expectation.fulfillment(withinSeconds: 30) + } + + @Test + func taskIsolatedTo_executesAfterQueueIsDeallocated() async throws { + var systemUnderTest: FIFOQueue? = FIFOQueue() + let counter = Counter() + let expectation = Expectation() + let semaphore = Semaphore() + try Task(on: #require(systemUnderTest), isolatedTo: counter) { counter in + // Make the queue wait. + await semaphore.wait() + counter.incrementAndExpectCount(equals: 1) + } + try Task(on: #require(systemUnderTest), isolatedTo: counter) { counter in + // This async task should not execute until the semaphore is released. + counter.incrementAndExpectCount(equals: 2) + expectation.fulfill() + } + weak var queue = systemUnderTest + // Nil out our reference to the queue to show that the enqueued tasks will still complete + systemUnderTest = nil + #expect(queue == nil) + // Signal the semaphore to unlock the remaining enqueued tasks. + await semaphore.signal() + + await expectation.fulfillment(withinSeconds: 30) + } + + @Test + func throwingTask_executesAfterQueueIsDeallocated() async throws { + var systemUnderTest: FIFOQueue? = FIFOQueue() + let counter = Counter() + let expectation = Expectation() + let semaphore = Semaphore() + try Task(on: #require(systemUnderTest)) { + // Make the queue wait. + await semaphore.wait() + await counter.incrementAndExpectCount(equals: 1) + try doWork() + } + try Task(on: #require(systemUnderTest)) { + // This async task should not execute until the semaphore is released. + await counter.incrementAndExpectCount(equals: 2) + expectation.fulfill() + try doWork() + } + weak var queue = systemUnderTest + // Nil out our reference to the queue to show that the enqueued tasks will still complete + systemUnderTest = nil + #expect(queue == nil) + // Signal the semaphore to unlock the remaining enqueued tasks. + await semaphore.signal() + + await expectation.fulfillment(withinSeconds: 30) + } + + @Test + func throwingTaskIsolatedTo_executesAfterQueueIsDeallocated() async throws { + var systemUnderTest: FIFOQueue? = FIFOQueue() + let counter = Counter() + let expectation = Expectation() + let semaphore = Semaphore() + try Task(on: #require(systemUnderTest), isolatedTo: counter) { counter in + // Make the queue wait. + await semaphore.wait() + counter.incrementAndExpectCount(equals: 1) + try doWork() + } + try Task(on: #require(systemUnderTest), isolatedTo: counter) { counter in + // This async task should not execute until the semaphore is released. + counter.incrementAndExpectCount(equals: 2) + expectation.fulfill() + try doWork() + } + weak var queue = systemUnderTest + // Nil out our reference to the queue to show that the enqueued tasks will still complete + systemUnderTest = nil + #expect(queue == nil) + // Signal the semaphore to unlock the remaining enqueued tasks. + await semaphore.signal() + + await expectation.fulfillment(withinSeconds: 30) + } + + @Test + func task_canBeCancelled() async { + let semaphore = Semaphore() + let task = Task(on: systemUnderTest) { + await semaphore.wait() + #expect(Task.isCancelled) + } + task.cancel() + await semaphore.signal() + await task.value + } + + @Test + func taskIsolatedTo_canBeCancelled() async { + let semaphore = Semaphore() + let task = Task(on: systemUnderTest, isolatedTo: Semaphore()) { _ in + await semaphore.wait() + #expect(Task.isCancelled) + } + task.cancel() + await semaphore.signal() + await task.value + } + + @Test + func throwingTask_canBeCancelled() async { + let semaphore = Semaphore() + let task = Task(on: systemUnderTest) { + await semaphore.wait() + #expect(Task.isCancelled) + throw CancellationError() // This is wonky, but we can't `try` if we want 100% code coverage. + } + task.cancel() + await semaphore.signal() + try? await task.value + } + + @Test + func throwingTaskIsolatedTo_canBeCancelled() async { + let semaphore = Semaphore() + let task = Task(on: systemUnderTest, isolatedTo: Semaphore()) { _ in + await semaphore.wait() + #expect(Task.isCancelled) + throw CancellationError() // This is wonky, but we can't `try` if we want 100% code coverage. + } + task.cancel() + await semaphore.signal() + try? await task.value + } + + @Test + func task_canReturn() async { + let expectedValue = UUID() + let returnedValue = await Task(on: systemUnderTest) { expectedValue }.value + #expect(expectedValue == returnedValue) + } + + @Test + func taskIsolatedTo_canReturn() async { + let expectedValue = UUID() + let returnedValue = await Task(on: systemUnderTest, isolatedTo: Semaphore()) { _ in expectedValue }.value + #expect(expectedValue == returnedValue) + } + + @Test + func throwingTask_canReturn() async throws { + let expectedValue = UUID() + @Sendable + func generateValue() throws -> UUID { + expectedValue + } + #expect(try await Task(on: systemUnderTest) { try generateValue() }.value == expectedValue) + } + + @Test + func throwingTaskIsolatedTo_canReturn() async throws { + let expectedValue = UUID() + @Sendable + func generateValue() throws -> UUID { + expectedValue + } + #expect(try await Task(on: systemUnderTest, isolatedTo: Semaphore()) { _ in try generateValue() }.value == expectedValue) + } + + @Test + func throwingTask_canThrow() async { + struct TestError: Error, Equatable { + private let identifier = UUID() + } + let expectedError = TestError() + do { + try await Task(on: systemUnderTest) { throw expectedError }.value + } catch { + #expect(error as? TestError == expectedError) + } + } + + @Test + func throwingTaskIsolatedTo_canThrow() async { + struct TestError: Error, Equatable { + private let identifier = UUID() + } + let expectedError = TestError() + do { + try await Task(on: systemUnderTest, isolatedTo: Semaphore()) { _ in throw expectedError }.value + } catch { + #expect(error as? TestError == expectedError) + } + } + + // MARK: Private + + private let systemUnderTest = FIFOQueue() + + @Sendable + private func doWork() throws {} } diff --git a/Tests/AsyncQueueTests/SemaphoreTests.swift b/Tests/AsyncQueueTests/SemaphoreTests.swift index dce513d..c5940db 100644 --- a/Tests/AsyncQueueTests/SemaphoreTests.swift +++ b/Tests/AsyncQueueTests/SemaphoreTests.swift @@ -25,118 +25,117 @@ import Testing @testable import AsyncQueue final class SemaphoreTests { - - // MARK: Initialization - - deinit { - Task { [systemUnderTest] in - let isWaiting = await systemUnderTest.isWaiting - #expect(!isWaiting) - } - } - - // MARK: Behavior Tests - - @Test - func wait_suspendsUntilEqualNumberOfSignalCalls() async { - /* - This test is tricky to pull off! - Our requirements: - 1. We need to call `wait()` before `signal()` - 2. We need to ensure that the `wait()` call suspends _before_ we call `signal()` - 3. We can't `await` the `wait()` call on the test's queue before calling `signal()` since that would deadlock the test. - 4. We must utilize a single actor's isolated context to avoid accidental interleaving when suspending to communicate across actor contexts. - - In order to ensure that we are executing the `wait()` calls before we call `signal()` _without awaiting a `wait()` call_, - we utilize the AsyncQueue.Semaphore's ordered execution context to enqueue ordered `Task`s similar to how an ActorQueue works. - */ - - let iterationCount = 1_000 - /// A counter that will only be accessed from within the `systemUnderTest`'s context - let unsafeCounter = UnsafeCounter() - - for _ in 1...iterationCount { - await systemUnderTest.enqueueAndCount(using: unsafeCounter) { systemUnderTest in - let didSuspend = await systemUnderTest.wait() - #expect(didSuspend) - - return { systemUnderTest in - // Signal that the suspended wait call above has resumed. - // This signal allows us to `wait()` for all of these enqueued `wait()` tasks to have completed later in this test. - systemUnderTest.signal() - } - } - } - - // Loop one fewer than iterationCount. - for _ in 1.. (@Sendable (isolated AsyncQueue.Semaphore) -> Void)?) async { - // Await the start of the soon-to-be-enqueued `Task` with a continuation. - await withCheckedContinuation { continuation in - // Re-enter the semaphore's ordered context but don't wait for the result. - Task { - // Now that we're back in the semaphore's ordered context, allow the calling code to resume. - continuation.resume() - let executeAfterIncrement = await task(self) - counter.countedTasksCompleted += 1 - executeAfterIncrement?(self) - } - } - } - - func execute(_ task: @Sendable (isolated AsyncQueue.Semaphore) async throws -> Void) async rethrows { - try await task(self) - } +extension AsyncQueue.Semaphore { + /// Enqueues an asynchronous task and increments a counter after the task completes. + /// This method suspends the caller until the asynchronous task has begun, ensuring ordered execution of enqueued tasks. + /// - Parameter task: A unit of work that returns work to execute after the task completes and the count is incremented. + fileprivate func enqueueAndCount(using counter: UnsafeCounter, _ task: @escaping @Sendable (isolated AsyncQueue.Semaphore) async -> (@Sendable (isolated AsyncQueue.Semaphore) -> Void)?) async { + // Await the start of the soon-to-be-enqueued `Task` with a continuation. + await withCheckedContinuation { continuation in + // Re-enter the semaphore's ordered context but don't wait for the result. + Task { + // Now that we're back in the semaphore's ordered context, allow the calling code to resume. + continuation.resume() + let executeAfterIncrement = await task(self) + counter.countedTasksCompleted += 1 + executeAfterIncrement?(self) + } + } + } + + fileprivate func execute(_ task: @Sendable (isolated AsyncQueue.Semaphore) async throws -> Void) async rethrows { + try await task(self) + } } // MARK: - UnsafeCounter private final class UnsafeCounter: @unchecked Sendable { - var countedTasksCompleted = 0 + var countedTasksCompleted = 0 } diff --git a/Tests/AsyncQueueTests/Utilities/Counter.swift b/Tests/AsyncQueueTests/Utilities/Counter.swift index 4ac95b9..8ac2e78 100644 --- a/Tests/AsyncQueueTests/Utilities/Counter.swift +++ b/Tests/AsyncQueueTests/Utilities/Counter.swift @@ -23,25 +23,25 @@ import Testing actor Counter { - func incrementAndExpectCount( - equals expectedCount: Int, - filePath: String = #filePath, - fileID: String = #fileID, - line: Int = #line, - column: Int = #column - ) { - increment() - #expect(expectedCount == count, sourceLocation: .init( - fileID: filePath, - filePath: filePath, - line: line, - column: column - )) - } + func incrementAndExpectCount( + equals expectedCount: Int, + filePath: String = #filePath, + fileID: String = #fileID, + line: Int = #line, + column: Int = #column + ) { + increment() + #expect(expectedCount == count, sourceLocation: .init( + fileID: fileID, + filePath: filePath, + line: line, + column: column + )) + } - func increment() { - count += 1 - } + func increment() { + count += 1 + } - var count = 0 + var count = 0 } diff --git a/Tests/AsyncQueueTests/Utilities/Expectation.swift b/Tests/AsyncQueueTests/Utilities/Expectation.swift index c3c9f34..a6073de 100644 --- a/Tests/AsyncQueueTests/Utilities/Expectation.swift +++ b/Tests/AsyncQueueTests/Utilities/Expectation.swift @@ -23,98 +23,98 @@ import Testing public actor Expectation { + // MARK: Initialization - // MARK: Initialization + public init( + expectedCount: UInt = 1 + ) { + self.init( + expectedCount: expectedCount, + expect: { #expect($0, $1, sourceLocation: $2) } + ) + } - public init( - expectedCount: UInt = 1 - ) { - self.init( - expectedCount: expectedCount, - expect: { #expect($0, $1, sourceLocation: $2) } - ) - } + init( + expectedCount: UInt, + expect: @escaping (Bool, Comment?, SourceLocation) -> Void + ) { + self.expectedCount = expectedCount + self.expect = expect + } - init( - expectedCount: UInt, - expect: @escaping (Bool, Comment?, SourceLocation) -> Void - ) { - self.expectedCount = expectedCount - self.expect = expect - } + // MARK: Public - // MARK: Public + public func fulfillment( + withinSeconds seconds: UInt64, + filePath: String = #filePath, + fileID: String = #fileID, + line: Int = #line, + column: Int = #column + ) async { + guard !isComplete else { return } + let wait = Task { + try await Task.sleep(nanoseconds: seconds * 1_000_000_000) + expect(isComplete, "Expectation not fulfilled within \(seconds) seconds", .init( + fileID: fileID, + filePath: filePath, + line: line, + column: column + )) + } + waits.append(wait) + try? await wait.value + } - public func fulfillment( - withinSeconds seconds: UInt64, - filePath: String = #filePath, - fileID: String = #fileID, - line: Int = #line, - column: Int = #column - ) async { - guard !isComplete else { return } - let wait = Task { - try await Task.sleep(nanoseconds: seconds * 1_000_000_000) - expect(isComplete, "Expectation not fulfilled within \(seconds) seconds", .init( - fileID: filePath, - filePath: filePath, - line: line, - column: column - )) - } - waits.append(wait) - try? await wait.value - } + @discardableResult + nonisolated + public func fulfill( + filePath: String = #filePath, + fileID: String = #fileID, + line: Int = #line, + column: Int = #column + ) -> Task { + Task { + await self._fulfill( + filePath: filePath, + fileID: fileID, + line: line, + column: column + ) + } + } - @discardableResult - nonisolated - public func fulfill( - filePath: String = #filePath, - fileID: String = #fileID, - line: Int = #line, - column: Int = #column - ) -> Task { - Task { - await self._fulfill( - filePath: filePath, - fileID: fileID, - line: line, - column: column - ) - } - } + // MARK: Private - // MARK: Private + private var waits = [Task]() + private var fulfillCount: UInt = 0 + private var isComplete: Bool { + expectedCount <= fulfillCount + } - private var waits = [Task]() - private var fulfillCount: UInt = 0 - private var isComplete: Bool { - expectedCount <= fulfillCount - } - private let expectedCount: UInt - private let expect: (Bool, Comment?, SourceLocation) -> Void + private let expectedCount: UInt + private let expect: (Bool, Comment?, SourceLocation) -> Void - private func _fulfill( - filePath: String, - fileID: String, - line: Int, - column: Int - ) { - fulfillCount += 1 - guard isComplete else { return } - expect( - expectedCount == fulfillCount, - "Expected \(expectedCount) calls to `fulfill()`. Received \(fulfillCount).", - .init( - fileID: filePath, - filePath: filePath, - line: line, - column: column - ) - ) - for wait in waits { - wait.cancel() - } - waits = [] - } + private func _fulfill( + filePath: String, + fileID: String, + line: Int, + column: Int + ) { + fulfillCount += 1 + guard isComplete else { return } + expect( + expectedCount == fulfillCount, + "Expected \(expectedCount) calls to `fulfill()`. Received \(fulfillCount).", + .init( + fileID: fileID, + filePath: filePath, + line: line, + column: column + ) + ) + for wait in waits { + wait.cancel() + } + waits = [] + } } diff --git a/lint.sh b/lint.sh new file mode 100755 index 0000000..374fee8 --- /dev/null +++ b/lint.sh @@ -0,0 +1,9 @@ +#!/bin/zsh + +set -e + +pushd $(git rev-parse --show-toplevel) + +swift run --package-path CLI -c release swiftformat . + +popd