Skip to content

Add Tokio example to process_events_async docs #2004

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 83 additions & 1 deletion lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,8 @@ use core::task;
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
/// future which outputs true, the loop will exit and this function's future will complete.
/// future which outputs `true`, the loop will exit and this function's future will complete.
/// The `sleeper` future is free to return early after it has triggered the exit condition.
///
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
///
Expand All @@ -479,6 +480,87 @@ use core::task;
/// mobile device, where we may need to check for interruption of the application regularly. If you
/// are unsure, you should set the flag, as the performance impact of it is minimal unless there
/// are hundreds or thousands of simultaneous process calls running.
///
/// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
/// could setup `process_events_async` like this:
/// ```
/// # struct MyPersister {}
/// # impl lightning::util::persist::KVStorePersister for MyPersister {
/// # fn persist<W: lightning::util::ser::Writeable>(&self, key: &str, object: &W) -> lightning::io::Result<()> { Ok(()) }
/// # }
/// # struct MyEventHandler {}
/// # impl MyEventHandler {
/// # async fn handle_event(&self, _: lightning::events::Event) {}
/// # }
/// # #[derive(Eq, PartialEq, Clone, Hash)]
/// # struct MySocketDescriptor {}
/// # impl lightning::ln::peer_handler::SocketDescriptor for MySocketDescriptor {
/// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
/// # fn disconnect_socket(&mut self) {}
/// # }
/// # use std::sync::{Arc, Mutex};
/// # use std::sync::atomic::{AtomicBool, Ordering};
/// # use lightning_background_processor::{process_events_async, GossipSync};
/// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync;
/// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync;
/// # type MyNodeSigner = dyn lightning::chain::keysinterface::NodeSigner + Send + Sync;
/// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync;
/// # type MyFilter = dyn lightning::chain::Filter + Send + Sync;
/// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync;
/// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::chain::keysinterface::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyPersister>>;
/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyUtxoLookup, MyLogger>;
/// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<MyLogger>>;
/// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync<Arc<MyNetworkGraph>, Arc<MyUtxoLookup>, Arc<MyLogger>>;
/// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyLogger>;
/// # type MyScorer = Mutex<lightning::routing::scoring::ProbabilisticScorer<Arc<MyNetworkGraph>, Arc<MyLogger>>>;
///
/// # async fn setup_background_processing(my_persister: Arc<MyPersister>, my_event_handler: Arc<MyEventHandler>, my_chain_monitor: Arc<MyChainMonitor>, my_channel_manager: Arc<MyChannelManager>, my_gossip_sync: Arc<MyGossipSync>, my_logger: Arc<MyLogger>, my_scorer: Arc<MyScorer>, my_peer_manager: Arc<MyPeerManager>) {
/// let background_persister = Arc::clone(&my_persister);
/// let background_event_handler = Arc::clone(&my_event_handler);
/// let background_chain_mon = Arc::clone(&my_chain_monitor);
/// let background_chan_man = Arc::clone(&my_channel_manager);
/// let background_gossip_sync = GossipSync::p2p(Arc::clone(&my_gossip_sync));
/// let background_peer_man = Arc::clone(&my_peer_manager);
/// let background_logger = Arc::clone(&my_logger);
/// let background_scorer = Arc::clone(&my_scorer);
///
/// // Setup the sleeper.
/// let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());
///
/// let sleeper = move |d| {
/// let mut receiver = stop_receiver.clone();
/// Box::pin(async move {
/// tokio::select!{
/// _ = tokio::time::sleep(d) => false,
/// _ = receiver.changed() => true,
/// }
/// })
/// };
///
/// let mobile_interruptable_platform = false;
///
/// let handle = tokio::spawn(async move {
/// process_events_async(
/// background_persister,
/// |e| background_event_handler.handle_event(e),
/// background_chain_mon,
/// background_chan_man,
/// background_gossip_sync,
/// background_peer_man,
/// background_logger,
/// Some(background_scorer),
/// sleeper,
/// mobile_interruptable_platform,
/// )
/// .await
/// .expect("Failed to process events");
/// });
///
/// // Stop the background processing.
/// stop_sender.send(()).unwrap();
/// handle.await.unwrap();
/// # }
///```
#[cfg(feature = "futures")]
pub async fn process_events_async<
'a,
Expand Down