Skip to content

Commit e4b8a59

Browse files
committed
refactor: Abstract SocketDescriptor writing queue
To enable better testing, put the code to enqueue data and flush it to a SocketDescriptor behind an object. This patch starts to implement traits for the various separate interface that each object needs. The idea is the consumer of an interface defines it and the dependency objects implement it. This will be useful when creating test doubles for unit tests and helps make the dependencies of each function more clear. Additional uses of trait-based contracts will be more clear in the next patches that start to add more testing.
1 parent 30286c0 commit e4b8a59

File tree

3 files changed

+441
-36
lines changed

3 files changed

+441
-36
lines changed

lightning/src/ln/peers/handler.rs

Lines changed: 81 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use util::events::{MessageSendEvent, MessageSendEventsProvider};
2929
use util::logger::Logger;
3030
use routing::network_graph::NetGraphMsgHandler;
3131

32-
use std::collections::{HashMap,hash_map,HashSet,LinkedList};
32+
use std::collections::{HashMap,hash_map,HashSet};
3333
use std::sync::{Arc, Mutex};
3434
use std::sync::atomic::{AtomicUsize, Ordering};
3535
use std::{cmp,error,hash,fmt};
@@ -40,6 +40,9 @@ use bitcoin::hashes::sha256::HashEngine as Sha256Engine;
4040
use bitcoin::hashes::{HashEngine, Hash};
4141
use ln::peers::handshake::PeerHandshake;
4242
use ln::peers::conduit::Conduit;
43+
use ln::peers::outbound_queue::OutboundQueue;
44+
45+
const MSG_BUFF_SIZE: usize = 10;
4346

4447
/// Provides references to trait impls which handle different types of messages.
4548
pub struct MessageHandler<CM: Deref, RM: Deref> where
@@ -120,6 +123,64 @@ enum InitSyncTracker{
120123
NodesSyncing(PublicKey),
121124
}
122125

126+
/// Trait representing a container that allows enqueuing of Vec<[u8]>
127+
pub(super) trait PayloadQueuer {
128+
/// Enqueue item to the queue
129+
fn push_back(&mut self, item: Vec<u8>);
130+
131+
/// Returns true if the queue is empty
132+
fn is_empty(&self) -> bool;
133+
134+
/// Returns the amount of available space in queue
135+
fn queue_space(&self) -> usize;
136+
}
137+
138+
/// Implement &mut PayloadQueuer passthroughs
139+
impl<'a, T> PayloadQueuer for &'a mut T where
140+
T: PayloadQueuer {
141+
fn push_back(&mut self, item: Vec<u8>) {
142+
T::push_back(self, item)
143+
}
144+
145+
fn is_empty(&self) -> bool {
146+
T::is_empty(self)
147+
}
148+
149+
fn queue_space(&self) -> usize {
150+
T::queue_space(self)
151+
}
152+
}
153+
154+
/// Trait representing a container that can try to flush data through a SocketDescriptor
155+
pub(super) trait SocketDescriptorFlusher {
156+
/// Write previously enqueued data to the SocketDescriptor. A return of false indicates the
157+
/// underlying SocketDescriptor could not fulfill the send_data() call and the blocked state
158+
/// has been set. Use unblock() when the SocketDescriptor may have more room.
159+
fn try_flush_one(&mut self, descriptor: &mut impl SocketDescriptor) -> bool;
160+
161+
/// Clear the blocked state caused when a previous write failed
162+
fn unblock(&mut self);
163+
164+
/// Check if the container is in a blocked state
165+
fn is_blocked(&self) -> bool;
166+
}
167+
168+
/// Implement &mut Flushable passthroughs
169+
impl<'a, T> SocketDescriptorFlusher for &'a mut T where
170+
T: SocketDescriptorFlusher {
171+
fn try_flush_one(&mut self, descriptor: &mut impl SocketDescriptor) -> bool {
172+
T::try_flush_one(self, descriptor)
173+
}
174+
175+
fn unblock(&mut self) {
176+
T::unblock(self)
177+
}
178+
179+
fn is_blocked(&self) -> bool {
180+
T::is_blocked(self)
181+
}
182+
}
183+
123184
enum PeerState {
124185
Authenticating(PeerHandshake),
125186
Connected(Conduit),
@@ -139,7 +200,7 @@ impl PeerState {
139200
}
140201
}
141202

