@@ -263,18 +263,34 @@ pub fn setup_inbound<CMH: ChannelMessageHandler + 'static>(peer_manager: Arc<pee
263
263
///
264
264
/// See the module-level documentation for how to handle the event_notify mpsc::Sender.
265
265
pub fn setup_outbound < CMH : ChannelMessageHandler + ' static > ( peer_manager : Arc < peer_handler:: PeerManager < SocketDescriptor , Arc < CMH > > > , event_notify : mpsc:: Sender < ( ) > , their_node_id : PublicKey , stream : TcpStream ) -> impl std:: future:: Future < Output =( ) > {
266
- let ( reader, write_receiver, read_receiver, us) = Connection :: new ( event_notify, stream) ;
266
+ let ( reader, mut write_receiver, read_receiver, us) = Connection :: new ( event_notify, stream) ;
267
267
#[ cfg( debug_assertions) ]
268
268
let last_us = Arc :: clone ( & us) ;
269
269
270
270
let handle_opt = if let Ok ( initial_send) = peer_manager. new_outbound_connection ( their_node_id, SocketDescriptor :: new ( us. clone ( ) ) ) {
271
271
Some ( tokio:: spawn ( async move {
272
- if SocketDescriptor :: new ( us. clone ( ) ) . send_data ( & initial_send, true ) != initial_send. len ( ) {
273
- // We should essentially always have enough room in a TCP socket buffer to send the
274
- // initial 10s of bytes, if not, just give up as hopeless.
275
- eprintln ! ( "Failed to write first full message to socket!" ) ;
276
- peer_manager. socket_disconnected ( & SocketDescriptor :: new ( Arc :: clone ( & us) ) ) ;
277
- } else {
272
+ // We should essentially always have enough room in a TCP socket buffer to send the
273
+ // initial 10s of bytes, however, tokio running in single-threaded mode will always
274
+ // fail writes and wake us back up later to write, so we handle a Pending, but still
275
+ // expect to write the full set of bytes at once and use a relatively tight timeout.
276
+ if let Ok ( Ok ( ( ) ) ) = tokio:: time:: timeout ( Duration :: from_millis ( 100 ) , async {
277
+ loop {
278
+ match SocketDescriptor :: new ( us. clone ( ) ) . send_data ( & initial_send, true ) {
279
+ v if v == initial_send. len ( ) => break Ok ( ( ) ) ,
280
+ 0 => {
281
+ write_receiver. recv ( ) . await ;
282
+ // In theory we could check for if we've been instructed to disconnect
283
+ // the peer here, but its OK to just skip it - we'll check for it in
284
+ // schedule_read prior to any relevant calls into RL.
285
+ } ,
286
+ _ => {
287
+ eprintln ! ( "Failed to write first full message to socket!" ) ;
288
+ peer_manager. socket_disconnected ( & SocketDescriptor :: new ( Arc :: clone ( & us) ) ) ;
289
+ break Err ( ( ) ) ;
290
+ }
291
+ }
292
+ }
293
+ } ) . await {
278
294
Connection :: schedule_read ( peer_manager, us, reader, read_receiver, write_receiver) . await ;
279
295
}
280
296
} ) )
@@ -523,8 +539,7 @@ mod tests {
523
539
}
524
540
}
525
541
526
- #[ tokio:: test( threaded_scheduler) ]
527
- async fn basic_connection_test ( ) {
542
+ async fn do_basic_connection_test ( ) {
528
543
let secp_ctx = Secp256k1 :: new ( ) ;
529
544
let a_key = SecretKey :: from_slice ( & [ 1 ; 32 ] ) . unwrap ( ) ;
530
545
let b_key = SecretKey :: from_slice ( & [ 1 ; 32 ] ) . unwrap ( ) ;
@@ -589,4 +604,13 @@ mod tests {
589
604
fut_a. await ;
590
605
fut_b. await ;
591
606
}
607
+
608
+ #[ tokio:: test( threaded_scheduler) ]
609
+ async fn basic_threaded_connection_test ( ) {
610
+ do_basic_connection_test ( ) . await ;
611
+ }
612
+ #[ tokio:: test]
613
+ async fn basic_unthreaded_connection_test ( ) {
614
+ do_basic_connection_test ( ) . await ;
615
+ }
592
616
}
0 commit comments