@@ -333,6 +333,15 @@ pub(super) struct ChannelHolder<Signer: Sign> {
	pub(super) pending_msg_events: Vec<MessageSendEvent>,
}

+/// Events which we process internally but cannot be processed immediately at the generation site
+/// for some reason. They are handled in timer_chan_freshness_every_min, so may be processed with
+/// quite some time lag.
+enum BackgroundEvent {
+	/// Handle a ChannelMonitorUpdate that closes a channel, broadcasting its current latest holder
+	/// commitment transaction.
+	ClosingMonitorUpdate((OutPoint, ChannelMonitorUpdate)),
+}
+
/// State we hold per-peer. In the future we should put channels in here, but for now we only hold
/// the latest Init features we heard from the peer.
struct PeerState {
@@ -436,6 +445,7 @@ pub struct ChannelManager<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref,
	per_peer_state: RwLock<HashMap<PublicKey, Mutex<PeerState>>>,

	pending_events: Mutex<Vec<events::Event>>,
+	pending_background_events: Mutex<Vec<BackgroundEvent>>,
	/// Used when we have to take a BIG lock to make sure everything is self-consistent.
	/// Essentially just when we're serializing ourselves out.
	/// Taken first everywhere where we are making changes before any other locks.
@@ -794,6 +804,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
			per_peer_state: RwLock::new(HashMap::new()),

			pending_events: Mutex::new(Vec::new()),
+			pending_background_events: Mutex::new(Vec::new()),
			total_consistency_lock: RwLock::new(()),
			persistence_notifier: PersistenceNotifier::new(),

@@ -1854,13 +1865,40 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
		events.append(&mut new_events);
	}

+	/// Free the background events, generally called from timer_chan_freshness_every_min.
+	///
+	/// Exposed for testing to allow us to process events quickly without generating accidental
+	/// BroadcastChannelUpdate events in timer_chan_freshness_every_min.
+	///
+	/// Expects the caller to have a total_consistency_lock read lock.
+	fn process_background_events(&self) {
+		let mut background_events = Vec::new();
+		mem::swap(&mut *self.pending_background_events.lock().unwrap(), &mut background_events);
+		for event in background_events.drain(..) {
+			match event {
+				BackgroundEvent::ClosingMonitorUpdate((funding_txo, update)) => {
+					// The channel has already been closed, so no use bothering to care about the
+					// monitor update completing.
+					let _ = self.chain_monitor.update_channel(funding_txo, update);
+				},
+			}
+		}
+	}
+
+	#[cfg(any(test, feature = "_test_utils"))]
+	pub(crate) fn test_process_background_events(&self) {
+		self.process_background_events();
+	}
+
	/// If a peer is disconnected we mark any channels with that peer as 'disabled'.
	/// After some time, if channels are still disabled we need to broadcast a ChannelUpdate
	/// to inform the network about the uselessness of these channels.
	///
	/// This method handles all the details, and must be called roughly once per minute.
	pub fn timer_chan_freshness_every_min(&self) {
		let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+		self.process_background_events();
+
		let mut channel_state_lock = self.channel_state.lock().unwrap();
		let channel_state = &mut *channel_state_lock;
		for (_, chan) in channel_state.by_id.iter_mut() {
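
Since deferred BackgroundEvents are only freed from this timer path (or, in tests, via `test_process_background_events`), the user must actually drive the timer. Below is a minimal sketch of such a driver; the helper name and threading model are illustrative assumptions, not part of the crate, and `tick` would in practice capture an `Arc<ChannelManager<...>>`:

```rust
use std::thread;
use std::time::Duration;

/// Spawn a thread that fires the supplied callback roughly once per minute.
/// In practice `tick` would capture an Arc<ChannelManager<...>> and call
/// channel_manager.timer_chan_freshness_every_min(), which both marks stale
/// channels disabled and frees any queued BackgroundEvents.
fn spawn_minute_timer<F: Fn() + Send + 'static>(tick: F) -> thread::JoinHandle<()> {
	thread::spawn(move || loop {
		thread::sleep(Duration::from_secs(60));
		tick();
	})
}
```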
@@ -1953,6 +1991,10 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
		//identify whether we sent it or not based on the (I presume) very different runtime
		//between the branches here. We should make this async and move it into the forward HTLCs
		//timer handling.
+
+		// Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called
+		// from block_connected which may run during initialization prior to the chain_monitor
+		// being fully configured. See the docs for `ChannelManagerReadArgs` for more.
		match source {
			HTLCSource::OutboundRoute { ref path, .. } => {
				log_trace!(self.logger, "Failing outbound payment HTLC with payment_hash {}", log_bytes!(payment_hash.0));
@@ -3100,6 +3142,29 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
			self.finish_force_close_channel(failure);
		}
	}
+
+	/// Handle a list of channel failures during a block_connected or block_disconnected call,
+	/// pushing the channel monitor update (if any) to the background events queue and removing the
+	/// Channel object.
+	fn handle_init_event_channel_failures(&self, mut failed_channels: Vec<ShutdownResult>) {
+		for mut failure in failed_channels.drain(..) {
+			// Either a commitment transaction has been confirmed on-chain or
+			// Channel::block_disconnected detected that the funding transaction has been
+			// reorganized out of the main chain.
+			// We cannot broadcast our latest local state via monitor update (as
+			// Channel::force_shutdown tries to make us do) as we may still be in initialization,
+			// so we track the update internally and handle it when the user next calls
+			// timer_chan_freshness_every_min, guaranteeing we're running normally.
+			if let Some((funding_txo, update)) = failure.0.take() {
+				assert_eq!(update.updates.len(), 1);
+				if let ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast } = update.updates[0] {
+					assert!(should_broadcast);
+				} else { unreachable!(); }
+				self.pending_background_events.lock().unwrap().push(BackgroundEvent::ClosingMonitorUpdate((funding_txo, update)));
+			}
+			self.finish_force_close_channel(failure);
+		}
+	}
}

impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> MessageSendEventsProvider for ChannelManager<Signer, M, T, K, F, L>
@@ -3167,6 +3232,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
{
	/// Updates channel state based on transactions seen in a connected block.
	pub fn block_connected(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
+		// Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called
+		// during initialization prior to the chain_monitor being fully configured in some cases.
+		// See the docs for `ChannelManagerReadArgs` for more.
		let header_hash = header.block_hash();
		log_trace!(self.logger, "Block {} at height {} connected", header_hash, height);
		let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
@@ -3218,9 +3286,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
					if let Some(short_id) = channel.get_short_channel_id() {
						short_to_id.remove(&short_id);
					}
-					// It looks like our counterparty went on-chain. We go ahead and
-					// broadcast our latest local state as well here, just in case its
-					// some kind of SPV attack, though we expect these to be dropped.
+					// It looks like our counterparty went on-chain. Close the channel.
					failed_channels.push(channel.force_shutdown(true));
					if let Ok(update) = self.get_channel_update(&channel) {
						pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
@@ -3254,9 +3320,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
				!htlcs.is_empty() // Only retain this entry if htlcs has at least one entry.
			});
		}
-		for failure in failed_channels.drain(..) {
-			self.finish_force_close_channel(failure);
-		}
+
+		self.handle_init_event_channel_failures(failed_channels);

		for (source, payment_hash, reason) in timed_out_htlcs.drain(..) {
			self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), source, &payment_hash, reason);
@@ -3282,6 +3347,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
	/// If necessary, the channel may be force-closed without letting the counterparty participate
	/// in the shutdown.
	pub fn block_disconnected(&self, header: &BlockHeader) {
+		// Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called
+		// during initialization prior to the chain_monitor being fully configured in some cases.
+		// See the docs for `ChannelManagerReadArgs` for more.
		let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
		let mut failed_channels = Vec::new();
		{
@@ -3306,9 +3374,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
				}
			});
		}
-		for failure in failed_channels.drain(..) {
-			self.finish_force_close_channel(failure);
-		}
+		self.handle_init_event_channel_failures(failed_channels);
		self.latest_block_height.fetch_sub(1, Ordering::AcqRel);
		*self.last_block_hash.try_lock().expect("block_(dis)connected must not be called in parallel") = header.block_hash();
	}
@@ -3914,6 +3980,18 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
			event.write(writer)?;
		}

+		let background_events = self.pending_background_events.lock().unwrap();
+		(background_events.len() as u64).write(writer)?;
+		for event in background_events.iter() {
+			match event {
+				BackgroundEvent::ClosingMonitorUpdate((funding_txo, monitor_update)) => {
+					0u8.write(writer)?;
+					funding_txo.write(writer)?;
+					monitor_update.write(writer)?;
+				},
+			}
+		}
+
		(self.last_node_announcement_serial.load(Ordering::Acquire) as u32).write(writer)?;

		Ok(())
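
The write path above gives the new field a simple length-prefixed, type-tagged encoding. Schematically (a reading aid only; each field is emitted by its own Writeable implementation, so the actual byte widths for OutPoint and ChannelMonitorUpdate follow those impls):

```rust
// pending_background_events section, as serialized by the loop above:
//
//   background_event_count: u64
//   repeated background_event_count times:
//     tag: u8                               // 0 => ClosingMonitorUpdate
//     funding_txo: OutPoint                 // via OutPoint's Writeable impl
//     monitor_update: ChannelMonitorUpdate  // via ChannelMonitorUpdate's Writeable impl
//
// Any other tag value causes the read side (below) to fail with
// DecodeError::InvalidValue.
```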
@@ -3932,8 +4010,11 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
/// 3) Register all relevant ChannelMonitor outpoints with your chain watch mechanism using
///    ChannelMonitor::get_outputs_to_watch() and ChannelMonitor::get_funding_txo().
/// 4) Reconnect blocks on your ChannelMonitors.
-/// 5) Move the ChannelMonitors into your local chain::Watch.
-/// 6) Disconnect/connect blocks on the ChannelManager.
+/// 5) Disconnect/connect blocks on the ChannelManager.
+/// 6) Move the ChannelMonitors into your local chain::Watch.
+///
+/// Note that the ordering of steps #4-6 is not important; however, all three must occur before
+/// you call any other methods on the newly-deserialized ChannelManager.
///
/// Note that because some channels may be closed during deserialization, it is critical that you
/// always deserialize only the latest version of a ChannelManager and ChannelMonitors available to
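
To make the required ordering concrete, here is a condensed sketch of a restart sequence. Every helper in it (`read_channel_monitors`, `reconnect_blocks_on_manager`, and so on) is a placeholder standing in for the user's own storage and chain-sync plumbing, not an API defined by this crate:

```rust
// Hypothetical restart flow following steps 1-6 above; all helper functions are
// placeholders for application-side code, shown only to illustrate the ordering.
fn restart() -> Result<(), ()> {
	let mut monitors = read_channel_monitors()?;      // 1) deserialize ChannelMonitors first
	let manager = read_channel_manager(&monitors)?;   // 2) deserialize the ChannelManager, passing
	                                                  //    the monitors in via ChannelManagerReadArgs
	register_monitor_outputs(&monitors)?;             // 3) get_outputs_to_watch()/get_funding_txo()
	reconnect_blocks_on_monitors(&mut monitors)?;     // 4) bring each ChannelMonitor to the tip
	reconnect_blocks_on_manager(&manager)?;           // 5) block_(dis)connected on the manager
	move_monitors_into_chain_watch(monitors)?;        // 6) hand the monitors to chain::Watch
	// Only now is it safe to call other ChannelManager methods; any BackgroundEvents
	// queued during steps 4-5 are freed on the first timer_chan_freshness_every_min.
	Ok(())
}
```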
@@ -4135,6 +4216,15 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
			}
		}

+		let background_event_count: u64 = Readable::read(reader)?;
+		let mut pending_background_events_read: Vec<BackgroundEvent> = Vec::with_capacity(cmp::min(background_event_count as usize, MAX_ALLOC_SIZE/mem::size_of::<BackgroundEvent>()));
+		for _ in 0..background_event_count {
+			match <u8 as Readable>::read(reader)? {
+				0 => pending_background_events_read.push(BackgroundEvent::ClosingMonitorUpdate((Readable::read(reader)?, Readable::read(reader)?))),
+				_ => return Err(DecodeError::InvalidValue),
+			}
+		}
+
		let last_node_announcement_serial: u32 = Readable::read(reader)?;

		let mut secp_ctx = Secp256k1::new();
@@ -4164,6 +4254,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
			per_peer_state: RwLock::new(per_peer_state),

			pending_events: Mutex::new(pending_events_read),
+			pending_background_events: Mutex::new(pending_background_events_read),
			total_consistency_lock: RwLock::new(()),
			persistence_notifier: PersistenceNotifier::new(),
