From bcf8687301a2f7522eec203bbc30e4286939221b Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 14 Nov 2022 23:53:13 +0000 Subject: [PATCH 1/4] Remove excess module This appears to have been added with the intent of having a sealed trait, which was never committed. --- lightning/src/util/wakers.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/lightning/src/util/wakers.rs b/lightning/src/util/wakers.rs index 655fc9cf7ed..60684dcadaf 100644 --- a/lightning/src/util/wakers.rs +++ b/lightning/src/util/wakers.rs @@ -198,12 +198,10 @@ impl Future { } } -mod std_future { - use core::task::Waker; - pub struct StdWaker(pub Waker); - impl super::FutureCallback for StdWaker { - fn call(&self) { self.0.wake_by_ref() } - } +use core::task::Waker; +struct StdWaker(pub Waker); +impl FutureCallback for StdWaker { + fn call(&self) { self.0.wake_by_ref() } } /// (C-not exported) as Rust Futures aren't usable in language bindings. @@ -216,7 +214,7 @@ impl<'a> StdFuture for Future { Poll::Ready(()) } else { let waker = cx.waker().clone(); - state.callbacks.push(Box::new(std_future::StdWaker(waker))); + state.callbacks.push(Box::new(StdWaker(waker))); Poll::Pending } } From 7527e4b7df674fc2f5442514bb7b1d2294e59ce8 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 14 Nov 2022 23:49:27 +0000 Subject: [PATCH 2/4] Unset the needs-notify bit in a Notifier when a Future is fetched If a `Notifier` gets `notify()`ed and the a `Future` is fetched, even though the `Future` is marked completed from the start and the user may pass callbacks which are called, we'll never wipe the needs-notify bit in the `Notifier`. The solution is to keep track of the `FutureState` in the returned `Future` even though its `complete` from the start, adding a new flag in the `FutureState` which indicates callbacks have been made and checking that flag when waiting or returning a second `Future`. --- lightning/src/util/wakers.rs | 79 +++++++++++++++++++++++------------- 1 file changed, 51 insertions(+), 28 deletions(-) diff --git a/lightning/src/util/wakers.rs b/lightning/src/util/wakers.rs index 60684dcadaf..6d8b03cbd7c 100644 --- a/lightning/src/util/wakers.rs +++ b/lightning/src/util/wakers.rs @@ -15,7 +15,7 @@ use alloc::sync::Arc; use core::mem; -use crate::sync::{Condvar, Mutex}; +use crate::sync::{Condvar, Mutex, MutexGuard}; use crate::prelude::*; @@ -41,9 +41,22 @@ impl Notifier { } } + fn propagate_future_state_to_notify_flag(&self) -> MutexGuard<(bool, Option>>)> { + let mut lock = self.notify_pending.lock().unwrap(); + if let Some(existing_state) = &lock.1 { + if existing_state.lock().unwrap().callbacks_made { + // If the existing `FutureState` has completed and actually made callbacks, + // consider the notification flag to have been cleared and reset the future state. + lock.1.take(); + lock.0 = false; + } + } + lock + } + pub(crate) fn wait(&self) { loop { - let mut guard = self.notify_pending.lock().unwrap(); + let mut guard = self.propagate_future_state_to_notify_flag(); if guard.0 { guard.0 = false; return; @@ -61,7 +74,7 @@ impl Notifier { pub(crate) fn wait_timeout(&self, max_wait: Duration) -> bool { let current_time = Instant::now(); loop { - let mut guard = self.notify_pending.lock().unwrap(); + let mut guard = self.propagate_future_state_to_notify_flag(); if guard.0 { guard.0 = false; return true; @@ -88,17 +101,8 @@ impl Notifier { /// Wake waiters, tracking that wake needs to occur even if there are currently no waiters. pub(crate) fn notify(&self) { let mut lock = self.notify_pending.lock().unwrap(); - let mut future_probably_generated_calls = false; - if let Some(future_state) = lock.1.take() { - future_probably_generated_calls |= future_state.lock().unwrap().complete(); - future_probably_generated_calls |= Arc::strong_count(&future_state) > 1; - } - if future_probably_generated_calls { - // If a future made some callbacks or has not yet been drop'd (i.e. the state has more - // than the one reference we hold), assume the user was notified and skip setting the - // notification-required flag. This will not cause the `wait` functions above to return - // and avoid any future `Future`s starting in a completed state. - return; + if let Some(future_state) = &lock.1 { + future_state.lock().unwrap().complete(); } lock.0 = true; mem::drop(lock); @@ -107,20 +111,14 @@ impl Notifier { /// Gets a [`Future`] that will get woken up with any waiters pub(crate) fn get_future(&self) -> Future { - let mut lock = self.notify_pending.lock().unwrap(); - if lock.0 { - Future { - state: Arc::new(Mutex::new(FutureState { - callbacks: Vec::new(), - complete: true, - })) - } - } else if let Some(existing_state) = &lock.1 { + let mut lock = self.propagate_future_state_to_notify_flag(); + if let Some(existing_state) = &lock.1 { Future { state: Arc::clone(&existing_state) } } else { let state = Arc::new(Mutex::new(FutureState { callbacks: Vec::new(), - complete: false, + complete: lock.0, + callbacks_made: false, })); lock.1 = Some(Arc::clone(&state)); Future { state } @@ -153,17 +151,16 @@ impl FutureCallback for F { pub(crate) struct FutureState { callbacks: Vec>, complete: bool, + callbacks_made: bool, } impl FutureState { - fn complete(&mut self) -> bool { - let mut made_calls = false; + fn complete(&mut self) { for callback in self.callbacks.drain(..) { callback.call(); - made_calls = true; + self.callbacks_made = true; } self.complete = true; - made_calls } } @@ -180,6 +177,7 @@ impl Future { pub fn register_callback(&self, callback: Box) { let mut state = self.state.lock().unwrap(); if state.complete { + state.callbacks_made = true; mem::drop(state); callback.call(); } else { @@ -283,6 +281,28 @@ mod tests { assert!(!callback.load(Ordering::SeqCst)); } + #[test] + fn new_future_wipes_notify_bit() { + // Previously, if we were only using the `Future` interface to learn when a `Notifier` has + // been notified, we'd never mark the notifier as not-awaiting-notify if a `Future` is + // fetched after the notify bit has been set. + let notifier = Notifier::new(); + notifier.notify(); + + let callback = Arc::new(AtomicBool::new(false)); + let callback_ref = Arc::clone(&callback); + notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst)))); + assert!(callback.load(Ordering::SeqCst)); + + let callback = Arc::new(AtomicBool::new(false)); + let callback_ref = Arc::clone(&callback); + notifier.get_future().register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst)))); + assert!(!callback.load(Ordering::SeqCst)); + + notifier.notify(); + assert!(callback.load(Ordering::SeqCst)); + } + #[cfg(feature = "std")] #[test] fn test_wait_timeout() { @@ -334,6 +354,7 @@ mod tests { state: Arc::new(Mutex::new(FutureState { callbacks: Vec::new(), complete: false, + callbacks_made: false, })) }; let callback = Arc::new(AtomicBool::new(false)); @@ -352,6 +373,7 @@ mod tests { state: Arc::new(Mutex::new(FutureState { callbacks: Vec::new(), complete: false, + callbacks_made: false, })) }; future.state.lock().unwrap().complete(); @@ -389,6 +411,7 @@ mod tests { state: Arc::new(Mutex::new(FutureState { callbacks: Vec::new(), complete: false, + callbacks_made: false, })) }; let mut second_future = Future { state: Arc::clone(&future.state) }; From 5f053e43af4971d614574bb00c41a4e75fe8dbfd Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 15 Nov 2022 00:24:25 +0000 Subject: [PATCH 3/4] Wipe `Notifier` `FutureState` when returning from a waiter. When we return from one of the wait functions in `Notifier`, we should also ensure that the next `Future` doesn't start in the `complete` state, as we have already notified the user, as far as we're concerned. This is technically a regression from the previous commit, but as it is a logically separate change it is in its own commit. --- lightning/src/util/wakers.rs | 39 +++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/lightning/src/util/wakers.rs b/lightning/src/util/wakers.rs index 6d8b03cbd7c..976ce69f885 100644 --- a/lightning/src/util/wakers.rs +++ b/lightning/src/util/wakers.rs @@ -33,6 +33,20 @@ pub(crate) struct Notifier { condvar: Condvar, } +macro_rules! check_woken { + ($guard: expr, $retval: expr) => { { + if $guard.0 { + $guard.0 = false; + if $guard.1.as_ref().map(|l| l.lock().unwrap().complete).unwrap_or(false) { + // If we're about to return as woken, and the future state is marked complete, wipe + // the future state and let the next future wait until we get a new notify. + $guard.1.take(); + } + return $retval; + } + } } +} + impl Notifier { pub(crate) fn new() -> Self { Self { @@ -57,16 +71,9 @@ impl Notifier { pub(crate) fn wait(&self) { loop { let mut guard = self.propagate_future_state_to_notify_flag(); - if guard.0 { - guard.0 = false; - return; - } + check_woken!(guard, ()); guard = self.condvar.wait(guard).unwrap(); - let result = guard.0; - if result { - guard.0 = false; - return - } + check_woken!(guard, ()); } } @@ -75,24 +82,20 @@ impl Notifier { let current_time = Instant::now(); loop { let mut guard = self.propagate_future_state_to_notify_flag(); - if guard.0 { - guard.0 = false; - return true; - } + check_woken!(guard, true); guard = self.condvar.wait_timeout(guard, max_wait).unwrap().0; + check_woken!(guard, true); // Due to spurious wakeups that can happen on `wait_timeout`, here we need to check if the // desired wait time has actually passed, and if not then restart the loop with a reduced wait // time. Note that this logic can be highly simplified through the use of // `Condvar::wait_while` and `Condvar::wait_timeout_while`, if and when our MSRV is raised to // 1.42.0. let elapsed = current_time.elapsed(); - let result = guard.0; - if result || elapsed >= max_wait { - guard.0 = false; - return result; + if elapsed >= max_wait { + return false; } match max_wait.checked_sub(elapsed) { - None => return result, + None => return false, Some(_) => continue } } From 0a1e48f9c7ebf752457b0bcd904e0ae1efb005b8 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 15 Nov 2022 00:29:10 +0000 Subject: [PATCH 4/4] Await `Future::poll` `Complete`d before unsetting notify-required When we mark a future as complete, if the user is using the `std::future::Future` impl to get notified, we shouldn't just assume we have completed the `Future` when we call the `Waker`. A `Future` may have been `drop`'d at that point (or may not be `poll`'d again) even though we wake the `Waker`. Because we now have a `callbacks_made` flag, we can fix this rather trivially, simply not setting the flag until the `Future` is `poll`'d `Complete`. --- lightning/src/util/wakers.rs | 46 ++++++++++++++++++++++++++++++++---- 1 file changed, 41 insertions(+), 5 deletions(-) diff --git a/lightning/src/util/wakers.rs b/lightning/src/util/wakers.rs index 976ce69f885..fdbc22f1166 100644 --- a/lightning/src/util/wakers.rs +++ b/lightning/src/util/wakers.rs @@ -152,16 +152,19 @@ impl FutureCallback for F { } pub(crate) struct FutureState { - callbacks: Vec>, + // When we're tracking whether a callback counts as having woken the user's code, we check the + // first bool - set to false if we're just calling a Waker, and true if we're calling an actual + // user-provided function. + callbacks: Vec<(bool, Box)>, complete: bool, callbacks_made: bool, } impl FutureState { fn complete(&mut self) { - for callback in self.callbacks.drain(..) { + for (counts_as_call, callback) in self.callbacks.drain(..) { callback.call(); - self.callbacks_made = true; + self.callbacks_made |= counts_as_call; } self.complete = true; } @@ -184,7 +187,7 @@ impl Future { mem::drop(state); callback.call(); } else { - state.callbacks.push(callback); + state.callbacks.push((true, callback)); } } @@ -212,10 +215,11 @@ impl<'a> StdFuture for Future { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut state = self.state.lock().unwrap(); if state.complete { + state.callbacks_made = true; Poll::Ready(()) } else { let waker = cx.waker().clone(); - state.callbacks.push(Box::new(StdWaker(waker))); + state.callbacks.push((false, Box::new(StdWaker(waker)))); Poll::Pending } } @@ -433,4 +437,36 @@ mod tests { assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(())); assert_eq!(Pin::new(&mut second_future).poll(&mut Context::from_waker(&second_waker)), Poll::Ready(())); } + + #[test] + fn test_dropped_future_doesnt_count() { + // Tests that if a Future gets drop'd before it is poll()ed `Ready` it doesn't count as + // having been woken, leaving the notify-required flag set. + let notifier = Notifier::new(); + notifier.notify(); + + // If we get a future and don't touch it we're definitely still notify-required. + notifier.get_future(); + assert!(notifier.wait_timeout(Duration::from_millis(1))); + assert!(!notifier.wait_timeout(Duration::from_millis(1))); + + // Even if we poll'd once but didn't observe a `Ready`, we should be notify-required. + let mut future = notifier.get_future(); + let (woken, waker) = create_waker(); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending); + + notifier.notify(); + assert!(woken.load(Ordering::SeqCst)); + assert!(notifier.wait_timeout(Duration::from_millis(1))); + + // However, once we do poll `Ready` it should wipe the notify-required flag. + let mut future = notifier.get_future(); + let (woken, waker) = create_waker(); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending); + + notifier.notify(); + assert!(woken.load(Ordering::SeqCst)); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(())); + assert!(!notifier.wait_timeout(Duration::from_millis(1))); + } }