Skip to content

Commit 2a08045

Browse files
Aditya SharmaAditya Sharma
Aditya Sharma
authored and
Aditya Sharma
committed
chainmonitor: Add persistence logic for StubChannelMonitor and appropriate helpers to reload it.
1 parent f80b81b commit 2a08045

File tree

6 files changed

+259
-9
lines changed

6 files changed

+259
-9
lines changed

fuzz/src/chanmon_consistency.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ use lightning::chain::{
4545
};
4646
use lightning::events;
4747
use lightning::events::MessageSendEventsProvider;
48+
use lightning::ln::{ChannelId, PaymentHash, PaymentPreimage, PaymentSecret};
49+
use lightning::ln::channelmanager::{ChainParameters, ChannelDetails, ChannelManager, PaymentSendFailure, ChannelManagerReadArgs, PaymentId, RecipientOnionFields, StubChannel};
4850
use lightning::ln::channel::FEE_SPIKE_BUFFER_FEE_INCREASE_MULTIPLE;
4951
use lightning::ln::channel_state::ChannelDetails;
5052
use lightning::ln::channelmanager::{
@@ -285,6 +287,10 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
285287
) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, Option<PublicKey>)> {
286288
return self.chain_monitor.release_pending_monitor_events();
287289
}
290+
291+
fn watch_dummy(&self, funding_outpoint: OutPoint, stub_monitor: StubChannelMonitor<ChannelSigner>) -> Result<(), ()> {
292+
return self.chain_monitor.watch_dummy(funding_outpoint, monitor);
293+
}
288294
}
289295

290296
struct KeyProvider {

lightning/src/chain/chainmonitor.rs

Lines changed: 118 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use bitcoin::hash_types::{Txid, BlockHash};
2929
use crate::chain;
3030
use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput};
3131
use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
32-
use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, WithChannelMonitor};
32+
use crate::chain::channelmonitor::{ChannelMonitor, StubChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, WithChannelMonitor, LATENCY_GRACE_PERIOD_BLOCKS};
3333
use crate::chain::transaction::{OutPoint, TransactionData};
3434
use crate::ln::types::ChannelId;
3535
use crate::sign::ecdsa::EcdsaChannelSigner;
@@ -161,6 +161,9 @@ pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
161161
/// Archiving the data in a backup location (rather than deleting it fully) is useful for
162162
/// hedging against data loss in case of unexpected failure.
163163
fn archive_persisted_channel(&self, channel_funding_outpoint: OutPoint);
164+
/// Persist a new channel's data in response to a [`chain::Watch::watch_dummy`] call. This is
165+
/// called by [`ChannelManager`] for new stub channels received from peer storage backup,
166+
fn persist_new_stub_channel(&self, funding_txo: OutPoint, stub_monitor: &StubChannelMonitor<ChannelSigner>) -> Result<(), std::io::Error>;
164167
}
165168

166169
struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {
@@ -189,6 +192,19 @@ pub struct LockedChannelMonitor<'a, ChannelSigner: EcdsaChannelSigner> {
189192
funding_txo: OutPoint,
190193
}
191194

