Skip to content

Commit e0df118

Browse files
committed
Track payments after they resolve until all HTLCs are finalized
In the next commit, we will reload lost pending payments from ChannelMonitors during restart. However, in order to avoid re-adding pending payments which have already been fulfilled, we must ensure that we do not fully remove pending payments until all HTLCs for the payment have been fully removed from their ChannelMonitors. We do so here, introducing a new PendingOutboundPayment variant called `Completed` which only tracks the set of pending HTLCs.
1 parent b94d50b commit e0df118

File tree

1 file changed

+109
-16
lines changed

1 file changed

+109
-16
lines changed

lightning/src/ln/channelmanager.rs

Lines changed: 109 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -416,13 +416,44 @@ pub(crate) enum PendingOutboundPayment {
416416
/// Our best known block height at the time this payment was initiated.
417417
starting_block_height: u32,
418418
},
419+
/// When a payment completes, we continue tracking it until all pendings HTLCs have been
420+
/// resolved. This ensures we don't look up pending payments in ChannelMonitors on restart and
421+
/// add a pending payment that was already completed.
422+
Completed {
423+
session_privs: HashSet<[u8; 32]>,
424+
},
419425
}
420426

421427
impl PendingOutboundPayment {
428+
fn is_retryable(&self) -> bool {
429+
match self {
430+
PendingOutboundPayment::Retryable { .. } => true,
431+
_ => false,
432+
}
433+
}
434+
fn is_completed(&self) -> bool {
435+
match self {
436+
PendingOutboundPayment::Completed { .. } => true,
437+
_ => false,
438+
}
439+
}
440+
441+
fn mark_completed(&mut self) {
442+
let mut session_privs = HashSet::new();
443+
core::mem::swap(&mut session_privs, match self {
444+
PendingOutboundPayment::Legacy { session_privs } |
445+
PendingOutboundPayment::Retryable { session_privs, .. } |
446+
PendingOutboundPayment::Completed { session_privs }
447+
=> session_privs
448+
});
449+
*self = PendingOutboundPayment::Completed { session_privs };
450+
}
451+
422452
fn remove(&mut self, session_priv: &[u8; 32], part_amt_msat: u64) -> bool {
423453
let remove_res = match self {
424454
PendingOutboundPayment::Legacy { session_privs } |
425-
PendingOutboundPayment::Retryable { session_privs, .. } => {
455+
PendingOutboundPayment::Retryable { session_privs, .. } |
456+
PendingOutboundPayment::Completed { session_privs } => {
426457
session_privs.remove(session_priv)
427458
}
428459
};
@@ -440,6 +471,7 @@ impl PendingOutboundPayment {
440471
PendingOutboundPayment::Retryable { session_privs, .. } => {
441472
session_privs.insert(session_priv)
442473
}
474+
PendingOutboundPayment::Completed { .. } => false
443475
};
444476
if insert_res {
445477
if let PendingOutboundPayment::Retryable { ref mut pending_amt_msat, .. } = self {
@@ -452,7 +484,8 @@ impl PendingOutboundPayment {
452484
fn remaining_parts(&self) -> usize {
453485
match self {
454486
PendingOutboundPayment::Legacy { session_privs } |
455-
PendingOutboundPayment::Retryable { session_privs, .. } => {
487+
PendingOutboundPayment::Retryable { session_privs, .. } |
488+
PendingOutboundPayment::Completed { session_privs } => {
456489
session_privs.len()
457490
}
458491
}
@@ -1961,6 +1994,15 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
19611994
let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, payment_hash);
19621995

19631996
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
1997+
let mut pending_outbounds = self.pending_outbound_payments.lock().unwrap();
1998+
let payment_entry = pending_outbounds.entry(payment_id);
1999+
if let hash_map::Entry::Occupied(payment) = &payment_entry {
2000+
if !payment.get().is_retryable() {
2001+
return Err(APIError::RouteError {
2002+
err: "Payment already completed"
2003+
});
2004+
}
2005+
}
19642006

19652007
let err: Result<(), _> = loop {
19662008
let mut channel_lock = self.channel_state.lock().unwrap();
@@ -1987,8 +2029,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
19872029
}, onion_packet, &self.logger),
19882030
channel_state, chan);
19892031

1990-
let mut pending_outbounds = self.pending_outbound_payments.lock().unwrap();
1991-
let payment = pending_outbounds.entry(payment_id).or_insert_with(|| PendingOutboundPayment::Retryable {
2032+
let payment = payment_entry.or_insert_with(|| PendingOutboundPayment::Retryable {
19922033
session_privs: HashSet::new(),
19932034
pending_amt_msat: 0,
19942035
payment_hash: *payment_hash,
@@ -2184,7 +2225,12 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
21842225
return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError {
21852226
err: "Unable to retry payments that were initially sent on LDK versions prior to 0.0.102".to_string()
21862227
}))
2187-
}
2228+
},
2229+
PendingOutboundPayment::Completed { .. } => {
2230+
return Err(PaymentSendFailure::ParameterError(APIError::RouteError {
2231+
err: "Payment already completed"
2232+
}));
2233+
},
21882234
}
21892235
} else {
21902236
return Err(PaymentSendFailure::ParameterError(APIError::APIMisuseError {
@@ -3012,7 +3058,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
30123058
session_priv_bytes.copy_from_slice(&session_priv[..]);
30133059
let mut outbounds = self.pending_outbound_payments.lock().unwrap();
30143060
if let hash_map::Entry::Occupied(mut payment) = outbounds.entry(payment_id) {
3015-
if payment.get_mut().remove(&session_priv_bytes, path.last().unwrap().fee_msat) {
3061+
if payment.get_mut().remove(&session_priv_bytes, path.last().unwrap().fee_msat) &&
3062+
!payment.get().is_completed()
3063+
{
30163064
self.pending_events.lock().unwrap().push(
30173065
events::Event::PaymentPathFailed {
30183066
payment_hash,
@@ -3061,6 +3109,10 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
30613109
log_trace!(self.logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0));
30623110
return;
30633111
}
3112+
if sessions.get().is_completed() {
3113+
log_trace!(self.logger, "Received failure of HTLC with payment_hash {} after payment completion", log_bytes!(payment_hash.0));
3114+
return;
3115+
}
30643116
if sessions.get().remaining_parts() == 0 {
30653117
all_paths_failed = true;
30663118
}
@@ -3307,15 +3359,45 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
33073359
} else { unreachable!(); }
33083360
}
33093361

3362+
fn finalize_claims(&self, mut sources: Vec<HTLCSource>) {
3363+
for source in sources.drain(..) {
3364+
if let HTLCSource::OutboundRoute { session_priv, payment_id, .. } = source {
3365+
let mut session_priv_bytes = [0; 32];
3366+
session_priv_bytes.copy_from_slice(&session_priv[..]);
3367+
let mut outbounds = self.pending_outbound_payments.lock().unwrap();
3368+
if let hash_map::Entry::Occupied(mut sessions) = outbounds.entry(payment_id) {
3369+
assert!(sessions.get().is_completed());
3370+
sessions.get_mut().remove(&session_priv_bytes, 0); // Note that the amount is no longer tracked
3371+
if sessions.get().remaining_parts() == 0 {
3372+
sessions.remove();
3373+
}
3374+
}
3375+
}
3376+
}
3377+
}
33103378
fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder<Signer>>, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool) {
33113379
match source {
33123380
HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => {
33133381
mem::drop(channel_state_lock);
33143382
let mut session_priv_bytes = [0; 32];
33153383
session_priv_bytes.copy_from_slice(&session_priv[..]);
33163384
let mut outbounds = self.pending_outbound_payments.lock().unwrap();
3317-
let found_payment = if let Some(mut sessions) = outbounds.remove(&payment_id) {
3318-
sessions.remove(&session_priv_bytes, path.last().unwrap().fee_msat)
3385+
let found_payment = if let hash_map::Entry::Occupied(mut sessions) = outbounds.entry(payment_id) {
3386+
let found_payment = !sessions.get().is_completed();
3387+
sessions.get_mut().mark_completed();
3388+
if from_onchain {
3389+
// We currently immediately remove HTLCs which were fulfilled on-chain.
3390+
// This could potentially lead to removing a pending payment too early,
3391+
// with a reorg of one block causing us to re-add the completed payment on
3392+
// restart.
3393+
// TODO: We should have a second monitor event that informs us of payments
3394+
// irrevocably completing.
3395+
sessions.get_mut().remove(&session_priv_bytes, path.last().unwrap().fee_msat);
3396+
if sessions.get().remaining_parts() == 0 {
3397+
sessions.remove();
3398+
}
3399+
}
3400+
found_payment
33193401
} else { false };
33203402
if found_payment {
33213403
let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner());
@@ -3390,7 +3472,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
33903472
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
33913473

33923474
let chan_restoration_res;
3393-
let mut pending_failures = {
3475+
let (mut pending_failures, finalized_claims) = {
33943476
let mut channel_lock = self.channel_state.lock().unwrap();
33953477
let channel_state = &mut *channel_lock;
33963478
let mut channel = match channel_state.by_id.entry(funding_txo.to_channel_id()) {
@@ -3412,14 +3494,14 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
34123494
msg: self.get_channel_update_for_unicast(channel.get()).unwrap(),
34133495
})
34143496
} else { None };
3415-
// TODO: Handle updates.finalized_claims!
34163497
chan_restoration_res = handle_chan_restoration_locked!(self, channel_lock, channel_state, channel, updates.raa, updates.commitment_update, updates.order, None, updates.forwards, updates.funding_broadcastable, updates.funding_locked);
34173498
if let Some(upd) = channel_update {
34183499
channel_state.pending_msg_events.push(upd);
34193500
}
3420-
updates.failures
3501+
(updates.failures, updates.finalized_claims)
34213502
};
34223503
post_handle_chan_restoration!(self, chan_restoration_res);
3504+
self.finalize_claims(finalized_claims);
34233505
for failure in pending_failures.drain(..) {
34243506
self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2);
34253507
}
@@ -3941,6 +4023,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
39414023
});
39424024
}
39434025
break Ok((raa_updates.to_forward_htlcs, raa_updates.failed_htlcs,
4026+
raa_updates.finalized_claim_htlcs,
39444027
chan.get().get_short_channel_id()
39454028
.expect("RAA should only work on a short-id-available channel"),
39464029
chan.get().get_funding_txo().unwrap()))
@@ -3950,11 +4033,14 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
39504033
};
39514034
self.fail_holding_cell_htlcs(htlcs_to_fail, msg.channel_id);
39524035
match res {
3953-
Ok((pending_forwards, mut pending_failures, short_channel_id, channel_outpoint)) => {
4036+
Ok((pending_forwards, mut pending_failures, finalized_claim_htlcs,
4037+
short_channel_id, channel_outpoint)) =>
4038+
{
39544039
for failure in pending_failures.drain(..) {
39554040
self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2);
39564041
}
39574042
self.forward_htlcs(&mut [(short_channel_id, channel_outpoint, pending_forwards)]);
4043+
self.finalize_claims(finalized_claim_htlcs);
39584044
Ok(())
39594045
},
39604046
Err(e) => Err(e)
@@ -5317,10 +5403,13 @@ impl_writeable_tlv_based!(PendingInboundPayment, {
53175403
(8, min_value_msat, required),
53185404
});
53195405

5320-
impl_writeable_tlv_based_enum!(PendingOutboundPayment,
5406+
impl_writeable_tlv_based_enum_upgradable!(PendingOutboundPayment,
53215407
(0, Legacy) => {
53225408
(0, session_privs, required),
53235409
},
5410+
(1, Completed) => {
5411+
(0, session_privs, required),
5412+
},
53245413
(2, Retryable) => {
53255414
(0, session_privs, required),
53265415
(2, payment_hash, required),
@@ -5329,7 +5418,7 @@ impl_writeable_tlv_based_enum!(PendingOutboundPayment,
53295418
(8, pending_amt_msat, required),
53305419
(10, starting_block_height, required),
53315420
},
5332-
;);
5421+
);
53335422

53345423
impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable for ChannelManager<Signer, M, T, K, F, L>
53355424
where M::Target: chain::Watch<Signer>,
@@ -5422,7 +5511,9 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
54225511
// For backwards compat, write the session privs and their total length.
54235512
let mut num_pending_outbounds_compat: u64 = 0;
54245513
for (_, outbound) in pending_outbound_payments.iter() {
5425-
num_pending_outbounds_compat += outbound.remaining_parts() as u64;
5514+
if !outbound.is_completed() {
5515+
num_pending_outbounds_compat += outbound.remaining_parts() as u64;
5516+
}
54265517
}
54275518
num_pending_outbounds_compat.write(writer)?;
54285519
for (_, outbound) in pending_outbound_payments.iter() {
@@ -5433,6 +5524,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
54335524
session_priv.write(writer)?;
54345525
}
54355526
}
5527+
PendingOutboundPayment::Completed { .. }=> {},
54365528
}
54375529
}
54385530

@@ -5443,7 +5535,8 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
54435535
PendingOutboundPayment::Legacy { session_privs } |
54445536
PendingOutboundPayment::Retryable { session_privs, .. } => {
54455537
pending_outbound_payments_no_retry.insert(*id, session_privs.clone());
5446-
}
5538+
},
5539+
_ => {},
54475540
}
54485541
}
54495542
write_tlv_fields!(writer, {

0 commit comments

Comments
 (0)