Skip to content

Start verifying gossip data via bitcoind RPC #428

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,12 @@ fn build_with_store_internal(

liquidity_source.as_ref().map(|l| l.set_peer_manager(Arc::clone(&peer_manager)));

gossip_source.set_gossip_verifier(
Arc::clone(&chain_source),
Arc::clone(&peer_manager),
Arc::clone(&runtime),
);

let connection_manager =
Arc::new(ConnectionManager::new(Arc::clone(&peer_manager), Arc::clone(&logger)));

Expand Down
4 changes: 4 additions & 0 deletions src/chain/bitcoind_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ impl BitcoindRpcClient {
Self { rpc_client, latest_mempool_timestamp }
}

pub(crate) fn rpc_client(&self) -> Arc<RpcClient> {
Arc::clone(&self.rpc_client)
}

pub(crate) async fn broadcast_transaction(&self, tx: &Transaction) -> std::io::Result<Txid> {
let tx_serialized = bitcoin::consensus::encode::serialize_hex(tx);
let tx_json = serde_json::json!(tx_serialized);
Expand Down
8 changes: 8 additions & 0 deletions src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use lightning::util::ser::Writeable;

use lightning_transaction_sync::EsploraSyncClient;

use lightning_block_sync::gossip::UtxoSource;
use lightning_block_sync::init::{synchronize_listeners, validate_best_block_header};
use lightning_block_sync::poll::{ChainPoller, ChainTip, ValidatedBlockHeader};
use lightning_block_sync::SpvClient;
Expand Down Expand Up @@ -192,6 +193,13 @@ impl ChainSource {
}
}

pub(crate) fn as_utxo_source(&self) -> Option<Arc<dyn UtxoSource>> {
match self {
Self::BitcoindRpc { bitcoind_rpc_client, .. } => Some(bitcoind_rpc_client.rpc_client()),
_ => None,
}
}

pub(crate) async fn continuously_sync_wallets(
&self, mut stop_sync_receiver: tokio::sync::watch::Receiver<()>,
channel_manager: Arc<ChannelManager>, chain_monitor: Arc<ChainMonitor>,
Expand Down
67 changes: 59 additions & 8 deletions src/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,23 @@
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
// accordance with one or both of these licenses.

use crate::chain::ChainSource;
use crate::config::RGS_SYNC_TIMEOUT_SECS;
use crate::logger::{log_trace, FilesystemLogger, Logger};
use crate::types::{GossipSync, Graph, P2PGossipSync, RapidGossipSync};
use crate::logger::{log_error, log_trace, FilesystemLogger, Logger};
use crate::types::{GossipSync, Graph, P2PGossipSync, PeerManager, RapidGossipSync, UtxoLookup};
use crate::Error;

use lightning::routing::utxo::UtxoLookup;
use lightning_block_sync::gossip::{FutureSpawner, GossipVerifier};

use std::future::Future;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::time::Duration;

pub(crate) enum GossipSource {
P2PNetwork {
gossip_sync: Arc<P2PGossipSync>,
logger: Arc<FilesystemLogger>,
},
RapidGossipSync {
gossip_sync: Arc<RapidGossipSync>,
Expand All @@ -32,10 +35,10 @@ impl GossipSource {
pub fn new_p2p(network_graph: Arc<Graph>, logger: Arc<FilesystemLogger>) -> Self {
let gossip_sync = Arc::new(P2PGossipSync::new(
network_graph,
None::<Arc<dyn UtxoLookup + Send + Sync>>,
logger,
None::<Arc<UtxoLookup>>,
Arc::clone(&logger),
));
Self::P2PNetwork { gossip_sync }
Self::P2PNetwork { gossip_sync, logger }
}

pub fn new_rgs(
Expand All @@ -58,9 +61,30 @@ impl GossipSource {
}
}

pub(crate) fn set_gossip_verifier(
&self, chain_source: Arc<ChainSource>, peer_manager: Arc<PeerManager>,
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>,
) {
match self {
Self::P2PNetwork { gossip_sync, logger } => {
if let Some(utxo_source) = chain_source.as_utxo_source() {
let spawner = RuntimeSpawner::new(Arc::clone(&runtime), Arc::clone(&logger));
let gossip_verifier = Arc::new(GossipVerifier::new(
utxo_source,
spawner,
Arc::clone(gossip_sync),
peer_manager,
));
gossip_sync.add_utxo_lookup(Some(gossip_verifier));
}
},
_ => (),
}
}

pub async fn update_rgs_snapshot(&self) -> Result<u32, Error> {
match self {
Self::P2PNetwork { gossip_sync: _ } => Ok(0),
Self::P2PNetwork { gossip_sync: _, .. } => Ok(0),
Self::RapidGossipSync { gossip_sync, server_url, latest_sync_timestamp, logger } => {
let query_timestamp = latest_sync_timestamp.load(Ordering::Acquire);
let query_url = format!("{}/{}", server_url, query_timestamp);
Expand Down Expand Up @@ -101,3 +125,30 @@ impl GossipSource {
}
}
}

pub(crate) struct RuntimeSpawner {
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>,
logger: Arc<FilesystemLogger>,
}

impl RuntimeSpawner {
pub(crate) fn new(
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>, logger: Arc<FilesystemLogger>,
) -> Self {
Self { runtime, logger }
}
}

impl FutureSpawner for RuntimeSpawner {
fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
let rt_lock = self.runtime.read().unwrap();
if rt_lock.is_none() {
log_error!(self.logger, "Tried spawing a future while the runtime wasn't available. This should never happen.");
debug_assert!(false, "Tried spawing a future while the runtime wasn't available. This should never happen.");
return;
}

let runtime = rt_lock.as_ref().unwrap();
runtime.spawn(future);
}
}
7 changes: 6 additions & 1 deletion src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use crate::chain::ChainSource;
use crate::config::ChannelConfig;
use crate::fee_estimator::OnchainFeeEstimator;
use crate::gossip::RuntimeSpawner;
use crate::logger::FilesystemLogger;
use crate::message_handler::NodeCustomMessageHandler;

Expand All @@ -25,6 +26,9 @@ use lightning::sign::InMemorySigner;
use lightning::util::persist::KVStore;
use lightning::util::ser::{Readable, Writeable, Writer};
use lightning::util::sweep::OutputSweeper;

use lightning_block_sync::gossip::{GossipVerifier, UtxoSource};

use lightning_net_tokio::SocketDescriptor;

use bitcoin::secp256k1::PublicKey;
Expand Down Expand Up @@ -91,7 +95,8 @@ pub(crate) type Scorer = ProbabilisticScorer<Arc<Graph>, Arc<FilesystemLogger>>;

pub(crate) type Graph = gossip::NetworkGraph<Arc<FilesystemLogger>>;

pub(crate) type UtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync;
pub(crate) type UtxoLookup =
GossipVerifier<RuntimeSpawner, Arc<dyn UtxoSource>, Arc<FilesystemLogger>>;

pub(crate) type P2PGossipSync =
lightning::routing::gossip::P2PGossipSync<Arc<Graph>, Arc<UtxoLookup>, Arc<FilesystemLogger>>;
Expand Down
Loading