From 68b3d2e4536c8057389c3cb56636e332f5beccb2 Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Tue, 9 Aug 2022 01:57:09 +0000
Subject: [PATCH 1/5] Move PersistenceNotifier to a new util module

It was always somewhat strange to have a bunch of notification logic in
`channelmanager`, and with the next commit adding a bunch more, it's moved
here first.
---
 lightning/src/ln/channelmanager.rs | 132 +------------------------
 lightning/src/util/mod.rs          |   1 +
 lightning/src/util/wakers.rs       | 150 +++++++++++++++++++++++++++++
 3 files changed, 155 insertions(+), 128 deletions(-)
 create mode 100644 lightning/src/util/wakers.rs

diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index 692d46eda90..577c3d82cfe 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -54,6 +54,8 @@ use chain::keysinterface::{Sign, KeysInterface, KeysManager, InMemorySigner, Rec
 use util::config::{UserConfig, ChannelConfig};
 use util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination};
 use util::{byte_utils, events};
+use util::crypto::sign;
+use util::wakers::PersistenceNotifier;
 use util::scid_utils::fake_scid;
 use util::ser::{BigSize, FixedLengthReader, Readable, ReadableArgs, MaybeReadable, Writeable, Writer, VecWriter};
 use util::logger::{Level, Logger};
@@ -64,15 +66,11 @@ use prelude::*;
 use core::{cmp, mem};
 use core::cell::RefCell;
 use io::Read;
-use sync::{Arc, Condvar, Mutex, MutexGuard, RwLock, RwLockReadGuard};
+use sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard};
 use core::sync::atomic::{AtomicUsize, Ordering};
 use core::time::Duration;
 use core::ops::Deref;
-#[cfg(any(test, feature = "std"))]
-use std::time::Instant;
-use util::crypto::sign;
-
 // We hold various information about HTLC relay in the HTLC objects in Channel itself:
 //
 // Upon receipt of an HTLC from a peer, we'll give it a PendingHTLCStatus indicating if it should
@@ -5992,10 +5990,7 @@ where
 	#[cfg(any(test, feature = "_test_utils"))]
 	pub fn get_persistence_condvar_value(&self) -> bool {
-		let mutcond = &self.persistence_notifier.persistence_lock;
-		let &(ref mtx, _) = mutcond;
-		let guard = mtx.lock().unwrap();
-		*guard
+		self.persistence_notifier.needs_persist()
 	}

 	/// Gets the latest best block which was connected either via the [`chain::Listen`] or
@@ -6237,77 +6232,6 @@ impl
 	}
 }

-/// Used to signal to the ChannelManager persister that the manager needs to be re-persisted to
-/// disk/backups, through `await_persistable_update_timeout` and `await_persistable_update`.
-struct PersistenceNotifier {
-	/// Users won't access the persistence_lock directly, but rather wait on its bool using
-	/// `wait_timeout` and `wait`.
- persistence_lock: (Mutex, Condvar), -} - -impl PersistenceNotifier { - fn new() -> Self { - Self { - persistence_lock: (Mutex::new(false), Condvar::new()), - } - } - - fn wait(&self) { - loop { - let &(ref mtx, ref cvar) = &self.persistence_lock; - let mut guard = mtx.lock().unwrap(); - if *guard { - *guard = false; - return; - } - guard = cvar.wait(guard).unwrap(); - let result = *guard; - if result { - *guard = false; - return - } - } - } - - #[cfg(any(test, feature = "std"))] - fn wait_timeout(&self, max_wait: Duration) -> bool { - let current_time = Instant::now(); - loop { - let &(ref mtx, ref cvar) = &self.persistence_lock; - let mut guard = mtx.lock().unwrap(); - if *guard { - *guard = false; - return true; - } - guard = cvar.wait_timeout(guard, max_wait).unwrap().0; - // Due to spurious wakeups that can happen on `wait_timeout`, here we need to check if the - // desired wait time has actually passed, and if not then restart the loop with a reduced wait - // time. Note that this logic can be highly simplified through the use of - // `Condvar::wait_while` and `Condvar::wait_timeout_while`, if and when our MSRV is raised to - // 1.42.0. - let elapsed = current_time.elapsed(); - let result = *guard; - if result || elapsed >= max_wait { - *guard = false; - return result; - } - match max_wait.checked_sub(elapsed) { - None => return result, - Some(_) => continue - } - } - } - - // Signal to the ChannelManager persister that there are updates necessitating persisting to disk. - fn notify(&self) { - let &(ref persist_mtx, ref cnd) = &self.persistence_lock; - let mut persistence_lock = persist_mtx.lock().unwrap(); - *persistence_lock = true; - mem::drop(persistence_lock); - cnd.notify_all(); - } -} - const SERIALIZATION_VERSION: u8 = 1; const MIN_SERIALIZATION_VERSION: u8 = 1; @@ -7355,54 +7279,6 @@ mod tests { use util::test_utils; use chain::keysinterface::KeysInterface; - #[cfg(feature = "std")] - #[test] - fn test_wait_timeout() { - use ln::channelmanager::PersistenceNotifier; - use sync::Arc; - use core::sync::atomic::AtomicBool; - use std::thread; - - let persistence_notifier = Arc::new(PersistenceNotifier::new()); - let thread_notifier = Arc::clone(&persistence_notifier); - - let exit_thread = Arc::new(AtomicBool::new(false)); - let exit_thread_clone = exit_thread.clone(); - thread::spawn(move || { - loop { - let &(ref persist_mtx, ref cnd) = &thread_notifier.persistence_lock; - let mut persistence_lock = persist_mtx.lock().unwrap(); - *persistence_lock = true; - cnd.notify_all(); - - if exit_thread_clone.load(Ordering::SeqCst) { - break - } - } - }); - - // Check that we can block indefinitely until updates are available. - let _ = persistence_notifier.wait(); - - // Check that the PersistenceNotifier will return after the given duration if updates are - // available. - loop { - if persistence_notifier.wait_timeout(Duration::from_millis(100)) { - break - } - } - - exit_thread.store(true, Ordering::SeqCst); - - // Check that the PersistenceNotifier will return after the given duration even if no updates - // are available. 
- loop { - if !persistence_notifier.wait_timeout(Duration::from_millis(100)) { - break - } - } - } - #[test] fn test_notify_limits() { // Check that a few cases which don't require the persistence of a new ChannelManager, diff --git a/lightning/src/util/mod.rs b/lightning/src/util/mod.rs index c6181ab269a..21976113cc1 100644 --- a/lightning/src/util/mod.rs +++ b/lightning/src/util/mod.rs @@ -21,6 +21,7 @@ pub mod ser; pub mod message_signing; pub mod invoice; pub mod persist; +pub mod wakers; pub(crate) mod atomic_counter; pub(crate) mod byte_utils; diff --git a/lightning/src/util/wakers.rs b/lightning/src/util/wakers.rs new file mode 100644 index 00000000000..ba218e4e1f5 --- /dev/null +++ b/lightning/src/util/wakers.rs @@ -0,0 +1,150 @@ +// This file is Copyright its original authors, visible in version control +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + +//! Utilities which allow users to block on some future notification from LDK. These are +//! specifically used by [`ChannelManager`] to allow waiting until the [`ChannelManager`] needs to +//! be re-persisted. +//! +//! [`ChannelManager`]: crate::ln::channelmanager::ChannelManager + +use core::mem; +use core::time::Duration; +use sync::{Condvar, Mutex}; + +#[cfg(any(test, feature = "std"))] +use std::time::Instant; + +/// Used to signal to the ChannelManager persister that the manager needs to be re-persisted to +/// disk/backups, through `await_persistable_update_timeout` and `await_persistable_update`. +pub(crate) struct PersistenceNotifier { + /// Users won't access the persistence_lock directly, but rather wait on its bool using + /// `wait_timeout` and `wait`. + persistence_lock: (Mutex, Condvar), +} + +impl PersistenceNotifier { + pub(crate) fn new() -> Self { + Self { + persistence_lock: (Mutex::new(false), Condvar::new()), + } + } + + pub(crate) fn wait(&self) { + loop { + let &(ref mtx, ref cvar) = &self.persistence_lock; + let mut guard = mtx.lock().unwrap(); + if *guard { + *guard = false; + return; + } + guard = cvar.wait(guard).unwrap(); + let result = *guard; + if result { + *guard = false; + return + } + } + } + + #[cfg(any(test, feature = "std"))] + pub(crate) fn wait_timeout(&self, max_wait: Duration) -> bool { + let current_time = Instant::now(); + loop { + let &(ref mtx, ref cvar) = &self.persistence_lock; + let mut guard = mtx.lock().unwrap(); + if *guard { + *guard = false; + return true; + } + guard = cvar.wait_timeout(guard, max_wait).unwrap().0; + // Due to spurious wakeups that can happen on `wait_timeout`, here we need to check if the + // desired wait time has actually passed, and if not then restart the loop with a reduced wait + // time. Note that this logic can be highly simplified through the use of + // `Condvar::wait_while` and `Condvar::wait_timeout_while`, if and when our MSRV is raised to + // 1.42.0. + let elapsed = current_time.elapsed(); + let result = *guard; + if result || elapsed >= max_wait { + *guard = false; + return result; + } + match max_wait.checked_sub(elapsed) { + None => return result, + Some(_) => continue + } + } + } + + /// Wake waiters, tracking that persistence needs to occur. 
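+	///
+	/// A minimal sketch of the intended pairing, for illustration only (`notifier` here is a
+	/// shared `Arc<PersistenceNotifier>`, and `persist_manager` stands in for whatever
+	/// persistence logic the caller runs; neither name is LDK API):
+	///
+	/// ```ignore
+	/// // Manager side: flag the pending update and wake any waiter.
+	/// notifier.notify();
+	/// // Persister side: block until a notification arrives (or consume one that
+	/// // already fired), then persist.
+	/// notifier.wait();
+	/// persist_manager(&channel_manager);
+	/// ```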
+ pub(crate) fn notify(&self) { + let &(ref persist_mtx, ref cnd) = &self.persistence_lock; + let mut persistence_lock = persist_mtx.lock().unwrap(); + *persistence_lock = true; + mem::drop(persistence_lock); + cnd.notify_all(); + } + + #[cfg(any(test, feature = "_test_utils"))] + pub fn needs_persist(&self) -> bool { + let &(ref mtx, _) = &self.persistence_lock; + let guard = mtx.lock().unwrap(); + *guard + } +} + +#[cfg(test)] +mod tests { + #[cfg(feature = "std")] + #[test] + fn test_wait_timeout() { + use super::*; + use sync::Arc; + use core::sync::atomic::{AtomicBool, Ordering}; + use std::thread; + + let persistence_notifier = Arc::new(PersistenceNotifier::new()); + let thread_notifier = Arc::clone(&persistence_notifier); + + let exit_thread = Arc::new(AtomicBool::new(false)); + let exit_thread_clone = exit_thread.clone(); + thread::spawn(move || { + loop { + let &(ref persist_mtx, ref cnd) = &thread_notifier.persistence_lock; + let mut persistence_lock = persist_mtx.lock().unwrap(); + *persistence_lock = true; + cnd.notify_all(); + + if exit_thread_clone.load(Ordering::SeqCst) { + break + } + } + }); + + // Check that we can block indefinitely until updates are available. + let _ = persistence_notifier.wait(); + + // Check that the PersistenceNotifier will return after the given duration if updates are + // available. + loop { + if persistence_notifier.wait_timeout(Duration::from_millis(100)) { + break + } + } + + exit_thread.store(true, Ordering::SeqCst); + + // Check that the PersistenceNotifier will return after the given duration even if no updates + // are available. + loop { + if !persistence_notifier.wait_timeout(Duration::from_millis(100)) { + break + } + } + } +} From 47e9ca15b2d7794a681b4933dd6adc67b38b02cb Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Fri, 12 Aug 2022 23:53:50 +0000 Subject: [PATCH 2/5] Rename `PersistenceNotifier` to simply `Notifier` ... as it is no longer persistence-specific (though still only used for persistence). --- lightning/src/ln/channelmanager.rs | 16 ++++++++-------- lightning/src/util/wakers.rs | 10 +++++----- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 577c3d82cfe..3279050a079 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -55,7 +55,7 @@ use util::config::{UserConfig, ChannelConfig}; use util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination}; use util::{byte_utils, events}; use util::crypto::sign; -use util::wakers::PersistenceNotifier; +use util::wakers::Notifier; use util::scid_utils::fake_scid; use util::ser::{BigSize, FixedLengthReader, Readable, ReadableArgs, MaybeReadable, Writeable, Writer, VecWriter}; use util::logger::{Level, Logger}; @@ -790,10 +790,10 @@ pub struct ChannelManager, - persistence_notifier: PersistenceNotifier, + persistence_notifier: Notifier, keys_manager: K, @@ -833,18 +833,18 @@ enum NotifyOption { /// notify or not based on whether relevant changes have been made, providing a closure to /// `optionally_notify` which returns a `NotifyOption`. struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> { - persistence_notifier: &'a PersistenceNotifier, + persistence_notifier: &'a Notifier, should_persist: F, // We hold onto this result so the lock doesn't get released immediately. 
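+	// (Dropping the guard releases the read lock; as `notify_on_drop` below suggests, the
+	// type's `Drop` impl, not shown in this hunk, is what actually notifies the
+	// `persistence_notifier` when needed.)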
_read_guard: RwLockReadGuard<'a, ()>, } impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care what the concrete F is here, it's unused - fn notify_on_drop(lock: &'a RwLock<()>, notifier: &'a PersistenceNotifier) -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> { + fn notify_on_drop(lock: &'a RwLock<()>, notifier: &'a Notifier) -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> { PersistenceNotifierGuard::optionally_notify(lock, notifier, || -> NotifyOption { NotifyOption::DoPersist }) } - fn optionally_notify NotifyOption>(lock: &'a RwLock<()>, notifier: &'a PersistenceNotifier, persist_check: F) -> PersistenceNotifierGuard<'a, F> { + fn optionally_notify NotifyOption>(lock: &'a RwLock<()>, notifier: &'a Notifier, persist_check: F) -> PersistenceNotifierGuard<'a, F> { let read_guard = lock.read().unwrap(); PersistenceNotifierGuard { @@ -1625,7 +1625,7 @@ impl ChannelMana pending_events: Mutex::new(Vec::new()), pending_background_events: Mutex::new(Vec::new()), total_consistency_lock: RwLock::new(()), - persistence_notifier: PersistenceNotifier::new(), + persistence_notifier: Notifier::new(), keys_manager, @@ -7240,7 +7240,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> pending_events: Mutex::new(pending_events_read), pending_background_events: Mutex::new(pending_background_events_read), total_consistency_lock: RwLock::new(()), - persistence_notifier: PersistenceNotifier::new(), + persistence_notifier: Notifier::new(), keys_manager: args.keys_manager, logger: args.logger, diff --git a/lightning/src/util/wakers.rs b/lightning/src/util/wakers.rs index ba218e4e1f5..1532133eeda 100644 --- a/lightning/src/util/wakers.rs +++ b/lightning/src/util/wakers.rs @@ -22,13 +22,13 @@ use std::time::Instant; /// Used to signal to the ChannelManager persister that the manager needs to be re-persisted to /// disk/backups, through `await_persistable_update_timeout` and `await_persistable_update`. -pub(crate) struct PersistenceNotifier { +pub(crate) struct Notifier { /// Users won't access the persistence_lock directly, but rather wait on its bool using /// `wait_timeout` and `wait`. persistence_lock: (Mutex, Condvar), } -impl PersistenceNotifier { +impl Notifier { pub(crate) fn new() -> Self { Self { persistence_lock: (Mutex::new(false), Condvar::new()), @@ -108,7 +108,7 @@ mod tests { use core::sync::atomic::{AtomicBool, Ordering}; use std::thread; - let persistence_notifier = Arc::new(PersistenceNotifier::new()); + let persistence_notifier = Arc::new(Notifier::new()); let thread_notifier = Arc::clone(&persistence_notifier); let exit_thread = Arc::new(AtomicBool::new(false)); @@ -129,7 +129,7 @@ mod tests { // Check that we can block indefinitely until updates are available. let _ = persistence_notifier.wait(); - // Check that the PersistenceNotifier will return after the given duration if updates are + // Check that the Notifier will return after the given duration if updates are // available. loop { if persistence_notifier.wait_timeout(Duration::from_millis(100)) { @@ -139,7 +139,7 @@ mod tests { exit_thread.store(true, Ordering::SeqCst); - // Check that the PersistenceNotifier will return after the given duration even if no updates + // Check that the Notifier will return after the given duration even if no updates // are available. 
loop { if !persistence_notifier.wait_timeout(Duration::from_millis(100)) { From 2035cebe8af3b223d51bdc8a5a5962de70929493 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 29 Aug 2022 21:13:58 +0000 Subject: [PATCH 3/5] Remove internal references to `persistence` in waker.rs --- lightning/src/ln/channelmanager.rs | 2 +- lightning/src/util/wakers.rs | 33 +++++++++++++++--------------- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 3279050a079..911db0db0f7 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -5990,7 +5990,7 @@ where #[cfg(any(test, feature = "_test_utils"))] pub fn get_persistence_condvar_value(&self) -> bool { - self.persistence_notifier.needs_persist() + self.persistence_notifier.notify_pending() } /// Gets the latest best block which was connected either via the [`chain::Listen`] or diff --git a/lightning/src/util/wakers.rs b/lightning/src/util/wakers.rs index 1532133eeda..9aeb5370b73 100644 --- a/lightning/src/util/wakers.rs +++ b/lightning/src/util/wakers.rs @@ -20,24 +20,23 @@ use sync::{Condvar, Mutex}; #[cfg(any(test, feature = "std"))] use std::time::Instant; -/// Used to signal to the ChannelManager persister that the manager needs to be re-persisted to -/// disk/backups, through `await_persistable_update_timeout` and `await_persistable_update`. +/// Used to signal to one of many waiters that the condition they're waiting on has happened. pub(crate) struct Notifier { - /// Users won't access the persistence_lock directly, but rather wait on its bool using + /// Users won't access the lock directly, but rather wait on its bool using /// `wait_timeout` and `wait`. - persistence_lock: (Mutex, Condvar), + lock: (Mutex, Condvar), } impl Notifier { pub(crate) fn new() -> Self { Self { - persistence_lock: (Mutex::new(false), Condvar::new()), + lock: (Mutex::new(false), Condvar::new()), } } pub(crate) fn wait(&self) { loop { - let &(ref mtx, ref cvar) = &self.persistence_lock; + let &(ref mtx, ref cvar) = &self.lock; let mut guard = mtx.lock().unwrap(); if *guard { *guard = false; @@ -56,7 +55,7 @@ impl Notifier { pub(crate) fn wait_timeout(&self, max_wait: Duration) -> bool { let current_time = Instant::now(); loop { - let &(ref mtx, ref cvar) = &self.persistence_lock; + let &(ref mtx, ref cvar) = &self.lock; let mut guard = mtx.lock().unwrap(); if *guard { *guard = false; @@ -81,18 +80,18 @@ impl Notifier { } } - /// Wake waiters, tracking that persistence needs to occur. + /// Wake waiters, tracking that wake needs to occur even if there are currently no waiters. 
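+	/// A wake with no waiters is not lost: the flag stays set until the next `wait` or
+	/// `wait_timeout` call consumes it and returns immediately.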
 	pub(crate) fn notify(&self) {
-		let &(ref persist_mtx, ref cnd) = &self.persistence_lock;
-		let mut persistence_lock = persist_mtx.lock().unwrap();
-		*persistence_lock = true;
-		mem::drop(persistence_lock);
+		let &(ref persist_mtx, ref cnd) = &self.lock;
+		let mut lock = persist_mtx.lock().unwrap();
+		*lock = true;
+		mem::drop(lock);
 		cnd.notify_all();
 	}

 	#[cfg(any(test, feature = "_test_utils"))]
-	pub fn needs_persist(&self) -> bool {
-		let &(ref mtx, _) = &self.persistence_lock;
+	pub fn notify_pending(&self) -> bool {
+		let &(ref mtx, _) = &self.lock;
 		let guard = mtx.lock().unwrap();
 		*guard
 	}
@@ -115,9 +114,9 @@ mod tests {
 		let exit_thread_clone = exit_thread.clone();
 		thread::spawn(move || {
 			loop {
-				let &(ref persist_mtx, ref cnd) = &thread_notifier.persistence_lock;
-				let mut persistence_lock = persist_mtx.lock().unwrap();
-				*persistence_lock = true;
+				let &(ref persist_mtx, ref cnd) = &thread_notifier.lock;
+				let mut lock = persist_mtx.lock().unwrap();
+				*lock = true;
 				cnd.notify_all();

 				if exit_thread_clone.load(Ordering::SeqCst) {
 					break

From c6890cfc3317c0b1ba4d1e6cd71b3bb75386571e Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Tue, 9 Aug 2022 04:15:21 +0000
Subject: [PATCH 4/5] Add a `Future` which can receive manager persistence events

This allows users who don't wish to block a full thread to receive
persistence events. The `Future` added here is really just a trivial list
of callbacks, but from that we can build a (somewhat inefficient)
std::future::Future implementation and can (at least once a mapping for
Box is added) include the future in no-std bindings as well.

Fixes #1595
---
 lightning/src/ln/channelmanager.rs |   9 +-
 lightning/src/util/wakers.rs       | 245 +++++++++++++++++++++++++----
 2 files changed, 222 insertions(+), 32 deletions(-)

diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index 911db0db0f7..467db740f0f 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -55,7 +55,7 @@ use util::config::{UserConfig, ChannelConfig};
 use util::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination};
 use util::{byte_utils, events};
 use util::crypto::sign;
-use util::wakers::Notifier;
+use util::wakers::{Future, Notifier};
 use util::scid_utils::fake_scid;
@@ -5988,6 +5988,13 @@ where
 		self.persistence_notifier.wait()
 	}

+	/// Gets a [`Future`] that completes when a persistable update is available. Note that
+	/// callbacks registered on the [`Future`] MUST NOT call back into this [`ChannelManager`] and
+	/// should instead register actions to be taken later.
+	pub fn get_persistable_update_future(&self) -> Future {
+		self.persistence_notifier.get_future()
+	}
+
 	#[cfg(any(test, feature = "_test_utils"))]
 	pub fn get_persistence_condvar_value(&self) -> bool {
 		self.persistence_notifier.notify_pending()
diff --git a/lightning/src/util/wakers.rs b/lightning/src/util/wakers.rs
index 9aeb5370b73..9636466aaa2 100644
--- a/lightning/src/util/wakers.rs
+++ b/lightning/src/util/wakers.rs
@@ -13,6 +13,7 @@
 //!
 //!
[`ChannelManager`]: crate::ln::channelmanager::ChannelManager +use alloc::sync::Arc; use core::mem; use core::time::Duration; use sync::{Condvar, Mutex}; @@ -20,32 +21,37 @@ use sync::{Condvar, Mutex}; #[cfg(any(test, feature = "std"))] use std::time::Instant; +use core::future::Future as StdFuture; +use core::task::{Context, Poll}; +use core::pin::Pin; + +use prelude::*; + /// Used to signal to one of many waiters that the condition they're waiting on has happened. pub(crate) struct Notifier { - /// Users won't access the lock directly, but rather wait on its bool using - /// `wait_timeout` and `wait`. - lock: (Mutex, Condvar), + notify_pending: Mutex<(bool, Option>>)>, + condvar: Condvar, } impl Notifier { pub(crate) fn new() -> Self { Self { - lock: (Mutex::new(false), Condvar::new()), + notify_pending: Mutex::new((false, None)), + condvar: Condvar::new(), } } pub(crate) fn wait(&self) { loop { - let &(ref mtx, ref cvar) = &self.lock; - let mut guard = mtx.lock().unwrap(); - if *guard { - *guard = false; + let mut guard = self.notify_pending.lock().unwrap(); + if guard.0 { + guard.0 = false; return; } - guard = cvar.wait(guard).unwrap(); - let result = *guard; + guard = self.condvar.wait(guard).unwrap(); + let result = guard.0; if result { - *guard = false; + guard.0 = false; return } } @@ -55,22 +61,21 @@ impl Notifier { pub(crate) fn wait_timeout(&self, max_wait: Duration) -> bool { let current_time = Instant::now(); loop { - let &(ref mtx, ref cvar) = &self.lock; - let mut guard = mtx.lock().unwrap(); - if *guard { - *guard = false; + let mut guard = self.notify_pending.lock().unwrap(); + if guard.0 { + guard.0 = false; return true; } - guard = cvar.wait_timeout(guard, max_wait).unwrap().0; + guard = self.condvar.wait_timeout(guard, max_wait).unwrap().0; // Due to spurious wakeups that can happen on `wait_timeout`, here we need to check if the // desired wait time has actually passed, and if not then restart the loop with a reduced wait // time. Note that this logic can be highly simplified through the use of // `Condvar::wait_while` and `Condvar::wait_timeout_while`, if and when our MSRV is raised to // 1.42.0. let elapsed = current_time.elapsed(); - let result = *guard; + let result = guard.0; if result || elapsed >= max_wait { - *guard = false; + guard.0 = false; return result; } match max_wait.checked_sub(elapsed) { @@ -82,29 +87,128 @@ impl Notifier { /// Wake waiters, tracking that wake needs to occur even if there are currently no waiters. 
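+	/// In addition to waking any thread blocked in `wait`/`wait_timeout`, this completes any
+	/// pending [`Future`] previously handed out by `get_future`, firing its registered callbacks.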
pub(crate) fn notify(&self) { - let &(ref persist_mtx, ref cnd) = &self.lock; - let mut lock = persist_mtx.lock().unwrap(); - *lock = true; + let mut lock = self.notify_pending.lock().unwrap(); + lock.0 = true; + if let Some(future_state) = lock.1.take() { + future_state.lock().unwrap().complete(); + } mem::drop(lock); - cnd.notify_all(); + self.condvar.notify_all(); + } + + /// Gets a [`Future`] that will get woken up with any waiters + pub(crate) fn get_future(&self) -> Future { + let mut lock = self.notify_pending.lock().unwrap(); + if lock.0 { + Future { + state: Arc::new(Mutex::new(FutureState { + callbacks: Vec::new(), + complete: false, + })) + } + } else if let Some(existing_state) = &lock.1 { + Future { state: Arc::clone(&existing_state) } + } else { + let state = Arc::new(Mutex::new(FutureState { + callbacks: Vec::new(), + complete: false, + })); + lock.1 = Some(Arc::clone(&state)); + Future { state } + } } #[cfg(any(test, feature = "_test_utils"))] pub fn notify_pending(&self) -> bool { - let &(ref mtx, _) = &self.lock; - let guard = mtx.lock().unwrap(); - *guard + self.notify_pending.lock().unwrap().0 + } +} + +/// A callback which is called when a [`Future`] completes. +/// +/// Note that this MUST NOT call back into LDK directly, it must instead schedule actions to be +/// taken later. Rust users should use the [`std::future::Future`] implementation for [`Future`] +/// instead. +/// +/// Note that the [`std::future::Future`] implementation may only work for runtimes which schedule +/// futures when they receive a wake, rather than immediately executing them. +pub trait FutureCallback : Send { + /// The method which is called. + fn call(&self); +} + +impl FutureCallback for F { + fn call(&self) { (self)(); } +} + +pub(crate) struct FutureState { + callbacks: Vec>, + complete: bool, +} + +impl FutureState { + fn complete(&mut self) { + for callback in self.callbacks.drain(..) { + callback.call(); + } + self.complete = true; + } +} + +/// A simple future which can complete once, and calls some callback(s) when it does so. +pub struct Future { + state: Arc>, +} + +impl Future { + /// Registers a callback to be called upon completion of this future. If the future has already + /// completed, the callback will be called immediately. + pub fn register_callback(&self, callback: Box) { + let mut state = self.state.lock().unwrap(); + if state.complete { + mem::drop(state); + callback.call(); + } else { + state.callbacks.push(callback); + } + } +} + +mod std_future { + use core::task::Waker; + pub struct StdWaker(pub Waker); + impl super::FutureCallback for StdWaker { + fn call(&self) { self.0.wake_by_ref() } + } +} + +/// (C-not exported) as Rust Futures aren't usable in language bindings. 
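+///
+/// A hedged sketch of driving this from async code (the method shown is the one added to
+/// `ChannelManager` above; the surrounding setup is assumed):
+///
+/// ```ignore
+/// async fn wait_then_persist(channel_manager: &ChannelManager</* ... */>) {
+///     channel_manager.get_persistable_update_future().await;
+///     // Persist here, outside of any LDK callback.
+/// }
+/// ```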
+impl<'a> StdFuture for Future {
+	type Output = ();
+
+	fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+		let mut state = self.state.lock().unwrap();
+		if state.complete {
+			Poll::Ready(())
+		} else {
+			let waker = cx.waker().clone();
+			state.callbacks.push(Box::new(std_future::StdWaker(waker)));
+			Poll::Pending
+		}
+	}
+}

 #[cfg(test)]
 mod tests {
+	use super::*;
+	use core::sync::atomic::{AtomicBool, Ordering};
+	use core::future::Future as FutureTrait;
+	use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
+
 	#[cfg(feature = "std")]
 	#[test]
 	fn test_wait_timeout() {
-		use super::*;
 		use sync::Arc;
-		use core::sync::atomic::{AtomicBool, Ordering};
 		use std::thread;

 		let persistence_notifier = Arc::new(Notifier::new());
@@ -114,10 +218,9 @@ mod tests {
 		let exit_thread_clone = exit_thread.clone();
 		thread::spawn(move || {
 			loop {
-				let &(ref persist_mtx, ref cnd) = &thread_notifier.lock;
-				let mut lock = persist_mtx.lock().unwrap();
-				*lock = true;
-				cnd.notify_all();
+				let mut lock = thread_notifier.notify_pending.lock().unwrap();
+				lock.0 = true;
+				thread_notifier.condvar.notify_all();

 				if exit_thread_clone.load(Ordering::SeqCst) {
 					break
@@ -146,4 +249,84 @@ mod tests {
 		}
 	}
+
+	#[test]
+	fn test_future_callbacks() {
+		let future = Future {
+			state: Arc::new(Mutex::new(FutureState {
+				callbacks: Vec::new(),
+				complete: false,
+			}))
+		};
+		let callback = Arc::new(AtomicBool::new(false));
+		let callback_ref = Arc::clone(&callback);
+		future.register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
+
+		assert!(!callback.load(Ordering::SeqCst));
+		future.state.lock().unwrap().complete();
+		assert!(callback.load(Ordering::SeqCst));
+		future.state.lock().unwrap().complete();
+	}
+
+	#[test]
+	fn test_pre_completed_future_callbacks() {
+		let future = Future {
+			state: Arc::new(Mutex::new(FutureState {
+				callbacks: Vec::new(),
+				complete: false,
+			}))
+		};
+		future.state.lock().unwrap().complete();
+
+		let callback = Arc::new(AtomicBool::new(false));
+		let callback_ref = Arc::clone(&callback);
+		future.register_callback(Box::new(move || assert!(!callback_ref.fetch_or(true, Ordering::SeqCst))));
+
+		assert!(callback.load(Ordering::SeqCst));
+		assert!(future.state.lock().unwrap().callbacks.is_empty());
+	}
+
+	// Rather annoyingly, there's no safe way in Rust std to construct a Waker despite it being
+	// totally possible to construct from a trait implementation (though somewhat less efficient
+	// compared to a raw VTable). Instead, we have to write out a lot of boilerplate to build a
+	// waker, which we do here with a trivial Arc data element to track woke-ness.
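+	// The invariant throughout: every pointer stored in a `RawWaker` below is a
+	// `Box<Arc<AtomicBool>>` leaked via `Box::into_raw` in `waker_clone`, which is what makes
+	// reconstructing (and freeing) it with `Box::from_raw` in `drop` sound.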
+ const WAKER_V_TABLE: RawWakerVTable = RawWakerVTable::new(waker_clone, wake, wake_by_ref, drop); + unsafe fn wake_by_ref(ptr: *const ()) { let p = ptr as *const Arc; assert!(!(*p).fetch_or(true, Ordering::SeqCst)); } + unsafe fn drop(ptr: *const ()) { let p = ptr as *mut Arc; Box::from_raw(p); } + unsafe fn wake(ptr: *const ()) { wake_by_ref(ptr); drop(ptr); } + unsafe fn waker_clone(ptr: *const ()) -> RawWaker { + let p = ptr as *const Arc; + RawWaker::new(Box::into_raw(Box::new(Arc::clone(&*p))) as *const (), &WAKER_V_TABLE) + } + + fn create_waker() -> (Arc, Waker) { + let a = Arc::new(AtomicBool::new(false)); + let waker = unsafe { Waker::from_raw(waker_clone((&a as *const Arc) as *const ())) }; + (a, waker) + } + + #[test] + fn test_future() { + let mut future = Future { + state: Arc::new(Mutex::new(FutureState { + callbacks: Vec::new(), + complete: false, + })) + }; + let mut second_future = Future { state: Arc::clone(&future.state) }; + + let (woken, waker) = create_waker(); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Pending); + assert!(!woken.load(Ordering::SeqCst)); + + let (second_woken, second_waker) = create_waker(); + assert_eq!(Pin::new(&mut second_future).poll(&mut Context::from_waker(&second_waker)), Poll::Pending); + assert!(!second_woken.load(Ordering::SeqCst)); + + future.state.lock().unwrap().complete(); + assert!(woken.load(Ordering::SeqCst)); + assert!(second_woken.load(Ordering::SeqCst)); + assert_eq!(Pin::new(&mut future).poll(&mut Context::from_waker(&waker)), Poll::Ready(())); + assert_eq!(Pin::new(&mut second_future).poll(&mut Context::from_waker(&second_waker)), Poll::Ready(())); + } } From 2a5bac22bf5bceca6e052d63a21681ed20f1479c Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 9 Aug 2022 06:01:10 +0000 Subject: [PATCH 5/5] Add a background processing function that is async. Adds a method which operates like BackgroundProcessor::start but instead of functioning by spawning a background thread it is async. --- lightning-background-processor/Cargo.toml | 1 + lightning-background-processor/src/lib.rs | 326 ++++++++++++++-------- lightning/src/util/wakers.rs | 2 + 3 files changed, 206 insertions(+), 123 deletions(-) diff --git a/lightning-background-processor/Cargo.toml b/lightning-background-processor/Cargo.toml index 2df83f2b75a..e51e2ac1b8e 100644 --- a/lightning-background-processor/Cargo.toml +++ b/lightning-background-processor/Cargo.toml @@ -17,6 +17,7 @@ rustdoc-args = ["--cfg", "docsrs"] bitcoin = "0.28.1" lightning = { version = "0.0.110", path = "../lightning", features = ["std"] } lightning-rapid-gossip-sync = { version = "0.0.110", path = "../lightning-rapid-gossip-sync" } +futures = { version = "0.3", optional = true } [dev-dependencies] lightning = { version = "0.0.110", path = "../lightning", features = ["_test_utils"] } diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 484439b3907..d38c30e584f 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -31,6 +31,9 @@ use std::thread::JoinHandle; use std::time::{Duration, Instant}; use std::ops::Deref; +#[cfg(feature = "futures")] +use futures::{select, future::FutureExt}; + /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep /// Rust-Lightning running properly, and (2) either can or should be run in the background. 
Its /// responsibilities are: @@ -219,6 +222,203 @@ where A::Target: chain::Access, L::Target: Logger { } } +macro_rules! define_run_body { + ($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident, + $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident, + $loop_exit_check: expr, $await: expr) + => { { + let event_handler = DecoratingEventHandler { + event_handler: $event_handler, + gossip_sync: &$gossip_sync, + }; + + log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup"); + $channel_manager.timer_tick_occurred(); + + let mut last_freshness_call = Instant::now(); + let mut last_ping_call = Instant::now(); + let mut last_prune_call = Instant::now(); + let mut last_scorer_persist_call = Instant::now(); + let mut have_pruned = false; + + loop { + $channel_manager.process_pending_events(&event_handler); + $chain_monitor.process_pending_events(&event_handler); + + // Note that the PeerManager::process_events may block on ChannelManager's locks, + // hence it comes last here. When the ChannelManager finishes whatever it's doing, + // we want to ensure we get into `persist_manager` as quickly as we can, especially + // without running the normal event processing above and handing events to users. + // + // Specifically, on an *extremely* slow machine, we may see ChannelManager start + // processing a message effectively at any point during this loop. In order to + // minimize the time between such processing completing and persisting the updated + // ChannelManager, we want to minimize methods blocking on a ChannelManager + // generally, and as a fallback place such blocking only immediately before + // persistence. + $peer_manager.process_events(); + + // We wait up to 100ms, but track how long it takes to detect being put to sleep, + // see `await_start`'s use below. + let await_start = Instant::now(); + let updates_available = $await; + let await_time = await_start.elapsed(); + + if updates_available { + log_trace!($logger, "Persisting ChannelManager..."); + $persister.persist_manager(&*$channel_manager)?; + log_trace!($logger, "Done persisting ChannelManager."); + } + // Exit the loop if the background processor was requested to stop. + if $loop_exit_check { + log_trace!($logger, "Terminating background processor."); + break; + } + if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER { + log_trace!($logger, "Calling ChannelManager's timer_tick_occurred"); + $channel_manager.timer_tick_occurred(); + last_freshness_call = Instant::now(); + } + if await_time > Duration::from_secs(1) { + // On various platforms, we may be starved of CPU cycles for several reasons. + // E.g. on iOS, if we've been in the background, we will be entirely paused. + // Similarly, if we're on a desktop platform and the device has been asleep, we + // may not get any cycles. + // We detect this by checking if our max-100ms-sleep, above, ran longer than a + // full second, at which point we assume sockets may have been killed (they + // appear to be at least on some platforms, even if it has only been a second). + // Note that we have to take care to not get here just because user event + // processing was slow at the top of the loop. For example, the sample client + // may call Bitcoin Core RPCs during event handling, which very often takes + // more than a handful of seconds to complete, and shouldn't disconnect all our + // peers. 
+ log_trace!($logger, "100ms sleep took more than a second, disconnecting peers."); + $peer_manager.disconnect_all_peers(); + last_ping_call = Instant::now(); + } else if last_ping_call.elapsed().as_secs() > PING_TIMER { + log_trace!($logger, "Calling PeerManager's timer_tick_occurred"); + $peer_manager.timer_tick_occurred(); + last_ping_call = Instant::now(); + } + + // Note that we want to run a graph prune once not long after startup before + // falling back to our usual hourly prunes. This avoids short-lived clients never + // pruning their network graph. We run once 60 seconds after startup before + // continuing our normal cadence. + if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } { + // The network graph must not be pruned while rapid sync completion is pending + log_trace!($logger, "Assessing prunability of network graph"); + if let Some(network_graph) = $gossip_sync.prunable_network_graph() { + network_graph.remove_stale_channels(); + + if let Err(e) = $persister.persist_graph(network_graph) { + log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e) + } + + last_prune_call = Instant::now(); + have_pruned = true; + } else { + log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph."); + } + } + + if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER { + if let Some(ref scorer) = $scorer { + log_trace!($logger, "Persisting scorer"); + if let Err(e) = $persister.persist_scorer(&scorer) { + log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) + } + } + last_scorer_persist_call = Instant::now(); + } + } + + // After we exit, ensure we persist the ChannelManager one final time - this avoids + // some races where users quit while channel updates were in-flight, with + // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. + $persister.persist_manager(&*$channel_manager)?; + + // Persist Scorer on exit + if let Some(ref scorer) = $scorer { + $persister.persist_scorer(&scorer)?; + } + + // Persist NetworkGraph on exit + if let Some(network_graph) = $gossip_sync.network_graph() { + $persister.persist_graph(network_graph)?; + } + + Ok(()) + } } +} + +/// Processes background events in a future. +/// +/// `sleeper` should return a future which completes in the given amount of time and returns a +/// boolean indicating whether the background processing should continue. Once `sleeper` returns a +/// future which outputs false, the loop will exit and this function's future will complete. +/// +/// See [`BackgroundProcessor::start`] for information on which actions this handles. 
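+///
+/// A hedged usage sketch (assuming a Tokio runtime and a caller-managed `AtomicBool` exit flag;
+/// any sleeper whose future outputs a `bool` works, with `false` stopping the loop):
+///
+/// ```ignore
+/// let exit_flag = Arc::new(AtomicBool::new(false));
+/// let sleeper_flag = Arc::clone(&exit_flag);
+/// process_events_async(persister, event_handler, chain_monitor, channel_manager,
+/// 	gossip_sync, peer_manager, logger, Some(scorer),
+/// 	move |duration| {
+/// 		let flag = Arc::clone(&sleeper_flag);
+/// 		async move {
+/// 			tokio::time::sleep(duration).await;
+/// 			!flag.load(Ordering::Acquire)
+/// 		}
+/// 	}).await?;
+/// ```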
+#[cfg(feature = "futures")] +pub async fn process_events_async< + 'a, + Signer: 'static + Sign, + CA: 'static + Deref + Send + Sync, + CF: 'static + Deref + Send + Sync, + CW: 'static + Deref + Send + Sync, + T: 'static + Deref + Send + Sync, + K: 'static + Deref + Send + Sync, + F: 'static + Deref + Send + Sync, + G: 'static + Deref> + Send + Sync, + L: 'static + Deref + Send + Sync, + P: 'static + Deref + Send + Sync, + Descriptor: 'static + SocketDescriptor + Send + Sync, + CMH: 'static + Deref + Send + Sync, + RMH: 'static + Deref + Send + Sync, + EH: 'static + EventHandler + Send, + PS: 'static + Deref + Send, + M: 'static + Deref> + Send + Sync, + CM: 'static + Deref> + Send + Sync, + PGS: 'static + Deref> + Send + Sync, + RGS: 'static + Deref> + Send, + UMH: 'static + Deref + Send + Sync, + PM: 'static + Deref> + Send + Sync, + S: 'static + Deref + Send + Sync, + SC: WriteableScore<'a>, + SleepFuture: core::future::Future, + Sleeper: Fn(Duration) -> SleepFuture +>( + persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM, + gossip_sync: GossipSync, peer_manager: PM, logger: L, scorer: Option, + sleeper: Sleeper, +) -> Result<(), std::io::Error> +where + CA::Target: 'static + chain::Access, + CF::Target: 'static + chain::Filter, + CW::Target: 'static + chain::Watch, + T::Target: 'static + BroadcasterInterface, + K::Target: 'static + KeysInterface, + F::Target: 'static + FeeEstimator, + L::Target: 'static + Logger, + P::Target: 'static + Persist, + CMH::Target: 'static + ChannelMessageHandler, + RMH::Target: 'static + RoutingMessageHandler, + UMH::Target: 'static + CustomMessageHandler, + PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>, +{ + let mut should_continue = true; + define_run_body!(persister, event_handler, chain_monitor, channel_manager, + gossip_sync, peer_manager, logger, scorer, should_continue, { + select! { + _ = channel_manager.get_persistable_update_future().fuse() => true, + cont = sleeper(Duration::from_millis(100)).fuse() => { + should_continue = cont; + false + } + } + }) +} + impl BackgroundProcessor { /// Start a background thread that takes care of responsibilities enumerated in the [top-level /// documentation]. @@ -310,129 +510,9 @@ impl BackgroundProcessor { let stop_thread = Arc::new(AtomicBool::new(false)); let stop_thread_clone = stop_thread.clone(); let handle = thread::spawn(move || -> Result<(), std::io::Error> { - let event_handler = DecoratingEventHandler { - event_handler, - gossip_sync: &gossip_sync, - }; - - log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup"); - channel_manager.timer_tick_occurred(); - - let mut last_freshness_call = Instant::now(); - let mut last_ping_call = Instant::now(); - let mut last_prune_call = Instant::now(); - let mut last_scorer_persist_call = Instant::now(); - let mut have_pruned = false; - - loop { - channel_manager.process_pending_events(&event_handler); - chain_monitor.process_pending_events(&event_handler); - - // Note that the PeerManager::process_events may block on ChannelManager's locks, - // hence it comes last here. When the ChannelManager finishes whatever it's doing, - // we want to ensure we get into `persist_manager` as quickly as we can, especially - // without running the normal event processing above and handing events to users. - // - // Specifically, on an *extremely* slow machine, we may see ChannelManager start - // processing a message effectively at any point during this loop. 
In order to - // minimize the time between such processing completing and persisting the updated - // ChannelManager, we want to minimize methods blocking on a ChannelManager - // generally, and as a fallback place such blocking only immediately before - // persistence. - peer_manager.process_events(); - - // We wait up to 100ms, but track how long it takes to detect being put to sleep, - // see `await_start`'s use below. - let await_start = Instant::now(); - let updates_available = - channel_manager.await_persistable_update_timeout(Duration::from_millis(100)); - let await_time = await_start.elapsed(); - - if updates_available { - log_trace!(logger, "Persisting ChannelManager..."); - persister.persist_manager(&*channel_manager)?; - log_trace!(logger, "Done persisting ChannelManager."); - } - // Exit the loop if the background processor was requested to stop. - if stop_thread.load(Ordering::Acquire) == true { - log_trace!(logger, "Terminating background processor."); - break; - } - if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER { - log_trace!(logger, "Calling ChannelManager's timer_tick_occurred"); - channel_manager.timer_tick_occurred(); - last_freshness_call = Instant::now(); - } - if await_time > Duration::from_secs(1) { - // On various platforms, we may be starved of CPU cycles for several reasons. - // E.g. on iOS, if we've been in the background, we will be entirely paused. - // Similarly, if we're on a desktop platform and the device has been asleep, we - // may not get any cycles. - // We detect this by checking if our max-100ms-sleep, above, ran longer than a - // full second, at which point we assume sockets may have been killed (they - // appear to be at least on some platforms, even if it has only been a second). - // Note that we have to take care to not get here just because user event - // processing was slow at the top of the loop. For example, the sample client - // may call Bitcoin Core RPCs during event handling, which very often takes - // more than a handful of seconds to complete, and shouldn't disconnect all our - // peers. - log_trace!(logger, "100ms sleep took more than a second, disconnecting peers."); - peer_manager.disconnect_all_peers(); - last_ping_call = Instant::now(); - } else if last_ping_call.elapsed().as_secs() > PING_TIMER { - log_trace!(logger, "Calling PeerManager's timer_tick_occurred"); - peer_manager.timer_tick_occurred(); - last_ping_call = Instant::now(); - } - - // Note that we want to run a graph prune once not long after startup before - // falling back to our usual hourly prunes. This avoids short-lived clients never - // pruning their network graph. We run once 60 seconds after startup before - // continuing our normal cadence. 
- if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } { - // The network graph must not be pruned while rapid sync completion is pending - log_trace!(logger, "Assessing prunability of network graph"); - if let Some(network_graph) = gossip_sync.prunable_network_graph() { - network_graph.remove_stale_channels(); - - if let Err(e) = persister.persist_graph(network_graph) { - log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e) - } - - last_prune_call = Instant::now(); - have_pruned = true; - } else { - log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph."); - } - } - - if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER { - if let Some(ref scorer) = scorer { - log_trace!(logger, "Persisting scorer"); - if let Err(e) = persister.persist_scorer(&scorer) { - log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) - } - } - last_scorer_persist_call = Instant::now(); - } - } - - // After we exit, ensure we persist the ChannelManager one final time - this avoids - // some races where users quit while channel updates were in-flight, with - // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. - persister.persist_manager(&*channel_manager)?; - - // Persist Scorer on exit - if let Some(ref scorer) = scorer { - persister.persist_scorer(&scorer)?; - } - - // Persist NetworkGraph on exit - if let Some(network_graph) = gossip_sync.network_graph() { - persister.persist_graph(network_graph)?; - } - - Ok(()) + define_run_body!(persister, event_handler, chain_monitor, channel_manager, + gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire), + channel_manager.await_persistable_update_timeout(Duration::from_millis(100))) }); Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } } diff --git a/lightning/src/util/wakers.rs b/lightning/src/util/wakers.rs index 9636466aaa2..b81dacbd0b3 100644 --- a/lightning/src/util/wakers.rs +++ b/lightning/src/util/wakers.rs @@ -18,6 +18,8 @@ use core::mem; use core::time::Duration; use sync::{Condvar, Mutex}; +use prelude::{Box, Vec}; + #[cfg(any(test, feature = "std"))] use std::time::Instant;