Skip to content

Commit 461f3be

Browse files
committed
Remove peers_needing_send set
Motivated by #456, remove the peers_needing_send set in favor of just scanning the peers during process_events() and attempting to send data for those peers that have items in their outbound queue or pending sync items to be sent out.
1 parent d2a4fa7 commit 461f3be

File tree

1 file changed

+33
-43
lines changed

1 file changed

+33
-43
lines changed

lightning/src/ln/peers/handler.rs

Lines changed: 33 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use util::events::{MessageSendEvent, MessageSendEventsProvider};
2929
use util::logger::Logger;
3030
use routing::network_graph::NetGraphMsgHandler;
3131

32-
use std::collections::{HashMap,HashSet};
32+
use std::collections::HashMap;
3333
use std::sync::{Arc, Mutex};
3434
use std::sync::atomic::{AtomicUsize, Ordering};
3535
use std::{cmp,error,hash,fmt};
@@ -246,9 +246,6 @@ impl<TransportImpl: ITransport> Peer<TransportImpl> {
246246

247247
struct PeerHolder<Descriptor: SocketDescriptor, TransportImpl: ITransport> {
248248
peers: HashMap<Descriptor, Peer<TransportImpl>>,
249-
/// Added to by do_read_event for cases where we pushed a message onto the send buffer but
250-
/// didn't call do_attempt_write_data to avoid reentrancy. Cleared in process_events()
251-
peers_needing_send: HashSet<Descriptor>,
252249
/// Peers in this map have completed the NOISE handshake and received an Init message
253250
node_id_to_descriptor: HashMap<PublicKey, Descriptor>,
254251
}
@@ -290,9 +287,6 @@ impl<Descriptor: SocketDescriptor, TransportImpl: ITransport> PeerHolder<Descrip
290287

291288
// Removes all associated metadata for descriptor and returns the Peer object associated with it
292289
fn remove_peer_by_descriptor(&mut self, descriptor: &Descriptor) -> Peer<TransportImpl> {
293-
// may or may not be in this set depending on in-flight messages
294-
self.peers_needing_send.remove(descriptor);
295-
296290
let peer_option = self.peers.remove(descriptor);
297291
match peer_option {
298292
None => panic!("Descriptor for disconnect_event is not already known to PeerManager"),
@@ -309,6 +303,23 @@ impl<Descriptor: SocketDescriptor, TransportImpl: ITransport> PeerHolder<Descrip
309303
}
310304
}
311305
}
306+
307+
// Returns the collection of peers that have data to send. Could be due to items in their outbound
308+
// queue or sync messages that need to be sent out.
309+
fn peers_needing_send<'a>(&'a mut self) -> Filter<IterMut<'a, Descriptor, Peer<TransportImpl>>, fn(&(&'a Descriptor, &'a mut Peer<TransportImpl>)) -> bool> {
310+
self.peers.iter_mut().filter(|(_, peer)| {
311+
let has_outbound_sync = match &peer.post_init_state {
312+
None => false,
313+
Some(post_init_state) => match &post_init_state.sync_status {
314+
InitSyncTracker::NoSyncRequested => false,
315+
InitSyncTracker::ChannelsSyncing(_) => true,
316+
InitSyncTracker::NodesSyncing(_) => true,
317+
}
318+
};
319+
320+
has_outbound_sync || !peer.outbound_queue.is_empty()
321+
})
322+
}
312323
}
313324

314325
#[cfg(not(any(target_pointer_width = "32", target_pointer_width = "64")))]
@@ -508,7 +519,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
508519
message_handler,
509520
peers: Mutex::new(PeerHolder {
510521
peers: HashMap::new(),
511-
peers_needing_send: HashSet::new(),
512522
node_id_to_descriptor: HashMap::new()
513523
}),
514524
our_node_secret,
@@ -663,7 +673,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
663673
None => panic!("Descriptor for read_event is not already known to PeerManager"),
664674
Some(peer) => peer
665675
};
666-
self.do_read_event(peer_descriptor, peer, &mut peers.peers_needing_send, &mut peers.node_id_to_descriptor, data)
676+
self.do_read_event(peer_descriptor, peer, &mut peers.node_id_to_descriptor, data)
667677
};
668678

