Skip to content

Support blocking and non-blocking operations on the same Mutex #25

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 167 additions & 100 deletions src/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,113 @@ impl<T> Mutex<T> {
}
}

macro_rules! acquire_slow_method {
($name:ident, $listener:ident, $wait:expr, $doc:literal$(, $maybe_async:ident)?$(,)?) => {
#[doc = $doc]
#[cold]
$($maybe_async)? fn $name(&self) {
// Get the current time.
#[cfg(not(target_arch = "wasm32"))]
let start = Instant::now();

loop {
// Start listening for events.
let $listener = self.lock_ops.listen();

// Try locking if nobody is being starved.
match self
.state
.compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire)
.unwrap_or_else(|x| x)
{
// Lock acquired!
0 => return,

// Lock is held and nobody is starved.
1 => {}

// Somebody is starved.
_ => break,
}
Comment on lines +102 to +115
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we factor this block of code into a separate method? It is repeated 2 times in this method with small variations in the _=> case, and with this PR applied would then be duplicated again, resulting in 5 repetitions...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we can factor that into a method, since it includes control flow. I could make it a macro if you like.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since it includes control flow

that can be factored out, using something like the core::ops::ControlFlow, e.g.:

enum ScxResult {
    /// Lock acquired
    Acquired,
    /// Lock is held and nobody is starved
    HeldUnstarved,
    /// Somebody is starved
    Starved,
}
fn do_scx(state: &...) -> ScxResult {
    use ScxResult::*;
    match state
        .compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire)
        .unwrap_or_else(|x| x)
    {
        0 => Acquired,
        1 => HeldUnstarved,
        _ => Starved,
    }
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could refactor this into a trait, like I did for the OnceCell in #27.


// Wait for a notification.
$wait;

// Try locking if nobody is being starved.
match self
.state
.compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire)
.unwrap_or_else(|x| x)
{
// Lock acquired!
0 => return,

// Lock is held and nobody is starved.
1 => {}

// Somebody is starved.
_ => {
// Notify the first listener in line because we probably received a
// notification that was meant for a starved task.
self.lock_ops.notify(1);
break;
}
}

// If waiting for too long, fall back to a fairer locking strategy that will prevent
// newer lock operations from starving us forever.
#[cfg(not(target_arch = "wasm32"))]
if start.elapsed() > Duration::from_micros(500) {
break;
}
}

// Increment the number of starved lock operations.
if self.state.fetch_add(2, Ordering::Release) > usize::MAX / 2 {
// In case of potential overflow, abort.
process::abort();
}

// Decrement the counter when exiting this function.
let _call = CallOnDrop(|| {
self.state.fetch_sub(2, Ordering::Release);
});

loop {
// Start listening for events.
let $listener = self.lock_ops.listen();

// Try locking if nobody else is being starved.
match self
.state
.compare_exchange(2, 2 | 1, Ordering::Acquire, Ordering::Acquire)
.unwrap_or_else(|x| x)
{
// Lock acquired!
2 => return,

// Lock is held by someone.
s if s % 2 == 1 => {}

// Lock is available.
_ => {
// Be fair: notify the first listener and then go wait in line.
self.lock_ops.notify(1);
}
}

// Wait for a notification.
$wait;

// Try acquiring the lock without waiting for others.
if self.state.fetch_or(1, Ordering::Acquire) % 2 == 0 {
return;
}
}
}
};
}

