@@ -49,7 +49,7 @@ use crate::chain::Filter;
49
49
use crate :: util:: logger:: Logger ;
50
50
use crate :: util:: ser:: { Readable , ReadableArgs , RequiredWrapper , MaybeReadable , UpgradableRequired , Writer , Writeable , U48 } ;
51
51
use crate :: util:: byte_utils;
52
- use crate :: events:: Event ;
52
+ use crate :: events:: { Event , EventHandler } ;
53
53
use crate :: events:: bump_transaction:: { AnchorDescriptor , HTLCDescriptor , BumpTransactionEvent } ;
54
54
55
55
use crate :: prelude:: * ;
@@ -738,11 +738,6 @@ impl Readable for IrrevocablyResolvedHTLC {
738
738
/// You MUST ensure that no ChannelMonitors for a given channel anywhere contain out-of-date
739
739
/// information and are actively monitoring the chain.
740
740
///
741
- /// Pending Events or updated HTLCs which have not yet been read out by
742
- /// get_and_clear_pending_monitor_events or get_and_clear_pending_events are serialized to disk and
743
- /// reloaded at deserialize-time. Thus, you must ensure that, when handling events, all events
744
- /// gotten are fully handled before re-serializing the new state.
745
- ///
746
741
/// Note that the deserializer is only implemented for (BlockHash, ChannelMonitor), which
747
742
/// tells you the last block hash which was block_connect()ed. You MUST rescan any blocks along
748
743
/// the "reorg path" (ie disconnecting blocks until you find a common ancestor from both the
@@ -752,7 +747,7 @@ pub struct ChannelMonitor<Signer: WriteableEcdsaChannelSigner> {
752
747
#[ cfg( test) ]
753
748
pub ( crate ) inner : Mutex < ChannelMonitorImpl < Signer > > ,
754
749
#[ cfg( not( test) ) ]
755
- inner : Mutex < ChannelMonitorImpl < Signer > > ,
750
+ pub ( super ) inner : Mutex < ChannelMonitorImpl < Signer > > ,
756
751
}
757
752
758
753
#[ derive( PartialEq ) ]
@@ -829,7 +824,8 @@ pub(crate) struct ChannelMonitorImpl<Signer: WriteableEcdsaChannelSigner> {
829
824
// we further MUST NOT generate events during block/transaction-disconnection.
830
825
pending_monitor_events : Vec < MonitorEvent > ,
831
826
832
- pending_events : Vec < Event > ,
827
+ pub ( super ) pending_events : Vec < Event > ,
828
+ pub ( super ) pending_events_processor : bool ,
833
829
834
830
// Used to track on-chain events (i.e., transactions part of channels confirmed on chain) on
835
831
// which to take actions once they reach enough confirmations. Each entry includes the
@@ -1088,6 +1084,38 @@ impl<Signer: WriteableEcdsaChannelSigner> Writeable for ChannelMonitorImpl<Signe
1088
1084
}
1089
1085
}
1090
1086
1087
+ macro_rules! _process_events_body {
1088
+ ( $self_opt: expr, $event_to_handle: expr, $handle_event: expr) => {
1089
+ loop {
1090
+ let ( pending_events, repeated_events) ;
1091
+ if let Some ( us) = $self_opt {
1092
+ let mut inner = us. inner. lock( ) . unwrap( ) ;
1093
+ if inner. pending_events_processor {
1094
+ break ;
1095
+ }
1096
+ inner. pending_events_processor = true ;
1097
+
1098
+ pending_events = inner. pending_events. clone( ) ;
1099
+ repeated_events = inner. get_repeated_events( ) ;
1100
+ } else { break ; }
1101
+ let num_events = pending_events. len( ) ;
1102
+
1103
+ for event in pending_events. into_iter( ) . chain( repeated_events. into_iter( ) ) {
1104
+ $event_to_handle = event;
1105
+ $handle_event;
1106
+ }
1107
+
1108
+ if let Some ( us) = $self_opt {
1109
+ let mut inner = us. inner. lock( ) . unwrap( ) ;
1110
+ inner. pending_events. drain( ..num_events) ;
1111
+ inner. pending_events_processor = false ;
1112
+ }
1113
+ break ;
1114
+ }
1115
+ }
1116
+ }
1117
+ pub ( super ) use _process_events_body as process_events_body;
1118
+
1091
1119
impl < Signer : WriteableEcdsaChannelSigner > ChannelMonitor < Signer > {
1092
1120
/// For lockorder enforcement purposes, we need to have a single site which constructs the
1093
1121
/// `inner` mutex, otherwise cases where we lock two monitors at the same time (eg in our
@@ -1179,6 +1207,7 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitor<Signer> {
1179
1207
payment_preimages : HashMap :: new ( ) ,
1180
1208
pending_monitor_events : Vec :: new ( ) ,
1181
1209
pending_events : Vec :: new ( ) ,
1210
+ pending_events_processor : false ,
1182
1211
1183
1212
onchain_events_awaiting_threshold_conf : Vec :: new ( ) ,
1184
1213
outputs_to_watch,
@@ -1306,14 +1335,35 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitor<Signer> {
1306
1335
self . inner . lock ( ) . unwrap ( ) . get_and_clear_pending_monitor_events ( )
1307
1336
}
1308
1337
1309
- /// Gets the list of pending events which were generated by previous actions, clearing the list
1310
- /// in the process.
1338
+ /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
1339
+ ///
1340
+ /// For channels featuring anchor outputs, this method will also process [`BumpTransaction`]
1341
+ /// events produced from each [`ChannelMonitor`] while there is a balance to claim onchain
1342
+ /// within each channel. As the confirmation of a commitment transaction may be critical to the
1343
+ /// safety of funds, we recommend invoking this every 30 seconds, or lower if running in an
1344
+ /// environment with spotty connections, like on mobile.
1311
1345
///
1312
- /// This is called by the [`EventsProvider::process_pending_events`] implementation for
1313
- /// [`ChainMonitor`] .
1346
+ /// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in
1347
+ /// order to handle these events .
1314
1348
///
1315
- /// [`EventsProvider::process_pending_events`]: crate::events::EventsProvider::process_pending_events
1316
- /// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor
1349
+ /// [`SpendableOutputs`]: events::Event::SpendableOutputs
1350
+ /// [`BumpTransaction`]: events::Event::BumpTransaction
1351
+ pub fn process_pending_events < H : Deref > ( & self , handler : & H ) where H :: Target : EventHandler {
1352
+ let mut ev;
1353
+ process_events_body ! ( Some ( self ) , ev, handler. handle_event( ev) ) ;
1354
+ }
1355
+
1356
+ /// Processes any events asynchronously.
1357
+ ///
1358
+ /// See [`Self::process_pending_events`] for more information.
1359
+ pub async fn process_pending_events_async < Future : core:: future:: Future , H : Fn ( Event ) -> Future > (
1360
+ & self , handler : & H
1361
+ ) {
1362
+ let mut ev;
1363
+ process_events_body ! ( Some ( self ) , ev, { handler( ev) . await } ) ;
1364
+ }
1365
+
1366
+ #[ cfg( test) ]
1317
1367
pub fn get_and_clear_pending_events ( & self ) -> Vec < Event > {
1318
1368
self . inner . lock ( ) . unwrap ( ) . get_and_clear_pending_events ( )
1319
1369
}
@@ -2531,9 +2581,11 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitorImpl<Signer> {
2531
2581
ret
2532
2582
}
2533
2583
2534
- pub fn get_and_clear_pending_events ( & mut self ) -> Vec < Event > {
2584
+ /// Gets the set of events which are repeated regularly (i.e. those which RBF bump
2585
+ /// transactions). We're okay if we lose these on restart as they'll be regenerated for us at
2586
+ /// some regular inverval.
2587
+ pub ( super ) fn get_repeated_events ( & mut self ) -> Vec < Event > {
2535
2588
let mut ret = Vec :: new ( ) ;
2536
- mem:: swap ( & mut ret, & mut self . pending_events ) ;
2537
2589
for ( claim_id, claim_event) in self . onchain_tx_handler . get_and_clear_pending_claim_events ( ) . drain ( ..) {
2538
2590
match claim_event {
2539
2591
ClaimEvent :: BumpCommitment {
@@ -4096,6 +4148,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
4096
4148
payment_preimages,
4097
4149
pending_monitor_events : pending_monitor_events. unwrap ( ) ,
4098
4150
pending_events,
4151
+ pending_events_processor : false ,
4099
4152
4100
4153
onchain_events_awaiting_threshold_conf,
4101
4154
outputs_to_watch,
0 commit comments