@@ -206,7 +206,7 @@ pub struct PaymentPreimage(pub [u8;32]);
#[derive(Hash, Copy, Clone, PartialEq, Eq, Debug)]
pub struct PaymentSecret(pub [u8; 32]);

-type ShutdownResult = (Option<OutPoint>, ChannelMonitorUpdate, Vec<(HTLCSource, PaymentHash)>);
+type ShutdownResult = (Option<(OutPoint, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>);

/// Error type returned across the channel_state mutex boundary. When an Err is generated for a
/// Channel, we generally end up with a ChannelError::Close for which we have to close the channel
@@ -333,6 +333,15 @@ pub(super) struct ChannelHolder<Signer: Sign> {
	pub(super) pending_msg_events: Vec<MessageSendEvent>,
}

+/// Events which we process internally but which cannot be processed immediately at the
+/// generation site for some reason. They are handled in timer_chan_freshness_every_min, so may be
+/// processed with quite some time lag.
+enum BackgroundEvent {
+	/// Handle a ChannelMonitorUpdate that closes a channel, broadcasting its current latest holder
+	/// commitment transaction.
+	ClosingMonitorUpdate((OutPoint, ChannelMonitorUpdate)),
+}
+
/// State we hold per-peer. In the future we should put channels in here, but for now we only hold
/// the latest Init features we heard from the peer.
struct PeerState {
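The new enum backs a simple defer-and-drain queue: events are pushed onto a mutex-guarded Vec at the point where they are generated and swapped out wholesale when the periodic timer runs. Below is a minimal, self-contained sketch of that pattern; the `Job`/`JobQueue` names are made up for illustration and are not LDK types.

```rust
use std::mem;
use std::sync::Mutex;

/// Stand-in for BackgroundEvent in this sketch.
enum Job {
    Close(u64), // e.g. an identifier for a channel whose close work was deferred
}

struct JobQueue {
    pending: Mutex<Vec<Job>>,
}

impl JobQueue {
    fn new() -> Self { JobQueue { pending: Mutex::new(Vec::new()) } }

    /// Called where the work is generated but cannot run immediately.
    fn defer(&self, job: Job) {
        self.pending.lock().unwrap().push(job);
    }

    /// Called from a periodic timer; drains everything queued so far.
    /// Swapping the Vec out keeps the lock held only briefly.
    fn process(&self) {
        let mut jobs = Vec::new();
        mem::swap(&mut *self.pending.lock().unwrap(), &mut jobs);
        for job in jobs.drain(..) {
            match job {
                Job::Close(id) => println!("processing deferred close for channel {}", id),
            }
        }
    }
}

fn main() {
    let q = JobQueue::new();
    q.defer(Job::Close(42));
    q.process(); // later, e.g. from the once-per-minute timer
}
```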
@@ -436,6 +445,7 @@ pub struct ChannelManager<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref,
	per_peer_state: RwLock<HashMap<PublicKey, Mutex<PeerState>>>,

	pending_events: Mutex<Vec<events::Event>>,
+	pending_background_events: Mutex<Vec<BackgroundEvent>>,
	/// Used when we have to take a BIG lock to make sure everything is self-consistent.
	/// Essentially just when we're serializing ourselves out.
	/// Taken first everywhere where we are making changes before any other locks.
@@ -794,6 +804,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
			per_peer_state: RwLock::new(HashMap::new()),

			pending_events: Mutex::new(Vec::new()),
+			pending_background_events: Mutex::new(Vec::new()),
			total_consistency_lock: RwLock::new(()),
			persistence_notifier: PersistenceNotifier::new(),
@@ -942,12 +953,12 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
	#[inline]
	fn finish_force_close_channel(&self, shutdown_res: ShutdownResult) {
-		let (funding_txo_option, monitor_update, mut failed_htlcs) = shutdown_res;
+		let (monitor_update_option, mut failed_htlcs) = shutdown_res;
		log_trace!(self.logger, "Finishing force-closure of channel {} HTLCs to fail", failed_htlcs.len());
		for htlc_source in failed_htlcs.drain(..) {
			self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_source.0, &htlc_source.1, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
		}
-		if let Some(funding_txo) = funding_txo_option {
+		if let Some((funding_txo, monitor_update)) = monitor_update_option {
			// There isn't anything we can do if we get an update failure - we're already
			// force-closing. The monitor update on the required in-memory copy should broadcast
			// the latest local state, which is the best we can do anyway. Thus, it is safe to
@@ -1854,13 +1865,42 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
		events.append(&mut new_events);
	}

+	/// Free the background events, generally called from timer_chan_freshness_every_min.
+	///
+	/// Exposed for testing to allow us to process events quickly without generating accidental
+	/// BroadcastChannelUpdate events in timer_chan_freshness_every_min.
+	///
+	/// Expects the caller to have a total_consistency_lock read lock.
+	fn process_background_events(&self) {
+		let mut background_events = Vec::new();
+		mem::swap(&mut *self.pending_background_events.lock().unwrap(), &mut background_events);
+		for event in background_events.drain(..) {
+			match event {
+				BackgroundEvent::ClosingMonitorUpdate((funding_txo, update)) => {
+					// The channel has already been closed, so no use bothering to care about the
+					// monitor update completing.
+					let _ = self.chain_monitor.update_channel(funding_txo, update);
+				},
+			}
+		}
+	}
+
+	#[cfg(any(test, feature = "_test_utils"))]
+	pub(crate) fn test_process_background_events(&self) {
+		self.process_background_events();
+	}
+
	/// If a peer is disconnected we mark any channels with that peer as 'disabled'.
	/// After some time, if channels are still disabled we need to broadcast a ChannelUpdate
	/// to inform the network about the uselessness of these channels.
	///
	/// This method handles all the details, and must be called roughly once per minute.
+	///
+	/// Note that in some rare cases this may generate a `chain::Watch::update_channel` call.
	pub fn timer_chan_freshness_every_min(&self) {
		let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+		self.process_background_events();
+
		let mut channel_state_lock = self.channel_state.lock().unwrap();
		let channel_state = &mut *channel_state_lock;
		for (_, chan) in channel_state.by_id.iter_mut() {
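Because timer_chan_freshness_every_min is now also the point where queued BackgroundEvents get flushed to the chain::Watch, the once-per-minute calling contract matters. A rough caller-side sketch of such a timer loop, using a narrow placeholder trait in place of the real ChannelManager type parameters (which are omitted here):

```rust
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Narrow view of the one method the timer needs; in a real node you would call
/// channel_manager.timer_chan_freshness_every_min() on the concrete ChannelManager.
trait FreshnessTick: Send + Sync {
    fn timer_chan_freshness_every_min(&self);
}

fn spawn_freshness_timer<M: FreshnessTick + 'static>(manager: Arc<M>) {
    thread::spawn(move || loop {
        // Roughly once per minute, per the method's documented contract. Each tick
        // also drains any BackgroundEvents queued while the chain watcher was not
        // yet usable (e.g. during startup block replay).
        thread::sleep(Duration::from_secs(60));
        manager.timer_chan_freshness_every_min();
    });
}
```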
@@ -1953,6 +1993,10 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
		//identify whether we sent it or not based on the (I presume) very different runtime
		//between the branches here. We should make this async and move it into the forward HTLCs
		//timer handling.
+
+		// Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called
+		// from block_connected which may run during initialization prior to the chain_monitor
+		// being fully configured. See the docs for `ChannelManagerReadArgs` for more.
		match source {
			HTLCSource::OutboundRoute { ref path, .. } => {
				log_trace!(self.logger, "Failing outbound payment HTLC with payment_hash {}", log_bytes!(payment_hash.0));
@@ -2418,7 +2462,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
				// We do not do a force-close here as that would generate a monitor update for
				// a monitor that we didn't manage to store (and that we don't care about - we
				// don't respond with the funding_signed so the channel can never go on chain).
-				let (_funding_txo_option, _monitor_update, failed_htlcs) = chan.force_shutdown(true);
+				let (_monitor_update, failed_htlcs) = chan.force_shutdown(true);
				assert!(failed_htlcs.is_empty());
				return Err(MsgHandleErrInternal::send_err_msg_no_close("ChannelMonitor storage failure".to_owned(), funding_msg.channel_id));
			},
@@ -3100,6 +3144,29 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
			self.finish_force_close_channel(failure);
		}
	}
+
+	/// Handle a list of channel failures during a block_connected or block_disconnected call,
+	/// pushing the channel monitor update (if any) to the background events queue and removing the
+	/// Channel object.
+	fn handle_init_event_channel_failures(&self, mut failed_channels: Vec<ShutdownResult>) {
+		for mut failure in failed_channels.drain(..) {
+			// Either a commitment transaction has been confirmed on-chain or
+			// Channel::block_disconnected detected that the funding transaction has been
+			// reorganized out of the main chain.
+			// We cannot broadcast our latest local state via monitor update (as
+			// Channel::force_shutdown tries to make us do) as we may still be in initialization,
+			// so we track the update internally and handle it when the user next calls
+			// timer_chan_freshness_every_min, which guarantees we're running normally.
+			if let Some((funding_txo, update)) = failure.0.take() {
+				assert_eq!(update.updates.len(), 1);
+				if let ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast } = update.updates[0] {
+					assert!(should_broadcast);
+				} else { unreachable!(); }
+				self.pending_background_events.lock().unwrap().push(BackgroundEvent::ClosingMonitorUpdate((funding_txo, update)));
+			}
+			self.finish_force_close_channel(failure);
+		}
+	}
}

impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> MessageSendEventsProvider for ChannelManager<Signer, M, T, K, F, L>
@@ -3167,6 +3234,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
{
	/// Updates channel state based on transactions seen in a connected block.
	pub fn block_connected(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
+		// Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called
+		// during initialization prior to the chain_monitor being fully configured in some cases.
+		// See the docs for `ChannelManagerReadArgs` for more.
		let header_hash = header.block_hash();
		log_trace!(self.logger, "Block {} at height {} connected", header_hash, height);
		let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
@@ -3218,9 +3288,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
				if let Some(short_id) = channel.get_short_channel_id() {
					short_to_id.remove(&short_id);
				}
-				// It looks like our counterparty went on-chain. We go ahead and
-				// broadcast our latest local state as well here, just in case its
-				// some kind of SPV attack, though we expect these to be dropped.
+				// It looks like our counterparty went on-chain. Close the channel.
				failed_channels.push(channel.force_shutdown(true));
				if let Ok(update) = self.get_channel_update(&channel) {
					pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
@@ -3254,9 +3322,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
				!htlcs.is_empty() // Only retain this entry if htlcs has at least one entry.
			});
		}
-		for failure in failed_channels.drain(..) {
-			self.finish_force_close_channel(failure);
-		}
+
+		self.handle_init_event_channel_failures(failed_channels);

		for (source, payment_hash, reason) in timed_out_htlcs.drain(..) {
			self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), source, &payment_hash, reason);
@@ -3282,6 +3349,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
	/// If necessary, the channel may be force-closed without letting the counterparty participate
	/// in the shutdown.
	pub fn block_disconnected(&self, header: &BlockHeader) {
+		// Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called
+		// during initialization prior to the chain_monitor being fully configured in some cases.
+		// See the docs for `ChannelManagerReadArgs` for more.
		let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
		let mut failed_channels = Vec::new();
		{
@@ -3306,9 +3376,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
				}
			});
		}
-		for failure in failed_channels.drain(..) {
-			self.finish_force_close_channel(failure);
-		}
+		self.handle_init_event_channel_failures(failed_channels);
		self.latest_block_height.fetch_sub(1, Ordering::AcqRel);
		*self.last_block_hash.try_lock().expect("block_(dis)connected must not be called in parallel") = header.block_hash();
	}
@@ -3914,6 +3982,18 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
			event.write(writer)?;
		}

+		let background_events = self.pending_background_events.lock().unwrap();
+		(background_events.len() as u64).write(writer)?;
+		for event in background_events.iter() {
+			match event {
+				BackgroundEvent::ClosingMonitorUpdate((funding_txo, monitor_update)) => {
+					0u8.write(writer)?;
+					funding_txo.write(writer)?;
+					monitor_update.write(writer)?;
+				},
+			}
+		}
+
		(self.last_node_announcement_serial.load(Ordering::Acquire) as u32).write(writer)?;

		Ok(())
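The on-disk layout added here is a u64 count followed by one tagged entry per background event (a 0u8 variant byte, then the funding outpoint and the monitor update via their Writeable impls). A toy sketch of the same count-then-tagged-entries shape, using plain std::io and invented FundingTxo/Update stand-ins rather than LDK's actual Writeable encoding:

```rust
use std::io::Write;

// Toy stand-ins; the real code serializes OutPoint and ChannelMonitorUpdate via
// their Writeable impls, which this sketch does not reproduce.
struct FundingTxo { txid: [u8; 32], index: u16 }
struct Update { bytes: Vec<u8> }

enum Background {
    ClosingMonitorUpdate((FundingTxo, Update)),
}

fn write_background_events<W: Write>(events: &[Background], w: &mut W) -> std::io::Result<()> {
    // Length prefix first, so the reader knows how many entries follow.
    w.write_all(&(events.len() as u64).to_be_bytes())?;
    for event in events {
        match event {
            Background::ClosingMonitorUpdate((txo, update)) => {
                w.write_all(&[0u8])?; // variant tag, mirroring the 0u8 written above
                w.write_all(&txo.txid)?;
                w.write_all(&txo.index.to_be_bytes())?;
                w.write_all(&(update.bytes.len() as u64).to_be_bytes())?;
                w.write_all(&update.bytes)?;
            }
        }
    }
    Ok(())
}
```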
@@ -3929,11 +4009,22 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
/// ChannelManager)>::read(reader, args).
/// This may result in closing some Channels if the ChannelMonitor is newer than the stored
/// ChannelManager state to ensure no loss of funds. Thus, transactions may be broadcasted.
-/// 3) Register all relevant ChannelMonitor outpoints with your chain watch mechanism using
-///    ChannelMonitor::get_outputs_to_watch() and ChannelMonitor::get_funding_txo().
+/// 3) If you are not fetching full blocks, register all relevant ChannelMonitor outpoints the same
+///    way you would handle a `chain::Filter` call using ChannelMonitor::get_outputs_to_watch() and
+///    ChannelMonitor::get_funding_txo().
/// 4) Reconnect blocks on your ChannelMonitors.
-/// 5) Move the ChannelMonitors into your local chain::Watch.
-/// 6) Disconnect/connect blocks on the ChannelManager.
+/// 5) Disconnect/connect blocks on the ChannelManager.
+/// 6) Move the ChannelMonitors into your local chain::Watch.
+///
+/// Note that the ordering of #4-6 is not important; however, all three must occur before you
+/// call any other methods on the newly-deserialized ChannelManager.
+///
+/// Note that because some channels may be closed during deserialization, it is critical that you
+/// always deserialize only the latest version of a ChannelManager and ChannelMonitors available to
+/// you. If you deserialize an old ChannelManager (during which force-closure transactions may be
+/// broadcast), and then later deserialize a newer version of the same ChannelManager (which will
+/// not force-close the same channels but consider them live), you may end up revoking a state for
+/// which you've already broadcast the transaction.
pub struct ChannelManagerReadArgs<'a, Signer: 'a + Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
	where M::Target: chain::Watch<Signer>,
	      T::Target: BroadcasterInterface,
@@ -4064,7 +4155,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
					channel.get_cur_counterparty_commitment_transaction_number() > monitor.get_cur_counterparty_commitment_number() ||
					channel.get_latest_monitor_update_id() < monitor.get_latest_update_id() {
				// But if the channel is behind the monitor, close the channel:
-				let (_, _, mut new_failed_htlcs) = channel.force_shutdown(true);
+				let (_, mut new_failed_htlcs) = channel.force_shutdown(true);
				failed_htlcs.append(&mut new_failed_htlcs);
				monitor.broadcast_latest_holder_commitment_txn(&args.tx_broadcaster, &args.logger);
			} else {
@@ -4128,6 +4219,15 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
			}
		}

+		let background_event_count: u64 = Readable::read(reader)?;
+		let mut pending_background_events_read: Vec<BackgroundEvent> = Vec::with_capacity(cmp::min(background_event_count as usize, MAX_ALLOC_SIZE/mem::size_of::<BackgroundEvent>()));
+		for _ in 0..background_event_count {
+			match <u8 as Readable>::read(reader)? {
+				0 => pending_background_events_read.push(BackgroundEvent::ClosingMonitorUpdate((Readable::read(reader)?, Readable::read(reader)?))),
+				_ => return Err(DecodeError::InvalidValue),
+			}
+		}
+
		let last_node_announcement_serial: u32 = Readable::read(reader)?;

		let mut secp_ctx = Secp256k1::new();
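The read side mirrors the count-prefixed layout but caps the initial allocation: with_capacity is bounded by MAX_ALLOC_SIZE / size_of::<BackgroundEvent>() so a corrupt or hostile length prefix cannot force a huge up-front allocation. A self-contained sketch of that defensive pattern (the MAX_ALLOC_SIZE value and Item type here are illustrative only):

```rust
use std::cmp;
use std::io::Read;
use std::mem;

const MAX_ALLOC_SIZE: usize = 64 * 1024; // illustrative cap, not LDK's constant value

#[derive(Debug)]
struct Item([u8; 32]);

fn read_items<R: Read>(r: &mut R) -> std::io::Result<Vec<Item>> {
    let mut len_bytes = [0u8; 8];
    r.read_exact(&mut len_bytes)?;
    let count = u64::from_be_bytes(len_bytes);

    // Never trust a length prefix for allocation: pre-allocate at most
    // MAX_ALLOC_SIZE bytes worth of entries, then let push() grow the Vec if the
    // stream really does contain more items.
    let mut items = Vec::with_capacity(cmp::min(count as usize, MAX_ALLOC_SIZE / mem::size_of::<Item>()));
    for _ in 0..count {
        let mut buf = [0u8; 32];
        r.read_exact(&mut buf)?;
        items.push(Item(buf));
    }
    Ok(items)
}
```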
@@ -4157,6 +4257,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
			per_peer_state: RwLock::new(per_peer_state),

			pending_events: Mutex::new(pending_events_read),
+			pending_background_events: Mutex::new(pending_background_events_read),
			total_consistency_lock: RwLock::new(()),
			persistence_notifier: PersistenceNotifier::new(),