Skip to content

Commit 43a4a04

Browse files
committed
Handle fallible events in OnionMessenger
Previously, we would just fire-and-forget in `OnionMessenger`'s event handling. Since we now introduced the possibility of event handling failures, we here adapt the event handling logic to retain any events which we failed to handle to have them replayed upon the next invocation of `process_pending_events`/`process_pending_events_async`.
1 parent d4f98f0 commit 43a4a04

File tree

1 file changed

+84
-34
lines changed

1 file changed

+84
-34
lines changed

lightning/src/onion_message/messenger.rs

+84-34
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use crate::util::ser::Writeable;
3636

3737
use core::fmt;
3838
use core::ops::Deref;
39+
use core::sync::atomic::{AtomicBool, Ordering};
3940
use crate::io;
4041
use crate::sync::Mutex;
4142
use crate::prelude::*;
@@ -262,6 +263,7 @@ pub struct OnionMessenger<
262263
intercept_messages_for_offline_peers: bool,
263264
pending_intercepted_msgs_events: Mutex<Vec<Event>>,
264265
pending_peer_connected_events: Mutex<Vec<Event>>,
266+
pending_events_processor: AtomicBool,
265267
}
266268

267269
/// [`OnionMessage`]s buffered to be sent.
@@ -1004,6 +1006,28 @@ where
10041006
}
10051007
}
10061008

1009+
macro_rules! drop_handled_events_and_abort { ($self: expr, $res: expr, $offset: expr, $event_queue: expr) => {
1010+
// We want ot make sure to cleanly abort upon event handling failure. To this end, we drop all
1011+
// successfully handled events from the given queue, reset the events processing flag, and
1012+
// return, to have the events eventually replayed upon next invocation.
1013+
{
1014+
let mut queue_lock = $event_queue.lock().unwrap();
1015+
1016+
// We skip `$offset` result entries to reach the ones relevant for the given `$event_queue`.
1017+
let mut res_iter = $res.iter().skip($offset);
1018+
1019+
// Keep all events which previously error'd *or* any that have been added since we dropped
1020+
// the Mutex before.
1021+
queue_lock.retain(|_| res_iter.next().map_or(true, |r| r.is_err()));
1022+
1023+
if $res.iter().any(|r| r.is_err()) {
1024+
// We failed handling some events. Return to have them eventually replayed.
1025+
$self.pending_events_processor.store(false, Ordering::Release);
1026+
return;
1027+
}
1028+
}
1029+
}}
1030+
10071031
impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, APH: Deref, CMH: Deref>
10081032
OnionMessenger<ES, NS, L, NL, MR, OMH, APH, CMH>
10091033
where
@@ -1080,6 +1104,7 @@ where
10801104
intercept_messages_for_offline_peers,
10811105
pending_intercepted_msgs_events: Mutex::new(Vec::new()),
10821106
pending_peer_connected_events: Mutex::new(Vec::new()),
1107+
pending_events_processor: AtomicBool::new(false),
10831108
}
10841109
}
10851110

@@ -1320,42 +1345,57 @@ where
13201345
pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>> + core::marker::Unpin, H: Fn(Event) -> Future>(
13211346
&self, handler: H
13221347
) {
1323-
let mut intercepted_msgs = Vec::new();
1324-
let mut peer_connecteds = Vec::new();
1325-
{
1326-
let mut pending_intercepted_msgs_events =
1327-
self.pending_intercepted_msgs_events.lock().unwrap();
1328-
let mut pending_peer_connected_events =
1329-
self.pending_peer_connected_events.lock().unwrap();
1330-
core::mem::swap(&mut *pending_intercepted_msgs_events, &mut intercepted_msgs);
1331-
core::mem::swap(&mut *pending_peer_connected_events, &mut peer_connecteds);
1348+
if self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
1349+
return;
13321350
}
13331351

1334-
let mut futures = Vec::with_capacity(intercepted_msgs.len());
1335-
for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
1336-
if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
1337-
if let Some(addresses) = addresses.take() {
1338-
futures.push(handler(Event::ConnectionNeeded { node_id: *node_id, addresses }));
1352+
{
1353+
let intercepted_msgs = self.pending_intercepted_msgs_events.lock().unwrap().clone();
1354+
let mut futures = Vec::with_capacity(intercepted_msgs.len());
1355+
for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
1356+
if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
1357+
if let Some(addresses) = addresses.take() {
1358+
futures.push(handler(Event::ConnectionNeeded { node_id: *node_id, addresses }));
1359+
}
13391360
}
13401361
}
1341-
}
13421362

