Skip to content

Commit 572e760

Browse files
committed
Use a MessageSendEvent-handling fn rather than a single lopp
Rather than building a single `Vec` of `MessageSendEvent`s to handle then iterating over them, we move the body of the loop into a lambda and run the loop twice. In some cases, this may save a single allocation, but more importantly it sets us up for the next commit, which needs to know from which handler the `MessageSendEvent` it is processing came from.
1 parent 61153f1 commit 572e760

File tree

1 file changed

+65
-56
lines changed

1 file changed

+65
-56
lines changed

lightning/src/ln/peer_handler.rs

Lines changed: 65 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -2066,64 +2066,66 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
20662066
{
20672067
let peers_lock = self.peers.read().unwrap();
20682068

2069-
let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
2070-
events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
2069+
let chan_events = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
2070+
let route_events = self.message_handler.route_handler.get_and_clear_pending_msg_events();
20712071

20722072
let peers = &*peers_lock;
20732073
macro_rules! get_peer_for_forwarding {
20742074
($node_id: expr) => {
20752075
{
20762076
if peers_to_disconnect.get($node_id).is_some() {
20772077
// If we've "disconnected" this peer, do not send to it.
2078-
continue;
2079-
}
2080-
let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned();
2081-
match descriptor_opt {
2082-
Some(descriptor) => match peers.get(&descriptor) {
2083-
Some(peer_mutex) => {
2084-
let peer_lock = peer_mutex.lock().unwrap();
2085-
if !peer_lock.handshake_complete() {
2086-
continue;
2078+
None
2079+
} else {
2080+
let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned();
2081+
match descriptor_opt {
2082+
Some(descriptor) => match peers.get(&descriptor) {
2083+
Some(peer_mutex) => {
2084+
let peer_lock = peer_mutex.lock().unwrap();
2085+
if !peer_lock.handshake_complete() {
2086+
None
2087+
} else {
2088+
Some(peer_lock)
2089+
}
2090+
},
2091+
None => {
2092+
debug_assert!(false, "Inconsistent peers set state!");
2093+
None
20872094
}
2088-
peer_lock
20892095
},
20902096
None => {
2091-
debug_assert!(false, "Inconsistent peers set state!");
2092-
continue;
2093-
}
2094-
},
2095-
None => {
2096-
continue;
2097-
},
2097+
None
2098+
},
2099+
}
20982100
}
20992101
}
21002102
}
21012103
}
2102-
for event in events_generated.drain(..) {
2104+
let mut handle_event = |event, from_chan_handler| {
21032105
match event {
21042106
MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => {
21052107
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
21062108
log_pubkey!(node_id),
21072109
&msg.common_fields.temporary_channel_id);
2108-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2110+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
21092111
},
21102112
MessageSendEvent::SendAcceptChannelV2 { ref node_id, ref msg } => {
21112113
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}",
21122114
log_pubkey!(node_id),
21132115
&msg.common_fields.temporary_channel_id);
2114-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2116+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
21152117
},
21162118
MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
21172119
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
21182120
log_pubkey!(node_id),
21192121
&msg.common_fields.temporary_channel_id);
2120-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2122+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
21212123
},
21222124
MessageSendEvent::SendOpenChannelV2 { ref node_id, ref msg } => {
21232125
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}",
21242126
log_pubkey!(node_id),
21252127
&msg.common_fields.temporary_channel_id);
2126-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2128+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
21272129
},
21282130
MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
21292131
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id), None), "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
@@ -2132,107 +2134,107 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
21322134
ChannelId::v1_from_funding_txid(msg.funding_txid.as_byte_array(), msg.funding_output_index));
21332135
// TODO: If the peer is gone we should generate a DiscardFunding event
21342136
// indicating to the wallet that they should just throw away this funding transaction
2135-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2137+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
21362138
},
21372139
MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => {
21382140
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
21392141
log_pubkey!(node_id),
21402142
&msg.channel_id);
2141-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2143+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
21422144
},
21432145
MessageSendEvent::SendChannelReady { ref node_id, ref msg } => {
21442146
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendChannelReady event in peer_handler for node {} for channel {}",
21452147
log_pubkey!(node_id),
21462148
&msg.channel_id);
2147-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2149+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
21482150
},
21492151
MessageSendEvent::SendStfu { ref node_id, ref msg} => {
21502152
let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None);
21512153
log_debug!(logger, "Handling SendStfu event in peer_handler for node {} for channel {}",
21522154
log_pubkey!(node_id),
21532155
&msg.channel_id);
2154-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2156+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
21552157
}
21562158
MessageSendEvent::SendSpliceInit { ref node_id, ref msg} => {
21572159
let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None);
21582160
log_debug!(logger, "Handling SendSpliceInit event in peer_handler for node {} for channel {}",
21592161
log_pubkey!(node_id),
21602162
&msg.channel_id);
2161-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2163+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
21622164
}
21632165
MessageSendEvent::SendSpliceAck { ref node_id, ref msg} => {
21642166
let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None);
21652167
log_debug!(logger, "Handling SendSpliceAck event in peer_handler for node {} for channel {}",
21662168
log_pubkey!(node_id),
21672169
&msg.channel_id);
2168-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2170+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
21692171
}
21702172
MessageSendEvent::SendSpliceLocked { ref node_id, ref msg} => {
21712173
let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None);
21722174
log_debug!(logger, "Handling SendSpliceLocked event in peer_handler for node {} for channel {}",
21732175
log_pubkey!(node_id),
21742176
&msg.channel_id);
2175-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2177+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
21762178
}
21772179
MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => {
21782180
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAddInput event in peer_handler for node {} for channel {}",
21792181
log_pubkey!(node_id),
21802182
&msg.channel_id);
2181-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2183+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
21822184
},
21832185
MessageSendEvent::SendTxAddOutput { ref node_id, ref msg } => {
21842186
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAddOutput event in peer_handler for node {} for channel {}",
21852187
log_pubkey!(node_id),
21862188
&msg.channel_id);
2187-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2189+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
21882190
},
21892191
MessageSendEvent::SendTxRemoveInput { ref node_id, ref msg } => {
21902192
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}",
21912193
log_pubkey!(node_id),
21922194
&msg.channel_id);
2193-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2195+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
21942196
},
21952197
MessageSendEvent::SendTxRemoveOutput { ref node_id, ref msg } => {
21962198
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}",
21972199
log_pubkey!(node_id),
21982200
&msg.channel_id);
2199-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2201+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
22002202
},
22012203
MessageSendEvent::SendTxComplete { ref node_id, ref msg } => {
22022204
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxComplete event in peer_handler for node {} for channel {}",
22032205
log_pubkey!(node_id),
22042206
&msg.channel_id);
2205-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2207+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
22062208
},
22072209
MessageSendEvent::SendTxSignatures { ref node_id, ref msg } => {
22082210
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxSignatures event in peer_handler for node {} for channel {}",
22092211
log_pubkey!(node_id),
22102212
&msg.channel_id);
2211-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2213+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
22122214
},
22132215
MessageSendEvent::SendTxInitRbf { ref node_id, ref msg } => {
22142216
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxInitRbf event in peer_handler for node {} for channel {}",
22152217
log_pubkey!(node_id),
22162218
&msg.channel_id);
2217-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2219+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
22182220
},
22192221
MessageSendEvent::SendTxAckRbf { ref node_id, ref msg } => {
22202222
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAckRbf event in peer_handler for node {} for channel {}",
22212223
log_pubkey!(node_id),
22222224
&msg.channel_id);
2223-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2225+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
22242226
},
22252227
MessageSendEvent::SendTxAbort { ref node_id, ref msg } => {
22262228
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAbort event in peer_handler for node {} for channel {}",
22272229
log_pubkey!(node_id),
22282230
&msg.channel_id);
2229-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2231+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
22302232
},
22312233
MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
22322234
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
22332235
log_pubkey!(node_id),
22342236
&msg.channel_id);
2235-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2237+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
22362238
},
22372239
MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
22382240
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(commitment_signed.channel_id), None), "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
@@ -2241,7 +2243,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
22412243
update_fulfill_htlcs.len(),
22422244
update_fail_htlcs.len(),
22432245
&commitment_signed.channel_id);
2244-
let mut peer = get_peer_for_forwarding!(node_id);
2246+
let mut peer = get_peer_for_forwarding!(node_id)?;
22452247
for msg in update_add_htlcs {
22462248
self.enqueue_message(&mut *peer, msg);
22472249
}
@@ -2263,32 +2265,32 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
22632265
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
22642266
log_pubkey!(node_id),
22652267
&msg.channel_id);
2266-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2268+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
22672269
},
22682270
MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
22692271
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
22702272
log_pubkey!(node_id),
22712273
&msg.channel_id);
2272-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2274+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
22732275
},
22742276
MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
22752277
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling Shutdown event in peer_handler for node {} for channel {}",
22762278
log_pubkey!(node_id),
22772279
&msg.channel_id);
2278-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2280+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
22792281
},
22802282
MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
22812283
log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
22822284
log_pubkey!(node_id),
22832285
&msg.channel_id);
2284-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2286+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
22852287
},
22862288
MessageSendEvent::SendChannelAnnouncement { ref node_id, ref msg, ref update_msg } => {
22872289
log_debug!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}",
22882290
log_pubkey!(node_id),
22892291
msg.contents.short_channel_id);
2290-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2291-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), update_msg);
2292+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
2293+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, update_msg);
22922294
},
22932295
MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => {
22942296
log_debug!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
@@ -2324,7 +2326,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
23242326
MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => {
23252327
log_trace!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendChannelUpdate event in peer_handler for node {} for channel {}",
23262328
log_pubkey!(node_id), msg.contents.short_channel_id);
2327-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2329+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
23282330
},
23292331
MessageSendEvent::HandleError { node_id, action } => {
23302332
let logger = WithContext::from(&self.logger, Some(node_id), None, None);
@@ -2362,21 +2364,21 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
23622364
log_trace!(logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
23632365
log_pubkey!(node_id),
23642366
msg.data);
2365-
self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), msg);
2367+
self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id)?, msg);
23662368
},
23672369
msgs::ErrorAction::SendWarningMessage { ref msg, ref log_level } => {
23682370
log_given_level!(logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}",
23692371
log_pubkey!(node_id),
23702372
msg.data);
2371-
self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), msg);
2373+
self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id)?, msg);
23722374
},
23732375
}
23742376
},
23752377
MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => {
2376-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2378+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
23772379
},
23782380
MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => {
2379-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2381+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
23802382
}
23812383
MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => {
23822384
log_gossip!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}",
@@ -2385,17 +2387,24 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
23852387
msg.first_blocknum,
23862388
msg.number_of_blocks,
23872389
msg.sync_complete);
2388-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2390+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
23892391
}
23902392
MessageSendEvent::SendGossipTimestampFilter { ref node_id, ref msg } => {
2391-
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
2393+
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
23922394
}
23932395
}
2396+
Some(())
2397+
};
2398+
for event in chan_events {
2399+
handle_event(event, true);
2400+
}
2401+
for event in route_events {
2402+
handle_event(event, false);
23942403
}
23952404

23962405
for (node_id, msg) in self.message_handler.custom_message_handler.get_and_clear_pending_msg() {
23972406
if peers_to_disconnect.get(&node_id).is_some() { continue; }
2398-
self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg);
2407+
self.enqueue_message(&mut *if let Some(peer) = get_peer_for_forwarding!(&node_id) { peer } else { continue; }, &msg);
23992408
}
24002409

24012410
for (descriptor, peer_mutex) in peers.iter() {

0 commit comments

Comments
 (0)