From 51a3353740126b5f251958285b992a7673c65d1a Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sat, 21 Jan 2023 03:28:35 +0000 Subject: [PATCH 01/16] Move `chain::Access` to `routing` and rename it `UtxoLookup` The `chain::Access` trait (and the `chain::AccessError` enum) is a bit strange - it only really makes sense if users import it via the `chain` module, otherwise they're left with a trait just called `Access`. Worse, for bindings users its always just called `Access`, in part because many downstream languages don't have a mechanism to import a module and then refer to it. Further, its stuck dangling in the `chain` top-level mod.rs file, sitting in a module that doesn't use it at all (it's only used in `routing::gossip`). Instead, we give it its full name - `UtxoLookup` (and rename the error enum `UtxoLookupError`) and put it in the a new `routing::utxo` module, next to `routing::gossip`. --- ARCH.md | 6 +- fuzz/src/full_stack.rs | 3 +- fuzz/src/router.rs | 12 ++-- lightning-background-processor/src/lib.rs | 45 ++++++------- lightning-net-tokio/src/lib.rs | 4 +- lightning/src/chain/mod.rs | 21 ------ lightning/src/routing/gossip.rs | 78 +++++++++++------------ lightning/src/routing/mod.rs | 1 + lightning/src/routing/router.rs | 2 +- lightning/src/routing/utxo.rs | 36 +++++++++++ lightning/src/util/test_utils.rs | 12 ++-- 11 files changed, 118 insertions(+), 102 deletions(-) create mode 100644 lightning/src/routing/utxo.rs diff --git a/ARCH.md b/ARCH.md index 9efccd9c9dd..bd5fc1bd316 100644 --- a/ARCH.md +++ b/ARCH.md @@ -51,9 +51,9 @@ At a high level, some of the common interfaces fit together as follows: --------------- / (as EventsProvider) ^ | | | PeerManager |- \ | | | --------------- \ | (is-a) | | - | ----------------- \ _---------------- / / - | | chain::Access | \ / | ChainMonitor |--------------- - | ----------------- \ / ---------------- + | -------------- \ _---------------- / / + | | UtxoLookup | \ / | ChainMonitor |--------------- + | -------------- \ / ---------------- | ^ \ / | (as RoutingMessageHandler) | v v \ ----------------- --------- ----------------- diff --git a/fuzz/src/full_stack.rs b/fuzz/src/full_stack.rs index 145b7c8350d..57c665a2ef1 100644 --- a/fuzz/src/full_stack.rs +++ b/fuzz/src/full_stack.rs @@ -41,6 +41,7 @@ use lightning::ln::peer_handler::{MessageHandler,PeerManager,SocketDescriptor,Ig use lightning::ln::msgs::{self, DecodeError}; use lightning::ln::script::ShutdownScript; use lightning::routing::gossip::{P2PGossipSync, NetworkGraph}; +use lightning::routing::utxo::UtxoLookup; use lightning::routing::router::{find_route, InFlightHtlcs, PaymentParameters, Route, RouteHop, RouteParameters, Router}; use lightning::routing::scoring::FixedPenaltyScorer; use lightning::util::config::UserConfig; @@ -183,7 +184,7 @@ impl<'a> std::hash::Hash for Peer<'a> { type ChannelMan<'a> = ChannelManager< Arc, Arc, Arc, Arc, Arc>>, Arc, Arc, Arc, Arc, Arc, &'a FuzzRouter, Arc>; -type PeerMan<'a> = PeerManager, Arc>, Arc>>, Arc, Arc>>, IgnoringMessageHandler, Arc, IgnoringMessageHandler, Arc>; +type PeerMan<'a> = PeerManager, Arc>, Arc>>, Arc, Arc>>, IgnoringMessageHandler, Arc, IgnoringMessageHandler, Arc>; struct MoneyLossDetector<'a> { manager: Arc>, diff --git a/fuzz/src/router.rs b/fuzz/src/router.rs index dbf619f1ddf..3d5c88bc3da 100644 --- a/fuzz/src/router.rs +++ b/fuzz/src/router.rs @@ -11,11 +11,11 @@ use bitcoin::blockdata::script::Builder; use bitcoin::blockdata::transaction::TxOut; use bitcoin::hash_types::BlockHash; -use lightning::chain; use lightning::chain::transaction::OutPoint; use lightning::ln::channelmanager::{self, ChannelDetails, ChannelCounterparty}; use lightning::ln::msgs; use lightning::routing::gossip::{NetworkGraph, RoutingFees}; +use lightning::routing::utxo::{UtxoLookup, UtxoLookupError}; use lightning::routing::router::{find_route, PaymentParameters, RouteHint, RouteHintHop, RouteParameters}; use lightning::routing::scoring::FixedPenaltyScorer; use lightning::util::config::UserConfig; @@ -84,13 +84,13 @@ impl InputData { struct FuzzChainSource { input: Arc, } -impl chain::Access for FuzzChainSource { - fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> Result { +impl UtxoLookup for FuzzChainSource { + fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> Result { match self.input.get_slice(2) { - Some(&[0, _]) => Err(chain::AccessError::UnknownChain), - Some(&[1, _]) => Err(chain::AccessError::UnknownTx), + Some(&[0, _]) => Err(UtxoLookupError::UnknownChain), + Some(&[1, _]) => Err(UtxoLookupError::UnknownTx), Some(&[_, x]) => Ok(TxOut { value: 0, script_pubkey: Builder::new().push_int(x as i64).into_script().to_v0_p2wsh() }), - None => Err(chain::AccessError::UnknownTx), + None => Err(UtxoLookupError::UnknownTx), _ => unreachable!(), } } diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 087aa174722..9b1df752271 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -30,6 +30,7 @@ use lightning::ln::channelmanager::ChannelManager; use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler}; use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor}; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; +use lightning::routing::utxo::UtxoLookup; use lightning::routing::router::Router; use lightning::routing::scoring::{Score, WriteableScore}; use lightning::util::events::{Event, EventHandler, EventsProvider}; @@ -116,13 +117,13 @@ const FIRST_NETWORK_PRUNE_TIMER: u64 = 1; /// Either [`P2PGossipSync`] or [`RapidGossipSync`]. pub enum GossipSync< - P: Deref>, + P: Deref>, R: Deref>, G: Deref>, - A: Deref, + U: Deref, L: Deref, > -where A::Target: chain::Access, L::Target: Logger { +where U::Target: UtxoLookup, L::Target: Logger { /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7. P2P(P), /// Rapid gossip sync from a trusted server. @@ -132,13 +133,13 @@ where A::Target: chain::Access, L::Target: Logger { } impl< - P: Deref>, + P: Deref>, R: Deref>, G: Deref>, - A: Deref, + U: Deref, L: Deref, -> GossipSync -where A::Target: chain::Access, L::Target: Logger { +> GossipSync +where U::Target: UtxoLookup, L::Target: Logger { fn network_graph(&self) -> Option<&G> { match self { GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()), @@ -163,10 +164,10 @@ where A::Target: chain::Access, L::Target: Logger { } /// (C-not exported) as the bindings concretize everything and have constructors for us -impl>, G: Deref>, A: Deref, L: Deref> - GossipSync, G, A, L> +impl>, G: Deref>, U: Deref, L: Deref> + GossipSync, G, U, L> where - A::Target: chain::Access, + U::Target: UtxoLookup, L::Target: Logger, { /// Initializes a new [`GossipSync::P2P`] variant. @@ -178,10 +179,10 @@ where /// (C-not exported) as the bindings concretize everything and have constructors for us impl<'a, R: Deref>, G: Deref>, L: Deref> GossipSync< - &P2PGossipSync, + &P2PGossipSync, R, G, - &'a (dyn chain::Access + Send + Sync), + &'a (dyn UtxoLookup + Send + Sync), L, > where @@ -196,10 +197,10 @@ where /// (C-not exported) as the bindings concretize everything and have constructors for us impl<'a, L: Deref> GossipSync< - &P2PGossipSync<&'a NetworkGraph, &'a (dyn chain::Access + Send + Sync), L>, + &P2PGossipSync<&'a NetworkGraph, &'a (dyn UtxoLookup + Send + Sync), L>, &RapidGossipSync<&'a NetworkGraph, L>, &'a NetworkGraph, - &'a (dyn chain::Access + Send + Sync), + &'a (dyn UtxoLookup + Send + Sync), L, > where @@ -397,7 +398,7 @@ macro_rules! define_run_body { #[cfg(feature = "futures")] pub async fn process_events_async< 'a, - CA: 'static + Deref + Send + Sync, + UL: 'static + Deref + Send + Sync, CF: 'static + Deref + Send + Sync, CW: 'static + Deref + Send + Sync, T: 'static + Deref + Send + Sync, @@ -418,7 +419,7 @@ pub async fn process_events_async< PS: 'static + Deref + Send, M: 'static + Deref::Signer, CF, T, F, L, P>> + Send + Sync, CM: 'static + Deref> + Send + Sync, - PGS: 'static + Deref> + Send + Sync, + PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, UMH: 'static + Deref + Send + Sync, PM: 'static + Deref> + Send + Sync, @@ -428,11 +429,11 @@ pub async fn process_events_async< Sleeper: Fn(Duration) -> SleepFuture >( persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM, - gossip_sync: GossipSync, peer_manager: PM, logger: L, scorer: Option, + gossip_sync: GossipSync, peer_manager: PM, logger: L, scorer: Option, sleeper: Sleeper, ) -> Result<(), io::Error> where - CA::Target: 'static + chain::Access, + UL::Target: 'static + UtxoLookup, CF::Target: 'static + chain::Filter, CW::Target: 'static + chain::Watch<::Signer>, T::Target: 'static + BroadcasterInterface, @@ -531,7 +532,7 @@ impl BackgroundProcessor { /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable pub fn start< 'a, - CA: 'static + Deref + Send + Sync, + UL: 'static + Deref + Send + Sync, CF: 'static + Deref + Send + Sync, CW: 'static + Deref + Send + Sync, T: 'static + Deref + Send + Sync, @@ -551,7 +552,7 @@ impl BackgroundProcessor { PS: 'static + Deref + Send, M: 'static + Deref::Signer, CF, T, F, L, P>> + Send + Sync, CM: 'static + Deref> + Send + Sync, - PGS: 'static + Deref> + Send + Sync, + PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, UMH: 'static + Deref + Send + Sync, PM: 'static + Deref> + Send + Sync, @@ -559,10 +560,10 @@ impl BackgroundProcessor { SC: for <'b> WriteableScore<'b>, >( persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM, - gossip_sync: GossipSync, peer_manager: PM, logger: L, scorer: Option, + gossip_sync: GossipSync, peer_manager: PM, logger: L, scorer: Option, ) -> Self where - CA::Target: 'static + chain::Access, + UL::Target: 'static + UtxoLookup, CF::Target: 'static + chain::Filter, CW::Target: 'static + chain::Watch<::Signer>, T::Target: 'static + BroadcasterInterface, diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 16680faaa58..9b480fb0ae4 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -33,12 +33,12 @@ //! type FeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync; //! type Logger = dyn lightning::util::logger::Logger + Send + Sync; //! type NodeSigner = dyn lightning::chain::keysinterface::NodeSigner + Send + Sync; -//! type ChainAccess = dyn lightning::chain::Access + Send + Sync; +//! type UtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync; //! type ChainFilter = dyn lightning::chain::Filter + Send + Sync; //! type DataPersister = dyn lightning::chain::chainmonitor::Persist + Send + Sync; //! type ChainMonitor = lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; //! type ChannelManager = Arc>; -//! type PeerManager = Arc>; +//! type PeerManager = Arc>; //! //! // Connect to node with pubkey their_node_id at addr: //! async fn connect_to_node(peer_manager: PeerManager, chain_monitor: Arc, channel_manager: ChannelManager, their_node_id: PublicKey, addr: SocketAddr) { diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index 0370c0840f9..cd11bc337ef 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -12,7 +12,6 @@ use bitcoin::blockdata::block::{Block, BlockHeader}; use bitcoin::blockdata::constants::genesis_block; use bitcoin::blockdata::script::Script; -use bitcoin::blockdata::transaction::TxOut; use bitcoin::hash_types::{BlockHash, Txid}; use bitcoin::network::constants::Network; use bitcoin::secp256k1::PublicKey; @@ -60,26 +59,6 @@ impl BestBlock { pub fn height(&self) -> u32 { self.height } } -/// An error when accessing the chain via [`Access`]. -#[derive(Clone, Debug)] -pub enum AccessError { - /// The requested chain is unknown. - UnknownChain, - - /// The requested transaction doesn't exist or hasn't confirmed. - UnknownTx, -} - -/// The `Access` trait defines behavior for accessing chain data and state, such as blocks and -/// UTXOs. -pub trait Access { - /// Returns the transaction output of a funding transaction encoded by [`short_channel_id`]. - /// Returns an error if `genesis_hash` is for a different chain or if such a transaction output - /// is unknown. - /// - /// [`short_channel_id`]: https://github.com/lightning/bolts/blob/master/07-routing-gossip.md#definition-of-short_channel_id - fn get_utxo(&self, genesis_hash: &BlockHash, short_channel_id: u64) -> Result; -} /// The `Listen` trait is used to notify when blocks have been connected or disconnected from the /// chain. diff --git a/lightning/src/routing/gossip.rs b/lightning/src/routing/gossip.rs index a3331ab3beb..bbb8732f277 100644 --- a/lightning/src/routing/gossip.rs +++ b/lightning/src/routing/gossip.rs @@ -19,14 +19,13 @@ use bitcoin::hashes::Hash; use bitcoin::blockdata::transaction::TxOut; use bitcoin::hash_types::BlockHash; -use crate::chain; -use crate::chain::Access; use crate::ln::chan_utils::make_funding_redeemscript_from_slices; use crate::ln::features::{ChannelFeatures, NodeFeatures, InitFeatures}; use crate::ln::msgs::{DecodeError, ErrorAction, Init, LightningError, RoutingMessageHandler, NetAddress, MAX_VALUE_MSAT}; use crate::ln::msgs::{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement, GossipTimestampFilter}; use crate::ln::msgs::{QueryChannelRange, ReplyChannelRange, QueryShortChannelIds, ReplyShortChannelIdsEnd}; use crate::ln::msgs; +use crate::routing::utxo::{UtxoLookup, UtxoLookupError}; use crate::util::ser::{Readable, ReadableArgs, Writeable, Writer, MaybeReadable}; use crate::util::logger::{Logger, Level}; use crate::util::events::{MessageSendEvent, MessageSendEventsProvider}; @@ -218,31 +217,30 @@ impl_writeable_tlv_based_enum_upgradable!(NetworkUpdate, /// This network graph is then used for routing payments. /// Provides interface to help with initial routing sync by /// serving historical announcements. -pub struct P2PGossipSync>, C: Deref, L: Deref> -where C::Target: chain::Access, L::Target: Logger +pub struct P2PGossipSync>, U: Deref, L: Deref> +where U::Target: UtxoLookup, L::Target: Logger { network_graph: G, - chain_access: Option, + utxo_lookup: Option, #[cfg(feature = "std")] full_syncs_requested: AtomicUsize, pending_events: Mutex>, logger: L, } -impl>, C: Deref, L: Deref> P2PGossipSync -where C::Target: chain::Access, L::Target: Logger +impl>, U: Deref, L: Deref> P2PGossipSync +where U::Target: UtxoLookup, L::Target: Logger { /// Creates a new tracker of the actual state of the network of channels and nodes, /// assuming an existing Network Graph. - /// Chain monitor is used to make sure announced channels exist on-chain, - /// channel data is correct, and that the announcement is signed with - /// channel owners' keys. - pub fn new(network_graph: G, chain_access: Option, logger: L) -> Self { + /// UTXO lookup is used to make sure announced channels exist on-chain, channel data is + /// correct, and the announcement is signed with channel owners' keys. + pub fn new(network_graph: G, utxo_lookup: Option, logger: L) -> Self { P2PGossipSync { network_graph, #[cfg(feature = "std")] full_syncs_requested: AtomicUsize::new(0), - chain_access, + utxo_lookup, pending_events: Mutex::new(vec![]), logger, } @@ -251,8 +249,8 @@ where C::Target: chain::Access, L::Target: Logger /// Adds a provider used to check new announcements. Does not affect /// existing announcements unless they are updated. /// Add, update or remove the provider would replace the current one. - pub fn add_chain_access(&mut self, chain_access: Option) { - self.chain_access = chain_access; + pub fn add_utxo_lookup(&mut self, utxo_lookup: Option) { + self.utxo_lookup = utxo_lookup; } /// Gets a reference to the underlying [`NetworkGraph`] which was provided in @@ -342,8 +340,8 @@ macro_rules! get_pubkey_from_node_id { } } -impl>, C: Deref, L: Deref> RoutingMessageHandler for P2PGossipSync -where C::Target: chain::Access, L::Target: Logger +impl>, U: Deref, L: Deref> RoutingMessageHandler for P2PGossipSync +where U::Target: UtxoLookup, L::Target: Logger { fn handle_node_announcement(&self, msg: &msgs::NodeAnnouncement) -> Result { self.network_graph.update_node_from_announcement(msg)?; @@ -353,7 +351,7 @@ where C::Target: chain::Access, L::Target: Logger } fn handle_channel_announcement(&self, msg: &msgs::ChannelAnnouncement) -> Result { - self.network_graph.update_channel_from_announcement(msg, &self.chain_access)?; + self.network_graph.update_channel_from_announcement(msg, &self.utxo_lookup)?; log_gossip!(self.logger, "Added channel_announcement for {}{}", msg.contents.short_channel_id, if !msg.contents.excess_data.is_empty() { " with excess uninterpreted data!" } else { "" }); Ok(msg.contents.excess_data.len() <= MAX_EXCESS_BYTES_FOR_RELAY) } @@ -632,9 +630,9 @@ where C::Target: chain::Access, L::Target: Logger } } -impl>, C: Deref, L: Deref> MessageSendEventsProvider for P2PGossipSync +impl>, U: Deref, L: Deref> MessageSendEventsProvider for P2PGossipSync where - C::Target: chain::Access, + U::Target: UtxoLookup, L::Target: Logger, { fn get_and_clear_pending_msg_events(&self) -> Vec { @@ -1337,35 +1335,35 @@ impl NetworkGraph where L::Target: Logger { /// RoutingMessageHandler implementation to call it indirectly. This may be useful to accept /// routing messages from a source using a protocol other than the lightning P2P protocol. /// - /// If a `chain::Access` object is provided via `chain_access`, it will be called to verify + /// If a [`UtxoLookup`] object is provided via `utxo_lookup`, it will be called to verify /// the corresponding UTXO exists on chain and is correctly-formatted. - pub fn update_channel_from_announcement( - &self, msg: &msgs::ChannelAnnouncement, chain_access: &Option, + pub fn update_channel_from_announcement( + &self, msg: &msgs::ChannelAnnouncement, utxo_lookup: &Option, ) -> Result<(), LightningError> where - C::Target: chain::Access, + U::Target: UtxoLookup, { let msg_hash = hash_to_message!(&Sha256dHash::hash(&msg.contents.encode()[..])[..]); secp_verify_sig!(self.secp_ctx, &msg_hash, &msg.node_signature_1, &get_pubkey_from_node_id!(msg.contents.node_id_1, "channel_announcement"), "channel_announcement"); secp_verify_sig!(self.secp_ctx, &msg_hash, &msg.node_signature_2, &get_pubkey_from_node_id!(msg.contents.node_id_2, "channel_announcement"), "channel_announcement"); secp_verify_sig!(self.secp_ctx, &msg_hash, &msg.bitcoin_signature_1, &get_pubkey_from_node_id!(msg.contents.bitcoin_key_1, "channel_announcement"), "channel_announcement"); secp_verify_sig!(self.secp_ctx, &msg_hash, &msg.bitcoin_signature_2, &get_pubkey_from_node_id!(msg.contents.bitcoin_key_2, "channel_announcement"), "channel_announcement"); - self.update_channel_from_unsigned_announcement_intern(&msg.contents, Some(msg), chain_access) + self.update_channel_from_unsigned_announcement_intern(&msg.contents, Some(msg), utxo_lookup) } /// Store or update channel info from a channel announcement without verifying the associated /// signatures. Because we aren't given the associated signatures here we cannot relay the /// channel announcement to any of our peers. /// - /// If a `chain::Access` object is provided via `chain_access`, it will be called to verify + /// If a [`UtxoLookup`] object is provided via `utxo_lookup`, it will be called to verify /// the corresponding UTXO exists on chain and is correctly-formatted. - pub fn update_channel_from_unsigned_announcement( - &self, msg: &msgs::UnsignedChannelAnnouncement, chain_access: &Option + pub fn update_channel_from_unsigned_announcement( + &self, msg: &msgs::UnsignedChannelAnnouncement, utxo_lookup: &Option ) -> Result<(), LightningError> where - C::Target: chain::Access, + U::Target: UtxoLookup, { - self.update_channel_from_unsigned_announcement_intern(msg, None, chain_access) + self.update_channel_from_unsigned_announcement_intern(msg, None, utxo_lookup) } /// Update channel from partial announcement data received via rapid gossip sync @@ -1444,11 +1442,11 @@ impl NetworkGraph where L::Target: Logger { Ok(()) } - fn update_channel_from_unsigned_announcement_intern( - &self, msg: &msgs::UnsignedChannelAnnouncement, full_msg: Option<&msgs::ChannelAnnouncement>, chain_access: &Option + fn update_channel_from_unsigned_announcement_intern( + &self, msg: &msgs::UnsignedChannelAnnouncement, full_msg: Option<&msgs::ChannelAnnouncement>, utxo_lookup: &Option ) -> Result<(), LightningError> where - C::Target: chain::Access, + U::Target: UtxoLookup, { if msg.node_id_1 == msg.node_id_2 || msg.bitcoin_key_1 == msg.bitcoin_key_2 { return Err(LightningError{err: "Channel announcement node had a channel with itself".to_owned(), action: ErrorAction::IgnoreError}); @@ -1476,7 +1474,7 @@ impl NetworkGraph where L::Target: Logger { action: ErrorAction::IgnoreDuplicateGossip }); } - } else if chain_access.is_none() { + } else if utxo_lookup.is_none() { // Similarly, if we can't check the chain right now anyway, ignore the // duplicate announcement without bothering to take the channels write lock. return Err(LightningError { @@ -1499,13 +1497,13 @@ impl NetworkGraph where L::Target: Logger { } } - let utxo_value = match &chain_access { + let utxo_value = match &utxo_lookup { &None => { // Tentatively accept, potentially exposing us to DoS attacks None }, - &Some(ref chain_access) => { - match chain_access.get_utxo(&msg.chain_hash, msg.short_channel_id) { + &Some(ref utxo_lookup) => { + match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id) { Ok(TxOut { value, script_pubkey }) => { let expected_script = make_funding_redeemscript_from_slices(msg.bitcoin_key_1.as_slice(), msg.bitcoin_key_2.as_slice()).to_v0_p2wsh(); @@ -1516,10 +1514,10 @@ impl NetworkGraph where L::Target: Logger { //to the new HTLC max field in channel_update Some(value) }, - Err(chain::AccessError::UnknownChain) => { + Err(UtxoLookupError::UnknownChain) => { return Err(LightningError{err: format!("Channel announced on an unknown chain ({})", msg.chain_hash.encode().to_hex()), action: ErrorAction::IgnoreError}); }, - Err(chain::AccessError::UnknownTx) => { + Err(UtxoLookupError::UnknownTx) => { return Err(LightningError{err: "Channel announced without corresponding UTXO entry".to_owned(), action: ErrorAction::IgnoreError}); }, } @@ -1904,12 +1902,12 @@ impl ReadOnlyNetworkGraph<'_> { #[cfg(test)] mod tests { - use crate::chain; use crate::ln::channelmanager; use crate::ln::chan_utils::make_funding_redeemscript; #[cfg(feature = "std")] use crate::ln::features::InitFeatures; use crate::routing::gossip::{P2PGossipSync, NetworkGraph, NetworkUpdate, NodeAlias, MAX_EXCESS_BYTES_FOR_RELAY, NodeId, RoutingFees, ChannelUpdateInfo, ChannelInfo, NodeAnnouncementInfo, NodeInfo}; + use crate::routing::utxo::UtxoLookupError; use crate::ln::msgs::{RoutingMessageHandler, UnsignedNodeAnnouncement, NodeAnnouncement, UnsignedChannelAnnouncement, ChannelAnnouncement, UnsignedChannelUpdate, ChannelUpdate, ReplyChannelRange, QueryChannelRange, QueryShortChannelIds, MAX_VALUE_MSAT}; @@ -2141,7 +2139,7 @@ mod tests { // Test if an associated transaction were not on-chain (or not confirmed). let chain_source = test_utils::TestChainSource::new(Network::Testnet); - *chain_source.utxo_ret.lock().unwrap() = Err(chain::AccessError::UnknownTx); + *chain_source.utxo_ret.lock().unwrap() = Err(UtxoLookupError::UnknownTx); let network_graph = NetworkGraph::new(genesis_hash, &logger); gossip_sync = P2PGossipSync::new(&network_graph, Some(&chain_source), &logger); diff --git a/lightning/src/routing/mod.rs b/lightning/src/routing/mod.rs index 9bf0910663d..7fff856345c 100644 --- a/lightning/src/routing/mod.rs +++ b/lightning/src/routing/mod.rs @@ -9,6 +9,7 @@ //! Structs and impls for receiving messages about the network and storing the topology live here. +pub mod utxo; pub mod gossip; pub mod router; pub mod scoring; diff --git a/lightning/src/routing/router.rs b/lightning/src/routing/router.rs index 950384d957e..a2c98212f65 100644 --- a/lightning/src/routing/router.rs +++ b/lightning/src/routing/router.rs @@ -3530,7 +3530,7 @@ mod tests { .push_opcode(opcodes::all::OP_CHECKMULTISIG).into_script().to_v0_p2wsh(); *chain_monitor.utxo_ret.lock().unwrap() = Ok(TxOut { value: 15, script_pubkey: good_script.clone() }); - gossip_sync.add_chain_access(Some(chain_monitor)); + gossip_sync.add_utxo_lookup(Some(chain_monitor)); add_channel(&gossip_sync, &secp_ctx, &privkeys[0], &privkeys[2], ChannelFeatures::from_le_bytes(id_to_feature_flags(3)), 333); update_channel(&gossip_sync, &secp_ctx, &privkeys[0], UnsignedChannelUpdate { diff --git a/lightning/src/routing/utxo.rs b/lightning/src/routing/utxo.rs new file mode 100644 index 00000000000..1008e6a369e --- /dev/null +++ b/lightning/src/routing/utxo.rs @@ -0,0 +1,36 @@ +// This file is Copyright its original authors, visible in version control +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + +//! This module contains traits for LDK to access UTXOs to check gossip data is correct. +//! +//! When lightning nodes gossip channel information, they resist DoS attacks by checking that each +//! channel matches a UTXO on-chain, requiring at least some marginal on-chain transacting in +//! order to announce a channel. This module handles that checking. + +use bitcoin::{BlockHash, TxOut}; + +/// An error when accessing the chain via [`UtxoLookup`]. +#[derive(Clone, Debug)] +pub enum UtxoLookupError { + /// The requested chain is unknown. + UnknownChain, + + /// The requested transaction doesn't exist or hasn't confirmed. + UnknownTx, +} + +/// The `UtxoLookup` trait defines behavior for accessing on-chain UTXOs. +pub trait UtxoLookup { + /// Returns the transaction output of a funding transaction encoded by [`short_channel_id`]. + /// Returns an error if `genesis_hash` is for a different chain or if such a transaction output + /// is unknown. + /// + /// [`short_channel_id`]: https://github.com/lightning/bolts/blob/master/07-routing-gossip.md#definition-of-short_channel_id + fn get_utxo(&self, genesis_hash: &BlockHash, short_channel_id: u64) -> Result; +} diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index be09c8f0065..77324231393 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -22,8 +22,8 @@ use crate::ln::features::{ChannelFeatures, InitFeatures, NodeFeatures}; use crate::ln::{msgs, wire}; use crate::ln::msgs::LightningError; use crate::ln::script::ShutdownScript; -use crate::routing::gossip::NetworkGraph; -use crate::routing::gossip::NodeId; +use crate::routing::gossip::{NetworkGraph, NodeId}; +use crate::routing::utxo::{UtxoLookup, UtxoLookupError}; use crate::routing::router::{find_route, InFlightHtlcs, Route, RouteHop, RouteParameters, Router, ScorerAccountingForInFlightHtlcs}; use crate::routing::scoring::FixedPenaltyScorer; use crate::util::config::UserConfig; @@ -839,7 +839,7 @@ impl core::fmt::Debug for OnGetShutdownScriptpubkey { pub struct TestChainSource { pub genesis_hash: BlockHash, - pub utxo_ret: Mutex>, + pub utxo_ret: Mutex>, pub watched_txn: Mutex>, pub watched_outputs: Mutex>, } @@ -856,10 +856,10 @@ impl TestChainSource { } } -impl chain::Access for TestChainSource { - fn get_utxo(&self, genesis_hash: &BlockHash, _short_channel_id: u64) -> Result { +impl UtxoLookup for TestChainSource { + fn get_utxo(&self, genesis_hash: &BlockHash, _short_channel_id: u64) -> Result { if self.genesis_hash != *genesis_hash { - return Err(chain::AccessError::UnknownChain); + return Err(UtxoLookupError::UnknownChain); } self.utxo_ret.lock().unwrap().clone() From 96b9cf28588619642a2fbf9ef6639e811d563e56 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 24 Jan 2023 05:04:21 +0000 Subject: [PATCH 02/16] Update the `gossip` and `router` module docs which were out of date --- lightning/src/routing/gossip.rs | 2 +- lightning/src/routing/router.rs | 5 +---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/lightning/src/routing/gossip.rs b/lightning/src/routing/gossip.rs index bbb8732f277..e3645271b2b 100644 --- a/lightning/src/routing/gossip.rs +++ b/lightning/src/routing/gossip.rs @@ -7,7 +7,7 @@ // You may not use this file except in accordance with one or both of these // licenses. -//! The top-level network map tracking logic lives here. +//! The [`NetworkGraph`] stores the network gossip and [`P2PGossipSync`] fetches it from peers use bitcoin::secp256k1::constants::PUBLIC_KEY_SIZE; use bitcoin::secp256k1::PublicKey; diff --git a/lightning/src/routing/router.rs b/lightning/src/routing/router.rs index a2c98212f65..65fbbbc9646 100644 --- a/lightning/src/routing/router.rs +++ b/lightning/src/routing/router.rs @@ -7,10 +7,7 @@ // You may not use this file except in accordance with one or both of these // licenses. -//! The top-level routing/network map tracking logic lives here. -//! -//! You probably want to create a P2PGossipSync and use that as your RoutingMessageHandler and then -//! interrogate it to get routes for your own payments. +//! The router finds paths within a [`NetworkGraph`] for a payment. use bitcoin::secp256k1::PublicKey; use bitcoin::hashes::Hash; From d3105d7794e22f98f82a674e137671a9d4c5a0b3 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sat, 21 Jan 2023 03:48:46 +0000 Subject: [PATCH 03/16] Move logic to check a `ChannelAnnouncement` to `gossip_checking` This commit is deliberately move-only, though the code being moved is somewhat crufty. --- lightning/src/routing/gossip.rs | 32 ++----------------------- lightning/src/routing/utxo.rs | 41 +++++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 30 deletions(-) diff --git a/lightning/src/routing/gossip.rs b/lightning/src/routing/gossip.rs index e3645271b2b..bfc3018643b 100644 --- a/lightning/src/routing/gossip.rs +++ b/lightning/src/routing/gossip.rs @@ -16,16 +16,14 @@ use bitcoin::secp256k1; use bitcoin::hashes::sha256d::Hash as Sha256dHash; use bitcoin::hashes::Hash; -use bitcoin::blockdata::transaction::TxOut; use bitcoin::hash_types::BlockHash; -use crate::ln::chan_utils::make_funding_redeemscript_from_slices; use crate::ln::features::{ChannelFeatures, NodeFeatures, InitFeatures}; use crate::ln::msgs::{DecodeError, ErrorAction, Init, LightningError, RoutingMessageHandler, NetAddress, MAX_VALUE_MSAT}; use crate::ln::msgs::{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement, GossipTimestampFilter}; use crate::ln::msgs::{QueryChannelRange, ReplyChannelRange, QueryShortChannelIds, ReplyShortChannelIdsEnd}; use crate::ln::msgs; -use crate::routing::utxo::{UtxoLookup, UtxoLookupError}; +use crate::routing::utxo::{self, UtxoLookup}; use crate::util::ser::{Readable, ReadableArgs, Writeable, Writer, MaybeReadable}; use crate::util::logger::{Logger, Level}; use crate::util::events::{MessageSendEvent, MessageSendEventsProvider}; @@ -42,7 +40,6 @@ use crate::sync::{RwLock, RwLockReadGuard}; use core::sync::atomic::{AtomicUsize, Ordering}; use crate::sync::Mutex; use core::ops::{Bound, Deref}; -use bitcoin::hashes::hex::ToHex; #[cfg(feature = "std")] use std::time::{SystemTime, UNIX_EPOCH}; @@ -1497,32 +1494,7 @@ impl NetworkGraph where L::Target: Logger { } } - let utxo_value = match &utxo_lookup { - &None => { - // Tentatively accept, potentially exposing us to DoS attacks - None - }, - &Some(ref utxo_lookup) => { - match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id) { - Ok(TxOut { value, script_pubkey }) => { - let expected_script = - make_funding_redeemscript_from_slices(msg.bitcoin_key_1.as_slice(), msg.bitcoin_key_2.as_slice()).to_v0_p2wsh(); - if script_pubkey != expected_script { - return Err(LightningError{err: format!("Channel announcement key ({}) didn't match on-chain script ({})", expected_script.to_hex(), script_pubkey.to_hex()), action: ErrorAction::IgnoreError}); - } - //TODO: Check if value is worth storing, use it to inform routing, and compare it - //to the new HTLC max field in channel_update - Some(value) - }, - Err(UtxoLookupError::UnknownChain) => { - return Err(LightningError{err: format!("Channel announced on an unknown chain ({})", msg.chain_hash.encode().to_hex()), action: ErrorAction::IgnoreError}); - }, - Err(UtxoLookupError::UnknownTx) => { - return Err(LightningError{err: "Channel announced without corresponding UTXO entry".to_owned(), action: ErrorAction::IgnoreError}); - }, - } - }, - }; + let utxo_value = utxo::check_channel_announcement(utxo_lookup, msg)?; #[allow(unused_mut, unused_assignments)] let mut announcement_received_time = 0; diff --git a/lightning/src/routing/utxo.rs b/lightning/src/routing/utxo.rs index 1008e6a369e..828ea3764fb 100644 --- a/lightning/src/routing/utxo.rs +++ b/lightning/src/routing/utxo.rs @@ -14,6 +14,15 @@ //! order to announce a channel. This module handles that checking. use bitcoin::{BlockHash, TxOut}; +use bitcoin::hashes::hex::ToHex; + +use crate::ln::chan_utils::make_funding_redeemscript_from_slices; +use crate::ln::msgs::{self, LightningError, ErrorAction}; +use crate::util::ser::Writeable; + +use crate::prelude::*; + +use core::ops::Deref; /// An error when accessing the chain via [`UtxoLookup`]. #[derive(Clone, Debug)] @@ -34,3 +43,35 @@ pub trait UtxoLookup { /// [`short_channel_id`]: https://github.com/lightning/bolts/blob/master/07-routing-gossip.md#definition-of-short_channel_id fn get_utxo(&self, genesis_hash: &BlockHash, short_channel_id: u64) -> Result; } + +pub(crate) fn check_channel_announcement( + utxo_lookup: &Option, msg: &msgs::UnsignedChannelAnnouncement +) -> Result, msgs::LightningError> where U::Target: UtxoLookup { + let utxo_value = match utxo_lookup { + &None => { + // Tentatively accept, potentially exposing us to DoS attacks + None + }, + &Some(ref utxo_lookup) => { + match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id) { + Ok(TxOut { value, script_pubkey }) => { + let expected_script = + make_funding_redeemscript_from_slices(msg.bitcoin_key_1.as_slice(), msg.bitcoin_key_2.as_slice()).to_v0_p2wsh(); + if script_pubkey != expected_script { + return Err(LightningError{err: format!("Channel announcement key ({}) didn't match on-chain script ({})", expected_script.to_hex(), script_pubkey.to_hex()), action: ErrorAction::IgnoreError}); + } + //TODO: Check if value is worth storing, use it to inform routing, and compare it + //to the new HTLC max field in channel_update + Some(value) + }, + Err(UtxoLookupError::UnknownChain) => { + return Err(LightningError{err: format!("Channel announced on an unknown chain ({})", msg.chain_hash.encode().to_hex()), action: ErrorAction::IgnoreError}); + }, + Err(UtxoLookupError::UnknownTx) => { + return Err(LightningError{err: "Channel announced without corresponding UTXO entry".to_owned(), action: ErrorAction::IgnoreError}); + }, + } + } + }; + Ok(utxo_value) +} From 1e8553fc675698bd3142459ad6421d5bd7aa72c7 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sat, 21 Jan 2023 03:51:22 +0000 Subject: [PATCH 04/16] Clean up `check_channel_announcement` style `check_channel_announcement` had long lines, a (very-)stale TODO and confusing variable assignment, which is all cleaned up here. --- lightning/src/routing/utxo.rs | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/lightning/src/routing/utxo.rs b/lightning/src/routing/utxo.rs index 828ea3764fb..ee56f2d9288 100644 --- a/lightning/src/routing/utxo.rs +++ b/lightning/src/routing/utxo.rs @@ -47,10 +47,10 @@ pub trait UtxoLookup { pub(crate) fn check_channel_announcement( utxo_lookup: &Option, msg: &msgs::UnsignedChannelAnnouncement ) -> Result, msgs::LightningError> where U::Target: UtxoLookup { - let utxo_value = match utxo_lookup { + match utxo_lookup { &None => { // Tentatively accept, potentially exposing us to DoS attacks - None + Ok(None) }, &Some(ref utxo_lookup) => { match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id) { @@ -58,20 +58,28 @@ pub(crate) fn check_channel_announcement( let expected_script = make_funding_redeemscript_from_slices(msg.bitcoin_key_1.as_slice(), msg.bitcoin_key_2.as_slice()).to_v0_p2wsh(); if script_pubkey != expected_script { - return Err(LightningError{err: format!("Channel announcement key ({}) didn't match on-chain script ({})", expected_script.to_hex(), script_pubkey.to_hex()), action: ErrorAction::IgnoreError}); + return Err(LightningError{ + err: format!("Channel announcement key ({}) didn't match on-chain script ({})", + expected_script.to_hex(), script_pubkey.to_hex()), + action: ErrorAction::IgnoreError + }); } - //TODO: Check if value is worth storing, use it to inform routing, and compare it - //to the new HTLC max field in channel_update - Some(value) + Ok(Some(value)) }, Err(UtxoLookupError::UnknownChain) => { - return Err(LightningError{err: format!("Channel announced on an unknown chain ({})", msg.chain_hash.encode().to_hex()), action: ErrorAction::IgnoreError}); + Err(LightningError { + err: format!("Channel announced on an unknown chain ({})", + msg.chain_hash.encode().to_hex()), + action: ErrorAction::IgnoreError + }) }, Err(UtxoLookupError::UnknownTx) => { - return Err(LightningError{err: "Channel announced without corresponding UTXO entry".to_owned(), action: ErrorAction::IgnoreError}); + Err(LightningError { + err: "Channel announced without corresponding UTXO entry".to_owned(), + action: ErrorAction::IgnoreError + }) }, } } - }; - Ok(utxo_value) + } } From 2cca65058e4ce3e0120a3fe78a14ed82d1c3a43b Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 8 Feb 2023 22:06:11 +0000 Subject: [PATCH 05/16] Add an async resolution option to `ChainAccess::get_utxo` For those operating in an async environment, requiring `ChainAccess::get_utxo` return information about the requested UTXO synchronously is incredibly painful. Requesting information about a random UTXO is likely to go over the network, and likely to be a rather slow request. Thus, here, we change the return type of `get_utxo` to have both a synchronous and asynchronous form. The asynchronous form requires the user construct a `AccessFuture` which they `clone` and pass back to us. Internally, an `AccessFuture` has an `Arc` to the `channel_announcement` message which we need to process. When the user completes their lookup, they call `resolve` on their `AccessFuture` which we pull the `channel_announcement` from and then apply to the network graph. --- fuzz/src/router.rs | 48 +++++++--- lightning/src/routing/gossip.rs | 7 +- lightning/src/routing/utxo.rs | 145 ++++++++++++++++++++++++++++--- lightning/src/util/test_utils.rs | 8 +- 4 files changed, 180 insertions(+), 28 deletions(-) diff --git a/fuzz/src/router.rs b/fuzz/src/router.rs index 3d5c88bc3da..a30c7d28af4 100644 --- a/fuzz/src/router.rs +++ b/fuzz/src/router.rs @@ -15,7 +15,7 @@ use lightning::chain::transaction::OutPoint; use lightning::ln::channelmanager::{self, ChannelDetails, ChannelCounterparty}; use lightning::ln::msgs; use lightning::routing::gossip::{NetworkGraph, RoutingFees}; -use lightning::routing::utxo::{UtxoLookup, UtxoLookupError}; +use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoLookupError, UtxoResult}; use lightning::routing::router::{find_route, PaymentParameters, RouteHint, RouteHintHop, RouteParameters}; use lightning::routing::scoring::FixedPenaltyScorer; use lightning::util::config::UserConfig; @@ -81,17 +81,36 @@ impl InputData { } } -struct FuzzChainSource { +struct FuzzChainSource<'a, 'b, Out: test_logger::Output> { input: Arc, + net_graph: &'a NetworkGraph<&'b test_logger::TestLogger>, } -impl UtxoLookup for FuzzChainSource { - fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> Result { - match self.input.get_slice(2) { - Some(&[0, _]) => Err(UtxoLookupError::UnknownChain), - Some(&[1, _]) => Err(UtxoLookupError::UnknownTx), - Some(&[_, x]) => Ok(TxOut { value: 0, script_pubkey: Builder::new().push_int(x as i64).into_script().to_v0_p2wsh() }), - None => Err(UtxoLookupError::UnknownTx), - _ => unreachable!(), +impl UtxoLookup for FuzzChainSource<'_, '_, Out> { + fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> UtxoResult { + let input_slice = self.input.get_slice(2); + if input_slice.is_none() { return UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)); } + let input_slice = input_slice.unwrap(); + let txo_res = TxOut { + value: if input_slice[0] % 2 == 0 { 1_000_000 } else { 1_000 }, + script_pubkey: Builder::new().push_int(input_slice[1] as i64).into_script().to_v0_p2wsh(), + }; + match input_slice { + &[0, _] => UtxoResult::Sync(Err(UtxoLookupError::UnknownChain)), + &[1, _] => UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)), + &[2, _] => { + let future = UtxoFuture::new(); + future.resolve(self.net_graph, Ok(txo_res)); + UtxoResult::Async(future.clone()) + }, + &[3, _] => { + let future = UtxoFuture::new(); + future.resolve(self.net_graph, Err(UtxoLookupError::UnknownTx)); + UtxoResult::Async(future.clone()) + }, + &[4, _] => { + UtxoResult::Async(UtxoFuture::new()) // the future will never resolve + }, + &[..] => UtxoResult::Sync(Ok(txo_res)), } } } @@ -171,6 +190,10 @@ pub fn do_test(data: &[u8], out: Out) { let our_pubkey = get_pubkey!(); let net_graph = NetworkGraph::new(genesis_block(Network::Bitcoin).header.block_hash(), &logger); + let chain_source = FuzzChainSource { + input: Arc::clone(&input), + net_graph: &net_graph, + }; let mut node_pks = HashSet::new(); let mut scid = 42; @@ -191,13 +214,14 @@ pub fn do_test(data: &[u8], out: Out) { let msg = decode_msg_with_len16!(msgs::UnsignedChannelAnnouncement, 32+8+33*4); node_pks.insert(get_pubkey_from_node_id!(msg.node_id_1)); node_pks.insert(get_pubkey_from_node_id!(msg.node_id_2)); - let _ = net_graph.update_channel_from_unsigned_announcement::<&FuzzChainSource>(&msg, &None); + let _ = net_graph.update_channel_from_unsigned_announcement:: + <&FuzzChainSource<'_, '_, Out>>(&msg, &None); }, 2 => { let msg = decode_msg_with_len16!(msgs::UnsignedChannelAnnouncement, 32+8+33*4); node_pks.insert(get_pubkey_from_node_id!(msg.node_id_1)); node_pks.insert(get_pubkey_from_node_id!(msg.node_id_2)); - let _ = net_graph.update_channel_from_unsigned_announcement(&msg, &Some(&FuzzChainSource { input: Arc::clone(&input) })); + let _ = net_graph.update_channel_from_unsigned_announcement(&msg, &Some(&chain_source)); }, 3 => { let _ = net_graph.update_channel_unsigned(&decode_msg!(msgs::UnsignedChannelUpdate, 72)); diff --git a/lightning/src/routing/gossip.rs b/lightning/src/routing/gossip.rs index bfc3018643b..dd9cabd25eb 100644 --- a/lightning/src/routing/gossip.rs +++ b/lightning/src/routing/gossip.rs @@ -155,6 +155,8 @@ pub struct NetworkGraph where L::Target: Logger { /// resync them from gossip. Each `NodeId` is mapped to the time (in seconds) it was removed so /// that once some time passes, we can potentially resync it from gossip again. removed_nodes: Mutex>>, + /// Announcement messages which are awaiting an on-chain lookup to be processed. + pub(super) pending_checks: utxo::PendingChecks, } /// A read-only view of [`NetworkGraph`]. @@ -1200,6 +1202,7 @@ impl ReadableArgs for NetworkGraph where L::Target: Logger { last_rapid_gossip_sync_timestamp: Mutex::new(last_rapid_gossip_sync_timestamp), removed_nodes: Mutex::new(HashMap::new()), removed_channels: Mutex::new(HashMap::new()), + pending_checks: utxo::PendingChecks::new(), }) } } @@ -1239,6 +1242,7 @@ impl NetworkGraph where L::Target: Logger { last_rapid_gossip_sync_timestamp: Mutex::new(None), removed_channels: Mutex::new(HashMap::new()), removed_nodes: Mutex::new(HashMap::new()), + pending_checks: utxo::PendingChecks::new(), } } @@ -1494,7 +1498,8 @@ impl NetworkGraph where L::Target: Logger { } } - let utxo_value = utxo::check_channel_announcement(utxo_lookup, msg)?; + let utxo_value = self.pending_checks.check_channel_announcement( + utxo_lookup, msg, full_msg)?; #[allow(unused_mut, unused_assignments)] let mut announcement_received_time = 0; diff --git a/lightning/src/routing/utxo.rs b/lightning/src/routing/utxo.rs index ee56f2d9288..18c7ff97d94 100644 --- a/lightning/src/routing/utxo.rs +++ b/lightning/src/routing/utxo.rs @@ -18,10 +18,14 @@ use bitcoin::hashes::hex::ToHex; use crate::ln::chan_utils::make_funding_redeemscript_from_slices; use crate::ln::msgs::{self, LightningError, ErrorAction}; +use crate::routing::gossip::{NetworkGraph, NodeId}; +use crate::util::logger::{Level, Logger}; use crate::util::ser::Writeable; use crate::prelude::*; +use alloc::sync::{Arc, Weak}; +use crate::sync::Mutex; use core::ops::Deref; /// An error when accessing the chain via [`UtxoLookup`]. @@ -34,6 +38,23 @@ pub enum UtxoLookupError { UnknownTx, } +/// The result of a [`UtxoLookup::get_utxo`] call. A call may resolve either synchronously, +/// returning the `Sync` variant, or asynchronously, returning an [`UtxoFuture`] in the `Async` +/// variant. +pub enum UtxoResult { + /// A result which was resolved synchronously. It either includes a [`TxOut`] for the output + /// requested or a [`UtxoLookupError`]. + Sync(Result), + /// A result which will be resolved asynchronously. It includes a [`UtxoFuture`], a `clone` of + /// which you must keep locally and call [`UtxoFuture::resolve`] on once the lookup completes. + /// + /// Note that in order to avoid runaway memory usage, the number of parallel checks is limited, + /// but only fairly loosely. Because a pending checks block all message processing, leaving + /// checks pending for an extended time may cause DoS of other functions. It is recommended you + /// keep a tight timeout on lookups, on the order of a few seconds. + Async(UtxoFuture), +} + /// The `UtxoLookup` trait defines behavior for accessing on-chain UTXOs. pub trait UtxoLookup { /// Returns the transaction output of a funding transaction encoded by [`short_channel_id`]. @@ -41,19 +62,93 @@ pub trait UtxoLookup { /// is unknown. /// /// [`short_channel_id`]: https://github.com/lightning/bolts/blob/master/07-routing-gossip.md#definition-of-short_channel_id - fn get_utxo(&self, genesis_hash: &BlockHash, short_channel_id: u64) -> Result; + fn get_utxo(&self, genesis_hash: &BlockHash, short_channel_id: u64) -> UtxoResult; +} + +enum ChannelAnnouncement { + Full(msgs::ChannelAnnouncement), + Unsigned(msgs::UnsignedChannelAnnouncement), +} + +struct UtxoMessages { + complete: Option>, + channel_announce: Option, +} + +/// Represents a future resolution of a [`UtxoLookup::get_utxo`] query resolving async. +/// +/// See [`UtxoResult::Async`] and [`UtxoFuture::resolve`] for more info. +#[derive(Clone)] +pub struct UtxoFuture { + state: Arc>, +} + +/// A trivial implementation of [`UtxoLookup`] which is used to call back into the network graph +/// once we have a concrete resolution of a request. +struct UtxoResolver(Result); +impl UtxoLookup for UtxoResolver { + fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> UtxoResult { + UtxoResult::Sync(self.0.clone()) + } +} + +impl UtxoFuture { + /// Builds a new future for later resolution. + pub fn new() -> Self { + Self { state: Arc::new(Mutex::new(UtxoMessages { + complete: None, + channel_announce: None, + }))} + } + + /// Resolves this future against the given `graph` and with the given `result`. + pub fn resolve(&self, graph: &NetworkGraph, result: Result) + where L::Target: Logger { + let announcement = { + let mut async_messages = self.state.lock().unwrap(); + + if async_messages.channel_announce.is_none() { + // We raced returning to `check_channel_announcement` which hasn't updated + // `channel_announce` yet. That's okay, we can set the `complete` field which it will + // check once it gets control again. + async_messages.complete = Some(result); + return; + } + + async_messages.channel_announce.take().unwrap() + }; + + // Now that we've updated our internal state, pass the pending messages back through the + // network graph with a different `UtxoLookup` which will resolve immediately. + // Note that we ignore errors as we don't disconnect peers anyway, so there's nothing to do + // with them. + let resolver = UtxoResolver(result); + match announcement { + ChannelAnnouncement::Full(signed_msg) => { + let _ = graph.update_channel_from_announcement(&signed_msg, &Some(&resolver)); + }, + ChannelAnnouncement::Unsigned(msg) => { + let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver)); + }, + } + } +} + +/// A set of messages which are pending UTXO lookups for processing. +pub(super) struct PendingChecks { } -pub(crate) fn check_channel_announcement( - utxo_lookup: &Option, msg: &msgs::UnsignedChannelAnnouncement -) -> Result, msgs::LightningError> where U::Target: UtxoLookup { - match utxo_lookup { - &None => { - // Tentatively accept, potentially exposing us to DoS attacks - Ok(None) - }, - &Some(ref utxo_lookup) => { - match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id) { +impl PendingChecks { + pub(super) fn new() -> Self { + PendingChecks {} + } + + pub(super) fn check_channel_announcement(&self, + utxo_lookup: &Option, msg: &msgs::UnsignedChannelAnnouncement, + full_msg: Option<&msgs::ChannelAnnouncement> + ) -> Result, msgs::LightningError> where U::Target: UtxoLookup { + let handle_result = |res| { + match res { Ok(TxOut { value, script_pubkey }) => { let expected_script = make_funding_redeemscript_from_slices(msg.bitcoin_key_1.as_slice(), msg.bitcoin_key_2.as_slice()).to_v0_p2wsh(); @@ -80,6 +175,34 @@ pub(crate) fn check_channel_announcement( }) }, } + }; + + match utxo_lookup { + &None => { + // Tentatively accept, potentially exposing us to DoS attacks + Ok(None) + }, + &Some(ref utxo_lookup) => { + match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id) { + UtxoResult::Sync(res) => handle_result(res), + UtxoResult::Async(future) => { + let mut async_messages = future.state.lock().unwrap(); + if let Some(res) = async_messages.complete.take() { + // In the unlikely event the future resolved before we managed to get it, + // handle the result in-line. + handle_result(res) + } else { + async_messages.channel_announce = Some( + if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) } + else { ChannelAnnouncement::Unsigned(msg.clone()) }); + Err(LightningError { + err: "Channel being checked async".to_owned(), + action: ErrorAction::IgnoreAndLog(Level::Gossip), + }) + } + }, + } + } } } } diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 77324231393..b62479555cc 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -23,7 +23,7 @@ use crate::ln::{msgs, wire}; use crate::ln::msgs::LightningError; use crate::ln::script::ShutdownScript; use crate::routing::gossip::{NetworkGraph, NodeId}; -use crate::routing::utxo::{UtxoLookup, UtxoLookupError}; +use crate::routing::utxo::{UtxoLookup, UtxoLookupError, UtxoResult}; use crate::routing::router::{find_route, InFlightHtlcs, Route, RouteHop, RouteParameters, Router, ScorerAccountingForInFlightHtlcs}; use crate::routing::scoring::FixedPenaltyScorer; use crate::util::config::UserConfig; @@ -857,12 +857,12 @@ impl TestChainSource { } impl UtxoLookup for TestChainSource { - fn get_utxo(&self, genesis_hash: &BlockHash, _short_channel_id: u64) -> Result { + fn get_utxo(&self, genesis_hash: &BlockHash, _short_channel_id: u64) -> UtxoResult { if self.genesis_hash != *genesis_hash { - return Err(UtxoLookupError::UnknownChain); + return UtxoResult::Sync(Err(UtxoLookupError::UnknownChain)); } - self.utxo_ret.lock().unwrap().clone() + UtxoResult::Sync(self.utxo_ret.lock().unwrap().clone()) } } From 7388b6c1d795a5cbbf2c6c109bacd26e15c81500 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 8 Feb 2023 22:11:56 +0000 Subject: [PATCH 06/16] Track in-flight `channel_announcement` lookups and avoid duplicates If we receive two `channel_announcement`s for the same channel at the same time, we shouldn't spawn a second UTXO lookup for an identical message. This likely isn't too rare - if we start syncing the graph from two peers at the same time, it isn't unlikely that we'll end up with the same messages around the same time. In order to avoid this we keep a hash map of all the pending `channel_announcement` messages by SCID and simply ignore duplicate message lookups. --- lightning/src/routing/utxo.rs | 89 ++++++++++++++++++++++++++++++++++- 1 file changed, 88 insertions(+), 1 deletion(-) diff --git a/lightning/src/routing/utxo.rs b/lightning/src/routing/utxo.rs index 18c7ff97d94..c18b4f459fa 100644 --- a/lightning/src/routing/utxo.rs +++ b/lightning/src/routing/utxo.rs @@ -105,6 +105,7 @@ impl UtxoFuture { pub fn resolve(&self, graph: &NetworkGraph, result: Result) where L::Target: Logger { let announcement = { + let mut pending_checks = graph.pending_checks.internal.lock().unwrap(); let mut async_messages = self.state.lock().unwrap(); if async_messages.channel_announce.is_none() { @@ -114,6 +115,12 @@ impl UtxoFuture { async_messages.complete = Some(result); return; } + let announcement_msg = match async_messages.channel_announce.as_ref().unwrap() { + ChannelAnnouncement::Full(signed_msg) => &signed_msg.contents, + ChannelAnnouncement::Unsigned(msg) => &msg, + }; + + pending_checks.lookup_completed(announcement_msg, &Arc::downgrade(&self.state)); async_messages.channel_announce.take().unwrap() }; @@ -134,13 +141,87 @@ impl UtxoFuture { } } +struct PendingChecksContext { + channels: HashMap>>, +} + +impl PendingChecksContext { + fn lookup_completed(&mut self, + msg: &msgs::UnsignedChannelAnnouncement, completed_state: &Weak> + ) { + if let hash_map::Entry::Occupied(e) = self.channels.entry(msg.short_channel_id) { + if Weak::ptr_eq(e.get(), &completed_state) { + e.remove(); + } + } + } +} + /// A set of messages which are pending UTXO lookups for processing. pub(super) struct PendingChecks { + internal: Mutex, } impl PendingChecks { pub(super) fn new() -> Self { - PendingChecks {} + PendingChecks { internal: Mutex::new(PendingChecksContext { + channels: HashMap::new(), + }) } + } + + fn check_replace_previous_entry(msg: &msgs::UnsignedChannelAnnouncement, + full_msg: Option<&msgs::ChannelAnnouncement>, replacement: Option>>, + pending_channels: &mut HashMap>> + ) -> Result<(), msgs::LightningError> { + match pending_channels.entry(msg.short_channel_id) { + hash_map::Entry::Occupied(mut e) => { + // There's already a pending lookup for the given SCID. Check if the messages + // are the same and, if so, return immediately (don't bother spawning another + // lookup if we haven't gotten that far yet). + match Weak::upgrade(&e.get()) { + Some(pending_msgs) => { + let pending_matches = match &pending_msgs.lock().unwrap().channel_announce { + Some(ChannelAnnouncement::Full(pending_msg)) => Some(pending_msg) == full_msg, + Some(ChannelAnnouncement::Unsigned(pending_msg)) => pending_msg == msg, + None => { + // This shouldn't actually be reachable. We set the + // `channel_announce` field under the same lock as setting the + // channel map entry. Still, we can just treat it as + // non-matching and let the new request fly. + debug_assert!(false); + false + }, + }; + if pending_matches { + return Err(LightningError { + err: "Channel announcement is already being checked".to_owned(), + action: ErrorAction::IgnoreDuplicateGossip, + }); + } else { + // The earlier lookup is a different message. If we have another + // request in-flight now replace the original. + // Note that in the replace case whether to replace is somewhat + // arbitrary - both results will be handled, we're just updating the + // value that will be compared to future lookups with the same SCID. + if let Some(item) = replacement { + *e.get_mut() = item; + } + } + }, + None => { + // The earlier lookup already resolved. We can't be sure its the same + // so just remove/replace it and move on. + if let Some(item) = replacement { + *e.get_mut() = item; + } else { e.remove(); } + }, + } + }, + hash_map::Entry::Vacant(v) => { + if let Some(item) = replacement { v.insert(item); } + }, + } + Ok(()) } pub(super) fn check_channel_announcement(&self, @@ -177,6 +258,9 @@ impl PendingChecks { } }; + Self::check_replace_previous_entry(msg, full_msg, None, + &mut self.internal.lock().unwrap().channels)?; + match utxo_lookup { &None => { // Tentatively accept, potentially exposing us to DoS attacks @@ -186,12 +270,15 @@ impl PendingChecks { match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id) { UtxoResult::Sync(res) => handle_result(res), UtxoResult::Async(future) => { + let mut pending_checks = self.internal.lock().unwrap(); let mut async_messages = future.state.lock().unwrap(); if let Some(res) = async_messages.complete.take() { // In the unlikely event the future resolved before we managed to get it, // handle the result in-line. handle_result(res) } else { + Self::check_replace_previous_entry(msg, full_msg, + Some(Arc::downgrade(&future.state)), &mut pending_checks.channels)?; async_messages.channel_announce = Some( if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) } else { ChannelAnnouncement::Unsigned(msg.clone()) }); From 67c9c7f2ae150a287370d56373f673e116172690 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 7 Feb 2023 20:38:20 +0000 Subject: [PATCH 07/16] Process `channel_update`/`node_announcement` async if needed If we have a `channel_announcement` which is waiting on a UTXO lookup before we can process it, and we receive a `channel_update` or `node_announcement` for the same channel or a node which is a part of the channel, we have to wait until the lookup completes until we can decide if we want to accept the new message. Here, we store the new message in the pending lookup state and process it asynchronously like the original `channel_announcement`. --- lightning/src/routing/gossip.rs | 15 ++- lightning/src/routing/utxo.rs | 179 +++++++++++++++++++++++++++++++- 2 files changed, 188 insertions(+), 6 deletions(-) diff --git a/lightning/src/routing/gossip.rs b/lightning/src/routing/gossip.rs index dd9cabd25eb..8a54e63e2b5 100644 --- a/lightning/src/routing/gossip.rs +++ b/lightning/src/routing/gossip.rs @@ -1298,8 +1298,13 @@ impl NetworkGraph where L::Target: Logger { } fn update_node_from_announcement_intern(&self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement>) -> Result<(), LightningError> { - match self.nodes.write().unwrap().get_mut(&msg.node_id) { - None => Err(LightningError{err: "No existing channels for node_announcement".to_owned(), action: ErrorAction::IgnoreError}), + let mut nodes = self.nodes.write().unwrap(); + match nodes.get_mut(&msg.node_id) { + None => { + core::mem::drop(nodes); + self.pending_checks.check_hold_pending_node_announcement(msg, full_msg)?; + Err(LightningError{err: "No existing channels for node_announcement".to_owned(), action: ErrorAction::IgnoreError}) + }, Some(node) => { if let Some(node_info) = node.announcement_info.as_ref() { // The timestamp field is somewhat of a misnomer - the BOLTs use it to order @@ -1724,7 +1729,11 @@ impl NetworkGraph where L::Target: Logger { let mut channels = self.channels.write().unwrap(); match channels.get_mut(&msg.short_channel_id) { - None => return Err(LightningError{err: "Couldn't find channel for update".to_owned(), action: ErrorAction::IgnoreError}), + None => { + core::mem::drop(channels); + self.pending_checks.check_hold_pending_channel_update(msg, full_msg)?; + return Err(LightningError{err: "Couldn't find channel for update".to_owned(), action: ErrorAction::IgnoreError}); + }, Some(channel) => { if msg.htlc_maximum_msat > MAX_VALUE_MSAT { return Err(LightningError{err: diff --git a/lightning/src/routing/utxo.rs b/lightning/src/routing/utxo.rs index c18b4f459fa..56b67184589 100644 --- a/lightning/src/routing/utxo.rs +++ b/lightning/src/routing/utxo.rs @@ -69,10 +69,48 @@ enum ChannelAnnouncement { Full(msgs::ChannelAnnouncement), Unsigned(msgs::UnsignedChannelAnnouncement), } +impl ChannelAnnouncement { + fn node_id_1(&self) -> &NodeId { + match self { + ChannelAnnouncement::Full(msg) => &msg.contents.node_id_1, + ChannelAnnouncement::Unsigned(msg) => &msg.node_id_1, + } + } +} + +enum NodeAnnouncement { + Full(msgs::NodeAnnouncement), + Unsigned(msgs::UnsignedNodeAnnouncement), +} +impl NodeAnnouncement { + fn timestamp(&self) -> u32 { + match self { + NodeAnnouncement::Full(msg) => msg.contents.timestamp, + NodeAnnouncement::Unsigned(msg) => msg.timestamp, + } + } +} + +enum ChannelUpdate { + Full(msgs::ChannelUpdate), + Unsigned(msgs::UnsignedChannelUpdate), +} +impl ChannelUpdate { + fn timestamp(&self) -> u32 { + match self { + ChannelUpdate::Full(msg) => msg.contents.timestamp, + ChannelUpdate::Unsigned(msg) => msg.timestamp, + } + } +} struct UtxoMessages { complete: Option>, channel_announce: Option, + latest_node_announce_a: Option, + latest_node_announce_b: Option, + latest_channel_update_a: Option, + latest_channel_update_b: Option, } /// Represents a future resolution of a [`UtxoLookup::get_utxo`] query resolving async. @@ -98,13 +136,17 @@ impl UtxoFuture { Self { state: Arc::new(Mutex::new(UtxoMessages { complete: None, channel_announce: None, + latest_node_announce_a: None, + latest_node_announce_b: None, + latest_channel_update_a: None, + latest_channel_update_b: None, }))} } /// Resolves this future against the given `graph` and with the given `result`. pub fn resolve(&self, graph: &NetworkGraph, result: Result) where L::Target: Logger { - let announcement = { + let (announcement, node_a, node_b, update_a, update_b) = { let mut pending_checks = graph.pending_checks.internal.lock().unwrap(); let mut async_messages = self.state.lock().unwrap(); @@ -115,6 +157,7 @@ impl UtxoFuture { async_messages.complete = Some(result); return; } + let announcement_msg = match async_messages.channel_announce.as_ref().unwrap() { ChannelAnnouncement::Full(signed_msg) => &signed_msg.contents, ChannelAnnouncement::Unsigned(msg) => &msg, @@ -122,7 +165,11 @@ impl UtxoFuture { pending_checks.lookup_completed(announcement_msg, &Arc::downgrade(&self.state)); - async_messages.channel_announce.take().unwrap() + (async_messages.channel_announce.take().unwrap(), + async_messages.latest_node_announce_a.take(), + async_messages.latest_node_announce_b.take(), + async_messages.latest_channel_update_a.take(), + async_messages.latest_channel_update_b.take()) }; // Now that we've updated our internal state, pass the pending messages back through the @@ -138,11 +185,36 @@ impl UtxoFuture { let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver)); }, } + + for announce in core::iter::once(node_a).chain(core::iter::once(node_b)) { + match announce { + Some(NodeAnnouncement::Full(signed_msg)) => { + let _ = graph.update_node_from_announcement(&signed_msg); + }, + Some(NodeAnnouncement::Unsigned(msg)) => { + let _ = graph.update_node_from_unsigned_announcement(&msg); + }, + None => {}, + } + } + + for update in core::iter::once(update_a).chain(core::iter::once(update_b)) { + match update { + Some(ChannelUpdate::Full(signed_msg)) => { + let _ = graph.update_channel(&signed_msg); + }, + Some(ChannelUpdate::Unsigned(msg)) => { + let _ = graph.update_channel_unsigned(&msg); + }, + None => {}, + } + } } } struct PendingChecksContext { channels: HashMap>>, + nodes: HashMap>>>, } impl PendingChecksContext { @@ -154,6 +226,15 @@ impl PendingChecksContext { e.remove(); } } + + if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_1) { + e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state)); + if e.get().is_empty() { e.remove(); } + } + if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_2) { + e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state)); + if e.get().is_empty() { e.remove(); } + } } } @@ -165,10 +246,98 @@ pub(super) struct PendingChecks { impl PendingChecks { pub(super) fn new() -> Self { PendingChecks { internal: Mutex::new(PendingChecksContext { - channels: HashMap::new(), + channels: HashMap::new(), nodes: HashMap::new(), }) } } + /// Checks if there is a pending `channel_update` UTXO validation for the given channel, + /// and, if so, stores the channel message for handling later and returns an `Err`. + pub(super) fn check_hold_pending_channel_update( + &self, msg: &msgs::UnsignedChannelUpdate, full_msg: Option<&msgs::ChannelUpdate> + ) -> Result<(), LightningError> { + let mut pending_checks = self.internal.lock().unwrap(); + if let hash_map::Entry::Occupied(e) = pending_checks.channels.entry(msg.short_channel_id) { + let is_from_a = (msg.flags & 1) == 1; + match Weak::upgrade(e.get()) { + Some(msgs_ref) => { + let mut messages = msgs_ref.lock().unwrap(); + let latest_update = if is_from_a { + &mut messages.latest_channel_update_a + } else { + &mut messages.latest_channel_update_b + }; + if latest_update.is_none() || latest_update.as_ref().unwrap().timestamp() < msg.timestamp { + // If the messages we got has a higher timestamp, just blindly assume the + // signatures on the new message are correct and drop the old message. This + // may cause us to end up dropping valid `channel_update`s if a peer is + // malicious, but we should get the correct ones when the node updates them. + *latest_update = Some( + if let Some(msg) = full_msg { ChannelUpdate::Full(msg.clone()) } + else { ChannelUpdate::Unsigned(msg.clone()) }); + } + return Err(LightningError { + err: "Awaiting channel_announcement validation to accept channel_update".to_owned(), + action: ErrorAction::IgnoreAndLog(Level::Gossip), + }); + }, + None => { e.remove(); }, + } + } + Ok(()) + } + + /// Checks if there is a pending `node_announcement` UTXO validation for a channel with the + /// given node and, if so, stores the channel message for handling later and returns an `Err`. + pub(super) fn check_hold_pending_node_announcement( + &self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement> + ) -> Result<(), LightningError> { + let mut pending_checks = self.internal.lock().unwrap(); + if let hash_map::Entry::Occupied(mut e) = pending_checks.nodes.entry(msg.node_id) { + let mut found_at_least_one_chan = false; + e.get_mut().retain(|node_msgs| { + match Weak::upgrade(&node_msgs) { + Some(chan_mtx) => { + let mut chan_msgs = chan_mtx.lock().unwrap(); + if let Some(chan_announce) = &chan_msgs.channel_announce { + let latest_announce = + if *chan_announce.node_id_1() == msg.node_id { + &mut chan_msgs.latest_node_announce_a + } else { + &mut chan_msgs.latest_node_announce_b + }; + if latest_announce.is_none() || + latest_announce.as_ref().unwrap().timestamp() < msg.timestamp + { + // If the messages we got has a higher timestamp, just blindly + // assume the signatures on the new message are correct and drop + // the old message. This may cause us to end up dropping valid + // `node_announcement`s if a peer is malicious, but we should get + // the correct ones when the node updates them. + *latest_announce = Some( + if let Some(msg) = full_msg { NodeAnnouncement::Full(msg.clone()) } + else { NodeAnnouncement::Unsigned(msg.clone()) }); + } + found_at_least_one_chan = true; + true + } else { + debug_assert!(false, "channel_announce is set before struct is added to node map"); + false + } + }, + None => false, + } + }); + if e.get().is_empty() { e.remove(); } + if found_at_least_one_chan { + return Err(LightningError { + err: "Awaiting channel_announcement validation to accept node_announcement".to_owned(), + action: ErrorAction::IgnoreAndLog(Level::Gossip), + }); + } + } + Ok(()) + } + fn check_replace_previous_entry(msg: &msgs::UnsignedChannelAnnouncement, full_msg: Option<&msgs::ChannelAnnouncement>, replacement: Option>>, pending_channels: &mut HashMap>> @@ -282,6 +451,10 @@ impl PendingChecks { async_messages.channel_announce = Some( if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) } else { ChannelAnnouncement::Unsigned(msg.clone()) }); + pending_checks.nodes.entry(msg.node_id_1) + .or_insert(Vec::new()).push(Arc::downgrade(&future.state)); + pending_checks.nodes.entry(msg.node_id_2) + .or_insert(Vec::new()).push(Arc::downgrade(&future.state)); Err(LightningError { err: "Channel being checked async".to_owned(), action: ErrorAction::IgnoreAndLog(Level::Gossip), From 41e6eba20106e66f4c28422ff8e4632e98b2fb42 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 22 Jan 2023 03:41:28 +0000 Subject: [PATCH 08/16] Add the ability to broadcast gossip msgs via the event pipeline When we process gossip messages asynchronously we may find that we want to forward a gossip message to a peer after we've returned from the existing `handle_*` method. In order to do so, we need to be able to send arbitrary loose gossip messages back to the `PeerManager` via `MessageSendEvent`. This commit modifies `MessageSendEvent` in order to support this. --- lightning/src/ln/channelmanager.rs | 5 +++-- lightning/src/ln/functional_test_utils.rs | 8 ++++++-- lightning/src/ln/peer_handler.rs | 18 ++++++++++++++---- lightning/src/ln/priv_short_conf_tests.rs | 7 ++++--- lightning/src/ln/reload_tests.rs | 4 ++-- lightning/src/util/events.rs | 7 ++++++- 6 files changed, 35 insertions(+), 14 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 59d8e387529..513dc897e41 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -5044,7 +5044,7 @@ where ), chan), // Note that announcement_signatures fails if the channel cannot be announced, // so get_channel_update_for_broadcast will never fail by the time we get here. - update_msg: self.get_channel_update_for_broadcast(chan.get()).unwrap(), + update_msg: Some(self.get_channel_update_for_broadcast(chan.get()).unwrap()), }); }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) @@ -5970,7 +5970,7 @@ where msg: announcement, // Note that announcement_signatures fails if the channel cannot be announced, // so get_channel_update_for_broadcast will never fail by the time we get here. - update_msg: self.get_channel_update_for_broadcast(channel).unwrap(), + update_msg: Some(self.get_channel_update_for_broadcast(channel).unwrap()), }); } } @@ -6286,6 +6286,7 @@ where &events::MessageSendEvent::SendChannelAnnouncement { .. } => false, &events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true, &events::MessageSendEvent::BroadcastChannelUpdate { .. } => true, + &events::MessageSendEvent::BroadcastNodeAnnouncement { .. } => true, &events::MessageSendEvent::SendChannelUpdate { .. } => false, &events::MessageSendEvent::HandleError { .. } => false, &events::MessageSendEvent::SendChannelRangeQuery { .. } => false, diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index c2ce158f897..7bf51df5c7a 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -621,6 +621,9 @@ pub fn remove_first_msg_event_to_node(msg_node_id: &PublicKey, msg_events: &Vec< MessageSendEvent::BroadcastChannelUpdate { .. } => { false }, + MessageSendEvent::BroadcastNodeAnnouncement { .. } => { + false + }, MessageSendEvent::SendChannelUpdate { node_id, .. } => { node_id == msg_node_id }, @@ -1010,7 +1013,7 @@ pub fn create_chan_between_nodes_with_value_b<'a, 'b, 'c>(node_a: &Node<'a, 'b, assert_eq!(events_7.len(), 1); let (announcement, bs_update) = match events_7[0] { MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { - (msg, update_msg) + (msg, update_msg.clone().unwrap()) }, _ => panic!("Unexpected event"), }; @@ -1021,6 +1024,7 @@ pub fn create_chan_between_nodes_with_value_b<'a, 'b, 'c>(node_a: &Node<'a, 'b, let as_update = match events_8[0] { MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { assert!(*announcement == *msg); + let update_msg = update_msg.clone().unwrap(); assert_eq!(update_msg.contents.short_channel_id, announcement.contents.short_channel_id); assert_eq!(update_msg.contents.short_channel_id, bs_update.contents.short_channel_id); update_msg @@ -1031,7 +1035,7 @@ pub fn create_chan_between_nodes_with_value_b<'a, 'b, 'c>(node_a: &Node<'a, 'b, *node_a.network_chan_count.borrow_mut() += 1; expect_channel_ready_event(&node_b, &node_a.node.get_our_node_id()); - ((*announcement).clone(), (*as_update).clone(), (*bs_update).clone()) + ((*announcement).clone(), as_update, bs_update) } pub fn create_announced_chan_between_nodes<'a, 'b, 'c, 'd>(nodes: &'a Vec>, a: usize, b: usize) -> (msgs::ChannelUpdate, msgs::ChannelUpdate, [u8; 32], Transaction) { diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 3e05c8ef300..4cbe2a1bf2b 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -1711,10 +1711,12 @@ impl {}, } - match self.message_handler.route_handler.handle_channel_update(&update_msg) { - Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => - self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(update_msg), None), - _ => {}, + if let Some(msg) = update_msg { + match self.message_handler.route_handler.handle_channel_update(&msg) { + Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => + self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None), + _ => {}, + } } }, MessageSendEvent::BroadcastChannelUpdate { msg } => { @@ -1725,6 +1727,14 @@ impl {}, } }, + MessageSendEvent::BroadcastNodeAnnouncement { msg } => { + log_debug!(self.logger, "Handling BroadcastNodeAnnouncement event in peer_handler for node {}", msg.contents.node_id); + match self.message_handler.route_handler.handle_node_announcement(&msg) { + Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => + self.forward_broadcast_msg(peers, &wire::Message::NodeAnnouncement(msg), None), + _ => {}, + } + }, MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => { log_trace!(self.logger, "Handling SendChannelUpdate event in peer_handler for node {} for channel {}", log_pubkey!(node_id), msg.contents.short_channel_id); diff --git a/lightning/src/ln/priv_short_conf_tests.rs b/lightning/src/ln/priv_short_conf_tests.rs index a6fd7c1dc09..adc2b59abbf 100644 --- a/lightning/src/ln/priv_short_conf_tests.rs +++ b/lightning/src/ln/priv_short_conf_tests.rs @@ -184,14 +184,14 @@ fn do_test_1_conf_open(connect_style: ConnectStyle) { msg.clone() } else { panic!("Unexpected event"); }; let (bs_announcement, bs_update) = if let MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } = bs_announce_events[1] { - (msg.clone(), update_msg.clone()) + (msg.clone(), update_msg.clone().unwrap()) } else { panic!("Unexpected event"); }; nodes[0].node.handle_announcement_signatures(&nodes[1].node.get_our_node_id(), &bs_announcement_sigs); let as_announce_events = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(as_announce_events.len(), 1); let (announcement, as_update) = if let MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } = as_announce_events[0] { - (msg.clone(), update_msg.clone()) + (msg.clone(), update_msg.clone().unwrap()) } else { panic!("Unexpected event"); }; assert_eq!(announcement, bs_announcement); @@ -757,7 +757,7 @@ fn test_public_0conf_channel() { match bs_announcement[0] { MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { announcement = msg.clone(); - bs_update = update_msg.clone(); + bs_update = update_msg.clone().unwrap(); }, _ => panic!("Unexpected event"), }; @@ -767,6 +767,7 @@ fn test_public_0conf_channel() { match as_announcement[0] { MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { assert!(announcement == *msg); + let update_msg = update_msg.as_ref().unwrap(); assert_eq!(update_msg.contents.short_channel_id, scid); assert_eq!(update_msg.contents.short_channel_id, announcement.contents.short_channel_id); assert_eq!(update_msg.contents.short_channel_id, bs_update.contents.short_channel_id); diff --git a/lightning/src/ln/reload_tests.rs b/lightning/src/ln/reload_tests.rs index 2ca3f21fe94..427c4001b6d 100644 --- a/lightning/src/ln/reload_tests.rs +++ b/lightning/src/ln/reload_tests.rs @@ -140,7 +140,7 @@ fn test_funding_peer_disconnect() { assert_eq!(events_7.len(), 1); let (chan_announcement, as_update) = match events_7[0] { MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { - (msg.clone(), update_msg.clone()) + (msg.clone(), update_msg.clone().unwrap()) }, _ => panic!("Unexpected event {:?}", events_7[0]), }; @@ -153,7 +153,7 @@ fn test_funding_peer_disconnect() { let bs_update = match events_8[0] { MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { assert_eq!(*msg, chan_announcement); - update_msg.clone() + update_msg.clone().unwrap() }, _ => panic!("Unexpected event {:?}", events_8[0]), }; diff --git a/lightning/src/util/events.rs b/lightning/src/util/events.rs index 475533c902a..265e9186611 100644 --- a/lightning/src/util/events.rs +++ b/lightning/src/util/events.rs @@ -1612,13 +1612,18 @@ pub enum MessageSendEvent { /// The channel_announcement which should be sent. msg: msgs::ChannelAnnouncement, /// The followup channel_update which should be sent. - update_msg: msgs::ChannelUpdate, + update_msg: Option, }, /// Used to indicate that a channel_update should be broadcast to all peers. BroadcastChannelUpdate { /// The channel_update which should be sent. msg: msgs::ChannelUpdate, }, + /// Used to indicate that a node_announcement should be broadcast to all peers. + BroadcastNodeAnnouncement { + /// The node_announcement which should be sent. + msg: msgs::NodeAnnouncement, + }, /// Used to indicate that a channel_update should be sent to a single peer. /// In contrast to [`Self::BroadcastChannelUpdate`], this is used when the channel is a /// private channel and we shouldn't be informing all of our peers of channel parameters. From 0da7bbd5ecc578ee13d75a31c4627070c69c875f Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 22 Jan 2023 04:14:58 +0000 Subject: [PATCH 09/16] Forward gossip messages which were verified asynchronously Gossip messages which were verified against the chain asynchronously should still be forwarded to peers, but must now go out via a new `P2PGossipSync` parameter in the `AccessResolver::resolve` method, allowing us to wire them up to the `P2PGossipSync`'s `MessageSendEventsProvider` implementation. --- fuzz/src/router.rs | 4 +-- lightning/src/routing/gossip.rs | 30 +++++++++++++++++ lightning/src/routing/utxo.rs | 57 +++++++++++++++++++++++++++++---- 3 files changed, 83 insertions(+), 8 deletions(-) diff --git a/fuzz/src/router.rs b/fuzz/src/router.rs index a30c7d28af4..a7c50de4a47 100644 --- a/fuzz/src/router.rs +++ b/fuzz/src/router.rs @@ -99,12 +99,12 @@ impl UtxoLookup for FuzzChainSource<'_, '_, Out> { &[1, _] => UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)), &[2, _] => { let future = UtxoFuture::new(); - future.resolve(self.net_graph, Ok(txo_res)); + future.resolve_without_forwarding(self.net_graph, Ok(txo_res)); UtxoResult::Async(future.clone()) }, &[3, _] => { let future = UtxoFuture::new(); - future.resolve(self.net_graph, Err(UtxoLookupError::UnknownTx)); + future.resolve_without_forwarding(self.net_graph, Err(UtxoLookupError::UnknownTx)); UtxoResult::Async(future.clone()) }, &[4, _] => { diff --git a/lightning/src/routing/gossip.rs b/lightning/src/routing/gossip.rs index 8a54e63e2b5..2d3cda3f473 100644 --- a/lightning/src/routing/gossip.rs +++ b/lightning/src/routing/gossip.rs @@ -272,6 +272,36 @@ where U::Target: UtxoLookup, L::Target: Logger false } } + + /// Used to broadcast forward gossip messages which were validated async. + /// + /// Note that this will ignore events other than `Broadcast*` or messages with too much excess + /// data. + pub(super) fn forward_gossip_msg(&self, mut ev: MessageSendEvent) { + match &mut ev { + MessageSendEvent::BroadcastChannelAnnouncement { msg, ref mut update_msg } => { + if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY { return; } + if update_msg.as_ref() + .map(|msg| msg.contents.excess_data.len()).unwrap_or(0) > MAX_EXCESS_BYTES_FOR_RELAY + { + *update_msg = None; + } + }, + MessageSendEvent::BroadcastChannelUpdate { msg } => { + if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY { return; } + }, + MessageSendEvent::BroadcastNodeAnnouncement { msg } => { + if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY || + msg.contents.excess_address_data.len() > MAX_EXCESS_BYTES_FOR_RELAY || + msg.contents.excess_data.len() + msg.contents.excess_address_data.len() > MAX_EXCESS_BYTES_FOR_RELAY + { + return; + } + }, + _ => return, + } + self.pending_events.lock().unwrap().push(ev); + } } impl NetworkGraph where L::Target: Logger { diff --git a/lightning/src/routing/utxo.rs b/lightning/src/routing/utxo.rs index 56b67184589..eb8e2a7c620 100644 --- a/lightning/src/routing/utxo.rs +++ b/lightning/src/routing/utxo.rs @@ -18,7 +18,8 @@ use bitcoin::hashes::hex::ToHex; use crate::ln::chan_utils::make_funding_redeemscript_from_slices; use crate::ln::msgs::{self, LightningError, ErrorAction}; -use crate::routing::gossip::{NetworkGraph, NodeId}; +use crate::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync}; +use crate::util::events::MessageSendEvent; use crate::util::logger::{Level, Logger}; use crate::util::ser::Writeable; @@ -144,8 +145,32 @@ impl UtxoFuture { } /// Resolves this future against the given `graph` and with the given `result`. - pub fn resolve(&self, graph: &NetworkGraph, result: Result) + /// + /// This is identical to calling [`UtxoFuture::resolve`] with a dummy `gossip`, disabling + /// forwarding the validated gossip message onwards to peers. + pub fn resolve_without_forwarding(&self, + graph: &NetworkGraph, result: Result) where L::Target: Logger { + self.do_resolve(graph, result); + } + + /// Resolves this future against the given `graph` and with the given `result`. + /// + /// The given `gossip` is used to broadcast any validated messages onwards to all peers which + /// have available buffer space. + pub fn resolve>, U: Deref, GS: Deref>>(&self, + graph: &NetworkGraph, gossip: GS, result: Result + ) where L::Target: Logger, U::Target: UtxoLookup { + let mut res = self.do_resolve(graph, result); + for msg_opt in res.iter_mut() { + if let Some(msg) = msg_opt.take() { + gossip.forward_gossip_msg(msg); + } + } + } + + fn do_resolve(&self, graph: &NetworkGraph, result: Result) + -> [Option; 5] where L::Target: Logger { let (announcement, node_a, node_b, update_a, update_b) = { let mut pending_checks = graph.pending_checks.internal.lock().unwrap(); let mut async_messages = self.state.lock().unwrap(); @@ -155,7 +180,7 @@ impl UtxoFuture { // `channel_announce` yet. That's okay, we can set the `complete` field which it will // check once it gets control again. async_messages.complete = Some(result); - return; + return [None, None, None, None, None]; } let announcement_msg = match async_messages.channel_announce.as_ref().unwrap() { @@ -172,6 +197,9 @@ impl UtxoFuture { async_messages.latest_channel_update_b.take()) }; + let mut res = [None, None, None, None, None]; + let mut res_idx = 0; + // Now that we've updated our internal state, pass the pending messages back through the // network graph with a different `UtxoLookup` which will resolve immediately. // Note that we ignore errors as we don't disconnect peers anyway, so there's nothing to do @@ -179,7 +207,12 @@ impl UtxoFuture { let resolver = UtxoResolver(result); match announcement { ChannelAnnouncement::Full(signed_msg) => { - let _ = graph.update_channel_from_announcement(&signed_msg, &Some(&resolver)); + if graph.update_channel_from_announcement(&signed_msg, &Some(&resolver)).is_ok() { + res[res_idx] = Some(MessageSendEvent::BroadcastChannelAnnouncement { + msg: signed_msg, update_msg: None, + }); + res_idx += 1; + } }, ChannelAnnouncement::Unsigned(msg) => { let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver)); @@ -189,7 +222,12 @@ impl UtxoFuture { for announce in core::iter::once(node_a).chain(core::iter::once(node_b)) { match announce { Some(NodeAnnouncement::Full(signed_msg)) => { - let _ = graph.update_node_from_announcement(&signed_msg); + if graph.update_node_from_announcement(&signed_msg).is_ok() { + res[res_idx] = Some(MessageSendEvent::BroadcastNodeAnnouncement { + msg: signed_msg, + }); + res_idx += 1; + } }, Some(NodeAnnouncement::Unsigned(msg)) => { let _ = graph.update_node_from_unsigned_announcement(&msg); @@ -201,7 +239,12 @@ impl UtxoFuture { for update in core::iter::once(update_a).chain(core::iter::once(update_b)) { match update { Some(ChannelUpdate::Full(signed_msg)) => { - let _ = graph.update_channel(&signed_msg); + if graph.update_channel(&signed_msg).is_ok() { + res[res_idx] = Some(MessageSendEvent::BroadcastChannelUpdate { + msg: signed_msg, + }); + res_idx += 1; + } }, Some(ChannelUpdate::Unsigned(msg)) => { let _ = graph.update_channel_unsigned(&msg); @@ -209,6 +252,8 @@ impl UtxoFuture { None => {}, } } + + res } } From 02b187856bc7f1d7edd46897e2f983a65a97230c Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 22 Jan 2023 05:12:45 +0000 Subject: [PATCH 10/16] Allow `RoutingMessageHandler` to signal backpressure Now that we allow `handle_channel_announcement` to (indirectly) spawn async tasks which will complete later, we have to ensure it can apply backpressure all the way up to the TCP socket to ensure we don't end up with too many buffers allocated for UTXO validation. We do this by adding a new method to `RoutingMessageHandler` which allows it to signal if there are "many" checks pending and `channel_announcement` messages should be delayed. The actual `PeerManager` implementation thereof is done in the next commit. --- lightning-net-tokio/src/lib.rs | 1 + lightning/src/ln/msgs.rs | 7 +++++ lightning/src/ln/peer_handler.rs | 1 + lightning/src/routing/gossip.rs | 4 +++ lightning/src/routing/utxo.rs | 46 ++++++++++++++++++++++++++++++++ lightning/src/util/test_utils.rs | 2 ++ 6 files changed, 61 insertions(+) diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 9b480fb0ae4..9616e0db61d 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -623,6 +623,7 @@ mod tests { fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) } fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() } fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { InitFeatures::empty() } + fn processing_queue_high(&self) -> bool { false } } impl ChannelMessageHandler for MsgHandler { fn handle_open_channel(&self, _their_node_id: &PublicKey, _msg: &OpenChannel) {} diff --git a/lightning/src/ln/msgs.rs b/lightning/src/ln/msgs.rs index c971df906c2..e8b40c71d89 100644 --- a/lightning/src/ln/msgs.rs +++ b/lightning/src/ln/msgs.rs @@ -1082,6 +1082,13 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider { /// list of `short_channel_id`s. fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError>; + // Handler queueing status: + /// Indicates that there are a large number of [`ChannelAnnouncement`] (or other) messages + /// pending some async action. While there is no guarantee of the rate of future messages, the + /// caller should seek to reduce the rate of new gossip messages handled, especially + /// [`ChannelAnnouncement`]s. + fn processing_queue_high(&self) -> bool; + // Handler information: /// Gets the node feature flags which this handler itself supports. All available handlers are /// queried similarly and their feature flags are OR'd together to form the [`NodeFeatures`] diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 4cbe2a1bf2b..11f1206339d 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -81,6 +81,7 @@ impl RoutingMessageHandler for IgnoringMessageHandler { fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { InitFeatures::empty() } + fn processing_queue_high(&self) -> bool { false } } impl OnionMessageProvider for IgnoringMessageHandler { fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option { None } diff --git a/lightning/src/routing/gossip.rs b/lightning/src/routing/gossip.rs index 2d3cda3f473..e2c59b4f434 100644 --- a/lightning/src/routing/gossip.rs +++ b/lightning/src/routing/gossip.rs @@ -657,6 +657,10 @@ where U::Target: UtxoLookup, L::Target: Logger features.set_gossip_queries_optional(); features } + + fn processing_queue_high(&self) -> bool { + self.network_graph.pending_checks.too_many_checks_pending() + } } impl>, U: Deref, L: Deref> MessageSendEventsProvider for P2PGossipSync diff --git a/lightning/src/routing/utxo.rs b/lightning/src/routing/utxo.rs index eb8e2a7c620..da9c65c3b12 100644 --- a/lightning/src/routing/utxo.rs +++ b/lightning/src/routing/utxo.rs @@ -148,6 +148,13 @@ impl UtxoFuture { /// /// This is identical to calling [`UtxoFuture::resolve`] with a dummy `gossip`, disabling /// forwarding the validated gossip message onwards to peers. + /// + /// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order + /// to allow us to interact with peers again, you should call [`PeerManager::process_events`] + /// after this. + /// + /// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high + /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events pub fn resolve_without_forwarding(&self, graph: &NetworkGraph, result: Result) where L::Target: Logger { @@ -158,6 +165,13 @@ impl UtxoFuture { /// /// The given `gossip` is used to broadcast any validated messages onwards to all peers which /// have available buffer space. + /// + /// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order + /// to allow us to interact with peers again, you should call [`PeerManager::process_events`] + /// after this. + /// + /// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high + /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events pub fn resolve>, U: Deref, GS: Deref>>(&self, graph: &NetworkGraph, gossip: GS, result: Result ) where L::Target: Logger, U::Target: UtxoLookup { @@ -510,4 +524,36 @@ impl PendingChecks { } } } + + /// The maximum number of pending gossip checks before [`Self::too_many_checks_pending`] + /// returns `true`. Note that this isn't a strict upper-bound on the number of checks pending - + /// each peer may, at a minimum, read one more socket buffer worth of `channel_announcement`s + /// which we'll have to process. With a socket buffer of 4KB and a minimum + /// `channel_announcement` size of, roughly, 429 bytes, this may leave us with `10*our peer + /// count` messages to process beyond this limit. Because we'll probably have a few peers, + /// there's no reason for this constant to be materially less than 30 or so, and 32 in-flight + /// checks should be more than enough for decent parallelism. + const MAX_PENDING_LOOKUPS: usize = 32; + + /// Returns true if there are a large number of async checks pending and future + /// `channel_announcement` messages should be delayed. Note that this is only a hint and + /// messages already in-flight may still have to be handled for various reasons. + pub(super) fn too_many_checks_pending(&self) -> bool { + let mut pending_checks = self.internal.lock().unwrap(); + if pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS { + // If we have many channel checks pending, ensure we don't have any dangling checks + // (i.e. checks where the user told us they'd call back but drop'd the `AccessFuture` + // instead) before we commit to applying backpressure. + pending_checks.channels.retain(|_, chan| { + Weak::upgrade(&chan).is_some() + }); + pending_checks.nodes.retain(|_, channels| { + channels.retain(|chan| Weak::upgrade(&chan).is_some()); + !channels.is_empty() + }); + pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS + } else { + false + } + } } diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index b62479555cc..b47aef6f7e8 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -571,6 +571,8 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler { features.set_gossip_queries_optional(); features } + + fn processing_queue_high(&self) -> bool { false } } impl events::MessageSendEventsProvider for TestRoutingMessageHandler { From 00a70c25f9111f8f733f2ca4a0a61ba67d2d56a8 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 22 Jan 2023 18:08:33 +0000 Subject: [PATCH 11/16] Apply backpressure when we have too many gossip checks in-flight Now that the `RoutingMessageHandler` can signal that it needs to apply message backpressure, we implement it here in the `PeerManager`. There's not much complicated here, aside from noting that we need to add the ability to call `send_data` with no data to indicate that reading should resume (and track when we may need to make such calls when updating the routing-backpressure state). --- lightning/src/ln/peer_handler.rs | 102 +++++++++++++++++++++---------- 1 file changed, 71 insertions(+), 31 deletions(-) diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 11f1206339d..1bbb30b6b56 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -563,6 +563,9 @@ pub struct PeerManager bool { + !self.gossip_processing_backlogged.load(Ordering::Relaxed) && peer.should_read() + } + + fn update_gossip_backlogged(&self) { + let new_state = self.message_handler.route_handler.processing_queue_high(); + let prev_state = self.gossip_processing_backlogged.swap(new_state, Ordering::Relaxed); + if prev_state && !new_state { + self.gossip_processing_backlog_lifted.store(true, Ordering::Relaxed); + } + } + + fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer, force_one_write: bool) { + let mut have_written = false; while !peer.awaiting_write_event { if peer.should_buffer_onion_message() { if let Some(peer_node_id) = peer.their_node_id { @@ -905,12 +923,22 @@ impl return, + None => { + if force_one_write && !have_written { + let should_read = self.peer_should_read(&peer); + if should_read { + let data_sent = descriptor.send_data(&[], should_read); + debug_assert_eq!(data_sent, 0, "Can't write more than no data"); + } + } + return + }, Some(buff) => buff, }; let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..]; - let data_sent = descriptor.send_data(pending, peer.should_read()); + let data_sent = descriptor.send_data(pending, self.peer_should_read(&peer)); + have_written = true; peer.pending_outbound_buffer_first_msg_offset += data_sent; if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() { peer.pending_outbound_buffer_first_msg_offset = 0; @@ -945,7 +973,7 @@ impl { let mut peer = peer_mutex.lock().unwrap(); peer.awaiting_write_event = false; - self.do_attempt_write_data(descriptor, &mut peer); + self.do_attempt_write_data(descriptor, &mut peer, false); } }; Ok(()) @@ -1192,7 +1220,7 @@ impl MessageHandlingError { e.into() })? { should_forward = Some(wire::Message::ChannelAnnouncement(msg)); } + self.update_gossip_backlogged(); }, wire::Message::NodeAnnouncement(msg) => { if self.message_handler.route_handler.handle_node_announcement(&msg) .map_err(|e| -> MessageHandlingError { e.into() })? { should_forward = Some(wire::Message::NodeAnnouncement(msg)); } + self.update_gossip_backlogged(); }, wire::Message::ChannelUpdate(msg) => { self.message_handler.chan_handler.handle_channel_update(&their_node_id, &msg); @@ -1417,6 +1447,7 @@ impl MessageHandlingError { e.into() })? { should_forward = Some(wire::Message::ChannelUpdate(msg)); } + self.update_gossip_backlogged(); }, wire::Message::QueryShortChannelIds(msg) => { self.message_handler.route_handler.handle_query_short_channel_ids(&their_node_id, msg)?; @@ -1568,6 +1599,9 @@ impl 0 && !peer.received_message_since_timer_tick) + || peer.awaiting_pong_timer_tick_intervals as u64 > + MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.len() as u64 + { + descriptors_needing_disconnect.push(descriptor.clone()); + break; + } peer.received_message_since_timer_tick = false; - continue; - } - if (peer.awaiting_pong_timer_tick_intervals > 0 && !peer.received_message_since_timer_tick) - || peer.awaiting_pong_timer_tick_intervals as u64 > - MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.len() as u64 - { - descriptors_needing_disconnect.push(descriptor.clone()); - continue; - } - peer.received_message_since_timer_tick = false; + if peer.awaiting_pong_timer_tick_intervals > 0 { + peer.awaiting_pong_timer_tick_intervals += 1; + break; + } - if peer.awaiting_pong_timer_tick_intervals > 0 { - peer.awaiting_pong_timer_tick_intervals += 1; - continue; + peer.awaiting_pong_timer_tick_intervals = 1; + let ping = msgs::Ping { + ponglen: 0, + byteslen: 64, + }; + self.enqueue_message(&mut *peer, &ping); + break; } - - peer.awaiting_pong_timer_tick_intervals = 1; - let ping = msgs::Ping { - ponglen: 0, - byteslen: 64, - }; - self.enqueue_message(&mut *peer, &ping); - self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer); + self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer, flush_read_disabled); } } From c21480f7d3659091e7ebd0fdd71aa4de21866e75 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 30 Jan 2023 17:56:46 +0000 Subject: [PATCH 12/16] Don't apply gossip backpressure to non-channel-announcing peers When we apply the new gossip-async-check backpressure on peer connections, if a peer has never sent us a `channel_announcement` at all, we really shouldn't delay reading their messages. This does so by tracking, on a per-peer basis, whether they've sent us a channel_announcement, and resetting that state whenever we're not backlogged. --- lightning/src/ln/peer_handler.rs | 38 +++++++++++++++++++++++++------- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 1bbb30b6b56..cecf4332e29 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -413,6 +413,12 @@ struct Peer { awaiting_pong_timer_tick_intervals: i8, received_message_since_timer_tick: bool, sent_gossip_timestamp_filter: bool, + + /// Indicates we've received a `channel_announcement` since the last time we had + /// [`PeerManager::gossip_processing_backlogged`] set (or, really, that we've received a + /// `channel_announcement` at all - we set this unconditionally but unset it every time we + /// check if we're gossip-processing-backlogged). + received_channel_announce_since_backlogged: bool, } impl Peer { @@ -449,8 +455,12 @@ impl Peer { /// Returns whether we should be reading bytes from this peer, based on whether its outbound /// buffer still has space and we don't need to pause reads to get some writes out. - fn should_read(&self) -> bool { - self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE + fn should_read(&mut self, gossip_processing_backlogged: bool) -> bool { + if !gossip_processing_backlogged { + self.received_channel_announce_since_backlogged = false; + } + self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && + (!gossip_processing_backlogged || !self.received_channel_announce_since_backlogged) } /// Determines if we should push additional gossip background sync (aka "backfill") onto a peer's @@ -799,6 +809,8 @@ impl bool { - !self.gossip_processing_backlogged.load(Ordering::Relaxed) && peer.should_read() + fn peer_should_read(&self, peer: &mut Peer) -> bool { + peer.should_read(self.gossip_processing_backlogged.load(Ordering::Relaxed)) } fn update_gossip_backlogged(&self) { @@ -922,10 +936,10 @@ impl { if force_one_write && !have_written { - let should_read = self.peer_should_read(&peer); if should_read { let data_sent = descriptor.send_data(&[], should_read); debug_assert_eq!(data_sent, 0, "Can't write more than no data"); @@ -937,7 +951,7 @@ impl Date: Wed, 1 Feb 2023 21:09:46 +0000 Subject: [PATCH 13/16] Suggest a socket read buffer of 4KiB to limit message count ...and switch the same in `lightning-net-tokio` --- lightning-net-tokio/src/lib.rs | 5 +++-- lightning/src/ln/peer_handler.rs | 3 +++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 9616e0db61d..b259f77eff8 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -176,8 +176,9 @@ impl Connection { let (event_waker, event_receiver) = mpsc::channel(1); tokio::spawn(Self::poll_event_process(peer_manager.clone(), event_receiver)); - // 8KB is nice and big but also should never cause any issues with stack overflowing. - let mut buf = [0; 8192]; + // 4KiB is nice and big without handling too many messages all at once, giving other peers + // a chance to do some work. + let mut buf = [0; 4096]; let mut our_descriptor = SocketDescriptor::new(us.clone()); // An enum describing why we did/are disconnecting: diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index cecf4332e29..f2406172655 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -1005,6 +1005,9 @@ impl Result { From 4948c3b26fea84eb0e8ecaf06351250888e88c4f Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 8 Feb 2023 22:26:56 +0000 Subject: [PATCH 14/16] Support async results in `TestChainSource`, count `get_utxo` calls --- lightning/src/routing/gossip.rs | 13 ++++++++----- lightning/src/routing/router.rs | 4 +++- lightning/src/routing/utxo.rs | 1 + lightning/src/util/test_utils.rs | 9 ++++++--- 4 files changed, 18 insertions(+), 9 deletions(-) diff --git a/lightning/src/routing/gossip.rs b/lightning/src/routing/gossip.rs index e2c59b4f434..0eeacb822c1 100644 --- a/lightning/src/routing/gossip.rs +++ b/lightning/src/routing/gossip.rs @@ -1927,7 +1927,7 @@ mod tests { #[cfg(feature = "std")] use crate::ln::features::InitFeatures; use crate::routing::gossip::{P2PGossipSync, NetworkGraph, NetworkUpdate, NodeAlias, MAX_EXCESS_BYTES_FOR_RELAY, NodeId, RoutingFees, ChannelUpdateInfo, ChannelInfo, NodeAnnouncementInfo, NodeInfo}; - use crate::routing::utxo::UtxoLookupError; + use crate::routing::utxo::{UtxoLookupError, UtxoResult}; use crate::ln::msgs::{RoutingMessageHandler, UnsignedNodeAnnouncement, NodeAnnouncement, UnsignedChannelAnnouncement, ChannelAnnouncement, UnsignedChannelUpdate, ChannelUpdate, ReplyChannelRange, QueryChannelRange, QueryShortChannelIds, MAX_VALUE_MSAT}; @@ -2159,7 +2159,7 @@ mod tests { // Test if an associated transaction were not on-chain (or not confirmed). let chain_source = test_utils::TestChainSource::new(Network::Testnet); - *chain_source.utxo_ret.lock().unwrap() = Err(UtxoLookupError::UnknownTx); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)); let network_graph = NetworkGraph::new(genesis_hash, &logger); gossip_sync = P2PGossipSync::new(&network_graph, Some(&chain_source), &logger); @@ -2172,7 +2172,8 @@ mod tests { }; // Now test if the transaction is found in the UTXO set and the script is correct. - *chain_source.utxo_ret.lock().unwrap() = Ok(TxOut { value: 0, script_pubkey: good_script.clone() }); + *chain_source.utxo_ret.lock().unwrap() = + UtxoResult::Sync(Ok(TxOut { value: 0, script_pubkey: good_script.clone() })); let valid_announcement = get_signed_channel_announcement(|unsigned_announcement| { unsigned_announcement.short_channel_id += 2; }, node_1_privkey, node_2_privkey, &secp_ctx); @@ -2190,7 +2191,8 @@ mod tests { // If we receive announcement for the same channel, once we've validated it against the // chain, we simply ignore all new (duplicate) announcements. - *chain_source.utxo_ret.lock().unwrap() = Ok(TxOut { value: 0, script_pubkey: good_script }); + *chain_source.utxo_ret.lock().unwrap() = + UtxoResult::Sync(Ok(TxOut { value: 0, script_pubkey: good_script })); match gossip_sync.handle_channel_announcement(&valid_announcement) { Ok(_) => panic!(), Err(e) => assert_eq!(e.err, "Already have chain-validated channel") @@ -2264,7 +2266,8 @@ mod tests { { // Announce a channel we will update let good_script = get_channel_script(&secp_ctx); - *chain_source.utxo_ret.lock().unwrap() = Ok(TxOut { value: amount_sats, script_pubkey: good_script.clone() }); + *chain_source.utxo_ret.lock().unwrap() = + UtxoResult::Sync(Ok(TxOut { value: amount_sats, script_pubkey: good_script.clone() })); let valid_channel_announcement = get_signed_channel_announcement(|_| {}, node_1_privkey, node_2_privkey, &secp_ctx); short_channel_id = valid_channel_announcement.contents.short_channel_id; diff --git a/lightning/src/routing/router.rs b/lightning/src/routing/router.rs index 65fbbbc9646..0e51b2bf86b 100644 --- a/lightning/src/routing/router.rs +++ b/lightning/src/routing/router.rs @@ -2112,6 +2112,7 @@ fn build_route_from_hops_internal( #[cfg(test)] mod tests { use crate::routing::gossip::{NetworkGraph, P2PGossipSync, NodeId, EffectiveCapacity}; + use crate::routing::utxo::UtxoResult; use crate::routing::router::{get_route, build_route_from_hops_internal, add_random_cltv_offset, default_node_features, PaymentParameters, Route, RouteHint, RouteHintHop, RouteHop, RoutingFees, DEFAULT_MAX_TOTAL_CLTV_EXPIRY_DELTA, MAX_PATH_LENGTH_ESTIMATE}; @@ -3526,7 +3527,8 @@ mod tests { .push_opcode(opcodes::all::OP_PUSHNUM_2) .push_opcode(opcodes::all::OP_CHECKMULTISIG).into_script().to_v0_p2wsh(); - *chain_monitor.utxo_ret.lock().unwrap() = Ok(TxOut { value: 15, script_pubkey: good_script.clone() }); + *chain_monitor.utxo_ret.lock().unwrap() = + UtxoResult::Sync(Ok(TxOut { value: 15, script_pubkey: good_script.clone() })); gossip_sync.add_utxo_lookup(Some(chain_monitor)); add_channel(&gossip_sync, &secp_ctx, &privkeys[0], &privkeys[2], ChannelFeatures::from_le_bytes(id_to_feature_flags(3)), 333); diff --git a/lightning/src/routing/utxo.rs b/lightning/src/routing/utxo.rs index da9c65c3b12..5a129f88da4 100644 --- a/lightning/src/routing/utxo.rs +++ b/lightning/src/routing/utxo.rs @@ -42,6 +42,7 @@ pub enum UtxoLookupError { /// The result of a [`UtxoLookup::get_utxo`] call. A call may resolve either synchronously, /// returning the `Sync` variant, or asynchronously, returning an [`UtxoFuture`] in the `Async` /// variant. +#[derive(Clone)] pub enum UtxoResult { /// A result which was resolved synchronously. It either includes a [`TxOut`] for the output /// requested or a [`UtxoLookupError`]. diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index b47aef6f7e8..83647a73385 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -841,7 +841,8 @@ impl core::fmt::Debug for OnGetShutdownScriptpubkey { pub struct TestChainSource { pub genesis_hash: BlockHash, - pub utxo_ret: Mutex>, + pub utxo_ret: Mutex, + pub get_utxo_call_count: AtomicUsize, pub watched_txn: Mutex>, pub watched_outputs: Mutex>, } @@ -851,7 +852,8 @@ impl TestChainSource { let script_pubkey = Builder::new().push_opcode(opcodes::OP_TRUE).into_script(); Self { genesis_hash: genesis_block(network).block_hash(), - utxo_ret: Mutex::new(Ok(TxOut { value: u64::max_value(), script_pubkey })), + utxo_ret: Mutex::new(UtxoResult::Sync(Ok(TxOut { value: u64::max_value(), script_pubkey }))), + get_utxo_call_count: AtomicUsize::new(0), watched_txn: Mutex::new(HashSet::new()), watched_outputs: Mutex::new(HashSet::new()), } @@ -860,11 +862,12 @@ impl TestChainSource { impl UtxoLookup for TestChainSource { fn get_utxo(&self, genesis_hash: &BlockHash, _short_channel_id: u64) -> UtxoResult { + self.get_utxo_call_count.fetch_add(1, Ordering::Relaxed); if self.genesis_hash != *genesis_hash { return UtxoResult::Sync(Err(UtxoLookupError::UnknownChain)); } - UtxoResult::Sync(self.utxo_ret.lock().unwrap().clone()) + self.utxo_ret.lock().unwrap().clone() } } From 15d54ccd9c0bd0b83dd12ca31944802b0d57c317 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 23 Jan 2023 04:59:13 +0000 Subject: [PATCH 15/16] Add tests for the new async gossip checking internal APIs --- lightning/src/routing/gossip.rs | 10 +- lightning/src/routing/utxo.rs | 313 +++++++++++++++++++++++++++++++- 2 files changed, 312 insertions(+), 11 deletions(-) diff --git a/lightning/src/routing/gossip.rs b/lightning/src/routing/gossip.rs index 0eeacb822c1..13c8f09a377 100644 --- a/lightning/src/routing/gossip.rs +++ b/lightning/src/routing/gossip.rs @@ -1921,7 +1921,7 @@ impl ReadOnlyNetworkGraph<'_> { } #[cfg(test)] -mod tests { +pub(crate) mod tests { use crate::ln::channelmanager; use crate::ln::chan_utils::make_funding_redeemscript; #[cfg(feature = "std")] @@ -1988,7 +1988,7 @@ mod tests { assert!(!gossip_sync.should_request_full_sync(&node_id)); } - fn get_signed_node_announcement(f: F, node_key: &SecretKey, secp_ctx: &Secp256k1) -> NodeAnnouncement { + pub(crate) fn get_signed_node_announcement(f: F, node_key: &SecretKey, secp_ctx: &Secp256k1) -> NodeAnnouncement { let node_id = NodeId::from_pubkey(&PublicKey::from_secret_key(&secp_ctx, node_key)); let mut unsigned_announcement = UnsignedNodeAnnouncement { features: channelmanager::provided_node_features(&UserConfig::default()), @@ -2008,7 +2008,7 @@ mod tests { } } - fn get_signed_channel_announcement(f: F, node_1_key: &SecretKey, node_2_key: &SecretKey, secp_ctx: &Secp256k1) -> ChannelAnnouncement { + pub(crate) fn get_signed_channel_announcement(f: F, node_1_key: &SecretKey, node_2_key: &SecretKey, secp_ctx: &Secp256k1) -> ChannelAnnouncement { let node_id_1 = PublicKey::from_secret_key(&secp_ctx, node_1_key); let node_id_2 = PublicKey::from_secret_key(&secp_ctx, node_2_key); let node_1_btckey = &SecretKey::from_slice(&[40; 32]).unwrap(); @@ -2035,14 +2035,14 @@ mod tests { } } - fn get_channel_script(secp_ctx: &Secp256k1) -> Script { + pub(crate) fn get_channel_script(secp_ctx: &Secp256k1) -> Script { let node_1_btckey = SecretKey::from_slice(&[40; 32]).unwrap(); let node_2_btckey = SecretKey::from_slice(&[39; 32]).unwrap(); make_funding_redeemscript(&PublicKey::from_secret_key(secp_ctx, &node_1_btckey), &PublicKey::from_secret_key(secp_ctx, &node_2_btckey)).to_v0_p2wsh() } - fn get_signed_channel_update(f: F, node_key: &SecretKey, secp_ctx: &Secp256k1) -> ChannelUpdate { + pub(crate) fn get_signed_channel_update(f: F, node_key: &SecretKey, secp_ctx: &Secp256k1) -> ChannelUpdate { let mut unsigned_channel_update = UnsignedChannelUpdate { chain_hash: genesis_block(Network::Testnet).header.block_hash(), short_channel_id: 0, diff --git a/lightning/src/routing/utxo.rs b/lightning/src/routing/utxo.rs index 5a129f88da4..020993f23fb 100644 --- a/lightning/src/routing/utxo.rs +++ b/lightning/src/routing/utxo.rs @@ -368,11 +368,6 @@ impl PendingChecks { if latest_announce.is_none() || latest_announce.as_ref().unwrap().timestamp() < msg.timestamp { - // If the messages we got has a higher timestamp, just blindly - // assume the signatures on the new message are correct and drop - // the old message. This may cause us to end up dropping valid - // `node_announcement`s if a peer is malicious, but we should get - // the correct ones when the node updates them. *latest_announce = Some( if let Some(msg) = full_msg { NodeAnnouncement::Full(msg.clone()) } else { NodeAnnouncement::Unsigned(msg.clone()) }); @@ -543,7 +538,7 @@ impl PendingChecks { let mut pending_checks = self.internal.lock().unwrap(); if pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS { // If we have many channel checks pending, ensure we don't have any dangling checks - // (i.e. checks where the user told us they'd call back but drop'd the `AccessFuture` + // (i.e. checks where the user told us they'd call back but drop'd the `UtxoFuture` // instead) before we commit to applying backpressure. pending_checks.channels.retain(|_, chan| { Weak::upgrade(&chan).is_some() @@ -558,3 +553,309 @@ impl PendingChecks { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::routing::gossip::tests::*; + use crate::util::test_utils::{TestChainSource, TestLogger}; + use crate::ln::msgs; + + use bitcoin::blockdata::constants::genesis_block; + use bitcoin::secp256k1::{Secp256k1, SecretKey}; + + use core::sync::atomic::Ordering; + + fn get_network() -> (TestChainSource, NetworkGraph>) { + let logger = Box::new(TestLogger::new()); + let genesis_hash = genesis_block(bitcoin::Network::Testnet).header.block_hash(); + let chain_source = TestChainSource::new(bitcoin::Network::Testnet); + let network_graph = NetworkGraph::new(genesis_hash, logger); + + (chain_source, network_graph) + } + + fn get_test_objects() -> (msgs::ChannelAnnouncement, TestChainSource, + NetworkGraph>, bitcoin::Script, msgs::NodeAnnouncement, + msgs::NodeAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate, msgs::ChannelUpdate) + { + let secp_ctx = Secp256k1::new(); + + let (chain_source, network_graph) = get_network(); + + let good_script = get_channel_script(&secp_ctx); + let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap(); + let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap(); + let valid_announcement = get_signed_channel_announcement(|_| {}, node_1_privkey, node_2_privkey, &secp_ctx); + + let node_a_announce = get_signed_node_announcement(|_| {}, node_1_privkey, &secp_ctx); + let node_b_announce = get_signed_node_announcement(|_| {}, node_2_privkey, &secp_ctx); + + // Note that we have to set the "direction" flag correctly on both messages + let chan_update_a = get_signed_channel_update(|msg| msg.flags = 0, node_1_privkey, &secp_ctx); + let chan_update_b = get_signed_channel_update(|msg| msg.flags = 1, node_2_privkey, &secp_ctx); + let chan_update_c = get_signed_channel_update(|msg| { + msg.flags = 1; msg.timestamp += 1; }, node_2_privkey, &secp_ctx); + + (valid_announcement, chain_source, network_graph, good_script, node_a_announce, + node_b_announce, chan_update_a, chan_update_b, chan_update_c) + } + + #[test] + fn test_fast_async_lookup() { + // Check that async lookups which resolve quicker than the future is returned to the + // `get_utxo` call can read it still resolve properly. + let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects(); + + let future = UtxoFuture::new(); + future.resolve_without_forwarding(&network_graph, + Ok(TxOut { value: 1_000_000, script_pubkey: good_script })); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap(); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_some()); + } + + #[test] + fn test_async_lookup() { + // Test a simple async lookup + let (valid_announcement, chain_source, network_graph, good_script, + node_a_announce, node_b_announce, ..) = get_test_objects(); + + let future = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + assert_eq!( + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel being checked async"); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + + future.resolve_without_forwarding(&network_graph, + Ok(TxOut { value: 0, script_pubkey: good_script })); + network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap(); + network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap(); + + assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1) + .unwrap().announcement_info.is_none()); + + network_graph.update_node_from_announcement(&node_a_announce).unwrap(); + network_graph.update_node_from_announcement(&node_b_announce).unwrap(); + + assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1) + .unwrap().announcement_info.is_some()); + } + + #[test] + fn test_invalid_async_lookup() { + // Test an async lookup which returns an incorrect script + let (valid_announcement, chain_source, network_graph, ..) = get_test_objects(); + + let future = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + assert_eq!( + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel being checked async"); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + + future.resolve_without_forwarding(&network_graph, + Ok(TxOut { value: 1_000_000, script_pubkey: bitcoin::Script::new() })); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + } + + #[test] + fn test_failing_async_lookup() { + // Test an async lookup which returns an error + let (valid_announcement, chain_source, network_graph, ..) = get_test_objects(); + + let future = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + assert_eq!( + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel being checked async"); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + + future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx)); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + } + + #[test] + fn test_updates_async_lookup() { + // Test async lookups will process pending channel_update/node_announcements once they + // complete. + let (valid_announcement, chain_source, network_graph, good_script, node_a_announce, + node_b_announce, chan_update_a, chan_update_b, ..) = get_test_objects(); + + let future = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + assert_eq!( + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel being checked async"); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + + assert_eq!( + network_graph.update_node_from_announcement(&node_a_announce).unwrap_err().err, + "Awaiting channel_announcement validation to accept node_announcement"); + assert_eq!( + network_graph.update_node_from_announcement(&node_b_announce).unwrap_err().err, + "Awaiting channel_announcement validation to accept node_announcement"); + + assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err, + "Awaiting channel_announcement validation to accept channel_update"); + assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err, + "Awaiting channel_announcement validation to accept channel_update"); + + future.resolve_without_forwarding(&network_graph, + Ok(TxOut { value: 1_000_000, script_pubkey: good_script })); + + assert!(network_graph.read_only().channels() + .get(&valid_announcement.contents.short_channel_id).unwrap().one_to_two.is_some()); + assert!(network_graph.read_only().channels() + .get(&valid_announcement.contents.short_channel_id).unwrap().two_to_one.is_some()); + + assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1) + .unwrap().announcement_info.is_some()); + assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_2) + .unwrap().announcement_info.is_some()); + } + + #[test] + fn test_latest_update_async_lookup() { + // Test async lookups will process the latest channel_update if two are received while + // awaiting an async UTXO lookup. + let (valid_announcement, chain_source, network_graph, good_script, _, + _, chan_update_a, chan_update_b, chan_update_c, ..) = get_test_objects(); + + let future = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + assert_eq!( + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel being checked async"); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + + assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err, + "Awaiting channel_announcement validation to accept channel_update"); + assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err, + "Awaiting channel_announcement validation to accept channel_update"); + assert_eq!(network_graph.update_channel(&chan_update_c).unwrap_err().err, + "Awaiting channel_announcement validation to accept channel_update"); + + future.resolve_without_forwarding(&network_graph, + Ok(TxOut { value: 1_000_000, script_pubkey: good_script })); + + assert_eq!(chan_update_a.contents.timestamp, chan_update_b.contents.timestamp); + assert!(network_graph.read_only().channels() + .get(&valid_announcement.contents.short_channel_id).as_ref().unwrap() + .one_to_two.as_ref().unwrap().last_update != + network_graph.read_only().channels() + .get(&valid_announcement.contents.short_channel_id).as_ref().unwrap() + .two_to_one.as_ref().unwrap().last_update); + } + + #[test] + fn test_no_double_lookups() { + // Test that a pending async lookup will prevent a second async lookup from flying, but + // only if the channel_announcement message is identical. + let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects(); + + let future = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + assert_eq!( + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel being checked async"); + assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1); + + // If we make a second request with the same message, the call count doesn't increase... + let future_b = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future_b.clone()); + assert_eq!( + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel announcement is already being checked"); + assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1); + + // But if we make a third request with a tweaked message, we should get a second call + // against our new future... + let secp_ctx = Secp256k1::new(); + let replacement_pk_1 = &SecretKey::from_slice(&[99; 32]).unwrap(); + let replacement_pk_2 = &SecretKey::from_slice(&[98; 32]).unwrap(); + let invalid_announcement = get_signed_channel_announcement(|_| {}, replacement_pk_1, replacement_pk_2, &secp_ctx); + assert_eq!( + network_graph.update_channel_from_announcement(&invalid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel being checked async"); + assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 2); + + // Still, if we resolve the original future, the original channel will be accepted. + future.resolve_without_forwarding(&network_graph, + Ok(TxOut { value: 1_000_000, script_pubkey: good_script })); + assert!(!network_graph.read_only().channels() + .get(&valid_announcement.contents.short_channel_id).unwrap() + .announcement_message.as_ref().unwrap() + .contents.features.supports_unknown_test_feature()); + } + + #[test] + fn test_checks_backpressure() { + // Test that too_many_checks_pending returns true when there are many checks pending, and + // returns false once they complete. + let secp_ctx = Secp256k1::new(); + let (chain_source, network_graph) = get_network(); + + // We cheat and use a single future for all the lookups to complete them all at once. + let future = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap(); + let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap(); + + for i in 0..PendingChecks::MAX_PENDING_LOOKUPS { + let valid_announcement = get_signed_channel_announcement( + |msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx); + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err(); + assert!(!network_graph.pending_checks.too_many_checks_pending()); + } + + let valid_announcement = get_signed_channel_announcement( + |_| {}, node_1_privkey, node_2_privkey, &secp_ctx); + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err(); + assert!(network_graph.pending_checks.too_many_checks_pending()); + + // Once the future completes the "too many checks" flag should reset. + future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx)); + assert!(!network_graph.pending_checks.too_many_checks_pending()); + } + + #[test] + fn test_checks_backpressure_drop() { + // Test that too_many_checks_pending returns true when there are many checks pending, and + // returns false if we drop some of the futures without completion. + let secp_ctx = Secp256k1::new(); + let (chain_source, network_graph) = get_network(); + + // We cheat and use a single future for all the lookups to complete them all at once. + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(UtxoFuture::new()); + + let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap(); + let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap(); + + for i in 0..PendingChecks::MAX_PENDING_LOOKUPS { + let valid_announcement = get_signed_channel_announcement( + |msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx); + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err(); + assert!(!network_graph.pending_checks.too_many_checks_pending()); + } + + let valid_announcement = get_signed_channel_announcement( + |_| {}, node_1_privkey, node_2_privkey, &secp_ctx); + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err(); + assert!(network_graph.pending_checks.too_many_checks_pending()); + + // Once the future is drop'd (by resetting the `utxo_ret` value) the "too many checks" flag + // should reset to false. + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)); + assert!(!network_graph.pending_checks.too_many_checks_pending()); + } +} From 1f0557522a66cdd46765def9b1df52f28daf9af1 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 26 Jan 2023 03:04:14 +0000 Subject: [PATCH 16/16] Move the channel_announcement process log into `NetworkGraph` This ensures its always written after we update the graph, no matter how we updated the graph. --- lightning/src/routing/gossip.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lightning/src/routing/gossip.rs b/lightning/src/routing/gossip.rs index 13c8f09a377..a499532e6a9 100644 --- a/lightning/src/routing/gossip.rs +++ b/lightning/src/routing/gossip.rs @@ -381,7 +381,6 @@ where U::Target: UtxoLookup, L::Target: Logger fn handle_channel_announcement(&self, msg: &msgs::ChannelAnnouncement) -> Result { self.network_graph.update_channel_from_announcement(msg, &self.utxo_lookup)?; - log_gossip!(self.logger, "Added channel_announcement for {}{}", msg.contents.short_channel_id, if !msg.contents.excess_data.is_empty() { " with excess uninterpreted data!" } else { "" }); Ok(msg.contents.excess_data.len() <= MAX_EXCESS_BYTES_FOR_RELAY) } @@ -1559,7 +1558,10 @@ impl NetworkGraph where L::Target: Logger { announcement_received_time, }; - self.add_channel_between_nodes(msg.short_channel_id, chan_info, utxo_value) + self.add_channel_between_nodes(msg.short_channel_id, chan_info, utxo_value)?; + + log_gossip!(self.logger, "Added channel_announcement for {}{}", msg.short_channel_id, if !msg.excess_data.is_empty() { " with excess uninterpreted data!" } else { "" }); + Ok(()) } /// Marks a channel in the graph as failed if a corresponding HTLC fail was sent.