@@ -16,7 +16,7 @@ use util::byte_utils;
16
16
use util:: events:: { MessageSendEvent } ;
17
17
use util:: logger:: Logger ;
18
18
19
- use std:: collections:: { HashMap , hash_map, HashSet , LinkedList } ;
19
+ use std:: collections:: { HashMap , hash_map, LinkedList } ;
20
20
use std:: sync:: { Arc , Mutex } ;
21
21
use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
22
22
use std:: { cmp, error, hash, fmt} ;
@@ -117,6 +117,10 @@ struct Peer {
117
117
sync_status : InitSyncTracker ,
118
118
119
119
awaiting_pong : bool ,
120
+
121
+ /// Indicates do_read_event() pushed a message into pending_outbound_buffer but didn't call
122
+ /// do_attempt_write_data() to avoid reentrancy. Cleared in process_events().
123
+ needing_send : bool ,
120
124
}
121
125
122
126
impl Peer {
@@ -137,9 +141,6 @@ impl Peer {
137
141
138
142
struct PeerHolder < Descriptor : SocketDescriptor > {
139
143
peers : HashMap < Descriptor , Peer > ,
140
- /// Added to by do_read_event for cases where we pushed a message onto the send buffer but
141
- /// didn't call do_attempt_write_data to avoid reentrancy. Cleared in process_events()
142
- peers_needing_send : HashSet < Descriptor > ,
143
144
/// Only add to this set when noise completes:
144
145
node_id_to_descriptor : HashMap < PublicKey , Descriptor > ,
145
146
}
@@ -204,7 +205,6 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
204
205
message_handler : message_handler,
205
206
peers : Mutex :: new ( PeerHolder {
206
207
peers : HashMap :: new ( ) ,
207
- peers_needing_send : HashSet :: new ( ) ,
208
208
node_id_to_descriptor : HashMap :: new ( )
209
209
} ) ,
210
210
our_node_secret : our_node_secret,
@@ -275,6 +275,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
275
275
sync_status : InitSyncTracker :: NoSyncRequested ,
276
276
277
277
awaiting_pong : false ,
278
+ needing_send : false ,
278
279
} ) . is_some ( ) {
279
280
panic ! ( "PeerManager driver duplicated descriptors!" ) ;
280
281
} ;
@@ -312,6 +313,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
312
313
sync_status : InitSyncTracker :: NoSyncRequested ,
313
314
314
315
awaiting_pong : false ,
316
+ needing_send : false ,
315
317
} ) . is_some ( ) {
316
318
panic ! ( "PeerManager driver duplicated descriptors!" ) ;
317
319
} ;
@@ -461,7 +463,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
461
463
{
462
464
log_trace!( self , "Encoding and sending message of type {} to {}" , $msg_code, log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
463
465
peer. pending_outbound_buffer. push_back( peer. channel_encryptor. encrypt_message( & encode_msg!( $msg, $msg_code) [ ..] ) ) ;
464
- peers . peers_needing_send . insert ( peer_descriptor . clone ( ) ) ;
466
+ peer . needing_send = true ;
465
467
}
466
468
}
467
469
}
@@ -620,7 +622,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
620
622
621
623
if msg. features . initial_routing_sync ( ) {
622
624
peer. sync_status = InitSyncTracker :: ChannelsSyncing ( 0 ) ;
623
- peers . peers_needing_send . insert ( peer_descriptor . clone ( ) ) ;
625
+ peer . needing_send = true ;
624
626
}
625
627
peer. their_features = Some ( msg. features ) ;
626
628
@@ -1005,7 +1007,6 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
1005
1007
match * action {
1006
1008
msgs:: ErrorAction :: DisconnectPeer { ref msg } => {
1007
1009
if let Some ( mut descriptor) = peers. node_id_to_descriptor . remove ( node_id) {
1008
- peers. peers_needing_send . remove ( & descriptor) ;
1009
1010
if let Some ( mut peer) = peers. peers . remove ( & descriptor) {
1010
1011
if let Some ( ref msg) = * msg {
1011
1012
log_trace ! ( self , "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}" ,
@@ -1039,11 +1040,10 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
1039
1040
}
1040
1041
}
1041
1042
1042
- for mut descriptor in peers. peers_needing_send . drain ( ) {
1043
- match peers. peers . get_mut ( & descriptor) {
1044
- Some ( peer) => self . do_attempt_write_data ( & mut descriptor, peer) ,
1045
- None => panic ! ( "Inconsistent peers set state!" ) ,
1046
- }
1043
+ let peers_needing_send = peers. peers . iter_mut ( ) . filter ( |( _, peer) | peer. needing_send ) ;
1044
+ for ( descriptor, peer) in peers_needing_send {
1045
+ peer. needing_send = false ;
1046
+ self . do_attempt_write_data ( & mut descriptor. clone ( ) , peer)
1047
1047
}
1048
1048
}
1049
1049
}
@@ -1060,9 +1060,7 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
1060
1060
1061
1061
fn disconnect_event_internal ( & self , descriptor : & Descriptor , no_connection_possible : bool ) {
1062
1062
let mut peers = self . peers . lock ( ) . unwrap ( ) ;
1063
- peers. peers_needing_send . remove ( descriptor) ;
1064
- let peer_option = peers. peers . remove ( descriptor) ;
1065
- match peer_option {
1063
+ match peers. peers . remove ( descriptor) {
1066
1064
None => panic ! ( "Descriptor for disconnect_event is not already known to PeerManager" ) ,
1067
1065
Some ( peer) => {
1068
1066
match peer. their_node_id {
@@ -1084,13 +1082,11 @@ impl<Descriptor: SocketDescriptor> PeerManager<Descriptor> {
1084
1082
let mut peers_lock = self . peers . lock ( ) . unwrap ( ) ;
1085
1083
{
1086
1084
let peers = & mut * peers_lock;
1087
- let peers_needing_send = & mut peers. peers_needing_send ;
1088
1085
let node_id_to_descriptor = & mut peers. node_id_to_descriptor ;
1089
1086
let peers = & mut peers. peers ;
1090
1087
1091
1088
peers. retain ( |descriptor, peer| {
1092
1089
if peer. awaiting_pong == true {
1093
- peers_needing_send. remove ( descriptor) ;
1094
1090
match peer. their_node_id {
1095
1091
Some ( node_id) => {
1096
1092
node_id_to_descriptor. remove ( & node_id) ;
0 commit comments