Commit a7e4311

Make event handling fallible
Previously, we required our users to handle all events successfully inline or panic while trying to do so. If they exited the `EventHandler` in any other way, we'd forget about the events and wouldn't replay them after restart. Here, we implement fallible event handling, allowing the user to return `Err(())`, which signals to our event providers that they should abort event processing and replay any unhandled events later (i.e., in the next invocation).
1 parent 44e1b78 commit a7e4311
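
As the diff below shows, the error case is surfaced through the new `ReplayEvent` type. For illustration, a minimal sketch of a handler under the new fallible signature; the persistence call is only a hypothetical placeholder, and `Event::ChannelPending` is just an arbitrary example variant:

use lightning::events::{Event, ReplayEvent};

// Sketch of a fallible event handler as it could be passed to
// BackgroundProcessor::start after this change. The persistence call is a
// hypothetical placeholder, not part of this commit.
fn example_event_handler(event: Event) -> Result<(), ReplayEvent> {
	match event {
		Event::ChannelPending { .. } => {
			// Fully handle and persist whatever the event requires *before*
			// acknowledging it. If that fails, return Err(ReplayEvent()) so the
			// event is replayed on the next invocation instead of being lost.
			// store.persist(&event).map_err(|_| ReplayEvent())?;
			Ok(())
		},
		// Events this handler doesn't care about are acknowledged immediately.
		_ => Ok(()),
	}
}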

File tree

8 files changed (+257, -179 lines)

lightning-background-processor/src/lib.rs

+47 -30

@@ -29,6 +29,8 @@ use lightning::events::{Event, PathFailure};
use lightning::events::EventHandler;
#[cfg(feature = "std")]
use lightning::events::EventsProvider;
+#[cfg(feature = "futures")]
+use lightning::events::ReplayEvent;

use lightning::ln::channelmanager::AChannelManager;
use lightning::ln::msgs::OnionMessageHandler;
@@ -539,6 +541,7 @@ use core::task;
/// could setup `process_events_async` like this:
/// ```
/// # use lightning::io;
+/// # use lightning::events::ReplayEvent;
/// # use std::sync::{Arc, RwLock};
/// # use std::sync::atomic::{AtomicBool, Ordering};
/// # use std::time::SystemTime;
@@ -556,7 +559,7 @@ use core::task;
/// # }
/// # struct EventHandler {}
/// # impl EventHandler {
-/// # async fn handle_event(&self, _: lightning::events::Event) {}
+/// # async fn handle_event(&self, _: lightning::events::Event) -> Result<(), ReplayEvent> { Ok(()) }
/// # }
/// # #[derive(Eq, PartialEq, Clone, Hash)]
/// # struct SocketDescriptor {}
@@ -654,7 +657,7 @@ pub async fn process_events_async<
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
L: 'static + Deref + Send + Sync,
P: 'static + Deref + Send + Sync,
-EventHandlerFuture: core::future::Future<Output = ()>,
+EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
EventHandler: Fn(Event) -> EventHandlerFuture,
PS: 'static + Deref + Send,
M: 'static + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>> + Send + Sync,
@@ -703,12 +706,16 @@ where
if update_scorer(scorer, &event, duration_since_epoch) {
log_trace!(logger, "Persisting scorer after update");
if let Err(e) = persister.persist_scorer(&scorer) {
-log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
+log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
+// We opt not to abort early on persistence failure here as persisting
+// the scorer is non-critical and we still hope that it will have
+// resolved itself when it is potentially critical in event handling
+// below.
}
}
}
}
-event_handler(event).await;
+event_handler(event).await
})
};
define_run_body!(
@@ -841,7 +848,7 @@ impl BackgroundProcessor {
}
}
}
-event_handler.handle_event(event);
+event_handler.handle_event(event)
};
define_run_body!(
persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
@@ -1425,7 +1432,7 @@ mod tests {
// Initiate the background processors to watch each node.
let data_dir = nodes[0].kv_store.get_data_dir();
let persister = Arc::new(Persister::new(data_dir));
-let event_handler = |_: _| {};
+let event_handler = |_: _| { Ok(()) };
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

macro_rules! check_persisted_data {
@@ -1493,7 +1500,7 @@ mod tests {
let (_, nodes) = create_nodes(1, "test_timer_tick_called");
let data_dir = nodes[0].kv_store.get_data_dir();
let persister = Arc::new(Persister::new(data_dir));
-let event_handler = |_: _| {};
+let event_handler = |_: _| { Ok(()) };
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
loop {
let log_entries = nodes[0].logger.lines.lock().unwrap();
@@ -1522,7 +1529,7 @@

let data_dir = nodes[0].kv_store.get_data_dir();
let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
-let event_handler = |_: _| {};
+let event_handler = |_: _| { Ok(()) };
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
match bg_processor.join() {
Ok(_) => panic!("Expected error persisting manager"),
@@ -1544,7 +1551,7 @@
let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));

let bp_future = super::process_events_async(
-persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()),
+persister, |_: _| {async { Ok(()) }}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()),
nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
Some(nodes[0].scorer.clone()), move |dur: Duration| {
Box::pin(async move {
@@ -1568,7 +1575,7 @@
let (_, nodes) = create_nodes(2, "test_persist_network_graph_error");
let data_dir = nodes[0].kv_store.get_data_dir();
let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
-let event_handler = |_: _| {};
+let event_handler = |_: _| { Ok(()) };
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

match bg_processor.stop() {
@@ -1586,7 +1593,7 @@
let (_, nodes) = create_nodes(2, "test_persist_scorer_error");
let data_dir = nodes[0].kv_store.get_data_dir();
let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
-let event_handler = |_: _| {};
+let event_handler = |_: _| { Ok(()) };
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

match bg_processor.stop() {
@@ -1608,11 +1615,14 @@
// Set up a background event handler for FundingGenerationReady events.
let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1);
let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1);
-let event_handler = move |event: Event| match event {
-Event::FundingGenerationReady { .. } => funding_generation_send.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
-Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
-Event::ChannelReady { .. } => {},
-_ => panic!("Unexpected event: {:?}", event),
+let event_handler = move |event: Event| {
+match event {
+Event::FundingGenerationReady { .. } => funding_generation_send.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
+Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
+Event::ChannelReady { .. } => {},
+_ => panic!("Unexpected event: {:?}", event),
+}
+Ok(())
};

let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
@@ -1648,11 +1658,14 @@

// Set up a background event handler for SpendableOutputs events.
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
-let event_handler = move |event: Event| match event {
-Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
-Event::ChannelReady { .. } => {},
-Event::ChannelClosed { .. } => {},
-_ => panic!("Unexpected event: {:?}", event),
+let event_handler = move |event: Event| {
+match event {
+Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
+Event::ChannelReady { .. } => {},
+Event::ChannelClosed { .. } => {},
+_ => panic!("Unexpected event: {:?}", event),
+}
+Ok(())
};
let persister = Arc::new(Persister::new(data_dir));
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
@@ -1766,7 +1779,7 @@
let (_, nodes) = create_nodes(2, "test_scorer_persistence");
let data_dir = nodes[0].kv_store.get_data_dir();
let persister = Arc::new(Persister::new(data_dir));
-let event_handler = |_: _| {};
+let event_handler = |_: _| { Ok(()) };
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

loop {
@@ -1839,7 +1852,7 @@
let data_dir = nodes[0].kv_store.get_data_dir();
let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));

-let event_handler = |_: _| {};
+let event_handler = |_: _| { Ok(()) };
let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes,
@@ -1860,7 +1873,7 @@

let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
let bp_future = super::process_events_async(
-persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()),
+persister, |_: _| {async { Ok(()) }}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()),
nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
Some(nodes[0].scorer.clone()), move |dur: Duration| {
let mut exit_receiver = exit_receiver.clone();
@@ -1987,12 +2000,15 @@
#[test]
fn test_payment_path_scoring() {
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
-let event_handler = move |event: Event| match event {
-Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
-Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
-Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
-Event::ProbeFailed { .. } => sender.send(event).unwrap(),
-_ => panic!("Unexpected event: {:?}", event),
+let event_handler = move |event: Event| {
+match event {
+Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
+Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
+Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
+Event::ProbeFailed { .. } => sender.send(event).unwrap(),
+_ => panic!("Unexpected event: {:?}", event),
+}
+Ok(())
};

let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
@@ -2025,6 +2041,7 @@
Event::ProbeFailed { .. } => { sender_ref.send(event).await.unwrap() },
_ => panic!("Unexpected event: {:?}", event),
}
+Ok(())
}
};
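
As a usage sketch (not part of the commit), an async handler matching the updated `EventHandlerFuture: Future<Output = Result<(), ReplayEvent>>` bound could look like this; the sweeping/persistence step is a hypothetical placeholder:

use lightning::events::{Event, ReplayEvent};

// Sketch only: an async event handler whose future resolves to
// Result<(), ReplayEvent>, as now required by process_events_async.
async fn handle_event_async(event: Event) -> Result<(), ReplayEvent> {
	match event {
		Event::SpendableOutputs { .. } => {
			// e.g., sweep and persist the outputs here; on failure, request a
			// replay rather than dropping the event:
			// persist_outputs(&event).await.map_err(|_| ReplayEvent())?;
			Ok(())
		},
		_ => Ok(()),
	}
}

// Wired up roughly as: process_events_async(persister, |event| handle_event_async(event), ...)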

lightning-invoice/src/utils.rs

+1 -0

@@ -1391,6 +1391,7 @@ mod test {
} else {
other_events.borrow_mut().push(event);
}
+Ok(())
};
nodes[fwd_idx].node.process_pending_events(&forward_event_handler);
nodes[fwd_idx].node.process_pending_events(&forward_event_handler);

lightning/src/chain/chainmonitor.rs

+3 -4

@@ -33,8 +33,7 @@ use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance
use crate::chain::transaction::{OutPoint, TransactionData};
use crate::ln::types::ChannelId;
use crate::sign::ecdsa::EcdsaChannelSigner;
-use crate::events;
-use crate::events::{Event, EventHandler};
+use crate::events::{self, Event, EventHandler, ReplayEvent};
use crate::util::logger::{Logger, WithContext};
use crate::util::errors::APIError;
use crate::util::wakers::{Future, Notifier};
@@ -533,7 +532,7 @@ where C::Target: chain::Filter,
pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
use crate::events::EventsProvider;
let events = core::cell::RefCell::new(Vec::new());
-let event_handler = |event: events::Event| events.borrow_mut().push(event);
+let event_handler = |event: events::Event| Ok(events.borrow_mut().push(event));
self.process_pending_events(&event_handler);
events.into_inner()
}
@@ -544,7 +543,7 @@ where C::Target: chain::Filter,
/// See the trait-level documentation of [`EventsProvider`] for requirements.
///
/// [`EventsProvider`]: crate::events::EventsProvider
-pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
+pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(),ReplayEvent>>, H: Fn(Event) -> Future>(
&self, handler: H
) {
// Sadly we can't hold the monitors read lock through an async call. Thus we have to do a

lightning/src/chain/channelmonitor.rs

+24 -7

@@ -1170,19 +1170,36 @@ macro_rules! _process_events_body {
pending_events = inner.pending_events.clone();
repeated_events = inner.get_repeated_events();
} else { break; }
-let num_events = pending_events.len();

-for event in pending_events.into_iter().chain(repeated_events.into_iter()) {
+let mut num_handled_events = 0;
+let mut handling_failed = false;
+for event in pending_events.into_iter() {
$event_to_handle = event;
-$handle_event;
+match $handle_event {
+Ok(()) => num_handled_events += 1,
+Err(_) => {
+// If we encounter an error we stop handling events and make sure to replay
+// any unhandled events on the next invocation.
+handling_failed = true;
+break;
+}
+}
+}
+
+for event in repeated_events.into_iter() {
+// For repeated events we ignore any errors as they will be replayed eventually
+// anyways.
+$event_to_handle = event;
+$handle_event.ok();
}

if let Some(us) = $self_opt {
let mut inner = us.inner.lock().unwrap();
-inner.pending_events.drain(..num_events);
+inner.pending_events.drain(..num_handled_events);
inner.is_processing_pending_events = false;
-if !inner.pending_events.is_empty() {
-// If there's more events to process, go ahead and do so.
+if !handling_failed && !inner.pending_events.is_empty() {
+// If there's more events to process and we didn't fail so far, go ahead and do
+// so.
continue;
}
}
@@ -1508,7 +1525,7 @@ impl<Signer: EcdsaChannelSigner> ChannelMonitor<Signer> {
/// Processes any events asynchronously.
///
/// See [`Self::process_pending_events`] for more information.
-pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
+pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ()>>, H: Fn(Event) -> Future>(
&self, handler: &H
) {
let mut ev;
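
The replay semantics introduced by the macro change above can be summarized with a small standalone sketch (plain Rust, not LDK code): pending events are handled in order, the first failure aborts processing, and only the successfully handled prefix is dropped so the remainder is replayed on the next invocation.

// Standalone illustration of the drain-only-what-was-handled logic.
fn process_pending<E>(pending: &mut Vec<E>, mut handle: impl FnMut(&E) -> Result<(), ()>) {
	let mut num_handled = 0;
	for event in pending.iter() {
		match handle(event) {
			Ok(()) => num_handled += 1,
			// Abort on the first failure; the failed event and everything after
			// it stay queued and will be replayed later.
			Err(()) => break,
		}
	}
	pending.drain(..num_handled);
}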

lightning/src/events/mod.rs

+15 -7

@@ -2300,8 +2300,10 @@ pub trait MessageSendEventsProvider {
///
/// In order to ensure no [`Event`]s are lost, implementors of this trait will persist [`Event`]s
/// and replay any unhandled events on startup. An [`Event`] is considered handled when
-/// [`process_pending_events`] returns, thus handlers MUST fully handle [`Event`]s and persist any
-/// relevant changes to disk *before* returning.
+/// [`process_pending_events`] returns `Ok(())`, thus handlers MUST fully handle [`Event`]s and
+/// persist any relevant changes to disk *before* returning `Ok(())`. In case of a (e.g.,
+/// persistence failure) implementors should return `Err(ReplayEvent())`, signalling to the
+/// [`EventsProvider`] to replay unhandled events on the next invocation.
///
/// Further, because an application may crash between an [`Event`] being handled and the
/// implementor of this trait being re-serialized, [`Event`] handling must be idempotent - in
@@ -2328,26 +2330,32 @@ pub trait EventsProvider {
fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler;
}

+/// An error type that may be returned to LDK in order to safely abort event handling if it can't
+/// currently succeed (e.g., due to a persistence failure).
+///
+/// LDK will ensure the event is persisted and will eventually be replayed.
+pub struct ReplayEvent();
+
/// A trait implemented for objects handling events from [`EventsProvider`].
///
/// An async variation also exists for implementations of [`EventsProvider`] that support async
/// event handling. The async event handler should satisfy the generic bounds: `F:
-/// core::future::Future, H: Fn(Event) -> F`.
+/// core::future::Future<Output = Result<(), ReplayEvent>>, H: Fn(Event) -> F`.
pub trait EventHandler {
/// Handles the given [`Event`].
///
/// See [`EventsProvider`] for details that must be considered when implementing this method.
-fn handle_event(&self, event: Event);
+fn handle_event(&self, event: Event) -> Result<(), ReplayEvent>;
}

-impl<F> EventHandler for F where F: Fn(Event) {
-fn handle_event(&self, event: Event) {
+impl<F> EventHandler for F where F: Fn(Event) -> Result<(), ReplayEvent> {
+fn handle_event(&self, event: Event) -> Result<(), ReplayEvent> {
self(event)
}
}

impl<T: EventHandler> EventHandler for Arc<T> {
-fn handle_event(&self, event: Event) {
+fn handle_event(&self, event: Event) -> Result<(), ReplayEvent> {
self.deref().handle_event(event)
}
}
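
For implementors of the trait itself, the updated contract could be satisfied along these lines (a sketch only; `MyHandler` is hypothetical and would in practice hold e.g. a handle to a persistence layer):

use lightning::events::{Event, EventHandler, ReplayEvent};

// Hypothetical handler type for illustration.
struct MyHandler;

impl EventHandler for MyHandler {
	fn handle_event(&self, event: Event) -> Result<(), ReplayEvent> {
		// Persist any relevant changes *before* returning Ok(()). On e.g. a
		// persistence failure, return Err(ReplayEvent()) so the EventsProvider
		// replays the event on the next invocation.
		match event {
			Event::ChannelClosed { .. } => Ok(()),
			_ => Ok(()),
		}
	}
}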