impl<T: ?Sized> Mutex<T> {
/// Acquires the mutex.
///
Expand All @@ -110,109 +217,42 @@ impl<T: ?Sized> Mutex<T> {
MutexGuard(self)
}

/// Slow path for acquiring the mutex.
#[cold]
async fn acquire_slow(&self) {
// Get the current time.
#[cfg(not(target_arch = "wasm32"))]
let start = Instant::now();

loop {
// Start listening for events.
let listener = self.lock_ops.listen();

// Try locking if nobody is being starved.
match self
.state
.compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire)
.unwrap_or_else(|x| x)
{
// Lock acquired!
0 => return,

// Lock is held and nobody is starved.
1 => {}

// Somebody is starved.
_ => break,
}

// Wait for a notification.
listener.await;

// Try locking if nobody is being starved.
match self
.state
.compare_exchange(0, 1, Ordering::Acquire, Ordering::Acquire)
.unwrap_or_else(|x| x)
{
// Lock acquired!
0 => return,

// Lock is held and nobody is starved.
1 => {}

// Somebody is starved.
_ => {
// Notify the first listener in line because we probably received a
// notification that was meant for a starved task.
self.lock_ops.notify(1);
break;
}
}

// If waiting for too long, fall back to a fairer locking strategy that will prevent
// newer lock operations from starving us forever.
#[cfg(not(target_arch = "wasm32"))]
if start.elapsed() > Duration::from_micros(500) {
break;
}
}

// Increment the number of starved lock operations.
if self.state.fetch_add(2, Ordering::Release) > usize::MAX / 2 {
// In case of potential overflow, abort.
process::abort();
}

// Decrement the counter when exiting this function.
let _call = CallOnDrop(|| {
self.state.fetch_sub(2, Ordering::Release);
});

loop {
// Start listening for events.
let listener = self.lock_ops.listen();

// Try locking if nobody else is being starved.
match self
.state
.compare_exchange(2, 2 | 1, Ordering::Acquire, Ordering::Acquire)
.unwrap_or_else(|x| x)
{
// Lock acquired!
2 => return,

// Lock is held by someone.
s if s % 2 == 1 => {}

// Lock is available.
_ => {
// Be fair: notify the first listener and then go wait in line.
self.lock_ops.notify(1);
}
}

// Wait for a notification.
listener.await;

// Try acquiring the lock without waiting for others.
if self.state.fetch_or(1, Ordering::Acquire) % 2 == 0 {
return;
}
/// Acquires the mutex for use in blocking (non-async) code.
///
/// Returns a guard that releases the mutex when dropped.
///
/// # Examples
///
/// ```
/// use async_lock::Mutex;
///
/// let mutex = Mutex::new(10);
/// let guard = mutex.lock_blocking();
/// assert_eq!(*guard, 10);
/// ```
#[inline]
pub fn lock_blocking(&self) -> MutexGuard<'_, T> {
if let Some(guard) = self.try_lock() {
return guard;
}
self.acquire_slow_blocking();
MutexGuard(self)
}

acquire_slow_method!(
acquire_slow,
listener,
listener.await,
"Slow path for acquiring the mutex.",
async,
);
acquire_slow_method!(
acquire_slow_blocking,
listener,
listener.wait(),
"Slow path for acquiring the mutex for blocking (non-async) code.",
);

/// Attempts to acquire the mutex.
///
/// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, a
Expand Down Expand Up @@ -272,6 +312,14 @@ impl<T: ?Sized> Mutex<T> {
MutexGuardArc(self)
}

fn lock_arc_blocking_impl(self: Arc<Self>) -> MutexGuardArc<T> {
if let Some(guard) = self.try_lock_arc() {
return guard;
}
self.acquire_slow_blocking();
MutexGuardArc(self)
}

/// Acquires the mutex and clones a reference to it.
///
/// Returns an owned guard that releases the mutex when dropped.
Expand All @@ -293,6 +341,25 @@ impl<T: ?Sized> Mutex<T> {
self.clone().lock_arc_impl()
}

/// Acquires the mutex and clones a reference to it, for use in blocking (non-async) code.
///
/// Returns an owned guard that releases the mutex when dropped.
///
/// # Examples
///
/// ```
/// use async_lock::Mutex;
/// use std::sync::Arc;
///
/// let mutex = Arc::new(Mutex::new(10));
/// let guard = mutex.lock_arc_blocking();
/// assert_eq!(*guard, 10);
/// ```
#[inline]
pub fn lock_arc_blocking(self: &Arc<Self>) -> MutexGuardArc<T> {
self.clone().lock_arc_blocking_impl()
}

/// Attempts to acquire the mutex and clone a reference to it.
///
/// If the mutex could not be acquired at this time, then [`None`] is returned. Otherwise, an
Expand Down