Commit 669280e

Don't drop ChannelMonitor Events until they're processed
We currently assume the owner of a `ChannelMonitor` won't persist the `ChannelMonitor` while `Event`s are being processed. This is fine, except that (a) it's generally hard to do and (b) the `ChainMonitor` doesn't even do it. Thus, in rare cases, a user could begin processing events generated by connecting a transaction or a new best block, take some time to do so, and while doing so process a further chain event that causes persistence. This could drop the events being processed altogether, which could lose the user funds. It should be very rare, but may have been made somewhat more reachable by (a) async event processing, which makes it more common to do networking in event handling, and (b) the new future generation in the `ChainMonitor`, which now wakes the `background-processor` directly when chain actions happen on the `ChainMonitor`.
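
The hazard and the fix are easiest to see side by side. The sketch below is a minimal standalone illustration, not LDK code: the `Monitor` struct, the `String` events, and the method names are stand-ins, and real persistence and concurrency are elided. It contrasts the old read-and-clear pattern (events leave the monitor before they are handled) with the handle-then-remove pattern this commit adopts.

use std::sync::Mutex;

// Illustrative stand-in for a monitor holding queued events.
struct Monitor {
	pending_events: Mutex<Vec<String>>,
}

impl Monitor {
	// Old pattern: events are removed *before* they are handled. If the monitor
	// is persisted (or the process crashes) while the handler is still working,
	// the serialized state no longer contains the in-flight events and they are
	// lost on reload.
	fn get_and_clear_pending_events(&self) -> Vec<String> {
		std::mem::take(&mut *self.pending_events.lock().unwrap())
	}

	// New pattern: handle first, then drop only the events that were actually
	// handled. A persist racing with the handler still writes out a state that
	// remembers the unprocessed events.
	fn process_pending_events<H: Fn(&String)>(&self, handler: H) {
		let snapshot = self.pending_events.lock().unwrap().clone();
		for event in &snapshot {
			handler(event);
		}
		self.pending_events.lock().unwrap().drain(..snapshot.len());
	}
}

fn main() {
	let mon = Monitor { pending_events: Mutex::new(vec!["SpendableOutputs".to_string()]) };
	mon.process_pending_events(|ev| println!("handling {ev}"));
	// The dangerous variant, shown only for contrast:
	let _dropped_if_unhandled = mon.get_and_clear_pending_events();
}
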
1 parent 0d1072b commit 669280e

File tree: 2 files changed (+82, -28 lines)

lightning/src/chain/chainmonitor.rs

Lines changed: 8 additions & 11 deletions
@@ -520,12 +520,13 @@ where C::Target: chain::Filter,
 	pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
 		&self, handler: H
 	) {
-		let mut pending_events = Vec::new();
-		for monitor_state in self.monitors.read().unwrap().values() {
-			pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events());
-		}
-		for event in pending_events {
-			handler(event).await;
+		// Sadly we can't hold the monitors read lock through an async call. Thus we have to do a
+		// crazy dance to process a monitor's events then only remove them once we've done so.
+		let mons_to_process = self.monitors.read().unwrap().keys().cloned().collect::<Vec<_>>();
+		for funding_txo in mons_to_process {
+			let mut ev;
+			super::channelmonitor::process_events_body!(
+				self.monitors.read().unwrap().get(&funding_txo).map(|m| &m.monitor), ev, handler(ev).await);
 		}
 	}
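
The in-code comment above is about lock scope: holding a `std::sync::RwLock` guard across an `.await` makes the future non-`Send` (the guard itself is not `Send`) and keeps writers blocked for the duration of a possibly slow handler call. Hence the new code copies the monitor keys out, then briefly re-acquires the lock for each monitor inside `process_events_body!`. The sketch below shows the same "collect keys, re-lock per item" shape with a plain `HashMap` standing in for the monitor set; the key and value types and the `handle` function are illustrative, not LDK types.

use std::collections::HashMap;
use std::sync::RwLock;

// Stand-in for an async event handler that may do slow work (e.g. network I/O).
async fn handle(value: u32) {
	println!("handled {value}");
}

// Can be driven by any async executor.
async fn process_all(map: &RwLock<HashMap<String, u32>>) {
	// Copy the keys out while holding the read lock only briefly...
	let keys: Vec<String> = map.read().unwrap().keys().cloned().collect();
	for key in keys {
		// ...then re-acquire it per entry. The guard is dropped at the end of
		// this statement, before we await. An entry may have been removed in
		// the meantime, so a miss is simply skipped.
		let value = map.read().unwrap().get(&key).copied();
		if let Some(value) = value {
			handle(value).await;
		}
	}
}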

@@ -796,12 +797,8 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L
 	/// [`SpendableOutputs`]: events::Event::SpendableOutputs
 	/// [`BumpTransaction`]: events::Event::BumpTransaction
 	fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
-		let mut pending_events = Vec::new();
 		for monitor_state in self.monitors.read().unwrap().values() {
-			pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events());
-		}
-		for event in pending_events {
-			handler.handle_event(event);
+			monitor_state.monitor.process_pending_events(&handler);
 		}
 	}
 }

lightning/src/chain/channelmonitor.rs

Lines changed: 74 additions & 17 deletions
@@ -49,7 +49,7 @@ use crate::chain::Filter;
 use crate::util::logger::Logger;
 use crate::util::ser::{Readable, ReadableArgs, RequiredWrapper, MaybeReadable, UpgradableRequired, Writer, Writeable, U48};
 use crate::util::byte_utils;
-use crate::events::Event;
+use crate::events::{Event, EventHandler};
 use crate::events::bump_transaction::{AnchorDescriptor, HTLCDescriptor, BumpTransactionEvent};

 use crate::prelude::*;
@@ -738,11 +738,6 @@ impl Readable for IrrevocablyResolvedHTLC {
 /// You MUST ensure that no ChannelMonitors for a given channel anywhere contain out-of-date
 /// information and are actively monitoring the chain.
 ///
-/// Pending Events or updated HTLCs which have not yet been read out by
-/// get_and_clear_pending_monitor_events or get_and_clear_pending_events are serialized to disk and
-/// reloaded at deserialize-time. Thus, you must ensure that, when handling events, all events
-/// gotten are fully handled before re-serializing the new state.
-///
 /// Note that the deserializer is only implemented for (BlockHash, ChannelMonitor), which
 /// tells you the last block hash which was block_connect()ed. You MUST rescan any blocks along
 /// the "reorg path" (ie disconnecting blocks until you find a common ancestor from both the
@@ -752,7 +747,7 @@ pub struct ChannelMonitor<Signer: WriteableEcdsaChannelSigner> {
 	#[cfg(test)]
 	pub(crate) inner: Mutex<ChannelMonitorImpl<Signer>>,
 	#[cfg(not(test))]
-	inner: Mutex<ChannelMonitorImpl<Signer>>,
+	pub(super) inner: Mutex<ChannelMonitorImpl<Signer>>,
 }

 #[derive(PartialEq)]
@@ -829,7 +824,8 @@ pub(crate) struct ChannelMonitorImpl<Signer: WriteableEcdsaChannelSigner> {
 	// we further MUST NOT generate events during block/transaction-disconnection.
 	pending_monitor_events: Vec<MonitorEvent>,

-	pending_events: Vec<Event>,
+	pub(super) pending_events: Vec<Event>,
+	pub(super) pending_events_processor: bool,

 	// Used to track on-chain events (i.e., transactions part of channels confirmed on chain) on
 	// which to take actions once they reach enough confirmations. Each entry includes the
@@ -1088,6 +1084,38 @@ impl<Signer: WriteableEcdsaChannelSigner> Writeable for ChannelMonitorImpl<Signe
 	}
 }

+macro_rules! _process_events_body {
+	($self_opt: expr, $event_to_handle: expr, $handle_event: expr) => {
+		loop {
+			let (pending_events, repeated_events);
+			if let Some(us) = $self_opt {
+				let mut inner = us.inner.lock().unwrap();
+				if inner.pending_events_processor {
+					break;
+				}
+				inner.pending_events_processor = true;
+
+				pending_events = inner.pending_events.clone();
+				repeated_events = inner.get_repeated_events();
+			} else { break; }
+			let num_events = pending_events.len();
+
+			for event in pending_events.into_iter().chain(repeated_events.into_iter()) {
+				$event_to_handle = event;
+				$handle_event;
+			}
+
+			if let Some(us) = $self_opt {
+				let mut inner = us.inner.lock().unwrap();
+				inner.pending_events.drain(..num_events);
+				inner.pending_events_processor = false;
+			}
+			break;
+		}
+	}
+}
+pub(super) use _process_events_body as process_events_body;
+
 impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitor<Signer> {
 	/// For lockorder enforcement purposes, we need to have a single site which constructs the
 	/// `inner` mutex, otherwise cases where we lock two monitors at the same time (eg in our
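
In prose, the macro does four things: the `pending_events_processor` flag acts as a reentrancy guard so two concurrent (or re-entrant) processors never handle or drain the same events twice; the pending events are cloned, not drained, so they stay in the monitor (and in any persisted copy) until handling finishes; the handler runs without the `inner` lock held; and afterwards only the first `num_events` entries, the ones actually handled, are drained, so events queued during handling survive. Repeated events (the RBF-bump kind returned by `get_repeated_events`) are cleared immediately since they are regenerated on a timer. The function below is a simplified standalone rendering of that flow, using an illustrative `Inner` struct instead of `ChannelMonitorImpl` and omitting the repeated-events and `Option` handling.

use std::sync::Mutex;

#[derive(Default)]
struct Inner {
	pending_events: Vec<String>,
	// Reentrancy guard: true while some caller is already handling events.
	pending_events_processor: bool,
}

fn process_events(inner_mutex: &Mutex<Inner>, handle_event: impl Fn(String)) {
	let pending_events;
	{
		let mut inner = inner_mutex.lock().unwrap();
		if inner.pending_events_processor {
			// Someone else is already processing; they will handle anything we
			// would have, so return rather than double-handling.
			return;
		}
		inner.pending_events_processor = true;
		// Clone rather than drain: if we are persisted (or crash) while the
		// handler runs, the events are still recorded and get re-delivered.
		pending_events = inner.pending_events.clone();
	}
	let num_events = pending_events.len();

	// The lock is NOT held while the handler runs; it may be slow or re-enter us.
	for event in pending_events {
		handle_event(event);
	}

	let mut inner = inner_mutex.lock().unwrap();
	// Drop only the events we handled; anything queued meanwhile stays pending.
	inner.pending_events.drain(..num_events);
	inner.pending_events_processor = false;
}
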
@@ -1179,6 +1207,7 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitor<Signer> {
 			payment_preimages: HashMap::new(),
 			pending_monitor_events: Vec::new(),
 			pending_events: Vec::new(),
+			pending_events_processor: false,

 			onchain_events_awaiting_threshold_conf: Vec::new(),
 			outputs_to_watch,
@@ -1306,16 +1335,41 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitor<Signer> {
 		self.inner.lock().unwrap().get_and_clear_pending_monitor_events()
 	}

-	/// Gets the list of pending events which were generated by previous actions, clearing the list
-	/// in the process.
+	/// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
+	///
+	/// For channels featuring anchor outputs, this method will also process [`BumpTransaction`]
+	/// events produced from each [`ChannelMonitor`] while there is a balance to claim onchain
+	/// within each channel. As the confirmation of a commitment transaction may be critical to the
+	/// safety of funds, we recommend invoking this every 30 seconds, or lower if running in an
+	/// environment with spotty connections, like on mobile.
+	///
+	/// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in
+	/// order to handle these events.
 	///
-	/// This is called by the [`EventsProvider::process_pending_events`] implementation for
-	/// [`ChainMonitor`].
+	/// [`SpendableOutputs`]: events::Event::SpendableOutputs
+	/// [`BumpTransaction`]: events::Event::BumpTransaction
+	pub fn process_pending_events<H: Deref>(&self, handler: &H) where H::Target: EventHandler {
+		let mut ev;
+		process_events_body!(Some(self), ev, handler.handle_event(ev));
+	}
+
+	/// Processes any events asynchronously.
 	///
-	/// [`EventsProvider::process_pending_events`]: crate::events::EventsProvider::process_pending_events
-	/// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor
+	/// See [`Self::process_pending_events`] for more information.
+	pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>(
+		&self, handler: &H
+	) {
+		let mut ev;
+		process_events_body!(Some(self), ev, { handler(ev).await });
+	}
+
+	#[cfg(test)]
 	pub fn get_and_clear_pending_events(&self) -> Vec<Event> {
-		self.inner.lock().unwrap().get_and_clear_pending_events()
+		let mut ret = Vec::new();
+		let mut lck = self.inner.lock().unwrap();
+		mem::swap(&mut ret, &mut lck.pending_events);
+		ret.append(&mut lck.get_repeated_events());
+		ret
 	}

 	pub(crate) fn get_min_seen_secret(&self) -> u64 {
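
For users who hold `ChannelMonitor`s directly rather than behind a `ChainMonitor`, the two new methods above are the intended entry points. The sketch below is a hedged usage illustration only: it assumes the crate's blanket `EventHandler` implementation for `Fn(Event)` closures and the `chain::keysinterface` location of `WriteableEcdsaChannelSigner` in this release series, elides how the monitor itself is obtained, and the `handle_event`/`handle_event_async` functions are stand-ins for real persistence logic.

use lightning::chain::channelmonitor::ChannelMonitor;
use lightning::chain::keysinterface::WriteableEcdsaChannelSigner;
use lightning::events::Event;

// Stand-in handler: act on the event (sweep outputs, persist, etc.). The event
// stays queued inside the monitor until this returns, so a concurrent persist
// or a crash mid-handling no longer drops it.
fn handle_event(event: Event) {
	let _ = event;
}

async fn handle_event_async(event: Event) {
	handle_event(event);
}

fn poll_monitor_events<S: WriteableEcdsaChannelSigner>(monitor: &ChannelMonitor<S>) {
	// `&&handle_event`: the method takes `&H` where `H: Deref` and
	// `H::Target: EventHandler`, and `&F` derefs to the `Fn(Event)` handler itself.
	monitor.process_pending_events(&&handle_event);
}

async fn poll_monitor_events_async<S: WriteableEcdsaChannelSigner>(monitor: &ChannelMonitor<S>) {
	monitor.process_pending_events_async(&|event| handle_event_async(event)).await;
}
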
@@ -2531,9 +2585,11 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitorImpl<Signer> {
 		ret
 	}

-	pub fn get_and_clear_pending_events(&mut self) -> Vec<Event> {
+	/// Gets the set of events which are repeated regularly (i.e. those which RBF bump
+	/// transactions). We're okay if we lose these on restart as they'll be regenerated for us at
+	/// some regular interval.
+	pub(super) fn get_repeated_events(&mut self) -> Vec<Event> {
 		let mut ret = Vec::new();
-		mem::swap(&mut ret, &mut self.pending_events);
 		for (claim_id, claim_event) in self.onchain_tx_handler.get_and_clear_pending_claim_events().drain(..) {
 			match claim_event {
 				ClaimEvent::BumpCommitment {
@@ -4096,6 +4152,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP
 			payment_preimages,
 			pending_monitor_events: pending_monitor_events.unwrap(),
 			pending_events,
+			pending_events_processor: false,

 			onchain_events_awaiting_threshold_conf,
 			outputs_to_watch,
