@@ -52,7 +52,7 @@ use lightning::ln::peer_handler;
52
52
use lightning:: ln:: peer_handler:: SocketDescriptor as LnSocketTrait ;
53
53
use lightning:: ln:: msgs:: ChannelMessageHandler ;
54
54
55
- use std:: task;
55
+ use std:: { task, thread } ;
56
56
use std:: net:: SocketAddr ;
57
57
use std:: sync:: { Arc , Mutex } ;
58
58
use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
@@ -84,6 +84,11 @@ struct Connection {
84
84
// send into any read_blocker to wake the reading future back up and set read_paused back to
85
85
// false.
86
86
read_blocker : Option < oneshot:: Sender < ( ) > > ,
87
+ // When we are told by rust-lightning to disconnect, we can't return to rust-lightning until we
88
+ // are sure we won't call any more read/write PeerManager functions with the same connection.
89
+ // This is set to true if we're in such a condition (with disconnect checked before with the
90
+ // top-level mutex held) and false when we can return.
91
+ disconnect_block : bool ,
87
92
read_paused : bool ,
88
93
// If we get disconnected via SocketDescriptor::disconnect_socket(), we don't call
89
94
// disconnect_event(), but if we get an Err return value out of PeerManager, in general, we do.
@@ -105,20 +110,29 @@ impl Connection {
105
110
} }
106
111
}
107
112
113
+ macro_rules! prepare_read_write_call {
114
+ ( ) => { {
115
+ let mut us_lock = us. lock( ) . unwrap( ) ;
116
+ if us_lock. disconnect {
117
+ shutdown_socket!( "disconnect_socket() call from RL" ) ;
118
+ }
119
+ us_lock. disconnect_block = true ;
120
+ } }
121
+ }
122
+
108
123
// Whenever we want to block on reading or waiting for reading to resume, we have to
109
124
// at least select with the write_avail_receiver, which is used by the
110
125
// SocketDescriptor to wake us up if we need to shut down the socket or if we need
111
126
// to generate a write_buffer_space_avail call.
112
127
macro_rules! select_write_ev {
113
128
( $v: expr) => { {
114
129
assert!( $v. is_some( ) ) ; // We can't have dropped the sending end, its in the us Arc!
115
- if us. lock( ) . unwrap( ) . disconnect {
116
- shutdown_socket!( "disconnect_socket() call from RL" ) ;
117
- }
130
+ prepare_read_write_call!( ) ;
118
131
if let Err ( e) = peer_manager. write_buffer_space_avail( & mut SocketDescriptor :: new( us. clone( ) ) ) {
119
132
us. lock( ) . unwrap( ) . need_disconnect_event = false ;
120
133
shutdown_socket!( e) ;
121
134
}
135
+ us. lock( ) . unwrap( ) . disconnect_block = false ;
122
136
} }
123
137
}
124
138
@@ -151,6 +165,7 @@ impl Connection {
151
165
v = write_avail_receiver. recv( ) => select_write_ev!( v) ,
152
166
}
153
167
}
168
+ prepare_read_write_call!( ) ;
154
169
match peer_manager. read_event( & mut SocketDescriptor :: new( Arc :: clone( & us) ) , & buf[ 0 ..len] ) {
155
170
Ok ( pause_read) => {
156
171
if pause_read {
@@ -172,6 +187,7 @@ impl Connection {
172
187
shutdown_socket!( e)
173
188
} ,
174
189
}
190
+ us. lock( ) . unwrap( ) . disconnect_block = false ;
175
191
} ,
176
192
Err ( e) => {
177
193
println!( "Connection closed: {}" , e) ;
@@ -180,6 +196,7 @@ impl Connection {
180
196
} ,
181
197
}
182
198
}
199
+ us. lock ( ) . unwrap ( ) . disconnect_block = false ;
183
200
let writer_option = us. lock ( ) . unwrap ( ) . writer . take ( ) ;
184
201
if let Some ( mut writer) = writer_option {
185
202
// If the socket is already closed, shutdown() will fail, so just ignore it.
@@ -209,7 +226,7 @@ impl Connection {
209
226
210
227
( reader, receiver,
211
228
Arc :: new ( Mutex :: new ( Self {
212
- writer : Some ( writer) , event_notify, write_avail,
229
+ writer : Some ( writer) , event_notify, write_avail, disconnect_block : false ,
213
230
read_blocker : None , read_paused : false , need_disconnect_event : true , disconnect : false ,
214
231
id : ID_COUNTER . fetch_add ( 1 , Ordering :: AcqRel )
215
232
} ) ) )
@@ -380,16 +397,19 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
380
397
}
381
398
382
399
fn disconnect_socket ( & mut self ) {
383
- let mut us = self . conn . lock ( ) . unwrap ( ) ;
384
- us. need_disconnect_event = false ;
385
- us. disconnect = true ;
386
- us. read_paused = true ;
387
- // Wake up the sending thread, assuming its still alive
388
- let _ = us. write_avail . try_send ( ( ) ) ;
389
- // TODO: There's a race where we don't meet the requirements of disconnect_socket if the
390
- // read task is about to call a PeerManager function (eg read_event or write_event).
391
- // Ideally we need to release the us lock and block until we have confirmation from the
392
- // read task that it has broken out of its main loop.
400
+ {
401
+ let mut us = self . conn . lock ( ) . unwrap ( ) ;
402
+ us. need_disconnect_event = false ;
403
+ us. disconnect = true ;
404
+ us. read_paused = true ;
405
+ // Wake up the sending thread, assuming its still alive
406
+ let _ = us. write_avail . try_send ( ( ) ) ;
407
+ // Happy-path return:
408
+ if !us. disconnect_block { return ; }
409
+ }
410
+ while self . conn . lock ( ) . unwrap ( ) . disconnect_block {
411
+ thread:: yield_now ( ) ;
412
+ }
393
413
}
394
414
}
395
415
impl Clone for SocketDescriptor {
0 commit comments