Skip to content

Call peer_manager.process_events() in BackgroundProcessor #864

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 46 additions & 20 deletions background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::keysinterface::{Sign, KeysInterface};
use lightning::ln::channelmanager::ChannelManager;
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
use lightning::ln::peer_handler::{PeerManager, SocketDescriptor};
use lightning::util::logger::Logger;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
Expand Down Expand Up @@ -63,40 +65,50 @@ impl BackgroundProcessor {
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
/// [`FilesystemPersister::persist_manager`]: lightning_persister::FilesystemPersister::persist_manager
pub fn start<PM, Signer, M, T, K, F, L>(persist_manager: PM, manager: Arc<ChannelManager<Signer, Arc<M>, Arc<T>, Arc<K>, Arc<F>, Arc<L>>>, logger: Arc<L>) -> Self
where Signer: 'static + Sign,
M: 'static + chain::Watch<Signer>,
T: 'static + BroadcasterInterface,
K: 'static + KeysInterface<Signer=Signer>,
F: 'static + FeeEstimator,
L: 'static + Logger,
PM: 'static + Send + Fn(&ChannelManager<Signer, Arc<M>, Arc<T>, Arc<K>, Arc<F>, Arc<L>>) -> Result<(), std::io::Error>,
pub fn start<PM, Signer, M, T, K, F, L, Descriptor: 'static + SocketDescriptor + Send, CM, RM>(
persist_channel_manager: PM,
channel_manager: Arc<ChannelManager<Signer, Arc<M>, Arc<T>, Arc<K>, Arc<F>, Arc<L>>>,
peer_manager: Arc<PeerManager<Descriptor, Arc<CM>, Arc<RM>, Arc<L>>>, logger: Arc<L>,
) -> Self
where
Signer: 'static + Sign,
M: 'static + chain::Watch<Signer>,
T: 'static + BroadcasterInterface,
K: 'static + KeysInterface<Signer = Signer>,
F: 'static + FeeEstimator,
L: 'static + Logger,
CM: 'static + ChannelMessageHandler,
RM: 'static + RoutingMessageHandler,
PM: 'static
+ Send
+ Fn(
&ChannelManager<Signer, Arc<M>, Arc<T>, Arc<K>, Arc<F>, Arc<L>>,
) -> Result<(), std::io::Error>,
{
let stop_thread = Arc::new(AtomicBool::new(false));
let stop_thread_clone = stop_thread.clone();
let handle = thread::spawn(move || -> Result<(), std::io::Error> {
let mut current_time = Instant::now();
loop {
let updates_available = manager.await_persistable_update_timeout(Duration::from_millis(100));
peer_manager.process_events();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, unrelated to this PR, but we really need to fix support for knowing we need to persist if something happened while we weren't awaiting. Probably some unique id that just gets incremented every time we would wake up any waiters and then get that as a part of the write process. Then we can always check that the ID is the latest.

let updates_available =
channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
if updates_available {
persist_manager(&*manager)?;
persist_channel_manager(&*channel_manager)?;
}
// Exit the loop if the background processor was requested to stop.
if stop_thread.load(Ordering::Acquire) == true {
log_trace!(logger, "Terminating background processor.");
return Ok(())
return Ok(());
}
if current_time.elapsed().as_secs() > CHAN_FRESHNESS_TIMER {
log_trace!(logger, "Calling manager's timer_chan_freshness_every_min");
manager.timer_chan_freshness_every_min();
channel_manager.timer_chan_freshness_every_min();
current_time = Instant::now();
}
}
});
Self {
stop_thread: stop_thread_clone,
thread_handle: handle,
}
Self { stop_thread: stop_thread_clone, thread_handle: handle }
}

/// Stop `BackgroundProcessor`'s thread.
Expand All @@ -120,6 +132,7 @@ mod tests {
use lightning::ln::channelmanager::{ChainParameters, ChannelManager, SimpleArcChannelManager};
use lightning::ln::features::InitFeatures;
use lightning::ln::msgs::ChannelMessageHandler;
use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor};
use lightning::util::config::UserConfig;
use lightning::util::events::{Event, EventsProvider, MessageSendEventsProvider, MessageSendEvent};
use lightning::util::logger::Logger;
Expand All @@ -132,10 +145,21 @@ mod tests {
use std::time::Duration;
use super::BackgroundProcessor;

#[derive(Clone, Eq, Hash, PartialEq)]
struct TestDescriptor{}
impl SocketDescriptor for TestDescriptor {
fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
0
}

fn disconnect_socket(&mut self) {}
}

type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;

struct Node {
node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>>>,
persister: Arc<FilesystemPersister>,
logger: Arc<test_utils::TestLogger>,
}
Expand Down Expand Up @@ -176,7 +200,9 @@ mod tests {
latest_height: 0,
};
let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster, logger.clone(), keys_manager.clone(), UserConfig::default(), params));
let node = Node { node: manager, persister, logger };
let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(), &seed, logger.clone()));
let node = Node { node: manager, peer_manager, persister, logger };
nodes.push(node);
}
nodes
Expand Down Expand Up @@ -220,7 +246,7 @@ mod tests {
// Initiate the background processors to watch each node.
let data_dir = nodes[0].persister.get_data_dir();
let callback = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].logger.clone());
let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());

// Go through the channel creation process until each node should have something persisted.
let tx = open_channel!(nodes[0], nodes[1], 100000);
Expand Down Expand Up @@ -275,7 +301,7 @@ mod tests {
let nodes = create_nodes(1, "test_chan_freshness_called".to_string());
let data_dir = nodes[0].persister.get_data_dir();
let callback = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].logger.clone());
let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
loop {
let log_entries = nodes[0].logger.lines.lock().unwrap();
let desired_log = "Calling manager's timer_chan_freshness_every_min".to_string();
Expand All @@ -302,7 +328,7 @@ mod tests {
}

let nodes = create_nodes(2, "test_persist_error".to_string());
let bg_processor = BackgroundProcessor::start(persist_manager, nodes[0].node.clone(), nodes[0].logger.clone());
let bg_processor = BackgroundProcessor::start(persist_manager, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
open_channel!(nodes[0], nodes[1], 100000);

let _ = bg_processor.thread_handle.join().unwrap().expect_err("Errored persisting manager: test");
Expand Down