Skip to content

Commit 26e61e8

Browse files
authored
Merge pull request #365 from tnull/2024-10-chain-source-refactor
Refactor syncing and introduce `ChainSource`
2 parents 0c816c8 + 94ff68f commit 26e61e8

File tree

19 files changed

+1168
-1055
lines changed

19 files changed

+1168
-1055
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use std::str::FromStr;
2424
fn main() {
2525
let mut builder = Builder::new();
2626
builder.set_network(Network::Testnet);
27-
builder.set_esplora_server("https://blockstream.info/testnet/api".to_string());
27+
builder.set_chain_source_esplora("https://blockstream.info/testnet/api".to_string(), None);
2828
builder.set_gossip_source_rgs("https://rapidsync.lightningdevkit.org/testnet/snapshot".to_string());
2929

3030
let node = builder.build().unwrap();

bindings/kotlin/ldk-node-jvm/lib/src/test/kotlin/org/lightningdevkit/ldknode/LibraryTest.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,9 @@ class LibraryTest {
130130
println("Config 2: $config2")
131131

132132
val builder1 = Builder.fromConfig(config1)
133-
builder1.setEsploraServer(esploraEndpoint)
133+
builder1.setChainSourceEsplora(esploraEndpoint, null)
134134
val builder2 = Builder.fromConfig(config2)
135-
builder2.setEsploraServer(esploraEndpoint)
135+
builder2.setChainSourceEsplora(esploraEndpoint, null)
136136

137137
val node1 = builder1.build()
138138
val node2 = builder2.build()

bindings/ldk_node.udl

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,6 @@ dictionary Config {
99
Network network;
1010
sequence<SocketAddress>? listening_addresses;
1111
NodeAlias? node_alias;
12-
u64 onchain_wallet_sync_interval_secs;
13-
u64 wallet_sync_interval_secs;
14-
u64 fee_rate_cache_update_interval_secs;
1512
sequence<PublicKey> trusted_peers_0conf;
1613
u64 probing_liquidity_limit_multiplier;
1714
LogLevel log_level;
@@ -24,6 +21,12 @@ dictionary AnchorChannelsConfig {
2421
u64 per_channel_reserve_sats;
2522
};
2623

24+
dictionary EsploraSyncConfig {
25+
u64 onchain_wallet_sync_interval_secs;
26+
u64 lightning_wallet_sync_interval_secs;
27+
u64 fee_rate_cache_update_interval_secs;
28+
};
29+
2730
interface Builder {
2831
constructor();
2932
[Name=from_config]
@@ -32,7 +35,7 @@ interface Builder {
3235
[Throws=BuildError]
3336
void set_entropy_seed_bytes(sequence<u8> seed_bytes);
3437
void set_entropy_bip39_mnemonic(Mnemonic mnemonic, string? passphrase);
35-
void set_esplora_server(string esplora_server_url);
38+
void set_chain_source_esplora(string server_url, EsploraSyncConfig? config);
3639
void set_gossip_source_p2p();
3740
void set_gossip_source_rgs(string rgs_server_url);
3841
void set_liquidity_source_lsps2(SocketAddress address, PublicKey node_id, string? token);
@@ -218,11 +221,12 @@ dictionary NodeStatus {
218221
boolean is_running;
219222
boolean is_listening;
220223
BestBlock current_best_block;
221-
u64? latest_wallet_sync_timestamp;
224+
u64? latest_lightning_wallet_sync_timestamp;
222225
u64? latest_onchain_wallet_sync_timestamp;
223226
u64? latest_fee_rate_cache_update_timestamp;
224227
u64? latest_rgs_snapshot_timestamp;
225228
u64? latest_node_announcement_broadcast_timestamp;
229+
u32? latest_channel_monitor_archival_height;
226230
};
227231

228232
dictionary BestBlock {

bindings/python/src/ldk_node/test_ldk_node.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ def setup_node(tmp_dir, esplora_endpoint, listening_addresses):
8484
config = default_config()
8585
builder = Builder.from_config(config)
8686
builder.set_storage_dir_path(tmp_dir)
87-
builder.set_esplora_server(esplora_endpoint)
87+
builder.set_chain_source_esplora(esplora_endpoint, None)
8888
builder.set_network(DEFAULT_TEST_NETWORK)
8989
builder.set_listening_addresses(listening_addresses)
9090
return builder.build()

src/builder.rs

Lines changed: 87 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,15 @@
55
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
66
// accordance with one or both of these licenses.
77

8-
use crate::config::{
9-
default_user_config, Config, DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS, DEFAULT_ESPLORA_SERVER_URL,
10-
WALLET_KEYS_SEED_LEN,
11-
};
8+
use crate::chain::{ChainSource, DEFAULT_ESPLORA_SERVER_URL};
9+
use crate::config::{default_user_config, Config, EsploraSyncConfig, WALLET_KEYS_SEED_LEN};
10+
1211
use crate::connection::ConnectionManager;
1312
use crate::event::EventQueue;
1413
use crate::fee_estimator::OnchainFeeEstimator;
1514
use crate::gossip::GossipSource;
16-
use crate::io;
1715
use crate::io::sqlite_store::SqliteStore;
16+
use crate::io::utils::{read_node_metrics, write_node_metrics};
1817
#[cfg(any(vss, vss_test))]
1918
use crate::io::vss_store::VssStore;
2019
use crate::liquidity::LiquiditySource;
@@ -29,6 +28,7 @@ use crate::types::{
2928
};
3029
use crate::wallet::persist::KVStoreWalletPersister;
3130
use crate::wallet::Wallet;
31+
use crate::{io, NodeMetrics};
3232
use crate::{LogLevel, Node};
3333

3434
use lightning::chain::{chainmonitor, BestBlock, Watch};
@@ -52,8 +52,6 @@ use lightning::util::sweep::OutputSweeper;
5252

5353
use lightning_persister::fs_store::FilesystemStore;
5454

55-
use lightning_transaction_sync::EsploraSyncClient;
56-
5755
use lightning_liquidity::lsps2::client::LSPS2ClientConfig;
5856
use lightning_liquidity::{LiquidityClientConfig, LiquidityManager};
5957

@@ -79,7 +77,7 @@ use std::time::SystemTime;
7977

8078
#[derive(Debug, Clone)]
8179
enum ChainDataSourceConfig {
82-
Esplora(String),
80+
Esplora { server_url: String, sync_config: Option<EsploraSyncConfig> },
8381
}
8482

8583
#[derive(Debug, Clone)]
@@ -239,8 +237,14 @@ impl NodeBuilder {
239237
}
240238

241239
/// Configures the [`Node`] instance to source its chain data from the given Esplora server.
242-
pub fn set_esplora_server(&mut self, esplora_server_url: String) -> &mut Self {
243-
self.chain_data_source_config = Some(ChainDataSourceConfig::Esplora(esplora_server_url));
240+
///
241+
/// If no `sync_config` is given, default values are used. See [`EsploraSyncConfig`] for more
242+
/// information.
243+
pub fn set_chain_source_esplora(
244+
&mut self, server_url: String, sync_config: Option<EsploraSyncConfig>,
245+
) -> &mut Self {
246+
self.chain_data_source_config =
247+
Some(ChainDataSourceConfig::Esplora { server_url, sync_config });
244248
self
245249
}
246250

@@ -466,8 +470,13 @@ impl ArcedNodeBuilder {
466470
}
467471

468472
/// Configures the [`Node`] instance to source its chain data from the given Esplora server.
469-
pub fn set_esplora_server(&self, esplora_server_url: String) {
470-
self.inner.write().unwrap().set_esplora_server(esplora_server_url);
473+
///
474+
/// If no `sync_config` is given, default values are used. See [`EsploraSyncConfig`] for more
475+
/// information.
476+
pub fn set_chain_source_esplora(
477+
&self, server_url: String, sync_config: Option<EsploraSyncConfig>,
478+
) {
479+
self.inner.write().unwrap().set_chain_source_esplora(server_url, sync_config);
471480
}
472481

473482
/// Configures the [`Node`] instance to source its gossip data from the Lightning peer-to-peer
@@ -555,6 +564,19 @@ fn build_with_store_internal(
555564
liquidity_source_config: Option<&LiquiditySourceConfig>, seed_bytes: [u8; 64],
556565
logger: Arc<FilesystemLogger>, kv_store: Arc<DynStore>,
557566
) -> Result<Node, BuildError> {
567+
// Initialize the status fields.
568+
let is_listening = Arc::new(AtomicBool::new(false));
569+
let node_metrics = match read_node_metrics(Arc::clone(&kv_store), Arc::clone(&logger)) {
570+
Ok(metrics) => Arc::new(RwLock::new(metrics)),
571+
Err(e) => {
572+
if e.kind() == std::io::ErrorKind::NotFound {
573+
Arc::new(RwLock::new(NodeMetrics::default()))
574+
} else {
575+
return Err(BuildError::ReadFailed);
576+
}
577+
},
578+
};
579+
558580
// Initialize the on-chain wallet and chain access
559581
let xprv = bitcoin::bip32::Xpriv::new_master(config.network, &seed_bytes).map_err(|e| {
560582
log_error!(logger, "Failed to derive master secret: {}", e);
@@ -586,62 +608,54 @@ fn build_with_store_internal(
586608
})?,
587609
};
588610

589-
let (esplora_client, tx_sync, tx_broadcaster, fee_estimator) = match chain_data_source_config {
590-
Some(ChainDataSourceConfig::Esplora(server_url)) => {
591-
let mut client_builder = esplora_client::Builder::new(&server_url.clone());
592-
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
593-
let esplora_client = client_builder.build_async().unwrap();
594-
let tx_sync = Arc::new(EsploraSyncClient::from_client(
595-
esplora_client.clone(),
596-
Arc::clone(&logger),
597-
));
598-
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(
599-
tx_sync.client().clone(),
600-
Arc::clone(&logger),
601-
));
602-
let fee_estimator = Arc::new(OnchainFeeEstimator::new(
603-
tx_sync.client().clone(),
611+
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger)));
612+
let fee_estimator = Arc::new(OnchainFeeEstimator::new());
613+
let wallet = Arc::new(Wallet::new(
614+
bdk_wallet,
615+
wallet_persister,
616+
Arc::clone(&tx_broadcaster),
617+
Arc::clone(&fee_estimator),
618+
Arc::clone(&logger),
619+
));
620+
621+
let chain_source = match chain_data_source_config {
622+
Some(ChainDataSourceConfig::Esplora { server_url, sync_config }) => {
623+
let sync_config = sync_config.unwrap_or(EsploraSyncConfig::default());
624+
Arc::new(ChainSource::new_esplora(
625+
server_url.clone(),
626+
sync_config,
627+
Arc::clone(&wallet),
628+
Arc::clone(&fee_estimator),
629+
Arc::clone(&tx_broadcaster),
630+
Arc::clone(&kv_store),
604631
Arc::clone(&config),
605632
Arc::clone(&logger),
606-
));
607-
(esplora_client, tx_sync, tx_broadcaster, fee_estimator)
633+
Arc::clone(&node_metrics),
634+
))
608635
},
609636
None => {
610637
// Default to Esplora client.
611638
let server_url = DEFAULT_ESPLORA_SERVER_URL.to_string();
612-
let mut client_builder = esplora_client::Builder::new(&server_url);
613-
client_builder = client_builder.timeout(DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS);
614-
let esplora_client = client_builder.build_async().unwrap();
615-
let tx_sync = Arc::new(EsploraSyncClient::from_client(
616-
esplora_client.clone(),
617-
Arc::clone(&logger),
618-
));
619-
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(
620-
tx_sync.client().clone(),
621-
Arc::clone(&logger),
622-
));
623-
let fee_estimator = Arc::new(OnchainFeeEstimator::new(
624-
tx_sync.client().clone(),
639+
let sync_config = EsploraSyncConfig::default();
640+
Arc::new(ChainSource::new_esplora(
641+
server_url.clone(),
642+
sync_config,
643+
Arc::clone(&wallet),
644+
Arc::clone(&fee_estimator),
645+
Arc::clone(&tx_broadcaster),
646+
Arc::clone(&kv_store),
625647
Arc::clone(&config),
626648
Arc::clone(&logger),
627-
));
628-
(esplora_client, tx_sync, tx_broadcaster, fee_estimator)
649+
Arc::clone(&node_metrics),
650+
))
629651
},
630652
};
631653

632654
let runtime = Arc::new(RwLock::new(None));
633-
let wallet = Arc::new(Wallet::new(
634-
bdk_wallet,
635-
wallet_persister,
636-
esplora_client,
637-
Arc::clone(&tx_broadcaster),
638-
Arc::clone(&fee_estimator),
639-
Arc::clone(&logger),
640-
));
641655

642656
// Initialize the ChainMonitor
643657
let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(
644-
Some(Arc::clone(&tx_sync)),
658+
Some(Arc::clone(&chain_source)),
645659
Arc::clone(&tx_broadcaster),
646660
Arc::clone(&logger),
647661
Arc::clone(&fee_estimator),
@@ -824,23 +838,24 @@ fn build_with_store_internal(
824838
Arc::new(GossipSource::new_p2p(Arc::clone(&network_graph), Arc::clone(&logger)));
825839

826840
// Reset the RGS sync timestamp in case we somehow switch gossip sources
827-
io::utils::write_latest_rgs_sync_timestamp(
828-
0,
829-
Arc::clone(&kv_store),
830-
Arc::clone(&logger),
831-
)
832-
.map_err(|e| {
833-
log_error!(logger, "Failed writing to store: {}", e);
834-
BuildError::WriteFailed
835-
})?;
841+
{
842+
let mut locked_node_metrics = node_metrics.write().unwrap();
843+
locked_node_metrics.latest_rgs_snapshot_timestamp = None;
844+
write_node_metrics(
845+
&*locked_node_metrics,
846+
Arc::clone(&kv_store),
847+
Arc::clone(&logger),
848+
)
849+
.map_err(|e| {
850+
log_error!(logger, "Failed writing to store: {}", e);
851+
BuildError::WriteFailed
852+
})?;
853+
}
836854
p2p_source
837855
},
838856
GossipSourceConfig::RapidGossipSync(rgs_server) => {
839-
let latest_sync_timestamp = io::utils::read_latest_rgs_sync_timestamp(
840-
Arc::clone(&kv_store),
841-
Arc::clone(&logger),
842-
)
843-
.unwrap_or(0);
857+
let latest_sync_timestamp =
858+
node_metrics.read().unwrap().latest_rgs_snapshot_timestamp.unwrap_or(0);
844859
Arc::new(GossipSource::new_rgs(
845860
rgs_server.clone(),
846861
latest_sync_timestamp,
@@ -857,7 +872,7 @@ fn build_with_store_internal(
857872
let liquidity_manager = Arc::new(LiquidityManager::new(
858873
Arc::clone(&keys_manager),
859874
Arc::clone(&channel_manager),
860-
Some(Arc::clone(&tx_sync)),
875+
Some(Arc::clone(&chain_source)),
861876
None,
862877
None,
863878
liquidity_client_config,
@@ -925,7 +940,7 @@ fn build_with_store_internal(
925940
let output_sweeper = match io::utils::read_output_sweeper(
926941
Arc::clone(&tx_broadcaster),
927942
Arc::clone(&fee_estimator),
928-
Arc::clone(&tx_sync),
943+
Arc::clone(&chain_source),
929944
Arc::clone(&keys_manager),
930945
Arc::clone(&kv_store),
931946
Arc::clone(&logger),
@@ -937,7 +952,7 @@ fn build_with_store_internal(
937952
channel_manager.current_best_block(),
938953
Arc::clone(&tx_broadcaster),
939954
Arc::clone(&fee_estimator),
940-
Some(Arc::clone(&tx_sync)),
955+
Some(Arc::clone(&chain_source)),
941956
Arc::clone(&keys_manager),
942957
Arc::clone(&keys_manager),
943958
Arc::clone(&kv_store),
@@ -999,23 +1014,14 @@ fn build_with_store_internal(
9991014
let (stop_sender, _) = tokio::sync::watch::channel(());
10001015
let (event_handling_stopped_sender, _) = tokio::sync::watch::channel(());
10011016

1002-
let is_listening = Arc::new(AtomicBool::new(false));
1003-
let latest_wallet_sync_timestamp = Arc::new(RwLock::new(None));
1004-
let latest_onchain_wallet_sync_timestamp = Arc::new(RwLock::new(None));
1005-
let latest_fee_rate_cache_update_timestamp = Arc::new(RwLock::new(None));
1006-
let latest_rgs_snapshot_timestamp = Arc::new(RwLock::new(None));
1007-
let latest_node_announcement_broadcast_timestamp = Arc::new(RwLock::new(None));
1008-
let latest_channel_monitor_archival_height = Arc::new(RwLock::new(None));
1009-
10101017
Ok(Node {
10111018
runtime,
10121019
stop_sender,
10131020
event_handling_stopped_sender,
10141021
config,
10151022
wallet,
1016-
tx_sync,
1023+
chain_source,
10171024
tx_broadcaster,
1018-
fee_estimator,
10191025
event_queue,
10201026
channel_manager,
10211027
chain_monitor,
@@ -1034,12 +1040,7 @@ fn build_with_store_internal(
10341040
peer_store,
10351041
payment_store,
10361042
is_listening,
1037-
latest_wallet_sync_timestamp,
1038-
latest_onchain_wallet_sync_timestamp,
1039-
latest_fee_rate_cache_update_timestamp,
1040-
latest_rgs_snapshot_timestamp,
1041-
latest_node_announcement_broadcast_timestamp,
1042-
latest_channel_monitor_archival_height,
1043+
node_metrics,
10431044
})
10441045
}
10451046

0 commit comments

Comments
 (0)