Skip to content

Commit ba06170

Browse files
Aditya SharmaAditya Sharma
Aditya Sharma
authored and
Aditya Sharma
committed
chainmonitor: Add persistence logic for StubChannelMonitor and appropriate helpers to reload it.
1 parent 103da7e commit ba06170

File tree

6 files changed

+259
-9
lines changed

6 files changed

+259
-9
lines changed

fuzz/src/chanmon_consistency.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ use lightning::chain::{
4545
};
4646
use lightning::events;
4747
use lightning::events::MessageSendEventsProvider;
48+
use lightning::ln::{ChannelId, PaymentHash, PaymentPreimage, PaymentSecret};
49+
use lightning::ln::channelmanager::{ChainParameters, ChannelDetails, ChannelManager, PaymentSendFailure, ChannelManagerReadArgs, PaymentId, RecipientOnionFields, StubChannel};
4850
use lightning::ln::channel::FEE_SPIKE_BUFFER_FEE_INCREASE_MULTIPLE;
4951
use lightning::ln::channel_state::ChannelDetails;
5052
use lightning::ln::channelmanager::{
@@ -285,6 +287,10 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
285287
) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, Option<PublicKey>)> {
286288
return self.chain_monitor.release_pending_monitor_events();
287289
}
290+
291+
fn watch_dummy(&self, funding_outpoint: OutPoint, stub_monitor: StubChannelMonitor<ChannelSigner>) -> Result<(), ()> {
292+
return self.chain_monitor.watch_dummy(funding_outpoint, monitor);
293+
}
288294
}
289295

290296
struct KeyProvider {

lightning/src/chain/chainmonitor.rs

Lines changed: 118 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use bitcoin::hash_types::{Txid, BlockHash};
2929
use crate::chain;
3030
use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput};
3131
use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
32-
use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, WithChannelMonitor};
32+
use crate::chain::channelmonitor::{ChannelMonitor, StubChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, WithChannelMonitor, LATENCY_GRACE_PERIOD_BLOCKS};
3333
use crate::chain::transaction::{OutPoint, TransactionData};
3434
use crate::ln::types::ChannelId;
3535
use crate::sign::ecdsa::EcdsaChannelSigner;
@@ -162,6 +162,9 @@ pub trait Persist<ChannelSigner: EcdsaChannelSigner> {
162162
/// Archiving the data in a backup location (rather than deleting it fully) is useful for
163163
/// hedging against data loss in case of unexpected failure.
164164
fn archive_persisted_channel(&self, channel_funding_outpoint: OutPoint);
165+
/// Persist a new channel's data in response to a [`chain::Watch::watch_dummy`] call. This is
166+
/// called by [`ChannelManager`] for new stub channels received from peer storage backup,
167+
fn persist_new_stub_channel(&self, funding_txo: OutPoint, stub_monitor: &StubChannelMonitor<ChannelSigner>) -> Result<(), std::io::Error>;
165168
}
166169

167170
struct MonitorHolder<ChannelSigner: EcdsaChannelSigner> {
@@ -190,6 +193,19 @@ pub struct LockedChannelMonitor<'a, ChannelSigner: EcdsaChannelSigner> {
190193
funding_txo: OutPoint,
191194
}
192195

