diff --git a/Cargo.toml b/Cargo.toml index bf8bed08c..96e47b260 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,7 @@ lightning-net-tokio = { version = "0.1.0" } lightning-persister = { version = "0.1.0" } lightning-background-processor = { version = "0.1.0", features = ["futures"] } lightning-rapid-gossip-sync = { version = "0.1.0" } -lightning-block-sync = { version = "0.1.0", features = ["rpc-client", "tokio"] } +lightning-block-sync = { version = "0.1.0", features = ["rpc-client", "rest-client", "tokio"] } lightning-transaction-sync = { version = "0.1.0", features = ["esplora-async-https", "time", "electrum"] } lightning-liquidity = { version = "0.1.0", features = ["std"] } diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index 36767b790..3c240b43c 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -78,6 +78,7 @@ interface Builder { void set_chain_source_esplora(string server_url, EsploraSyncConfig? config); void set_chain_source_electrum(string server_url, ElectrumSyncConfig? config); void set_chain_source_bitcoind_rpc(string rpc_host, u16 rpc_port, string rpc_user, string rpc_password); + void set_chain_source_bitcoind_rest(string rest_host, u16 rest_port, string rpc_host, u16 rpc_port, string rpc_user, string rpc_password); void set_gossip_source_p2p(); void set_gossip_source_rgs(string rgs_server_url); void set_liquidity_source_lsps1(PublicKey node_id, SocketAddress address, string? token); diff --git a/docker-compose.yml b/docker-compose.yml index 425dc129a..e71fd70fb 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -13,10 +13,11 @@ services: "-rpcbind=0.0.0.0", "-rpcuser=user", "-rpcpassword=pass", - "-fallbackfee=0.00001" + "-fallbackfee=0.00001", + "-rest" ] ports: - - "18443:18443" # Regtest RPC port + - "18443:18443" # Regtest REST and RPC port - "18444:18444" # Regtest P2P port networks: - bitcoin-electrs diff --git a/src/builder.rs b/src/builder.rs index 31a0fee45..a177768f6 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -7,8 +7,9 @@ use crate::chain::{ChainSource, DEFAULT_ESPLORA_SERVER_URL}; use crate::config::{ - default_user_config, may_announce_channel, AnnounceError, Config, ElectrumSyncConfig, - EsploraSyncConfig, DEFAULT_LOG_FILENAME, DEFAULT_LOG_LEVEL, WALLET_KEYS_SEED_LEN, + default_user_config, may_announce_channel, AnnounceError, BitcoindRestClientConfig, Config, + ElectrumSyncConfig, EsploraSyncConfig, DEFAULT_LOG_FILENAME, DEFAULT_LOG_LEVEL, + WALLET_KEYS_SEED_LEN, }; use crate::connection::ConnectionManager; @@ -84,9 +85,21 @@ const LSPS_HARDENED_CHILD_INDEX: u32 = 577; #[derive(Debug, Clone)] enum ChainDataSourceConfig { - Esplora { server_url: String, sync_config: Option }, - Electrum { server_url: String, sync_config: Option }, - BitcoindRpc { rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String }, + Esplora { + server_url: String, + sync_config: Option, + }, + Electrum { + server_url: String, + sync_config: Option, + }, + Bitcoind { + rpc_host: String, + rpc_port: u16, + rpc_user: String, + rpc_password: String, + rest_client_config: Option, + }, } #[derive(Debug, Clone)] @@ -299,13 +312,48 @@ impl NodeBuilder { self } - /// Configures the [`Node`] instance to source its chain data from the given Bitcoin Core RPC - /// endpoint. + /// Configures the [`Node`] instance to connect to a Bitcoin Core node via RPC. + /// + /// This method establishes an RPC connection that enables all essential chain operations including + /// transaction broadcasting and chain data synchronization. + /// + /// ## Parameters: + /// * `rpc_host`, `rpc_port`, `rpc_user`, `rpc_password` - Required parameters for the Bitcoin Core RPC + /// connection. pub fn set_chain_source_bitcoind_rpc( &mut self, rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String, ) -> &mut Self { - self.chain_data_source_config = - Some(ChainDataSourceConfig::BitcoindRpc { rpc_host, rpc_port, rpc_user, rpc_password }); + self.chain_data_source_config = Some(ChainDataSourceConfig::Bitcoind { + rpc_host, + rpc_port, + rpc_user, + rpc_password, + rest_client_config: None, + }); + self + } + + /// Configures the [`Node`] instance to synchronize chain data from a Bitcoin Core REST endpoint. + /// + /// This method enables chain data synchronization via Bitcoin Core's REST interface. We pass + /// additional RPC configuration to non-REST-supported API calls like transaction broadcasting. + /// + /// ## Parameters: + /// * `rest_host`, `rest_port` - Required parameters for the Bitcoin Core REST connection. + /// * `rpc_host`, `rpc_port`, `rpc_user`, `rpc_password` - Required parameters for the Bitcoin Core RPC + /// connection + pub fn set_chain_source_bitcoind_rest( + &mut self, rest_host: String, rest_port: u16, rpc_host: String, rpc_port: u16, + rpc_user: String, rpc_password: String, + ) -> &mut Self { + self.chain_data_source_config = Some(ChainDataSourceConfig::Bitcoind { + rpc_host, + rpc_port, + rpc_user, + rpc_password, + rest_client_config: Some(BitcoindRestClientConfig { rest_host, rest_port }), + }); + self } @@ -716,8 +764,14 @@ impl ArcedNodeBuilder { self.inner.write().unwrap().set_chain_source_electrum(server_url, sync_config); } - /// Configures the [`Node`] instance to source its chain data from the given Bitcoin Core RPC - /// endpoint. + /// Configures the [`Node`] instance to connect to a Bitcoin Core node via RPC. + /// + /// This method establishes an RPC connection that enables all essential chain operations including + /// transaction broadcasting and chain data synchronization. + /// + /// ## Parameters: + /// * `rpc_host`, `rpc_port`, `rpc_user`, `rpc_password` - Required parameters for the Bitcoin Core RPC + /// connection. pub fn set_chain_source_bitcoind_rpc( &self, rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String, ) { @@ -729,6 +783,29 @@ impl ArcedNodeBuilder { ); } + /// Configures the [`Node`] instance to synchronize chain data from a Bitcoin Core REST endpoint. + /// + /// This method enables chain data synchronization via Bitcoin Core's REST interface. We pass + /// additional RPC configuration to non-REST-supported API calls like transaction broadcasting. + /// + /// ## Parameters: + /// * `rest_host`, `rest_port` - Required parameters for the Bitcoin Core REST connection. + /// * `rpc_host`, `rpc_port`, `rpc_user`, `rpc_password` - Required parameters for the Bitcoin Core RPC + /// connection + pub fn set_chain_source_bitcoind_rest( + &self, rest_host: String, rest_port: u16, rpc_host: String, rpc_port: u16, + rpc_user: String, rpc_password: String, + ) { + self.inner.write().unwrap().set_chain_source_bitcoind_rest( + rest_host, + rest_port, + rpc_host, + rpc_port, + rpc_user, + rpc_password, + ); + } + /// Configures the [`Node`] instance to source its gossip data from the Lightning peer-to-peer /// network. pub fn set_gossip_source_p2p(&self) { @@ -1068,8 +1145,14 @@ fn build_with_store_internal( Arc::clone(&node_metrics), )) }, - Some(ChainDataSourceConfig::BitcoindRpc { rpc_host, rpc_port, rpc_user, rpc_password }) => { - Arc::new(ChainSource::new_bitcoind_rpc( + Some(ChainDataSourceConfig::Bitcoind { + rpc_host, + rpc_port, + rpc_user, + rpc_password, + rest_client_config, + }) => match rest_client_config { + Some(rest_client_config) => Arc::new(ChainSource::new_bitcoind_rest( rpc_host.clone(), *rpc_port, rpc_user.clone(), @@ -1079,10 +1162,25 @@ fn build_with_store_internal( Arc::clone(&tx_broadcaster), Arc::clone(&kv_store), Arc::clone(&config), + rest_client_config.clone(), Arc::clone(&logger), Arc::clone(&node_metrics), - )) + )), + None => Arc::new(ChainSource::new_bitcoind_rpc( + rpc_host.clone(), + *rpc_port, + rpc_user.clone(), + rpc_password.clone(), + Arc::clone(&wallet), + Arc::clone(&fee_estimator), + Arc::clone(&tx_broadcaster), + Arc::clone(&kv_store), + Arc::clone(&config), + Arc::clone(&logger), + Arc::clone(&node_metrics), + )), }, + None => { // Default to Esplora client. let server_url = DEFAULT_ESPLORA_SERVER_URL.to_string(); diff --git a/src/chain/bitcoind_rpc.rs b/src/chain/bitcoind.rs similarity index 53% rename from src/chain/bitcoind_rpc.rs rename to src/chain/bitcoind.rs index 3ca2c221f..98e77cac7 100644 --- a/src/chain/bitcoind_rpc.rs +++ b/src/chain/bitcoind.rs @@ -7,11 +7,14 @@ use crate::types::{ChainMonitor, ChannelManager, Sweeper, Wallet}; +use base64::prelude::BASE64_STANDARD; +use base64::Engine; +use bitcoin::{BlockHash, FeeRate, Transaction, Txid}; use lightning::chain::Listen; - -use lightning_block_sync::http::HttpEndpoint; -use lightning_block_sync::http::JsonResponse; +use lightning_block_sync::gossip::UtxoSource; +use lightning_block_sync::http::{HttpEndpoint, JsonResponse}; use lightning_block_sync::poll::ValidatedBlockHeader; +use lightning_block_sync::rest::RestClient; use lightning_block_sync::rpc::{RpcClient, RpcError}; use lightning_block_sync::{ AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource, Cache, @@ -19,26 +22,31 @@ use lightning_block_sync::{ use serde::Serialize; -use bitcoin::{BlockHash, FeeRate, Transaction, Txid}; - -use base64::prelude::{Engine, BASE64_STANDARD}; - use std::collections::{HashMap, VecDeque}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; -pub struct BitcoindRpcClient { - rpc_client: Arc, - latest_mempool_timestamp: AtomicU64, - mempool_entries_cache: tokio::sync::Mutex>, - mempool_txs_cache: tokio::sync::Mutex>, +pub enum BitcoindClient { + Rpc { + rpc_client: Arc, + latest_mempool_timestamp: AtomicU64, + mempool_entries_cache: tokio::sync::Mutex>, + mempool_txs_cache: tokio::sync::Mutex>, + }, + Rest { + rest_client: Arc, + rpc_client: Arc, + latest_mempool_timestamp: AtomicU64, + mempool_entries_cache: tokio::sync::Mutex>, + mempool_txs_cache: tokio::sync::Mutex>, + }, } -impl BitcoindRpcClient { - pub(crate) fn new(host: String, port: u16, rpc_user: String, rpc_password: String) -> Self { - let http_endpoint = HttpEndpoint::for_host(host.clone()).with_port(port); - let rpc_credentials = - BASE64_STANDARD.encode(format!("{}:{}", rpc_user.clone(), rpc_password.clone())); +impl BitcoindClient { + /// Creates a new RPC API client for the chain interactions with Bitcoin Core. + pub(crate) fn new_rpc(host: String, port: u16, rpc_user: String, rpc_password: String) -> Self { + let http_endpoint = endpoint(host, port); + let rpc_credentials = rpc_credentials(rpc_user, rpc_password); let rpc_client = Arc::new(RpcClient::new(&rpc_credentials, http_endpoint)); @@ -46,25 +54,104 @@ impl BitcoindRpcClient { let mempool_entries_cache = tokio::sync::Mutex::new(HashMap::new()); let mempool_txs_cache = tokio::sync::Mutex::new(HashMap::new()); - Self { rpc_client, latest_mempool_timestamp, mempool_entries_cache, mempool_txs_cache } + Self::Rpc { rpc_client, latest_mempool_timestamp, mempool_entries_cache, mempool_txs_cache } } - pub(crate) fn rpc_client(&self) -> Arc { - Arc::clone(&self.rpc_client) + /// Creates a new, primarily REST API client for the chain interactions + /// with Bitcoin Core. + /// + /// Aside the required REST host and port, we provide RPC configuration + /// options for necessary calls not supported by the REST interface. + pub(crate) fn new_rest( + rest_host: String, rest_port: u16, rpc_host: String, rpc_port: u16, rpc_user: String, + rpc_password: String, + ) -> Self { + let rest_endpoint = endpoint(rest_host, rest_port).with_path("/rest".to_string()); + let rest_client = Arc::new(RestClient::new(rest_endpoint)); + + let rpc_endpoint = endpoint(rpc_host, rpc_port); + let rpc_credentials = rpc_credentials(rpc_user, rpc_password); + let rpc_client = Arc::new(RpcClient::new(&rpc_credentials, rpc_endpoint)); + + let latest_mempool_timestamp = AtomicU64::new(0); + + let mempool_entries_cache = tokio::sync::Mutex::new(HashMap::new()); + let mempool_txs_cache = tokio::sync::Mutex::new(HashMap::new()); + + Self::Rest { + rest_client, + rpc_client, + latest_mempool_timestamp, + mempool_entries_cache, + mempool_txs_cache, + } } + pub(crate) fn utxo_source(&self) -> Arc { + match self { + BitcoindClient::Rpc { rpc_client, .. } => Arc::clone(rpc_client) as Arc, + BitcoindClient::Rest { rest_client, .. } => { + Arc::clone(rest_client) as Arc + }, + } + } + + /// Broadcasts the provided transaction. pub(crate) async fn broadcast_transaction(&self, tx: &Transaction) -> std::io::Result { + match self { + BitcoindClient::Rpc { rpc_client, .. } => { + Self::broadcast_transaction_inner(Arc::clone(rpc_client), tx).await + }, + BitcoindClient::Rest { rpc_client, .. } => { + // Bitcoin Core's REST interface does not support broadcasting transactions + // so we use the RPC client. + Self::broadcast_transaction_inner(Arc::clone(rpc_client), tx).await + }, + } + } + + async fn broadcast_transaction_inner( + rpc_client: Arc, tx: &Transaction, + ) -> std::io::Result { let tx_serialized = bitcoin::consensus::encode::serialize_hex(tx); let tx_json = serde_json::json!(tx_serialized); - self.rpc_client.call_method::("sendrawtransaction", &[tx_json]).await + rpc_client.call_method::("sendrawtransaction", &[tx_json]).await } + /// Retrieve the fee estimate needed for a transaction to begin + /// confirmation within the provided `num_blocks`. pub(crate) async fn get_fee_estimate_for_target( &self, num_blocks: usize, estimation_mode: FeeRateEstimationMode, + ) -> std::io::Result { + match self { + BitcoindClient::Rpc { rpc_client, .. } => { + Self::get_fee_estimate_for_target_inner( + Arc::clone(rpc_client), + num_blocks, + estimation_mode, + ) + .await + }, + BitcoindClient::Rest { rpc_client, .. } => { + // We rely on the internal RPC client to make this call, as this + // operation is not supported by Bitcoin Core's REST interface. + Self::get_fee_estimate_for_target_inner( + Arc::clone(rpc_client), + num_blocks, + estimation_mode, + ) + .await + }, + } + } + + /// Estimate the fee rate for the provided target number of blocks. + async fn get_fee_estimate_for_target_inner( + rpc_client: Arc, num_blocks: usize, estimation_mode: FeeRateEstimationMode, ) -> std::io::Result { let num_blocks_json = serde_json::json!(num_blocks); let estimation_mode_json = serde_json::json!(estimation_mode); - self.rpc_client + rpc_client .call_method::( "estimatesmartfee", &[num_blocks_json, estimation_mode_json], @@ -73,20 +160,59 @@ impl BitcoindRpcClient { .map(|resp| resp.0) } + /// Gets the mempool minimum fee rate. pub(crate) async fn get_mempool_minimum_fee_rate(&self) -> std::io::Result { - self.rpc_client + match self { + BitcoindClient::Rpc { rpc_client, .. } => { + Self::get_mempool_minimum_fee_rate_rpc(Arc::clone(rpc_client)).await + }, + BitcoindClient::Rest { rest_client, .. } => { + Self::get_mempool_minimum_fee_rate_rest(Arc::clone(rest_client)).await + }, + } + } + + /// Get the mempool minimum fee rate via RPC interface. + async fn get_mempool_minimum_fee_rate_rpc( + rpc_client: Arc, + ) -> std::io::Result { + rpc_client .call_method::("getmempoolinfo", &[]) .await .map(|resp| resp.0) } + /// Get the mempool minimum fee rate via REST interface. + async fn get_mempool_minimum_fee_rate_rest( + rest_client: Arc, + ) -> std::io::Result { + rest_client + .request_resource::("mempool/info.json") + .await + .map(|resp| resp.0) + } + + /// Gets the raw transaction for the provided transaction ID. Returns `None` if not found. pub(crate) async fn get_raw_transaction( &self, txid: &Txid, + ) -> std::io::Result> { + match self { + BitcoindClient::Rpc { rpc_client, .. } => { + Self::get_raw_transaction_rpc(Arc::clone(rpc_client), txid).await + }, + BitcoindClient::Rest { rest_client, .. } => { + Self::get_raw_transaction_rest(Arc::clone(rest_client), txid).await + }, + } + } + + /// Retrieve raw transaction for provided transaction ID via the RPC interface. + async fn get_raw_transaction_rpc( + rpc_client: Arc, txid: &Txid, ) -> std::io::Result> { let txid_hex = bitcoin::consensus::encode::serialize_hex(txid); let txid_json = serde_json::json!(txid_hex); - match self - .rpc_client + match rpc_client .call_method::("getrawtransaction", &[txid_json]) .await { @@ -118,25 +244,112 @@ impl BitcoindRpcClient { } } + /// Retrieve raw transaction for provided transaction ID via the REST interface. + async fn get_raw_transaction_rest( + rest_client: Arc, txid: &Txid, + ) -> std::io::Result> { + let txid_hex = bitcoin::consensus::encode::serialize_hex(txid); + let tx_path = format!("tx/{}.json", txid_hex); + match rest_client + .request_resource::(&tx_path) + .await + { + Ok(resp) => Ok(Some(resp.0)), + Err(e) => match e.kind() { + std::io::ErrorKind::Other => { + match e.into_inner() { + Some(inner) => { + let http_error_res: Result, _> = inner.downcast(); + match http_error_res { + Ok(http_error) => { + // Check if it's the HTTP NOT_FOUND error code. + if &http_error.status_code == "404" { + Ok(None) + } else { + Err(std::io::Error::new( + std::io::ErrorKind::Other, + http_error, + )) + } + }, + Err(_) => { + let error_msg = + format!("Failed to process {} response.", tx_path); + Err(std::io::Error::new( + std::io::ErrorKind::Other, + error_msg.as_str(), + )) + }, + } + }, + None => { + let error_msg = format!("Failed to process {} response.", tx_path); + Err(std::io::Error::new(std::io::ErrorKind::Other, error_msg.as_str())) + }, + } + }, + _ => { + let error_msg = format!("Failed to process {} response.", tx_path); + Err(std::io::Error::new(std::io::ErrorKind::Other, error_msg.as_str())) + }, + }, + } + } + + /// Retrieves the raw mempool. pub(crate) async fn get_raw_mempool(&self) -> std::io::Result> { + match self { + BitcoindClient::Rpc { rpc_client, .. } => { + Self::get_raw_mempool_rpc(Arc::clone(rpc_client)).await + }, + BitcoindClient::Rest { rest_client, .. } => { + Self::get_raw_mempool_rest(Arc::clone(rest_client)).await + }, + } + } + + /// Retrieves the raw mempool via the RPC interface. + async fn get_raw_mempool_rpc(rpc_client: Arc) -> std::io::Result> { let verbose_flag_json = serde_json::json!(false); - self.rpc_client + rpc_client .call_method::("getrawmempool", &[verbose_flag_json]) .await .map(|resp| resp.0) } + /// Retrieves the raw mempool via the REST interface. + async fn get_raw_mempool_rest(rest_client: Arc) -> std::io::Result> { + rest_client + .request_resource::( + "mempool/contents.json?verbose=false", + ) + .await + .map(|resp| resp.0) + } + + /// Retrieves an entry from the mempool if it exists, else return `None`. pub(crate) async fn get_mempool_entry( &self, txid: Txid, + ) -> std::io::Result> { + match self { + BitcoindClient::Rpc { rpc_client, .. } => { + Self::get_mempool_entry_inner(Arc::clone(rpc_client), txid).await + }, + BitcoindClient::Rest { rpc_client, .. } => { + Self::get_mempool_entry_inner(Arc::clone(rpc_client), txid).await + }, + } + } + + /// Retrieves the mempool entry of the provided transaction ID. + async fn get_mempool_entry_inner( + client: Arc, txid: Txid, ) -> std::io::Result> { let txid_hex = bitcoin::consensus::encode::serialize_hex(&txid); let txid_json = serde_json::json!(txid_hex); - match self - .rpc_client - .call_method::("getmempoolentry", &[txid_json]) - .await - { - Ok(resp) => Ok(Some(MempoolEntry { txid, height: resp.height, time: resp.time })), + + match client.call_method::("getmempoolentry", &[txid_json]).await { + Ok(resp) => Ok(Some(MempoolEntry { txid, time: resp.time, height: resp.height })), Err(e) => match e.into_inner() { Some(inner) => { let rpc_error_res: Result, _> = inner.downcast(); @@ -165,9 +378,22 @@ impl BitcoindRpcClient { } pub(crate) async fn update_mempool_entries_cache(&self) -> std::io::Result<()> { + match self { + BitcoindClient::Rpc { mempool_entries_cache, .. } => { + self.update_mempool_entries_cache_inner(mempool_entries_cache).await + }, + BitcoindClient::Rest { mempool_entries_cache, .. } => { + self.update_mempool_entries_cache_inner(mempool_entries_cache).await + }, + } + } + + async fn update_mempool_entries_cache_inner( + &self, mempool_entries_cache: &tokio::sync::Mutex>, + ) -> std::io::Result<()> { let mempool_txids = self.get_raw_mempool().await?; - let mut mempool_entries_cache = self.mempool_entries_cache.lock().await; + let mut mempool_entries_cache = mempool_entries_cache.lock().await; mempool_entries_cache.retain(|txid, _| mempool_txids.contains(txid)); if let Some(difference) = mempool_txids.len().checked_sub(mempool_entries_cache.capacity()) @@ -207,16 +433,54 @@ impl BitcoindRpcClient { /// This method is an adapted version of `bdk_bitcoind_rpc::Emitter::mempool`. It emits each /// transaction only once, unless we cannot assume the transaction's ancestors are already /// emitted. - async fn get_mempool_transactions_and_timestamp_at_height( + pub(crate) async fn get_mempool_transactions_and_timestamp_at_height( &self, best_processed_height: u32, ) -> std::io::Result> { - let prev_mempool_time = self.latest_mempool_timestamp.load(Ordering::Relaxed); + match self { + BitcoindClient::Rpc { + latest_mempool_timestamp, + mempool_entries_cache, + mempool_txs_cache, + .. + } => { + self.get_mempool_transactions_and_timestamp_at_height_inner( + latest_mempool_timestamp, + mempool_entries_cache, + mempool_txs_cache, + best_processed_height, + ) + .await + }, + BitcoindClient::Rest { + latest_mempool_timestamp, + mempool_entries_cache, + mempool_txs_cache, + .. + } => { + self.get_mempool_transactions_and_timestamp_at_height_inner( + latest_mempool_timestamp, + mempool_entries_cache, + mempool_txs_cache, + best_processed_height, + ) + .await + }, + } + } + + async fn get_mempool_transactions_and_timestamp_at_height_inner( + &self, latest_mempool_timestamp: &AtomicU64, + mempool_entries_cache: &tokio::sync::Mutex>, + mempool_txs_cache: &tokio::sync::Mutex>, + best_processed_height: u32, + ) -> std::io::Result> { + let prev_mempool_time = latest_mempool_timestamp.load(Ordering::Relaxed); let mut latest_time = prev_mempool_time; self.update_mempool_entries_cache().await?; - let mempool_entries_cache = self.mempool_entries_cache.lock().await; - let mut mempool_txs_cache = self.mempool_txs_cache.lock().await; + let mempool_entries_cache = mempool_entries_cache.lock().await; + let mut mempool_txs_cache = mempool_txs_cache.lock().await; mempool_txs_cache.retain(|txid, _| mempool_entries_cache.contains_key(txid)); if let Some(difference) = @@ -260,7 +524,7 @@ impl BitcoindRpcClient { } if !txs_to_emit.is_empty() { - self.latest_mempool_timestamp.store(latest_time, Ordering::Release); + latest_mempool_timestamp.store(latest_time, Ordering::Release); } Ok(txs_to_emit) } @@ -272,8 +536,33 @@ impl BitcoindRpcClient { async fn get_evicted_mempool_txids_and_timestamp( &self, unconfirmed_txids: Vec, ) -> std::io::Result> { - let latest_mempool_timestamp = self.latest_mempool_timestamp.load(Ordering::Relaxed); - let mempool_entries_cache = self.mempool_entries_cache.lock().await; + match self { + BitcoindClient::Rpc { latest_mempool_timestamp, mempool_entries_cache, .. } => { + Self::get_evicted_mempool_txids_and_timestamp_inner( + latest_mempool_timestamp, + mempool_entries_cache, + unconfirmed_txids, + ) + .await + }, + BitcoindClient::Rest { latest_mempool_timestamp, mempool_entries_cache, .. } => { + Self::get_evicted_mempool_txids_and_timestamp_inner( + latest_mempool_timestamp, + mempool_entries_cache, + unconfirmed_txids, + ) + .await + }, + } + } + + async fn get_evicted_mempool_txids_and_timestamp_inner( + latest_mempool_timestamp: &AtomicU64, + mempool_entries_cache: &tokio::sync::Mutex>, + unconfirmed_txids: Vec, + ) -> std::io::Result> { + let latest_mempool_timestamp = latest_mempool_timestamp.load(Ordering::Relaxed); + let mempool_entries_cache = mempool_entries_cache.lock().await; let evicted_txids = unconfirmed_txids .into_iter() .filter(|txid| mempool_entries_cache.contains_key(txid)) @@ -283,21 +572,42 @@ impl BitcoindRpcClient { } } -impl BlockSource for BitcoindRpcClient { +impl BlockSource for BitcoindClient { fn get_header<'a>( - &'a self, header_hash: &'a BlockHash, height_hint: Option, + &'a self, header_hash: &'a bitcoin::BlockHash, height_hint: Option, ) -> AsyncBlockSourceResult<'a, BlockHeaderData> { - Box::pin(async move { self.rpc_client.get_header(header_hash, height_hint).await }) + match self { + BitcoindClient::Rpc { rpc_client, .. } => { + Box::pin(async move { rpc_client.get_header(header_hash, height_hint).await }) + }, + BitcoindClient::Rest { rest_client, .. } => { + Box::pin(async move { rest_client.get_header(header_hash, height_hint).await }) + }, + } } fn get_block<'a>( - &'a self, header_hash: &'a BlockHash, + &'a self, header_hash: &'a bitcoin::BlockHash, ) -> AsyncBlockSourceResult<'a, BlockData> { - Box::pin(async move { self.rpc_client.get_block(header_hash).await }) + match self { + BitcoindClient::Rpc { rpc_client, .. } => { + Box::pin(async move { rpc_client.get_block(header_hash).await }) + }, + BitcoindClient::Rest { rest_client, .. } => { + Box::pin(async move { rest_client.get_block(header_hash).await }) + }, + } } - fn get_best_block(&self) -> AsyncBlockSourceResult<(BlockHash, Option)> { - Box::pin(async move { self.rpc_client.get_best_block().await }) + fn get_best_block(&self) -> AsyncBlockSourceResult<(bitcoin::BlockHash, Option)> { + match self { + BitcoindClient::Rpc { rpc_client, .. } => { + Box::pin(async move { rpc_client.get_best_block().await }) + }, + BitcoindClient::Rest { rest_client, .. } => { + Box::pin(async move { rest_client.get_best_block().await }) + }, + } } } @@ -325,7 +635,7 @@ impl TryInto for JsonResponse { } } -pub struct MempoolMinFeeResponse(pub FeeRate); +pub(crate) struct MempoolMinFeeResponse(pub FeeRate); impl TryInto for JsonResponse { type Error = std::io::Error; @@ -343,7 +653,7 @@ impl TryInto for JsonResponse { } } -pub struct GetRawTransactionResponse(pub Transaction); +pub(crate) struct GetRawTransactionResponse(pub Transaction); impl TryInto for JsonResponse { type Error = std::io::Error; @@ -530,3 +840,26 @@ impl Listen for ChainListener { self.output_sweeper.block_disconnected(header, height); } } + +pub(crate) fn rpc_credentials(rpc_user: String, rpc_password: String) -> String { + BASE64_STANDARD.encode(format!("{}:{}", rpc_user, rpc_password)) +} + +pub(crate) fn endpoint(host: String, port: u16) -> HttpEndpoint { + HttpEndpoint::for_host(host).with_port(port) +} + +#[derive(Debug)] +pub struct HttpError { + pub(crate) status_code: String, + pub(crate) contents: Vec, +} + +impl std::error::Error for HttpError {} + +impl std::fmt::Display for HttpError { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + let contents = String::from_utf8_lossy(&self.contents); + write!(f, "status_code: {}, contents: {}", self.status_code, contents) + } +} diff --git a/src/chain/mod.rs b/src/chain/mod.rs index df10ecac2..c3d5fdedc 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -5,18 +5,19 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. -mod bitcoind_rpc; +mod bitcoind; mod electrum; -use crate::chain::bitcoind_rpc::{ - BitcoindRpcClient, BoundedHeaderCache, ChainListener, FeeRateEstimationMode, +use crate::chain::bitcoind::{ + BitcoindClient, BoundedHeaderCache, ChainListener, FeeRateEstimationMode, }; use crate::chain::electrum::ElectrumRuntimeClient; use crate::config::{ - BackgroundSyncConfig, Config, ElectrumSyncConfig, EsploraSyncConfig, BDK_CLIENT_CONCURRENCY, - BDK_CLIENT_STOP_GAP, BDK_WALLET_SYNC_TIMEOUT_SECS, FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS, - LDK_WALLET_SYNC_TIMEOUT_SECS, RESOLVED_CHANNEL_MONITOR_ARCHIVAL_INTERVAL, - TX_BROADCAST_TIMEOUT_SECS, WALLET_SYNC_INTERVAL_MINIMUM_SECS, + BackgroundSyncConfig, BitcoindRestClientConfig, Config, ElectrumSyncConfig, EsploraSyncConfig, + BDK_CLIENT_CONCURRENCY, BDK_CLIENT_STOP_GAP, BDK_WALLET_SYNC_TIMEOUT_SECS, + FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS, LDK_WALLET_SYNC_TIMEOUT_SECS, + RESOLVED_CHANNEL_MONITOR_ARCHIVAL_INTERVAL, TX_BROADCAST_TIMEOUT_SECS, + WALLET_SYNC_INTERVAL_MINIMUM_SECS, }; use crate::fee_estimator::{ apply_post_estimation_adjustments, get_all_conf_targets, get_num_block_defaults_for_target, @@ -215,8 +216,8 @@ pub(crate) enum ChainSource { logger: Arc, node_metrics: Arc>, }, - BitcoindRpc { - bitcoind_rpc_client: Arc, + Bitcoind { + api_client: Arc, header_cache: tokio::sync::Mutex, latest_chain_tip: RwLock>, onchain_wallet: Arc, @@ -293,18 +294,23 @@ impl ChainSource { } pub(crate) fn new_bitcoind_rpc( - host: String, port: u16, rpc_user: String, rpc_password: String, + rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String, onchain_wallet: Arc, fee_estimator: Arc, tx_broadcaster: Arc, kv_store: Arc, config: Arc, logger: Arc, node_metrics: Arc>, ) -> Self { - let bitcoind_rpc_client = - Arc::new(BitcoindRpcClient::new(host, port, rpc_user, rpc_password)); + let api_client = Arc::new(BitcoindClient::new_rpc( + rpc_host.clone(), + rpc_port.clone(), + rpc_user.clone(), + rpc_password.clone(), + )); + let header_cache = tokio::sync::Mutex::new(BoundedHeaderCache::new()); let latest_chain_tip = RwLock::new(None); let wallet_polling_status = Mutex::new(WalletSyncStatus::Completed); - Self::BitcoindRpc { - bitcoind_rpc_client, + Self::Bitcoind { + api_client, header_cache, latest_chain_tip, onchain_wallet, @@ -318,6 +324,41 @@ impl ChainSource { } } + pub(crate) fn new_bitcoind_rest( + rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String, + onchain_wallet: Arc, fee_estimator: Arc, + tx_broadcaster: Arc, kv_store: Arc, config: Arc, + rest_client_config: BitcoindRestClientConfig, logger: Arc, + node_metrics: Arc>, + ) -> Self { + let api_client = Arc::new(BitcoindClient::new_rest( + rest_client_config.rest_host, + rest_client_config.rest_port, + rpc_host, + rpc_port, + rpc_user, + rpc_password, + )); + + let header_cache = tokio::sync::Mutex::new(BoundedHeaderCache::new()); + let latest_chain_tip = RwLock::new(None); + let wallet_polling_status = Mutex::new(WalletSyncStatus::Completed); + + Self::Bitcoind { + api_client, + header_cache, + latest_chain_tip, + wallet_polling_status, + onchain_wallet, + fee_estimator, + tx_broadcaster, + kv_store, + config, + logger, + node_metrics, + } + } + pub(crate) fn start(&self, runtime: Arc) -> Result<(), Error> { match self { Self::Electrum { server_url, electrum_runtime_status, config, logger, .. } => { @@ -348,7 +389,7 @@ impl ChainSource { pub(crate) fn as_utxo_source(&self) -> Option> { match self { - Self::BitcoindRpc { bitcoind_rpc_client, .. } => Some(bitcoind_rpc_client.rpc_client()), + Self::Bitcoind { api_client, .. } => Some(api_client.utxo_source()), _ => None, } } @@ -399,8 +440,8 @@ impl ChainSource { return; } }, - Self::BitcoindRpc { - bitcoind_rpc_client, + Self::Bitcoind { + api_client, header_cache, latest_chain_tip, onchain_wallet, @@ -469,7 +510,7 @@ impl ChainSource { let mut locked_header_cache = header_cache.lock().await; let now = SystemTime::now(); match synchronize_listeners( - bitcoind_rpc_client.as_ref(), + api_client.as_ref(), config.network, &mut *locked_header_cache, chain_listeners.clone(), @@ -836,8 +877,8 @@ impl ChainSource { res }, - Self::BitcoindRpc { .. } => { - // In BitcoindRpc mode we sync lightning and onchain wallet in one go by via + Self::Bitcoind { .. } => { + // In BitcoindRpc mode we sync lightning and onchain wallet in one go via // `ChainPoller`. So nothing to do here. unreachable!("Onchain wallet will be synced via chain polling") }, @@ -1006,8 +1047,8 @@ impl ChainSource { res }, - Self::BitcoindRpc { .. } => { - // In BitcoindRpc mode we sync lightning and onchain wallet in one go by via + Self::Bitcoind { .. } => { + // In BitcoindRpc mode we sync lightning and onchain wallet in one go via // `ChainPoller`. So nothing to do here. unreachable!("Lightning wallet will be synced via chain polling") }, @@ -1029,8 +1070,8 @@ impl ChainSource { // `sync_onchain_wallet` and `sync_lightning_wallet`. So nothing to do here. unreachable!("Listeners will be synced via transction-based syncing") }, - Self::BitcoindRpc { - bitcoind_rpc_client, + Self::Bitcoind { + api_client, header_cache, latest_chain_tip, onchain_wallet, @@ -1059,7 +1100,7 @@ impl ChainSource { let chain_tip = if let Some(tip) = latest_chain_tip_opt { tip } else { - match validate_best_block_header(bitcoind_rpc_client.as_ref()).await { + match validate_best_block_header(api_client.as_ref()).await { Ok(tip) => { *latest_chain_tip.write().unwrap() = Some(tip); tip @@ -1077,8 +1118,7 @@ impl ChainSource { }; let mut locked_header_cache = header_cache.lock().await; - let chain_poller = - ChainPoller::new(Arc::clone(&bitcoind_rpc_client), config.network); + let chain_poller = ChainPoller::new(Arc::clone(&api_client), config.network); let chain_listener = ChainListener { onchain_wallet: Arc::clone(&onchain_wallet), channel_manager: Arc::clone(&channel_manager), @@ -1115,7 +1155,7 @@ impl ChainSource { let now = SystemTime::now(); let unconfirmed_txids = onchain_wallet.get_unconfirmed_txids(); - match bitcoind_rpc_client + match api_client .get_updated_mempool_transactions(cur_height, unconfirmed_txids) .await { @@ -1300,8 +1340,8 @@ impl ChainSource { Ok(()) }, - Self::BitcoindRpc { - bitcoind_rpc_client, + Self::Bitcoind { + api_client, fee_estimator, config, kv_store, @@ -1332,7 +1372,7 @@ impl ChainSource { ConfirmationTarget::Lightning( LdkConfirmationTarget::MinAllowedAnchorChannelRemoteFee, ) => { - let estimation_fut = bitcoind_rpc_client.get_mempool_minimum_fee_rate(); + let estimation_fut = api_client.get_mempool_minimum_fee_rate(); get_fee_rate_update!(estimation_fut) }, ConfirmationTarget::Lightning( @@ -1340,8 +1380,8 @@ impl ChainSource { ) => { let num_blocks = get_num_block_defaults_for_target(target); let estimation_mode = FeeRateEstimationMode::Conservative; - let estimation_fut = bitcoind_rpc_client - .get_fee_estimate_for_target(num_blocks, estimation_mode); + let estimation_fut = + api_client.get_fee_estimate_for_target(num_blocks, estimation_mode); get_fee_rate_update!(estimation_fut) }, ConfirmationTarget::Lightning( @@ -1349,16 +1389,16 @@ impl ChainSource { ) => { let num_blocks = get_num_block_defaults_for_target(target); let estimation_mode = FeeRateEstimationMode::Conservative; - let estimation_fut = bitcoind_rpc_client - .get_fee_estimate_for_target(num_blocks, estimation_mode); + let estimation_fut = + api_client.get_fee_estimate_for_target(num_blocks, estimation_mode); get_fee_rate_update!(estimation_fut) }, _ => { // Otherwise, we default to economical block-target estimate. let num_blocks = get_num_block_defaults_for_target(target); let estimation_mode = FeeRateEstimationMode::Economical; - let estimation_fut = bitcoind_rpc_client - .get_fee_estimate_for_target(num_blocks, estimation_mode); + let estimation_fut = + api_client.get_fee_estimate_for_target(num_blocks, estimation_mode); get_fee_rate_update!(estimation_fut) }, }; @@ -1530,7 +1570,7 @@ impl ChainSource { } } }, - Self::BitcoindRpc { bitcoind_rpc_client, tx_broadcaster, logger, .. } => { + Self::Bitcoind { api_client, tx_broadcaster, logger, .. } => { // While it's a bit unclear when we'd be able to lean on Bitcoin Core >v28 // features, we should eventually switch to use `submitpackage` via the // `rust-bitcoind-json-rpc` crate rather than just broadcasting individual @@ -1541,7 +1581,7 @@ impl ChainSource { let txid = tx.compute_txid(); let timeout_fut = tokio::time::timeout( Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS), - bitcoind_rpc_client.broadcast_transaction(tx), + api_client.broadcast_transaction(tx), ); match timeout_fut.await { Ok(res) => match res { @@ -1595,7 +1635,7 @@ impl Filter for ChainSource { Self::Electrum { electrum_runtime_status, .. } => { electrum_runtime_status.write().unwrap().register_tx(txid, script_pubkey) }, - Self::BitcoindRpc { .. } => (), + Self::Bitcoind { .. } => (), } } fn register_output(&self, output: lightning::chain::WatchedOutput) { @@ -1604,7 +1644,7 @@ impl Filter for ChainSource { Self::Electrum { electrum_runtime_status, .. } => { electrum_runtime_status.write().unwrap().register_output(output) }, - Self::BitcoindRpc { .. } => (), + Self::Bitcoind { .. } => (), } } } diff --git a/src/config.rs b/src/config.rs index 4a39c1b56..a2930ea5a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -397,6 +397,15 @@ impl Default for ElectrumSyncConfig { } } +/// Configuration for syncing with Bitcoin Core backend via REST. +#[derive(Debug, Clone)] +pub struct BitcoindRestClientConfig { + /// Host URL. + pub rest_host: String, + /// Host port. + pub rest_port: u16, +} + /// Options which apply on a per-channel basis and may change at runtime or based on negotiation /// with our counterparty. #[derive(Copy, Clone, Debug, PartialEq, Eq)] diff --git a/src/lib.rs b/src/lib.rs index b09f9a9f7..a75da763a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1277,7 +1277,7 @@ impl Node { .await?; chain_source.sync_onchain_wallet().await?; }, - ChainSource::BitcoindRpc { .. } => { + ChainSource::Bitcoind { .. } => { chain_source.update_fee_rate_estimates().await?; chain_source .poll_and_update_listeners(sync_cman, sync_cmon, sync_sweeper) diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 3258df791..daed86475 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -174,6 +174,7 @@ pub(crate) fn setup_bitcoind_and_electrsd() -> (BitcoinD, ElectrsD) { ); let mut bitcoind_conf = corepc_node::Conf::default(); bitcoind_conf.network = "regtest"; + bitcoind_conf.args.push("-rest"); let bitcoind = BitcoinD::with_conf(bitcoind_exe, &bitcoind_conf).unwrap(); let electrs_exe = env::var("ELECTRS_EXE") @@ -256,7 +257,8 @@ type TestNode = Node; pub(crate) enum TestChainSource<'a> { Esplora(&'a ElectrsD), Electrum(&'a ElectrsD), - BitcoindRpc(&'a BitcoinD), + BitcoindRpcSync(&'a BitcoinD), + BitcoindRestSync(&'a BitcoinD), } #[derive(Clone, Default)] @@ -317,7 +319,7 @@ pub(crate) fn setup_node( let sync_config = ElectrumSyncConfig { background_sync_config: None }; builder.set_chain_source_electrum(electrum_url.clone(), Some(sync_config)); }, - TestChainSource::BitcoindRpc(bitcoind) => { + TestChainSource::BitcoindRpcSync(bitcoind) => { let rpc_host = bitcoind.params.rpc_socket.ip().to_string(); let rpc_port = bitcoind.params.rpc_socket.port(); let values = bitcoind.params.get_cookie_values().unwrap().unwrap(); @@ -325,6 +327,23 @@ pub(crate) fn setup_node( let rpc_password = values.password; builder.set_chain_source_bitcoind_rpc(rpc_host, rpc_port, rpc_user, rpc_password); }, + TestChainSource::BitcoindRestSync(bitcoind) => { + let rpc_host = bitcoind.params.rpc_socket.ip().to_string(); + let rpc_port = bitcoind.params.rpc_socket.port(); + let values = bitcoind.params.get_cookie_values().unwrap().unwrap(); + let rpc_user = values.user; + let rpc_password = values.password; + let rest_host = bitcoind.params.rpc_socket.ip().to_string(); + let rest_port = bitcoind.params.rpc_socket.port(); + builder.set_chain_source_bitcoind_rest( + rest_host, + rest_port, + rpc_host, + rpc_port, + rpc_user, + rpc_password, + ); + }, } match &config.log_writer { diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index db48eca23..fbd95ef50 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -56,9 +56,17 @@ fn channel_full_cycle_electrum() { } #[test] -fn channel_full_cycle_bitcoind() { +fn channel_full_cycle_bitcoind_rpc_sync() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); - let chain_source = TestChainSource::BitcoindRpc(&bitcoind); + let chain_source = TestChainSource::BitcoindRpcSync(&bitcoind); + let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); + do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, false); +} + +#[test] +fn channel_full_cycle_bitcoind_rest_sync() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::BitcoindRestSync(&bitcoind); let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, false); }