From 1b18131df41bf830f3f6008af41d7e7533ae48e2 Mon Sep 17 00:00:00 2001 From: FranzBusch Date: Sun, 11 Sep 2022 17:21:17 +0100 Subject: [PATCH] Remove `AsyncIterator: Sendable` requirement from debounce # Motivation The current implementation of `AsyncDebounceSequence` requires the base `AsyncIterator` to be `Sendable`. This is causing two problems: 1. It only allows users to use `debounce` if their `AsyncSequence.AsyncIterator` is `Sendable` 2. In `debounce` we are creating a lot of new `Task`s and reating `Task`s is not cheap. My main goal of this PR was to remove the `Sendable` constraint from `debounce`. # Modification This PR overhauls the implementation of `debounce` and aligns it with the implementation of the open `merge` PR https://github.com/apple/swift-async-algorithms/pull/185 . The most important changes are this: - I removed the `Sendable` requirement from the base sequences `AsyncIterator`. - Instead of creating new Tasks for the sleep and for the upstream consumption. I am now creating one Task and manipulate it by signalling continuations - I am not cancelling the sleep. Instead I am recalculating the time left to sleep when a sleep finishes. # Result In the end, this PR swaps the implementation of `AsyncDebounceSequence` and drops the `Sendable` constraint and passes all tests. Furthermore, on my local performance testing I saw up 150% speed increase in throughput. --- .../AsyncDebounceSequence.swift | 121 --- .../Debounce/AsyncDebounceSequence.swift | 115 +++ .../Debounce/DebounceStateMachine.swift | 747 ++++++++++++++++++ .../Debounce/DebounceStorage.swift | 294 +++++++ .../Performance/TestThroughput.swift | 6 + Tests/AsyncAlgorithmsTests/TestDebounce.swift | 10 + 6 files changed, 1172 insertions(+), 121 deletions(-) delete mode 100644 Sources/AsyncAlgorithms/AsyncDebounceSequence.swift create mode 100644 Sources/AsyncAlgorithms/Debounce/AsyncDebounceSequence.swift create mode 100644 Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift create mode 100644 Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift diff --git a/Sources/AsyncAlgorithms/AsyncDebounceSequence.swift b/Sources/AsyncAlgorithms/AsyncDebounceSequence.swift deleted file mode 100644 index 304dc01d..00000000 --- a/Sources/AsyncAlgorithms/AsyncDebounceSequence.swift +++ /dev/null @@ -1,121 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the Swift Async Algorithms open source project -// -// Copyright (c) 2022 Apple Inc. and the Swift project authors -// Licensed under Apache License v2.0 with Runtime Library Exception -// -// See https://swift.org/LICENSE.txt for license information -// -//===----------------------------------------------------------------------===// - -extension AsyncSequence { - /// Creates an asynchronous sequence that emits the latest element after a given quiescence period - /// has elapsed by using a specified Clock. - @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) - public func debounce(for interval: C.Instant.Duration, tolerance: C.Instant.Duration? = nil, clock: C) -> AsyncDebounceSequence { - AsyncDebounceSequence(self, interval: interval, tolerance: tolerance, clock: clock) - } - - /// Creates an asynchronous sequence that emits the latest element after a given quiescence period - /// has elapsed. - @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) - public func debounce(for interval: Duration, tolerance: Duration? = nil) -> AsyncDebounceSequence { - debounce(for: interval, tolerance: tolerance, clock: .continuous) - } -} - -/// An `AsyncSequence` that emits the latest element after a given quiescence period -/// has elapsed. -@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -public struct AsyncDebounceSequence: Sendable - where Base.AsyncIterator: Sendable, Base.Element: Sendable, Base: Sendable { - let base: Base - let interval: C.Instant.Duration - let tolerance: C.Instant.Duration? - let clock: C - - init(_ base: Base, interval: C.Instant.Duration, tolerance: C.Instant.Duration?, clock: C) { - self.base = base - self.interval = interval - self.tolerance = tolerance - self.clock = clock - } -} - -@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) -extension AsyncDebounceSequence: AsyncSequence { - public typealias Element = Base.Element - - /// The iterator for a `AsyncDebounceSequence` instance. - public struct Iterator: AsyncIteratorProtocol, Sendable { - enum Partial: Sendable { - case sleep - case produce(Result, Base.AsyncIterator) - } - var iterator: Base.AsyncIterator - var produce: Task? - var terminal = false - let interval: C.Instant.Duration - let tolerance: C.Instant.Duration? - let clock: C - - init(_ base: Base.AsyncIterator, interval: C.Instant.Duration, tolerance: C.Instant.Duration?, clock: C) { - self.iterator = base - self.interval = interval - self.tolerance = tolerance - self.clock = clock - } - - public mutating func next() async rethrows -> Base.Element? { - var last: C.Instant? - var lastResult: Result? - while !terminal { - let deadline = (last ?? clock.now).advanced(by: interval) - let sleep: Task = Task { [tolerance, clock] in - try? await clock.sleep(until: deadline, tolerance: tolerance) - return .sleep - } - let produce: Task = self.produce ?? Task { [iterator] in - var iter = iterator - do { - let value = try await iter.next() - return .produce(.success(value), iter) - } catch { - return .produce(.failure(error), iter) - } - } - self.produce = nil - switch await Task.select(sleep, produce).value { - case .sleep: - self.produce = produce - if let result = lastResult { - return try result._rethrowGet() - } - break - case .produce(let result, let iter): - lastResult = result - last = clock.now - sleep.cancel() - self.iterator = iter - switch result { - case .success(let value): - if value == nil { - terminal = true - return nil - } - case .failure: - terminal = true - try result._rethrowError() - } - break - } - } - return nil - } - } - - public func makeAsyncIterator() -> Iterator { - Iterator(base.makeAsyncIterator(), interval: interval, tolerance: tolerance, clock: clock) - } -} diff --git a/Sources/AsyncAlgorithms/Debounce/AsyncDebounceSequence.swift b/Sources/AsyncAlgorithms/Debounce/AsyncDebounceSequence.swift new file mode 100644 index 00000000..6b827709 --- /dev/null +++ b/Sources/AsyncAlgorithms/Debounce/AsyncDebounceSequence.swift @@ -0,0 +1,115 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +extension AsyncSequence { + /// Creates an asynchronous sequence that emits the latest element after a given quiescence period + /// has elapsed by using a specified Clock. + @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) + public func debounce(for interval: C.Instant.Duration, tolerance: C.Instant.Duration? = nil, clock: C) -> AsyncDebounceSequence where Self: Sendable { + AsyncDebounceSequence(self, interval: interval, tolerance: tolerance, clock: clock) + } + + /// Creates an asynchronous sequence that emits the latest element after a given quiescence period + /// has elapsed. + @available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) + public func debounce(for interval: Duration, tolerance: Duration? = nil) -> AsyncDebounceSequence where Self: Sendable { + self.debounce(for: interval, tolerance: tolerance, clock: .continuous) + } +} + +/// An `AsyncSequence` that emits the latest element after a given quiescence period +/// has elapsed. +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +public struct AsyncDebounceSequence: Sendable where Base: Sendable { + /// This class is needed to hook the deinit to observe once all references to the ``AsyncDebounceSequence`` are dropped. + /// + /// If we get move-only types we should be able to drop this class and use the `deinit` of the ``AsyncDebounceSequence`` struct itself. + final class InternalClass: Sendable { + fileprivate let storage: DebounceStorage + + fileprivate init(storage: DebounceStorage) { + self.storage = storage + } + + deinit { + storage.sequenceDeinitialized() + } + } + + /// The internal class to hook the `deinit`. + let internalClass: InternalClass + + /// The underlying storage + fileprivate var storage: DebounceStorage { + self.internalClass.storage + } + + /// Initializes a new ``AsyncDebounceSequence``. + /// + /// - Parameters: + /// - base: The base sequence. + /// - interval: The interval to debounce. + /// - tolerance: The tolerance of the clock. + /// - clock: The clock. + public init(_ base: Base, interval: C.Instant.Duration, tolerance: C.Instant.Duration?, clock: C) { + let storage = DebounceStorage( + base: base, + interval: interval, + tolerance: tolerance, + clock: clock + ) + self.internalClass = .init(storage: storage) + } +} + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +extension AsyncDebounceSequence: AsyncSequence { + public typealias Element = Base.Element + + public func makeAsyncIterator() -> AsyncIterator { + AsyncIterator(storage: self.internalClass.storage) + } +} + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +extension AsyncDebounceSequence { + public struct AsyncIterator: AsyncIteratorProtocol { + /// This class is needed to hook the deinit to observe once all references to the ``AsyncIterator`` are dropped. + /// + /// If we get move-only types we should be able to drop this class and use the `deinit` of the ``AsyncIterator`` struct itself. + final class InternalClass: Sendable { + private let storage: DebounceStorage + + fileprivate init(storage: DebounceStorage) { + self.storage = storage + self.storage.iteratorInitialized() + } + + deinit { + self.storage.iteratorDeinitialized() + } + + func next() async rethrows -> Element? { + try await self.storage.next() + } + } + + let internalClass: InternalClass + + fileprivate init(storage: DebounceStorage) { + self.internalClass = InternalClass(storage: storage) + } + + public mutating func next() async rethrows -> Element? { + try await self.internalClass.next() + } + } +} diff --git a/Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift b/Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift new file mode 100644 index 00000000..d48253c1 --- /dev/null +++ b/Sources/AsyncAlgorithms/Debounce/DebounceStateMachine.swift @@ -0,0 +1,747 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +struct DebounceStateMachine { + typealias Element = Base.Element + + private enum State { + /// The initial state before a call to `makeAsyncIterator` happened. + case initial(base: Base) + + /// The state while we are waiting for downstream demand. + case waitingForDemand( + task: Task, + upstreamContinuation: UnsafeContinuation?, + clockContinuation: UnsafeContinuation?, + bufferedElement: (element: Element, deadline: C.Instant)? + ) + + /// The state once the downstream signalled demand but before we received + /// the first element from the upstream. + case demandSignalled( + task: Task, + clockContinuation: UnsafeContinuation?, + downstreamContinuation: UnsafeContinuation + ) + + /// The state while we are consuming the upstream and waiting for the Clock.sleep to finish. + case debouncing( + task: Task, + upstreamContinuation: UnsafeContinuation?, + downstreamContinuation: UnsafeContinuation, + currentElement: (element: Element, deadline: C.Instant) + ) + + /// The state once any of the upstream sequences threw an `Error`. + case upstreamFailure( + error: Error + ) + + /// The state once all upstream sequences finished or the downstream consumer stopped, i.e. by dropping all references + /// or by getting their `Task` cancelled. + case finished + } + + /// The state machine's current state. + private var state: State + /// The interval to debounce. + private let interval: C.Instant.Duration + /// The clock. + private let clock: C + + init(base: Base, clock: C, interval: C.Instant.Duration) { + self.state = .initial(base: base) + self.clock = clock + self.interval = interval + } + + mutating func sequenceDeinitialized() { + switch self.state { + case .initial: + // The references to the sequence were dropped before any iterator was ever created + self.state = .finished + + case .waitingForDemand, .demandSignalled, .debouncing, .upstreamFailure: + // An iterator was created and we deinited the sequence. + // This is an expected pattern and we just continue on normal. + // Importantly since we are a unicast sequence no more iterators can be created + return + + case .finished: + // We are already finished so there is nothing left to clean up. + // This is just the references dropping afterwards. + return + } + } + + /// Actions returned by `iteratorInitialized()`. + enum IteratorInitializedAction { + /// Indicates that a new `Task` should be created that consumes the sequence. + case startTask(Base) + } + + mutating func iteratorInitialized() -> IteratorInitializedAction { + switch self.state { + case .initial(let base): + // This is the first iterator being created and we need to create our `Task` + // that is consuming the upstream sequences. + return .startTask(base) + + case .waitingForDemand, .demandSignalled, .debouncing, .upstreamFailure, .finished: + fatalError("debounce allows only a single AsyncIterator to be created") + } + } + + /// Actions returned by `iteratorDeinitialized()`. + enum IteratorDeinitializedAction { + /// Indicates that the `Task` needs to be cancelled and + /// the upstream and clock continuation need to be resumed with a `CancellationError`. + case cancelTaskAndUpstreamAndClockContinuations( + task: Task, + upstreamContinuation: UnsafeContinuation?, + clockContinuation: UnsafeContinuation? + ) + /// Indicates that nothing should be done. + case none + } + + mutating func iteratorDeinitialized() -> IteratorDeinitializedAction { + switch self.state { + case .initial: + // An iterator needs to be initialized before it can be deinitialized. + preconditionFailure("Internal inconsistency current state \(self.state) and received iteratorDeinitialized()") + + case .debouncing, .demandSignalled: + // An iterator was deinitialized while we have a suspended continuation. + preconditionFailure("Internal inconsistency current state \(self.state) and received iteratorDeinitialized()") + + case .waitingForDemand(let task, let upstreamContinuation, let clockContinuation, _): + // The iterator was dropped which signals that the consumer is finished. + // We can transition to finished now and need to clean everything up. + self.state = .finished + + return .cancelTaskAndUpstreamAndClockContinuations( + task: task, + upstreamContinuation: upstreamContinuation, + clockContinuation: clockContinuation + ) + + case .upstreamFailure: + // The iterator was dropped which signals that the consumer is finished. + // We can transition to finished now. The cleanup already happened when we + // transitioned to `upstreamFailure`. + self.state = .finished + + return .none + + case .finished: + // We are already finished so there is nothing left to clean up. + // This is just the references dropping afterwards. + return .none + } + } + + mutating func taskStarted(_ task: Task) { + switch self.state { + case .initial: + // The user called `makeAsyncIterator` and we are starting the `Task` + // to consume the upstream sequence + self.state = .waitingForDemand( + task: task, + upstreamContinuation: nil, + clockContinuation: nil, + bufferedElement: nil + ) + + case .debouncing, .demandSignalled, .waitingForDemand, .upstreamFailure, .finished: + // We only a single iterator to be created so this must never happen. + preconditionFailure("Internal inconsistency current state \(self.state) and received taskStarted()") + } + } + + /// Actions returned by `upstreamTaskSuspended()`. + enum UpstreamTaskSuspendedAction { + /// Indicates that the continuation should be resumed which will lead to calling `next` on the upstream. + case resumeContinuation( + upstreamContinuation: UnsafeContinuation + ) + /// Indicates that the continuation should be resumed with an Error because another upstream sequence threw. + case resumeContinuationWithError( + upstreamContinuation: UnsafeContinuation, + error: Error + ) + /// Indicates that nothing should be done. + case none + } + + mutating func upstreamTaskSuspended(_ continuation: UnsafeContinuation) -> UpstreamTaskSuspendedAction { + switch self.state { + case .initial: + // Child tasks are only created after we transitioned to `merging` + preconditionFailure("Internal inconsistency current state \(self.state) and received childTaskSuspended()") + + case .waitingForDemand(_, .some, _, _), .debouncing(_, .some, _, _): + // We already have an upstream continuation so we can never get a second one + preconditionFailure("Internal inconsistency current state \(self.state) and received childTaskSuspended()") + + case .upstreamFailure: + // The upstream already failed so it should never suspend again since the child task + // should have exited + preconditionFailure("Internal inconsistency current state \(self.state) and received childTaskSuspended()") + + case .waitingForDemand(let task, .none, let clockContinuation, let bufferedElement): + // The upstream task is ready to consume the next element + // we are just waiting to get demand + self.state = .waitingForDemand( + task: task, + upstreamContinuation: continuation, + clockContinuation: clockContinuation, + bufferedElement: bufferedElement + ) + + return .none + + case .demandSignalled: + // It can happen that the demand got signalled before our upstream suspended for the first time + // We need to resume it right away to demand the first element from the upstream + return .resumeContinuation(upstreamContinuation: continuation) + + case .debouncing(_, .none, _, _): + // We are currently debouncing and the upstream task suspended again + // We need to resume the continuation right away so that it continues to + // consume new elements from the upstream + + return .resumeContinuation(upstreamContinuation: continuation) + + case .finished: + // Since cancellation is cooperative it might be that child tasks are still getting + // suspended even though we already cancelled them. We must tolerate this and just resume + // the continuation with an error. + return .resumeContinuationWithError( + upstreamContinuation: continuation, + error: CancellationError() + ) + } + } + + /// Actions returned by `elementProduced()`. + enum ElementProducedAction { + /// Indicates that the clock continuation should be resumed to start the `Clock.sleep`. + case resumeClockContinuation( + clockContinuation: UnsafeContinuation?, + deadline: C.Instant + ) + /// Indicates that nothing should be done. + case none + } + + mutating func elementProduced(_ element: Element, deadline: C.Instant) -> ElementProducedAction { + switch self.state { + case .initial: + // Child tasks that are producing elements are only created after we transitioned to `merging` + preconditionFailure("Internal inconsistency current state \(self.state) and received elementProduced()") + + case .waitingForDemand(_, _, _, .some): + // We can only ever buffer one element because of the race of both child tasks + // After that element got buffered we are not resuming the upstream continuation + // and should never get another element until we get downstream demand signalled + preconditionFailure("Internal inconsistency current state \(self.state) and received elementProduced()") + + case .upstreamFailure: + // The upstream already failed so it should never have produced another element + preconditionFailure("Internal inconsistency current state \(self.state) and received childTaskSuspended()") + + case .waitingForDemand(let task, let upstreamContinuation, let clockContinuation, .none): + // We got an element even though we don't have an outstanding demand + // this can happen because we race the upstream and Clock child tasks + // and the upstream might finish after the Clock. We just need + // to buffer the element for the next demand. + self.state = .waitingForDemand( + task: task, + upstreamContinuation: upstreamContinuation, + clockContinuation: clockContinuation, + bufferedElement: (element, deadline) + ) + + return .none + + case .demandSignalled(let task, let clockContinuation, let downstreamContinuation): + // This is the first element that got produced after we got demand signalled + // We can now transition to debouncing and start the Clock.sleep + self.state = .debouncing( + task: task, + upstreamContinuation: nil, + downstreamContinuation: downstreamContinuation, + currentElement: (element, deadline) + ) + + let deadline = self.clock.now.advanced(by: self.interval) + return .resumeClockContinuation( + clockContinuation: clockContinuation, + deadline: deadline + ) + + case .debouncing(let task, let upstreamContinuation, let downstreamContinuation, _): + // We just got another element and the Clock hasn't finished sleeping yet + // We just need to store the new element + self.state = .debouncing( + task: task, + upstreamContinuation: upstreamContinuation, + downstreamContinuation: downstreamContinuation, + currentElement: (element, deadline) + ) + + return .none + + case .finished: + // Since cancellation is cooperative it might be that child tasks + // are still producing elements after we finished. + // We are just going to drop them since there is nothing we can do + return .none + } + } + + /// Actions returned by `upstreamFinished()`. + enum UpstreamFinishedAction { + /// Indicates that the task and the clock continuation should be cancelled. + case cancelTaskAndClockContinuation( + task: Task, + clockContinuation: UnsafeContinuation? + ) + /// Indicates that the downstream continuation should be resumed with `nil` and + /// the task and the upstream continuation should be cancelled. + case resumeContinuationWithNilAndCancelTaskAndUpstreamAndClockContinuation( + downstreamContinuation: UnsafeContinuation, + task: Task, + upstreamContinuation: UnsafeContinuation?, + clockContinuation: UnsafeContinuation? + ) + /// Indicates that nothing should be done. + case none + } + + mutating func upstreamFinished() -> UpstreamFinishedAction { + switch self.state { + case .initial: + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished()") + + case .waitingForDemand(_, .some, _, _): + // We will never receive an upstream finished and have an outstanding continuation + // since we only receive finish after resuming the upstream continuation + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished()") + + case .waitingForDemand(_, .none, _, .some): + // We will never receive an upstream finished while we have a buffered element + // To get there we would need to have received the buffered element and then + // received upstream finished all while waiting for demand; however, we should have + // never demanded the next element from upstream in the first place + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished()") + + case .upstreamFailure: + // The upstream already failed so it should never have finished again + preconditionFailure("Internal inconsistency current state \(self.state) and received childTaskSuspended()") + + case .waitingForDemand(let task, .none, let clockContinuation, .none): + // We don't have any buffered element so we can just go ahead + // and transition to finished and cancel everything + self.state = .finished + + return .cancelTaskAndClockContinuation( + task: task, + clockContinuation: clockContinuation + ) + + case .demandSignalled(let task, let clockContinuation, let downstreamContinuation): + // We demanded the next element from the upstream after we got signalled demand + // and the upstream finished. This means we need to resume the downstream with nil + self.state = .finished + + return .resumeContinuationWithNilAndCancelTaskAndUpstreamAndClockContinuation( + downstreamContinuation: downstreamContinuation, + task: task, + upstreamContinuation: nil, + clockContinuation: clockContinuation + ) + + case .debouncing(let task, let upstreamContinuation, let downstreamContinuation, _): + // We are debouncing and the upstream finished. At this point + // we can just resume the downstream continuation with nil and cancel everything else + self.state = .finished + + return .resumeContinuationWithNilAndCancelTaskAndUpstreamAndClockContinuation( + downstreamContinuation: downstreamContinuation, + task: task, + upstreamContinuation: upstreamContinuation, + clockContinuation: nil + ) + + case .finished: + // This is just everything finishing up, nothing to do here + return .none + } + } + + /// Actions returned by `upstreamThrew()`. + enum UpstreamThrewAction { + /// Indicates that the task and the clock continuation should be cancelled. + case cancelTaskAndClockContinuation( + task: Task, + clockContinuation: UnsafeContinuation? + ) + /// Indicates that the downstream continuation should be resumed with the `error` and + /// the task and the upstream continuation should be cancelled. + case resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuation( + downstreamContinuation: UnsafeContinuation, + error: Error, + task: Task, + upstreamContinuation: UnsafeContinuation?, + clockContinuation: UnsafeContinuation? + ) + /// Indicates that nothing should be done. + case none + } + + mutating func upstreamThrew(_ error: Error) -> UpstreamThrewAction { + switch self.state { + case .initial: + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamThrew()") + + case .waitingForDemand(_, .some, _, _): + // We will never receive an upstream threw and have an outstanding continuation + // since we only receive threw after resuming the upstream continuation + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished()") + + case .waitingForDemand(_, .none, _, .some): + // We will never receive an upstream threw while we have a buffered element + // To get there we would need to have received the buffered element and then + // received upstream threw all while waiting for demand; however, we should have + // never demanded the next element from upstream in the first place + preconditionFailure("Internal inconsistency current state \(self.state) and received upstreamFinished()") + + case .upstreamFailure: + // The upstream already failed so it should never have throw again. + preconditionFailure("Internal inconsistency current state \(self.state) and received childTaskSuspended()") + + case .waitingForDemand(let task, .none, let clockContinuation, .none): + // We don't have any buffered element so we can just go ahead + // and transition to finished and cancel everything + self.state = .finished + + return .cancelTaskAndClockContinuation( + task: task, + clockContinuation: clockContinuation + ) + + case .demandSignalled(let task, let clockContinuation, let downstreamContinuation): + // We demanded the next element from the upstream after we got signalled demand + // and the upstream threw. This means we need to resume the downstream with the error + self.state = .finished + + return .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuation( + downstreamContinuation: downstreamContinuation, + error: error, + task: task, + upstreamContinuation: nil, + clockContinuation: clockContinuation + ) + + case .debouncing(let task, let upstreamContinuation, let downstreamContinuation, _): + // We are debouncing and the upstream threw. At this point + // we can just resume the downstream continuation with error and cancel everything else + self.state = .finished + + return .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuation( + downstreamContinuation: downstreamContinuation, + error: error, + task: task, + upstreamContinuation: upstreamContinuation, + clockContinuation: nil + ) + + case .finished: + // This is just everything finishing up, nothing to do here + return .none + } + } + + /// Actions returned by `clockTaskSuspended()`. + enum ClockTaskSuspendedAction { + /// Indicates that the continuation should be resumed which will lead to calling `sleep` on the Clock. + case resumeContinuation( + clockContinuation: UnsafeContinuation, + deadline: C.Instant + ) + /// Indicates that the continuation should be resumed with an Error because another upstream sequence threw. + case resumeContinuationWithError( + clockContinuation: UnsafeContinuation, + error: Error + ) + /// Indicates that nothing should be done. + case none + } + + mutating func clockTaskSuspended(_ continuation: UnsafeContinuation) -> ClockTaskSuspendedAction { + switch self.state { + case .initial: + // Child tasks are only created after we transitioned to `merging` + preconditionFailure("Internal inconsistency current state \(self.state) and received clockTaskSuspended()") + + case .waitingForDemand(_, _, .some, _): + // We already have a clock continuation so we can never get a second one + preconditionFailure("Internal inconsistency current state \(self.state) and received clockTaskSuspended()") + + case .demandSignalled(_, .some, _): + // We already have a clock continuation so we can never get a second one + preconditionFailure("Internal inconsistency current state \(self.state) and received clockTaskSuspended()") + + case .waitingForDemand(let task, let upstreamContinuation, .none, let bufferedElement): + // The clock child task suspended and we just need to store the continuation until + // demand is signalled + + self.state = .waitingForDemand( + task: task, + upstreamContinuation: upstreamContinuation, + clockContinuation: continuation, + bufferedElement: bufferedElement + ) + + return .none + + case .demandSignalled(let task, .none, let downstreamContinuation): + // The demand was signalled but we haven't gotten the first element from the upstream yet + // so we need to stay in this state and do nothing + self.state = .demandSignalled( + task: task, + clockContinuation: continuation, + downstreamContinuation: downstreamContinuation + ) + + return .none + + case .debouncing(_, _, _, let currentElement): + // We are currently debouncing and the Clock task suspended + // We need to resume the continuation right away. + return .resumeContinuation( + clockContinuation: continuation, + deadline: currentElement.deadline + ) + + case .upstreamFailure: + // The upstream failed while we were waiting to suspend the clock task again + // The task should have already been cancelled and we just need to cancel the continuation + return .resumeContinuationWithError( + clockContinuation: continuation, + error: CancellationError() + ) + + case .finished: + // Since cancellation is cooperative it might be that child tasks are still getting + // suspended even though we already cancelled them. We must tolerate this and just resume + // the continuation with an error. + return .resumeContinuationWithError( + clockContinuation: continuation, + error: CancellationError() + ) + } + } + + /// Actions returned by `clockSleepFinished()`. + enum ClockSleepFinishedAction { + /// Indicates that the downstream continuation should be resumed with the given element. + case resumeDownStreamContinuation( + downStreamContinuation: UnsafeContinuation, + element: Element + ) + /// Indicates that nothing should be done. + case none + } + + mutating func clockSleepFinished() -> ClockSleepFinishedAction { + switch self.state { + case .initial: + // Child tasks are only created after we transitioned to `merging` + preconditionFailure("Internal inconsistency current state \(self.state) and received clockSleepFinished()") + + case .waitingForDemand: + // This can never happen since we kicked-off the Clock.sleep because we got signalled demand. + preconditionFailure("Internal inconsistency current state \(self.state) and received clockSleepFinished()") + + case .demandSignalled: + // This can never happen since we are still waiting for the first element until we resume the Clock sleep. + preconditionFailure("Internal inconsistency current state \(self.state) and received clockSleepFinished()") + + case .debouncing(let task, let upstreamContinuation, let downstreamContinuation, let currentElement): + if currentElement.deadline <= self.clock.now { + // The deadline for the last produced element expired and we can forward it to the downstream + self.state = .waitingForDemand( + task: task, + upstreamContinuation: upstreamContinuation, + clockContinuation: nil, + bufferedElement: nil + ) + + return .resumeDownStreamContinuation( + downStreamContinuation: downstreamContinuation, + element: currentElement.element + ) + } else { + // The deadline is still in the future so we need to sleep again + return .none + } + + case .upstreamFailure: + // The upstream failed before the Clock.sleep finished + // We already cleaned everything up so nothing left to do here. + return .none + + case .finished: + // The upstream failed before the Clock.sleep finished + // We already cleaned everything up so nothing left to do here. + return .none + } + } + + /// Actions returned by `cancelled()`. + enum CancelledAction { + /// Indicates that the downstream continuation needs to be resumed and + /// task and the upstream continuations should be cancelled. + case resumeDownstreamContinuationWithNilAndCancelTaskAndUpstreamAndClockContinuation( + downstreamContinuation: UnsafeContinuation, + task: Task, + upstreamContinuation: UnsafeContinuation?, + clockContinuation: UnsafeContinuation? + ) + /// Indicates that nothing should be done. + case none + } + + mutating func cancelled() -> CancelledAction { + switch self.state { + case .initial: + // Since we are transitioning to `merging` before we return from `makeAsyncIterator` + // this can never happen + preconditionFailure("Internal inconsistency current state \(self.state) and received cancelled()") + + case .waitingForDemand: + // We got cancelled before we event got any demand. This can happen if a cancelled task + // calls next and the onCancel handler runs first. We can transition to finished right away. + self.state = .finished + + return .none + + case .demandSignalled(let task, let clockContinuation, let downstreamContinuation): + // We got cancelled while we were waiting for the first upstream element + // We can cancel everything at this point and return nil + self.state = .finished + + return .resumeDownstreamContinuationWithNilAndCancelTaskAndUpstreamAndClockContinuation( + downstreamContinuation: downstreamContinuation, + task: task, + upstreamContinuation: nil, + clockContinuation: clockContinuation + ) + + case .debouncing(let task, let upstreamContinuation, let downstreamContinuation, _): + // We got cancelled while debouncing. + // We can cancel everything at this point and return nil + self.state = .finished + + return .resumeDownstreamContinuationWithNilAndCancelTaskAndUpstreamAndClockContinuation( + downstreamContinuation: downstreamContinuation, + task: task, + upstreamContinuation: upstreamContinuation, + clockContinuation: nil + ) + + case .upstreamFailure: + // An upstream already threw and we cancelled everything already. + // We should stay in the upstream failure state until the error is consumed + return .none + + case .finished: + // We are already finished so nothing to do here: + self.state = .finished + + return .none + } + } + + /// Actions returned by `next()`. + enum NextAction { + case resumeUpstreamContinuation( + upstreamContinuation: UnsafeContinuation? + ) + case resumeUpstreamAndClockContinuation( + upstreamContinuation: UnsafeContinuation?, + clockContinuation: UnsafeContinuation?, + deadline: C.Instant + ) + /// Indicates that the downstream continuation should be resumed with `nil`. + case resumeDownstreamContinuationWithNil(UnsafeContinuation) + /// Indicates that the downstream continuation should be resumed with the error. + case resumeDownstreamContinuationWithError( + UnsafeContinuation, + Error + ) + } + + mutating func next(for continuation: UnsafeContinuation) -> NextAction { + switch self.state { + case .initial: + preconditionFailure("Internal inconsistency current state \(self.state) and received next()") + + case .demandSignalled, .debouncing: + // We already got demand signalled and have suspended the downstream task + // Getting a second next calls means the iterator was transferred across Tasks which is not allowed + preconditionFailure("Internal inconsistency current state \(self.state) and received next()") + + case .waitingForDemand(let task, let upstreamContinuation, let clockContinuation, let bufferedElement): + if let bufferedElement = bufferedElement { + // We already got an element from the last buffered one + // We can kick of the clock and upstream consumption right away and transition to debouncing + self.state = .debouncing( + task: task, + upstreamContinuation: nil, + downstreamContinuation: continuation, + currentElement: bufferedElement + ) + + return .resumeUpstreamAndClockContinuation( + upstreamContinuation: upstreamContinuation, + clockContinuation: clockContinuation, + deadline: bufferedElement.deadline + ) + } else { + // We don't have a buffered element so have to resume the upstream continuation + // to get the first one and transition to demandSignalled + self.state = .demandSignalled( + task: task, + clockContinuation: clockContinuation, + downstreamContinuation: continuation + ) + + return .resumeUpstreamContinuation(upstreamContinuation: upstreamContinuation) + } + + case .upstreamFailure(let error): + // The upstream threw and haven't delivered the error yet + // Let's deliver it and transition to finished + self.state = .finished + + return .resumeDownstreamContinuationWithError(continuation, error) + + case .finished: + // We are already finished so we are just returning `nil` + return .resumeDownstreamContinuationWithNil(continuation) + } + } +} diff --git a/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift b/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift new file mode 100644 index 00000000..677de7af --- /dev/null +++ b/Sources/AsyncAlgorithms/Debounce/DebounceStorage.swift @@ -0,0 +1,294 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Async Algorithms open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// +//===----------------------------------------------------------------------===// + +@available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) +final class DebounceStorage: @unchecked Sendable where Base: Sendable { + typealias Element = Base.Element + + /// The lock that protects our state. + private let lock = Lock.allocate() + /// The state machine. + private var stateMachine: DebounceStateMachine + /// The interval to debounce. + private let interval: C.Instant.Duration + /// The tolerance for the clock. + private let tolerance: C.Instant.Duration? + /// The clock. + private let clock: C + + init(base: Base, interval: C.Instant.Duration, tolerance: C.Instant.Duration?, clock: C) { + self.stateMachine = .init(base: base, clock: clock, interval: interval) + self.interval = interval + self.tolerance = tolerance + self.clock = clock + } + + deinit { + self.lock.deinitialize() + } + + func sequenceDeinitialized() { + self.lock.withLock { self.stateMachine.sequenceDeinitialized() } + } + + func iteratorInitialized() { + self.lock.withLockVoid { + let action = self.stateMachine.iteratorInitialized() + + switch action { + case .startTask(let base): + let task = Task { + await withThrowingTaskGroup(of: Void.self) { group in + // The task that consumes the upstream sequence + group.addTask { + var iterator = base.makeAsyncIterator() + + // This is our upstream consumption loop + loop: while true { + // We are creating a continuation before requesting the next + // element from upstream. This continuation is only resumed + // if the downstream consumer called `next` to signal his demand + // and until the Clock sleep finished. + try await withUnsafeThrowingContinuation { continuation in + let action = self.lock.withLock { + self.stateMachine.upstreamTaskSuspended(continuation) + } + + switch action { + case .resumeContinuation(let continuation): + // This happens if there is outstanding demand + // and we need to demand from upstream right away + continuation.resume(returning: ()) + + case .resumeContinuationWithError(let continuation, let error): + // This happens if the task got cancelled. + continuation.resume(throwing: error) + + case .none: + break + } + } + + // We got signalled from the downstream that we have demand so let's + // request a new element from the upstream + if let element = try await iterator.next() { + let action = self.lock.withLock { + let deadline = self.clock.now.advanced(by: self.interval) + return self.stateMachine.elementProduced(element, deadline: deadline) + } + + switch action { + case .resumeClockContinuation(let clockContinuation, let deadline): + clockContinuation?.resume(returning: deadline) + + case .none: + break + } + } else { + // The upstream returned `nil` which indicates that it finished + let action = self.lock.withLock { + self.stateMachine.upstreamFinished() + } + + // All of this is mostly cleanup around the Task and the outstanding + // continuations used for signalling. + switch action { + case .cancelTaskAndClockContinuation(let task, let clockContinuation): + task.cancel() + clockContinuation?.resume(throwing: CancellationError()) + + break loop + case .resumeContinuationWithNilAndCancelTaskAndUpstreamAndClockContinuation( + let downstreamContinuation, + let task, + let upstreamContinuation, + let clockContinuation + ): + upstreamContinuation?.resume(throwing: CancellationError()) + clockContinuation?.resume(throwing: CancellationError()) + task.cancel() + + downstreamContinuation.resume(returning: nil) + + break loop + case .none: + + break loop + } + } + } + } + + group.addTask { + // This is our clock scheduling loop + loop: while true { + do { + // We are creating a continuation sleeping on the Clock. + // This continuation is only resumed if the downstream consumer called `next`. + let deadline: C.Instant = try await withUnsafeThrowingContinuation { continuation in + let action = self.lock.withLock { + self.stateMachine.clockTaskSuspended(continuation) + } + + switch action { + case .resumeContinuation(let continuation, let deadline): + // This happens if there is outstanding demand + // and we need to demand from upstream right away + continuation.resume(returning: deadline) + + case .resumeContinuationWithError(let continuation, let error): + // This happens if the task got cancelled. + continuation.resume(throwing: error) + + case .none: + break + } + } + + try await self.clock.sleep(until: deadline, tolerance: self.tolerance) + + let action = self.lock.withLock { + self.stateMachine.clockSleepFinished() + } + + switch action { + case .resumeDownStreamContinuation(let downStreamContinuation, let element): + downStreamContinuation.resume(returning: element) + + case .none: + break + } + } catch { + // The only error that we expect is the `CancellationError` + // thrown from the Clock.sleep or from the withUnsafeContinuation. + // This happens if we are cleaning everything up. We can just drop that error and break our loop + precondition(error is CancellationError, "Received unexpected error \(error) in the Clock loop") + break loop + } + } + } + + do { + try await group.waitForAll() + } catch { + // The upstream sequence threw an error + let action = self.lock.withLock { + self.stateMachine.upstreamThrew(error) + } + + switch action { + case .resumeContinuationWithErrorAndCancelTaskAndUpstreamContinuation( + let downstreamContinuation, + let error, + let task, + let upstreamContinuation, + let clockContinuation + ): + upstreamContinuation?.resume(throwing: CancellationError()) + clockContinuation?.resume(throwing: CancellationError()) + + task.cancel() + + downstreamContinuation.resume(throwing: error) + + case .cancelTaskAndClockContinuation( + let task, + let clockContinuation + ): + clockContinuation?.resume(throwing: CancellationError()) + task.cancel() + + case .none: + break + } + + group.cancelAll() + } + } + } + + self.stateMachine.taskStarted(task) + } + } + } + + func iteratorDeinitialized() { + let action = self.lock.withLock { self.stateMachine.iteratorDeinitialized() } + + switch action { + case .cancelTaskAndUpstreamAndClockContinuations( + let task, + let upstreamContinuation, + let clockContinuation + ): + upstreamContinuation?.resume(throwing: CancellationError()) + clockContinuation?.resume(throwing: CancellationError()) + + task.cancel() + + case .none: + break + } + } + + func next() async rethrows -> Element? { + // We need to handle cancellation here because we are creating a continuation + // and because we need to cancel the `Task` we created to consume the upstream + return try await withTaskCancellationHandler { + // We always suspend since we can never return an element right away + + self.lock.lock() + return try await withUnsafeThrowingContinuation { continuation in + let action = self.stateMachine.next(for: continuation) + self.lock.unlock() + + switch action { + case .resumeUpstreamContinuation(let upstreamContinuation): + // This is signalling the upstream task that is consuming the upstream + // sequence to signal demand. + upstreamContinuation?.resume(returning: ()) + + case .resumeUpstreamAndClockContinuation(let upstreamContinuation, let clockContinuation, let deadline): + // This is signalling the upstream task that is consuming the upstream + // sequence to signal demand and start the clock task. + upstreamContinuation?.resume(returning: ()) + clockContinuation?.resume(returning: deadline) + + case .resumeDownstreamContinuationWithNil(let continuation): + continuation.resume(returning: nil) + + case .resumeDownstreamContinuationWithError(let continuation, let error): + continuation.resume(throwing: error) + } + } + } onCancel: { + let action = self.lock.withLock { self.stateMachine.cancelled() } + + switch action { + case .resumeDownstreamContinuationWithNilAndCancelTaskAndUpstreamAndClockContinuation( + let downstreamContinuation, + let task, + let upstreamContinuation, + let clockContinuation + ): + upstreamContinuation?.resume(throwing: CancellationError()) + clockContinuation?.resume(throwing: CancellationError()) + + task.cancel() + + downstreamContinuation.resume(returning: nil) + + case .none: + break + } + } + } +} diff --git a/Tests/AsyncAlgorithmsTests/Performance/TestThroughput.swift b/Tests/AsyncAlgorithmsTests/Performance/TestThroughput.swift index 2d490cc9..f1868b5b 100644 --- a/Tests/AsyncAlgorithmsTests/Performance/TestThroughput.swift +++ b/Tests/AsyncAlgorithmsTests/Performance/TestThroughput.swift @@ -64,5 +64,11 @@ final class TestThroughput: XCTestCase { zip($0, $1, $2) } } + @available(macOS 13.0, *) + func test_debounce() async { + await measureSequenceThroughput(source: (1...).async) { + $0.debounce(for: .zero, clock: ContinuousClock()) + } + } } #endif diff --git a/Tests/AsyncAlgorithmsTests/TestDebounce.swift b/Tests/AsyncAlgorithmsTests/TestDebounce.swift index 2005c134..8f717011 100644 --- a/Tests/AsyncAlgorithmsTests/TestDebounce.swift +++ b/Tests/AsyncAlgorithmsTests/TestDebounce.swift @@ -58,4 +58,14 @@ final class TestDebounce: XCTestCase { "----|" } } + + func test_Rethrows() async throws { + guard #available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *) else { throw XCTSkip("Skipped due to Clock/Instant/Duration availability") } + + let debounce = [1].async.debounce(for: .zero, clock: ContinuousClock()) + for await _ in debounce {} + + let throwingDebounce = [1].async.map { try throwOn(2, $0) }.debounce(for: .zero, clock: ContinuousClock()) + for try await _ in throwingDebounce {} + } }