Skip to content

Commit 890e74a

Browse files
Aditya SharmaAditya Sharma
Aditya Sharma
authored and
Aditya Sharma
committed
Add FundRecoverer to recover funds from PeerStorage
Introduce FundRecoverer, a utility to recover funds from PeerStorage while running the node in offline mode. - Define Fundrecoverer and Implement MessageSendEventsProvider, chain::Confirm, ChannelMessageHandler, events::EventsProvider. - Write Handler for your_peer_storage, peer_connected, channel_reestablish. - Implement functions to process new blocks defined in chain::Confirm. - Define RecoveryEvent which would be used to notify upstream to rescan the blockchain on the go. - Write watch_dummy() to persist the StubChannelMonitor and load outputs to watch. - Define ReadUtilOpt for defining parameters to be passed to ChannelMonitor::read_util in case of stub ChannelMonitors and otherwise. - Write utility for ChannelMonitor::read() to read ChannelMonitors from PeerStorage.
1 parent e56e900 commit 890e74a

File tree

4 files changed

+1355
-343
lines changed

4 files changed

+1355
-343
lines changed

lightning/src/chain/chainmonitor.rs

+86-67
Original file line numberDiff line numberDiff line change
@@ -254,73 +254,19 @@ pub struct ChainMonitor<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F
254254
our_peerstorage_encryption_key: [u8;32],
255255
}
256256

