Skip to content

Commit 74cd96f

Browse files
authored
Merge pull request #736 from bmancini55/gossip_queries
Initiate gossip_queries
2 parents a008464 + c026764 commit 74cd96f

File tree

13 files changed

+530
-98
lines changed

13 files changed

+530
-98
lines changed

fuzz/src/full_stack.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use bitcoin::blockdata::script::{Builder, Script};
1919
use bitcoin::blockdata::opcodes;
2020
use bitcoin::consensus::encode::deserialize;
2121
use bitcoin::network::constants::Network;
22+
use bitcoin::blockdata::constants::genesis_block;
2223

2324
use bitcoin::hashes::Hash as TraitImport;
2425
use bitcoin::hashes::HashEngine as TraitImportEngine;
@@ -343,7 +344,7 @@ pub fn do_test(data: &[u8], logger: &Arc<dyn Logger>) {
343344
config.peer_channel_config_limits.min_dust_limit_satoshis = 0;
344345
let channelmanager = Arc::new(ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0));
345346
let our_id = PublicKey::from_secret_key(&Secp256k1::signing_only(), &keys_manager.get_node_secret());
346-
let net_graph_msg_handler = Arc::new(NetGraphMsgHandler::new(None, Arc::clone(&logger)));
347+
let net_graph_msg_handler = Arc::new(NetGraphMsgHandler::new(genesis_block(Network::Bitcoin).header.block_hash(), None, Arc::clone(&logger)));
347348

348349
let peers = RefCell::new([false; 256]);
349350
let mut loss_detector = MoneyLossDetector::new(&peers, channelmanager.clone(), monitor.clone(), PeerManager::new(MessageHandler {
@@ -609,7 +610,7 @@ mod tests {
609610
// What each byte represents is broken down below, and then everything is concatenated into
610611
// one large test at the end (you want %s/ -.*//g %s/\n\| \|\t\|\///g).
611612

612-
// Following BOLT 8, lightning message on the wire are: 2-byte encrypted message length +
613+
// Following BOLT 8, lightning message on the wire are: 2-byte encrypted message length +
613614
// 16-byte MAC of the encrypted message length + encrypted Lightning message + 16-byte MAC
614615
// of the Lightning message
615616
// I.e 2nd inbound read, len 18 : 0006 (encrypted message length) + 03000000000000000000000000000000 (MAC of the encrypted message length)

fuzz/src/router.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ use lightning::util::ser::Readable;
2121
use lightning::routing::network_graph::{NetworkGraph, RoutingFees};
2222

2323
use bitcoin::secp256k1::key::PublicKey;
24+
use bitcoin::network::constants::Network;
25+
use bitcoin::blockdata::constants::genesis_block;
2426

2527
use utils::test_logger;
2628

@@ -155,7 +157,7 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
155157
let logger: Arc<dyn Logger> = Arc::new(test_logger::TestLogger::new("".to_owned(), out));
156158

157159
let our_pubkey = get_pubkey!();
158-
let mut net_graph = NetworkGraph::new();
160+
let mut net_graph = NetworkGraph::new(genesis_block(Network::Bitcoin).header.block_hash());
159161

160162
let mut node_pks = HashSet::new();
161163
let mut scid = 42;

lightning-net-tokio/src/lib.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -535,7 +535,11 @@ mod tests {
535535
fn handle_htlc_fail_channel_update(&self, _update: &HTLCFailChannelUpdate) { }
536536
fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> { Vec::new() }
537537
fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<NodeAnnouncement> { Vec::new() }
538-
fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool { false }
538+
fn sync_routing_table(&self, _their_node_id: &PublicKey, _init_msg: &Init) { }
539+
fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: ReplyChannelRange) -> Result<(), LightningError> { Ok(()) }
540+
fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) }
541+
fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: QueryChannelRange) -> Result<(), LightningError> { Ok(()) }
542+
fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) }
539543
}
540544
impl ChannelMessageHandler for MsgHandler {
541545
fn handle_open_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, _msg: &OpenChannel) {}

