@@ -32,6 +32,7 @@ use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
32
32
use chain:: channelmonitor:: { ChannelMonitor , ChannelMonitorUpdate , Balance , MonitorEvent , MonitorUpdated , TransactionOutputs } ;
33
33
use chain:: transaction:: { OutPoint , TransactionData } ;
34
34
use chain:: keysinterface:: Sign ;
35
+ use util:: atomic_counter:: AtomicCounter ;
35
36
use util:: logger:: Logger ;
36
37
use util:: events;
37
38
use util:: events:: EventHandler ;
@@ -40,10 +41,12 @@ use ln::channelmanager::ChannelDetails;
40
41
use prelude:: * ;
41
42
use sync:: { RwLock , RwLockReadGuard , Mutex } ;
42
43
use core:: ops:: Deref ;
44
+ use core:: sync:: atomic:: { AtomicBool , Ordering } ;
43
45
44
46
#[ derive( Clone , Copy , Hash , PartialEq , Eq ) ]
45
47
pub ( crate ) enum MonitorUpdate {
46
48
MonitorUpdateId ( u64 ) ,
49
+ SyncPersistId ( u64 ) ,
47
50
}
48
51
49
52
/// An opaque identifier describing a specific [`Persist`] method call.
@@ -90,6 +93,9 @@ pub trait Persist<ChannelSigner: Sign> {
90
93
/// updated monitor itself to disk/backups. See the `Persist` trait documentation for more
91
94
/// details.
92
95
///
96
+ /// During blockchain synchronization operations, this may be called with no
97
+ /// [`ChannelMonitorUpdate`], in which case the full [`ChannelMonitor`] needs to be persisted.
98
+ ///
93
99
/// If an implementer chooses to persist the updates only, they need to make
94
100
/// sure that all the updates are applied to the `ChannelMonitors` *before*
95
101
/// the set of channel monitors is given to the `ChannelManager`
@@ -107,7 +113,7 @@ pub trait Persist<ChannelSigner: Sign> {
107
113
/// [`ChannelMonitorUpdateErr`] for requirements when returning errors.
108
114
///
109
115
/// [`Writeable::write`]: crate::util::ser::Writeable::write
110
- fn update_persisted_channel ( & self , id : OutPoint , update : & ChannelMonitorUpdate , data : & ChannelMonitor < ChannelSigner > , update_id : MonitorUpdateId ) -> Result < ( ) , ChannelMonitorUpdateErr > ;
116
+ fn update_persisted_channel ( & self , id : OutPoint , update : & Option < ChannelMonitorUpdate > , data : & ChannelMonitor < ChannelSigner > , update_id : MonitorUpdateId ) -> Result < ( ) , ChannelMonitorUpdateErr > ;
111
117
}
112
118
113
119
struct MonitorHolder < ChannelSigner : Sign > {
@@ -118,7 +124,24 @@ struct MonitorHolder<ChannelSigner: Sign> {
118
124
/// update_persisted_channel, the user returns a TemporaryFailure, and then calls
119
125
/// channel_monitor_updated immediately, racing our insertion of the pending update into the
120
126
/// contained Vec.
127
+ ///
128
+ /// Beyond the synchronization of updates themselves, we cannot handle user events until after
129
+ /// any chain updates have been stored on disk. Thus, we scan this list when returning updates
130
+ /// to the ChannelManager, refusing to return any updates for a ChannelMonitor which is still
131
+ /// being persisted fully to disk after a chain update.
132
+ ///
133
+ /// This avoids the possibility of handling, e.g. an on-chain claim, generating a claim monitor
134
+ /// event, resulting in the relevant ChannelManager generating a PaymentSent event and dropping
135
+ /// the pending payment entry, and then reloading before the monitor is persisted, resulting in
136
+ /// the ChannelManager re-adding the same payment entry, before the same block is replayed,
137
+ /// resulting in a duplicate PaymentSent event.
121
138
pending_monitor_updates : Mutex < Vec < MonitorUpdateId > > ,
139
+ /// When the user returns a PermanentFailure error from an update_persisted_channel call during
140
+ /// block processing, we inform the ChannelManager that the channel should be closed
141
+ /// asynchronously. In order to ensure no further changes happen before the ChannelManager has
142
+ /// processed the closure event, we set this to true and return PermanentFailure for any other
143
+ /// chain::Watch events.
144
+ channel_closed : AtomicBool ,
122
145
}
123
146
124
147
/// A read-only reference to a current ChannelMonitor.
@@ -154,6 +177,7 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
154
177
P :: Target : Persist < ChannelSigner > ,
155
178
{
156
179
monitors : RwLock < HashMap < OutPoint , MonitorHolder < ChannelSigner > > > ,
180
+ sync_persistence_id : AtomicCounter ,
157
181
chain_source : Option < C > ,
158
182
broadcaster : T ,
159
183
logger : L ,
@@ -183,26 +207,50 @@ where C::Target: chain::Filter,
183
207
FN : Fn ( & ChannelMonitor < ChannelSigner > , & TransactionData ) -> Vec < TransactionOutputs >
184
208
{
185
209
let mut dependent_txdata = Vec :: new ( ) ;
186
- let monitors = self . monitors . read ( ) . unwrap ( ) ;
187
- for monitor_state in monitors. values ( ) {
188
- let mut txn_outputs = process ( & monitor_state. monitor , txdata) ;
210
+ {
211
+ let monitors = self . monitors . write ( ) . unwrap ( ) ;
212
+ for ( funding_outpoint, monitor_state) in monitors. iter ( ) {
213
+ let monitor = & monitor_state. monitor ;
214
+ let mut txn_outputs;
215
+ {
216
+ txn_outputs = process ( monitor, txdata) ;
217
+ let update_id = MonitorUpdateId {
218
+ contents : MonitorUpdate :: SyncPersistId ( self . sync_persistence_id . get_increment ( ) ) ,
219
+ } ;
220
+ let mut pending_monitor_updates = monitor_state. pending_monitor_updates . lock ( ) . unwrap ( ) ;
221
+
222
+ log_trace ! ( self . logger, "Syncing Channel Monitor for channel {}" , log_funding_info!( monitor) ) ;
223
+ match self . persister . update_persisted_channel ( * funding_outpoint, & None , monitor, update_id) {
224
+ Ok ( ( ) ) =>
225
+ log_trace ! ( self . logger, "Finished syncing Channel Monitor for channel {}" , log_funding_info!( monitor) ) ,
226
+ Err ( ChannelMonitorUpdateErr :: PermanentFailure ) => {
227
+ monitor_state. channel_closed . store ( true , Ordering :: Release ) ;
228
+ self . user_provided_events . lock ( ) . unwrap ( ) . push ( MonitorEvent :: UpdateFailed ( * funding_outpoint) ) ;
229
+ } ,
230
+ Err ( ChannelMonitorUpdateErr :: TemporaryFailure ) => {
231
+ log_debug ! ( self . logger, "Channel Monitor sync for channel {} in progress, holding events until completion!" , log_funding_info!( monitor) ) ;
232
+ pending_monitor_updates. push ( update_id) ;
233
+ } ,
234
+ }
235
+ }
189
236
190
- // Register any new outputs with the chain source for filtering, storing any dependent
191
- // transactions from within the block that previously had not been included in txdata.
192
- if let Some ( ref chain_source) = self . chain_source {
193
- let block_hash = header. block_hash ( ) ;
194
- for ( txid, mut outputs) in txn_outputs. drain ( ..) {
195
- for ( idx, output) in outputs. drain ( ..) {
196
- // Register any new outputs with the chain source for filtering and recurse
197
- // if it indicates that there are dependent transactions within the block
198
- // that had not been previously included in txdata.
199
- let output = WatchedOutput {
200
- block_hash : Some ( block_hash) ,
201
- outpoint : OutPoint { txid, index : idx as u16 } ,
202
- script_pubkey : output. script_pubkey ,
203
- } ;
204
- if let Some ( tx) = chain_source. register_output ( output) {
205
- dependent_txdata. push ( tx) ;
237
+ // Register any new outputs with the chain source for filtering, storing any dependent
238
+ // transactions from within the block that previously had not been included in txdata.
239
+ if let Some ( ref chain_source) = self . chain_source {
240
+ let block_hash = header. block_hash ( ) ;
241
+ for ( txid, mut outputs) in txn_outputs. drain ( ..) {
242
+ for ( idx, output) in outputs. drain ( ..) {
243
+ // Register any new outputs with the chain source for filtering and recurse
244
+ // if it indicates that there are dependent transactions within the block
245
+ // that had not been previously included in txdata.
246
+ let output = WatchedOutput {
247
+ block_hash : Some ( block_hash) ,
248
+ outpoint : OutPoint { txid, index : idx as u16 } ,
249
+ script_pubkey : output. script_pubkey ,
250
+ } ;
251
+ if let Some ( tx) = chain_source. register_output ( output) {
252
+ dependent_txdata. push ( tx) ;
253
+ }
206
254
}
207
255
}
208
256
}
@@ -228,6 +276,7 @@ where C::Target: chain::Filter,
228
276
pub fn new ( chain_source : Option < C > , broadcaster : T , logger : L , feeest : F , persister : P ) -> Self {
229
277
Self {
230
278
monitors : RwLock :: new ( HashMap :: new ( ) ) ,
279
+ sync_persistence_id : AtomicCounter :: new ( ) ,
231
280
chain_source,
232
281
broadcaster,
233
282
logger,
@@ -300,7 +349,7 @@ where C::Target: chain::Filter,
300
349
pending_monitor_updates. retain ( |update_id| * update_id != completed_update_id) ;
301
350
302
351
match completed_update_id {
303
- MonitorUpdateId { .. } => {
352
+ MonitorUpdateId { contents : MonitorUpdate :: MonitorUpdateId ( _ ) } => {
304
353
let monitor_update_pending_updates = pending_monitor_updates. iter ( ) . filter ( |update_id|
305
354
if let MonitorUpdate :: MonitorUpdateId ( _) = update_id. contents { true } else { false } ) . count ( ) ;
306
355
if monitor_update_pending_updates != 0 {
@@ -312,7 +361,12 @@ where C::Target: chain::Filter,
312
361
funding_txo,
313
362
monitor_update_id : monitor_data. monitor . get_latest_update_id ( ) ,
314
363
} ) ) ;
315
- }
364
+ } ,
365
+ MonitorUpdateId { contents : MonitorUpdate :: SyncPersistId ( _) } => {
366
+ // We've already done everything we need to, the next time release_monitor_events
367
+ // is called, any events for this ChannelMonitor will be returned if there's no
368
+ // more SyncPersistId events left.
369
+ } ,
316
370
}
317
371
}
318
372
@@ -458,7 +512,11 @@ where C::Target: chain::Filter,
458
512
monitor. load_outputs_to_watch ( chain_source) ;
459
513
}
460
514
}
461
- entry. insert ( MonitorHolder { monitor, pending_monitor_updates : Mutex :: new ( pending_monitor_updates) } ) ;
515
+ entry. insert ( MonitorHolder {
516
+ monitor,
517
+ pending_monitor_updates : Mutex :: new ( pending_monitor_updates) ,
518
+ channel_closed : AtomicBool :: new ( false ) ,
519
+ } ) ;
462
520
update_res
463
521
}
464
522
@@ -492,7 +550,7 @@ where C::Target: chain::Filter,
492
550
contents : MonitorUpdate :: MonitorUpdateId ( update. update_id ) ,
493
551
} ;
494
552
let mut pending_monitor_updates = monitor_state. pending_monitor_updates . lock ( ) . unwrap ( ) ;
495
- let persist_res = self . persister . update_persisted_channel ( funding_txo, & update, monitor, update_id) ;
553
+ let persist_res = self . persister . update_persisted_channel ( funding_txo, & Some ( update) , monitor, update_id) ;
496
554
if let Err ( e) = persist_res {
497
555
if e == ChannelMonitorUpdateErr :: TemporaryFailure {
498
556
pending_monitor_updates. push ( update_id) ;
@@ -501,6 +559,8 @@ where C::Target: chain::Filter,
501
559
}
502
560
if update_res. is_err ( ) {
503
561
Err ( ChannelMonitorUpdateErr :: PermanentFailure )
562
+ } else if monitor_state. channel_closed . load ( Ordering :: Acquire ) {
563
+ Err ( ChannelMonitorUpdateErr :: PermanentFailure )
504
564
} else {
505
565
persist_res
506
566
}
@@ -511,7 +571,17 @@ where C::Target: chain::Filter,
511
571
fn release_pending_monitor_events ( & self ) -> Vec < MonitorEvent > {
512
572
let mut pending_monitor_events = self . user_provided_events . lock ( ) . unwrap ( ) . split_off ( 0 ) ;
513
573
for monitor_state in self . monitors . read ( ) . unwrap ( ) . values ( ) {
514
- pending_monitor_events. append ( & mut monitor_state. monitor . get_and_clear_pending_monitor_events ( ) ) ;
574
+ let pending_monitor_update_count = monitor_state. pending_monitor_updates . lock ( ) . unwrap ( )
575
+ . iter ( ) . filter ( |update_id|
576
+ if let MonitorUpdate :: SyncPersistId ( _) = update_id. contents { true } else { false } )
577
+ . count ( ) ;
578
+ if pending_monitor_update_count > 0 {
579
+ log_info ! ( self . logger, "A Channel Monitor sync is still in progress, refusing to provide monitor events!" ) ;
580
+ } else if monitor_state. channel_closed . load ( Ordering :: Acquire ) {
581
+ log_info ! ( self . logger, "A Channel Monitor sync failed, refusing to provide monitor events!" ) ;
582
+ } else {
583
+ pending_monitor_events. append ( & mut monitor_state. monitor . get_and_clear_pending_monitor_events ( ) ) ;
584
+ }
515
585
}
516
586
pending_monitor_events
517
587
}
0 commit comments