@@ -1020,21 +1020,25 @@ where
1020
1020
}
1021
1021
}
1022
1022
1023
- macro_rules! drop_handled_events_and_abort { ( $self: expr, $res : expr , $offset : expr, $event_queue: expr) => {
1023
+ macro_rules! drop_handled_events_and_abort { ( $self: expr, $res_iter : expr, $event_queue: expr) => {
1024
1024
// We want to make sure to cleanly abort upon event handling failure. To this end, we drop all
1025
1025
// successfully handled events from the given queue, reset the events processing flag, and
1026
1026
// return, to have the events eventually replayed upon next invocation.
1027
1027
{
1028
1028
let mut queue_lock = $event_queue. lock( ) . unwrap( ) ;
1029
1029
1030
- // We skip `$offset` result entries to reach the ones relevant for the given `$event_queue`.
1031
- let mut res_iter = $res. iter( ) . skip( $offset) ;
1032
-
1033
1030
// Keep all events which previously error'd *or* any that have been added since we dropped
1034
1031
// the Mutex before.
1035
- queue_lock. retain( |_| res_iter. next( ) . map_or( true , |r| r. is_err( ) ) ) ;
1032
+ let mut any_error = false ;
1033
+ queue_lock. retain( |_| {
1034
+ $res_iter. next( ) . map_or( true , |r| {
1035
+ let is_err = r. is_err( ) ;
1036
+ any_error |= is_err;
1037
+ is_err
1038
+ } )
1039
+ } ) ;
1036
1040
1037
- if $res . iter ( ) . any ( |r| r . is_err ( ) ) {
1041
+ if any_error {
1038
1042
// We failed handling some events. Return to have them eventually replayed.
1039
1043
$self. pending_events_processor. store( false , Ordering :: Release ) ;
1040
1044
return ;
@@ -1384,7 +1388,8 @@ where
1384
1388
}
1385
1389
// Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
1386
1390
let res = MultiResultFuturePoller :: new ( futures) . await ;
1387
- drop_handled_events_and_abort ! ( self , res, intercepted_msgs_offset, self . pending_intercepted_msgs_events) ;
1391
+ let mut res_iter = res. iter ( ) . skip ( intercepted_msgs_offset) ;
1392
+ drop_handled_events_and_abort ! ( self , res_iter, self . pending_intercepted_msgs_events) ;
1388
1393
}
1389
1394
1390
1395
{
@@ -1407,7 +1412,8 @@ where
1407
1412
futures. push ( future) ;
1408
1413
}
1409
1414
let res = MultiResultFuturePoller :: new ( futures) . await ;
1410
- drop_handled_events_and_abort ! ( self , res, 0 , self . pending_peer_connected_events) ;
1415
+ let mut res_iter = res. iter ( ) ;
1416
+ drop_handled_events_and_abort ! ( self , res_iter, self . pending_peer_connected_events) ;
1411
1417
}
1412
1418
}
1413
1419
self . pending_events_processor . store ( false , Ordering :: Release ) ;
@@ -1466,7 +1472,7 @@ where
1466
1472
{
1467
1473
let pending_intercepted_msgs_events = self . pending_intercepted_msgs_events . lock ( ) . unwrap ( ) ;
1468
1474
intercepted_msgs = pending_intercepted_msgs_events. clone ( ) ;
1469
- let mut pending_peer_connected_events = self . pending_peer_connected_events . lock ( ) . unwrap ( ) ;
1475
+ let pending_peer_connected_events = self . pending_peer_connected_events . lock ( ) . unwrap ( ) ;
1470
1476
peer_connecteds = pending_peer_connected_events. clone ( ) ;
1471
1477
#[ cfg( debug_assertions) ] {
1472
1478
for ev in pending_intercepted_msgs_events. iter ( ) {
@@ -1476,14 +1482,45 @@ where
1476
1482
if let Event :: OnionMessagePeerConnected { .. } = ev { } else { panic ! ( ) ; }
1477
1483
}
1478
1484
}
1479
- pending_peer_connected_events. shrink_to ( 10 ) ; // Limit total heap usage
1480
1485
}
1481
1486
1482
- let res = intercepted_msgs. into_iter ( ) . map ( |ev| handler. handle_event ( ev) ) . collect :: < Vec < _ > > ( ) ;
1483
- drop_handled_events_and_abort ! ( self , res, 0 , self . pending_intercepted_msgs_events) ;
1487
+ let mut handling_intercepted_msgs_failed = false ;
1488
+ let mut num_handled_intercepted_events = 0 ;
1489
+ for ev in intercepted_msgs {
1490
+ match handler. handle_event ( ev) {
1491
+ Ok ( ( ) ) => num_handled_intercepted_events += 1 ,
1492
+ Err ( ReplayEvent ( ) ) => {
1493
+ handling_intercepted_msgs_failed = true ;
1494
+ break ;
1495
+ }
1496
+ }
1497
+ }
1498
+
1499
+ {
1500
+ let mut pending_intercepted_msgs_events = self . pending_intercepted_msgs_events . lock ( ) . unwrap ( ) ;
1501
+ pending_intercepted_msgs_events. drain ( ..num_handled_intercepted_events) ;
1502
+ }
1503
+
1504
+ if handling_intercepted_msgs_failed {
1505
+ self . pending_events_processor . store ( false , Ordering :: Release ) ;
1506
+ return ;
1507
+ }
1484
1508
1485
- let res = peer_connecteds. into_iter ( ) . map ( |ev| handler. handle_event ( ev) ) . collect :: < Vec < _ > > ( ) ;
1486
- drop_handled_events_and_abort ! ( self , res, 0 , self . pending_peer_connected_events) ;
1509
+ let mut num_handled_peer_connecteds = 0 ;
1510
+ for ev in peer_connecteds {
1511
+ match handler. handle_event ( ev) {
1512
+ Ok ( ( ) ) => num_handled_peer_connecteds += 1 ,
1513
+ Err ( ReplayEvent ( ) ) => {
1514
+ break ;
1515
+ }
1516
+ }
1517
+ }
1518
+
1519
+ {
1520
+ let mut pending_peer_connected_events = self . pending_peer_connected_events . lock ( ) . unwrap ( ) ;
1521
+ pending_peer_connected_events. drain ( ..num_handled_peer_connecteds) ;
1522
+ pending_peer_connected_events. shrink_to ( 10 ) ; // Limit total heap usage
1523
+ }
1487
1524
1488
1525
self . pending_events_processor . store ( false , Ordering :: Release ) ;
1489
1526
}
0 commit comments