Skip to content

Commit 283e93b

Browse files
committed
Add a background processing function that is async.
Adds a method that operates like `BackgroundProcessor::start`, but instead of spawning a background thread, it runs asynchronously as a future.
1 parent c878bb2 commit 283e93b

File tree

3 files changed

+206
-123
lines changed

3 files changed

+206
-123
lines changed

lightning-background-processor/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ rustdoc-args = ["--cfg", "docsrs"]
1717
bitcoin = "0.28.1"
1818
lightning = { version = "0.0.110", path = "../lightning", features = ["std"] }
1919
lightning-rapid-gossip-sync = { version = "0.0.110", path = "../lightning-rapid-gossip-sync" }
20+
futures = { version = "0.3", optional = true }
2021

2122
[dev-dependencies]
2223
lightning = { version = "0.0.110", path = "../lightning", features = ["_test_utils"] }

lightning-background-processor/src/lib.rs

+203-123
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ use std::thread::JoinHandle;
3131
use std::time::{Duration, Instant};
3232
use std::ops::Deref;
3333

34+
#[cfg(feature = "futures")]
35+
use futures::{select, future::FutureExt};
36+
3437
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
3538
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
3639
/// responsibilities are:
@@ -219,6 +222,203 @@ where A::Target: chain::Access, L::Target: Logger {
219222
}
220223
}
221224

