Skip to content

Commit 44e1b78

Browse files
committed
Hold sep. Mutexes for pending intercepted_msgs/peer_connected events
This is a minor refactor that will allow us to access the individual event queue Mutexes separately, allowing us to drop the locks earlier when processing them individually.
1 parent c64b8e4 commit 44e1b78

File tree

1 file changed

+25
-24
lines changed

1 file changed

+25
-24
lines changed

lightning/src/onion_message/messenger.rs

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -260,12 +260,8 @@ pub struct OnionMessenger<
260260
async_payments_handler: APH,
261261
custom_handler: CMH,
262262
intercept_messages_for_offline_peers: bool,
263-
pending_events: Mutex<PendingEvents>,
264-
}
265-
266-
struct PendingEvents {
267-
intercepted_msgs: Vec<Event>,
268-
peer_connecteds: Vec<Event>,
263+
pending_intercepted_msgs_events: Mutex<Vec<Event>>,
264+
pending_peer_connected_events: Mutex<Vec<Event>>,
269265
}
270266

271267
/// [`OnionMessage`]s buffered to be sent.
@@ -1082,10 +1078,8 @@ where
10821078
async_payments_handler,
10831079
custom_handler,
10841080
intercept_messages_for_offline_peers,
1085-
pending_events: Mutex::new(PendingEvents {
1086-
intercepted_msgs: Vec::new(),
1087-
peer_connecteds: Vec::new(),
1088-
}),
1081+
pending_intercepted_msgs_events: Mutex::new(Vec::new()),
1082+
pending_peer_connected_events: Mutex::new(Vec::new()),
10891083
}
10901084
}
10911085

@@ -1305,14 +1299,15 @@ where
13051299

13061300
fn enqueue_intercepted_event(&self, event: Event) {
13071301
const MAX_EVENTS_BUFFER_SIZE: usize = (1 << 10) * 256;
1308-
let mut pending_events = self.pending_events.lock().unwrap();
1309-
let total_buffered_bytes: usize =
1310-
pending_events.intercepted_msgs.iter().map(|ev| ev.serialized_length()).sum();
1302+
let mut pending_intercepted_msgs_events =
1303+
self.pending_intercepted_msgs_events.lock().unwrap();
1304+
let total_buffered_bytes: usize = pending_intercepted_msgs_events.iter()
1305+
.map(|ev| ev.serialized_length()).sum();
13111306
if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE {
13121307
log_trace!(self.logger, "Dropping event {:?}: buffer full", event);
13131308
return
13141309
}
1315-
pending_events.intercepted_msgs.push(event);
1310+
pending_intercepted_msgs_events.push(event);
13161311
}
13171312

13181313
/// Processes any events asynchronously using the given handler.
@@ -1328,9 +1323,12 @@ where
13281323
let mut intercepted_msgs = Vec::new();
13291324
let mut peer_connecteds = Vec::new();
13301325
{
1331-
let mut pending_events = self.pending_events.lock().unwrap();
1332-
core::mem::swap(&mut pending_events.intercepted_msgs, &mut intercepted_msgs);
1333-
core::mem::swap(&mut pending_events.peer_connecteds, &mut peer_connecteds);
1326+
let mut pending_intercepted_msgs_events =
1327+
self.pending_intercepted_msgs_events.lock().unwrap();
1328+
let mut pending_peer_connected_events =
1329+
self.pending_peer_connected_events.lock().unwrap();
1330+
core::mem::swap(&mut *pending_intercepted_msgs_events, &mut intercepted_msgs);
1331+
core::mem::swap(&mut *pending_peer_connected_events, &mut peer_connecteds);
13341332
}
13351333

13361334
let mut futures = Vec::with_capacity(intercepted_msgs.len());
@@ -1406,18 +1404,19 @@ where
14061404
}
14071405
let mut events = Vec::new();
14081406
{
1409-
let mut pending_events = self.pending_events.lock().unwrap();
1407+
let mut pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
1408+
let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
14101409
#[cfg(debug_assertions)] {
1411-
for ev in pending_events.intercepted_msgs.iter() {
1410+
for ev in pending_intercepted_msgs_events.iter() {
14121411
if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); }
14131412
}
1414-
for ev in pending_events.peer_connecteds.iter() {
1413+
for ev in pending_peer_connected_events.iter() {
14151414
if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); }
14161415
}
14171416
}
1418-
core::mem::swap(&mut pending_events.intercepted_msgs, &mut events);
1419-
events.append(&mut pending_events.peer_connecteds);
1420-
pending_events.peer_connecteds.shrink_to(10); // Limit total heap usage
1417+
core::mem::swap(&mut *pending_intercepted_msgs_events, &mut events);
1418+
events.append(&mut pending_peer_connected_events);
1419+
pending_peer_connected_events.shrink_to(10); // Limit total heap usage
14211420
}
14221421
for ev in events {
14231422
handler.handle_event(ev);
@@ -1533,7 +1532,9 @@ where
15331532
.or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new()))
15341533
.mark_connected();
15351534
if self.intercept_messages_for_offline_peers {
1536-
self.pending_events.lock().unwrap().peer_connecteds.push(
1535+
let mut pending_peer_connected_events =
1536+
self.pending_peer_connected_events.lock().unwrap();
1537+
pending_peer_connected_events.push(
15371538
Event::OnionMessagePeerConnected { peer_node_id: *their_node_id }
15381539
);
15391540
}

0 commit comments

Comments
 (0)