Skip to content

Commit 3081296

Browse files
committed
f Have cleanup macro take iterator rather than collections
1 parent 5ba12be commit 3081296

File tree

1 file changed

+18
-22
lines changed

1 file changed

+18
-22
lines changed

lightning/src/onion_message/messenger.rs

+18-22
Original file line numberDiff line numberDiff line change
@@ -1020,21 +1020,25 @@ where
10201020
}
10211021
}
10221022

1023-
macro_rules! drop_handled_events_and_abort { ($self: expr, $res: expr, $offset: expr, $event_queue: expr) => {
1023+
macro_rules! drop_handled_events_and_abort { ($self: expr, $res_iter: expr, $event_queue: expr) => {
10241024
// We want to make sure to cleanly abort upon event handling failure. To this end, we drop all
10251025
// successfully handled events from the given queue, reset the events processing flag, and
10261026
// return, to have the events eventually replayed upon next invocation.
10271027
{
10281028
let mut queue_lock = $event_queue.lock().unwrap();
10291029

1030-
// We skip `$offset` result entries to reach the ones relevant for the given `$event_queue`.
1031-
let mut res_iter = $res.iter().skip($offset);
1032-
10331030
// Keep all events which previously error'd *or* any that have been added since we dropped
10341031
// the Mutex before.
1035-
queue_lock.retain(|_| res_iter.next().map_or(true, |r| r.is_err()));
1032+
let mut any_error = false;
1033+
queue_lock.retain(|_| {
1034+
$res_iter.next().map_or(true, |r| {
1035+
let is_err = r.is_err();
1036+
any_error |= is_err;
1037+
is_err
1038+
})
1039+
});
10361040

1037-
if $res.iter().any(|r| r.is_err()) {
1041+
if did_error {
10381042
// We failed handling some events. Return to have them eventually replayed.
10391043
$self.pending_events_processor.store(false, Ordering::Release);
10401044
return;
@@ -1384,7 +1388,8 @@ where
13841388
}
13851389
// Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
13861390
let res = MultiResultFuturePoller::new(futures).await;
1387-
drop_handled_events_and_abort!(self, res, intercepted_msgs_offset, self.pending_intercepted_msgs_events);
1391+
let mut res_iter = res.iter().skip(intercepted_msgs_offset);
1392+
drop_handled_events_and_abort!(self, res_iter, self.pending_intercepted_msgs_events);
13881393
}
13891394

13901395
{
@@ -1407,7 +1412,8 @@ where
14071412
futures.push(future);
14081413
}
14091414
let res = MultiResultFuturePoller::new(futures).await;
1410-
drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events);
1415+
let mut res_iter = res.iter();
1416+
drop_handled_events_and_abort!(self, res_iter, self.pending_peer_connected_events);
14111417
}
14121418
}
14131419
self.pending_events_processor.store(false, Ordering::Release);
@@ -1479,21 +1485,11 @@ where
14791485
pending_peer_connected_events.shrink_to(10); // Limit total heap usage
14801486
}
14811487

1482-
if intercepted_msgs.len() == 1 {
1483-
let res = intercepted_msgs.into_iter().next().map(|ev| handler.handle_event(ev));
1484-
drop_handled_events_and_abort!(self, res, 0, self.pending_intercepted_msgs_events);
1485-
} else {
1486-
let res = intercepted_msgs.into_iter().map(|ev| handler.handle_event(ev)).collect::<Vec<_>>();
1487-
drop_handled_events_and_abort!(self, res, 0, self.pending_intercepted_msgs_events);
1488-
};
1488+
let mut res_iter = intercepted_msgs.into_iter().map(|ev| handler.handle_event(ev));
1489+
drop_handled_events_and_abort!(self, res_iter, self.pending_intercepted_msgs_events);
14891490

1490-
if peer_connecteds.len() == 1 {
1491-
let res = peer_connecteds.into_iter().next().map(|ev| handler.handle_event(ev));
1492-
drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events);
1493-
} else {
1494-
let res = peer_connecteds.into_iter().map(|ev| handler.handle_event(ev)).collect::<Vec<_>>();
1495-
drop_handled_events_and_abort!(self, res, 0, self.pending_peer_connected_events);
1496-
}
1491+
let mut res_iter = peer_connecteds.into_iter().map(|ev| handler.handle_event(ev));
1492+
drop_handled_events_and_abort!(self, res_iter, self.pending_peer_connected_events);
14971493

14981494
self.pending_events_processor.store(false, Ordering::Release);
14991495
}

0 commit comments

Comments
 (0)