diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs
index 3f210cb46c5..d495910eed1 100644
--- a/lightning/src/ln/chanmon_update_fail_tests.rs
+++ b/lightning/src/ln/chanmon_update_fail_tests.rs
@@ -146,7 +146,7 @@ fn test_monitor_and_persister_update_fail() {
 	let mut node_0_per_peer_lock;
 	let mut node_0_peer_state_lock;
 	let mut channel = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan.2);
-	if let Ok(update) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
+	if let Ok(Some(update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
 		// Check that even though the persister is returning a InProgress,
 		// because the update is bogus, ultimately the error that's returned
 		// should be a PermanentFailure.
diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs
index 8b5e89d9441..73683eec326 100644
--- a/lightning/src/ln/channel.rs
+++ b/lightning/src/ln/channel.rs
@@ -479,6 +479,21 @@ pub(crate) const MIN_AFFORDABLE_HTLC_COUNT: usize = 4;
 /// * `EXPIRE_PREV_CONFIG_TICKS` = convergence_delay / tick_interval
 pub(crate) const EXPIRE_PREV_CONFIG_TICKS: usize = 5;
 
+struct PendingChannelMonitorUpdate {
+	update: ChannelMonitorUpdate,
+	/// In some cases we need to delay letting the [`ChannelMonitorUpdate`] go until after an
+	/// `Event` is processed by the user. This bool indicates the [`ChannelMonitorUpdate`] is
+	/// blocked on some external event and the [`ChannelManager`] will update us when we're ready.
+	///
+	/// [`ChannelManager`]: super::channelmanager::ChannelManager
+	blocked: bool,
+}
+
+impl_writeable_tlv_based!(PendingChannelMonitorUpdate, {
+	(0, update, required),
+	(2, blocked, required),
+});
+
 // TODO: We should refactor this to be an Inbound/OutboundChannel until initial setup handshaking
 // has been completed, and then turn into a Channel to get compiler-time enforcement of things like
 // calling channel_id() before we're set up or things like get_outbound_funding_signed on an
@@ -744,7 +759,7 @@ pub(super) struct Channel<Signer: ChannelSigner> {
 	/// If we then persist the [`channelmanager::ChannelManager`] and crash before the persistence
 	/// completes we still need to be able to complete the persistence. Thus, we have to keep a
 	/// copy of the [`ChannelMonitorUpdate`] here until it is complete.
-	pending_monitor_updates: Vec<ChannelMonitorUpdate>,
+	pending_monitor_updates: Vec<PendingChannelMonitorUpdate>,
 }
 
 #[cfg(any(test, fuzzing))]
@@ -1979,28 +1994,52 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 	}
 
 	pub fn get_update_fulfill_htlc_and_commit<L: Deref>(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> UpdateFulfillCommitFetch where L::Target: Logger {
+		let release_cs_monitor = self.pending_monitor_updates.iter().all(|upd| !upd.blocked);
 		match self.get_update_fulfill_htlc(htlc_id, payment_preimage, logger) {
-			UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg: Some(_) } => {
-				let mut additional_update = self.build_commitment_no_status_check(logger);
-				// build_commitment_no_status_check may bump latest_monitor_id but we want them to be
-				// strictly increasing by one, so decrement it here.
-				self.latest_monitor_update_id = monitor_update.update_id;
-				monitor_update.updates.append(&mut additional_update.updates);
-				self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
-				self.pending_monitor_updates.push(monitor_update);
+			UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg } => {
+				// Even if we aren't supposed to let new monitor updates with commitment state
+				// updates run, we still need to push the preimage ChannelMonitorUpdateStep no
+				// matter what. Sadly, to push a new monitor update which gets processed before
+				// others already queued, we have to insert it into the pending queue and update
+				// the update_ids of all the following monitors.
+				let unblocked_update_pos = if release_cs_monitor && msg.is_some() {
+					let mut additional_update = self.build_commitment_no_status_check(logger);
+					// build_commitment_no_status_check may bump latest_monitor_id but we want them
+					// to be strictly increasing by one, so decrement it here.
+					self.latest_monitor_update_id = monitor_update.update_id;
+					monitor_update.updates.append(&mut additional_update.updates);
+					self.pending_monitor_updates.push(PendingChannelMonitorUpdate {
+						update: monitor_update, blocked: false,
+					});
+					self.pending_monitor_updates.len() - 1
+				} else {
+					let insert_pos = self.pending_monitor_updates.iter().position(|upd| upd.blocked)
+						.unwrap_or(self.pending_monitor_updates.len());
+					let new_mon_id = self.pending_monitor_updates.get(insert_pos)
+						.map(|upd| upd.update.update_id).unwrap_or(monitor_update.update_id);
+					monitor_update.update_id = new_mon_id;
+					self.pending_monitor_updates.insert(insert_pos, PendingChannelMonitorUpdate {
+						update: monitor_update, blocked: false,
+					});
+					for held_update in self.pending_monitor_updates.iter_mut().skip(insert_pos + 1) {
+						held_update.update.update_id += 1;
+					}
+					if msg.is_some() {
+						debug_assert!(false, "If there is a pending blocked monitor we should have MonitorUpdateInProgress set");
+						let update = self.build_commitment_no_status_check(logger);
+						self.pending_monitor_updates.push(PendingChannelMonitorUpdate {
+							update, blocked: true,
+						});
+					}
+					insert_pos
+				};
+				self.monitor_updating_paused(false, msg.is_some(), false, Vec::new(), Vec::new(), Vec::new());
 				UpdateFulfillCommitFetch::NewClaim {
-					monitor_update: self.pending_monitor_updates.last().unwrap(),
+					monitor_update: &self.pending_monitor_updates.get(unblocked_update_pos)
+						.expect("We just pushed the monitor update").update,
 					htlc_value_msat,
 				}
 			},
-			UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None } => {
-				self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
-				self.pending_monitor_updates.push(monitor_update);
-				UpdateFulfillCommitFetch::NewClaim {
-					monitor_update: self.pending_monitor_updates.last().unwrap(),
-					htlc_value_msat,
-				}
-			}
 			UpdateFulfillFetch::DuplicateClaim {} => UpdateFulfillCommitFetch::DuplicateClaim {},
 		}
 	}
@@ -3068,7 +3107,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
 		Ok(())
 	}
 
-	pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<&ChannelMonitorUpdate, ChannelError>
+	pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<Option<&ChannelMonitorUpdate>, ChannelError>
 	where L::Target: Logger
 	{
 		if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
@@ -3268,8 +3307,7 @@
 			}
 			log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updated HTLC state but awaiting a monitor update resolution to reply.",
 				log_bytes!(self.channel_id));
-			self.pending_monitor_updates.push(monitor_update);
-			return Ok(self.pending_monitor_updates.last().unwrap());
+			return Ok(self.push_ret_blockable_mon_update(monitor_update));
 		}
 
 		let need_commitment_signed = if need_commitment && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 {
@@ -3286,9 +3324,8 @@
 		log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updating HTLC state and responding with{} a revoke_and_ack.",
 			log_bytes!(self.channel_id()), if need_commitment_signed { " our own commitment_signed and" } else { "" });
-		self.pending_monitor_updates.push(monitor_update);
 		self.monitor_updating_paused(true, need_commitment_signed, false, Vec::new(), Vec::new(), Vec::new());
-		return Ok(self.pending_monitor_updates.last().unwrap());
+		return Ok(self.push_ret_blockable_mon_update(monitor_update));
 	}
 
 	/// Public version of the below, checking relevant preconditions first.
@@ -3403,8 +3440,7 @@
 				update_add_htlcs.len(), update_fulfill_htlcs.len(), update_fail_htlcs.len());
 			self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
-			self.pending_monitor_updates.push(monitor_update);
-			(Some(self.pending_monitor_updates.last().unwrap()), htlcs_to_fail)
+			(self.push_ret_blockable_mon_update(monitor_update), htlcs_to_fail)
 		} else {
 			(None, Vec::new())
 		}
@@ -3415,7 +3451,7 @@
 	/// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail,
 	/// generating an appropriate error *after* the channel state has been updated based on the
 	/// revoke_and_ack message.
-	pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, &ChannelMonitorUpdate), ChannelError>
+	pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, Option<&ChannelMonitorUpdate>), ChannelError>
 	where L::Target: Logger,
 	{
 		if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
@@ -3612,21 +3648,19 @@
 			self.monitor_pending_failures.append(&mut revoked_htlcs);
 			self.monitor_pending_finalized_fulfills.append(&mut finalized_claimed_htlcs);
 			log_debug!(logger, "Received a valid revoke_and_ack for channel {} but awaiting a monitor update resolution to reply.", log_bytes!(self.channel_id()));
-			self.pending_monitor_updates.push(monitor_update);
-			return Ok((Vec::new(), self.pending_monitor_updates.last().unwrap()));
+			return Ok((Vec::new(), self.push_ret_blockable_mon_update(monitor_update)));
 		}
 
 		match self.free_holding_cell_htlcs(logger) {
 			(Some(_), htlcs_to_fail) => {
-				let mut additional_update = self.pending_monitor_updates.pop().unwrap();
+				let mut additional_update = self.pending_monitor_updates.pop().unwrap().update;
 				// free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be
 				// strictly increasing by one, so decrement it here.
 				self.latest_monitor_update_id = monitor_update.update_id;
 				monitor_update.updates.append(&mut additional_update.updates);
 
 				self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
-				self.pending_monitor_updates.push(monitor_update);
-				Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
+				Ok((htlcs_to_fail, self.push_ret_blockable_mon_update(monitor_update)))
 			},
 			(None, htlcs_to_fail) => {
 				if require_commitment {
@@ -3640,13 +3674,11 @@
 					log_debug!(logger, "Received a valid revoke_and_ack for channel {}. Responding with a commitment update with {} HTLCs failed.",
 						log_bytes!(self.channel_id()), update_fail_htlcs.len() + update_fail_malformed_htlcs.len());
 					self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
-					self.pending_monitor_updates.push(monitor_update);
-					Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
+					Ok((htlcs_to_fail, self.push_ret_blockable_mon_update(monitor_update)))
 				} else {
 					log_debug!(logger, "Received a valid revoke_and_ack for channel {} with no reply necessary.", log_bytes!(self.channel_id()));
 					self.monitor_updating_paused(false, false, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
-					self.pending_monitor_updates.push(monitor_update);
-					Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
+					Ok((htlcs_to_fail, self.push_ret_blockable_mon_update(monitor_update)))
 				}
 			}
 		}
@@ -3835,7 +3867,12 @@
 	{
 		assert_eq!(self.channel_state & ChannelState::MonitorUpdateInProgress as u32, ChannelState::MonitorUpdateInProgress as u32);
 		self.channel_state &= !(ChannelState::MonitorUpdateInProgress as u32);
-		self.pending_monitor_updates.clear();
+		let mut found_blocked = false;
+		self.pending_monitor_updates.retain(|upd| {
+			if found_blocked { debug_assert!(upd.blocked, "No mons may be unblocked after a blocked one"); }
+			if upd.blocked { found_blocked = true; }
+			upd.blocked
+		});
 
 		// If we're past (or at) the FundingSent stage on an outbound channel, try to
 		// (re-)broadcast the funding transaction as we may have declined to broadcast it when we
@@ -4378,8 +4415,9 @@
 				}],
 			};
 			self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
-			self.pending_monitor_updates.push(monitor_update);
-			Some(self.pending_monitor_updates.last().unwrap())
+			if self.push_blockable_mon_update(monitor_update) {
+				self.pending_monitor_updates.last().map(|upd| &upd.update)
+			} else { None }
 		} else { None };
 
 		let shutdown = if send_shutdown {
 			Some(msgs::Shutdown {
@@ -4951,8 +4989,49 @@
 		(self.channel_state & ChannelState::MonitorUpdateInProgress as u32) != 0
 	}
 
-	pub fn get_next_monitor_update(&self) -> Option<&ChannelMonitorUpdate> {
-		self.pending_monitor_updates.first()
+	pub fn get_latest_complete_monitor_update_id(&self) -> u64 {
+		if self.pending_monitor_updates.is_empty() { return self.get_latest_monitor_update_id(); }
+		self.pending_monitor_updates[0].update.update_id - 1
+	}
+
+	/// Returns the next blocked monitor update, if one exists, and a bool indicating whether a
+	/// further blocked monitor update exists after it.
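+	///
+	/// A rough sketch of the intended caller-side flow (the names `chain_monitor` and
+	/// `funding_txo` stand in for the caller's own state; see
+	/// `ChannelManager::handle_monitor_update_release` for the real loop):
+	///
+	/// ```ignore
+	/// while let Some((update, more_blocked)) = chan.unblock_next_blocked_monitor_update() {
+	///     // Each released update still goes through the normal persistence path.
+	///     let _ = chain_monitor.update_channel(funding_txo, update);
+	///     if !more_blocked { break; }
+	/// }
+	/// ```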
+	pub fn unblock_next_blocked_monitor_update(&mut self) -> Option<(&ChannelMonitorUpdate, bool)> {
+		for i in 0..self.pending_monitor_updates.len() {
+			if self.pending_monitor_updates[i].blocked {
+				self.pending_monitor_updates[i].blocked = false;
+				return Some((&self.pending_monitor_updates[i].update,
+					self.pending_monitor_updates.len() > i + 1));
+			}
+		}
+		None
+	}
+
+	/// Pushes a new monitor update into our monitor update queue, returning whether it should be
+	/// immediately given to the user for persisting or if it should be held as blocked.
+	fn push_blockable_mon_update(&mut self, update: ChannelMonitorUpdate) -> bool {
+		let release_monitor = self.pending_monitor_updates.iter().all(|upd| !upd.blocked);
+		self.pending_monitor_updates.push(PendingChannelMonitorUpdate {
+			update, blocked: !release_monitor
+		});
+		release_monitor
+	}
+
+	/// Pushes a new monitor update into our monitor update queue, returning a reference to it if
+	/// it should be immediately given to the user for persisting or `None` if it should be held as
+	/// blocked.
+	fn push_ret_blockable_mon_update(&mut self, update: ChannelMonitorUpdate)
+	-> Option<&ChannelMonitorUpdate> {
+		let release_monitor = self.push_blockable_mon_update(update);
+		if release_monitor { self.pending_monitor_updates.last().map(|upd| &upd.update) } else { None }
+	}
+
+	pub fn no_monitor_updates_pending(&self) -> bool {
+		self.pending_monitor_updates.is_empty()
+	}
+
+	pub fn complete_one_mon_update(&mut self, update_id: u64) {
+		self.pending_monitor_updates.retain(|upd| upd.update.update_id != update_id);
 	}
 
 	/// Returns true if funding_created was sent/received.
@@ -6000,8 +6079,7 @@
 			Some(_) => {
 				let monitor_update = self.build_commitment_no_status_check(logger);
 				self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
-				self.pending_monitor_updates.push(monitor_update);
-				Ok(Some(self.pending_monitor_updates.last().unwrap()))
+				Ok(self.push_ret_blockable_mon_update(monitor_update))
 			},
 			None => Ok(None)
 		}
@@ -6090,8 +6168,9 @@
 				}],
 			};
 			self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
-			self.pending_monitor_updates.push(monitor_update);
-			Some(self.pending_monitor_updates.last().unwrap())
+			if self.push_blockable_mon_update(monitor_update) {
+				self.pending_monitor_updates.last().map(|upd| &upd.update)
+			} else { None }
 		} else { None };
 
 		let shutdown = msgs::Shutdown {
 			channel_id: self.channel_id,
@@ -6528,6 +6607,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Writeable for Channel<Signer> {
 			(28, holder_max_accepted_htlcs, option),
 			(29, self.temporary_channel_id, option),
 			(31, channel_pending_event_emitted, option),
+			(33, self.pending_monitor_updates, vec_type),
 		});
 
 		Ok(())
@@ -6804,6 +6884,8 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
 		let mut temporary_channel_id: Option<[u8; 32]> = None;
 		let mut holder_max_accepted_htlcs: Option<u16> = None;
 
+		let mut pending_monitor_updates = Some(Vec::new());
+
 		read_tlv_fields!(reader, {
 			(0, announcement_sigs, option),
 			(1, minimum_depth, option),
@@ -6826,6 +6908,7 @@
 			(28, holder_max_accepted_htlcs, option),
 			(29, temporary_channel_id, option),
 			(31, channel_pending_event_emitted, option),
+			(33, pending_monitor_updates, vec_type),
 		});
 
 		let (channel_keys_id, holder_signer) = if let Some(channel_keys_id) = channel_keys_id {
@@ -6995,7 +7078,7 @@
 			channel_type: channel_type.unwrap(),
 			channel_keys_id,
 
-			pending_monitor_updates: Vec::new(),
+			pending_monitor_updates: pending_monitor_updates.unwrap(),
 		})
 	}
 }
diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs
index 498813de7a1..326206ed749 100644
--- a/lightning/src/ln/channelmanager.rs
+++ b/lightning/src/ln/channelmanager.rs
@@ -521,6 +521,20 @@ impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
 	(2, EmitEvent) => { (0, event, upgradable_required) },
 );
 
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub(crate) enum EventCompletionAction {
+	ReleaseRAAChannelMonitorUpdate {
+		counterparty_node_id: PublicKey,
+		channel_funding_outpoint: OutPoint,
+	},
+}
+impl_writeable_tlv_based_enum!(EventCompletionAction,
+	(0, ReleaseRAAChannelMonitorUpdate) => {
+		(0, channel_funding_outpoint, required),
+		(2, counterparty_node_id, required),
+	};
+);
+
 /// State we hold per-peer.
 pub(super) struct PeerState<Signer: ChannelSigner> {
 	/// `temporary_channel_id` or `channel_id` -> `channel`.
@@ -932,8 +946,17 @@ where
 	#[cfg(any(test, feature = "_test_utils"))]
 	pub(super) per_peer_state: FairRwLock<HashMap<PublicKey, Mutex<PeerState<<SP::Target as SignerProvider>::Signer>>>>,
 
+	/// The set of events which we need to give to the user to handle. In some cases an event may
+	/// require some further action after the user handles it (currently only blocking a monitor
+	/// update from being handed to the user to ensure the included changes to the channel state
+	/// are handled by the user before they're persisted durably to disk). In that case, the second
+	/// element in the tuple is set to `Some` with further details of the action.
+	///
+	/// Note that events MUST NOT be removed from pending_events after deserialization, as they
+	/// could be in the middle of being processed without the direct mutex held.
+	///
 	/// See `ChannelManager` struct-level documentation for lock order requirements.
-	pending_events: Mutex<Vec<events::Event>>,
+	pending_events: Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>,
 	/// A simple atomic flag to ensure only one task at a time can be processing events asynchronously.
 	pending_events_processor: AtomicBool,
 	/// See `ChannelManager` struct-level documentation for lock order requirements.
@@ -1446,10 +1469,10 @@ macro_rules! handle_error {
 				});
 			}
 
 			if let Some((channel_id, user_channel_id)) = chan_id {
-				$self.pending_events.lock().unwrap().push(events::Event::ChannelClosed {
+				$self.pending_events.lock().unwrap().push_back((events::Event::ChannelClosed {
 					channel_id, user_channel_id,
 					reason: ClosureReason::ProcessingError { err: err.err.clone() }
-				});
+				}, None));
 			}
 		}
@@ -1581,13 +1604,13 @@ macro_rules! send_channel_ready {
 macro_rules! emit_channel_pending_event {
 	($locked_events: expr, $channel: expr) => {
 		if $channel.should_emit_channel_pending_event() {
-			$locked_events.push(events::Event::ChannelPending {
+			$locked_events.push_back((events::Event::ChannelPending {
 				channel_id: $channel.channel_id(),
 				former_temporary_channel_id: $channel.temporary_channel_id(),
 				counterparty_node_id: $channel.get_counterparty_node_id(),
 				user_channel_id: $channel.get_user_id(),
 				funding_txo: $channel.get_funding_txo().unwrap().into_bitcoin_outpoint(),
-			});
+			}, None));
 			$channel.set_channel_pending_event_emitted();
 		}
 	}
@@ -1597,12 +1620,12 @@
 macro_rules! emit_channel_ready_event {
 	($locked_events: expr, $channel: expr) => {
 		if $channel.should_emit_channel_ready_event() {
 			debug_assert!($channel.channel_pending_event_emitted());
-			$locked_events.push(events::Event::ChannelReady {
+			$locked_events.push_back((events::Event::ChannelReady {
 				channel_id: $channel.channel_id(),
 				user_channel_id: $channel.get_user_id(),
 				counterparty_node_id: $channel.get_counterparty_node_id(),
 				channel_type: $channel.get_channel_type().clone(),
-			});
+			}, None));
 			$channel.set_channel_ready_event_emitted();
 		}
 	}
@@ -1680,11 +1703,8 @@ macro_rules! handle_new_monitor_update {
 				res
 			},
 			ChannelMonitorUpdateStatus::Completed => {
-				if ($update_id == 0 || $chan.get_next_monitor_update()
-					.expect("We can't be processing a monitor update if it isn't queued")
-					.update_id == $update_id) &&
-					$chan.get_latest_monitor_update_id() == $update_id
-				{
+				$chan.complete_one_mon_update($update_id);
+				if $chan.no_monitor_updates_pending() {
 					handle_monitor_update_completion!($self, $update_id, $peer_state_lock, $peer_state, $per_peer_state_lock, $chan);
 				}
 				Ok(())
@@ -1724,9 +1744,14 @@ macro_rules! process_events_body {
 				result = NotifyOption::DoPersist;
 			}
 
-			for event in pending_events {
+			let mut post_event_actions = Vec::new();
+
+			for (event, action_opt) in pending_events {
 				$event_to_handle = event;
 				$handle_event;
+				if let Some(action) = action_opt {
+					post_event_actions.push(action);
+				}
 			}
 
 			{
@@ -1736,6 +1761,12 @@
 				$self.pending_events_processor.store(false, Ordering::Release);
 			}
 
+			if !post_event_actions.is_empty() {
+				$self.handle_post_event_actions(post_event_actions);
+				// If we had some actions, go around again as we may have more events now
+				processed_all_events = false;
+			}
+
 			if result == NotifyOption::DoPersist {
 				$self.persistence_notifier.notify();
 			}
@@ -1805,7 +1836,7 @@ where
 			per_peer_state: FairRwLock::new(HashMap::new()),
 
-			pending_events: Mutex::new(Vec::new()),
+			pending_events: Mutex::new(VecDeque::new()),
 			pending_events_processor: AtomicBool::new(false),
 			pending_background_events: Mutex::new(Vec::new()),
 			total_consistency_lock: RwLock::new(()),
@@ -2013,15 +2044,17 @@ where
 		let mut pending_events_lock = self.pending_events.lock().unwrap();
 		match channel.unbroadcasted_funding() {
 			Some(transaction) => {
-				pending_events_lock.push(events::Event::DiscardFunding { channel_id: channel.channel_id(), transaction })
+				pending_events_lock.push_back((events::Event::DiscardFunding {
+					channel_id: channel.channel_id(), transaction
+				}, None));
 			},
 			None => {},
 		}
-		pending_events_lock.push(events::Event::ChannelClosed {
+		pending_events_lock.push_back((events::Event::ChannelClosed {
 			channel_id: channel.channel_id(),
 			user_channel_id: channel.get_user_id(),
 			reason: closure_reason
-		});
+		}, None));
 	}
 
 	fn close_channel_internal(&self, channel_id: &[u8; 32], counterparty_node_id: &PublicKey, target_feerate_sats_per_1000_weight: Option<u32>) -> Result<(), APIError> {
@@ -3236,7 +3269,7 @@ where
 	pub fn process_pending_htlc_forwards(&self) {
 		let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
 
-		let mut new_events = Vec::new();
+		let mut new_events = VecDeque::new();
 		let mut failed_forwards = Vec::new();
 		let mut phantom_receives: Vec<(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)> = Vec::new();
 		{
@@ -3562,7 +3595,7 @@ where
 										htlcs.push(claimable_htlc);
 										let amount_msat = htlcs.iter().map(|htlc| htlc.value).sum();
 										htlcs.iter_mut().for_each(|htlc| htlc.total_value_received = Some(amount_msat));
-										new_events.push(events::Event::PaymentClaimable {
+										new_events.push_back((events::Event::PaymentClaimable {
 											receiver_node_id: Some(receiver_node_id),
 											payment_hash,
 											purpose: purpose(),
@@ -3571,7 +3604,7 @@ where
 											via_user_channel_id: Some(prev_user_channel_id),
 											claim_deadline: Some(earliest_expiry - HTLC_FAIL_BACK_BUFFER),
 											onion_fields: claimable_payment.onion_fields.clone(),
-										});
+										}, None));
 										payment_claimable_generated = true;
 									} else {
 										// Nothing to do - we haven't reached the total
@@ -3632,7 +3665,7 @@ where
 											htlcs: vec![claimable_htlc],
 										});
 										let prev_channel_id = prev_funding_outpoint.to_channel_id();
-										new_events.push(events::Event::PaymentClaimable {
+										new_events.push_back((events::Event::PaymentClaimable {
 											receiver_node_id: Some(receiver_node_id),
 											payment_hash,
 											amount_msat,
@@ -3641,7 +3674,7 @@ where
 											via_user_channel_id: Some(prev_user_channel_id),
 											claim_deadline,
 											onion_fields: Some(onion_fields),
-										});
+										}, None));
 									},
 									hash_map::Entry::Occupied(_) => {
 										log_trace!(self.logger, "Failing new keysend HTLC with payment_hash {} for a duplicative payment hash", log_bytes!(payment_hash.0));
@@ -4119,10 +4152,10 @@ where
 				mem::drop(forward_htlcs);
 				if push_forward_ev { self.push_pending_forwards_ev(); }
 				let mut pending_events = self.pending_events.lock().unwrap();
-				pending_events.push(events::Event::HTLCHandlingFailed {
+				pending_events.push_back((events::Event::HTLCHandlingFailed {
 					prev_channel_id: outpoint.to_channel_id(),
 					failed_next_destination: destination,
-				});
+				}, None));
 			},
 		}
 	}
@@ -4385,13 +4418,13 @@ where
 			MonitorUpdateCompletionAction::PaymentClaimed { payment_hash } => {
 				let payment = self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash);
 				if let Some(ClaimingPayment { amount_msat, payment_purpose: purpose, receiver_node_id }) = payment {
-					self.pending_events.lock().unwrap().push(events::Event::PaymentClaimed {
+					self.pending_events.lock().unwrap().push_back((events::Event::PaymentClaimed {
 						payment_hash, purpose, amount_msat, receiver_node_id: Some(receiver_node_id),
-					});
+					}, None));
 				}
 			},
 			MonitorUpdateCompletionAction::EmitEvent { event } => {
-				self.pending_events.lock().unwrap().push(event);
+				self.pending_events.lock().unwrap().push_back((event, None));
 			},
 		}
 	}
@@ -4715,15 +4748,13 @@ where
 			});
 		} else {
 			let mut pending_events = self.pending_events.lock().unwrap();
-			pending_events.push(
-				events::Event::OpenChannelRequest {
-					temporary_channel_id: msg.temporary_channel_id.clone(),
-					counterparty_node_id: counterparty_node_id.clone(),
-					funding_satoshis: msg.funding_satoshis,
-					push_msat: msg.push_msat,
-					channel_type: channel.get_channel_type().clone(),
-				}
-			);
+			pending_events.push_back((events::Event::OpenChannelRequest {
+				temporary_channel_id: msg.temporary_channel_id.clone(),
+				counterparty_node_id: counterparty_node_id.clone(),
+				funding_satoshis: msg.funding_satoshis,
+				push_msat: msg.push_msat,
+				channel_type: channel.get_channel_type().clone(),
+			}, None));
 		}
 
 		entry.insert(channel);
@@ -4751,13 +4782,13 @@ where
 			}
 		};
 		let mut pending_events = self.pending_events.lock().unwrap();
-		pending_events.push(events::Event::FundingGenerationReady {
+		pending_events.push_back((events::Event::FundingGenerationReady {
 			temporary_channel_id: msg.temporary_channel_id,
 			counterparty_node_id: *counterparty_node_id,
 			channel_value_satoshis: value,
 			output_script,
 			user_channel_id: user_id,
-		});
+		}, None));
 		Ok(())
 	}
@@ -5131,11 +5162,13 @@ where
 		match peer_state.channel_by_id.entry(msg.channel_id) {
 			hash_map::Entry::Occupied(mut chan) => {
 				let funding_txo = chan.get().get_funding_txo();
-				let monitor_update = try_chan_entry!(self, chan.get_mut().commitment_signed(&msg, &self.logger), chan);
-				let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update);
-				let update_id = monitor_update.update_id;
-				handle_new_monitor_update!(self, update_res, update_id, peer_state_lock,
-					peer_state, per_peer_state, chan)
+				let monitor_update_opt = try_chan_entry!(self, chan.get_mut().commitment_signed(&msg, &self.logger), chan);
+				if let Some(monitor_update) = monitor_update_opt {
+					let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update);
+					let update_id = monitor_update.update_id;
+					handle_new_monitor_update!(self, update_res, update_id, peer_state_lock,
+						peer_state, per_peer_state, chan)
+				} else { Ok(()) }
 			},
 			hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
 		}
@@ -5145,7 +5178,7 @@ where
 	fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, OutPoint, u128, Vec<(PendingHTLCInfo, u64)>)]) {
 		for &mut (prev_short_channel_id, prev_funding_outpoint, prev_user_channel_id, ref mut pending_forwards) in per_source_pending_forwards {
 			let mut push_forward_event = false;
-			let mut new_intercept_events = Vec::new();
+			let mut new_intercept_events = VecDeque::new();
 			let mut failed_intercept_forwards = Vec::new();
 			if !pending_forwards.is_empty() {
 				for (forward_info, prev_htlc_id) in pending_forwards.drain(..) {
@@ -5172,13 +5205,13 @@ where
 						let mut pending_intercepts = self.pending_intercepted_htlcs.lock().unwrap();
 						match pending_intercepts.entry(intercept_id) {
 							hash_map::Entry::Vacant(entry) => {
-								new_intercept_events.push(events::Event::HTLCIntercepted {
+								new_intercept_events.push_back((events::Event::HTLCIntercepted {
 									requested_next_hop_scid: scid,
 									payment_hash: forward_info.payment_hash,
 									inbound_amount_msat: forward_info.incoming_amt_msat.unwrap(),
 									expected_outbound_amount_msat: forward_info.outgoing_amt_msat,
 									intercept_id
-								});
+								}, None));
 								entry.insert(PendingAddHTLCInfo {
 									prev_short_channel_id, prev_funding_outpoint, prev_htlc_id, prev_user_channel_id, forward_info });
 							},
@@ -5228,13 +5261,13 @@ where
 	fn push_pending_forwards_ev(&self) {
 		let mut pending_events = self.pending_events.lock().unwrap();
 		let forward_ev_exists = pending_events.iter()
-			.find(|ev| if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false })
+			.find(|(ev, _)| if let events::Event::PendingHTLCsForwardable { .. } = ev { true } else { false })
 			.is_some();
 		if !forward_ev_exists {
-			pending_events.push(events::Event::PendingHTLCsForwardable {
+			pending_events.push_back((events::Event::PendingHTLCsForwardable {
 				time_forwardable: Duration::from_millis(MIN_HTLC_RELAY_HOLDING_CELL_MILLIS),
-			});
+			}, None));
 		}
 	}
@@ -5250,11 +5283,13 @@ where
 		match peer_state.channel_by_id.entry(msg.channel_id) {
 			hash_map::Entry::Occupied(mut chan) => {
 				let funding_txo = chan.get().get_funding_txo();
-				let (htlcs_to_fail, monitor_update) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan);
-				let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update);
-				let update_id = monitor_update.update_id;
-				let res = handle_new_monitor_update!(self, update_res, update_id,
-					peer_state_lock, peer_state, per_peer_state, chan);
+				let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.logger), chan);
+				let res = if let Some(monitor_update) = monitor_update_opt {
+					let update_res = self.chain_monitor.update_channel(funding_txo.unwrap(), monitor_update);
+					let update_id = monitor_update.update_id;
+					handle_new_monitor_update!(self, update_res, update_id,
+						peer_state_lock, peer_state, per_peer_state, chan)
+				} else { Ok(()) };
 				(htlcs_to_fail, res)
 			},
 			hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
@@ -5883,13 +5918,13 @@ where
 	#[cfg(feature = "_test_utils")]
 	pub fn push_pending_event(&self, event: events::Event) {
 		let mut events = self.pending_events.lock().unwrap();
-		events.push(event);
+		events.push_back((event, None));
 	}
 
 	#[cfg(test)]
 	pub fn pop_pending_event(&self) -> Option<events::Event> {
 		let mut events = self.pending_events.lock().unwrap();
-		if events.is_empty() { None } else { Some(events.remove(0)) }
+		events.pop_front().map(|(e, _)| e)
 	}
 
 	#[cfg(test)]
@@ -5902,6 +5937,72 @@ where
 		self.pending_outbound_payments.clear_pending_payments()
 	}
 
+	fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint) {
+		let mut errors = Vec::new();
+		loop {
+			let per_peer_state = self.per_peer_state.read().unwrap();
+			if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) {
+				let mut peer_state_lck = peer_state_mtx.lock().unwrap();
+				let peer_state = &mut *peer_state_lck;
+				if self.pending_events.lock().unwrap().iter()
+					.any(|(_ev, action_opt)| action_opt == &Some(EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
+						channel_funding_outpoint, counterparty_node_id
+					}))
+				{
+					// Check that, while holding the peer lock, we don't have another event
+					// blocking any monitor updates for this channel. If we do, let those
+					// events be the ones that ultimately release the monitor update(s).
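+					// (Releasing the monitor update here instead would let channel state tied to a
+					// still-unhandled event reach disk, breaking the handle-event-before-persist
+					// ordering that the `pending_events` documentation describes.)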
+					log_trace!(self.logger, "Delaying monitor unlock for channel {} as another event is pending",
+						log_bytes!(&channel_funding_outpoint.to_channel_id()[..]));
+					break;
+				}
+				if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(channel_funding_outpoint.to_channel_id()) {
+					debug_assert_eq!(chan.get().get_funding_txo().unwrap(), channel_funding_outpoint);
+					if let Some((monitor_update, further_update_exists)) = chan.get_mut().unblock_next_blocked_monitor_update() {
+						log_debug!(self.logger, "Unlocking monitor updating for channel {} and updating monitor",
+							log_bytes!(&channel_funding_outpoint.to_channel_id()[..]));
+						let update_res = self.chain_monitor.update_channel(channel_funding_outpoint, monitor_update);
+						let update_id = monitor_update.update_id;
+						if let Err(e) = handle_new_monitor_update!(self, update_res, update_id,
+							peer_state_lck, peer_state, per_peer_state, chan)
+						{
+							errors.push((e, counterparty_node_id));
+						}
+						if further_update_exists {
+							// If there are more `ChannelMonitorUpdate`s to process, restart at the
+							// top of the loop.
+							continue;
+						}
+					} else {
+						log_trace!(self.logger, "Unlocked monitor updating for channel {} without monitors to update",
+							log_bytes!(&channel_funding_outpoint.to_channel_id()[..]));
+					}
+				}
+			} else {
+				log_debug!(self.logger,
+					"Got a release post-RAA monitor update for peer {} but the channel is gone",
+					log_pubkey!(counterparty_node_id));
+			}
+			break;
+		}
+		for (err, counterparty_node_id) in errors {
+			let res = Err::<(), _>(err);
+			let _ = handle_error!(self, res, counterparty_node_id);
+		}
+	}
+
+	fn handle_post_event_actions(&self, actions: Vec<EventCompletionAction>) {
+		for action in actions {
+			match action {
+				EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
+					channel_funding_outpoint, counterparty_node_id
+				} => {
+					self.handle_monitor_update_release(counterparty_node_id, channel_funding_outpoint);
+				}
+			}
+		}
+	}
+
 	/// Processes any events asynchronously in the order they were generated since the last call
 	/// using the given event handler.
 	///
@@ -7226,9 +7327,19 @@ where
 		}
 
 		let events = self.pending_events.lock().unwrap();
-		(events.len() as u64).write(writer)?;
-		for event in events.iter() {
-			event.write(writer)?;
+		// LDK versions prior to 0.0.115 don't support post-event actions, thus if there are no
+		// actions at all, skip writing the required TLV. Otherwise, pre-0.0.115 versions will
+		// refuse to read the new ChannelManager.
+		let events_not_backwards_compatible = events.iter().any(|(_, action)| action.is_some());
+		if events_not_backwards_compatible {
+			// If we're going to write an even TLV that will overwrite our events anyway we might as
+			// well save the space and not write any events here.
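+			// (TLV type 8 below is even, i.e. required-to-understand, so older readers fail to
+			// deserialize rather than silently dropping the post-event actions.)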
+			0u64.write(writer)?;
+		} else {
+			(events.len() as u64).write(writer)?;
+			for (event, _) in events.iter() {
+				event.write(writer)?;
+			}
 		}
 
 		let background_events = self.pending_background_events.lock().unwrap();
@@ -7309,6 +7420,7 @@
 			(5, self.our_network_pubkey, required),
 			(6, monitor_update_blocked_actions_per_peer, option),
 			(7, self.fake_scid_rand_bytes, required),
+			(8, if events_not_backwards_compatible { Some(&*events) } else { None }, option),
 			(9, htlc_purposes, vec_type),
 			(11, self.probing_cookie_secret, required),
 			(13, htlc_onion_fields, optional_vec),
@@ -7318,6 +7430,47 @@
 	}
 }
 
+impl Writeable for VecDeque<(Event, Option<EventCompletionAction>)> {
+	fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
+		(self.len() as u64).write(w)?;
+		for (event, action) in self.iter() {
+			event.write(w)?;
+			action.write(w)?;
+			#[cfg(debug_assertions)] {
+				// Events are MaybeReadable, in some cases indicating that they shouldn't actually
+				// be persisted and are regenerated on restart. However, if such an event has a
+				// post-event-handling action we'll write nothing for the event and would have to
+				// either forget the action or fail on deserialization (which we do below). Thus,
+				// check that the event is sane here.
+				let event_encoded = event.encode();
+				let event_read: Option<Event> =
+					MaybeReadable::read(&mut &event_encoded[..]).unwrap();
+				if action.is_some() { assert!(event_read.is_some()); }
+			}
+		}
+		Ok(())
+	}
+}
+impl Readable for VecDeque<(Event, Option<EventCompletionAction>)> {
+	fn read<R: Read>(reader: &mut R) -> Result<Self, DecodeError> {
+		let len: u64 = Readable::read(reader)?;
+		const MAX_ALLOC_SIZE: u64 = 1024 * 16;
+		let mut events: Self = VecDeque::with_capacity(cmp::min(
+			MAX_ALLOC_SIZE/mem::size_of::<(events::Event, Option<EventCompletionAction>)>() as u64,
+			len) as usize);
+		for _ in 0..len {
+			let ev_opt = MaybeReadable::read(reader)?;
+			let action = Readable::read(reader)?;
+			if let Some(ev) = ev_opt {
+				events.push_back((ev, action));
+			} else if action.is_some() {
+				return Err(DecodeError::InvalidValue);
+			}
+		}
+		Ok(events)
+	}
+}
+
 /// Arguments for the creation of a ChannelManager that are not deserialized.
 ///
 /// At a high-level, the process for deserializing a ChannelManager and resuming normal operation
@@ -7484,7 +7637,7 @@ where
 		let mut peer_channels: HashMap<PublicKey, HashMap<[u8; 32], Channel<<SP::Target as SignerProvider>::Signer>>> = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
 		let mut id_to_peer = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
 		let mut short_to_chan_info = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
-		let mut channel_closures = Vec::new();
+		let mut channel_closures = VecDeque::new();
 		let mut pending_background_events = Vec::new();
 		for _ in 0..channel_count {
 			let mut channel: Channel<<SP::Target as SignerProvider>::Signer> = Channel::read(reader, (
@@ -7493,14 +7646,11 @@ where
 			let funding_txo = channel.get_funding_txo().ok_or(DecodeError::InvalidValue)?;
 			funding_txo_set.insert(funding_txo.clone());
 			if let Some(ref mut monitor) = args.channel_monitors.get_mut(&funding_txo) {
-				if channel.get_cur_holder_commitment_transaction_number() < monitor.get_cur_holder_commitment_number() ||
-						channel.get_revoked_counterparty_commitment_transaction_number() < monitor.get_min_seen_secret() ||
-						channel.get_cur_counterparty_commitment_transaction_number() < monitor.get_cur_counterparty_commitment_number() ||
-						channel.get_latest_monitor_update_id() > monitor.get_latest_update_id() {
+				if channel.get_latest_complete_monitor_update_id() > monitor.get_latest_update_id() {
 					// If the channel is ahead of the monitor, return InvalidValue:
 					log_error!(args.logger, "A ChannelMonitor is stale compared to the current ChannelManager! This indicates a potentially-critical violation of the chain::Watch API!");
 					log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} but the ChannelManager is at update_id {}.",
-						log_bytes!(channel.channel_id()), monitor.get_latest_update_id(), channel.get_latest_monitor_update_id());
+						log_bytes!(channel.channel_id()), monitor.get_latest_update_id(), channel.get_latest_complete_monitor_update_id());
 					log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,");
 					log_error!(args.logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!");
 					log_error!(args.logger, " Without the latest ChannelMonitor we cannot continue without risking funds.");
@@ -7520,11 +7670,11 @@ where
 						pending_background_events.push(BackgroundEvent::ClosingMonitorUpdate(monitor_update));
 					}
 					failed_htlcs.append(&mut new_failed_htlcs);
-					channel_closures.push(events::Event::ChannelClosed {
+					channel_closures.push_back((events::Event::ChannelClosed {
 						channel_id: channel.channel_id(),
 						user_channel_id: channel.get_user_id(),
 						reason: ClosureReason::OutdatedChannelManager
-					});
+					}, None));
 					for (channel_htlc_source, payment_hash) in channel.inflight_htlc_sources() {
 						let mut found_htlc = false;
 						for (monitor_htlc_source, _) in monitor.get_all_current_outbound_htlcs() {
@@ -7569,11 +7719,11 @@ where
 				// was in-progress, we never broadcasted the funding transaction and can still
 				// safely discard the channel.
 				let _ = channel.force_shutdown(false);
-				channel_closures.push(events::Event::ChannelClosed {
+				channel_closures.push_back((events::Event::ChannelClosed {
 					channel_id: channel.channel_id(),
 					user_channel_id: channel.get_user_id(),
 					reason: ClosureReason::DisconnectedPeer,
-				});
+				}, None));
 			} else {
 				log_error!(args.logger, "Missing ChannelMonitor for channel {} needed by ChannelManager.", log_bytes!(channel.channel_id()));
 				log_error!(args.logger, " The chain::Watch API *requires* that monitors are persisted durably before returning,");
@@ -7634,10 +7784,11 @@ where
 		}
 
 		let event_count: u64 = Readable::read(reader)?;
-		let mut pending_events_read: Vec<events::Event> = Vec::with_capacity(cmp::min(event_count as usize, MAX_ALLOC_SIZE/mem::size_of::<events::Event>()));
+		let mut pending_events_read: VecDeque<(events::Event, Option<EventCompletionAction>)> =
+			VecDeque::with_capacity(cmp::min(event_count as usize, MAX_ALLOC_SIZE/mem::size_of::<(events::Event, Option<EventCompletionAction>)>()));
 		for _ in 0..event_count {
 			match MaybeReadable::read(reader)? {
-				Some(event) => pending_events_read.push(event),
+				Some(event) => pending_events_read.push_back((event, None)),
 				None => continue,
 			}
 		}
@@ -7693,6 +7844,7 @@ where
 		let mut claimable_htlc_onion_fields = None;
 		let mut pending_claiming_payments = Some(HashMap::new());
 		let mut monitor_update_blocked_actions_per_peer = Some(Vec::new());
+		let mut events_override = None;
 		read_tlv_fields!(reader, {
 			(1, pending_outbound_payments_no_retry, option),
 			(2, pending_intercepted_htlcs, option),
@@ -7701,6 +7853,7 @@ where
 			(5, received_network_pubkey, option),
 			(6, monitor_update_blocked_actions_per_peer, option),
 			(7, fake_scid_rand_bytes, option),
+			(8, events_override, option),
 			(9, claimable_htlc_purposes, vec_type),
 			(11, probing_cookie_secret, option),
 			(13, claimable_htlc_onion_fields, optional_vec),
@@ -7713,6 +7866,10 @@ where
 			probing_cookie_secret = Some(args.entropy_source.get_secure_random_bytes());
 		}
 
+		if let Some(events) = events_override {
+			pending_events_read = events;
+		}
+
 		if !channel_closures.is_empty() {
 			pending_events_read.append(&mut channel_closures);
 		}
@@ -7808,7 +7965,7 @@ where
 							if pending_forward_matches_htlc(&htlc_info) {
 								log_info!(args.logger, "Removing pending intercepted HTLC with hash {} as it was forwarded to the closed channel {}",
 									log_bytes!(htlc.payment_hash.0), log_bytes!(monitor.get_funding_txo().0.to_channel_id()));
-								pending_events_read.retain(|event| {
+								pending_events_read.retain(|(event, _)| {
 									if let Event::HTLCIntercepted { intercept_id: ev_id, .. } = event {
 										intercepted_id != ev_id
 									} else { true }
@@ -7844,9 +8001,9 @@ where
 			// shut down before the timer hit. Either way, set the time_forwardable to a small
 			// constant as enough time has likely passed that we should simply handle the forwards
 			// now, or at least after the user gets a chance to reconnect to our peers.
-			pending_events_read.push(events::Event::PendingHTLCsForwardable {
+			pending_events_read.push_back((events::Event::PendingHTLCsForwardable {
 				time_forwardable: Duration::from_secs(2),
-			});
+			}, None));
 		}
 
 		let inbound_pmt_key_material = args.node_signer.get_inbound_payment_key_material();
@@ -8000,12 +8157,12 @@ where
 							previous_hop_monitor.provide_payment_preimage(&payment_hash, &payment_preimage, &args.tx_broadcaster, &bounded_fee_estimator, &args.logger);
 						}
 					}
-					pending_events_read.push(events::Event::PaymentClaimed {
+					pending_events_read.push_back((events::Event::PaymentClaimed {
 						receiver_node_id,
 						payment_hash,
 						purpose: payment.purpose,
 						amount_msat: claimable_amt_msat,
-					});
+					}, None));
 				}
 			}
 		}
diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs
index 2b8c8b8e341..8d08be05701 100644
--- a/lightning/src/ln/functional_tests.rs
+++ b/lightning/src/ln/functional_tests.rs
@@ -8449,7 +8449,7 @@ fn test_update_err_monitor_lockdown() {
 	let mut node_0_per_peer_lock;
 	let mut node_0_peer_state_lock;
 	let mut channel = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan_1.2);
-	if let Ok(update) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
+	if let Ok(Some(update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
 		assert_eq!(watchtower.chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::PermanentFailure);
 		assert_eq!(nodes[0].chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::Completed);
 	} else { assert!(false); }
@@ -8549,7 +8549,7 @@ fn test_concurrent_monitor_claim() {
 	let mut node_0_per_peer_lock;
 	let mut node_0_peer_state_lock;
 	let mut channel = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan_1.2);
-	if let Ok(update) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
+	if let Ok(Some(update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
 		// Watchtower Alice should already have seen the block and reject the update
 		assert_eq!(watchtower_alice.chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::PermanentFailure);
 		assert_eq!(watchtower_bob.chain_monitor.update_channel(outpoint, &update), ChannelMonitorUpdateStatus::Completed);
diff --git a/lightning/src/ln/outbound_payment.rs b/lightning/src/ln/outbound_payment.rs
index 5270ed35d88..f363a6d60ff 100644
--- a/lightning/src/ln/outbound_payment.rs
+++ b/lightning/src/ln/outbound_payment.rs
@@ -16,7 +16,7 @@ use bitcoin::secp256k1::{self, Secp256k1, SecretKey};
 use crate::chain::keysinterface::{EntropySource, NodeSigner, Recipient};
 use crate::events::{self, PaymentFailureReason};
 use crate::ln::{PaymentHash, PaymentPreimage, PaymentSecret};
-use crate::ln::channelmanager::{ChannelDetails, HTLCSource, IDEMPOTENCY_TIMEOUT_TICKS, PaymentId};
+use crate::ln::channelmanager::{ChannelDetails, EventCompletionAction, HTLCSource, IDEMPOTENCY_TIMEOUT_TICKS, PaymentId};
 use crate::ln::onion_utils::HTLCFailReason;
 use crate::routing::router::{InFlightHtlcs, Path, PaymentParameters, Route, RouteParameters, Router};
 use crate::util::errors::APIError;
@@ -487,7 +487,7 @@ impl OutboundPayments {
 		retry_strategy: Retry, route_params: RouteParameters, router: &R,
 		first_hops: Vec<ChannelDetails>, compute_inflight_htlcs: IH, entropy_source: &ES,
 		node_signer: &NS, best_block_height: u32, logger: &L,
-		pending_events: &Mutex<Vec<events::Event>>, send_payment_along_path: SP,
+		pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>, send_payment_along_path: SP,
 	) -> Result<(), RetryableSendFailure>
 	where
 		R::Target: Router,
@@ -525,7 +525,7 @@ impl OutboundPayments {
 		payment_id: PaymentId, retry_strategy: Retry, route_params: RouteParameters, router: &R,
 		first_hops: Vec<ChannelDetails>, inflight_htlcs: IH, entropy_source: &ES,
 		node_signer: &NS, best_block_height: u32, logger: &L,
-		pending_events: &Mutex<Vec<events::Event>>, send_payment_along_path: SP
+		pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>, send_payment_along_path: SP
 	) -> Result<PaymentHash, PaymentSendFailure>
 	where
 		R::Target: Router,
@@ -575,7 +575,8 @@ impl OutboundPayments {
 	pub(super) fn check_retry_payments<R: Deref, ES: Deref, NS: Deref, SP, IH, FH, L: Deref>(
 		&self, router: &R, first_hops: FH, inflight_htlcs: IH, entropy_source: &ES, node_signer: &NS,
-		best_block_height: u32, pending_events: &Mutex<Vec<events::Event>>, logger: &L,
+		best_block_height: u32,
+		pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>, logger: &L,
 		send_payment_along_path: SP,
 	)
 	where
@@ -617,11 +618,11 @@ impl OutboundPayments {
 				if !pmt.is_auto_retryable_now() && pmt.remaining_parts() == 0 {
 					pmt.mark_abandoned(PaymentFailureReason::RetriesExhausted);
 					if let PendingOutboundPayment::Abandoned { payment_hash, reason, .. } = pmt {
-						pending_events.lock().unwrap().push(events::Event::PaymentFailed {
+						pending_events.lock().unwrap().push_back((events::Event::PaymentFailed {
 							payment_id: *pmt_id,
 							payment_hash: *payment_hash,
 							reason: *reason,
-						});
+						}, None));
 						retain = false;
 					}
 				}
@@ -645,7 +646,7 @@ impl OutboundPayments {
 		keysend_preimage: Option<PaymentPreimage>, retry_strategy: Retry, route_params: RouteParameters,
 		router: &R, first_hops: Vec<ChannelDetails>, inflight_htlcs: IH, entropy_source: &ES,
 		node_signer: &NS, best_block_height: u32, logger: &L,
-		pending_events: &Mutex<Vec<events::Event>>, send_payment_along_path: SP,
+		pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>, send_payment_along_path: SP,
 	) -> Result<(), RetryableSendFailure>
 	where
 		R::Target: Router,
@@ -686,7 +687,7 @@ impl OutboundPayments {
 		&self, payment_hash: PaymentHash, payment_id: PaymentId, route_params: RouteParameters,
 		router: &R, first_hops: Vec<ChannelDetails>, inflight_htlcs: &IH, entropy_source: &ES,
 		node_signer: &NS, best_block_height: u32, logger: &L,
-		pending_events: &Mutex<Vec<events::Event>>, send_payment_along_path: &SP,
+		pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>, send_payment_along_path: &SP,
 	)
 	where
 		R::Target: Router,
@@ -736,11 +737,11 @@
 				$payment.get_mut().mark_abandoned($reason);
 				if let PendingOutboundPayment::Abandoned { reason, .. } = $payment.get() {
 					if $payment.get().remaining_parts() == 0 {
-						pending_events.lock().unwrap().push(events::Event::PaymentFailed {
+						pending_events.lock().unwrap().push_back((events::Event::PaymentFailed {
 							payment_id,
 							payment_hash,
 							reason: *reason,
-						});
+						}, None));
 						$payment.remove();
 					}
 				}
@@ -808,7 +809,7 @@ impl OutboundPayments {
 		&self, err: PaymentSendFailure, payment_id: PaymentId, payment_hash: PaymentHash, route: Route,
 		mut route_params: RouteParameters, router: &R, first_hops: Vec<ChannelDetails>,
 		inflight_htlcs: &IH, entropy_source: &ES, node_signer: &NS, best_block_height: u32, logger: &L,
-		pending_events: &Mutex<Vec<events::Event>>, send_payment_along_path: &SP,
+		pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>, send_payment_along_path: &SP,
 	)
 	where
 		R::Target: Router,
@@ -851,7 +852,8 @@ impl OutboundPayments {
 
 	fn push_path_failed_evs_and_scids<I: ExactSizeIterator + Iterator<Item = Result<(), APIError>>, L: Deref>(
 		payment_id: PaymentId, payment_hash: PaymentHash, route_params: &mut RouteParameters,
-		paths: Vec<Path>, path_results: I, logger: &L, pending_events: &Mutex<Vec<events::Event>>
+		paths: Vec<Path>, path_results: I, logger: &L,
+		pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>,
 	) where L::Target: Logger {
 		let mut events = pending_events.lock().unwrap();
 		debug_assert_eq!(paths.len(), path_results.len());
@@ -865,7 +867,7 @@ impl OutboundPayments {
 					failed_scid = Some(scid);
 					route_params.payment_params.previously_failed_channels.push(scid);
 				}
-				events.push(events::Event::PaymentPathFailed {
+				events.push_back((events::Event::PaymentPathFailed {
 					payment_id: Some(payment_id),
 					payment_hash,
 					payment_failed_permanently: false,
@@ -876,7 +878,7 @@ impl OutboundPayments {
 					error_code: None,
 					#[cfg(test)]
 					error_data: None,
-				});
+				}, None));
 			}
 		}
 	}
@@ -1112,7 +1114,9 @@ impl OutboundPayments {
 	pub(super) fn claim_htlc<L: Deref>(
 		&self, payment_id: PaymentId, payment_preimage: PaymentPreimage, session_priv: SecretKey,
-		path: Path, from_onchain: bool, pending_events: &Mutex<Vec<events::Event>>, logger: &L
+		path: Path, from_onchain: bool,
+		pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>,
+		logger: &L,
 	) where L::Target: Logger {
 		let mut session_priv_bytes = [0; 32];
 		session_priv_bytes.copy_from_slice(&session_priv[..]);
@@ -1122,14 +1126,12 @@ impl OutboundPayments {
 			if !payment.get().is_fulfilled() {
 				let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner());
 				let fee_paid_msat = payment.get().get_pending_fee_msat();
-				pending_events.push(
-					events::Event::PaymentSent {
-						payment_id: Some(payment_id),
-						payment_preimage,
-						payment_hash,
-						fee_paid_msat,
-					}
-				);
+				pending_events.push_back((events::Event::PaymentSent {
+					payment_id: Some(payment_id),
+					payment_preimage,
+					payment_hash,
+					fee_paid_msat,
+				}, None));
 				payment.get_mut().mark_fulfilled();
 			}
 
@@ -1142,13 +1144,11 @@
 			// irrevocably fulfilled.
 			if payment.get_mut().remove(&session_priv_bytes, Some(&path)) {
 				let payment_hash = Some(PaymentHash(Sha256::hash(&payment_preimage.0).into_inner()));
-				pending_events.push(
-					events::Event::PaymentPathSuccessful {
-						payment_id,
-						payment_hash,
-						path,
-					}
-				);
+				pending_events.push_back((events::Event::PaymentPathSuccessful {
+					payment_id,
+					payment_hash,
+					path,
+				}, None));
 			}
 		}
 	} else {
@@ -1156,7 +1156,9 @@ impl OutboundPayments {
 		}
 	}
 
-	pub(super) fn finalize_claims(&self, sources: Vec<HTLCSource>, pending_events: &Mutex<Vec<events::Event>>) {
+	pub(super) fn finalize_claims(&self, sources: Vec<HTLCSource>,
+		pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>)
+	{
 		let mut outbounds = self.pending_outbound_payments.lock().unwrap();
 		let mut pending_events = pending_events.lock().unwrap();
 		for source in sources {
@@ -1166,20 +1168,20 @@ impl OutboundPayments {
 			if let hash_map::Entry::Occupied(mut payment) = outbounds.entry(payment_id) {
 				assert!(payment.get().is_fulfilled());
 				if payment.get_mut().remove(&session_priv_bytes, None) {
-					pending_events.push(
-						events::Event::PaymentPathSuccessful {
-							payment_id,
-							payment_hash: payment.get().payment_hash(),
-							path,
-						}
-					);
+					pending_events.push_back((events::Event::PaymentPathSuccessful {
+						payment_id,
+						payment_hash: payment.get().payment_hash(),
+						path,
+					}, None));
 				}
 			}
 		}
 	}
 
-	pub(super) fn remove_stale_resolved_payments(&self, pending_events: &Mutex<Vec<events::Event>>) {
+	pub(super) fn remove_stale_resolved_payments(&self,
+		pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>)
+	{
 		// If an outbound payment was completed, and no pending HTLCs remain, we should remove it
 		// from the map. However, if we did that immediately when the last payment HTLC is claimed,
 		// this could race the user making a duplicate send_payment call and our idempotency
@@ -1193,7 +1195,7 @@ impl OutboundPayments {
 			if let PendingOutboundPayment::Fulfilled { session_privs, timer_ticks_without_htlcs, .. } = payment {
 				let mut no_remaining_entries = session_privs.is_empty();
 				if no_remaining_entries {
-					for ev in pending_events.iter() {
+					for (ev, _) in pending_events.iter() {
 						match ev {
 							events::Event::PaymentSent { payment_id: Some(ev_payment_id), .. } |
 								events::Event::PaymentPathSuccessful { payment_id: ev_payment_id, .. } |
@@ -1221,8 +1223,9 @@ impl OutboundPayments {
 	// Returns a bool indicating whether a PendingHTLCsForwardable event should be generated.
 	pub(super) fn fail_htlc<L: Deref>(
 		&self, source: &HTLCSource, payment_hash: &PaymentHash, onion_error: &HTLCFailReason,
-		path: &Path, session_priv: &SecretKey, payment_id: &PaymentId, probing_cookie_secret: [u8; 32],
-		secp_ctx: &Secp256k1<secp256k1::All>, pending_events: &Mutex<Vec<events::Event>>, logger: &L
+		path: &Path, session_priv: &SecretKey, payment_id: &PaymentId,
+		probing_cookie_secret: [u8; 32], secp_ctx: &Secp256k1<secp256k1::All>,
+		pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>, logger: &L,
 	) -> bool where L::Target: Logger {
 		#[cfg(test)]
 		let (network_update, short_channel_id, payment_retryable, onion_error_code, onion_error_data) = onion_error.decode_onion_failure(secp_ctx, logger, &source);
@@ -1334,24 +1337,25 @@ impl OutboundPayments {
 			}
 		};
 		let mut pending_events = pending_events.lock().unwrap();
-		pending_events.push(path_failure);
-		if let Some(ev) = full_failure_ev { pending_events.push(ev); }
+		pending_events.push_back((path_failure, None));
+		if let Some(ev) = full_failure_ev { pending_events.push_back((ev, None)); }
 		pending_retry_ev
 	}
 
 	pub(super) fn abandon_payment(
-		&self, payment_id: PaymentId, reason: PaymentFailureReason, pending_events: &Mutex<Vec<events::Event>>
+		&self, payment_id: PaymentId, reason: PaymentFailureReason,
+		pending_events: &Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>
 	) {
 		let mut outbounds = self.pending_outbound_payments.lock().unwrap();
 		if let hash_map::Entry::Occupied(mut payment) = outbounds.entry(payment_id) {
 			payment.get_mut().mark_abandoned(reason);
 			if let PendingOutboundPayment::Abandoned { payment_hash, reason, .. } = payment.get() {
 				if payment.get().remaining_parts() == 0 {
-					pending_events.lock().unwrap().push(events::Event::PaymentFailed {
+					pending_events.lock().unwrap().push_back((events::Event::PaymentFailed {
 						payment_id, payment_hash: *payment_hash, reason: *reason,
-					});
+					}, None));
 					payment.remove();
 				}
 			}
@@ -1435,6 +1439,8 @@ mod tests {
 	use crate::util::errors::APIError;
 	use crate::util::test_utils;
 
+	use alloc::collections::VecDeque;
+
 	#[test]
 	#[cfg(feature = "std")]
 	fn fails_paying_after_expiration() {
@@ -1460,7 +1466,7 @@ mod tests {
 			payment_params,
 			final_value_msat: 0,
 		};
-		let pending_events = Mutex::new(Vec::new());
+		let pending_events = Mutex::new(VecDeque::new());
 		if on_retry {
 			outbound_payments.add_new_pending_payment(PaymentHash([0; 32]), RecipientOnionFields::spontaneous_empty(),
 				PaymentId([0; 32]), None, &Route { paths: vec![], payment_params: None },
@@ -1472,7 +1478,7 @@ mod tests {
 				&pending_events, &|_, _, _, _, _, _, _, _| Ok(()));
 			let events = pending_events.lock().unwrap();
 			assert_eq!(events.len(), 1);
-			if let Event::PaymentFailed { ref reason, .. } = events[0] {
+			if let Event::PaymentFailed { ref reason, .. } = events[0].0 {
 				assert_eq!(reason.unwrap(), PaymentFailureReason::PaymentExpired);
 			} else { panic!("Unexpected event"); }
 		} else {
@@ -1508,7 +1514,7 @@ mod tests {
 		router.expect_find_route(route_params.clone(),
 			Err(LightningError { err: String::new(), action: ErrorAction::IgnoreError }));
 
-		let pending_events = Mutex::new(Vec::new());
+		let pending_events = Mutex::new(VecDeque::new());
 		if on_retry {
 			outbound_payments.add_new_pending_payment(PaymentHash([0; 32]), RecipientOnionFields::spontaneous_empty(),
 				PaymentId([0; 32]), None, &Route { paths: vec![], payment_params: None },
@@ -1520,7 +1526,7 @@ mod tests {
 				&pending_events, &|_, _, _, _, _, _, _, _| Ok(()));
 			let events = pending_events.lock().unwrap();
 			assert_eq!(events.len(), 1);
-			if let Event::PaymentFailed { .. } = events[0] { } else { panic!("Unexpected event"); }
+			if let Event::PaymentFailed { .. } = events[0].0 { } else { panic!("Unexpected event"); }
 		} else {
 			let err = outbound_payments.send_payment(
 				PaymentHash([0; 32]), RecipientOnionFields::spontaneous_empty(), PaymentId([0; 32]),
@@ -1570,7 +1576,7 @@ mod tests {
 
 		// Ensure that a ChannelUnavailable error will result in blaming an scid in the
		// PaymentPathFailed event.
-		let pending_events = Mutex::new(Vec::new());
+		let pending_events = Mutex::new(VecDeque::new());
 		outbound_payments.send_payment(
 			PaymentHash([0; 32]), RecipientOnionFields::spontaneous_empty(), PaymentId([0; 32]),
 			Retry::Attempts(0), route_params.clone(), &&router, vec![], || InFlightHtlcs::new(),
@@ -1581,11 +1587,11 @@ mod tests {
 		assert_eq!(events.len(), 2);
 		if let Event::PaymentPathFailed {
 			short_channel_id,
-			failure: PathFailure::InitialSend { err: APIError::ChannelUnavailable { .. }}, .. } = events[0]
+			failure: PathFailure::InitialSend { err: APIError::ChannelUnavailable { .. }}, .. } = events[0].0
 		{
 			assert_eq!(short_channel_id, Some(failed_scid));
 		} else { panic!("Unexpected event"); }
-		if let Event::PaymentFailed { .. } = events[1] { } else { panic!("Unexpected event"); }
+		if let Event::PaymentFailed { .. } = events[1].0 { } else { panic!("Unexpected event"); }
 		events.clear();
 		core::mem::drop(events);
@@ -1608,10 +1614,10 @@ mod tests {
 		assert_eq!(events.len(), 2);
 		if let Event::PaymentPathFailed {
 			short_channel_id,
-			failure: PathFailure::InitialSend { err: APIError::APIMisuseError { .. }}, .. } = events[0]
+			failure: PathFailure::InitialSend { err: APIError::APIMisuseError { .. }}, .. } = events[0].0
 		{
 			assert_eq!(short_channel_id, None);
 		} else { panic!("Unexpected event"); }
-		if let Event::PaymentFailed { .. } = events[1] { } else { panic!("Unexpected event"); }
+		if let Event::PaymentFailed { .. } = events[1].0 { } else { panic!("Unexpected event"); }
 	}
 }