Skip to content

Commit ad18c4d

Browse files
Add commitment transaction broadcast as a ChannelMonitor event
To do this, we replace get_and_clear_pending_htlcs_updated with get_and_clear_pending_monitor_events, and which still transmits HTLCUpdates as before, but now also transmits a new MonitorEvent::CommitmentTxBroadcasted event when a channel's commitment transaction is broadcasted.
1 parent b3b4f43 commit ad18c4d

File tree

6 files changed

+136
-87
lines changed

6 files changed

+136
-87
lines changed

fuzz/src/chanmon_consistency.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use lightning::chain::transaction::OutPoint;
3434
use lightning::chain::chaininterface::{BroadcasterInterface,ConfirmationTarget,ChainListener,FeeEstimator,ChainWatchInterfaceUtil,ChainWatchInterface};
3535
use lightning::chain::keysinterface::{KeysInterface, InMemoryChannelKeys};
3636
use lightning::ln::channelmonitor;
37-
use lightning::ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, HTLCUpdate};
37+
use lightning::ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, MonitorEvent};
3838
use lightning::ln::channelmanager::{ChannelManager, PaymentHash, PaymentPreimage, PaymentSecret, ChannelManagerReadArgs};
3939
use lightning::ln::features::{ChannelFeatures, InitFeatures, NodeFeatures};
4040
use lightning::ln::msgs::{CommitmentUpdate, ChannelMessageHandler, ErrorAction, UpdateAddHTLC, Init};
@@ -135,8 +135,8 @@ impl channelmonitor::ManyChannelMonitor for TestChannelMonitor {
135135
self.update_ret.lock().unwrap().clone()
136136
}
137137

138-
fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate> {
139-
return self.simple_monitor.get_and_clear_pending_htlcs_updated();
138+
fn get_and_clear_pending_monitor_events(&self) -> Vec<MonitorEvent> {
139+
return self.simple_monitor.get_and_clear_pending_monitor_events();
140140
}
141141
}
142142

lightning/src/ln/channelmanager.rs

Lines changed: 49 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use bitcoin::secp256k1;
3838
use chain::chaininterface::{BroadcasterInterface,ChainListener,FeeEstimator};
3939
use chain::transaction::OutPoint;
4040
use ln::channel::{Channel, ChannelError};
41-
use ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, ManyChannelMonitor, HTLC_FAIL_BACK_BUFFER, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY};
41+
use ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, ManyChannelMonitor, HTLC_FAIL_BACK_BUFFER, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY, MonitorEvent};
4242
use ln::features::{InitFeatures, NodeFeatures};
4343
use routing::router::{Route, RouteHop};
4444
use ln::msgs;
@@ -2966,6 +2966,48 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
29662966
Err(e) => { Err(APIError::APIMisuseError { err: e.err })}
29672967
}
29682968
}
2969+
2970+
/// Process pending events from the ManyChannelMonitor.
2971+
fn process_pending_monitor_events(&self) {
2972+
let mut failed_channels = Vec::new();
2973+
{
2974+
for monitor_event in self.monitor.get_and_clear_pending_monitor_events() {
2975+
match monitor_event {
2976+
MonitorEvent::HTLCEvent(htlc_update) => {
2977+
if let Some(preimage) = htlc_update.payment_preimage {
2978+
log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
2979+
self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage);
2980+
} else {
2981+
log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", log_bytes!(htlc_update.payment_hash.0));
2982+
self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
2983+
}
2984+
},
2985+
MonitorEvent::CommitmentTxBroadcasted(funding_outpoint) => {
2986+
let mut channel_lock = self.channel_state.lock().unwrap();
2987+
let channel_state = &mut *channel_lock;
2988+
let by_id = &mut channel_state.by_id;
2989+
let short_to_id = &mut channel_state.short_to_id;
2990+
let pending_msg_events = &mut channel_state.pending_msg_events;
2991+
if let Some(mut chan) = by_id.remove(&funding_outpoint.to_channel_id()) {
2992+
if let Some(short_id) = chan.get_short_channel_id() {
2993+
short_to_id.remove(&short_id);
2994+
}
2995+
failed_channels.push(chan.force_shutdown(false));
2996+
if let Ok(update) = self.get_channel_update(&chan) {
2997+
pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
2998+
msg: update
2999+
});
3000+
}
3001+
}
3002+
},
3003+
}
3004+
}
3005+
}
3006+
3007+
for failure in failed_channels.drain(..) {
3008+
self.finish_force_close_channel(failure);
3009+
}
3010+
}
29693011
}
29703012

