@@ -29,7 +29,7 @@ use util::events::{MessageSendEvent, MessageSendEventsProvider};
29
29
use util:: logger:: Logger ;
30
30
use routing:: network_graph:: NetGraphMsgHandler ;
31
31
32
- use std:: collections:: { HashMap , HashSet } ;
32
+ use std:: collections:: HashMap ;
33
33
use std:: sync:: { Arc , Mutex } ;
34
34
use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
35
35
use std:: { cmp, error, hash, fmt} ;
@@ -246,9 +246,6 @@ impl<TransportImpl: ITransport> Peer<TransportImpl> {
246
246
247
247
struct PeerHolder < Descriptor : SocketDescriptor , TransportImpl : ITransport > {
248
248
peers : HashMap < Descriptor , Peer < TransportImpl > > ,
249
- /// Added to by do_read_event for cases where we pushed a message onto the send buffer but
250
- /// didn't call do_attempt_write_data to avoid reentrancy. Cleared in process_events()
251
- peers_needing_send : HashSet < Descriptor > ,
252
249
/// Peers in this map have completed the NOISE handshake and received an Init message
253
250
node_id_to_descriptor : HashMap < PublicKey , Descriptor > ,
254
251
}
@@ -290,9 +287,6 @@ impl<Descriptor: SocketDescriptor, TransportImpl: ITransport> PeerHolder<Descrip
290
287
291
288
// Removes all associated metadata for descriptor and returns the Peer object associated with it
292
289
fn remove_peer_by_descriptor ( & mut self , descriptor : & Descriptor ) -> Peer < TransportImpl > {
293
- // may or may not be in this set depending on in-flight messages
294
- self . peers_needing_send . remove ( descriptor) ;
295
-
296
290
let peer_option = self . peers . remove ( descriptor) ;
297
291
match peer_option {
298
292
None => panic ! ( "Descriptor for disconnect_event is not already known to PeerManager" ) ,
@@ -309,6 +303,23 @@ impl<Descriptor: SocketDescriptor, TransportImpl: ITransport> PeerHolder<Descrip
309
303
}
310
304
}
311
305
}
306
+
307
+ // Returns the collection of peers that have data to send. Could be due to items in their outbound
308
+ // queue or sync messages that need to be sent out.
309
+ fn peers_needing_send < ' a > ( & ' a mut self ) -> Filter < IterMut < ' a , Descriptor , Peer < TransportImpl > > , fn ( & ( & ' a Descriptor , & ' a mut Peer < TransportImpl > ) ) -> bool > {
310
+ self . peers . iter_mut ( ) . filter ( |( _, peer) | {
311
+ let has_outbound_sync = match & peer. post_init_state {
312
+ None => false ,
313
+ Some ( post_init_state) => match & post_init_state. sync_status {
314
+ InitSyncTracker :: NoSyncRequested => false ,
315
+ InitSyncTracker :: ChannelsSyncing ( _) => true ,
316
+ InitSyncTracker :: NodesSyncing ( _) => true ,
317
+ }
318
+ } ;
319
+
320
+ has_outbound_sync || !peer. outbound_queue . is_empty ( )
321
+ } )
322
+ }
312
323
}
313
324
314
325
#[ cfg( not( any( target_pointer_width = "32" , target_pointer_width = "64" ) ) ) ]
@@ -508,7 +519,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
508
519
message_handler,
509
520
peers : Mutex :: new ( PeerHolder {
510
521
peers : HashMap :: new ( ) ,
511
- peers_needing_send : HashSet :: new ( ) ,
512
522
node_id_to_descriptor : HashMap :: new ( )
513
523
} ) ,
514
524
our_node_secret,
@@ -663,7 +673,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
663
673
None => panic ! ( "Descriptor for read_event is not already known to PeerManager" ) ,
664
674
Some ( peer) => peer
665
675
} ;
666
- self . do_read_event ( peer_descriptor, peer, & mut peers. peers_needing_send , & mut peers . node_id_to_descriptor , data)
676
+ self . do_read_event ( peer_descriptor, peer, & mut peers. node_id_to_descriptor , data)
667
677
} ;
668
678
669
679
match result {
@@ -675,12 +685,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
675
685
}
676
686
}
677
687
678
- /// Append a message to a peer's pending outbound/write buffer, and update the map of peers needing sends accordingly.
679
- fn enqueue_message < M : Encode + Writeable > ( & self , peers_needing_send : & mut HashSet < Descriptor > , message_queuer : & mut impl MessageQueuer , output_buffer : & mut impl PayloadQueuer , descriptor : & Descriptor , message : & M ) {
680
- message_queuer. enqueue_message ( message, output_buffer, & * self . logger ) ;
681
- peers_needing_send. insert ( descriptor. clone ( ) ) ;
682
- }
683
-
684
688
// Returns a valid PostInitState given a Init message
685
689
fn post_init_state_from_init_message ( & self , init_message : & msgs:: Init , their_node_id : & PublicKey ) -> Result < PostInitState , PeerHandleError > {
686
690
if init_message. features . requires_unknown_bits ( ) {
@@ -716,18 +720,18 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
716
720
}
717
721
718
722
// Add an Init message to the outbound queue
719
- fn enqueue_init_message ( & self , descriptor : & Descriptor , peer : & mut Peer < TransportImpl > , peers_needing_send : & mut HashSet < Descriptor > ) {
723
+ fn enqueue_init_message ( & self , peer : & mut Peer < TransportImpl > ) {
720
724
let mut features = InitFeatures :: known ( ) ;
721
725
if !self . message_handler . route_handler . should_request_full_sync ( & peer. transport . get_their_node_id ( ) ) {
722
726
features. clear_initial_routing_sync ( ) ;
723
727
}
724
728
725
729
let resp = msgs:: Init { features } ;
726
- self . enqueue_message ( peers_needing_send , & mut peer . transport , & mut peer. outbound_queue , descriptor , & resp ) ;
730
+ peer . transport . enqueue_message ( & resp , & mut peer. outbound_queue , & * self . logger ) ;
727
731
}
728
732
729
733
// Process an incoming Init message and set Peer and PeerManager state accordingly
730
- fn process_init_message ( & self , message : Message , descriptor : & Descriptor , peer : & mut Peer < TransportImpl > , peers_needing_send : & mut HashSet < Descriptor > , node_id_to_descriptor : & mut HashMap < PublicKey , Descriptor > ) -> Result < ( ) , PeerHandleError > {
734
+ fn process_init_message ( & self , message : Message , descriptor : & Descriptor , peer : & mut Peer < TransportImpl > , node_id_to_descriptor : & mut HashMap < PublicKey , Descriptor > ) -> Result < ( ) , PeerHandleError > {
731
735
let their_node_id = peer. transport . get_their_node_id ( ) ;
732
736
733
737
match message {
@@ -740,13 +744,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
740
744
741
745
let new_post_init_state = self . post_init_state_from_init_message ( init_message, & their_node_id) ?;
742
746
743
- if let InitSyncTracker :: ChannelsSyncing ( _) = new_post_init_state. sync_status {
744
- peers_needing_send. insert ( descriptor. clone ( ) ) ;
745
- }
746
-
747
747
if !peer. outbound {
748
- self . enqueue_init_message ( descriptor , peer, peers_needing_send ) ;
748
+ self . enqueue_init_message ( peer) ;
749
749
}
750
+
750
751
node_id_to_descriptor. insert ( their_node_id. clone ( ) , descriptor. clone ( ) ) ;
751
752
self . message_handler . chan_handler . peer_connected ( & their_node_id, init_message) ;
752
753
@@ -762,7 +763,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
762
763
Ok ( ( ) )
763
764
}
764
765
765
- fn do_read_event ( & self , peer_descriptor : & mut Descriptor , peer : & mut Peer < TransportImpl > , peers_needing_send : & mut HashSet < Descriptor > , node_id_to_descriptor : & mut HashMap < PublicKey , Descriptor > , data : & [ u8 ] ) -> Result < bool , PeerHandleError > {
766
+ fn do_read_event ( & self , peer_descriptor : & mut Descriptor , peer : & mut Peer < TransportImpl > , node_id_to_descriptor : & mut HashMap < PublicKey , Descriptor > , data : & [ u8 ] ) -> Result < bool , PeerHandleError > {
766
767
767
768
match peer. transport . process_input ( data, & mut peer. outbound_queue ) {
768
769
Err ( e) => {
@@ -775,13 +776,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
775
776
}
776
777
777
778
if newly_connected && peer. outbound {
778
- self . enqueue_init_message ( peer_descriptor, peer, peers_needing_send) ;
779
- }
780
-
781
- // If the transport layer placed items in the outbound queue, we need
782
- // to schedule ourselves for flush during the next process_events()
783
- if !peer. outbound_queue . is_empty ( ) {
784
- peers_needing_send. insert ( peer_descriptor. clone ( ) ) ;
779
+ self . enqueue_init_message ( peer) ;
785
780
}
786
781
}
787
782
}
@@ -790,7 +785,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
790
785
791
786
if peer. transport . is_connected ( ) && peer. post_init_state . is_none ( ) && received_messages. len ( ) > 0 {
792
787
let init_message = received_messages. remove ( 0 ) ;
793
- self . process_init_message ( init_message, peer_descriptor, peer, peers_needing_send , node_id_to_descriptor) ?;
788
+ self . process_init_message ( init_message, peer_descriptor, peer, node_id_to_descriptor) ?;
794
789
}
795
790
796
791
for message in received_messages {
@@ -811,7 +806,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
811
806
} ,
812
807
msgs:: ErrorAction :: SendErrorMessage { msg } => {
813
808
log_trace!( self . logger, "Got Err handling message, sending Error message because {}" , e. err) ;
814
- self . enqueue_message( peers_needing_send , & mut peer . transport , & mut peer. outbound_queue, peer_descriptor , & msg ) ;
809
+ peer . transport . enqueue_message( & msg , & mut peer. outbound_queue, & * self . logger ) ;
815
810
continue ;
816
811
} ,
817
812
}
@@ -820,7 +815,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
820
815
}
821
816
}
822
817
823
- if let Err ( handling_error) = self . handle_message ( message, peer_descriptor , peer, peers_needing_send ) {
818
+ if let Err ( handling_error) = self . handle_message ( message, peer) {
824
819
match handling_error {
825
820
MessageHandlingError :: PeerHandleError ( e) => { return Err ( e) } ,
826
821
MessageHandlingError :: LightningError ( e) => {
@@ -836,9 +831,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
836
831
/// Process an incoming message and return a decision (ok, lightning error, peer handling error) regarding the next action with the peer
837
832
fn handle_message ( & self ,
838
833
message : wire:: Message ,
839
- peer_descriptor : & mut Descriptor ,
840
- peer : & mut Peer < TransportImpl > ,
841
- peers_needing_send : & mut HashSet < Descriptor > ) -> Result < ( ) , MessageHandlingError > {
834
+ peer : & mut Peer < TransportImpl > ) -> Result < ( ) , MessageHandlingError > {
842
835
843
836
let their_node_id = peer. transport . get_their_node_id ( ) ;
844
837
let post_init_state = peer. post_init_state . as_mut ( ) . unwrap ( ) ;
@@ -873,7 +866,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
873
866
wire:: Message :: Ping ( msg) => {
874
867
if msg. ponglen < 65532 {
875
868
let resp = msgs:: Pong { byteslen : msg. ponglen } ;
876
- self . enqueue_message ( peers_needing_send , & mut peer . transport , & mut peer. outbound_queue , & peer_descriptor , & resp ) ;
869
+ peer . transport . enqueue_message ( & resp , & mut peer. outbound_queue , & * self . logger ) ;
877
870
}
878
871
} ,
879
872
wire:: Message :: Pong ( _msg) => {
@@ -1251,11 +1244,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
1251
1244
}
1252
1245
}
1253
1246
1254
- for mut descriptor in peers. peers_needing_send . drain ( ) {
1255
- match peers. peers . get_mut ( & descriptor) {
1256
- Some ( peer) => self . do_attempt_write_data ( & mut descriptor, & mut peer. post_init_state , & mut peer. transport , & mut peer. outbound_queue ) ,
1257
- None => panic ! ( "Inconsistent peers set state!" ) ,
1258
- }
1247
+ for ( descriptor, peer) in peers. peers_needing_send ( ) {
1248
+ self . do_attempt_write_data ( & mut descriptor. clone ( ) , & mut peer. post_init_state , & mut peer. transport , & mut peer. outbound_queue ) ;
1259
1249
}
1260
1250
}
1261
1251
}
0 commit comments