142-
fn process_peer_data(&mut self, data: &[u8], mutable_response_buffer: &mut LinkedList<Vec<u8>>) -> PeerDataProcessingDecision {
203+
fn process_peer_data(&mut self, data: &[u8], pending_outbound_buffer: &mut impl PayloadQueuer) -> PeerDataProcessingDecision {
143204
let (new_state_opt, decision) = match self {
144205
&mut PeerState::Authenticating(ref mut handshake) => {
145206
match handshake.process_act(data) {
@@ -153,7 +214,7 @@ impl PeerState {
153214

154215
// Any response generated by the handshake sequence is put into the response buffer
155216
if let Some(response_vec) = response_vec_option {
156-
mutable_response_buffer.push_back(response_vec);
217+
pending_outbound_buffer.push_back(response_vec);
157218
}
158219

159220
// if process_act() returns the conduit and remote static public key (node id)
@@ -186,9 +247,7 @@ struct Peer {
186247
their_node_id: Option<PublicKey>,
187248
their_features: Option<InitFeatures>,
188249

189-
pending_outbound_buffer: LinkedList<Vec<u8>>,
190-
pending_outbound_buffer_first_msg_offset: usize,
191-
awaiting_write_event: bool,
250+
pending_outbound_buffer: OutboundQueue,
192251

193252
sync_status: InitSyncTracker,
194253

@@ -375,9 +434,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
375434
their_node_id: Some(their_node_id.clone()),
376435
their_features: None,
377436

378-
pending_outbound_buffer: LinkedList::new(),
379-
pending_outbound_buffer_first_msg_offset: 0,
380-
awaiting_write_event: false,
437+
pending_outbound_buffer: OutboundQueue::new(MSG_BUFF_SIZE),
381438

382439
sync_status: InitSyncTracker::NoSyncRequested,
383440

@@ -407,9 +464,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
407464
their_node_id: None,
408465
their_features: None,
409466

410-
pending_outbound_buffer: LinkedList::new(),
411-
pending_outbound_buffer_first_msg_offset: 0,
412-
awaiting_write_event: false,
467+
pending_outbound_buffer: OutboundQueue::new(MSG_BUFF_SIZE),
413468

414469
sync_status: InitSyncTracker::NoSyncRequested,
415470

@@ -432,13 +487,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
432487
}
433488
}
434489
}
435-
const MSG_BUFF_SIZE: usize = 10;
436-
while !peer.awaiting_write_event {
437-
if peer.pending_outbound_buffer.len() < MSG_BUFF_SIZE {
490+
491+
while !peer.pending_outbound_buffer.is_blocked() {
492+
let queue_space = peer.pending_outbound_buffer.queue_space();
493+
if queue_space > 0 {
438494
match peer.sync_status {
439495
InitSyncTracker::NoSyncRequested => {},
440496
InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
441-
let steps = ((MSG_BUFF_SIZE - peer.pending_outbound_buffer.len() + 2) / 3) as u8;
497+
let steps = ((queue_space + 2) / 3) as u8;
442498
let all_messages = self.message_handler.route_handler.get_next_channel_announcements(c, steps);
443499
for &(ref announce, ref update_a_option, ref update_b_option) in all_messages.iter() {
444500
encode_and_send_msg!(announce);
@@ -455,7 +511,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
455511
}
456512
},
457513
InitSyncTracker::ChannelsSyncing(c) if c == 0xffff_ffff_ffff_ffff => {
458-
let steps = (MSG_BUFF_SIZE - peer.pending_outbound_buffer.len()) as u8;
514+
let steps = queue_space as u8;
459515
let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps);
460516
for msg in all_messages.iter() {
461517
encode_and_send_msg!(msg);
@@ -467,7 +523,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
467523
},
468524
InitSyncTracker::ChannelsSyncing(_) => unreachable!(),
469525
InitSyncTracker::NodesSyncing(key) => {
470-
let steps = (MSG_BUFF_SIZE - peer.pending_outbound_buffer.len()) as u8;
526+
let steps = queue_space as u8;
471527
let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps);
472528
for msg in all_messages.iter() {
473529
encode_and_send_msg!(msg);
@@ -480,23 +536,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
480536
}
481537
}
482538

483-
if {
484-
let next_buff = match peer.pending_outbound_buffer.front() {
485-
None => return,
486-
Some(buff) => buff,
487-
};
488-
489-
let should_be_reading = peer.pending_outbound_buffer.len() < MSG_BUFF_SIZE;
490-
let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
491-
let data_sent = descriptor.send_data(pending, should_be_reading);
492-
peer.pending_outbound_buffer_first_msg_offset += data_sent;
493-
if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() { true } else { false }
494-
} {
495-
peer.pending_outbound_buffer_first_msg_offset = 0;
496-
peer.pending_outbound_buffer.pop_front();
497-
} else {
498-
peer.awaiting_write_event = true;
539+
// No messages to send
540+
if peer.pending_outbound_buffer.is_empty() {
541+
break;
499542
}
543+
544+
peer.pending_outbound_buffer.try_flush_one(descriptor);
500545
}
501546
}
502547

@@ -515,7 +560,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
515560
match peers.peers.get_mut(descriptor) {
516561
None => panic!("Descriptor for write_event is not already known to PeerManager"),
517562
Some(peer) => {
518-
peer.awaiting_write_event = false;
563+
peer.pending_outbound_buffer.unblock();
519564
self.do_attempt_write_data(descriptor, peer);
520565
}
521566
};
@@ -693,7 +738,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
693738

694739
self.do_attempt_write_data(peer_descriptor, peer);
695740

696-
peer.pending_outbound_buffer.len() > 10 // pause_read
741+
peer.pending_outbound_buffer.queue_space() == 0 // pause_read
697742
}
698743
};
699744

lightning/src/ln/peers/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
mod chacha;
1616
pub mod handler;
1717
mod hkdf5869rfc;
18+
mod outbound_queue;
1819

1920
#[cfg(feature = "fuzztarget")]
2021
pub mod conduit;

0 commit comments

Comments
 (0)