Skip to content

Commit bd39b20

Browse files
committed
Replace use of ChainWatchInterface with WatchEvent
SimpleManyChannelMonitor is parameterized by ChainWatchInterface to signal what transactions and outputs to watch for on chain. The interface has grown to cover chain access (via get_chain_utxo) and block block filtering (via filter_block and reentered), which has added complexity for implementations and user (see ChainWatchInterfaceUtil). Pull the watch functionality out as a first step to eliminating ChainWatchInterface entirely.
1 parent 1ab0a1a commit bd39b20

File tree

4 files changed

+158
-50
lines changed

4 files changed

+158
-50
lines changed

lightning/src/chain/mod.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,41 @@
99

1010
//! Structs and traits which allow other parts of rust-lightning to interact with the blockchain.
1111
12+
use bitcoin::blockdata::script::Script;
13+
use bitcoin::hash_types::Txid;
14+
15+
use chain::transaction::OutPoint;
16+
1217
pub mod chaininterface;
1318
pub mod transaction;
1419
pub mod keysinterface;
20+
21+
/// An interface for providing [`WatchEvent`]s.
22+
///
23+
/// [`WatchEvent`]: enum.WatchEvent.html
24+
pub trait WatchEventProvider {
25+
/// Releases events produced since the last call. Subsequent calls must only return new events.
26+
fn release_pending_watch_events(&self) -> Vec<WatchEvent>;
27+
}
28+
29+
/// An event indicating on-chain activity to watch for pertaining to a channel.
30+
pub enum WatchEvent {
31+
/// Watch for a transaction with `txid` and having an output with `script_pubkey` as a spending
32+
/// condition.
33+
WatchTransaction {
34+
/// Identifier of the transaction.
35+
txid: Txid,
36+
37+
/// Spending condition for an output of the transaction.
38+
script_pubkey: Script,
39+
},
40+
/// Watch for spends of a transaction output identified by `outpoint` having `script_pubkey` as
41+
/// the spending condition.
42+
WatchOutput {
43+
/// Identifier for the output.
44+
outpoint: OutPoint,
45+
46+
/// Spending condition for the output.
47+
script_pubkey: Script,
48+
}
49+
}

lightning/src/ln/channelmonitor.rs

Lines changed: 95 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,16 @@ use ln::chan_utils;
4040
use ln::chan_utils::{CounterpartyCommitmentSecrets, HTLCOutputInCommitment, HolderCommitmentTransaction, HTLCType};
4141
use ln::channelmanager::{HTLCSource, PaymentPreimage, PaymentHash};
4242
use ln::onchaintx::{OnchainTxHandler, InputDescriptors};
43-
use chain::chaininterface::{ChainListener, ChainWatchInterface, BroadcasterInterface, FeeEstimator};
43+
use chain;
44+
use chain::chaininterface::{ChainListener, ChainWatchInterface, ChainWatchedUtil, BroadcasterInterface, FeeEstimator};
4445
use chain::transaction::OutPoint;
4546
use chain::keysinterface::{SpendableOutputDescriptor, ChannelKeys};
4647
use util::logger::Logger;
4748
use util::ser::{Readable, MaybeReadable, Writer, Writeable, U48};
4849
use util::{byte_utils, events};
4950
use util::events::Event;
5051

51-
use std::collections::{HashMap, hash_map};
52+
use std::collections::{HashMap, HashSet, hash_map};
5253
use std::sync::Mutex;
5354
use std::{hash,cmp, mem};
5455
use std::ops::Deref;
@@ -198,12 +199,74 @@ pub struct SimpleManyChannelMonitor<Key, ChanSigner: ChannelKeys, T: Deref, F: D
198199
{
199200
/// The monitors
200201
pub monitors: Mutex<HashMap<Key, ChannelMonitor<ChanSigner>>>,
202+
watch_events: Mutex<WatchEventQueue>,
201203
chain_monitor: C,
202204
broadcaster: T,
203205
logger: L,
204206
fee_estimator: F
205207
}
206208

