Skip to content

Commit 5cef02c

Browse files
committed
Fix (and test) net-tokio outbound conns without a threaded env
1 parent b51044b commit 5cef02c

File tree

1 file changed

+34
-10
lines changed

1 file changed

+34
-10
lines changed

lightning-net-tokio/src/lib.rs

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -295,17 +295,33 @@ pub fn setup_inbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<pee
295295
///
296296
/// See the module-level documentation for how to handle the event_notify mpsc::Sender.
297297
pub fn setup_outbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) -> impl std::future::Future<Output=()> {
298-
let (reader, receiver, us) = Connection::new(event_notify, stream);
298+
let (reader, mut write_receiver, us) = Connection::new(event_notify, stream);
299299

300300
let handle_opt = if let Ok(initial_send) = peer_manager.new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone())) {
301301
Some(tokio::spawn(async move {
302-
if SocketDescriptor::new(us.clone()).send_data(&initial_send, true) != initial_send.len() {
303-
// We should essentially always have enough room in a TCP socket buffer to send the
304-
// initial 10s of bytes, if not, just give up as hopeless.
305-
eprintln!("Failed to write first full message to socket!");
306-
peer_manager.socket_disconnected(&SocketDescriptor::new(Arc::clone(&us)));
307-
} else {
308-
Connection::schedule_read(peer_manager, us, reader, receiver).await;
302+
// We should essentially always have enough room in a TCP socket buffer to send the
303+
// initial 10s of bytes, however, tokio running in single-threaded mode will always
304+
// fail writes and wake us back up later to write, so we handle a Pending, but still
305+
// expect to write the full set of bytes at once and use a relatively tight timeout.
306+
if let Ok(Ok(())) = tokio::time::timeout(Duration::from_millis(100), async {
307+
loop {
308+
match SocketDescriptor::new(us.clone()).send_data(&initial_send, true) {
309+
v if v == initial_send.len() => break Ok(()),
310+
0 => {
311+
write_receiver.recv().await;
312+
// In theory we could check for if we've been instructed to disconnect
313+
// the peer here, but its OK to just skip it - we'll check for it in
314+
// schedule_read prior to any relevant calls into RL.
315+
},
316+
_ => {
317+
eprintln!("Failed to write first full message to socket!");
318+
peer_manager.socket_disconnected(&SocketDescriptor::new(Arc::clone(&us)));
319+
break Err(());
320+
}
321+
}
322+
}
323+
}).await {
324+
Connection::schedule_read(peer_manager, us, reader, write_receiver).await;
309325
}
310326
}))
311327
} else {
@@ -546,8 +562,7 @@ mod tests {
546562
}
547563
}
548564

549-
#[tokio::test(threaded_scheduler)]
550-
async fn basic_connection_test() {
565+
async fn do_basic_connection_test() {
551566
let secp_ctx = Secp256k1::new();
552567
let a_key = SecretKey::from_slice(&[1; 32]).unwrap();
553568
let b_key = SecretKey::from_slice(&[1; 32]).unwrap();
@@ -612,4 +627,13 @@ mod tests {
612627
fut_a.await;
613628
fut_b.await;
614629
}
630+
631+
#[tokio::test(threaded_scheduler)]
632+
async fn basic_threaded_connection_test() {
633+
do_basic_connection_test().await;
634+
}
635+
#[tokio::test]
636+
async fn basic_unthreaded_connection_test() {
637+
do_basic_connection_test().await;
638+
}
615639
}

0 commit comments

Comments
 (0)