Skip to content

Commit 380c822

Browse files
committed
Move pending-HTLC-updated ChannelMonitor from ManyChannelMonitor
This is important for a number of reasons: * Firstly, I hit this trying to implement rescan in the demo bitcoinrpc client - if individual ChannelMonitors are out of sync with each other, we cannot add them all into a ManyChannelMonitor together and then rescan, but need to rescan them individually without having to do a bunch of manual work. Of the three return values in ChannelMonitor::block_connected, only the HTLCsource stuff that is moved here makes no sense to be exposed to the user. * Secondly, the logic currently in ManyChannelMonitor cannot be reproduced by the user! HTLCSource is deliberately an opaque type but we use its data to decide which things to keep when inserting into the HashMap. This would prevent a user from properly implementing a replacement ManyChannelMonitor, which is unacceptable. * Finally, by moving the tracking into ChannelMonitor, we can serialize them out, which prevents us from forgetting them when loading from disk, though there are still other races which need to be handled to make this fully safe (see TODOs in ChannelManager). This is safe as no two entries can have the same HTLCSource across different channels (or, if they did, it would be a rather serious bug), though note that, IIRC, when this code was added, the HTLCSource field in the values was not present. We also take this opportunity to rename the fetch function to match our other event interfaces, makaing it clear that by calling the function the set of HTLCUpdates will also be cleared.
1 parent 2514530 commit 380c822

File tree

4 files changed

+96
-60
lines changed

4 files changed

+96
-60
lines changed

fuzz/src/chanmon_consistency.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,8 @@ impl channelmonitor::ManyChannelMonitor<EnforcingChannelKeys> for TestChannelMon
121121
ret
122122
}
123123

124-
fn fetch_pending_htlc_updated(&self) -> Vec<HTLCUpdate> {
125-
return self.simple_monitor.fetch_pending_htlc_updated();
124+
fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate> {
125+
return self.simple_monitor.get_and_clear_pending_htlcs_updated();
126126
}
127127
}
128128

