Skip to content

Commit 65f7c9f

Browse files
authored
Merge pull request #389 from valentinewallace/split-chain-watch-interface
remove circular references in channelmanager and channelmonitor
2 parents 8d2c4e7 + f715183 commit 65f7c9f

9 files changed

+287
-231
lines changed

lightning/fuzz/fuzz_targets/chanmon_fail_consistency.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ pub fn do_test(data: &[u8]) {
193193
config.channel_options.fee_proportional_millionths = 0;
194194
config.channel_options.announced_channel = true;
195195
config.peer_channel_config_limits.min_dust_limit_satoshis = 0;
196-
(ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone(), watch.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap(),
196+
(ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap(),
197197
monitor)
198198
} }
199199
}
@@ -225,7 +225,6 @@ pub fn do_test(data: &[u8]) {
225225
keys_manager,
226226
fee_estimator: fee_est.clone(),
227227
monitor: monitor.clone(),
228-
chain_monitor: watch,
229228
tx_broadcaster: broadcast.clone(),
230229
logger,
231230
default_config: config,
@@ -246,7 +245,6 @@ pub fn do_test(data: &[u8]) {
246245
} }
247246
}
248247

249-
250248
let mut channel_txn = Vec::new();
251249
macro_rules! make_channel {
252250
($source: expr, $dest: expr, $chan_id: expr) => { {

lightning/fuzz/fuzz_targets/full_stack_target.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -144,8 +144,8 @@ impl<'a> Hash for Peer<'a> {
144144
}
145145
}
146146

147-
struct MoneyLossDetector<'a> {
148-
manager: Arc<ChannelManager>,
147+
struct MoneyLossDetector<'a, 'b> {
148+
manager: Arc<ChannelManager<'b>>,
149149
monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>,
150150
handler: PeerManager<Peer<'a>>,
151151

@@ -157,8 +157,8 @@ struct MoneyLossDetector<'a> {
157157
max_height: usize,
158158
blocks_connected: u32,
159159
}
160-
impl<'a> MoneyLossDetector<'a> {
161-
pub fn new(peers: &'a RefCell<[bool; 256]>, manager: Arc<ChannelManager>, monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>, handler: PeerManager<Peer<'a>>) -> Self {
160+
impl<'a, 'b> MoneyLossDetector<'a, 'b> {
161+
pub fn new(peers: &'a RefCell<[bool; 256]>, manager: Arc<ChannelManager<'b>>, monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>, handler: PeerManager<Peer<'a>>) -> Self {
162162
MoneyLossDetector {
163163
manager,
164164
monitor,
@@ -217,7 +217,7 @@ impl<'a> MoneyLossDetector<'a> {
217217
}
218218
}
219219

220-
impl<'a> Drop for MoneyLossDetector<'a> {
220+
impl<'a, 'b> Drop for MoneyLossDetector<'a, 'b> {
221221
fn drop(&mut self) {
222222
if !::std::thread::panicking() {
223223
// Disconnect all peers
@@ -331,7 +331,7 @@ pub fn do_test(data: &[u8], logger: &Arc<Logger>) {
331331
config.channel_options.fee_proportional_millionths = slice_to_be32(get_slice!(4));
332332
config.channel_options.announced_channel = get_slice!(1)[0] != 0;
333333
config.peer_channel_config_limits.min_dust_limit_satoshis = 0;
334-
let channelmanager = ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone(), watch.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap();
334+
let channelmanager = ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap();
335335
let router = Arc::new(Router::new(PublicKey::from_secret_key(&Secp256k1::signing_only(), &keys_manager.get_node_secret()), watch.clone(), Arc::clone(&logger)));
336336

337337
let peers = RefCell::new([false; 256]);

lightning/fuzz/fuzz_targets/router_target.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@ extern crate secp256k1;
55

66
use bitcoin_hashes::sha256d::Hash as Sha256dHash;
77
use bitcoin::blockdata::script::{Script, Builder};
8+
use bitcoin::blockdata::block::Block;
9+
use bitcoin::blockdata::transaction::Transaction;
810

9-
use lightning::chain::chaininterface::{ChainError,ChainWatchInterface, ChainListener};
11+
use lightning::chain::chaininterface::{ChainError,ChainWatchInterface};
1012
use lightning::ln::channelmanager::ChannelDetails;
1113
use lightning::ln::msgs;
1214
use lightning::ln::msgs::{RoutingMessageHandler};
@@ -20,7 +22,7 @@ mod utils;
2022

2123
use utils::test_logger;
2224

23-
use std::sync::{Weak, Arc};
25+
use std::sync::Arc;
2426
use std::sync::atomic::{AtomicUsize, Ordering};
2527

2628
#[inline]
@@ -79,7 +81,10 @@ impl ChainWatchInterface for DummyChainWatcher {
7981
fn install_watch_tx(&self, _txid: &Sha256dHash, _script_pub_key: &Script) { }
8082
fn install_watch_outpoint(&self, _outpoint: (Sha256dHash, u32), _out_script: &Script) { }
8183
fn watch_all_txn(&self) { }
82-
fn register_listener(&self, _listener: Weak<ChainListener>) { }
84+
fn filter_block<'a>(&self, _block: &'a Block) -> (Vec<&'a Transaction>, Vec<u32>) {
85+
(Vec::new(), Vec::new())
86+
}
87+
fn reentered(&self) -> usize { 0 }
8388

8489
fn get_chain_utxo(&self, _genesis_hash: Sha256dHash, _unspent_tx_output_identifier: u64) -> Result<(Script, u64), ChainError> {
8590
match self.input.get_slice(2) {

lightning/src/chain/chaininterface.rs

Lines changed: 97 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -45,16 +45,20 @@ pub trait ChainWatchInterface: Sync + Send {
4545
/// Indicates that a listener needs to see all transactions.
4646
fn watch_all_txn(&self);
4747

48-
/// Register the given listener to receive events. Only a weak pointer is provided and the
49-
/// registration should be freed once that pointer expires.
50-
fn register_listener(&self, listener: Weak<ChainListener>);
51-
//TODO: unregister
52-
5348
/// Gets the script and value in satoshis for a given unspent transaction output given a
5449
/// short_channel_id (aka unspent_tx_output_identier). For BTC/tBTC channels the top three
5550
/// bytes are the block height, the next 3 the transaction index within the block, and the
5651
/// final two the output within the transaction.
5752
fn get_chain_utxo(&self, genesis_hash: Sha256dHash, unspent_tx_output_identifier: u64) -> Result<(Script, u64), ChainError>;
53+
54+
/// Gets the list of transactions and transaction indices that the ChainWatchInterface is
55+
/// watching for.
56+
fn filter_block<'a>(&self, block: &'a Block) -> (Vec<&'a Transaction>, Vec<u32>);
57+
58+
/// Returns a usize that changes when the ChainWatchInterface's watched data is modified.
59+
/// Users of `filter_block` should pre-save a copy of `reentered`'s return value and use it to
60+
/// determine whether they need to re-filter a given block.
61+
fn reentered(&self) -> usize;
5862
}
5963

6064
/// An interface to send a transaction to the Bitcoin network.
@@ -198,13 +202,81 @@ impl ChainWatchedUtil {
198202
}
199203
}
200204

205+
/// Utility for notifying listeners about new blocks, and handling block rescans if new watch
206+
/// data is registered.
207+
pub struct BlockNotifier<'a> {
208+
listeners: Mutex<Vec<Weak<ChainListener + 'a>>>, //TODO(vmw): try removing Weak
209+
chain_monitor: Arc<ChainWatchInterface>,
210+
}
211+
212+
impl<'a> BlockNotifier<'a> {
213+
/// Constructs a new BlockNotifier without any listeners.
214+
pub fn new(chain_monitor: Arc<ChainWatchInterface>) -> BlockNotifier<'a> {
215+
BlockNotifier {
216+
listeners: Mutex::new(Vec::new()),
217+
chain_monitor,
218+
}
219+
}
220+
221+
/// Register the given listener to receive events. Only a weak pointer is provided and
222+
/// the registration should be freed once that pointer expires.
223+
// TODO: unregister
224+
pub fn register_listener(&self, listener: Weak<ChainListener + 'a>) {
225+
let mut vec = self.listeners.lock().unwrap();
226+
vec.push(listener);
227+
}
228+
229+
/// Notify listeners that a block was connected given a full, unfiltered block.
230+
///
231+
/// Handles re-scanning the block and calling block_connected again if listeners register new
232+
/// watch data during the callbacks for you (see ChainListener::block_connected for more info).
233+
pub fn block_connected<'b>(&self, block: &'b Block, height: u32) {
234+
let mut reentered = true;
235+
while reentered {
236+
let (matched, matched_index) = self.chain_monitor.filter_block(block);
237+
reentered = self.block_connected_checked(&block.header, height, matched.as_slice(), matched_index.as_slice());
238+
}
239+
}
240+
241+
/// Notify listeners that a block was connected, given pre-filtered list of transactions in the
242+
/// block which matched the filter (probably using does_match_tx).
243+
///
244+
/// Returns true if notified listeners registered additional watch data (implying that the
245+
/// block must be re-scanned and this function called again prior to further block_connected
246+
/// calls, see ChainListener::block_connected for more info).
247+
pub fn block_connected_checked(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) -> bool {
248+
let last_seen = self.chain_monitor.reentered();
249+
250+
let listeners = self.listeners.lock().unwrap().clone();
251+
for listener in listeners.iter() {
252+
match listener.upgrade() {
253+
Some(arc) => arc.block_connected(header, height, txn_matched, indexes_of_txn_matched),
254+
None => ()
255+
}
256+
}
257+
return last_seen != self.chain_monitor.reentered();
258+
}
259+
260+
261+
/// Notify listeners that a block was disconnected.
262+
pub fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) {
263+
let listeners = self.listeners.lock().unwrap().clone();
264+
for listener in listeners.iter() {
265+
match listener.upgrade() {
266+
Some(arc) => arc.block_disconnected(&header, disconnected_height),
267+
None => ()
268+
}
269+
}
270+
}
271+
272+
}
273+
201274
/// Utility to capture some common parts of ChainWatchInterface implementors.
202275
///
203276
/// Keeping a local copy of this in a ChainWatchInterface implementor is likely useful.
204277
pub struct ChainWatchInterfaceUtil {
205278
network: Network,
206279
watched: Mutex<ChainWatchedUtil>,
207-
listeners: Mutex<Vec<Weak<ChainListener>>>,
208280
reentered: AtomicUsize,
209281
logger: Arc<Logger>,
210282
}
@@ -232,17 +304,31 @@ impl ChainWatchInterface for ChainWatchInterfaceUtil {
232304
}
233305
}
234306

235-
fn register_listener(&self, listener: Weak<ChainListener>) {
236-
let mut vec = self.listeners.lock().unwrap();
237-
vec.push(listener);
238-
}
239-
240307
fn get_chain_utxo(&self, genesis_hash: Sha256dHash, _unspent_tx_output_identifier: u64) -> Result<(Script, u64), ChainError> {
241308
if genesis_hash != genesis_block(self.network).header.bitcoin_hash() {
242309
return Err(ChainError::NotWatched);
243310
}
244311
Err(ChainError::NotSupported)
245312
}
313+
314+
fn filter_block<'a>(&self, block: &'a Block) -> (Vec<&'a Transaction>, Vec<u32>) {
315+
let mut matched = Vec::new();
316+
let mut matched_index = Vec::new();
317+
{
318+
let watched = self.watched.lock().unwrap();
319+
for (index, transaction) in block.txdata.iter().enumerate() {
320+
if self.does_match_tx_unguarded(transaction, &watched) {
321+
matched.push(transaction);
322+
matched_index.push(index as u32);
323+
}
324+
}
325+
}
326+
(matched, matched_index)
327+
}
328+
329+
fn reentered(&self) -> usize {
330+
self.reentered.load(Ordering::Relaxed)
331+
}
246332
}
247333

248334
impl ChainWatchInterfaceUtil {
@@ -251,63 +337,11 @@ impl ChainWatchInterfaceUtil {
251337
ChainWatchInterfaceUtil {
252338
network: network,
253339
watched: Mutex::new(ChainWatchedUtil::new()),
254-
listeners: Mutex::new(Vec::new()),
255340
reentered: AtomicUsize::new(1),
256341
logger: logger,
257342
}
258343
}
259344

260-
/// Notify listeners that a block was connected given a full, unfiltered block.
261-
///
262-
/// Handles re-scanning the block and calling block_connected again if listeners register new
263-
/// watch data during the callbacks for you (see ChainListener::block_connected for more info).
264-
pub fn block_connected_with_filtering(&self, block: &Block, height: u32) {
265-
let mut reentered = true;
266-
while reentered {
267-
let mut matched = Vec::new();
268-
let mut matched_index = Vec::new();
269-
{
270-
let watched = self.watched.lock().unwrap();
271-
for (index, transaction) in block.txdata.iter().enumerate() {
272-
if self.does_match_tx_unguarded(transaction, &watched) {
273-
matched.push(transaction);
274-
matched_index.push(index as u32);
275-
}
276-
}
277-
}
278-
reentered = self.block_connected_checked(&block.header, height, matched.as_slice(), matched_index.as_slice());
279-
}
280-
}
281-
282-
/// Notify listeners that a block was disconnected.
283-
pub fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) {
284-
let listeners = self.listeners.lock().unwrap().clone();
285-
for listener in listeners.iter() {
286-
match listener.upgrade() {
287-
Some(arc) => arc.block_disconnected(&header, disconnected_height),
288-
None => ()
289-
}
290-
}
291-
}
292-
293-
/// Notify listeners that a block was connected, given pre-filtered list of transactions in the
294-
/// block which matched the filter (probably using does_match_tx).
295-
///
296-
/// Returns true if notified listeners registered additional watch data (implying that the
297-
/// block must be re-scanned and this function called again prior to further block_connected
298-
/// calls, see ChainListener::block_connected for more info).
299-
pub fn block_connected_checked(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) -> bool {
300-
let last_seen = self.reentered.load(Ordering::Relaxed);
301-
302-
let listeners = self.listeners.lock().unwrap().clone();
303-
for listener in listeners.iter() {
304-
match listener.upgrade() {
305-
Some(arc) => arc.block_connected(header, height, txn_matched, indexes_of_txn_matched),
306-
None => ()
307-
}
308-
}
309-
return last_seen != self.reentered.load(Ordering::Relaxed);
310-
}
311345

312346
/// Checks if a given transaction matches the current filter.
313347
pub fn does_match_tx(&self, tx: &Transaction) -> bool {

lightning/src/ln/chanmon_update_fail_tests.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1627,11 +1627,11 @@ fn do_during_funding_monitor_fail(fail_on_generate: bool, restore_between_fails:
16271627
};
16281628

16291629
if confirm_a_first {
1630-
confirm_transaction(&nodes[0].chain_monitor, &funding_tx, funding_tx.version);
1630+
confirm_transaction(&nodes[0].block_notifier, &nodes[0].chain_monitor, &funding_tx, funding_tx.version);
16311631
nodes[1].node.handle_funding_locked(&nodes[0].node.get_our_node_id(), &get_event_msg!(nodes[0], MessageSendEvent::SendFundingLocked, nodes[1].node.get_our_node_id())).unwrap();
16321632
} else {
16331633
assert!(!restore_b_before_conf);
1634-
confirm_transaction(&nodes[1].chain_monitor, &funding_tx, funding_tx.version);
1634+
confirm_transaction(&nodes[1].block_notifier, &nodes[1].chain_monitor, &funding_tx, funding_tx.version);
16351635
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
16361636
}
16371637

@@ -1643,7 +1643,7 @@ fn do_during_funding_monitor_fail(fail_on_generate: bool, restore_between_fails:
16431643
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
16441644

16451645
if !restore_b_before_conf {
1646-
confirm_transaction(&nodes[1].chain_monitor, &funding_tx, funding_tx.version);
1646+
confirm_transaction(&nodes[1].block_notifier, &nodes[1].chain_monitor, &funding_tx, funding_tx.version);
16471647
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
16481648
assert!(nodes[1].node.get_and_clear_pending_events().is_empty());
16491649
}
@@ -1655,12 +1655,12 @@ fn do_during_funding_monitor_fail(fail_on_generate: bool, restore_between_fails:
16551655
let (channel_id, (announcement, as_update, bs_update)) = if !confirm_a_first {
16561656
nodes[0].node.handle_funding_locked(&nodes[1].node.get_our_node_id(), &get_event_msg!(nodes[1], MessageSendEvent::SendFundingLocked, nodes[0].node.get_our_node_id())).unwrap();
16571657

1658-
confirm_transaction(&nodes[0].chain_monitor, &funding_tx, funding_tx.version);
1658+
confirm_transaction(&nodes[0].block_notifier, &nodes[0].chain_monitor, &funding_tx, funding_tx.version);
16591659
let (funding_locked, channel_id) = create_chan_between_nodes_with_value_confirm_second(&nodes[1], &nodes[0]);
16601660
(channel_id, create_chan_between_nodes_with_value_b(&nodes[0], &nodes[1], &funding_locked))
16611661
} else {
16621662
if restore_b_before_conf {
1663-
confirm_transaction(&nodes[1].chain_monitor, &funding_tx, funding_tx.version);
1663+
confirm_transaction(&nodes[1].block_notifier, &nodes[1].chain_monitor, &funding_tx, funding_tx.version);
16641664
}
16651665
let (funding_locked, channel_id) = create_chan_between_nodes_with_value_confirm_second(&nodes[0], &nodes[1]);
16661666
(channel_id, create_chan_between_nodes_with_value_b(&nodes[1], &nodes[0], &funding_locked))

0 commit comments

Comments
 (0)