diff --git a/ARCH.md b/ARCH.md index 9efccd9c9dd..bd5fc1bd316 100644 --- a/ARCH.md +++ b/ARCH.md @@ -51,9 +51,9 @@ At a high level, some of the common interfaces fit together as follows: --------------- / (as EventsProvider) ^ | | | PeerManager |- \ | | | --------------- \ | (is-a) | | - | ----------------- \ _---------------- / / - | | chain::Access | \ / | ChainMonitor |--------------- - | ----------------- \ / ---------------- + | -------------- \ _---------------- / / + | | UtxoLookup | \ / | ChainMonitor |--------------- + | -------------- \ / ---------------- | ^ \ / | (as RoutingMessageHandler) | v v \ ----------------- --------- ----------------- diff --git a/fuzz/src/full_stack.rs b/fuzz/src/full_stack.rs index 145b7c8350d..57c665a2ef1 100644 --- a/fuzz/src/full_stack.rs +++ b/fuzz/src/full_stack.rs @@ -41,6 +41,7 @@ use lightning::ln::peer_handler::{MessageHandler,PeerManager,SocketDescriptor,Ig use lightning::ln::msgs::{self, DecodeError}; use lightning::ln::script::ShutdownScript; use lightning::routing::gossip::{P2PGossipSync, NetworkGraph}; +use lightning::routing::utxo::UtxoLookup; use lightning::routing::router::{find_route, InFlightHtlcs, PaymentParameters, Route, RouteHop, RouteParameters, Router}; use lightning::routing::scoring::FixedPenaltyScorer; use lightning::util::config::UserConfig; @@ -183,7 +184,7 @@ impl<'a> std::hash::Hash for Peer<'a> { type ChannelMan<'a> = ChannelManager< Arc, Arc, Arc, Arc, Arc>>, Arc, Arc, Arc, Arc, Arc, &'a FuzzRouter, Arc>; -type PeerMan<'a> = PeerManager, Arc>, Arc>>, Arc, Arc>>, IgnoringMessageHandler, Arc, IgnoringMessageHandler, Arc>; +type PeerMan<'a> = PeerManager, Arc>, Arc>>, Arc, Arc>>, IgnoringMessageHandler, Arc, IgnoringMessageHandler, Arc>; struct MoneyLossDetector<'a> { manager: Arc>, diff --git a/fuzz/src/router.rs b/fuzz/src/router.rs index dbf619f1ddf..a7c50de4a47 100644 --- a/fuzz/src/router.rs +++ b/fuzz/src/router.rs @@ -11,11 +11,11 @@ use bitcoin::blockdata::script::Builder; use bitcoin::blockdata::transaction::TxOut; use bitcoin::hash_types::BlockHash; -use lightning::chain; use lightning::chain::transaction::OutPoint; use lightning::ln::channelmanager::{self, ChannelDetails, ChannelCounterparty}; use lightning::ln::msgs; use lightning::routing::gossip::{NetworkGraph, RoutingFees}; +use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoLookupError, UtxoResult}; use lightning::routing::router::{find_route, PaymentParameters, RouteHint, RouteHintHop, RouteParameters}; use lightning::routing::scoring::FixedPenaltyScorer; use lightning::util::config::UserConfig; @@ -81,17 +81,36 @@ impl InputData { } } -struct FuzzChainSource { +struct FuzzChainSource<'a, 'b, Out: test_logger::Output> { input: Arc, + net_graph: &'a NetworkGraph<&'b test_logger::TestLogger>, } -impl chain::Access for FuzzChainSource { - fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> Result { - match self.input.get_slice(2) { - Some(&[0, _]) => Err(chain::AccessError::UnknownChain), - Some(&[1, _]) => Err(chain::AccessError::UnknownTx), - Some(&[_, x]) => Ok(TxOut { value: 0, script_pubkey: Builder::new().push_int(x as i64).into_script().to_v0_p2wsh() }), - None => Err(chain::AccessError::UnknownTx), - _ => unreachable!(), +impl UtxoLookup for FuzzChainSource<'_, '_, Out> { + fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> UtxoResult { + let input_slice = self.input.get_slice(2); + if input_slice.is_none() { return UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)); } + let input_slice = input_slice.unwrap(); + let txo_res = TxOut { + value: if input_slice[0] % 2 == 0 { 1_000_000 } else { 1_000 }, + script_pubkey: Builder::new().push_int(input_slice[1] as i64).into_script().to_v0_p2wsh(), + }; + match input_slice { + &[0, _] => UtxoResult::Sync(Err(UtxoLookupError::UnknownChain)), + &[1, _] => UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)), + &[2, _] => { + let future = UtxoFuture::new(); + future.resolve_without_forwarding(self.net_graph, Ok(txo_res)); + UtxoResult::Async(future.clone()) + }, + &[3, _] => { + let future = UtxoFuture::new(); + future.resolve_without_forwarding(self.net_graph, Err(UtxoLookupError::UnknownTx)); + UtxoResult::Async(future.clone()) + }, + &[4, _] => { + UtxoResult::Async(UtxoFuture::new()) // the future will never resolve + }, + &[..] => UtxoResult::Sync(Ok(txo_res)), } } } @@ -171,6 +190,10 @@ pub fn do_test(data: &[u8], out: Out) { let our_pubkey = get_pubkey!(); let net_graph = NetworkGraph::new(genesis_block(Network::Bitcoin).header.block_hash(), &logger); + let chain_source = FuzzChainSource { + input: Arc::clone(&input), + net_graph: &net_graph, + }; let mut node_pks = HashSet::new(); let mut scid = 42; @@ -191,13 +214,14 @@ pub fn do_test(data: &[u8], out: Out) { let msg = decode_msg_with_len16!(msgs::UnsignedChannelAnnouncement, 32+8+33*4); node_pks.insert(get_pubkey_from_node_id!(msg.node_id_1)); node_pks.insert(get_pubkey_from_node_id!(msg.node_id_2)); - let _ = net_graph.update_channel_from_unsigned_announcement::<&FuzzChainSource>(&msg, &None); + let _ = net_graph.update_channel_from_unsigned_announcement:: + <&FuzzChainSource<'_, '_, Out>>(&msg, &None); }, 2 => { let msg = decode_msg_with_len16!(msgs::UnsignedChannelAnnouncement, 32+8+33*4); node_pks.insert(get_pubkey_from_node_id!(msg.node_id_1)); node_pks.insert(get_pubkey_from_node_id!(msg.node_id_2)); - let _ = net_graph.update_channel_from_unsigned_announcement(&msg, &Some(&FuzzChainSource { input: Arc::clone(&input) })); + let _ = net_graph.update_channel_from_unsigned_announcement(&msg, &Some(&chain_source)); }, 3 => { let _ = net_graph.update_channel_unsigned(&decode_msg!(msgs::UnsignedChannelUpdate, 72)); diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 087aa174722..9b1df752271 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -30,6 +30,7 @@ use lightning::ln::channelmanager::ChannelManager; use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler}; use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor}; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; +use lightning::routing::utxo::UtxoLookup; use lightning::routing::router::Router; use lightning::routing::scoring::{Score, WriteableScore}; use lightning::util::events::{Event, EventHandler, EventsProvider}; @@ -116,13 +117,13 @@ const FIRST_NETWORK_PRUNE_TIMER: u64 = 1; /// Either [`P2PGossipSync`] or [`RapidGossipSync`]. pub enum GossipSync< - P: Deref>, + P: Deref>, R: Deref>, G: Deref>, - A: Deref, + U: Deref, L: Deref, > -where A::Target: chain::Access, L::Target: Logger { +where U::Target: UtxoLookup, L::Target: Logger { /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7. P2P(P), /// Rapid gossip sync from a trusted server. @@ -132,13 +133,13 @@ where A::Target: chain::Access, L::Target: Logger { } impl< - P: Deref>, + P: Deref>, R: Deref>, G: Deref>, - A: Deref, + U: Deref, L: Deref, -> GossipSync -where A::Target: chain::Access, L::Target: Logger { +> GossipSync +where U::Target: UtxoLookup, L::Target: Logger { fn network_graph(&self) -> Option<&G> { match self { GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()), @@ -163,10 +164,10 @@ where A::Target: chain::Access, L::Target: Logger { } /// (C-not exported) as the bindings concretize everything and have constructors for us -impl>, G: Deref>, A: Deref, L: Deref> - GossipSync, G, A, L> +impl>, G: Deref>, U: Deref, L: Deref> + GossipSync, G, U, L> where - A::Target: chain::Access, + U::Target: UtxoLookup, L::Target: Logger, { /// Initializes a new [`GossipSync::P2P`] variant. @@ -178,10 +179,10 @@ where /// (C-not exported) as the bindings concretize everything and have constructors for us impl<'a, R: Deref>, G: Deref>, L: Deref> GossipSync< - &P2PGossipSync, + &P2PGossipSync, R, G, - &'a (dyn chain::Access + Send + Sync), + &'a (dyn UtxoLookup + Send + Sync), L, > where @@ -196,10 +197,10 @@ where /// (C-not exported) as the bindings concretize everything and have constructors for us impl<'a, L: Deref> GossipSync< - &P2PGossipSync<&'a NetworkGraph, &'a (dyn chain::Access + Send + Sync), L>, + &P2PGossipSync<&'a NetworkGraph, &'a (dyn UtxoLookup + Send + Sync), L>, &RapidGossipSync<&'a NetworkGraph, L>, &'a NetworkGraph, - &'a (dyn chain::Access + Send + Sync), + &'a (dyn UtxoLookup + Send + Sync), L, > where @@ -397,7 +398,7 @@ macro_rules! define_run_body { #[cfg(feature = "futures")] pub async fn process_events_async< 'a, - CA: 'static + Deref + Send + Sync, + UL: 'static + Deref + Send + Sync, CF: 'static + Deref + Send + Sync, CW: 'static + Deref + Send + Sync, T: 'static + Deref + Send + Sync, @@ -418,7 +419,7 @@ pub async fn process_events_async< PS: 'static + Deref + Send, M: 'static + Deref::Signer, CF, T, F, L, P>> + Send + Sync, CM: 'static + Deref> + Send + Sync, - PGS: 'static + Deref> + Send + Sync, + PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, UMH: 'static + Deref + Send + Sync, PM: 'static + Deref> + Send + Sync, @@ -428,11 +429,11 @@ pub async fn process_events_async< Sleeper: Fn(Duration) -> SleepFuture >( persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM, - gossip_sync: GossipSync, peer_manager: PM, logger: L, scorer: Option, + gossip_sync: GossipSync, peer_manager: PM, logger: L, scorer: Option, sleeper: Sleeper, ) -> Result<(), io::Error> where - CA::Target: 'static + chain::Access, + UL::Target: 'static + UtxoLookup, CF::Target: 'static + chain::Filter, CW::Target: 'static + chain::Watch<::Signer>, T::Target: 'static + BroadcasterInterface, @@ -531,7 +532,7 @@ impl BackgroundProcessor { /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable pub fn start< 'a, - CA: 'static + Deref + Send + Sync, + UL: 'static + Deref + Send + Sync, CF: 'static + Deref + Send + Sync, CW: 'static + Deref + Send + Sync, T: 'static + Deref + Send + Sync, @@ -551,7 +552,7 @@ impl BackgroundProcessor { PS: 'static + Deref + Send, M: 'static + Deref::Signer, CF, T, F, L, P>> + Send + Sync, CM: 'static + Deref> + Send + Sync, - PGS: 'static + Deref> + Send + Sync, + PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, UMH: 'static + Deref + Send + Sync, PM: 'static + Deref> + Send + Sync, @@ -559,10 +560,10 @@ impl BackgroundProcessor { SC: for <'b> WriteableScore<'b>, >( persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM, - gossip_sync: GossipSync, peer_manager: PM, logger: L, scorer: Option, + gossip_sync: GossipSync, peer_manager: PM, logger: L, scorer: Option, ) -> Self where - CA::Target: 'static + chain::Access, + UL::Target: 'static + UtxoLookup, CF::Target: 'static + chain::Filter, CW::Target: 'static + chain::Watch<::Signer>, T::Target: 'static + BroadcasterInterface, diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 16680faaa58..b259f77eff8 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -33,12 +33,12 @@ //! type FeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync; //! type Logger = dyn lightning::util::logger::Logger + Send + Sync; //! type NodeSigner = dyn lightning::chain::keysinterface::NodeSigner + Send + Sync; -//! type ChainAccess = dyn lightning::chain::Access + Send + Sync; +//! type UtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync; //! type ChainFilter = dyn lightning::chain::Filter + Send + Sync; //! type DataPersister = dyn lightning::chain::chainmonitor::Persist + Send + Sync; //! type ChainMonitor = lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; //! type ChannelManager = Arc>; -//! type PeerManager = Arc>; +//! type PeerManager = Arc>; //! //! // Connect to node with pubkey their_node_id at addr: //! async fn connect_to_node(peer_manager: PeerManager, chain_monitor: Arc, channel_manager: ChannelManager, their_node_id: PublicKey, addr: SocketAddr) { @@ -176,8 +176,9 @@ impl Connection { let (event_waker, event_receiver) = mpsc::channel(1); tokio::spawn(Self::poll_event_process(peer_manager.clone(), event_receiver)); - // 8KB is nice and big but also should never cause any issues with stack overflowing. - let mut buf = [0; 8192]; + // 4KiB is nice and big without handling too many messages all at once, giving other peers + // a chance to do some work. + let mut buf = [0; 4096]; let mut our_descriptor = SocketDescriptor::new(us.clone()); // An enum describing why we did/are disconnecting: @@ -623,6 +624,7 @@ mod tests { fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) } fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() } fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { InitFeatures::empty() } + fn processing_queue_high(&self) -> bool { false } } impl ChannelMessageHandler for MsgHandler { fn handle_open_channel(&self, _their_node_id: &PublicKey, _msg: &OpenChannel) {} diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index 0370c0840f9..cd11bc337ef 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -12,7 +12,6 @@ use bitcoin::blockdata::block::{Block, BlockHeader}; use bitcoin::blockdata::constants::genesis_block; use bitcoin::blockdata::script::Script; -use bitcoin::blockdata::transaction::TxOut; use bitcoin::hash_types::{BlockHash, Txid}; use bitcoin::network::constants::Network; use bitcoin::secp256k1::PublicKey; @@ -60,26 +59,6 @@ impl BestBlock { pub fn height(&self) -> u32 { self.height } } -/// An error when accessing the chain via [`Access`]. -#[derive(Clone, Debug)] -pub enum AccessError { - /// The requested chain is unknown. - UnknownChain, - - /// The requested transaction doesn't exist or hasn't confirmed. - UnknownTx, -} - -/// The `Access` trait defines behavior for accessing chain data and state, such as blocks and -/// UTXOs. -pub trait Access { - /// Returns the transaction output of a funding transaction encoded by [`short_channel_id`]. - /// Returns an error if `genesis_hash` is for a different chain or if such a transaction output - /// is unknown. - /// - /// [`short_channel_id`]: https://github.com/lightning/bolts/blob/master/07-routing-gossip.md#definition-of-short_channel_id - fn get_utxo(&self, genesis_hash: &BlockHash, short_channel_id: u64) -> Result; -} /// The `Listen` trait is used to notify when blocks have been connected or disconnected from the /// chain. diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 59d8e387529..513dc897e41 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -5044,7 +5044,7 @@ where ), chan), // Note that announcement_signatures fails if the channel cannot be announced, // so get_channel_update_for_broadcast will never fail by the time we get here. - update_msg: self.get_channel_update_for_broadcast(chan.get()).unwrap(), + update_msg: Some(self.get_channel_update_for_broadcast(chan.get()).unwrap()), }); }, hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id)) @@ -5970,7 +5970,7 @@ where msg: announcement, // Note that announcement_signatures fails if the channel cannot be announced, // so get_channel_update_for_broadcast will never fail by the time we get here. - update_msg: self.get_channel_update_for_broadcast(channel).unwrap(), + update_msg: Some(self.get_channel_update_for_broadcast(channel).unwrap()), }); } } @@ -6286,6 +6286,7 @@ where &events::MessageSendEvent::SendChannelAnnouncement { .. } => false, &events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true, &events::MessageSendEvent::BroadcastChannelUpdate { .. } => true, + &events::MessageSendEvent::BroadcastNodeAnnouncement { .. } => true, &events::MessageSendEvent::SendChannelUpdate { .. } => false, &events::MessageSendEvent::HandleError { .. } => false, &events::MessageSendEvent::SendChannelRangeQuery { .. } => false, diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index c2ce158f897..7bf51df5c7a 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -621,6 +621,9 @@ pub fn remove_first_msg_event_to_node(msg_node_id: &PublicKey, msg_events: &Vec< MessageSendEvent::BroadcastChannelUpdate { .. } => { false }, + MessageSendEvent::BroadcastNodeAnnouncement { .. } => { + false + }, MessageSendEvent::SendChannelUpdate { node_id, .. } => { node_id == msg_node_id }, @@ -1010,7 +1013,7 @@ pub fn create_chan_between_nodes_with_value_b<'a, 'b, 'c>(node_a: &Node<'a, 'b, assert_eq!(events_7.len(), 1); let (announcement, bs_update) = match events_7[0] { MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { - (msg, update_msg) + (msg, update_msg.clone().unwrap()) }, _ => panic!("Unexpected event"), }; @@ -1021,6 +1024,7 @@ pub fn create_chan_between_nodes_with_value_b<'a, 'b, 'c>(node_a: &Node<'a, 'b, let as_update = match events_8[0] { MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { assert!(*announcement == *msg); + let update_msg = update_msg.clone().unwrap(); assert_eq!(update_msg.contents.short_channel_id, announcement.contents.short_channel_id); assert_eq!(update_msg.contents.short_channel_id, bs_update.contents.short_channel_id); update_msg @@ -1031,7 +1035,7 @@ pub fn create_chan_between_nodes_with_value_b<'a, 'b, 'c>(node_a: &Node<'a, 'b, *node_a.network_chan_count.borrow_mut() += 1; expect_channel_ready_event(&node_b, &node_a.node.get_our_node_id()); - ((*announcement).clone(), (*as_update).clone(), (*bs_update).clone()) + ((*announcement).clone(), as_update, bs_update) } pub fn create_announced_chan_between_nodes<'a, 'b, 'c, 'd>(nodes: &'a Vec>, a: usize, b: usize) -> (msgs::ChannelUpdate, msgs::ChannelUpdate, [u8; 32], Transaction) { diff --git a/lightning/src/ln/msgs.rs b/lightning/src/ln/msgs.rs index c971df906c2..e8b40c71d89 100644 --- a/lightning/src/ln/msgs.rs +++ b/lightning/src/ln/msgs.rs @@ -1082,6 +1082,13 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider { /// list of `short_channel_id`s. fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError>; + // Handler queueing status: + /// Indicates that there are a large number of [`ChannelAnnouncement`] (or other) messages + /// pending some async action. While there is no guarantee of the rate of future messages, the + /// caller should seek to reduce the rate of new gossip messages handled, especially + /// [`ChannelAnnouncement`]s. + fn processing_queue_high(&self) -> bool; + // Handler information: /// Gets the node feature flags which this handler itself supports. All available handlers are /// queried similarly and their feature flags are OR'd together to form the [`NodeFeatures`] diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 3e05c8ef300..f2406172655 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -81,6 +81,7 @@ impl RoutingMessageHandler for IgnoringMessageHandler { fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { InitFeatures::empty() } + fn processing_queue_high(&self) -> bool { false } } impl OnionMessageProvider for IgnoringMessageHandler { fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option { None } @@ -412,6 +413,12 @@ struct Peer { awaiting_pong_timer_tick_intervals: i8, received_message_since_timer_tick: bool, sent_gossip_timestamp_filter: bool, + + /// Indicates we've received a `channel_announcement` since the last time we had + /// [`PeerManager::gossip_processing_backlogged`] set (or, really, that we've received a + /// `channel_announcement` at all - we set this unconditionally but unset it every time we + /// check if we're gossip-processing-backlogged). + received_channel_announce_since_backlogged: bool, } impl Peer { @@ -448,8 +455,12 @@ impl Peer { /// Returns whether we should be reading bytes from this peer, based on whether its outbound /// buffer still has space and we don't need to pause reads to get some writes out. - fn should_read(&self) -> bool { - self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE + fn should_read(&mut self, gossip_processing_backlogged: bool) -> bool { + if !gossip_processing_backlogged { + self.received_channel_announce_since_backlogged = false; + } + self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && + (!gossip_processing_backlogged || !self.received_channel_announce_since_backlogged) } /// Determines if we should push additional gossip background sync (aka "backfill") onto a peer's @@ -562,6 +573,9 @@ pub struct PeerManager bool { + peer.should_read(self.gossip_processing_backlogged.load(Ordering::Relaxed)) + } + + fn update_gossip_backlogged(&self) { + let new_state = self.message_handler.route_handler.processing_queue_high(); + let prev_state = self.gossip_processing_backlogged.swap(new_state, Ordering::Relaxed); + if prev_state && !new_state { + self.gossip_processing_backlog_lifted.store(true, Ordering::Relaxed); + } + } + + fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer, force_one_write: bool) { + let mut have_written = false; while !peer.awaiting_write_event { if peer.should_buffer_onion_message() { if let Some(peer_node_id) = peer.their_node_id { @@ -903,13 +936,23 @@ impl return, + None => { + if force_one_write && !have_written { + if should_read { + let data_sent = descriptor.send_data(&[], should_read); + debug_assert_eq!(data_sent, 0, "Can't write more than no data"); + } + } + return + }, Some(buff) => buff, }; let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..]; - let data_sent = descriptor.send_data(pending, peer.should_read()); + let data_sent = descriptor.send_data(pending, should_read); + have_written = true; peer.pending_outbound_buffer_first_msg_offset += data_sent; if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() { peer.pending_outbound_buffer_first_msg_offset = 0; @@ -944,7 +987,7 @@ impl { let mut peer = peer_mutex.lock().unwrap(); peer.awaiting_write_event = false; - self.do_attempt_write_data(descriptor, &mut peer); + self.do_attempt_write_data(descriptor, &mut peer, false); } }; Ok(()) @@ -962,6 +1005,9 @@ impl Result { @@ -1191,7 +1237,7 @@ impl MessageHandlingError { e.into() })? { should_forward = Some(wire::Message::ChannelAnnouncement(msg)); } + self.update_gossip_backlogged(); }, wire::Message::NodeAnnouncement(msg) => { if self.message_handler.route_handler.handle_node_announcement(&msg) .map_err(|e| -> MessageHandlingError { e.into() })? { should_forward = Some(wire::Message::NodeAnnouncement(msg)); } + self.update_gossip_backlogged(); }, wire::Message::ChannelUpdate(msg) => { self.message_handler.chan_handler.handle_channel_update(&their_node_id, &msg); @@ -1416,6 +1468,7 @@ impl MessageHandlingError { e.into() })? { should_forward = Some(wire::Message::ChannelUpdate(msg)); } + self.update_gossip_backlogged(); }, wire::Message::QueryShortChannelIds(msg) => { self.message_handler.route_handler.handle_query_short_channel_ids(&their_node_id, msg)?; @@ -1567,6 +1620,9 @@ impl {}, } - match self.message_handler.route_handler.handle_channel_update(&update_msg) { - Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => - self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(update_msg), None), - _ => {}, + if let Some(msg) = update_msg { + match self.message_handler.route_handler.handle_channel_update(&msg) { + Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => + self.forward_broadcast_msg(peers, &wire::Message::ChannelUpdate(msg), None), + _ => {}, + } } }, MessageSendEvent::BroadcastChannelUpdate { msg } => { @@ -1725,6 +1783,14 @@ impl {}, } }, + MessageSendEvent::BroadcastNodeAnnouncement { msg } => { + log_debug!(self.logger, "Handling BroadcastNodeAnnouncement event in peer_handler for node {}", msg.contents.node_id); + match self.message_handler.route_handler.handle_node_announcement(&msg) { + Ok(_) | Err(LightningError { action: msgs::ErrorAction::IgnoreDuplicateGossip, .. }) => + self.forward_broadcast_msg(peers, &wire::Message::NodeAnnouncement(msg), None), + _ => {}, + } + }, MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => { log_trace!(self.logger, "Handling SendChannelUpdate event in peer_handler for node {} for channel {}", log_pubkey!(node_id), msg.contents.short_channel_id); @@ -1786,7 +1852,9 @@ impl 0 && !peer.received_message_since_timer_tick) + || peer.awaiting_pong_timer_tick_intervals as u64 > + MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.len() as u64 + { + descriptors_needing_disconnect.push(descriptor.clone()); + break; + } peer.received_message_since_timer_tick = false; - continue; - } - if (peer.awaiting_pong_timer_tick_intervals > 0 && !peer.received_message_since_timer_tick) - || peer.awaiting_pong_timer_tick_intervals as u64 > - MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock.len() as u64 - { - descriptors_needing_disconnect.push(descriptor.clone()); - continue; - } - peer.received_message_since_timer_tick = false; + if peer.awaiting_pong_timer_tick_intervals > 0 { + peer.awaiting_pong_timer_tick_intervals += 1; + break; + } - if peer.awaiting_pong_timer_tick_intervals > 0 { - peer.awaiting_pong_timer_tick_intervals += 1; - continue; + peer.awaiting_pong_timer_tick_intervals = 1; + let ping = msgs::Ping { + ponglen: 0, + byteslen: 64, + }; + self.enqueue_message(&mut *peer, &ping); + break; } - - peer.awaiting_pong_timer_tick_intervals = 1; - let ping = msgs::Ping { - ponglen: 0, - byteslen: 64, - }; - self.enqueue_message(&mut *peer, &ping); - self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer); + self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer, flush_read_disabled); } } diff --git a/lightning/src/ln/priv_short_conf_tests.rs b/lightning/src/ln/priv_short_conf_tests.rs index a6fd7c1dc09..adc2b59abbf 100644 --- a/lightning/src/ln/priv_short_conf_tests.rs +++ b/lightning/src/ln/priv_short_conf_tests.rs @@ -184,14 +184,14 @@ fn do_test_1_conf_open(connect_style: ConnectStyle) { msg.clone() } else { panic!("Unexpected event"); }; let (bs_announcement, bs_update) = if let MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } = bs_announce_events[1] { - (msg.clone(), update_msg.clone()) + (msg.clone(), update_msg.clone().unwrap()) } else { panic!("Unexpected event"); }; nodes[0].node.handle_announcement_signatures(&nodes[1].node.get_our_node_id(), &bs_announcement_sigs); let as_announce_events = nodes[0].node.get_and_clear_pending_msg_events(); assert_eq!(as_announce_events.len(), 1); let (announcement, as_update) = if let MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } = as_announce_events[0] { - (msg.clone(), update_msg.clone()) + (msg.clone(), update_msg.clone().unwrap()) } else { panic!("Unexpected event"); }; assert_eq!(announcement, bs_announcement); @@ -757,7 +757,7 @@ fn test_public_0conf_channel() { match bs_announcement[0] { MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { announcement = msg.clone(); - bs_update = update_msg.clone(); + bs_update = update_msg.clone().unwrap(); }, _ => panic!("Unexpected event"), }; @@ -767,6 +767,7 @@ fn test_public_0conf_channel() { match as_announcement[0] { MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { assert!(announcement == *msg); + let update_msg = update_msg.as_ref().unwrap(); assert_eq!(update_msg.contents.short_channel_id, scid); assert_eq!(update_msg.contents.short_channel_id, announcement.contents.short_channel_id); assert_eq!(update_msg.contents.short_channel_id, bs_update.contents.short_channel_id); diff --git a/lightning/src/ln/reload_tests.rs b/lightning/src/ln/reload_tests.rs index 2ca3f21fe94..427c4001b6d 100644 --- a/lightning/src/ln/reload_tests.rs +++ b/lightning/src/ln/reload_tests.rs @@ -140,7 +140,7 @@ fn test_funding_peer_disconnect() { assert_eq!(events_7.len(), 1); let (chan_announcement, as_update) = match events_7[0] { MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { - (msg.clone(), update_msg.clone()) + (msg.clone(), update_msg.clone().unwrap()) }, _ => panic!("Unexpected event {:?}", events_7[0]), }; @@ -153,7 +153,7 @@ fn test_funding_peer_disconnect() { let bs_update = match events_8[0] { MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { assert_eq!(*msg, chan_announcement); - update_msg.clone() + update_msg.clone().unwrap() }, _ => panic!("Unexpected event {:?}", events_8[0]), }; diff --git a/lightning/src/routing/gossip.rs b/lightning/src/routing/gossip.rs index a3331ab3beb..a499532e6a9 100644 --- a/lightning/src/routing/gossip.rs +++ b/lightning/src/routing/gossip.rs @@ -7,7 +7,7 @@ // You may not use this file except in accordance with one or both of these // licenses. -//! The top-level network map tracking logic lives here. +//! The [`NetworkGraph`] stores the network gossip and [`P2PGossipSync`] fetches it from peers use bitcoin::secp256k1::constants::PUBLIC_KEY_SIZE; use bitcoin::secp256k1::PublicKey; @@ -16,17 +16,14 @@ use bitcoin::secp256k1; use bitcoin::hashes::sha256d::Hash as Sha256dHash; use bitcoin::hashes::Hash; -use bitcoin::blockdata::transaction::TxOut; use bitcoin::hash_types::BlockHash; -use crate::chain; -use crate::chain::Access; -use crate::ln::chan_utils::make_funding_redeemscript_from_slices; use crate::ln::features::{ChannelFeatures, NodeFeatures, InitFeatures}; use crate::ln::msgs::{DecodeError, ErrorAction, Init, LightningError, RoutingMessageHandler, NetAddress, MAX_VALUE_MSAT}; use crate::ln::msgs::{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement, GossipTimestampFilter}; use crate::ln::msgs::{QueryChannelRange, ReplyChannelRange, QueryShortChannelIds, ReplyShortChannelIdsEnd}; use crate::ln::msgs; +use crate::routing::utxo::{self, UtxoLookup}; use crate::util::ser::{Readable, ReadableArgs, Writeable, Writer, MaybeReadable}; use crate::util::logger::{Logger, Level}; use crate::util::events::{MessageSendEvent, MessageSendEventsProvider}; @@ -43,7 +40,6 @@ use crate::sync::{RwLock, RwLockReadGuard}; use core::sync::atomic::{AtomicUsize, Ordering}; use crate::sync::Mutex; use core::ops::{Bound, Deref}; -use bitcoin::hashes::hex::ToHex; #[cfg(feature = "std")] use std::time::{SystemTime, UNIX_EPOCH}; @@ -159,6 +155,8 @@ pub struct NetworkGraph where L::Target: Logger { /// resync them from gossip. Each `NodeId` is mapped to the time (in seconds) it was removed so /// that once some time passes, we can potentially resync it from gossip again. removed_nodes: Mutex>>, + /// Announcement messages which are awaiting an on-chain lookup to be processed. + pub(super) pending_checks: utxo::PendingChecks, } /// A read-only view of [`NetworkGraph`]. @@ -218,31 +216,30 @@ impl_writeable_tlv_based_enum_upgradable!(NetworkUpdate, /// This network graph is then used for routing payments. /// Provides interface to help with initial routing sync by /// serving historical announcements. -pub struct P2PGossipSync>, C: Deref, L: Deref> -where C::Target: chain::Access, L::Target: Logger +pub struct P2PGossipSync>, U: Deref, L: Deref> +where U::Target: UtxoLookup, L::Target: Logger { network_graph: G, - chain_access: Option, + utxo_lookup: Option, #[cfg(feature = "std")] full_syncs_requested: AtomicUsize, pending_events: Mutex>, logger: L, } -impl>, C: Deref, L: Deref> P2PGossipSync -where C::Target: chain::Access, L::Target: Logger +impl>, U: Deref, L: Deref> P2PGossipSync +where U::Target: UtxoLookup, L::Target: Logger { /// Creates a new tracker of the actual state of the network of channels and nodes, /// assuming an existing Network Graph. - /// Chain monitor is used to make sure announced channels exist on-chain, - /// channel data is correct, and that the announcement is signed with - /// channel owners' keys. - pub fn new(network_graph: G, chain_access: Option, logger: L) -> Self { + /// UTXO lookup is used to make sure announced channels exist on-chain, channel data is + /// correct, and the announcement is signed with channel owners' keys. + pub fn new(network_graph: G, utxo_lookup: Option, logger: L) -> Self { P2PGossipSync { network_graph, #[cfg(feature = "std")] full_syncs_requested: AtomicUsize::new(0), - chain_access, + utxo_lookup, pending_events: Mutex::new(vec![]), logger, } @@ -251,8 +248,8 @@ where C::Target: chain::Access, L::Target: Logger /// Adds a provider used to check new announcements. Does not affect /// existing announcements unless they are updated. /// Add, update or remove the provider would replace the current one. - pub fn add_chain_access(&mut self, chain_access: Option) { - self.chain_access = chain_access; + pub fn add_utxo_lookup(&mut self, utxo_lookup: Option) { + self.utxo_lookup = utxo_lookup; } /// Gets a reference to the underlying [`NetworkGraph`] which was provided in @@ -275,6 +272,36 @@ where C::Target: chain::Access, L::Target: Logger false } } + + /// Used to broadcast forward gossip messages which were validated async. + /// + /// Note that this will ignore events other than `Broadcast*` or messages with too much excess + /// data. + pub(super) fn forward_gossip_msg(&self, mut ev: MessageSendEvent) { + match &mut ev { + MessageSendEvent::BroadcastChannelAnnouncement { msg, ref mut update_msg } => { + if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY { return; } + if update_msg.as_ref() + .map(|msg| msg.contents.excess_data.len()).unwrap_or(0) > MAX_EXCESS_BYTES_FOR_RELAY + { + *update_msg = None; + } + }, + MessageSendEvent::BroadcastChannelUpdate { msg } => { + if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY { return; } + }, + MessageSendEvent::BroadcastNodeAnnouncement { msg } => { + if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY || + msg.contents.excess_address_data.len() > MAX_EXCESS_BYTES_FOR_RELAY || + msg.contents.excess_data.len() + msg.contents.excess_address_data.len() > MAX_EXCESS_BYTES_FOR_RELAY + { + return; + } + }, + _ => return, + } + self.pending_events.lock().unwrap().push(ev); + } } impl NetworkGraph where L::Target: Logger { @@ -342,8 +369,8 @@ macro_rules! get_pubkey_from_node_id { } } -impl>, C: Deref, L: Deref> RoutingMessageHandler for P2PGossipSync -where C::Target: chain::Access, L::Target: Logger +impl>, U: Deref, L: Deref> RoutingMessageHandler for P2PGossipSync +where U::Target: UtxoLookup, L::Target: Logger { fn handle_node_announcement(&self, msg: &msgs::NodeAnnouncement) -> Result { self.network_graph.update_node_from_announcement(msg)?; @@ -353,8 +380,7 @@ where C::Target: chain::Access, L::Target: Logger } fn handle_channel_announcement(&self, msg: &msgs::ChannelAnnouncement) -> Result { - self.network_graph.update_channel_from_announcement(msg, &self.chain_access)?; - log_gossip!(self.logger, "Added channel_announcement for {}{}", msg.contents.short_channel_id, if !msg.contents.excess_data.is_empty() { " with excess uninterpreted data!" } else { "" }); + self.network_graph.update_channel_from_announcement(msg, &self.utxo_lookup)?; Ok(msg.contents.excess_data.len() <= MAX_EXCESS_BYTES_FOR_RELAY) } @@ -630,11 +656,15 @@ where C::Target: chain::Access, L::Target: Logger features.set_gossip_queries_optional(); features } + + fn processing_queue_high(&self) -> bool { + self.network_graph.pending_checks.too_many_checks_pending() + } } -impl>, C: Deref, L: Deref> MessageSendEventsProvider for P2PGossipSync +impl>, U: Deref, L: Deref> MessageSendEventsProvider for P2PGossipSync where - C::Target: chain::Access, + U::Target: UtxoLookup, L::Target: Logger, { fn get_and_clear_pending_msg_events(&self) -> Vec { @@ -1205,6 +1235,7 @@ impl ReadableArgs for NetworkGraph where L::Target: Logger { last_rapid_gossip_sync_timestamp: Mutex::new(last_rapid_gossip_sync_timestamp), removed_nodes: Mutex::new(HashMap::new()), removed_channels: Mutex::new(HashMap::new()), + pending_checks: utxo::PendingChecks::new(), }) } } @@ -1244,6 +1275,7 @@ impl NetworkGraph where L::Target: Logger { last_rapid_gossip_sync_timestamp: Mutex::new(None), removed_channels: Mutex::new(HashMap::new()), removed_nodes: Mutex::new(HashMap::new()), + pending_checks: utxo::PendingChecks::new(), } } @@ -1299,8 +1331,13 @@ impl NetworkGraph where L::Target: Logger { } fn update_node_from_announcement_intern(&self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement>) -> Result<(), LightningError> { - match self.nodes.write().unwrap().get_mut(&msg.node_id) { - None => Err(LightningError{err: "No existing channels for node_announcement".to_owned(), action: ErrorAction::IgnoreError}), + let mut nodes = self.nodes.write().unwrap(); + match nodes.get_mut(&msg.node_id) { + None => { + core::mem::drop(nodes); + self.pending_checks.check_hold_pending_node_announcement(msg, full_msg)?; + Err(LightningError{err: "No existing channels for node_announcement".to_owned(), action: ErrorAction::IgnoreError}) + }, Some(node) => { if let Some(node_info) = node.announcement_info.as_ref() { // The timestamp field is somewhat of a misnomer - the BOLTs use it to order @@ -1337,35 +1374,35 @@ impl NetworkGraph where L::Target: Logger { /// RoutingMessageHandler implementation to call it indirectly. This may be useful to accept /// routing messages from a source using a protocol other than the lightning P2P protocol. /// - /// If a `chain::Access` object is provided via `chain_access`, it will be called to verify + /// If a [`UtxoLookup`] object is provided via `utxo_lookup`, it will be called to verify /// the corresponding UTXO exists on chain and is correctly-formatted. - pub fn update_channel_from_announcement( - &self, msg: &msgs::ChannelAnnouncement, chain_access: &Option, + pub fn update_channel_from_announcement( + &self, msg: &msgs::ChannelAnnouncement, utxo_lookup: &Option, ) -> Result<(), LightningError> where - C::Target: chain::Access, + U::Target: UtxoLookup, { let msg_hash = hash_to_message!(&Sha256dHash::hash(&msg.contents.encode()[..])[..]); secp_verify_sig!(self.secp_ctx, &msg_hash, &msg.node_signature_1, &get_pubkey_from_node_id!(msg.contents.node_id_1, "channel_announcement"), "channel_announcement"); secp_verify_sig!(self.secp_ctx, &msg_hash, &msg.node_signature_2, &get_pubkey_from_node_id!(msg.contents.node_id_2, "channel_announcement"), "channel_announcement"); secp_verify_sig!(self.secp_ctx, &msg_hash, &msg.bitcoin_signature_1, &get_pubkey_from_node_id!(msg.contents.bitcoin_key_1, "channel_announcement"), "channel_announcement"); secp_verify_sig!(self.secp_ctx, &msg_hash, &msg.bitcoin_signature_2, &get_pubkey_from_node_id!(msg.contents.bitcoin_key_2, "channel_announcement"), "channel_announcement"); - self.update_channel_from_unsigned_announcement_intern(&msg.contents, Some(msg), chain_access) + self.update_channel_from_unsigned_announcement_intern(&msg.contents, Some(msg), utxo_lookup) } /// Store or update channel info from a channel announcement without verifying the associated /// signatures. Because we aren't given the associated signatures here we cannot relay the /// channel announcement to any of our peers. /// - /// If a `chain::Access` object is provided via `chain_access`, it will be called to verify + /// If a [`UtxoLookup`] object is provided via `utxo_lookup`, it will be called to verify /// the corresponding UTXO exists on chain and is correctly-formatted. - pub fn update_channel_from_unsigned_announcement( - &self, msg: &msgs::UnsignedChannelAnnouncement, chain_access: &Option + pub fn update_channel_from_unsigned_announcement( + &self, msg: &msgs::UnsignedChannelAnnouncement, utxo_lookup: &Option ) -> Result<(), LightningError> where - C::Target: chain::Access, + U::Target: UtxoLookup, { - self.update_channel_from_unsigned_announcement_intern(msg, None, chain_access) + self.update_channel_from_unsigned_announcement_intern(msg, None, utxo_lookup) } /// Update channel from partial announcement data received via rapid gossip sync @@ -1444,11 +1481,11 @@ impl NetworkGraph where L::Target: Logger { Ok(()) } - fn update_channel_from_unsigned_announcement_intern( - &self, msg: &msgs::UnsignedChannelAnnouncement, full_msg: Option<&msgs::ChannelAnnouncement>, chain_access: &Option + fn update_channel_from_unsigned_announcement_intern( + &self, msg: &msgs::UnsignedChannelAnnouncement, full_msg: Option<&msgs::ChannelAnnouncement>, utxo_lookup: &Option ) -> Result<(), LightningError> where - C::Target: chain::Access, + U::Target: UtxoLookup, { if msg.node_id_1 == msg.node_id_2 || msg.bitcoin_key_1 == msg.bitcoin_key_2 { return Err(LightningError{err: "Channel announcement node had a channel with itself".to_owned(), action: ErrorAction::IgnoreError}); @@ -1476,7 +1513,7 @@ impl NetworkGraph where L::Target: Logger { action: ErrorAction::IgnoreDuplicateGossip }); } - } else if chain_access.is_none() { + } else if utxo_lookup.is_none() { // Similarly, if we can't check the chain right now anyway, ignore the // duplicate announcement without bothering to take the channels write lock. return Err(LightningError { @@ -1499,32 +1536,8 @@ impl NetworkGraph where L::Target: Logger { } } - let utxo_value = match &chain_access { - &None => { - // Tentatively accept, potentially exposing us to DoS attacks - None - }, - &Some(ref chain_access) => { - match chain_access.get_utxo(&msg.chain_hash, msg.short_channel_id) { - Ok(TxOut { value, script_pubkey }) => { - let expected_script = - make_funding_redeemscript_from_slices(msg.bitcoin_key_1.as_slice(), msg.bitcoin_key_2.as_slice()).to_v0_p2wsh(); - if script_pubkey != expected_script { - return Err(LightningError{err: format!("Channel announcement key ({}) didn't match on-chain script ({})", expected_script.to_hex(), script_pubkey.to_hex()), action: ErrorAction::IgnoreError}); - } - //TODO: Check if value is worth storing, use it to inform routing, and compare it - //to the new HTLC max field in channel_update - Some(value) - }, - Err(chain::AccessError::UnknownChain) => { - return Err(LightningError{err: format!("Channel announced on an unknown chain ({})", msg.chain_hash.encode().to_hex()), action: ErrorAction::IgnoreError}); - }, - Err(chain::AccessError::UnknownTx) => { - return Err(LightningError{err: "Channel announced without corresponding UTXO entry".to_owned(), action: ErrorAction::IgnoreError}); - }, - } - }, - }; + let utxo_value = self.pending_checks.check_channel_announcement( + utxo_lookup, msg, full_msg)?; #[allow(unused_mut, unused_assignments)] let mut announcement_received_time = 0; @@ -1545,7 +1558,10 @@ impl NetworkGraph where L::Target: Logger { announcement_received_time, }; - self.add_channel_between_nodes(msg.short_channel_id, chan_info, utxo_value) + self.add_channel_between_nodes(msg.short_channel_id, chan_info, utxo_value)?; + + log_gossip!(self.logger, "Added channel_announcement for {}{}", msg.short_channel_id, if !msg.excess_data.is_empty() { " with excess uninterpreted data!" } else { "" }); + Ok(()) } /// Marks a channel in the graph as failed if a corresponding HTLC fail was sent. @@ -1749,7 +1765,11 @@ impl NetworkGraph where L::Target: Logger { let mut channels = self.channels.write().unwrap(); match channels.get_mut(&msg.short_channel_id) { - None => return Err(LightningError{err: "Couldn't find channel for update".to_owned(), action: ErrorAction::IgnoreError}), + None => { + core::mem::drop(channels); + self.pending_checks.check_hold_pending_channel_update(msg, full_msg)?; + return Err(LightningError{err: "Couldn't find channel for update".to_owned(), action: ErrorAction::IgnoreError}); + }, Some(channel) => { if msg.htlc_maximum_msat > MAX_VALUE_MSAT { return Err(LightningError{err: @@ -1903,13 +1923,13 @@ impl ReadOnlyNetworkGraph<'_> { } #[cfg(test)] -mod tests { - use crate::chain; +pub(crate) mod tests { use crate::ln::channelmanager; use crate::ln::chan_utils::make_funding_redeemscript; #[cfg(feature = "std")] use crate::ln::features::InitFeatures; use crate::routing::gossip::{P2PGossipSync, NetworkGraph, NetworkUpdate, NodeAlias, MAX_EXCESS_BYTES_FOR_RELAY, NodeId, RoutingFees, ChannelUpdateInfo, ChannelInfo, NodeAnnouncementInfo, NodeInfo}; + use crate::routing::utxo::{UtxoLookupError, UtxoResult}; use crate::ln::msgs::{RoutingMessageHandler, UnsignedNodeAnnouncement, NodeAnnouncement, UnsignedChannelAnnouncement, ChannelAnnouncement, UnsignedChannelUpdate, ChannelUpdate, ReplyChannelRange, QueryChannelRange, QueryShortChannelIds, MAX_VALUE_MSAT}; @@ -1970,7 +1990,7 @@ mod tests { assert!(!gossip_sync.should_request_full_sync(&node_id)); } - fn get_signed_node_announcement(f: F, node_key: &SecretKey, secp_ctx: &Secp256k1) -> NodeAnnouncement { + pub(crate) fn get_signed_node_announcement(f: F, node_key: &SecretKey, secp_ctx: &Secp256k1) -> NodeAnnouncement { let node_id = NodeId::from_pubkey(&PublicKey::from_secret_key(&secp_ctx, node_key)); let mut unsigned_announcement = UnsignedNodeAnnouncement { features: channelmanager::provided_node_features(&UserConfig::default()), @@ -1990,7 +2010,7 @@ mod tests { } } - fn get_signed_channel_announcement(f: F, node_1_key: &SecretKey, node_2_key: &SecretKey, secp_ctx: &Secp256k1) -> ChannelAnnouncement { + pub(crate) fn get_signed_channel_announcement(f: F, node_1_key: &SecretKey, node_2_key: &SecretKey, secp_ctx: &Secp256k1) -> ChannelAnnouncement { let node_id_1 = PublicKey::from_secret_key(&secp_ctx, node_1_key); let node_id_2 = PublicKey::from_secret_key(&secp_ctx, node_2_key); let node_1_btckey = &SecretKey::from_slice(&[40; 32]).unwrap(); @@ -2017,14 +2037,14 @@ mod tests { } } - fn get_channel_script(secp_ctx: &Secp256k1) -> Script { + pub(crate) fn get_channel_script(secp_ctx: &Secp256k1) -> Script { let node_1_btckey = SecretKey::from_slice(&[40; 32]).unwrap(); let node_2_btckey = SecretKey::from_slice(&[39; 32]).unwrap(); make_funding_redeemscript(&PublicKey::from_secret_key(secp_ctx, &node_1_btckey), &PublicKey::from_secret_key(secp_ctx, &node_2_btckey)).to_v0_p2wsh() } - fn get_signed_channel_update(f: F, node_key: &SecretKey, secp_ctx: &Secp256k1) -> ChannelUpdate { + pub(crate) fn get_signed_channel_update(f: F, node_key: &SecretKey, secp_ctx: &Secp256k1) -> ChannelUpdate { let mut unsigned_channel_update = UnsignedChannelUpdate { chain_hash: genesis_block(Network::Testnet).header.block_hash(), short_channel_id: 0, @@ -2141,7 +2161,7 @@ mod tests { // Test if an associated transaction were not on-chain (or not confirmed). let chain_source = test_utils::TestChainSource::new(Network::Testnet); - *chain_source.utxo_ret.lock().unwrap() = Err(chain::AccessError::UnknownTx); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)); let network_graph = NetworkGraph::new(genesis_hash, &logger); gossip_sync = P2PGossipSync::new(&network_graph, Some(&chain_source), &logger); @@ -2154,7 +2174,8 @@ mod tests { }; // Now test if the transaction is found in the UTXO set and the script is correct. - *chain_source.utxo_ret.lock().unwrap() = Ok(TxOut { value: 0, script_pubkey: good_script.clone() }); + *chain_source.utxo_ret.lock().unwrap() = + UtxoResult::Sync(Ok(TxOut { value: 0, script_pubkey: good_script.clone() })); let valid_announcement = get_signed_channel_announcement(|unsigned_announcement| { unsigned_announcement.short_channel_id += 2; }, node_1_privkey, node_2_privkey, &secp_ctx); @@ -2172,7 +2193,8 @@ mod tests { // If we receive announcement for the same channel, once we've validated it against the // chain, we simply ignore all new (duplicate) announcements. - *chain_source.utxo_ret.lock().unwrap() = Ok(TxOut { value: 0, script_pubkey: good_script }); + *chain_source.utxo_ret.lock().unwrap() = + UtxoResult::Sync(Ok(TxOut { value: 0, script_pubkey: good_script })); match gossip_sync.handle_channel_announcement(&valid_announcement) { Ok(_) => panic!(), Err(e) => assert_eq!(e.err, "Already have chain-validated channel") @@ -2246,7 +2268,8 @@ mod tests { { // Announce a channel we will update let good_script = get_channel_script(&secp_ctx); - *chain_source.utxo_ret.lock().unwrap() = Ok(TxOut { value: amount_sats, script_pubkey: good_script.clone() }); + *chain_source.utxo_ret.lock().unwrap() = + UtxoResult::Sync(Ok(TxOut { value: amount_sats, script_pubkey: good_script.clone() })); let valid_channel_announcement = get_signed_channel_announcement(|_| {}, node_1_privkey, node_2_privkey, &secp_ctx); short_channel_id = valid_channel_announcement.contents.short_channel_id; diff --git a/lightning/src/routing/mod.rs b/lightning/src/routing/mod.rs index 9bf0910663d..7fff856345c 100644 --- a/lightning/src/routing/mod.rs +++ b/lightning/src/routing/mod.rs @@ -9,6 +9,7 @@ //! Structs and impls for receiving messages about the network and storing the topology live here. +pub mod utxo; pub mod gossip; pub mod router; pub mod scoring; diff --git a/lightning/src/routing/router.rs b/lightning/src/routing/router.rs index 950384d957e..0e51b2bf86b 100644 --- a/lightning/src/routing/router.rs +++ b/lightning/src/routing/router.rs @@ -7,10 +7,7 @@ // You may not use this file except in accordance with one or both of these // licenses. -//! The top-level routing/network map tracking logic lives here. -//! -//! You probably want to create a P2PGossipSync and use that as your RoutingMessageHandler and then -//! interrogate it to get routes for your own payments. +//! The router finds paths within a [`NetworkGraph`] for a payment. use bitcoin::secp256k1::PublicKey; use bitcoin::hashes::Hash; @@ -2115,6 +2112,7 @@ fn build_route_from_hops_internal( #[cfg(test)] mod tests { use crate::routing::gossip::{NetworkGraph, P2PGossipSync, NodeId, EffectiveCapacity}; + use crate::routing::utxo::UtxoResult; use crate::routing::router::{get_route, build_route_from_hops_internal, add_random_cltv_offset, default_node_features, PaymentParameters, Route, RouteHint, RouteHintHop, RouteHop, RoutingFees, DEFAULT_MAX_TOTAL_CLTV_EXPIRY_DELTA, MAX_PATH_LENGTH_ESTIMATE}; @@ -3529,8 +3527,9 @@ mod tests { .push_opcode(opcodes::all::OP_PUSHNUM_2) .push_opcode(opcodes::all::OP_CHECKMULTISIG).into_script().to_v0_p2wsh(); - *chain_monitor.utxo_ret.lock().unwrap() = Ok(TxOut { value: 15, script_pubkey: good_script.clone() }); - gossip_sync.add_chain_access(Some(chain_monitor)); + *chain_monitor.utxo_ret.lock().unwrap() = + UtxoResult::Sync(Ok(TxOut { value: 15, script_pubkey: good_script.clone() })); + gossip_sync.add_utxo_lookup(Some(chain_monitor)); add_channel(&gossip_sync, &secp_ctx, &privkeys[0], &privkeys[2], ChannelFeatures::from_le_bytes(id_to_feature_flags(3)), 333); update_channel(&gossip_sync, &secp_ctx, &privkeys[0], UnsignedChannelUpdate { diff --git a/lightning/src/routing/utxo.rs b/lightning/src/routing/utxo.rs new file mode 100644 index 00000000000..020993f23fb --- /dev/null +++ b/lightning/src/routing/utxo.rs @@ -0,0 +1,861 @@ +// This file is Copyright its original authors, visible in version control +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + +//! This module contains traits for LDK to access UTXOs to check gossip data is correct. +//! +//! When lightning nodes gossip channel information, they resist DoS attacks by checking that each +//! channel matches a UTXO on-chain, requiring at least some marginal on-chain transacting in +//! order to announce a channel. This module handles that checking. + +use bitcoin::{BlockHash, TxOut}; +use bitcoin::hashes::hex::ToHex; + +use crate::ln::chan_utils::make_funding_redeemscript_from_slices; +use crate::ln::msgs::{self, LightningError, ErrorAction}; +use crate::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync}; +use crate::util::events::MessageSendEvent; +use crate::util::logger::{Level, Logger}; +use crate::util::ser::Writeable; + +use crate::prelude::*; + +use alloc::sync::{Arc, Weak}; +use crate::sync::Mutex; +use core::ops::Deref; + +/// An error when accessing the chain via [`UtxoLookup`]. +#[derive(Clone, Debug)] +pub enum UtxoLookupError { + /// The requested chain is unknown. + UnknownChain, + + /// The requested transaction doesn't exist or hasn't confirmed. + UnknownTx, +} + +/// The result of a [`UtxoLookup::get_utxo`] call. A call may resolve either synchronously, +/// returning the `Sync` variant, or asynchronously, returning an [`UtxoFuture`] in the `Async` +/// variant. +#[derive(Clone)] +pub enum UtxoResult { + /// A result which was resolved synchronously. It either includes a [`TxOut`] for the output + /// requested or a [`UtxoLookupError`]. + Sync(Result), + /// A result which will be resolved asynchronously. It includes a [`UtxoFuture`], a `clone` of + /// which you must keep locally and call [`UtxoFuture::resolve`] on once the lookup completes. + /// + /// Note that in order to avoid runaway memory usage, the number of parallel checks is limited, + /// but only fairly loosely. Because a pending checks block all message processing, leaving + /// checks pending for an extended time may cause DoS of other functions. It is recommended you + /// keep a tight timeout on lookups, on the order of a few seconds. + Async(UtxoFuture), +} + +/// The `UtxoLookup` trait defines behavior for accessing on-chain UTXOs. +pub trait UtxoLookup { + /// Returns the transaction output of a funding transaction encoded by [`short_channel_id`]. + /// Returns an error if `genesis_hash` is for a different chain or if such a transaction output + /// is unknown. + /// + /// [`short_channel_id`]: https://github.com/lightning/bolts/blob/master/07-routing-gossip.md#definition-of-short_channel_id + fn get_utxo(&self, genesis_hash: &BlockHash, short_channel_id: u64) -> UtxoResult; +} + +enum ChannelAnnouncement { + Full(msgs::ChannelAnnouncement), + Unsigned(msgs::UnsignedChannelAnnouncement), +} +impl ChannelAnnouncement { + fn node_id_1(&self) -> &NodeId { + match self { + ChannelAnnouncement::Full(msg) => &msg.contents.node_id_1, + ChannelAnnouncement::Unsigned(msg) => &msg.node_id_1, + } + } +} + +enum NodeAnnouncement { + Full(msgs::NodeAnnouncement), + Unsigned(msgs::UnsignedNodeAnnouncement), +} +impl NodeAnnouncement { + fn timestamp(&self) -> u32 { + match self { + NodeAnnouncement::Full(msg) => msg.contents.timestamp, + NodeAnnouncement::Unsigned(msg) => msg.timestamp, + } + } +} + +enum ChannelUpdate { + Full(msgs::ChannelUpdate), + Unsigned(msgs::UnsignedChannelUpdate), +} +impl ChannelUpdate { + fn timestamp(&self) -> u32 { + match self { + ChannelUpdate::Full(msg) => msg.contents.timestamp, + ChannelUpdate::Unsigned(msg) => msg.timestamp, + } + } +} + +struct UtxoMessages { + complete: Option>, + channel_announce: Option, + latest_node_announce_a: Option, + latest_node_announce_b: Option, + latest_channel_update_a: Option, + latest_channel_update_b: Option, +} + +/// Represents a future resolution of a [`UtxoLookup::get_utxo`] query resolving async. +/// +/// See [`UtxoResult::Async`] and [`UtxoFuture::resolve`] for more info. +#[derive(Clone)] +pub struct UtxoFuture { + state: Arc>, +} + +/// A trivial implementation of [`UtxoLookup`] which is used to call back into the network graph +/// once we have a concrete resolution of a request. +struct UtxoResolver(Result); +impl UtxoLookup for UtxoResolver { + fn get_utxo(&self, _genesis_hash: &BlockHash, _short_channel_id: u64) -> UtxoResult { + UtxoResult::Sync(self.0.clone()) + } +} + +impl UtxoFuture { + /// Builds a new future for later resolution. + pub fn new() -> Self { + Self { state: Arc::new(Mutex::new(UtxoMessages { + complete: None, + channel_announce: None, + latest_node_announce_a: None, + latest_node_announce_b: None, + latest_channel_update_a: None, + latest_channel_update_b: None, + }))} + } + + /// Resolves this future against the given `graph` and with the given `result`. + /// + /// This is identical to calling [`UtxoFuture::resolve`] with a dummy `gossip`, disabling + /// forwarding the validated gossip message onwards to peers. + /// + /// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order + /// to allow us to interact with peers again, you should call [`PeerManager::process_events`] + /// after this. + /// + /// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high + /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events + pub fn resolve_without_forwarding(&self, + graph: &NetworkGraph, result: Result) + where L::Target: Logger { + self.do_resolve(graph, result); + } + + /// Resolves this future against the given `graph` and with the given `result`. + /// + /// The given `gossip` is used to broadcast any validated messages onwards to all peers which + /// have available buffer space. + /// + /// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order + /// to allow us to interact with peers again, you should call [`PeerManager::process_events`] + /// after this. + /// + /// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high + /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events + pub fn resolve>, U: Deref, GS: Deref>>(&self, + graph: &NetworkGraph, gossip: GS, result: Result + ) where L::Target: Logger, U::Target: UtxoLookup { + let mut res = self.do_resolve(graph, result); + for msg_opt in res.iter_mut() { + if let Some(msg) = msg_opt.take() { + gossip.forward_gossip_msg(msg); + } + } + } + + fn do_resolve(&self, graph: &NetworkGraph, result: Result) + -> [Option; 5] where L::Target: Logger { + let (announcement, node_a, node_b, update_a, update_b) = { + let mut pending_checks = graph.pending_checks.internal.lock().unwrap(); + let mut async_messages = self.state.lock().unwrap(); + + if async_messages.channel_announce.is_none() { + // We raced returning to `check_channel_announcement` which hasn't updated + // `channel_announce` yet. That's okay, we can set the `complete` field which it will + // check once it gets control again. + async_messages.complete = Some(result); + return [None, None, None, None, None]; + } + + let announcement_msg = match async_messages.channel_announce.as_ref().unwrap() { + ChannelAnnouncement::Full(signed_msg) => &signed_msg.contents, + ChannelAnnouncement::Unsigned(msg) => &msg, + }; + + pending_checks.lookup_completed(announcement_msg, &Arc::downgrade(&self.state)); + + (async_messages.channel_announce.take().unwrap(), + async_messages.latest_node_announce_a.take(), + async_messages.latest_node_announce_b.take(), + async_messages.latest_channel_update_a.take(), + async_messages.latest_channel_update_b.take()) + }; + + let mut res = [None, None, None, None, None]; + let mut res_idx = 0; + + // Now that we've updated our internal state, pass the pending messages back through the + // network graph with a different `UtxoLookup` which will resolve immediately. + // Note that we ignore errors as we don't disconnect peers anyway, so there's nothing to do + // with them. + let resolver = UtxoResolver(result); + match announcement { + ChannelAnnouncement::Full(signed_msg) => { + if graph.update_channel_from_announcement(&signed_msg, &Some(&resolver)).is_ok() { + res[res_idx] = Some(MessageSendEvent::BroadcastChannelAnnouncement { + msg: signed_msg, update_msg: None, + }); + res_idx += 1; + } + }, + ChannelAnnouncement::Unsigned(msg) => { + let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver)); + }, + } + + for announce in core::iter::once(node_a).chain(core::iter::once(node_b)) { + match announce { + Some(NodeAnnouncement::Full(signed_msg)) => { + if graph.update_node_from_announcement(&signed_msg).is_ok() { + res[res_idx] = Some(MessageSendEvent::BroadcastNodeAnnouncement { + msg: signed_msg, + }); + res_idx += 1; + } + }, + Some(NodeAnnouncement::Unsigned(msg)) => { + let _ = graph.update_node_from_unsigned_announcement(&msg); + }, + None => {}, + } + } + + for update in core::iter::once(update_a).chain(core::iter::once(update_b)) { + match update { + Some(ChannelUpdate::Full(signed_msg)) => { + if graph.update_channel(&signed_msg).is_ok() { + res[res_idx] = Some(MessageSendEvent::BroadcastChannelUpdate { + msg: signed_msg, + }); + res_idx += 1; + } + }, + Some(ChannelUpdate::Unsigned(msg)) => { + let _ = graph.update_channel_unsigned(&msg); + }, + None => {}, + } + } + + res + } +} + +struct PendingChecksContext { + channels: HashMap>>, + nodes: HashMap>>>, +} + +impl PendingChecksContext { + fn lookup_completed(&mut self, + msg: &msgs::UnsignedChannelAnnouncement, completed_state: &Weak> + ) { + if let hash_map::Entry::Occupied(e) = self.channels.entry(msg.short_channel_id) { + if Weak::ptr_eq(e.get(), &completed_state) { + e.remove(); + } + } + + if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_1) { + e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state)); + if e.get().is_empty() { e.remove(); } + } + if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_2) { + e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state)); + if e.get().is_empty() { e.remove(); } + } + } +} + +/// A set of messages which are pending UTXO lookups for processing. +pub(super) struct PendingChecks { + internal: Mutex, +} + +impl PendingChecks { + pub(super) fn new() -> Self { + PendingChecks { internal: Mutex::new(PendingChecksContext { + channels: HashMap::new(), nodes: HashMap::new(), + }) } + } + + /// Checks if there is a pending `channel_update` UTXO validation for the given channel, + /// and, if so, stores the channel message for handling later and returns an `Err`. + pub(super) fn check_hold_pending_channel_update( + &self, msg: &msgs::UnsignedChannelUpdate, full_msg: Option<&msgs::ChannelUpdate> + ) -> Result<(), LightningError> { + let mut pending_checks = self.internal.lock().unwrap(); + if let hash_map::Entry::Occupied(e) = pending_checks.channels.entry(msg.short_channel_id) { + let is_from_a = (msg.flags & 1) == 1; + match Weak::upgrade(e.get()) { + Some(msgs_ref) => { + let mut messages = msgs_ref.lock().unwrap(); + let latest_update = if is_from_a { + &mut messages.latest_channel_update_a + } else { + &mut messages.latest_channel_update_b + }; + if latest_update.is_none() || latest_update.as_ref().unwrap().timestamp() < msg.timestamp { + // If the messages we got has a higher timestamp, just blindly assume the + // signatures on the new message are correct and drop the old message. This + // may cause us to end up dropping valid `channel_update`s if a peer is + // malicious, but we should get the correct ones when the node updates them. + *latest_update = Some( + if let Some(msg) = full_msg { ChannelUpdate::Full(msg.clone()) } + else { ChannelUpdate::Unsigned(msg.clone()) }); + } + return Err(LightningError { + err: "Awaiting channel_announcement validation to accept channel_update".to_owned(), + action: ErrorAction::IgnoreAndLog(Level::Gossip), + }); + }, + None => { e.remove(); }, + } + } + Ok(()) + } + + /// Checks if there is a pending `node_announcement` UTXO validation for a channel with the + /// given node and, if so, stores the channel message for handling later and returns an `Err`. + pub(super) fn check_hold_pending_node_announcement( + &self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement> + ) -> Result<(), LightningError> { + let mut pending_checks = self.internal.lock().unwrap(); + if let hash_map::Entry::Occupied(mut e) = pending_checks.nodes.entry(msg.node_id) { + let mut found_at_least_one_chan = false; + e.get_mut().retain(|node_msgs| { + match Weak::upgrade(&node_msgs) { + Some(chan_mtx) => { + let mut chan_msgs = chan_mtx.lock().unwrap(); + if let Some(chan_announce) = &chan_msgs.channel_announce { + let latest_announce = + if *chan_announce.node_id_1() == msg.node_id { + &mut chan_msgs.latest_node_announce_a + } else { + &mut chan_msgs.latest_node_announce_b + }; + if latest_announce.is_none() || + latest_announce.as_ref().unwrap().timestamp() < msg.timestamp + { + *latest_announce = Some( + if let Some(msg) = full_msg { NodeAnnouncement::Full(msg.clone()) } + else { NodeAnnouncement::Unsigned(msg.clone()) }); + } + found_at_least_one_chan = true; + true + } else { + debug_assert!(false, "channel_announce is set before struct is added to node map"); + false + } + }, + None => false, + } + }); + if e.get().is_empty() { e.remove(); } + if found_at_least_one_chan { + return Err(LightningError { + err: "Awaiting channel_announcement validation to accept node_announcement".to_owned(), + action: ErrorAction::IgnoreAndLog(Level::Gossip), + }); + } + } + Ok(()) + } + + fn check_replace_previous_entry(msg: &msgs::UnsignedChannelAnnouncement, + full_msg: Option<&msgs::ChannelAnnouncement>, replacement: Option>>, + pending_channels: &mut HashMap>> + ) -> Result<(), msgs::LightningError> { + match pending_channels.entry(msg.short_channel_id) { + hash_map::Entry::Occupied(mut e) => { + // There's already a pending lookup for the given SCID. Check if the messages + // are the same and, if so, return immediately (don't bother spawning another + // lookup if we haven't gotten that far yet). + match Weak::upgrade(&e.get()) { + Some(pending_msgs) => { + let pending_matches = match &pending_msgs.lock().unwrap().channel_announce { + Some(ChannelAnnouncement::Full(pending_msg)) => Some(pending_msg) == full_msg, + Some(ChannelAnnouncement::Unsigned(pending_msg)) => pending_msg == msg, + None => { + // This shouldn't actually be reachable. We set the + // `channel_announce` field under the same lock as setting the + // channel map entry. Still, we can just treat it as + // non-matching and let the new request fly. + debug_assert!(false); + false + }, + }; + if pending_matches { + return Err(LightningError { + err: "Channel announcement is already being checked".to_owned(), + action: ErrorAction::IgnoreDuplicateGossip, + }); + } else { + // The earlier lookup is a different message. If we have another + // request in-flight now replace the original. + // Note that in the replace case whether to replace is somewhat + // arbitrary - both results will be handled, we're just updating the + // value that will be compared to future lookups with the same SCID. + if let Some(item) = replacement { + *e.get_mut() = item; + } + } + }, + None => { + // The earlier lookup already resolved. We can't be sure its the same + // so just remove/replace it and move on. + if let Some(item) = replacement { + *e.get_mut() = item; + } else { e.remove(); } + }, + } + }, + hash_map::Entry::Vacant(v) => { + if let Some(item) = replacement { v.insert(item); } + }, + } + Ok(()) + } + + pub(super) fn check_channel_announcement(&self, + utxo_lookup: &Option, msg: &msgs::UnsignedChannelAnnouncement, + full_msg: Option<&msgs::ChannelAnnouncement> + ) -> Result, msgs::LightningError> where U::Target: UtxoLookup { + let handle_result = |res| { + match res { + Ok(TxOut { value, script_pubkey }) => { + let expected_script = + make_funding_redeemscript_from_slices(msg.bitcoin_key_1.as_slice(), msg.bitcoin_key_2.as_slice()).to_v0_p2wsh(); + if script_pubkey != expected_script { + return Err(LightningError{ + err: format!("Channel announcement key ({}) didn't match on-chain script ({})", + expected_script.to_hex(), script_pubkey.to_hex()), + action: ErrorAction::IgnoreError + }); + } + Ok(Some(value)) + }, + Err(UtxoLookupError::UnknownChain) => { + Err(LightningError { + err: format!("Channel announced on an unknown chain ({})", + msg.chain_hash.encode().to_hex()), + action: ErrorAction::IgnoreError + }) + }, + Err(UtxoLookupError::UnknownTx) => { + Err(LightningError { + err: "Channel announced without corresponding UTXO entry".to_owned(), + action: ErrorAction::IgnoreError + }) + }, + } + }; + + Self::check_replace_previous_entry(msg, full_msg, None, + &mut self.internal.lock().unwrap().channels)?; + + match utxo_lookup { + &None => { + // Tentatively accept, potentially exposing us to DoS attacks + Ok(None) + }, + &Some(ref utxo_lookup) => { + match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id) { + UtxoResult::Sync(res) => handle_result(res), + UtxoResult::Async(future) => { + let mut pending_checks = self.internal.lock().unwrap(); + let mut async_messages = future.state.lock().unwrap(); + if let Some(res) = async_messages.complete.take() { + // In the unlikely event the future resolved before we managed to get it, + // handle the result in-line. + handle_result(res) + } else { + Self::check_replace_previous_entry(msg, full_msg, + Some(Arc::downgrade(&future.state)), &mut pending_checks.channels)?; + async_messages.channel_announce = Some( + if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) } + else { ChannelAnnouncement::Unsigned(msg.clone()) }); + pending_checks.nodes.entry(msg.node_id_1) + .or_insert(Vec::new()).push(Arc::downgrade(&future.state)); + pending_checks.nodes.entry(msg.node_id_2) + .or_insert(Vec::new()).push(Arc::downgrade(&future.state)); + Err(LightningError { + err: "Channel being checked async".to_owned(), + action: ErrorAction::IgnoreAndLog(Level::Gossip), + }) + } + }, + } + } + } + } + + /// The maximum number of pending gossip checks before [`Self::too_many_checks_pending`] + /// returns `true`. Note that this isn't a strict upper-bound on the number of checks pending - + /// each peer may, at a minimum, read one more socket buffer worth of `channel_announcement`s + /// which we'll have to process. With a socket buffer of 4KB and a minimum + /// `channel_announcement` size of, roughly, 429 bytes, this may leave us with `10*our peer + /// count` messages to process beyond this limit. Because we'll probably have a few peers, + /// there's no reason for this constant to be materially less than 30 or so, and 32 in-flight + /// checks should be more than enough for decent parallelism. + const MAX_PENDING_LOOKUPS: usize = 32; + + /// Returns true if there are a large number of async checks pending and future + /// `channel_announcement` messages should be delayed. Note that this is only a hint and + /// messages already in-flight may still have to be handled for various reasons. + pub(super) fn too_many_checks_pending(&self) -> bool { + let mut pending_checks = self.internal.lock().unwrap(); + if pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS { + // If we have many channel checks pending, ensure we don't have any dangling checks + // (i.e. checks where the user told us they'd call back but drop'd the `UtxoFuture` + // instead) before we commit to applying backpressure. + pending_checks.channels.retain(|_, chan| { + Weak::upgrade(&chan).is_some() + }); + pending_checks.nodes.retain(|_, channels| { + channels.retain(|chan| Weak::upgrade(&chan).is_some()); + !channels.is_empty() + }); + pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS + } else { + false + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::routing::gossip::tests::*; + use crate::util::test_utils::{TestChainSource, TestLogger}; + use crate::ln::msgs; + + use bitcoin::blockdata::constants::genesis_block; + use bitcoin::secp256k1::{Secp256k1, SecretKey}; + + use core::sync::atomic::Ordering; + + fn get_network() -> (TestChainSource, NetworkGraph>) { + let logger = Box::new(TestLogger::new()); + let genesis_hash = genesis_block(bitcoin::Network::Testnet).header.block_hash(); + let chain_source = TestChainSource::new(bitcoin::Network::Testnet); + let network_graph = NetworkGraph::new(genesis_hash, logger); + + (chain_source, network_graph) + } + + fn get_test_objects() -> (msgs::ChannelAnnouncement, TestChainSource, + NetworkGraph>, bitcoin::Script, msgs::NodeAnnouncement, + msgs::NodeAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate, msgs::ChannelUpdate) + { + let secp_ctx = Secp256k1::new(); + + let (chain_source, network_graph) = get_network(); + + let good_script = get_channel_script(&secp_ctx); + let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap(); + let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap(); + let valid_announcement = get_signed_channel_announcement(|_| {}, node_1_privkey, node_2_privkey, &secp_ctx); + + let node_a_announce = get_signed_node_announcement(|_| {}, node_1_privkey, &secp_ctx); + let node_b_announce = get_signed_node_announcement(|_| {}, node_2_privkey, &secp_ctx); + + // Note that we have to set the "direction" flag correctly on both messages + let chan_update_a = get_signed_channel_update(|msg| msg.flags = 0, node_1_privkey, &secp_ctx); + let chan_update_b = get_signed_channel_update(|msg| msg.flags = 1, node_2_privkey, &secp_ctx); + let chan_update_c = get_signed_channel_update(|msg| { + msg.flags = 1; msg.timestamp += 1; }, node_2_privkey, &secp_ctx); + + (valid_announcement, chain_source, network_graph, good_script, node_a_announce, + node_b_announce, chan_update_a, chan_update_b, chan_update_c) + } + + #[test] + fn test_fast_async_lookup() { + // Check that async lookups which resolve quicker than the future is returned to the + // `get_utxo` call can read it still resolve properly. + let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects(); + + let future = UtxoFuture::new(); + future.resolve_without_forwarding(&network_graph, + Ok(TxOut { value: 1_000_000, script_pubkey: good_script })); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap(); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_some()); + } + + #[test] + fn test_async_lookup() { + // Test a simple async lookup + let (valid_announcement, chain_source, network_graph, good_script, + node_a_announce, node_b_announce, ..) = get_test_objects(); + + let future = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + assert_eq!( + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel being checked async"); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + + future.resolve_without_forwarding(&network_graph, + Ok(TxOut { value: 0, script_pubkey: good_script })); + network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap(); + network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap(); + + assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1) + .unwrap().announcement_info.is_none()); + + network_graph.update_node_from_announcement(&node_a_announce).unwrap(); + network_graph.update_node_from_announcement(&node_b_announce).unwrap(); + + assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1) + .unwrap().announcement_info.is_some()); + } + + #[test] + fn test_invalid_async_lookup() { + // Test an async lookup which returns an incorrect script + let (valid_announcement, chain_source, network_graph, ..) = get_test_objects(); + + let future = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + assert_eq!( + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel being checked async"); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + + future.resolve_without_forwarding(&network_graph, + Ok(TxOut { value: 1_000_000, script_pubkey: bitcoin::Script::new() })); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + } + + #[test] + fn test_failing_async_lookup() { + // Test an async lookup which returns an error + let (valid_announcement, chain_source, network_graph, ..) = get_test_objects(); + + let future = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + assert_eq!( + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel being checked async"); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + + future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx)); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + } + + #[test] + fn test_updates_async_lookup() { + // Test async lookups will process pending channel_update/node_announcements once they + // complete. + let (valid_announcement, chain_source, network_graph, good_script, node_a_announce, + node_b_announce, chan_update_a, chan_update_b, ..) = get_test_objects(); + + let future = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + assert_eq!( + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel being checked async"); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + + assert_eq!( + network_graph.update_node_from_announcement(&node_a_announce).unwrap_err().err, + "Awaiting channel_announcement validation to accept node_announcement"); + assert_eq!( + network_graph.update_node_from_announcement(&node_b_announce).unwrap_err().err, + "Awaiting channel_announcement validation to accept node_announcement"); + + assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err, + "Awaiting channel_announcement validation to accept channel_update"); + assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err, + "Awaiting channel_announcement validation to accept channel_update"); + + future.resolve_without_forwarding(&network_graph, + Ok(TxOut { value: 1_000_000, script_pubkey: good_script })); + + assert!(network_graph.read_only().channels() + .get(&valid_announcement.contents.short_channel_id).unwrap().one_to_two.is_some()); + assert!(network_graph.read_only().channels() + .get(&valid_announcement.contents.short_channel_id).unwrap().two_to_one.is_some()); + + assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1) + .unwrap().announcement_info.is_some()); + assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_2) + .unwrap().announcement_info.is_some()); + } + + #[test] + fn test_latest_update_async_lookup() { + // Test async lookups will process the latest channel_update if two are received while + // awaiting an async UTXO lookup. + let (valid_announcement, chain_source, network_graph, good_script, _, + _, chan_update_a, chan_update_b, chan_update_c, ..) = get_test_objects(); + + let future = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + assert_eq!( + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel being checked async"); + assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none()); + + assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err, + "Awaiting channel_announcement validation to accept channel_update"); + assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err, + "Awaiting channel_announcement validation to accept channel_update"); + assert_eq!(network_graph.update_channel(&chan_update_c).unwrap_err().err, + "Awaiting channel_announcement validation to accept channel_update"); + + future.resolve_without_forwarding(&network_graph, + Ok(TxOut { value: 1_000_000, script_pubkey: good_script })); + + assert_eq!(chan_update_a.contents.timestamp, chan_update_b.contents.timestamp); + assert!(network_graph.read_only().channels() + .get(&valid_announcement.contents.short_channel_id).as_ref().unwrap() + .one_to_two.as_ref().unwrap().last_update != + network_graph.read_only().channels() + .get(&valid_announcement.contents.short_channel_id).as_ref().unwrap() + .two_to_one.as_ref().unwrap().last_update); + } + + #[test] + fn test_no_double_lookups() { + // Test that a pending async lookup will prevent a second async lookup from flying, but + // only if the channel_announcement message is identical. + let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects(); + + let future = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + assert_eq!( + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel being checked async"); + assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1); + + // If we make a second request with the same message, the call count doesn't increase... + let future_b = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future_b.clone()); + assert_eq!( + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel announcement is already being checked"); + assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1); + + // But if we make a third request with a tweaked message, we should get a second call + // against our new future... + let secp_ctx = Secp256k1::new(); + let replacement_pk_1 = &SecretKey::from_slice(&[99; 32]).unwrap(); + let replacement_pk_2 = &SecretKey::from_slice(&[98; 32]).unwrap(); + let invalid_announcement = get_signed_channel_announcement(|_| {}, replacement_pk_1, replacement_pk_2, &secp_ctx); + assert_eq!( + network_graph.update_channel_from_announcement(&invalid_announcement, &Some(&chain_source)).unwrap_err().err, + "Channel being checked async"); + assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 2); + + // Still, if we resolve the original future, the original channel will be accepted. + future.resolve_without_forwarding(&network_graph, + Ok(TxOut { value: 1_000_000, script_pubkey: good_script })); + assert!(!network_graph.read_only().channels() + .get(&valid_announcement.contents.short_channel_id).unwrap() + .announcement_message.as_ref().unwrap() + .contents.features.supports_unknown_test_feature()); + } + + #[test] + fn test_checks_backpressure() { + // Test that too_many_checks_pending returns true when there are many checks pending, and + // returns false once they complete. + let secp_ctx = Secp256k1::new(); + let (chain_source, network_graph) = get_network(); + + // We cheat and use a single future for all the lookups to complete them all at once. + let future = UtxoFuture::new(); + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone()); + + let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap(); + let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap(); + + for i in 0..PendingChecks::MAX_PENDING_LOOKUPS { + let valid_announcement = get_signed_channel_announcement( + |msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx); + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err(); + assert!(!network_graph.pending_checks.too_many_checks_pending()); + } + + let valid_announcement = get_signed_channel_announcement( + |_| {}, node_1_privkey, node_2_privkey, &secp_ctx); + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err(); + assert!(network_graph.pending_checks.too_many_checks_pending()); + + // Once the future completes the "too many checks" flag should reset. + future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx)); + assert!(!network_graph.pending_checks.too_many_checks_pending()); + } + + #[test] + fn test_checks_backpressure_drop() { + // Test that too_many_checks_pending returns true when there are many checks pending, and + // returns false if we drop some of the futures without completion. + let secp_ctx = Secp256k1::new(); + let (chain_source, network_graph) = get_network(); + + // We cheat and use a single future for all the lookups to complete them all at once. + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(UtxoFuture::new()); + + let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap(); + let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap(); + + for i in 0..PendingChecks::MAX_PENDING_LOOKUPS { + let valid_announcement = get_signed_channel_announcement( + |msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx); + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err(); + assert!(!network_graph.pending_checks.too_many_checks_pending()); + } + + let valid_announcement = get_signed_channel_announcement( + |_| {}, node_1_privkey, node_2_privkey, &secp_ctx); + network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err(); + assert!(network_graph.pending_checks.too_many_checks_pending()); + + // Once the future is drop'd (by resetting the `utxo_ret` value) the "too many checks" flag + // should reset to false. + *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)); + assert!(!network_graph.pending_checks.too_many_checks_pending()); + } +} diff --git a/lightning/src/util/events.rs b/lightning/src/util/events.rs index 475533c902a..265e9186611 100644 --- a/lightning/src/util/events.rs +++ b/lightning/src/util/events.rs @@ -1612,13 +1612,18 @@ pub enum MessageSendEvent { /// The channel_announcement which should be sent. msg: msgs::ChannelAnnouncement, /// The followup channel_update which should be sent. - update_msg: msgs::ChannelUpdate, + update_msg: Option, }, /// Used to indicate that a channel_update should be broadcast to all peers. BroadcastChannelUpdate { /// The channel_update which should be sent. msg: msgs::ChannelUpdate, }, + /// Used to indicate that a node_announcement should be broadcast to all peers. + BroadcastNodeAnnouncement { + /// The node_announcement which should be sent. + msg: msgs::NodeAnnouncement, + }, /// Used to indicate that a channel_update should be sent to a single peer. /// In contrast to [`Self::BroadcastChannelUpdate`], this is used when the channel is a /// private channel and we shouldn't be informing all of our peers of channel parameters. diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index be09c8f0065..83647a73385 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -22,8 +22,8 @@ use crate::ln::features::{ChannelFeatures, InitFeatures, NodeFeatures}; use crate::ln::{msgs, wire}; use crate::ln::msgs::LightningError; use crate::ln::script::ShutdownScript; -use crate::routing::gossip::NetworkGraph; -use crate::routing::gossip::NodeId; +use crate::routing::gossip::{NetworkGraph, NodeId}; +use crate::routing::utxo::{UtxoLookup, UtxoLookupError, UtxoResult}; use crate::routing::router::{find_route, InFlightHtlcs, Route, RouteHop, RouteParameters, Router, ScorerAccountingForInFlightHtlcs}; use crate::routing::scoring::FixedPenaltyScorer; use crate::util::config::UserConfig; @@ -571,6 +571,8 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler { features.set_gossip_queries_optional(); features } + + fn processing_queue_high(&self) -> bool { false } } impl events::MessageSendEventsProvider for TestRoutingMessageHandler { @@ -839,7 +841,8 @@ impl core::fmt::Debug for OnGetShutdownScriptpubkey { pub struct TestChainSource { pub genesis_hash: BlockHash, - pub utxo_ret: Mutex>, + pub utxo_ret: Mutex, + pub get_utxo_call_count: AtomicUsize, pub watched_txn: Mutex>, pub watched_outputs: Mutex>, } @@ -849,17 +852,19 @@ impl TestChainSource { let script_pubkey = Builder::new().push_opcode(opcodes::OP_TRUE).into_script(); Self { genesis_hash: genesis_block(network).block_hash(), - utxo_ret: Mutex::new(Ok(TxOut { value: u64::max_value(), script_pubkey })), + utxo_ret: Mutex::new(UtxoResult::Sync(Ok(TxOut { value: u64::max_value(), script_pubkey }))), + get_utxo_call_count: AtomicUsize::new(0), watched_txn: Mutex::new(HashSet::new()), watched_outputs: Mutex::new(HashSet::new()), } } } -impl chain::Access for TestChainSource { - fn get_utxo(&self, genesis_hash: &BlockHash, _short_channel_id: u64) -> Result { +impl UtxoLookup for TestChainSource { + fn get_utxo(&self, genesis_hash: &BlockHash, _short_channel_id: u64) -> UtxoResult { + self.get_utxo_call_count.fetch_add(1, Ordering::Relaxed); if self.genesis_hash != *genesis_hash { - return Err(chain::AccessError::UnknownChain); + return UtxoResult::Sync(Err(UtxoLookupError::UnknownChain)); } self.utxo_ret.lock().unwrap().clone()