Skip to content

Commit ff12ea9

Browse files
committed
Move pending-HTLC-updated ChannelMonitor from ManyChannelMonitor
This is important for a number of reasons: * Firstly, I hit this trying to implement rescan in the demo bitcoinrpc client - if individual ChannelMonitors are out of sync with each other, we cannot add them all into a ManyChannelMonitor together and then rescan, but need to rescan them individually without having to do a bunch of manual work. Of the three return values in ChannelMonitor::block_connected, only the HTLCsource stuff that is moved here makes no sense to be exposed to the user. * Secondly, the logic currently in ManyChannelMonitor cannot be reproduced by the user! HTLCSource is deliberately an opaque type but we use its data to decide which things to keep when inserting into the HashMap. This would prevent a user from properly implementing a replacement ManyChannelMonitor, which is unacceptable. * Finally, by moving the tracking into ChannelMonitor, we can serialize them out, which prevents us from forgetting them when loading from disk, though there are still other races which need to be handled to make this fully safe (see TODOs in ChannelManager). This is safe as no two entries can have the same HTLCSource across different channels (or, if they did, it would be a rather serious bug), though note that, IIRC, when this code was added, the HTLCSource field in the values was not present. We also take this opportunity to rename the fetch function to match our other event interfaces, makaing it clear that by calling the function the set of HTLCUpdates will also be cleared.
1 parent f70058e commit ff12ea9

File tree

3 files changed

+94
-58
lines changed

3 files changed

+94
-58
lines changed

