diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs
index 940d1b029e7..314de746508 100644
--- a/lightning-background-processor/src/lib.rs
+++ b/lightning-background-processor/src/lib.rs
@@ -26,6 +26,8 @@ use lightning::chain::chainmonitor::{ChainMonitor, Persist};
 use lightning::events::EventHandler;
 #[cfg(feature = "std")]
 use lightning::events::EventsProvider;
+#[cfg(feature = "futures")]
+use lightning::events::ReplayEvent;
 use lightning::events::{Event, PathFailure};
 use lightning::ln::channelmanager::AChannelManager;
@@ -583,6 +585,7 @@ use futures_util::{dummy_waker, Selector, SelectorOutput};
 /// could setup `process_events_async` like this:
 /// ```
 /// # use lightning::io;
+/// # use lightning::events::ReplayEvent;
 /// # use std::sync::{Arc, RwLock};
 /// # use std::sync::atomic::{AtomicBool, Ordering};
 /// # use std::time::SystemTime;
@@ -600,7 +603,7 @@ use futures_util::{dummy_waker, Selector, SelectorOutput};
 /// # }
 /// # struct EventHandler {}
 /// # impl EventHandler {
-/// #     async fn handle_event(&self, _: lightning::events::Event) {}
+/// #     async fn handle_event(&self, _: lightning::events::Event) -> Result<(), ReplayEvent> { Ok(()) }
 /// # }
 /// # #[derive(Eq, PartialEq, Clone, Hash)]
 /// # struct SocketDescriptor {}
@@ -698,7 +701,7 @@ pub async fn process_events_async<
	G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
	L: 'static + Deref + Send + Sync,
	P: 'static + Deref + Send + Sync,
-	EventHandlerFuture: core::future::Future<Output = ()>,
+	EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	PS: 'static + Deref + Send,
	M: 'static
@@ -751,12 +754,16 @@ where
 				if update_scorer(scorer, &event, duration_since_epoch) {
 					log_trace!(logger, "Persisting scorer after update");
 					if let Err(e) = persister.persist_scorer(&scorer) {
-						log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
+						log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
+						// We opt not to abort early on persistence failure here as persisting
+						// the scorer is non-critical and we still hope that it will have
+						// resolved itself when it is potentially critical in event handling
+						// below.
 					}
 				}
 			}
 		}
-		event_handler(event).await;
+		event_handler(event).await
 	})
 };
 define_run_body!(
@@ -913,7 +920,7 @@ impl BackgroundProcessor {
 					}
 				}
 			}
-			event_handler.handle_event(event);
+			event_handler.handle_event(event)
 		};
 		define_run_body!(
 			persister,
@@ -1012,10 +1019,13 @@ mod tests {
 	use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
 	use bitcoin::transaction::Version;
 	use bitcoin::{Amount, ScriptBuf, Txid};
+	use core::sync::atomic::{AtomicBool, Ordering};
 	use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
 	use lightning::chain::transaction::OutPoint;
 	use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
-	use lightning::events::{Event, MessageSendEvent, MessageSendEventsProvider, PathFailure};
+	use lightning::events::{
+		Event, MessageSendEvent, MessageSendEventsProvider, PathFailure, ReplayEvent,
+	};
 	use lightning::ln::channelmanager;
 	use lightning::ln::channelmanager::{
 		ChainParameters, PaymentId, BREAKDOWN_TIMEOUT, MIN_CLTV_EXPIRY_DELTA,
@@ -1757,7 +1767,7 @@ mod tests {
 		// Initiate the background processors to watch each node.
 		let data_dir = nodes[0].kv_store.get_data_dir();
 		let persister = Arc::new(Persister::new(data_dir));
-		let event_handler = |_: _| {};
+		let event_handler = |_: _| Ok(());
 		let bg_processor = BackgroundProcessor::start(
 			persister,
 			event_handler,
@@ -1847,7 +1857,7 @@ mod tests {
 		let (_, nodes) = create_nodes(1, "test_timer_tick_called");
 		let data_dir = nodes[0].kv_store.get_data_dir();
 		let persister = Arc::new(Persister::new(data_dir));
-		let event_handler = |_: _| {};
+		let event_handler = |_: _| Ok(());
 		let bg_processor = BackgroundProcessor::start(
 			persister,
 			event_handler,
@@ -1889,7 +1899,7 @@ mod tests {
 		let persister = Arc::new(
 			Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
 		);
-		let event_handler = |_: _| {};
+		let event_handler = |_: _| Ok(());
 		let bg_processor = BackgroundProcessor::start(
 			persister,
 			event_handler,
@@ -1924,7 +1934,7 @@ mod tests {
 		let bp_future = super::process_events_async(
 			persister,
-			|_: _| async {},
+			|_: _| async { Ok(()) },
 			nodes[0].chain_monitor.clone(),
 			nodes[0].node.clone(),
 			Some(nodes[0].messenger.clone()),
@@ -1957,7 +1967,7 @@ mod tests {
 		let data_dir = nodes[0].kv_store.get_data_dir();
 		let persister =
 			Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
-		let event_handler = |_: _| {};
+		let event_handler = |_: _| Ok(());
 		let bg_processor = BackgroundProcessor::start(
 			persister,
 			event_handler,
@@ -1986,7 +1996,7 @@ mod tests {
 		let data_dir = nodes[0].kv_store.get_data_dir();
 		let persister =
 			Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
-		let event_handler = |_: _| {};
+		let event_handler = |_: _| Ok(());
 		let bg_processor = BackgroundProcessor::start(
 			persister,
 			event_handler,
@@ -2021,13 +2031,16 @@ mod tests {
 		// Set up a background event handler for FundingGenerationReady events.
 		let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1);
 		let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1);
-		let event_handler = move |event: Event| match event {
-			Event::FundingGenerationReady { .. } => funding_generation_send
-				.send(handle_funding_generation_ready!(event, channel_value))
-				.unwrap(),
-			Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
-			Event::ChannelReady { .. } => {},
-			_ => panic!("Unexpected event: {:?}", event),
+		let event_handler = move |event: Event| {
+			match event {
+				Event::FundingGenerationReady { .. } => funding_generation_send
+					.send(handle_funding_generation_ready!(event, channel_value))
+					.unwrap(),
+				Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
+				Event::ChannelReady { .. } => {},
+				_ => panic!("Unexpected event: {:?}", event),
+			}
+			Ok(())
 		};

 		let bg_processor = BackgroundProcessor::start(
@@ -2082,11 +2095,14 @@ mod tests {
 		// Set up a background event handler for SpendableOutputs events.
 		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
-		let event_handler = move |event: Event| match event {
-			Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
-			Event::ChannelReady { .. } => {},
-			Event::ChannelClosed { .. } => {},
-			_ => panic!("Unexpected event: {:?}", event),
+		let event_handler = move |event: Event| {
+			match event {
+				Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
+				Event::ChannelReady { .. } => {},
+				Event::ChannelClosed { .. } => {},
+				_ => panic!("Unexpected event: {:?}", event),
+			}
+			Ok(())
 		};
 		let persister = Arc::new(Persister::new(data_dir));
 		let bg_processor = BackgroundProcessor::start(
@@ -2215,12 +2231,60 @@ mod tests {
 		}
 	}

+	#[test]
+	fn test_event_handling_failures_are_replayed() {
+		let (_, nodes) = create_nodes(2, "test_event_handling_failures_are_replayed");
+		let channel_value = 100000;
+		let data_dir = nodes[0].kv_store.get_data_dir();
+		let persister = Arc::new(Persister::new(data_dir.clone()));
+
+		let (first_event_send, first_event_recv) = std::sync::mpsc::sync_channel(1);
+		let (second_event_send, second_event_recv) = std::sync::mpsc::sync_channel(1);
+		let should_fail_event_handling = Arc::new(AtomicBool::new(true));
+		let event_handler = move |event: Event| {
+			if let Ok(true) = should_fail_event_handling.compare_exchange(
+				true,
+				false,
+				Ordering::Acquire,
+				Ordering::Relaxed,
+			) {
+				first_event_send.send(event).unwrap();
+				return Err(ReplayEvent());
+			}
+
+			second_event_send.send(event).unwrap();
+			Ok(())
+		};
+
+		let bg_processor = BackgroundProcessor::start(
+			persister,
+			event_handler,
+			nodes[0].chain_monitor.clone(),
+			nodes[0].node.clone(),
+			Some(nodes[0].messenger.clone()),
+			nodes[0].no_gossip_sync(),
+			nodes[0].peer_manager.clone(),
+			nodes[0].logger.clone(),
+			Some(nodes[0].scorer.clone()),
+		);
+
+		begin_open_channel!(nodes[0], nodes[1], channel_value);
+		assert_eq!(
+			first_event_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE)),
+			second_event_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
+		);
+
+		if !std::thread::panicking() {
+			bg_processor.stop().unwrap();
+		}
+	}
+
 	#[test]
 	fn test_scorer_persistence() {
 		let (_, nodes) = create_nodes(2, "test_scorer_persistence");
 		let data_dir = nodes[0].kv_store.get_data_dir();
 		let persister = Arc::new(Persister::new(data_dir));
-		let event_handler = |_: _| {};
+		let event_handler = |_: _| Ok(());
 		let bg_processor = BackgroundProcessor::start(
 			persister,
 			event_handler,
@@ -2315,7 +2379,7 @@ mod tests {
 		let data_dir = nodes[0].kv_store.get_data_dir();
 		let persister =
 			Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
-		let event_handler = |_: _| {};
+		let event_handler = |_: _| Ok(());
 		let background_processor = BackgroundProcessor::start(
 			persister,
 			event_handler,
@@ -2350,7 +2414,7 @@ mod tests {
 		let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
 		let bp_future = super::process_events_async(
 			persister,
-			|_: _| async {},
+			|_: _| async { Ok(()) },
 			nodes[0].chain_monitor.clone(),
 			nodes[0].node.clone(),
 			Some(nodes[0].messenger.clone()),
@@ -2492,12 +2556,15 @@ mod tests {
 	#[test]
 	fn test_payment_path_scoring() {
 		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
-		let event_handler = move |event: Event| match event {
-			Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
-			Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
-			Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
-			Event::ProbeFailed { .. } => sender.send(event).unwrap(),
-			_ => panic!("Unexpected event: {:?}", event),
+		let event_handler = move |event: Event| {
+			match event {
+				Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
+				Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
+				Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
+				Event::ProbeFailed { .. } => sender.send(event).unwrap(),
+				_ => panic!("Unexpected event: {:?}", event),
+			}
+			Ok(())
 		};

 		let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
@@ -2543,6 +2610,7 @@ mod tests {
 				Event::ProbeFailed { .. } => sender_ref.send(event).await.unwrap(),
 				_ => panic!("Unexpected event: {:?}", event),
 			}
+			Ok(())
 		}
 	};

diff --git a/lightning-invoice/src/utils.rs b/lightning-invoice/src/utils.rs
index 00b49c371ea..fa301a8dc06 100644
--- a/lightning-invoice/src/utils.rs
+++ b/lightning-invoice/src/utils.rs
@@ -1391,6 +1391,7 @@ mod test {
 			} else {
 				other_events.borrow_mut().push(event);
 			}
+			Ok(())
 		};
 		nodes[fwd_idx].node.process_pending_events(&forward_event_handler);
 		nodes[fwd_idx].node.process_pending_events(&forward_event_handler);
diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs
index e6bb9d90778..93e1dae6ce3 100644
--- a/lightning/src/chain/chainmonitor.rs
+++ b/lightning/src/chain/chainmonitor.rs
@@ -33,8 +33,7 @@ use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance
 use crate::chain::transaction::{OutPoint, TransactionData};
 use crate::ln::types::ChannelId;
 use crate::sign::ecdsa::EcdsaChannelSigner;
-use crate::events;
-use crate::events::{Event, EventHandler};
+use crate::events::{self, Event, EventHandler, ReplayEvent};
 use crate::util::logger::{Logger, WithContext};
 use crate::util::errors::APIError;
 use crate::util::wakers::{Future, Notifier};
@@ -533,7 +532,7 @@ where C::Target: chain::Filter,
 	pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
 		use crate::events::EventsProvider;
 		let events = core::cell::RefCell::new(Vec::new());
-		let event_handler = |event: events::Event| events.borrow_mut().push(event);
+		let event_handler = |event: events::Event| Ok(events.borrow_mut().push(event));
 		self.process_pending_events(&event_handler);
 		events.into_inner()
 	}
@@ -544,7 +543,7 @@ where C::Target: chain::Filter,
 	/// See the trait-level documentation of [`EventsProvider`] for requirements.
 	///
 	/// [`EventsProvider`]: crate::events::EventsProvider
-	pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
+	pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>>, H: Fn(Event) -> Future>(
 		&self, handler: H
 	) {
 		// Sadly we can't hold the monitors read lock through an async call. Thus we have to do a
@@ -552,8 +551,13 @@ where C::Target: chain::Filter,
 		let mons_to_process = self.monitors.read().unwrap().keys().cloned().collect::<Vec<_>>();
 		for funding_txo in mons_to_process {
 			let mut ev;
-			super::channelmonitor::process_events_body!(
-				self.monitors.read().unwrap().get(&funding_txo).map(|m| &m.monitor), ev, handler(ev).await);
+			match super::channelmonitor::process_events_body!(
+				self.monitors.read().unwrap().get(&funding_txo).map(|m| &m.monitor), ev, handler(ev).await) {
+				Ok(()) => {},
+				Err(ReplayEvent ()) => {
+					self.event_notifier.notify();
+				}
+			}
 		}
 	}
@@ -880,7 +884,12 @@ impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> events::EventsProvider for ChainMonitor<ChannelSigner, C, T, F, L, P>
 	fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
 		for monitor_state in self.monitors.read().unwrap().values() {
-			monitor_state.monitor.process_pending_events(&handler);
+			match monitor_state.monitor.process_pending_events(&handler) {
+				Ok(()) => {},
+				Err(ReplayEvent ()) => {
+					self.event_notifier.notify();
+				}
+			}
 		}
 	}
 }
diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs
index 13f2ff044a2..5ecea825100 100644
--- a/lightning/src/chain/channelmonitor.rs
+++ b/lightning/src/chain/channelmonitor.rs
@@ -51,7 +51,7 @@ use crate::chain::Filter;
 use crate::util::logger::{Logger, Record};
 use crate::util::ser::{Readable, ReadableArgs, RequiredWrapper, MaybeReadable, UpgradableRequired, Writer, Writeable, U48};
 use crate::util::byte_utils;
-use crate::events::{ClosureReason, Event, EventHandler};
+use crate::events::{ClosureReason, Event, EventHandler, ReplayEvent};
 use crate::events::bump_transaction::{AnchorDescriptor, BumpTransactionEvent};

 #[allow(unused_imports)]
@@ -1159,34 +1159,53 @@ impl<Signer: EcdsaChannelSigner> Writeable for ChannelMonitorImpl<Signer> {
 macro_rules! _process_events_body {
 	($self_opt: expr, $event_to_handle: expr, $handle_event: expr) => {
 		loop {
+			let mut handling_res = Ok(());
 			let (pending_events, repeated_events);
 			if let Some(us) = $self_opt {
 				let mut inner = us.inner.lock().unwrap();
 				if inner.is_processing_pending_events {
-					break;
+					break handling_res;
 				}
 				inner.is_processing_pending_events = true;

 				pending_events = inner.pending_events.clone();
 				repeated_events = inner.get_repeated_events();
-			} else { break; }
-			let num_events = pending_events.len();
+			} else { break handling_res; }

-			for event in pending_events.into_iter().chain(repeated_events.into_iter()) {
+			let mut num_handled_events = 0;
+			for event in pending_events {
 				$event_to_handle = event;
-				$handle_event;
+				match $handle_event {
+					Ok(()) => num_handled_events += 1,
+					Err(e) => {
+						// If we encounter an error we stop handling events and make sure to replay
+						// any unhandled events on the next invocation.
+						handling_res = Err(e);
+						break;
+					}
+				}
+			}
+
+			if handling_res.is_ok() {
+				for event in repeated_events {
+					// For repeated events we ignore any errors as they will be replayed eventually
+					// anyways.
+					$event_to_handle = event;
+					let _ = $handle_event;
+				}
 			}

 			if let Some(us) = $self_opt {
 				let mut inner = us.inner.lock().unwrap();
-				inner.pending_events.drain(..num_events);
+				inner.pending_events.drain(..num_handled_events);
 				inner.is_processing_pending_events = false;
-				if !inner.pending_events.is_empty() {
-					// If there's more events to process, go ahead and do so.
+				if handling_res.is_ok() && !inner.pending_events.is_empty() {
+					// If there's more events to process and we didn't fail so far, go ahead and do
+					// so.
 					continue;
 				}
 			}
-			break;
+			break handling_res;
 		}
 	}
 }
@@ -1498,21 +1517,23 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
 	/// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in
 	/// order to handle these events.
 	///
+	/// Will return a [`ReplayEvent`] error if event handling failed and should eventually be retried.
+	///
 	/// [`SpendableOutputs`]: crate::events::Event::SpendableOutputs
 	/// [`BumpTransaction`]: crate::events::Event::BumpTransaction
-	pub fn process_pending_events<H: Deref>(&self, handler: &H) where H::Target: EventHandler {
+	pub fn process_pending_events<H: Deref>(&self, handler: &H) -> Result<(), ReplayEvent> where H::Target: EventHandler {
 		let mut ev;
-		process_events_body!(Some(self), ev, handler.handle_event(ev));
+		process_events_body!(Some(self), ev, handler.handle_event(ev))
 	}

 	/// Processes any events asynchronously.
 	///
 	/// See [`Self::process_pending_events`] for more information.
-	pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
+	pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>>, H: Fn(Event) -> Future>(
 		&self, handler: &H
-	) {
+	) -> Result<(), ReplayEvent> {
 		let mut ev;
-		process_events_body!(Some(self), ev, { handler(ev).await });
+		process_events_body!(Some(self), ev, { handler(ev).await })
 	}

 	#[cfg(test)]
diff --git a/lightning/src/events/mod.rs b/lightning/src/events/mod.rs
index acee931138f..0e24e8e82f7 100644
--- a/lightning/src/events/mod.rs
+++ b/lightning/src/events/mod.rs
@@ -551,6 +551,10 @@ pub enum Event {
 	/// Note that *all inputs* in the funding transaction must spend SegWit outputs or your
 	/// counterparty can steal your funds!
 	///
+	/// # Failure Behavior and Persistence
+	/// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+	/// returning `Err(ReplayEvent ())`), but won't be persisted across restarts.
+	///
 	/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
 	/// [`ChannelManager::funding_transaction_generated`]: crate::ln::channelmanager::ChannelManager::funding_transaction_generated
 	FundingGenerationReady {
@@ -608,6 +612,10 @@ pub enum Event {
 	/// # Note
 	/// This event used to be called `PaymentReceived` in LDK versions 0.0.112 and earlier.
 	///
+	/// # Failure Behavior and Persistence
+	/// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+	/// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
+	///
 	/// [`ChannelManager::claim_funds`]: crate::ln::channelmanager::ChannelManager::claim_funds
 	/// [`ChannelManager::claim_funds_with_known_custom_tlvs`]: crate::ln::channelmanager::ChannelManager::claim_funds_with_known_custom_tlvs
 	/// [`FailureCode::InvalidOnionPayload`]: crate::ln::channelmanager::FailureCode::InvalidOnionPayload
@@ -677,6 +685,10 @@ pub enum Event {
 	/// [`ChannelManager::claim_funds`] twice for the same [`Event::PaymentClaimable`] you may get
 	/// multiple `PaymentClaimed` events.
 	///
+	/// # Failure Behavior and Persistence
+	/// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+	/// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
+	///
 	/// [`ChannelManager::claim_funds`]: crate::ln::channelmanager::ChannelManager::claim_funds
 	PaymentClaimed {
 		/// The node that received the payment.
@@ -716,6 +728,10 @@ pub enum Event {
 	/// This event will not be generated for onion message forwards; only for sends including
 	/// replies. Handlers should connect to the node otherwise any buffered messages may be lost.
 	///
+	/// # Failure Behavior and Persistence
+	/// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+	/// returning `Err(ReplayEvent ())`), but won't be persisted across restarts.
+	///
 	/// [`OnionMessage`]: msgs::OnionMessage
 	/// [`MessageRouter`]: crate::onion_message::messenger::MessageRouter
 	/// [`Destination`]: crate::onion_message::messenger::Destination
@@ -730,6 +746,10 @@ pub enum Event {
 	/// or was explicitly abandoned by [`ChannelManager::abandon_payment`]. This may be for an
 	/// [`InvoiceRequest`] sent for an [`Offer`] or for a [`Refund`] that hasn't been redeemed.
 	///
+	/// # Failure Behavior and Persistence
+	/// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+	/// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
+	///
 	/// [`ChannelManager::abandon_payment`]: crate::ln::channelmanager::ChannelManager::abandon_payment
 	/// [`InvoiceRequest`]: crate::offers::invoice_request::InvoiceRequest
 	/// [`Offer`]: crate::offers::offer::Offer
@@ -746,6 +766,10 @@ pub enum Event {
 	/// [`ChannelManager::abandon_payment`] to abandon the associated payment. See those docs for
 	/// further details.
 	///
+	/// # Failure Behavior and Persistence
+	/// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+	/// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
+	///
 	/// [`InvoiceRequest`]: crate::offers::invoice_request::InvoiceRequest
 	/// [`Refund`]: crate::offers::refund::Refund
 	/// [`UserConfig::manually_handle_bolt12_invoices`]: crate::util::config::UserConfig::manually_handle_bolt12_invoices
@@ -768,6 +792,10 @@ pub enum Event {
 	///
 	/// Note for MPP payments: in rare cases, this event may be preceded by a `PaymentPathFailed`
 	/// event. In this situation, you SHOULD treat this payment as having succeeded.
+	///
+	/// # Failure Behavior and Persistence
+	/// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+	/// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
 	PaymentSent {
 		/// The `payment_id` passed to [`ChannelManager::send_payment`].
 		///
@@ -806,6 +834,10 @@ pub enum Event {
 	/// received and processed. In this case, the [`Event::PaymentFailed`] event MUST be ignored,
 	/// and the payment MUST be treated as having succeeded.
 	///
+	/// # Failure Behavior and Persistence
+	/// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+	/// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
+	///
 	/// [`Retry`]: crate::ln::channelmanager::Retry
 	/// [`ChannelManager::abandon_payment`]: crate::ln::channelmanager::ChannelManager::abandon_payment
 	PaymentFailed {
@@ -825,6 +857,10 @@ pub enum Event {
 	///
 	/// Always generated after [`Event::PaymentSent`] and thus useful for scoring channels. See
 	/// [`Event::PaymentSent`] for obtaining the payment preimage.
+	///
+	/// # Failure Behavior and Persistence
+	/// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+	/// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
 	PaymentPathSuccessful {
 		/// The `payment_id` passed to [`ChannelManager::send_payment`].
 		///
@@ -850,6 +886,10 @@ pub enum Event {
 	/// See [`ChannelManager::abandon_payment`] for giving up on this payment before its retries have
 	/// been exhausted.
 	///
+	/// # Failure Behavior and Persistence
+	/// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+	/// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
+	///
 	/// [`ChannelManager::abandon_payment`]: crate::ln::channelmanager::ChannelManager::abandon_payment
 	PaymentPathFailed {
 		/// The `payment_id` passed to [`ChannelManager::send_payment`].
 		///
@@ -889,6 +929,10 @@ pub enum Event {
 		error_data: Option<Vec<u8>>,
 	},
 	/// Indicates that a probe payment we sent returned successful, i.e., only failed at the destination.
+	///
+	/// # Failure Behavior and Persistence
+	/// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+	/// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
 	ProbeSuccessful {
 		/// The id returned by [`ChannelManager::send_probe`].
 		///
@@ -902,6 +946,10 @@ pub enum Event {
 		path: Path,
 	},
 	/// Indicates that a probe payment we sent failed at an intermediary node on the path.
+	///
+	/// # Failure Behavior and Persistence
+	/// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+	/// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
 	ProbeFailed {
 		/// The id returned by [`ChannelManager::send_probe`].
 		///
@@ -923,6 +971,10 @@ pub enum Event {
 	/// Used to indicate that [`ChannelManager::process_pending_htlc_forwards`] should be called at
 	/// a time in the future.
 	///
+	/// # Failure Behavior and Persistence
+	/// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+	/// returning `Err(ReplayEvent ())`) and will be regenerated after restarts.
+	///
 	/// [`ChannelManager::process_pending_htlc_forwards`]: crate::ln::channelmanager::ChannelManager::process_pending_htlc_forwards
 	PendingHTLCsForwardable {
 		/// The minimum amount of time that should be waited prior to calling
@@ -939,6 +991,10 @@ pub enum Event {
 	/// [`ChannelManager::fail_intercepted_htlc`] MUST be called in response to this event. See
 	/// their docs for more information.
 	///
+	/// # Failure Behavior and Persistence
+	/// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+	/// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
+	///
 	/// [`ChannelManager::get_intercept_scid`]: crate::ln::channelmanager::ChannelManager::get_intercept_scid
 	/// [`UserConfig::accept_intercept_htlcs`]: crate::util::config::UserConfig::accept_intercept_htlcs
 	/// [`ChannelManager::forward_intercepted_htlc`]: crate::ln::channelmanager::ChannelManager::forward_intercepted_htlc
@@ -974,6 +1030,10 @@ pub enum Event {
 	/// You may hand them to the [`OutputSweeper`] utility which will store and (re-)generate spending
 	/// transactions for you.
 	///
+	/// # Failure Behavior and Persistence
+	/// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+	/// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
+	///
 	/// [`OutputSweeper`]: crate::util::sweep::OutputSweeper
 	SpendableOutputs {
 		/// The outputs which you should store as spendable by you.
@@ -985,6 +1045,10 @@ pub enum Event {
 	},
 	/// This event is generated when a payment has been successfully forwarded through us and a
 	/// forwarding fee earned.
+	///
+	/// # Failure Behavior and Persistence
+	/// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+	/// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
 	PaymentForwarded {
 		/// The channel id of the incoming channel between the previous node and us.
 		///
@@ -1046,6 +1110,10 @@ pub enum Event {
 	/// This event is emitted when the funding transaction has been signed and is broadcast to the
 	/// network. For 0conf channels it will be immediately followed by the corresponding
 	/// [`Event::ChannelReady`] event.
+	///
+	/// # Failure Behavior and Persistence
+	/// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+	/// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
 	ChannelPending {
 		/// The `channel_id` of the channel that is pending confirmation.
 		channel_id: ChannelId,
@@ -1075,6 +1143,10 @@ pub enum Event {
 	/// be used. This event is emitted either when the funding transaction has been confirmed
 	/// on-chain, or, in case of a 0conf channel, when both parties have confirmed the channel
 	/// establishment.
+	///
+	/// # Failure Behavior and Persistence
+	/// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+	/// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
 	ChannelReady {
 		/// The `channel_id` of the channel that is ready.
 		channel_id: ChannelId,
@@ -1101,6 +1173,10 @@ pub enum Event {
 	///
 	/// [`ChannelManager::accept_inbound_channel`]: crate::ln::channelmanager::ChannelManager::accept_inbound_channel
 	/// [`UserConfig::manually_accept_inbound_channels`]: crate::util::config::UserConfig::manually_accept_inbound_channels
+	///
+	/// # Failure Behavior and Persistence
+	/// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+	/// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
 	ChannelClosed {
 		/// The `channel_id` of the channel which has been closed. Note that on-chain transactions
 		/// resolving the channel are likely still awaiting confirmation.
@@ -1135,6 +1211,10 @@ pub enum Event {
 	/// inputs for another purpose.
 	///
 	/// This event is not guaranteed to be generated for channels that are closed due to a restart.
+	///
+	/// # Failure Behavior and Persistence
+	/// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+	/// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
 	DiscardFunding {
 		/// The channel_id of the channel which has been closed.
 		channel_id: ChannelId,
@@ -1150,6 +1230,10 @@ pub enum Event {
 	/// The event is only triggered when a new open channel request is received and the
 	/// [`UserConfig::manually_accept_inbound_channels`] config flag is set to true.
 	///
+	/// # Failure Behavior and Persistence
+	/// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+	/// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
+	///
 	/// [`ChannelManager::accept_inbound_channel`]: crate::ln::channelmanager::ChannelManager::accept_inbound_channel
 	/// [`ChannelManager::force_close_without_broadcasting_txn`]: crate::ln::channelmanager::ChannelManager::force_close_without_broadcasting_txn
 	/// [`UserConfig::manually_accept_inbound_channels`]: crate::util::config::UserConfig::manually_accept_inbound_channels
@@ -1206,6 +1290,10 @@ pub enum Event {
 	///
 	/// This event, however, does not get generated if an HTLC fails to meet the forwarding
 	/// requirements (i.e. insufficient fees paid, or a CLTV that is too soon).
+	///
+	/// # Failure Behavior and Persistence
+	/// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+	/// returning `Err(ReplayEvent ())`) and will be persisted across restarts.
 	HTLCHandlingFailed {
 		/// The channel over which the HTLC was received.
 		prev_channel_id: ChannelId,
@@ -1219,6 +1307,10 @@ pub enum Event {
 	/// [`ChannelHandshakeConfig::negotiate_anchors_zero_fee_htlc_tx`] config flag is set to true.
 	/// It is limited to the scope of channels with anchor outputs.
 	///
+	/// # Failure Behavior and Persistence
+	/// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+	/// returning `Err(ReplayEvent ())`), but will only be regenerated as needed after restarts.
+	///
 	/// [`ChannelHandshakeConfig::negotiate_anchors_zero_fee_htlc_tx`]: crate::util::config::ChannelHandshakeConfig::negotiate_anchors_zero_fee_htlc_tx
 	BumpTransaction(BumpTransactionEvent),
 	/// We received an onion message that is intended to be forwarded to a peer
@@ -1226,6 +1318,10 @@ pub enum Event {
 	/// that is currently offline. This event will only be generated if the
 	/// `OnionMessenger` was initialized with
 	/// [`OnionMessenger::new_with_offline_peer_interception`], see its docs.
 	///
+	/// # Failure Behavior and Persistence
+	/// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+	/// returning `Err(ReplayEvent ())`), but won't be persisted across restarts.
+	///
 	/// [`OnionMessenger::new_with_offline_peer_interception`]: crate::onion_message::messenger::OnionMessenger::new_with_offline_peer_interception
 	OnionMessageIntercepted {
 		/// The node id of the offline peer.
@@ -1239,6 +1335,10 @@ pub enum Event {
 	/// initialized with
 	/// [`OnionMessenger::new_with_offline_peer_interception`], see its docs.
 	///
+	/// # Failure Behavior and Persistence
+	/// This event will eventually be replayed after failures-to-handle (i.e., the event handler
+	/// returning `Err(ReplayEvent ())`), but won't be persisted across restarts.
+	///
 	/// [`OnionMessenger::new_with_offline_peer_interception`]: crate::onion_message::messenger::OnionMessenger::new_with_offline_peer_interception
 	OnionMessagePeerConnected {
 		/// The node id of the peer we just connected to, who advertises support for
@@ -2300,8 +2400,12 @@ pub trait MessageSendEventsProvider {
 ///
 /// In order to ensure no [`Event`]s are lost, implementors of this trait will persist [`Event`]s
 /// and replay any unhandled events on startup. An [`Event`] is considered handled when
-/// [`process_pending_events`] returns, thus handlers MUST fully handle [`Event`]s and persist any
-/// relevant changes to disk *before* returning.
+/// [`process_pending_events`] returns `Ok(())`, thus handlers MUST fully handle [`Event`]s and
+/// persist any relevant changes to disk *before* returning `Ok(())`. In case of an error (e.g.,
+/// persistence failure) implementors should return `Err(ReplayEvent())`, signalling to the
+/// [`EventsProvider`] to replay unhandled events on the next invocation (generally immediately).
+/// Note that some events might not be replayed, please refer to the documentation for
+/// the individual [`Event`] variants for more detail.
 ///
 /// Further, because an application may crash between an [`Event`] being handled and the
 /// implementor of this trait being re-serialized, [`Event`] handling must be idempotent - in
@@ -2328,26 +2432,34 @@ pub trait EventsProvider {
 	fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler;
 }

+/// An error type that may be returned to LDK in order to safely abort event handling if it can't
+/// currently succeed (e.g., due to a persistence failure).
+///
+/// Depending on the type, LDK may ensure the event is persisted and will eventually be replayed.
+/// Please refer to the documentation of each [`Event`] variant for more details.
+#[derive(Clone, Copy, Debug)]
+pub struct ReplayEvent();
+
 /// A trait implemented for objects handling events from [`EventsProvider`].
 ///
 /// An async variation also exists for implementations of [`EventsProvider`] that support async
 /// event handling. The async event handler should satisfy the generic bounds: `F:
-/// core::future::Future<Output = ()>, H: Fn(Event) -> F`.
+/// core::future::Future<Output = Result<(), ReplayEvent>>, H: Fn(Event) -> F`.
 pub trait EventHandler {
 	/// Handles the given [`Event`].
 	///
 	/// See [`EventsProvider`] for details that must be considered when implementing this method.
-	fn handle_event(&self, event: Event);
+	fn handle_event(&self, event: Event) -> Result<(), ReplayEvent>;
 }

-impl<F> EventHandler for F where F: Fn(Event) {
-	fn handle_event(&self, event: Event) {
+impl<F> EventHandler for F where F: Fn(Event) -> Result<(), ReplayEvent> {
+	fn handle_event(&self, event: Event) -> Result<(), ReplayEvent> {
 		self(event)
 	}
 }

 impl<T: EventHandler> EventHandler for Arc<T> {
-	fn handle_event(&self, event: Event) {
+	fn handle_event(&self, event: Event) -> Result<(), ReplayEvent> {
 		self.deref().handle_event(event)
 	}
 }
diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index b14b6e60877..561053d85b4 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -41,7 +41,7 @@ use crate::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, Fee
 use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, WithChannelMonitor, ChannelMonitorUpdateStep, HTLC_FAIL_BACK_BUFFER, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY, MonitorEvent, CLOSED_CHANNEL_UPDATE_ID};
 use crate::chain::transaction::{OutPoint, TransactionData};
 use crate::events;
-use crate::events::{Event, EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination, PaymentFailureReason};
+use crate::events::{Event, EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination, PaymentFailureReason, ReplayEvent};
 // Since this struct is returned in `list_channels` methods, expose it here in case users want to
 // construct one themselves.
 use crate::ln::inbound_payment;
@@ -1395,35 +1395,38 @@ where
 /// }
 ///
 /// // On the event processing thread once the peer has responded
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::FundingGenerationReady {
-///         temporary_channel_id, counterparty_node_id, channel_value_satoshis, output_script,
-///         user_channel_id, ..
-///     } => {
-///         assert_eq!(user_channel_id, 42);
-///         let funding_transaction = wallet.create_funding_transaction(
-///             channel_value_satoshis, output_script
-///         );
-///         match channel_manager.funding_transaction_generated(
-///             &temporary_channel_id, &counterparty_node_id, funding_transaction
-///         ) {
-///             Ok(()) => println!("Funding channel {}", temporary_channel_id),
-///             Err(e) => println!("Error funding channel {}: {:?}", temporary_channel_id, e),
-///         }
-///     },
-///     Event::ChannelPending { channel_id, user_channel_id, former_temporary_channel_id, .. } => {
-///         assert_eq!(user_channel_id, 42);
-///         println!(
-///             "Channel {} now {} pending (funding transaction has been broadcasted)", channel_id,
-///             former_temporary_channel_id.unwrap()
-///         );
-///     },
-///     Event::ChannelReady { channel_id, user_channel_id, .. } => {
-///         assert_eq!(user_channel_id, 42);
-///         println!("Channel {} ready", channel_id);
-///     },
-///     // ...
-///     # _ => {},
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::FundingGenerationReady {
+///             temporary_channel_id, counterparty_node_id, channel_value_satoshis, output_script,
+///             user_channel_id, ..
+///         } => {
+///             assert_eq!(user_channel_id, 42);
+///             let funding_transaction = wallet.create_funding_transaction(
+///                 channel_value_satoshis, output_script
+///             );
+///             match channel_manager.funding_transaction_generated(
+///                 &temporary_channel_id, &counterparty_node_id, funding_transaction
+///             ) {
+///                 Ok(()) => println!("Funding channel {}", temporary_channel_id),
+///                 Err(e) => println!("Error funding channel {}: {:?}", temporary_channel_id, e),
+///             }
+///         },
+///         Event::ChannelPending { channel_id, user_channel_id, former_temporary_channel_id, .. } => {
+///             assert_eq!(user_channel_id, 42);
+///             println!(
+///                 "Channel {} now {} pending (funding transaction has been broadcasted)", channel_id,
+///                 former_temporary_channel_id.unwrap()
+///             );
+///         },
+///         Event::ChannelReady { channel_id, user_channel_id, .. } => {
+///             assert_eq!(user_channel_id, 42);
+///             println!("Channel {} ready", channel_id);
+///         },
+///         // ...
+///         # _ => {},
+///     }
+///     Ok(())
 /// });
 /// # }
 /// ```
@@ -1447,28 +1450,31 @@ where
 /// # fn example<T: AChannelManager>(channel_manager: T) {
 /// # let channel_manager = channel_manager.get_cm();
 /// # let error_message = "Channel force-closed";
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::OpenChannelRequest { temporary_channel_id, counterparty_node_id, .. } => {
-///         if !is_trusted(counterparty_node_id) {
-///             match channel_manager.force_close_without_broadcasting_txn(
-///                 &temporary_channel_id, &counterparty_node_id, error_message.to_string()
-///             ) {
-///                 Ok(()) => println!("Rejecting channel {}", temporary_channel_id),
-///                 Err(e) => println!("Error rejecting channel {}: {:?}", temporary_channel_id, e),
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::OpenChannelRequest { temporary_channel_id, counterparty_node_id, .. } => {
+///             if !is_trusted(counterparty_node_id) {
+///                 match channel_manager.force_close_without_broadcasting_txn(
+///                     &temporary_channel_id, &counterparty_node_id, error_message.to_string()
+///                 ) {
+///                     Ok(()) => println!("Rejecting channel {}", temporary_channel_id),
+///                     Err(e) => println!("Error rejecting channel {}: {:?}", temporary_channel_id, e),
+///                 }
+///                 return Ok(());
 ///             }
-///             return;
-///         }
 ///
-///         let user_channel_id = 43;
-///         match channel_manager.accept_inbound_channel(
-///             &temporary_channel_id, &counterparty_node_id, user_channel_id
-///         ) {
-///             Ok(()) => println!("Accepting channel {}", temporary_channel_id),
-///             Err(e) => println!("Error accepting channel {}: {:?}", temporary_channel_id, e),
-///         }
-///     },
-///     // ...
-///     # _ => {},
+///             let user_channel_id = 43;
+///             match channel_manager.accept_inbound_channel(
+///                 &temporary_channel_id, &counterparty_node_id, user_channel_id
+///             ) {
+///                 Ok(()) => println!("Accepting channel {}", temporary_channel_id),
+///                 Err(e) => println!("Error accepting channel {}: {:?}", temporary_channel_id, e),
+///             }
+///         },
+///         // ...
+///         # _ => {},
+///     }
+///     Ok(())
 /// });
 /// # }
 /// ```
@@ -1497,13 +1503,16 @@ where
 /// }
 ///
 /// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::ChannelClosed { channel_id, user_channel_id, .. } => {
-///         assert_eq!(user_channel_id, 42);
-///         println!("Channel {} closed", channel_id);
-///     },
-///     // ...
-///     # _ => {},
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::ChannelClosed { channel_id, user_channel_id, .. } => {
+///             assert_eq!(user_channel_id, 42);
+///             println!("Channel {} closed", channel_id);
+///         },
+///         // ...
+///         # _ => {},
+///     }
+///     Ok(())
 /// });
 /// # }
 /// ```
@@ -1553,30 +1562,33 @@ where
 /// };
 ///
 /// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
-///         PaymentPurpose::Bolt11InvoicePayment { payment_preimage: Some(payment_preimage), .. } => {
-///             assert_eq!(payment_hash, known_payment_hash);
-///             println!("Claiming payment {}", payment_hash);
-///             channel_manager.claim_funds(payment_preimage);
-///         },
-///         PaymentPurpose::Bolt11InvoicePayment { payment_preimage: None, .. } => {
-///             println!("Unknown payment hash: {}", payment_hash);
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
+///             PaymentPurpose::Bolt11InvoicePayment { payment_preimage: Some(payment_preimage), .. } => {
+///                 assert_eq!(payment_hash, known_payment_hash);
+///                 println!("Claiming payment {}", payment_hash);
+///                 channel_manager.claim_funds(payment_preimage);
+///             },
+///             PaymentPurpose::Bolt11InvoicePayment { payment_preimage: None, .. } => {
+///                 println!("Unknown payment hash: {}", payment_hash);
+///             },
+///             PaymentPurpose::SpontaneousPayment(payment_preimage) => {
+///                 assert_ne!(payment_hash, known_payment_hash);
+///                 println!("Claiming spontaneous payment {}", payment_hash);
+///                 channel_manager.claim_funds(payment_preimage);
+///             },
+///             // ...
+///             # _ => {},
 ///         },
-///         PaymentPurpose::SpontaneousPayment(payment_preimage) => {
-///             assert_ne!(payment_hash, known_payment_hash);
-///             println!("Claiming spontaneous payment {}", payment_hash);
-///             channel_manager.claim_funds(payment_preimage);
+///         Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
+///             assert_eq!(payment_hash, known_payment_hash);
+///             println!("Claimed {} msats", amount_msat);
 ///         },
 ///         // ...
-///         # _ => {},
-///     },
-///     Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
-///         assert_eq!(payment_hash, known_payment_hash);
-///         println!("Claimed {} msats", amount_msat);
-///     },
-///     // ...
-///     # _ => {},
+///         # _ => {},
+///     }
+///     Ok(())
 /// });
 /// # }
 /// ```
@@ -1619,11 +1631,14 @@ where
 /// );
 ///
 /// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::PaymentSent { payment_hash, .. } => println!("Paid {}", payment_hash),
-///     Event::PaymentFailed { payment_hash, .. } => println!("Failed paying {}", payment_hash),
-///     // ...
-///     # _ => {},
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::PaymentSent { payment_hash, .. } => println!("Paid {}", payment_hash),
+///         Event::PaymentFailed { payment_hash, .. } => println!("Failed paying {}", payment_hash),
+///         // ...
+///         # _ => {},
+///     }
+///     Ok(())
 /// });
 /// # }
 /// ```
@@ -1657,23 +1672,25 @@ where
 /// let bech32_offer = offer.to_string();
 ///
 /// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
-///         PaymentPurpose::Bolt12OfferPayment { payment_preimage: Some(payment_preimage), .. } => {
-///             println!("Claiming payment {}", payment_hash);
-///             channel_manager.claim_funds(payment_preimage);
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
+///             PaymentPurpose::Bolt12OfferPayment { payment_preimage: Some(payment_preimage), .. } => {
+///                 println!("Claiming payment {}", payment_hash);
+///                 channel_manager.claim_funds(payment_preimage);
+///             },
+///             PaymentPurpose::Bolt12OfferPayment { payment_preimage: None, .. } => {
+///                 println!("Unknown payment hash: {}", payment_hash);
+///             }
+///             # _ => {},
 ///         },
-///         PaymentPurpose::Bolt12OfferPayment { payment_preimage: None, .. } => {
-///             println!("Unknown payment hash: {}", payment_hash);
+///         Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
+///             println!("Claimed {} msats", amount_msat);
 ///         },
 ///         // ...
-///         # _ => {},
-///     },
-///     Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
-///         println!("Claimed {} msats", amount_msat);
-///     },
-///     // ...
-///     # _ => {},
+///         # _ => {},
+///     }
+///     Ok(())
 /// });
 /// # Ok(())
 /// # }
@@ -1719,12 +1736,15 @@ where
 /// );
 ///
 /// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id),
-///     Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
-///     Event::InvoiceRequestFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
-///     // ...
-///     # _ => {},
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id),
+///         Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
+///         Event::InvoiceRequestFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
+///         // ...
+///         # _ => {},
+///     }
+///     Ok(())
 /// });
 /// # }
 /// ```
@@ -1779,11 +1799,14 @@ where
 /// );
 ///
 /// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id),
-///     Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
-///     // ...
-///     # _ => {},
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id),
+///         Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id),
+///         // ...
+///         # _ => {},
+///     }
+///     Ok(())
 /// });
 /// # Ok(())
 /// # }
@@ -1809,18 +1832,19 @@ where
 /// };
 ///
 /// // On the event processing thread
-/// channel_manager.process_pending_events(&|event| match event {
-///     Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
-///         PaymentPurpose::Bolt12RefundPayment { payment_preimage: Some(payment_preimage), .. } => {
-///             assert_eq!(payment_hash, known_payment_hash);
-///             println!("Claiming payment {}", payment_hash);
-///             channel_manager.claim_funds(payment_preimage);
-///         },
-///         PaymentPurpose::Bolt12RefundPayment { payment_preimage: None, .. } => {
-///             println!("Unknown payment hash: {}", payment_hash);
-///         },
-///         // ...
-///         # _ => {},
+/// channel_manager.process_pending_events(&|event| {
+///     match event {
+///         Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose {
+///             PaymentPurpose::Bolt12RefundPayment { payment_preimage: Some(payment_preimage), .. } => {
+///                 assert_eq!(payment_hash, known_payment_hash);
+///                 println!("Claiming payment {}", payment_hash);
+///                 channel_manager.claim_funds(payment_preimage);
+///             },
+///             PaymentPurpose::Bolt12RefundPayment { payment_preimage: None, .. } => {
+///                 println!("Unknown payment hash: {}", payment_hash);
+///             },
+///             // ...
+///             # _ => {},
 ///         },
 ///         Event::PaymentClaimed { payment_hash, amount_msat, .. } => {
 ///             assert_eq!(payment_hash, known_payment_hash);
@@ -1828,6 +1852,8 @@ where
 ///         },
 ///         // ...
 ///         # _ => {},
+///     }
+///     Ok(())
 /// });
 /// # }
 /// ```
@@ -2831,8 +2857,9 @@ macro_rules! handle_new_monitor_update {

 macro_rules! process_events_body {
 	($self: expr, $event_to_handle: expr, $handle_event: expr) => {
+		let mut handling_failed = false;
 		let mut processed_all_events = false;
-		while !processed_all_events {
+		while !handling_failed && !processed_all_events {
 			if $self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
 				return;
 			}
@@ -2856,24 +2883,34 @@ macro_rules! process_events_body {
 			}

 			let pending_events = $self.pending_events.lock().unwrap().clone();
-			let num_events = pending_events.len();
 			if !pending_events.is_empty() {
 				result = NotifyOption::DoPersist;
 			}

 			let mut post_event_actions = Vec::new();

+			let mut num_handled_events = 0;
 			for (event, action_opt) in pending_events {
 				$event_to_handle = event;
-				$handle_event;
-				if let Some(action) = action_opt {
-					post_event_actions.push(action);
+				match $handle_event {
+					Ok(()) => {
+						if let Some(action) = action_opt {
+							post_event_actions.push(action);
+						}
+						num_handled_events += 1;
+					}
+					Err(_e) => {
+						// If we encounter an error we stop handling events and make sure to replay
+						// any unhandled events on the next invocation.
+						handling_failed = true;
+						break;
+					}
 				}
 			}

 			{
 				let mut pending_events = $self.pending_events.lock().unwrap();
-				pending_events.drain(..num_events);
+				pending_events.drain(..num_handled_events);
 				processed_all_events = pending_events.is_empty();
 				// Note that `push_pending_forwards_ev` relies on `pending_events_processor` being
 				// updated here with the `pending_events` lock acquired.
@@ -9240,7 +9277,7 @@ where
 	#[cfg(any(test, feature = "_test_utils"))]
 	pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
 		let events = core::cell::RefCell::new(Vec::new());
-		let event_handler = |event: events::Event| events.borrow_mut().push(event);
+		let event_handler = |event: events::Event| Ok(events.borrow_mut().push(event));
 		self.process_pending_events(&event_handler);
 		events.into_inner()
 	}
@@ -9347,7 +9384,7 @@ where
 	/// using the given event handler.
 	///
 	/// See the trait-level documentation of [`EventsProvider`] for requirements.
-	pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
+	pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>>, H: Fn(Event) -> Future>(
 		&self, handler: H
 	) {
 		let mut ev;
diff --git a/lightning/src/onion_message/functional_tests.rs b/lightning/src/onion_message/functional_tests.rs
index 371b4f5879d..16e62bf33f4 100644
--- a/lightning/src/onion_message/functional_tests.rs
+++ b/lightning/src/onion_message/functional_tests.rs
@@ -307,7 +307,7 @@ fn disconnect_peers(node_a: &MessengerNode, node_b: &MessengerNode) {

 fn release_events(node: &MessengerNode) -> Vec<Event> {
 	let events = core::cell::RefCell::new(Vec::new());
-	node.messenger.process_pending_events(&|e| events.borrow_mut().push(e));
+	node.messenger.process_pending_events(&|e| Ok(events.borrow_mut().push(e)));
 	events.into_inner()
 }
diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs
index 28f1bc79253..7c7cd261089 100644
--- a/lightning/src/onion_message/messenger.rs
+++ b/lightning/src/onion_message/messenger.rs
@@ -18,7 +18,7 @@ use bitcoin::secp256k1::{self, PublicKey, Scalar, Secp256k1, SecretKey};
 use crate::blinded_path::{BlindedPath, IntroductionNode, NextMessageHop, NodeIdLookUp};
 use crate::blinded_path::message::{advance_path_by_one, ForwardNode, ForwardTlvs, MessageContext, OffersContext, ReceiveTlvs};
 use crate::blinded_path::utils;
-use crate::events::{Event, EventHandler, EventsProvider};
+use crate::events::{Event, EventHandler, EventsProvider, ReplayEvent};
 use crate::sign::{EntropySource, NodeSigner, Recipient};
 use crate::ln::features::{InitFeatures, NodeFeatures};
 use crate::ln::msgs::{self, OnionMessage, OnionMessageHandler, SocketAddress};
@@ -31,11 +31,13 @@ use super::packet::OnionMessageContents;
 use super::packet::ParsedOnionMessageContents;
 use super::offers::OffersMessageHandler;
 use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload, ReceiveControlTlvs, SMALL_PACKET_HOP_DATA_LEN};
+use crate::util::async_poll::{MultiResultFuturePoller, ResultFuture};
 use crate::util::logger::{Logger, WithContext};
 use crate::util::ser::Writeable;

 use core::fmt;
 use core::ops::Deref;
+use core::sync::atomic::{AtomicBool, Ordering};
 use crate::io;
 use crate::sync::Mutex;
 use crate::prelude::*;
@@ -261,12 +263,9 @@ pub struct OnionMessenger<
 	async_payments_handler: APH,
 	custom_handler: CMH,
 	intercept_messages_for_offline_peers: bool,
-	pending_events: Mutex<PendingEvents>,
-}
-
-struct PendingEvents {
-	intercepted_msgs: Vec<Event>,
-	peer_connecteds: Vec<Event>,
+	pending_intercepted_msgs_events: Mutex<Vec<Event>>,
+	pending_peer_connected_events: Mutex<Vec<Event>>,
+	pending_events_processor: AtomicBool,
 }

 /// [`OnionMessage`]s buffered to be sent.
@@ -1021,6 +1020,28 @@ where
 	}
 }

+macro_rules! drop_handled_events_and_abort { ($self: expr, $res: expr, $offset: expr, $event_queue: expr) => {
+	// We want to make sure to cleanly abort upon event handling failure. To this end, we drop all
+	// successfully handled events from the given queue, reset the events processing flag, and
+	// return, to have the events eventually replayed upon next invocation.
+	{
+		let mut queue_lock = $event_queue.lock().unwrap();
+
+		// We skip `$offset` result entries to reach the ones relevant for the given `$event_queue`.
+		let mut res_iter = $res.iter().skip($offset);
+
+		// Keep all events which previously error'd *or* any that have been added since we dropped
+		// the Mutex before.
+		queue_lock.retain(|_| res_iter.next().map_or(true, |r| r.is_err()));
+
+		if $res.iter().any(|r| r.is_err()) {
+			// We failed handling some events. Return to have them eventually replayed.
+			$self.pending_events_processor.store(false, Ordering::Release);
+			return;
+		}
+	}
+}}
+
 impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, APH: Deref, CMH: Deref> OnionMessenger<ES, NS, L, NL, MR, OMH, APH, CMH>
 where
@@ -1095,10 +1116,9 @@ where
 			async_payments_handler,
 			custom_handler,
 			intercept_messages_for_offline_peers,
-			pending_events: Mutex::new(PendingEvents {
-				intercepted_msgs: Vec::new(),
-				peer_connecteds: Vec::new(),
-			}),
+			pending_intercepted_msgs_events: Mutex::new(Vec::new()),
+			pending_peer_connected_events: Mutex::new(Vec::new()),
+			pending_events_processor: AtomicBool::new(false),
 		}
 	}

@@ -1316,14 +1336,15 @@ where
 	fn enqueue_intercepted_event(&self, event: Event) {
 		const MAX_EVENTS_BUFFER_SIZE: usize = (1 << 10) * 256;
-		let mut pending_events = self.pending_events.lock().unwrap();
-		let total_buffered_bytes: usize =
-			pending_events.intercepted_msgs.iter().map(|ev| ev.serialized_length()).sum();
+		let mut pending_intercepted_msgs_events =
+			self.pending_intercepted_msgs_events.lock().unwrap();
+		let total_buffered_bytes: usize = pending_intercepted_msgs_events.iter()
+			.map(|ev| ev.serialized_length()).sum();
 		if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE {
 			log_trace!(self.logger, "Dropping event {:?}: buffer full", event);
 			return
 		}
-		pending_events.intercepted_msgs.push(event);
+		pending_intercepted_msgs_events.push(event);
 	}

 	/// Processes any events asynchronously using the given handler.
@@ -1333,42 +1354,63 @@ where
 	/// have an ordering requirement.
 	///
 	/// See the trait-level documentation of [`EventsProvider`] for requirements.
-	pub async fn process_pending_events_async<Future: core::future::Future<Output = ()> + core::marker::Unpin, H: Fn(Event) -> Future>(
+	pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>> + core::marker::Unpin, H: Fn(Event) -> Future>(
 		&self, handler: H
 	) {
-		let mut intercepted_msgs = Vec::new();
-		let mut peer_connecteds = Vec::new();
-		{
-			let mut pending_events = self.pending_events.lock().unwrap();
-			core::mem::swap(&mut pending_events.intercepted_msgs, &mut intercepted_msgs);
-			core::mem::swap(&mut pending_events.peer_connecteds, &mut peer_connecteds);
+		if self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
+			return;
 		}

-		let mut futures = Vec::with_capacity(intercepted_msgs.len());
-		for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
-			if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
-				if let Some(addresses) = addresses.take() {
-					futures.push(Some(handler(Event::ConnectionNeeded { node_id: *node_id, addresses })));
+		{
+			let intercepted_msgs = self.pending_intercepted_msgs_events.lock().unwrap().clone();
+			let mut futures = Vec::with_capacity(intercepted_msgs.len());
+			for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
+				if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
+					if let Some(addresses) = addresses.take() {
+						let future = ResultFuture::Pending(handler(Event::ConnectionNeeded { node_id: *node_id, addresses }));
+						futures.push(future);
+					}
 				}
 			}
-		}

-		for ev in intercepted_msgs {
-			if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
-			futures.push(Some(handler(ev)));
+			// The offset in the `futures` vec at which `intercepted_msgs` start. We don't bother
+			// replaying `ConnectionNeeded` events.
+			let intercepted_msgs_offset = futures.len();
+
+			for ev in intercepted_msgs {
+				if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
+				let future = ResultFuture::Pending(handler(ev));
+				futures.push(future);
+			}
+			// Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
+			let res = MultiResultFuturePoller::new(futures).await;
+			drop_handled_events_and_abort!(self, res, intercepted_msgs_offset, self.pending_intercepted_msgs_events);
 		}
-		// Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
-		crate::util::async_poll::MultiFuturePoller(futures).await;

-		if peer_connecteds.len() <= 1 {
-			for event in peer_connecteds { handler(event).await; }
-		} else {
-			let mut futures = Vec::new();
-			for event in peer_connecteds {
-				futures.push(Some(handler(event)));
+		{
+			let peer_connecteds = self.pending_peer_connected_events.lock().unwrap().clone();
+			let num_peer_connecteds = peer_connecteds.len();
+			if num_peer_connecteds <= 1 {
+				for event in peer_connecteds {
+					if handler(event).await.is_ok() {
+						self.pending_peer_connected_events.lock().unwrap().drain(..num_peer_connecteds);
+					} else {
+						// We failed handling the event. Return to have it eventually replayed.
+						self.pending_events_processor.store(false, Ordering::Release);
+						return;
+					}
+				}
+			} else {
+				let mut futures = Vec::new();
+				for event in peer_connecteds {
+					let future = ResultFuture::Pending(handler(event));
+					futures.push(future);
+				}
+				let res = MultiResultFuturePoller::new(futures).await;
+				drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events);
 			}
-			crate::util::async_poll::MultiFuturePoller(futures).await;
 		}
+
+		self.pending_events_processor.store(false, Ordering::Release);
 	}
 }
@@ -1408,31 +1450,42 @@ where
 	CMH::Target: CustomOnionMessageHandler,
 {
 	fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
+		if self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
+			return;
+		}
+
 		for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
 			if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
 				if let Some(addresses) = addresses.take() {
-					handler.handle_event(Event::ConnectionNeeded { node_id: *node_id, addresses });
+					let _ = handler.handle_event(Event::ConnectionNeeded { node_id: *node_id, addresses });
 				}
 			}
 		}

-		let mut events = Vec::new();
+		let intercepted_msgs;
+		let peer_connecteds;
 		{
-			let mut pending_events = self.pending_events.lock().unwrap();
+			let pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
+			intercepted_msgs = pending_intercepted_msgs_events.clone();
+			let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
+			peer_connecteds = pending_peer_connected_events.clone();
 			#[cfg(debug_assertions)] {
-				for ev in pending_events.intercepted_msgs.iter() {
+				for ev in pending_intercepted_msgs_events.iter() {
 					if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); }
 				}
-				for ev in pending_events.peer_connecteds.iter() {
+				for ev in pending_peer_connected_events.iter() {
 					if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); }
 				}
 			}
-			core::mem::swap(&mut pending_events.intercepted_msgs, &mut events);
-			events.append(&mut pending_events.peer_connecteds);
-			pending_events.peer_connecteds.shrink_to(10); // Limit total heap usage
-		}
-		for ev in events {
-			handler.handle_event(ev);
+			pending_peer_connected_events.shrink_to(10); // Limit total heap usage
 		}
+
+		let res = intercepted_msgs.into_iter().map(|ev| handler.handle_event(ev)).collect::<Vec<_>>();
+		drop_handled_events_and_abort!(self, res, 0, self.pending_intercepted_msgs_events);
+
+		let res = peer_connecteds.into_iter().map(|ev| handler.handle_event(ev)).collect::<Vec<_>>();
+		drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events);
+
+		self.pending_events_processor.store(false, Ordering::Release);
 	}
 }
@@ -1558,7 +1611,9 @@ where
 			.or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new()))
 			.mark_connected();
 		if self.intercept_messages_for_offline_peers {
-			self.pending_events.lock().unwrap().peer_connecteds.push(
+			let mut pending_peer_connected_events =
+				self.pending_peer_connected_events.lock().unwrap();
+			pending_peer_connected_events.push(
 				Event::OnionMessagePeerConnected { peer_node_id: *their_node_id }
 			);
 		}
diff --git a/lightning/src/util/async_poll.rs b/lightning/src/util/async_poll.rs
index 7a368af7bae..c18ada73a47 100644
--- a/lightning/src/util/async_poll.rs
+++ b/lightning/src/util/async_poll.rs
@@ -15,29 +15,62 @@ use core::marker::Unpin;
 use core::pin::Pin;
 use core::task::{Context, Poll};

-pub(crate) struct MultiFuturePoller<F: Future<Output = ()> + Unpin>(pub Vec<Option<F>>);
+pub(crate) enum ResultFuture<F: Future<Output = Result<(), E>>, E: Copy + Unpin> {
+	Pending(F),
+	Ready(Result<(), E>),
+}
+
+pub(crate) struct MultiResultFuturePoller<
+	F: Future<Output = Result<(), E>> + Unpin,
+	E: Copy + Unpin,
+> {
+	futures_state: Vec<ResultFuture<F, E>>,
+}

-impl<F: Future<Output = ()> + Unpin> Future for MultiFuturePoller<F> {
-	type Output = ();
-	fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
+impl<F: Future<Output = Result<(), E>> + Unpin, E: Copy + Unpin> MultiResultFuturePoller<F, E> {
+	pub fn new(futures_state: Vec<ResultFuture<F, E>>) -> Self {
+		Self { futures_state }
+	}
+}
+
+impl<F: Future<Output = Result<(), E>> + Unpin, E: Copy + Unpin> Future
+	for MultiResultFuturePoller<F, E>
+{
+	type Output = Vec<Result<(), E>>;
+	fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Vec<Result<(), E>>> {
 		let mut have_pending_futures = false;
-		for fut_option in self.get_mut().0.iter_mut() {
-			let mut fut = match fut_option.take() {
-				None => continue,
-				Some(fut) => fut,
-			};
-			match Pin::new(&mut fut).poll(cx) {
-				Poll::Ready(()) => {},
-				Poll::Pending => {
-					have_pending_futures = true;
-					*fut_option = Some(fut);
+		let futures_state = &mut self.get_mut().futures_state;
+		for state in futures_state.iter_mut() {
+			match state {
+				ResultFuture::Pending(ref mut fut) => match Pin::new(fut).poll(cx) {
+					Poll::Ready(res) => {
+						*state = ResultFuture::Ready(res);
+					},
+					Poll::Pending => {
+						have_pending_futures = true;
+					},
 				},
+				ResultFuture::Ready(_) => continue,
 			}
 		}
+
 		if have_pending_futures {
 			Poll::Pending
 		} else {
-			Poll::Ready(())
+			let results = futures_state
+				.drain(..)
+				.filter_map(|e| match e {
+					ResultFuture::Ready(res) => Some(res),
+					ResultFuture::Pending(_) => {
+						debug_assert!(
+							false,
+							"All futures are expected to be ready if none are pending"
+						);
+						None
+					},
+				})
+				.collect();
+			Poll::Ready(results)
 		}
 	}
 }
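
Usage note (not part of the patch above): with the `EventHandler` changes in this diff, user-supplied handlers now return `Result<(), ReplayEvent>`, and returning `Err(ReplayEvent())` asks the `EventsProvider` to keep the event queued and hand it back on the next processing pass. The following is a minimal sketch of a closure-based handler built on that contract; `persist_event_side_effects` is a hypothetical application-side helper, while `Event` and `ReplayEvent` are the types touched by the diff.

```rust
use lightning::events::{Event, ReplayEvent};

// Hypothetical application-side persistence hook; any fallible side effect
// (writing a payment record, updating a database, etc.) fits here.
fn persist_event_side_effects(event: &Event) -> Result<(), std::io::Error> {
	let _ = event;
	Ok(())
}

// A closure of this shape satisfies `EventHandler` after this change, via
// `impl<F> EventHandler for F where F: Fn(Event) -> Result<(), ReplayEvent>`.
fn make_handler() -> impl Fn(Event) -> Result<(), ReplayEvent> {
	|event: Event| {
		match persist_event_side_effects(&event) {
			// Only acknowledge the event once its side effects are durably stored.
			Ok(()) => Ok(()),
			// On failure, request a replay: the event stays queued and is handed to
			// the handler again on the next `process_pending_events` invocation.
			Err(_) => Err(ReplayEvent()),
		}
	}
}
```

Because replays may repeat events that were partially handled, such a handler should stay idempotent, matching the requirement already spelled out in the `EventsProvider` documentation above.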