@@ -268,12 +268,17 @@ impl Connection {
268
268
stream. set_nonblocking ( true ) . unwrap ( ) ;
269
269
let tokio_stream = Arc :: new ( TcpStream :: from_std ( stream) . unwrap ( ) ) ;
270
270
271
- ( Arc :: clone ( & tokio_stream) , write_receiver, read_receiver,
272
- Arc :: new ( Mutex :: new ( Self {
273
- writer : Some ( tokio_stream) , write_avail, read_waker, read_paused : false ,
271
+ let id = ID_COUNTER . fetch_add ( 1 , Ordering :: AcqRel ) ;
272
+ let writer = Some ( Arc :: clone ( & tokio_stream) ) ;
273
+ let conn = Arc :: new ( Mutex :: new ( Self {
274
+ writer,
275
+ write_avail,
276
+ read_waker,
277
+ read_paused : false ,
274
278
rl_requested_disconnect : false ,
275
- id : ID_COUNTER . fetch_add ( 1 , Ordering :: AcqRel )
276
- } ) ) )
279
+ id
280
+ } ) ) ;
281
+ ( tokio_stream, write_receiver, read_receiver, conn)
277
282
}
278
283
}
279
284
@@ -308,7 +313,8 @@ where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
308
313
let last_us = Arc :: clone ( & us) ;
309
314
310
315
let handle_opt = if peer_manager. as_ref ( ) . new_inbound_connection ( SocketDescriptor :: new ( us. clone ( ) ) , remote_addr) . is_ok ( ) {
311
- Some ( tokio:: spawn ( Connection :: schedule_read ( peer_manager, us, reader, read_receiver, write_receiver) ) )
316
+ let handle = tokio:: spawn ( Connection :: schedule_read ( peer_manager, us, reader, read_receiver, write_receiver) ) ;
317
+ Some ( handle)
312
318
} else {
313
319
// Note that we will skip socket_disconnected here, in accordance with the PeerManager
314
320
// requirements.
@@ -350,13 +356,13 @@ where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
350
356
#[ cfg( test) ]
351
357
let last_us = Arc :: clone ( & us) ;
352
358
let handle_opt = if let Ok ( initial_send) = peer_manager. as_ref ( ) . new_outbound_connection ( their_node_id, SocketDescriptor :: new ( us. clone ( ) ) , remote_addr) {
353
- Some ( tokio:: spawn ( async move {
359
+ let handle = tokio:: spawn ( async move {
354
360
// We should essentially always have enough room in a TCP socket buffer to send the
355
361
// initial 10s of bytes. However, tokio running in single-threaded mode will always
356
362
// fail writes and wake us back up later to write. Thus, we handle a single
357
363
// std::task::Poll::Pending but still expect to write the full set of bytes at once
358
364
// and use a relatively tight timeout.
359
- if let Ok ( Ok ( ( ) ) ) = tokio :: time :: timeout ( Duration :: from_millis ( 100 ) , async {
365
+ let send_fut = async {
360
366
loop {
361
367
match SocketDescriptor :: new ( us. clone ( ) ) . send_data ( & initial_send, true ) {
362
368
v if v == initial_send. len ( ) => break Ok ( ( ) ) ,
@@ -373,10 +379,13 @@ where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
373
379
}
374
380
}
375
381
}
376
- } ) . await {
382
+ } ;
383
+ let timeout_send_fut = tokio:: time:: timeout ( Duration :: from_millis ( 100 ) , send_fut) ;
384
+ if let Ok ( Ok ( ( ) ) ) = timeout_send_fut. await {
377
385
Connection :: schedule_read ( peer_manager, us, reader, read_receiver, write_receiver) . await ;
378
386
}
379
- } ) )
387
+ } ) ;
388
+ Some ( handle)
380
389
} else {
381
390
// Note that we will skip socket_disconnected here, in accordance with the PeerManager
382
391
// requirements.
@@ -417,7 +426,8 @@ pub async fn connect_outbound<PM: Deref + 'static + Send + Sync + Clone>(
417
426
addr : SocketAddr ,
418
427
) -> Option < impl std:: future:: Future < Output =( ) > >
419
428
where PM :: Target : APeerManager < Descriptor = SocketDescriptor > {
420
- if let Ok ( Ok ( stream) ) = time:: timeout ( Duration :: from_secs ( 10 ) , async { TcpStream :: connect ( & addr) . await . map ( |s| s. into_std ( ) . unwrap ( ) ) } ) . await {
429
+ let connect_fut = async { TcpStream :: connect ( & addr) . await . map ( |s| s. into_std ( ) . unwrap ( ) ) } ;
430
+ if let Ok ( Ok ( stream) ) = time:: timeout ( Duration :: from_secs ( 10 ) , connect_fut) . await {
421
431
Some ( setup_outbound ( peer_manager, their_node_id, stream) )
422
432
} else { None }
423
433
}
@@ -560,7 +570,7 @@ impl Hash for SocketDescriptor {
560
570
mod tests {
561
571
use lightning:: ln:: features:: * ;
562
572
use lightning:: ln:: msgs:: * ;
563
- use lightning:: ln:: peer_handler:: { MessageHandler , PeerManager } ;
573
+ use lightning:: ln:: peer_handler:: { MessageHandler , PeerManager , IgnoringMessageHandler } ;
564
574
use lightning:: routing:: gossip:: NodeId ;
565
575
use lightning:: events:: * ;
566
576
use lightning:: util:: test_utils:: TestNodeSigner ;
@@ -699,12 +709,13 @@ mod tests {
699
709
disconnected_flag : AtomicBool :: new ( false ) ,
700
710
msg_events : Mutex :: new ( Vec :: new ( ) ) ,
701
711
} ) ;
702
- let a_manager = Arc :: new ( PeerManager :: new ( MessageHandler {
712
+ let a_msg_handler = MessageHandler {
703
713
chan_handler : Arc :: clone ( & a_handler) ,
704
714
route_handler : Arc :: clone ( & a_handler) ,
705
- onion_message_handler : Arc :: new ( lightning:: ln:: peer_handler:: IgnoringMessageHandler { } ) ,
706
- custom_message_handler : Arc :: new ( lightning:: ln:: peer_handler:: IgnoringMessageHandler { } ) ,
707
- } , 0 , & [ 1 ; 32 ] , Arc :: new ( TestLogger ( ) ) , Arc :: new ( TestNodeSigner :: new ( a_key) ) ) ) ;
715
+ onion_message_handler : Arc :: new ( IgnoringMessageHandler { } ) ,
716
+ custom_message_handler : Arc :: new ( IgnoringMessageHandler { } ) ,
717
+ } ;
718
+ let a_manager = Arc :: new ( PeerManager :: new ( a_msg_handler, 0 , & [ 1 ; 32 ] , Arc :: new ( TestLogger ( ) ) , Arc :: new ( TestNodeSigner :: new ( a_key) ) ) ) ;
708
719
709
720
let ( b_connected_sender, mut b_connected) = mpsc:: channel ( 1 ) ;
710
721
let ( b_disconnected_sender, mut b_disconnected) = mpsc:: channel ( 1 ) ;
@@ -715,12 +726,13 @@ mod tests {
715
726
disconnected_flag : AtomicBool :: new ( false ) ,
716
727
msg_events : Mutex :: new ( Vec :: new ( ) ) ,
717
728
} ) ;
718
- let b_manager = Arc :: new ( PeerManager :: new ( MessageHandler {
729
+ let b_msg_handler = MessageHandler {
719
730
chan_handler : Arc :: clone ( & b_handler) ,
720
731
route_handler : Arc :: clone ( & b_handler) ,
721
- onion_message_handler : Arc :: new ( lightning:: ln:: peer_handler:: IgnoringMessageHandler { } ) ,
722
- custom_message_handler : Arc :: new ( lightning:: ln:: peer_handler:: IgnoringMessageHandler { } ) ,
723
- } , 0 , & [ 2 ; 32 ] , Arc :: new ( TestLogger ( ) ) , Arc :: new ( TestNodeSigner :: new ( b_key) ) ) ) ;
732
+ onion_message_handler : Arc :: new ( IgnoringMessageHandler { } ) ,
733
+ custom_message_handler : Arc :: new ( IgnoringMessageHandler { } ) ,
734
+ } ;
735
+ let b_manager = Arc :: new ( PeerManager :: new ( b_msg_handler, 0 , & [ 2 ; 32 ] , Arc :: new ( TestLogger ( ) ) , Arc :: new ( TestNodeSigner :: new ( b_key) ) ) ) ;
724
736
725
737
// We bind on localhost, hoping the environment is properly configured with a local
726
738
// address. This may not always be the case in containers and the like, so if this test is
@@ -769,12 +781,13 @@ mod tests {
769
781
let b_key = SecretKey :: from_slice ( & [ 2 ; 32 ] ) . unwrap ( ) ;
770
782
let b_pub = PublicKey :: from_secret_key ( & secp_ctx, & b_key) ;
771
783
772
- let a_manager = Arc :: new ( PeerManager :: new ( MessageHandler {
784
+ let a_msg_handler = MessageHandler {
773
785
chan_handler : Arc :: new ( lightning:: ln:: peer_handler:: ErroringMessageHandler :: new ( ) ) ,
774
- onion_message_handler : Arc :: new ( lightning :: ln :: peer_handler :: IgnoringMessageHandler { } ) ,
786
+ onion_message_handler : Arc :: new ( IgnoringMessageHandler { } ) ,
775
787
route_handler : Arc :: new ( lightning:: ln:: peer_handler:: IgnoringMessageHandler { } ) ,
776
- custom_message_handler : Arc :: new ( lightning:: ln:: peer_handler:: IgnoringMessageHandler { } ) ,
777
- } , 0 , & [ 1 ; 32 ] , Arc :: new ( TestLogger ( ) ) , Arc :: new ( TestNodeSigner :: new ( a_key) ) ) ) ;
788
+ custom_message_handler : Arc :: new ( IgnoringMessageHandler { } ) ,
789
+ } ;
790
+ let a_manager = Arc :: new ( PeerManager :: new ( a_msg_handler, 0 , & [ 1 ; 32 ] , Arc :: new ( TestLogger ( ) ) , Arc :: new ( TestNodeSigner :: new ( a_key) ) ) ) ;
778
791
779
792
// Make two connections, one for an inbound and one for an outbound connection
780
793
let conn_a = {
0 commit comments