
Add a background processor which is async #1657

Merged
1 change: 1 addition & 0 deletions lightning-background-processor/Cargo.toml
@@ -17,6 +17,7 @@ rustdoc-args = ["--cfg", "docsrs"]
bitcoin = "0.28.1"
lightning = { version = "0.0.110", path = "../lightning", features = ["std"] }
lightning-rapid-gossip-sync = { version = "0.0.110", path = "../lightning-rapid-gossip-sync" }
futures = { version = "0.3", optional = true }

[dev-dependencies]
lightning = { version = "0.0.110", path = "../lightning", features = ["_test_utils"] }
326 changes: 203 additions & 123 deletions lightning-background-processor/src/lib.rs
@@ -31,6 +31,9 @@ use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use std::ops::Deref;

#[cfg(feature = "futures")]
use futures::{select, future::FutureExt};

/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
@@ -219,6 +222,203 @@ where A::Target: chain::Access, L::Target: Logger {
}
}

macro_rules! define_run_body {
($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
$gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
$loop_exit_check: expr, $await: expr)
=> { {
let event_handler = DecoratingEventHandler {
event_handler: $event_handler,
gossip_sync: &$gossip_sync,
};

log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
$channel_manager.timer_tick_occurred();

let mut last_freshness_call = Instant::now();
let mut last_ping_call = Instant::now();
let mut last_prune_call = Instant::now();
let mut last_scorer_persist_call = Instant::now();
let mut have_pruned = false;

loop {
$channel_manager.process_pending_events(&event_handler);
$chain_monitor.process_pending_events(&event_handler);

// Note that the PeerManager::process_events may block on ChannelManager's locks,
// hence it comes last here. When the ChannelManager finishes whatever it's doing,
// we want to ensure we get into `persist_manager` as quickly as we can, especially
// without running the normal event processing above and handing events to users.
//
// Specifically, on an *extremely* slow machine, we may see ChannelManager start
// processing a message effectively at any point during this loop. In order to
// minimize the time between such processing completing and persisting the updated
// ChannelManager, we want to minimize methods blocking on a ChannelManager
// generally, and as a fallback place such blocking only immediately before
// persistence.
$peer_manager.process_events();

// We wait up to 100ms, but track how long it takes to detect being put to sleep,
// see `await_start`'s use below.
let await_start = Instant::now();
let updates_available = $await;
let await_time = await_start.elapsed();

if updates_available {
log_trace!($logger, "Persisting ChannelManager...");
$persister.persist_manager(&*$channel_manager)?;
log_trace!($logger, "Done persisting ChannelManager.");
}
// Exit the loop if the background processor was requested to stop.
if $loop_exit_check {
log_trace!($logger, "Terminating background processor.");
break;
}
if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
$channel_manager.timer_tick_occurred();
last_freshness_call = Instant::now();
}
if await_time > Duration::from_secs(1) {
// On various platforms, we may be starved of CPU cycles for several reasons.
// E.g. on iOS, if we've been in the background, we will be entirely paused.
// Similarly, if we're on a desktop platform and the device has been asleep, we
// may not get any cycles.
// We detect this by checking if our max-100ms-sleep, above, ran longer than a
// full second, at which point we assume sockets may have been killed (they
// appear to be at least on some platforms, even if it has only been a second).
// Note that we have to take care to not get here just because user event
// processing was slow at the top of the loop. For example, the sample client
// may call Bitcoin Core RPCs during event handling, which very often takes
// more than a handful of seconds to complete, and shouldn't disconnect all our
// peers.
log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
$peer_manager.disconnect_all_peers();
last_ping_call = Instant::now();
} else if last_ping_call.elapsed().as_secs() > PING_TIMER {
log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
$peer_manager.timer_tick_occurred();
last_ping_call = Instant::now();
}

// Note that we want to run a graph prune once not long after startup before
// falling back to our usual hourly prunes. This avoids short-lived clients never
// pruning their network graph. We run once 60 seconds after startup before
// continuing our normal cadence.
if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
// The network graph must not be pruned while rapid sync completion is pending
log_trace!($logger, "Assessing prunability of network graph");
if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
network_graph.remove_stale_channels();

if let Err(e) = $persister.persist_graph(network_graph) {
log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
}

last_prune_call = Instant::now();
have_pruned = true;
} else {
log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
}
}

if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
if let Some(ref scorer) = $scorer {
log_trace!($logger, "Persisting scorer");
if let Err(e) = $persister.persist_scorer(&scorer) {
log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
}
}
last_scorer_persist_call = Instant::now();
}
}

// After we exit, ensure we persist the ChannelManager one final time - this avoids
// some races where users quit while channel updates were in-flight, with
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
$persister.persist_manager(&*$channel_manager)?;

// Persist Scorer on exit
if let Some(ref scorer) = $scorer {
$persister.persist_scorer(&scorer)?;
}

// Persist NetworkGraph on exit
if let Some(network_graph) = $gossip_sync.network_graph() {
$persister.persist_graph(network_graph)?;
}

Ok(())
} }
}

/// Processes background events in a future.
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should continue. Once `sleeper` returns a
/// future which outputs false, the loop will exit and this function's future will complete.
///
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
#[cfg(feature = "futures")]
pub async fn process_events_async<
'a,
Signer: 'static + Sign,
CA: 'static + Deref + Send + Sync,
CF: 'static + Deref + Send + Sync,
CW: 'static + Deref + Send + Sync,
T: 'static + Deref + Send + Sync,
K: 'static + Deref + Send + Sync,
F: 'static + Deref + Send + Sync,
G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
L: 'static + Deref + Send + Sync,
P: 'static + Deref + Send + Sync,
Descriptor: 'static + SocketDescriptor + Send + Sync,
CMH: 'static + Deref + Send + Sync,
RMH: 'static + Deref + Send + Sync,
EH: 'static + EventHandler + Send,
PS: 'static + Deref + Send,
M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
UMH: 'static + Deref + Send + Sync,
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
S: 'static + Deref<Target = SC> + Send + Sync,
SC: WriteableScore<'a>,
Comment on lines +363 to +387
Contributor:
Wonder if there is a way to move the common type parameters to the BackgroundProcessor struct so as not to repeat this long list of trait bounds. Then have an "implementation" type parameter to choose between sync and async. Then the macro could be a function without repeating all the trait bounds again.

Collaborator (author):
We spoke a bit offline about this, but I'm not sure how. I went down this rabbit hole initially and ended up repeating all the type stuff at least 4 times before I gave up and added a macro.

Contributor:
Not sure if you tried this already, but would it help to transition BackgroundProcessor into a trait, with all of the generic parameters listed as associated types and the common logic between sync and async extracted into a trait method?

Collaborator (author):
Hmm, I think you still end up with the same issue - you create a trait, which lists all the generic crap, then you create two public structs which implement that trait (which both also have all the generic crap) then you create two impl blocks, which both also have all the generic crap. So now you listed it 5 times, I think :(

Contributor (@wpaulino), Aug 18, 2022:
Perhaps we can just use a trait to group all the different generic bounds of start and process_events_async together and not actually implement it on BackgroundProcessor so that we can just reference the generic bounds by the new trait?

start's full function signature would resemble something like:

pub fn start<BPP: BackgroundProcessorParams>(
    persister: BPP::PS,
    event_handler: BPP::EH,
    chain_monitor: BPP::M,
    channel_manager: BPP::CM,
    gossip_sync: GossipSync<BPP::PGS, BPP::RGS, BPP::G, BPP::CA, BPP::L>,
    peer_manager: BPP::PM,
    logger: BPP::L,
    scorer: Option<BPP::S>,
) -> Self
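
(Illustrative aside, not part of the PR: a rough sketch of what such a bounds-grouping trait could look like, reusing the associated-type names from the signature above. All of the real `Deref`/`Send`/`Sync` bounds are elided, and the trait name is hypothetical.)

pub trait BackgroundProcessorParams {
    // Each associated type stands in for one of the generic parameters above; the
    // actual bounds would live here once, instead of being repeated at every call site.
    type PS;  // persister
    type EH;  // event handler
    type M;   // chain monitor
    type CM;  // channel manager
    type PGS; // P2P gossip sync
    type RGS; // rapid gossip sync
    type G;   // network graph
    type CA;  // chain access
    type L;   // logger
    type PM;  // peer manager
    type S;   // scorer
}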

Collaborator (author):
Sadly the trait methods would have to support being both async and sync, which I don't think we could capture in a single trait?

Contributor:
True. The best I could find was this: https://docs.rs/maybe-async/latest/maybe_async/
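
(Illustrative aside, not from the PR: roughly how the linked maybe-async crate is used, assuming its `maybe_async` attribute and `is_sync` feature behave as its docs describe. The trait and method names here are hypothetical.)

#[maybe_async::maybe_async]
pub trait BackgroundPersist {
    // Compiled as an async method by default; rewritten into a plain blocking
    // `fn` (with `.await`s stripped) when maybe-async's `is_sync` feature is enabled.
    async fn persist_manager(&self) -> Result<(), std::io::Error>;
}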

Collaborator (author):
Heh, I suppose we could... a macro is probably more readable than a proc macro, though :(

Contributor:
Probably could just feature gate two different versions if they are mutually exclusive, but probably not worth doing here. Just figured we could avoid the duplicative type parameters.

Contributor:
Btw. it's probably worth mentioning here that BDK went this route to provide blocking/async chain client implementations side by side (cf. https://github.com/bitcoindevkit/bdk/blob/master/macros/src/lib.rs#L84-L146). IIUC, they ran into some issues when splitting off the Esplora client from the main project and are looking into providing alternatives that give the user more control over which impl they want to use.

SleepFuture: core::future::Future<Output = bool>,
Sleeper: Fn(Duration) -> SleepFuture
>(
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
sleeper: Sleeper,
) -> Result<(), std::io::Error>
where
CA::Target: 'static + chain::Access,
CF::Target: 'static + chain::Filter,
CW::Target: 'static + chain::Watch<Signer>,
T::Target: 'static + BroadcasterInterface,
K::Target: 'static + KeysInterface<Signer = Signer>,
F::Target: 'static + FeeEstimator,
L::Target: 'static + Logger,
P::Target: 'static + Persist<Signer>,
CMH::Target: 'static + ChannelMessageHandler,
RMH::Target: 'static + RoutingMessageHandler,
UMH::Target: 'static + CustomMessageHandler,
PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
{
let mut should_continue = true;
define_run_body!(persister, event_handler, chain_monitor, channel_manager,
gossip_sync, peer_manager, logger, scorer, should_continue, {
select! {
_ = channel_manager.get_persistable_update_future().fuse() => true,
cont = sleeper(Duration::from_millis(100)).fuse() => {
should_continue = cont;
false
}
}
})
}
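
(Illustrative aside, not part of this diff: one way a caller might construct the `sleeper` argument using Tokio. The `watch` channel below is an assumed shutdown signal owned by the caller; any timer or runtime works as long as the returned future resolves to a bool, `true` to keep running and `false` to exit the loop and trigger the final persistence pass.)

use std::time::Duration;
use tokio::sync::watch;

// Hypothetical shutdown signal; keep `_stop_sender` around and drop it (or send on it)
// to request shutdown.
let (_stop_sender, stop_receiver) = watch::channel(());

let sleeper = move |d: Duration| {
    let mut stop = stop_receiver.clone();
    async move {
        tokio::select! {
            // Timer fired: tell the background loop to keep going.
            _ = tokio::time::sleep(d) => true,
            // Shutdown signalled (or sender dropped): tell the loop to exit.
            _ = stop.changed() => false,
        }
    }
};

// The closure is then passed as the last argument, e.g.:
// process_events_async(persister, event_handler, chain_monitor, channel_manager,
//     gossip_sync, peer_manager, logger, Some(scorer), sleeper).await?;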

impl BackgroundProcessor {
/// Start a background thread that takes care of responsibilities enumerated in the [top-level
/// documentation].
@@ -310,129 +510,9 @@ impl BackgroundProcessor {
let stop_thread = Arc::new(AtomicBool::new(false));
let stop_thread_clone = stop_thread.clone();
let handle = thread::spawn(move || -> Result<(), std::io::Error> {
let event_handler = DecoratingEventHandler {
event_handler,
gossip_sync: &gossip_sync,
};

log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
channel_manager.timer_tick_occurred();

let mut last_freshness_call = Instant::now();
let mut last_ping_call = Instant::now();
let mut last_prune_call = Instant::now();
let mut last_scorer_persist_call = Instant::now();
let mut have_pruned = false;

loop {
channel_manager.process_pending_events(&event_handler);
chain_monitor.process_pending_events(&event_handler);

// Note that the PeerManager::process_events may block on ChannelManager's locks,
// hence it comes last here. When the ChannelManager finishes whatever it's doing,
// we want to ensure we get into `persist_manager` as quickly as we can, especially
// without running the normal event processing above and handing events to users.
//
// Specifically, on an *extremely* slow machine, we may see ChannelManager start
// processing a message effectively at any point during this loop. In order to
// minimize the time between such processing completing and persisting the updated
// ChannelManager, we want to minimize methods blocking on a ChannelManager
// generally, and as a fallback place such blocking only immediately before
// persistence.
peer_manager.process_events();

// We wait up to 100ms, but track how long it takes to detect being put to sleep,
// see `await_start`'s use below.
let await_start = Instant::now();
let updates_available =
channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
let await_time = await_start.elapsed();

if updates_available {
log_trace!(logger, "Persisting ChannelManager...");
persister.persist_manager(&*channel_manager)?;
log_trace!(logger, "Done persisting ChannelManager.");
}
// Exit the loop if the background processor was requested to stop.
if stop_thread.load(Ordering::Acquire) == true {
log_trace!(logger, "Terminating background processor.");
break;
}
if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
channel_manager.timer_tick_occurred();
last_freshness_call = Instant::now();
}
if await_time > Duration::from_secs(1) {
// On various platforms, we may be starved of CPU cycles for several reasons.
// E.g. on iOS, if we've been in the background, we will be entirely paused.
// Similarly, if we're on a desktop platform and the device has been asleep, we
// may not get any cycles.
// We detect this by checking if our max-100ms-sleep, above, ran longer than a
// full second, at which point we assume sockets may have been killed (they
// appear to be at least on some platforms, even if it has only been a second).
// Note that we have to take care to not get here just because user event
// processing was slow at the top of the loop. For example, the sample client
// may call Bitcoin Core RPCs during event handling, which very often takes
// more than a handful of seconds to complete, and shouldn't disconnect all our
// peers.
log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
peer_manager.disconnect_all_peers();
last_ping_call = Instant::now();
} else if last_ping_call.elapsed().as_secs() > PING_TIMER {
log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
peer_manager.timer_tick_occurred();
last_ping_call = Instant::now();
}

// Note that we want to run a graph prune once not long after startup before
// falling back to our usual hourly prunes. This avoids short-lived clients never
// pruning their network graph. We run once 60 seconds after startup before
// continuing our normal cadence.
if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
// The network graph must not be pruned while rapid sync completion is pending
log_trace!(logger, "Assessing prunability of network graph");
if let Some(network_graph) = gossip_sync.prunable_network_graph() {
network_graph.remove_stale_channels();

if let Err(e) = persister.persist_graph(network_graph) {
log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
}

last_prune_call = Instant::now();
have_pruned = true;
} else {
log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
}
}

if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
if let Some(ref scorer) = scorer {
log_trace!(logger, "Persisting scorer");
if let Err(e) = persister.persist_scorer(&scorer) {
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
}
}
last_scorer_persist_call = Instant::now();
}
}

// After we exit, ensure we persist the ChannelManager one final time - this avoids
// some races where users quit while channel updates were in-flight, with
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
persister.persist_manager(&*channel_manager)?;

// Persist Scorer on exit
if let Some(ref scorer) = scorer {
persister.persist_scorer(&scorer)?;
}

// Persist NetworkGraph on exit
if let Some(network_graph) = gossip_sync.network_graph() {
persister.persist_graph(network_graph)?;
}

Ok(())
define_run_body!(persister, event_handler, chain_monitor, channel_manager,
gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
});
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
}