209+
struct WatchEventQueue {
210+
watched: ChainWatchedUtil,
211+
events: Vec<chain::WatchEvent>,
212+
}
213+
214+
impl WatchEventQueue {
215+
fn new() -> Self {
216+
Self {
217+
watched: ChainWatchedUtil::new(),
218+
events: Vec::new(),
219+
}
220+
}
221+
222+
fn watch_tx(&mut self, txid: &Txid, script_pubkey: &Script) {
223+
if self.watched.register_tx(txid, script_pubkey) {
224+
self.events.push(chain::WatchEvent::WatchTransaction {
225+
txid: *txid,
226+
script_pubkey: script_pubkey.clone()
227+
});
228+
}
229+
}
230+
231+
fn watch_output(&mut self, outpoint: (&Txid, usize), script_pubkey: &Script) {
232+
let (txid, index) = outpoint;
233+
if self.watched.register_outpoint((*txid, index as u32), script_pubkey) {
234+
self.events.push(chain::WatchEvent::WatchOutput {
235+
outpoint: OutPoint {
236+
txid: *txid,
237+
index: index as u16,
238+
},
239+
script_pubkey: script_pubkey.clone(),
240+
});
241+
}
242+
}
243+
244+
fn dequeue_events(&mut self) -> Vec<chain::WatchEvent> {
245+
let mut pending_events = Vec::with_capacity(self.events.len());
246+
pending_events.append(&mut self.events);
247+
pending_events
248+
}
249+
250+
fn filter_block<'a>(&self, txdata: &[(usize, &'a Transaction)]) -> Vec<(usize, &'a Transaction)> {
251+
let mut matched_txids = HashSet::new();
252+
txdata.iter().filter(|&&(_, tx)| {
253+
// A tx matches the filter if it either matches the filter directly (via does_match_tx)
254+
// or if it is a descendant of another matched transaction within the same block.
255+
let mut matched = self.watched.does_match_tx(tx);
256+
for input in tx.input.iter() {
257+
if matched || matched_txids.contains(&input.previous_output.txid) {
258+
matched = true;
259+
break;
260+
}
261+
}
262+
if matched {
263+
matched_txids.insert(tx.txid());
264+
}
265+
matched
266+
}).map(|e| *e).collect()
267+
}
268+
}
269+
207270
impl<Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref + Sync + Send, F: Deref + Sync + Send, L: Deref + Sync + Send, C: Deref + Sync + Send>
208271
ChainListener for SimpleManyChannelMonitor<Key, ChanSigner, T, F, L, C>
209272
where T::Target: BroadcasterInterface,
@@ -212,24 +275,19 @@ impl<Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref + Sync
212275
C::Target: ChainWatchInterface,
213276
{
214277
fn block_connected(&self, header: &BlockHeader, txdata: &[(usize, &Transaction)], height: u32) {
215-
let mut reentered = true;
216-
while reentered {
217-
let matched_indexes = self.chain_monitor.filter_block(header, txdata);
218-
let matched_txn: Vec<_> = matched_indexes.iter().map(|index| txdata[*index]).collect();
219-
let last_seen = self.chain_monitor.reentered();
220-
{
221-
let mut monitors = self.monitors.lock().unwrap();
222-
for monitor in monitors.values_mut() {
223-
let txn_outputs = monitor.block_connected(header, &matched_txn, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
224-
225-
for (ref txid, ref outputs) in txn_outputs {
226-
for (idx, output) in outputs.iter().enumerate() {
227-
self.chain_monitor.install_watch_outpoint((txid.clone(), idx as u32), &output.script_pubkey);
228-
}
278+
let mut watch_events = self.watch_events.lock().unwrap();
279+
let matched_txn = watch_events.filter_block(txdata);
280+
{
281+
let mut monitors = self.monitors.lock().unwrap();
282+
for monitor in monitors.values_mut() {
283+
let txn_outputs = monitor.block_connected(header, &matched_txn, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
284+
285+
for (ref txid, ref outputs) in txn_outputs {
286+
for (idx, output) in outputs.iter().enumerate() {
287+
watch_events.watch_output((txid, idx), &output.script_pubkey);
229288
}
230289
}
231290
}
232-
reentered = last_seen != self.chain_monitor.reentered();
233291
}
234292
}
235293

@@ -252,6 +310,7 @@ impl<Key : Send + cmp::Eq + hash::Hash + 'static, ChanSigner: ChannelKeys, T: De
252310
pub fn new(chain_monitor: C, broadcaster: T, logger: L, feeest: F) -> SimpleManyChannelMonitor<Key, ChanSigner, T, F, L, C> {
253311
let res = SimpleManyChannelMonitor {
254312
monitors: Mutex::new(HashMap::new()),
313+
watch_events: Mutex::new(WatchEventQueue::new()),
255314
chain_monitor,
256315
broadcaster,
257316
logger,
@@ -263,6 +322,7 @@ impl<Key : Send + cmp::Eq + hash::Hash + 'static, ChanSigner: ChannelKeys, T: De
263322

264323
/// Adds or updates the monitor which monitors the channel referred to by the given key.
265324
pub fn add_monitor_by_key(&self, key: Key, monitor: ChannelMonitor<ChanSigner>) -> Result<(), MonitorUpdateError> {
325+
let mut watch_events = self.watch_events.lock().unwrap();
266326
let mut monitors = self.monitors.lock().unwrap();
267327
let entry = match monitors.entry(key) {
268328
hash_map::Entry::Occupied(_) => return Err(MonitorUpdateError("Channel monitor for given key is already present")),
@@ -271,11 +331,11 @@ impl<Key : Send + cmp::Eq + hash::Hash + 'static, ChanSigner: ChannelKeys, T: De
271331
{
272332
let funding_txo = monitor.get_funding_txo();
273333
log_trace!(self.logger, "Got new Channel Monitor for channel {}", log_bytes!(funding_txo.0.to_channel_id()[..]));
274-
self.chain_monitor.install_watch_tx(&funding_txo.0.txid, &funding_txo.1);
275-
self.chain_monitor.install_watch_outpoint((funding_txo.0.txid, funding_txo.0.index as u32), &funding_txo.1);
334+
watch_events.watch_tx(&funding_txo.0.txid, &funding_txo.1);
335+
watch_events.watch_output((&funding_txo.0.txid, funding_txo.0.index as usize), &funding_txo.1);
276336
for (txid, outputs) in monitor.get_outputs_to_watch().iter() {
277337
for (idx, script) in outputs.iter().enumerate() {
278-
self.chain_monitor.install_watch_outpoint((*txid, idx as u32), script);
338+
watch_events.watch_output((txid, idx), script);
279339
}
280340
}
281341
}
@@ -342,6 +402,17 @@ impl<Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref, F: De
342402
}
343403
}
344404

405+
impl<Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref, C: Deref> chain::WatchEventProvider for SimpleManyChannelMonitor<Key, ChanSigner, T, F, L, C>
406+
where T::Target: BroadcasterInterface,
407+
F::Target: FeeEstimator,
408+
L::Target: Logger,
409+
C::Target: ChainWatchInterface,
410+
{
411+
fn release_pending_watch_events(&self) -> Vec<chain::WatchEvent> {
412+
self.watch_events.lock().unwrap().dequeue_events()
413+
}
414+
}
415+
345416
/// If an HTLC expires within this many blocks, don't try to claim it in a shared transaction,
346417
/// instead claiming it in its own individual transaction.
347418
pub(crate) const CLTV_SHARED_CLAIM_BUFFER: u32 = 12;
@@ -881,30 +952,14 @@ pub trait ManyChannelMonitor: Send + Sync {
881952

882953
/// Adds a monitor for the given `funding_txo`.
883954
///
884-
/// Implementer must also ensure that the funding_txo txid *and* outpoint are registered with
885-
/// any relevant ChainWatchInterfaces such that the provided monitor receives block_connected
886-
/// callbacks with the funding transaction, or any spends of it.
887-
///
888-
/// Further, the implementer must also ensure that each output returned in
889-
/// monitor.get_outputs_to_watch() is registered to ensure that the provided monitor learns about
890-
/// any spends of any of the outputs.
891-
///
892-
/// Any spends of outputs which should have been registered which aren't passed to
893-
/// ChannelMonitors via block_connected may result in FUNDS LOSS.
955+
/// Implementations must ensure that `monitor` receives block_connected calls for blocks with
956+
/// the funding transaction or any spends of it, as well as any spends of outputs returned by
957+
/// get_outputs_to_watch. Not doing so may result in LOST FUNDS.
894958
fn add_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitor<Self::Keys>) -> Result<(), ChannelMonitorUpdateErr>;
895959

896960
/// Updates a monitor for the given `funding_txo`.
897961
///
898-
/// Implementer must also ensure that the funding_txo txid *and* outpoint are registered with
899-
/// any relevant ChainWatchInterfaces such that the provided monitor receives block_connected
900-
/// callbacks with the funding transaction, or any spends of it.
901-
///
902-
/// Further, the implementer must also ensure that each output returned in
903-
/// monitor.get_watch_outputs() is registered to ensure that the provided monitor learns about
904-
/// any spends of any of the outputs.
905-
///
906-
/// Any spends of outputs which should have been registered which aren't passed to
907-
/// ChannelMonitors via block_connected may result in FUNDS LOSS.
962+
/// TODO(jkczyz): Determine where this should go from e73036c6845fd3cc16479a1b497db82a5ebb3897.
908963
///
909964
/// In case of distributed watchtowers deployment, even if an Err is return, the new version
910965
/// must be written to disk, as state may have been stored but rejected due to a block forcing

lightning/src/ln/functional_test_utils.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
//! A bunch of useful utilities for building networks of nodes and exchanging messages between
1111
//! nodes for functional tests.
1212
13+
use chain;
1314
use chain::chaininterface;
1415
use chain::transaction::OutPoint;
1516
use ln::channelmanager::{ChannelManager, ChannelManagerReadArgs, RAACommitmentOrder, PaymentPreimage, PaymentHash, PaymentSecret, PaymentSendFailure};
@@ -80,9 +81,29 @@ pub fn connect_blocks<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, depth: u32, he
8081
}
8182

8283
pub fn connect_block<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, block: &Block, height: u32) {
83-
node.block_notifier.block_connected(block, height)
84+
use chain::WatchEventProvider;
85+
use chain::chaininterface::ChainListener;
86+
87+
let watch_events = node.chan_monitor.simple_monitor.release_pending_watch_events();
88+
process_chain_watch_events(&watch_events);
89+
90+
let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
91+
loop {
92+
node.chan_monitor.simple_monitor.block_connected(&block.header, &txdata, height);
93+
94+
let watch_events = node.chan_monitor.simple_monitor.release_pending_watch_events();
95+
process_chain_watch_events(&watch_events);
96+
97+
if watch_events.is_empty() {
98+
break;
99+
}
100+
}
101+
102+
node.node.block_connected(&block.header, &txdata, height);
84103
}
85104

105+
fn process_chain_watch_events(_events: &Vec<chain::WatchEvent>) {}
106+
86107
pub fn disconnect_block<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, header: &BlockHeader, height: u32) {
87108
node.block_notifier.block_disconnected(header, height)
88109
}

lightning/src/ln/functional_tests.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -417,9 +417,6 @@ fn test_1_conf_open() {
417417
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
418418

419419
let tx = create_chan_between_nodes_with_value_init(&nodes[0], &nodes[1], 100000, 10001, InitFeatures::known(), InitFeatures::known());
420-
assert!(nodes[0].chain_monitor.does_match_tx(&tx));
421-
assert!(nodes[1].chain_monitor.does_match_tx(&tx));
422-
423420
let block = Block {
424421
header: BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 },
425422
txdata: vec![tx],
@@ -2785,8 +2782,8 @@ fn claim_htlc_outputs_single_tx() {
27852782

27862783
#[test]
27872784
fn test_htlc_on_chain_success() {
2788-
// Test that in case of a unilateral close onchain, we detect the state of output thanks to
2789-
// ChainWatchInterface and pass the preimage backward accordingly. So here we test that ChannelManager is
2785+
// Test that in case of a unilateral close onchain, we detect the state of output and pass
2786+
// the preimage backward accordingly. So here we test that ChannelManager is
27902787
// broadcasting the right event to other nodes in payment path.
27912788
// We test with two HTLCs simultaneously as that was not handled correctly in the past.
27922789
// A --------------------> B ----------------------> C (preimage)
@@ -2964,8 +2961,8 @@ fn test_htlc_on_chain_success() {
29642961

29652962
#[test]
29662963
fn test_htlc_on_chain_timeout() {
2967-
// Test that in case of a unilateral close onchain, we detect the state of output thanks to
2968-
// ChainWatchInterface and timeout the HTLC backward accordingly. So here we test that ChannelManager is
2964+
// Test that in case of a unilateral close onchain, we detect the state of output and
2965+
// timeout the HTLC backward accordingly. So here we test that ChannelManager is
29692966
// broadcasting the right event to other nodes in payment path.
29702967
// A ------------------> B ----------------------> C (timeout)
29712968
// B's commitment tx C's commitment tx
@@ -5177,8 +5174,8 @@ fn test_static_spendable_outputs_justice_tx_revoked_htlc_success_tx() {
51775174

51785175
#[test]
51795176
fn test_onchain_to_onchain_claim() {
5180-
// Test that in case of channel closure, we detect the state of output thanks to
5181-
// ChainWatchInterface and claim HTLC on downstream peer's remote commitment tx.
5177+
// Test that in case of channel closure, we detect the state of output and claim HTLC
5178+
// on downstream peer's remote commitment tx.
51825179
// First, have C claim an HTLC against its own latest commitment transaction.
51835180
// Then, broadcast these to B, which should update the monitor downstream on the A<->B
51845181
// channel.

0 commit comments

Comments
 (0)