lightning/src/ln/channelmanager.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3404,6 +3404,8 @@ impl<ChanSigner: ChannelKeys, M: Deref + Sync + Send, T: Deref + Sync + Send, K:
34043404
&events::MessageSendEvent::BroadcastChannelUpdate { .. } => true,
34053405
&events::MessageSendEvent::HandleError { ref node_id, .. } => node_id != counterparty_node_id,
34063406
&events::MessageSendEvent::PaymentFailureNetworkUpdate { .. } => true,
3407+
&events::MessageSendEvent::SendChannelRangeQuery { .. } => false,
3408+
&events::MessageSendEvent::SendShortIdsQuery { .. } => false,
34073409
}
34083410
});
34093411
}

lightning/src/ln/features.rs

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ mod sealed {
104104
],
105105
optional_features: [
106106
// Byte 0
107-
DataLossProtect | InitialRoutingSync | UpfrontShutdownScript,
107+
DataLossProtect | InitialRoutingSync | UpfrontShutdownScript | GossipQueries,
108108
// Byte 1
109109
VariableLengthOnion | PaymentSecret,
110110
// Byte 2
@@ -122,7 +122,7 @@ mod sealed {
122122
],
123123
optional_features: [
124124
// Byte 0
125-
DataLossProtect | UpfrontShutdownScript,
125+
DataLossProtect | UpfrontShutdownScript | GossipQueries,
126126
// Byte 1
127127
VariableLengthOnion | PaymentSecret,
128128
// Byte 2
@@ -243,6 +243,8 @@ mod sealed {
243243
"Feature flags for `initial_routing_sync`.");
244244
define_feature!(5, UpfrontShutdownScript, [InitContext, NodeContext],
245245
"Feature flags for `option_upfront_shutdown_script`.");
246+
define_feature!(7, GossipQueries, [InitContext, NodeContext],
247+
"Feature flags for `gossip_queries`.");
246248
define_feature!(9, VariableLengthOnion, [InitContext, NodeContext],
247249
"Feature flags for `var_onion_optin`.");
248250
define_feature!(13, StaticRemoteKey, [InitContext, NodeContext],
@@ -473,6 +475,22 @@ impl<T: sealed::UpfrontShutdownScript> Features<T> {
473475
}
474476
}
475477

478+
479+
impl<T: sealed::GossipQueries> Features<T> {
480+
#[cfg(test)]
481+
pub(crate) fn requires_gossip_queries(&self) -> bool {
482+
<T as sealed::GossipQueries>::requires_feature(&self.flags)
483+
}
484+
pub(crate) fn supports_gossip_queries(&self) -> bool {
485+
<T as sealed::GossipQueries>::supports_feature(&self.flags)
486+
}
487+
#[cfg(test)]
488+
pub(crate) fn clear_gossip_queries(mut self) -> Self {
489+
<T as sealed::GossipQueries>::clear_bits(&mut self.flags);
490+
self
491+
}
492+
}
493+
476494
impl<T: sealed::VariableLengthOnion> Features<T> {
477495
#[cfg(test)]
478496
pub(crate) fn requires_variable_length_onion(&self) -> bool {
@@ -497,6 +515,10 @@ impl<T: sealed::InitialRoutingSync> Features<T> {
497515
pub(crate) fn initial_routing_sync(&self) -> bool {
498516
<T as sealed::InitialRoutingSync>::supports_feature(&self.flags)
499517
}
518+
// We are no longer setting initial_routing_sync now that gossip_queries
519+
// is enabled. This feature is ignored by a peer when gossip_queries has
520+
// been negotiated.
521+
#[cfg(test)]
500522
pub(crate) fn clear_initial_routing_sync(&mut self) {
501523
<T as sealed::InitialRoutingSync>::clear_bits(&mut self.flags)
502524
}
@@ -568,6 +590,11 @@ mod tests {
568590
assert!(!InitFeatures::known().requires_upfront_shutdown_script());
569591
assert!(!NodeFeatures::known().requires_upfront_shutdown_script());
570592

593+
assert!(InitFeatures::known().supports_gossip_queries());
594+
assert!(NodeFeatures::known().supports_gossip_queries());
595+
assert!(!InitFeatures::known().requires_gossip_queries());
596+
assert!(!NodeFeatures::known().requires_gossip_queries());
597+
571598
assert!(InitFeatures::known().supports_data_loss_protect());
572599
assert!(NodeFeatures::known().supports_data_loss_protect());
573600
assert!(!InitFeatures::known().requires_data_loss_protect());
@@ -620,9 +647,10 @@ mod tests {
620647

621648
#[test]
622649
fn convert_to_context_with_relevant_flags() {
623-
let init_features = InitFeatures::known().clear_upfront_shutdown_script();
650+
let init_features = InitFeatures::known().clear_upfront_shutdown_script().clear_gossip_queries();
624651
assert!(init_features.initial_routing_sync());
625652
assert!(!init_features.supports_upfront_shutdown_script());
653+
assert!(!init_features.supports_gossip_queries());
626654

627655
let node_features: NodeFeatures = init_features.to_context();
628656
{
@@ -639,8 +667,10 @@ mod tests {
639667
// Check that cleared flags are kept blank when converting back:
640668
// - initial_routing_sync was not applicable to NodeContext
641669
// - upfront_shutdown_script was cleared before converting
670+
// - gossip_queries was cleared before converting
642671
let features: InitFeatures = node_features.to_context_internal();
643672
assert!(!features.initial_routing_sync());
644673
assert!(!features.supports_upfront_shutdown_script());
674+
assert!(!init_features.supports_gossip_queries());
645675
}
646676
}

lightning/src/ln/functional_test_utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1171,7 +1171,7 @@ pub fn create_network<'a, 'b: 'a, 'c: 'b>(node_count: usize, cfgs: &'b Vec<NodeC
11711171
let payment_count = Rc::new(RefCell::new(0));
11721172

11731173
for i in 0..node_count {
1174-
let net_graph_msg_handler = NetGraphMsgHandler::new(None, cfgs[i].logger);
1174+
let net_graph_msg_handler = NetGraphMsgHandler::new(cfgs[i].chain_source.genesis_hash, None, cfgs[i].logger);
11751175
nodes.push(Node{ chain_source: cfgs[i].chain_source,
11761176
tx_broadcaster: cfgs[i].tx_broadcaster, chain_monitor: &cfgs[i].chain_monitor,
11771177
keys_manager: &cfgs[i].keys_manager, node: &chan_mgrs[i], net_graph_msg_handler,

lightning/src/ln/msgs.rs

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -804,7 +804,13 @@ pub trait ChannelMessageHandler : events::MessageSendEventsProvider + Send + Syn
804804
}
805805

806806
/// A trait to describe an object which can receive routing messages.
807-
pub trait RoutingMessageHandler : Send + Sync {
807+
///
808+
/// # Implementor DoS Warnings
809+
///
810+
/// For `gossip_queries` messages there are potential DoS vectors when handling
811+
/// inbound queries. Implementors using an on-disk network graph should be aware of
812+
/// repeated disk I/O for queries accessing different parts of the network graph.
813+
pub trait RoutingMessageHandler : Send + Sync + events::MessageSendEventsProvider {
808814
/// Handle an incoming node_announcement message, returning true if it should be forwarded on,
809815
/// false or returning an Err otherwise.
810816
fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError>;
@@ -825,8 +831,25 @@ pub trait RoutingMessageHandler : Send + Sync {
825831
/// immediately higher (as defined by <PublicKey as Ord>::cmp) than starting_point.
826832
/// If None is provided for starting_point, we start at the first node.
827833
fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<NodeAnnouncement>;
828-
/// Returns whether a full sync should be requested from a peer.
829-
fn should_request_full_sync(&self, node_id: &PublicKey) -> bool;
834+
/// Called when a connection is established with a peer. This can be used to
835+
/// perform routing table synchronization using a strategy defined by the
836+
/// implementor.
837+
fn sync_routing_table(&self, their_node_id: &PublicKey, init: &Init);
838+
/// Handles the reply of a query we initiated to learn about channels
839+
/// for a given range of blocks. We can expect to receive one or more
840+
/// replies to a single query.
841+
fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: ReplyChannelRange) -> Result<(), LightningError>;
842+
/// Handles the reply of a query we initiated asking for routing gossip
843+
/// messages for a list of channels. We should receive this message when
844+
/// a node has completed its best effort to send us the pertaining routing
845+
/// gossip messages.
846+
fn handle_reply_short_channel_ids_end(&self, their_node_id: &PublicKey, msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError>;
847+
/// Handles when a peer asks us to send a list of short_channel_ids
848+
/// for the requested range of blocks.
849+
fn handle_query_channel_range(&self, their_node_id: &PublicKey, msg: QueryChannelRange) -> Result<(), LightningError>;
850+
/// Handles when a peer asks us to send routing gossip messages for a
851+
/// list of short_channel_ids.
852+
fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError>;
830853
}
831854

832855
mod fuzzy_internal_msgs {

lightning/src/ln/peer_handler.rs

Lines changed: 32 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -584,11 +584,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
584584

585585
peer.their_node_id = Some(their_node_id);
586586
insert_node_id!();
587-
let mut features = InitFeatures::known();
588-
if !self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) {
589-
features.clear_initial_routing_sync();
590-
}
591-
587+
let features = InitFeatures::known();
592588
let resp = msgs::Init { features };
593589
self.enqueue_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), &resp);
594590
},
@@ -694,10 +690,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
694690
}
695691

696692
log_info!(
697-
self.logger, "Received peer Init message: data_loss_protect: {}, initial_routing_sync: {}, upfront_shutdown_script: {}, static_remote_key: {}, unknown flags (local and global): {}",
693+
self.logger, "Received peer Init message: data_loss_protect: {}, initial_routing_sync: {}, upfront_shutdown_script: {}, gossip_queries: {}, static_remote_key: {}, unknown flags (local and global): {}",
698694
if msg.features.supports_data_loss_protect() { "supported" } else { "not supported"},
699695
if msg.features.initial_routing_sync() { "requested" } else { "not requested" },
700696
if msg.features.supports_upfront_shutdown_script() { "supported" } else { "not supported"},
697+
if msg.features.supports_gossip_queries() { "supported" } else { "not supported" },
701698
if msg.features.supports_static_remote_key() { "supported" } else { "not supported"},
702699
if msg.features.supports_unknown_bits() { "present" } else { "none" }
703700
);
@@ -712,15 +709,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
712709
}
713710

714711
if !peer.outbound {
715-
let mut features = InitFeatures::known();
716-
if !self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) {
717-
features.clear_initial_routing_sync();
718-
}
719-
712+
let features = InitFeatures::known();
720713
let resp = msgs::Init { features };
721714
self.enqueue_message(peers_needing_send, peer, peer_descriptor.clone(), &resp);
722715
}
723716

717+
self.message_handler.route_handler.sync_routing_table(&peer.their_node_id.unwrap(), &msg);
718+
724719
self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap(), &msg);
725720
peer.their_features = Some(msg.features);
726721
},
@@ -840,6 +835,21 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
840835
// TODO: forward msg along to all our other peers!
841836
}
842837
},
838+
wire::Message::QueryShortChannelIds(msg) => {
839+
self.message_handler.route_handler.handle_query_short_channel_ids(&peer.their_node_id.unwrap(), msg)?;
840+
},
841+
wire::Message::ReplyShortChannelIdsEnd(msg) => {
842+
self.message_handler.route_handler.handle_reply_short_channel_ids_end(&peer.their_node_id.unwrap(), msg)?;
843+
},
844+
wire::Message::QueryChannelRange(msg) => {
845+
self.message_handler.route_handler.handle_query_channel_range(&peer.their_node_id.unwrap(), msg)?;
846+
},
847+
wire::Message::ReplyChannelRange(msg) => {
848+
self.message_handler.route_handler.handle_reply_channel_range(&peer.their_node_id.unwrap(), msg)?;
849+
},
850+
wire::Message::GossipTimestampFilter(_msg) => {
851+
// TODO: handle message
852+
},
843853

844854
// Unknown messages:
845855
wire::Message::Unknown(msg_type) if msg_type.is_even() => {
@@ -864,6 +874,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
864874
// drop optional-ish messages when send buffers get full!
865875

866876
let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
877+
events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
867878
let mut peers_lock = self.peers.lock().unwrap();
868879
let peers = &mut *peers_lock;
869880
for event in events_generated.drain(..) {
@@ -1115,6 +1126,16 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
11151126
self.do_attempt_write_data(&mut descriptor, peer);
11161127
},
11171128
}
1129+
},
1130+
MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => {
1131+
let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {});
1132+
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
1133+
self.do_attempt_write_data(&mut descriptor, peer);
1134+
},
1135+
MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => {
1136+
let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {});
1137+
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
1138+
self.do_attempt_write_data(&mut descriptor, peer);
11181139
}
11191140
}
11201141
}
@@ -1302,13 +1323,6 @@ mod tests {
13021323
(fd_a.clone(), fd_b.clone())
13031324
}
13041325