195+
/// A read-only reference to a current StubChannelMonitors similar to [LockedChannelMonitor]
196+
pub struct LockedStubChannelMonitor<'a, ChannelSigner: EcdsaChannelSigner> {
197+
lock: RwLockReadGuard<'a, HashMap<OutPoint, StubChannelMonitor<ChannelSigner>>>,
198+
funding_txo: OutPoint,
199+
}
200+
201+
impl<ChannelSigner: EcdsaChannelSigner> Deref for LockedStubChannelMonitor<'_, ChannelSigner> {
202+
type Target = StubChannelMonitor<ChannelSigner>;
203+
fn deref(&self) -> &StubChannelMonitor<ChannelSigner> {
204+
&self.lock.get(&self.funding_txo).expect("Checked at construction")
205+
}
206+
}
207+
192208
impl<ChannelSigner: EcdsaChannelSigner> Deref for LockedChannelMonitor<'_, ChannelSigner> {
193209
type Target = ChannelMonitor<ChannelSigner>;
194210
fn deref(&self) -> &ChannelMonitor<ChannelSigner> {
@@ -220,6 +236,8 @@ pub struct ChainMonitor<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F
220236
P::Target: Persist<ChannelSigner>,
221237
{
222238
monitors: RwLock<HashMap<OutPoint, MonitorHolder<ChannelSigner>>>,
239+
stub_monitors: RwLock<HashMap<OutPoint, StubChannelMonitor<ChannelSigner>>>,
240+
223241
chain_source: Option<C>,
224242
broadcaster: T,
225243
logger: L,
@@ -254,9 +272,10 @@ where C::Target: chain::Filter,
254272
/// updated `txdata`.
255273
///
256274
/// Calls which represent a new blockchain tip height should set `best_height`.
257-
fn process_chain_data<FN>(&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN)
275+
fn process_chain_data<FN, SN>(&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN, stub_process: SN)
258276
where
259-
FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
277+
FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>,
278+
SN: Fn(&StubChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
260279
{
261280
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
262281
let funding_outpoints = hash_set_from_iter(self.monitors.read().unwrap().keys().cloned());
@@ -286,6 +305,14 @@ where C::Target: chain::Filter,
286305
}
287306
}
288307

308+
let stub_monitors = self.stub_monitors.write().unwrap();
309+
for (funding_outpoint, stub_monitor) in stub_monitors.iter() {
310+
if self.update_stub_with_chain_data(header, best_height, txdata, &stub_process, funding_outpoint, stub_monitor).is_err() {
311+
log_error!(self.logger, "{}", err_str);
312+
panic!("{}", err_str);
313+
};
314+
}
315+
289316
if let Some(height) = best_height {
290317
// If the best block height is being updated, update highest_chain_height under the
291318
// monitors write lock.
@@ -297,6 +324,34 @@ where C::Target: chain::Filter,
297324
}
298325
}
299326

327+
fn update_stub_with_chain_data<SN>(&self, header: &Header, _best_height: Option<u32>, txdata: &TransactionData, stub_process: SN,
328+
_funding_outpoint: &OutPoint, stub_monitor: &StubChannelMonitor<ChannelSigner>) -> Result<(), ()>
329+
where SN: Fn(&StubChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs> {
330+
let logger = WithChannelMonitor::from_stub(&self.logger, stub_monitor);
331+
let mut txn_outputs;
332+
{
333+
txn_outputs = stub_process(stub_monitor, txdata);
334+
}
335+
336+
if let Some(ref chain_source) = self.chain_source {
337+
let block_hash = header.block_hash();
338+
for (txid, mut outputs) in txn_outputs.drain(..) {
339+
for (idx, output) in outputs.drain(..) {
340+
// Register any new outputs with the chain source for filtering
341+
let output = WatchedOutput {
342+
block_hash: Some(block_hash),
343+
outpoint: OutPoint { txid, index: idx as u16 },
344+
script_pubkey: output.script_pubkey,
345+
};
346+
log_trace!(logger, "Adding monitoring for spends of outpoint from stub {} to the filter", output.outpoint);
347+
chain_source.register_output(output);
348+
}
349+
}
350+
}
351+
352+
Ok(())
353+
}
354+
300355
fn update_monitor_with_chain_data<FN>(
301356
&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint,
302357
monitor_state: &MonitorHolder<ChannelSigner>, channel_count: usize,
@@ -366,6 +421,7 @@ where C::Target: chain::Filter,
366421
pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P) -> Self {
367422
Self {
368423
monitors: RwLock::new(new_hash_map()),
424+
stub_monitors: RwLock::new(new_hash_map()),
369425
chain_source,
370426
broadcaster,
371427
logger,
@@ -416,6 +472,15 @@ where C::Target: chain::Filter,
416472
}
417473
}
418474

475+
pub fn get_stub_monitor(&self, funding_txo: OutPoint) -> Result<LockedStubChannelMonitor<'_, ChannelSigner>, ()> {
476+
let lock = self.stub_monitors.read().unwrap();
477+
if lock.get(&funding_txo).is_some() {
478+
Ok(LockedStubChannelMonitor { lock, funding_txo })
479+
} else {
480+
Err(())
481+
}
482+
}
483+
419484
/// Lists the funding outpoint and channel ID of each [`ChannelMonitor`] being monitored.
420485
///
421486
/// Note that [`ChannelMonitor`]s are not removed when a channel is closed as they are always
@@ -427,6 +492,13 @@ where C::Target: chain::Filter,
427492
}).collect()
428493
}
429494