196+
/// A read-only reference to a current StubChannelMonitors similar to [LockedChannelMonitor]
197+
pub struct LockedStubChannelMonitor<'a, ChannelSigner: EcdsaChannelSigner> {
198+
lock: RwLockReadGuard<'a, HashMap<OutPoint, StubChannelMonitor<ChannelSigner>>>,
199+
funding_txo: OutPoint,
200+
}
201+
202+
impl<ChannelSigner: EcdsaChannelSigner> Deref for LockedStubChannelMonitor<'_, ChannelSigner> {
203+
type Target = StubChannelMonitor<ChannelSigner>;
204+
fn deref(&self) -> &StubChannelMonitor<ChannelSigner> {
205+
&self.lock.get(&self.funding_txo).expect("Checked at construction")
206+
}
207+
}
208+
193209
impl<ChannelSigner: EcdsaChannelSigner> Deref for LockedChannelMonitor<'_, ChannelSigner> {
194210
type Target = ChannelMonitor<ChannelSigner>;
195211
fn deref(&self) -> &ChannelMonitor<ChannelSigner> {
@@ -221,6 +237,8 @@ pub struct ChainMonitor<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F
221237
P::Target: Persist<ChannelSigner>,
222238
{
223239
monitors: RwLock<HashMap<OutPoint, MonitorHolder<ChannelSigner>>>,
240+
stub_monitors: RwLock<HashMap<OutPoint, StubChannelMonitor<ChannelSigner>>>,
241+
224242
chain_source: Option<C>,
225243
broadcaster: T,
226244
logger: L,
@@ -255,9 +273,10 @@ where C::Target: chain::Filter,
255273
/// updated `txdata`.
256274
///
257275
/// Calls which represent a new blockchain tip height should set `best_height`.
258-
fn process_chain_data<FN>(&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN)
276+
fn process_chain_data<FN, SN>(&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN, stub_process: SN)
259277
where
260-
FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
278+
FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>,
279+
SN: Fn(&StubChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
261280
{
262281
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
263282
let funding_outpoints = hash_set_from_iter(self.monitors.read().unwrap().keys().cloned());
@@ -287,6 +306,14 @@ where C::Target: chain::Filter,
287306
}
288307
}
289308

309+
let stub_monitors = self.stub_monitors.write().unwrap();
310+
for (funding_outpoint, stub_monitor) in stub_monitors.iter() {
311+
if self.update_stub_with_chain_data(header, best_height, txdata, &stub_process, funding_outpoint, stub_monitor).is_err() {
312+
log_error!(self.logger, "{}", err_str);
313+
panic!("{}", err_str);
314+
};
315+
}
316+
290317
if let Some(height) = best_height {
291318
// If the best block height is being updated, update highest_chain_height under the
292319
// monitors write lock.
@@ -298,6 +325,34 @@ where C::Target: chain::Filter,
298325
}
299326
}
300327

328+
fn update_stub_with_chain_data<SN>(&self, header: &Header, _best_height: Option<u32>, txdata: &TransactionData, stub_process: SN,
329+
_funding_outpoint: &OutPoint, stub_monitor: &StubChannelMonitor<ChannelSigner>) -> Result<(), ()>
330+
where SN: Fn(&StubChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs> {
331+
let logger = WithChannelMonitor::from_stub(&self.logger, stub_monitor);
332+
let mut txn_outputs;
333+
{
334+
txn_outputs = stub_process(stub_monitor, txdata);
335+
}
336+
337+
if let Some(ref chain_source) = self.chain_source {
338+
let block_hash = header.block_hash();
339+
for (txid, mut outputs) in txn_outputs.drain(..) {
340+
for (idx, output) in outputs.drain(..) {
341+
// Register any new outputs with the chain source for filtering
342+
let output = WatchedOutput {
343+
block_hash: Some(block_hash),
344+
outpoint: OutPoint { txid, index: idx as u16 },
345+
script_pubkey: output.script_pubkey,
346+
};
347+
log_trace!(logger, "Adding monitoring for spends of outpoint from stub {} to the filter", output.outpoint);
348+
chain_source.register_output(output);
349+
}
350+
}
351+
}
352+
353+
Ok(())
354+
}
355+
301356
fn update_monitor_with_chain_data<FN>(
302357
&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint,
303358
monitor_state: &MonitorHolder<ChannelSigner>, channel_count: usize,
@@ -367,6 +422,7 @@ where C::Target: chain::Filter,
367422
pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P) -> Self {
368423
Self {
369424
monitors: RwLock::new(new_hash_map()),
425+
stub_monitors: RwLock::new(new_hash_map()),
370426
chain_source,
371427
broadcaster,
372428
logger,
@@ -417,6 +473,15 @@ where C::Target: chain::Filter,
417473
}
418474
}
419475

476+
pub fn get_stub_monitor(&self, funding_txo: OutPoint) -> Result<LockedStubChannelMonitor<'_, ChannelSigner>, ()> {
477+
let lock = self.stub_monitors.read().unwrap();
478+
if lock.get(&funding_txo).is_some() {
479+
Ok(LockedStubChannelMonitor { lock, funding_txo })
480+
} else {
481+
Err(())
482+
}
483+
}
484+
420485
/// Lists the funding outpoint and channel ID of each [`ChannelMonitor`] being monitored.
421486
///
422487
/// Note that [`ChannelMonitor`]s are not removed when a channel is closed as they are always
@@ -428,6 +493,13 @@ where C::Target: chain::Filter,
428493
}).collect()
429494
}
430495