1305-
fn establish_connection_and_read_events<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>) -> (FileDescriptor, FileDescriptor) {
1306-
let (mut fd_a, mut fd_b) = establish_connection(peer_a, peer_b);
1307-
assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
1308-
assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
1309-
(fd_a.clone(), fd_b.clone())
1310-
}
1311-
13121326
#[test]
13131327
fn test_disconnect_peer() {
13141328
// Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and
@@ -1377,41 +1391,4 @@ mod tests {
13771391
assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
13781392
assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
13791393
}
1380-
1381-
#[test]
1382-
fn limit_initial_routing_sync_requests() {
1383-
// Inbound peer 0 requests initial_routing_sync, but outbound peer 1 does not.
1384-
{
1385-
let cfgs = create_peermgr_cfgs(2);
1386-
cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
1387-
let peers = create_network(2, &cfgs);
1388-
let (fd_0_to_1, fd_1_to_0) = establish_connection_and_read_events(&peers[0], &peers[1]);
1389-
1390-
let peer_0 = peers[0].peers.lock().unwrap();
1391-
let peer_1 = peers[1].peers.lock().unwrap();
1392-
1393-
let peer_0_features = peer_1.peers.get(&fd_1_to_0).unwrap().their_features.as_ref();
1394-
let peer_1_features = peer_0.peers.get(&fd_0_to_1).unwrap().their_features.as_ref();
1395-
1396-
assert!(peer_0_features.unwrap().initial_routing_sync());
1397-
assert!(!peer_1_features.unwrap().initial_routing_sync());
1398-
}
1399-
1400-
// Outbound peer 1 requests initial_routing_sync, but inbound peer 0 does not.
1401-
{
1402-
let cfgs = create_peermgr_cfgs(2);
1403-
cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
1404-
let peers = create_network(2, &cfgs);
1405-
let (fd_0_to_1, fd_1_to_0) = establish_connection_and_read_events(&peers[0], &peers[1]);
1406-
1407-
let peer_0 = peers[0].peers.lock().unwrap();
1408-
let peer_1 = peers[1].peers.lock().unwrap();
1409-
1410-
let peer_0_features = peer_1.peers.get(&fd_1_to_0).unwrap().their_features.as_ref();
1411-
let peer_1_features = peer_0.peers.get(&fd_0_to_1).unwrap().their_features.as_ref();
1412-
1413-
assert!(!peer_0_features.unwrap().initial_routing_sync());
1414-
assert!(peer_1_features.unwrap().initial_routing_sync());
1415-
}
1416-
}
14171394
}

0 commit comments

Comments
 (0)