Skip to content

Setup Support for delaying ChannelMonitorUpdate flight until an Event completes #2111

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lightning/src/ln/chanmon_update_fail_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ fn test_monitor_and_persister_update_fail() {
let mut node_0_per_peer_lock;
let mut node_0_peer_state_lock;
let mut channel = get_channel_ref!(nodes[0], nodes[1], node_0_per_peer_lock, node_0_peer_state_lock, chan.2);
if let Ok(update) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
if let Ok(Some(update)) = channel.commitment_signed(&updates.commitment_signed, &node_cfgs[0].logger) {
// Check that even though the persister is returning a InProgress,
// because the update is bogus, ultimately the error that's returned
// should be a PermanentFailure.
Expand Down
173 changes: 128 additions & 45 deletions lightning/src/ln/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,21 @@ pub(crate) const MIN_AFFORDABLE_HTLC_COUNT: usize = 4;
/// * `EXPIRE_PREV_CONFIG_TICKS` = convergence_delay / tick_interval
pub(crate) const EXPIRE_PREV_CONFIG_TICKS: usize = 5;

struct PendingChannelMonitorUpdate {
update: ChannelMonitorUpdate,
/// In some cases we need to delay letting the [`ChannelMonitorUpdate`] go until after an
/// `Event` is processed by the user. This bool indicates the [`ChannelMonitorUpdate`] is
/// blocked on some external event and the [`ChannelManager`] will update us when we're ready.
///
/// [`ChannelManager`]: super::channelmanager::ChannelManager
blocked: bool,
}

impl_writeable_tlv_based!(PendingChannelMonitorUpdate, {
(0, update, required),
(2, blocked, required),
});

// TODO: We should refactor this to be an Inbound/OutboundChannel until initial setup handshaking
// has been completed, and then turn into a Channel to get compiler-time enforcement of things like
// calling channel_id() before we're set up or things like get_outbound_funding_signed on an
Expand Down Expand Up @@ -744,7 +759,7 @@ pub(super) struct Channel<Signer: ChannelSigner> {
/// If we then persist the [`channelmanager::ChannelManager`] and crash before the persistence
/// completes we still need to be able to complete the persistence. Thus, we have to keep a
/// copy of the [`ChannelMonitorUpdate`] here until it is complete.
pending_monitor_updates: Vec<ChannelMonitorUpdate>,
pending_monitor_updates: Vec<PendingChannelMonitorUpdate>,
}

#[cfg(any(test, fuzzing))]
Expand Down Expand Up @@ -1979,28 +1994,52 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
}

pub fn get_update_fulfill_htlc_and_commit<L: Deref>(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> UpdateFulfillCommitFetch where L::Target: Logger {
let release_cs_monitor = self.pending_monitor_updates.iter().all(|upd| !upd.blocked);
match self.get_update_fulfill_htlc(htlc_id, payment_preimage, logger) {
UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg: Some(_) } => {
let mut additional_update = self.build_commitment_no_status_check(logger);
// build_commitment_no_status_check may bump latest_monitor_id but we want them to be
// strictly increasing by one, so decrement it here.
self.latest_monitor_update_id = monitor_update.update_id;
monitor_update.updates.append(&mut additional_update.updates);
self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
self.pending_monitor_updates.push(monitor_update);
UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg } => {
// Even if we aren't supposed to let new monitor updates with commitment state
// updates run, we still need to push the preimage ChannelMonitorUpdateStep no
// matter what. Sadly, to push a new monitor update which flies before others
// already queued, we have to insert it into the pending queue and update the
// update_ids of all the following monitors.
let unblocked_update_pos = if release_cs_monitor && msg.is_some() {
let mut additional_update = self.build_commitment_no_status_check(logger);
// build_commitment_no_status_check may bump latest_monitor_id but we want them
// to be strictly increasing by one, so decrement it here.
self.latest_monitor_update_id = monitor_update.update_id;
monitor_update.updates.append(&mut additional_update.updates);
self.pending_monitor_updates.push(PendingChannelMonitorUpdate {
update: monitor_update, blocked: false,
});
self.pending_monitor_updates.len() - 1
} else {
let insert_pos = self.pending_monitor_updates.iter().position(|upd| upd.blocked)
.unwrap_or(self.pending_monitor_updates.len());
let new_mon_id = self.pending_monitor_updates.get(insert_pos)
.map(|upd| upd.update.update_id).unwrap_or(monitor_update.update_id);
monitor_update.update_id = new_mon_id;
self.pending_monitor_updates.insert(insert_pos, PendingChannelMonitorUpdate {
update: monitor_update, blocked: false,
});
for held_update in self.pending_monitor_updates.iter_mut().skip(insert_pos + 1) {
held_update.update.update_id += 1;
}
if msg.is_some() {
debug_assert!(false, "If there is a pending blocked monitor we should have MonitorUpdateInProgress set");
let update = self.build_commitment_no_status_check(logger);
self.pending_monitor_updates.push(PendingChannelMonitorUpdate {
update, blocked: true,
});
}
insert_pos
};
self.monitor_updating_paused(false, msg.is_some(), false, Vec::new(), Vec::new(), Vec::new());
UpdateFulfillCommitFetch::NewClaim {
monitor_update: self.pending_monitor_updates.last().unwrap(),
monitor_update: &self.pending_monitor_updates.get(unblocked_update_pos)
.expect("We just pushed the monitor update").update,
htlc_value_msat,
}
},
UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None } => {
self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
self.pending_monitor_updates.push(monitor_update);
UpdateFulfillCommitFetch::NewClaim {
monitor_update: self.pending_monitor_updates.last().unwrap(),
htlc_value_msat,
}
}
UpdateFulfillFetch::DuplicateClaim {} => UpdateFulfillCommitFetch::DuplicateClaim {},
}
}
Expand Down Expand Up @@ -3068,7 +3107,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
Ok(())
}

pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<&ChannelMonitorUpdate, ChannelError>
pub fn commitment_signed<L: Deref>(&mut self, msg: &msgs::CommitmentSigned, logger: &L) -> Result<Option<&ChannelMonitorUpdate>, ChannelError>
where L::Target: Logger
{
if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
Expand Down Expand Up @@ -3268,8 +3307,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
}
log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updated HTLC state but awaiting a monitor update resolution to reply.",
log_bytes!(self.channel_id));
self.pending_monitor_updates.push(monitor_update);
return Ok(self.pending_monitor_updates.last().unwrap());
return Ok(self.push_ret_blockable_mon_update(monitor_update));
}

let need_commitment_signed = if need_commitment && (self.channel_state & (ChannelState::AwaitingRemoteRevoke as u32)) == 0 {
Expand All @@ -3286,9 +3324,8 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {

log_debug!(logger, "Received valid commitment_signed from peer in channel {}, updating HTLC state and responding with{} a revoke_and_ack.",
log_bytes!(self.channel_id()), if need_commitment_signed { " our own commitment_signed and" } else { "" });
self.pending_monitor_updates.push(monitor_update);
self.monitor_updating_paused(true, need_commitment_signed, false, Vec::new(), Vec::new(), Vec::new());
return Ok(self.pending_monitor_updates.last().unwrap());
return Ok(self.push_ret_blockable_mon_update(monitor_update));
}

/// Public version of the below, checking relevant preconditions first.
Expand Down Expand Up @@ -3403,8 +3440,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
update_add_htlcs.len(), update_fulfill_htlcs.len(), update_fail_htlcs.len());

