@@ -151,13 +151,20 @@ impl Peer {
151151 }
152152}
153153
154+ enum AnnouncementMsg {
155+ ChanUpdate ( msgs:: ChannelUpdate ) ,
156+ ChanAnnounce ( msgs:: ChannelAnnouncement ) ,
157+ NodeAnnounce ( msgs:: NodeAnnouncement ) ,
158+ }
159+
154160struct PeerHolder < Descriptor : SocketDescriptor > {
155161 peers : HashMap < Descriptor , Peer > ,
156162 /// Added to by do_read_event for cases where we pushed a message onto the send buffer but
157163 /// didn't call do_attempt_write_data to avoid reentrancy. Cleared in process_events()
158164 peers_needing_send : HashSet < Descriptor > ,
159165 /// Only add to this set when noise completes:
160166 node_id_to_descriptor : HashMap < PublicKey , Descriptor > ,
167+ pending_broadcasts : Vec < ( PublicKey , AnnouncementMsg ) > ,
161168}
162169
163170#[ cfg( not( any( target_pointer_width = "32" , target_pointer_width = "64" ) ) ) ]
@@ -226,7 +233,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, L: Deref> PeerManager<Descriptor,
226233 peers : Mutex :: new ( PeerHolder {
227234 peers : HashMap :: new ( ) ,
228235 peers_needing_send : HashSet :: new ( ) ,
229- node_id_to_descriptor : HashMap :: new ( )
236+ node_id_to_descriptor : HashMap :: new ( ) ,
237+ pending_broadcasts : Vec :: new ( ) ,
230238 } ) ,
231239 our_node_secret,
232240 ephemeral_key_midstate,
@@ -750,21 +758,21 @@ impl<Descriptor: SocketDescriptor, CM: Deref, L: Deref> PeerManager<Descriptor,
750758 let should_forward = try_potential_handleerror ! ( self . message_handler. route_handler. handle_channel_announcement( & msg) ) ;
751759
752760 if should_forward {
753- // TODO: forward msg along to all our other peers!
761+ peers . pending_broadcasts . push ( ( peer . their_node_id . unwrap ( ) . clone ( ) , AnnouncementMsg :: ChanAnnounce ( msg) ) ) ;
754762 }
755763 } ,
756764 wire:: Message :: NodeAnnouncement ( msg) => {
757765 let should_forward = try_potential_handleerror ! ( self . message_handler. route_handler. handle_node_announcement( & msg) ) ;
758766
759767 if should_forward {
760- // TODO: forward msg along to all our other peers!
768+ peers . pending_broadcasts . push ( ( peer . their_node_id . unwrap ( ) . clone ( ) , AnnouncementMsg :: NodeAnnounce ( msg) ) ) ;
761769 }
762770 } ,
763771 wire:: Message :: ChannelUpdate ( msg) => {
764772 let should_forward = try_potential_handleerror ! ( self . message_handler. route_handler. handle_channel_update( & msg) ) ;
765773
766774 if should_forward {
767- // TODO: forward msg along to all our other peers!
775+ peers . pending_broadcasts . push ( ( peer . their_node_id . unwrap ( ) . clone ( ) , AnnouncementMsg :: ChanUpdate ( msg) ) ) ;
768776 }
769777 } ,
770778
@@ -808,6 +816,54 @@ impl<Descriptor: SocketDescriptor, CM: Deref, L: Deref> PeerManager<Descriptor,
808816 let mut events_generated = self . message_handler . chan_handler . get_and_clear_pending_msg_events ( ) ;
809817 let mut peers_lock = self . peers . lock ( ) . unwrap ( ) ;
810818 let peers = & mut * peers_lock;
819+
820+ macro_rules! broadcast_msgs {
821+ ( { $( $except_check: stmt) , * } , { $( $encoded_msg: expr) , * } ) => { {
822+ for ( ref descriptor, ref mut peer) in peers. peers. iter_mut( ) {
823+ if !peer. channel_encryptor. is_ready_for_encryption( ) || peer. their_features. is_none( ) {
824+ continue
825+ }
826+ match peer. their_node_id {
827+ None => continue ,
828+ Some ( their_node_id) => {
829+ $(
830+ if { $except_check } ( & peer, their_node_id) { continue }
831+ ) *
832+ }
833+ }
834+ $( peer. pending_outbound_buffer. push_back( peer. channel_encryptor. encrypt_message( & $encoded_msg) ) ; ) *
835+ self . do_attempt_write_data( & mut ( * descriptor) . clone( ) , peer) ;
836+ }
837+ } }
838+ }
839+
840+ for ( from_node_id, broadcast) in peers. pending_broadcasts . drain ( ..) {
841+ match broadcast {
842+ AnnouncementMsg :: ChanUpdate ( msg) => {
843+ let encoded_msg = encode_msg ! ( & msg) ;
844+ broadcast_msgs ! ( { |peer: & & mut Peer , _| !peer. should_forward_channel_announcement( msg. contents. short_channel_id) ,
845+ |_, their_node_id| their_node_id == from_node_id } ,
846+ { encoded_msg } ) ;
847+ } ,
848+ AnnouncementMsg :: ChanAnnounce ( msg) => {
849+ let encoded_msg = encode_msg ! ( & msg) ;
850+ broadcast_msgs ! ( { |peer: & & mut Peer , _| !peer. should_forward_channel_announcement( msg. contents. short_channel_id) ,
851+ |_, their_node_id| their_node_id == msg. contents. node_id_1,
852+ |_, their_node_id| their_node_id == msg. contents. node_id_2,
853+ |_, their_node_id| their_node_id == from_node_id } ,
854+ { encoded_msg } ) ;
855+ } ,
856+ AnnouncementMsg :: NodeAnnounce ( msg) => {
857+ let encoded_msg = encode_msg ! ( & msg) ;
858+
859+ broadcast_msgs ! ( { |peer: & & mut Peer , _| !peer. should_forward_node_announcement( msg. contents. node_id) ,
860+ |_, their_node_id| their_node_id == msg. contents. node_id,
861+ |_, their_node_id| their_node_id == from_node_id } ,
862+ { encoded_msg } ) ;
863+ }
864+ }
865+ }
866+
811867 for event in events_generated. drain ( ..) {
812868 macro_rules! get_peer_for_forwarding {
813869 ( $node_id: expr, $handle_no_such_peer: block) => {
@@ -970,54 +1026,29 @@ impl<Descriptor: SocketDescriptor, CM: Deref, L: Deref> PeerManager<Descriptor,
9701026 if self . message_handler . route_handler . handle_channel_announcement ( msg) . is_ok ( ) && self . message_handler . route_handler . handle_channel_update ( update_msg) . is_ok ( ) {
9711027 let encoded_msg = encode_msg ! ( msg) ;
9721028 let encoded_update_msg = encode_msg ! ( update_msg) ;
973-
974- for ( ref descriptor, ref mut peer) in peers. peers . iter_mut ( ) {
975- if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_features . is_none ( ) ||
976- !peer. should_forward_channel_announcement ( msg. contents . short_channel_id ) {
977- continue
978- }
979- match peer. their_node_id {
980- None => continue ,
981- Some ( their_node_id) => {
982- if their_node_id == msg. contents . node_id_1 || their_node_id == msg. contents . node_id_2 {
983- continue
984- }
985- }
986- }
987- peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_msg[ ..] ) ) ;
988- peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_update_msg[ ..] ) ) ;
989- self . do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , peer) ;
990- }
1029+ broadcast_msgs ! ( { |peer: & & mut Peer , _| !peer. should_forward_channel_announcement( msg. contents. short_channel_id) ,
1030+ |_, their_node_id| their_node_id == msg. contents. node_id_1,
1031+ |_, their_node_id| their_node_id == msg. contents. node_id_2 } ,
1032+ { encoded_msg, encoded_update_msg } ) ;
9911033 }
9921034 } ,
9931035 MessageSendEvent :: BroadcastNodeAnnouncement { ref msg } => {
9941036 log_trace ! ( self . logger, "Handling BroadcastNodeAnnouncement event in peer_handler" ) ;
9951037 if self . message_handler . route_handler . handle_node_announcement ( msg) . is_ok ( ) {
9961038 let encoded_msg = encode_msg ! ( msg) ;
9971039
998- for ( ref descriptor, ref mut peer) in peers. peers . iter_mut ( ) {
999- if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_features . is_none ( ) ||
1000- !peer. should_forward_node_announcement ( msg. contents . node_id ) {
1001- continue
1002- }
1003- peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_msg[ ..] ) ) ;
1004- self . do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , peer) ;
1005- }
1040+ broadcast_msgs ! ( { |peer: & & mut Peer , _| !peer. should_forward_node_announcement( msg. contents. node_id) ,
1041+ |_, their_node_id| their_node_id == msg. contents. node_id } ,
1042+ { encoded_msg } ) ;
10061043 }
10071044 } ,
10081045 MessageSendEvent :: BroadcastChannelUpdate { ref msg } => {
10091046 log_trace ! ( self . logger, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}" , msg. contents. short_channel_id) ;
10101047 if self . message_handler . route_handler . handle_channel_update ( msg) . is_ok ( ) {
10111048 let encoded_msg = encode_msg ! ( msg) ;
10121049
1013- for ( ref descriptor, ref mut peer) in peers. peers . iter_mut ( ) {
1014- if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_features . is_none ( ) ||
1015- !peer. should_forward_channel_announcement ( msg. contents . short_channel_id ) {
1016- continue
1017- }
1018- peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_msg[ ..] ) ) ;
1019- self . do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , peer) ;
1020- }
1050+ broadcast_msgs ! ( { |peer: & & mut Peer , _| !peer. should_forward_channel_announcement( msg. contents. short_channel_id) } ,
1051+ { encoded_msg } ) ;
10211052 }
10221053 } ,
10231054 MessageSendEvent :: PaymentFailureNetworkUpdate { ref update } => {
0 commit comments