lightning/src/ln/channelmanager.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2549,7 +2549,7 @@ impl<ChanSigner: ChannelKeys, M: Deref> events::MessageSendEventsProvider for Ch
25492549
// restart. This is doubly true for the fail/fulfill-backs from monitor events!
25502550
{
25512551
//TODO: This behavior should be documented.
2552-
for htlc_update in self.monitor.fetch_pending_htlc_updated() {
2552+
for htlc_update in self.monitor.get_and_clear_pending_htlcs_updated() {
25532553
if let Some(preimage) = htlc_update.payment_preimage {
25542554
log_trace!(self, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
25552555
self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage);
@@ -2574,7 +2574,7 @@ impl<ChanSigner: ChannelKeys, M: Deref> events::EventsProvider for ChannelManage
25742574
// restart. This is doubly true for the fail/fulfill-backs from monitor events!
25752575
{
25762576
//TODO: This behavior should be documented.
2577-
for htlc_update in self.monitor.fetch_pending_htlc_updated() {
2577+
for htlc_update in self.monitor.get_and_clear_pending_htlcs_updated() {
25782578
if let Some(preimage) = htlc_update.payment_preimage {
25792579
log_trace!(self, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
25802580
self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage);

lightning/src/ln/channelmonitor.rs

Lines changed: 90 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,12 @@ pub trait ManyChannelMonitor<ChanSigner: ChannelKeys>: Send + Sync {
123123
fn add_update_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitor<ChanSigner>) -> Result<(), ChannelMonitorUpdateErr>;
124124

125125
/// Used by ChannelManager to get list of HTLC resolved onchain and which needed to be updated
126-
/// with success or failure backward
127-
fn fetch_pending_htlc_updated(&self) -> Vec<HTLCUpdate>;
126+
/// with success or failure.
127+
///
128+
/// You should probably just call through to
129+
/// ChannelMonitor::get_and_clear_pending_htlcs_updated() for each ChannelMonitor and return
130+
/// the full list.
131+
fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate>;
128132
}
129133

130134
/// A simple implementation of a ManyChannelMonitor and ChainListener. Can be used to create a
@@ -146,7 +150,6 @@ pub struct SimpleManyChannelMonitor<Key, ChanSigner: ChannelKeys> {
146150
chain_monitor: Arc<ChainWatchInterface>,
147151
broadcaster: Arc<BroadcasterInterface>,
148152
pending_events: Mutex<Vec<events::Event>>,
149-
pending_htlc_updated: Mutex<HashMap<PaymentHash, Vec<(HTLCSource, Option<PaymentPreimage>)>>>,
150153
logger: Arc<Logger>,
151154
fee_estimator: Arc<FeeEstimator>
152155
}
@@ -155,11 +158,10 @@ impl<'a, Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys> ChainListen
155158
fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], _indexes_of_txn_matched: &[u32]) {
156159
let block_hash = header.bitcoin_hash();
157160
let mut new_events: Vec<events::Event> = Vec::with_capacity(0);
158-
let mut htlc_updated_infos = Vec::new();
159161
{
160162
let mut monitors = self.monitors.lock().unwrap();
161163
for monitor in monitors.values_mut() {
162-
let (txn_outputs, spendable_outputs, mut htlc_updated) = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster, &*self.fee_estimator);
164+
let (txn_outputs, spendable_outputs) = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster, &*self.fee_estimator);
163165
if spendable_outputs.len() > 0 {
164166
new_events.push(events::Event::SpendableOutputs {
165167
outputs: spendable_outputs,
@@ -171,35 +173,6 @@ impl<'a, Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys> ChainListen
171173
self.chain_monitor.install_watch_outpoint((txid.clone(), idx as u32), &output.script_pubkey);
172174
}
173175
}
174-
htlc_updated_infos.append(&mut htlc_updated);
175-
}
176-
}
177-
{
178-
// ChannelManager will just need to fetch pending_htlc_updated and pass state backward
179-
let mut pending_htlc_updated = self.pending_htlc_updated.lock().unwrap();
180-
for htlc in htlc_updated_infos.drain(..) {
181-
match pending_htlc_updated.entry(htlc.2) {
182-
hash_map::Entry::Occupied(mut e) => {
183-
// In case of reorg we may have htlc outputs solved in a different way so
184-
// we prefer to keep claims but don't store duplicate updates for a given
185-
// (payment_hash, HTLCSource) pair.
186-
let mut existing_claim = false;
187-
e.get_mut().retain(|htlc_data| {
188-
if htlc.0 == htlc_data.0 {
189-
if htlc_data.1.is_some() {
190-
existing_claim = true;
191-
true
192-
} else { false }
193-
} else { true }
194-
});
195-
if !existing_claim {
196-
e.get_mut().push((htlc.0, htlc.1));
197-
}
198-
}
199-
hash_map::Entry::Vacant(e) => {
200-
e.insert(vec![(htlc.0, htlc.1)]);
201-
}
202-
}
203176
}
204177
}
205178
let mut pending_events = self.pending_events.lock().unwrap();
@@ -224,7 +197,6 @@ impl<Key : Send + cmp::Eq + hash::Hash + 'static, ChanSigner: ChannelKeys> Simpl
224197
chain_monitor,
225198
broadcaster,
226199
pending_events: Mutex::new(Vec::new()),
227-
pending_htlc_updated: Mutex::new(HashMap::new()),
228200
logger,
229201
fee_estimator: feeest,
230202
};
@@ -272,17 +244,10 @@ impl<ChanSigner: ChannelKeys> ManyChannelMonitor<ChanSigner> for SimpleManyChann
272244
}
273245
}
274246

275-
fn fetch_pending_htlc_updated(&self) -> Vec<HTLCUpdate> {
276-
let mut updated = self.pending_htlc_updated.lock().unwrap();
277-
let mut pending_htlcs_updated = Vec::with_capacity(updated.len());
278-
for (k, v) in updated.drain() {
279-
for htlc_data in v {
280-
pending_htlcs_updated.push(HTLCUpdate {
281-
payment_hash: k,
282-
payment_preimage: htlc_data.1,
283-
source: htlc_data.0,
284-
});
285-
}
247+
fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate> {
248+
let mut pending_htlcs_updated = Vec::new();
249+
for chan in self.monitors.lock().unwrap().values_mut() {
250+
pending_htlcs_updated.append(&mut chan.get_and_clear_pending_htlcs_updated());
286251
}
287252
pending_htlcs_updated
288253
}
@@ -628,6 +593,8 @@ pub struct ChannelMonitor<ChanSigner: ChannelKeys> {
628593

629594
payment_preimages: HashMap<PaymentHash, PaymentPreimage>,
630595

596+
pending_htlcs_updated: HashMap<PaymentHash, Vec<(HTLCSource, Option<PaymentPreimage>)>>,
597+
631598
destination_script: Script,
632599
// Thanks to data loss protection, we may be able to claim our non-htlc funds
633600
// back, this is the script we have to spend from but we need to
@@ -732,6 +699,7 @@ impl<ChanSigner: ChannelKeys> PartialEq for ChannelMonitor<ChanSigner> {
732699
self.current_remote_commitment_number != other.current_remote_commitment_number ||
733700
self.current_local_signed_commitment_tx != other.current_local_signed_commitment_tx ||
734701
self.payment_preimages != other.payment_preimages ||
702+
self.pending_htlcs_updated != other.pending_htlcs_updated ||
735703
self.destination_script != other.destination_script ||
736704
self.to_remote_rescue != other.to_remote_rescue ||
737705
self.pending_claim_requests != other.pending_claim_requests ||
@@ -919,6 +887,16 @@ impl<ChanSigner: ChannelKeys + Writeable> ChannelMonitor<ChanSigner> {
919887
writer.write_all(&payment_preimage.0[..])?;
920888
}
921889

890+
writer.write_all(&byte_utils::be64_to_array(self.pending_htlcs_updated.len() as u64))?;
891+
for (payment_hash, data) in self.pending_htlcs_updated.iter() {
892+
writer.write_all(&payment_hash.0[..])?;
893+
writer.write_all(&byte_utils::be64_to_array(data.len() as u64))?;
894+
for &(ref source, ref payment_preimage) in data.iter() {
895+
source.write(writer)?;
896+
write_option!(payment_preimage);
897+
}
898+
}
899+
922900
self.last_block_hash.write(writer)?;
923901
self.destination_script.write(writer)?;
924902
if let Some((ref to_remote_script, ref local_key)) = self.to_remote_rescue {
@@ -1028,6 +1006,8 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
10281006
current_remote_commitment_number: 1 << 48,
10291007

10301008
payment_preimages: HashMap::new(),
1009+
pending_htlcs_updated: HashMap::new(),
1010+
10311011
destination_script: destination_script,
10321012
to_remote_rescue: None,
10331013

@@ -1384,6 +1364,22 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
13841364
res
13851365
}
13861366

1367+
/// Get the list of HTLCs who's status has been updated on chain. This should be called by
1368+
/// ChannelManager via ManyChannelMonitor::get_and_clear_pending_htlcs_updated().
1369+
pub fn get_and_clear_pending_htlcs_updated(&mut self) -> Vec<HTLCUpdate> {
1370+
let mut pending_htlcs_updated = Vec::with_capacity(self.pending_htlcs_updated.len());
1371+
for (k, v) in self.pending_htlcs_updated.drain() {
1372+
for htlc_data in v {
1373+
pending_htlcs_updated.push(HTLCUpdate {
1374+
payment_hash: k,
1375+
payment_preimage: htlc_data.1,
1376+
source: htlc_data.0,
1377+
});
1378+
}
1379+
}
1380+
pending_htlcs_updated
1381+
}
1382+
13871383
/// Can only fail if idx is < get_min_seen_secret
13881384
pub(super) fn get_secret(&self, idx: u64) -> Option<[u8; 32]> {
13891385
for i in 0..self.old_secrets.len() {
@@ -2362,7 +2358,35 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
23622358
}
23632359
}
23642360

2365-
fn block_connected(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: &BroadcasterInterface, fee_estimator: &FeeEstimator)-> (Vec<(Sha256dHash, Vec<TxOut>)>, Vec<SpendableOutputDescriptor>, Vec<(HTLCSource, Option<PaymentPreimage>, PaymentHash)>) {
2361+
fn append_htlc_updated(&mut self, mut htlc_updated_infos: Vec<(HTLCSource, Option<PaymentPreimage>, PaymentHash)>) {
2362+
// ChannelManager will just need to fetch pending_htlcs_updated and pass state backward
2363+
for htlc in htlc_updated_infos.drain(..) {
2364+
match self.pending_htlcs_updated.entry(htlc.2) {
2365+
hash_map::Entry::Occupied(mut e) => {
2366+
// In case of reorg we may have htlc outputs solved in a different way so
2367+
// we prefer to keep claims but don't store duplicate updates for a given
2368+
// (payment_hash, HTLCSource) pair.
2369+
let mut existing_claim = false;
2370+
e.get_mut().retain(|htlc_data| {
2371+
if htlc.0 == htlc_data.0 {
2372+
if htlc_data.1.is_some() {
2373+
existing_claim = true;
2374+
true
2375+
} else { false }
2376+
} else { true }
2377+
});
2378+
if !existing_claim {
2379+
e.get_mut().push((htlc.0, htlc.1));
2380+
}
2381+
}
2382+
hash_map::Entry::Vacant(e) => {
2383+
e.insert(vec![(htlc.0, htlc.1)]);
2384+
}
2385+
}
2386+
}
2387+
}
2388+
2389+
fn block_connected(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: &BroadcasterInterface, fee_estimator: &FeeEstimator)-> (Vec<(Sha256dHash, Vec<TxOut>)>, Vec<SpendableOutputDescriptor>) {
23662390
for tx in txn_matched {
23672391
let mut output_val = 0;
23682392
for out in tx.output.iter() {
@@ -2375,7 +2399,6 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
23752399
log_trace!(self, "Block {} at height {} connected with {} txn matched", block_hash, height, txn_matched.len());
23762400
let mut watch_outputs = Vec::new();
23772401
let mut spendable_outputs = Vec::new();
2378-
let mut htlc_updated = Vec::new();
23792402
let mut bump_candidates = HashSet::new();
23802403
for tx in txn_matched {
23812404
if tx.input.len() == 1 {
@@ -2434,10 +2457,8 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
24342457
// While all commitment/HTLC-Success/HTLC-Timeout transactions have one input, HTLCs
24352458
// can also be resolved in a few other ways which can have more than one output. Thus,
24362459
// we call is_resolving_htlc_output here outside of the tx.input.len() == 1 check.
2437-
let mut updated = self.is_resolving_htlc_output(&tx, height);
2438-
if updated.len() > 0 {
2439-
htlc_updated.append(&mut updated);
2440-
}
2460+
let htlcs_updated = self.is_resolving_htlc_output(&tx, height);
2461+
self.append_htlc_updated(htlcs_updated);
24412462

24422463
// Scan all input to verify is one of the outpoint spent is of interest for us
24432464
let mut claimed_outputs_material = Vec::new();
@@ -2560,7 +2581,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
25602581
},
25612582
OnchainEvent::HTLCUpdate { htlc_update } => {
25622583
log_trace!(self, "HTLC {} failure update has got enough confirmations to be passed upstream", log_bytes!((htlc_update.1).0));
2563-
htlc_updated.push((htlc_update.0, None, htlc_update.1));
2584+
self.append_htlc_updated(vec![(htlc_update.0, None, htlc_update.1)]);
25642585
},
25652586
OnchainEvent::ContentiousOutpoint { outpoint, .. } => {
25662587
self.claimable_outpoints.remove(&outpoint);
@@ -2589,7 +2610,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
25892610
}
25902611
}
25912612
self.last_block_hash = block_hash.clone();
2592-
(watch_outputs, spendable_outputs, htlc_updated)
2613+
(watch_outputs, spendable_outputs)
25932614
}
25942615

25952616
fn block_disconnected(&mut self, height: u32, block_hash: &Sha256dHash, broadcaster: &BroadcasterInterface, fee_estimator: &FeeEstimator) {
@@ -3178,6 +3199,20 @@ impl<R: ::std::io::Read, ChanSigner: ChannelKeys + Readable<R>> ReadableArgs<R,
31783199
}
31793200
}
31803201

3202+
let pending_htlcs_updated_len: u64 = Readable::read(reader)?;
3203+
let mut pending_htlcs_updated = HashMap::with_capacity(cmp::min(pending_htlcs_updated_len as usize, MAX_ALLOC_SIZE / (32 + 8*3)));
3204+
for _ in 0..pending_htlcs_updated_len {
3205+
let payment_hash: PaymentHash = Readable::read(reader)?;
3206+
let htlcs_len: u64 = Readable::read(reader)?;
3207+
let mut htlcs = Vec::with_capacity(cmp::min(htlcs_len as usize, MAX_ALLOC_SIZE / 64));
3208+
for _ in 0..htlcs_len {
3209+
htlcs.push((Readable::read(reader)?, Readable::read(reader)?));
3210+
}
3211+
if let Some(_) = pending_htlcs_updated.insert(payment_hash, htlcs) {
3212+
return Err(DecodeError::InvalidValue);
3213+
}
3214+
}
3215+
31813216
let last_block_hash: Sha256dHash = Readable::read(reader)?;
31823217
let destination_script = Readable::read(reader)?;
31833218
let to_remote_rescue = match <u8 as Readable<R>>::read(reader)? {
@@ -3264,6 +3299,7 @@ impl<R: ::std::io::Read, ChanSigner: ChannelKeys + Readable<R>> ReadableArgs<R,
32643299
current_remote_commitment_number,
32653300

32663301
payment_preimages,
3302+
pending_htlcs_updated,
32673303

32683304
destination_script,
32693305
to_remote_rescue,

lightning/src/util/test_utils.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ impl channelmonitor::ManyChannelMonitor<EnforcingChannelKeys> for TestChannelMon
7474
self.update_ret.lock().unwrap().clone()
7575
}
7676

77-
fn fetch_pending_htlc_updated(&self) -> Vec<HTLCUpdate> {
78-
return self.simple_monitor.fetch_pending_htlc_updated();
77+
fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate> {
78+
return self.simple_monitor.get_and_clear_pending_htlcs_updated();
7979
}
8080
}
8181

0 commit comments

Comments
 (0)