Skip to content

Commit 8733e70

Browse files
committed
Add a test of gossip message buffer limiting in PeerManager
This adds a simple test that the gossip message buffer in `PeerManager` is limited, including the new behavior of bypassing the limit when the broadcast comes from the `ChannelMessageHandler`.
1 parent 81ef0c6 commit 8733e70

File tree

3 files changed

+107
-17
lines changed

3 files changed

+107
-17
lines changed

lightning/src/ln/peer_handler.rs

+78-2
Original file line numberDiff line numberDiff line change
@@ -2726,20 +2726,21 @@ fn is_gossip_msg(type_id: u16) -> bool {
27262726

27272727
#[cfg(test)]
27282728
mod tests {
2729+
use super::*;
2730+
27292731
use crate::sign::{NodeSigner, Recipient};
27302732
use crate::events;
27312733
use crate::io;
27322734
use crate::ln::types::ChannelId;
27332735
use crate::ln::features::{InitFeatures, NodeFeatures};
27342736
use crate::ln::peer_channel_encryptor::PeerChannelEncryptor;
2735-
use crate::ln::peer_handler::{CustomMessageHandler, PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses, ErroringMessageHandler, MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER};
27362737
use crate::ln::{msgs, wire};
27372738
use crate::ln::msgs::{Init, LightningError, SocketAddress};
27382739
use crate::util::test_utils;
27392740

27402741
use bitcoin::Network;
27412742
use bitcoin::constants::ChainHash;
2742-
use bitcoin::secp256k1::{PublicKey, SecretKey};
2743+
use bitcoin::secp256k1::{PublicKey, SecretKey, Secp256k1};
27432744

27442745
use crate::sync::{Arc, Mutex};
27452746
use core::convert::Infallible;
@@ -3197,6 +3198,8 @@ mod tests {
31973198
let cfgs = create_peermgr_cfgs(2);
31983199
cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
31993200
cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
3201+
cfgs[0].routing_handler.announcement_available_for_sync.store(true, Ordering::Release);
3202+
cfgs[1].routing_handler.announcement_available_for_sync.store(true, Ordering::Release);
32003203
let peers = create_network(2, &cfgs);
32013204

32023205
// By calling establish_connect, we trigger do_attempt_write_data between
@@ -3360,6 +3363,79 @@ mod tests {
33603363
assert_eq!(peer_b.peers.read().unwrap().len(), 0);
33613364
}
33623365

3366+
#[test]
3367+
fn test_gossip_flood_pause() {
3368+
use crate::routing::test_utils::channel_announcement;
3369+
use lightning_types::features::ChannelFeatures;
3370+
3371+
// Simple test which connects two nodes to a PeerManager and checks that if we run out of
3372+
// socket buffer space we'll stop forwarding gossip but still push our own gossip.
3373+
let cfgs = create_peermgr_cfgs(2);
3374+
let mut peers = create_network(2, &cfgs);
3375+
let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);
3376+
3377+
macro_rules! drain_queues { () => {
3378+
loop {
3379+
peers[0].process_events();
3380+
peers[1].process_events();
3381+
3382+
let msg = fd_a.outbound_data.lock().unwrap().split_off(0);
3383+
if !msg.is_empty() {
3384+
assert_eq!(peers[1].read_event(&mut fd_b, &msg).unwrap(), false);
3385+
continue;
3386+
}
3387+
let msg = fd_b.outbound_data.lock().unwrap().split_off(0);
3388+
if !msg.is_empty() {
3389+
assert_eq!(peers[0].read_event(&mut fd_a, &msg).unwrap(), false);
3390+
continue;
3391+
}
3392+
break;
3393+
}
3394+
} }
3395+
3396+
// First, make sure all pending messages have been processed and queues drained.
3397+
drain_queues!();
3398+
3399+
let secp_ctx = Secp256k1::new();
3400+
let key = SecretKey::from_slice(&[1; 32]).unwrap();
3401+
let msg = channel_announcement(&key, &key, ChannelFeatures::empty(), 42, &secp_ctx);
3402+
let msg_ev = MessageSendEvent::BroadcastChannelAnnouncement {
3403+
msg,
3404+
update_msg: None,
3405+
};
3406+
3407+
fd_a.hang_writes.store(true, Ordering::Relaxed);
3408+
3409+
// Now push an arbitrarily large numebr of messages and check that only
3410+
// `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP` messages end up in the queue.
3411+
for i in 0..OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP * 2 {
3412+
cfgs[0].routing_handler.pending_events.lock().unwrap().push(msg_ev.clone());
3413+
peers[0].process_events();
3414+
}
3415+
3416+
assert_eq!(peers[0].peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.len(),
3417+
OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP);
3418+
3419+
// Check that if a broadcast message comes in from the channel handler (i.e. it is an
3420+
// announcement for our own channel), it gets queued anyway.
3421+
cfgs[0].chan_handler.pending_events.lock().unwrap().push(msg_ev);
3422+
peers[0].process_events();
3423+
assert_eq!(peers[0].peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.len(),
3424+
OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 1);
3425+
3426+
// Finally, deliver all the messages and make sure we got the right count. Note that there
3427+
// was an extra message that had already moved from the broadcast queue to the encrypted
3428+
// message queue so we actually receive `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2` messages.
3429+
fd_a.hang_writes.store(false, Ordering::Relaxed);
3430+
cfgs[1].routing_handler.chan_anns_recvd.store(0, Ordering::Relaxed);
3431+
peers[0].write_buffer_space_avail(&mut fd_a).unwrap();
3432+
3433+
drain_queues!();
3434+
assert!(peers[0].peers.read().unwrap().get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.is_empty());
3435+
assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Relaxed),
3436+
OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2);
3437+
}
3438+
33633439
#[test]
33643440
fn test_filter_addresses(){
33653441
// Tests the filter_addresses function.

lightning/src/routing/test_utils.rs

+16-8
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,10 @@ use crate::sync::{self, Arc};
2727

2828
use crate::routing::gossip::NodeId;
2929

30-
// Using the same keys for LN and BTC ids
31-
pub(crate) fn add_channel(
32-
gossip_sync: &P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>,
33-
secp_ctx: &Secp256k1<All>, node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures, short_channel_id: u64
34-
) {
30+
pub(crate) fn channel_announcement(
31+
node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures,
32+
short_channel_id: u64, secp_ctx: &Secp256k1<All>,
33+
) -> ChannelAnnouncement {
3534
let node_id_1 = NodeId::from_pubkey(&PublicKey::from_secret_key(&secp_ctx, node_1_privkey));
3635
let node_id_2 = NodeId::from_pubkey(&PublicKey::from_secret_key(&secp_ctx, node_2_privkey));
3736

@@ -47,13 +46,22 @@ pub(crate) fn add_channel(
4746
};
4847

4948
let msghash = hash_to_message!(&Sha256dHash::hash(&unsigned_announcement.encode()[..])[..]);
50-
let valid_announcement = ChannelAnnouncement {
49+
ChannelAnnouncement {
5150
node_signature_1: secp_ctx.sign_ecdsa(&msghash, node_1_privkey),
5251
node_signature_2: secp_ctx.sign_ecdsa(&msghash, node_2_privkey),
5352
bitcoin_signature_1: secp_ctx.sign_ecdsa(&msghash, node_1_privkey),
5453
bitcoin_signature_2: secp_ctx.sign_ecdsa(&msghash, node_2_privkey),
5554
contents: unsigned_announcement.clone(),
56-
};
55+
}
56+
}
57+
58+
// Using the same keys for LN and BTC ids
59+
pub(crate) fn add_channel(
60+
gossip_sync: &P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>,
61+
secp_ctx: &Secp256k1<All>, node_1_privkey: &SecretKey, node_2_privkey: &SecretKey, features: ChannelFeatures, short_channel_id: u64
62+
) {
63+
let valid_announcement =
64+
channel_announcement(node_1_privkey, node_2_privkey, features, short_channel_id, secp_ctx);
5765
match gossip_sync.handle_channel_announcement(&valid_announcement) {
5866
Ok(res) => assert!(res),
5967
_ => panic!()
@@ -105,7 +113,7 @@ pub(crate) fn update_channel(
105113

106114
pub(super) fn get_nodes(secp_ctx: &Secp256k1<All>) -> (SecretKey, PublicKey, Vec<SecretKey>, Vec<PublicKey>) {
107115
let privkeys: Vec<SecretKey> = (2..22).map(|i| {
108-
SecretKey::from_slice(&<Vec<u8>>::from_hex(&format!("{:02x}", i).repeat(32)).unwrap()[..]).unwrap()
116+
SecretKey::from_slice(&[i; 32]).unwrap()
109117
}).collect();
110118

111119
let pubkeys = privkeys.iter().map(|secret| PublicKey::from_secret_key(&secp_ctx, secret)).collect();

lightning/src/util/test_utils.rs

+13-7
Original file line numberDiff line numberDiff line change
@@ -1004,6 +1004,7 @@ pub struct TestRoutingMessageHandler {
10041004
pub chan_anns_recvd: AtomicUsize,
10051005
pub pending_events: Mutex<Vec<events::MessageSendEvent>>,
10061006
pub request_full_sync: AtomicBool,
1007+
pub announcement_available_for_sync: AtomicBool,
10071008
}
10081009

10091010
impl TestRoutingMessageHandler {
@@ -1013,27 +1014,32 @@ impl TestRoutingMessageHandler {
10131014
chan_anns_recvd: AtomicUsize::new(0),
10141015
pending_events: Mutex::new(vec![]),
10151016
request_full_sync: AtomicBool::new(false),
1017+
announcement_available_for_sync: AtomicBool::new(false),
10161018
}
10171019
}
10181020
}
10191021
impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
10201022
fn handle_node_announcement(&self, _msg: &msgs::NodeAnnouncement) -> Result<bool, msgs::LightningError> {
1021-
Err(msgs::LightningError { err: "".to_owned(), action: msgs::ErrorAction::IgnoreError })
1023+
Ok(true)
10221024
}
10231025
fn handle_channel_announcement(&self, _msg: &msgs::ChannelAnnouncement) -> Result<bool, msgs::LightningError> {
10241026
self.chan_anns_recvd.fetch_add(1, Ordering::AcqRel);
1025-
Err(msgs::LightningError { err: "".to_owned(), action: msgs::ErrorAction::IgnoreError })
1027+
Ok(true)
10261028
}
10271029
fn handle_channel_update(&self, _msg: &msgs::ChannelUpdate) -> Result<bool, msgs::LightningError> {
10281030
self.chan_upds_recvd.fetch_add(1, Ordering::AcqRel);
1029-
Err(msgs::LightningError { err: "".to_owned(), action: msgs::ErrorAction::IgnoreError })
1031+
Ok(true)
10301032
}
10311033
fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> {
1032-
let chan_upd_1 = get_dummy_channel_update(starting_point);
1033-
let chan_upd_2 = get_dummy_channel_update(starting_point);
1034-
let chan_ann = get_dummy_channel_announcement(starting_point);
1034+
if self.announcement_available_for_sync.load(Ordering::Acquire) {
1035+
let chan_upd_1 = get_dummy_channel_update(starting_point);
1036+
let chan_upd_2 = get_dummy_channel_update(starting_point);
1037+
let chan_ann = get_dummy_channel_announcement(starting_point);
10351038

1036-
Some((chan_ann, Some(chan_upd_1), Some(chan_upd_2)))
1039+
Some((chan_ann, Some(chan_upd_1), Some(chan_upd_2)))
1040+
} else {
1041+
None
1042+
}
10371043
}
10381044

10391045
fn get_next_node_announcement(&self, _starting_point: Option<&NodeId>) -> Option<msgs::NodeAnnouncement> {

0 commit comments

Comments
 (0)