From 1b18131df41bf830f3f6008af41d7e7533ae48e2 Mon Sep 17 00:00:00 2001 From: FranzBusch Date: Sun, 11 Sep 2022 17:21:17 +0100 Subject: [PATCH 1/6] Remove `AsyncIterator: Sendable` requirement from debounce # Motivation The current implementation of `AsyncDebounceSequence` requires the base `AsyncIterator` to be `Sendable`. This is causing two problems: 1. It only allows users to use `debounce` if their `AsyncSequence.AsyncIterator` is `Sendable` 2. In `debounce` we are creating a lot of new `Task`s and reating `Task`s is not cheap. My main goal of this PR was to remove the `Sendable` constraint from `debounce`. # Modification This PR overhauls the implementation of `debounce` and aligns it with the implementation of the open `merge` PR https://github.com/apple/swift-async-algorithms/pull/185 . The most important changes are this: - I removed the `Sendable` requirement from the base sequences `AsyncIterator`. - Instead of creating new Tasks for the sleep and for the upstream consumption. I am now creating one Task and manipulate it by signalling continuations - I am not cancelling the sleep. Instead I am recalculating the time left to sleep when a sleep finishes. # Result In the end, this PR swaps the implementation of `AsyncDebounceSequence` and drops the `Sendable` constraint and passes all tests. Furthermore, on my local performance testing I saw up 150% speed increase in throughput. --- .../AsyncDebounceSequence.swift | 121 --- .../Debounce/AsyncDebounceSequence.swift | 115 +++ .../Debounce/DebounceStateMachine.swift | 747 ++++++++++++++++++ .../Debounce/DebounceStorage.swift | 294 +++++++ .../Performance/TestThroughput.swift | 6 + Tests/AsyncAlgorithmsTests/TestDebounce.swift | 10 + 6 files changed, 1172 insertions(+), 121 deletions(-) delete mode 100644 Sources/AsyncAlgorithms/AsyncDebounceSequence.swift create mode 100644 Sources/AsyncAlgorithms/Debounce/AsyncDebounceSequence.swift create mode 100644 Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift create mode 100644 Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift diff --git a/Sources/AsyncAlgorithms/AsyncDebounceSequence.swift b/Sources/AsyncAlgorithms/AsyncDebounceSequence.swift deleted file mode 100644 index 304dc01d..00000000 --- a/Sources/AsyncAlgorithms/AsyncDebounceSequence.swift +++ /dev/null @@ -1,121 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift Async Algorithms open source project -// -// Copyright (c) 2022 Apple Inc. and the Swift project authors -// Licensed under Apache License v2.0 with Runtime Library Exception -// -// See https://swift.org/LICENSE.txt for license information -// -//===----------------------------------------------------------------------===// - -extension AsyncSequence { - /// Creates an asynchronous sequence that emits the latest element after a given quiescence period - /// has elapsed by using a specified Clock. - @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) - public func debounce(for interval: C.Instant.Duration, tolerance: C.Instant.Duration? = nil, clock: C) -> AsyncDebounceSequence { - AsyncDebounceSequence(self, interval: interval, tolerance: tolerance, clock: clock) - } - - /// Creates an asynchronous sequence that emits the latest element after a given quiescence period - /// has elapsed. - @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) - public func debounce(for interval: Duration, tolerance: Duration? = nil) -> AsyncDebounceSequence { - debounce(for: interval, tolerance: tolerance, clock: .continuous) - } -} - -/// An `AsyncSequence` that emits the latest element after a given quiescence period -/// has elapsed. -@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -public struct AsyncDebounceSequence: Sendable - where Base.AsyncIterator: Sendable, Base.Element: Sendable, Base: Sendable { - let base: Base - let interval: C.Instant.Duration - let tolerance: C.Instant.Duration? - let clock: C - - init(_ base: Base, interval: C.Instant.Duration, tolerance: C.Instant.Duration?, clock: C) { - self.base = base - self.interval = interval - self.tolerance = tolerance - self.clock = clock - } -} - -@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -extension AsyncDebounceSequence: AsyncSequence { - public typealias Element = Base.Element - - /// The iterator for a `AsyncDebounceSequence` instance. - public struct Iterator: AsyncIteratorProtocol, Sendable { - enum Partial: Sendable { - case sleep - case produce(Result, Base.AsyncIterator) - } - var iterator: Base.AsyncIterator - var produce: Task? - var terminal = false - let interval: C.Instant.Duration - let tolerance: C.Instant.Duration? - let clock: C - - init(_ base: Base.AsyncIterator, interval: C.Instant.Duration, tolerance: C.Instant.Duration?, clock: C) { - self.iterator = base - self.interval = interval - self.tolerance = tolerance - self.clock = clock - } - - public mutating func next() async rethrows -> Base.Element? { - var last: C.Instant? - var lastResult: Result? - while !terminal { - let deadline = (last ?? clock.now).advanced(by: interval) - let sleep: Task = Task { [tolerance, clock] in - try? await clock.sleep(until: deadline, tolerance: tolerance) - return .sleep - } - let produce: Task = self.produce ?? Task { [iterator] in - var iter = iterator - do { - let value = try await iter.next() - return .produce(.success(value), iter) - } catch { - return .produce(.failure(error), iter) - } - } - self.produce = nil - switch await Task.select(sleep, produce).value { - case .sleep: - self.produce = produce - if let result = lastResult { - return try result._rethrowGet() - } - break - case .produce(let result, let iter): - lastResult = result - last = clock.now - sleep.cancel() - self.iterator = iter - switch result { - case .success(let value): - if value == nil { - terminal = true - return nil - } - case .failure: - terminal = true - try result._rethrowError() - } - break - } - } - return nil - } - } - - public func makeAsyncIterator() -> Iterator { - Iterator(base.makeAsyncIterator(), interval: interval, tolerance: tolerance, clock: clock) - } -} diff --git a/Sources/AsyncAlgorithms/Debounce/AsyncDebounceSequence.swift b/Sources/AsyncAlgorithms/Debounce/AsyncDebounceSequence.swift new file mode 100644 index 00000000..6b827709 --- /dev/null +++ b/Sources/AsyncAlgorithms/Debounce/AsyncDebounceSequence.swift @@ -0,0 +1,115 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +extension AsyncSequence { + /// Creates an asynchronous sequence that emits the latest element after a given quiescence period + /// has elapsed by using a specified Clock. + @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) + public func debounce(for interval: C.Instant.Duration, tolerance: C.Instant.Duration? = nil, clock: C) -> AsyncDebounceSequence where Self: Sendable { + AsyncDebounceSequence(self, interval: interval, tolerance: tolerance, clock: clock) + } + + /// Creates an asynchronous sequence that emits the latest element after a given quiescence period + /// has elapsed. + @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) + public func debounce(for interval: Duration, tolerance: Duration? = nil) -> AsyncDebounceSequence where Self: Sendable { + self.debounce(for: interval, tolerance: tolerance, clock: .continuous) + } +} + +/// An `AsyncSequence` that emits the latest element after a given quiescence period +/// has elapsed. +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +public struct AsyncDebounceSequence: Sendable where Base: Sendable { + /// This class is needed to hook the deinit to observe once all references to the ``AsyncDebounceSequence`` are dropped. + /// + /// If we get move-only types we should be able to drop this class and use the `deinit` of the ``AsyncDebounceSequence`` struct itself. + final class InternalClass: Sendable { + fileprivate let storage: DebounceStorage + + fileprivate init(storage: DebounceStorage) { + self.storage = storage + } + + deinit { + storage.sequenceDeinitialized() + } + } + + /// The internal class to hook the `deinit`. + let internalClass: InternalClass + + /// The underlying storage + fileprivate var storage: DebounceStorage { + self.internalClass.storage + } + + /// Initializes a new ``AsyncDebounceSequence``. + /// + /// - Parameters: + /// - base: The base sequence. + /// - interval: The interval to debounce. + /// - tolerance: The tolerance of the clock. + /// - clock: The clock. + public init(_ base: Base, interval: C.Instant.Duration, tolerance: C.Instant.Duration?, clock: C) { + let storage = DebounceStorage( + base: base, + interval: interval, + tolerance: tolerance, + clock: clock + ) + self.internalClass = .init(storage: storage) + } +} + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +extension AsyncDebounceSequence: AsyncSequence { + public typealias Element = Base.Element + + public func makeAsyncIterator() -> AsyncIterator { + AsyncIterator(storage: self.internalClass.storage) + } +} + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +extension AsyncDebounceSequence { + public struct AsyncIterator: AsyncIteratorProtocol { + /// This class is needed to hook the deinit to observe once all references to the ``AsyncIterator`` are dropped. + /// + /// If we get move-only types we should be able to drop this class and use the `deinit` of the ``AsyncIterator`` struct itself. + final class InternalClass: Sendable { + private let storage: DebounceStorage + + fileprivate init(storage: DebounceStorage) { + self.storage = storage + self.storage.iteratorInitialized() + } + + deinit { + self.storage.iteratorDeinitialized() + } + + func next() async rethrows -> Element? { + try await self.storage.next() + } + } + + let internalClass: InternalClass + + fileprivate init(storage: DebounceStorage) { + self.internalClass = InternalClass(storage: storage) + } + + public mutating func next() async rethrows -> Element? { + try await self.internalClass.next() + } + } +} diff --git a/Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift b/Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift new file mode 100644 index 00000000..d48253c1 --- /dev/null +++ b/Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift @@ -0,0 +1,747 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +struct DebounceStateMachine { + typealias Element = Base.Element + + private enum State { + /// The initial state before a call to `makeAsyncIterator` happened. + case initial(base: Base) + + /// The state while we are waiting for downstream demand. + case waitingForDemand( + task: Task, + upstreamContinuation: UnsafeContinuation?, + clockContinuation: UnsafeContinuation?, + bufferedElement: (element: Element, deadline: C.Instant)? + ) + + /// The state once the downstream signalled demand but before we received + /// the first element from the upstream. + case demandSignalled( + task: Task, + clockContinuation: UnsafeContinuation?, + downstreamContinuation: UnsafeContinuation + ) + + /// The state while we are consuming the upstream and waiting for the Clock.sleep to finish. + case debouncing( + task: Task, + upstreamContinuation: UnsafeContinuation?, + downstreamContinuation: UnsafeContinuation, + currentElement: (element: Element, deadline: C.Instant) + ) + + /// The state once any of the upstream sequences threw an `Error`. + case upstreamFailure( + error: Error + ) + + /// The state once all upstream sequences finished or the downstream consumer stopped, i.e. by dropping all references + /// or by getting their `Task` cancelled. + case finished + } + + /// The state machine's current state. + private var state: State + /// The interval to debounce. + private let interval: C.Instant.Duration + /// The clock. + private let clock: C + + init(base: Base, clock: C, interval: C.Instant.Duration) { + self.state = .initial(base: base) + self.clock = clock + self.interval = interval + } + + mutating func sequenceDeinitialized() { + switch self.state { + case .initial: + // The references to the sequence were dropped before any iterator was ever created + self.state = .finished + + case .waitingForDemand, .demandSignalled, .debouncing, .upstreamFailure: + // An iterator was created and we deinited the sequence. + // This is an expected pattern and we just continue on normal. + // Importantly since we are a unicast sequence no more iterators can be created + return + + case .finished: + // We are already finished so there is nothing left to clean up. + // This is just the references dropping afterwards. + return + } + } + + /// Actions returned by `iteratorInitialized()`. + enum IteratorInitializedAction { + /// Indicates that a new `Task` should be created that consumes the sequence. + case startTask(Base) + } + + mutating func iteratorInitialized() -> IteratorInitializedAction { + switch self.state { + case .initial(let base): + // This is the first iterator being created and we need to create our `Task` + // that is consuming the upstream sequences. + return .startTask(base) + + case .waitingForDemand, .demandSignalled, .debouncing, .upstreamFailure, .finished: + fatalError("debounce allows only a single AsyncIterator to be created") + } + } + + /// Actions returned by `iteratorDeinitialized()`. + enum IteratorDeinitializedAction { + /// Indicates that the `Task` needs to be cancelled and + /// the upstream and clock continuation need to be resumed with a `CancellationError`. + case cancelTaskAndUpstreamAndClockContinuations( + task: Task, + upstreamContinuation: UnsafeContinuation?, + clockContinuation: UnsafeContinuation? + ) + /// Indicates that nothing should be done. + case none + } + + mutating func iteratorDeinitialized() -> IteratorDeinitializedAction { + switch self.state { + case .initial: + // An iterator needs to be initialized before it can be deinitialized. + preconditionFailure("Internal inconsistency current state \(self.state) and received iteratorDeinitialized()") + + case .debouncing, .demandSignalled: + // An iterator was deinitialized while we have a suspended continuation. + preconditionFailure("Internal inconsistency current state \(self.state) and received iteratorDeinitialized()") + + case .waitingForDemand(let task, let upstreamContinuation, let clockContinuation, _): + // The iterator was dropped which signals that the consumer is finished. + // We can transition to finished now and need to clean everything up. + self.state = .finished + + return .cancelTaskAndUpstreamAndClockContinuations( + task: task, + upstreamContinuation: upstreamContinuation, + clockContinuation: clockContinuation + ) + + case .upstreamFailure: + // The iterator was dropped which signals that the consumer is finished. + // We can transition to finished now. The cleanup already happened when we + // transitioned to `upstreamFailure`. + self.state = .finished + + return .none + + case .finished: + // We are already finished so there is nothing left to clean up. + // This is just the references dropping afterwards. + return .none + } + } + + mutating func taskStarted(_ task: Task) { + switch self.state { + case .initial: + // The user called `makeAsyncIterator` and we are starting the `Task` + // to consume the upstream sequence + self.state = .waitingForDemand( + task: task, + upstreamContinuation: nil, + clockContinuation: nil, + bufferedElement: nil + ) + + case .debouncing, .demandSignalled, .waitingForDemand, .upstreamFailure, .finished: + // We only a single iterator to be created so this must never happen. + preconditionFailure("Internal inconsistency current state \(self.state) and received taskStarted()") + } + } + + /// Actions returned by `upstreamTaskSuspended()`. + enum UpstreamTaskSuspendedAction { + /// Indicates that the continuation should be resumed which will lead to calling `next` on the upstream. + case resumeContinuation( + upstreamContinuation: UnsafeContinuation + ) + /// Indicates that the continuation should be resumed with an Error because another upstream sequence threw. + case resumeContinuationWithError( + upstreamContinuation: UnsafeContinuation, + error: Error + ) + /// Indicates that nothing should be done. + case none + } + + mutating func upstreamTaskSuspended(_ continuation: UnsafeContinuation) -> UpstreamTaskSuspendedAction { + switch self.state { + case .initial: + // Child tasks are only created after we transitioned to `merging` + preconditionFailure("Internal inconsistency current state \(self.state) and received childTaskSuspended()") + + case .waitingForDemand(_, .some, _, _), .debouncing(_, .some, _, _): + // We already have an upstream continuation so we can never get a second one + preconditionFailure("Internal inconsistency current state \(self.state) and received childTaskSuspended()") + + case .upstreamFailure: + // The upstream already failed so it should never suspend again since the child task + // should have exited + preconditionFailure("Internal inconsistency current state \(self.state) and received childTaskSuspended()") + + case .waitingForDemand(let task, .none, let clockContinuation, let bufferedElement): + // The upstream task is ready to consume the next element + // we are just waiting to get demand + self.state = .waitingForDemand( + task: task, + upstreamContinuation: continuation, + clockContinuation: clockContinuation, + bufferedElement: bufferedElement + ) + + return .none + + case .demandSignalled: + // It can happen that the demand got signalled before our upstream suspended for the first time + // We need to resume it right away to demand the first element from the upstream + return .resumeContinuation(upstreamContinuation: continuation) + + case .debouncing(_, .none, _, _): + // We are currently debouncing and the upstream task suspended again + // We need to resume the continuation right away so that it continues to + // consume new elements from the upstream + + return .resumeContinuation(upstreamContinuation: continuation) + + case .finished: + // Since cancellation is cooperative it might be that child tasks are still getting + // suspended even though we already cancelled them. We must tolerate this and just resume + // the continuation with an error. + return .resumeContinuationWithError( + upstreamContinuation: continuation, + error: CancellationError() + ) + } + } + + /// Actions returned by `elementProduced()`. + enum ElementProducedAction { + /// Indicates that the clock continuation should be resumed to start the `Clock.sleep`. + case resumeClockContinuation( + clockContinuation: UnsafeContinuation?, + deadline: C.Instant + ) + /// Indicates that nothing should be done. + case none + } + + mutating func elementProduced(_ element: Element, deadline: C.Instant) -> ElementProducedAction { + switch self.state { + case .initial: + // Child tasks that are producing elements are only created after we transitioned to `merging` + preconditionFailure("Internal inconsistency current state \(self.state) and received elementProduced()") + + case .waitingForDemand(_, _, _, .some): + // We can only ever buffer one element because of the race of both child tasks + // After that element got buffered we are not resuming the upstream continuation + // and should never get another element until we get downstream demand signalled + preconditionFailure("Internal inconsistency current state \(self.state) and received elementProduced()") + + case .upstreamFailure: + // The upstream already failed so it should never have produced another element + preconditionFailure("Internal inconsistency current state \(self.state) and received childTaskSuspended()") + + case .waitingForDemand(let task, let upstreamContinuation, let clockContinuation, .none): + // We got an element even though we don't have an outstanding demand + // this can happen because we race the upstream and Clock child tasks + // and the upstream might finish after the Clock. We just need + // to buffer the element for the next demand. + self.state = .waitingForDemand( + task: task, + upstreamContinuation: upstreamContinuation, + clockContinuation: clockContinuation, + bufferedElement: (element, deadline) + ) + + return .none + + case .demandSignalled(let task, let clockContinuation, let downstreamContinuation): + // This is the first element that got produced after we got demand signalled + // We can now transition to debouncing and start the Clock.sleep + self.state = .debouncing( + task: task, + upstreamContinuation: nil, + downstreamContinuation: downstreamContinuation, + currentElement: (element, deadline) + ) + + let deadline = self.clock.now.advanced(by: self.interval) + return .resumeClockContinuation( + clockContinuation: clockContinuation, + deadline: deadline + ) + + case .debouncing(let task, let upstreamContinuation, let downstreamContinuation, _): + // We just got another element and the Clock hasn't finished sleeping yet + // We just need to store the new element + self.state = .debouncing( + task: task, + upstreamContinuation: upstreamContinuation, + downstreamContinuation: downstreamContinuation, + currentElement: (element, deadline) + ) + + return .none + + case .finished: + // Since cancellation is cooperative it might be that child tasks + // are still producing elements after we finished. + // We are just going to drop them since there is nothing we can do + return .none + } + } + + /// Actions returned by `upstreamFinished()`. + enum UpstreamFinishedAction { + /// Indicates that the task and the clock continuation should be cancelled. + case cancelTaskAndClockContinuation( + task: Task, + clockContinuation: UnsafeContinuation? + ) + /// Indicates that the downstream continuation should be resumed with `nil` and + /// the task and the upstream continuation should be cancelled. + case resumeContinuationWithNilAndCancelTaskAndUpstreamAndClockContinuation( + downstreamContinuation: UnsafeContinuation, + task: Task, + upstreamContinuation: UnsafeContinuation?, + clockContinuation: UnsafeContinuation? + ) + /// Indicates that nothing should be done. + case none + } + + mutating func upstreamFinished() -> UpstreamFinishedAction { + switch self.state { + case .initial: + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished()") + + case .waitingForDemand(_, .some, _, _): + // We will never receive an upstream finished and have an outstanding continuation + // since we only receive finish after resuming the upstream continuation + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished()") + + case .waitingForDemand(_, .none, _, .some): + // We will never receive an upstream finished while we have a buffered element + // To get there we would need to have received the buffered element and then + // received upstream finished all while waiting for demand; however, we should have + // never demanded the next element from upstream in the first place + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished()") + + case .upstreamFailure: + // The upstream already failed so it should never have finished again + preconditionFailure("Internal inconsistency current state \(self.state) and received childTaskSuspended()") + + case .waitingForDemand(let task, .none, let clockContinuation, .none): + // We don't have any buffered element so we can just go ahead + // and transition to finished and cancel everything + self.state = .finished + + return .cancelTaskAndClockContinuation( + task: task, + clockContinuation: clockContinuation + ) + + case .demandSignalled(let task, let clockContinuation, let downstreamContinuation): + // We demanded the next element from the upstream after we got signalled demand + // and the upstream finished. This means we need to resume the downstream with nil + self.state = .finished + + return .resumeContinuationWithNilAndCancelTaskAndUpstreamAndClockContinuation( + downstreamContinuation: downstreamContinuation, + task: task, + upstreamContinuation: nil, + clockContinuation: clockContinuation + ) + + case .debouncing(let task, let upstreamContinuation, let downstreamContinuation, _): + // We are debouncing and the upstream finished. At this point + // we can just resume the downstream continuation with nil and cancel everything else + self.state = .finished + + return .resumeContinuationWithNilAndCancelTaskAndUpstreamAndClockContinuation( + downstreamContinuation: downstreamContinuation, + task: task, + upstreamContinuation: upstreamContinuation, + clockContinuation: nil + ) + + case .finished: + // This is just everything finishing up, nothing to do here + return .none + } + } + + /// Actions returned by `upstreamThrew()`. + enum UpstreamThrewAction { + /// Indicates that the task and the clock continuation should be cancelled. + case cancelTaskAndClockContinuation( + task: Task, + clockContinuation: UnsafeContinuation? + ) + /// Indicates that the downstream continuation should be resumed with the `error` and + /// the task and the upstream continuation should be cancelled. + case resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuation( + downstreamContinuation: UnsafeContinuation, + error: Error, + task: Task, + upstreamContinuation: UnsafeContinuation?, + clockContinuation: UnsafeContinuation? + ) + /// Indicates that nothing should be done. + case none + } + + mutating func upstreamThrew(_ error: Error) -> UpstreamThrewAction { + switch self.state { + case .initial: + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamThrew()") + + case .waitingForDemand(_, .some, _, _): + // We will never receive an upstream threw and have an outstanding continuation + // since we only receive threw after resuming the upstream continuation + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished()") + + case .waitingForDemand(_, .none, _, .some): + // We will never receive an upstream threw while we have a buffered element + // To get there we would need to have received the buffered element and then + // received upstream threw all while waiting for demand; however, we should have + // never demanded the next element from upstream in the first place + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished()") + + case .upstreamFailure: + // The upstream already failed so it should never have throw again. + preconditionFailure("Internal inconsistency current state \(self.state) and received childTaskSuspended()") + + case .waitingForDemand(let task, .none, let clockContinuation, .none): + // We don't have any buffered element so we can just go ahead + // and transition to finished and cancel everything + self.state = .finished + + return .cancelTaskAndClockContinuation( + task: task, + clockContinuation: clockContinuation + ) + + case .demandSignalled(let task, let clockContinuation, let downstreamContinuation): + // We demanded the next element from the upstream after we got signalled demand + // and the upstream threw. This means we need to resume the downstream with the error + self.state = .finished + + return .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuation( + downstreamContinuation: downstreamContinuation, + error: error, + task: task, + upstreamContinuation: nil, + clockContinuation: clockContinuation + ) + + case .debouncing(let task, let upstreamContinuation, let downstreamContinuation, _): + // We are debouncing and the upstream threw. At this point + // we can just resume the downstream continuation with error and cancel everything else + self.state = .finished + + return .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuation( + downstreamContinuation: downstreamContinuation, + error: error, + task: task, + upstreamContinuation: upstreamContinuation, + clockContinuation: nil + ) + + case .finished: + // This is just everything finishing up, nothing to do here + return .none + } + } + + /// Actions returned by `clockTaskSuspended()`. + enum ClockTaskSuspendedAction { + /// Indicates that the continuation should be resumed which will lead to calling `sleep` on the Clock. + case resumeContinuation( + clockContinuation: UnsafeContinuation, + deadline: C.Instant + ) + /// Indicates that the continuation should be resumed with an Error because another upstream sequence threw. + case resumeContinuationWithError( + clockContinuation: UnsafeContinuation, + error: Error + ) + /// Indicates that nothing should be done. + case none + } + + mutating func clockTaskSuspended(_ continuation: UnsafeContinuation) -> ClockTaskSuspendedAction { + switch self.state { + case .initial: + // Child tasks are only created after we transitioned to `merging` + preconditionFailure("Internal inconsistency current state \(self.state) and received clockTaskSuspended()") + + case .waitingForDemand(_, _, .some, _): + // We already have a clock continuation so we can never get a second one + preconditionFailure("Internal inconsistency current state \(self.state) and received clockTaskSuspended()") + + case .demandSignalled(_, .some, _): + // We already have a clock continuation so we can never get a second one + preconditionFailure("Internal inconsistency current state \(self.state) and received clockTaskSuspended()") + + case .waitingForDemand(let task, let upstreamContinuation, .none, let bufferedElement): + // The clock child task suspended and we just need to store the continuation until + // demand is signalled + + self.state = .waitingForDemand( + task: task, + upstreamContinuation: upstreamContinuation, + clockContinuation: continuation, + bufferedElement: bufferedElement + ) + + return .none + + case .demandSignalled(let task, .none, let downstreamContinuation): + // The demand was signalled but we haven't gotten the first element from the upstream yet + // so we need to stay in this state and do nothing + self.state = .demandSignalled( + task: task, + clockContinuation: continuation, + downstreamContinuation: downstreamContinuation + ) + + return .none + + case .debouncing(_, _, _, let currentElement): + // We are currently debouncing and the Clock task suspended + // We need to resume the continuation right away. + return .resumeContinuation( + clockContinuation: continuation, + deadline: currentElement.deadline + ) + + case .upstreamFailure: + // The upstream failed while we were waiting to suspend the clock task again + // The task should have already been cancelled and we just need to cancel the continuation + return .resumeContinuationWithError( + clockContinuation: continuation, + error: CancellationError() + ) + + case .finished: + // Since cancellation is cooperative it might be that child tasks are still getting + // suspended even though we already cancelled them. We must tolerate this and just resume + // the continuation with an error. + return .resumeContinuationWithError( + clockContinuation: continuation, + error: CancellationError() + ) + } + } + + /// Actions returned by `clockSleepFinished()`. + enum ClockSleepFinishedAction { + /// Indicates that the downstream continuation should be resumed with the given element. + case resumeDownStreamContinuation( + downStreamContinuation: UnsafeContinuation, + element: Element + ) + /// Indicates that nothing should be done. + case none + } + + mutating func clockSleepFinished() -> ClockSleepFinishedAction { + switch self.state { + case .initial: + // Child tasks are only created after we transitioned to `merging` + preconditionFailure("Internal inconsistency current state \(self.state) and received clockSleepFinished()") + + case .waitingForDemand: + // This can never happen since we kicked-off the Clock.sleep because we got signalled demand. + preconditionFailure("Internal inconsistency current state \(self.state) and received clockSleepFinished()") + + case .demandSignalled: + // This can never happen since we are still waiting for the first element until we resume the Clock sleep. + preconditionFailure("Internal inconsistency current state \(self.state) and received clockSleepFinished()") + + case .debouncing(let task, let upstreamContinuation, let downstreamContinuation, let currentElement): + if currentElement.deadline <= self.clock.now { + // The deadline for the last produced element expired and we can forward it to the downstream + self.state = .waitingForDemand( + task: task, + upstreamContinuation: upstreamContinuation, + clockContinuation: nil, + bufferedElement: nil + ) + + return .resumeDownStreamContinuation( + downStreamContinuation: downstreamContinuation, + element: currentElement.element + ) + } else { + // The deadline is still in the future so we need to sleep again + return .none + } + + case .upstreamFailure: + // The upstream failed before the Clock.sleep finished + // We already cleaned everything up so nothing left to do here. + return .none + + case .finished: + // The upstream failed before the Clock.sleep finished + // We already cleaned everything up so nothing left to do here. + return .none + } + } + + /// Actions returned by `cancelled()`. + enum CancelledAction { + /// Indicates that the downstream continuation needs to be resumed and + /// task and the upstream continuations should be cancelled. + case resumeDownstreamContinuationWithNilAndCancelTaskAndUpstreamAndClockContinuation( + downstreamContinuation: UnsafeContinuation, + task: Task, + upstreamContinuation: UnsafeContinuation?, + clockContinuation: UnsafeContinuation? + ) + /// Indicates that nothing should be done. + case none + } + + mutating func cancelled() -> CancelledAction { + switch self.state { + case .initial: + // Since we are transitioning to `merging` before we return from `makeAsyncIterator` + // this can never happen + preconditionFailure("Internal inconsistency current state \(self.state) and received cancelled()") + + case .waitingForDemand: + // We got cancelled before we event got any demand. This can happen if a cancelled task + // calls next and the onCancel handler runs first. We can transition to finished right away. + self.state = .finished + + return .none + + case .demandSignalled(let task, let clockContinuation, let downstreamContinuation): + // We got cancelled while we were waiting for the first upstream element + // We can cancel everything at this point and return nil + self.state = .finished + + return .resumeDownstreamContinuationWithNilAndCancelTaskAndUpstreamAndClockContinuation( + downstreamContinuation: downstreamContinuation, + task: task, + upstreamContinuation: nil, + clockContinuation: clockContinuation + ) + + case .debouncing(let task, let upstreamContinuation, let downstreamContinuation, _): + // We got cancelled while debouncing. + // We can cancel everything at this point and return nil + self.state = .finished + + return .resumeDownstreamContinuationWithNilAndCancelTaskAndUpstreamAndClockContinuation( + downstreamContinuation: downstreamContinuation, + task: task, + upstreamContinuation: upstreamContinuation, + clockContinuation: nil + ) + + case .upstreamFailure: + // An upstream already threw and we cancelled everything already. + // We should stay in the upstream failure state until the error is consumed + return .none + + case .finished: + // We are already finished so nothing to do here: + self.state = .finished + + return .none + } + } + + /// Actions returned by `next()`. + enum NextAction { + case resumeUpstreamContinuation( + upstreamContinuation: UnsafeContinuation? + ) + case resumeUpstreamAndClockContinuation( + upstreamContinuation: UnsafeContinuation?, + clockContinuation: UnsafeContinuation?, + deadline: C.Instant + ) + /// Indicates that the downstream continuation should be resumed with `nil`. + case resumeDownstreamContinuationWithNil(UnsafeContinuation) + /// Indicates that the downstream continuation should be resumed with the error. + case resumeDownstreamContinuationWithError( + UnsafeContinuation, + Error + ) + } + + mutating func next(for continuation: UnsafeContinuation) -> NextAction { + switch self.state { + case .initial: + preconditionFailure("Internal inconsistency current state \(self.state) and received next()") + + case .demandSignalled, .debouncing: + // We already got demand signalled and have suspended the downstream task + // Getting a second next calls means the iterator was transferred across Tasks which is not allowed + preconditionFailure("Internal inconsistency current state \(self.state) and received next()") + + case .waitingForDemand(let task, let upstreamContinuation, let clockContinuation, let bufferedElement): + if let bufferedElement = bufferedElement { + // We already got an element from the last buffered one + // We can kick of the clock and upstream consumption right away and transition to debouncing + self.state = .debouncing( + task: task, + upstreamContinuation: nil, + downstreamContinuation: continuation, + currentElement: bufferedElement + ) + + return .resumeUpstreamAndClockContinuation( + upstreamContinuation: upstreamContinuation, + clockContinuation: clockContinuation, + deadline: bufferedElement.deadline + ) + } else { + // We don't have a buffered element so have to resume the upstream continuation + // to get the first one and transition to demandSignalled + self.state = .demandSignalled( + task: task, + clockContinuation: clockContinuation, + downstreamContinuation: continuation + ) + + return .resumeUpstreamContinuation(upstreamContinuation: upstreamContinuation) + } + + case .upstreamFailure(let error): + // The upstream threw and haven't delivered the error yet + // Let's deliver it and transition to finished + self.state = .finished + + return .resumeDownstreamContinuationWithError(continuation, error) + + case .finished: + // We are already finished so we are just returning `nil` + return .resumeDownstreamContinuationWithNil(continuation) + } + } +} diff --git a/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift b/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift new file mode 100644 index 00000000..677de7af --- /dev/null +++ b/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift @@ -0,0 +1,294 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +final class DebounceStorage: @unchecked Sendable where Base: Sendable { + typealias Element = Base.Element + + /// The lock that protects our state. + private let lock = Lock.allocate() + /// The state machine. + private var stateMachine: DebounceStateMachine + /// The interval to debounce. + private let interval: C.Instant.Duration + /// The tolerance for the clock. + private let tolerance: C.Instant.Duration? + /// The clock. + private let clock: C + + init(base: Base, interval: C.Instant.Duration, tolerance: C.Instant.Duration?, clock: C) { + self.stateMachine = .init(base: base, clock: clock, interval: interval) + self.interval = interval + self.tolerance = tolerance + self.clock = clock + } + + deinit { + self.lock.deinitialize() + } + + func sequenceDeinitialized() { + self.lock.withLock { self.stateMachine.sequenceDeinitialized() } + } + + func iteratorInitialized() { + self.lock.withLockVoid { + let action = self.stateMachine.iteratorInitialized() + + switch action { + case .startTask(let base): + let task = Task { + await withThrowingTaskGroup(of: Void.self) { group in + // The task that consumes the upstream sequence + group.addTask { + var iterator = base.makeAsyncIterator() + + // This is our upstream consumption loop + loop: while true { + // We are creating a continuation before requesting the next + // element from upstream. This continuation is only resumed + // if the downstream consumer called `next` to signal his demand + // and until the Clock sleep finished. + try await withUnsafeThrowingContinuation { continuation in + let action = self.lock.withLock { + self.stateMachine.upstreamTaskSuspended(continuation) + } + + switch action { + case .resumeContinuation(let continuation): + // This happens if there is outstanding demand + // and we need to demand from upstream right away + continuation.resume(returning: ()) + + case .resumeContinuationWithError(let continuation, let error): + // This happens if the task got cancelled. + continuation.resume(throwing: error) + + case .none: + break + } + } + + // We got signalled from the downstream that we have demand so let's + // request a new element from the upstream + if let element = try await iterator.next() { + let action = self.lock.withLock { + let deadline = self.clock.now.advanced(by: self.interval) + return self.stateMachine.elementProduced(element, deadline: deadline) + } + + switch action { + case .resumeClockContinuation(let clockContinuation, let deadline): + clockContinuation?.resume(returning: deadline) + + case .none: + break + } + } else { + // The upstream returned `nil` which indicates that it finished + let action = self.lock.withLock { + self.stateMachine.upstreamFinished() + } + + // All of this is mostly cleanup around the Task and the outstanding + // continuations used for signalling. + switch action { + case .cancelTaskAndClockContinuation(let task, let clockContinuation): + task.cancel() + clockContinuation?.resume(throwing: CancellationError()) + + break loop + case .resumeContinuationWithNilAndCancelTaskAndUpstreamAndClockContinuation( + let downstreamContinuation, + let task, + let upstreamContinuation, + let clockContinuation + ): + upstreamContinuation?.resume(throwing: CancellationError()) + clockContinuation?.resume(throwing: CancellationError()) + task.cancel() + + downstreamContinuation.resume(returning: nil) + + break loop + case .none: + + break loop + } + } + } + } + + group.addTask { + // This is our clock scheduling loop + loop: while true { + do { + // We are creating a continuation sleeping on the Clock. + // This continuation is only resumed if the downstream consumer called `next`. + let deadline: C.Instant = try await withUnsafeThrowingContinuation { continuation in + let action = self.lock.withLock { + self.stateMachine.clockTaskSuspended(continuation) + } + + switch action { + case .resumeContinuation(let continuation, let deadline): + // This happens if there is outstanding demand + // and we need to demand from upstream right away + continuation.resume(returning: deadline) + + case .resumeContinuationWithError(let continuation, let error): + // This happens if the task got cancelled. + continuation.resume(throwing: error) + + case .none: + break + } + } + + try await self.clock.sleep(until: deadline, tolerance: self.tolerance) + + let action = self.lock.withLock { + self.stateMachine.clockSleepFinished() + } + + switch action { + case .resumeDownStreamContinuation(let downStreamContinuation, let element): + downStreamContinuation.resume(returning: element) + + case .none: + break + } + } catch { + // The only error that we expect is the `CancellationError` + // thrown from the Clock.sleep or from the withUnsafeContinuation. + // This happens if we are cleaning everything up. We can just drop that error and break our loop + precondition(error is CancellationError, "Received unexpected error \(error) in the Clock loop") + break loop + } + } + } + + do { + try await group.waitForAll() + } catch { + // The upstream sequence threw an error + let action = self.lock.withLock { + self.stateMachine.upstreamThrew(error) + } + + switch action { + case .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuation( + let downstreamContinuation, + let error, + let task, + let upstreamContinuation, + let clockContinuation + ): + upstreamContinuation?.resume(throwing: CancellationError()) + clockContinuation?.resume(throwing: CancellationError()) + + task.cancel() + + downstreamContinuation.resume(throwing: error) + + case .cancelTaskAndClockContinuation( + let task, + let clockContinuation + ): + clockContinuation?.resume(throwing: CancellationError()) + task.cancel() + + case .none: + break + } + + group.cancelAll() + } + } + } + + self.stateMachine.taskStarted(task) + } + } + } + + func iteratorDeinitialized() { + let action = self.lock.withLock { self.stateMachine.iteratorDeinitialized() } + + switch action { + case .cancelTaskAndUpstreamAndClockContinuations( + let task, + let upstreamContinuation, + let clockContinuation + ): + upstreamContinuation?.resume(throwing: CancellationError()) + clockContinuation?.resume(throwing: CancellationError()) + + task.cancel() + + case .none: + break + } + } + + func next() async rethrows -> Element? { + // We need to handle cancellation here because we are creating a continuation + // and because we need to cancel the `Task` we created to consume the upstream + return try await withTaskCancellationHandler { + // We always suspend since we can never return an element right away + + self.lock.lock() + return try await withUnsafeThrowingContinuation { continuation in + let action = self.stateMachine.next(for: continuation) + self.lock.unlock() + + switch action { + case .resumeUpstreamContinuation(let upstreamContinuation): + // This is signalling the upstream task that is consuming the upstream + // sequence to signal demand. + upstreamContinuation?.resume(returning: ()) + + case .resumeUpstreamAndClockContinuation(let upstreamContinuation, let clockContinuation, let deadline): + // This is signalling the upstream task that is consuming the upstream + // sequence to signal demand and start the clock task. + upstreamContinuation?.resume(returning: ()) + clockContinuation?.resume(returning: deadline) + + case .resumeDownstreamContinuationWithNil(let continuation): + continuation.resume(returning: nil) + + case .resumeDownstreamContinuationWithError(let continuation, let error): + continuation.resume(throwing: error) + } + } + } onCancel: { + let action = self.lock.withLock { self.stateMachine.cancelled() } + + switch action { + case .resumeDownstreamContinuationWithNilAndCancelTaskAndUpstreamAndClockContinuation( + let downstreamContinuation, + let task, + let upstreamContinuation, + let clockContinuation + ): + upstreamContinuation?.resume(throwing: CancellationError()) + clockContinuation?.resume(throwing: CancellationError()) + + task.cancel() + + downstreamContinuation.resume(returning: nil) + + case .none: + break + } + } + } +} diff --git a/Tests/AsyncAlgorithmsTests/Performance/TestThroughput.swift b/Tests/AsyncAlgorithmsTests/Performance/TestThroughput.swift index 2d490cc9..f1868b5b 100644 --- a/Tests/AsyncAlgorithmsTests/Performance/TestThroughput.swift +++ b/Tests/AsyncAlgorithmsTests/Performance/TestThroughput.swift @@ -64,5 +64,11 @@ final class TestThroughput: XCTestCase { zip($0, $1, $2) } } + @available(macOS 13.0, *) + func test_debounce() async { + await measureSequenceThroughput(source: (1...).async) { + $0.debounce(for: .zero, clock: ContinuousClock()) + } + } } #endif diff --git a/Tests/AsyncAlgorithmsTests/TestDebounce.swift b/Tests/AsyncAlgorithmsTests/TestDebounce.swift index 2005c134..8f717011 100644 --- a/Tests/AsyncAlgorithmsTests/TestDebounce.swift +++ b/Tests/AsyncAlgorithmsTests/TestDebounce.swift @@ -58,4 +58,14 @@ final class TestDebounce: XCTestCase { "----|" } } + + func test_Rethrows() async throws { + guard #available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) else { throw XCTSkip("Skipped due to Clock/Instant/Duration availability") } + + let debounce = [1].async.debounce(for: .zero, clock: ContinuousClock()) + for await _ in debounce {} + + let throwingDebounce = [1].async.map { try throwOn(2, $0) }.debounce(for: .zero, clock: ContinuousClock()) + for try await _ in throwingDebounce {} + } } From b14a530b1f1c8ac023c027c2dd8070cc63675bf7 Mon Sep 17 00:00:00 2001 From: FranzBusch Date: Mon, 12 Sep 2022 13:38:13 +0100 Subject: [PATCH 2/6] Fix https://github.com/apple/swift-async-algorithms/issues/174 --- .../Debounce/DebounceStateMachine.swift | 16 +++++++++++--- .../Debounce/DebounceStorage.swift | 17 +++++++++++++++ Sources/AsyncAlgorithms/Locking.swift | 21 +++++++++++++++++++ Tests/AsyncAlgorithmsTests/TestDebounce.swift | 4 ++-- 4 files changed, 53 insertions(+), 5 deletions(-) diff --git a/Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift b/Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift index d48253c1..1f2dd77c 100644 --- a/Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift +++ b/Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift @@ -325,6 +325,15 @@ struct DebounceStateMachine { upstreamContinuation: UnsafeContinuation?, clockContinuation: UnsafeContinuation? ) + /// Indicates that the downstream continuation should be resumed with `nil` and + /// the task and the upstream continuation should be cancelled. + case resumeContinuationWithElementAndCancelTaskAndUpstreamAndClockContinuation( + downstreamContinuation: UnsafeContinuation, + element: Element, + task: Task, + upstreamContinuation: UnsafeContinuation?, + clockContinuation: UnsafeContinuation? + ) /// Indicates that nothing should be done. case none } @@ -372,13 +381,14 @@ struct DebounceStateMachine { clockContinuation: clockContinuation ) - case .debouncing(let task, let upstreamContinuation, let downstreamContinuation, _): + case .debouncing(let task, let upstreamContinuation, let downstreamContinuation, let currentElement): // We are debouncing and the upstream finished. At this point - // we can just resume the downstream continuation with nil and cancel everything else + // we can just resume the downstream continuation with element and cancel everything else self.state = .finished - return .resumeContinuationWithNilAndCancelTaskAndUpstreamAndClockContinuation( + return .resumeContinuationWithElementAndCancelTaskAndUpstreamAndClockContinuation( downstreamContinuation: downstreamContinuation, + element: currentElement.element, task: task, upstreamContinuation: upstreamContinuation, clockContinuation: nil diff --git a/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift b/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift index 677de7af..5d621568 100644 --- a/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift +++ b/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift @@ -119,6 +119,23 @@ final class DebounceStorage: @unchecked Sendable downstreamContinuation.resume(returning: nil) break loop + + case .resumeContinuationWithElementAndCancelTaskAndUpstreamAndClockContinuation( + let downstreamContinuation, + let element, + let task, + let upstreamContinuation, + let clockContinuation + ): + upstreamContinuation?.resume(throwing: CancellationError()) + clockContinuation?.resume(throwing: CancellationError()) + task.cancel() + + downstreamContinuation.resume(returning: element) + + break loop + + case .none: break loop diff --git a/Sources/AsyncAlgorithms/Locking.swift b/Sources/AsyncAlgorithms/Locking.swift index eedad1ee..74396080 100644 --- a/Sources/AsyncAlgorithms/Locking.swift +++ b/Sources/AsyncAlgorithms/Locking.swift @@ -87,6 +87,27 @@ internal struct Lock { func unlock() { Lock.unlock(platformLock) } + + /// Acquire the lock for the duration of the given block. + /// + /// This convenience method should be preferred to `lock` and `unlock` in + /// most situations, as it ensures that the lock will be released regardless + /// of how `body` exits. + /// + /// - Parameter body: The block to execute while holding the lock. + /// - Returns: The value returned by the block. + func withLock(_ body: () throws -> T) rethrows -> T { + self.lock() + defer { + self.unlock() + } + return try body() + } + + // specialise Void return (for performance) + func withLockVoid(_ body: () throws -> Void) rethrows -> Void { + try self.withLock(body) + } } struct ManagedCriticalState { diff --git a/Tests/AsyncAlgorithmsTests/TestDebounce.swift b/Tests/AsyncAlgorithmsTests/TestDebounce.swift index 8f717011..d82105bc 100644 --- a/Tests/AsyncAlgorithmsTests/TestDebounce.swift +++ b/Tests/AsyncAlgorithmsTests/TestDebounce.swift @@ -27,7 +27,7 @@ final class TestDebounce: XCTestCase { validate { "abcd----e---f-g-|" $0.inputs[0].debounce(for: .steps(3), clock: $0.clock) - "------d----e----|" + "------d----e----[g|]" } } @@ -37,7 +37,7 @@ final class TestDebounce: XCTestCase { validate { "a|" $0.inputs[0].debounce(for: .steps(3), clock: $0.clock) - "-|" + "-[a|]" } } From 3e9395111cce452abeaa195c5b4febe56906e62a Mon Sep 17 00:00:00 2001 From: FranzBusch Date: Tue, 13 Sep 2022 16:54:23 +0100 Subject: [PATCH 3/6] Code review --- .../Debounce/AsyncDebounceSequence.swift | 45 ++++-------- .../Debounce/DebounceStateMachine.swift | 52 +++++--------- .../Debounce/DebounceStorage.swift | 71 +++++++------------ 3 files changed, 60 insertions(+), 108 deletions(-) diff --git a/Sources/AsyncAlgorithms/Debounce/AsyncDebounceSequence.swift b/Sources/AsyncAlgorithms/Debounce/AsyncDebounceSequence.swift index 6b827709..1fc39486 100644 --- a/Sources/AsyncAlgorithms/Debounce/AsyncDebounceSequence.swift +++ b/Sources/AsyncAlgorithms/Debounce/AsyncDebounceSequence.swift @@ -29,28 +29,10 @@ extension AsyncSequence { /// has elapsed. @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) public struct AsyncDebounceSequence: Sendable where Base: Sendable { - /// This class is needed to hook the deinit to observe once all references to the ``AsyncDebounceSequence`` are dropped. - /// - /// If we get move-only types we should be able to drop this class and use the `deinit` of the ``AsyncDebounceSequence`` struct itself. - final class InternalClass: Sendable { - fileprivate let storage: DebounceStorage - - fileprivate init(storage: DebounceStorage) { - self.storage = storage - } - - deinit { - storage.sequenceDeinitialized() - } - } - - /// The internal class to hook the `deinit`. - let internalClass: InternalClass - - /// The underlying storage - fileprivate var storage: DebounceStorage { - self.internalClass.storage - } + private let base: Base + private let clock: C + private let interval: C.Instant.Duration + private let tolerance: C.Instant.Duration? /// Initializes a new ``AsyncDebounceSequence``. /// @@ -60,13 +42,10 @@ public struct AsyncDebounceSequence: Sendable whe /// - tolerance: The tolerance of the clock. /// - clock: The clock. public init(_ base: Base, interval: C.Instant.Duration, tolerance: C.Instant.Duration?, clock: C) { - let storage = DebounceStorage( - base: base, - interval: interval, - tolerance: tolerance, - clock: clock - ) - self.internalClass = .init(storage: storage) + self.base = base + self.interval = interval + self.tolerance = tolerance + self.clock = clock } } @@ -75,7 +54,13 @@ extension AsyncDebounceSequence: AsyncSequence { public typealias Element = Base.Element public func makeAsyncIterator() -> AsyncIterator { - AsyncIterator(storage: self.internalClass.storage) + let storage = DebounceStorage( + base: self.base, + interval: self.interval, + tolerance: self.tolerance, + clock: self.clock + ) + return AsyncIterator(storage: storage) } } diff --git a/Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift b/Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift index 1f2dd77c..221728bf 100644 --- a/Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift +++ b/Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift @@ -30,14 +30,14 @@ struct DebounceStateMachine { case demandSignalled( task: Task, clockContinuation: UnsafeContinuation?, - downstreamContinuation: UnsafeContinuation + downstreamContinuation: UnsafeContinuation, Never> ) /// The state while we are consuming the upstream and waiting for the Clock.sleep to finish. case debouncing( task: Task, upstreamContinuation: UnsafeContinuation?, - downstreamContinuation: UnsafeContinuation, + downstreamContinuation: UnsafeContinuation, Never>, currentElement: (element: Element, deadline: C.Instant) ) @@ -110,11 +110,9 @@ struct DebounceStateMachine { upstreamContinuation: UnsafeContinuation?, clockContinuation: UnsafeContinuation? ) - /// Indicates that nothing should be done. - case none } - mutating func iteratorDeinitialized() -> IteratorDeinitializedAction { + mutating func iteratorDeinitialized() -> IteratorDeinitializedAction? { switch self.state { case .initial: // An iterator needs to be initialized before it can be deinitialized. @@ -179,11 +177,9 @@ struct DebounceStateMachine { upstreamContinuation: UnsafeContinuation, error: Error ) - /// Indicates that nothing should be done. - case none } - mutating func upstreamTaskSuspended(_ continuation: UnsafeContinuation) -> UpstreamTaskSuspendedAction { + mutating func upstreamTaskSuspended(_ continuation: UnsafeContinuation) -> UpstreamTaskSuspendedAction? { switch self.state { case .initial: // Child tasks are only created after we transitioned to `merging` @@ -240,11 +236,9 @@ struct DebounceStateMachine { clockContinuation: UnsafeContinuation?, deadline: C.Instant ) - /// Indicates that nothing should be done. - case none } - mutating func elementProduced(_ element: Element, deadline: C.Instant) -> ElementProducedAction { + mutating func elementProduced(_ element: Element, deadline: C.Instant) -> ElementProducedAction? { switch self.state { case .initial: // Child tasks that are producing elements are only created after we transitioned to `merging` @@ -320,7 +314,7 @@ struct DebounceStateMachine { /// Indicates that the downstream continuation should be resumed with `nil` and /// the task and the upstream continuation should be cancelled. case resumeContinuationWithNilAndCancelTaskAndUpstreamAndClockContinuation( - downstreamContinuation: UnsafeContinuation, + downstreamContinuation: UnsafeContinuation, Never>, task: Task, upstreamContinuation: UnsafeContinuation?, clockContinuation: UnsafeContinuation? @@ -328,17 +322,15 @@ struct DebounceStateMachine { /// Indicates that the downstream continuation should be resumed with `nil` and /// the task and the upstream continuation should be cancelled. case resumeContinuationWithElementAndCancelTaskAndUpstreamAndClockContinuation( - downstreamContinuation: UnsafeContinuation, + downstreamContinuation: UnsafeContinuation, Never>, element: Element, task: Task, upstreamContinuation: UnsafeContinuation?, clockContinuation: UnsafeContinuation? ) - /// Indicates that nothing should be done. - case none } - mutating func upstreamFinished() -> UpstreamFinishedAction { + mutating func upstreamFinished() -> UpstreamFinishedAction? { switch self.state { case .initial: preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished()") @@ -410,17 +402,15 @@ struct DebounceStateMachine { /// Indicates that the downstream continuation should be resumed with the `error` and /// the task and the upstream continuation should be cancelled. case resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuation( - downstreamContinuation: UnsafeContinuation, + downstreamContinuation: UnsafeContinuation, Never>, error: Error, task: Task, upstreamContinuation: UnsafeContinuation?, clockContinuation: UnsafeContinuation? ) - /// Indicates that nothing should be done. - case none } - mutating func upstreamThrew(_ error: Error) -> UpstreamThrewAction { + mutating func upstreamThrew(_ error: Error) -> UpstreamThrewAction? { switch self.state { case .initial: preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamThrew()") @@ -495,11 +485,9 @@ struct DebounceStateMachine { clockContinuation: UnsafeContinuation, error: Error ) - /// Indicates that nothing should be done. - case none } - mutating func clockTaskSuspended(_ continuation: UnsafeContinuation) -> ClockTaskSuspendedAction { + mutating func clockTaskSuspended(_ continuation: UnsafeContinuation) -> ClockTaskSuspendedAction? { switch self.state { case .initial: // Child tasks are only created after we transitioned to `merging` @@ -568,14 +556,12 @@ struct DebounceStateMachine { enum ClockSleepFinishedAction { /// Indicates that the downstream continuation should be resumed with the given element. case resumeDownStreamContinuation( - downStreamContinuation: UnsafeContinuation, + downStreamContinuation: UnsafeContinuation, Never>, element: Element ) - /// Indicates that nothing should be done. - case none } - mutating func clockSleepFinished() -> ClockSleepFinishedAction { + mutating func clockSleepFinished() -> ClockSleepFinishedAction? { switch self.state { case .initial: // Child tasks are only created after we transitioned to `merging` @@ -625,16 +611,14 @@ struct DebounceStateMachine { /// Indicates that the downstream continuation needs to be resumed and /// task and the upstream continuations should be cancelled. case resumeDownstreamContinuationWithNilAndCancelTaskAndUpstreamAndClockContinuation( - downstreamContinuation: UnsafeContinuation, + downstreamContinuation: UnsafeContinuation, Never>, task: Task, upstreamContinuation: UnsafeContinuation?, clockContinuation: UnsafeContinuation? ) - /// Indicates that nothing should be done. - case none } - mutating func cancelled() -> CancelledAction { + mutating func cancelled() -> CancelledAction? { switch self.state { case .initial: // Since we are transitioning to `merging` before we return from `makeAsyncIterator` @@ -696,15 +680,15 @@ struct DebounceStateMachine { deadline: C.Instant ) /// Indicates that the downstream continuation should be resumed with `nil`. - case resumeDownstreamContinuationWithNil(UnsafeContinuation) + case resumeDownstreamContinuationWithNil(UnsafeContinuation, Never>) /// Indicates that the downstream continuation should be resumed with the error. case resumeDownstreamContinuationWithError( - UnsafeContinuation, + UnsafeContinuation, Never>, Error ) } - mutating func next(for continuation: UnsafeContinuation) -> NextAction { + mutating func next(for continuation: UnsafeContinuation, Never>) -> NextAction { switch self.state { case .initial: preconditionFailure("Internal inconsistency current state \(self.state) and received next()") diff --git a/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift b/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift index 5d621568..462f8cc8 100644 --- a/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift +++ b/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift @@ -13,10 +13,8 @@ final class DebounceStorage: @unchecked Sendable where Base: Sendable { typealias Element = Base.Element - /// The lock that protects our state. - private let lock = Lock.allocate() - /// The state machine. - private var stateMachine: DebounceStateMachine + /// The state machine protected with a lock. + private let stateMachine: ManagedCriticalState> /// The interval to debounce. private let interval: C.Instant.Duration /// The tolerance for the clock. @@ -25,23 +23,19 @@ final class DebounceStorage: @unchecked Sendable private let clock: C init(base: Base, interval: C.Instant.Duration, tolerance: C.Instant.Duration?, clock: C) { - self.stateMachine = .init(base: base, clock: clock, interval: interval) + self.stateMachine = .init(.init(base: base, clock: clock, interval: interval)) self.interval = interval self.tolerance = tolerance self.clock = clock } - deinit { - self.lock.deinitialize() - } - func sequenceDeinitialized() { - self.lock.withLock { self.stateMachine.sequenceDeinitialized() } + self.stateMachine.withCriticalRegion { $0.sequenceDeinitialized() } } func iteratorInitialized() { - self.lock.withLockVoid { - let action = self.stateMachine.iteratorInitialized() + self.stateMachine.withCriticalRegion { + let action = $0.iteratorInitialized() switch action { case .startTask(let base): @@ -58,9 +52,7 @@ final class DebounceStorage: @unchecked Sendable // if the downstream consumer called `next` to signal his demand // and until the Clock sleep finished. try await withUnsafeThrowingContinuation { continuation in - let action = self.lock.withLock { - self.stateMachine.upstreamTaskSuspended(continuation) - } + let action = self.stateMachine.withCriticalRegion { $0.upstreamTaskSuspended(continuation) } switch action { case .resumeContinuation(let continuation): @@ -80,9 +72,9 @@ final class DebounceStorage: @unchecked Sendable // We got signalled from the downstream that we have demand so let's // request a new element from the upstream if let element = try await iterator.next() { - let action = self.lock.withLock { + let action = self.stateMachine.withCriticalRegion { let deadline = self.clock.now.advanced(by: self.interval) - return self.stateMachine.elementProduced(element, deadline: deadline) + return $0.elementProduced(element, deadline: deadline) } switch action { @@ -94,9 +86,7 @@ final class DebounceStorage: @unchecked Sendable } } else { // The upstream returned `nil` which indicates that it finished - let action = self.lock.withLock { - self.stateMachine.upstreamFinished() - } + let action = self.stateMachine.withCriticalRegion { $0.upstreamFinished() } // All of this is mostly cleanup around the Task and the outstanding // continuations used for signalling. @@ -116,7 +106,7 @@ final class DebounceStorage: @unchecked Sendable clockContinuation?.resume(throwing: CancellationError()) task.cancel() - downstreamContinuation.resume(returning: nil) + downstreamContinuation.resume(returning: .success(nil)) break loop @@ -131,11 +121,10 @@ final class DebounceStorage: @unchecked Sendable clockContinuation?.resume(throwing: CancellationError()) task.cancel() - downstreamContinuation.resume(returning: element) + downstreamContinuation.resume(returning: .success(element)) break loop - case .none: break loop @@ -151,9 +140,7 @@ final class DebounceStorage: @unchecked Sendable // We are creating a continuation sleeping on the Clock. // This continuation is only resumed if the downstream consumer called `next`. let deadline: C.Instant = try await withUnsafeThrowingContinuation { continuation in - let action = self.lock.withLock { - self.stateMachine.clockTaskSuspended(continuation) - } + let action = self.stateMachine.withCriticalRegion { $0.clockTaskSuspended(continuation) } switch action { case .resumeContinuation(let continuation, let deadline): @@ -172,13 +159,11 @@ final class DebounceStorage: @unchecked Sendable try await self.clock.sleep(until: deadline, tolerance: self.tolerance) - let action = self.lock.withLock { - self.stateMachine.clockSleepFinished() - } + let action = self.stateMachine.withCriticalRegion { $0.clockSleepFinished() } switch action { case .resumeDownStreamContinuation(let downStreamContinuation, let element): - downStreamContinuation.resume(returning: element) + downStreamContinuation.resume(returning: .success(element)) case .none: break @@ -197,9 +182,7 @@ final class DebounceStorage: @unchecked Sendable try await group.waitForAll() } catch { // The upstream sequence threw an error - let action = self.lock.withLock { - self.stateMachine.upstreamThrew(error) - } + let action = self.stateMachine.withCriticalRegion { $0.upstreamThrew(error) } switch action { case .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuation( @@ -214,7 +197,7 @@ final class DebounceStorage: @unchecked Sendable task.cancel() - downstreamContinuation.resume(throwing: error) + downstreamContinuation.resume(returning: .failure(error)) case .cancelTaskAndClockContinuation( let task, @@ -232,13 +215,13 @@ final class DebounceStorage: @unchecked Sendable } } - self.stateMachine.taskStarted(task) + $0.taskStarted(task) } } } func iteratorDeinitialized() { - let action = self.lock.withLock { self.stateMachine.iteratorDeinitialized() } + let action = self.stateMachine.withCriticalRegion { $0.iteratorDeinitialized() } switch action { case .cancelTaskAndUpstreamAndClockContinuations( @@ -262,10 +245,8 @@ final class DebounceStorage: @unchecked Sendable return try await withTaskCancellationHandler { // We always suspend since we can never return an element right away - self.lock.lock() - return try await withUnsafeThrowingContinuation { continuation in - let action = self.stateMachine.next(for: continuation) - self.lock.unlock() + let result: Result = await withUnsafeContinuation { continuation in + let action = self.stateMachine.withCriticalRegion { $0.next(for: continuation) } switch action { case .resumeUpstreamContinuation(let upstreamContinuation): @@ -280,14 +261,16 @@ final class DebounceStorage: @unchecked Sendable clockContinuation?.resume(returning: deadline) case .resumeDownstreamContinuationWithNil(let continuation): - continuation.resume(returning: nil) + continuation.resume(returning: .success(nil)) case .resumeDownstreamContinuationWithError(let continuation, let error): - continuation.resume(throwing: error) + continuation.resume(returning: .failure(error)) } } + + return try result._rethrowGet() } onCancel: { - let action = self.lock.withLock { self.stateMachine.cancelled() } + let action = self.stateMachine.withCriticalRegion { $0.cancelled() } switch action { case .resumeDownstreamContinuationWithNilAndCancelTaskAndUpstreamAndClockContinuation( @@ -301,7 +284,7 @@ final class DebounceStorage: @unchecked Sendable task.cancel() - downstreamContinuation.resume(returning: nil) + downstreamContinuation.resume(returning: .success(nil)) case .none: break From 4bfb1e6d6602f890d0363be3d55f25acbc776430 Mon Sep 17 00:00:00 2001 From: FranzBusch Date: Wed, 14 Sep 2022 09:44:44 +0100 Subject: [PATCH 4/6] Remove lock methods --- Sources/AsyncAlgorithms/Locking.swift | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/Sources/AsyncAlgorithms/Locking.swift b/Sources/AsyncAlgorithms/Locking.swift index 74396080..eedad1ee 100644 --- a/Sources/AsyncAlgorithms/Locking.swift +++ b/Sources/AsyncAlgorithms/Locking.swift @@ -87,27 +87,6 @@ internal struct Lock { func unlock() { Lock.unlock(platformLock) } - - /// Acquire the lock for the duration of the given block. - /// - /// This convenience method should be preferred to `lock` and `unlock` in - /// most situations, as it ensures that the lock will be released regardless - /// of how `body` exits. - /// - /// - Parameter body: The block to execute while holding the lock. - /// - Returns: The value returned by the block. - func withLock(_ body: () throws -> T) rethrows -> T { - self.lock() - defer { - self.unlock() - } - return try body() - } - - // specialise Void return (for performance) - func withLockVoid(_ body: () throws -> Void) rethrows -> Void { - try self.withLock(body) - } } struct ManagedCriticalState { From c88249aa72ef6d0020d2c499be84db70c0ec8f4c Mon Sep 17 00:00:00 2001 From: FranzBusch Date: Wed, 14 Sep 2022 10:01:44 +0100 Subject: [PATCH 5/6] Cleanup some unused code --- .../Debounce/DebounceStateMachine.swift | 19 ------------------- .../Debounce/DebounceStorage.swift | 4 ---- 2 files changed, 23 deletions(-) diff --git a/Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift b/Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift index 221728bf..ab5a2ad1 100644 --- a/Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift +++ b/Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift @@ -64,25 +64,6 @@ struct DebounceStateMachine { self.interval = interval } - mutating func sequenceDeinitialized() { - switch self.state { - case .initial: - // The references to the sequence were dropped before any iterator was ever created - self.state = .finished - - case .waitingForDemand, .demandSignalled, .debouncing, .upstreamFailure: - // An iterator was created and we deinited the sequence. - // This is an expected pattern and we just continue on normal. - // Importantly since we are a unicast sequence no more iterators can be created - return - - case .finished: - // We are already finished so there is nothing left to clean up. - // This is just the references dropping afterwards. - return - } - } - /// Actions returned by `iteratorInitialized()`. enum IteratorInitializedAction { /// Indicates that a new `Task` should be created that consumes the sequence. diff --git a/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift b/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift index 462f8cc8..d58a2192 100644 --- a/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift +++ b/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift @@ -29,10 +29,6 @@ final class DebounceStorage: @unchecked Sendable self.clock = clock } - func sequenceDeinitialized() { - self.stateMachine.withCriticalRegion { $0.sequenceDeinitialized() } - } - func iteratorInitialized() { self.stateMachine.withCriticalRegion { let action = $0.iteratorInitialized() From bda1f5fb39d635b305fae8bc8e1f12f319861fcd Mon Sep 17 00:00:00 2001 From: FranzBusch Date: Thu, 29 Sep 2022 14:16:51 +0200 Subject: [PATCH 6/6] Setup task after first call to next --- .../Debounce/AsyncDebounceSequence.swift | 1 - .../Debounce/DebounceStateMachine.swift | 41 +- .../Debounce/DebounceStorage.swift | 418 +++++++++--------- 3 files changed, 225 insertions(+), 235 deletions(-) diff --git a/Sources/AsyncAlgorithms/Debounce/AsyncDebounceSequence.swift b/Sources/AsyncAlgorithms/Debounce/AsyncDebounceSequence.swift index 1fc39486..5ae17f14 100644 --- a/Sources/AsyncAlgorithms/Debounce/AsyncDebounceSequence.swift +++ b/Sources/AsyncAlgorithms/Debounce/AsyncDebounceSequence.swift @@ -75,7 +75,6 @@ extension AsyncDebounceSequence { fileprivate init(storage: DebounceStorage) { self.storage = storage - self.storage.iteratorInitialized() } deinit { diff --git a/Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift b/Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift index ab5a2ad1..cd6cc1a8 100644 --- a/Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift +++ b/Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift @@ -14,7 +14,7 @@ struct DebounceStateMachine { typealias Element = Base.Element private enum State { - /// The initial state before a call to `makeAsyncIterator` happened. + /// The initial state before a call to `next` happened. case initial(base: Base) /// The state while we are waiting for downstream demand. @@ -64,24 +64,6 @@ struct DebounceStateMachine { self.interval = interval } - /// Actions returned by `iteratorInitialized()`. - enum IteratorInitializedAction { - /// Indicates that a new `Task` should be created that consumes the sequence. - case startTask(Base) - } - - mutating func iteratorInitialized() -> IteratorInitializedAction { - switch self.state { - case .initial(let base): - // This is the first iterator being created and we need to create our `Task` - // that is consuming the upstream sequences. - return .startTask(base) - - case .waitingForDemand, .demandSignalled, .debouncing, .upstreamFailure, .finished: - fatalError("debounce allows only a single AsyncIterator to be created") - } - } - /// Actions returned by `iteratorDeinitialized()`. enum IteratorDeinitializedAction { /// Indicates that the `Task` needs to be cancelled and @@ -96,8 +78,8 @@ struct DebounceStateMachine { mutating func iteratorDeinitialized() -> IteratorDeinitializedAction? { switch self.state { case .initial: - // An iterator needs to be initialized before it can be deinitialized. - preconditionFailure("Internal inconsistency current state \(self.state) and received iteratorDeinitialized()") + // Nothing to do here. No demand was signalled until now + return .none case .debouncing, .demandSignalled: // An iterator was deinitialized while we have a suspended continuation. @@ -129,16 +111,15 @@ struct DebounceStateMachine { } } - mutating func taskStarted(_ task: Task) { + mutating func taskStarted(_ task: Task, downstreamContinuation: UnsafeContinuation, Never>) { switch self.state { case .initial: - // The user called `makeAsyncIterator` and we are starting the `Task` + // The user called `next` and we are starting the `Task` // to consume the upstream sequence - self.state = .waitingForDemand( + self.state = .demandSignalled( task: task, - upstreamContinuation: nil, clockContinuation: nil, - bufferedElement: nil + downstreamContinuation: downstreamContinuation ) case .debouncing, .demandSignalled, .waitingForDemand, .upstreamFailure, .finished: @@ -652,6 +633,8 @@ struct DebounceStateMachine { /// Actions returned by `next()`. enum NextAction { + /// Indicates that a new `Task` should be created that consumes the sequence. + case startTask(Base) case resumeUpstreamContinuation( upstreamContinuation: UnsafeContinuation? ) @@ -671,8 +654,10 @@ struct DebounceStateMachine { mutating func next(for continuation: UnsafeContinuation, Never>) -> NextAction { switch self.state { - case .initial: - preconditionFailure("Internal inconsistency current state \(self.state) and received next()") + case .initial(let base): + // This is the first time we get demand singalled so we have to start the task + // The transition to the next state is done in the taskStarted method + return .startTask(base) case .demandSignalled, .debouncing: // We already got demand signalled and have suspended the downstream task diff --git a/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift b/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift index d58a2192..21e1ddf6 100644 --- a/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift +++ b/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift @@ -29,193 +29,6 @@ final class DebounceStorage: @unchecked Sendable self.clock = clock } - func iteratorInitialized() { - self.stateMachine.withCriticalRegion { - let action = $0.iteratorInitialized() - - switch action { - case .startTask(let base): - let task = Task { - await withThrowingTaskGroup(of: Void.self) { group in - // The task that consumes the upstream sequence - group.addTask { - var iterator = base.makeAsyncIterator() - - // This is our upstream consumption loop - loop: while true { - // We are creating a continuation before requesting the next - // element from upstream. This continuation is only resumed - // if the downstream consumer called `next` to signal his demand - // and until the Clock sleep finished. - try await withUnsafeThrowingContinuation { continuation in - let action = self.stateMachine.withCriticalRegion { $0.upstreamTaskSuspended(continuation) } - - switch action { - case .resumeContinuation(let continuation): - // This happens if there is outstanding demand - // and we need to demand from upstream right away - continuation.resume(returning: ()) - - case .resumeContinuationWithError(let continuation, let error): - // This happens if the task got cancelled. - continuation.resume(throwing: error) - - case .none: - break - } - } - - // We got signalled from the downstream that we have demand so let's - // request a new element from the upstream - if let element = try await iterator.next() { - let action = self.stateMachine.withCriticalRegion { - let deadline = self.clock.now.advanced(by: self.interval) - return $0.elementProduced(element, deadline: deadline) - } - - switch action { - case .resumeClockContinuation(let clockContinuation, let deadline): - clockContinuation?.resume(returning: deadline) - - case .none: - break - } - } else { - // The upstream returned `nil` which indicates that it finished - let action = self.stateMachine.withCriticalRegion { $0.upstreamFinished() } - - // All of this is mostly cleanup around the Task and the outstanding - // continuations used for signalling. - switch action { - case .cancelTaskAndClockContinuation(let task, let clockContinuation): - task.cancel() - clockContinuation?.resume(throwing: CancellationError()) - - break loop - case .resumeContinuationWithNilAndCancelTaskAndUpstreamAndClockContinuation( - let downstreamContinuation, - let task, - let upstreamContinuation, - let clockContinuation - ): - upstreamContinuation?.resume(throwing: CancellationError()) - clockContinuation?.resume(throwing: CancellationError()) - task.cancel() - - downstreamContinuation.resume(returning: .success(nil)) - - break loop - - case .resumeContinuationWithElementAndCancelTaskAndUpstreamAndClockContinuation( - let downstreamContinuation, - let element, - let task, - let upstreamContinuation, - let clockContinuation - ): - upstreamContinuation?.resume(throwing: CancellationError()) - clockContinuation?.resume(throwing: CancellationError()) - task.cancel() - - downstreamContinuation.resume(returning: .success(element)) - - break loop - - case .none: - - break loop - } - } - } - } - - group.addTask { - // This is our clock scheduling loop - loop: while true { - do { - // We are creating a continuation sleeping on the Clock. - // This continuation is only resumed if the downstream consumer called `next`. - let deadline: C.Instant = try await withUnsafeThrowingContinuation { continuation in - let action = self.stateMachine.withCriticalRegion { $0.clockTaskSuspended(continuation) } - - switch action { - case .resumeContinuation(let continuation, let deadline): - // This happens if there is outstanding demand - // and we need to demand from upstream right away - continuation.resume(returning: deadline) - - case .resumeContinuationWithError(let continuation, let error): - // This happens if the task got cancelled. - continuation.resume(throwing: error) - - case .none: - break - } - } - - try await self.clock.sleep(until: deadline, tolerance: self.tolerance) - - let action = self.stateMachine.withCriticalRegion { $0.clockSleepFinished() } - - switch action { - case .resumeDownStreamContinuation(let downStreamContinuation, let element): - downStreamContinuation.resume(returning: .success(element)) - - case .none: - break - } - } catch { - // The only error that we expect is the `CancellationError` - // thrown from the Clock.sleep or from the withUnsafeContinuation. - // This happens if we are cleaning everything up. We can just drop that error and break our loop - precondition(error is CancellationError, "Received unexpected error \(error) in the Clock loop") - break loop - } - } - } - - do { - try await group.waitForAll() - } catch { - // The upstream sequence threw an error - let action = self.stateMachine.withCriticalRegion { $0.upstreamThrew(error) } - - switch action { - case .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuation( - let downstreamContinuation, - let error, - let task, - let upstreamContinuation, - let clockContinuation - ): - upstreamContinuation?.resume(throwing: CancellationError()) - clockContinuation?.resume(throwing: CancellationError()) - - task.cancel() - - downstreamContinuation.resume(returning: .failure(error)) - - case .cancelTaskAndClockContinuation( - let task, - let clockContinuation - ): - clockContinuation?.resume(throwing: CancellationError()) - task.cancel() - - case .none: - break - } - - group.cancelAll() - } - } - } - - $0.taskStarted(task) - } - } - } - func iteratorDeinitialized() { let action = self.stateMachine.withCriticalRegion { $0.iteratorDeinitialized() } @@ -242,25 +55,34 @@ final class DebounceStorage: @unchecked Sendable // We always suspend since we can never return an element right away let result: Result = await withUnsafeContinuation { continuation in - let action = self.stateMachine.withCriticalRegion { $0.next(for: continuation) } - - switch action { - case .resumeUpstreamContinuation(let upstreamContinuation): - // This is signalling the upstream task that is consuming the upstream - // sequence to signal demand. - upstreamContinuation?.resume(returning: ()) - - case .resumeUpstreamAndClockContinuation(let upstreamContinuation, let clockContinuation, let deadline): - // This is signalling the upstream task that is consuming the upstream - // sequence to signal demand and start the clock task. - upstreamContinuation?.resume(returning: ()) - clockContinuation?.resume(returning: deadline) - - case .resumeDownstreamContinuationWithNil(let continuation): - continuation.resume(returning: .success(nil)) - - case .resumeDownstreamContinuationWithError(let continuation, let error): - continuation.resume(returning: .failure(error)) + self.stateMachine.withCriticalRegion { + let action = $0.next(for: continuation) + + switch action { + case .startTask(let base): + self.startTask( + stateMachine: &$0, + base: base, + downstreamContinuation: continuation + ) + + case .resumeUpstreamContinuation(let upstreamContinuation): + // This is signalling the upstream task that is consuming the upstream + // sequence to signal demand. + upstreamContinuation?.resume(returning: ()) + + case .resumeUpstreamAndClockContinuation(let upstreamContinuation, let clockContinuation, let deadline): + // This is signalling the upstream task that is consuming the upstream + // sequence to signal demand and start the clock task. + upstreamContinuation?.resume(returning: ()) + clockContinuation?.resume(returning: deadline) + + case .resumeDownstreamContinuationWithNil(let continuation): + continuation.resume(returning: .success(nil)) + + case .resumeDownstreamContinuationWithError(let continuation, let error): + continuation.resume(returning: .failure(error)) + } } } @@ -287,4 +109,188 @@ final class DebounceStorage: @unchecked Sendable } } } + + private func startTask( + stateMachine: inout DebounceStateMachine, + base: Base, + downstreamContinuation: UnsafeContinuation, Never> + ) { + let task = Task { + await withThrowingTaskGroup(of: Void.self) { group in + // The task that consumes the upstream sequence + group.addTask { + var iterator = base.makeAsyncIterator() + + // This is our upstream consumption loop + loop: while true { + // We are creating a continuation before requesting the next + // element from upstream. This continuation is only resumed + // if the downstream consumer called `next` to signal his demand + // and until the Clock sleep finished. + try await withUnsafeThrowingContinuation { continuation in + let action = self.stateMachine.withCriticalRegion { $0.upstreamTaskSuspended(continuation) } + + switch action { + case .resumeContinuation(let continuation): + // This happens if there is outstanding demand + // and we need to demand from upstream right away + continuation.resume(returning: ()) + + case .resumeContinuationWithError(let continuation, let error): + // This happens if the task got cancelled. + continuation.resume(throwing: error) + + case .none: + break + } + } + + // We got signalled from the downstream that we have demand so let's + // request a new element from the upstream + if let element = try await iterator.next() { + let action = self.stateMachine.withCriticalRegion { + let deadline = self.clock.now.advanced(by: self.interval) + return $0.elementProduced(element, deadline: deadline) + } + + switch action { + case .resumeClockContinuation(let clockContinuation, let deadline): + clockContinuation?.resume(returning: deadline) + + case .none: + break + } + } else { + // The upstream returned `nil` which indicates that it finished + let action = self.stateMachine.withCriticalRegion { $0.upstreamFinished() } + + // All of this is mostly cleanup around the Task and the outstanding + // continuations used for signalling. + switch action { + case .cancelTaskAndClockContinuation(let task, let clockContinuation): + task.cancel() + clockContinuation?.resume(throwing: CancellationError()) + + break loop + case .resumeContinuationWithNilAndCancelTaskAndUpstreamAndClockContinuation( + let downstreamContinuation, + let task, + let upstreamContinuation, + let clockContinuation + ): + upstreamContinuation?.resume(throwing: CancellationError()) + clockContinuation?.resume(throwing: CancellationError()) + task.cancel() + + downstreamContinuation.resume(returning: .success(nil)) + + break loop + + case .resumeContinuationWithElementAndCancelTaskAndUpstreamAndClockContinuation( + let downstreamContinuation, + let element, + let task, + let upstreamContinuation, + let clockContinuation + ): + upstreamContinuation?.resume(throwing: CancellationError()) + clockContinuation?.resume(throwing: CancellationError()) + task.cancel() + + downstreamContinuation.resume(returning: .success(element)) + + break loop + + case .none: + + break loop + } + } + } + } + + group.addTask { + // This is our clock scheduling loop + loop: while true { + do { + // We are creating a continuation sleeping on the Clock. + // This continuation is only resumed if the downstream consumer called `next`. + let deadline: C.Instant = try await withUnsafeThrowingContinuation { continuation in + let action = self.stateMachine.withCriticalRegion { $0.clockTaskSuspended(continuation) } + + switch action { + case .resumeContinuation(let continuation, let deadline): + // This happens if there is outstanding demand + // and we need to demand from upstream right away + continuation.resume(returning: deadline) + + case .resumeContinuationWithError(let continuation, let error): + // This happens if the task got cancelled. + continuation.resume(throwing: error) + + case .none: + break + } + } + + try await self.clock.sleep(until: deadline, tolerance: self.tolerance) + + let action = self.stateMachine.withCriticalRegion { $0.clockSleepFinished() } + + switch action { + case .resumeDownStreamContinuation(let downStreamContinuation, let element): + downStreamContinuation.resume(returning: .success(element)) + + case .none: + break + } + } catch { + // The only error that we expect is the `CancellationError` + // thrown from the Clock.sleep or from the withUnsafeContinuation. + // This happens if we are cleaning everything up. We can just drop that error and break our loop + precondition(error is CancellationError, "Received unexpected error \(error) in the Clock loop") + break loop + } + } + } + + do { + try await group.waitForAll() + } catch { + // The upstream sequence threw an error + let action = self.stateMachine.withCriticalRegion { $0.upstreamThrew(error) } + + switch action { + case .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuation( + let downstreamContinuation, + let error, + let task, + let upstreamContinuation, + let clockContinuation + ): + upstreamContinuation?.resume(throwing: CancellationError()) + clockContinuation?.resume(throwing: CancellationError()) + + task.cancel() + + downstreamContinuation.resume(returning: .failure(error)) + + case .cancelTaskAndClockContinuation( + let task, + let clockContinuation + ): + clockContinuation?.resume(throwing: CancellationError()) + task.cancel() + + case .none: + break + } + + group.cancelAll() + } + } + } + + stateMachine.taskStarted(task, downstreamContinuation: downstreamContinuation) + } }