@@ -70,7 +70,7 @@ use lightning::ln::peer_handler;
70
70
use lightning:: ln:: peer_handler:: SocketDescriptor as LnSocketTrait ;
71
71
use lightning:: ln:: msgs:: ChannelMessageHandler ;
72
72
73
- use std:: task;
73
+ use std:: { task, thread } ;
74
74
use std:: net:: SocketAddr ;
75
75
use std:: sync:: { Arc , Mutex } ;
76
76
use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
@@ -112,6 +112,11 @@ struct Connection {
112
112
// send into any read_blocker to wake the reading future back up and set read_paused back to
113
113
// false.
114
114
read_blocker : Option < oneshot:: Sender < ( ) > > ,
115
+ // When we are told by rust-lightning to disconnect, we can't return to rust-lightning until we
116
+ // are sure we won't call any more read/write PeerManager functions with the same connection.
117
+ // This is set to true if we're in such a condition (with disconnect checked before with the
118
+ // top-level mutex held) and false when we can return.
119
+ block_disconnect_socket : bool ,
115
120
read_paused : bool ,
116
121
disconnect_state : DisconnectionState ,
117
122
id : u64 ,
@@ -129,20 +134,29 @@ impl Connection {
129
134
} }
130
135
}
131
136
137
+ macro_rules! prepare_read_write_call {
138
+ ( ) => { {
139
+ let mut us_lock = us. lock( ) . unwrap( ) ;
140
+ if us_lock. disconnect_state == DisconnectionState :: RLTriggeredDisconnect {
141
+ shutdown_socket!( "disconnect_socket() call from RL" ) ;
142
+ }
143
+ us_lock. block_disconnect_socket = true ;
144
+ } }
145
+ }
146
+
132
147
// Whenever we want to block on reading or waiting for reading to resume, we have to
133
148
// at least select with the write_avail_receiver, which is used by the
134
149
// SocketDescriptor to wake us up if we need to shut down the socket or if we need
135
150
// to generate a write_buffer_space_avail call.
136
151
macro_rules! select_write_ev {
137
152
( $v: expr) => { {
138
153
assert!( $v. is_some( ) ) ; // We can't have dropped the sending end, its in the us Arc!
139
- if us. lock( ) . unwrap( ) . disconnect_state == DisconnectionState :: RLTriggeredDisconnect {
140
- shutdown_socket!( "disconnect_socket() call from RL" ) ;
141
- }
154
+ prepare_read_write_call!( ) ;
142
155
if let Err ( e) = peer_manager. write_buffer_space_avail( & mut SocketDescriptor :: new( us. clone( ) ) ) {
143
156
us. lock( ) . unwrap( ) . disconnect_state = DisconnectionState :: RLTriggeredDisconnect ;
144
157
shutdown_socket!( e) ;
145
158
}
159
+ us. lock( ) . unwrap( ) . block_disconnect_socket = false ;
146
160
} }
147
161
}
148
162
@@ -175,6 +189,7 @@ impl Connection {
175
189
v = write_avail_receiver. recv( ) => select_write_ev!( v) ,
176
190
}
177
191
}
192
+ prepare_read_write_call!( ) ;
178
193
match peer_manager. read_event( & mut SocketDescriptor :: new( Arc :: clone( & us) ) , & buf[ 0 ..len] ) {
179
194
Ok ( pause_read) => {
180
195
if pause_read {
@@ -196,6 +211,7 @@ impl Connection {
196
211
shutdown_socket!( e)
197
212
} ,
198
213
}
214
+ us. lock( ) . unwrap( ) . block_disconnect_socket = false ;
199
215
} ,
200
216
Err ( e) => {
201
217
println!( "Connection closed: {}" , e) ;
@@ -204,6 +220,7 @@ impl Connection {
204
220
} ,
205
221
}
206
222
}
223
+ us. lock ( ) . unwrap ( ) . block_disconnect_socket = false ;
207
224
let writer_option = us. lock ( ) . unwrap ( ) . writer . take ( ) ;
208
225
if let Some ( mut writer) = writer_option {
209
226
// If the socket is already closed, shutdown() will fail, so just ignore it.
@@ -233,7 +250,7 @@ impl Connection {
233
250
234
251
( reader, receiver,
235
252
Arc :: new ( Mutex :: new ( Self {
236
- writer : Some ( writer) , event_notify, write_avail,
253
+ writer : Some ( writer) , event_notify, write_avail, block_disconnect_socket : false ,
237
254
read_blocker : None , read_paused : false , disconnect_state : DisconnectionState :: NeedDisconnectEvent ,
238
255
id : ID_COUNTER . fetch_add ( 1 , Ordering :: AcqRel )
239
256
} ) ) )
@@ -422,15 +439,18 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
422
439
}
423
440
424
441
fn disconnect_socket ( & mut self ) {
425
- let mut us = self . conn . lock ( ) . unwrap ( ) ;
426
- us. disconnect_state = DisconnectionState :: RLTriggeredDisconnect ;
427
- us. read_paused = true ;
428
- // Wake up the sending thread, assuming it is still alive
429
- let _ = us. write_avail . try_send ( ( ) ) ;
430
- // TODO: There's a race where we don't meet the requirements of disconnect_socket if the
431
- // read task is about to call a PeerManager function (eg read_event or write_event).
432
- // Ideally we need to release the us lock and block until we have confirmation from the
433
- // read task that it has broken out of its main loop.
442
+ {
443
+ let mut us = self . conn . lock ( ) . unwrap ( ) ;
444
+ us. disconnect_state = DisconnectionState :: RLTriggeredDisconnect ;
445
+ us. read_paused = true ;
446
+ // Wake up the sending thread, assuming it is still alive
447
+ let _ = us. write_avail . try_send ( ( ) ) ;
448
+ // Happy-path return:
449
+ if !us. block_disconnect_socket { return ; }
450
+ }
451
+ while self . conn . lock ( ) . unwrap ( ) . block_disconnect_socket {
452
+ thread:: yield_now ( ) ;
453
+ }
434
454
}
435
455
}
436
456
impl Clone for SocketDescriptor {
0 commit comments