Skip to content

Commit 2f6a2e9

Browse files
chaininterface: add BlockNotifier struct
Adding this struct will allow us to remove the circular reference between ChainListeners and the ChainWatchInterface, because it separates out the responsibility of notifying listeners about new blocks from the responsibility of storing and retrieving watched transactions.
1 parent 1c95f69 commit 2f6a2e9

File tree

4 files changed

+247
-157
lines changed

4 files changed

+247
-157
lines changed

src/chain/chaininterface.rs

Lines changed: 69 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,75 @@ impl ChainWatchedUtil {
193193
}
194194
}
195195

196+
/// Utility for notifying listeners about new blocks, and handling block rescans if new watch
197+
/// data is registered.
198+
pub struct BlockNotifier<'a, 'b: 'a> {
199+
listeners: Mutex<Vec<Weak<ChainListener + 'a>>>, //TODO(vmw): try removing Weak
200+
chain_monitor: Arc<ChainWatchInterface<'b>>,
201+
}
202+
203+
impl<'a, 'b> BlockNotifier<'a, 'b> {
204+
/// Constructs a new BlockNotifier without any listeners.
205+
pub fn new(chain_monitor: Arc<ChainWatchInterface>) -> BlockNotifier {
206+
BlockNotifier {
207+
listeners: Mutex::new(Vec::new()),
208+
chain_monitor,
209+
}
210+
}
211+
212+
/// Register the given listener to receive events. Only a weak pointer is provided and
213+
/// the registration should be freed once that pointer expires.
214+
// TODO: unregister
215+
pub fn register_listener(&self, listener: Weak<ChainListener + 'a>) {
216+
let mut vec = self.listeners.lock().unwrap();
217+
vec.push(listener);
218+
}
219+
220+
/// Notify listeners that a block was connected given a full, unfiltered block.
221+
///
222+
/// Handles re-scanning the block and calling block_connected again if listeners register new
223+
/// watch data during the callbacks for you (see ChainListener::block_connected for more info).
224+
pub fn block_connected(&self, block: &'b Block, height: u32) {
225+
let mut reentered = true;
226+
while reentered {
227+
let (matched, matched_index) = self.chain_monitor.filter_block(block);
228+
reentered = self.block_connected_checked(&block.header, height, matched.as_slice(), matched_index.as_slice());
229+
}
230+
}
231+
232+
/// Notify listeners that a block was connected, given pre-filtered list of transactions in the
233+
/// block which matched the filter (probably using does_match_tx).
234+
///
235+
/// Returns true if notified listeners registered additional watch data (implying that the
236+
/// block must be re-scanned and this function called again prior to further block_connected
237+
/// calls, see ChainListener::block_connected for more info).
238+
pub fn block_connected_checked(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) -> bool {
239+
let last_seen = self.chain_monitor.reentered();
240+
241+
let listeners = self.listeners.lock().unwrap().clone();
242+
for listener in listeners.iter() {
243+
match listener.upgrade() {
244+
Some(arc) => arc.block_connected(header, height, txn_matched, indexes_of_txn_matched),
245+
None => ()
246+
}
247+
}
248+
return last_seen != self.chain_monitor.reentered();
249+
}
250+
251+
252+
/// Notify listeners that a block was disconnected.
253+
pub fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) {
254+
let listeners = self.listeners.lock().unwrap().clone();
255+
for listener in listeners.iter() {
256+
match listener.upgrade() {
257+
Some(arc) => arc.block_disconnected(&header, disconnected_height),
258+
None => ()
259+
}
260+
}
261+
}
262+
263+
}
264+
196265
/// Utility to capture some common parts of ChainWatchInterface implementors.
197266
///
198267
/// Keeping a local copy of this in a ChainWatchInterface implementor is likely useful.
@@ -240,63 +309,11 @@ impl ChainWatchInterfaceUtil {
240309
ChainWatchInterfaceUtil {
241310
network: network,
242311
watched: Mutex::new(ChainWatchedUtil::new()),
243-
listeners: Mutex::new(Vec::new()),
244312
reentered: AtomicUsize::new(1),
245313
logger: logger,
246314
}
247315
}
248316