lightning/src/ln/channelmanager.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2514,7 +2514,7 @@ impl<ChanSigner: ChannelKeys, M: Deref> events::MessageSendEventsProvider for Ch
25142514
// restart. This is doubly true for the fail/fulfill-backs from monitor events!
25152515
{
25162516
//TODO: This behavior should be documented.
2517-
for htlc_update in self.monitor.fetch_pending_htlc_updated() {
2517+
for htlc_update in self.monitor.get_and_clear_pending_htlcs_updated() {
25182518
if let Some(preimage) = htlc_update.payment_preimage {
25192519
log_trace!(self, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
25202520
self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage);
@@ -2539,7 +2539,7 @@ impl<ChanSigner: ChannelKeys, M: Deref> events::EventsProvider for ChannelManage
25392539
// restart. This is doubly true for the fail/fulfill-backs from monitor events!
25402540
{
25412541
//TODO: This behavior should be documented.
2542-
for htlc_update in self.monitor.fetch_pending_htlc_updated() {
2542+
for htlc_update in self.monitor.get_and_clear_pending_htlcs_updated() {
25432543
if let Some(preimage) = htlc_update.payment_preimage {
25442544
log_trace!(self, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
25452545
self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage);

lightning/src/ln/channelmonitor.rs

Lines changed: 90 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,12 @@ pub trait ManyChannelMonitor: Send + Sync {
123123
fn add_update_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr>;
124124

125125
/// Used by ChannelManager to get list of HTLC resolved onchain and which needed to be updated
126-
/// with success or failure backward
127-
fn fetch_pending_htlc_updated(&self) -> Vec<HTLCUpdate>;
126+
/// with success or failure.
127+
///
128+
/// You should probably just call through to
129+
/// ChannelMonitor::get_and_clear_pending_htlcs_updated() for each ChannelMonitor and return
130+
/// the full list.
131+
fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate>;
128132
}
129133

130134
/// A simple implementation of a ManyChannelMonitor and ChainListener. Can be used to create a
@@ -146,7 +150,6 @@ pub struct SimpleManyChannelMonitor<Key> {
146150
chain_monitor: Arc<ChainWatchInterface>,
147151
broadcaster: Arc<BroadcasterInterface>,
148152
pending_events: Mutex<Vec<events::Event>>,
149-
pending_htlc_updated: Mutex<HashMap<PaymentHash, Vec<(HTLCSource, Option<PaymentPreimage>)>>>,
150153
logger: Arc<Logger>,
151154
fee_estimator: Arc<FeeEstimator>
152155
}
@@ -155,11 +158,10 @@ impl<'a, Key : Send + cmp::Eq + hash::Hash> ChainListener for SimpleManyChannelM
155158
fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], _indexes_of_txn_matched: &[u32]) {
156159
let block_hash = header.bitcoin_hash();
157160
let mut new_events: Vec<events::Event> = Vec::with_capacity(0);
158-
let mut htlc_updated_infos = Vec::new();
159161
{
160162
let mut monitors = self.monitors.lock().unwrap();
161163
for monitor in monitors.values_mut() {
162-
let (txn_outputs, spendable_outputs, mut htlc_updated) = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster, &*self.fee_estimator);
164+
let (txn_outputs, spendable_outputs) = monitor.block_connected(txn_matched, height, &block_hash, &*self.broadcaster, &*self.fee_estimator);
163165
if spendable_outputs.len() > 0 {
164166
new_events.push(events::Event::SpendableOutputs {
165167
outputs: spendable_outputs,
@@ -171,35 +173,6 @@ impl<'a, Key : Send + cmp::Eq + hash::Hash> ChainListener for SimpleManyChannelM
171173
self.chain_monitor.install_watch_outpoint((txid.clone(), idx as u32), &output.script_pubkey);
172174
}
173175
}
174-
htlc_updated_infos.append(&mut htlc_updated);
175-
}
176-
}
177-
{
178-
// ChannelManager will just need to fetch pending_htlc_updated and pass state backward
179-
let mut pending_htlc_updated = self.pending_htlc_updated.lock().unwrap();
180-
for htlc in htlc_updated_infos.drain(..) {
181-
match pending_htlc_updated.entry(htlc.2) {
182-
hash_map::Entry::Occupied(mut e) => {
183-
// In case of reorg we may have htlc outputs solved in a different way so
184-
// we prefer to keep claims but don't store duplicate updates for a given
185-
// (payment_hash, HTLCSource) pair.
186-
let mut existing_claim = false;
187-
e.get_mut().retain(|htlc_data| {
188-
if htlc.0 == htlc_data.0 {
189-
if htlc_data.1.is_some() {
190-
existing_claim = true;
191-
true
192-
} else { false }
193-
} else { true }
194-
});
195-
if !existing_claim {
196-
e.get_mut().push((htlc.0, htlc.1));
197-
}
198-
}
199-
hash_map::Entry::Vacant(e) => {
200-
e.insert(vec![(htlc.0, htlc.1)]);
201-
}
202-
}
203176
}
204177
}
205178
let mut pending_events = self.pending_events.lock().unwrap();
@@ -224,7 +197,6 @@ impl<Key : Send + cmp::Eq + hash::Hash + 'static> SimpleManyChannelMonitor<Key>
224197
chain_monitor,
225198
broadcaster,
226199
pending_events: Mutex::new(Vec::new()),
227-
pending_htlc_updated: Mutex::new(HashMap::new()),
228200
logger,
229201
fee_estimator: feeest,
230202
};
@@ -272,17 +244,10 @@ impl ManyChannelMonitor for SimpleManyChannelMonitor<OutPoint> {
272244
}
273245
}
274246

275-
fn fetch_pending_htlc_updated(&self) -> Vec<HTLCUpdate> {
276-
let mut updated = self.pending_htlc_updated.lock().unwrap();
277-
let mut pending_htlcs_updated = Vec::with_capacity(updated.len());
278-
for (k, v) in updated.drain() {
279-
for htlc_data in v {
280-
pending_htlcs_updated.push(HTLCUpdate {
281-
payment_hash: k,
282-
payment_preimage: htlc_data.1,
283-
source: htlc_data.0,
284-
});
285-
}
247+
fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate> {
248+
let mut pending_htlcs_updated = Vec::new();
249+
for chan in self.monitors.lock().unwrap().values_mut() {
250+
pending_htlcs_updated.append(&mut chan.get_and_clear_pending_htlcs_updated());
286251
}
287252
pending_htlcs_updated
288253
}
@@ -604,6 +569,8 @@ pub struct ChannelMonitor {
604569

605570
payment_preimages: HashMap<PaymentHash, PaymentPreimage>,
606571

572+
pending_htlcs_updated: HashMap<PaymentHash, Vec<(HTLCSource, Option<PaymentPreimage>)>>,
573+
607574
destination_script: Script,
608575
// Thanks to data loss protection, we may be able to claim our non-htlc funds
609576
// back, this is the script we have to spend from but we need to
@@ -708,6 +675,7 @@ impl PartialEq for ChannelMonitor {
708675
self.current_remote_commitment_number != other.current_remote_commitment_number ||
709676
self.current_local_signed_commitment_tx != other.current_local_signed_commitment_tx ||
710677
self.payment_preimages != other.payment_preimages ||
678+
self.pending_htlcs_updated != other.pending_htlcs_updated ||
711679
self.destination_script != other.destination_script ||
712680
self.to_remote_rescue != other.to_remote_rescue ||
713681
self.pending_claim_requests != other.pending_claim_requests ||
@@ -761,6 +729,8 @@ impl ChannelMonitor {
761729
current_remote_commitment_number: 1 << 48,
762730

763731
payment_preimages: HashMap::new(),
732+
pending_htlcs_updated: HashMap::new(),
733+
764734
destination_script: destination_script,
765735
to_remote_rescue: None,
766736

@@ -1117,6 +1087,22 @@ impl ChannelMonitor {
11171087
res
11181088
}
11191089

1090+
/// Get the list of HTLCs who's status has been updated on chain. This should be called by
1091+
/// ChannelManager via ManyChannelMonitor::fetch_pending_htlcs_updated().
1092+
pub fn get_and_clear_pending_htlcs_updated(&mut self) -> Vec<HTLCUpdate> {
1093+
let mut pending_htlcs_updated = Vec::with_capacity(self.pending_htlcs_updated.len());
1094+
for (k, v) in self.pending_htlcs_updated.drain() {
1095+
for htlc_data in v {
1096+
pending_htlcs_updated.push(HTLCUpdate {
1097+
payment_hash: k,
1098+
payment_preimage: htlc_data.1,
1099+
source: htlc_data.0,
1100+
});
1101+
}
1102+
}
1103+
pending_htlcs_updated
1104+
}
1105+
11201106
/// Serializes into a vec, with various modes for the exposed pub fns
11211107
fn write<W: Writer>(&self, writer: &mut W, for_local_storage: bool) -> Result<(), ::std::io::Error> {
11221108
//TODO: We still write out all the serialization here manually instead of using the fancy
@@ -1284,6 +1270,16 @@ impl ChannelMonitor {
12841270
writer.write_all(&payment_preimage.0[..])?;
12851271
}
12861272

1273+
writer.write_all(&byte_utils::be64_to_array(self.pending_htlcs_updated.len() as u64))?;
1274+
for (payment_hash, data) in self.pending_htlcs_updated.iter() {
1275+
writer.write_all(&payment_hash.0[..])?;
1276+
writer.write_all(&byte_utils::be64_to_array(data.len() as u64))?;
1277+
for (source, payment_preimage) in data.iter() {
1278+
source.write(writer)?;
1279+
write_option!(payment_preimage);
1280+
}
1281+
}
1282+
12871283
self.last_block_hash.write(writer)?;
12881284
self.destination_script.write(writer)?;
12891285
if let Some((ref to_remote_script, ref local_key)) = self.to_remote_rescue {
@@ -2334,11 +2330,38 @@ impl ChannelMonitor {
23342330
}
23352331
}
23362332

2337-
fn block_connected(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: &BroadcasterInterface, fee_estimator: &FeeEstimator)-> (Vec<(Sha256dHash, Vec<TxOut>)>, Vec<SpendableOutputDescriptor>, Vec<(HTLCSource, Option<PaymentPreimage>, PaymentHash)>) {
2333+
fn append_htlc_updated(&mut self, mut htlc_updated_infos: Vec<(HTLCSource, Option<PaymentPreimage>, PaymentHash)>) {
2334+
// ChannelManager will just need to fetch pending_htlcs_updated and pass state backward
2335+
for htlc in htlc_updated_infos.drain(..) {
2336+
match self.pending_htlcs_updated.entry(htlc.2) {
2337+
hash_map::Entry::Occupied(mut e) => {
2338+
// In case of reorg we may have htlc outputs solved in a different way so
2339+
// we prefer to keep claims but don't store duplicate updates for a given
2340+
// (payment_hash, HTLCSource) pair.
2341+
let mut existing_claim = false;
2342+
e.get_mut().retain(|htlc_data| {
2343+
if htlc.0 == htlc_data.0 {
2344+
if htlc_data.1.is_some() {
2345+
existing_claim = true;
2346+
true
2347+
} else { false }
2348+
} else { true }
2349+
});
2350+
if !existing_claim {
2351+
e.get_mut().push((htlc.0, htlc.1));
2352+
}
2353+
}
2354+
hash_map::Entry::Vacant(e) => {
2355+
e.insert(vec![(htlc.0, htlc.1)]);
2356+
}
2357+
}
2358+
}
2359+
}
2360+
2361+
fn block_connected(&mut self, txn_matched: &[&Transaction], height: u32, block_hash: &Sha256dHash, broadcaster: &BroadcasterInterface, fee_estimator: &FeeEstimator)-> (Vec<(Sha256dHash, Vec<TxOut>)>, Vec<SpendableOutputDescriptor>) {
23382362
log_trace!(self, "Block {} at height {} connected with {} txn matched", block_hash, height, txn_matched.len());
23392363
let mut watch_outputs = Vec::new();
23402364
let mut spendable_outputs = Vec::new();
2341-
let mut htlc_updated = Vec::new();
23422365
let mut bump_candidates = HashSet::new();
23432366
for tx in txn_matched {
23442367
if tx.input.len() == 1 {
@@ -2397,10 +2420,8 @@ impl ChannelMonitor {
23972420
// While all commitment/HTLC-Success/HTLC-Timeout transactions have one input, HTLCs
23982421
// can also be resolved in a few other ways which can have more than one output. Thus,
23992422
// we call is_resolving_htlc_output here outside of the tx.input.len() == 1 check.
2400-
let mut updated = self.is_resolving_htlc_output(&tx, height);
2401-
if updated.len() > 0 {
2402-
htlc_updated.append(&mut updated);
2403-
}
2423+
let htlcs_updated = self.is_resolving_htlc_output(&tx, height);
2424+
self.append_htlc_updated(htlcs_updated);
24042425

24052426
// Scan all input to verify is one of the outpoint spent is of interest for us
24062427
let mut claimed_outputs_material = Vec::new();
@@ -2523,7 +2544,7 @@ impl ChannelMonitor {
25232544
},
25242545
OnchainEvent::HTLCUpdate { htlc_update } => {
25252546
log_trace!(self, "HTLC {} failure update has got enough confirmations to be passed upstream", log_bytes!((htlc_update.1).0));
2526-
htlc_updated.push((htlc_update.0, None, htlc_update.1));
2547+
self.append_htlc_updated(vec![(htlc_update.0, None, htlc_update.1)]);
25272548
},
25282549
OnchainEvent::ContentiousOutpoint { outpoint, .. } => {
25292550
self.claimable_outpoints.remove(&outpoint);
@@ -2552,7 +2573,7 @@ impl ChannelMonitor {
25522573
}
25532574
}
25542575
self.last_block_hash = block_hash.clone();
2555-
(watch_outputs, spendable_outputs, htlc_updated)
2576+
(watch_outputs, spendable_outputs)
25562577
}
25572578

25582579
fn block_disconnected(&mut self, height: u32, block_hash: &Sha256dHash, broadcaster: &BroadcasterInterface, fee_estimator: &FeeEstimator) {
@@ -3140,6 +3161,20 @@ impl<R: ::std::io::Read> ReadableArgs<R, Arc<Logger>> for (Sha256dHash, ChannelM
31403161
}
31413162
}
31423163

3164+
let pending_htlcs_updated_len: u64 = Readable::read(reader)?;
3165+
let mut pending_htlcs_updated = HashMap::with_capacity(cmp::min(pending_htlcs_updated_len as usize, MAX_ALLOC_SIZE / (32 + 8*3)));
3166+
for _ in 0..pending_htlcs_updated_len {
3167+
let payment_hash: PaymentHash = Readable::read(reader)?;
3168+
let htlcs_len: u64 = Readable::read(reader)?;
3169+
let mut htlcs = Vec::with_capacity(cmp::min(htlcs_len as usize, MAX_ALLOC_SIZE / 64));
3170+
for _ in 0..htlcs_len {
3171+
htlcs.push((Readable::read(reader)?, Readable::read(reader)?));
3172+
}
3173+
if let Some(_) = pending_htlcs_updated.insert(payment_hash, htlcs) {
3174+
return Err(DecodeError::InvalidValue);
3175+
}
3176+
}
3177+
31433178
let last_block_hash: Sha256dHash = Readable::read(reader)?;
31443179
let destination_script = Readable::read(reader)?;
31453180
let to_remote_rescue = match <u8 as Readable<R>>::read(reader)? {
@@ -3226,6 +3261,7 @@ impl<R: ::std::io::Read> ReadableArgs<R, Arc<Logger>> for (Sha256dHash, ChannelM
32263261
current_remote_commitment_number,
32273262

32283263
payment_preimages,
3264+
pending_htlcs_updated,
32293265

32303266
destination_script,
32313267
to_remote_rescue,

lightning/src/util/test_utils.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@ impl channelmonitor::ManyChannelMonitor for TestChannelMonitor {
7373
self.update_ret.lock().unwrap().clone()
7474
}
7575

76-
fn fetch_pending_htlc_updated(&self) -> Vec<HTLCUpdate> {
77-
return self.simple_monitor.fetch_pending_htlc_updated();
76+
fn get_and_clear_pending_htlcs_updated(&self) -> Vec<HTLCUpdate> {
77+
return self.simple_monitor.get_and_clear_pending_htlcs_updated();
7878
}
7979
}
8080

0 commit comments

Comments
 (0)