Skip to content

Commit 48fe3b3

Browse files
committed
Fix (and test) net-tokio outbound conns without a threaded env
1 parent 92e41ab commit 48fe3b3

File tree

1 file changed

+34
-10
lines changed

1 file changed

+34
-10
lines changed

lightning-net-tokio/src/lib.rs

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -296,17 +296,33 @@ pub fn setup_inbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<pee
296296
///
297297
/// See the module-level documentation for how to handle the event_notify mpsc::Sender.
298298
pub fn setup_outbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) -> impl std::future::Future<Output=()> {
299-
let (reader, receiver, us) = Connection::new(event_notify, stream);
299+
let (reader, mut write_receiver, us) = Connection::new(event_notify, stream);
300300

301301
let handle_opt = if let Ok(initial_send) = peer_manager.new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone())) {
302302
Some(tokio::spawn(async move {
303-
if SocketDescriptor::new(us.clone()).send_data(&initial_send, true) != initial_send.len() {
304-
// We should essentially always have enough room in a TCP socket buffer to send the
305-
// initial 10s of bytes, if not, just give up as hopeless.
306-
eprintln!("Failed to write first full message to socket!");
307-
peer_manager.socket_disconnected(&SocketDescriptor::new(Arc::clone(&us)));
308-
} else {
309-
Connection::schedule_read(peer_manager, us, reader, receiver).await;
303+
// We should essentially always have enough room in a TCP socket buffer to send the
304+
// initial 10s of bytes, however, tokio running in single-threaded mode will always
305+
// fail writes and wake us back up later to write, so we handle a Pending, but still
306+
// expect to write the full set of bytes at once and use a relatively tight timeout.
307+
if let Ok(Ok(())) = tokio::time::timeout(Duration::from_millis(100), async {
308+
loop {
309+
match SocketDescriptor::new(us.clone()).send_data(&initial_send, true) {
310+
v if v == initial_send.len() => break Ok(()),
311+
0 => {
312+
write_receiver.recv().await;
313+
// In theory we could check for if we've been instructed to disconnect
314+
// the peer here, but its OK to just skip it - we'll check for it in
315+
// schedule_read prior to any relevant calls into RL.
316+
},
317+
_ => {
318+
eprintln!("Failed to write first full message to socket!");
319+
peer_manager.socket_disconnected(&SocketDescriptor::new(Arc::clone(&us)));
320+
break Err(());
321+
}
322+
}
323+
}
324+
}).await {
325+
Connection::schedule_read(peer_manager, us, reader, write_receiver).await;
310326
}
311327
}))
312328
} else {
@@ -547,8 +563,7 @@ mod tests {
547563
}
548564
}
549565

550-
#[tokio::test(threaded_scheduler)]
551-
async fn basic_connection_test() {
566+
async fn do_basic_connection_test() {
552567
let secp_ctx = Secp256k1::new();
553568
let a_key = SecretKey::from_slice(&[1; 32]).unwrap();
554569
let b_key = SecretKey::from_slice(&[1; 32]).unwrap();
@@ -613,4 +628,13 @@ mod tests {
613628
fut_a.await;
614629
fut_b.await;
615630
}
631+
632+
#[tokio::test(threaded_scheduler)]
633+
async fn basic_threaded_connection_test() {
634+
do_basic_connection_test().await;
635+
}
636+
#[tokio::test]
637+
async fn basic_unthreaded_connection_test() {
638+
do_basic_connection_test().await;
639+
}
616640
}

0 commit comments

Comments
 (0)