Skip to content

remove circular references in channelmanager and channelmonitor #389

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions lightning/fuzz/fuzz_targets/chanmon_fail_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ pub fn do_test(data: &[u8]) {
config.channel_options.fee_proportional_millionths = 0;
config.channel_options.announced_channel = true;
config.peer_channel_config_limits.min_dust_limit_satoshis = 0;
(ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone(), watch.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap(),
(ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap(),
monitor)
} }
}
Expand Down Expand Up @@ -225,7 +225,6 @@ pub fn do_test(data: &[u8]) {
keys_manager,
fee_estimator: fee_est.clone(),
monitor: monitor.clone(),
chain_monitor: watch,
tx_broadcaster: broadcast.clone(),
logger,
default_config: config,
Expand All @@ -246,7 +245,6 @@ pub fn do_test(data: &[u8]) {
} }
}


let mut channel_txn = Vec::new();
macro_rules! make_channel {
($source: expr, $dest: expr, $chan_id: expr) => { {
Expand Down
12 changes: 6 additions & 6 deletions lightning/fuzz/fuzz_targets/full_stack_target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ impl<'a> Hash for Peer<'a> {
}
}

struct MoneyLossDetector<'a> {
manager: Arc<ChannelManager>,
struct MoneyLossDetector<'a, 'b> {
manager: Arc<ChannelManager<'b>>,
monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>,
handler: PeerManager<Peer<'a>>,

Expand All @@ -157,8 +157,8 @@ struct MoneyLossDetector<'a> {
max_height: usize,
blocks_connected: u32,
}
impl<'a> MoneyLossDetector<'a> {
pub fn new(peers: &'a RefCell<[bool; 256]>, manager: Arc<ChannelManager>, monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>, handler: PeerManager<Peer<'a>>) -> Self {
impl<'a, 'b> MoneyLossDetector<'a, 'b> {
pub fn new(peers: &'a RefCell<[bool; 256]>, manager: Arc<ChannelManager<'b>>, monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>, handler: PeerManager<Peer<'a>>) -> Self {
MoneyLossDetector {
manager,
monitor,
Expand Down Expand Up @@ -217,7 +217,7 @@ impl<'a> MoneyLossDetector<'a> {
}
}

impl<'a> Drop for MoneyLossDetector<'a> {
impl<'a, 'b> Drop for MoneyLossDetector<'a, 'b> {
fn drop(&mut self) {
if !::std::thread::panicking() {
// Disconnect all peers
Expand Down Expand Up @@ -331,7 +331,7 @@ pub fn do_test(data: &[u8], logger: &Arc<Logger>) {
config.channel_options.fee_proportional_millionths = slice_to_be32(get_slice!(4));
config.channel_options.announced_channel = get_slice!(1)[0] != 0;
config.peer_channel_config_limits.min_dust_limit_satoshis = 0;
let channelmanager = ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone(), watch.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap();
let channelmanager = ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0).unwrap();
let router = Arc::new(Router::new(PublicKey::from_secret_key(&Secp256k1::signing_only(), &keys_manager.get_node_secret()), watch.clone(), Arc::clone(&logger)));

let peers = RefCell::new([false; 256]);
Expand Down
11 changes: 8 additions & 3 deletions lightning/fuzz/fuzz_targets/router_target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ extern crate secp256k1;

use bitcoin_hashes::sha256d::Hash as Sha256dHash;
use bitcoin::blockdata::script::{Script, Builder};
use bitcoin::blockdata::block::Block;
use bitcoin::blockdata::transaction::Transaction;

use lightning::chain::chaininterface::{ChainError,ChainWatchInterface, ChainListener};
use lightning::chain::chaininterface::{ChainError,ChainWatchInterface};
use lightning::ln::channelmanager::ChannelDetails;
use lightning::ln::msgs;
use lightning::ln::msgs::{RoutingMessageHandler};
Expand All @@ -20,7 +22,7 @@ mod utils;

use utils::test_logger;

use std::sync::{Weak, Arc};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

#[inline]
Expand Down Expand Up @@ -79,7 +81,10 @@ impl ChainWatchInterface for DummyChainWatcher {
fn install_watch_tx(&self, _txid: &Sha256dHash, _script_pub_key: &Script) { }
fn install_watch_outpoint(&self, _outpoint: (Sha256dHash, u32), _out_script: &Script) { }
fn watch_all_txn(&self) { }
fn register_listener(&self, _listener: Weak<ChainListener>) { }
fn filter_block<'a>(&self, _block: &'a Block) -> (Vec<&'a Transaction>, Vec<u32>) {
(Vec::new(), Vec::new())
}
fn reentered(&self) -> usize { 0 }

fn get_chain_utxo(&self, _genesis_hash: Sha256dHash, _unspent_tx_output_identifier: u64) -> Result<(Script, u64), ChainError> {
match self.input.get_slice(2) {
Expand Down
160 changes: 97 additions & 63 deletions lightning/src/chain/chaininterface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,20 @@ pub trait ChainWatchInterface: Sync + Send {
/// Indicates that a listener needs to see all transactions.
fn watch_all_txn(&self);

/// Register the given listener to receive events. Only a weak pointer is provided and the
/// registration should be freed once that pointer expires.
fn register_listener(&self, listener: Weak<ChainListener>);
//TODO: unregister

/// Gets the script and value in satoshis for a given unspent transaction output given a
/// short_channel_id (aka unspent_tx_output_identier). For BTC/tBTC channels the top three
/// bytes are the block height, the next 3 the transaction index within the block, and the
/// final two the output within the transaction.
fn get_chain_utxo(&self, genesis_hash: Sha256dHash, unspent_tx_output_identifier: u64) -> Result<(Script, u64), ChainError>;

/// Gets the list of transactions and transaction indices that the ChainWatchInterface is
/// watching for.
fn filter_block<'a>(&self, block: &'a Block) -> (Vec<&'a Transaction>, Vec<u32>);

/// Returns a usize that changes when the ChainWatchInterface's watched data is modified.
/// Users of `filter_block` should pre-save a copy of `reentered`'s return value and use it to
/// determine whether they need to re-filter a given block.
fn reentered(&self) -> usize;
}

/// An interface to send a transaction to the Bitcoin network.
Expand Down Expand Up @@ -198,13 +202,81 @@ impl ChainWatchedUtil {
}
}

/// Utility for notifying listeners about new blocks, and handling block rescans if new watch
/// data is registered.
pub struct BlockNotifier<'a> {
listeners: Mutex<Vec<Weak<ChainListener + 'a>>>, //TODO(vmw): try removing Weak
chain_monitor: Arc<ChainWatchInterface>,
}

impl<'a> BlockNotifier<'a> {
/// Constructs a new BlockNotifier without any listeners.
pub fn new(chain_monitor: Arc<ChainWatchInterface>) -> BlockNotifier<'a> {
BlockNotifier {
listeners: Mutex::new(Vec::new()),
chain_monitor,
}
}

/// Register the given listener to receive events. Only a weak pointer is provided and
/// the registration should be freed once that pointer expires.
// TODO: unregister
pub fn register_listener(&self, listener: Weak<ChainListener + 'a>) {
let mut vec = self.listeners.lock().unwrap();
vec.push(listener);
}

/// Notify listeners that a block was connected given a full, unfiltered block.
///
/// Handles re-scanning the block and calling block_connected again if listeners register new
/// watch data during the callbacks for you (see ChainListener::block_connected for more info).
pub fn block_connected<'b>(&self, block: &'b Block, height: u32) {
let mut reentered = true;
while reentered {
let (matched, matched_index) = self.chain_monitor.filter_block(block);
reentered = self.block_connected_checked(&block.header, height, matched.as_slice(), matched_index.as_slice());
}
}

/// Notify listeners that a block was connected, given pre-filtered list of transactions in the
/// block which matched the filter (probably using does_match_tx).
///
/// Returns true if notified listeners registered additional watch data (implying that the
/// block must be re-scanned and this function called again prior to further block_connected
/// calls, see ChainListener::block_connected for more info).
pub fn block_connected_checked(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) -> bool {
let last_seen = self.chain_monitor.reentered();

let listeners = self.listeners.lock().unwrap().clone();
for listener in listeners.iter() {
match listener.upgrade() {
Some(arc) => arc.block_connected(header, height, txn_matched, indexes_of_txn_matched),
None => ()
}
}
return last_seen != self.chain_monitor.reentered();
}


/// Notify listeners that a block was disconnected.
pub fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) {
let listeners = self.listeners.lock().unwrap().clone();
for listener in listeners.iter() {
match listener.upgrade() {
Some(arc) => arc.block_disconnected(&header, disconnected_height),
None => ()
}
}
}

}

/// Utility to capture some common parts of ChainWatchInterface implementors.
///
/// Keeping a local copy of this in a ChainWatchInterface implementor is likely useful.
pub struct ChainWatchInterfaceUtil {
network: Network,
watched: Mutex<ChainWatchedUtil>,
listeners: Mutex<Vec<Weak<ChainListener>>>,
reentered: AtomicUsize,
logger: Arc<Logger>,
}
Expand Down Expand Up @@ -232,17 +304,31 @@ impl ChainWatchInterface for ChainWatchInterfaceUtil {
}
}

fn register_listener(&self, listener: Weak<ChainListener>) {
let mut vec = self.listeners.lock().unwrap();
vec.push(listener);
}

fn get_chain_utxo(&self, genesis_hash: Sha256dHash, _unspent_tx_output_identifier: u64) -> Result<(Script, u64), ChainError> {
if genesis_hash != genesis_block(self.network).header.bitcoin_hash() {
return Err(ChainError::NotWatched);
}
Err(ChainError::NotSupported)
}

fn filter_block<'a>(&self, block: &'a Block) -> (Vec<&'a Transaction>, Vec<u32>) {
let mut matched = Vec::new();
let mut matched_index = Vec::new();
{
let watched = self.watched.lock().unwrap();
for (index, transaction) in block.txdata.iter().enumerate() {
if self.does_match_tx_unguarded(transaction, &watched) {
matched.push(transaction);
matched_index.push(index as u32);
}
}
}
(matched, matched_index)
}

fn reentered(&self) -> usize {
self.reentered.load(Ordering::Relaxed)
}
}

impl ChainWatchInterfaceUtil {
Expand All @@ -251,63 +337,11 @@ impl ChainWatchInterfaceUtil {
ChainWatchInterfaceUtil {
network: network,
watched: Mutex::new(ChainWatchedUtil::new()),
listeners: Mutex::new(Vec::new()),
reentered: AtomicUsize::new(1),
logger: logger,
}
}

/// Notify listeners that a block was connected given a full, unfiltered block.
///
/// Handles re-scanning the block and calling block_connected again if listeners register new
/// watch data during the callbacks for you (see ChainListener::block_connected for more info).
pub fn block_connected_with_filtering(&self, block: &Block, height: u32) {
let mut reentered = true;
while reentered {
let mut matched = Vec::new();
let mut matched_index = Vec::new();
{
let watched = self.watched.lock().unwrap();
for (index, transaction) in block.txdata.iter().enumerate() {
if self.does_match_tx_unguarded(transaction, &watched) {
matched.push(transaction);
matched_index.push(index as u32);
}
}
}
reentered = self.block_connected_checked(&block.header, height, matched.as_slice(), matched_index.as_slice());
}
}

/// Notify listeners that a block was disconnected.
pub fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) {
let listeners = self.listeners.lock().unwrap().clone();
for listener in listeners.iter() {
match listener.upgrade() {
Some(arc) => arc.block_disconnected(&header, disconnected_height),
None => ()
}
}
}

/// Notify listeners that a block was connected, given pre-filtered list of transactions in the
/// block which matched the filter (probably using does_match_tx).
///
/// Returns true if notified listeners registered additional watch data (implying that the
/// block must be re-scanned and this function called again prior to further block_connected
/// calls, see ChainListener::block_connected for more info).
pub fn block_connected_checked(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) -> bool {
let last_seen = self.reentered.load(Ordering::Relaxed);

let listeners = self.listeners.lock().unwrap().clone();
for listener in listeners.iter() {
match listener.upgrade() {
Some(arc) => arc.block_connected(header, height, txn_matched, indexes_of_txn_matched),
None => ()
}
}
return last_seen != self.reentered.load(Ordering::Relaxed);
}

/// Checks if a given transaction matches the current filter.
pub fn does_match_tx(&self, tx: &Transaction) -> bool {
Expand Down
10 changes: 5 additions & 5 deletions lightning/src/ln/chanmon_update_fail_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1627,11 +1627,11 @@ fn do_during_funding_monitor_fail(fail_on_generate: bool, restore_between_fails:
};

if confirm_a_first {
confirm_transaction(&nodes[0].chain_monitor, &funding_tx, funding_tx.version);
confirm_transaction(&nodes[0].block_notifier, &nodes[0].chain_monitor, &funding_tx, funding_tx.version);
nodes[1].node.handle_funding_locked(&nodes[0].node.get_our_node_id(), &get_event_msg!(nodes[0], MessageSendEvent::SendFundingLocked, nodes[1].node.get_our_node_id())).unwrap();
} else {
assert!(!restore_b_before_conf);
confirm_transaction(&nodes[1].chain_monitor, &funding_tx, funding_tx.version);
confirm_transaction(&nodes[1].block_notifier, &nodes[1].chain_monitor, &funding_tx, funding_tx.version);
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
}

Expand All @@ -1643,7 +1643,7 @@ fn do_during_funding_monitor_fail(fail_on_generate: bool, restore_between_fails:
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());

if !restore_b_before_conf {
confirm_transaction(&nodes[1].chain_monitor, &funding_tx, funding_tx.version);
confirm_transaction(&nodes[1].block_notifier, &nodes[1].chain_monitor, &funding_tx, funding_tx.version);
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
assert!(nodes[1].node.get_and_clear_pending_events().is_empty());
}
Expand All @@ -1655,12 +1655,12 @@ fn do_during_funding_monitor_fail(fail_on_generate: bool, restore_between_fails:
let (channel_id, (announcement, as_update, bs_update)) = if !confirm_a_first {
nodes[0].node.handle_funding_locked(&nodes[1].node.get_our_node_id(), &get_event_msg!(nodes[1], MessageSendEvent::SendFundingLocked, nodes[0].node.get_our_node_id())).unwrap();

confirm_transaction(&nodes[0].chain_monitor, &funding_tx, funding_tx.version);
confirm_transaction(&nodes[0].block_notifier, &nodes[0].chain_monitor, &funding_tx, funding_tx.version);
let (funding_locked, channel_id) = create_chan_between_nodes_with_value_confirm_second(&nodes[1], &nodes[0]);
(channel_id, create_chan_between_nodes_with_value_b(&nodes[0], &nodes[1], &funding_locked))
} else {
if restore_b_before_conf {
confirm_transaction(&nodes[1].chain_monitor, &funding_tx, funding_tx.version);
confirm_transaction(&nodes[1].block_notifier, &nodes[1].chain_monitor, &funding_tx, funding_tx.version);
}
let (funding_locked, channel_id) = create_chan_between_nodes_with_value_confirm_second(&nodes[0], &nodes[1]);
(channel_id, create_chan_between_nodes_with_value_b(&nodes[1], &nodes[0], &funding_locked))
Expand Down
Loading