@@ -70,7 +70,7 @@ use lightning::ln::peer_handler;
7070use lightning:: ln:: peer_handler:: SocketDescriptor as LnSocketTrait ;
7171use lightning:: ln:: msgs:: ChannelMessageHandler ;
7272
73- use std:: task;
73+ use std:: { task, thread } ;
7474use std:: net:: SocketAddr ;
7575use std:: sync:: { Arc , Mutex , MutexGuard } ;
7676use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
@@ -101,6 +101,11 @@ struct Connection {
101101 // socket. To wake it up (without otherwise changing its state, we can push a value into this
102102 // Sender.
103103 read_waker : mpsc:: Sender < ( ) > ,
104+ // When we are told by rust-lightning to disconnect, we can't return to rust-lightning until we
105+ // are sure we won't call any more read/write PeerManager functions with the same connection.
106+ // This is set to true if we're in such a condition (with disconnect checked before with the
107+ // top-level mutex held) and false when we can return.
108+ block_disconnect_socket : bool ,
104109 read_paused : bool ,
105110 rl_requested_disconnect : bool ,
106111 id : u64 ,
@@ -137,35 +142,43 @@ impl Connection {
137142 } }
138143 }
139144
145+ macro_rules! prepare_read_write_call {
146+ ( ) => { {
147+ let mut us_lock = us. lock( ) . unwrap( ) ;
148+ if us_lock. rl_requested_disconnect {
149+ shutdown_socket!( "disconnect_socket() call from RL" , Disconnect :: CloseConnection ) ;
150+ }
151+ us_lock. block_disconnect_socket = true ;
152+ } }
153+ }
154+
140155 let read_paused = us. lock ( ) . unwrap ( ) . read_paused ;
141156 tokio:: select! {
142157 v = write_avail_receiver. recv( ) => {
143158 assert!( v. is_some( ) ) ; // We can't have dropped the sending end, its in the us Arc!
144- if us. lock( ) . unwrap( ) . rl_requested_disconnect {
145- shutdown_socket!( "disconnect_socket() call from RL" , Disconnect :: CloseConnection ) ;
146- }
159+ prepare_read_write_call!( ) ;
147160 if let Err ( e) = peer_manager. write_buffer_space_avail( & mut our_descriptor) {
148161 shutdown_socket!( e, Disconnect :: CloseConnection ) ;
149162 }
163+ us. lock( ) . unwrap( ) . block_disconnect_socket = false ;
150164 } ,
151165 _ = read_wake_receiver. recv( ) => { } ,
152166 read = reader. read( & mut buf) , if !read_paused => match read {
153167 Ok ( 0 ) => shutdown_socket!( "Connection closed" , Disconnect :: PeerDisconnected ) ,
154168 Ok ( len) => {
155- if us. lock( ) . unwrap( ) . rl_requested_disconnect {
156- shutdown_socket!( "disconnect_socket() call from RL" , Disconnect :: CloseConnection ) ;
157- }
169+ prepare_read_write_call!( ) ;
158170 let read_res = peer_manager. read_event( & mut our_descriptor, & buf[ 0 ..len] ) ;
171+ let mut us_lock = us. lock( ) . unwrap( ) ;
159172 match read_res {
160173 Ok ( pause_read) => {
161- let mut us_lock = us. lock( ) . unwrap( ) ;
162174 if pause_read {
163175 us_lock. read_paused = true ;
164176 }
165177 Self :: event_trigger( & mut us_lock) ;
166178 } ,
167179 Err ( e) => shutdown_socket!( e, Disconnect :: CloseConnection ) ,
168180 }
181+ us_lock. block_disconnect_socket = false ;
169182 } ,
170183 Err ( e) => shutdown_socket!( e, Disconnect :: PeerDisconnected ) ,
171184 } ,
@@ -197,8 +210,8 @@ impl Connection {
197210
198211 ( reader, write_receiver, read_receiver,
199212 Arc :: new ( Mutex :: new ( Self {
200- writer : Some ( writer) , event_notify, write_avail, read_waker,
201- read_paused : false , rl_requested_disconnect : false ,
213+ writer : Some ( writer) , event_notify, write_avail, read_waker, read_paused : false ,
214+ block_disconnect_socket : false , rl_requested_disconnect : false ,
202215 id : ID_COUNTER . fetch_add ( 1 , Ordering :: AcqRel )
203216 } ) ) )
204217 }
@@ -405,15 +418,18 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
405418 }
406419
407420 fn disconnect_socket ( & mut self ) {
408- let mut us = self . conn . lock ( ) . unwrap ( ) ;
409- us. rl_requested_disconnect = true ;
410- us. read_paused = true ;
411- // Wake up the sending thread, assuming it is still alive
412- let _ = us. write_avail . try_send ( ( ) ) ;
413- // TODO: There's a race where we don't meet the requirements of disconnect_socket if the
414- // read task is about to call a PeerManager function (eg read_event or write_event).
415- // Ideally we need to release the us lock and block until we have confirmation from the
416- // read task that it has broken out of its main loop.
421+ {
422+ let mut us = self . conn . lock ( ) . unwrap ( ) ;
423+ us. rl_requested_disconnect = true ;
424+ us. read_paused = true ;
425+ // Wake up the sending thread, assuming it is still alive
426+ let _ = us. write_avail . try_send ( ( ) ) ;
427+ // Happy-path return:
428+ if !us. block_disconnect_socket { return ; }
429+ }
430+ while self . conn . lock ( ) . unwrap ( ) . block_disconnect_socket {
431+ thread:: yield_now ( ) ;
432+ }
417433 }
418434}
419435impl Clone for SocketDescriptor {
0 commit comments