669679
match result {
@@ -675,12 +685,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
675685
}
676686
}
677687

678-
/// Append a message to a peer's pending outbound/write buffer, and update the map of peers needing sends accordingly.
679-
fn enqueue_message<M: Encode + Writeable>(&self, peers_needing_send: &mut HashSet<Descriptor>, message_queuer: &mut impl MessageQueuer, output_buffer: &mut impl PayloadQueuer, descriptor: &Descriptor, message: &M) {
680-
message_queuer.enqueue_message(message, output_buffer, &*self.logger);
681-
peers_needing_send.insert(descriptor.clone());
682-
}
683-
684688
// Returns a valid PostInitState given a Init message
685689
fn post_init_state_from_init_message(&self, init_message: &msgs::Init, their_node_id: &PublicKey) -> Result<PostInitState, PeerHandleError> {
686690
if init_message.features.requires_unknown_bits() {
@@ -716,18 +720,18 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
716720
}
717721

718722
// Add an Init message to the outbound queue
719-
fn enqueue_init_message(&self, descriptor: &Descriptor, peer: &mut Peer<TransportImpl>, peers_needing_send: &mut HashSet<Descriptor>) {
723+
fn enqueue_init_message(&self, peer: &mut Peer<TransportImpl>) {
720724
let mut features = InitFeatures::known();
721725
if !self.message_handler.route_handler.should_request_full_sync(&peer.transport.get_their_node_id()) {
722726
features.clear_initial_routing_sync();
723727
}
724728

725729
let resp = msgs::Init { features };
726-
self.enqueue_message(peers_needing_send, &mut peer.transport, &mut peer.outbound_queue, descriptor, &resp);
730+
peer.transport.enqueue_message(&resp, &mut peer.outbound_queue, &*self.logger);
727731
}
728732

729733
// Process an incoming Init message and set Peer and PeerManager state accordingly
730-
fn process_init_message(&self, message: Message, descriptor: &Descriptor, peer: &mut Peer<TransportImpl>, peers_needing_send: &mut HashSet<Descriptor>, node_id_to_descriptor: &mut HashMap<PublicKey, Descriptor>) -> Result<(), PeerHandleError> {
734+
fn process_init_message(&self, message: Message, descriptor: &Descriptor, peer: &mut Peer<TransportImpl>, node_id_to_descriptor: &mut HashMap<PublicKey, Descriptor>) -> Result<(), PeerHandleError> {
731735
let their_node_id = peer.transport.get_their_node_id();
732736

733737
match message {
@@ -740,13 +744,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
740744

741745
let new_post_init_state = self.post_init_state_from_init_message(init_message, &their_node_id)?;
742746

743-
if let InitSyncTracker::ChannelsSyncing(_) = new_post_init_state.sync_status {
744-
peers_needing_send.insert(descriptor.clone());
745-
}
746-
747747
if !peer.outbound {
748-
self.enqueue_init_message(descriptor, peer, peers_needing_send);
748+
self.enqueue_init_message(peer);
749749
}
750+
750751
node_id_to_descriptor.insert(their_node_id.clone(), descriptor.clone());
751752
self.message_handler.chan_handler.peer_connected(&their_node_id, init_message);
752753

@@ -762,7 +763,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
762763
Ok(())
763764
}
764765

765-
fn do_read_event(&self, peer_descriptor: &mut Descriptor, peer: &mut Peer<TransportImpl>, peers_needing_send: &mut HashSet<Descriptor>, node_id_to_descriptor: &mut HashMap<PublicKey, Descriptor>, data: &[u8]) -> Result<bool, PeerHandleError> {
766+
fn do_read_event(&self, peer_descriptor: &mut Descriptor, peer: &mut Peer<TransportImpl>, node_id_to_descriptor: &mut HashMap<PublicKey, Descriptor>, data: &[u8]) -> Result<bool, PeerHandleError> {
766767

767768
match peer.transport.process_input(data, &mut peer.outbound_queue) {
768769
Err(e) => {
@@ -775,13 +776,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
775776
}
776777

777778
if newly_connected && peer.outbound {
778-
self.enqueue_init_message(peer_descriptor, peer, peers_needing_send);
779-
}
780-
781-
// If the transport layer placed items in the outbound queue, we need
782-
// to schedule ourselves for flush during the next process_events()
783-
if !peer.outbound_queue.is_empty() {
784-
peers_needing_send.insert(peer_descriptor.clone());
779+
self.enqueue_init_message(peer);
785780
}
786781
}
787782
}
@@ -790,7 +785,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
790785

791786
if peer.transport.is_connected() && peer.post_init_state.is_none() && received_messages.len() > 0 {
792787
let init_message = received_messages.remove(0);
793-
self.process_init_message(init_message, peer_descriptor, peer, peers_needing_send, node_id_to_descriptor)?;
788+
self.process_init_message(init_message, peer_descriptor, peer, node_id_to_descriptor)?;
794789
}
795790

796791
for message in received_messages {
@@ -811,7 +806,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
811806
},
812807
msgs::ErrorAction::SendErrorMessage { msg } => {
813808
log_trace!(self.logger, "Got Err handling message, sending Error message because {}", e.err);
814-
self.enqueue_message(peers_needing_send, &mut peer.transport, &mut peer.outbound_queue, peer_descriptor, &msg);
809+
peer.transport.enqueue_message(&msg, &mut peer.outbound_queue, &*self.logger);
815810
continue;
816811
},
817812
}
@@ -820,7 +815,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
820815
}
821816
}
822817

823-
if let Err(handling_error) = self.handle_message(message, peer_descriptor, peer, peers_needing_send) {
818+
if let Err(handling_error) = self.handle_message(message, peer) {
824819
match handling_error {
825820
MessageHandlingError::PeerHandleError(e) => { return Err(e) },
826821
MessageHandlingError::LightningError(e) => {
@@ -836,9 +831,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
836831
/// Process an incoming message and return a decision (ok, lightning error, peer handling error) regarding the next action with the peer
837832
fn handle_message(&self,
838833
message: wire::Message,
839-
peer_descriptor: &mut Descriptor,
840-
peer: &mut Peer<TransportImpl>,
841-
peers_needing_send: &mut HashSet<Descriptor>) -> Result<(), MessageHandlingError> {
834+
peer: &mut Peer<TransportImpl>) -> Result<(), MessageHandlingError> {
842835

843836
let their_node_id = peer.transport.get_their_node_id();
844837
let post_init_state = peer.post_init_state.as_mut().unwrap();
@@ -873,7 +866,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
873866
wire::Message::Ping(msg) => {
874867
if msg.ponglen < 65532 {
875868
let resp = msgs::Pong { byteslen: msg.ponglen };
876-
self.enqueue_message(peers_needing_send, &mut peer.transport, &mut peer.outbound_queue, &peer_descriptor, &resp);
869+
peer.transport.enqueue_message(&resp, &mut peer.outbound_queue, &*self.logger);
877870
}
878871
},
879872
wire::Message::Pong(_msg) => {
@@ -1251,11 +1244,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
12511244
}
12521245
}
12531246

1254-
for mut descriptor in peers.peers_needing_send.drain() {
1255-
match peers.peers.get_mut(&descriptor) {
1256-
Some(peer) => self.do_attempt_write_data(&mut descriptor, &mut peer.post_init_state, &mut peer.transport, &mut peer.outbound_queue),
1257-
None => panic!("Inconsistent peers set state!"),
1258-
}
1247+
for (descriptor, peer) in peers.peers_needing_send() {
1248+
self.do_attempt_write_data(&mut descriptor.clone(), &mut peer.post_init_state, &mut peer.transport, &mut peer.outbound_queue);
12591249
}
12601250
}
12611251
}

0 commit comments

Comments
 (0)