@@ -1605,7 +1605,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1605
1605
}
1606
1606
1607
1607
for msg in msgs_to_forward. drain ( ..) {
1608
- self . forward_broadcast_msg ( & * peers, & msg, peer_node_id. as_ref ( ) . map ( |( pk, _) | pk) ) ;
1608
+ self . forward_broadcast_msg ( & * peers, & msg, peer_node_id. as_ref ( ) . map ( |( pk, _) | pk) , false ) ;
1609
1609
}
1610
1610
1611
1611
Ok ( pause_read)
@@ -1948,7 +1948,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1948
1948
Ok ( should_forward)
1949
1949
}
1950
1950
1951
- fn forward_broadcast_msg ( & self , peers : & HashMap < Descriptor , Mutex < Peer > > , msg : & wire:: Message < <<CMH as Deref >:: Target as wire:: CustomMessageReader >:: CustomMessage > , except_node : Option < & PublicKey > ) {
1951
+ /// Forwards a gossip `msg` to `peers` excluding node(s) that generated the gossip message and
1952
+ /// excluding `except_node`.
1953
+ ///
1954
+ /// If the message queue for a peer is somewhat full, the message will not be forwarded to them
1955
+ /// unless `allow_large_buffer` is set, in which case the message will be treated as critical
1956
+ /// and delivered no matter the available buffer space.
1957
+ fn forward_broadcast_msg ( & self , peers : & HashMap < Descriptor , Mutex < Peer > > , msg : & wire:: Message < <<CMH as Deref >:: Target as wire:: CustomMessageReader >:: CustomMessage > , except_node : Option < & PublicKey > , allow_large_buffer : bool ) {
1952
1958
match msg {
1953
1959
wire:: Message :: ChannelAnnouncement ( ref msg) => {
1954
1960
log_gossip ! ( self . logger, "Sending message to all peers except {:?} or the announced channel's counterparties: {:?}" , except_node, msg) ;
@@ -1963,7 +1969,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1963
1969
debug_assert ! ( peer. their_node_id. is_some( ) ) ;
1964
1970
debug_assert ! ( peer. channel_encryptor. is_ready_for_encryption( ) ) ;
1965
1971
let logger = WithContext :: from ( & self . logger , peer. their_node_id . map ( |p| p. 0 ) , None , None ) ;
1966
- if peer. buffer_full_drop_gossip_broadcast ( ) {
1972
+ if peer. buffer_full_drop_gossip_broadcast ( ) && !allow_large_buffer {
1967
1973
log_gossip ! ( logger, "Skipping broadcast message to {:?} as its outbound buffer is full" , peer. their_node_id) ;
1968
1974
continue ;
1969
1975
}
@@ -1991,7 +1997,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
1991
1997
debug_assert ! ( peer. their_node_id. is_some( ) ) ;
1992
1998
debug_assert ! ( peer. channel_encryptor. is_ready_for_encryption( ) ) ;
1993
1999
let logger = WithContext :: from ( & self . logger , peer. their_node_id . map ( |p| p. 0 ) , None , None ) ;
1994
- if peer. buffer_full_drop_gossip_broadcast ( ) {
2000
+ if peer. buffer_full_drop_gossip_broadcast ( ) && !allow_large_buffer {
1995
2001
log_gossip ! ( logger, "Skipping broadcast message to {:?} as its outbound buffer is full" , peer. their_node_id) ;
1996
2002
continue ;
1997
2003
}
@@ -2019,7 +2025,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
2019
2025
debug_assert ! ( peer. their_node_id. is_some( ) ) ;
2020
2026
debug_assert ! ( peer. channel_encryptor. is_ready_for_encryption( ) ) ;
2021
2027
let logger = WithContext :: from ( & self . logger , peer. their_node_id . map ( |p| p. 0 ) , None , None ) ;
2022
- if peer. buffer_full_drop_gossip_broadcast ( ) {
2028
+ if peer. buffer_full_drop_gossip_broadcast ( ) && !allow_large_buffer {
2023
2029
log_gossip ! ( logger, "Skipping broadcast message to {:?} as its outbound buffer is full" , peer. their_node_id) ;
2024
2030
continue ;
2025
2031
}
@@ -2101,6 +2107,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
2101
2107
}
2102
2108
}
2103
2109
}
2110
+
2111
+ // Handles a `MessageSendEvent`, using `from_chan_handler` to decide if we should
2112
+ // robustly gossip broadcast events even if a peer's message buffer is full.
2104
2113
let mut handle_event = |event, from_chan_handler| {
2105
2114
match event {
2106
2115
MessageSendEvent :: SendAcceptChannel { ref node_id, ref msg } => {
@@ -2295,31 +2304,39 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
2295
2304
MessageSendEvent :: BroadcastChannelAnnouncement { msg, update_msg } => {
2296
2305
log_debug ! ( self . logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}" , msg. contents. short_channel_id) ;
2297
2306
match self . message_handler . route_handler . handle_channel_announcement ( & msg) {
2298
- Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) =>
2299
- self . forward_broadcast_msg ( peers, & wire:: Message :: ChannelAnnouncement ( msg) , None ) ,
2307
+ Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) => {
2308
+ let forward = wire:: Message :: ChannelAnnouncement ( msg) ;
2309
+ self . forward_broadcast_msg ( peers, & forward, None , from_chan_handler) ;
2310
+ } ,
2300
2311
_ => { } ,
2301
2312
}
2302
2313
if let Some ( msg) = update_msg {
2303
2314
match self . message_handler . route_handler . handle_channel_update ( & msg) {
2304
- Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) =>
2305
- self . forward_broadcast_msg ( peers, & wire:: Message :: ChannelUpdate ( msg) , None ) ,
2315
+ Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) => {
2316
+ let forward = wire:: Message :: ChannelUpdate ( msg) ;
2317
+ self . forward_broadcast_msg ( peers, & forward, None , from_chan_handler) ;
2318
+ } ,
2306
2319
_ => { } ,
2307
2320
}
2308
2321
}
2309
2322
} ,
2310
2323
MessageSendEvent :: BroadcastChannelUpdate { msg } => {
2311
2324
log_debug ! ( self . logger, "Handling BroadcastChannelUpdate event in peer_handler for contents {:?}" , msg. contents) ;
2312
2325
match self . message_handler . route_handler . handle_channel_update ( & msg) {
2313
- Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) =>
2314
- self . forward_broadcast_msg ( peers, & wire:: Message :: ChannelUpdate ( msg) , None ) ,
2326
+ Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) => {
2327
+ let forward = wire:: Message :: ChannelUpdate ( msg) ;
2328
+ self . forward_broadcast_msg ( peers, & forward, None , from_chan_handler) ;
2329
+ } ,
2315
2330
_ => { } ,
2316
2331
}
2317
2332
} ,
2318
2333
MessageSendEvent :: BroadcastNodeAnnouncement { msg } => {
2319
2334
log_debug ! ( self . logger, "Handling BroadcastNodeAnnouncement event in peer_handler for node {}" , msg. contents. node_id) ;
2320
2335
match self . message_handler . route_handler . handle_node_announcement ( & msg) {
2321
- Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) =>
2322
- self . forward_broadcast_msg ( peers, & wire:: Message :: NodeAnnouncement ( msg) , None ) ,
2336
+ Ok ( _) | Err ( LightningError { action : msgs:: ErrorAction :: IgnoreDuplicateGossip , .. } ) => {
2337
+ let forward = wire:: Message :: NodeAnnouncement ( msg) ;
2338
+ self . forward_broadcast_msg ( peers, & forward, None , from_chan_handler) ;
2339
+ } ,
2323
2340
_ => { } ,
2324
2341
}
2325
2342
} ,
@@ -2690,7 +2707,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
2690
2707
2691
2708
log_debug ! ( self . logger, "Broadcasting NodeAnnouncement after passing it to our own RoutingMessageHandler." ) ;
2692
2709
let _ = self . message_handler . route_handler . handle_node_announcement ( & msg) ;
2693
- self . forward_broadcast_msg ( & * self . peers . read ( ) . unwrap ( ) , & wire:: Message :: NodeAnnouncement ( msg) , None ) ;
2710
+ self . forward_broadcast_msg ( & * self . peers . read ( ) . unwrap ( ) , & wire:: Message :: NodeAnnouncement ( msg) , None , true ) ;
2694
2711
}
2695
2712
}
2696
2713
0 commit comments