1343-
for ev in intercepted_msgs {
1344-
if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
1345-
futures.push(handler(ev));
1363+
// The offset in the `futures` vec at which `intercepted_msgs` start. We don't bother
1364+
// replaying `ConnectionNeeded` events.
1365+
let intercepted_msgs_offset = futures.len();
1366+
1367+
for ev in intercepted_msgs {
1368+
if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
1369+
futures.push(handler(ev));
1370+
}
1371+
// Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
1372+
let res = crate::util::async_poll::MultiEventFuturePoller::new(futures).await;
1373+
drop_handled_events_and_abort!(self, res, intercepted_msgs_offset, self.pending_intercepted_msgs_events);
13461374
}
1347-
// Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
1348-
crate::util::async_poll::MultiEventFuturePoller::new(futures).await;
13491375

1350-
if peer_connecteds.len() <= 1 {
1351-
for event in peer_connecteds { handler(event).await; }
1352-
} else {
1353-
let mut futures = Vec::new();
1354-
for event in peer_connecteds {
1355-
futures.push(handler(event));
1376+
{
1377+
let peer_connecteds = self.pending_peer_connected_events.lock().unwrap().clone();
1378+
let num_peer_connecteds = peer_connecteds.len();
1379+
if num_peer_connecteds <= 1 {
1380+
for event in peer_connecteds {
1381+
if handler(event).await.is_ok() {
1382+
self.pending_peer_connected_events.lock().unwrap().drain(..num_peer_connecteds);
1383+
} else {
1384+
// We failed handling the event. Return to have it eventually replayed.
1385+
self.pending_events_processor.store(false, Ordering::Release);
1386+
return;
1387+
}
1388+
}
1389+
} else {
1390+
let mut futures = Vec::new();
1391+
for event in peer_connecteds {
1392+
futures.push(handler(event));
1393+
}
1394+
let res = crate::util::async_poll::MultiEventFuturePoller::new(futures).await;
1395+
drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events);
13561396
}
1357-
crate::util::async_poll::MultiEventFuturePoller::new(futures).await;
13581397
}
1398+
self.pending_events_processor.store(false, Ordering::Release);
13591399
}
13601400
}
13611401

@@ -1395,17 +1435,24 @@ where
13951435
CMH::Target: CustomOnionMessageHandler,
13961436
{
13971437
fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
1438+
if self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
1439+
return;
1440+
}
1441+
13981442
for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
13991443
if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
14001444
if let Some(addresses) = addresses.take() {
14011445
handler.handle_event(Event::ConnectionNeeded { node_id: *node_id, addresses }).ok();
14021446
}
14031447
}
14041448
}
1405-
let mut events = Vec::new();
1449+
let intercepted_msgs;
1450+
let peer_connecteds;
14061451
{
1407-
let mut pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
1452+
let pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
1453+
intercepted_msgs = pending_intercepted_msgs_events.clone();
14081454
let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
1455+
peer_connecteds = pending_peer_connected_events.clone();
14091456
#[cfg(debug_assertions)] {
14101457
for ev in pending_intercepted_msgs_events.iter() {
14111458
if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); }
@@ -1414,13 +1461,16 @@ where
14141461
if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); }
14151462
}
14161463
}
1417-
core::mem::swap(&mut *pending_intercepted_msgs_events, &mut events);
1418-
events.append(&mut pending_peer_connected_events);
14191464
pending_peer_connected_events.shrink_to(10); // Limit total heap usage
14201465
}
1421-
for ev in events {
1422-
handler.handle_event(ev);
1423-
}
1466+
1467+
let res = intercepted_msgs.into_iter().map(|ev| handler.handle_event(ev)).collect::<Vec<_>>();
1468+
drop_handled_events_and_abort!(self, res, 0, self.pending_intercepted_msgs_events);
1469+
1470+
let res = peer_connecteds.into_iter().map(|ev| handler.handle_event(ev)).collect::<Vec<_>>();
1471+
drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events);
1472+
1473+
self.pending_events_processor.store(false, Ordering::Release);
14241474
}
14251475
}
14261476

0 commit comments

Comments
 (0)