@@ -860,6 +860,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
860
860
let features = InitFeatures :: known ( ) ;
861
861
let resp = msgs:: Init { features } ;
862
862
self . enqueue_message ( peer, & resp) ;
863
+ peer. awaiting_pong_timer_tick_intervals = 0 ;
863
864
} ,
864
865
NextNoiseStep :: ActThree => {
865
866
let their_node_id = try_potential_handleerror ! ( peer. channel_encryptor. process_act_three( & peer. pending_read_buffer[ ..] ) ) ;
@@ -870,6 +871,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
870
871
let features = InitFeatures :: known ( ) ;
871
872
let resp = msgs:: Init { features } ;
872
873
self . enqueue_message ( peer, & resp) ;
874
+ peer. awaiting_pong_timer_tick_intervals = 0 ;
873
875
} ,
874
876
NextNoiseStep :: NoiseComplete => {
875
877
if peer. pending_read_is_header {
@@ -1530,12 +1532,29 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
1530
1532
let peer_count = peers. len ( ) ;
1531
1533
1532
1534
peers. retain ( |descriptor, peer| {
1533
- if !peer. channel_encryptor . is_ready_for_encryption ( ) {
1534
- // The peer needs to complete its handshake before we can exchange messages
1535
+ let mut do_disconnect_peer = false ;
1536
+ if !peer. channel_encryptor . is_ready_for_encryption ( ) || peer. their_node_id . is_none ( ) {
1537
+ // The peer needs to complete its handshake before we can exchange messages. We
1538
+ // give peers one timer tick to complete handshake, reusing
1539
+ // `awaiting_pong_timer_tick_intervals` to track number of timer ticks taken
1540
+ // for handshake completion.
1541
+ if peer. awaiting_pong_timer_tick_intervals != 0 {
1542
+ do_disconnect_peer = true ;
1543
+ } else {
1544
+ peer. awaiting_pong_timer_tick_intervals = 1 ;
1545
+ return true ;
1546
+ }
1547
+ }
1548
+
1549
+ if peer. awaiting_pong_timer_tick_intervals == -1 {
1550
+ // Magic value set in `maybe_send_extra_ping`.
1551
+ peer. awaiting_pong_timer_tick_intervals = 1 ;
1552
+ peer. received_message_since_timer_tick = false ;
1535
1553
return true ;
1536
1554
}
1537
1555
1538
- if ( peer. awaiting_pong_timer_tick_intervals > 0 && !peer. received_message_since_timer_tick )
1556
+ if do_disconnect_peer
1557
+ || ( peer. awaiting_pong_timer_tick_intervals > 0 && !peer. received_message_since_timer_tick )
1539
1558
|| peer. awaiting_pong_timer_tick_intervals as u64 >
1540
1559
MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER as u64 * peer_count as u64
1541
1560
{
@@ -1546,21 +1565,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
1546
1565
node_id_to_descriptor. remove ( & node_id) ;
1547
1566
self . message_handler . chan_handler . peer_disconnected ( & node_id, false ) ;
1548
1567
}
1549
- None => {
1550
- // This can't actually happen as we should have hit
1551
- // is_ready_for_encryption() previously on this same peer.
1552
- unreachable ! ( ) ;
1553
- } ,
1568
+ None => { } ,
1554
1569
}
1555
1570
return false ;
1556
1571
}
1557
-
1558
1572
peer. received_message_since_timer_tick = false ;
1559
- if peer. awaiting_pong_timer_tick_intervals == -1 {
1560
- // Magic value set in `maybe_send_extra_ping`.
1561
- peer. awaiting_pong_timer_tick_intervals = 1 ;
1562
- return true ;
1563
- }
1564
1573
1565
1574
if peer. awaiting_pong_timer_tick_intervals > 0 {
1566
1575
peer. awaiting_pong_timer_tick_intervals += 1 ;
@@ -1758,4 +1767,37 @@ mod tests {
1758
1767
assert_eq ! ( cfgs[ 1 ] . routing_handler. chan_upds_recvd. load( Ordering :: Acquire ) , 100 ) ;
1759
1768
assert_eq ! ( cfgs[ 1 ] . routing_handler. chan_anns_recvd. load( Ordering :: Acquire ) , 50 ) ;
1760
1769
}
1770
+
1771
+ #[ test]
1772
+ fn test_handshake_timeout ( ) {
1773
+ // Tests that we time out a peer still waiting on handshake completion after a full timer
1774
+ // tick.
1775
+ let cfgs = create_peermgr_cfgs ( 2 ) ;
1776
+ cfgs[ 0 ] . routing_handler . request_full_sync . store ( true , Ordering :: Release ) ;
1777
+ cfgs[ 1 ] . routing_handler . request_full_sync . store ( true , Ordering :: Release ) ;
1778
+ let peers = create_network ( 2 , & cfgs) ;
1779
+
1780
+ let secp_ctx = Secp256k1 :: new ( ) ;
1781
+ let a_id = PublicKey :: from_secret_key ( & secp_ctx, & peers[ 0 ] . our_node_secret ) ;
1782
+ let mut fd_a = FileDescriptor { fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
1783
+ let mut fd_b = FileDescriptor { fd : 1 , outbound_data : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) } ;
1784
+ let initial_data = peers[ 1 ] . new_outbound_connection ( a_id, fd_b. clone ( ) ) . unwrap ( ) ;
1785
+ peers[ 0 ] . new_inbound_connection ( fd_a. clone ( ) ) . unwrap ( ) ;
1786
+
1787
+ // If we get a single timer tick before completion, that's fine
1788
+ assert_eq ! ( peers[ 0 ] . peers. lock( ) . unwrap( ) . peers. len( ) , 1 ) ;
1789
+ peers[ 0 ] . timer_tick_occurred ( ) ;
1790
+ assert_eq ! ( peers[ 0 ] . peers. lock( ) . unwrap( ) . peers. len( ) , 1 ) ;
1791
+
1792
+ assert_eq ! ( peers[ 0 ] . read_event( & mut fd_a, & initial_data) . unwrap( ) , false ) ;
1793
+ peers[ 0 ] . process_events ( ) ;
1794
+ assert_eq ! ( peers[ 1 ] . read_event( & mut fd_b, & fd_a. outbound_data. lock( ) . unwrap( ) . split_off( 0 ) ) . unwrap( ) , false ) ;
1795
+ peers[ 1 ] . process_events ( ) ;
1796
+
1797
+ // ...but if we get a second timer tick, we should disconnect the peer
1798
+ peers[ 0 ] . timer_tick_occurred ( ) ;
1799
+ assert_eq ! ( peers[ 0 ] . peers. lock( ) . unwrap( ) . peers. len( ) , 0 ) ;
1800
+
1801
+ assert ! ( peers[ 0 ] . read_event( & mut fd_a, & fd_b. outbound_data. lock( ) . unwrap( ) . split_off( 0 ) ) . is_err( ) ) ;
1802
+ }
1761
1803
}
0 commit comments