Skip to content

Commit 0f7367b

Browse files
committed
refactor: Continue constricting interfaces in message queue path
Introduce the MessageQueuer interface used by Transport to queue messages for send and use it in all the existing places where messages are queued.
1 parent f008fd2 commit 0f7367b

File tree

4 files changed

+75
-80
lines changed

4 files changed

+75
-80
lines changed

lightning/src/ln/peers/handler.rs

Lines changed: 47 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,12 @@ use bitcoin::hashes::sha256::Hash as Sha256;
3939
use bitcoin::hashes::sha256::HashEngine as Sha256Engine;
4040
use bitcoin::hashes::{HashEngine, Hash};
4141
use ln::peers::outbound_queue::OutboundQueue;
42-
use ln::peers::transport::Transport;
42+
use ln::peers::transport::{PayloadQueuer, Transport};
4343

4444
const MSG_BUFF_SIZE: usize = 10;
4545

46-
/// Interface PeerHandler uses to interact with the Transport object
47-
pub(super) trait ITransport {
46+
/// Interface PeerManager uses to interact with the Transport object
47+
pub(super) trait ITransport: MessageQueuer {
4848
/// Instantiate the new outbound Transport
4949
fn new_outbound(initiator_static_private_key: &SecretKey, responder_static_public_key: &PublicKey, initiator_ephemeral_private_key: &SecretKey) -> Self;
5050

@@ -65,41 +65,16 @@ pub(super) trait ITransport {
6565

6666
/// Returns all Messages that have been received and can be parsed by the Transport
6767
fn drain_messages<L: Deref>(&mut self, logger: L) -> Result<Vec<Message>, PeerHandleError> where L::Target: Logger;
68+
}
6869

70+
/// Interface PeerManager uses to queue message to send. Used primarily to restrict the interface in
71+
/// specific contexts. e.g. Only queueing during read_event(). No flushing allowed.
72+
pub(super) trait MessageQueuer {
6973
/// Encodes, encrypts, and enqueues a message to the outbound queue. Panics if the connection is
7074
/// not established yet.
7175
fn enqueue_message<M: Encode + Writeable, Q: PayloadQueuer, L: Deref>(&mut self, message: &M, output_buffer: &mut Q, logger: L) where L::Target: Logger;
7276
}
7377

74-
75-
/// Trait representing a container that allows enqueuing of Vec<[u8]>
76-
pub(super) trait PayloadQueuer {
77-
/// Enqueue item to the queue
78-
fn push_back(&mut self, item: Vec<u8>);
79-
80-
/// Returns true if the queue is empty
81-
fn is_empty(&self) -> bool;
82-
83-
/// Returns the amount of available space in queue
84-
fn queue_space(&self) -> usize;
85-
}
86-
87-
/// Implement &mut PayloadQueuer passthroughs
88-
impl<'a, T> PayloadQueuer for &'a mut T where
89-
T: PayloadQueuer {
90-
fn push_back(&mut self, item: Vec<u8>) {
91-
T::push_back(self, item)
92-
}
93-
94-
fn is_empty(&self) -> bool {
95-
T::is_empty(self)
96-
}
97-
98-
fn queue_space(&self) -> usize {
99-
T::queue_space(self)
100-
}
101-
}
102-
10378
/// Trait representing a container that can try to flush data through a SocketDescriptor
10479
pub(super) trait SocketDescriptorFlusher {
10580
/// Write previously enqueued data to the SocketDescriptor. A return of false indicates the
@@ -520,10 +495,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
520495

521496
fn do_attempt_write_data<Q: PayloadQueuer + SocketDescriptorFlusher>(
522497
&self,
523-
descriptor: &mut Descriptor,
524-
sync_status: &mut InitSyncTracker,
525-
transport: &mut impl ITransport,
526-
pending_outbound_buffer: &mut Q) {
498+
descriptor: &mut Descriptor,
499+
sync_status: &mut InitSyncTracker,
500+
message_queuer: &mut impl MessageQueuer,
501+
pending_outbound_buffer: &mut Q) {
527502
while !pending_outbound_buffer.is_blocked() {
528503
let queue_space = pending_outbound_buffer.queue_space();
529504
if queue_space > 0 {
@@ -533,12 +508,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
533508
let steps = ((queue_space + 2) / 3) as u8;
534509
let all_messages = self.message_handler.route_handler.get_next_channel_announcements(c, steps);
535510
for &(ref announce, ref update_a_option, ref update_b_option) in all_messages.iter() {
536-
transport.enqueue_message(announce, pending_outbound_buffer, &*self.logger);
511+
message_queuer.enqueue_message(announce, pending_outbound_buffer, &*self.logger);
537512
if let &Some(ref update_a) = update_a_option {
538-
transport.enqueue_message(update_a, pending_outbound_buffer, &*self.logger);
513+
message_queuer.enqueue_message(update_a, pending_outbound_buffer, &*self.logger);
539514
}
540515
if let &Some(ref update_b) = update_b_option {
541-
transport.enqueue_message(update_b, pending_outbound_buffer, &*self.logger);
516+
message_queuer.enqueue_message(update_b, pending_outbound_buffer, &*self.logger);
542517
}
543518
*sync_status = InitSyncTracker::ChannelsSyncing(announce.contents.short_channel_id + 1);
544519
}
@@ -550,7 +525,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
550525
let steps = queue_space as u8;
551526
let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps);
552527
for msg in all_messages.iter() {
553-
transport.enqueue_message(msg, pending_outbound_buffer, &*self.logger);
528+
message_queuer.enqueue_message(msg, pending_outbound_buffer, &*self.logger);
554529
*sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
555530
}
556531
if all_messages.is_empty() || all_messages.len() != steps as usize {
@@ -562,7 +537,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
562537
let steps = queue_space as u8;
563538
let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps);
564539
for msg in all_messages.iter() {
565-
transport.enqueue_message(msg, pending_outbound_buffer, &*self.logger);
540+
message_queuer.enqueue_message(msg, pending_outbound_buffer, &*self.logger);
566541
*sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
567542
}
568543
if all_messages.is_empty() || all_messages.len() != steps as usize {
@@ -613,8 +588,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
613588
}
614589

615590
/// Append a message to a peer's pending outbound/write buffer, and update the map of peers needing sends accordingly.
616-
fn enqueue_message<M: Encode + Writeable>(&self, peers_needing_send: &mut HashSet<Descriptor>, transport: &mut impl ITransport, output_buffer: &mut impl PayloadQueuer, descriptor: &Descriptor, message: &M) {
617-
transport.enqueue_message(message, output_buffer, &*self.logger);
591+
fn enqueue_message<M: Encode + Writeable>(&self, peers_needing_send: &mut HashSet<Descriptor>, message_queuer: &mut impl MessageQueuer, output_buffer: &mut impl PayloadQueuer, descriptor: &Descriptor, message: &M) {
592+
message_queuer.enqueue_message(message, output_buffer, &*self.logger);
618593
peers_needing_send.insert(descriptor.clone());
619594
}
620595

@@ -705,7 +680,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
705680
}
706681
}
707682

708-
if let Err(handling_error) = self.handle_message(peers_needing_send, peer_descriptor, message, outbound, sync_status, awaiting_pong, their_features, transport, pending_outbound_buffer) {
683+
if let Err(handling_error) = self.handle_message(peers_needing_send, peer_descriptor, message, outbound, sync_status, awaiting_pong, their_features, transport.get_their_node_id(), transport, pending_outbound_buffer) {
709684
match handling_error {
710685
MessageHandlingError::PeerHandleError(e) => { return Err(e) },
711686
MessageHandlingError::LightningError(e) => {
@@ -730,15 +705,16 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
730705
sync_status: &mut InitSyncTracker,
731706
awaiting_pong: &mut bool,
732707
their_features: &mut Option<InitFeatures>,
733-
transport: &mut impl ITransport,
708+
their_node_id: PublicKey,
709+
message_queuer: &mut impl MessageQueuer,
734710
pending_outbound_buffer: &mut impl PayloadQueuer
735711
) -> Result<(), MessageHandlingError> {
736-
log_trace!(self.logger, "Received message of type {} from {}", message.type_id(), log_pubkey!(transport.get_their_node_id()));
712+
log_trace!(self.logger, "Received message of type {} from {}", message.type_id(), log_pubkey!(&their_node_id));
737713

738714
// Need an Init as first message
739715
if let wire::Message::Init(_) = message {
740716
} else if their_features.is_none() {
741-
log_trace!(self.logger, "Peer {} sent non-Init first message", log_pubkey!(transport.get_their_node_id()));
717+
log_trace!(self.logger, "Peer {} sent non-Init first message", log_pubkey!(&their_node_id));
742718
return Err(PeerHandleError{ no_connection_possible: false }.into());
743719
}
744720

@@ -771,21 +747,21 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
771747
peers_needing_send.insert(peer_descriptor.clone());
772748
}
773749
if !msg.features.supports_static_remote_key() {
774-
log_debug!(self.logger, "Peer {} does not support static remote key, disconnecting with no_connection_possible", log_pubkey!(transport.get_their_node_id()));
750+
log_debug!(self.logger, "Peer {} does not support static remote key, disconnecting with no_connection_possible", log_pubkey!(&their_node_id));
775751
return Err(PeerHandleError{ no_connection_possible: true }.into());
776752
}
777753

778754
if !outbound {
779755
let mut features = InitFeatures::known();
780-
if !self.message_handler.route_handler.should_request_full_sync(&transport.get_their_node_id()) {
756+
if !self.message_handler.route_handler.should_request_full_sync(&their_node_id) {
781757
features.clear_initial_routing_sync();
782758
}
783759

784760
let resp = msgs::Init { features };
785-
self.enqueue_message(peers_needing_send, transport, pending_outbound_buffer, &peer_descriptor, &resp);
761+
self.enqueue_message(peers_needing_send, message_queuer, pending_outbound_buffer, &peer_descriptor, &resp);
786762
}
787763

788-
self.message_handler.chan_handler.peer_connected(&transport.get_their_node_id(), &msg);
764+
self.message_handler.chan_handler.peer_connected(&their_node_id, &msg);
789765
*their_features = Some(msg.features);
790766
},
791767
wire::Message::Error(msg) => {
@@ -798,11 +774,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
798774
}
799775

800776
if data_is_printable {
801-
log_debug!(self.logger, "Got Err message from {}: {}", log_pubkey!(transport.get_their_node_id()), msg.data);
777+
log_debug!(self.logger, "Got Err message from {}: {}", log_pubkey!(&their_node_id), msg.data);
802778
} else {
803-
log_debug!(self.logger, "Got Err message from {} with non-ASCII error message", log_pubkey!(transport.get_their_node_id()));
779+
log_debug!(self.logger, "Got Err message from {} with non-ASCII error message", log_pubkey!(&their_node_id));
804780
}
805-
self.message_handler.chan_handler.handle_error(&transport.get_their_node_id(), &msg);
781+
self.message_handler.chan_handler.handle_error(&their_node_id, &msg);
806782
if msg.channel_id == [0; 32] {
807783
return Err(PeerHandleError{ no_connection_possible: true }.into());
808784
}
@@ -811,7 +787,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
811787
wire::Message::Ping(msg) => {
812788
if msg.ponglen < 65532 {
813789
let resp = msgs::Pong { byteslen: msg.ponglen };
814-
self.enqueue_message(peers_needing_send, transport, pending_outbound_buffer, &peer_descriptor, &resp);
790+
self.enqueue_message(peers_needing_send, message_queuer, pending_outbound_buffer, &peer_descriptor, &resp);
815791
}
816792
},
817793
wire::Message::Pong(_msg) => {
@@ -820,59 +796,59 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, TransportImpl
820796

821797
// Channel messages:
822798
wire::Message::OpenChannel(msg) => {
823-
self.message_handler.chan_handler.handle_open_channel(&transport.get_their_node_id(), their_features.clone().unwrap(), &msg);
799+
self.message_handler.chan_handler.handle_open_channel(&their_node_id, their_features.clone().unwrap(), &msg);
824800
},
825801
wire::Message::AcceptChannel(msg) => {
826-
self.message_handler.chan_handler.handle_accept_channel(&transport.get_their_node_id(), their_features.clone().unwrap(), &msg);
802+
self.message_handler.chan_handler.handle_accept_channel(&their_node_id, their_features.clone().unwrap(), &msg);
827803
},
828804

829805
wire::Message::FundingCreated(msg) => {
830-
self.message_handler.chan_handler.handle_funding_created(&transport.get_their_node_id(), &msg);
806+
self.message_handler.chan_handler.handle_funding_created(&their_node_id, &msg);
831807
},
832808
wire::Message::FundingSigned(msg) => {
833-
self.message_handler.chan_handler.handle_funding_signed(&transport.get_their_node_id(), &msg);
809+
self.message_handler.chan_handler.handle_funding_signed(&their_node_id, &msg);
834810
},
835811
wire::Message::FundingLocked(msg) => {
836-
self.message_handler.chan_handler.handle_funding_locked(&transport.get_their_node_id(), &msg);
812+
self.message_handler.chan_handler.handle_funding_locked(&their_node_id, &msg);
837813
},
838814

839815
wire::Message::Shutdown(msg) => {
840-
self.message_handler.chan_handler.handle_shutdown(&transport.get_their_node_id(), &msg);
816+
self.message_handler.chan_handler.handle_shutdown(&their_node_id, &msg);
841817
},
842818
wire::Message::ClosingSigned(msg) => {
843-
self.message_handler.chan_handler.handle_closing_signed(&transport.get_their_node_id(), &msg);
819+
self.message_handler.chan_handler.handle_closing_signed(&their_node_id, &msg);
844820
},
845821

846822
// Commitment messages:
847823
wire::Message::UpdateAddHTLC(msg) => {
848-
self.message_handler.chan_handler.handle_update_add_htlc(&transport.get_their_node_id(), &msg);
824+
self.message_handler.chan_handler.handle_update_add_htlc(&their_node_id, &msg);
849825
},
850826
wire::Message::UpdateFulfillHTLC(msg) => {
851-
self.message_handler.chan_handler.handle_update_fulfill_htlc(&transport.get_their_node_id(), &msg);
827+
self.message_handler.chan_handler.handle_update_fulfill_htlc(&their_node_id, &msg);
852828
},
853829
wire::Message::UpdateFailHTLC(msg) => {
854-
self.message_handler.chan_handler.handle_update_fail_htlc(&transport.get_their_node_id(), &msg);
830+
self.message_handler.chan_handler.handle_update_fail_htlc(&their_node_id, &msg);
855831
},
856832
wire::Message::UpdateFailMalformedHTLC(msg) => {
857-
self.message_handler.chan_handler.handle_update_fail_malformed_htlc(&transport.get_their_node_id(), &msg);
833+
self.message_handler.chan_handler.handle_update_fail_malformed_htlc(&their_node_id, &msg);
858834
},
859835

860836
wire::Message::CommitmentSigned(msg) => {
861-
self.message_handler.chan_handler.handle_commitment_signed(&transport.get_their_node_id(), &msg);
837+
self.message_handler.chan_handler.handle_commitment_signed(&their_node_id, &msg);
862838
},
863839
wire::Message::RevokeAndACK(msg) => {
864-
self.message_handler.chan_handler.handle_revoke_and_ack(&transport.get_their_node_id(), &msg);
840+
self.message_handler.chan_handler.handle_revoke_and_ack(&their_node_id, &msg);
865841
},
866842
wire::Message::UpdateFee(msg) => {
867-
self.message_handler.chan_handler.handle_update_fee(&transport.get_their_node_id(), &msg);
843+
self.message_handler.chan_handler.handle_update_fee(&their_node_id, &msg);
868844
},
869845
wire::Message::ChannelReestablish(msg) => {
870-
self.message_handler.chan_handler.handle_channel_reestablish(&transport.get_their_node_id(), &msg);
846+
self.message_handler.chan_handler.handle_channel_reestablish(&their_node_id, &msg);
871847
},
872848

873849
// Routing messages:
874850
wire::Message::AnnouncementSignatures(msg) => {
875-
self.message_handler.chan_handler.handle_announcement_signatures(&transport.get_their_node_id(), &msg);
851+
self.message_handler.chan_handler.handle_announcement_signatures(&their_node_id, &msg);
876852
},
877853
wire::Message::ChannelAnnouncement(msg) => {
878854
let should_forward = match self.message_handler.route_handler.handle_channel_announcement(&msg) {

lightning/src/ln/peers/outbound_queue.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
/// Abstracts the buffer used to write data through a SocketDescriptor handling partial writes and
22
/// flow control.
33
4-
use ln::peers::handler::{SocketDescriptor, PayloadQueuer, SocketDescriptorFlusher};
4+
use ln::peers::handler::{SocketDescriptor, SocketDescriptorFlusher};
5+
use ln::peers::transport::PayloadQueuer;
56
use std::collections::LinkedList;
67
use std::cmp;
78

lightning/src/ln/peers/test_util.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ use bitcoin::secp256k1;
44
use bitcoin::secp256k1::key::{PublicKey, SecretKey};
55

66
use ln::peers::conduit::Conduit;
7-
use ln::peers::handler::{SocketDescriptor, PayloadQueuer, ITransport, PeerHandleError};
8-
use ln::peers::transport::IPeerHandshake;
7+
use ln::peers::handler::{SocketDescriptor, ITransport, PeerHandleError, MessageQueuer};
8+
use ln::peers::transport::{IPeerHandshake, PayloadQueuer};
99

1010
use std::rc::Rc;
1111
use std::cell::{RefCell};
@@ -300,7 +300,9 @@ impl<'a> ITransport for &'a RefCell<TransportStub> {
300300
fn drain_messages<L: Deref>(&mut self, logger: L) -> Result<Vec<Message>, PeerHandleError> where L::Target: Logger {
301301
self.borrow_mut().drain_messages(logger)
302302
}
303+
}
303304

305+
impl<'a> MessageQueuer for &'a RefCell<TransportStub> {
304306
fn enqueue_message<M: Encode + Writeable, Q: PayloadQueuer, L: Deref>(&mut self, message: &M, output_buffer: &mut Q, logger: L) where L::Target: Logger {
305307
self.borrow_mut().enqueue_message(message, output_buffer, logger)
306308
}
@@ -351,7 +353,9 @@ impl ITransport for TransportStub {
351353
fn drain_messages<L: Deref>(&mut self, _logger: L) -> Result<Vec<Message>, PeerHandleError> where L::Target: Logger {
352354
Ok(self.messages.drain(..).collect())
353355
}
356+
}
354357

358+
impl MessageQueuer for TransportStub {
355359
fn enqueue_message<M: Encode + Writeable, Q: PayloadQueuer, L: Deref>(&mut self, message: &M, output_buffer: &mut Q, _logger: L)
356360
where L::Target: Logger {
357361
let mut buffer = VecWriter(Vec::new());

lightning/src/ln/peers/transport.rs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
use bitcoin::secp256k1::{SecretKey, PublicKey};
44

55
use ln::peers::conduit::Conduit;
6-
use ln::peers::handler::{ITransport, PeerHandleError, PayloadQueuer};
6+
use ln::peers::handler::{ITransport, PeerHandleError, MessageQueuer};
77
use ln::peers::handshake::PeerHandshake;
88
use ln::{wire, msgs};
99
use ln::wire::{Encode, Message};
@@ -28,6 +28,18 @@ pub trait IPeerHandshake {
2828
fn process_act(&mut self, input: &[u8]) -> Result<(Option<Vec<u8>>, Option<(Conduit, PublicKey)>), String>;
2929
}
3030

31+
/// Trait representing a container that allows enqueuing of Vec<[u8]>
32+
pub(super) trait PayloadQueuer {
33+
/// Enqueue item to the queue
34+
fn push_back(&mut self, item: Vec<u8>);
35+
36+
/// Returns true if the queue is empty
37+
fn is_empty(&self) -> bool;
38+
39+
/// Returns the amount of available space in queue
40+
fn queue_space(&self) -> usize;
41+
}
42+
3143
pub(super) struct Transport<PeerHandshakeImpl: IPeerHandshake=PeerHandshake> {
3244
pub(super) conduit: Option<Conduit>,
3345
handshake: PeerHandshakeImpl,
@@ -146,6 +158,13 @@ impl<PeerHandshakeImpl: IPeerHandshake> ITransport for Transport<PeerHandshakeIm
146158
self.conduit.is_some()
147159
}
148160

161+
fn get_their_node_id(&self) -> PublicKey {
162+
assert!(self.is_connected(), "Retrieving the remote node_id is only supported after transport is connected");
163+
self.their_node_id.unwrap()
164+
}
165+
}
166+
167+
impl<PeerHandshakeImpl: IPeerHandshake> MessageQueuer for Transport<PeerHandshakeImpl> {
149168
fn enqueue_message<M: Encode + Writeable, Q: PayloadQueuer, L: Deref>(&mut self, message: &M, output_buffer: &mut Q, logger: L)
150169
where L::Target: Logger {
151170

@@ -160,11 +179,6 @@ impl<PeerHandshakeImpl: IPeerHandshake> ITransport for Transport<PeerHandshakeIm
160179
}
161180
}
162181
}
163-
164-
fn get_their_node_id(&self) -> PublicKey {
165-
assert!(self.is_connected(), "Retrieving the remote node_id is only supported after transport is connected");
166-
self.their_node_id.unwrap()
167-
}
168182
}
169183

170184
#[cfg(test)]

0 commit comments

Comments
 (0)