257-
impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
258-
where C::Target: chain::Filter,
259-
T::Target: BroadcasterInterface,
260-
F::Target: FeeEstimator,
261-
L::Target: Logger,
262-
P::Target: Persist<ChannelSigner>,
263-
{
264-
/// Dispatches to per-channel monitors, which are responsible for updating their on-chain view
265-
/// of a channel and reacting accordingly based on transactions in the given chain data. See
266-
/// [`ChannelMonitor::block_connected`] for details. Any HTLCs that were resolved on chain will
267-
/// be returned by [`chain::Watch::release_pending_monitor_events`].
268-
///
269-
/// Calls back to [`chain::Filter`] if any monitor indicated new outputs to watch. Subsequent
270-
/// calls must not exclude any transactions matching the new outputs nor any in-block
271-
/// descendants of such transactions. It is not necessary to re-fetch the block to obtain
272-
/// updated `txdata`.
273-
///
274-
/// Calls which represent a new blockchain tip height should set `best_height`.
275-
fn process_chain_data<FN>(&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN)
257+
pub(crate) fn update_monitor_with_chain_data_util <FN, P: Deref, ChannelSigner, C:Deref, L:Deref>(
258+
persister: &P, chain_source: &Option<C>, logger: &L, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint,
259+
monitor_state: &MonitorHolder<ChannelSigner>, channel_count: usize,
260+
) -> Result<(), ()>
276261
where
277-
FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
262+
C::Target: chain::Filter,
263+
FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>,
264+
P::Target: Persist<ChannelSigner>,
265+
L::Target: Logger,
266+
ChannelSigner: EcdsaChannelSigner,
278267
{
279-
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
280-
let funding_outpoints = hash_set_from_iter(self.monitors.read().unwrap().keys().cloned());
281-
let channel_count = funding_outpoints.len();
282-
for funding_outpoint in funding_outpoints.iter() {
283-
let monitor_lock = self.monitors.read().unwrap();
284-
if let Some(monitor_state) = monitor_lock.get(funding_outpoint) {
285-
if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state, channel_count).is_err() {
286-
// Take the monitors lock for writing so that we poison it and any future
287-
// operations going forward fail immediately.
288-
core::mem::drop(monitor_lock);
289-
let _poison = self.monitors.write().unwrap();
290-
log_error!(self.logger, "{}", err_str);
291-
panic!("{}", err_str);
292-
}
293-
}
294-
}
295-
296-
// do some followup cleanup if any funding outpoints were added in between iterations
297-
let monitor_states = self.monitors.write().unwrap();
298-
for (funding_outpoint, monitor_state) in monitor_states.iter() {
299-
if !funding_outpoints.contains(funding_outpoint) {
300-
if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state, channel_count).is_err() {
301-
log_error!(self.logger, "{}", err_str);
302-
panic!("{}", err_str);
303-
}
304-
}
305-
}
306-
307-
if let Some(height) = best_height {
308-
// If the best block height is being updated, update highest_chain_height under the
309-
// monitors write lock.
310-
let old_height = self.highest_chain_height.load(Ordering::Acquire);
311-
let new_height = height as usize;
312-
if new_height > old_height {
313-
self.highest_chain_height.store(new_height, Ordering::Release);
314-
}
315-
}
316-
}
317-
318-
fn update_monitor_with_chain_data<FN>(
319-
&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint,
320-
monitor_state: &MonitorHolder<ChannelSigner>, channel_count: usize,
321-
) -> Result<(), ()> where FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs> {
322268
let monitor = &monitor_state.monitor;
323-
let logger = WithChannelMonitor::from(&self.logger, &monitor, None);
269+
let logger = WithChannelMonitor::from(logger, &monitor, None);
324270

325271
let mut txn_outputs = process(monitor, txdata);
326272

@@ -345,7 +291,7 @@ where C::Target: chain::Filter,
345291
// `ChannelMonitorUpdate` after a channel persist for a channel with the same
346292
// `latest_update_id`.
347293
let _pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
348-
match self.persister.update_persisted_channel(*funding_outpoint, None, monitor) {
294+
match persister.update_persisted_channel(*funding_outpoint, None, monitor) {
349295
ChannelMonitorUpdateStatus::Completed =>
350296
log_trace!(logger, "Finished syncing Channel Monitor for channel {} for block-data",
351297
log_funding_info!(monitor)
@@ -361,7 +307,7 @@ where C::Target: chain::Filter,
361307

362308
// Register any new outputs with the chain source for filtering, storing any dependent
363309
// transactions from within the block that previously had not been included in txdata.
364-
if let Some(ref chain_source) = self.chain_source {
310+
if let Some(ref chain_source_ref) = chain_source {
365311
let block_hash = header.block_hash();
366312
for (txid, mut outputs) in txn_outputs.drain(..) {
367313
for (idx, output) in outputs.drain(..) {
@@ -372,13 +318,62 @@ where C::Target: chain::Filter,
372318
script_pubkey: output.script_pubkey,
373319
};
374320
log_trace!(logger, "Adding monitoring for spends of outpoint {} to the filter", output.outpoint);
375-
chain_source.register_output(output);
321+
chain_source_ref.register_output(output);
376322
}
377323
}
378324
}
379325
Ok(())
380326
}
381327

328+
// Utility function for process_chain_data to prevent code duplication in [`FundRecoverer`]
329+
pub(crate) fn process_chain_data_util<FN, ChannelSigner: EcdsaChannelSigner, L: Deref, P: Deref, C: Deref>(persister: &P, chain_source: &Option<C>,
330+
logger: &L, monitors: &RwLock<HashMap<OutPoint, MonitorHolder<ChannelSigner>>>, highest_chain_height: &AtomicUsize,
331+
header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN)
332+
where
333+
FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>,
334+
L::Target: Logger,
335+
P::Target: Persist<ChannelSigner>,
336+
C::Target: chain::Filter,
337+
{
338+
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
339+
let funding_outpoints = hash_set_from_iter(monitors.read().unwrap().keys().cloned());
340+
let channel_count = funding_outpoints.len();
341+
for funding_outpoint in funding_outpoints.iter() {
342+
let monitor_lock = monitors.read().unwrap();
343+
if let Some(monitor_state) = monitor_lock.get(funding_outpoint) {
344+
if update_monitor_with_chain_data_util(persister, chain_source, logger, header, best_height, txdata, &process, funding_outpoint, &monitor_state, channel_count).is_err() {
345+
// Take the monitors lock for writing so that we poison it and any future
346+
// operations going forward fail immediately.
347+
core::mem::drop(monitor_lock);
348+
let _poison = monitors.write().unwrap();
349+
log_error!(logger, "{}", err_str);
350+
panic!("{}", err_str);
351+
}
352+
}
353+
}
354+
355+
// do some followup cleanup if any funding outpoints were added in between iterations
356+
let monitor_states = monitors.write().unwrap();
357+
for (funding_outpoint, monitor_state) in monitor_states.iter() {
358+
if !funding_outpoints.contains(funding_outpoint) {
359+
if update_monitor_with_chain_data_util(persister, chain_source, logger, header, best_height, txdata, &process, funding_outpoint, &monitor_state, channel_count).is_err() {
360+
log_error!(logger, "{}", err_str);
361+
panic!("{}", err_str);
362+
}
363+
}
364+
}
365+
366+
if let Some(height) = best_height {
367+
// If the best block height is being updated, update highest_chain_height under the
368+
// monitors write lock.
369+
let old_height = highest_chain_height.load(Ordering::Acquire);
370+
let new_height = height as usize;
371+
if new_height > old_height {
372+
highest_chain_height.store(new_height, Ordering::Release);
373+
}
374+
}
375+
}
376+
382377
impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> MessageSendEventsProvider for ChainMonitor<ChannelSigner, C, T, F, L, P>
383378
where C::Target: chain::Filter,
384379
T::Target: BroadcasterInterface,
@@ -425,6 +420,30 @@ where C::Target: chain::Filter,
425420
}
426421
}
427422

423+
impl<ChannelSigner: EcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
424+
where C::Target: chain::Filter,
425+
T::Target: BroadcasterInterface,
426+
F::Target: FeeEstimator,
427+
L::Target: Logger,
428+
P::Target: Persist<ChannelSigner>,
429+
{
430+
/// Dispatches to per-channel monitors, which are responsible for updating their on-chain view
431+
/// of a channel and reacting accordingly based on transactions in the given chain data. See
432+
/// [`ChannelMonitor::block_connected`] for details. Any HTLCs that were resolved on chain will
433+
/// be returned by [`chain::Watch::release_pending_monitor_events`].
434+
///
435+
/// Calls back to [`chain::Filter`] if any monitor indicated new outputs to watch. Subsequent
436+
/// calls must not exclude any transactions matching the new outputs nor any in-block
437+
/// descendants of such transactions. It is not necessary to re-fetch the block to obtain
438+
/// updated `txdata`.
439+
///
440+
/// Calls which represent a new blockchain tip height should set `best_height`.
441+
fn process_chain_data<FN>(&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN)
442+
where
443+
FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>,
444+
{
445+
process_chain_data_util(&self.persister, &self.chain_source, &self.logger, &self.monitors, &self.highest_chain_height, header, best_height, txdata, process);
446+
}
428447

429448
/// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels.
430449
///

0 commit comments

Comments
 (0)