225+
/// Shared body for the background-processing loop, used by both the threaded
/// (`BackgroundProcessor::start`) and async (`process_events_async`) entry points.
///
/// `$loop_exit_check` is evaluated each iteration to decide whether to stop;
/// `$await` is the (blocking or async) wait that returns `true` when the
/// `ChannelManager` has pending updates that must be persisted.
macro_rules! define_run_body {
	($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
	 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
	 $loop_exit_check: expr, $await: expr)
	=> { {
		let event_handler = DecoratingEventHandler {
			event_handler: $event_handler,
			gossip_sync: &$gossip_sync,
		};

		log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
		$channel_manager.timer_tick_occurred();

		let mut last_freshness_call = Instant::now();
		let mut last_ping_call = Instant::now();
		let mut last_prune_call = Instant::now();
		let mut last_scorer_persist_call = Instant::now();
		let mut have_pruned = false;

		loop {
			$channel_manager.process_pending_events(&event_handler);
			$chain_monitor.process_pending_events(&event_handler);

			// Note that the PeerManager::process_events may block on ChannelManager's locks,
			// hence it comes last here. When the ChannelManager finishes whatever it's doing,
			// we want to ensure we get into `persist_manager` as quickly as we can, especially
			// without running the normal event processing above and handing events to users.
			//
			// Specifically, on an *extremely* slow machine, we may see ChannelManager start
			// processing a message effectively at any point during this loop. In order to
			// minimize the time between such processing completing and persisting the updated
			// ChannelManager, we want to minimize methods blocking on a ChannelManager
			// generally, and as a fallback place such blocking only immediately before
			// persistence.
			$peer_manager.process_events();

			// We wait up to 100ms, but track how long it takes to detect being put to sleep,
			// see `await_start`'s use below.
			let await_start = Instant::now();
			let updates_available = $await;
			let await_time = await_start.elapsed();

			if updates_available {
				log_trace!($logger, "Persisting ChannelManager...");
				$persister.persist_manager(&*$channel_manager)?;
				log_trace!($logger, "Done persisting ChannelManager.");
			}
			// Exit the loop if the background processor was requested to stop.
			if $loop_exit_check {
				log_trace!($logger, "Terminating background processor.");
				break;
			}
			if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
				log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
				$channel_manager.timer_tick_occurred();
				last_freshness_call = Instant::now();
			}
			if await_time > Duration::from_secs(1) {
				// On various platforms, we may be starved of CPU cycles for several reasons.
				// E.g. on iOS, if we've been in the background, we will be entirely paused.
				// Similarly, if we're on a desktop platform and the device has been asleep, we
				// may not get any cycles.
				// We detect this by checking if our max-100ms-sleep, above, ran longer than a
				// full second, at which point we assume sockets may have been killed (they
				// appear to be at least on some platforms, even if it has only been a second).
				// Note that we have to take care to not get here just because user event
				// processing was slow at the top of the loop. For example, the sample client
				// may call Bitcoin Core RPCs during event handling, which very often takes
				// more than a handful of seconds to complete, and shouldn't disconnect all our
				// peers.
				log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
				$peer_manager.disconnect_all_peers();
				last_ping_call = Instant::now();
			} else if last_ping_call.elapsed().as_secs() > PING_TIMER {
				log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
				$peer_manager.timer_tick_occurred();
				last_ping_call = Instant::now();
			}

			// Note that we want to run a graph prune once not long after startup before
			// falling back to our usual hourly prunes. This avoids short-lived clients never
			// pruning their network graph. We run once 60 seconds after startup before
			// continuing our normal cadence.
			if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
				// The network graph must not be pruned while rapid sync completion is pending
				log_trace!($logger, "Assessing prunability of network graph");
				if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
					network_graph.remove_stale_channels();

					if let Err(e) = $persister.persist_graph(network_graph) {
						log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
					}

					last_prune_call = Instant::now();
					have_pruned = true;
				} else {
					log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
				}
			}

			if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
				if let Some(ref scorer) = $scorer {
					log_trace!($logger, "Persisting scorer");
					if let Err(e) = $persister.persist_scorer(&scorer) {
						log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
					}
				}
				last_scorer_persist_call = Instant::now();
			}
		}

		// After we exit, ensure we persist the ChannelManager one final time - this avoids
		// some races where users quit while channel updates were in-flight, with
		// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
		$persister.persist_manager(&*$channel_manager)?;

		// Persist Scorer on exit
		if let Some(ref scorer) = $scorer {
			$persister.persist_scorer(&scorer)?;
		}

		// Persist NetworkGraph on exit
		if let Some(network_graph) = $gossip_sync.network_graph() {
			$persister.persist_graph(network_graph)?;
		}

		Ok(())
	} }
}
354+
355+
/// Processes background events in a future.
356+
///
357+
/// `sleeper` should return a future which completes in the given amount of time and returns a
358+
/// boolean indicating whether the background processing should continue. Once `sleeper` returns a
359+
/// future which outputs false, the loop will exit and this function's future will complete.
360+
///
361+
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
362+
#[cfg(feature = "futures")]
363+
pub async fn process_events_async<
364+
'a,
365+
Signer: 'static + Sign,
366+
CA: 'static + Deref + Send + Sync,
367+
CF: 'static + Deref + Send + Sync,
368+
CW: 'static + Deref + Send + Sync,
369+
T: 'static + Deref + Send + Sync,
370+
K: 'static + Deref + Send + Sync,
371+
F: 'static + Deref + Send + Sync,
372+
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
373+
L: 'static + Deref + Send + Sync,
374+
P: 'static + Deref + Send + Sync,
375+
Descriptor: 'static + SocketDescriptor + Send + Sync,
376+
CMH: 'static + Deref + Send + Sync,
377+
RMH: 'static + Deref + Send + Sync,
378+
EH: 'static + EventHandler + Send,
379+
PS: 'static + Deref + Send,
380+
M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
381+
CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
382+
PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
383+
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
384+
UMH: 'static + Deref + Send + Sync,
385+
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
386+
S: 'static + Deref<Target = SC> + Send + Sync,
387+
SC: WriteableScore<'a>,
388+
SleepFuture: core::future::Future<Output = bool>,
389+
Sleeper: Fn(Duration) -> SleepFuture
390+
>(
391+
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
392+
gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
393+
sleeper: Sleeper,
394+
) -> Result<(), std::io::Error>
395+
where
396+
CA::Target: 'static + chain::Access,
397+
CF::Target: 'static + chain::Filter,
398+
CW::Target: 'static + chain::Watch<Signer>,
399+
T::Target: 'static + BroadcasterInterface,
400+
K::Target: 'static + KeysInterface<Signer = Signer>,
401+
F::Target: 'static + FeeEstimator,
402+
L::Target: 'static + Logger,
403+
P::Target: 'static + Persist<Signer>,
404+
CMH::Target: 'static + ChannelMessageHandler,
405+
RMH::Target: 'static + RoutingMessageHandler,
406+
UMH::Target: 'static + CustomMessageHandler,
407+
PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
408+
{
409+
let mut should_continue = true;
410+
define_run_body!(persister, event_handler, chain_monitor, channel_manager,
411+
gossip_sync, peer_manager, logger, scorer, should_continue, {
412+
select! {
413+
_ = channel_manager.get_persistable_update_future().fuse() => true,
414+
cont = sleeper(Duration::from_millis(100)).fuse() => {
415+
should_continue = cont;
416+
false
417+
}
418+
}
419+
})
420+
}
421+
222422
impl BackgroundProcessor {
223423
/// Start a background thread that takes care of responsibilities enumerated in the [top-level
224424
/// documentation].
@@ -310,129 +510,9 @@ impl BackgroundProcessor {
310510
let stop_thread = Arc::new(AtomicBool::new(false));
311511
let stop_thread_clone = stop_thread.clone();
312512
let handle = thread::spawn(move || -> Result<(), std::io::Error> {
313-
let event_handler = DecoratingEventHandler {
314-
event_handler,
315-
gossip_sync: &gossip_sync,
316-
};
317-
318-
log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
319-
channel_manager.timer_tick_occurred();
320-
321-
let mut last_freshness_call = Instant::now();
322-
let mut last_ping_call = Instant::now();
323-
let mut last_prune_call = Instant::now();
324-
let mut last_scorer_persist_call = Instant::now();
325-
let mut have_pruned = false;
326-
327-
loop {
328-
channel_manager.process_pending_events(&event_handler);
329-
chain_monitor.process_pending_events(&event_handler);
330-
331-
// Note that the PeerManager::process_events may block on ChannelManager's locks,
332-
// hence it comes last here. When the ChannelManager finishes whatever it's doing,
333-
// we want to ensure we get into `persist_manager` as quickly as we can, especially
334-
// without running the normal event processing above and handing events to users.
335-
//
336-
// Specifically, on an *extremely* slow machine, we may see ChannelManager start
337-
// processing a message effectively at any point during this loop. In order to
338-
// minimize the time between such processing completing and persisting the updated
339-
// ChannelManager, we want to minimize methods blocking on a ChannelManager
340-
// generally, and as a fallback place such blocking only immediately before
341-
// persistence.
342-
peer_manager.process_events();
343-
344-
// We wait up to 100ms, but track how long it takes to detect being put to sleep,
345-
// see `await_start`'s use below.
346-
let await_start = Instant::now();
347-
let updates_available =
348-
channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
349-
let await_time = await_start.elapsed();
350-
351-
if updates_available {
352-
log_trace!(logger, "Persisting ChannelManager...");
353-
persister.persist_manager(&*channel_manager)?;
354-
log_trace!(logger, "Done persisting ChannelManager.");
355-
}
356-
// Exit the loop if the background processor was requested to stop.
357-
if stop_thread.load(Ordering::Acquire) == true {
358-
log_trace!(logger, "Terminating background processor.");
359-
break;
360-
}
361-
if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
362-
log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
363-
channel_manager.timer_tick_occurred();
364-
last_freshness_call = Instant::now();
365-
}
366-
if await_time > Duration::from_secs(1) {
367-
// On various platforms, we may be starved of CPU cycles for several reasons.
368-
// E.g. on iOS, if we've been in the background, we will be entirely paused.
369-
// Similarly, if we're on a desktop platform and the device has been asleep, we
370-
// may not get any cycles.
371-
// We detect this by checking if our max-100ms-sleep, above, ran longer than a
372-
// full second, at which point we assume sockets may have been killed (they
373-
// appear to be at least on some platforms, even if it has only been a second).
374-
// Note that we have to take care to not get here just because user event
375-
// processing was slow at the top of the loop. For example, the sample client
376-
// may call Bitcoin Core RPCs during event handling, which very often takes
377-
// more than a handful of seconds to complete, and shouldn't disconnect all our
378-
// peers.
379-
log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
380-
peer_manager.disconnect_all_peers();
381-
last_ping_call = Instant::now();
382-
} else if last_ping_call.elapsed().as_secs() > PING_TIMER {
383-
log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
384-
peer_manager.timer_tick_occurred();
385-
last_ping_call = Instant::now();
386-
}
387-
388-
// Note that we want to run a graph prune once not long after startup before
389-
// falling back to our usual hourly prunes. This avoids short-lived clients never
390-
// pruning their network graph. We run once 60 seconds after startup before
391-
// continuing our normal cadence.
392-
if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
393-
// The network graph must not be pruned while rapid sync completion is pending
394-
log_trace!(logger, "Assessing prunability of network graph");
395-
if let Some(network_graph) = gossip_sync.prunable_network_graph() {
396-
network_graph.remove_stale_channels();
397-
398-
if let Err(e) = persister.persist_graph(network_graph) {
399-
log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
400-
}
401-
402-
last_prune_call = Instant::now();
403-
have_pruned = true;
404-
} else {
405-
log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
406-
}
407-
}
408-
409-
if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
410-
if let Some(ref scorer) = scorer {
411-
log_trace!(logger, "Persisting scorer");
412-
if let Err(e) = persister.persist_scorer(&scorer) {
413-
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
414-
}
415-
}
416-
last_scorer_persist_call = Instant::now();
417-
}
418-
}
419-
420-
// After we exit, ensure we persist the ChannelManager one final time - this avoids
421-
// some races where users quit while channel updates were in-flight, with
422-
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
423-
persister.persist_manager(&*channel_manager)?;
424-
425-
// Persist Scorer on exit
426-
if let Some(ref scorer) = scorer {
427-
persister.persist_scorer(&scorer)?;
428-
}
429-
430-
// Persist NetworkGraph on exit
431-
if let Some(network_graph) = gossip_sync.network_graph() {
432-
persister.persist_graph(network_graph)?;
433-
}
434-
435-
Ok(())
513+
define_run_body!(persister, event_handler, chain_monitor, channel_manager,
514+
gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
515+
channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
436516
});
437517
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
438518
}

lightning/src/util/wakers.rs

+2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ use core::mem;
1818
use core::time::Duration;
1919
use sync::{Condvar, Mutex};
2020

21+
use prelude::{Box, Vec};
22+
2123
#[cfg(any(test, feature = "std"))]
2224
use std::time::Instant;
2325

0 commit comments

Comments
 (0)