@@ -70,7 +70,7 @@ use lightning::ln::peer_handler;
70
70
use lightning:: ln:: peer_handler:: SocketDescriptor as LnSocketTrait ;
71
71
use lightning:: ln:: msgs:: ChannelMessageHandler ;
72
72
73
- use std:: task;
73
+ use std:: { task, thread } ;
74
74
use std:: net:: SocketAddr ;
75
75
use std:: sync:: { Arc , Mutex , MutexGuard } ;
76
76
use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
@@ -101,6 +101,11 @@ struct Connection {
101
101
// socket. To wake it up (without otherwise changing its state, we can push a value into this
102
102
// Sender.
103
103
read_waker : mpsc:: Sender < ( ) > ,
104
+ // When we are told by rust-lightning to disconnect, we can't return to rust-lightning until we
105
+ // are sure we won't call any more read/write PeerManager functions with the same connection.
106
+ // This is set to true if we're in such a condition (with disconnect checked before with the
107
+ // top-level mutex held) and false when we can return.
108
+ block_disconnect_socket : bool ,
104
109
read_paused : bool ,
105
110
rl_requested_disconnect : bool ,
106
111
id : u64 ,
@@ -135,35 +140,43 @@ impl Connection {
135
140
} }
136
141
}
137
142
143
+ macro_rules! prepare_read_write_call {
144
+ ( ) => { {
145
+ let mut us_lock = us. lock( ) . unwrap( ) ;
146
+ if us_lock. rl_requested_disconnect {
147
+ shutdown_socket!( "disconnect_socket() call from RL" , Disconnect :: CloseConnection ) ;
148
+ }
149
+ us_lock. block_disconnect_socket = true ;
150
+ } }
151
+ }
152
+
138
153
let read_paused = us. lock ( ) . unwrap ( ) . read_paused ;
139
154
tokio:: select! {
140
155
v = write_avail_receiver. recv( ) => {
141
156
assert!( v. is_some( ) ) ; // We can't have dropped the sending end, its in the us Arc!
142
- if us. lock( ) . unwrap( ) . rl_requested_disconnect {
143
- shutdown_socket!( "disconnect_socket() call from RL" , Disconnect :: CloseConnection ) ;
144
- }
157
+ prepare_read_write_call!( ) ;
145
158
if let Err ( e) = peer_manager. write_buffer_space_avail( & mut our_descriptor) {
146
159
shutdown_socket!( e, Disconnect :: CloseConnection ) ;
147
160
}
161
+ us. lock( ) . unwrap( ) . block_disconnect_socket = false ;
148
162
} ,
149
163
_ = read_wake_receiver. recv( ) => { } ,
150
164
read = reader. read( & mut buf) , if !read_paused => match read {
151
165
Ok ( 0 ) => shutdown_socket!( "Connection closed" , Disconnect :: PeerDisconnected ) ,
152
166
Ok ( len) => {
153
- if us. lock( ) . unwrap( ) . rl_requested_disconnect {
154
- shutdown_socket!( "disconnect_socket() call from RL" , Disconnect :: CloseConnection ) ;
155
- }
167
+ prepare_read_write_call!( ) ;
156
168
let read_res = peer_manager. read_event( & mut our_descriptor, & buf[ 0 ..len] ) ;
169
+ let mut us_lock = us. lock( ) . unwrap( ) ;
157
170
match read_res {
158
171
Ok ( pause_read) => {
159
- let mut us_lock = us. lock( ) . unwrap( ) ;
160
172
if pause_read {
161
173
us_lock. read_paused = true ;
162
174
}
163
175
Self :: event_trigger( & mut us_lock) ;
164
176
} ,
165
177
Err ( e) => shutdown_socket!( e, Disconnect :: CloseConnection ) ,
166
178
}
179
+ us_lock. block_disconnect_socket = false ;
167
180
} ,
168
181
Err ( e) => shutdown_socket!( e, Disconnect :: PeerDisconnected ) ,
169
182
} ,
@@ -195,8 +208,8 @@ impl Connection {
195
208
196
209
( reader, write_receiver, read_receiver,
197
210
Arc :: new ( Mutex :: new ( Self {
198
- writer : Some ( writer) , event_notify, write_avail, read_waker,
199
- read_paused : false , rl_requested_disconnect : false ,
211
+ writer : Some ( writer) , event_notify, write_avail, read_waker, read_paused : false ,
212
+ block_disconnect_socket : false , rl_requested_disconnect : false ,
200
213
id : ID_COUNTER . fetch_add ( 1 , Ordering :: AcqRel )
201
214
} ) ) )
202
215
}
@@ -403,15 +416,18 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
403
416
}
404
417
405
418
fn disconnect_socket ( & mut self ) {
406
- let mut us = self . conn . lock ( ) . unwrap ( ) ;
407
- us. rl_requested_disconnect = true ;
408
- us. read_paused = true ;
409
- // Wake up the sending thread, assuming it is still alive
410
- let _ = us. write_avail . try_send ( ( ) ) ;
411
- // TODO: There's a race where we don't meet the requirements of disconnect_socket if the
412
- // read task is about to call a PeerManager function (eg read_event or write_event).
413
- // Ideally we need to release the us lock and block until we have confirmation from the
414
- // read task that it has broken out of its main loop.
419
+ {
420
+ let mut us = self . conn . lock ( ) . unwrap ( ) ;
421
+ us. rl_requested_disconnect = true ;
422
+ us. read_paused = true ;
423
+ // Wake up the sending thread, assuming it is still alive
424
+ let _ = us. write_avail . try_send ( ( ) ) ;
425
+ // Happy-path return:
426
+ if !us. block_disconnect_socket { return ; }
427
+ }
428
+ while self . conn . lock ( ) . unwrap ( ) . block_disconnect_socket {
429
+ thread:: yield_now ( ) ;
430
+ }
415
431
}
416
432
}
417
433
impl Clone for SocketDescriptor {
0 commit comments