
Free holding cell on monitor-updating-restored when there's no upd #755

Closed
27 changes: 22 additions & 5 deletions lightning/src/ln/channel.rs
@@ -2605,7 +2605,10 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
/// Indicates that the latest ChannelMonitor update has been committed by the client
/// successfully and we should restore normal operation. Returns messages which should be sent
/// to the remote side.
pub fn monitor_updating_restored<L: Deref>(&mut self, logger: &L) -> (Option<msgs::RevokeAndACK>, Option<msgs::CommitmentUpdate>, RAACommitmentOrder, Vec<(PendingHTLCInfo, u64)>, Vec<(HTLCSource, PaymentHash, HTLCFailReason)>, bool, Option<msgs::FundingLocked>) where L::Target: Logger {
pub fn monitor_updating_restored<L: Deref>(&mut self, logger: &L) -> (
Option<msgs::RevokeAndACK>, Option<msgs::CommitmentUpdate>, RAACommitmentOrder, Option<ChannelMonitorUpdate>,
Vec<(PendingHTLCInfo, u64)>, Vec<(HTLCSource, PaymentHash, HTLCFailReason)>, Vec<(HTLCSource, PaymentHash)>,
bool, Option<msgs::FundingLocked>) where L::Target: Logger {
assert_eq!(self.channel_state & ChannelState::MonitorUpdateFailed as u32, ChannelState::MonitorUpdateFailed as u32);
self.channel_state &= !(ChannelState::MonitorUpdateFailed as u32);

@@ -2635,25 +2638,39 @@ impl<ChanSigner: ChannelKeys> Channel<ChanSigner> {
if self.channel_state & (ChannelState::PeerDisconnected as u32) != 0 {
self.monitor_pending_revoke_and_ack = false;
self.monitor_pending_commitment_signed = false;
return (None, None, RAACommitmentOrder::RevokeAndACKFirst, forwards, failures, needs_broadcast_safe, funding_locked);
return (None, None, RAACommitmentOrder::RevokeAndACKFirst, None, forwards, failures, Vec::new(), needs_broadcast_safe, funding_locked);
}

let raa = if self.monitor_pending_revoke_and_ack {
Some(self.get_last_revoke_and_ack())
} else { None };
let commitment_update = if self.monitor_pending_commitment_signed {
let mut commitment_update = if self.monitor_pending_commitment_signed {
Some(self.get_last_commitment_update(logger))
} else { None };

let mut order = self.resend_order.clone();
self.monitor_pending_revoke_and_ack = false;
self.monitor_pending_commitment_signed = false;
let order = self.resend_order.clone();

let mut htlcs_failed_to_forward = Vec::new();
let mut chanmon_update = None;
if commitment_update.is_none() && self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32) == 0 {
order = RAACommitmentOrder::RevokeAndACKFirst;

let (update_opt, mut failed_htlcs) = self.free_holding_cell_htlcs(logger).unwrap();
htlcs_failed_to_forward.append(&mut failed_htlcs);
if let Some((com_update, mon_update)) = update_opt {
commitment_update = Some(com_update);
chanmon_update = Some(mon_update);
}
}

log_trace!(logger, "Restored monitor updating resulting in {}{} commitment update and {} RAA, with {} first",
if needs_broadcast_safe { "a funding broadcast safe, " } else { "" },
if commitment_update.is_some() { "a" } else { "no" },
if raa.is_some() { "an" } else { "no" },
match order { RAACommitmentOrder::CommitmentFirst => "commitment", RAACommitmentOrder::RevokeAndACKFirst => "RAA"});
(raa, commitment_update, order, forwards, failures, needs_broadcast_safe, funding_locked)
(raa, commitment_update, order, chanmon_update, forwards, failures, htlcs_failed_to_forward, needs_broadcast_safe, funding_locked)
}

pub fn update_fee<F: Deref>(&mut self, fee_estimator: &F, msg: &msgs::UpdateFee) -> Result<(), ChannelError>
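For readers skimming the channel.rs hunks above: the doc comment and the widened return type describe the invariant this PR leans on — once the blocked monitor update is restored, if there is no commitment_signed to re-send and we are not awaiting the peer's revoke_and_ack, the holding cell can be freed immediately, the resulting commitment update is sent RAA-first, and it is paired with its own ChannelMonitorUpdate. A minimal, self-contained sketch of just that decision follows; every type and function in it (Order, RestoredState, plan_restore) is a simplified stand-in, not part of rust-lightning.

```rust
// Simplified stand-ins for the real lightning types; illustration only.
#[derive(Clone, Copy, PartialEq, Debug)]
enum Order { RevokeAndACKFirst, CommitmentFirst }

struct RestoredState {
    // True if we still owe the peer a re-sent commitment_signed.
    pending_commitment_signed: bool,
    // True if we are waiting on the peer's revoke_and_ack.
    awaiting_remote_revoke: bool,
    // Number of HTLC updates parked in the holding cell.
    holding_cell_len: usize,
}

// Decide whether the holding cell can be freed right away once the monitor
// update is restored, and which message must go out first.
fn plan_restore(state: &RestoredState, resend_order: Order) -> (bool, Order) {
    if !state.pending_commitment_signed && !state.awaiting_remote_revoke {
        // Nothing to re-send and the peer is not waiting on us: free the
        // holding cell now; the new commitment update goes out after our RAA.
        (state.holding_cell_len > 0, Order::RevokeAndACKFirst)
    } else {
        // Otherwise keep the previously recorded resend order untouched.
        (false, resend_order)
    }
}

fn main() {
    let state = RestoredState {
        pending_commitment_signed: false,
        awaiting_remote_revoke: false,
        holding_cell_len: 2,
    };
    let (free_now, order) = plan_restore(&state, Order::CommitmentFirst);
    assert!(free_now);
    assert_eq!(order, Order::RevokeAndACKFirst);
    println!("free holding cell: {}, send {:?} first", free_now, order);
}
```

The real monitor_updating_restored additionally returns the holding-cell HTLCs that failed while being freed (the new Vec<(HTLCSource, PaymentHash)> element) so the caller can fail them backwards, which is what the channelmanager.rs changes below do.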
74 changes: 46 additions & 28 deletions lightning/src/ln/channelmanager.rs
@@ -2198,6 +2198,10 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
/// ChannelMonitorUpdateErr::TemporaryFailures is fine. The highest_applied_update_id field
/// exists largely only to prevent races between this and concurrent update_monitor calls.
///
/// XXX: Update to note re-entrancy - this is really terrible - the reentrancy only happens in
/// a really rare case making it incredibly likely users will miss it and never hit it in
/// testing.
///
/// Thus, the anticipated use is, at a high level:
/// 1) You register a chain::Watch with this ChannelManager,
/// 2) it stores each update to disk, and begins updating any remote (eg watchtower) copies of
@@ -2209,42 +2213,53 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
pub fn channel_monitor_updated(&self, funding_txo: &OutPoint, highest_applied_update_id: u64) {
let _consistency_lock = self.total_consistency_lock.read().unwrap();

let mut close_results = Vec::new();
let mut htlc_forwards = Vec::new();
let mut htlc_failures = Vec::new();
let mut pending_events = Vec::new();
let mut htlc_forwards = None;
let mut htlc_failures;
let htlc_forwarding_failures;
let mut pending_event = None;

{
let (counterparty_node_id, res) = loop {
let mut channel_lock = self.channel_state.lock().unwrap();
let channel_state = &mut *channel_lock;
let short_to_id = &mut channel_state.short_to_id;
let pending_msg_events = &mut channel_state.pending_msg_events;
let channel = match channel_state.by_id.get_mut(&funding_txo.to_channel_id()) {
Some(chan) => chan,
None => return,
let mut channel = match channel_state.by_id.entry(funding_txo.to_channel_id()) {
hash_map::Entry::Vacant(_) => return,
hash_map::Entry::Occupied(e) => e,
};
if !channel.is_awaiting_monitor_update() || channel.get_latest_monitor_update_id() != highest_applied_update_id {
if !channel.get().is_awaiting_monitor_update() || channel.get().get_latest_monitor_update_id() != highest_applied_update_id {
return;
}
let counterparty_node_id = channel.get().get_counterparty_node_id();

let (raa, commitment_update, order, pending_forwards, mut pending_failures, needs_broadcast_safe, funding_locked) = channel.monitor_updating_restored(&self.logger);
let (raa, commitment_update, order, chanmon_update, pending_forwards, pending_failures, forwarding_failures, needs_broadcast_safe, funding_locked) = channel.get_mut().monitor_updating_restored(&self.logger);
if !pending_forwards.is_empty() {
htlc_forwards.push((channel.get_short_channel_id().expect("We can't have pending forwards before funding confirmation"), funding_txo.clone(), pending_forwards));
htlc_forwards = Some((channel.get().get_short_channel_id().expect("We can't have pending forwards before funding confirmation"), funding_txo.clone(), pending_forwards));
}
htlc_failures.append(&mut pending_failures);
htlc_failures = pending_failures;
htlc_forwarding_failures = forwarding_failures;

macro_rules! handle_cs { () => {
if let Some(monitor_update) = chanmon_update {
assert!(order == RAACommitmentOrder::RevokeAndACKFirst);
assert!(!needs_broadcast_safe);
assert!(funding_locked.is_none());
assert!(commitment_update.is_some());
if let Err(e) = self.chain_monitor.update_channel(*funding_txo, monitor_update) {
break (counterparty_node_id,
handle_monitor_err!(self, e, channel_state, channel, RAACommitmentOrder::CommitmentFirst, false, true));
}
}
if let Some(update) = commitment_update {
pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
node_id: channel.get_counterparty_node_id(),
node_id: counterparty_node_id,
updates: update,
});
}
} }
macro_rules! handle_raa { () => {
if let Some(revoke_and_ack) = raa {
pending_msg_events.push(events::MessageSendEvent::SendRevokeAndACK {
node_id: channel.get_counterparty_node_id(),
node_id: counterparty_node_id,
msg: revoke_and_ack,
});
}
@@ -2260,35 +2275,38 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
},
}
if needs_broadcast_safe {
pending_events.push(events::Event::FundingBroadcastSafe {
funding_txo: channel.get_funding_txo().unwrap(),
user_channel_id: channel.get_user_id(),
pending_event = Some(events::Event::FundingBroadcastSafe {
funding_txo: channel.get().get_funding_txo().unwrap(),
user_channel_id: channel.get().get_user_id(),
});
}
if let Some(msg) = funding_locked {
pending_msg_events.push(events::MessageSendEvent::SendFundingLocked {
node_id: channel.get_counterparty_node_id(),
node_id: counterparty_node_id,
msg,
});
if let Some(announcement_sigs) = self.get_announcement_sigs(channel) {
if let Some(announcement_sigs) = self.get_announcement_sigs(channel.get()) {
pending_msg_events.push(events::MessageSendEvent::SendAnnouncementSignatures {
node_id: channel.get_counterparty_node_id(),
node_id: counterparty_node_id,
msg: announcement_sigs,
});
}
short_to_id.insert(channel.get_short_channel_id().unwrap(), channel.channel_id());
channel_state.short_to_id.insert(channel.get().get_short_channel_id().unwrap(), channel.get().channel_id());
}
}
break (counterparty_node_id, Ok(()));
};
let _ = handle_error!(self, res, counterparty_node_id);

self.pending_events.lock().unwrap().append(&mut pending_events);
if let Some(ev) = pending_event {
self.pending_events.lock().unwrap().push(ev);
}

self.fail_holding_cell_htlcs(htlc_forwarding_failures, funding_txo.to_channel_id());
for failure in htlc_failures.drain(..) {
self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2);
}
self.forward_htlcs(&mut htlc_forwards[..]);

for res in close_results.drain(..) {
self.finish_force_close_channel(res);
if let Some(forwards) = htlc_forwards {
self.forward_htlcs(&mut [forwards][..]);
}
}

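The doc-comment change near the top of the channelmanager.rs diff (including the XXX re-entrancy note) concerns the persister flow: a chain::Watch implementation that persists updates asynchronously returns ChannelMonitorUpdateErr::TemporaryFailure, and once the update is durably stored it calls channel_monitor_updated with the highest applied update_id. After this PR, that call may itself emit a further ChannelMonitorUpdate (from freeing the holding cell), i.e. it can re-enter the watcher. A rough sketch of that ordering, using stand-in types only (Update, UpdateErr, ManagerStub, WatcherStub are illustrative, not real rust-lightning APIs):

```rust
// Illustration only: stand-in types sketching the persister flow described in
// the channel_monitor_updated docs. Nothing here is a real rust-lightning API;
// the point is the ordering and the re-entrancy caveat.
use std::sync::Mutex;

struct Update { update_id: u64 }
enum UpdateErr { TemporaryFailure }

struct ManagerStub;
impl ManagerStub {
    // Stand-in for ChannelManager::channel_monitor_updated. After this PR it
    // may immediately produce another monitor update (re-entering the watcher)
    // because it can now free the channel's holding cell.
    fn channel_monitor_updated(&self, _funding_txo: (), _highest_applied_update_id: u64) {}
}

struct WatcherStub {
    // Updates written out but not yet confirmed durable.
    in_flight: Mutex<Vec<Update>>,
}

impl WatcherStub {
    // 1) Queue the update for persistence and report a temporary failure so
    //    the ChannelManager pauses the channel until persistence completes.
    fn update_channel(&self, update: Update) -> Result<(), UpdateErr> {
        self.in_flight.lock().unwrap().push(update);
        Err(UpdateErr::TemporaryFailure)
    }

    // 2) Once everything queued so far is durably stored, report the highest
    //    applied update_id back to the manager. The in_flight lock is released
    //    first: freeing the holding cell can re-enter update_channel on this
    //    same watcher.
    fn persistence_completed(&self, manager: &ManagerStub) {
        let highest = {
            let mut queued = self.in_flight.lock().unwrap();
            let highest = queued.iter().map(|u| u.update_id).max();
            queued.clear();
            highest
        };
        if let Some(update_id) = highest {
            manager.channel_monitor_updated((), update_id);
        }
    }
}

fn main() {
    let watcher = WatcherStub { in_flight: Mutex::new(Vec::new()) };
    let manager = ManagerStub;
    let _ = watcher.update_channel(Update { update_id: 1 });
    watcher.persistence_completed(&manager);
}
```

The possible re-entrancy is why the sketch drops its in_flight lock before calling channel_monitor_updated: holding a persister lock across that call could deadlock in exactly the rare case the XXX comment warns users are unlikely to hit in testing.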