Skip to content

Commit d4f98f0

Browse files
committed
Rename MultiFuturePoller and let it return concrete results
1 parent a7e4311 commit d4f98f0

File tree

3 files changed

+54
-22
lines changed

3 files changed

+54
-22
lines changed

lightning/src/events/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -2334,6 +2334,7 @@ pub trait EventsProvider {
23342334
/// currently succeed (e.g., due to a persistence failure).
23352335
///
23362336
/// LDK will ensure the event is persisted and will eventually be replayed.
2337+
#[derive(Clone, Debug)]
23372338
pub struct ReplayEvent();
23382339

23392340
/// A trait implemented for objects handling events from [`EventsProvider`].

lightning/src/onion_message/messenger.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use bitcoin::secp256k1::{self, PublicKey, Scalar, Secp256k1, SecretKey};
1818
use crate::blinded_path::{BlindedPath, IntroductionNode, NextMessageHop, NodeIdLookUp};
1919
use crate::blinded_path::message::{advance_path_by_one, ForwardNode, ForwardTlvs, ReceiveTlvs};
2020
use crate::blinded_path::utils;
21-
use crate::events::{Event, EventHandler, EventsProvider};
21+
use crate::events::{Event, EventHandler, EventsProvider, ReplayEvent};
2222
use crate::sign::{EntropySource, NodeSigner, Recipient};
2323
use crate::ln::features::{InitFeatures, NodeFeatures};
2424
use crate::ln::msgs::{self, OnionMessage, OnionMessageHandler, SocketAddress};
@@ -1317,7 +1317,7 @@ where
13171317
/// have an ordering requirement.
13181318
///
13191319
/// See the trait-level documentation of [`EventsProvider`] for requirements.
1320-
pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ()>> + core::marker::Unpin, H: Fn(Event) -> Future>(
1320+
pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>> + core::marker::Unpin, H: Fn(Event) -> Future>(
13211321
&self, handler: H
13221322
) {
13231323
let mut intercepted_msgs = Vec::new();
@@ -1335,26 +1335,26 @@ where
13351335
for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
13361336
if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
13371337
if let Some(addresses) = addresses.take() {
1338-
futures.push(Some(handler(Event::ConnectionNeeded { node_id: *node_id, addresses })));
1338+
futures.push(handler(Event::ConnectionNeeded { node_id: *node_id, addresses }));
13391339
}
13401340
}
13411341
}
13421342

13431343
for ev in intercepted_msgs {
13441344
if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
1345-
futures.push(Some(handler(ev)));
1345+
futures.push(handler(ev));
13461346
}
13471347
// Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
1348-
crate::util::async_poll::MultiFuturePoller(futures).await;
1348+
crate::util::async_poll::MultiEventFuturePoller::new(futures).await;
13491349

13501350
if peer_connecteds.len() <= 1 {
13511351
for event in peer_connecteds { handler(event).await; }
13521352
} else {
13531353
let mut futures = Vec::new();
13541354
for event in peer_connecteds {
1355-
futures.push(Some(handler(event)));
1355+
futures.push(handler(event));
13561356
}
1357-
crate::util::async_poll::MultiFuturePoller(futures).await;
1357+
crate::util::async_poll::MultiEventFuturePoller::new(futures).await;
13581358
}
13591359
}
13601360
}

lightning/src/util/async_poll.rs

+46-15
Original file line numberDiff line numberDiff line change
@@ -9,35 +9,66 @@
99

1010
//! Some utilities to make working with the standard library's [`Future`]s easier
1111
12+
use crate::events::ReplayEvent;
1213
use crate::prelude::*;
1314
use core::future::Future;
1415
use core::marker::Unpin;
1516
use core::pin::Pin;
1617
use core::task::{Context, Poll};
1718

18-
pub(crate) struct MultiFuturePoller<F: Future<Output = ()> + Unpin>(pub Vec<Option<F>>);
19+
enum EventFuture<F: Future<Output = Result<(), ReplayEvent>>> {
20+
Pending(F),
21+
Ready(Result<(), ReplayEvent>),
22+
}
23+
24+
pub(crate) struct MultiEventFuturePoller<F: Future<Output = Result<(), ReplayEvent>> + Unpin> {
25+
futures_state: Vec<EventFuture<F>>,
26+
}
1927

20-
impl<F: Future<Output = ()> + Unpin> Future for MultiFuturePoller<F> {
21-
type Output = ();
22-
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
28+
impl<F: Future<Output = Result<(), ReplayEvent>> + Unpin> MultiEventFuturePoller<F> {
29+
pub fn new(futures: Vec<F>) -> Self {
30+
let futures_state = futures.into_iter().map(|f| EventFuture::Pending(f)).collect();
31+
Self { futures_state }
32+
}
33+
}
34+
35+
impl<F: Future<Output = Result<(), ReplayEvent>> + Unpin> Future for MultiEventFuturePoller<F> {
36+
type Output = Vec<Result<(), ReplayEvent>>;
37+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Vec<Result<(), ReplayEvent>>> {
2338
let mut have_pending_futures = false;
24-
for fut_option in self.get_mut().0.iter_mut() {
25-
let mut fut = match fut_option.take() {
26-
None => continue,
27-
Some(fut) => fut,
28-
};
29-
match Pin::new(&mut fut).poll(cx) {
30-
Poll::Ready(()) => {},
31-
Poll::Pending => {
32-
have_pending_futures = true;
33-
*fut_option = Some(fut);
39+
let futures_state = &mut self.get_mut().futures_state;
40+
for state in futures_state.iter_mut() {
41+
match state {
42+
EventFuture::Pending(ref mut fut) => match Pin::new(fut).poll(cx) {
43+
Poll::Ready(res) => {
44+
*state = EventFuture::Ready(res);
45+
},
46+
Poll::Pending => {
47+
have_pending_futures = true;
48+
},
3449
},
50+
EventFuture::Ready(_) => continue,
3551
}
3652
}
53+
3754
if have_pending_futures {
3855
Poll::Pending
3956
} else {
40-
Poll::Ready(())
57+
let results = futures_state
58+
.iter()
59+
.filter_map(|e| match e {
60+
EventFuture::Ready(res) => Some(res),
61+
EventFuture::Pending(_) => {
62+
debug_assert!(
63+
false,
64+
"All futures are expected to be ready if none are pending"
65+
);
66+
None
67+
},
68+
})
69+
.cloned()
70+
.collect();
71+
Poll::Ready(results)
4172
}
4273
}
4374
}

0 commit comments

Comments
 (0)