@@ -233,6 +233,15 @@ enum InitSyncTracker{
233
233
NodesSyncing ( PublicKey ) ,
234
234
}
235
235
236
+ /// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until
237
+ /// we manage to send messages until we reach this limit.
238
+ /// We also use this as the target number of outbound gossip messages to keep in the write buffer,
239
+ /// refilled as we send bytes.
240
+ const OUTBOUND_BUFFER_LIMIT_READ_PAUSE : usize = 10 ;
241
+ /// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to
242
+ /// the peer.
243
+ const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP : usize = 20 ;
244
+
236
245
struct Peer {
237
246
channel_encryptor : PeerChannelEncryptor ,
238
247
their_node_id : Option < PublicKey > ,
@@ -531,13 +540,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
531
540
}
532
541
}
533
542
}
534
- const MSG_BUFF_SIZE : usize = 10 ;
535
543
while !peer. awaiting_write_event {
536
- if peer. pending_outbound_buffer . len ( ) < MSG_BUFF_SIZE {
544
+ if peer. pending_outbound_buffer . len ( ) < OUTBOUND_BUFFER_LIMIT_READ_PAUSE {
537
545
match peer. sync_status {
538
546
InitSyncTracker :: NoSyncRequested => { } ,
539
547
InitSyncTracker :: ChannelsSyncing ( c) if c < 0xffff_ffff_ffff_ffff => {
540
- let steps = ( ( MSG_BUFF_SIZE - peer. pending_outbound_buffer . len ( ) + 2 ) / 3 ) as u8 ;
548
+ let steps = ( ( OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer. pending_outbound_buffer . len ( ) + 2 ) / 3 ) as u8 ;
541
549
let all_messages = self . message_handler . route_handler . get_next_channel_announcements ( c, steps) ;
542
550
for & ( ref announce, ref update_a_option, ref update_b_option) in all_messages. iter ( ) {
543
551
encode_and_send_msg ! ( announce) ;
@@ -554,7 +562,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
554
562
}
555
563
} ,
556
564
InitSyncTracker :: ChannelsSyncing ( c) if c == 0xffff_ffff_ffff_ffff => {
557
- let steps = ( MSG_BUFF_SIZE - peer. pending_outbound_buffer . len ( ) ) as u8 ;
565
+ let steps = ( OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer. pending_outbound_buffer . len ( ) ) as u8 ;
558
566
let all_messages = self . message_handler . route_handler . get_next_node_announcements ( None , steps) ;
559
567
for msg in all_messages. iter ( ) {
560
568
encode_and_send_msg ! ( msg) ;
@@ -566,7 +574,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
566
574
} ,
567
575
InitSyncTracker :: ChannelsSyncing ( _) => unreachable ! ( ) ,
568
576
InitSyncTracker :: NodesSyncing ( key) => {
569
- let steps = ( MSG_BUFF_SIZE - peer. pending_outbound_buffer . len ( ) ) as u8 ;
577
+ let steps = ( OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer. pending_outbound_buffer . len ( ) ) as u8 ;
570
578
let all_messages = self . message_handler . route_handler . get_next_node_announcements ( Some ( & key) , steps) ;
571
579
for msg in all_messages. iter ( ) {
572
580
encode_and_send_msg ! ( msg) ;
@@ -585,7 +593,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
585
593
Some ( buff) => buff,
586
594
} ;
587
595
588
- let should_be_reading = peer. pending_outbound_buffer . len ( ) < MSG_BUFF_SIZE ;
596
+ let should_be_reading = peer. pending_outbound_buffer . len ( ) < OUTBOUND_BUFFER_LIMIT_READ_PAUSE ;
589
597
let pending = & next_buff[ peer. pending_outbound_buffer_first_msg_offset ..] ;
590
598
let data_sent = descriptor. send_data ( pending, should_be_reading) ;
591
599
peer. pending_outbound_buffer_first_msg_offset += data_sent;
@@ -814,7 +822,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
814
822
}
815
823
}
816
824
817
- peer. pending_outbound_buffer . len ( ) > 10 // pause_read
825
+ peer. pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_READ_PAUSE // pause_read
818
826
}
819
827
} ;
820
828
@@ -1027,6 +1035,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1027
1035
!peer. should_forward_channel_announcement ( msg. contents . short_channel_id ) {
1028
1036
continue
1029
1037
}
1038
+ if peer. pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
1039
+ continue ;
1040
+ }
1030
1041
if peer. their_node_id . as_ref ( ) == Some ( & msg. contents . node_id_1 ) ||
1031
1042
peer. their_node_id . as_ref ( ) == Some ( & msg. contents . node_id_2 ) {
1032
1043
continue ;
@@ -1046,6 +1057,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1046
1057
!peer. should_forward_node_announcement ( msg. contents . node_id ) {
1047
1058
continue
1048
1059
}
1060
+ if peer. pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
1061
+ continue ;
1062
+ }
1049
1063
if peer. their_node_id . as_ref ( ) == Some ( & msg. contents . node_id ) {
1050
1064
continue ;
1051
1065
}
@@ -1064,6 +1078,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1064
1078
!peer. should_forward_channel_announcement ( msg. contents . short_channel_id ) {
1065
1079
continue
1066
1080
}
1081
+ if peer. pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP {
1082
+ continue ;
1083
+ }
1067
1084
if except_node. is_some ( ) && peer. their_node_id . as_ref ( ) == except_node {
1068
1085
continue ;
1069
1086
}
0 commit comments