@@ -333,6 +333,15 @@ pub(super) struct ChannelHolder<Signer: Sign> {
	pub(super) pending_msg_events: Vec<MessageSendEvent>,
}

+/// Events which we process internally, but which cannot be processed immediately at the
+/// generation site for some reason. They are handled in timer_chan_freshness_every_min, so may be
+/// processed with quite some time lag.
+enum BackgroundManagerEvent {
+	/// Handle a ChannelMonitorUpdate which closes a channel, broadcasting its current latest holder
+	/// commitment transaction.
+	ClosingMonitorUpdate((OutPoint, ChannelMonitorUpdate)),
+}
+
/// State we hold per-peer. In the future we should put channels in here, but for now we only hold
/// the latest Init features we heard from the peer.
struct PeerState {
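(Aside, not part of the diff: a minimal, self-contained sketch of the queue-then-drain pattern this enum enables — events are pushed under a Mutex at their generation site and swapped out later by a timer method. All names below are illustrative stand-ins, not the library's API.)

```rust
use std::mem;
use std::sync::Mutex;

// Stand-in event type; the real BackgroundManagerEvent carries an OutPoint
// and a ChannelMonitorUpdate.
enum BackgroundEvent {
	Close(u64),
}

struct Holder {
	pending: Mutex<Vec<BackgroundEvent>>,
}

impl Holder {
	// Generation site: queue the event instead of acting on it immediately.
	fn queue(&self, ev: BackgroundEvent) {
		self.pending.lock().unwrap().push(ev);
	}

	// Timer site: swap the queue out under the lock, then handle the events
	// without holding it (mirrors process_background_events below).
	fn drain(&self) {
		let mut events = Vec::new();
		mem::swap(&mut *self.pending.lock().unwrap(), &mut events);
		for ev in events {
			match ev {
				BackgroundEvent::Close(id) => println!("deferred close of {}", id),
			}
		}
	}
}

fn main() {
	let holder = Holder { pending: Mutex::new(Vec::new()) };
	holder.queue(BackgroundEvent::Close(42));
	holder.drain(); // in the real code this runs roughly once per minute
}
```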
@@ -436,6 +445,7 @@ pub struct ChannelManager<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref,
	per_peer_state: RwLock<HashMap<PublicKey, Mutex<PeerState>>>,

	pending_events: Mutex<Vec<events::Event>>,
+	pending_background_events: Mutex<Vec<BackgroundManagerEvent>>,
	/// Used when we have to take a BIG lock to make sure everything is self-consistent.
	/// Essentially just when we're serializing ourselves out.
	/// Taken first everywhere where we are making changes before any other locks.
@@ -793,6 +803,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
			per_peer_state: RwLock::new(HashMap::new()),

			pending_events: Mutex::new(Vec::new()),
+			pending_background_events: Mutex::new(Vec::new()),
			total_consistency_lock: RwLock::new(()),
			persistence_notifier: PersistenceNotifier::new(),
@@ -1853,13 +1864,35 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
			events.append(&mut new_events);
	}

+	/// Free the background events, generally called from timer_chan_freshness_every_min.
+	///
+	/// Exposed for testing to allow us to process events quickly without generating accidental
+	/// BroadcastChannelUpdate events in timer_chan_freshness_every_min.
+	///
+	/// Expects the caller to hold the total_consistency_lock read lock.
+	pub(crate) fn process_background_events(&self) {
+		let mut background_events = Vec::new();
+		mem::swap(&mut *self.pending_background_events.lock().unwrap(), &mut background_events);
+		for event in background_events.drain(..) {
+			match event {
+				BackgroundManagerEvent::ClosingMonitorUpdate((funding_txo, update)) => {
+					// The channel has already been closed, so no use bothering to care about the
+					// monitor update completing.
+					let _ = self.chain_monitor.update_channel(funding_txo, update);
+				},
+			}
+		}
+	}
+
	/// If a peer is disconnected we mark any channels with that peer as 'disabled'.
	/// After some time, if channels are still disabled we need to broadcast a ChannelUpdate
	/// to inform the network about the uselessness of these channels.
	///
	/// This method handles all the details, and must be called roughly once per minute.
	pub fn timer_chan_freshness_every_min(&self) {
		let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
+		self.process_background_events();
+
		let mut channel_state_lock = self.channel_state.lock().unwrap();
		let channel_state = &mut *channel_state_lock;
		for (_, chan) in channel_state.by_id.iter_mut() {
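(Aside: the deferred ClosingMonitorUpdates above only fire if timer_chan_freshness_every_min is actually driven. A hedged sketch of such a driving loop, with `Manager` standing in for the real ChannelManager rather than the library's type.)

```rust
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Stand-in for the deserialized ChannelManager; substitute your own instance.
struct Manager;
impl Manager {
	fn timer_chan_freshness_every_min(&self) {
		// The real method first frees background events, then broadcasts
		// ChannelUpdates for channels disabled by a disconnected peer.
	}
}

fn main() {
	let manager = Arc::new(Manager);
	let timer_manager = Arc::clone(&manager);
	// Call roughly once per minute, per the docs above; this also bounds how
	// long a deferred ClosingMonitorUpdate can sit in the queue. A real node
	// keeps this thread (or an equivalent async task) alive for its lifetime.
	let _timer = thread::spawn(move || loop {
		thread::sleep(Duration::from_secs(60));
		timer_manager.timer_chan_freshness_every_min();
	});
}
```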
@@ -1952,6 +1985,10 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
		//identify whether we sent it or not based on the (I presume) very different runtime
		//between the branches here. We should make this async and move it into the forward HTLCs
		//timer handling.
+
+		// Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called
+		// from block_connected which may run during initialization prior to the chain_monitor
+		// being fully configured. See the docs for `ChannelManagerReadArgs` for more.
		match source {
			HTLCSource::OutboundRoute { ref path, .. } => {
				log_trace!(self.logger, "Failing outbound payment HTLC with payment_hash {}", log_bytes!(payment_hash.0));
@@ -3166,6 +3203,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
{
	/// Updates channel state based on transactions seen in a connected block.
	pub fn block_connected(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
+		// Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called
+		// during initialization prior to the chain_monitor being fully configured in some cases.
+		// See the docs for `ChannelManagerReadArgs` for more.
		let header_hash = header.block_hash();
		log_trace!(self.logger, "Block {} at height {} connected", header_hash, height);
		let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
@@ -3217,9 +3257,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
				if let Some(short_id) = channel.get_short_channel_id() {
					short_to_id.remove(&short_id);
				}
-				// It looks like our counterparty went on-chain. We go ahead and
-				// broadcast our latest local state as well here, just in case its
-				// some kind of SPV attack, though we expect these to be dropped.
+				// It looks like our counterparty went on-chain. Close the channel.
				failed_channels.push(channel.force_shutdown(true));
				if let Ok(update) = self.get_channel_update(&channel) {
					pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
@@ -3253,7 +3291,18 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
				!htlcs.is_empty() // Only retain this entry if htlcs has at least one entry.
			});
		}
-		for failure in failed_channels.drain(..) {
+		for mut failure in failed_channels.drain(..) {
+			// It looks like our counterparty went on-chain. We cannot broadcast our latest local
+			// state via monitor update (as Channel::force_shutdown tries to make us do) as we may
+			// still be in initialization, so we track the update internally and handle it when the
+			// user next calls timer_chan_freshness_every_min, guaranteeing we're running normally.
+			if let Some((funding_txo, update)) = failure.0.take() {
+				assert_eq!(update.updates.len(), 1);
+				if let ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast } = update.updates[0] {
+					assert!(should_broadcast);
+				} else { unreachable!(); }
+				self.pending_background_events.lock().unwrap().push(BackgroundManagerEvent::ClosingMonitorUpdate((funding_txo, update)));
+			}
			self.finish_force_close_channel(failure);
		}
@@ -3281,6 +3330,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
	/// If necessary, the channel may be force-closed without letting the counterparty participate
	/// in the shutdown.
	pub fn block_disconnected(&self, header: &BlockHeader) {
+		// Note that we MUST NOT end up calling methods on self.chain_monitor here - we're called
+		// during initialization prior to the chain_monitor being fully configured in some cases.
+		// See the docs for `ChannelManagerReadArgs` for more.
		let _persistence_guard = PersistenceNotifierGuard::new(&self.total_consistency_lock, &self.persistence_notifier);
		let mut failed_channels = Vec::new();
		{
@@ -3305,7 +3357,19 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
				}
			});
		}
-		for failure in failed_channels.drain(..) {
+		for mut failure in failed_channels.drain(..) {
+			// Channel::block_disconnected tells us to close if the funding transaction was
+			// un-confirmed due to a reorg. We cannot broadcast our latest local state via monitor
+			// update (as Channel::force_shutdown tries to make us do) as we may still be in
+			// initialization, so we track the update internally and handle it when the user next
+			// calls timer_chan_freshness_every_min, guaranteeing we're running normally.
+			if let Some((funding_txo, update)) = failure.0.take() {
+				assert_eq!(update.updates.len(), 1);
+				if let ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast } = update.updates[0] {
+					assert!(should_broadcast);
+				} else { unreachable!(); }
+				self.pending_background_events.lock().unwrap().push(BackgroundManagerEvent::ClosingMonitorUpdate((funding_txo, update)));
+			}
			self.finish_force_close_channel(failure);
		}
		self.latest_block_height.fetch_sub(1, Ordering::AcqRel);
@@ -3913,6 +3977,18 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
			event.write(writer)?;
		}

+		let background_events = self.pending_background_events.lock().unwrap();
+		(background_events.len() as u64).write(writer)?;
+		for event in background_events.iter() {
+			match event {
+				BackgroundManagerEvent::ClosingMonitorUpdate((funding_txo, monitor_update)) => {
+					0u8.write(writer)?; // event type tag; the read side matches on this
+					funding_txo.write(writer)?;
+					monitor_update.write(writer)?;
+				},
+			}
+		}
+
		(self.last_node_announcement_serial.load(Ordering::Acquire) as u32).write(writer)?;

		Ok(())
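(Aside: the encoding above is a u64 count followed by a one-byte type tag per event, mirrored by the read side in a later hunk. A self-contained miniature of that length-prefixed, tagged layout — the byte order and the fixed 8-byte payload here are illustrative assumptions, not rust-lightning's exact serialization.)

```rust
use std::io::{Cursor, Read, Write};

// Write a u64 count, then per event a u8 tag plus payload (8 bytes stand in
// for the real OutPoint + ChannelMonitorUpdate body).
fn write_events<W: Write>(w: &mut W, events: &[[u8; 8]]) -> std::io::Result<()> {
	w.write_all(&(events.len() as u64).to_be_bytes())?;
	for payload in events {
		w.write_all(&[0u8])?; // 0 == ClosingMonitorUpdate
		w.write_all(payload)?;
	}
	Ok(())
}

// The mirror image: any unknown tag is a decode error, exactly as the
// deserializer below returns DecodeError::InvalidValue.
fn read_events<R: Read>(r: &mut R) -> std::io::Result<Vec<[u8; 8]>> {
	let mut len = [0u8; 8];
	r.read_exact(&mut len)?;
	let mut events = Vec::new();
	for _ in 0..u64::from_be_bytes(len) {
		let mut tag = [0u8; 1];
		r.read_exact(&mut tag)?;
		if tag[0] != 0 {
			return Err(std::io::ErrorKind::InvalidData.into());
		}
		let mut payload = [0u8; 8];
		r.read_exact(&mut payload)?;
		events.push(payload);
	}
	Ok(events)
}

fn main() -> std::io::Result<()> {
	let mut buf = Vec::new();
	write_events(&mut buf, &[[1u8; 8]])?;
	let events = read_events(&mut Cursor::new(buf))?;
	assert_eq!(events, vec![[1u8; 8]]);
	Ok(())
}
```

Note that without the tag byte on the write side, the reader's `<u8 as Readable>::read` would consume the first byte of the funding outpoint and fail, which is why the `0u8.write(writer)?;` line above is required.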
@@ -3931,8 +4007,11 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
	/// 3) Register all relevant ChannelMonitor outpoints with your chain watch mechanism using
	///    ChannelMonitor::get_outputs_to_watch() and ChannelMonitor::get_funding_txo().
	/// 4) Reconnect blocks on your ChannelMonitors.
-	/// 5) Move the ChannelMonitors into your local chain::Watch.
-	/// 6) Disconnect/connect blocks on the ChannelManager.
+	/// 5) Disconnect/connect blocks on the ChannelManager.
+	/// 6) Move the ChannelMonitors into your local chain::Watch.
+	///
+	/// Note that the ordering of #4-6 is not important; however, all three must occur before you
+	/// call any other methods on the newly-deserialized ChannelManager.
	///
	/// Note that because some channels may be closed during deserialization, it is critical that you
	/// always deserialize only the latest version of a ChannelManager and ChannelMonitors available to
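(Aside: a hedged outline of that restart sequence as executable scaffolding; every function below is a stub for your own plumbing, not a rust-lightning API.)

```rust
// Each fn is a placeholder for the numbered step in the docs above.
fn read_channel_monitors() {}     // 1)-2) read the monitors, then the manager,
fn read_channel_manager() {}      //        feeding it the monitors just read
fn register_outputs_to_watch() {} // 3) get_outputs_to_watch()/get_funding_txo()
fn reconnect_monitor_blocks() {}  // 4)
fn sync_manager_blocks() {}       // 5) disconnect/connect on the manager
fn move_monitors_into_watch() {}  // 6) hand the monitors to your chain::Watch

fn main() {
	read_channel_monitors();
	read_channel_manager();
	register_outputs_to_watch();
	// Steps 4-6 may run in any relative order, but all must complete before
	// any other method is called on the newly-deserialized ChannelManager.
	reconnect_monitor_blocks();
	sync_manager_blocks();
	move_monitors_into_watch();
}
```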
@@ -4134,6 +4213,15 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
			}
		}

+		let background_event_count: u64 = Readable::read(reader)?;
+		let mut pending_background_events_read: Vec<BackgroundManagerEvent> = Vec::with_capacity(cmp::min(background_event_count as usize, MAX_ALLOC_SIZE / mem::size_of::<BackgroundManagerEvent>()));
+		for _ in 0..background_event_count {
+			match <u8 as Readable>::read(reader)? {
+				0 => pending_background_events_read.push(BackgroundManagerEvent::ClosingMonitorUpdate((Readable::read(reader)?, Readable::read(reader)?))),
+				_ => return Err(DecodeError::InvalidValue),
+			}
+		}
+
		let last_node_announcement_serial: u32 = Readable::read(reader)?;

		let channel_manager = ChannelManager {
@@ -4160,6 +4248,7 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
			per_peer_state: RwLock::new(per_peer_state),

			pending_events: Mutex::new(pending_events_read),
+			pending_background_events: Mutex::new(pending_background_events_read),
			total_consistency_lock: RwLock::new(()),
			persistence_notifier: PersistenceNotifier::new(),