Skip to content

Commit ed4a39f

Browse files
committed
Give peers which are sending us messages longer to respond to ping
See comment for rationale.
1 parent 3f9a7de commit ed4a39f

File tree

1 file changed

+100
-20
lines changed

1 file changed

+100
-20
lines changed

lightning/src/ln/peer_handler.rs

Lines changed: 100 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,28 @@ const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
298298
/// the peer.
299299
const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;
300300

301+
/// If we've sent a ping, and are still awaiting a response, we may need to churn our way through
302+
/// the socket receive buffer before receiving the ping.
303+
///
304+
/// On a fairly old Arm64 board, with Linux defaults, this can take as long as 20 seconds, not
305+
/// including any network delays, outbound traffic, or the same for messages from other peers.
306+
///
307+
/// Thus, to avoid needlessly disconnecting a peer, we allow a peer to take this many timer ticks
308+
/// per connected peer to respond to a ping, as long as they send us at least one message during
309+
/// each tick, ensuring we aren't actually just disconnected.
310+
/// With a timer tick interval of five seconds, this translates to about 30 seconds per connected
311+
/// peer.
312+
///
313+
/// When we improve parallelism somewhat we should reduce this to e.g. this many timer ticks per
314+
/// two connected peers, assuming most LDK-running systems have at least two cores.
315+
const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: i8 = 6;
316+
317+
/// This is the minimum number of messages we expect a peer to be able to handle within one timer
318+
/// tick. Once we have sent this many messages since the last ping, we send a ping right away to
319+
/// ensures we don't just fill up our send buffer and leave the peer with too many messages to
320+
/// process before the next ping.
321+
const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;
322+
301323
struct Peer {
302324
channel_encryptor: PeerChannelEncryptor,
303325
their_node_id: Option<PublicKey>,
@@ -313,7 +335,9 @@ struct Peer {
313335

314336
sync_status: InitSyncTracker,
315337

316-
awaiting_pong: bool,
338+
msgs_sent_since_pong: usize,
339+
awaiting_pong_timer_tick_intervals: i8,
340+
received_message_since_timer_tick: bool,
317341
}
318342

319343
impl Peer {
@@ -555,7 +579,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
555579

556580
sync_status: InitSyncTracker::NoSyncRequested,
557581

558-
awaiting_pong: false,
582+
msgs_sent_since_pong: 0,
583+
awaiting_pong_timer_tick_intervals: 0,
584+
received_message_since_timer_tick: false,
559585
}).is_some() {
560586
panic!("PeerManager driver duplicated descriptors!");
561587
};
@@ -593,7 +619,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
593619

594620
sync_status: InitSyncTracker::NoSyncRequested,
595621

596-
awaiting_pong: false,
622+
msgs_sent_since_pong: 0,
623+
awaiting_pong_timer_tick_intervals: 0,
624+
received_message_since_timer_tick: false,
597625
}).is_some() {
598626
panic!("PeerManager driver duplicated descriptors!");
599627
};
@@ -602,7 +630,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
602630

603631
fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
604632
while !peer.awaiting_write_event {
605-
if peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE {
633+
if peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && peer.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK {
606634
match peer.sync_status {
607635
InitSyncTracker::NoSyncRequested => {},
608636
InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
@@ -647,6 +675,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
647675
},
648676
}
649677
}
678+
if peer.msgs_sent_since_pong >= BUFFER_DRAIN_MSGS_PER_TICK {
679+
self.maybe_send_extra_ping(peer);
680+
}
650681

651682
if {
652683
let next_buff = match peer.pending_outbound_buffer.front() {
@@ -724,6 +755,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
724755

725756
/// Append a message to a peer's pending outbound/write buffer
726757
fn enqueue_encoded_message(&self, peer: &mut Peer, encoded_message: &Vec<u8>) {
758+
peer.msgs_sent_since_pong += 1;
727759
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
728760
}
729761

@@ -926,6 +958,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
926958
message: wire::Message<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>
927959
) -> Result<Option<wire::Message<<<CMH as core::ops::Deref>::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> {
928960
log_trace!(self.logger, "Received message {:?} from {}", message, log_pubkey!(peer.their_node_id.unwrap()));
961+
peer.received_message_since_timer_tick = true;
929962

930963
// Need an Init as first message
931964
if let wire::Message::Init(_) = message {
@@ -989,7 +1022,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
9891022
}
9901023
},
9911024
wire::Message::Pong(_msg) => {
992-
peer.awaiting_pong = false;
1025+
peer.awaiting_pong_timer_tick_intervals = 0;
1026+
peer.msgs_sent_since_pong = 0;
9931027
},
9941028

9951029
// Channel messages:
@@ -1110,7 +1144,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
11101144
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
11111145
continue
11121146
}
1113-
if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
1147+
if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
1148+
|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
1149+
{
11141150
log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
11151151
continue;
11161152
}
@@ -1133,7 +1169,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
11331169
!peer.should_forward_node_announcement(msg.contents.node_id) {
11341170
continue
11351171
}
1136-
if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
1172+
if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
1173+
|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
1174+
{
11371175
log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
11381176
continue;
11391177
}
@@ -1155,7 +1193,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
11551193
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
11561194
continue
11571195
}
1158-
if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
1196+
if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
1197+
|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
1198+
{
11591199
log_trace!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
11601200
continue;
11611201
}
@@ -1455,6 +1495,20 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
14551495
debug_assert!(peers.node_id_to_descriptor.is_empty());
14561496
}
14571497

