Skip to content

Commit f3aeeee

Browse files
committed
Persist RGS last_sync_timestamp across restarts
1 parent 6ab3092 commit f3aeeee

File tree

4 files changed

+105
-18
lines changed

4 files changed

+105
-18
lines changed

src/gossip.rs

+6-5
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,11 @@ impl GossipSource {
3030
}
3131

3232
pub fn new_rgs(
33-
server_url: String, network_graph: Arc<NetworkGraph>, logger: Arc<FilesystemLogger>,
33+
server_url: String, latest_sync_timestamp: u32, network_graph: Arc<NetworkGraph>,
34+
logger: Arc<FilesystemLogger>,
3435
) -> Self {
3536
let gossip_sync = Arc::new(RapidGossipSync::new(network_graph, Arc::clone(&logger)));
36-
let latest_sync_timestamp = AtomicU32::new(0);
37+
let latest_sync_timestamp = AtomicU32::new(latest_sync_timestamp);
3738
Self::RapidGossipSync { gossip_sync, server_url, latest_sync_timestamp, logger }
3839
}
3940

@@ -54,9 +55,9 @@ impl GossipSource {
5455
}
5556
}
5657

57-
pub async fn update_rgs_snapshot(&self) -> Result<(), Error> {
58+
pub async fn update_rgs_snapshot(&self) -> Result<u32, Error> {
5859
match self {
59-
Self::P2PNetwork { gossip_sync: _ } => Ok(()),
60+
Self::P2PNetwork { gossip_sync: _ } => Ok(0),
6061
Self::RapidGossipSync { gossip_sync, server_url, latest_sync_timestamp, logger } => {
6162
let query_timestamp = latest_sync_timestamp.load(Ordering::Acquire);
6263
let query_url = format!("{}/{}", server_url, query_timestamp);
@@ -76,7 +77,7 @@ impl GossipSource {
7677
.update_network_graph(&update_data)
7778
.map_err(|_| Error::GossipUpdateFailed)?;
7879
latest_sync_timestamp.store(new_latest_sync_timestamp, Ordering::Release);
79-
Ok(())
80+
Ok(new_latest_sync_timestamp)
8081
}
8182
Err(e) => {
8283
log_trace!(logger, "Failed to retrieve RGS gossip update: {}", e);

src/io/mod.rs

+4
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ pub(crate) const PEER_INFO_PERSISTENCE_KEY: &str = "peers";
2626
/// The payment information will be persisted under this prefix.
2727
pub(crate) const PAYMENT_INFO_PERSISTENCE_NAMESPACE: &str = "payments";
2828

29+
/// RapidGossipSync's `latest_sync_timestamp` will be persisted under this key.
30+
pub(crate) const RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE: &str = "";
31+
pub(crate) const RGS_LATEST_SYNC_TIMESTAMP_KEY: &str = "rgs_latest_sync_timestamp";
32+
2933
/// Provides an interface that allows to store and retrieve persisted values that are associated
3034
/// with given keys.
3135
///

src/io/utils.rs

+58-2
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
use super::*;
22
use crate::WALLET_KEYS_SEED_LEN;
33

4+
use crate::logger::log_error;
45
use crate::peer_store::PeerStore;
5-
use crate::{EventQueue, PaymentDetails};
6+
use crate::{Error, EventQueue, PaymentDetails};
67

78
use lightning::chain::channelmonitor::ChannelMonitor;
89
use lightning::chain::keysinterface::{EntropySource, SignerProvider};
910
use lightning::routing::gossip::NetworkGraph;
1011
use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringParameters};
1112
use lightning::util::logger::Logger;
12-
use lightning::util::ser::{Readable, ReadableArgs};
13+
use lightning::util::ser::{Readable, ReadableArgs, Writeable};
1314

1415
use bitcoin::hash_types::{BlockHash, Txid};
1516
use bitcoin::hashes::hex::FromHex;
@@ -172,3 +173,58 @@ where
172173
}
173174
Ok(res)
174175
}
176+
177+
pub(crate) fn read_rgs_latest_sync_timestamp<K: Deref>(kv_store: K) -> Result<u32, std::io::Error>
178+
where
179+
K::Target: KVStore,
180+
{
181+
let mut reader =
182+
kv_store.read(RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE, RGS_LATEST_SYNC_TIMESTAMP_KEY)?;
183+
u32::read(&mut reader).map_err(|_| {
184+
std::io::Error::new(
185+
std::io::ErrorKind::InvalidData,
186+
"Failed to deserialize latest RGS sync timestamp",
187+
)
188+
})
189+
}
190+
191+
pub(crate) fn write_rgs_latest_sync_timestamp<K: Deref, L: Deref>(
192+
updated_timestamp: u32, kv_store: K, logger: L,
193+
) -> Result<(), Error>
194+
where
195+
K::Target: KVStore,
196+
L::Target: Logger,
197+
{
198+
let mut writer = kv_store
199+
.write(RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE, RGS_LATEST_SYNC_TIMESTAMP_KEY)
200+
.map_err(|e| {
201+
log_error!(
202+
logger,
203+
"Getting writer for key {}/{} failed due to: {}",
204+
RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE,
205+
RGS_LATEST_SYNC_TIMESTAMP_KEY,
206+
e
207+
);
208+
Error::PersistenceFailed
209+
})?;
210+
updated_timestamp.write(&mut writer).map_err(|e| {
211+
log_error!(
212+
logger,
213+
"Writing data to key {}/{} failed due to: {}",
214+
RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE,
215+
RGS_LATEST_SYNC_TIMESTAMP_KEY,
216+
e
217+
);
218+
Error::PersistenceFailed
219+
})?;
220+
writer.commit().map_err(|e| {
221+
log_error!(
222+
logger,
223+
"Committing data to key {}/{} failed due to: {}",
224+
RGS_LATEST_SYNC_TIMESTAMP_NAMESPACE,
225+
RGS_LATEST_SYNC_TIMESTAMP_KEY,
226+
e
227+
);
228+
Error::PersistenceFailed
229+
})
230+
}

src/lib.rs

+37-11
Original file line numberDiff line numberDiff line change
@@ -539,13 +539,30 @@ impl Builder {
539539

540540
let gossip_source = match gossip_source_config {
541541
GossipSourceConfig::P2PNetwork => {
542-
Arc::new(GossipSource::new_p2p(Arc::clone(&network_graph), Arc::clone(&logger)))
542+
let p2p_source = Arc::new(GossipSource::new_p2p(
543+
Arc::clone(&network_graph),
544+
Arc::clone(&logger),
545+
));
546+
547+
// Reset the RGS sync timestamp in case we somehow switch gossip sources
548+
io::utils::write_rgs_latest_sync_timestamp(
549+
0,
550+
Arc::clone(&kv_store),
551+
Arc::clone(&logger),
552+
)
553+
.expect("Persistence failed");
554+
p2p_source
555+
}
556+
GossipSourceConfig::RapidGossipSync(rgs_server) => {
557+
let latest_sync_timestamp =
558+
io::utils::read_rgs_latest_sync_timestamp(Arc::clone(&kv_store)).unwrap_or(0);
559+
Arc::new(GossipSource::new_rgs(
560+
rgs_server.clone(),
561+
latest_sync_timestamp,
562+
Arc::clone(&network_graph),
563+
Arc::clone(&logger),
564+
))
543565
}
544-
GossipSourceConfig::RapidGossipSync(rgs_server) => Arc::new(GossipSource::new_rgs(
545-
rgs_server.clone(),
546-
Arc::clone(&network_graph),
547-
Arc::clone(&logger),
548-
)),
549566
};
550567

551568
let msg_handler = match gossip_source.as_gossip_sync() {
@@ -727,6 +744,7 @@ impl Node {
727744

728745
if self.gossip_source.is_rgs() {
729746
let gossip_source = Arc::clone(&self.gossip_source);
747+
let gossip_sync_store = Arc::clone(&self.kv_store);
730748
let gossip_sync_logger = Arc::clone(&self.logger);
731749
let stop_gossip_sync = Arc::clone(&stop_running);
732750
runtime.spawn(async move {
@@ -739,11 +757,19 @@ impl Node {
739757

740758
let now = Instant::now();
741759
match gossip_source.update_rgs_snapshot().await {
742-
Ok(()) => log_info!(
743-
gossip_sync_logger,
744-
"Background sync of RGS gossip data finished in {}ms.",
745-
now.elapsed().as_millis()
746-
),
760+
Ok(updated_timestamp) => {
761+
log_info!(
762+
gossip_sync_logger,
763+
"Background sync of RGS gossip data finished in {}ms.",
764+
now.elapsed().as_millis()
765+
);
766+
io::utils::write_rgs_latest_sync_timestamp(
767+
updated_timestamp,
768+
Arc::clone(&gossip_sync_store),
769+
Arc::clone(&gossip_sync_logger),
770+
)
771+
.expect("Persistence failed");
772+
}
747773
Err(e) => log_error!(
748774
gossip_sync_logger,
749775
"Background sync of RGS gossip data failed: {}",

0 commit comments

Comments
 (0)