@@ -5779,35 +5779,40 @@ where
57795779 pub async fn process_pending_events_async < Future : core:: future:: Future , H : Fn ( Event ) -> Future > (
57805780 & self , handler : H
57815781 ) {
5782- if self . pending_events_processor . compare_exchange ( false , true , Ordering :: Acquire , Ordering :: Relaxed ) . is_err ( ) {
5783- return ;
5784- }
5782+ let mut processed_all_events = false ;
5783+ while !processed_all_events {
5784+ if self . pending_events_processor . compare_exchange ( false , true , Ordering :: Acquire , Ordering :: Relaxed ) . is_err ( ) {
5785+ return ;
5786+ }
57855787
5786- let mut result = NotifyOption :: SkipPersist ;
5788+ let mut result = NotifyOption :: SkipPersist ;
57875789
5788- // TODO: This behavior should be documented. It's unintuitive that we query
5789- // ChannelMonitors when clearing other events.
5790- if self . process_pending_monitor_events ( ) {
5791- result = NotifyOption :: DoPersist ;
5792- }
5790+ // TODO: This behavior should be documented. It's unintuitive that we query
5791+ // ChannelMonitors when clearing other events.
5792+ if self . process_pending_monitor_events ( ) {
5793+ result = NotifyOption :: DoPersist ;
5794+ }
57935795
5794- let pending_events = self . pending_events . lock ( ) . unwrap ( ) . clone ( ) ;
5795- let num_events = pending_events. len ( ) ;
5796- if !pending_events. is_empty ( ) {
5797- result = NotifyOption :: DoPersist ;
5798- }
5796+ let pending_events = self . pending_events . lock ( ) . unwrap ( ) . clone ( ) ;
5797+ let num_events = pending_events. len ( ) ;
5798+ if !pending_events. is_empty ( ) {
5799+ result = NotifyOption :: DoPersist ;
5800+ }
57995801
5800- for event in pending_events {
5801- handler ( event) . await ;
5802- }
5802+ for event in pending_events {
5803+ handler ( event) . await ;
5804+ }
58035805
5804- self . pending_events . lock ( ) . unwrap ( ) . drain ( ..num_events) ;
5806+ let mut pending_events = self . pending_events . lock ( ) . unwrap ( ) . clone ( ) ;
5807+ pending_events. drain ( ..num_events) ;
5808+ processed_all_events = pending_events. is_empty ( ) ;
58055809
5806- if result == NotifyOption :: DoPersist {
5807- self . persistence_notifier . notify ( ) ;
5808- }
5810+ if result == NotifyOption :: DoPersist {
5811+ self . persistence_notifier . notify ( ) ;
5812+ }
58095813
5810- self . pending_events_processor . store ( false , Ordering :: Release ) ;
5814+ self . pending_events_processor . store ( false , Ordering :: Release ) ;
5815+ }
58115816 }
58125817}
58135818
0 commit comments