self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
self.pending_monitor_updates.push(monitor_update);
(Some(self.pending_monitor_updates.last().unwrap()), htlcs_to_fail)
(self.push_ret_blockable_mon_update(monitor_update), htlcs_to_fail)
} else {
(None, Vec::new())
}
Expand All @@ -3415,7 +3451,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
/// waiting on this revoke_and_ack. The generation of this new commitment_signed may also fail,
/// generating an appropriate error *after* the channel state has been updated based on the
/// revoke_and_ack message.
pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, &ChannelMonitorUpdate), ChannelError>
pub fn revoke_and_ack<L: Deref>(&mut self, msg: &msgs::RevokeAndACK, logger: &L) -> Result<(Vec<(HTLCSource, PaymentHash)>, Option<&ChannelMonitorUpdate>), ChannelError>
where L::Target: Logger,
{
if (self.channel_state & (ChannelState::ChannelReady as u32)) != (ChannelState::ChannelReady as u32) {
Expand Down Expand Up @@ -3612,21 +3648,19 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
self.monitor_pending_failures.append(&mut revoked_htlcs);
self.monitor_pending_finalized_fulfills.append(&mut finalized_claimed_htlcs);
log_debug!(logger, "Received a valid revoke_and_ack for channel {} but awaiting a monitor update resolution to reply.", log_bytes!(self.channel_id()));
self.pending_monitor_updates.push(monitor_update);
return Ok((Vec::new(), self.pending_monitor_updates.last().unwrap()));
return Ok((Vec::new(), self.push_ret_blockable_mon_update(monitor_update)));
}

match self.free_holding_cell_htlcs(logger) {
(Some(_), htlcs_to_fail) => {
let mut additional_update = self.pending_monitor_updates.pop().unwrap();
let mut additional_update = self.pending_monitor_updates.pop().unwrap().update;
// free_holding_cell_htlcs may bump latest_monitor_id multiple times but we want them to be
// strictly increasing by one, so decrement it here.
self.latest_monitor_update_id = monitor_update.update_id;
monitor_update.updates.append(&mut additional_update.updates);

self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
self.pending_monitor_updates.push(monitor_update);
Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
Ok((htlcs_to_fail, self.push_ret_blockable_mon_update(monitor_update)))
},
(None, htlcs_to_fail) => {
if require_commitment {
Expand All @@ -3640,13 +3674,11 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
log_debug!(logger, "Received a valid revoke_and_ack for channel {}. Responding with a commitment update with {} HTLCs failed.",
log_bytes!(self.channel_id()), update_fail_htlcs.len() + update_fail_malformed_htlcs.len());
self.monitor_updating_paused(false, true, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
self.pending_monitor_updates.push(monitor_update);
Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
Ok((htlcs_to_fail, self.push_ret_blockable_mon_update(monitor_update)))
} else {
log_debug!(logger, "Received a valid revoke_and_ack for channel {} with no reply necessary.", log_bytes!(self.channel_id()));
self.monitor_updating_paused(false, false, false, to_forward_infos, revoked_htlcs, finalized_claimed_htlcs);
self.pending_monitor_updates.push(monitor_update);
Ok((htlcs_to_fail, self.pending_monitor_updates.last().unwrap()))
Ok((htlcs_to_fail, self.push_ret_blockable_mon_update(monitor_update)))
}
}
}
Expand Down Expand Up @@ -3835,7 +3867,12 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
{
assert_eq!(self.channel_state & ChannelState::MonitorUpdateInProgress as u32, ChannelState::MonitorUpdateInProgress as u32);
self.channel_state &= !(ChannelState::MonitorUpdateInProgress as u32);
self.pending_monitor_updates.clear();
let mut found_blocked = false;
self.pending_monitor_updates.retain(|upd| {
if found_blocked { debug_assert!(upd.blocked, "No mons may be unblocked after a blocked one"); }
if upd.blocked { found_blocked = true; }
upd.blocked
});

// If we're past (or at) the FundingSent stage on an outbound channel, try to
// (re-)broadcast the funding transaction as we may have declined to broadcast it when we
Expand Down Expand Up @@ -4378,8 +4415,9 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
}],
};
self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
self.pending_monitor_updates.push(monitor_update);
Some(self.pending_monitor_updates.last().unwrap())
if self.push_blockable_mon_update(monitor_update) {
self.pending_monitor_updates.last().map(|upd| &upd.update)
} else { None }
} else { None };
let shutdown = if send_shutdown {
Some(msgs::Shutdown {
Expand Down Expand Up @@ -4951,8 +4989,49 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
(self.channel_state & ChannelState::MonitorUpdateInProgress as u32) != 0
}

pub fn get_next_monitor_update(&self) -> Option<&ChannelMonitorUpdate> {
self.pending_monitor_updates.first()
pub fn get_latest_complete_monitor_update_id(&self) -> u64 {
if self.pending_monitor_updates.is_empty() { return self.get_latest_monitor_update_id(); }
self.pending_monitor_updates[0].update.update_id - 1
}

/// Returns the next blocked monitor update, if one exists, and a bool which indicates a
/// further blocked monitor update exists after the next.
pub fn unblock_next_blocked_monitor_update(&mut self) -> Option<(&ChannelMonitorUpdate, bool)> {
for i in 0..self.pending_monitor_updates.len() {
if self.pending_monitor_updates[i].blocked {
self.pending_monitor_updates[i].blocked = false;
return Some((&self.pending_monitor_updates[i].update,
self.pending_monitor_updates.len() > i + 1));
}
}
None
}

/// Pushes a new monitor update into our monitor update queue, returning whether it should be
/// immediately given to the user for persisting or if it should be held as blocked.
fn push_blockable_mon_update(&mut self, update: ChannelMonitorUpdate) -> bool {
let release_monitor = self.pending_monitor_updates.iter().all(|upd| !upd.blocked);
self.pending_monitor_updates.push(PendingChannelMonitorUpdate {
update, blocked: !release_monitor
});
release_monitor
}

/// Pushes a new monitor update into our monitor update queue, returning a reference to it if
/// it should be immediately given to the user for persisting or `None` if it should be held as
/// blocked.
fn push_ret_blockable_mon_update(&mut self, update: ChannelMonitorUpdate)
-> Option<&ChannelMonitorUpdate> {
let release_monitor = self.push_blockable_mon_update(update);
if release_monitor { self.pending_monitor_updates.last().map(|upd| &upd.update) } else { None }
}

pub fn no_monitor_updates_pending(&self) -> bool {
self.pending_monitor_updates.is_empty()
}

pub fn complete_one_mon_update(&mut self, update_id: u64) {
self.pending_monitor_updates.retain(|upd| upd.update.update_id != update_id);
}

/// Returns true if funding_created was sent/received.
Expand Down Expand Up @@ -6000,8 +6079,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
Some(_) => {
let monitor_update = self.build_commitment_no_status_check(logger);
self.monitor_updating_paused(false, true, false, Vec::new(), Vec::new(), Vec::new());
self.pending_monitor_updates.push(monitor_update);
Ok(Some(self.pending_monitor_updates.last().unwrap()))
Ok(self.push_ret_blockable_mon_update(monitor_update))
},
None => Ok(None)
}
Expand Down Expand Up @@ -6090,8 +6168,9 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
}],
};
self.monitor_updating_paused(false, false, false, Vec::new(), Vec::new(), Vec::new());
self.pending_monitor_updates.push(monitor_update);
Some(self.pending_monitor_updates.last().unwrap())
if self.push_blockable_mon_update(monitor_update) {
self.pending_monitor_updates.last().map(|upd| &upd.update)
} else { None }
} else { None };
let shutdown = msgs::Shutdown {
channel_id: self.channel_id,
Expand Down Expand Up @@ -6528,6 +6607,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Writeable for Channel<Signer> {
(28, holder_max_accepted_htlcs, option),
(29, self.temporary_channel_id, option),
(31, channel_pending_event_emitted, option),
(33, self.pending_monitor_updates, vec_type),
});

Ok(())
Expand Down Expand Up @@ -6804,6 +6884,8 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
let mut temporary_channel_id: Option<[u8; 32]> = None;
let mut holder_max_accepted_htlcs: Option<u16> = None;

let mut pending_monitor_updates = Some(Vec::new());

read_tlv_fields!(reader, {
(0, announcement_sigs, option),
(1, minimum_depth, option),
Expand All @@ -6826,6 +6908,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
(28, holder_max_accepted_htlcs, option),
(29, temporary_channel_id, option),
(31, channel_pending_event_emitted, option),
(33, pending_monitor_updates, vec_type),
});

let (channel_keys_id, holder_signer) = if let Some(channel_keys_id) = channel_keys_id {
Expand Down Expand Up @@ -6995,7 +7078,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
channel_type: channel_type.unwrap(),
channel_keys_id,

pending_monitor_updates: Vec::new(),
pending_monitor_updates: pending_monitor_updates.unwrap(),
})
}
}
Expand Down
Loading