diff --git a/Cargo.toml b/Cargo.toml index db8c8c2e3..85a0ef46b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ lightning = { version = "0.0.115", features = ["max_level_trace", "std"] } lightning-invoice = { version = "0.23" } lightning-net-tokio = { version = "0.0.115" } lightning-persister = { version = "0.0.115" } -lightning-background-processor = { version = "0.0.115" } +lightning-background-processor = { version = "0.0.115", features = ["futures"] } lightning-rapid-gossip-sync = { version = "0.0.115" } lightning-transaction-sync = { version = "0.0.115", features = ["esplora-async-https"] } @@ -21,7 +21,7 @@ lightning-transaction-sync = { version = "0.0.115", features = ["esplora-async-h #lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main" } #lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main" } #lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main" } -#lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main" } +#lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main", features = ["futures"] } #lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main" } #lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", branch="main", features = ["esplora-async"] } @@ -29,7 +29,7 @@ lightning-transaction-sync = { version = "0.0.115", features = ["esplora-async-h #lightning-invoice = { path = "../rust-lightning/lightning-invoice" } #lightning-net-tokio = { path = "../rust-lightning/lightning-net-tokio" } #lightning-persister = { path = "../rust-lightning/lightning-persister" } -#lightning-background-processor = { path = "../rust-lightning/lightning-background-processor" } +#lightning-background-processor = { path = "../rust-lightning/lightning-background-processor", features = ["futures"] } #lightning-rapid-gossip-sync = { path = "../rust-lightning/lightning-rapid-gossip-sync" } #lightning-transaction-sync = { path = "../rust-lightning/lightning-transaction-sync", features = ["esplora-async"] } diff --git a/src/event.rs b/src/event.rs index 09143f224..24c0e2af6 100644 --- a/src/event.rs +++ b/src/event.rs @@ -11,7 +11,6 @@ use crate::logger::{log_error, log_info, Logger}; use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; use lightning::events::Event as LdkEvent; -use lightning::events::EventHandler as LdkEventHandler; use lightning::events::PaymentPurpose; use lightning::impl_writeable_tlv_based_enum; use lightning::ln::PaymentHash; @@ -24,7 +23,7 @@ use bitcoin::OutPoint; use rand::{thread_rng, Rng}; use std::collections::VecDeque; use std::ops::Deref; -use std::sync::{Arc, Condvar, Mutex}; +use std::sync::{Arc, Condvar, Mutex, RwLock}; use std::time::Duration; /// An event emitted by [`Node`], which should be handled by the user. @@ -248,7 +247,7 @@ where network_graph: Arc, keys_manager: Arc, payment_store: Arc>, - tokio_runtime: Arc, + runtime: Arc>>, logger: L, _config: Arc, } @@ -262,7 +261,7 @@ where wallet: Arc>, event_queue: Arc>, channel_manager: Arc, network_graph: Arc, keys_manager: Arc, payment_store: Arc>, - tokio_runtime: Arc, logger: L, _config: Arc, + runtime: Arc>>, logger: L, _config: Arc, ) -> Self { Self { event_queue, @@ -272,18 +271,12 @@ where keys_manager, payment_store, logger, - tokio_runtime, + runtime, _config, } } -} -impl LdkEventHandler for EventHandler -where - K::Target: KVStore, - L::Target: Logger, -{ - fn handle_event(&self, event: LdkEvent) { + pub async fn handle_event(&self, event: LdkEvent) { match event { LdkEvent::FundingGenerationReady { temporary_channel_id, @@ -538,12 +531,17 @@ where let forwarding_channel_manager = self.channel_manager.clone(); let min = time_forwardable.as_millis() as u64; - self.tokio_runtime.spawn(async move { - let millis_to_sleep = thread_rng().gen_range(min..min * 5) as u64; - tokio::time::sleep(Duration::from_millis(millis_to_sleep)).await; + let runtime_lock = self.runtime.read().unwrap(); + debug_assert!(runtime_lock.is_some()); + + if let Some(runtime) = runtime_lock.as_ref() { + runtime.spawn(async move { + let millis_to_sleep = thread_rng().gen_range(min..min * 5) as u64; + tokio::time::sleep(Duration::from_millis(millis_to_sleep)).await; - forwarding_channel_manager.process_pending_htlc_forwards(); - }); + forwarding_channel_manager.process_pending_htlc_forwards(); + }); + } } LdkEvent::SpendableOutputs { outputs } => { // TODO: We should eventually remember the outputs and supply them to the wallet's coin selection, once BDK allows us to do so. diff --git a/src/lib.rs b/src/lib.rs index 8b2d1333b..c2d2a7ece 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -117,7 +117,7 @@ use lightning::routing::utxo::UtxoLookup; use lightning::util::config::{ChannelHandshakeConfig, ChannelHandshakeLimits, UserConfig}; use lightning::util::ser::ReadableArgs; -use lightning_background_processor::BackgroundProcessor; +use lightning_background_processor::process_events_async; use lightning_background_processor::GossipSync as BPGossipSync; use lightning_transaction_sync::EsploraSyncClient; @@ -347,7 +347,13 @@ impl Builder { EsploraBlockchain::from_client(tx_sync.client().clone(), BDK_CLIENT_STOP_GAP) .with_concurrency(BDK_CLIENT_CONCURRENCY); - let wallet = Arc::new(Wallet::new(blockchain, bdk_wallet, Arc::clone(&logger))); + let runtime = Arc::new(RwLock::new(None)); + let wallet = Arc::new(Wallet::new( + blockchain, + bdk_wallet, + Arc::clone(&runtime), + Arc::clone(&logger), + )); let kv_store = Arc::new(FilesystemStore::new(ldk_data_dir.clone().into())); @@ -556,10 +562,11 @@ impl Builder { } }; - let running = RwLock::new(None); + let stop_running = Arc::new(AtomicBool::new(false)); Node { - running, + runtime, + stop_running, config, wallet, tx_sync, @@ -579,20 +586,12 @@ impl Builder { } } -/// Wraps all objects that need to be preserved during the run time of [`Node`]. Will be dropped -/// upon [`Node::stop()`]. -struct Runtime { - tokio_runtime: Arc, - _background_processor: BackgroundProcessor, - stop_networking: Arc, - stop_wallet_sync: Arc, -} - /// The main interface object of LDK Node, wrapping the necessary LDK and BDK functionalities. /// /// Needs to be initialized and instantiated through [`Builder::build`]. pub struct Node { - running: RwLock>, + runtime: Arc>>, + stop_running: Arc, config: Arc, wallet: Arc>, tx_sync: Arc>>, @@ -618,48 +617,15 @@ impl Node { /// a thread-safe manner. pub fn start(&self) -> Result<(), Error> { // Acquire a run lock and hold it until we're setup. - let mut run_lock = self.running.write().unwrap(); - if run_lock.is_some() { + let mut runtime_lock = self.runtime.write().unwrap(); + if runtime_lock.is_some() { // We're already running. return Err(Error::AlreadyRunning); } - let runtime = self.setup_runtime()?; - *run_lock = Some(runtime); - Ok(()) - } + let runtime = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap(); - /// Disconnects all peers, stops all running background tasks, and shuts down [`Node`]. - /// - /// After this returns most API methods will return [`Error::NotRunning`]. - pub fn stop(&self) -> Result<(), Error> { - let mut run_lock = self.running.write().unwrap(); - if run_lock.is_none() { - return Err(Error::NotRunning); - } - - let runtime = run_lock.as_ref().unwrap(); - - // Stop wallet sync - runtime.stop_wallet_sync.store(true, Ordering::Release); - - // Stop networking - runtime.stop_networking.store(true, Ordering::Release); - self.peer_manager.disconnect_all_peers(); - - // Drop the held runtimes. - self.wallet.drop_runtime(); - - // Drop the runtime, which stops the background processor and any possibly remaining tokio threads. - *run_lock = None; - Ok(()) - } - - fn setup_runtime(&self) -> Result { - let tokio_runtime = - Arc::new(tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap()); - - self.wallet.set_runtime(Arc::clone(&tokio_runtime)); + let stop_running = Arc::new(AtomicBool::new(false)); let event_handler = Arc::new(EventHandler::new( Arc::clone(&self.wallet), @@ -668,7 +634,7 @@ impl Node { Arc::clone(&self.network_graph), Arc::clone(&self.keys_manager), Arc::clone(&self.payment_store), - Arc::clone(&tokio_runtime), + Arc::clone(&self.runtime), Arc::clone(&self.logger), Arc::clone(&self.config), )); @@ -679,8 +645,7 @@ impl Node { let sync_cman = Arc::clone(&self.channel_manager); let sync_cmon = Arc::clone(&self.chain_monitor); let sync_logger = Arc::clone(&self.logger); - let stop_wallet_sync = Arc::new(AtomicBool::new(false)); - let stop_sync = Arc::clone(&stop_wallet_sync); + let stop_sync = Arc::clone(&stop_running); std::thread::spawn(move || { tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on( @@ -711,8 +676,8 @@ impl Node { }); let sync_logger = Arc::clone(&self.logger); - let stop_sync = Arc::clone(&stop_wallet_sync); - tokio_runtime.spawn(async move { + let stop_sync = Arc::clone(&stop_running); + runtime.spawn(async move { loop { if stop_sync.load(Ordering::Acquire) { return; @@ -736,14 +701,13 @@ impl Node { } }); - let stop_networking = Arc::new(AtomicBool::new(false)); if let Some(listening_address) = &self.config.listening_address { // Setup networking let peer_manager_connection_handler = Arc::clone(&self.peer_manager); - let stop_listen = Arc::clone(&stop_networking); + let stop_listen = Arc::clone(&stop_running); let listening_address = listening_address.clone(); - tokio_runtime.spawn(async move { + runtime.spawn(async move { let listener = tokio::net::TcpListener::bind(listening_address).await.expect( "Failed to bind to listen address/port - is something else already listening on it?", @@ -770,8 +734,8 @@ impl Node { let connect_pm = Arc::clone(&self.peer_manager); let connect_logger = Arc::clone(&self.logger); let connect_peer_store = Arc::clone(&self.peer_store); - let stop_connect = Arc::clone(&stop_networking); - tokio_runtime.spawn(async move { + let stop_connect = Arc::clone(&stop_running); + runtime.spawn(async move { let mut interval = tokio::time::interval(PEER_RECONNECTION_INTERVAL); loop { if stop_connect.load(Ordering::Acquire) { @@ -803,19 +767,61 @@ impl Node { }); // Setup background processing - let _background_processor = BackgroundProcessor::start( - Arc::clone(&self.kv_store), - Arc::clone(&event_handler), - Arc::clone(&self.chain_monitor), - Arc::clone(&self.channel_manager), - BPGossipSync::p2p(Arc::clone(&self.gossip_sync)), - Arc::clone(&self.peer_manager), - Arc::clone(&self.logger), - Some(Arc::clone(&self.scorer)), - ); + let background_persister = Arc::clone(&self.kv_store); + let background_event_handler = Arc::clone(&event_handler); + let background_chain_mon = Arc::clone(&self.chain_monitor); + let background_chan_man = Arc::clone(&self.channel_manager); + let background_gossip_sync = BPGossipSync::p2p(Arc::clone(&self.gossip_sync)); + let background_peer_man = Arc::clone(&self.peer_manager); + let background_logger = Arc::clone(&self.logger); + let background_scorer = Arc::clone(&self.scorer); + let stop_background_processing = Arc::clone(&stop_running); + let sleeper = move |d| { + let stop = Arc::clone(&stop_background_processing); + Box::pin(async move { + if stop.load(Ordering::Acquire) { + true + } else { + tokio::time::sleep(d).await; + false + } + }) + }; + + runtime.spawn(async move { + process_events_async( + background_persister, + |e| background_event_handler.handle_event(e), + background_chain_mon, + background_chan_man, + background_gossip_sync, + background_peer_man, + background_logger, + Some(background_scorer), + sleeper, + true, + ) + .await + .expect("Failed to process events"); + }); + + *runtime_lock = Some(runtime); + Ok(()) + } - // TODO: frequently check back on background_processor if there was an error - Ok(Runtime { tokio_runtime, _background_processor, stop_networking, stop_wallet_sync }) + /// Disconnects all peers, stops all running background tasks, and shuts down [`Node`]. + /// + /// After this returns most API methods will return [`Error::NotRunning`]. + pub fn stop(&self) -> Result<(), Error> { + let runtime = self.runtime.write().unwrap().take().ok_or(Error::NotRunning)?; + // Stop the runtime. + self.stop_running.store(true, Ordering::Release); + + // Stop disconnect peers. + self.peer_manager.disconnect_all_peers(); + + runtime.shutdown_timeout(Duration::from_secs(10)); + Ok(()) } /// Blocks until the next event is available. @@ -865,12 +871,11 @@ impl Node { pub fn connect( &self, node_id: PublicKey, address: SocketAddr, permanently: bool, ) -> Result<(), Error> { - let runtime_lock = self.running.read().unwrap(); - if runtime_lock.is_none() { + let rt_lock = self.runtime.read().unwrap(); + if rt_lock.is_none() { return Err(Error::NotRunning); } - - let runtime = runtime_lock.as_ref().unwrap(); + let runtime = rt_lock.as_ref().unwrap(); let peer_info = PeerInfo { pubkey: node_id, address }; @@ -882,7 +887,7 @@ impl Node { let con_pm = Arc::clone(&self.peer_manager); tokio::task::block_in_place(move || { - runtime.tokio_runtime.block_on(async move { + runtime.block_on(async move { let res = connect_peer_if_necessary(con_peer_pubkey, con_peer_addr, con_pm, con_logger) .await; @@ -908,8 +913,8 @@ impl Node { /// Will also remove the peer from the peer store, i.e., after this has been called we won't /// try to reconnect on restart. pub fn disconnect(&self, counterparty_node_id: &PublicKey) -> Result<(), Error> { - let runtime_lock = self.running.read().unwrap(); - if runtime_lock.is_none() { + let rt_lock = self.runtime.read().unwrap(); + if rt_lock.is_none() { return Err(Error::NotRunning); } @@ -939,12 +944,11 @@ impl Node { &self, node_id: PublicKey, address: SocketAddr, channel_amount_sats: u64, push_to_counterparty_msat: Option, announce_channel: bool, ) -> Result<(), Error> { - let runtime_lock = self.running.read().unwrap(); - if runtime_lock.is_none() { + let rt_lock = self.runtime.read().unwrap(); + if rt_lock.is_none() { return Err(Error::NotRunning); } - - let runtime = runtime_lock.as_ref().unwrap(); + let runtime = rt_lock.as_ref().unwrap(); let cur_balance = self.wallet.get_balance()?; if cur_balance.get_spendable() < channel_amount_sats { @@ -962,7 +966,7 @@ impl Node { let con_pm = Arc::clone(&self.peer_manager); tokio::task::block_in_place(move || { - runtime.tokio_runtime.block_on(async move { + runtime.block_on(async move { let res = connect_peer_if_necessary(con_peer_pubkey, con_peer_addr, con_pm, con_logger) .await; @@ -1017,10 +1021,12 @@ impl Node { /// /// Note that the wallets will be also synced regularly in the background. pub fn sync_wallets(&self) -> Result<(), Error> { - let runtime_lock = self.running.read().unwrap(); - if runtime_lock.is_none() { + let rt_lock = self.runtime.read().unwrap(); + if rt_lock.is_none() { return Err(Error::NotRunning); } + let runtime = rt_lock.as_ref().unwrap(); + let wallet = Arc::clone(&self.wallet); let tx_sync = Arc::clone(&self.tx_sync); let sync_cman = Arc::clone(&self.channel_manager); @@ -1031,7 +1037,6 @@ impl Node { &*sync_cmon as &(dyn Confirm + Sync + Send), ]; - let runtime = runtime_lock.as_ref().unwrap(); tokio::task::block_in_place(move || { tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on( async move { @@ -1056,7 +1061,7 @@ impl Node { let sync_logger = Arc::clone(&self.logger); tokio::task::block_in_place(move || { - runtime.tokio_runtime.block_on(async move { + runtime.block_on(async move { let now = Instant::now(); match tx_sync.sync(confirmables).await { Ok(()) => { @@ -1091,7 +1096,8 @@ impl Node { /// Send a payement given an invoice. pub fn send_payment(&self, invoice: &Invoice) -> Result { - if self.running.read().unwrap().is_none() { + let rt_lock = self.runtime.read().unwrap(); + if rt_lock.is_none() { return Err(Error::NotRunning); } @@ -1157,7 +1163,8 @@ impl Node { pub fn send_payment_using_amount( &self, invoice: &Invoice, amount_msat: u64, ) -> Result { - if self.running.read().unwrap().is_none() { + let rt_lock = self.runtime.read().unwrap(); + if rt_lock.is_none() { return Err(Error::NotRunning); } @@ -1245,7 +1252,8 @@ impl Node { pub fn send_spontaneous_payment( &self, amount_msat: u64, node_id: &PublicKey, ) -> Result { - if self.running.read().unwrap().is_none() { + let rt_lock = self.runtime.read().unwrap(); + if rt_lock.is_none() { return Err(Error::NotRunning); } diff --git a/src/wallet.rs b/src/wallet.rs index beb8861c6..ac336cd79 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -38,7 +38,7 @@ where inner: Mutex>, // A cache storing the most recently retrieved fee rate estimations. fee_rate_cache: RwLock>, - tokio_runtime: RwLock>>, + runtime: Arc>>, sync_lock: (Mutex<()>, Condvar), logger: Arc, } @@ -48,13 +48,13 @@ where D: BatchDatabase, { pub(crate) fn new( - blockchain: EsploraBlockchain, wallet: bdk::Wallet, logger: Arc, + blockchain: EsploraBlockchain, wallet: bdk::Wallet, + runtime: Arc>>, logger: Arc, ) -> Self { let inner = Mutex::new(wallet); let fee_rate_cache = RwLock::new(HashMap::new()); - let tokio_runtime = RwLock::new(None); let sync_lock = (Mutex::new(()), Condvar::new()); - Self { blockchain, inner, fee_rate_cache, tokio_runtime, sync_lock, logger } + Self { blockchain, inner, fee_rate_cache, runtime, sync_lock, logger } } pub(crate) async fn sync(&self) -> Result<(), Error> { @@ -115,14 +115,6 @@ where res } - pub(crate) fn set_runtime(&self, tokio_runtime: Arc) { - *self.tokio_runtime.write().unwrap() = Some(tokio_runtime); - } - - pub(crate) fn drop_runtime(&self) { - *self.tokio_runtime.write().unwrap() = None; - } - pub(crate) async fn update_fee_estimates(&self) -> Result<(), Error> { let mut locked_fee_rate_cache = self.fee_rate_cache.write().unwrap(); @@ -239,7 +231,7 @@ where D: BatchDatabase, { fn broadcast_transaction(&self, tx: &Transaction) { - let locked_runtime = self.tokio_runtime.read().unwrap(); + let locked_runtime = self.runtime.read().unwrap(); if locked_runtime.as_ref().is_none() { log_error!(self.logger, "Failed to broadcast transaction: No runtime."); return;