From f45bc991e17865dba9bc1144376ac3c40561bbce Mon Sep 17 00:00:00 2001
From: Elias Rohrer
Date: Thu, 11 May 2023 16:17:48 +0200
Subject: [PATCH 1/2] Regularly broadcast node announcement

---
 src/lib.rs | 39 ++++++++++++++++++++++++++++++++++++++-
 1 file changed, 38 insertions(+), 1 deletion(-)

diff --git a/src/lib.rs b/src/lib.rs
index 7b3f96d66..9ae5e6427 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -169,12 +169,15 @@ const BDK_CLIENT_CONCURRENCY: u8 = 8;
 // The timeout after which we abandon retrying failed payments.
 const LDK_PAYMENT_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
 
-// The time in between peer reconnection attempts.
+// The time in-between peer reconnection attempts.
 const PEER_RECONNECTION_INTERVAL: Duration = Duration::from_secs(10);
 
 // The time in-between RGS sync attempts.
 const RGS_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60);
 
+// The time in-between node announcement broadcast attempts.
+const NODE_ANN_BCAST_INTERVAL: Duration = Duration::from_secs(60 * 60);
+
 // The length in bytes of our wallets' keys seed.
 const WALLET_KEYS_SEED_LEN: usize = 64;
 
@@ -870,6 +873,7 @@ impl Node {
 		let mut stop_connect = self.stop_receiver.clone();
 		runtime.spawn(async move {
 			let mut interval = tokio::time::interval(PEER_RECONNECTION_INTERVAL);
+			interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
 			loop {
 				tokio::select! {
 					_ = stop_connect.changed() => {
@@ -902,6 +906,39 @@ impl Node {
 			}
 		});
 
+		// Regularly broadcast node announcements.
+		let bcast_cm = Arc::clone(&self.channel_manager);
+		let bcast_pm = Arc::clone(&self.peer_manager);
+		let bcast_config = Arc::clone(&self.config);
+		let mut stop_bcast = self.stop_receiver.clone();
+		runtime.spawn(async move {
+			let mut interval = tokio::time::interval(NODE_ANN_BCAST_INTERVAL);
+			loop {
+				tokio::select! {
+					_ = stop_bcast.changed() => {
+						return;
+					}
+					_ = interval.tick(), if bcast_cm.list_channels().iter().any(|chan| chan.is_public) => {
+						while bcast_pm.get_peer_node_ids().is_empty() {
+							// Sleep a bit and retry if we don't have any peers yet.
+							tokio::time::sleep(Duration::from_secs(5)).await;
+
+							// Check back if we need to stop.
+							match stop_bcast.has_changed() {
+								Ok(false) => {},
+								Ok(true) => return,
+								Err(_) => return,
+							}
+						}
+
+						let addresses =
+							bcast_config.listening_address.iter().cloned().map(|a| a.0).collect();
+						bcast_pm.broadcast_node_announcement([0; 3], [0; 32], addresses);
+					}
+				}
+			}
+		});
+
 		// Setup background processing
 		let background_persister = Arc::clone(&self.kv_store);
 		let background_event_handler = Arc::clone(&event_handler);

From d86aa1afd38ccdd3c9d1aca9b61caa9556713c4d Mon Sep 17 00:00:00 2001
From: Elias Rohrer
Date: Thu, 18 May 2023 09:38:56 +0200
Subject: [PATCH 2/2] Persist latest_node_ann_broadcast_timestamp

We remember when we last broadcasted a node announcement and only
re-announce if sufficient time has passed, we have at least one public
channel, and we have some connected peers to gossip to.
---
 src/io/mod.rs   |  4 ++++
 src/io/utils.rs | 57 +++++++++++++++++++++++++++++++++++++++++++++++++
 src/lib.rs      | 43 +++++++++++++++++++++++++++----------
 3 files changed, 93 insertions(+), 11 deletions(-)

diff --git a/src/io/mod.rs b/src/io/mod.rs
index 9a9067e6c..4f5008440 100644
--- a/src/io/mod.rs
+++ b/src/io/mod.rs
@@ -30,6 +30,10 @@ pub(crate) const PAYMENT_INFO_PERSISTENCE_NAMESPACE: &str = "payments";
 pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE: &str = "";
 pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_KEY: &str = "latest_rgs_sync_timestamp";
 
+/// The last time we broadcast a node announcement will be persisted under this key.
+pub(crate) const LATEST_NODE_ANN_BCAST_TIMSTAMP_NAMESPACE: &str = "";
+pub(crate) const LATEST_NODE_ANN_BCAST_TIMSTAMP_KEY: &str = "latest_node_ann_bcast_timestamp";
+
 /// Provides an interface that allows to store and retrieve persisted values that are associated
 /// with given keys.
 ///
diff --git a/src/io/utils.rs b/src/io/utils.rs
index cd058d475..f31c7587f 100644
--- a/src/io/utils.rs
+++ b/src/io/utils.rs
@@ -228,3 +228,60 @@ where
 		Error::PersistenceFailed
 	})
 }
+
+pub(crate) fn read_latest_node_ann_bcast_timestamp<K: Deref>(
+	kv_store: K,
+) -> Result<u64, std::io::Error>
+where
+	K::Target: KVStore,
+{
+	let mut reader = kv_store
+		.read(LATEST_NODE_ANN_BCAST_TIMSTAMP_NAMESPACE, LATEST_NODE_ANN_BCAST_TIMSTAMP_KEY)?;
+	u64::read(&mut reader).map_err(|_| {
+		std::io::Error::new(
+			std::io::ErrorKind::InvalidData,
+			"Failed to deserialize latest node announcement broadcast timestamp",
+		)
+	})
+}
+
+pub(crate) fn write_latest_node_ann_bcast_timestamp<K: Deref, L: Deref>(
+	updated_timestamp: u64, kv_store: K, logger: L,
+) -> Result<(), Error>
+where
+	K::Target: KVStore,
+	L::Target: Logger,
+{
+	let mut writer = kv_store
+		.write(LATEST_NODE_ANN_BCAST_TIMSTAMP_NAMESPACE, LATEST_NODE_ANN_BCAST_TIMSTAMP_KEY)
+		.map_err(|e| {
+			log_error!(
+				logger,
+				"Getting writer for key {}/{} failed due to: {}",
+				LATEST_NODE_ANN_BCAST_TIMSTAMP_NAMESPACE,
+				LATEST_NODE_ANN_BCAST_TIMSTAMP_KEY,
+				e
+			);
+			Error::PersistenceFailed
+		})?;
+	updated_timestamp.write(&mut writer).map_err(|e| {
+		log_error!(
+			logger,
+			"Writing data to key {}/{} failed due to: {}",
+			LATEST_NODE_ANN_BCAST_TIMSTAMP_NAMESPACE,
+			LATEST_NODE_ANN_BCAST_TIMSTAMP_KEY,
+			e
+		);
+		Error::PersistenceFailed
+	})?;
+	writer.commit().map_err(|e| {
+		log_error!(
+			logger,
+			"Committing data to key {}/{} failed due to: {}",
+			LATEST_NODE_ANN_BCAST_TIMSTAMP_NAMESPACE,
+			LATEST_NODE_ANN_BCAST_TIMSTAMP_KEY,
+			e
+		);
+		Error::PersistenceFailed
+	})
+}
diff --git a/src/lib.rs b/src/lib.rs
index 9ae5e6427..cfbaa3bb6 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -910,30 +910,51 @@ impl Node {
 		let bcast_cm = Arc::clone(&self.channel_manager);
 		let bcast_pm = Arc::clone(&self.peer_manager);
 		let bcast_config = Arc::clone(&self.config);
+		let bcast_store = Arc::clone(&self.kv_store);
+		let bcast_logger = Arc::clone(&self.logger);
 		let mut stop_bcast = self.stop_receiver.clone();
 		runtime.spawn(async move {
-			let mut interval = tokio::time::interval(NODE_ANN_BCAST_INTERVAL);
+			// We check every 30 secs whether our last broadcast is NODE_ANN_BCAST_INTERVAL away.
+			let mut interval = tokio::time::interval(Duration::from_secs(30));
 			loop {
 				tokio::select! {
 					_ = stop_bcast.changed() => {
 						return;
 					}
-					_ = interval.tick(), if bcast_cm.list_channels().iter().any(|chan| chan.is_public) => {
-						while bcast_pm.get_peer_node_ids().is_empty() {
-							// Sleep a bit and retry if we don't have any peers yet.
-							tokio::time::sleep(Duration::from_secs(5)).await;
-
-							// Check back if we need to stop.
-							match stop_bcast.has_changed() {
-								Ok(false) => {},
-								Ok(true) => return,
-								Err(_) => return,
+					_ = interval.tick() => {
+						let skip_broadcast = match io::utils::read_latest_node_ann_bcast_timestamp(Arc::clone(&bcast_store)) {
+							Ok(latest_bcast_time_secs) => {
+								// Skip if the time hasn't elapsed yet.
+								let next_bcast_unix_time = SystemTime::UNIX_EPOCH + Duration::from_secs(latest_bcast_time_secs) + NODE_ANN_BCAST_INTERVAL;
+								next_bcast_unix_time.elapsed().is_err()
+							}
+							Err(_) => {
+								// Don't skip if we haven't broadcasted before.
+								false
 							}
+						};
+
+						if skip_broadcast {
+							continue;
+						}
+
+						if !bcast_cm.list_channels().iter().any(|chan| chan.is_public) {
+							// Skip if we don't have any public channels.
+							continue;
+						}
+
+						if bcast_pm.get_peer_node_ids().is_empty() {
+							// Skip if we don't have any connected peers to gossip to.
+							continue;
 						}
 
 						let addresses =
 							bcast_config.listening_address.iter().cloned().map(|a| a.0).collect();
 						bcast_pm.broadcast_node_announcement([0; 3], [0; 32], addresses);
+
+						let unix_time_secs = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
+						io::utils::write_latest_node_ann_bcast_timestamp(unix_time_secs, Arc::clone(&bcast_store), Arc::clone(&bcast_logger))
+							.expect("Persistence failed");
 					}
 				}
 			}
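
Note on the gating logic in the second patch: next_bcast_unix_time.elapsed().is_err() works because SystemTime::elapsed() returns an Err whenever the point in time still lies in the future, so an "error" here simply means the re-broadcast interval has not passed yet. The standalone sketch below illustrates that check in isolation; the constant and function names are made up for illustration and are not part of the patch.

use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Hypothetical stand-in for NODE_ANN_BCAST_INTERVAL (one hour in the patch).
const BCAST_INTERVAL: Duration = Duration::from_secs(60 * 60);

// Returns true if a node announcement is due, given the persisted UNIX
// timestamp (in seconds) of the last broadcast, or None if we never broadcast.
fn broadcast_due(last_bcast_secs: Option<u64>) -> bool {
	match last_bcast_secs {
		Some(secs) => {
			// The earliest point in time at which we want to re-announce.
			let next_bcast_time = UNIX_EPOCH + Duration::from_secs(secs) + BCAST_INTERVAL;
			// elapsed() is Err while next_bcast_time is still in the future,
			// i.e. the interval hasn't passed yet; Ok means a broadcast is due.
			next_bcast_time.elapsed().is_ok()
		}
		// No persisted timestamp means we never announced before: do it now.
		None => true,
	}
}

fn main() {
	let now_secs = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
	assert!(broadcast_due(None));
	assert!(!broadcast_due(Some(now_secs)));
	assert!(broadcast_due(Some(now_secs - 2 * 60 * 60)));
}

This is the inverse of the patch's skip_broadcast flag: the background task skips the tick while the interval has not elapsed and, after a successful broadcast, persists the current UNIX time so the next check starts the interval from there.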