Regularly broadcast node announcements #93

Merged · 2 commits · May 23, 2023
4 changes: 4 additions & 0 deletions src/io/mod.rs
@@ -30,6 +30,10 @@ pub(crate) const PAYMENT_INFO_PERSISTENCE_NAMESPACE: &str = "payments";
pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_NAMESPACE: &str = "";
pub(crate) const LATEST_RGS_SYNC_TIMESTAMP_KEY: &str = "latest_rgs_sync_timestamp";

/// The last time we broadcast a node announcement will be persisted under this key.
pub(crate) const LATEST_NODE_ANN_BCAST_TIMESTAMP_NAMESPACE: &str = "";
pub(crate) const LATEST_NODE_ANN_BCAST_TIMESTAMP_KEY: &str = "latest_node_ann_bcast_timestamp";
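
The helpers in src/io/utils.rs below serialize this timestamp via LDK's `Readable`/`Writeable` traits, which is where their `u64::read`/`write` calls come from. A minimal roundtrip sketch, assuming the `lightning` crate's `util::ser` implementations for `u64` (the buffer and value here are illustrative):

use lightning::util::ser::{Readable, Writeable};
use std::io::Cursor;

fn main() {
	// `Writeable` writes the u64 big-endian; `Readable` recovers it from
	// any `std::io::Read` source.
	let ts: u64 = 1_684_800_000;
	let mut buf = Vec::new();
	ts.write(&mut buf).unwrap();
	let recovered = u64::read(&mut Cursor::new(buf)).unwrap();
	assert_eq!(ts, recovered);
}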

/// Provides an interface that allows storing and retrieving persisted values that are
/// associated with given keys.
///
57 changes: 57 additions & 0 deletions src/io/utils.rs
@@ -228,3 +228,60 @@ where
		Error::PersistenceFailed
	})
}

pub(crate) fn read_latest_node_ann_bcast_timestamp<K: Deref>(
	kv_store: K,
) -> Result<u64, std::io::Error>
where
	K::Target: KVStore,
{
	let mut reader = kv_store
		.read(LATEST_NODE_ANN_BCAST_TIMESTAMP_NAMESPACE, LATEST_NODE_ANN_BCAST_TIMESTAMP_KEY)?;
	u64::read(&mut reader).map_err(|_| {
		std::io::Error::new(
			std::io::ErrorKind::InvalidData,
			"Failed to deserialize latest node announcement broadcast timestamp",
		)
	})
}

pub(crate) fn write_latest_node_ann_bcast_timestamp<K: Deref, L: Deref>(
	updated_timestamp: u64, kv_store: K, logger: L,
) -> Result<(), Error>
where
	K::Target: KVStore,
	L::Target: Logger,
{
	let mut writer = kv_store
		.write(LATEST_NODE_ANN_BCAST_TIMESTAMP_NAMESPACE, LATEST_NODE_ANN_BCAST_TIMESTAMP_KEY)
		.map_err(|e| {
			log_error!(
				logger,
				"Getting writer for key {}/{} failed due to: {}",
				LATEST_NODE_ANN_BCAST_TIMESTAMP_NAMESPACE,
				LATEST_NODE_ANN_BCAST_TIMESTAMP_KEY,
				e
			);
			Error::PersistenceFailed
		})?;
	updated_timestamp.write(&mut writer).map_err(|e| {
		log_error!(
			logger,
			"Writing data to key {}/{} failed due to: {}",
			LATEST_NODE_ANN_BCAST_TIMESTAMP_NAMESPACE,
			LATEST_NODE_ANN_BCAST_TIMESTAMP_KEY,
			e
		);
		Error::PersistenceFailed
	})?;
	writer.commit().map_err(|e| {
		log_error!(
			logger,
			"Committing data to key {}/{} failed due to: {}",
			LATEST_NODE_ANN_BCAST_TIMESTAMP_NAMESPACE,
			LATEST_NODE_ANN_BCAST_TIMESTAMP_KEY,
			e
		);
		Error::PersistenceFailed
	})
}
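
Together, the two helpers give the broadcast task a persisted watermark. A hypothetical usage sketch, where `kv_store` and `logger` stand in for whatever `KVStore` and `Logger` implementations the node holds in `Arc`s:

let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
write_latest_node_ann_bcast_timestamp(now, Arc::clone(&kv_store), Arc::clone(&logger))
	.expect("persisting the broadcast timestamp failed");
let last = read_latest_node_ann_bcast_timestamp(Arc::clone(&kv_store)).expect("read failed");
assert_eq!(last, now);
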
60 changes: 59 additions & 1 deletion src/lib.rs
@@ -169,12 +169,15 @@ const BDK_CLIENT_CONCURRENCY: u8 = 8;
// The timeout after which we abandon retrying failed payments.
const LDK_PAYMENT_RETRY_TIMEOUT: Duration = Duration::from_secs(10);

// The time in-between peer reconnection attempts.
const PEER_RECONNECTION_INTERVAL: Duration = Duration::from_secs(10);

// The time in-between RGS sync attempts.
const RGS_SYNC_INTERVAL: Duration = Duration::from_secs(60 * 60);

// The time in-between node announcement broadcast attempts.
const NODE_ANN_BCAST_INTERVAL: Duration = Duration::from_secs(60 * 60);

// The length in bytes of our wallets' keys seed.
const WALLET_KEYS_SEED_LEN: usize = 64;

@@ -870,6 +873,7 @@ impl Node {
		let mut stop_connect = self.stop_receiver.clone();
		runtime.spawn(async move {
			let mut interval = tokio::time::interval(PEER_RECONNECTION_INTERVAL);
			interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
			loop {
				tokio::select! {
					_ = stop_connect.changed() => {
@@ -902,6 +906,60 @@ impl Node {
			}
		});
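
The reconnection task above follows the crate's common background-task shape: a `tokio::time::interval` tick raced against a `watch`-based stop signal inside `tokio::select!`, with missed ticks skipped rather than replayed in a burst after a stall. A stripped-down sketch of that pattern (the names here are ours, not the crate's):

use tokio::time::{interval, Duration, MissedTickBehavior};

async fn run_periodic(mut stop: tokio::sync::watch::Receiver<()>) {
	let mut ticker = interval(Duration::from_secs(10));
	// After a long stall (e.g. system suspend), fire once on the next
	// schedule instead of replaying every missed tick back-to-back.
	ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
	loop {
		tokio::select! {
			_ = stop.changed() => return,
			_ = ticker.tick() => {
				// ...do the periodic work, e.g. attempt reconnects...
			}
		}
	}
}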

		// Regularly broadcast node announcements.
		let bcast_cm = Arc::clone(&self.channel_manager);
		let bcast_pm = Arc::clone(&self.peer_manager);
		let bcast_config = Arc::clone(&self.config);
		let bcast_store = Arc::clone(&self.kv_store);
		let bcast_logger = Arc::clone(&self.logger);
		let mut stop_bcast = self.stop_receiver.clone();
		runtime.spawn(async move {
			// We check every 30 secs whether our last broadcast is NODE_ANN_BCAST_INTERVAL away.
			let mut interval = tokio::time::interval(Duration::from_secs(30));
			loop {
				tokio::select! {
					_ = stop_bcast.changed() => {
						return;
					}
					_ = interval.tick() => {
						let skip_broadcast = match io::utils::read_latest_node_ann_bcast_timestamp(Arc::clone(&bcast_store)) {
							Ok(latest_bcast_time_secs) => {
								// Skip if the time hasn't elapsed yet.
								let next_bcast_unix_time = SystemTime::UNIX_EPOCH + Duration::from_secs(latest_bcast_time_secs) + NODE_ANN_BCAST_INTERVAL;
								next_bcast_unix_time.elapsed().is_err()
							}
							Err(_) => {
								// Don't skip if we haven't broadcast before.
								false
							}
						};

						if skip_broadcast {
							continue;
						}

						if !bcast_cm.list_channels().iter().any(|chan| chan.is_public) {
							// Skip if we don't have any public channels.
							continue;
						}

						if bcast_pm.get_peer_node_ids().is_empty() {
							// Skip if we don't have any connected peers to gossip to.
							continue;
						}

						let addresses =
							bcast_config.listening_address.iter().cloned().map(|a| a.0).collect();
						bcast_pm.broadcast_node_announcement([0; 3], [0; 32], addresses);

						let unix_time_secs = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
						io::utils::write_latest_node_ann_bcast_timestamp(unix_time_secs, Arc::clone(&bcast_store), Arc::clone(&bcast_logger))
							.expect("Persistence failed");
					}
				}
			}
		});
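
The skip check above leans on a small `SystemTime` idiom: rebuild the next allowed broadcast instant from the persisted seconds and call `elapsed()`, which returns `Err` exactly when that instant still lies in the future. Isolated here for illustration (the function name is ours):

use std::time::{Duration, SystemTime};

fn should_skip(latest_bcast_time_secs: u64, interval: Duration) -> bool {
	let next_bcast_unix_time =
		SystemTime::UNIX_EPOCH + Duration::from_secs(latest_bcast_time_secs) + interval;
	// `elapsed()` errs iff the instant is ahead of the current time.
	next_bcast_unix_time.elapsed().is_err()
}

fn main() {
	let hour = Duration::from_secs(60 * 60);
	let now = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
	assert!(should_skip(now, hour)); // broadcast just happened: too early
	assert!(!should_skip(now - 2 * 60 * 60, hour)); // two hours ago: due again
}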

		// Set up background processing
		let background_persister = Arc::clone(&self.kv_store);
		let background_event_handler = Arc::clone(&event_handler);