29713013
impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> events::MessageSendEventsProvider for ChannelManager<ChanSigner, M, T, K, F, L>
@@ -2976,21 +3018,9 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
29763018
L::Target: Logger,
29773019
{
29783020
fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
2979-
// TODO: Event release to users and serialization is currently race-y: it's very easy for a
2980-
// user to serialize a ChannelManager with pending events in it and lose those events on
2981-
// restart. This is doubly true for the fail/fulfill-backs from monitor events!
2982-
{
2983-
//TODO: This behavior should be documented.
2984-
for htlc_update in self.monitor.get_and_clear_pending_htlcs_updated() {
2985-
if let Some(preimage) = htlc_update.payment_preimage {
2986-
log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
2987-
self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage);
2988-
} else {
2989-
log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", log_bytes!(htlc_update.payment_hash.0));
2990-
self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
2991-
}
2992-
}
2993-
}
3021+
//TODO: This behavior should be documented. It's non-intuitive that we query
3022+
// ChannelMonitors when clearing other events.
3023+
self.process_pending_monitor_events();
29943024

29953025
let mut ret = Vec::new();
29963026
let mut channel_state = self.channel_state.lock().unwrap();
@@ -3007,21 +3037,9 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
30073037
L::Target: Logger,
30083038
{
30093039
fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
3010-
// TODO: Event release to users and serialization is currently race-y: it's very easy for a
3011-
// user to serialize a ChannelManager with pending events in it and lose those events on
3012-
// restart. This is doubly true for the fail/fulfill-backs from monitor events!
3013-
{
3014-
//TODO: This behavior should be documented.
3015-
for htlc_update in self.monitor.get_and_clear_pending_htlcs_updated() {
3016-
if let Some(preimage) = htlc_update.payment_preimage {
3017-
log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
3018-
self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage);
3019-
} else {
3020-
log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", log_bytes!(htlc_update.payment_hash.0));
3021-
self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
3022-
}
3023-
}
3024-
}
3040+
//TODO: This behavior should be documented. It's non-intuitive that we query
3041+
// ChannelMonitors when clearing other events.
3042+
self.process_pending_monitor_events();
30253043

30263044
let mut ret = Vec::new();
30273045
let mut pending_events = self.pending_events.lock().unwrap();
@@ -3104,21 +3122,6 @@ impl<ChanSigner: ChannelKeys, M: Deref + Sync + Send, T: Deref + Sync + Send, K:
31043122
}
31053123
}
31063124
}
3107-
if channel.is_funding_initiated() && channel.channel_monitor().would_broadcast_at_height(height, &self.logger) {
3108-
if let Some(short_id) = channel.get_short_channel_id() {
3109-
short_to_id.remove(&short_id);
3110-
}
3111-
// If would_broadcast_at_height() is true, the channel_monitor will broadcast
3112-
// the latest local tx for us, so we should skip that here (it doesn't really
3113-
// hurt anything, but does make tests a bit simpler).
3114-
failed_channels.push(channel.force_shutdown(false));
3115-
if let Ok(update) = self.get_channel_update(&channel) {
3116-
pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
3117-
msg: update
3118-
});
3119-
}
3120-
return false;
3121-
}
31223125
true
31233126
});
31243127

lightning/src/ln/channelmonitor.rs

Lines changed: 59 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,16 @@ pub enum ChannelMonitorUpdateErr {
148148
#[derive(Debug)]
149149
pub struct MonitorUpdateError(pub &'static str);
150150

151+
/// An event to be processed by the ChannelManager.
152+
#[derive(PartialEq)]
153+
pub enum MonitorEvent {
154+
/// A monitor event containing an HTLCUpdate.
155+
HTLCEvent(HTLCUpdate),
156+
157+
/// A monitor event that the Channel's commitment transaction was broadcasted.
158+
CommitmentTxBroadcasted(OutPoint),
159+
}
160+
151161
/// Simple structure send back by ManyChannelMonitor in case of HTLC detected onchain from a
152162
/// forward channel and from which info are needed to update HTLC in a backward channel.
153163
#[derive(Clone, PartialEq)]
@@ -292,12 +302,12 @@ impl<ChanSigner: ChannelKeys, T: Deref + Sync + Send, F: Deref + Sync + Send, L:
292302
}
293303
}
294304