496+
pub fn list_stub_monitors(&self) -> Vec<(OutPoint, ChannelId)> {
497+
self.stub_monitors.read().unwrap().iter().map(|(outpoint, stub_monitor)| {
498+
let channel_id = stub_monitor.channel_id();
499+
(*outpoint, channel_id)
500+
}).collect()
501+
}
502+
431503
#[cfg(not(c_bindings))]
432504
/// Lists the pending updates for each [`ChannelMonitor`] (by `OutPoint` being monitored).
433505
/// Each `Vec<u64>` contains `update_id`s from [`ChannelMonitor::get_latest_update_id`] for updates
@@ -654,6 +726,9 @@ where
654726
self.process_chain_data(header, Some(height), &txdata, |monitor, txdata| {
655727
monitor.block_connected(
656728
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &self.logger)
729+
}, |stub_monitor, txdata| {
730+
stub_monitor.block_connected(
731+
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &self.logger)
657732
});
658733
// Assume we may have some new events and wake the event processor
659734
self.event_notifier.notify();
@@ -683,6 +758,9 @@ where
683758
self.process_chain_data(header, None, txdata, |monitor, txdata| {
684759
monitor.transactions_confirmed(
685760
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &self.logger)
761+
}, |stub_monitor, txdata| {
762+
stub_monitor.transactions_confirmed(
763+
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &self.logger)
686764
});
687765
// Assume we may have some new events and wake the event processor
688766
self.event_notifier.notify();
@@ -705,6 +783,10 @@ where
705783
monitor.best_block_updated(
706784
header, height, &*self.broadcaster, &*self.fee_estimator, &self.logger
707785
)
786+
}, |stub_monitor, txdata| {
787+
debug_assert!(txdata.is_empty());
788+
stub_monitor.best_block_updated(
789+
header, height, &*self.broadcaster, &*self.fee_estimator, &self.logger)
708790
});
709791
// Assume we may have some new events and wake the event processor
710792
self.event_notifier.notify();
@@ -731,6 +813,39 @@ where C::Target: chain::Filter,
731813
L::Target: Logger,
732814
P::Target: Persist<ChannelSigner>,
733815
{
816+
fn watch_dummy(&self, funding_outpoint: OutPoint, stub_monitor: StubChannelMonitor<ChannelSigner>) -> Result<(), ()> {
817+
let logger = WithChannelMonitor::from_stub(&self.logger, &stub_monitor);
818+
let mut monitors = self.monitors.write().unwrap();
819+
match monitors.entry(funding_outpoint) {
820+
hash_map::Entry::Occupied(_) => {
821+
log_error!(logger, "Failed to add new channel data: channel monitor for given outpoint is already present");
822+
return Err(());
823+
},
824+
hash_map::Entry::Vacant(e) => e,
825+
};
826+
let mut stub_monitors = self.stub_monitors.write().unwrap();
827+
let entry = match stub_monitors.entry(funding_outpoint) {
828+
hash_map::Entry::Occupied(_) => {
829+
log_error!(logger, "Failed to add new channel data: channel monitor for given outpoint is already present");
830+
return Err(());
831+
},
832+
hash_map::Entry::Vacant(e) => e,
833+
};
834+
log_trace!(logger, "Got new StubChannelMonitor for channel {}", stub_monitor.channel_id());
835+
let persist_res = self.persister.persist_new_stub_channel(funding_outpoint, &stub_monitor);
836+
837+
if persist_res.is_err() {
838+
log_error!(logger, "Failed to add new dummy channel data");
839+
return Err(());
840+
}
841+
if let Some(ref chain_source) = self.chain_source {
842+
stub_monitor.load_outputs_to_watch(chain_source , &self.logger);
843+
}
844+
entry.insert(stub_monitor);
845+
846+
Ok(())
847+
}
848+
734849
fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor<ChannelSigner>) -> Result<ChannelMonitorUpdateStatus, ()> {
735850
let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
736851
let mut monitors = self.monitors.write().unwrap();

lightning/src/chain/mod.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use bitcoin::hash_types::{BlockHash, Txid};
1616
use bitcoin::network::Network;
1717
use bitcoin::secp256k1::PublicKey;
1818

19-
use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, MonitorEvent};
19+
use crate::chain::channelmonitor::{ChannelMonitor, StubChannelMonitor, ChannelMonitorUpdate, MonitorEvent};
2020
use crate::ln::types::ChannelId;
2121
use crate::sign::ecdsa::EcdsaChannelSigner;
2222
use crate::chain::transaction::{OutPoint, TransactionData};
@@ -305,6 +305,16 @@ pub trait Watch<ChannelSigner: EcdsaChannelSigner> {
305305
/// For details on asynchronous [`ChannelMonitor`] updating and returning
306306
/// [`MonitorEvent::Completed`] here, see [`ChannelMonitorUpdateStatus::InProgress`].
307307
fn release_pending_monitor_events(&self) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, Option<PublicKey>)>;
308+
309+
/// Watches a dummy channel identified by `funding_txo` using `monitor`.
310+
/// This is called when we receive a peer storage and finds an unknown channel in it.
311+
///
312+
/// A return of `Err(())` indicates that the channel is already being tracked and there is no
313+
/// need to take any action.
314+
///
315+
/// If the given `funding_txo` has previously been registered via `watch_channel`, `Err(())`
316+
/// must be returned.
317+
fn watch_dummy(&self, funding_outpoint: OutPoint, stub_monitor: StubChannelMonitor<ChannelSigner>) -> Result<(), ()>;
308318
}
309319

310320
/// The `Filter` trait defines behavior for indicating chain activity of interest pertaining to

0 commit comments

Comments
 (0)