Skip to content

Commit b839814

Browse files
committed
f Have sync event handler drop the queue lock during event handling
1 parent a18d0e3 commit b839814

File tree

1 file changed

+37
-6
lines changed

1 file changed

+37
-6
lines changed

lightning/src/onion_message/messenger.rs

+37-6
Original file line numberDiff line numberDiff line change
@@ -1472,7 +1472,7 @@ where
14721472
{
14731473
let pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
14741474
intercepted_msgs = pending_intercepted_msgs_events.clone();
1475-
let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
1475+
let pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
14761476
peer_connecteds = pending_peer_connected_events.clone();
14771477
#[cfg(debug_assertions)] {
14781478
for ev in pending_intercepted_msgs_events.iter() {
@@ -1482,14 +1482,45 @@ where
14821482
if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); }
14831483
}
14841484
}
1485-
pending_peer_connected_events.shrink_to(10); // Limit total heap usage
14861485
}
14871486

1488-
let mut res_iter = intercepted_msgs.into_iter().map(|ev| handler.handle_event(ev));
1489-
drop_handled_events_and_abort!(self, res_iter, self.pending_intercepted_msgs_events);
1487+
let mut handling_intercepted_msgs_failed = false;
1488+
let mut num_handled_intercepted_events = 0;
1489+
for ev in intercepted_msgs {
1490+
match handler.handle_event(ev) {
1491+
Ok(()) => num_handled_intercepted_events += 1,
1492+
Err(ReplayEvent ()) => {
1493+
handling_intercepted_msgs_failed = true;
1494+
break;
1495+
}
1496+
}
1497+
}
1498+
1499+
{
1500+
let mut pending_intercepted_msgs_events = self.pending_intercepted_msgs_events.lock().unwrap();
1501+
pending_intercepted_msgs_events.drain(..num_handled_intercepted_events);
1502+
}
14901503

1491-
let mut res_iter = peer_connecteds.into_iter().map(|ev| handler.handle_event(ev));
1492-
drop_handled_events_and_abort!(self, res_iter, self.pending_peer_connected_events);
1504+
if handling_intercepted_msgs_failed {
1505+
self.pending_events_processor.store(false, Ordering::Release);
1506+
return;
1507+
}
1508+
1509+
let mut num_handled_peer_connecteds = 0;
1510+
for ev in peer_connecteds {
1511+
match handler.handle_event(ev) {
1512+
Ok(()) => num_handled_peer_connecteds += 1,
1513+
Err(ReplayEvent ()) => {
1514+
break;
1515+
}
1516+
}
1517+
}
1518+
1519+
{
1520+
let mut pending_peer_connected_events = self.pending_peer_connected_events.lock().unwrap();
1521+
pending_peer_connected_events.drain(..num_handled_peer_connecteds);
1522+
pending_peer_connected_events.shrink_to(10); // Limit total heap usage
1523+
}
14931524

14941525
self.pending_events_processor.store(false, Ordering::Release);
14951526
}

0 commit comments

Comments
 (0)