295-
fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate> {
296-
let mut pending_htlcs_updated = Vec::new();
305+
fn get_and_clear_pending_monitor_events(&self) -> Vec<MonitorEvent> {
306+
let mut pending_monitor_events = Vec::new();
297307
for chan in self.monitors.lock().unwrap().values_mut() {
298-
pending_htlcs_updated.append(&mut chan.get_and_clear_pending_htlcs_updated());
308+
pending_monitor_events.append(&mut chan.get_and_clear_pending_monitor_events());
299309
}
300-
pending_htlcs_updated
310+
pending_monitor_events
301311
}
302312
}
303313

@@ -729,7 +739,7 @@ impl Readable for ChannelMonitorUpdateStep {
729739
/// information and are actively monitoring the chain.
730740
///
731741
/// Pending Events or updated HTLCs which have not yet been read out by
732-
/// get_and_clear_pending_htlcs_updated or get_and_clear_pending_events are serialized to disk and
742+
/// get_and_clear_pending_monitor_events or get_and_clear_pending_events are serialized to disk and
733743
/// reloaded at deserialize-time. Thus, you must ensure that, when handling events, all events
734744
/// gotten are fully handled before re-serializing the new state.
735745
pub struct ChannelMonitor<ChanSigner: ChannelKeys> {
@@ -784,7 +794,7 @@ pub struct ChannelMonitor<ChanSigner: ChannelKeys> {
784794

785795
payment_preimages: HashMap<PaymentHash, PaymentPreimage>,
786796

787-
pending_htlcs_updated: Vec<HTLCUpdate>,
797+
pending_monitor_events: Vec<MonitorEvent>,
788798
pending_events: Vec<events::Event>,
789799

790800
// Used to track onchain events, i.e transactions parts of channels confirmed on chain, on which
@@ -881,9 +891,9 @@ pub trait ManyChannelMonitor: Send + Sync {
881891
/// with success or failure.
882892
///
883893
/// You should probably just call through to
884-
/// ChannelMonitor::get_and_clear_pending_htlcs_updated() for each ChannelMonitor and return
894+
/// ChannelMonitor::get_and_clear_pending_monitor_events() for each ChannelMonitor and return
885895
/// the full list.
886-
fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate>;
896+
fn get_and_clear_pending_monitor_events(&self) -> Vec<MonitorEvent>;
887897
}
888898

889899
#[cfg(any(test, feature = "fuzztarget"))]
@@ -914,7 +924,7 @@ impl<ChanSigner: ChannelKeys> PartialEq for ChannelMonitor<ChanSigner> {
914924
self.current_local_commitment_number != other.current_local_commitment_number ||
915925
self.current_local_commitment_tx != other.current_local_commitment_tx ||
916926
self.payment_preimages != other.payment_preimages ||
917-
self.pending_htlcs_updated != other.pending_htlcs_updated ||
927+
self.pending_monitor_events != other.pending_monitor_events ||
918928
self.pending_events.len() != other.pending_events.len() || // We trust events to round-trip properly
919929
self.onchain_events_waiting_threshold_conf != other.onchain_events_waiting_threshold_conf ||
920930
self.outputs_to_watch != other.outputs_to_watch ||
@@ -1070,9 +1080,15 @@ impl<ChanSigner: ChannelKeys + Writeable> ChannelMonitor<ChanSigner> {
10701080
writer.write_all(&payment_preimage.0[..])?;
10711081
}
10721082

1073-
writer.write_all(&byte_utils::be64_to_array(self.pending_htlcs_updated.len() as u64))?;
1074-
for data in self.pending_htlcs_updated.iter() {
1075-
data.write(writer)?;
1083+
writer.write_all(&byte_utils::be64_to_array(self.pending_monitor_events.len() as u64))?;
1084+
for event in self.pending_monitor_events.iter() {
1085+
match event {
1086+
MonitorEvent::HTLCEvent(upd) => {
1087+
0u8.write(writer)?;
1088+
upd.write(writer)?;
1089+
},
1090+
MonitorEvent::CommitmentTxBroadcasted(_) => 1u8.write(writer)?
1091+
}
10761092
}
10771093

10781094
writer.write_all(&byte_utils::be64_to_array(self.pending_events.len() as u64))?;
@@ -1187,7 +1203,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
11871203
current_local_commitment_number: 0xffff_ffff_ffff - ((((local_tx_sequence & 0xffffff) << 3*8) | (local_tx_locktime as u64 & 0xffffff)) ^ commitment_transaction_number_obscure_factor),
11881204

11891205
payment_preimages: HashMap::new(),
1190-
pending_htlcs_updated: Vec::new(),
1206+
pending_monitor_events: Vec::new(),
11911207
pending_events: Vec::new(),
11921208

11931209
onchain_events_waiting_threshold_conf: HashMap::new(),
@@ -1351,6 +1367,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
13511367
for tx in self.get_latest_local_commitment_txn(logger).iter() {
13521368
broadcaster.broadcast_transaction(tx);
13531369
}
1370+
self.pending_monitor_events.push(MonitorEvent::CommitmentTxBroadcasted(self.funding_info.0));
13541371
}
13551372

13561373
/// Used in Channel to cheat wrt the update_ids since it plays games, will be removed soon!
@@ -1443,10 +1460,10 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
14431460
}
14441461

14451462
/// Get the list of HTLCs who's status has been updated on chain. This should be called by
1446-
/// ChannelManager via ManyChannelMonitor::get_and_clear_pending_htlcs_updated().
1447-
pub fn get_and_clear_pending_htlcs_updated(&mut self) -> Vec<HTLCUpdate> {
1463+
/// ChannelManager via ManyChannelMonitor::get_and_clear_pending_monitor_events().
1464+
pub fn get_and_clear_pending_monitor_events(&mut self) -> Vec<MonitorEvent> {
14481465
let mut ret = Vec::new();
1449-
mem::swap(&mut ret, &mut self.pending_htlcs_updated);
1466+
mem::swap(&mut ret, &mut self.pending_monitor_events);
14501467
ret
14511468
}
14521469

@@ -1938,7 +1955,9 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
19381955
claimable_outpoints.push(ClaimRequest { absolute_timelock: height, aggregable: false, outpoint: BitcoinOutPoint { txid: self.funding_info.0.txid.clone(), vout: self.funding_info.0.index as u32 }, witness_data: InputMaterial::Funding { funding_redeemscript: self.funding_redeemscript.clone() }});
19391956
}
19401957
if should_broadcast {
1958+
self.pending_monitor_events.push(MonitorEvent::CommitmentTxBroadcasted(self.funding_info.0));
19411959
if let Some(commitment_tx) = self.onchain_tx_handler.get_fully_signed_local_tx(&self.funding_redeemscript) {
1960+
self.local_tx_signed = true;
19421961
let (mut new_outpoints, new_outputs, _) = self.broadcast_by_local_state(&commitment_tx, &self.current_local_commitment_tx);
19431962
if !new_outputs.is_empty() {
19441963
watch_outputs.push((self.current_local_commitment_tx.txid.clone(), new_outputs));
@@ -1951,11 +1970,11 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
19511970
match ev {
19521971
OnchainEvent::HTLCUpdate { htlc_update } => {
19531972
log_trace!(logger, "HTLC {} failure update has got enough confirmations to be passed upstream", log_bytes!((htlc_update.1).0));
1954-
self.pending_htlcs_updated.push(HTLCUpdate {
1973+
self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate {
19551974
payment_hash: htlc_update.1,
19561975
payment_preimage: None,
19571976
source: htlc_update.0,
1958-
});
1977+
}));
19591978
},
19601979
OnchainEvent::MaturingOutput { descriptor } => {
19611980
log_trace!(logger, "Descriptor {} has got enough confirmations to be passed upstream", log_spendable!(descriptor));
@@ -1966,6 +1985,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
19661985
}
19671986
}
19681987
}
1988+
19691989
self.onchain_tx_handler.block_connected(txn_matched, claimable_outpoints, height, &*broadcaster, &*fee_estimator, &*logger);
19701990

19711991
self.last_block_hash = block_hash.clone();
@@ -1993,7 +2013,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
19932013
self.last_block_hash = block_hash.clone();
19942014
}
19952015

1996-
pub(super) fn would_broadcast_at_height<L: Deref>(&self, height: u32, logger: &L) -> bool where L::Target: Logger {
2016+
fn would_broadcast_at_height<L: Deref>(&self, height: u32, logger: &L) -> bool where L::Target: Logger {
19972017
// We need to consider all HTLCs which are:
19982018
// * in any unrevoked remote commitment transaction, as they could broadcast said
19992019
// transactions and we'd end up in a race, or
@@ -2151,22 +2171,26 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
21512171
if let Some((source, payment_hash)) = payment_data {
21522172
let mut payment_preimage = PaymentPreimage([0; 32]);
21532173
if accepted_preimage_claim {
2154-
if !self.pending_htlcs_updated.iter().any(|update| update.source == source) {
2174+
if !self.pending_monitor_events.iter().any(
2175+
|update| if let &MonitorEvent::HTLCEvent(ref upd) = update { upd.source == source } else { false }) {
21552176
payment_preimage.0.copy_from_slice(&input.witness[3]);
2156-
self.pending_htlcs_updated.push(HTLCUpdate {
2177+
self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate {
21572178
source,
21582179
payment_preimage: Some(payment_preimage),
21592180
payment_hash
2160-
});
2181+
}));
21612182
}
21622183
} else if offered_preimage_claim {
2163-
if !self.pending_htlcs_updated.iter().any(|update| update.source == source) {
2184+
if !self.pending_monitor_events.iter().any(
2185+
|update| if let &MonitorEvent::HTLCEvent(ref upd) = update {
2186+
upd.source == source
2187+
} else { false }) {
21642188
payment_preimage.0.copy_from_slice(&input.witness[1]);
2165-
self.pending_htlcs_updated.push(HTLCUpdate {
2189+
self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate {
21662190
source,
21672191
payment_preimage: Some(payment_preimage),
21682192
payment_hash
2169-
});
2193+
}));
21702194
}
21712195
} else {
21722196
log_info!(logger, "Failing HTLC with payment_hash {} timeout by a spend tx, waiting for confirmation (at height{})", log_bytes!(payment_hash.0), height + ANTI_REORG_DELAY - 1);
@@ -2422,10 +2446,15 @@ impl<ChanSigner: ChannelKeys + Readable> Readable for (BlockHash, ChannelMonitor
24222446
}
24232447
}
24242448

2425-
let pending_htlcs_updated_len: u64 = Readable::read(reader)?;
2426-
let mut pending_htlcs_updated = Vec::with_capacity(cmp::min(pending_htlcs_updated_len as usize, MAX_ALLOC_SIZE / (32 + 8*3)));
2427-
for _ in 0..pending_htlcs_updated_len {
2428-
pending_htlcs_updated.push(Readable::read(reader)?);
2449+
let pending_monitor_events_len: u64 = Readable::read(reader)?;
2450+
let mut pending_monitor_events = Vec::with_capacity(cmp::min(pending_monitor_events_len as usize, MAX_ALLOC_SIZE / (32 + 8*3)));
2451+
for _ in 0..pending_monitor_events_len {
2452+
let ev = match <u8 as Readable>::read(reader)? {
2453+
0 => MonitorEvent::HTLCEvent(Readable::read(reader)?),
2454+
1 => MonitorEvent::CommitmentTxBroadcasted(funding_info.0),
2455+
_ => return Err(DecodeError::InvalidValue)
2456+
};
2457+
pending_monitor_events.push(ev);
24292458
}
24302459

24312460
let pending_events_len: u64 = Readable::read(reader)?;
@@ -2516,7 +2545,7 @@ impl<ChanSigner: ChannelKeys + Readable> Readable for (BlockHash, ChannelMonitor
25162545
current_local_commitment_number,
25172546

25182547
payment_preimages,
2519-
pending_htlcs_updated,
2548+
pending_monitor_events,
25202549
pending_events,
25212550

25222551
onchain_events_waiting_threshold_conf,

0 commit comments

Comments
 (0)