249-
/// Notify listeners that a block was connected given a full, unfiltered block.
250-
///
251-
/// Handles re-scanning the block and calling block_connected again if listeners register new
252-
/// watch data during the callbacks for you (see ChainListener::block_connected for more info).
253-
pub fn block_connected_with_filtering(&self, block: &Block, height: u32) {
254-
let mut reentered = true;
255-
while reentered {
256-
let mut matched = Vec::new();
257-
let mut matched_index = Vec::new();
258-
{
259-
let watched = self.watched.lock().unwrap();
260-
for (index, transaction) in block.txdata.iter().enumerate() {
261-
if self.does_match_tx_unguarded(transaction, &watched) {
262-
matched.push(transaction);
263-
matched_index.push(index as u32);
264-
}
265-
}
266-
}
267-
reentered = self.block_connected_checked(&block.header, height, matched.as_slice(), matched_index.as_slice());
268-
}
269-
}
270-
271-
/// Notify listeners that a block was disconnected.
272-
pub fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) {
273-
let listeners = self.listeners.lock().unwrap().clone();
274-
for listener in listeners.iter() {
275-
match listener.upgrade() {
276-
Some(arc) => arc.block_disconnected(&header, disconnected_height),
277-
None => ()
278-
}
279-
}
280-
}
281-
282-
/// Notify listeners that a block was connected, given pre-filtered list of transactions in the
283-
/// block which matched the filter (probably using does_match_tx).
284-
///
285-
/// Returns true if notified listeners registered additional watch data (implying that the
286-
/// block must be re-scanned and this function called again prior to further block_connected
287-
/// calls, see ChainListener::block_connected for more info).
288-
pub fn block_connected_checked(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) -> bool {
289-
let last_seen = self.reentered.load(Ordering::Relaxed);
290-
291-
let listeners = self.listeners.lock().unwrap().clone();
292-
for listener in listeners.iter() {
293-
match listener.upgrade() {
294-
Some(arc) => arc.block_connected(header, height, txn_matched, indexes_of_txn_matched),
295-
None => ()
296-
}
297-
}
298-
return last_seen != self.reentered.load(Ordering::Relaxed);
299-
}
300317

