Skip to content
Closed
10 changes: 7 additions & 3 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::chainmonitor::ChainMonitor;
use lightning::chain::channelmonitor;
use lightning::chain::keysinterface::{Sign, KeysInterface};
use lightning::chain::utxointerface::UtxoPool;
use lightning::ln::channelmanager::ChannelManager;
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
use lightning::ln::peer_handler::{PeerManager, SocketDescriptor};
Expand Down Expand Up @@ -111,12 +112,13 @@ impl BackgroundProcessor {
F: 'static + Deref + Send + Sync,
L: 'static + Deref + Send + Sync,
P: 'static + Deref + Send + Sync,
U: 'static + Deref + Send + Sync,
Descriptor: 'static + SocketDescriptor + Send + Sync,
CMH: 'static + Deref + Send + Sync,
RMH: 'static + Deref + Send + Sync,
EH: 'static + EventHandler + Send + Sync,
CMP: 'static + Send + ChannelManagerPersister<Signer, CW, T, K, F, L>,
M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P, U>> + Send + Sync,
CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L>> + Send + Sync,
>
Expand All @@ -128,6 +130,7 @@ impl BackgroundProcessor {
K::Target: 'static + KeysInterface<Signer = Signer>,
F::Target: 'static + FeeEstimator,
L::Target: 'static + Logger,
U::Target: 'static + UtxoPool,
P::Target: 'static + channelmonitor::Persist<Signer>,
CMH::Target: 'static + ChannelMessageHandler,
RMH::Target: 'static + RoutingMessageHandler,
Expand Down Expand Up @@ -207,7 +210,7 @@ mod tests {
fn disconnect_socket(&mut self) {}
}

type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>, Arc<test_utils::TestPool>>;

struct Node {
node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
Expand Down Expand Up @@ -241,13 +244,14 @@ mod tests {
let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: 253 });
let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
let utxo_pool = Arc::new(test_utils::TestPool::new());
let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
let seed = [i as u8; 32];
let network = Network::Testnet;
let now = Duration::from_secs(genesis_block(network).header.time as u64);
let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone(), utxo_pool.clone()));
let best_block = BestBlock::from_genesis(network);
let params = ChainParameters { network, best_block };
let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
Expand Down
4 changes: 3 additions & 1 deletion lightning-block-sync/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ BlockSourceResult<ValidatedBlockHeader> {
/// use lightning::chain::chaininterface::FeeEstimator;
/// use lightning::chain::keysinterface;
/// use lightning::chain::keysinterface::KeysInterface;
/// use lightning::chain::utxointerface::UtxoPool;
/// use lightning::ln::channelmanager::ChannelManager;
/// use lightning::ln::channelmanager::ChannelManagerReadArgs;
/// use lightning::util::config::UserConfig;
Expand All @@ -64,11 +65,12 @@ BlockSourceResult<ValidatedBlockHeader> {
/// T: BroadcasterInterface,
/// F: FeeEstimator,
/// L: Logger,
/// U: UtxoPool,
/// C: chain::Filter,
/// P: channelmonitor::Persist<S>,
/// >(
/// block_source: &mut B,
/// chain_monitor: &ChainMonitor<S, &C, &T, &F, &L, &P>,
/// chain_monitor: &ChainMonitor<S, &C, &T, &F, &L, &P, &U>,
/// config: UserConfig,
/// keys_manager: &K,
/// tx_broadcaster: &T,
Expand Down
3 changes: 2 additions & 1 deletion lightning-net-tokio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
//! type ChainAccess = dyn lightning::chain::Access + Send + Sync;
//! type ChainFilter = dyn lightning::chain::Filter + Send + Sync;
//! type DataPersister = dyn lightning::chain::channelmonitor::Persist<lightning::chain::keysinterface::InMemorySigner> + Send + Sync;
//! type ChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::chain::keysinterface::InMemorySigner, Arc<ChainFilter>, Arc<TxBroadcaster>, Arc<FeeEstimator>, Arc<Logger>, Arc<DataPersister>>;
//! type UtxoPool = dyn lightning::chain::utxointerface::UtxoPool + Send + Sync;
//! type ChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::chain::keysinterface::InMemorySigner, Arc<ChainFilter>, Arc<TxBroadcaster>, Arc<FeeEstimator>, Arc<Logger>, Arc<DataPersister>, Arc<UtxoPool>>;
//! type ChannelManager = Arc<lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor, TxBroadcaster, FeeEstimator, Logger>>;
//! type PeerManager = Arc<lightning::ln::peer_handler::SimpleArcPeerManager<lightning_net_tokio::SocketDescriptor, ChainMonitor, TxBroadcaster, FeeEstimator, ChainAccess, Logger>>;
//!
Expand Down
4 changes: 2 additions & 2 deletions lightning-persister/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,8 +216,8 @@ mod tests {
let persister_1 = FilesystemPersister::new("test_filesystem_persister_1".to_string());
let chanmon_cfgs = create_chanmon_cfgs(2);
let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let chain_mon_0 = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[0].chain_source), &chanmon_cfgs[0].tx_broadcaster, &chanmon_cfgs[0].logger, &chanmon_cfgs[0].fee_estimator, &persister_0, &node_cfgs[0].keys_manager);
let chain_mon_1 = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[1].chain_source), &chanmon_cfgs[1].tx_broadcaster, &chanmon_cfgs[1].logger, &chanmon_cfgs[1].fee_estimator, &persister_1, &node_cfgs[1].keys_manager);
let chain_mon_0 = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[0].chain_source), &chanmon_cfgs[0].tx_broadcaster, &chanmon_cfgs[0].logger, &chanmon_cfgs[0].fee_estimator, &persister_0, &node_cfgs[0].keys_manager, &chanmon_cfgs[0].utxo_pool);
let chain_mon_1 = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[1].chain_source), &chanmon_cfgs[1].tx_broadcaster, &chanmon_cfgs[1].logger, &chanmon_cfgs[1].fee_estimator, &persister_1, &node_cfgs[1].keys_manager, &chanmon_cfgs[0].utxo_pool);
node_cfgs[0].chain_monitor = chain_mon_0;
node_cfgs[1].chain_monitor = chain_mon_1;
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
Expand Down
49 changes: 31 additions & 18 deletions lightning/src/chain/chainmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use chain::channelmonitor;
use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, MonitorEvent, Persist, TransactionOutputs};
use chain::transaction::{OutPoint, TransactionData};
use chain::keysinterface::Sign;
use chain::utxointerface::UtxoPool;
use util::logger::Logger;
use util::events;
use util::events::EventHandler;
Expand All @@ -51,12 +52,13 @@ use core::ops::Deref;
///
/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
/// [module-level documentation]: crate::chain::chainmonitor
pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref, U: Deref>
where C::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
P::Target: channelmonitor::Persist<ChannelSigner>,
U::Target: UtxoPool,
{
/// The monitors
pub monitors: RwLock<HashMap<OutPoint, ChannelMonitor<ChannelSigner>>>,
Expand All @@ -65,14 +67,16 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
logger: L,
fee_estimator: F,
persister: P,
utxo_pool: U,
}

impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref, U: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P, U>
where C::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
P::Target: channelmonitor::Persist<ChannelSigner>,
U::Target: UtxoPool
{
/// Dispatches to per-channel monitors, which are responsible for updating their on-chain view
/// of a channel and reacting accordingly based on transactions in the given chain data. See
Expand Down Expand Up @@ -130,14 +134,15 @@ where C::Target: chain::Filter,
/// pre-filter blocks or only fetch blocks matching a compact filter. Otherwise, clients may
/// always need to fetch full blocks absent another means for determining which blocks contain
/// transactions relevant to the watched channels.
pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P) -> Self {
pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P, utxo_pool: U) -> Self {
Self {
monitors: RwLock::new(HashMap::new()),
chain_source,
broadcaster,
logger,
fee_estimator: feeest,
persister,
utxo_pool
}
}

Expand All @@ -151,53 +156,55 @@ where C::Target: chain::Filter,
}
}

impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
chain::Listen for ChainMonitor<ChannelSigner, C, T, F, L, P>
impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref, U: Deref>
chain::Listen for ChainMonitor<ChannelSigner, C, T, F, L, P, U>
where
C::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
P::Target: channelmonitor::Persist<ChannelSigner>,
U::Target: UtxoPool
{
fn block_connected(&self, block: &Block, height: u32) {
let header = &block.header;
let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
self.process_chain_data(header, &txdata, |monitor, txdata| {
monitor.block_connected(
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger, &*self.utxo_pool)
});
}

fn block_disconnected(&self, header: &BlockHeader, height: u32) {
let monitors = self.monitors.read().unwrap();
for monitor in monitors.values() {
monitor.block_disconnected(
header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger, &*self.utxo_pool);
}
}
}

impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
chain::Confirm for ChainMonitor<ChannelSigner, C, T, F, L, P>
impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref, U: Deref>
chain::Confirm for ChainMonitor<ChannelSigner, C, T, F, L, P, U>
where
C::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
P::Target: channelmonitor::Persist<ChannelSigner>,
U::Target: UtxoPool,
{
fn transactions_confirmed(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
self.process_chain_data(header, txdata, |monitor, txdata| {
monitor.transactions_confirmed(
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger, &*self.utxo_pool)
});
}

fn transaction_unconfirmed(&self, txid: &Txid) {
let monitors = self.monitors.read().unwrap();
for monitor in monitors.values() {
monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &*self.logger, &*self.utxo_pool);
}
}

Expand All @@ -207,7 +214,7 @@ where
// it's still possible if a chain::Filter implementation returns a transaction.
debug_assert!(txdata.is_empty());
monitor.best_block_updated(
header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger, &*self.utxo_pool)
});
}

Expand All @@ -224,13 +231,14 @@ where
}
}

impl<ChannelSigner: Sign, C: Deref , T: Deref , F: Deref , L: Deref , P: Deref >
chain::Watch<ChannelSigner> for ChainMonitor<ChannelSigner, C, T, F, L, P>
impl<ChannelSigner: Sign, C: Deref , T: Deref , F: Deref , L: Deref , P: Deref, U: Deref>
chain::Watch<ChannelSigner> for ChainMonitor<ChannelSigner, C, T, F, L, P, U>
where C::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
P::Target: channelmonitor::Persist<ChannelSigner>,
U::Target: UtxoPool,
{
/// Adds the monitor that watches the channel referred to by the given outpoint.
///
Expand Down Expand Up @@ -281,7 +289,7 @@ where C::Target: chain::Filter,
},
Some(monitor) => {
log_trace!(self.logger, "Updating Channel Monitor for channel {}", log_funding_info!(monitor));
let update_res = monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger);
let update_res = monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger, &self.utxo_pool);
if let Err(e) = &update_res {
log_error!(self.logger, "Failed to update channel monitor: {:?}", e);
}
Expand Down Expand Up @@ -309,12 +317,13 @@ where C::Target: chain::Filter,
}
}

impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> events::EventsProvider for ChainMonitor<ChannelSigner, C, T, F, L, P>
impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref, U: Deref> events::EventsProvider for ChainMonitor<ChannelSigner, C, T, F, L, P, U>
where C::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
P::Target: channelmonitor::Persist<ChannelSigner>,
U::Target: UtxoPool
{
/// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
///
Expand Down Expand Up @@ -366,10 +375,14 @@ mod tests {
};

// Set expectations on nodes[1]'s chain source to return dependent transactions.
let htlc_output = TxOutReference(commitment_tx.clone(), 0);
let to_local_output = TxOutReference(commitment_tx.clone(), 1);
let anchor_a_output = TxOutReference(commitment_tx.clone(), 0);
let anchor_b_output = TxOutReference(commitment_tx.clone(), 1);
let htlc_output = TxOutReference(commitment_tx.clone(), 2);
let to_local_output = TxOutReference(commitment_tx.clone(), 3);
let htlc_timeout_output = TxOutReference(htlc_tx.clone(), 0);
nodes[1].chain_source
.expect(OnRegisterOutput { with: anchor_a_output, returns: None })
.expect(OnRegisterOutput { with: anchor_b_output, returns: None })
.expect(OnRegisterOutput { with: htlc_output, returns: Some((1, htlc_tx)) })
.expect(OnRegisterOutput { with: to_local_output, returns: None })
.expect(OnRegisterOutput { with: htlc_timeout_output, returns: None });
Expand Down
Loading