@@ -563,6 +563,9 @@ pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: D
563
563
564
564
peer_counter : AtomicCounter ,
565
565
566
+ gossip_processing_backlogged : AtomicBool ,
567
+ gossip_processing_backlog_lifted : AtomicBool ,
568
+
566
569
node_signer : NS ,
567
570
568
571
logger : L ,
@@ -721,6 +724,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
721
724
blocked_event_processors : AtomicBool :: new ( false ) ,
722
725
ephemeral_key_midstate,
723
726
peer_counter : AtomicCounter :: new ( ) ,
727
+ gossip_processing_backlogged : AtomicBool :: new ( false ) ,
728
+ gossip_processing_backlog_lifted : AtomicBool :: new ( false ) ,
724
729
last_node_announcement_serial : AtomicU32 :: new ( current_time) ,
725
730
logger,
726
731
custom_message_handler,
@@ -847,7 +852,20 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
847
852
Ok ( ( ) )
848
853
}
849
854
850
- fn do_attempt_write_data ( & self , descriptor : & mut Descriptor , peer : & mut Peer ) {
855
+ fn peer_should_read ( & self , peer : & Peer ) -> bool {
856
+ !self . gossip_processing_backlogged . load ( Ordering :: Relaxed ) && peer. should_read ( )
857
+ }
858
+
859
+ fn update_gossip_backlogged ( & self ) {
860
+ let new_state = self . message_handler . route_handler . processing_queue_high ( ) ;
861
+ let prev_state = self . gossip_processing_backlogged . swap ( new_state, Ordering :: Relaxed ) ;
862
+ if prev_state && !new_state {
863
+ self . gossip_processing_backlog_lifted . store ( true , Ordering :: Relaxed ) ;
864
+ }
865
+ }
866
+
867
+ fn do_attempt_write_data ( & self , descriptor : & mut Descriptor , peer : & mut Peer , force_one_write : bool ) {
868
+ let mut have_written = false ;
851
869
while !peer. awaiting_write_event {
852
870
if peer. should_buffer_onion_message ( ) {
853
871
if let Some ( peer_node_id) = peer. their_node_id {
@@ -905,12 +923,22 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
905
923
}
906
924
907
925
let next_buff = match peer. pending_outbound_buffer . front ( ) {
908
- None => return ,
926
+ None => {
927
+ if force_one_write && !have_written {
928
+ let should_read = self . peer_should_read ( & peer) ;
929
+ if should_read {
930
+ let data_sent = descriptor. send_data ( & [ ] , should_read) ;
931
+ debug_assert_eq ! ( data_sent, 0 , "Can't write more than no data" ) ;
932
+ }
933
+ }
934
+ return
935
+ } ,
909
936
Some ( buff) => buff,
910
937
} ;
911
938
912
939
let pending = & next_buff[ peer. pending_outbound_buffer_first_msg_offset ..] ;
913
- let data_sent = descriptor. send_data ( pending, peer. should_read ( ) ) ;
940
+ let data_sent = descriptor. send_data ( pending, self . peer_should_read ( & peer) ) ;
941
+ have_written = true ;
914
942
peer. pending_outbound_buffer_first_msg_offset += data_sent;
915
943
if peer. pending_outbound_buffer_first_msg_offset == next_buff. len ( ) {
916
944
peer. pending_outbound_buffer_first_msg_offset = 0 ;
@@ -945,7 +973,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
945
973
Some ( peer_mutex) => {
946
974
let mut peer = peer_mutex. lock ( ) . unwrap ( ) ;
947
975
peer. awaiting_write_event = false ;
948
- self . do_attempt_write_data ( descriptor, & mut peer) ;
976
+ self . do_attempt_write_data ( descriptor, & mut peer, false ) ;
949
977
}
950
978
} ;
951
979
Ok ( ( ) )
@@ -1192,7 +1220,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1192
1220
}
1193
1221
}
1194
1222
}
1195
- pause_read = !peer . should_read ( ) ;
1223
+ pause_read = !self . peer_should_read ( & peer ) ;
1196
1224
1197
1225
if let Some ( message) = msg_to_handle {
1198
1226
match self . handle_message ( & peer_mutex, peer_lock, message) {
@@ -1404,19 +1432,22 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1404
1432
. map_err ( |e| -> MessageHandlingError { e. into ( ) } ) ? {
1405
1433
should_forward = Some ( wire:: Message :: ChannelAnnouncement ( msg) ) ;
1406
1434
}
1435
+ self . update_gossip_backlogged ( ) ;
1407
1436
} ,
1408
1437
wire:: Message :: NodeAnnouncement ( msg) => {
1409
1438
if self . message_handler . route_handler . handle_node_announcement ( & msg)
1410
1439
. map_err ( |e| -> MessageHandlingError { e. into ( ) } ) ? {
1411
1440
should_forward = Some ( wire:: Message :: NodeAnnouncement ( msg) ) ;
1412
1441
}
1442
+ self . update_gossip_backlogged ( ) ;
1413
1443
} ,
1414
1444
wire:: Message :: ChannelUpdate ( msg) => {
1415
1445
self . message_handler . chan_handler . handle_channel_update ( & their_node_id, & msg) ;
1416
1446
if self . message_handler . route_handler . handle_channel_update ( & msg)
1417
1447
. map_err ( |e| -> MessageHandlingError { e. into ( ) } ) ? {
1418
1448
should_forward = Some ( wire:: Message :: ChannelUpdate ( msg) ) ;
1419
1449
}
1450
+ self . update_gossip_backlogged ( ) ;
1420
1451
} ,
1421
1452
wire:: Message :: QueryShortChannelIds ( msg) => {
1422
1453
self . message_handler . route_handler . handle_query_short_channel_ids ( & their_node_id, msg) ?;
@@ -1564,6 +1595,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1564
1595
}
1565
1596
}
1566
1597
1598
+ self . update_gossip_backlogged ( ) ;
1599
+ let flush_read_disabled = self . gossip_processing_backlog_lifted . swap ( false , Ordering :: Relaxed ) ;
1600
+
1567
1601
let mut peers_to_disconnect = HashMap :: new ( ) ;
1568
1602
let mut events_generated = self . message_handler . chan_handler . get_and_clear_pending_msg_events ( ) ;
1569
1603
events_generated. append ( & mut self . message_handler . route_handler . get_and_clear_pending_msg_events ( ) ) ;
@@ -1793,7 +1827,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1793
1827
}
1794
1828
1795
1829
for ( descriptor, peer_mutex) in peers. iter ( ) {
1796
- self . do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , & mut * peer_mutex. lock ( ) . unwrap ( ) ) ;
1830
+ self . do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , & mut * peer_mutex. lock ( ) . unwrap ( ) , flush_read_disabled ) ;
1797
1831
}
1798
1832
}
1799
1833
if !peers_to_disconnect. is_empty ( ) {
@@ -1815,7 +1849,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1815
1849
self . enqueue_message ( & mut * peer, & msg) ;
1816
1850
// This isn't guaranteed to work, but if there is enough free
1817
1851
// room in the send buffer, put the error message there...
1818
- self . do_attempt_write_data ( & mut descriptor, & mut * peer) ;
1852
+ self . do_attempt_write_data ( & mut descriptor, & mut * peer, false ) ;
1819
1853
} else {
1820
1854
log_trace ! ( self . logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message" , log_pubkey!( node_id) ) ;
1821
1855
}
@@ -1923,6 +1957,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1923
1957
{
1924
1958
let peers_lock = self . peers . read ( ) . unwrap ( ) ;
1925
1959
1960
+ self . update_gossip_backlogged ( ) ;
1961
+ let flush_read_disabled = self . gossip_processing_backlog_lifted . swap ( false , Ordering :: Relaxed ) ;
1962
+
1926
1963
for ( descriptor, peer_mutex) in peers_lock. iter ( ) {
1927
1964
let mut peer = peer_mutex. lock ( ) . unwrap ( ) ;
1928
1965
if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_node_id . is_none ( ) {
@@ -1938,34 +1975,37 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1938
1975
continue ;
1939
1976
}
1940
1977
1941
- if peer. awaiting_pong_timer_tick_intervals == -1 {
1942
- // Magic value set in `maybe_send_extra_ping`.
1943
- peer. awaiting_pong_timer_tick_intervals = 1 ;
1978
+ loop { // Used as a `goto` to skip writing a Ping message.
1979
+ if peer. awaiting_pong_timer_tick_intervals == -1 {
1980
+ // Magic value set in `maybe_send_extra_ping`.
1981
+ peer. awaiting_pong_timer_tick_intervals = 1 ;
1982
+ peer. received_message_since_timer_tick = false ;
1983
+ break ;
1984
+ }
1985
+
1986
+ if ( peer. awaiting_pong_timer_tick_intervals > 0 && !peer. received_message_since_timer_tick )
1987
+ || peer. awaiting_pong_timer_tick_intervals as u64 >
1988
+ MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock. len ( ) as u64
1989
+ {
1990
+ descriptors_needing_disconnect. push ( descriptor. clone ( ) ) ;
1991
+ break ;
1992
+ }
1944
1993
peer. received_message_since_timer_tick = false ;
1945
- continue ;
1946
- }
1947
1994
1948
- if ( peer. awaiting_pong_timer_tick_intervals > 0 && !peer. received_message_since_timer_tick )
1949
- || peer. awaiting_pong_timer_tick_intervals as u64 >
1950
- MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peers_lock. len ( ) as u64
1951
- {
1952
- descriptors_needing_disconnect. push ( descriptor. clone ( ) ) ;
1953
- continue ;
1954
- }
1955
- peer. received_message_since_timer_tick = false ;
1995
+ if peer. awaiting_pong_timer_tick_intervals > 0 {
1996
+ peer. awaiting_pong_timer_tick_intervals += 1 ;
1997
+ break ;
1998
+ }
1956
1999
1957
- if peer. awaiting_pong_timer_tick_intervals > 0 {
1958
- peer. awaiting_pong_timer_tick_intervals += 1 ;
1959
- continue ;
2000
+ peer. awaiting_pong_timer_tick_intervals = 1 ;
2001
+ let ping = msgs:: Ping {
2002
+ ponglen : 0 ,
2003
+ byteslen : 64 ,
2004
+ } ;
2005
+ self . enqueue_message ( & mut * peer, & ping) ;
2006
+ break ;
1960
2007
}
1961
-
1962
- peer. awaiting_pong_timer_tick_intervals = 1 ;
1963
- let ping = msgs:: Ping {
1964
- ponglen : 0 ,
1965
- byteslen : 64 ,
1966
- } ;
1967
- self . enqueue_message ( & mut * peer, & ping) ;
1968
- self . do_attempt_write_data ( & mut ( descriptor. clone ( ) ) , & mut * peer) ;
2008
+ self . do_attempt_write_data ( & mut ( descriptor. clone ( ) ) , & mut * peer, flush_read_disabled) ;
1969
2009
}
1970
2010
}
1971
2011
0 commit comments