1498+
/// This is called when we're blocked on sending additional gossip messages until we receive a
1499+
/// pong. If we aren't waiting on a pong, we take this opportunity to send a ping (setting
1500+
/// `awaiting_pong_timer_tick_intervals` to a special flag value to indicate this).
1501+
fn maybe_send_extra_ping(&self, peer: &mut Peer) {
1502+
if peer.awaiting_pong_timer_tick_intervals == 0 {
1503+
peer.awaiting_pong_timer_tick_intervals = -1;
1504+
let ping = msgs::Ping {
1505+
ponglen: 0,
1506+
byteslen: 64,
1507+
};
1508+
self.enqueue_message(peer, &ping);
1509+
}
1510+
}
1511+
14581512
/// Send pings to each peer and disconnect those which did not respond to the last round of
14591513
/// pings.
14601514
///
@@ -1473,9 +1527,18 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
14731527
let node_id_to_descriptor = &mut peers.node_id_to_descriptor;
14741528
let peers = &mut peers.peers;
14751529
let mut descriptors_needing_disconnect = Vec::new();
1530+
let peer_count = peers.len();
14761531

14771532
peers.retain(|descriptor, peer| {
1478-
if peer.awaiting_pong {
1533+
if !peer.channel_encryptor.is_ready_for_encryption() {
1534+
// The peer needs to complete its handshake before we can exchange messages
1535+
return true;
1536+
}
1537+
1538+
if (peer.awaiting_pong_timer_tick_intervals > 0 && !peer.received_message_since_timer_tick)
1539+
|| peer.awaiting_pong_timer_tick_intervals as u64 >
1540+
MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peer_count as u64
1541+
{
14791542
descriptors_needing_disconnect.push(descriptor.clone());
14801543
match peer.their_node_id {
14811544
Some(node_id) => {
@@ -1492,21 +1555,26 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
14921555
return false;
14931556
}
14941557

1495-
if !peer.channel_encryptor.is_ready_for_encryption() {
1496-
// The peer needs to complete its handshake before we can exchange messages
1558+
peer.received_message_since_timer_tick = false;
1559+
if peer.awaiting_pong_timer_tick_intervals == -1 {
1560+
// Magic value set in `maybe_send_extra_ping`.
1561+
peer.awaiting_pong_timer_tick_intervals = 1;
1562+
return true;
1563+
}
1564+
1565+
if peer.awaiting_pong_timer_tick_intervals > 0 {
1566+
peer.awaiting_pong_timer_tick_intervals += 1;
14971567
return true;
14981568
}
14991569

1570+
peer.awaiting_pong_timer_tick_intervals = 1;
15001571
let ping = msgs::Ping {
15011572
ponglen: 0,
15021573
byteslen: 64,
15031574
};
15041575
self.enqueue_message(peer, &ping);
1576+
self.do_attempt_write_data(&mut (descriptor.clone()), &mut *peer);
15051577

1506-
let mut descriptor_clone = descriptor.clone();
1507-
self.do_attempt_write_data(&mut descriptor_clone, peer);
1508-
1509-
peer.awaiting_pong = true;
15101578
true
15111579
});
15121580

@@ -1665,11 +1733,23 @@ mod tests {
16651733
// than can fit into a peer's buffer).
16661734
let (mut fd_a, mut fd_b) = establish_connection(&peers[0], &peers[1]);
16671735

1668-
// Make each peer to read the messages that the other peer just wrote to them.
1669-
peers[0].process_events();
1670-
peers[1].read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap();
1671-
peers[1].process_events();
1672-
peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap();
1736+
// Make each peer to read the messages that the other peer just wrote to them. Note that
1737+
// due to the max-messagse-before-ping limits this may take a few iterations to complete.
1738+
for _ in 0..150/super::BUFFER_DRAIN_MSGS_PER_TICK + 1 {
1739+
peers[0].process_events();
1740+
let b_read_data = fd_a.outbound_data.lock().unwrap().split_off(0);
1741+
assert!(!b_read_data.is_empty());
1742+
1743+
peers[1].read_event(&mut fd_b, &b_read_data).unwrap();
1744+
peers[1].process_events();
1745+
1746+
let a_read_data = fd_b.outbound_data.lock().unwrap().split_off(0);
1747+
assert!(!a_read_data.is_empty());
1748+
peers[0].read_event(&mut fd_a, &a_read_data).unwrap();
1749+
1750+
peers[1].process_events();
1751+
assert_eq!(fd_b.outbound_data.lock().unwrap().len(), 0, "Until B receives data, it shouldn't send more messages");
1752+
}
16731753

16741754
// Check that each peer has received the expected number of channel updates and channel
16751755
// announcements.

0 commit comments

Comments
 (0)