From c17eb746a9522b7bd2642a7b3fa6a9aff2ab765e Mon Sep 17 00:00:00 2001 From: Josh Triplett Date: Fri, 1 Jul 2022 20:33:01 -0700 Subject: [PATCH] Support blocking and non-blocking operations on the same Mutex Add `lock_blocking` and `lock_arc_blocking` methods to allow sharing the same lock between blocking and non-blocking code. Introduce a helper macro to avoid duplicating the code of the `acquire_slow` method. --- src/mutex.rs | 267 ++++++++++++++++++++++++++++++++------------------- 1 file changed, 167 insertions(+), 100 deletions(-) diff --git a/src/mutex.rs b/src/mutex.rs index e5d67e2..6fbf855 100644 --- a/src/mutex.rs +++ b/src/mutex.rs @@ -85,6 +85,113 @@ impl Mutex { } } +macro_rules! acquire_slow_method { + ($name:ident, $listener:ident, $wait:expr, $doc:literal$(, $maybe_async:ident)?$(,)?) => { + #[doc = $doc] + #[cold] + $($maybe_async)? fn $name(&self) { + // Get the current time. + #[cfg(not(target_arch = "wasm32"))] + let start = Instant::now(); + + loop { + // Start listening for events. + let $listener = self.lock_ops.listen(); + + // Try locking if nobody is being starved. + match self + .state + .compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire) + .unwrap_or_else(|x| x) + { + // Lock acquired! + 0 => return, + + // Lock is held and nobody is starved. + 1 => {} + + // Somebody is starved. + _ => break, + } + + // Wait for a notification. + $wait; + + // Try locking if nobody is being starved. + match self + .state + .compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire) + .unwrap_or_else(|x| x) + { + // Lock acquired! + 0 => return, + + // Lock is held and nobody is starved. + 1 => {} + + // Somebody is starved. + _ => { + // Notify the first listener in line because we probably received a + // notification that was meant for a starved task. + self.lock_ops.notify(1); + break; + } + } + + // If waiting for too long, fall back to a fairer locking strategy that will prevent + // newer lock operations from starving us forever. + #[cfg(not(target_arch = "wasm32"))] + if start.elapsed() > Duration::from_micros(500) { + break; + } + } + + // Increment the number of starved lock operations. + if self.state.fetch_add(2, Ordering::Release) > usize::MAX / 2 { + // In case of potential overflow, abort. + process::abort(); + } + + // Decrement the counter when exiting this function. + let _call = CallOnDrop(|| { + self.state.fetch_sub(2, Ordering::Release); + }); + + loop { + // Start listening for events. + let $listener = self.lock_ops.listen(); + + // Try locking if nobody else is being starved. + match self + .state + .compare_exchange(2, 2 | 1, Ordering::Acquire, Ordering::Acquire) + .unwrap_or_else(|x| x) + { + // Lock acquired! + 2 => return, + + // Lock is held by someone. + s if s % 2 == 1 => {} + + // Lock is available. + _ => { + // Be fair: notify the first listener and then go wait in line. + self.lock_ops.notify(1); + } + } + + // Wait for a notification. + $wait; + + // Try acquiring the lock without waiting for others. + if self.state.fetch_or(1, Ordering::Acquire) % 2 == 0 { + return; + } + } + } + }; +} + impl Mutex { /// Acquires the mutex. /// @@ -110,109 +217,42 @@ impl Mutex { MutexGuard(self) } - /// Slow path for acquiring the mutex. - #[cold] - async fn acquire_slow(&self) { - // Get the current time. - #[cfg(not(target_arch = "wasm32"))] - let start = Instant::now(); - - loop { - // Start listening for events. - let listener = self.lock_ops.listen(); - - // Try locking if nobody is being starved. - match self - .state - .compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire) - .unwrap_or_else(|x| x) - { - // Lock acquired! - 0 => return, - - // Lock is held and nobody is starved. - 1 => {} - - // Somebody is starved. - _ => break, - } - - // Wait for a notification. - listener.await; - - // Try locking if nobody is being starved. - match self - .state - .compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire) - .unwrap_or_else(|x| x) - { - // Lock acquired! - 0 => return, - - // Lock is held and nobody is starved. - 1 => {} - - // Somebody is starved. - _ => { - // Notify the first listener in line because we probably received a - // notification that was meant for a starved task. - self.lock_ops.notify(1); - break; - } - } - - // If waiting for too long, fall back to a fairer locking strategy that will prevent - // newer lock operations from starving us forever. - #[cfg(not(target_arch = "wasm32"))] - if start.elapsed() > Duration::from_micros(500) { - break; - } - } - - // Increment the number of starved lock operations. - if self.state.fetch_add(2, Ordering::Release) > usize::MAX / 2 { - // In case of potential overflow, abort. - process::abort(); - } - - // Decrement the counter when exiting this function. - let _call = CallOnDrop(|| { - self.state.fetch_sub(2, Ordering::Release); - }); - - loop { - // Start listening for events. - let listener = self.lock_ops.listen(); - - // Try locking if nobody else is being starved. - match self - .state - .compare_exchange(2, 2 | 1, Ordering::Acquire, Ordering::Acquire) - .unwrap_or_else(|x| x) - { - // Lock acquired! - 2 => return, - - // Lock is held by someone. - s if s % 2 == 1 => {} - - // Lock is available. - _ => { - // Be fair: notify the first listener and then go wait in line. - self.lock_ops.notify(1); - } - } - - // Wait for a notification. - listener.await; - - // Try acquiring the lock without waiting for others. - if self.state.fetch_or(1, Ordering::Acquire) % 2 == 0 { - return; - } + /// Acquires the mutex for use in blocking (non-async) code. + /// + /// Returns a guard that releases the mutex when dropped. + /// + /// # Examples + /// + /// ``` + /// use async_lock::Mutex; + /// + /// let mutex = Mutex::new(10); + /// let guard = mutex.lock_blocking(); + /// assert_eq!(*guard, 10); + /// ``` + #[inline] + pub fn lock_blocking(&self) -> MutexGuard<'_, T> { + if let Some(guard) = self.try_lock() { + return guard; } + self.acquire_slow_blocking(); + MutexGuard(self) } + acquire_slow_method!( + acquire_slow, + listener, + listener.await, + "Slow path for acquiring the mutex.", + async, + ); + acquire_slow_method!( + acquire_slow_blocking, + listener, + listener.wait(), + "Slow path for acquiring the mutex for blocking (non-async) code.", + ); + /// Attempts to acquire the mutex. /// /// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, a @@ -272,6 +312,14 @@ impl Mutex { MutexGuardArc(self) } + fn lock_arc_blocking_impl(self: Arc) -> MutexGuardArc { + if let Some(guard) = self.try_lock_arc() { + return guard; + } + self.acquire_slow_blocking(); + MutexGuardArc(self) + } + /// Acquires the mutex and clones a reference to it. /// /// Returns an owned guard that releases the mutex when dropped. @@ -293,6 +341,25 @@ impl Mutex { self.clone().lock_arc_impl() } + /// Acquires the mutex and clones a reference to it, for use in blocking (non-async) code. + /// + /// Returns an owned guard that releases the mutex when dropped. + /// + /// # Examples + /// + /// ``` + /// use async_lock::Mutex; + /// use std::sync::Arc; + /// + /// let mutex = Arc::new(Mutex::new(10)); + /// let guard = mutex.lock_arc_blocking(); + /// assert_eq!(*guard, 10); + /// ``` + #[inline] + pub fn lock_arc_blocking(self: &Arc) -> MutexGuardArc { + self.clone().lock_arc_blocking_impl() + } + /// Attempts to acquire the mutex and clone a reference to it. /// /// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, an