From 54ce83fc491cd17a15d81448e486d6e771f04e79 Mon Sep 17 00:00:00 2001 From: jtnunley Date: Wed, 31 Aug 2022 14:59:48 -0700 Subject: [PATCH 01/11] implement no_std event listeners implement no_std event listeners --- .github/workflows/ci.yml | 4 + Cargo.toml | 10 +- src/lib.rs | 980 ++++++++++++++++++--------------------- src/sync.rs | 81 ++++ 4 files changed, 553 insertions(+), 522 deletions(-) create mode 100644 src/sync.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2ab378b..6284597 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -29,6 +29,10 @@ jobs: if: startsWith(matrix.rust, 'nightly') run: cargo check -Z features=dev_dep - run: cargo test + - name: Install cargo-hack + uses: taiki-e/install-action@cargo-hack + - run: rustup target add thumbv7m-none-eabi + - run: cargo hack build --target thumbv7m-none-eabi --no-default-features --no-dev-deps msrv: runs-on: ubuntu-latest diff --git a/Cargo.toml b/Cargo.toml index 38f4cee..b5919d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,15 @@ categories = ["asynchronous", "concurrency"] exclude = ["/.*"] [dependencies] -parking = "2.0.0" +concurrent-queue = { git = "https://github.com/smol-rs/concurrent-queue.git", default-features = false } +parking = { version = "2", optional = true } + +[target.'cfg(loom)'.dependencies] +loom = "0.5" + +[features] +default = ["std"] +std = ["parking", "concurrent-queue/std"] [dev-dependencies] criterion = "0.3.4" diff --git a/src/lib.rs b/src/lib.rs index 0290a48..e04569b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,51 +61,48 @@ //! ``` #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] +#![no_std] -use std::cell::{Cell, UnsafeCell}; -use std::fmt; -use std::future::Future; -use std::mem::{self, ManuallyDrop}; -use std::ops::{Deref, DerefMut}; +extern crate alloc; +#[cfg(feature = "std")] +extern crate std; + +mod sync; + +use alloc::sync::Arc; + +use core::fmt; +use core::future::Future; +use core::mem::{ManuallyDrop, MaybeUninit}; +use core::ptr; +use core::task::{Context, Poll, Waker}; + +#[cfg(feature = "std")] use std::panic::{RefUnwindSafe, UnwindSafe}; -use std::pin::Pin; -use std::ptr::{self, NonNull}; -use std::sync::atomic::{self, AtomicPtr, AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex, MutexGuard}; -use std::task::{Context, Poll, Waker}; +#[cfg(feature = "std")] use std::time::{Duration, Instant}; -use std::usize; -use parking::Unparker; +use concurrent_queue::ConcurrentQueue; -/// Inner state of [`Event`]. -struct Inner { - /// The number of notified entries, or `usize::MAX` if all of them have been notified. - /// - /// If there are no entries, this value is set to `usize::MAX`. - notified: AtomicUsize, +use sync::{ + AtomicPtr, AtomicUsize, AtomicWithMut, + Ordering::{self, AcqRel, Acquire, Release, SeqCst}, + UnsafeCell, +}; - /// A linked list holding registered listeners. - list: Mutex, +#[cfg(feature = "std")] +use sync::{pair, Unparker}; - /// A single cached list entry to avoid allocations on the fast path of the insertion. - cache: UnsafeCell, -} +/// Inner state for an `Event`. +struct Inner { + /// The queue that contains the listeners. + queue: ConcurrentQueue>, -impl Inner { - /// Locks the list. - fn lock(&self) -> ListGuard<'_> { - ListGuard { - inner: self, - guard: self.list.lock().unwrap(), - } - } + /// The number of non-notified entries in the queue. + len: AtomicUsize, - /// Returns the pointer to the single cached list entry. - #[inline(always)] - fn cache_ptr(&self) -> NonNull { - unsafe { NonNull::new_unchecked(self.cache.get()) } - } + /// The number of notified entries in the queue. + notified: AtomicUsize, } /// A synchronization primitive for notifying async tasks and threads. @@ -128,59 +125,47 @@ impl Inner { /// /// Listeners are registered and notified in the first-in first-out fashion, ensuring fairness. pub struct Event { - /// A pointer to heap-allocated inner state. + /// The linked list containing the event listeners. /// - /// This pointer is initially null and gets lazily initialized on first use. Semantically, it - /// is an `Arc` so it's important to keep in mind that it contributes to the [`Arc`]'s - /// reference count. + /// Semantically, this is an `Option>` that can be atomically + /// const-initialized. inner: AtomicPtr, } -unsafe impl Send for Event {} -unsafe impl Sync for Event {} - -impl UnwindSafe for Event {} -impl RefUnwindSafe for Event {} - impl Event { /// Creates a new [`Event`]. - /// - /// # Examples - /// - /// ``` - /// use event_listener::Event; - /// - /// let event = Event::new(); - /// ``` - #[inline] - pub const fn new() -> Event { - Event { + #[cfg(not(loom))] + pub const fn new() -> Self { + Self { + inner: AtomicPtr::new(ptr::null_mut()), + } + } + + /// Creates a new [`Event`]. + #[cfg(loom)] + pub fn new() -> Self { + Self { inner: AtomicPtr::new(ptr::null_mut()), } } /// Returns a guard listening for a notification. - /// - /// This method emits a `SeqCst` fence after registering a listener. - /// - /// # Examples - /// - /// ``` - /// use event_listener::Event; - /// - /// let event = Event::new(); - /// let listener = event.listen(); - /// ``` - #[cold] pub fn listen(&self) -> EventListener { - let inner = self.inner(); + // Load or get a reference to the listener. + let inner = unsafe { ManuallyDrop::new(Arc::from_raw(self.inner())) }; + + // Create a new listener in the queue. + let entry = Arc::new(Listener::default()); + inner.queue.push(entry.clone()).ok(); + let listener = EventListener { - inner: unsafe { Arc::clone(&ManuallyDrop::new(Arc::from_raw(inner))) }, - entry: unsafe { Some((*inner).lock().insert((*inner).cache_ptr())) }, + inner: Arc::clone(&*inner), + entry: Some(entry), }; - // Make sure the listener is registered before whatever happens next. - full_fence(); + // Update the non-notified length. + inner.len.fetch_add(1, Release); + listener } @@ -221,9 +206,7 @@ impl Event { if let Some(inner) = self.try_inner() { // Notify if there is at least one unnotified listener and the number of notified // listeners is less than `n`. - if inner.notified.load(Ordering::Acquire) < n { - inner.lock().notify(n); - } + inner.notify(n); } } @@ -265,9 +248,7 @@ impl Event { if let Some(inner) = self.try_inner() { // Notify if there is at least one unnotified listener and the number of notified // listeners is less than `n`. - if inner.notified.load(Ordering::Acquire) < n { - inner.lock().notify(n); - } + inner.notify(n); } } @@ -308,8 +289,8 @@ impl Event { if let Some(inner) = self.try_inner() { // Notify if there is at least one unnotified listener. - if inner.notified.load(Ordering::Acquire) < usize::MAX { - inner.lock().notify_additional(n); + if inner.len.load(Acquire) > 0 { + inner.notify_additional(n); } } } @@ -353,8 +334,8 @@ impl Event { pub fn notify_additional_relaxed(&self, n: usize) { if let Some(inner) = self.try_inner() { // Notify if there is at least one unnotified listener. - if inner.notified.load(Ordering::Acquire) < usize::MAX { - inner.lock().notify_additional(n); + if inner.len.load(Acquire) > 0 { + inner.notify_additional(n); } } } @@ -366,40 +347,27 @@ impl Event { unsafe { inner.as_ref() } } - /// Returns a raw pointer to the inner state, initializing it if necessary. - /// - /// This returns a raw pointer instead of reference because `from_raw` - /// requires raw/mut provenance: + /// Get the pointer to the linked list of listeners. fn inner(&self) -> *const Inner { - let mut inner = self.inner.load(Ordering::Acquire); + // If the pointer is null, try to initialize it. + let mut inner = self.inner.load(Acquire); - // Initialize the state if this is its first use. if inner.is_null() { // Allocate on the heap. let new = Arc::new(Inner { - notified: AtomicUsize::new(usize::MAX), - list: std::sync::Mutex::new(List { - head: None, - tail: None, - start: None, - len: 0, - notified: 0, - cache_used: false, - }), - cache: UnsafeCell::new(Entry { - state: Cell::new(State::Created), - prev: Cell::new(None), - next: Cell::new(None), - }), + queue: ConcurrentQueue::unbounded(), + len: AtomicUsize::new(0), + notified: AtomicUsize::new(0), }); - // Convert the heap-allocated state into a raw pointer. + + // Convert to a pointer. let new = Arc::into_raw(new) as *mut Inner; - // Attempt to replace the null-pointer with the new state pointer. + // Attempt to replace the original state with the new pointer. inner = self .inner - .compare_exchange(inner, new, Ordering::AcqRel, Ordering::Acquire) - .unwrap_or_else(|x| x); + .compare_exchange(inner, new, AcqRel, Acquire) + .unwrap_or_else(|e| e); // Check if the old pointer value was indeed null. if inner.is_null() { @@ -418,242 +386,186 @@ impl Event { } } -impl Drop for Event { - #[inline] - fn drop(&mut self) { - let inner: *mut Inner = *self.inner.get_mut(); - - // If the state pointer has been initialized, deallocate it. - if !inner.is_null() { - unsafe { - drop(Arc::from_raw(inner)); - } - } - } -} - impl fmt::Debug for Event { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.pad("Event { .. }") } } -impl Default for Event { - fn default() -> Event { - Event::new() - } -} - /// A guard waiting for a notification from an [`Event`]. -/// -/// There are two ways for a listener to wait for a notification: -/// -/// 1. In an asynchronous manner using `.await`. -/// 2. In a blocking manner by calling [`EventListener::wait()`] on it. -/// -/// If a notified listener is dropped without receiving a notification, dropping will notify -/// another active listener. Whether one *additional* listener will be notified depends on what -/// kind of notification was delivered. pub struct EventListener { - /// A reference to [`Event`]'s inner state. + /// The reference to the original linked list. inner: Arc, - /// A pointer to this listener's entry in the linked list. - entry: Option>, + /// The specific entry that this listener is listening on. + entry: Option>, } -unsafe impl Send for EventListener {} -unsafe impl Sync for EventListener {} - -impl UnwindSafe for EventListener {} -impl RefUnwindSafe for EventListener {} - impl EventListener { - /// Blocks until a notification is received. + /// Drops this listener and discards its notification (if any) without notifying another + /// active listener. /// - /// # Examples + /// Returns `true` if a notification was discarded. /// + /// # Examples /// ``` /// use event_listener::Event; /// /// let event = Event::new(); - /// let listener = event.listen(); + /// let listener1 = event.listen(); + /// let listener2 = event.listen(); /// - /// // Notify `listener`. /// event.notify(1); /// - /// // Receive the notification. - /// listener.wait(); + /// assert!(listener1.discard()); + /// assert!(!listener2.discard()); /// ``` - pub fn wait(self) { - self.wait_internal(None); + pub fn discard(mut self) -> bool { + self.orphan().is_some() } - /// Blocks until a notification is received or a timeout is reached. - /// - /// Returns `true` if a notification was received. + /// Returns `true` if this listener listens to the given `Event`. /// /// # Examples /// /// ``` - /// use std::time::Duration; /// use event_listener::Event; /// /// let event = Event::new(); /// let listener = event.listen(); /// - /// // There are no notification so this times out. - /// assert!(!listener.wait_timeout(Duration::from_secs(1))); + /// assert!(listener.listens_to(&event)); /// ``` - pub fn wait_timeout(self, timeout: Duration) -> bool { - self.wait_internal(Some(Instant::now() + timeout)) + #[inline] + pub fn listens_to(&self, event: &Event) -> bool { + ptr::eq::(&*self.inner, event.inner.load(Ordering::Acquire)) } - /// Blocks until a notification is received or a deadline is reached. - /// - /// Returns `true` if a notification was received. + /// Returns `true` if both listeners listen to the same `Event`. /// /// # Examples /// /// ``` - /// use std::time::{Duration, Instant}; /// use event_listener::Event; /// /// let event = Event::new(); - /// let listener = event.listen(); + /// let listener1 = event.listen(); + /// let listener2 = event.listen(); /// - /// // There are no notification so this times out. - /// assert!(!listener.wait_deadline(Instant::now() + Duration::from_secs(1))); + /// assert!(listener1.same_event(&listener2)); /// ``` - pub fn wait_deadline(self, deadline: Instant) -> bool { - self.wait_internal(Some(deadline)) + pub fn same_event(&self, other: &EventListener) -> bool { + ptr::eq::(&*self.inner, &*other.inner) } - /// Drops this listener and discards its notification (if any) without notifying another - /// active listener. - /// - /// Returns `true` if a notification was discarded. + /// A wrapper around `entry.orphan()` that also updates counts in the main structure. + fn orphan(&mut self) -> Option { + self.inner.len.fetch_sub(1, Release); + + if let Some(entry) = self.entry.take() { + if let Some(additional) = entry.orphan() { + // Decrement the number of notified entries. + self.inner.notified.fetch_sub(1, Release); + return Some(additional); + } + } + + None + } +} + +#[cfg(feature = "std")] +impl EventListener { + /// Blocks until a notification is received. /// /// # Examples + /// /// ``` /// use event_listener::Event; /// /// let event = Event::new(); - /// let listener1 = event.listen(); - /// let listener2 = event.listen(); + /// let listener = event.listen(); /// + /// // Notify `listener`. /// event.notify(1); /// - /// assert!(listener1.discard()); - /// assert!(!listener2.discard()); + /// // Receive the notification. + /// listener.wait(); /// ``` - pub fn discard(mut self) -> bool { - // If this listener has never picked up a notification... - if let Some(entry) = self.entry.take() { - let mut list = self.inner.lock(); - // Remove the listener from the list and return `true` if it was notified. - if let State::Notified(_) = list.remove(entry, self.inner.cache_ptr()) { - return true; - } - } - false + pub fn wait(self) { + self.wait_internal(None); } - /// Returns `true` if this listener listens to the given `Event`. + /// Blocks until a notification is received or a timeout is reached. + /// + /// Returns `true` if a notification was received. /// /// # Examples /// /// ``` + /// use std::time::Duration; /// use event_listener::Event; /// /// let event = Event::new(); /// let listener = event.listen(); /// - /// assert!(listener.listens_to(&event)); + /// // There are no notification so this times out. + /// assert!(!listener.wait_timeout(Duration::from_secs(1))); /// ``` - #[inline] - pub fn listens_to(&self, event: &Event) -> bool { - ptr::eq::(&*self.inner, event.inner.load(Ordering::Acquire)) + pub fn wait_timeout(self, duration: Duration) -> bool { + self.wait_internal(Instant::now().checked_add(duration)) } - /// Returns `true` if both listeners listen to the same `Event`. + /// Blocks until a notification is received or a deadline is reached. + /// + /// Returns `true` if a notification was received. /// /// # Examples /// /// ``` + /// use std::time::{Duration, Instant}; /// use event_listener::Event; /// /// let event = Event::new(); - /// let listener1 = event.listen(); - /// let listener2 = event.listen(); + /// let listener = event.listen(); /// - /// assert!(listener1.same_event(&listener2)); + /// // There are no notification so this times out. + /// assert!(!listener.wait_deadline(Instant::now() + Duration::from_secs(1))); /// ``` - pub fn same_event(&self, other: &EventListener) -> bool { - ptr::eq::(&*self.inner, &*other.inner) + pub fn wait_deadline(self, deadline: Instant) -> bool { + self.wait_internal(Some(deadline)) } fn wait_internal(mut self, deadline: Option) -> bool { - // Take out the entry pointer and set it to `None`. - let entry = match self.entry.take() { - None => unreachable!("cannot wait twice on an `EventListener`"), - Some(entry) => entry, - }; - let (parker, unparker) = parking::pair(); - - // Set this listener's state to `Waiting`. - { - let mut list = self.inner.lock(); - let e = unsafe { entry.as_ref() }; - - // Do a dummy replace operation in order to take out the state. - match e.state.replace(State::Notified(false)) { - State::Notified(_) => { - // If this listener has been notified, remove it from the list and return. - list.remove(entry, self.inner.cache_ptr()); - return true; - } - // Otherwise, set the state to `Waiting`. - _ => e.state.set(State::Waiting(unparker)), + // Take out the entry. + let entry = self.entry.as_ref().expect("already waited"); + + // Create a parker/unparker pair. + let (parker, unparker) = pair(); + + // Begin looping on `entry.wait()`. + let notified = loop { + let unparker = unparker.clone(); + + if entry.wait(move || Task::Thread(unparker)) { + // We have been notified. + break true; } - } - // Wait until a notification is received or the timeout is reached. - loop { + // Park the thread and see if we have been notified. match deadline { - None => parker.park(), - Some(deadline) => { - // Check for timeout. - let now = Instant::now(); - if now >= deadline { - // Remove the entry and check if notified. - return self - .inner - .lock() - .remove(entry, self.inner.cache_ptr()) - .is_notified(); + if !parker.park_deadline(deadline) { + // The timeout elapsed. Return false. + break false; } - - // Park until the deadline. - parker.park_timeout(deadline - now); } + None => parker.park(), } + }; - let mut list = self.inner.lock(); - let e = unsafe { entry.as_ref() }; - - // Do a dummy replace operation in order to take out the state. - match e.state.replace(State::Notified(false)) { - State::Notified(_) => { - // If this listener has been notified, remove it from the list and return. - list.remove(entry, self.inner.cache_ptr()); - return true; - } - // Otherwise, set the state back to `Waiting`. - state => e.state.set(state), - } - } + self.orphan(); + notified } } @@ -663,341 +575,367 @@ impl fmt::Debug for EventListener { } } +impl Unpin for EventListener {} + impl Future for EventListener { type Output = (); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut list = self.inner.lock(); + fn poll(mut self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let entry = self.entry.as_ref().expect("already waited"); - let entry = match self.entry { - None => unreachable!("cannot poll a completed `EventListener` future"), - Some(entry) => entry, - }; - let state = unsafe { &entry.as_ref().state }; - - // Do a dummy replace operation in order to take out the state. - match state.replace(State::Notified(false)) { - State::Notified(_) => { - // If this listener has been notified, remove it from the list and return. - list.remove(entry, self.inner.cache_ptr()); - drop(list); - self.entry = None; - return Poll::Ready(()); - } - State::Created => { - // If the listener was just created, put it in the `Polling` state. - state.set(State::Polling(cx.waker().clone())); - } - State::Polling(w) => { - // If the listener was in the `Polling` state, update the waker. - if w.will_wake(cx.waker()) { - state.set(State::Polling(w)); - } else { - state.set(State::Polling(cx.waker().clone())); - } - } - State::Waiting(_) => { - unreachable!("cannot poll and wait on `EventListener` at the same time") - } + if entry.wait(|| Task::Waker(cx.waker().clone())) { + self.orphan(); + Poll::Ready(()) + } else { + Poll::Pending } - - Poll::Pending } } impl Drop for EventListener { fn drop(&mut self) { - // If this listener has never picked up a notification... - if let Some(entry) = self.entry.take() { - let mut list = self.inner.lock(); - - // But if a notification was delivered to it... - if let State::Notified(additional) = list.remove(entry, self.inner.cache_ptr()) { - // Then pass it on to another active listener. - if additional { - list.notify_additional(1); - } else { - list.notify(1); - } + // If this listener has never picked up a notification, but a notification was delivered to it... + if let Some(additional) = self.orphan() { + // Then pass it on to another active listener. + if additional { + self.inner.notify_additional(1); + } else { + self.inner.notify(1); } } } } -/// A guard holding the linked list locked. -struct ListGuard<'a> { - /// A reference to [`Event`]'s inner state. - inner: &'a Inner, +impl Inner { + /// Notifies `n` listeners. + fn notify(&self, n: usize) { + let notified = self.notified.load(Acquire); - /// The actual guard that acquired the linked list. - guard: MutexGuard<'a, List>, -} + if notified >= n { + return; + } -impl Drop for ListGuard<'_> { - #[inline] - fn drop(&mut self) { - let list = &mut **self; + self.notify_internal(n - notified, false) + } - // Update the atomic `notified` counter. - let notified = if list.notified < list.len { - list.notified - } else { - usize::MAX - }; - self.inner.notified.store(notified, Ordering::Release); + /// Notifies `n` additional listeners. + fn notify_additional(&self, n: usize) { + self.notify_internal(n, true) } -} -impl Deref for ListGuard<'_> { - type Target = List; + fn notify_internal(&self, mut n: usize, additional: bool) { + while n > 0 { + n -= 1; - #[inline] - fn deref(&self) -> &List { - &*self.guard + // Notify the first unnotified entry. + match self.queue.pop() { + Err(_) => break, + Ok(entry) => { + if entry.notify(additional) { + // Increment the number of notified entries. + self.notified.fetch_add(1, Release); + } + } + } + } } } -impl DerefMut for ListGuard<'_> { - #[inline] - fn deref_mut(&mut self) -> &mut List { - &mut *self.guard - } -} +/// The internal listener for the `Event`. +struct Listener { + /// The current state of the listener. + state: AtomicUsize, -/// The state of a listener. -enum State { - /// It has just been created. - Created, + /// The task that this listener is blocked on. + task: UnsafeCell>, +} - /// It has received a notification. - /// - /// The `bool` is `true` if this was an "additional" notification. - Notified(bool), +unsafe impl Send for Listener {} +unsafe impl Sync for Listener {} - /// An async task is polling it. - Polling(Waker), +#[cfg(feature = "std")] +impl UnwindSafe for Listener {} +#[cfg(feature = "std")] +impl RefUnwindSafe for Listener {} - /// A thread is blocked on it. - Waiting(Unparker), +impl Default for Listener { + fn default() -> Self { + Self { + state: AtomicUsize::new(State::Created.into()), + task: UnsafeCell::new(MaybeUninit::uninit()), + } + } } -impl State { - /// Returns `true` if this is the `Notified` state. - #[inline] - fn is_notified(&self) -> bool { - match self { - State::Notified(_) => true, - State::Created | State::Polling(_) | State::Waiting(_) => false, +impl Listener { + /// Begin waiting on this `Listener`. + /// + /// Returns `true` if the listener was notified. + fn wait(&self, task: impl FnOnce() -> Task) -> bool { + // Eagerly assume that we are in `Created` state, and that we want to + // transition to writing our task. + loop { + match self + .state + .compare_exchange( + State::Created.into(), + State::WritingTask.into(), + AcqRel, + Acquire, + ) + .map(State::from) + .map_err(State::from) + { + Ok(State::Created) => { + // We now hold the "lock" on the task slot. Write the task to the slot. + self.task.with_mut(|slot| unsafe { + ptr::write(slot, MaybeUninit::new((task)())); + }); + + // We are done writing to the task slot. Transition to `Task`. + // No other thread can transition to `Task` from `WritingTask`. + self.state.store(State::Task.into(), Release); + + // Now, we should wait for a notification. + return false; + } + Err(State::WritingTask) => { + // We must be in the process of being woken up. Wait for the task to be written. + busy_wait(); + } + Err(State::Notified | State::NotifiedAdditional) => { + // We were already notified. We are done. + return true; + } + Err(State::Task) => { + // The task is still there. Wait for it to be removed. + return false; + } + state => unreachable!("Unintelligible state: {:?}", state), + } } } -} -/// An entry representing a registered listener. -struct Entry { - /// The state of this listener. - state: Cell, + /// Notify this `Listener`. + /// + /// Returns `true` if the listener was successfully notified. + fn notify(&self, additional: bool) -> bool { + let new_state = if additional { + State::NotifiedAdditional + } else { + State::Notified + }; - /// Previous entry in the linked list. - prev: Cell>>, + loop { + let state = self.state.load(Acquire).into(); + + // Determine what state we're in. + match state { + State::Created | State::Notified | State::NotifiedAdditional => { + // Indicate that the listener was notified. + if self + .state + .compare_exchange(state.into(), new_state.into(), AcqRel, Acquire) + .is_ok() + { + return true; + } + } + State::WritingTask => { + // The listener is currently writing the task, wait until they finish. + } + State::Task => { + if self + .state + .compare_exchange( + State::Task.into(), + State::WritingTask.into(), + AcqRel, + Acquire, + ) + .is_err() + { + // Someone else got to it before we did. + continue; + } - /// Next entry in the linked list. - next: Cell>>, -} + // SAFETY: Since we hold the lock, we can now read out the primitive. + let task = self + .task + .with_mut(|task| unsafe { ptr::read(task.cast::()) }); -/// A linked list of entries. -struct List { - /// First entry in the list. - head: Option>, + // SAFETY: No other code makes a change when `WritingTask` is detected. + self.state.store(new_state.into(), Release); + + // Wake the task up and return. + task.wake(); + return true; + } + State::Orphaned => { + // This task is no longer being monitored. + return false; + } + } - /// Last entry in the list. - tail: Option>, + busy_wait(); + } + } - /// The first unnotified entry in the list. - start: Option>, + /// Orphan this `Listener`. + /// + /// Returns `Some` if a notification was discarded. The `bool` is `true` + /// if the notification was an additional notification. + fn orphan(&self) -> Option { + let mut state = self.state.load(Acquire).into(); - /// Total number of entries in the list. - len: usize, + loop { + match state { + State::Created | State::Notified | State::NotifiedAdditional => { + // Indicate that the listener was notified. + if let Err(new_state) = self.state.compare_exchange( + state.into(), + State::Orphaned.into(), + AcqRel, + Acquire, + ) { + // We failed to transition to `Orphaned`. Try again. + state = new_state.into(); + continue; + } - /// The number of notified entries in the list. - notified: usize, + match state { + State::Notified => return Some(false), + State::NotifiedAdditional => return Some(true), + _ => return None, + } + } + State::WritingTask => { + // We're in the middle of being notified, wait for them to finish writing first. + } + State::Task => { + if let Err(new_state) = self.state.compare_exchange( + State::Task.into(), + State::WritingTask.into(), + AcqRel, + Acquire, + ) { + // Someone else got to it before we did. + state = new_state.into(); + continue; + } - /// Whether the cached entry is used. - cache_used: bool, -} + // SAFETY: Since we hold the lock, we can now drop the primitive. + self.task + .with_mut(|task| unsafe { ptr::drop_in_place(task.cast::()) }); -impl List { - /// Inserts a new entry into the list. - fn insert(&mut self, cache: NonNull) -> NonNull { - unsafe { - let entry = Entry { - state: Cell::new(State::Created), - prev: Cell::new(self.tail), - next: Cell::new(None), - }; - - let entry = if self.cache_used { - // Allocate an entry that is going to become the new tail. - NonNull::new_unchecked(Box::into_raw(Box::new(entry))) - } else { - // No need to allocate - we can use the cached entry. - self.cache_used = true; - cache.as_ptr().write(entry); - cache - }; - - // Replace the tail with the new entry. - match mem::replace(&mut self.tail, Some(entry)) { - None => self.head = Some(entry), - Some(t) => t.as_ref().next.set(Some(entry)), - } + // SAFETY: No other code makes a change when `WritingTask` is detected. + self.state.store(State::Orphaned.into(), Release); - // If there were no unnotified entries, this one is the first now. - if self.start.is_none() { - self.start = self.tail; + // We did not discard a notification. + return None; + } + State::Orphaned => { + // We have already been orphaned. + return None; + } } - // Bump the entry count. - self.len += 1; - - entry + busy_wait(); + state = self.state.load(Acquire).into(); } } +} - /// Removes an entry from the list and returns its state. - fn remove(&mut self, entry: NonNull, cache: NonNull) -> State { - unsafe { - let prev = entry.as_ref().prev.get(); - let next = entry.as_ref().next.get(); - - // Unlink from the previous entry. - match prev { - None => self.head = next, - Some(p) => p.as_ref().next.set(next), - } - - // Unlink from the next entry. - match next { - None => self.tail = prev, - Some(n) => n.as_ref().prev.set(prev), - } - - // If this was the first unnotified entry, move the pointer to the next one. - if self.start == Some(entry) { - self.start = next; +impl Drop for Listener { + fn drop(&mut self) { + let Self { + ref mut state, + ref mut task, + } = self; + + state.with_mut(|state| { + if let State::Task = (*state).into() { + // We're still holding onto a task, drop it. + task.with_mut(|task| unsafe { ptr::drop_in_place(task.cast::()) }); } + }); + } +} - // Extract the state. - let state = if ptr::eq(entry.as_ptr(), cache.as_ptr()) { - // Free the cached entry. - self.cache_used = false; - entry.as_ref().state.replace(State::Created) - } else { - // Deallocate the entry. - Box::from_raw(entry.as_ptr()).state.into_inner() - }; +/// The state that a `Listener` can be in. +#[derive(Debug, Copy, Clone)] +#[repr(usize)] +enum State { + /// The listener was just created. + Created, - // Update the counters. - if state.is_notified() { - self.notified -= 1; - } - self.len -= 1; + /// The listener has been notified through `notify()`. + Notified, - state - } - } + /// The listener has been notified through `notify_additional()`. + NotifiedAdditional, - /// Notifies a number of entries. - #[cold] - fn notify(&mut self, mut n: usize) { - if n <= self.notified { - return; - } - n -= self.notified; + /// The listener is being used to hold a task. + /// + /// If the listener is in this state, then the `task` field is guaranteed to + /// be initialized. + Task, - while n > 0 { - n -= 1; + /// The `task` field is being written to. + WritingTask, - // Notify the first unnotified entry. - match self.start { - None => break, - Some(e) => { - // Get the entry and move the pointer forward. - let e = unsafe { e.as_ref() }; - self.start = e.next.get(); - - // Set the state of this entry to `Notified` and notify. - match e.state.replace(State::Notified(false)) { - State::Notified(_) => {} - State::Created => {} - State::Polling(w) => w.wake(), - State::Waiting(t) => { - t.unpark(); - } - } + /// The listener is being dropped. + Orphaned, +} - // Update the counter. - self.notified += 1; - } - } +impl From for State { + fn from(val: usize) -> Self { + match val { + 0 => State::Created, + 1 => State::Notified, + 2 => State::NotifiedAdditional, + 3 => State::Task, + 4 => State::WritingTask, + 5 => State::Orphaned, + _ => unreachable!("invalid state"), } } +} - /// Notifies a number of additional entries. - #[cold] - fn notify_additional(&mut self, mut n: usize) { - while n > 0 { - n -= 1; +impl From for usize { + fn from(state: State) -> Self { + state as usize + } +} - // Notify the first unnotified entry. - match self.start { - None => break, - Some(e) => { - // Get the entry and move the pointer forward. - let e = unsafe { e.as_ref() }; - self.start = e.next.get(); - - // Set the state of this entry to `Notified` and notify. - match e.state.replace(State::Notified(true)) { - State::Notified(_) => {} - State::Created => {} - State::Polling(w) => w.wake(), - State::Waiting(t) => { - t.unpark(); - } - } +/// The task to wake up once a notification is received. +enum Task { + /// The task is an async task waiting on a `Waker`. + Waker(Waker), + /// The task is a thread blocked on the `Unparker`. + #[cfg(feature = "std")] + Thread(Unparker), +} - // Update the counter. - self.notified += 1; - } +impl Task { + fn wake(self) { + match self { + Self::Waker(waker) => waker.wake(), + #[cfg(feature = "std")] + Self::Thread(unparker) => { + unparker.unpark(); } } } } -/// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster. +/// Indicate to the compiler/scheduler that we're in a spin loop. +fn busy_wait() { + #[allow(deprecated)] + core::sync::atomic::spin_loop_hint(); +} + #[inline] fn full_fence() { - if cfg!(all( - any(target_arch = "x86", target_arch = "x86_64"), - not(miri) - )) { - // HACK(stjepang): On x86 architectures there are two different ways of executing - // a `SeqCst` fence. - // - // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction. - // 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg` instruction. - // - // Both instructions have the effect of a full barrier, but empirical benchmarks have shown - // that the second one is sometimes a bit faster. - // - // The ideal solution here would be to use inline assembly, but we're instead creating a - // temporary atomic variable and compare-and-exchanging its value. No sane compiler to - // x86 platforms is going to optimize this away. - atomic::compiler_fence(Ordering::SeqCst); - let a = AtomicUsize::new(0); - let _ = a.compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst); - atomic::compiler_fence(Ordering::SeqCst); - } else { - atomic::fence(Ordering::SeqCst); - } + sync::fence(SeqCst); } diff --git a/src/sync.rs b/src/sync.rs new file mode 100644 index 0000000..3a8df7d --- /dev/null +++ b/src/sync.rs @@ -0,0 +1,81 @@ +#[cfg(not(loom))] +mod sync_impl { + pub(crate) use core::sync::atomic::{fence, AtomicPtr, AtomicUsize, Ordering}; + + #[cfg(feature = "std")] + pub(crate) use parking::{pair, Unparker}; + + pub(crate) trait AtomicWithMut { + type Item; + + fn with_mut R>(&mut self, f: F) -> R; + } + + impl AtomicWithMut for AtomicUsize { + type Item = usize; + + fn with_mut R>(&mut self, f: F) -> R { + f(self.get_mut()) + } + } + + impl AtomicWithMut for AtomicPtr { + type Item = *mut T; + + fn with_mut R>(&mut self, f: F) -> R { + f(self.get_mut()) + } + } + + pub(crate) struct UnsafeCell(core::cell::UnsafeCell); + + impl UnsafeCell { + pub(crate) const fn new(item: T) -> Self { + Self(core::cell::UnsafeCell::new(item)) + } + + pub(crate) fn with_mut(&self, f: F) -> R + where + F: FnOnce(*mut T) -> R, + { + f(self.0.get()) + } + } +} + +#[cfg(loom)] +mod sync_impl { + pub(crate) use loom::cell::UnsafeCell; + pub(crate) use loom::sync::atomic::{ + fence, AtomicBool, AtomicPtr, AtomicU32, AtomicUsize, Ordering, + }; + + /// Re-implementation of `parking::pair` based on loom. + pub(crate) fn pair() -> (Parker, Unparker) { + let th = loom::thread::current(); + + (Parker(Default::default()), Unparker(th)) + } + + /// Re-implementation of `parking::Parker` based on loom. + pub(crate) struct Parker(core::marker::PhantomData<*mut ()>); + + impl Parker { + pub(crate) fn park(&self) { + loom::thread::park(); + } + + // park_timeout is not available in loom + } + + /// Re-implementation of `parking::Unparker` based on loom. + pub(crate) struct Unparker(loom::thread::Thread); + + impl Unparker { + pub(crate) fn unpark(&self) { + self.0.unpark(); + } + } +} + +pub(crate) use sync_impl::*; From bac26db6edccd1b42166783eeebb9f6d7d369b35 Mon Sep 17 00:00:00 2001 From: jtnunley Date: Tue, 6 Sep 2022 08:59:03 -0700 Subject: [PATCH 02/11] fix leaky destructor + msrv --- src/lib.rs | 74 +++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 51 insertions(+), 23 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index e04569b..b779d3f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -392,6 +392,19 @@ impl fmt::Debug for Event { } } +impl Drop for Event { + fn drop(&mut self) { + // Drop the inner state. + self.inner.with_mut(|inner| { + if !inner.is_null() { + unsafe { + drop(Arc::::from_raw(*inner)); + } + } + }); + } +} + /// A guard waiting for a notification from an [`Event`]. pub struct EventListener { /// The reference to the original linked list. @@ -672,46 +685,55 @@ impl Listener { /// /// Returns `true` if the listener was notified. fn wait(&self, task: impl FnOnce() -> Task) -> bool { - // Eagerly assume that we are in `Created` state, and that we want to - // transition to writing our task. + let mut state = self.state.load(Acquire).into(); + loop { - match self - .state - .compare_exchange( - State::Created.into(), - State::WritingTask.into(), - AcqRel, - Acquire, - ) - .map(State::from) - .map_err(State::from) - { - Ok(State::Created) => { + match state { + State::Created | State::Task => { + // Try to "lock" the listener. + if let Err(e) = self.state.compare_exchange( + state.into(), + State::WritingTask.into(), + SeqCst, + SeqCst, + ) { + state = e.into(); + busy_wait(); + continue; + } + // We now hold the "lock" on the task slot. Write the task to the slot. - self.task.with_mut(|slot| unsafe { - ptr::write(slot, MaybeUninit::new((task)())); + let task = self.task.with_mut(|slot| unsafe { + // If there already was a task, swap it out and wake it instead of replacing it. + if matches!(state, State::Task) { + Some(ptr::replace(slot.cast(), (task)())) + } else { + ptr::write(slot.cast(), (task)()); + None + } }); // We are done writing to the task slot. Transition to `Task`. // No other thread can transition to `Task` from `WritingTask`. self.state.store(State::Task.into(), Release); + // If we yielded a task, wake it now. + if let Some(task) = task { + task.wake(); + } + // Now, we should wait for a notification. return false; } - Err(State::WritingTask) => { + State::WritingTask => { // We must be in the process of being woken up. Wait for the task to be written. busy_wait(); } - Err(State::Notified | State::NotifiedAdditional) => { + State::Notified | State::NotifiedAdditional => { // We were already notified. We are done. return true; } - Err(State::Task) => { - // The task is still there. Wait for it to be removed. - return false; - } - state => unreachable!("Unintelligible state: {:?}", state), + State::Orphaned => unreachable!("orphaned listener"), } } } @@ -800,6 +822,7 @@ impl Listener { ) { // We failed to transition to `Orphaned`. Try again. state = new_state.into(); + busy_wait(); continue; } @@ -912,6 +935,7 @@ impl From for usize { enum Task { /// The task is an async task waiting on a `Waker`. Waker(Waker), + /// The task is a thread blocked on the `Unparker`. #[cfg(feature = "std")] Thread(Unparker), @@ -931,7 +955,11 @@ impl Task { /// Indicate to the compiler/scheduler that we're in a spin loop. fn busy_wait() { + #[cfg(feature = "std")] + std::thread::yield_now(); + #[allow(deprecated)] + #[cfg(not(feature = "std"))] core::sync::atomic::spin_loop_hint(); } From d3295ca6c7452fbb0a79badaf5218329749dfe1d Mon Sep 17 00:00:00 2001 From: jtnunley Date: Tue, 6 Sep 2022 09:22:13 -0700 Subject: [PATCH 03/11] fix msrv + loom compilation --- src/lib.rs | 18 +++++++++++++----- src/sync.rs | 8 +++++--- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index b779d3f..e1257da 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -525,6 +525,7 @@ impl EventListener { /// // There are no notification so this times out. /// assert!(!listener.wait_timeout(Duration::from_secs(1))); /// ``` + #[cfg(not(loom))] pub fn wait_timeout(self, duration: Duration) -> bool { self.wait_internal(Instant::now().checked_add(duration)) } @@ -545,6 +546,7 @@ impl EventListener { /// // There are no notification so this times out. /// assert!(!listener.wait_deadline(Instant::now() + Duration::from_secs(1))); /// ``` + #[cfg(not(loom))] pub fn wait_deadline(self, deadline: Instant) -> bool { self.wait_internal(Some(deadline)) } @@ -568,9 +570,15 @@ impl EventListener { // Park the thread and see if we have been notified. match deadline { Some(deadline) => { - if !parker.park_deadline(deadline) { - // The timeout elapsed. Return false. - break false; + #[cfg(loom)] + panic!("`wait_deadline` is not supported with loom"); + + #[cfg(not(loom))] + { + if !parker.park_deadline(deadline) { + // The timeout elapsed. Return false. + break false; + } } } None => parker.park(), @@ -705,7 +713,7 @@ impl Listener { // We now hold the "lock" on the task slot. Write the task to the slot. let task = self.task.with_mut(|slot| unsafe { // If there already was a task, swap it out and wake it instead of replacing it. - if matches!(state, State::Task) { + if state == State::Task { Some(ptr::replace(slot.cast(), (task)())) } else { ptr::write(slot.cast(), (task)()); @@ -886,7 +894,7 @@ impl Drop for Listener { } /// The state that a `Listener` can be in. -#[derive(Debug, Copy, Clone)] +#[derive(Debug, Copy, Clone, PartialEq, Eq)] #[repr(usize)] enum State { /// The listener was just created. diff --git a/src/sync.rs b/src/sync.rs index 3a8df7d..1b892d5 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -46,9 +46,7 @@ mod sync_impl { #[cfg(loom)] mod sync_impl { pub(crate) use loom::cell::UnsafeCell; - pub(crate) use loom::sync::atomic::{ - fence, AtomicBool, AtomicPtr, AtomicU32, AtomicUsize, Ordering, - }; + pub(crate) use loom::sync::atomic::{fence, AtomicPtr, AtomicUsize, Ordering}; /// Re-implementation of `parking::pair` based on loom. pub(crate) fn pair() -> (Parker, Unparker) { @@ -69,6 +67,7 @@ mod sync_impl { } /// Re-implementation of `parking::Unparker` based on loom. + #[derive(Clone)] pub(crate) struct Unparker(loom::thread::Thread); impl Unparker { @@ -76,6 +75,9 @@ mod sync_impl { self.0.unpark(); } } + + #[allow(dead_code)] + pub(crate) trait AtomicWithMut {} } pub(crate) use sync_impl::*; From 577feafcbf1d6ea2137f7c977afc163e3134a561 Mon Sep 17 00:00:00 2001 From: jtnunley Date: Tue, 6 Sep 2022 14:18:55 -0700 Subject: [PATCH 04/11] Make sure we handle panics --- src/lib.rs | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index e1257da..0f4cceb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -73,7 +73,7 @@ use alloc::sync::Arc; use core::fmt; use core::future::Future; -use core::mem::{ManuallyDrop, MaybeUninit}; +use core::mem::{ManuallyDrop, MaybeUninit, forget}; use core::ptr; use core::task::{Context, Poll, Waker}; @@ -711,12 +711,16 @@ impl Listener { } // We now hold the "lock" on the task slot. Write the task to the slot. + let guard = ResetState(&self.state, state); + let task = task(); + forget(guard); + let task = self.task.with_mut(|slot| unsafe { - // If there already was a task, swap it out and wake it instead of replacing it. + // If there already was a task, swap it out and wake it. if state == State::Task { - Some(ptr::replace(slot.cast(), (task)())) + Some(ptr::replace(slot.cast(), task)) } else { - ptr::write(slot.cast(), (task)()); + ptr::write(slot.cast(), task); None } }); @@ -743,6 +747,18 @@ impl Listener { } State::Orphaned => unreachable!("orphaned listener"), } + + /// The task() closure may clone a user-defined waker, which can panic. + /// + /// This panic would leave the listener in the `WritingTask` state, which will + /// lead to an infinite loop. This guard resets it back to the original state. + struct ResetState<'a>(&'a AtomicUsize, State); + + impl Drop for ResetState<'_> { + fn drop(&mut self) { + self.0.store(self.1.into(), Release); + } + } } } From fa6d6afb39a5d8265080018449cd11841c6c007b Mon Sep 17 00:00:00 2001 From: jtnunley Date: Tue, 6 Sep 2022 14:28:55 -0700 Subject: [PATCH 05/11] fmt --- src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 0f4cceb..5ec7a80 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -73,7 +73,7 @@ use alloc::sync::Arc; use core::fmt; use core::future::Future; -use core::mem::{ManuallyDrop, MaybeUninit, forget}; +use core::mem::{forget, ManuallyDrop, MaybeUninit}; use core::ptr; use core::task::{Context, Poll, Waker}; @@ -749,7 +749,7 @@ impl Listener { } /// The task() closure may clone a user-defined waker, which can panic. - /// + /// /// This panic would leave the listener in the `WritingTask` state, which will /// lead to an infinite loop. This guard resets it back to the original state. struct ResetState<'a>(&'a AtomicUsize, State); From 1587b7299d4a20916a703cb5ff9fcb074839079b Mon Sep 17 00:00:00 2001 From: jtnunley Date: Fri, 9 Sep 2022 09:54:47 -0700 Subject: [PATCH 06/11] fix MIRI failure --- src/lib.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 5ec7a80..52837a6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -57,7 +57,9 @@ //! //! // Wait for a notification and continue the loop. //! listener.wait(); -//! } +//! } +//! # // Sleep to prevent MIRI failure, https://github.com/rust-lang/miri/issues/1371 +//! # std::thread::sleep(std::time::Duration::from_secs(3)); //! ``` #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] From 290ac53c4ab2629036e7ccdf9372c75ca1f1ec7e Mon Sep 17 00:00:00 2001 From: jtnunley Date: Fri, 9 Sep 2022 14:57:32 -0700 Subject: [PATCH 07/11] fmt --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 52837a6..a79d7ae 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -57,7 +57,7 @@ //! //! // Wait for a notification and continue the loop. //! listener.wait(); -//! } +//! } //! # // Sleep to prevent MIRI failure, https://github.com/rust-lang/miri/issues/1371 //! # std::thread::sleep(std::time::Duration::from_secs(3)); //! ``` From b69f19eb2bce39f2011669f880268cbf169dac81 Mon Sep 17 00:00:00 2001 From: jtnunley Date: Tue, 20 Sep 2022 09:54:55 -0700 Subject: [PATCH 08/11] Add portable-atomic and loom from concurrent-queue --- Cargo.toml | 8 +++++-- src/lib.rs | 60 +++++++++++++++++++++++++++++++++++++++++++++-------- src/sync.rs | 58 ++++++++++++++++++++++++++++++++++++++++++--------- 3 files changed, 105 insertions(+), 21 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b5919d5..8e41c3b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,11 +15,15 @@ categories = ["asynchronous", "concurrency"] exclude = ["/.*"] [dependencies] -concurrent-queue = { git = "https://github.com/smol-rs/concurrent-queue.git", default-features = false } +concurrent-queue = { git = "https://github.com/smol-rs/concurrent-queue.git", branch = "loom", default-features = false } parking = { version = "2", optional = true } +portable-atomic = { version = "0.3", default-features = false, optional = true } +# Enables loom testing. This feature is permanently unstable and the API may +# change at any time. [target.'cfg(loom)'.dependencies] -loom = "0.5" +loom = { version = "0.5", optional = true } +concurrent-queue = { git = "https://github.com/smol-rs/concurrent-queue.git", branch = "loom", features = ["loom"] } [features] default = ["std"] diff --git a/src/lib.rs b/src/lib.rs index a79d7ae..f211a46 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,6 +61,18 @@ //! # // Sleep to prevent MIRI failure, https://github.com/rust-lang/miri/issues/1371 //! # std::thread::sleep(std::time::Duration::from_secs(3)); //! ``` +//! +//! # Features +//! +//! There is also a `portable-atomic` feature, which uses a polyfill from the +//! [`portable-atomic`] crate to provide atomic operations on platforms that do not support them. +//! See the [`README`] for the [`portable-atomic`] crate for more information on how to use it on +//! single-threaded targets. Note that even with this feature enabled, `event-listener` still +//! requires a global allocator to be available. See the documentation for the +//! [`std::alloc::GlobalAlloc`] trait for more information. +//! +//! [`portable-atomic`]: https://crates.io/crates/portable-atomic +//! [`README`]: https://github.com/taiki-e/portable-atomic/blob/main/README.md#optional-cfg #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] #![no_std] @@ -81,16 +93,19 @@ use core::task::{Context, Poll, Waker}; #[cfg(feature = "std")] use std::panic::{RefUnwindSafe, UnwindSafe}; +#[cfg(all(feature = "std", not(loom)))] +use std::time::Duration; #[cfg(feature = "std")] -use std::time::{Duration, Instant}; +use std::time::Instant; use concurrent_queue::ConcurrentQueue; -use sync::{ - AtomicPtr, AtomicUsize, AtomicWithMut, - Ordering::{self, AcqRel, Acquire, Release, SeqCst}, - UnsafeCell, -}; +use sync::atomic::Ordering::{self, AcqRel, Acquire, Release, SeqCst}; +use sync::atomic::{AtomicPtr, AtomicUsize}; +use sync::UnsafeCell; + +#[cfg(not(loom))] +use sync::AtomicWithMut; #[cfg(feature = "std")] use sync::{pair, Unparker}; @@ -573,7 +588,10 @@ impl EventListener { match deadline { Some(deadline) => { #[cfg(loom)] - panic!("`wait_deadline` is not supported with loom"); + { + let _ = deadline; + panic!("`wait_deadline` is not supported with loom"); + } #[cfg(not(loom))] { @@ -870,6 +888,7 @@ impl Listener { ) { // Someone else got to it before we did. state = new_state.into(); + busy_wait(); continue; } @@ -982,7 +1001,7 @@ impl Task { /// Indicate to the compiler/scheduler that we're in a spin loop. fn busy_wait() { #[cfg(feature = "std")] - std::thread::yield_now(); + sync::thread::yield_now(); #[allow(deprecated)] #[cfg(not(feature = "std"))] @@ -991,5 +1010,28 @@ fn busy_wait() { #[inline] fn full_fence() { - sync::fence(SeqCst); + if cfg!(all( + any(target_arch = "x86", target_arch = "x86_64"), + not(miri), + not(loom) + )) { + // HACK(stjepang): On x86 architectures there are two different ways of executing + // a `SeqCst` fence. + // + // 1. `atomic::fence(SeqCst)`, which compiles into a `mfence` instruction. + // 2. `_.compare_exchange(_, _, SeqCst, SeqCst)`, which compiles into a `lock cmpxchg` instruction. + // + // Both instructions have the effect of a full barrier, but empirical benchmarks have shown + // that the second one is sometimes a bit faster. + // + // The ideal solution here would be to use inline assembly, but we're instead creating a + // temporary atomic variable and compare-and-exchanging its value. No sane compiler to + // x86 platforms is going to optimize this away. + sync::atomic::compiler_fence(Ordering::SeqCst); + let a = AtomicUsize::new(0); + let _ = a.compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst); + sync::atomic::compiler_fence(Ordering::SeqCst); + } else { + sync::atomic::fence(Ordering::SeqCst); + } } diff --git a/src/sync.rs b/src/sync.rs index 1b892d5..5ec0cee 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -1,9 +1,14 @@ #[cfg(not(loom))] mod sync_impl { - pub(crate) use core::sync::atomic::{fence, AtomicPtr, AtomicUsize, Ordering}; + #[cfg(not(feature = "portable-atomic"))] + pub(crate) use core::sync::atomic; + #[cfg(feature = "portable-atomic")] + pub(crate) use portable_atomic as atomic; #[cfg(feature = "std")] pub(crate) use parking::{pair, Unparker}; + #[cfg(feature = "std")] + pub(crate) use std::thread; pub(crate) trait AtomicWithMut { type Item; @@ -11,7 +16,7 @@ mod sync_impl { fn with_mut R>(&mut self, f: F) -> R; } - impl AtomicWithMut for AtomicUsize { + impl AtomicWithMut for atomic::AtomicUsize { type Item = usize; fn with_mut R>(&mut self, f: F) -> R { @@ -19,7 +24,7 @@ mod sync_impl { } } - impl AtomicWithMut for AtomicPtr { + impl AtomicWithMut for atomic::AtomicPtr { type Item = *mut T; fn with_mut R>(&mut self, f: F) -> R { @@ -46,21 +51,41 @@ mod sync_impl { #[cfg(loom)] mod sync_impl { pub(crate) use loom::cell::UnsafeCell; - pub(crate) use loom::sync::atomic::{fence, AtomicPtr, AtomicUsize, Ordering}; + pub(crate) use loom::thread; + + pub(crate) mod atomic { + pub(crate) use core::sync::atomic::compiler_fence; + pub(crate) use loom::sync::atomic::*; + } + + use loom::sync::{Arc, Condvar, Mutex}; /// Re-implementation of `parking::pair` based on loom. pub(crate) fn pair() -> (Parker, Unparker) { - let th = loom::thread::current(); + let inner = Arc::new(Inner { + mutex: Mutex::new(false), + condvar: Condvar::new(), + }); - (Parker(Default::default()), Unparker(th)) + (Parker(inner.clone()), Unparker(inner)) } /// Re-implementation of `parking::Parker` based on loom. - pub(crate) struct Parker(core::marker::PhantomData<*mut ()>); + pub(crate) struct Parker(Arc); impl Parker { pub(crate) fn park(&self) { - loom::thread::park(); + let mut state = self.0.mutex.lock().unwrap(); + + loop { + // If we haven't been notified, wait. + if *state { + *state = false; + break; + } else { + state = self.0.condvar.wait(state).unwrap(); + } + } } // park_timeout is not available in loom @@ -68,14 +93,27 @@ mod sync_impl { /// Re-implementation of `parking::Unparker` based on loom. #[derive(Clone)] - pub(crate) struct Unparker(loom::thread::Thread); + pub(crate) struct Unparker(Arc); impl Unparker { pub(crate) fn unpark(&self) { - self.0.unpark(); + let mut state = self.0.mutex.lock().unwrap(); + *state = true; + drop(state); + + self.0.condvar.notify_one(); } } + /// Internals of `Parker` and `Unparker. + struct Inner { + /// The mutex used to synchronize access to the state. + mutex: Mutex, + + /// The condition variable used to notify the thread. + condvar: Condvar, + } + #[allow(dead_code)] pub(crate) trait AtomicWithMut {} } From 50767acc0d6f2785decf2818dccea94834ab7d20 Mon Sep 17 00:00:00 2001 From: jtnunley Date: Tue, 20 Sep 2022 10:25:54 -0700 Subject: [PATCH 09/11] Fix thumbv miscompilation --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 8e41c3b..f2c9d2f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ portable-atomic = { version = "0.3", default-features = false, optional = true } # change at any time. [target.'cfg(loom)'.dependencies] loom = { version = "0.5", optional = true } -concurrent-queue = { git = "https://github.com/smol-rs/concurrent-queue.git", branch = "loom", features = ["loom"] } +concurrent-queue = { git = "https://github.com/smol-rs/concurrent-queue.git", branch = "loom", default-features = false, features = ["loom"] } [features] default = ["std"] From efad495e7af4208074c735de727b642be1a8953d Mon Sep 17 00:00:00 2001 From: jtnunley Date: Fri, 23 Sep 2022 08:50:23 -0700 Subject: [PATCH 10/11] Documentation typo --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index f211a46..e3b182c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -64,7 +64,7 @@ //! //! # Features //! -//! There is also a `portable-atomic` feature, which uses a polyfill from the +//! There is a `portable-atomic` feature, which uses a polyfill from the //! [`portable-atomic`] crate to provide atomic operations on platforms that do not support them. //! See the [`README`] for the [`portable-atomic`] crate for more information on how to use it on //! single-threaded targets. Note that even with this feature enabled, `event-listener` still From 0e7ca36e11a372d2a65e7ea0831eecf21da5492e Mon Sep 17 00:00:00 2001 From: jtnunley Date: Mon, 3 Oct 2022 11:36:07 -0700 Subject: [PATCH 11/11] Try to optimize the new algorithm --- benches/bench.rs | 12 ++ src/lib.rs | 367 +++--------------------------------- src/listener.rs | 478 +++++++++++++++++++++++++++++++++++++++++++++++ tests/loom.rs | 290 ++++++++++++++++++++++++++++ 4 files changed, 811 insertions(+), 336 deletions(-) create mode 100644 src/listener.rs create mode 100644 tests/loom.rs diff --git a/benches/bench.rs b/benches/bench.rs index 26840e5..c65b0aa 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -20,6 +20,18 @@ fn bench_events(c: &mut Criterion) { } }); }); + + c.bench_function("notify_single", |b| { + let ev = Event::new(); + + b.iter(|| { + for _ in 0..COUNT { + let handle = ev.listen(); + ev.notify(1); + handle.wait(); + } + }); + }); } criterion_group!(benches, bench_events); diff --git a/src/lib.rs b/src/lib.rs index e3b182c..6c4af8d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -81,18 +81,17 @@ extern crate alloc; #[cfg(feature = "std")] extern crate std; +mod listener; mod sync; use alloc::sync::Arc; use core::fmt; use core::future::Future; -use core::mem::{forget, ManuallyDrop, MaybeUninit}; -use core::ptr; -use core::task::{Context, Poll, Waker}; +use core::mem::ManuallyDrop; +use core::ptr::{self, NonNull}; +use core::task::{Context, Poll}; -#[cfg(feature = "std")] -use std::panic::{RefUnwindSafe, UnwindSafe}; #[cfg(all(feature = "std", not(loom)))] use std::time::Duration; #[cfg(feature = "std")] @@ -100,20 +99,25 @@ use std::time::Instant; use concurrent_queue::ConcurrentQueue; -use sync::atomic::Ordering::{self, AcqRel, Acquire, Release, SeqCst}; +use listener::{CachedListener, Listener, Task}; +use sync::atomic::Ordering::{self, AcqRel, Acquire, Release}; use sync::atomic::{AtomicPtr, AtomicUsize}; -use sync::UnsafeCell; #[cfg(not(loom))] use sync::AtomicWithMut; #[cfg(feature = "std")] -use sync::{pair, Unparker}; +use sync::pair; /// Inner state for an `Event`. struct Inner { /// The queue that contains the listeners. - queue: ConcurrentQueue>, + /// + /// The pointers to the listeners are either heap-allocated or stored in the cache. + queue: ConcurrentQueue>, + + /// A cached listener, used to avoid unnecessary heap allocation. + cached: CachedListener, /// The number of non-notified entries in the queue. len: AtomicUsize, @@ -172,8 +176,8 @@ impl Event { let inner = unsafe { ManuallyDrop::new(Arc::from_raw(self.inner())) }; // Create a new listener in the queue. - let entry = Arc::new(Listener::default()); - inner.queue.push(entry.clone()).ok(); + let entry = Listener::alloc(&inner); + inner.queue.push(entry).ok(); let listener = EventListener { inner: Arc::clone(&*inner), @@ -373,6 +377,7 @@ impl Event { // Allocate on the heap. let new = Arc::new(Inner { queue: ConcurrentQueue::unbounded(), + cached: CachedListener::default(), len: AtomicUsize::new(0), notified: AtomicUsize::new(0), }); @@ -414,9 +419,14 @@ impl Drop for Event { // Drop the inner state. self.inner.with_mut(|inner| { if !inner.is_null() { - unsafe { - drop(Arc::::from_raw(*inner)); + // Get the Arc and notify all remaining listeners. + let inner = unsafe { Arc::::from_raw(*inner) }; + + while let Ok(listener) = inner.queue.pop() { + Listener::notify(listener, false, &inner); } + + drop(inner); } }); } @@ -428,7 +438,7 @@ pub struct EventListener { inner: Arc, /// The specific entry that this listener is listening on. - entry: Option>, + entry: Option>, } impl EventListener { @@ -493,7 +503,7 @@ impl EventListener { self.inner.len.fetch_sub(1, Release); if let Some(entry) = self.entry.take() { - if let Some(additional) = entry.orphan() { + if let Some(additional) = Listener::orphan(entry, &self.inner) { // Decrement the number of notified entries. self.inner.notified.fetch_sub(1, Release); return Some(additional); @@ -570,7 +580,7 @@ impl EventListener { fn wait_internal(mut self, deadline: Option) -> bool { // Take out the entry. - let entry = self.entry.as_ref().expect("already waited"); + let entry = self.entry.expect("already waited"); // Create a parker/unparker pair. let (parker, unparker) = pair(); @@ -579,7 +589,7 @@ impl EventListener { let notified = loop { let unparker = unparker.clone(); - if entry.wait(move || Task::Thread(unparker)) { + if Listener::wait(entry, move || Task::Thread(unparker)) { // We have been notified. break true; } @@ -622,9 +632,9 @@ impl Future for EventListener { type Output = (); fn poll(mut self: core::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let entry = self.entry.as_ref().expect("already waited"); + let entry = self.entry.expect("already waited"); - if entry.wait(|| Task::Waker(cx.waker().clone())) { + if Listener::wait(entry, || Task::Waker(cx.waker().clone())) { self.orphan(); Poll::Ready(()) } else { @@ -664,6 +674,7 @@ impl Inner { self.notify_internal(n, true) } + #[cold] fn notify_internal(&self, mut n: usize, additional: bool) { while n > 0 { n -= 1; @@ -672,7 +683,7 @@ impl Inner { match self.queue.pop() { Err(_) => break, Ok(entry) => { - if entry.notify(additional) { + if Listener::notify(entry, additional, self) { // Increment the number of notified entries. self.notified.fetch_add(1, Release); } @@ -682,322 +693,6 @@ impl Inner { } } -/// The internal listener for the `Event`. -struct Listener { - /// The current state of the listener. - state: AtomicUsize, - - /// The task that this listener is blocked on. - task: UnsafeCell>, -} - -unsafe impl Send for Listener {} -unsafe impl Sync for Listener {} - -#[cfg(feature = "std")] -impl UnwindSafe for Listener {} -#[cfg(feature = "std")] -impl RefUnwindSafe for Listener {} - -impl Default for Listener { - fn default() -> Self { - Self { - state: AtomicUsize::new(State::Created.into()), - task: UnsafeCell::new(MaybeUninit::uninit()), - } - } -} - -impl Listener { - /// Begin waiting on this `Listener`. - /// - /// Returns `true` if the listener was notified. - fn wait(&self, task: impl FnOnce() -> Task) -> bool { - let mut state = self.state.load(Acquire).into(); - - loop { - match state { - State::Created | State::Task => { - // Try to "lock" the listener. - if let Err(e) = self.state.compare_exchange( - state.into(), - State::WritingTask.into(), - SeqCst, - SeqCst, - ) { - state = e.into(); - busy_wait(); - continue; - } - - // We now hold the "lock" on the task slot. Write the task to the slot. - let guard = ResetState(&self.state, state); - let task = task(); - forget(guard); - - let task = self.task.with_mut(|slot| unsafe { - // If there already was a task, swap it out and wake it. - if state == State::Task { - Some(ptr::replace(slot.cast(), task)) - } else { - ptr::write(slot.cast(), task); - None - } - }); - - // We are done writing to the task slot. Transition to `Task`. - // No other thread can transition to `Task` from `WritingTask`. - self.state.store(State::Task.into(), Release); - - // If we yielded a task, wake it now. - if let Some(task) = task { - task.wake(); - } - - // Now, we should wait for a notification. - return false; - } - State::WritingTask => { - // We must be in the process of being woken up. Wait for the task to be written. - busy_wait(); - } - State::Notified | State::NotifiedAdditional => { - // We were already notified. We are done. - return true; - } - State::Orphaned => unreachable!("orphaned listener"), - } - - /// The task() closure may clone a user-defined waker, which can panic. - /// - /// This panic would leave the listener in the `WritingTask` state, which will - /// lead to an infinite loop. This guard resets it back to the original state. - struct ResetState<'a>(&'a AtomicUsize, State); - - impl Drop for ResetState<'_> { - fn drop(&mut self) { - self.0.store(self.1.into(), Release); - } - } - } - } - - /// Notify this `Listener`. - /// - /// Returns `true` if the listener was successfully notified. - fn notify(&self, additional: bool) -> bool { - let new_state = if additional { - State::NotifiedAdditional - } else { - State::Notified - }; - - loop { - let state = self.state.load(Acquire).into(); - - // Determine what state we're in. - match state { - State::Created | State::Notified | State::NotifiedAdditional => { - // Indicate that the listener was notified. - if self - .state - .compare_exchange(state.into(), new_state.into(), AcqRel, Acquire) - .is_ok() - { - return true; - } - } - State::WritingTask => { - // The listener is currently writing the task, wait until they finish. - } - State::Task => { - if self - .state - .compare_exchange( - State::Task.into(), - State::WritingTask.into(), - AcqRel, - Acquire, - ) - .is_err() - { - // Someone else got to it before we did. - continue; - } - - // SAFETY: Since we hold the lock, we can now read out the primitive. - let task = self - .task - .with_mut(|task| unsafe { ptr::read(task.cast::()) }); - - // SAFETY: No other code makes a change when `WritingTask` is detected. - self.state.store(new_state.into(), Release); - - // Wake the task up and return. - task.wake(); - return true; - } - State::Orphaned => { - // This task is no longer being monitored. - return false; - } - } - - busy_wait(); - } - } - - /// Orphan this `Listener`. - /// - /// Returns `Some` if a notification was discarded. The `bool` is `true` - /// if the notification was an additional notification. - fn orphan(&self) -> Option { - let mut state = self.state.load(Acquire).into(); - - loop { - match state { - State::Created | State::Notified | State::NotifiedAdditional => { - // Indicate that the listener was notified. - if let Err(new_state) = self.state.compare_exchange( - state.into(), - State::Orphaned.into(), - AcqRel, - Acquire, - ) { - // We failed to transition to `Orphaned`. Try again. - state = new_state.into(); - busy_wait(); - continue; - } - - match state { - State::Notified => return Some(false), - State::NotifiedAdditional => return Some(true), - _ => return None, - } - } - State::WritingTask => { - // We're in the middle of being notified, wait for them to finish writing first. - } - State::Task => { - if let Err(new_state) = self.state.compare_exchange( - State::Task.into(), - State::WritingTask.into(), - AcqRel, - Acquire, - ) { - // Someone else got to it before we did. - state = new_state.into(); - busy_wait(); - continue; - } - - // SAFETY: Since we hold the lock, we can now drop the primitive. - self.task - .with_mut(|task| unsafe { ptr::drop_in_place(task.cast::()) }); - - // SAFETY: No other code makes a change when `WritingTask` is detected. - self.state.store(State::Orphaned.into(), Release); - - // We did not discard a notification. - return None; - } - State::Orphaned => { - // We have already been orphaned. - return None; - } - } - - busy_wait(); - state = self.state.load(Acquire).into(); - } - } -} - -impl Drop for Listener { - fn drop(&mut self) { - let Self { - ref mut state, - ref mut task, - } = self; - - state.with_mut(|state| { - if let State::Task = (*state).into() { - // We're still holding onto a task, drop it. - task.with_mut(|task| unsafe { ptr::drop_in_place(task.cast::()) }); - } - }); - } -} - -/// The state that a `Listener` can be in. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -#[repr(usize)] -enum State { - /// The listener was just created. - Created, - - /// The listener has been notified through `notify()`. - Notified, - - /// The listener has been notified through `notify_additional()`. - NotifiedAdditional, - - /// The listener is being used to hold a task. - /// - /// If the listener is in this state, then the `task` field is guaranteed to - /// be initialized. - Task, - - /// The `task` field is being written to. - WritingTask, - - /// The listener is being dropped. - Orphaned, -} - -impl From for State { - fn from(val: usize) -> Self { - match val { - 0 => State::Created, - 1 => State::Notified, - 2 => State::NotifiedAdditional, - 3 => State::Task, - 4 => State::WritingTask, - 5 => State::Orphaned, - _ => unreachable!("invalid state"), - } - } -} - -impl From for usize { - fn from(state: State) -> Self { - state as usize - } -} - -/// The task to wake up once a notification is received. -enum Task { - /// The task is an async task waiting on a `Waker`. - Waker(Waker), - - /// The task is a thread blocked on the `Unparker`. - #[cfg(feature = "std")] - Thread(Unparker), -} - -impl Task { - fn wake(self) { - match self { - Self::Waker(waker) => waker.wake(), - #[cfg(feature = "std")] - Self::Thread(unparker) => { - unparker.unpark(); - } - } - } -} - /// Indicate to the compiler/scheduler that we're in a spin loop. fn busy_wait() { #[cfg(feature = "std")] diff --git a/src/listener.rs b/src/listener.rs new file mode 100644 index 0000000..2495026 --- /dev/null +++ b/src/listener.rs @@ -0,0 +1,478 @@ +//! Implementation of the inner listener primitive. +//! +//! This is stored in a `ConcurrentQueue` inside of an event and in the `EventListener` handle. +//! This means that the maximum refcount of the event is 2. In addition, it will either be +//! allocated to the heap or to a buffer inside of the `Event` itself. +//! +//! This module aims to create a primitive that fulfills all of these requirements while being +//! safe to use from the main module. + +use super::Inner; + +use alloc::boxed::Box; + +use core::cell::UnsafeCell as StdUnsafeCell; +use core::mem::{forget, MaybeUninit}; +use core::ptr::{self, NonNull}; +use core::task::Waker; + +#[cfg(feature = "std")] +use std::panic::{RefUnwindSafe, UnwindSafe}; + +use crate::busy_wait; +use crate::sync::atomic::Ordering::{AcqRel, Acquire, Release, SeqCst}; +use crate::sync::atomic::{fence, AtomicBool, AtomicUsize}; +use crate::sync::UnsafeCell; + +#[cfg(not(loom))] +use crate::sync::AtomicWithMut; + +#[cfg(feature = "std")] +use crate::sync::Unparker; + +/// The internal listener for the `Event`. +pub(crate) struct Listener { + /// The current state of the listener. + state: AtomicUsize, + + /// The task that this listener is blocked on. + task: UnsafeCell>, +} + +/// A cached `Listener` on the stack. +pub(crate) struct CachedListener { + /// Whether or not the listener is cached. + cached: AtomicBool, + + /// The cached listener. + listener: StdUnsafeCell>, +} + +unsafe impl Send for Listener {} +unsafe impl Sync for Listener {} + +#[cfg(feature = "std")] +impl UnwindSafe for Listener {} +#[cfg(feature = "std")] +impl RefUnwindSafe for Listener {} + +impl Default for Listener { + fn default() -> Self { + // Assume that listeners start off as queued. + Self { + state: AtomicUsize::new(ListenerStatus::Created as usize | QUEUED_MASK), + task: UnsafeCell::new(MaybeUninit::uninit()), + } + } +} + +impl Default for CachedListener { + fn default() -> Self { + Self { + cached: AtomicBool::new(false), + listener: MaybeUninit::uninit().into(), + } + } +} + +impl Listener { + /// Allocate a new `Listener`. + pub(crate) fn alloc(event: &Inner) -> NonNull { + // If the cache is open, then we don't have to allocate. + if !event.cached.cached.swap(true, Acquire) { + // SAFETY: The cache is open and the queue is empty, so we know that the cache is + // valid to write to. + let ptr = event.cached.listener.get(); + unsafe { + ptr.write(MaybeUninit::new(Listener::default())); + return NonNull::new_unchecked(ptr as *mut Listener); + } + } + + // The cache is unavailable, so we have to allocate. + let listener = Box::new(Listener::default()); + let ptr = Box::into_raw(listener); + + unsafe { NonNull::new_unchecked(ptr) } + } + + /// Begin waiting on this `Listener`. + /// + /// Returns `true` if the listener was notified. + pub(crate) fn wait(this: NonNull, task: impl FnOnce() -> Task) -> bool { + let this = unsafe { this.as_ref() }; + let mut state: State = this.state.load(Acquire).into(); + + loop { + match state.status { + ListenerStatus::Created | ListenerStatus::Task => { + // Try to "lock" the listener. + let writing_state = State { + status: ListenerStatus::WritingTask, + ..state + }; + if let Err(e) = this.state.compare_exchange( + state.into(), + writing_state.into(), + SeqCst, + SeqCst, + ) { + state = e.into(); + busy_wait(); + continue; + } + + // We now hold the "lock" on the task slot. Write the task to the slot. + let guard = ResetState(&this.state, state); + let task = task(); + forget(guard); + + let task = this.task.with_mut(|slot| unsafe { + // If there already was a task, swap it out and wake it. + if state.status == ListenerStatus::Task { + Some(ptr::replace(slot.cast(), task)) + } else { + ptr::write(slot.cast(), task); + None + } + }); + + // We are done writing to the task slot. Transition to `Task`. + // No other thread can transition to `Task` from `WritingTask`. + state.status = ListenerStatus::Task; + this.state.store(state.into(), Release); + + // If we yielded a task, wake it now. + if let Some(task) = task { + task.wake(); + } + + // Now, we should wait for a notification. + return false; + } + ListenerStatus::WritingTask => { + // We must be in the process of being woken up. Wait for the task to be written. + busy_wait(); + } + ListenerStatus::Notified | ListenerStatus::NotifiedAdditional => { + // We were already notified. We are done. + return true; + } + ListenerStatus::Orphaned => { + // The event was dropped. We are done. + return false; + } + } + + /// The task() closure may clone a user-defined waker, which can panic. + /// + /// This panic would leave the listener in the `WritingTask` state, which will + /// lead to an infinite loop. This guard resets it back to the original state. + struct ResetState<'a>(&'a AtomicUsize, State); + + impl Drop for ResetState<'_> { + fn drop(&mut self) { + self.0.store(self.1.into(), Release); + } + } + } + } + + /// Notify this `Listener`. + /// + /// Returns `true` if the listener was successfully notified. This implicity deques the listener and + /// may destroy this listener if the top-level EventListener has already orphaned it. + pub(crate) fn notify(entry: NonNull, additional: bool, event: &Inner) -> bool { + let this = unsafe { entry.as_ref() }; + + let new_state = if additional { + ListenerStatus::NotifiedAdditional + } else { + ListenerStatus::Notified + }; + let new_state = State { + status: new_state, + queued: false, + }; + + let mut state: State = this.state.load(Acquire).into(); + + let notified = loop { + // Determine what state we're in. + match state.status { + ListenerStatus::Created + | ListenerStatus::Notified + | ListenerStatus::NotifiedAdditional => { + // Indicate that the listener was notified and is now unqueued. + if let Err(e) = + this.state + .compare_exchange(state.into(), new_state.into(), AcqRel, Acquire) + { + // Someone got to it before we did, try again. + state = e.into(); + continue; + } else { + // We successfully notified the listener. + break true; + } + } + ListenerStatus::WritingTask => { + // The listener is currently writing the task, wait until they finish. + } + ListenerStatus::Task => { + // We need to wake the listener before we can do anything else. + let writing_state = State { + queued: state.queued, + status: ListenerStatus::WritingTask, + }; + + if let Err(e) = this.state.compare_exchange( + state.into(), + writing_state.into(), + AcqRel, + Acquire, + ) { + // Someone else got to it before we did. + state = e.into(); + busy_wait(); + continue; + } + + // SAFETY: Since we hold the lock, we can now read out the primitive. + let task = this + .task + .with_mut(|task| unsafe { ptr::read(task.cast::()) }); + + // SAFETY: No other code makes a change when `WritingTask` is detected. + this.state.store(new_state.into(), Release); + + // Wake the task up and return. + task.wake(); + break true; + } + ListenerStatus::Orphaned => { + // This task is no longer being monitored. + break false; + } + } + }; + + // If the listener orphaned this listener, we need to destroy it. + if let ListenerStatus::Orphaned = state.status { + // Create a memory fence and then destroy it. + fence(Acquire); + Self::destroy(entry, event); + } + + notified + } + + /// Orphan this `Listener`. + /// + /// Returns `Some` if a notification was discarded. The `bool` is `true` + /// if the notification was an additional notification. If this entry was already + /// removed from the queue, this destroys the entry as well. + pub(crate) fn orphan(entry: NonNull, event: &Inner) -> Option { + let this = unsafe { entry.as_ref() }; + let mut state: State = this.state.load(Acquire).into(); + + let result = loop { + match state.status { + ListenerStatus::Created + | ListenerStatus::Notified + | ListenerStatus::NotifiedAdditional => { + let new_state = State { + status: ListenerStatus::Orphaned, + queued: state.queued, + }; + + // Indicate that the listener was notified. + if let Err(new_state) = + this.state + .compare_exchange(state.into(), new_state.into(), AcqRel, Acquire) + { + // We failed to transition to `Orphaned`. Try again. + state = new_state.into(); + busy_wait(); + continue; + } + + match state.status { + ListenerStatus::Notified => break Some(false), + ListenerStatus::NotifiedAdditional => break Some(true), + _ => break None, + } + } + ListenerStatus::WritingTask => { + // We're in the middle of being notified, wait for them to finish writing first. + } + ListenerStatus::Task => { + let writing_state = State { + queued: state.queued, + status: ListenerStatus::WritingTask, + }; + + if let Err(new_state) = this.state.compare_exchange( + state.into(), + writing_state.into(), + AcqRel, + Acquire, + ) { + // Someone else got to it before we did. + state = new_state.into(); + busy_wait(); + continue; + } + + // SAFETY: Since we hold the lock, we can now drop the primitive. + this.task + .with_mut(|task| unsafe { ptr::drop_in_place(task.cast::()) }); + + // SAFETY: No other code makes a change when `WritingTask` is detected. + state.status = ListenerStatus::Orphaned; + this.state.store(state.into(), Release); + + // We did not discard a notification. + break None; + } + ListenerStatus::Orphaned => { + // We have already been orphaned. + break None; + } + } + + busy_wait(); + state = this.state.load(Acquire).into(); + }; + + // If the entry is dequeued, we need to destroy it. + if !state.queued { + fence(Acquire); + Self::destroy(entry, event); + } + + // At this point, if we are no longer queued, we can drop the listener. + result + } + + fn destroy(entry: NonNull, event: &Inner) { + // If this pointer is equal to the cache pointer, we can just clear the cache. + let cache_ptr = event.cached.listener.get().cast(); + if entry.as_ptr() == cache_ptr { + unsafe { + ptr::drop_in_place(cache_ptr); + } + + // Mark the cache as empty. + event.cached.cached.store(false, Release); + + return; + } + + drop(unsafe { Box::from_raw(entry.as_ptr()) }); + } +} + +impl Drop for Listener { + fn drop(&mut self) { + let Self { + ref mut state, + ref mut task, + } = self; + + state.with_mut(|state| { + if let ListenerStatus::Task = State::from(*state).status { + // We're still holding onto a task, drop it. + task.with_mut(|task| unsafe { ptr::drop_in_place(task.cast::()) }); + } + }); + } +} + +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +struct State { + /// The status associated with the top-level EventListener. + status: ListenerStatus, + + /// The status associated with the queue. + queued: bool, +} + +/// The state that a `Listener` can be in. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[repr(usize)] +enum ListenerStatus { + /// The listener was just created. + Created, + + /// The listener has been notified through `notify()`. + Notified, + + /// The listener has been notified through `notify_additional()`. + NotifiedAdditional, + + /// The listener is being used to hold a task. + /// + /// If the listener is in this state, then the `task` field is guaranteed to + /// be initialized. + Task, + + /// The `task` field is being written to. + WritingTask, + + /// The listener is being dropped. + Orphaned, +} + +const LISTENER_STATUS_MASK: usize = 0b1111; +const QUEUED_SHIFT: usize = 4; +const QUEUED_MASK: usize = 0b1 << QUEUED_SHIFT; + +impl From for State { + fn from(val: usize) -> Self { + let status = match val & LISTENER_STATUS_MASK { + 0 => ListenerStatus::Created, + 1 => ListenerStatus::Notified, + 2 => ListenerStatus::NotifiedAdditional, + 3 => ListenerStatus::Task, + 4 => ListenerStatus::WritingTask, + 5 => ListenerStatus::Orphaned, + _ => unreachable!("invalid state"), + }; + + let queued = (val & QUEUED_MASK) != 0; + + Self { status, queued } + } +} + +impl From for usize { + fn from(state: State) -> Self { + let status = state.status as usize; + let queued = if state.queued { QUEUED_MASK } else { 0 }; + + status | queued + } +} + +/// The task to wake up once a notification is received. +pub(crate) enum Task { + /// The task is an async task waiting on a `Waker`. + Waker(Waker), + + /// The task is a thread blocked on the `Unparker`. + #[cfg(feature = "std")] + Thread(Unparker), +} + +impl Task { + fn wake(self) { + match self { + Self::Waker(waker) => waker.wake(), + #[cfg(feature = "std")] + Self::Thread(unparker) => { + unparker.unpark(); + } + } + } +} diff --git a/tests/loom.rs b/tests/loom.rs new file mode 100644 index 0000000..c3aaa19 --- /dev/null +++ b/tests/loom.rs @@ -0,0 +1,290 @@ +#![cfg(loom)] + +use concurrent_queue::{ConcurrentQueue, PopError, PushError}; +use event_listener::{Event, EventListener}; +use loom::cell::UnsafeCell; +use loom::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use loom::sync::Arc; +use loom::thread; +use std::ops; + +/// A basic MPMC channel based on a ConcurrentQueue, Event, and loom primitives. +struct Channel { + /// The queue used to contain items. + queue: ConcurrentQueue, + + /// The number of senders. + senders: AtomicUsize, + + /// The number of receivers. + receivers: AtomicUsize, + + /// The event that is signaled when a new item is pushed. + push_event: Event, + + /// The event that is signaled when a new item is popped. + pop_event: Event, +} + +/// The sending side of a channel. +struct Sender { + /// The channel. + channel: Arc>, +} + +/// The receiving side of a channel. +struct Receiver { + /// The channel. + channel: Arc>, +} + +/// Create a new pair of senders/receivers based on a queue. +fn pair() -> (Sender, Receiver) { + let channel = Arc::new(Channel { + queue: ConcurrentQueue::bounded(10), + senders: AtomicUsize::new(1), + receivers: AtomicUsize::new(1), + push_event: Event::new(), + pop_event: Event::new(), + }); + + ( + Sender { + channel: channel.clone(), + }, + Receiver { channel }, + ) +} + +impl Clone for Sender { + fn clone(&self) -> Self { + self.channel.senders.fetch_add(1, Ordering::SeqCst); + Sender { + channel: self.channel.clone(), + } + } +} + +impl Drop for Sender { + fn drop(&mut self) { + if self.channel.senders.fetch_sub(1, Ordering::SeqCst) == 1 { + // Close the channel and notify the receivers. + self.channel.queue.close(); + self.channel.push_event.notify_additional(core::usize::MAX); + } + } +} + +impl Clone for Receiver { + fn clone(&self) -> Self { + self.channel.receivers.fetch_add(1, Ordering::SeqCst); + Receiver { + channel: self.channel.clone(), + } + } +} + +impl Drop for Receiver { + fn drop(&mut self) { + if self.channel.receivers.fetch_sub(1, Ordering::SeqCst) == 1 { + // Close the channel and notify the senders. + self.channel.queue.close(); + self.channel.pop_event.notify_additional(core::usize::MAX); + } + } +} + +impl Sender { + /// Send a value. + /// + /// Returns an error with the value if the channel is closed. + fn send(&self, mut value: T) -> Result<(), T> { + let mut listener = None; + + loop { + match self.channel.queue.push(value) { + Ok(()) => { + // Notify a single receiver. + self.channel.push_event.notify_additional(1); + return Ok(()); + } + Err(PushError::Closed(val)) => return Err(val), + Err(PushError::Full(val)) => { + // Wait for a receiver to pop an item. + match listener.take() { + Some(listener) => listener.wait(), + None => { + listener = Some(self.channel.pop_event.listen()); + } + } + } + } + } + } +} + +impl Receiver { + /// Receive a value. + /// + /// Returns an error if the channel is closed. + fn recv(&self) -> Result { + let mut listener = None; + + loop { + match self.channel.queue.pop() { + Ok(value) => { + // Notify a single sender. + self.channel.pop_event.notify_additional(1); + return Ok(value); + } + Err(PopError::Closed) => return Err(()), + Err(PopError::Empty) => { + // Wait for a sender to push an item. + match listener.take() { + Some(listener) => listener.wait(), + None => { + listener = Some(self.channel.push_event.listen()); + } + } + } + } + } + } +} + +/// A basic Mutex based on Event and loom primitives. +struct Mutex { + /// The inner value. + value: UnsafeCell, + + /// The event that is signaled when the value is unlocked. + event: Event, + + /// Is this mutex locked? + locked: AtomicBool, +} + +/// A guard that unlocks the mutex when dropped. +struct MutexGuard<'a, T> { + /// The mutex. + mutex: &'a Mutex, +} + +impl Mutex { + /// Create a new mutex. + fn new(value: T) -> Mutex { + Mutex { + value: UnsafeCell::new(value), + event: Event::new(), + locked: AtomicBool::new(false), + } + } + + /// Lock the mutex. + fn lock(&self) -> MutexGuard<'_, T> { + let mut listener = None; + + loop { + match self + .locked + .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) + { + Ok(_) => return MutexGuard { mutex: self }, + Err(_) => { + // Wait for the mutex to be unlocked. + match listener.take() { + Some(listener) => listener.wait(), + None => { + listener = Some(self.event.listen()); + } + } + } + } + } + } + + /// Get the inner value. + fn into_inner(self) -> T { + self.value.into_inner() + } +} + +impl<'a, T> MutexGuard<'a, T> { + fn with(&mut self, f: impl FnOnce(&mut T) -> R) -> R { + f(unsafe { &mut *self.mutex.value.get() }) + } +} + +impl Drop for MutexGuard<'_, T> { + fn drop(&mut self) { + self.mutex.locked.store(false, Ordering::SeqCst); + self.mutex.event.notify(1); + } +} + +#[test] +fn spsc() { + loom::model(|| { + // Create a new pair of senders/receivers. + let (tx, rx) = pair(); + + // Push each onto a thread and run them. + let handle = thread::spawn(move || { + for i in 0..limit { + if tx.send(i).is_err() { + break; + } + } + }); + + let mut recv_values = vec![]; + + loop { + match rx.recv() { + Ok(value) => recv_values.push(value), + Err(()) => break, + } + } + + // Values may not be in order. + recv_values.sort_unstable(); + assert_eq!(recv_values, (0..limit).collect::>()); + + // Join the handle before we exit. + handle.join().unwrap(); + }); +} + +#[test] +fn contended_mutex() { + loom::model(|| { + // Create a new mutex. + let mutex = Arc::new(Mutex::new(0)); + + // Create a bunch of threads that increment the mutex. + let mut handles = vec![]; + let num_threads = loom::MAX_THREADS - 1; + + for _ in 0..num_threads { + let mutex = mutex.clone(); + let handle = thread::spawn(move || { + let mut mutex = mutex.lock(); + mutex.with(|val| { + *val += 1; + }); + }); + handles.push(handle); + } + + // Join the handles. + for handle in handles { + handle.join().unwrap(); + } + + // The mutex should have the correct value. + let value = Arc::try_unwrap(mutex) + .unwrap_or_else(|_| panic!("mutex is locked")) + .into_inner(); + assert_eq!(value, num_threads); + }); +}