Skip to content

Commit 018f090

Browse files
committed
Don't apply gossip backpressure to non-channel-announcing peers
When we apply the new gossip-async-check backpressure on peer connections, if a peer has never sent us a `channel_announcement` at all, we really shouldn't delay reading their messages. This does so by tracking, on a per-peer basis, whether they've sent us a channel_announcement, and resetting that state whenever we're not backlogged.
1 parent c601c38 commit 018f090

File tree

1 file changed

+30
-8
lines changed

1 file changed

+30
-8
lines changed

lightning/src/ln/peer_handler.rs

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,12 @@ struct Peer {
413413
awaiting_pong_timer_tick_intervals: i8,
414414
received_message_since_timer_tick: bool,
415415
sent_gossip_timestamp_filter: bool,
416+
417+
/// Indicates we've received a `channel_announcement` since the last time we had
418+
/// [`PeerManager::gossip_processing_backlogged`] set (or, really, that we've received a
419+
/// `channel_announcement` at all - we set this unconditionally but unset it every time we
420+
/// check if we're gossip-processing-backlogged).
421+
received_channel_announce_since_backlogged: bool,
416422
}
417423

418424
impl Peer {
@@ -449,8 +455,12 @@ impl Peer {
449455

450456
/// Returns whether we should be reading bytes from this peer, based on whether its outbound
451457
/// buffer still has space and we don't need to pause reads to get some writes out.
452-
fn should_read(&self) -> bool {
453-
self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
458+
fn should_read(&mut self, gossip_processing_backlogged: bool) -> bool {
459+
if !gossip_processing_backlogged {
460+
self.received_channel_announce_since_backlogged = false;
461+
}
462+
self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE &&
463+
(!gossip_processing_backlogged || !self.received_channel_announce_since_backlogged)
454464
}
455465

456466
/// Determines if we should push additional gossip background sync (aka "backfill") onto a peer's
@@ -799,6 +809,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
799809
awaiting_pong_timer_tick_intervals: 0,
800810
received_message_since_timer_tick: false,
801811
sent_gossip_timestamp_filter: false,
812+
813+
received_channel_announce_since_backlogged: false,
802814
})).is_some() {
803815
panic!("PeerManager driver duplicated descriptors!");
804816
};
@@ -846,14 +858,16 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
846858
awaiting_pong_timer_tick_intervals: 0,
847859
received_message_since_timer_tick: false,
848860
sent_gossip_timestamp_filter: false,
861+
862+
received_channel_announce_since_backlogged: false,
849863
})).is_some() {
850864
panic!("PeerManager driver duplicated descriptors!");
851865
};
852866
Ok(())
853867
}
854868

855-
fn peer_should_read(&self, peer: &Peer) -> bool {
856-
!self.gossip_processing_backlogged.load(Ordering::Relaxed) && peer.should_read()
869+
fn peer_should_read(&self, peer: &mut Peer) -> bool {
870+
peer.should_read(self.gossip_processing_backlogged.load(Ordering::Relaxed))
857871
}
858872

859873
fn update_gossip_backlogged(&self) {
@@ -922,10 +936,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
922936
self.maybe_send_extra_ping(peer);
923937
}
924938

939+
let should_read = self.peer_should_read(peer);
925940
let next_buff = match peer.pending_outbound_buffer.front() {
926941
None => {
927942
if force_one_write && !have_written {
928-
let should_read = self.peer_should_read(&peer);
929943
if should_read {
930944
let data_sent = descriptor.send_data(&[], should_read);
931945
debug_assert_eq!(data_sent, 0, "Can't write more than no data");
@@ -937,7 +951,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
937951
};
938952

939953
let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
940-
let data_sent = descriptor.send_data(pending, self.peer_should_read(&peer));
954+
let data_sent = descriptor.send_data(pending, should_read);
941955
have_written = true;
942956
peer.pending_outbound_buffer_first_msg_offset += data_sent;
943957
if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() {
@@ -1220,7 +1234,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
12201234
}
12211235
}
12221236
}
1223-
pause_read = !self.peer_should_read(&peer);
1237+
pause_read = !self.peer_should_read(peer);
12241238

12251239
if let Some(message) = msg_to_handle {
12261240
match self.handle_message(&peer_mutex, peer_lock, message) {
@@ -1306,6 +1320,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
13061320
return Ok(None);
13071321
}
13081322

1323+
if let wire::Message::ChannelAnnouncement(ref _msg) = message {
1324+
peer_lock.received_channel_announce_since_backlogged = true;
1325+
}
1326+
13091327
mem::drop(peer_lock);
13101328

13111329
if is_gossip_msg(message.type_id()) {
@@ -1827,7 +1845,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
18271845
}
18281846

18291847
for (descriptor, peer_mutex) in peers.iter() {
1830-
self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap(), flush_read_disabled);
1848+
let mut peer = peer_mutex.lock().unwrap();
1849+
if flush_read_disabled { peer.received_channel_announce_since_backlogged = false; }
1850+
self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer, flush_read_disabled);
18311851
}
18321852
}
18331853
if !peers_to_disconnect.is_empty() {
@@ -1962,6 +1982,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
19621982

19631983
for (descriptor, peer_mutex) in peers_lock.iter() {
19641984
let mut peer = peer_mutex.lock().unwrap();
1985+
if flush_read_disabled { peer.received_channel_announce_since_backlogged = false; }
1986+
19651987
if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_node_id.is_none() {
19661988
// The peer needs to complete its handshake before we can exchange messages. We
19671989
// give peers one timer tick to complete handshake, reusing

0 commit comments

Comments
 (0)