Skip to content

Commit de6dded

Browse files
authored
Merge pull request #864 from valentinewallace/background-process-peer-events
Call peer_manager.process_events() in BackgroundProcessor
2 parents df732f4 + 0c34529 commit de6dded

File tree

1 file changed

+46
-20
lines changed
  • background-processor/src

1 file changed

+46
-20
lines changed

background-processor/src/lib.rs

Lines changed: 46 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ use lightning::chain;
1212
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
1313
use lightning::chain::keysinterface::{Sign, KeysInterface};
1414
use lightning::ln::channelmanager::ChannelManager;
15+
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
16+
use lightning::ln::peer_handler::{PeerManager, SocketDescriptor};
1517
use lightning::util::logger::Logger;
1618
use std::sync::Arc;
1719
use std::sync::atomic::{AtomicBool, Ordering};
@@ -63,40 +65,50 @@ impl BackgroundProcessor {
6365
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
6466
/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
6567
/// [`FilesystemPersister::persist_manager`]: lightning_persister::FilesystemPersister::persist_manager
66-
pub fn start<PM, Signer, M, T, K, F, L>(persist_manager: PM, manager: Arc<ChannelManager<Signer, Arc<M>, Arc<T>, Arc<K>, Arc<F>, Arc<L>>>, logger: Arc<L>) -> Self
67-
where Signer: 'static + Sign,
68-
M: 'static + chain::Watch<Signer>,
69-
T: 'static + BroadcasterInterface,
70-
K: 'static + KeysInterface<Signer=Signer>,
71-
F: 'static + FeeEstimator,
72-
L: 'static + Logger,
73-
PM: 'static + Send + Fn(&ChannelManager<Signer, Arc<M>, Arc<T>, Arc<K>, Arc<F>, Arc<L>>) -> Result<(), std::io::Error>,
68+
pub fn start<PM, Signer, M, T, K, F, L, Descriptor: 'static + SocketDescriptor + Send, CM, RM>(
69+
persist_channel_manager: PM,
70+
channel_manager: Arc<ChannelManager<Signer, Arc<M>, Arc<T>, Arc<K>, Arc<F>, Arc<L>>>,
71+
peer_manager: Arc<PeerManager<Descriptor, Arc<CM>, Arc<RM>, Arc<L>>>, logger: Arc<L>,
72+
) -> Self
73+
where
74+
Signer: 'static + Sign,
75+
M: 'static + chain::Watch<Signer>,
76+
T: 'static + BroadcasterInterface,
77+
K: 'static + KeysInterface<Signer = Signer>,
78+
F: 'static + FeeEstimator,
79+
L: 'static + Logger,
80+
CM: 'static + ChannelMessageHandler,
81+
RM: 'static + RoutingMessageHandler,
82+
PM: 'static
83+
+ Send
84+
+ Fn(
85+
&ChannelManager<Signer, Arc<M>, Arc<T>, Arc<K>, Arc<F>, Arc<L>>,
86+
) -> Result<(), std::io::Error>,
7487
{
7588
let stop_thread = Arc::new(AtomicBool::new(false));
7689
let stop_thread_clone = stop_thread.clone();
7790
let handle = thread::spawn(move || -> Result<(), std::io::Error> {
7891
let mut current_time = Instant::now();
7992
loop {
80-
let updates_available = manager.await_persistable_update_timeout(Duration::from_millis(100));
93+
peer_manager.process_events();
94+
let updates_available =
95+
channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
8196
if updates_available {
82-
persist_manager(&*manager)?;
97+
persist_channel_manager(&*channel_manager)?;
8398
}
8499
// Exit the loop if the background processor was requested to stop.
85100
if stop_thread.load(Ordering::Acquire) == true {
86101
log_trace!(logger, "Terminating background processor.");
87-
return Ok(())
102+
return Ok(());
88103
}
89104
if current_time.elapsed().as_secs() > CHAN_FRESHNESS_TIMER {
90105
log_trace!(logger, "Calling manager's timer_chan_freshness_every_min");
91-
manager.timer_chan_freshness_every_min();
106+
channel_manager.timer_chan_freshness_every_min();
92107
current_time = Instant::now();
93108
}
94109
}
95110
});
96-
Self {
97-
stop_thread: stop_thread_clone,
98-
thread_handle: handle,
99-
}
111+
Self { stop_thread: stop_thread_clone, thread_handle: handle }
100112
}
101113

102114
/// Stop `BackgroundProcessor`'s thread.
@@ -120,6 +132,7 @@ mod tests {
120132
use lightning::ln::channelmanager::{ChainParameters, ChannelManager, SimpleArcChannelManager};
121133
use lightning::ln::features::InitFeatures;
122134
use lightning::ln::msgs::ChannelMessageHandler;
135+
use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor};
123136
use lightning::util::config::UserConfig;
124137
use lightning::util::events::{Event, EventsProvider, MessageSendEventsProvider, MessageSendEvent};
125138
use lightning::util::logger::Logger;
@@ -132,10 +145,21 @@ mod tests {
132145
use std::time::Duration;
133146
use super::BackgroundProcessor;
134147

148+
#[derive(Clone, Eq, Hash, PartialEq)]
149+
struct TestDescriptor{}
150+
impl SocketDescriptor for TestDescriptor {
151+
fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
152+
0
153+
}
154+
155+
fn disconnect_socket(&mut self) {}
156+
}
157+
135158
type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
136159

137160
struct Node {
138161
node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
162+
peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>>>,
139163
persister: Arc<FilesystemPersister>,
140164
logger: Arc<test_utils::TestLogger>,
141165
}
@@ -176,7 +200,9 @@ mod tests {
176200
latest_height: 0,
177201
};
178202
let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster, logger.clone(), keys_manager.clone(), UserConfig::default(), params));
179-
let node = Node { node: manager, persister, logger };
203+
let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
204+
let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(), &seed, logger.clone()));
205+
let node = Node { node: manager, peer_manager, persister, logger };
180206
nodes.push(node);
181207
}
182208
nodes
@@ -220,7 +246,7 @@ mod tests {
220246
// Initiate the background processors to watch each node.
221247
let data_dir = nodes[0].persister.get_data_dir();
222248
let callback = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
223-
let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].logger.clone());
249+
let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
224250

225251
// Go through the channel creation process until each node should have something persisted.
226252
let tx = open_channel!(nodes[0], nodes[1], 100000);
@@ -275,7 +301,7 @@ mod tests {
275301
let nodes = create_nodes(1, "test_chan_freshness_called".to_string());
276302
let data_dir = nodes[0].persister.get_data_dir();
277303
let callback = move |node: &ChannelManager<InMemorySigner, Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>>| FilesystemPersister::persist_manager(data_dir.clone(), node);
278-
let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].logger.clone());
304+
let bg_processor = BackgroundProcessor::start(callback, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
279305
loop {
280306
let log_entries = nodes[0].logger.lines.lock().unwrap();
281307
let desired_log = "Calling manager's timer_chan_freshness_every_min".to_string();
@@ -302,7 +328,7 @@ mod tests {
302328
}
303329

304330
let nodes = create_nodes(2, "test_persist_error".to_string());
305-
let bg_processor = BackgroundProcessor::start(persist_manager, nodes[0].node.clone(), nodes[0].logger.clone());
331+
let bg_processor = BackgroundProcessor::start(persist_manager, nodes[0].node.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
306332
open_channel!(nodes[0], nodes[1], 100000);
307333

308334
let _ = bg_processor.thread_handle.join().unwrap().expect_err("Errored persisting manager: test");

0 commit comments

Comments
 (0)