495+
pub fn list_stub_monitors(&self) -> Vec<(OutPoint, ChannelId)> {
496+
self.stub_monitors.read().unwrap().iter().map(|(outpoint, stub_monitor)| {
497+
let channel_id = stub_monitor.channel_id();
498+
(*outpoint, channel_id)
499+
}).collect()
500+
}
501+
430502
#[cfg(not(c_bindings))]
431503
/// Lists the pending updates for each [`ChannelMonitor`] (by `OutPoint` being monitored).
432504
/// Each `Vec<u64>` contains `update_id`s from [`ChannelMonitor::get_latest_update_id`] for updates
@@ -658,6 +730,9 @@ where
658730
self.process_chain_data(header, Some(height), &txdata, |monitor, txdata| {
659731
monitor.block_connected(
660732
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &self.logger)
733+
}, |stub_monitor, txdata| {
734+
stub_monitor.block_connected(
735+
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &self.logger)
661736
});
662737
// Assume we may have some new events and wake the event processor
663738
self.event_notifier.notify();
@@ -687,6 +762,9 @@ where
687762
self.process_chain_data(header, None, txdata, |monitor, txdata| {
688763
monitor.transactions_confirmed(
689764
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &self.logger)
765+
}, |stub_monitor, txdata| {
766+
stub_monitor.transactions_confirmed(
767+
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &self.logger)
690768
});
691769
// Assume we may have some new events and wake the event processor
692770
self.event_notifier.notify();
@@ -709,6 +787,10 @@ where
709787
monitor.best_block_updated(
710788
header, height, &*self.broadcaster, &*self.fee_estimator, &self.logger
711789
)
790+
}, |stub_monitor, txdata| {
791+
debug_assert!(txdata.is_empty());
792+
stub_monitor.best_block_updated(
793+
header, height, &*self.broadcaster, &*self.fee_estimator, &self.logger)
712794
});
713795
// Assume we may have some new events and wake the event processor
714796
self.event_notifier.notify();
@@ -735,6 +817,39 @@ where C::Target: chain::Filter,
735817
L::Target: Logger,
736818
P::Target: Persist<ChannelSigner>,
737819
{
820+
fn watch_dummy(&self, funding_outpoint: OutPoint, stub_monitor: StubChannelMonitor<ChannelSigner>) -> Result<(), ()> {
821+
let logger = WithChannelMonitor::from_stub(&self.logger, &stub_monitor);
822+
let mut monitors = self.monitors.write().unwrap();
823+
match monitors.entry(funding_outpoint) {
824+
hash_map::Entry::Occupied(_) => {
825+
log_error!(logger, "Failed to add new channel data: channel monitor for given outpoint is already present");
826+
return Err(());
827+
},
828+
hash_map::Entry::Vacant(e) => e,
829+
};
830+
let mut stub_monitors = self.stub_monitors.write().unwrap();
831+
let entry = match stub_monitors.entry(funding_outpoint) {
832+
hash_map::Entry::Occupied(_) => {
833+
log_error!(logger, "Failed to add new channel data: channel monitor for given outpoint is already present");
834+
return Err(());
835+
},
836+
hash_map::Entry::Vacant(e) => e,
837+
};
838+
log_trace!(logger, "Got new StubChannelMonitor for channel {}", stub_monitor.channel_id());
839+
let persist_res = self.persister.persist_new_stub_channel(funding_outpoint, &stub_monitor);
840+
841+
if persist_res.is_err() {
842+
log_error!(logger, "Failed to add new dummy channel data");
843+
return Err(());
844+
}
845+
if let Some(ref chain_source) = self.chain_source {
846+
stub_monitor.load_outputs_to_watch(chain_source , &self.logger);
847+
}
848+
entry.insert(stub_monitor);
849+
850+
Ok(())
851+
}
852+
738853
fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor<ChannelSigner>) -> Result<ChannelMonitorUpdateStatus, ()> {
739854
let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
740855
let mut monitors = self.monitors.write().unwrap();

lightning/src/chain/mod.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use bitcoin::hash_types::{BlockHash, Txid};
1616
use bitcoin::network::Network;
1717
use bitcoin::secp256k1::PublicKey;
1818

19-
use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, MonitorEvent};
19+
use crate::chain::channelmonitor::{ChannelMonitor, StubChannelMonitor, ChannelMonitorUpdate, MonitorEvent};
2020
use crate::ln::types::ChannelId;
2121
use crate::sign::ecdsa::EcdsaChannelSigner;
2222
use crate::chain::transaction::{OutPoint, TransactionData};
@@ -305,6 +305,16 @@ pub trait Watch<ChannelSigner: EcdsaChannelSigner> {
305305
/// For details on asynchronous [`ChannelMonitor`] updating and returning
306306
/// [`MonitorEvent::Completed`] here, see [`ChannelMonitorUpdateStatus::InProgress`].
307307
fn release_pending_monitor_events(&self) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, Option<PublicKey>)>;
308+
309+
/// Watches a dummy channel identified by `funding_txo` using `monitor`.
310+
/// This is called when we receive a peer storage and finds an unknown channel in it.
311+
///
312+
/// A return of `Err(())` indicates that the channel is already being tracked and there is no
313+
/// need to take any action.
314+
///
315+
/// If the given `funding_txo` has previously been registered via `watch_channel`, `Err(())`
316+
/// must be returned.
317+
fn watch_dummy(&self, funding_outpoint: OutPoint, stub_monitor: StubChannelMonitor<ChannelSigner>) -> Result<(), ()>;
308318
}
309319

310320
/// The `Filter` trait defines behavior for indicating chain activity of interest pertaining to

0 commit comments

Comments
 (0)