@@ -1715,23 +1715,39 @@ macro_rules! process_events_body {
1715
1715
}
1716
1716
1717
1717
let _single_processor = $self. pending_events_processor. lock( ) . unwrap( ) ;
1718
- let mut next_event = $self. pending_events. lock( ) . unwrap( ) . front( ) . map( |ev| ( * ev) . clone( ) ) ;
1719
1718
loop {
1720
- if let Some ( ( event, _action) ) = next_event {
1721
- result = NotifyOption :: DoPersist ;
1722
- let ev_clone;
1723
- #[ cfg( debug_assertions) ] {
1724
- ev_clone = event. clone( ) ;
1719
+ let mut next_event = $self. pending_events. lock( ) . unwrap( ) . front( ) . map( |ev| ( * ev) . clone( ) ) ;
1720
+ let mut post_event_actions = Vec :: new( ) ;
1721
+ loop {
1722
+ if let Some ( ( event, action_opt) ) = next_event {
1723
+ result = NotifyOption :: DoPersist ;
1724
+ let _ev_clone: Event ;
1725
+ #[ cfg( debug_assertions) ] {
1726
+ _ev_clone = event. clone( ) ;
1727
+ }
1728
+ $event_to_handle = event;
1729
+ $handle_event;
1730
+ let mut pending_events = $self. pending_events. lock( ) . unwrap( ) ;
1731
+ // We're required to take the `pending_events_processor` lock any time we
1732
+ // remove from `pending_events`, however have no general way to enforce that.
1733
+ // Instead, here, we assert that the next event hasn't changed out from under
1734
+ // us.
1735
+ #[ cfg( debug_assertions) ] {
1736
+ debug_assert_eq!( _ev_clone, pending_events. front( ) . unwrap( ) . 0 ) ;
1737
+ }
1738
+ debug_assert_eq!( action_opt, pending_events. front( ) . unwrap( ) . 1 ) ;
1739
+ if let Some ( action) = action_opt {
1740
+ post_event_actions. push( action) ;
1741
+ }
1742
+ pending_events. pop_front( ) ;
1743
+ next_event = pending_events. front( ) . map( |ev| ev. clone( ) ) ;
1744
+ } else {
1745
+ break ;
1725
1746
}
1726
- $event_to_handle = event;
1727
- $handle_event;
1728
- let mut pending_events = $self. pending_events. lock( ) . unwrap( ) ;
1729
- debug_assert_eq!( ev_clone, pending_events. front( ) . unwrap( ) . 0 ) ;
1730
- pending_events. pop_front( ) ;
1731
- next_event = pending_events. front( ) . map( |ev| ev. clone( ) ) ;
1732
- } else {
1733
- break ;
1734
1747
}
1748
+ if post_event_actions. is_empty( ) { break ; }
1749
+ $self. handle_post_event_actions( post_event_actions) ;
1750
+ // If we had some actions, go around again as we may have more events now
1735
1751
}
1736
1752
1737
1753
if result == NotifyOption :: DoPersist {
@@ -5836,6 +5852,66 @@ where
5836
5852
self . pending_outbound_payments . clear_pending_payments ( )
5837
5853
}
5838
5854
5855
+ fn handle_monitor_update_release ( & self , counterparty_node_id : PublicKey , channel_funding_outpoint : OutPoint ) {
5856
+ loop {
5857
+ let per_peer_state = self . per_peer_state . read ( ) . unwrap ( ) ;
5858
+ if let Some ( peer_state_mtx) = per_peer_state. get ( & counterparty_node_id) {
5859
+ let mut peer_state_lck = peer_state_mtx. lock ( ) . unwrap ( ) ;
5860
+ let peer_state = & mut * peer_state_lck;
5861
+ if self . pending_events . lock ( ) . unwrap ( ) . iter ( )
5862
+ . any ( |( _ev, action_opt) | action_opt == & Some ( EventCompletionAction :: ReleaseRAAChannelMonitorUpdate {
5863
+ channel_funding_outpoint, counterparty_node_id
5864
+ } ) )
5865
+ {
5866
+ // Check that, while holding the peer lock, we don't have another event
5867
+ // blocking any monitor updates for this channel. If we do, let those
5868
+ // events be the ones that ultimately release the monitor update(s).
5869
+ log_trace ! ( self . logger, "Delaying monitor unlock for channel {} as another event is pending" ,
5870
+ log_bytes!( & channel_funding_outpoint. to_channel_id( ) [ ..] ) ) ;
5871
+ return ;
5872
+ }
5873
+ if let hash_map:: Entry :: Occupied ( mut chan) = peer_state. channel_by_id . entry ( channel_funding_outpoint. to_channel_id ( ) ) {
5874
+ debug_assert_eq ! ( chan. get( ) . get_funding_txo( ) . unwrap( ) , channel_funding_outpoint) ;
5875
+ if let Some ( ( monitor_update, further_update_exists) ) = chan. get_mut ( ) . unblock_next_blocked_monitor_update ( ) {
5876
+ log_debug ! ( self . logger, "Unlocking monitor updating for channel {} and updating monitor" ,
5877
+ log_bytes!( & channel_funding_outpoint. to_channel_id( ) [ ..] ) ) ;
5878
+ let update_res = self . chain_monitor . update_channel ( channel_funding_outpoint, monitor_update) ;
5879
+ let update_id = monitor_update. update_id ;
5880
+ let _ = handle_error ! ( self ,
5881
+ handle_new_monitor_update!( self , update_res, update_id,
5882
+ peer_state_lck, peer_state, per_peer_state, chan) ,
5883
+ counterparty_node_id) ;
5884
+ if further_update_exists {
5885
+ // If there are more `ChannelMonitorUpdate`s to process, restart at the
5886
+ // top of the loop.
5887
+ continue ;
5888
+ }
5889
+ } else {
5890
+ log_trace ! ( self . logger, "Unlocked monitor updating for channel {} without monitors to update" ,
5891
+ log_bytes!( & channel_funding_outpoint. to_channel_id( ) [ ..] ) ) ;
5892
+ }
5893
+ }
5894
+ } else {
5895
+ log_debug ! ( self . logger,
5896
+ "Got a release post-RAA monitor update for peer {} but the channel is gone" ,
5897
+ log_pubkey!( counterparty_node_id) ) ;
5898
+ }
5899
+ break ;
5900
+ }
5901
+ }
5902
+
5903
+ fn handle_post_event_actions ( & self , actions : Vec < EventCompletionAction > ) {
5904
+ for action in actions {
5905
+ match action {
5906
+ EventCompletionAction :: ReleaseRAAChannelMonitorUpdate {
5907
+ channel_funding_outpoint, counterparty_node_id
5908
+ } => {
5909
+ self . handle_monitor_update_release ( counterparty_node_id, channel_funding_outpoint) ;
5910
+ }
5911
+ }
5912
+ }
5913
+ }
5914
+
5839
5915
/// Processes any events asynchronously in the order they were generated since the last call
5840
5916
/// using the given event handler.
5841
5917
///
0 commit comments