@@ -1047,21 +1047,25 @@ where
1047
1047
}
1048
1048
}
1049
1049
1050
- macro_rules! drop_handled_events_and_abort { ( $self: expr, $res : expr , $offset : expr, $event_queue: expr) => {
1050
+ macro_rules! drop_handled_events_and_abort { ( $self: expr, $res_iter : expr, $event_queue: expr) => {
1051
1051
// We want to make sure to cleanly abort upon event handling failure. To this end, we drop all
1052
1052
// successfully handled events from the given queue, reset the events processing flag, and
1053
1053
// return, to have the events eventually replayed upon next invocation.
1054
1054
{
1055
1055
let mut queue_lock = $event_queue. lock( ) . unwrap( ) ;
1056
1056
1057
- // We skip `$offset` result entries to reach the ones relevant for the given `$event_queue`.
1058
- let mut res_iter = $res. iter( ) . skip( $offset) ;
1059
-
1060
1057
// Keep all events which previously error'd *or* any that have been added since we dropped
1061
1058
// the Mutex before.
1062
- queue_lock. retain( |_| res_iter. next( ) . map_or( true , |r| r. is_err( ) ) ) ;
1059
+ let mut any_error = false ;
1060
+ queue_lock. retain( |_| {
1061
+ $res_iter. next( ) . map_or( true , |r| {
1062
+ let is_err = r. is_err( ) ;
1063
+ any_error |= is_err;
1064
+ is_err
1065
+ } )
1066
+ } ) ;
1063
1067
1064
- if $res . iter ( ) . any ( |r| r . is_err ( ) ) {
1068
+ if any_error {
1065
1069
// We failed handling some events. Return to have them eventually replayed.
1066
1070
$self. pending_events_processor. store( false , Ordering :: Release ) ;
1067
1071
$self. event_notifier. notify( ) ;
@@ -1426,7 +1430,8 @@ where
1426
1430
}
1427
1431
// Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
1428
1432
let res = MultiResultFuturePoller :: new ( futures) . await ;
1429
- drop_handled_events_and_abort ! ( self , res, intercepted_msgs_offset, self . pending_intercepted_msgs_events) ;
1433
+ let mut res_iter = res. iter ( ) . skip ( intercepted_msgs_offset) ;
1434
+ drop_handled_events_and_abort ! ( self , res_iter, self . pending_intercepted_msgs_events) ;
1430
1435
}
1431
1436
1432
1437
{
@@ -1449,7 +1454,8 @@ where
1449
1454
futures. push ( future) ;
1450
1455
}
1451
1456
let res = MultiResultFuturePoller :: new ( futures) . await ;
1452
- drop_handled_events_and_abort ! ( self , res, 0 , self . pending_peer_connected_events) ;
1457
+ let mut res_iter = res. iter ( ) ;
1458
+ drop_handled_events_and_abort ! ( self , res_iter, self . pending_peer_connected_events) ;
1453
1459
}
1454
1460
}
1455
1461
self . pending_events_processor . store ( false , Ordering :: Release ) ;
@@ -1508,7 +1514,7 @@ where
1508
1514
{
1509
1515
let pending_intercepted_msgs_events = self . pending_intercepted_msgs_events . lock ( ) . unwrap ( ) ;
1510
1516
intercepted_msgs = pending_intercepted_msgs_events. clone ( ) ;
1511
- let mut pending_peer_connected_events = self . pending_peer_connected_events . lock ( ) . unwrap ( ) ;
1517
+ let pending_peer_connected_events = self . pending_peer_connected_events . lock ( ) . unwrap ( ) ;
1512
1518
peer_connecteds = pending_peer_connected_events. clone ( ) ;
1513
1519
#[ cfg( debug_assertions) ] {
1514
1520
for ev in pending_intercepted_msgs_events. iter ( ) {
@@ -1518,14 +1524,47 @@ where
1518
1524
if let Event :: OnionMessagePeerConnected { .. } = ev { } else { panic ! ( ) ; }
1519
1525
}
1520
1526
}
1521
- pending_peer_connected_events. shrink_to ( 10 ) ; // Limit total heap usage
1522
1527
}
1523
1528
1524
- let res = intercepted_msgs. into_iter ( ) . map ( |ev| handler. handle_event ( ev) ) . collect :: < Vec < _ > > ( ) ;
1525
- drop_handled_events_and_abort ! ( self , res, 0 , self . pending_intercepted_msgs_events) ;
1529
+ let mut handling_intercepted_msgs_failed = false ;
1530
+ let mut num_handled_intercepted_events = 0 ;
1531
+ for ev in intercepted_msgs {
1532
+ match handler. handle_event ( ev) {
1533
+ Ok ( ( ) ) => num_handled_intercepted_events += 1 ,
1534
+ Err ( ReplayEvent ( ) ) => {
1535
+ handling_intercepted_msgs_failed = true ;
1536
+ break ;
1537
+ }
1538
+ }
1539
+ }
1540
+
1541
+ {
1542
+ let mut pending_intercepted_msgs_events = self . pending_intercepted_msgs_events . lock ( ) . unwrap ( ) ;
1543
+ pending_intercepted_msgs_events. drain ( ..num_handled_intercepted_events) ;
1544
+ }
1526
1545
1527
- let res = peer_connecteds. into_iter ( ) . map ( |ev| handler. handle_event ( ev) ) . collect :: < Vec < _ > > ( ) ;
1528
- drop_handled_events_and_abort ! ( self , res, 0 , self . pending_peer_connected_events) ;
1546
+ if handling_intercepted_msgs_failed {
1547
+ self . pending_events_processor . store ( false , Ordering :: Release ) ;
1548
+ self . event_notifier . notify ( ) ;
1549
+ return ;
1550
+ }
1551
+
1552
+ let mut num_handled_peer_connecteds = 0 ;
1553
+ for ev in peer_connecteds {
1554
+ match handler. handle_event ( ev) {
1555
+ Ok ( ( ) ) => num_handled_peer_connecteds += 1 ,
1556
+ Err ( ReplayEvent ( ) ) => {
1557
+ self . event_notifier . notify ( ) ;
1558
+ break ;
1559
+ }
1560
+ }
1561
+ }
1562
+
1563
+ {
1564
+ let mut pending_peer_connected_events = self . pending_peer_connected_events . lock ( ) . unwrap ( ) ;
1565
+ pending_peer_connected_events. drain ( ..num_handled_peer_connecteds) ;
1566
+ pending_peer_connected_events. shrink_to ( 10 ) ; // Limit total heap usage
1567
+ }
1529
1568
1530
1569
self . pending_events_processor . store ( false , Ordering :: Release ) ;
1531
1570
}
0 commit comments