@@ -72,7 +72,7 @@ use core::{cmp, mem};
 use core::cell::RefCell;
 use crate::io::Read;
 use crate::sync::{Arc, Mutex, RwLock, RwLockReadGuard, FairRwLock, LockTestExt, LockHeldState};
-use core::sync::atomic::{AtomicUsize, Ordering};
+use core::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
 use core::time::Duration;
 use core::ops::Deref;
 
@@ -926,6 +926,8 @@ where
 
 	/// See `ChannelManager` struct-level documentation for lock order requirements.
 	pending_events: Mutex<Vec<events::Event>>,
+	/// A simple atomic flag to ensure only one task at a time can be processing events asynchronously.
+	pending_events_processor: AtomicBool,
 	/// See `ChannelManager` struct-level documentation for lock order requirements.
 	pending_background_events: Mutex<Vec<BackgroundEvent>>,
 	/// Used when we have to take a BIG lock to make sure everything is self-consistent.
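The new field acts as a claim/release guard rather than a lock: whichever task flips it from `false` to `true` becomes the sole event processor, and any concurrent caller returns immediately instead of blocking on the total consistency lock. A minimal sketch of the same pattern in isolation, where the `Processor` type and the `handle_events` hook are hypothetical stand-ins and not part of this patch:

use std::sync::atomic::{AtomicBool, Ordering};

struct Processor {
	// Plays the role of `pending_events_processor`.
	busy: AtomicBool,
}

impl Processor {
	fn process_events(&self, handle_events: impl FnOnce()) {
		// Claim the processor role. `compare_exchange` succeeds only for the
		// first caller; everyone else sees `Err` and returns without blocking.
		if self.busy.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
			return;
		}
		handle_events();
		// Release the role so a later caller can process the next batch.
		self.busy.store(false, Ordering::Release);
	}
}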
@@ -1680,30 +1682,40 @@ macro_rules! handle_new_monitor_update {
 
 macro_rules! process_events_body {
 	($self: expr, $event_to_handle: expr, $handle_event: expr) => {
-		// We'll acquire our total consistency lock until the returned future completes so that
-		// we can be sure no other persists happen while processing events.
-		let _read_guard = $self.total_consistency_lock.read().unwrap();
+		let mut processed_all_events = false;
+		while !processed_all_events {
+			if $self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
+				return;
+			}
 
-		let mut result = NotifyOption::SkipPersist;
+			let mut result = NotifyOption::SkipPersist;
 
-		// TODO: This behavior should be documented. It's unintuitive that we query
-		// ChannelMonitors when clearing other events.
-		if $self.process_pending_monitor_events() {
-			result = NotifyOption::DoPersist;
-		}
+			// TODO: This behavior should be documented. It's unintuitive that we query
+			// ChannelMonitors when clearing other events.
+			if $self.process_pending_monitor_events() {
+				result = NotifyOption::DoPersist;
+			}
 
-		let pending_events = mem::replace(&mut *$self.pending_events.lock().unwrap(), vec![]);
-		if !pending_events.is_empty() {
-			result = NotifyOption::DoPersist;
-		}
+			let pending_events = $self.pending_events.lock().unwrap().clone();
+			let num_events = pending_events.len();
+			if !pending_events.is_empty() {
+				result = NotifyOption::DoPersist;
+			}
 
-		for event in pending_events {
-			$event_to_handle = event;
-			$handle_event;
-		}
+			for event in pending_events {
+				$event_to_handle = event;
+				$handle_event;
+			}
 
-		if result == NotifyOption::DoPersist {
-			$self.persistence_notifier.notify();
+			let mut pending_events = $self.pending_events.lock().unwrap();
+			pending_events.drain(..num_events);
+			processed_all_events = pending_events.is_empty();
+
+			if result == NotifyOption::DoPersist {
+				$self.persistence_notifier.notify();
+			}
+
+			$self.pending_events_processor.store(false, Ordering::Release);
 		}
 	}
 }
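Two details of the rewrite are worth illustrating. The events are cloned rather than drained up front, presumably so that a failure mid-handling leaves them queued for redelivery, and only the first `num_events` entries are dropped afterwards, so events queued concurrently survive and the `processed_all_events` re-check loops to pick them up. A self-contained sketch of that shape, with a hypothetical `u32` event type standing in for `events::Event`:

use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};

struct EventQueue {
	pending: Mutex<Vec<u32>>, // hypothetical event type
	processor: AtomicBool,
}

impl EventQueue {
	fn process(&self, mut handle: impl FnMut(u32)) {
		let mut processed_all_events = false;
		while !processed_all_events {
			if self.processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() {
				return; // another task is already processing
			}
			// Snapshot instead of draining: if `handle` fails partway through,
			// every event is still queued for the next attempt.
			let snapshot = self.pending.lock().unwrap().clone();
			let num_events = snapshot.len();
			for event in snapshot {
				handle(event);
			}
			// Drop only what we handled; anything pushed concurrently remains,
			// and the emptiness re-check makes us loop for it. Note the queue
			// lock is still held when the flag is released below, so a new
			// push cannot slip between the re-check and the release.
			let mut pending = self.pending.lock().unwrap();
			pending.drain(..num_events);
			processed_all_events = pending.is_empty();
			self.processor.store(false, Ordering::Release);
		}
	}
}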
@@ -1771,6 +1783,7 @@ where
 			per_peer_state: FairRwLock::new(HashMap::new()),
 
 			pending_events: Mutex::new(Vec::new()),
+			pending_events_processor: AtomicBool::new(false),
 			pending_background_events: Mutex::new(Vec::new()),
 			total_consistency_lock: RwLock::new(()),
 			persistence_notifier: Notifier::new(),
@@ -4369,8 +4382,6 @@ where
 	}
 
 	fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64, counterparty_node_id: Option<&PublicKey>) {
-		debug_assert!(self.total_consistency_lock.try_write().is_err()); // Caller holds read lock
-
 		let counterparty_node_id = match counterparty_node_id {
 			Some(cp_id) => cp_id.clone(),
 			None => {
@@ -5312,7 +5323,8 @@ where
 
 	/// Process pending events from the [`chain::Watch`], returning whether any events were processed.
 	fn process_pending_monitor_events(&self) -> bool {
-		debug_assert!(self.total_consistency_lock.try_write().is_err()); // Caller holds read lock
+		debug_assert!(self.total_consistency_lock.try_write().is_err() ||
+			self.pending_events_processor.load(Ordering::Relaxed)); // Caller holds read lock or processes events asynchronously.
 
 		let mut failed_channels = Vec::new();
 		let mut pending_monitor_events = self.chain_monitor.release_pending_monitor_events();
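The loosened assertion works because `try_write` fails whenever any guard, read or write, is outstanding, so `try_write().is_err()` means "someone holds the consistency lock"; the async event-processing path no longer takes that lock, so the atomic flag stands in as evidence of exclusivity instead. A quick illustration of the `try_write` behaviour, using `std::sync::RwLock` rather than the crate's wrapper:

use std::sync::RwLock;

fn main() {
	let lock = RwLock::new(());
	// Nothing is held, so a write lock is available (and dropped immediately).
	assert!(lock.try_write().is_ok());
	let _read_guard = lock.read().unwrap();
	// With a read guard outstanding, try_write() fails, which is exactly what
	// the debug_assert! above uses to detect that some lock is held.
	assert!(lock.try_write().is_err());
}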
@@ -7916,6 +7928,7 @@ where
 			per_peer_state: FairRwLock::new(per_peer_state),
 
 			pending_events: Mutex::new(pending_events_read),
+			pending_events_processor: AtomicBool::new(false),
 			pending_background_events: Mutex::new(pending_background_events),
 			total_consistency_lock: RwLock::new(()),
 			persistence_notifier: Notifier::new(),