@@ -287,9 +287,6 @@ impl Peer {
287
287
288
288
struct PeerHolder < Descriptor : SocketDescriptor > {
289
289
peers : HashMap < Descriptor , Peer > ,
290
- /// Added to by do_read_event for cases where we pushed a message onto the send buffer but
291
- /// didn't call do_attempt_write_data to avoid reentrancy. Cleared in process_events()
292
- peers_needing_send : HashSet < Descriptor > ,
293
290
/// Only add to this set when noise completes:
294
291
node_id_to_descriptor : HashMap < PublicKey , Descriptor > ,
295
292
}
@@ -420,7 +417,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
420
417
message_handler,
421
418
peers : Mutex :: new ( PeerHolder {
422
419
peers : HashMap :: new ( ) ,
423
- peers_needing_send : HashSet :: new ( ) ,
424
420
node_id_to_descriptor : HashMap :: new ( )
425
421
} ) ,
426
422
our_node_secret,
@@ -652,14 +648,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
652
648
}
653
649
654
650
/// Append a message to a peer's pending outbound/write buffer, and update the map of peers needing sends accordingly.
655
- fn enqueue_message < M : Encode + Writeable > ( & self , peers_needing_send : & mut HashSet < Descriptor > , peer : & mut Peer , descriptor : Descriptor , message : & M ) {
651
+ fn enqueue_message < M : Encode + Writeable > ( & self , peer : & mut Peer , message : & M ) {
656
652
let mut buffer = VecWriter ( Vec :: new ( ) ) ;
657
653
wire:: write ( message, & mut buffer) . unwrap ( ) ; // crash if the write failed
658
654
let encoded_message = buffer. 0 ;
659
655
660
656
log_trace ! ( self . logger, "Enqueueing message of type {} to {}" , message. type_id( ) , log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
661
657
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_message[ ..] ) ) ;
662
- peers_needing_send. insert ( descriptor) ;
663
658
}
664
659
665
660
fn do_read_event ( & self , peer_descriptor : & mut Descriptor , data : & [ u8 ] ) -> Result < bool , PeerHandleError > {
@@ -703,7 +698,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
703
698
} ,
704
699
msgs:: ErrorAction :: SendErrorMessage { msg } => {
705
700
log_trace!( self . logger, "Got Err handling message, sending Error message because {}" , e. err) ;
706
- self . enqueue_message( & mut peers . peers_needing_send , peer, peer_descriptor . clone ( ) , & msg) ;
701
+ self . enqueue_message( peer, & msg) ;
707
702
continue ;
708
703
} ,
709
704
}
@@ -745,7 +740,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
745
740
insert_node_id ! ( ) ;
746
741
let features = InitFeatures :: known ( ) ;
747
742
let resp = msgs:: Init { features } ;
748
- self . enqueue_message ( & mut peers . peers_needing_send , peer, peer_descriptor . clone ( ) , & resp) ;
743
+ self . enqueue_message ( peer, & resp) ;
749
744
} ,
750
745
NextNoiseStep :: ActThree => {
751
746
let their_node_id = try_potential_handleerror ! ( peer. channel_encryptor. process_act_three( & peer. pending_read_buffer[ ..] ) ) ;
@@ -755,7 +750,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
755
750
insert_node_id ! ( ) ;
756
751
let features = InitFeatures :: known ( ) ;
757
752
let resp = msgs:: Init { features } ;
758
- self . enqueue_message ( & mut peers . peers_needing_send , peer, peer_descriptor . clone ( ) , & resp) ;
753
+ self . enqueue_message ( peer, & resp) ;
759
754
} ,
760
755
NextNoiseStep :: NoiseComplete => {
761
756
if peer. pending_read_is_header {
@@ -803,7 +798,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
803
798
}
804
799
} ;
805
800
806
- match self . handle_message ( & mut peers . peers_needing_send , peer, peer_descriptor . clone ( ) , message) {
801
+ match self . handle_message ( peer, message) {
807
802
Err ( handling_error) => match handling_error {
808
803
MessageHandlingError :: PeerHandleError ( e) => { return Err ( e) } ,
809
804
MessageHandlingError :: LightningError ( e) => {
@@ -838,7 +833,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
838
833
839
834
/// Process an incoming message and return a decision (ok, lightning error, peer handling error) regarding the next action with the peer
840
835
/// Returns the message back if it needs to be broadcasted to all other peers.
841
- fn handle_message ( & self , peers_needing_send : & mut HashSet < Descriptor > , peer : & mut Peer , peer_descriptor : Descriptor , message : wire:: Message ) -> Result < Option < wire:: Message > , MessageHandlingError > {
836
+ fn handle_message ( & self , peer : & mut Peer , message : wire:: Message ) -> Result < Option < wire:: Message > , MessageHandlingError > {
842
837
log_trace ! ( self . logger, "Received message of type {} from {}" , message. type_id( ) , log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
843
838
844
839
// Need an Init as first message
@@ -873,7 +868,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
873
868
874
869
if msg. features . initial_routing_sync ( ) {
875
870
peer. sync_status = InitSyncTracker :: ChannelsSyncing ( 0 ) ;
876
- peers_needing_send. insert ( peer_descriptor. clone ( ) ) ;
877
871
}
878
872
if !msg. features . supports_static_remote_key ( ) {
879
873
log_debug ! ( self . logger, "Peer {} does not support static remote key, disconnecting with no_connection_possible" , log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
@@ -908,7 +902,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
908
902
wire:: Message :: Ping ( msg) => {
909
903
if msg. ponglen < 65532 {
910
904
let resp = msgs:: Pong { byteslen : msg. ponglen } ;
911
- self . enqueue_message ( peers_needing_send , peer, peer_descriptor . clone ( ) , & resp) ;
905
+ self . enqueue_message ( peer, & resp) ;
912
906
}
913
907
} ,
914
908
wire:: Message :: Pong ( _msg) => {
@@ -1030,7 +1024,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1030
1024
wire:: Message :: ChannelAnnouncement ( ref msg) => {
1031
1025
let encoded_msg = encode_msg ! ( msg) ;
1032
1026
1033
- for ( ref descriptor , ref mut peer) in peers. peers . iter_mut ( ) {
1027
+ for ( _ , ref mut peer) in peers. peers . iter_mut ( ) {
1034
1028
if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_features . is_none ( ) ||
1035
1029
!peer. should_forward_channel_announcement ( msg. contents . short_channel_id ) {
1036
1030
continue
@@ -1046,13 +1040,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1046
1040
continue ;
1047
1041
}
1048
1042
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_msg[ ..] ) ) ;
1049
- peers. peers_needing_send . insert ( ( * descriptor) . clone ( ) ) ;
1050
1043
}
1051
1044
} ,
1052
1045
wire:: Message :: NodeAnnouncement ( ref msg) => {
1053
1046
let encoded_msg = encode_msg ! ( msg) ;
1054
1047
1055
- for ( ref descriptor , ref mut peer) in peers. peers . iter_mut ( ) {
1048
+ for ( _ , ref mut peer) in peers. peers . iter_mut ( ) {
1056
1049
if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_features . is_none ( ) ||
1057
1050
!peer. should_forward_node_announcement ( msg. contents . node_id ) {
1058
1051
continue
@@ -1067,13 +1060,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1067
1060
continue ;
1068
1061
}
1069
1062
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_msg[ ..] ) ) ;
1070
- peers. peers_needing_send . insert ( ( * descriptor) . clone ( ) ) ;
1071
1063
}
1072
1064
} ,
1073
1065
wire:: Message :: ChannelUpdate ( ref msg) => {
1074
1066
let encoded_msg = encode_msg ! ( msg) ;
1075
1067
1076
- for ( ref descriptor , ref mut peer) in peers. peers . iter_mut ( ) {
1068
+ for ( _ , ref mut peer) in peers. peers . iter_mut ( ) {
1077
1069
if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_features . is_none ( ) ||
1078
1070
!peer. should_forward_channel_announcement ( msg. contents . short_channel_id ) {
1079
1071
continue
@@ -1085,7 +1077,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1085
1077
continue ;
1086
1078
}
1087
1079
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_msg[ ..] ) ) ;
1088
- peers. peers_needing_send . insert ( ( * descriptor) . clone ( ) ) ;
1089
1080
}
1090
1081
} ,
1091
1082
_ => debug_assert ! ( false , "We shouldn't attempt to forward anything but gossip messages" ) ,
@@ -1132,17 +1123,15 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1132
1123
log_trace ! ( self . logger, "Handling SendAcceptChannel event in peer_handler for node {} for channel {}" ,
1133
1124
log_pubkey!( node_id) ,
1134
1125
log_bytes!( msg. temporary_channel_id) ) ;
1135
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1126
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1136
1127
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1137
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1138
1128
} ,
1139
1129
MessageSendEvent :: SendOpenChannel { ref node_id, ref msg } => {
1140
1130
log_trace ! ( self . logger, "Handling SendOpenChannel event in peer_handler for node {} for channel {}" ,
1141
1131
log_pubkey!( node_id) ,
1142
1132
log_bytes!( msg. temporary_channel_id) ) ;
1143
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1133
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1144
1134
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1145
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1146
1135
} ,
1147
1136
MessageSendEvent :: SendFundingCreated { ref node_id, ref msg } => {
1148
1137
log_trace ! ( self . logger, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})" ,
@@ -1151,33 +1140,29 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1151
1140
log_funding_channel_id!( msg. funding_txid, msg. funding_output_index) ) ;
1152
1141
// TODO: If the peer is gone we should generate a DiscardFunding event
1153
1142
// indicating to the wallet that they should just throw away this funding transaction
1154
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1143
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1155
1144
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1156
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1157
1145
} ,
1158
1146
MessageSendEvent :: SendFundingSigned { ref node_id, ref msg } => {
1159
1147
log_trace ! ( self . logger, "Handling SendFundingSigned event in peer_handler for node {} for channel {}" ,
1160
1148
log_pubkey!( node_id) ,
1161
1149
log_bytes!( msg. channel_id) ) ;
1162
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1150
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1163
1151
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1164
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1165
1152
} ,
1166
1153
MessageSendEvent :: SendFundingLocked { ref node_id, ref msg } => {
1167
1154
log_trace ! ( self . logger, "Handling SendFundingLocked event in peer_handler for node {} for channel {}" ,
1168
1155
log_pubkey!( node_id) ,
1169
1156
log_bytes!( msg. channel_id) ) ;
1170
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1157
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1171
1158
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1172
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1173
1159
} ,
1174
1160
MessageSendEvent :: SendAnnouncementSignatures { ref node_id, ref msg } => {
1175
1161
log_trace ! ( self . logger, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})" ,
1176
1162
log_pubkey!( node_id) ,
1177
1163
log_bytes!( msg. channel_id) ) ;
1178
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1164
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1179
1165
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1180
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1181
1166
} ,
1182
1167
MessageSendEvent :: UpdateHTLCs { ref node_id, updates : msgs:: CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
1183
1168
log_trace ! ( self . logger, "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}" ,
@@ -1186,7 +1171,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1186
1171
update_fulfill_htlcs. len( ) ,
1187
1172
update_fail_htlcs. len( ) ,
1188
1173
log_bytes!( commitment_signed. channel_id) ) ;
1189
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1174
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1190
1175
for msg in update_add_htlcs {
1191
1176
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1192
1177
}
@@ -1203,39 +1188,34 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1203
1188
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1204
1189
}
1205
1190
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( commitment_signed) ) ) ;
1206
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1207
1191
} ,
1208
1192
MessageSendEvent :: SendRevokeAndACK { ref node_id, ref msg } => {
1209
1193
log_trace ! ( self . logger, "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}" ,
1210
1194
log_pubkey!( node_id) ,
1211
1195
log_bytes!( msg. channel_id) ) ;
1212
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1196
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1213
1197
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1214
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1215
1198
} ,
1216
1199
MessageSendEvent :: SendClosingSigned { ref node_id, ref msg } => {
1217
1200
log_trace ! ( self . logger, "Handling SendClosingSigned event in peer_handler for node {} for channel {}" ,
1218
1201
log_pubkey!( node_id) ,
1219
1202
log_bytes!( msg. channel_id) ) ;
1220
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1203
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1221
1204
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1222
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1223
1205
} ,
1224
1206
MessageSendEvent :: SendShutdown { ref node_id, ref msg } => {
1225
1207
log_trace ! ( self . logger, "Handling Shutdown event in peer_handler for node {} for channel {}" ,
1226
1208
log_pubkey!( node_id) ,
1227
1209
log_bytes!( msg. channel_id) ) ;
1228
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1210
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1229
1211
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1230
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1231
1212
} ,
1232
1213
MessageSendEvent :: SendChannelReestablish { ref node_id, ref msg } => {
1233
1214
log_trace ! ( self . logger, "Handling SendChannelReestablish event in peer_handler for node {} for channel {}" ,
1234
1215
log_pubkey!( node_id) ,
1235
1216
log_bytes!( msg. channel_id) ) ;
1236
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1217
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1237
1218
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1238
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1239
1219
} ,
1240
1220
MessageSendEvent :: BroadcastChannelAnnouncement { msg, update_msg } => {
1241
1221
log_trace ! ( self . logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}" , msg. contents. short_channel_id) ;
@@ -1263,7 +1243,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1263
1243
match * action {
1264
1244
msgs:: ErrorAction :: DisconnectPeer { ref msg } => {
1265
1245
if let Some ( mut descriptor) = peers. node_id_to_descriptor . remove ( node_id) {
1266
- peers. peers_needing_send . remove ( & descriptor) ;
1267
1246
if let Some ( mut peer) = peers. peers . remove ( & descriptor) {
1268
1247
if let Some ( ref msg) = * msg {
1269
1248
log_trace ! ( self . logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}" ,
@@ -1286,21 +1265,18 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1286
1265
log_trace ! ( self . logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}" ,
1287
1266
log_pubkey!( node_id) ,
1288
1267
msg. data) ;
1289
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1268
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1290
1269
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1291
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1292
1270
} ,
1293
1271
}
1294
1272
} ,
1295
1273
MessageSendEvent :: SendChannelRangeQuery { ref node_id, ref msg } => {
1296
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1274
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1297
1275
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1298
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1299
1276
} ,
1300
1277
MessageSendEvent :: SendShortIdsQuery { ref node_id, ref msg } => {
1301
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1278
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1302
1279
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1303
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1304
1280
}
1305
1281
MessageSendEvent :: SendReplyChannelRange { ref node_id, ref msg } => {
1306
1282
log_trace ! ( self . logger, "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}" ,
@@ -1309,18 +1285,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1309
1285
msg. first_blocknum,
1310
1286
msg. number_of_blocks,
1311
1287
msg. sync_complete) ;
1312
- let ( mut descriptor , peer) = get_peer_for_forwarding ! ( node_id) ;
1288
+ let ( _ , peer) = get_peer_for_forwarding ! ( node_id) ;
1313
1289
peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encode_msg ! ( msg) ) ) ;
1314
- self . do_attempt_write_data ( & mut descriptor, peer) ;
1315
1290
}
1316
1291
}
1317
1292
}
1318
1293
1319
- for mut descriptor in peers. peers_needing_send . drain ( ) {
1320
- match peers. peers . get_mut ( & descriptor) {
1321
- Some ( peer) => self . do_attempt_write_data ( & mut descriptor, peer) ,
1322
- None => panic ! ( "Inconsistent peers set state!" ) ,
1323
- }
1294
+ for ( descriptor, ref mut peer) in peers. peers . iter_mut ( ) {
1295
+ self . do_attempt_write_data ( & mut ( * descriptor) . clone ( ) , peer) ;
1324
1296
}
1325
1297
}
1326
1298
}
@@ -1339,7 +1311,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1339
1311
1340
1312
fn disconnect_event_internal ( & self , descriptor : & Descriptor , no_connection_possible : bool ) {
1341
1313
let mut peers = self . peers . lock ( ) . unwrap ( ) ;
1342
- peers. peers_needing_send . remove ( descriptor) ;
1343
1314
let peer_option = peers. peers . remove ( descriptor) ;
1344
1315
match peer_option {
1345
1316
None => panic ! ( "Descriptor for disconnect_event is not already known to PeerManager" ) ,
@@ -1367,7 +1338,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1367
1338
if let Some ( mut descriptor) = peers_lock. node_id_to_descriptor . remove ( & node_id) {
1368
1339
log_trace ! ( self . logger, "Disconnecting peer with id {} due to client request" , node_id) ;
1369
1340
peers_lock. peers . remove ( & descriptor) ;
1370
- peers_lock. peers_needing_send . remove ( & descriptor) ;
1371
1341
self . message_handler . chan_handler . peer_disconnected ( & node_id, no_connection_possible) ;
1372
1342
descriptor. disconnect_socket ( ) ;
1373
1343
}
@@ -1381,14 +1351,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
1381
1351
let mut peers_lock = self . peers . lock ( ) . unwrap ( ) ;
1382
1352
{
1383
1353
let peers = & mut * peers_lock;
1384
- let peers_needing_send = & mut peers. peers_needing_send ;
1385
1354
let node_id_to_descriptor = & mut peers. node_id_to_descriptor ;
1386
1355
let peers = & mut peers. peers ;
1387
1356
let mut descriptors_needing_disconnect = Vec :: new ( ) ;
1388
1357
1389
1358
peers. retain ( |descriptor, peer| {
1390
1359
if peer. awaiting_pong {
1391
- peers_needing_send. remove ( descriptor) ;
1392
1360
descriptors_needing_disconnect. push ( descriptor. clone ( ) ) ;
1393
1361
match peer. their_node_id {
1394
1362
Some ( node_id) => {
0 commit comments