Skip to content

Commit f363fdb

Browse files
chaininterface+multi: add filter_block and reentered to ChainWatchInterface
Because filter_block takes a `&'a Block` and returns a list of `&'a Transaction`s , we must add a lifetime to the ChainWatchInterface, which bubbles up in a lot of places. These places include adding a lifetime to the Node struct, which causes a lot of rearranging tests so that variables don't go out of scope before the Node that owns them does.
1 parent 2f6a2e9 commit f363fdb

10 files changed

+189
-144
lines changed

fuzz/fuzz_targets/chanmon_fail_consistency.rs

Lines changed: 9 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -81,16 +81,16 @@ impl Writer for VecWriter {
8181
}
8282

8383
static mut IN_RESTORE: bool = false;
84-
pub struct TestChannelMonitor {
85-
pub simple_monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>,
84+
pub struct TestChannelMonitor<'a> {
85+
pub simple_monitor: Arc<channelmonitor::SimpleManyChannelMonitor<'a, OutPoint>>,
8686
pub update_ret: Mutex<Result<(), channelmonitor::ChannelMonitorUpdateErr>>,
8787
pub latest_good_update: Mutex<HashMap<OutPoint, Vec<u8>>>,
8888
pub latest_update_good: Mutex<HashMap<OutPoint, bool>>,
8989
pub latest_updates_good_at_last_ser: Mutex<HashMap<OutPoint, bool>>,
9090
pub should_update_manager: atomic::AtomicBool,
9191
}
92-
impl TestChannelMonitor {
93-
pub fn new(chain_monitor: Arc<chaininterface::ChainWatchInterface>, broadcaster: Arc<chaininterface::BroadcasterInterface>, logger: Arc<Logger>, feeest: Arc<chaininterface::FeeEstimator>) -> Self {
92+
impl<'a> TestChannelMonitor<'a> {
93+
pub fn new(chain_monitor: Arc<chaininterface::ChainWatchInterface<'a>>, broadcaster: Arc<chaininterface::BroadcasterInterface>, logger: Arc<Logger>, feeest: Arc<chaininterface::FeeEstimator>) -> Self {
9494
Self {
9595
simple_monitor: channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger, feeest),
9696
update_ret: Mutex::new(Ok(())),
@@ -101,7 +101,7 @@ impl TestChannelMonitor {
101101
}
102102
}
103103
}
104-
impl channelmonitor::ManyChannelMonitor for TestChannelMonitor {
104+
impl<'a> channelmonitor::ManyChannelMonitor for TestChannelMonitor<'a> {
105105
fn add_update_monitor(&self, funding_txo: OutPoint, monitor: channelmonitor::ChannelMonitor) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> {
106106
let ret = self.update_ret.lock().unwrap().clone();
107107
if let Ok(()) = ret {
@@ -198,55 +198,6 @@ pub fn do_test(data: &[u8]) {
198198
} }
199199
}
200200

201-
macro_rules! reload_node {
202-
($ser: expr, $node_id: expr, $old_monitors: expr) => { {
203-
let logger: Arc<Logger> = Arc::new(test_logger::TestLogger::new($node_id.to_string()));
204-
let watch = Arc::new(ChainWatchInterfaceUtil::new(Network::Bitcoin, Arc::clone(&logger)));
205-
let monitor = Arc::new(TestChannelMonitor::new(watch.clone(), broadcast.clone(), logger.clone(), fee_est.clone()));
206-
207-
let keys_manager = Arc::new(KeyProvider { node_id: $node_id, session_id: atomic::AtomicU8::new(0), channel_id: atomic::AtomicU8::new(0) });
208-
let mut config = UserConfig::new();
209-
config.channel_options.fee_proportional_millionths = 0;
210-
config.channel_options.announced_channel = true;
211-
config.peer_channel_config_limits.min_dust_limit_satoshis = 0;
212-
213-
let mut monitors = HashMap::new();
214-
let mut old_monitors = $old_monitors.latest_good_update.lock().unwrap();
215-
for (outpoint, monitor_ser) in old_monitors.drain() {
216-
monitors.insert(outpoint, <(Sha256d, ChannelMonitor)>::read(&mut Cursor::new(&monitor_ser), Arc::clone(&logger)).expect("Failed to read monitor").1);
217-
monitor.latest_good_update.lock().unwrap().insert(outpoint, monitor_ser);
218-
}
219-
let mut monitor_refs = HashMap::new();
220-
for (outpoint, monitor) in monitors.iter() {
221-
monitor_refs.insert(*outpoint, monitor);
222-
}
223-
224-
let read_args = ChannelManagerReadArgs {
225-
keys_manager,
226-
fee_estimator: fee_est.clone(),
227-
monitor: monitor.clone(),
228-
chain_monitor: watch,
229-
tx_broadcaster: broadcast.clone(),
230-
logger,
231-
default_config: config,
232-
channel_monitors: &monitor_refs,
233-
};
234-
235-
let res = (<(Sha256d, ChannelManager)>::read(&mut Cursor::new(&$ser.0), read_args).expect("Failed to read manager").1, monitor);
236-
for (_, was_good) in $old_monitors.latest_updates_good_at_last_ser.lock().unwrap().iter() {
237-
if !was_good {
238-
// If the last time we updated a monitor we didn't successfully update (and we
239-
// have sense updated our serialized copy of the ChannelManager) we may
240-
// force-close the channel on our counterparty cause we know we're missing
241-
// something. Thus, we just return here since we can't continue to test.
242-
return;
243-
}
244-
}
245-
res
246-
} }
247-
}
248-
249-
250201
let mut channel_txn = Vec::new();
251202
macro_rules! make_channel {
252203
($source: expr, $dest: expr, $chan_id: expr) => { {
@@ -356,11 +307,11 @@ pub fn do_test(data: &[u8]) {
356307

357308
// 3 nodes is enough to hit all the possible cases, notably unknown-source-unknown-dest
358309
// forwarding.
359-
let (mut node_a, mut monitor_a) = make_node!(0);
360-
let (mut node_b, mut monitor_b) = make_node!(1);
361-
let (mut node_c, mut monitor_c) = make_node!(2);
310+
let (node_a, monitor_a) = make_node!(0);
311+
let (node_b, monitor_b) = make_node!(1);
312+
let (node_c, monitor_c) = make_node!(2);
362313

363-
let mut nodes = [node_a, node_b, node_c];
314+
let nodes = [node_a, node_b, node_c];
364315

365316
make_channel!(nodes[0], nodes[1], 0);
366317
make_channel!(nodes[1], nodes[2], 1);
@@ -688,10 +639,6 @@ pub fn do_test(data: &[u8]) {
688639
chan_a_disconnected = true;
689640
drain_msg_events_on_disconnect!(0);
690641
}
691-
let (new_node_a, new_monitor_a) = reload_node!(node_a_ser, 0, monitor_a);
692-
node_a = Arc::new(new_node_a);
693-
nodes[0] = node_a.clone();
694-
monitor_a = new_monitor_a;
695642
},
696643
0x20 => {
697644
if !chan_a_disconnected {
@@ -706,21 +653,13 @@ pub fn do_test(data: &[u8]) {
706653
nodes[2].get_and_clear_pending_msg_events();
707654
bc_events.clear();
708655
}
709-
let (new_node_b, new_monitor_b) = reload_node!(node_b_ser, 1, monitor_b);
710-
node_b = Arc::new(new_node_b);
711-
nodes[1] = node_b.clone();
712-
monitor_b = new_monitor_b;
713656
},
714657
0x21 => {
715658
if !chan_b_disconnected {
716659
nodes[1].peer_disconnected(&nodes[2].get_our_node_id(), false);
717660
chan_b_disconnected = true;
718661
drain_msg_events_on_disconnect!(2);
719662
}
720-
let (new_node_c, new_monitor_c) = reload_node!(node_c_ser, 2, monitor_c);
721-
node_c = Arc::new(new_node_c);
722-
nodes[2] = node_c.clone();
723-
monitor_c = new_monitor_c;
724663
},
725664
_ => test_return!(),
726665
}

fuzz/fuzz_targets/full_stack_target.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -144,9 +144,9 @@ impl<'a> Hash for Peer<'a> {
144144
}
145145
}
146146

147-
struct MoneyLossDetector<'a> {
148-
manager: Arc<ChannelManager>,
149-
monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>,
147+
struct MoneyLossDetector<'a, 'b> {
148+
manager: Arc<ChannelManager<'b>>,
149+
monitor: Arc<channelmonitor::SimpleManyChannelMonitor<'b, OutPoint>>,
150150
handler: PeerManager<Peer<'a>>,
151151

152152
peers: &'a RefCell<[bool; 256]>,
@@ -157,8 +157,8 @@ struct MoneyLossDetector<'a> {
157157
max_height: usize,
158158
blocks_connected: u32,
159159
}
160-
impl<'a> MoneyLossDetector<'a> {
161-
pub fn new(peers: &'a RefCell<[bool; 256]>, manager: Arc<ChannelManager>, monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint>>, handler: PeerManager<Peer<'a>>) -> Self {
160+
impl<'a, 'b> MoneyLossDetector<'a, 'b> {
161+
pub fn new(peers: &'a RefCell<[bool; 256]>, manager: Arc<ChannelManager<'b>>, monitor: Arc<channelmonitor::SimpleManyChannelMonitor<'b, OutPoint>>, handler: PeerManager<Peer<'a>>) -> Self {
162162
MoneyLossDetector {
163163
manager,
164164
monitor,
@@ -217,7 +217,7 @@ impl<'a> MoneyLossDetector<'a> {
217217
}
218218
}
219219

220-
impl<'a> Drop for MoneyLossDetector<'a> {
220+
impl<'a, 'b> Drop for MoneyLossDetector<'a, 'b> {
221221
fn drop(&mut self) {
222222
if !::std::thread::panicking() {
223223
// Disconnect all peers

fuzz/fuzz_targets/router_target.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ extern crate secp256k1;
55

66
use bitcoin_hashes::sha256d::Hash as Sha256dHash;
77
use bitcoin::blockdata::script::{Script, Builder};
8+
use bitcoin::blockdata::block::Block;
9+
use bitcoin::blockdata::transaction::Transaction;
810

911
use lightning::chain::chaininterface::{ChainError,ChainWatchInterface, ChainListener};
1012
use lightning::ln::channelmanager::ChannelDetails;
@@ -75,11 +77,14 @@ struct DummyChainWatcher {
7577
input: Arc<InputData>,
7678
}
7779

78-
impl ChainWatchInterface for DummyChainWatcher {
80+
impl<'a> ChainWatchInterface<'a> for DummyChainWatcher {
7981
fn install_watch_tx(&self, _txid: &Sha256dHash, _script_pub_key: &Script) { }
8082
fn install_watch_outpoint(&self, _outpoint: (Sha256dHash, u32), _out_script: &Script) { }
8183
fn watch_all_txn(&self) { }
82-
fn register_listener(&self, _listener: Weak<ChainListener>) { }
84+
fn filter_block(&self, _block: &'a Block) -> (Vec<&'a Transaction>, Vec<u32>) {
85+
(Vec::new(), Vec::new())
86+
}
87+
fn reentered(&self) -> usize { 0 }
8388

8489
fn get_chain_utxo(&self, _genesis_hash: Sha256dHash, _unspent_tx_output_identifier: u64) -> Result<(Script, u64), ChainError> {
8590
match self.input.get_slice(2) {

src/chain/chaininterface.rs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ pub enum ChainError {
3434
/// Note that all of the functions implemented here *must* be reentrant-safe (obviously - they're
3535
/// called from inside the library in response to ChainListener events, P2P events, or timer
3636
/// events).
37-
pub trait ChainWatchInterface: Sync + Send {
37+
pub trait ChainWatchInterface<'a>: Sync + Send {
3838
/// Provides a txid/random-scriptPubKey-in-the-tx which much be watched for.
3939
fn install_watch_tx(&self, txid: &Sha256dHash, script_pub_key: &Script);
4040

@@ -50,6 +50,15 @@ pub trait ChainWatchInterface: Sync + Send {
5050
/// bytes are the block height, the next 3 the transaction index within the block, and the
5151
/// final two the output within the transaction.
5252
fn get_chain_utxo(&self, genesis_hash: Sha256dHash, unspent_tx_output_identifier: u64) -> Result<(Script, u64), ChainError>;
53+
54+
/// Gets the list of transactions and transaction indices that the ChainWatchInterface is
55+
/// watching for.
56+
fn filter_block(&self, block: &'a Block) -> (Vec<&'a Transaction>, Vec<u32>);
57+
58+
/// Returns a usize that changes when the ChainWatchInterface's watched data is modified.
59+
/// Users of `filter_block` should pre-save a copy of `reentered`'s return value and use it to
60+
/// determine whether they need to re-filter a given block.
61+
fn reentered(&self) -> usize;
5362
}
5463

5564
/// An interface to send a transaction to the Bitcoin network.
@@ -273,7 +282,7 @@ pub struct ChainWatchInterfaceUtil {
273282
}
274283

275284
/// Register listener
276-
impl ChainWatchInterface for ChainWatchInterfaceUtil {
285+
impl<'a> ChainWatchInterface<'a> for ChainWatchInterfaceUtil {
277286
fn install_watch_tx(&self, txid: &Sha256dHash, script_pub_key: &Script) {
278287
let mut watched = self.watched.lock().unwrap();
279288
if watched.register_tx(txid, script_pub_key) {
@@ -301,6 +310,25 @@ impl ChainWatchInterface for ChainWatchInterfaceUtil {
301310
}
302311
Err(ChainError::NotSupported)
303312
}
313+
314+
fn filter_block(&self, block: &'a Block) -> (Vec<&'a Transaction>, Vec<u32>) {
315+
let mut matched = Vec::new();
316+
let mut matched_index = Vec::new();
317+
{
318+
let watched = self.watched.lock().unwrap();
319+
for (index, transaction) in block.txdata.iter().enumerate() {
320+
if self.does_match_tx_unguarded(transaction, &watched) {
321+
matched.push(transaction);
322+
matched_index.push(index as u32);
323+
}
324+
}
325+
}
326+
(matched, matched_index)
327+
}
328+
329+
fn reentered(&self) -> usize {
330+
self.reentered.load(Ordering::Relaxed)
331+
}
304332
}
305333

306334
impl ChainWatchInterfaceUtil {

src/ln/channelmanager.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -318,11 +318,11 @@ const ERR: () = "You need at least 32 bit pointers (well, usize, but we'll assum
318318
/// the "reorg path" (ie call block_disconnected() until you get to a common block and then call
319319
/// block_connected() to step towards your best block) upon deserialization before using the
320320
/// object!
321-
pub struct ChannelManager {
321+
pub struct ChannelManager<'a> {
322322
default_configuration: UserConfig,
323323
genesis_hash: Sha256dHash,
324324
fee_estimator: Arc<FeeEstimator>,
325-
monitor: Arc<ManyChannelMonitor>,
325+
monitor: Arc<ManyChannelMonitor + 'a>,
326326
tx_broadcaster: Arc<BroadcasterInterface>,
327327

328328
#[cfg(test)]
@@ -575,7 +575,7 @@ macro_rules! maybe_break_monitor_err {
575575
}
576576
}
577577

578-
impl ChannelManager {
578+
impl<'a> ChannelManager<'a> {
579579
/// Constructs a new ChannelManager to hold several channels and route between them.
580580
///
581581
/// This is the main "logic hub" for all channel-related actions, and implements
@@ -584,7 +584,7 @@ impl ChannelManager {
584584
/// Non-proportional fees are fixed according to our risk using the provided fee estimator.
585585
///
586586
/// panics if channel_value_satoshis is >= `MAX_FUNDING_SATOSHIS`!
587-
pub fn new(network: Network, feeest: Arc<FeeEstimator>, monitor: Arc<ManyChannelMonitor>, chain_monitor: Arc<ChainWatchInterface>, tx_broadcaster: Arc<BroadcasterInterface>, logger: Arc<Logger>,keys_manager: Arc<KeysInterface>, config: UserConfig) -> Result<Arc<ChannelManager>, secp256k1::Error> {
587+
pub fn new(network: Network, feeest: Arc<FeeEstimator>, monitor: Arc<ManyChannelMonitor + 'a>, tx_broadcaster: Arc<BroadcasterInterface>, logger: Arc<Logger>,keys_manager: Arc<KeysInterface>, config: UserConfig) -> Result<Arc<ChannelManager<'a>>, secp256k1::Error> {
588588
let secp_ctx = Secp256k1::new();
589589

590590
let res = Arc::new(ChannelManager {
@@ -2457,7 +2457,7 @@ impl ChannelManager {
24572457
}
24582458
}
24592459

2460-
impl events::MessageSendEventsProvider for ChannelManager {
2460+
impl<'a> events::MessageSendEventsProvider for ChannelManager<'a> {
24612461
fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
24622462
// TODO: Event release to users and serialization is currently race-y: it's very easy for a
24632463
// user to serialize a ChannelManager with pending events in it and lose those events on
@@ -2482,7 +2482,7 @@ impl events::MessageSendEventsProvider for ChannelManager {
24822482
}
24832483
}
24842484

2485-
impl events::EventsProvider for ChannelManager {
2485+
impl<'a> events::EventsProvider for ChannelManager<'a> {
24862486
fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
24872487
// TODO: Event release to users and serialization is currently race-y: it's very easy for a
24882488
// user to serialize a ChannelManager with pending events in it and lose those events on
@@ -2507,7 +2507,7 @@ impl events::EventsProvider for ChannelManager {
25072507
}
25082508
}
25092509

2510-
impl ChainListener for ChannelManager {
2510+
impl<'a> ChainListener for ChannelManager<'a> {
25112511
fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], indexes_of_txn_matched: &[u32]) {
25122512
let header_hash = header.bitcoin_hash();
25132513
log_trace!(self, "Block {} at height {} connected with {} txn matched", header_hash, height, txn_matched.len());
@@ -2621,7 +2621,7 @@ impl ChainListener for ChannelManager {
26212621
}
26222622
}
26232623

2624-
impl ChannelMessageHandler for ChannelManager {
2624+
impl<'a> ChannelMessageHandler for ChannelManager<'a> {
26252625
//TODO: Handle errors and close channel (or so)
26262626
fn handle_open_channel(&self, their_node_id: &PublicKey, their_local_features: LocalFeatures, msg: &msgs::OpenChannel) -> Result<(), HandleError> {
26272627
let _ = self.total_consistency_lock.read().unwrap();
@@ -3006,7 +3006,7 @@ impl<R: ::std::io::Read> Readable<R> for HTLCForwardInfo {
30063006
}
30073007
}
30083008

3009-
impl Writeable for ChannelManager {
3009+
impl<'a> Writeable for ChannelManager<'a> {
30103010
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
30113011
let _ = self.total_consistency_lock.write().unwrap();
30123012

@@ -3084,7 +3084,7 @@ pub struct ChannelManagerReadArgs<'a> {
30843084
/// No calls to the ManyChannelMonitor will be made during deserialization. It is assumed that
30853085
/// you have deserialized ChannelMonitors separately and will add them to your
30863086
/// ManyChannelMonitor after deserializing this ChannelManager.
3087-
pub monitor: Arc<ManyChannelMonitor>,
3087+
pub monitor: Arc<ManyChannelMonitor + 'a>,
30883088

30893089
/// The BroadcasterInterface which will be used in the ChannelManager in the future and may be
30903090
/// used to broadcast the latest local commitment transactions of channels which must be
@@ -3110,7 +3110,7 @@ pub struct ChannelManagerReadArgs<'a> {
31103110
pub channel_monitors: &'a HashMap<OutPoint, &'a ChannelMonitor>,
31113111
}
31123112

3113-
impl<'a, R : ::std::io::Read> ReadableArgs<R, ChannelManagerReadArgs<'a>> for (Sha256dHash, ChannelManager) {
3113+
impl<'a, R : ::std::io::Read> ReadableArgs<R, ChannelManagerReadArgs<'a>> for (Sha256dHash, ChannelManager<'a>) {
31143114
fn read(reader: &mut R, args: ChannelManagerReadArgs<'a>) -> Result<Self, DecodeError> {
31153115
let _ver: u8 = Readable::read(reader)?;
31163116
let min_ver: u8 = Readable::read(reader)?;

0 commit comments

Comments
 (0)