Skip to content

Commit 0f8f31b

Browse files
committed
Fix (and test) net-tokio outbound conns without a threaded env
1 parent c9fbff3 commit 0f8f31b

File tree

1 file changed

+33
-9
lines changed

1 file changed

+33
-9
lines changed

lightning-net-tokio/src/lib.rs

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -265,18 +265,34 @@ pub fn setup_inbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<pee
265265
///
266266
/// See the module-level documentation for how to handle the event_notify mpsc::Sender.
267267
pub fn setup_outbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) -> impl std::future::Future<Output=()> {
268-
let (reader, write_receiver, read_receiver, us) = Connection::new(event_notify, stream);
268+
let (reader, mut write_receiver, read_receiver, us) = Connection::new(event_notify, stream);
269269
#[cfg(debug_assertions)]
270270
let last_us = Arc::clone(&us);
271271

272272
let handle_opt = if let Ok(initial_send) = peer_manager.new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone())) {
273273
Some(tokio::spawn(async move {
274-
if SocketDescriptor::new(us.clone()).send_data(&initial_send, true) != initial_send.len() {
275-
// We should essentially always have enough room in a TCP socket buffer to send the
276-
// initial 10s of bytes, if not, just give up as hopeless.
277-
eprintln!("Failed to write first full message to socket!");
278-
peer_manager.socket_disconnected(&SocketDescriptor::new(Arc::clone(&us)));
279-
} else {
274+
// We should essentially always have enough room in a TCP socket buffer to send the
275+
// initial 10s of bytes, however, tokio running in single-threaded mode will always
276+
// fail writes and wake us back up later to write, so we handle a Pending, but still
277+
// expect to write the full set of bytes at once and use a relatively tight timeout.
278+
if let Ok(Ok(())) = tokio::time::timeout(Duration::from_millis(100), async {
279+
loop {
280+
match SocketDescriptor::new(us.clone()).send_data(&initial_send, true) {
281+
v if v == initial_send.len() => break Ok(()),
282+
0 => {
283+
write_receiver.recv().await;
284+
// In theory we could check for if we've been instructed to disconnect
285+
// the peer here, but its OK to just skip it - we'll check for it in
286+
// schedule_read prior to any relevant calls into RL.
287+
},
288+
_ => {
289+
eprintln!("Failed to write first full message to socket!");
290+
peer_manager.socket_disconnected(&SocketDescriptor::new(Arc::clone(&us)));
291+
break Err(());
292+
}
293+
}
294+
}
295+
}).await {
280296
Connection::schedule_read(peer_manager, us, reader, read_receiver, write_receiver).await;
281297
}
282298
}))
@@ -525,8 +541,7 @@ mod tests {
525541
}
526542
}
527543

528-
#[tokio::test(threaded_scheduler)]
529-
async fn basic_connection_test() {
544+
async fn do_basic_connection_test() {
530545
let secp_ctx = Secp256k1::new();
531546
let a_key = SecretKey::from_slice(&[1; 32]).unwrap();
532547
let b_key = SecretKey::from_slice(&[1; 32]).unwrap();
@@ -591,4 +606,13 @@ mod tests {
591606
fut_a.await;
592607
fut_b.await;
593608
}
609+
610+
#[tokio::test(threaded_scheduler)]
611+
async fn basic_threaded_connection_test() {
612+
do_basic_connection_test().await;
613+
}
614+
#[tokio::test]
615+
async fn basic_unthreaded_connection_test() {
616+
do_basic_connection_test().await;
617+
}
594618
}

0 commit comments

Comments
 (0)