Skip to content

Commit 4f6a09c

Browse files
committed
Add a test of gossip message buffer limiting in PeerManager
This adds a simple test that the gossip message buffer in `PeerManager` is limited, including the new behavior of bypassing the limit when the broadcast comes from the `ChannelMessageHandler`.
1 parent 533d812 commit 4f6a09c

File tree

3 files changed

+109
-19
lines changed

3 files changed

+109
-19
lines changed

lightning/src/ln/peer_handler.rs

+78-2
Original file line numberDiff line numberDiff line change
@@ -2725,20 +2725,21 @@ fn is_gossip_msg(type_id: u16) -> bool {
27252725

27262726
#[cfg(test)]
27272727
mod tests {
2728+
use super::*;
2729+
27282730
use crate::sign::{NodeSigner, Recipient};
27292731
use crate::events;
27302732
use crate::io;
27312733
use crate::ln::types::ChannelId;
27322734
use crate::ln::features::{InitFeatures, NodeFeatures};
27332735
use crate::ln::peer_channel_encryptor::PeerChannelEncryptor;
2734-
use crate::ln::peer_handler::{CustomMessageHandler, PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses, ErroringMessageHandler, MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER};
27352736
use crate::ln::{msgs, wire};
27362737
use crate::ln::msgs::{Init, LightningError, SocketAddress};
27372738
use crate::util::test_utils;
27382739

27392740
use bitcoin::Network;
27402741
use bitcoin::constants::ChainHash;
2741-
use bitcoin::secp256k1::{PublicKey, SecretKey};
2742+
use bitcoin::secp256k1::{PublicKey, SecretKey, Secp256k1};
27422743

27432744
use crate::sync::{Arc, Mutex};
27442745
use core::convert::Infallible;
@@ -3196,6 +3197,8 @@ mod tests {
31963197
let cfgs = create_peermgr_cfgs(2);
31973198
cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
31983199
cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
3200+
cfgs[0].routing_handler.announcement_available_for_sync.store(true, Ordering::Release);
3201+
cfgs[1].routing_handler.announcement_available_for_sync.store(true, Ordering::Release);
31993202
let peers = create_network(2, &cfgs);
32003203

32013204
// By calling establish_connect, we trigger do_attempt_write_data between
@@ -3359,6 +3362,79 @@ mod tests {
33593362
assert_eq!(peer_b.peers.read().unwrap().len(), 0);
33603363
}
33613364

3365+
#[test]
3366+
fn test_gossip_flood_pause() {
3367+
use crate::routing::test_utils::channel_announcement;
3368+
use lightning_types::features::ChannelFeatures;
3369+
3370+
// Simple test which connects two nodes to a PeerManager and checks that if we run out of
3371+
// socket buffer space we'll stop forwarding gossip but still push our own gossip.
3372+
let cfgs = create_peermgr_cfgs(2);
3373+
let mut peers = create_network(2, &cfgs);
3374+
let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);
3375+
3376+
macro_rules! drain_queues { () => {
3377+
loop {
3378+
peers[0].process_events();
3379+
peers[1].process_events();
3380+
3381+
let msg = fd_a.outbound_data.lock().unwrap().split_off(0);
3382+
if !msg.is_empty() {
3383+
assert_eq!(peers[1].read_event(&mut fd_b, &msg).unwrap(), false);
3384+
continue;
3385+
}
3386+
let msg = fd_b.outbound_data.lock().unwrap().split_off(0);
3387+
if !msg.is_empty() {
3388+
assert_eq!(peers[0].read_event(&mut fd_a, &msg).unwrap(), false);
3389+
continue;
3390+
}
3391+
break;
3392+
}
3393+
} }
3394+
3395+
// First, make sure all pending messages have been processed and queues drained.
3396+
drain_queues!();
3397+
3398+
let secp_ctx = Secp256k1::new();
3399+
let key = SecretKey::from_slice(&[1; 32]).unwrap();
3400+
let msg = channel_announcement(&key, &key, ChannelFeatures::empty(), 42, &secp_ctx);
3401+
let msg_ev = MessageSendEvent::BroadcastChannelAnnouncement {
3402+
msg,
3403+
update_msg: None,
3404+
};
3405+
3406+
fd_a.hang_writes.store(true, Ordering::Relaxed);
3407+
3408+
// Now push an arbitrarily large number of messages and check that only
3409+
// `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP` messages end up in the queue.
3410+
for i in 0..OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP * 2 {
3411+
cfgs[0].routing_handler.pending_events.lock().unwrap().push(msg_ev.clone());
3412+
peers[0].process_events();
3413+
}
3414+
3415+
assert_eq!(peers[0].peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.len(),
3416+
OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP);
3417+
3418+
// Check that if a broadcast message comes in from the channel handler (i.e. it is an
3419+
// announcement for our own channel), it gets queued anyway.
3420+
cfgs[0].chan_handler.pending_events.lock().unwrap().push(msg_ev);
3421+
peers[0].process_events();
3422+
assert_eq!(peers[0].peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.len(),
3423+
OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 1);
3424+
3425+
// Finally, deliver all the messages and make sure we got the right count. Note that there
3426+
// was an extra message that had already moved from the broadcast queue to the encrypted
3427+
// message queue so we actually receive `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2` messages.
3428+
fd_a.hang_writes.store(false, Ordering::Relaxed);
3429+
cfgs[1].routing_handler.chan_anns_recvd.store(0, Ordering::Relaxed);
3430+
peers[0].write_buffer_space_avail(&mut fd_a).unwrap();
3431+
3432+
drain_queues!();
3433+
assert!(peers[0].peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.is_empty());
3434+
assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Relaxed),
3435+
OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2);
3436+
}
3437+
33623438
#[test]
33633439
fn test_filter_addresses(){
33643440
// Tests the filter_addresses function.

lightning/src/routing/test_utils.rs

+18-10
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,11 @@ use crate::sync::{self, Arc};
2727

2828
use crate::routing::gossip::NodeId;
2929

30-
// Using the same keys for LN and BTC ids
31-
pub(crate) fn add_channel(
32-
gossip_sync: &P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>,
33-
secp_ctx: &Secp256k1<All>, node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures, short_channel_id: u64
34-
) {
35-
let node_1_pubkey = PublicKey::from_secret_key(&secp_ctx, node_1_privkey);
36-
let node_id_1 = NodeId::from_pubkey(&node_1_pubkey);
30+
pub(crate) fn channel_announcement(
31+
node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures,
32+
short_channel_id: u64, secp_ctx: &Secp256k1<All>,
33+
) -> ChannelAnnouncement {
34+
let node_id_1 = NodeId::from_pubkey(&PublicKey::from_secret_key(&secp_ctx, node_1_privkey));
3735
let node_id_2 = NodeId::from_pubkey(&PublicKey::from_secret_key(&secp_ctx, node_2_privkey));
3836

3937
let unsigned_announcement = UnsignedChannelAnnouncement {
@@ -48,13 +46,23 @@ pub(crate) fn add_channel(
4846
};
4947

5048
let msghash = hash_to_message!(&Sha256dHash::hash(&unsigned_announcement.encode()[..])[..]);
51-
let valid_announcement = ChannelAnnouncement {
49+
ChannelAnnouncement {
5250
node_signature_1: secp_ctx.sign_ecdsa(&msghash, node_1_privkey),
5351
node_signature_2: secp_ctx.sign_ecdsa(&msghash, node_2_privkey),
5452
bitcoin_signature_1: secp_ctx.sign_ecdsa(&msghash, node_1_privkey),
5553
bitcoin_signature_2: secp_ctx.sign_ecdsa(&msghash, node_2_privkey),
5654
contents: unsigned_announcement.clone(),
57-
};
55+
}
56+
}
57+
58+
// Using the same keys for LN and BTC ids
59+
pub(crate) fn add_channel(
60+
gossip_sync: &P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>,
61+
secp_ctx: &Secp256k1<All>, node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures, short_channel_id: u64
62+
) {
63+
let valid_announcement =
64+
channel_announcement(node_1_privkey, node_2_privkey, features, short_channel_id, secp_ctx);
65+
let node_1_pubkey = PublicKey::from_secret_key(&secp_ctx, node_1_privkey);
5866
match gossip_sync.handle_channel_announcement(Some(node_1_pubkey), &valid_announcement) {
5967
Ok(res) => assert!(res),
6068
_ => panic!()
@@ -108,7 +116,7 @@ pub(crate) fn update_channel(
108116

109117
pub(super) fn get_nodes(secp_ctx: &Secp256k1<All>) -> (SecretKey, PublicKey, Vec<SecretKey>, Vec<PublicKey>) {
110118
let privkeys: Vec<SecretKey> = (2..22).map(|i| {
111-
SecretKey::from_slice(&<Vec<u8>>::from_hex(&format!("{:02x}", i).repeat(32)).unwrap()[..]).unwrap()
119+
SecretKey::from_slice(&[i; 32]).unwrap()
112120
}).collect();
113121

114122
let pubkeys = privkeys.iter().map(|secret| PublicKey::from_secret_key(&secp_ctx, secret)).collect();

lightning/src/util/test_utils.rs

+13-7
Original file line numberDiff line numberDiff line change
@@ -986,6 +986,7 @@ pub struct TestRoutingMessageHandler {
986986
pub chan_anns_recvd: AtomicUsize,
987987
pub pending_events: Mutex<Vec<events::MessageSendEvent>>,
988988
pub request_full_sync: AtomicBool,
989+
pub announcement_available_for_sync: AtomicBool,
989990
}
990991

991992
impl TestRoutingMessageHandler {
@@ -995,27 +996,32 @@ impl TestRoutingMessageHandler {
995996
chan_anns_recvd: AtomicUsize::new(0),
996997
pending_events: Mutex::new(vec![]),
997998
request_full_sync: AtomicBool::new(false),
999+
announcement_available_for_sync: AtomicBool::new(false),
9981000
}
9991001
}
10001002
}
10011003
impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
10021004
fn handle_node_announcement(&self, _their_node_id: Option<PublicKey>, _msg: &msgs::NodeAnnouncement) -> Result<bool, msgs::LightningError> {
1003-
Err(msgs::LightningError { err: "".to_owned(), action: msgs::ErrorAction::IgnoreError })
1005+
Ok(true)
10041006
}
10051007
fn handle_channel_announcement(&self, _their_node_id: Option<PublicKey>, _msg: &msgs::ChannelAnnouncement) -> Result<bool, msgs::LightningError> {
10061008
self.chan_anns_recvd.fetch_add(1, Ordering::AcqRel);
1007-
Err(msgs::LightningError { err: "".to_owned(), action: msgs::ErrorAction::IgnoreError })
1009+
Ok(true)
10081010
}
10091011
fn handle_channel_update(&self, _their_node_id: Option<PublicKey>, _msg: &msgs::ChannelUpdate) -> Result<bool, msgs::LightningError> {
10101012
self.chan_upds_recvd.fetch_add(1, Ordering::AcqRel);
1011-
Err(msgs::LightningError { err: "".to_owned(), action: msgs::ErrorAction::IgnoreError })
1013+
Ok(true)
10121014
}
10131015
fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> {
1014-
let chan_upd_1 = get_dummy_channel_update(starting_point);
1015-
let chan_upd_2 = get_dummy_channel_update(starting_point);
1016-
let chan_ann = get_dummy_channel_announcement(starting_point);
1016+
if self.announcement_available_for_sync.load(Ordering::Acquire) {
1017+
let chan_upd_1 = get_dummy_channel_update(starting_point);
1018+
let chan_upd_2 = get_dummy_channel_update(starting_point);
1019+
let chan_ann = get_dummy_channel_announcement(starting_point);
10171020

1018-
Some((chan_ann, Some(chan_upd_1), Some(chan_upd_2)))
1021+
Some((chan_ann, Some(chan_upd_1), Some(chan_upd_2)))
1022+
} else {
1023+
None
1024+
}
10191025
}
10201026

10211027
fn get_next_node_announcement(&self, _starting_point: Option<&NodeId>) -> Option<msgs::NodeAnnouncement> {

0 commit comments

Comments
 (0)