@@ -18,7 +18,7 @@ use util::byte_utils;
18
18
use util:: events:: { MessageSendEvent , MessageSendEventsProvider } ;
19
19
use util:: logger:: Logger ;
20
20
21
- use std:: collections:: { HashMap , hash_map, HashSet , LinkedList } ;
21
+ use std:: collections:: { HashMap , hash_map, LinkedList } ;
22
22
use std:: sync:: { Arc , Mutex } ;
23
23
use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
24
24
use std:: { cmp, error, hash, fmt} ;
@@ -120,6 +120,10 @@ struct Peer {
120
120
sync_status : InitSyncTracker ,
121
121
122
122
awaiting_pong : bool ,
123
+
124
+ /// Indicates do_read_event() pushed a message into pending_outbound_buffer but didn't call
125
+ /// do_attempt_write_data() to avoid reentrancy. Cleared in process_events().
126
+ needing_send : bool ,
123
127
}
124
128
125
129
impl Peer {
@@ -140,9 +144,6 @@ impl Peer {
140
144
141
145
struct PeerHolder < Descriptor : SocketDescriptor > {
142
146
peers : HashMap < Descriptor , Peer > ,
143
- /// Added to by do_read_event for cases where we pushed a message onto the send buffer but
144
- /// didn't call do_attempt_write_data to avoid reentrancy. Cleared in process_events()
145
- peers_needing_send : HashSet < Descriptor > ,
146
147
/// Only add to this set when noise completes:
147
148
node_id_to_descriptor : HashMap < PublicKey , Descriptor > ,
148
149
}
@@ -228,7 +229,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
228
229
message_handler : message_handler,
229
230
peers : Mutex :: new ( PeerHolder {
230
231
peers : HashMap :: new ( ) ,
231
- peers_needing_send : HashSet :: new ( ) ,
232
232
node_id_to_descriptor : HashMap :: new ( )
233
233
} ) ,
234
234
our_node_secret : our_node_secret,
@@ -299,6 +299,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
299
299
sync_status : InitSyncTracker :: NoSyncRequested ,
300
300
301
301
awaiting_pong : false ,
302
+ needing_send : false ,
302
303
} ) . is_some ( ) {
303
304
panic ! ( "PeerManager driver duplicated descriptors!" ) ;
304
305
} ;
@@ -336,6 +337,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
336
337
sync_status : InitSyncTracker :: NoSyncRequested ,
337
338
338
339
awaiting_pong : false ,
340
+ needing_send : false ,
339
341
} ) . is_some ( ) {
340
342
panic ! ( "PeerManager driver duplicated descriptors!" ) ;
341
343
} ;
@@ -485,7 +487,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
485
487
{
486
488
log_trace!( self , "Encoding and sending message of type {} to {}" , $msg_code, log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
487
489
peer. pending_outbound_buffer. push_back( peer. channel_encryptor. encrypt_message( & encode_msg!( $msg, $msg_code) [ ..] ) ) ;
488
- peers . peers_needing_send . insert ( peer_descriptor . clone ( ) ) ;
490
+ peer . needing_send = true ;
489
491
}
490
492
}
491
493
}
@@ -644,7 +646,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
644
646
645
647
if msg. features . initial_routing_sync ( ) {
646
648
peer. sync_status = InitSyncTracker :: ChannelsSyncing ( 0 ) ;
647
- peers . peers_needing_send . insert ( peer_descriptor . clone ( ) ) ;
649
+ peer . needing_send = true ;
648
650
}
649
651
650
652
if !peer. outbound {
@@ -1029,7 +1031,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
1029
1031
match * action {
1030
1032
msgs:: ErrorAction :: DisconnectPeer { ref msg } => {
1031
1033
if let Some ( mut descriptor) = peers. node_id_to_descriptor . remove ( node_id) {
1032
- peers. peers_needing_send . remove ( & descriptor) ;
1033
1034
if let Some ( mut peer) = peers. peers . remove ( & descriptor) {
1034
1035
if let Some ( ref msg) = * msg {
1035
1036
log_trace ! ( self , "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}" ,
@@ -1063,11 +1064,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
1063
1064
}
1064
1065
}
1065
1066
1066
- for mut descriptor in peers. peers_needing_send . drain ( ) {
1067
- match peers. peers . get_mut ( & descriptor) {
1068
- Some ( peer) => self . do_attempt_write_data ( & mut descriptor, peer) ,
1069
- None => panic ! ( "Inconsistent peers set state!" ) ,
1070
- }
1067
+ let peers_needing_send = peers. peers . iter_mut ( ) . filter ( |( _, peer) | peer. needing_send ) ;
1068
+ for ( descriptor, peer) in peers_needing_send {
1069
+ peer. needing_send = false ;
1070
+ self . do_attempt_write_data ( & mut descriptor. clone ( ) , peer)
1071
1071
}
1072
1072
}
1073
1073
}
@@ -1084,9 +1084,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
1084
1084
1085
1085
fn disconnect_event_internal ( & self , descriptor : & Descriptor , no_connection_possible : bool ) {
1086
1086
let mut peers = self . peers . lock ( ) . unwrap ( ) ;
1087
- peers. peers_needing_send . remove ( descriptor) ;
1088
- let peer_option = peers. peers . remove ( descriptor) ;
1089
- match peer_option {
1087
+ match peers. peers . remove ( descriptor) {
1090
1088
None => panic ! ( "Descriptor for disconnect_event is not already known to PeerManager" ) ,
1091
1089
Some ( peer) => {
1092
1090
match peer. their_node_id {
@@ -1108,13 +1106,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
1108
1106
let mut peers_lock = self . peers . lock ( ) . unwrap ( ) ;
1109
1107
{
1110
1108
let peers = & mut * peers_lock;
1111
- let peers_needing_send = & mut peers. peers_needing_send ;
1112
1109
let node_id_to_descriptor = & mut peers. node_id_to_descriptor ;
1113
1110
let peers = & mut peers. peers ;
1114
1111
1115
1112
peers. retain ( |descriptor, peer| {
1116
1113
if peer. awaiting_pong == true {
1117
- peers_needing_send. remove ( descriptor) ;
1118
1114
match peer. their_node_id {
1119
1115
Some ( node_id) => {
1120
1116
node_id_to_descriptor. remove ( & node_id) ;
0 commit comments