Skip to content

Commit b5f432a

Browse files
committed
Store OnionMessenger events in different Vecs
In the next commit, `OnionMessenger` events are handled in parallel using rust async. When we do that, we'll want to handle `OnionMessageIntercepted` events prior to `OnionMessagePeerConnected` ones. While we'd generally prefer to handle all events in the order they were generated, if we want to handle them in parallel, we don't want a `OnionMessageIntercepted` event to start being processed, then handle an `OnionMessagePeerConnected` prior to the first completing. This could cause us to store a freshly-intercepted message for a peer in a DB that was just wiped because the peer is now connected. This does run the risk of processing a `OnionMessagePeerConnected` event prior to an `OnionMessageIntercepted` event (because a peer connected, then disconnected, then we received a message for that peer all before any events were handled), that is somewhat less likely and discarding a message in a rare race is better than leaving a message lying around undelivered. Thus, here, we store `OnionMessenger` events in separate `Vec`s which we can pull from in message-type-order.
1 parent bd3e8eb commit b5f432a

File tree

1 file changed

+30
-11
lines changed

1 file changed

+30
-11
lines changed

lightning/src/onion_message/messenger.rs

+30-11
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,12 @@ where
241241
offers_handler: OMH,
242242
custom_handler: CMH,
243243
intercept_messages_for_offline_peers: bool,
244-
pending_events: Mutex<Vec<Event>>,
244+
pending_events: Mutex<PendingEvents>,
245+
}
246+
247+
struct PendingEvents {
248+
intercepted_msgs: Vec<Event>,
249+
peer_connecteds: Vec<Event>,
245250
}
246251

247252
/// [`OnionMessage`]s buffered to be sent.
@@ -963,7 +968,10 @@ where
963968
offers_handler,
964969
custom_handler,
965970
intercept_messages_for_offline_peers,
966-
pending_events: Mutex::new(Vec::new()),
971+
pending_events: Mutex::new(PendingEvents {
972+
intercepted_msgs: Vec::new(),
973+
peer_connecteds: Vec::new(),
974+
}),
967975
}
968976
}
969977

@@ -1135,18 +1143,16 @@ where
11351143
msgs
11361144
}
11371145

1138-
fn enqueue_event(&self, event: Event) {
1146+
fn enqueue_intercepted_event(&self, event: Event) {
11391147
const MAX_EVENTS_BUFFER_SIZE: usize = (1 << 10) * 256;
11401148
let mut pending_events = self.pending_events.lock().unwrap();
1141-
let total_buffered_bytes: usize = pending_events
1142-
.iter()
1143-
.map(|ev| ev.serialized_length())
1144-
.sum();
1149+
let total_buffered_bytes: usize =
1150+
pending_events.intercepted_msgs.iter().map(|ev| ev.serialized_length()).sum();
11451151
if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE {
11461152
log_trace!(self.logger, "Dropping event {:?}: buffer full", event);
11471153
return
11481154
}
1149-
pending_events.push(event);
1155+
pending_events.intercepted_msgs.push(event);
11501156
}
11511157
}
11521158

@@ -1193,7 +1199,20 @@ where
11931199
}
11941200
}
11951201
let mut events = Vec::new();
1196-
core::mem::swap(&mut *self.pending_events.lock().unwrap(), &mut events);
1202+
{
1203+
let mut pending_events = self.pending_events.lock().unwrap();
1204+
#[cfg(debug_assertions)] {
1205+
for ev in pending_events.intercepted_msgs.iter() {
1206+
if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); }
1207+
}
1208+
for ev in pending_events.peer_connecteds.iter() {
1209+
if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); }
1210+
}
1211+
}
1212+
core::mem::swap(&mut pending_events.intercepted_msgs, &mut events);
1213+
events.append(&mut pending_events.peer_connecteds);
1214+
pending_events.peer_connecteds.shrink_to(10); // Limit total heap usage
1215+
}
11971216
for ev in events {
11981217
handler.handle_event(ev);
11991218
}
@@ -1271,7 +1290,7 @@ where
12711290
log_trace!(logger, "Forwarding an onion message to peer {}", next_node_id);
12721291
},
12731292
_ if self.intercept_messages_for_offline_peers => {
1274-
self.enqueue_event(
1293+
self.enqueue_intercepted_event(
12751294
Event::OnionMessageIntercepted {
12761295
peer_node_id: next_node_id, message: onion_message
12771296
}
@@ -1299,7 +1318,7 @@ where
12991318
.or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new()))
13001319
.mark_connected();
13011320
if self.intercept_messages_for_offline_peers {
1302-
self.enqueue_event(
1321+
self.pending_events.lock().unwrap().peer_connecteds.push(
13031322
Event::OnionMessagePeerConnected { peer_node_id: *their_node_id }
13041323
);
13051324
}

0 commit comments

Comments
 (0)