Commit 3475cf9

Add a parallel async event handler to OnionMessenger
This adds an `OnionMessenger::process_pending_events_async` mirroring the method of the same name in `ChannelManager`. However, unlike the `ChannelManager` version, this one processes the events in parallel by spawning all futures and using the new `MultiFuturePoller`. Because `OnionMessenger` just generates a stream of messages to store/fetch, we first process all the events that store new messages, `await` them, and then process all the events that fetch stored messages, ensuring reordering shouldn't result in lost messages (unless we race with a peer disconnection, which could happen anyway).
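
For context, the `MultiFuturePoller` referenced above is a combinator that polls a vector of futures until all of them complete. A minimal sketch of such a combinator, assuming `Unpin` futures with `()` output (an illustration only, not necessarily the exact code in `lightning/src/util/async_poll.rs`), might look like:

use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};

// Polls every remaining future each time it is itself polled, replacing finished
// futures with `None` so they are never polled again, and completes once all are done.
struct MultiFuturePoller<F: Future<Output = ()> + Unpin>(Vec<Option<F>>);

impl<F: Future<Output = ()> + Unpin> Future for MultiFuturePoller<F> {
	type Output = ();
	fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
		let mut have_pending = false;
		for slot in self.get_mut().0.iter_mut() {
			let mut fut = match slot.take() { Some(fut) => fut, None => continue };
			match Pin::new(&mut fut).poll(cx) {
				Poll::Ready(()) => {},
				Poll::Pending => { have_pending = true; *slot = Some(fut); },
			}
		}
		if have_pending { Poll::Pending } else { Poll::Ready(()) }
	}
}

Re-polling every pending future on each wakeup keeps such a combinator executor-agnostic and allocation-free, at the cost of some redundant polls compared to a waker-per-future design.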
1 parent b5f432a

File tree

1 file changed: +45 -0 lines


lightning/src/onion_message/messenger.rs

@@ -1154,6 +1154,51 @@ where
 		}
 		pending_events.intercepted_msgs.push(event);
 	}
+
+	/// Processes any events asynchronously using the given handler.
+	///
+	/// Note that the event handler is called in the order each event was generated, however
+	/// futures are polled in parallel for some events to allow for parallelism where events do not
+	/// have an ordering requirement.
+	///
+	/// See the trait-level documentation of [`EventsProvider`] for requirements.
+	pub async fn process_pending_events_async<Future: core::future::Future<Output = ()> + core::marker::Unpin, H: Fn(Event) -> Future>(
+		&self, handler: H
+	) {
+		let mut intercepted_msgs = Vec::new();
+		let mut peer_connecteds = Vec::new();
+		{
+			let mut pending_events = self.pending_events.lock().unwrap();
+			core::mem::swap(&mut pending_events.intercepted_msgs, &mut intercepted_msgs);
+			core::mem::swap(&mut pending_events.peer_connecteds, &mut peer_connecteds);
+		}
+
+		let mut futures = Vec::with_capacity(intercepted_msgs.len());
+		for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
+			if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
+				if let Some(addresses) = addresses.take() {
+					futures.push(Some(handler(Event::ConnectionNeeded { node_id: *node_id, addresses })));
+				}
+			}
+		}
+
+		for ev in intercepted_msgs {
+			if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
+			futures.push(Some(handler(ev)));
+		}
+		// Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
+		crate::util::async_poll::MultiFuturePoller(futures).await;
+
+		if peer_connecteds.len() <= 1 {
+			for event in peer_connecteds { handler(event).await; }
+		} else {
+			let mut futures = Vec::new();
+			for event in peer_connecteds {
+				futures.push(Some(handler(event)));
+			}
+			crate::util::async_poll::MultiFuturePoller(futures).await;
+		}
+	}
 }

 fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap<PublicKey, OnionMessageRecipient>) -> bool {
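
For illustration, a caller wiring the new method up to an async message store might look roughly like the following sketch. The names `store`, `connector`, `store_message`, `replay_messages`, and `connect` are placeholders invented for this example, not part of the LDK API; the event variants matched on are the ones the method above can emit.

// Hypothetical handler: persist intercepted messages for offline peers and replay
// them when those peers reconnect. `Box::pin` yields an `Unpin` future, which the
// `Future: Unpin` bound on `process_pending_events_async` requires.
onion_messenger.process_pending_events_async(|event| Box::pin(async {
	match event {
		Event::OnionMessageIntercepted { peer_node_id, message } => {
			// Store the message for the currently-offline peer (placeholder helper).
			store.store_message(peer_node_id, message).await;
		},
		Event::OnionMessagePeerConnected { peer_node_id } => {
			// Fetch and forward any stored messages for the reconnected peer (placeholder helper).
			store.replay_messages(peer_node_id).await;
		},
		Event::ConnectionNeeded { node_id, addresses } => {
			// Ask the networking layer to connect so pending messages can be delivered (placeholder helper).
			connector.connect(node_id, addresses).await;
		},
		_ => {},
	}
})).await;

Because all `OnionMessageIntercepted` futures are awaited before any `OnionMessagePeerConnected` futures are started, the handler can rely on a message having been stored before it is asked to replay messages for that peer.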
