Skip to content

Commit 3a3a45c

Browse files
committed
Add a parallel async event handler to OnionMessenger
This adds an `OnionMessenger::process_pending_events_async` mirroring the same in `ChannelManager`. However, unlike the one in `ChannelManager`, this processes the events in parallel by spawning all futures and using the new `MultiFuturePoller`. Because `OnionMessenger` just generates a stream of messages to store/fetch, we first process all the events to store new messages, `await` them, then process all the events to fetch stored messages, ensuring reordering shouldn't result in lost messages (unless we race with a peer disconnection, which could happen anyway).
1 parent 98022e6 commit 3a3a45c

File tree

1 file changed

+54
-4
lines changed

1 file changed

+54
-4
lines changed

lightning/src/onion_message/messenger.rs

+54-4
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ where
241241
offers_handler: OMH,
242242
custom_handler: CMH,
243243
intercept_messages_for_offline_peers: bool,
244-
pending_events: Mutex<Vec<Event>>,
244+
pending_events: Mutex<VecDeque<Event>>,
245245
}
246246

247247
/// [`OnionMessage`]s buffered to be sent.
@@ -963,7 +963,7 @@ where
963963
offers_handler,
964964
custom_handler,
965965
intercept_messages_for_offline_peers,
966-
pending_events: Mutex::new(Vec::new()),
966+
pending_events: Mutex::new(VecDeque::new()),
967967
}
968968
}
969969

@@ -1146,7 +1146,57 @@ where
11461146
log_trace!(self.logger, "Dropping event {:?}: buffer full", event);
11471147
return
11481148
}
1149-
pending_events.push(event);
1149+
pending_events.push_back(event);
1150+
}
1151+
1152+
/// Processes any events asynchronously using the given handler.
1153+
///
1154+
/// Note that the event handler is called in the order each event was generated, however
1155+
/// futures are polled in parallel for some events to allow for parallelism where events do not
1156+
/// have an ordering requirement.
1157+
///
1158+
/// See the trait-level documentation of [`EventsProvider`] for requirements.
1159+
pub async fn process_pending_events_async<Future: core::future::Future<Output = ()> + core::marker::Unpin, H: Fn(Event) -> Future>(
1160+
&self, handler: H
1161+
) {
1162+
let mut events = VecDeque::new();
1163+
core::mem::swap(&mut *self.pending_events.lock().unwrap(), &mut events);
1164+
let mut futures = Vec::with_capacity(events.len());
1165+
for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
1166+
if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
1167+
if let Some(addresses) = addresses.take() {
1168+
futures.push(Some(handler(Event::ConnectionNeeded { node_id: *node_id, addresses })));
1169+
}
1170+
}
1171+
}
1172+
// We process events in parallel, but we want to complete `OnionMessageIntercepted` events
1173+
// prior to `OnionMessagePeerConnected` ones.
1174+
for _ in 0..events.len() {
1175+
let next_event = events.pop_front();
1176+
match next_event {
1177+
Some(Event::OnionMessageIntercepted { .. }) => {
1178+
futures.push(Some(handler(next_event.unwrap())));
1179+
},
1180+
Some(Event::OnionMessagePeerConnected { .. }) => {
1181+
events.push_back(next_event.unwrap());
1182+
},
1183+
None => debug_assert!(false, "We only iterate events.len() times"),
1184+
Some(_) => {
1185+
debug_assert!(false, "Decide on ordering for event type {:?}", next_event);
1186+
events.push_back(next_event.unwrap());
1187+
},
1188+
}
1189+
}
1190+
crate::util::async_poll::MultiFuturePoller(futures).await;
1191+
if events.len() <= 1 {
1192+
for event in events { handler(event).await; }
1193+
} else {
1194+
let mut futures = Vec::new();
1195+
for event in events {
1196+
futures.push(Some(handler(event)));
1197+
}
1198+
crate::util::async_poll::MultiFuturePoller(futures).await;
1199+
}
11501200
}
11511201
}
11521202

@@ -1192,7 +1242,7 @@ where
11921242
}
11931243
}
11941244
}
1195-
let mut events = Vec::new();
1245+
let mut events = VecDeque::new();
11961246
core::mem::swap(&mut *self.pending_events.lock().unwrap(), &mut events);
11971247
for ev in events {
11981248
handler.handle_event(ev);

0 commit comments

Comments
 (0)