301318
/// Checks if a given transaction matches the current filter.
302319
pub fn does_match_tx(&self, tx: &Transaction) -> bool {

src/ln/chanmon_update_fail_tests.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1627,11 +1627,11 @@ fn do_during_funding_monitor_fail(fail_on_generate: bool, restore_between_fails:
16271627
};
16281628

16291629
if confirm_a_first {
1630-
confirm_transaction(&nodes[0].chain_monitor, &funding_tx, funding_tx.version);
1630+
confirm_transaction(&nodes[0].block_notifier, &nodes[0].chain_monitor, &funding_tx, funding_tx.version);
16311631
nodes[1].node.handle_funding_locked(&nodes[0].node.get_our_node_id(), &get_event_msg!(nodes[0], MessageSendEvent::SendFundingLocked, nodes[1].node.get_our_node_id())).unwrap();
16321632
} else {
16331633
assert!(!restore_b_before_conf);
1634-
confirm_transaction(&nodes[1].chain_monitor, &funding_tx, funding_tx.version);
1634+
confirm_transaction(&nodes[1].block_notifier, &nodes[1].chain_monitor, &funding_tx, funding_tx.version);
16351635
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
16361636
}
16371637

@@ -1643,7 +1643,7 @@ fn do_during_funding_monitor_fail(fail_on_generate: bool, restore_between_fails:
16431643
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
16441644

16451645
if !restore_b_before_conf {
1646-
confirm_transaction(&nodes[1].chain_monitor, &funding_tx, funding_tx.version);
1646+
confirm_transaction(&nodes[1].block_notifier, &nodes[1].chain_monitor, &funding_tx, funding_tx.version);
16471647
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
16481648
assert!(nodes[1].node.get_and_clear_pending_events().is_empty());
16491649
}
@@ -1655,12 +1655,12 @@ fn do_during_funding_monitor_fail(fail_on_generate: bool, restore_between_fails:
16551655
let (channel_id, (announcement, as_update, bs_update)) = if !confirm_a_first {
16561656
nodes[0].node.handle_funding_locked(&nodes[1].node.get_our_node_id(), &get_event_msg!(nodes[1], MessageSendEvent::SendFundingLocked, nodes[0].node.get_our_node_id())).unwrap();
16571657

1658-
confirm_transaction(&nodes[0].chain_monitor, &funding_tx, funding_tx.version);
1658+
confirm_transaction(&nodes[0].block_notifier, &nodes[0].chain_monitor, &funding_tx, funding_tx.version);
16591659
let (funding_locked, channel_id) = create_chan_between_nodes_with_value_confirm_second(&nodes[1], &nodes[0]);
16601660
(channel_id, create_chan_between_nodes_with_value_b(&nodes[0], &nodes[1], &funding_locked))
16611661
} else {
16621662
if restore_b_before_conf {
1663-
confirm_transaction(&nodes[1].chain_monitor, &funding_tx, funding_tx.version);
1663+
confirm_transaction(&nodes[1].block_notifier, &nodes[1].chain_monitor, &funding_tx, funding_tx.version);
16641664
}
16651665
let (funding_locked, channel_id) = create_chan_between_nodes_with_value_confirm_second(&nodes[0], &nodes[1]);
16661666
(channel_id, create_chan_between_nodes_with_value_b(&nodes[1], &nodes[0], &funding_locked))

src/ln/functional_test_utils.rs

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,27 +34,28 @@ use std::sync::{Arc, Mutex};
3434
use std::mem;
3535

3636
pub const CHAN_CONFIRM_DEPTH: u32 = 100;
37-
pub fn confirm_transaction(chain: &chaininterface::ChainWatchInterfaceUtil, tx: &Transaction, chan_id: u32) {
37+
pub fn confirm_transaction(notifier: &chaininterface::BlockNotifier, chain: &chaininterface::ChainWatchInterfaceUtil, tx: &Transaction, chan_id: u32) {
3838
assert!(chain.does_match_tx(tx));
3939
let mut header = BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
40-
chain.block_connected_checked(&header, 1, &[tx; 1], &[chan_id; 1]);
40+
notifier.block_connected_checked(&header, 1, &[tx; 1], &[chan_id; 1]);
4141
for i in 2..CHAN_CONFIRM_DEPTH {
4242
header = BlockHeader { version: 0x20000000, prev_blockhash: header.bitcoin_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
43-
chain.block_connected_checked(&header, i, &[tx; 0], &[0; 0]);
43+
notifier.block_connected_checked(&header, i, &vec![], &[0; 0]);
4444
}
4545
}
4646

47-
pub fn connect_blocks(chain: &chaininterface::ChainWatchInterfaceUtil, depth: u32, height: u32, parent: bool, prev_blockhash: Sha256d) -> Sha256d {
47+
pub fn connect_blocks(notifier: &chaininterface::BlockNotifier, depth: u32, height: u32, parent: bool, prev_blockhash: Sha256d) -> Sha256d {
4848
let mut header = BlockHeader { version: 0x2000000, prev_blockhash: if parent { prev_blockhash } else { Default::default() }, merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
49-
chain.block_connected_checked(&header, height + 1, &Vec::new(), &Vec::new());
49+
notifier.block_connected_checked(&header, height + 1, &Vec::new(), &Vec::new());
5050
for i in 2..depth + 1 {
5151
header = BlockHeader { version: 0x20000000, prev_blockhash: header.bitcoin_hash(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 };
52-
chain.block_connected_checked(&header, height + i, &Vec::new(), &Vec::new());
52+
notifier.block_connected_checked(&header, height + i, &Vec::new(), &Vec::new());
5353
}
5454
header.bitcoin_hash()
5555
}
5656

5757
pub struct Node {
58+
pub block_notifier: Arc<chaininterface::BlockNotifier<'a, 'b>>,
5859
pub chain_monitor: Arc<chaininterface::ChainWatchInterfaceUtil>,
5960
pub tx_broadcaster: Arc<test_utils::TestBroadcaster>,
6061
pub chan_monitor: Arc<test_utils::TestChannelMonitor>,
@@ -220,7 +221,7 @@ pub fn create_chan_between_nodes_with_value_init(node_a: &Node, node_b: &Node, c
220221
}
221222

222223
pub fn create_chan_between_nodes_with_value_confirm_first(node_recv: &Node, node_conf: &Node, tx: &Transaction) {
223-
confirm_transaction(&node_conf.chain_monitor, &tx, tx.version);
224+
confirm_transaction(&node_conf.block_notifier, &node_conf.chain_monitor, &tx, tx.version);
224225
node_recv.node.handle_funding_locked(&node_conf.node.get_our_node_id(), &get_event_msg!(node_conf, MessageSendEvent::SendFundingLocked, node_recv.node.get_our_node_id())).unwrap();
225226
}
226227

@@ -246,7 +247,7 @@ pub fn create_chan_between_nodes_with_value_confirm_second(node_recv: &Node, nod
246247

247248
pub fn create_chan_between_nodes_with_value_confirm(node_a: &Node, node_b: &Node, tx: &Transaction) -> ((msgs::FundingLocked, msgs::AnnouncementSignatures), [u8; 32]) {
248249
create_chan_between_nodes_with_value_confirm_first(node_a, node_b, tx);
249-
confirm_transaction(&node_a.chain_monitor, &tx, tx.version);
250+
confirm_transaction(&node_a.block_notifier, &node_a.chain_monitor, &tx, tx.version);
250251
create_chan_between_nodes_with_value_confirm_second(node_b, node_a)
251252
}
252253

@@ -836,19 +837,25 @@ pub fn create_network(node_count: usize, node_config: &[Option<UserConfig>]) ->
836837
let logger: Arc<Logger> = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
837838
let feeest = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: 253 });
838839
let chain_monitor = Arc::new(chaininterface::ChainWatchInterfaceUtil::new(Network::Testnet, Arc::clone(&logger)));
840+
let block_notifier = Arc::new(chaininterface::BlockNotifier::new(chain_monitor.clone()));
839841
let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new())});
840842
let mut seed = [0; 32];
841843
rng.fill_bytes(&mut seed);
842844
let keys_manager = Arc::new(test_utils::TestKeysInterface::new(&seed, Network::Testnet, Arc::clone(&logger)));
843845
let chan_monitor = Arc::new(test_utils::TestChannelMonitor::new(chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), feeest.clone()));
846+
let weak_res = Arc::downgrade(&chan_monitor.simple_monitor);
847+
block_notifier.register_listener(weak_res);
844848
let mut default_config = UserConfig::new();
845849
default_config.channel_options.announced_channel = true;
846850
default_config.peer_channel_config_limits.force_announced_channel_preference = false;
847851
let node = ChannelManager::new(Network::Testnet, feeest.clone(), chan_monitor.clone(), tx_broadcaster.clone(), Arc::clone(&logger), keys_manager.clone(), if node_config[i].is_some() { node_config[i].clone().unwrap() } else { default_config }).unwrap();
852+
let weak_res = Arc::downgrade(&node);
853+
block_notifier.register_listener(weak_res);
848854
let router = Router::new(PublicKey::from_secret_key(&secp_ctx, &keys_manager.get_node_secret()), chain_monitor.clone(), Arc::clone(&logger));
849855
nodes.push(Node { chain_monitor, tx_broadcaster, chan_monitor, node, router, keys_manager, node_seed: seed,
850856
network_payment_count: payment_count.clone(),
851857
network_chan_count: chan_count.clone(),
858+
block_notifier,
852859
});
853860
}
854861

0 commit comments

Comments
 (0)