Commit 6d54db4

Use an opaque type to describe monitor updates in Persist
In the next commit, we'll be originating monitor updates both from the ChainMonitor and from the ChannelManager, making simple sequential update IDs impossible.

Further, the existing async monitor update API was somewhat hard to work with - instead of being able to generate monitor_updated callbacks whenever a persistence process finishes, you had to ensure you only did so once all previous updates had also been persisted.

Here we eat the complexity for the user by moving to an opaque type for monitor updates, tracking which updates are in-flight for the user and only generating monitor-persisted events once all pending updates have been committed.
Parent: 87908cf

6 files changed: +169 -101 lines
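Before the per-file diffs, here is the overall shape of the new API, condensed from the lightning/src/chain/chainmonitor.rs changes below. This is a sketch for orientation, not a compilable excerpt; see the full diff for the real definitions.

// Sketch only -- condensed from the chainmonitor.rs diff in this commit.

/// An opaque identifier describing a specific `Persist` method call.
#[derive(Clone, Copy, Hash, PartialEq, Eq)]
pub struct MonitorUpdateId { /* opaque contents */ }

pub trait Persist<ChannelSigner: Sign> {
	// Every persistence call now carries the opaque id identifying it.
	fn persist_new_channel(&self, id: OutPoint, data: &ChannelMonitor<ChannelSigner>,
		update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>;
	fn update_persisted_channel(&self, id: OutPoint, update: &ChannelMonitorUpdate,
		data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId)
		-> Result<(), ChannelMonitorUpdateErr>;
}

// On the ChainMonitor, asynchronous persisters report completion per update id:
//     pub fn channel_monitor_updated(&self, funding_txo: OutPoint,
//         completed_update_id: MonitorUpdateId)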


fuzz/src/chanmon_consistency.rs

+9 -9

@@ -855,25 +855,25 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
 
 			0x08 => {
 				if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
-					monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id);
+					monitor_a.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
 					nodes[0].process_monitor_events();
 				}
 			},
 			0x09 => {
 				if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) {
-					monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id);
+					monitor_b.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
 					nodes[1].process_monitor_events();
 				}
 			},
 			0x0a => {
 				if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) {
-					monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id);
+					monitor_b.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
 					nodes[1].process_monitor_events();
 				}
 			},
 			0x0b => {
 				if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) {
-					monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id);
+					monitor_c.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
 					nodes[2].process_monitor_events();
 				}
 			},
@@ -1075,25 +1075,25 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
 	// Test that no channel is in a stuck state where neither party can send funds even
 	// after we resolve all pending events.
 	// First make sure there are no pending monitor updates, resetting the error state
-	// and calling channel_monitor_updated for each monitor.
+	// and calling force_channel_monitor_updated for each monitor.
 	*monitor_a.persister.update_ret.lock().unwrap() = Ok(());
 	*monitor_b.persister.update_ret.lock().unwrap() = Ok(());
 	*monitor_c.persister.update_ret.lock().unwrap() = Ok(());
 
 	if let Some((id, _)) = monitor_a.latest_monitors.lock().unwrap().get(&chan_1_funding) {
-		monitor_a.chain_monitor.channel_monitor_updated(chan_1_funding, *id);
+		monitor_a.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
 		nodes[0].process_monitor_events();
 	}
 	if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_1_funding) {
-		monitor_b.chain_monitor.channel_monitor_updated(chan_1_funding, *id);
+		monitor_b.chain_monitor.force_channel_monitor_updated(chan_1_funding, *id);
 		nodes[1].process_monitor_events();
 	}
 	if let Some((id, _)) = monitor_b.latest_monitors.lock().unwrap().get(&chan_2_funding) {
-		monitor_b.chain_monitor.channel_monitor_updated(chan_2_funding, *id);
+		monitor_b.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
 		nodes[1].process_monitor_events();
 	}
 	if let Some((id, _)) = monitor_c.latest_monitors.lock().unwrap().get(&chan_2_funding) {
-		monitor_c.chain_monitor.channel_monitor_updated(chan_2_funding, *id);
+		monitor_c.chain_monitor.force_channel_monitor_updated(chan_2_funding, *id);
 		nodes[2].process_monitor_events();
 	}
fuzz/src/utils/test_persister.rs

+3 -2

@@ -1,5 +1,6 @@
 use lightning::chain;
 use lightning::chain::{chainmonitor, channelmonitor};
+use lightning::chain::chainmonitor::MonitorUpdateId;
 use lightning::chain::transaction::OutPoint;
 use lightning::util::enforcing_trait_impls::EnforcingSigner;
 
@@ -9,11 +10,11 @@ pub struct TestPersister {
 	pub update_ret: Mutex<Result<(), chain::ChannelMonitorUpdateErr>>,
 }
 impl chainmonitor::Persist<EnforcingSigner> for TestPersister {
-	fn persist_new_channel(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor<EnforcingSigner>) -> Result<(), chain::ChannelMonitorUpdateErr> {
+	fn persist_new_channel(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor<EnforcingSigner>, _update_id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
 		self.update_ret.lock().unwrap().clone()
 	}
 
-	fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: &channelmonitor::ChannelMonitorUpdate, _data: &channelmonitor::ChannelMonitor<EnforcingSigner>) -> Result<(), chain::ChannelMonitorUpdateErr> {
+	fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: &channelmonitor::ChannelMonitorUpdate, _data: &channelmonitor::ChannelMonitor<EnforcingSigner>, _update_id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
 		self.update_ret.lock().unwrap().clone()
 	}
 }
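The `_update_id` parameter is what an asynchronous persister would actually make use of. Below is a minimal sketch of such an implementation; the names `AsyncPersister` and `pending` are hypothetical and not part of LDK, and only the `Persist` signatures and `channel_monitor_updated` come from this commit.

use std::sync::Mutex;

use lightning::chain;
use lightning::chain::{chainmonitor, channelmonitor};
use lightning::chain::chainmonitor::MonitorUpdateId;
use lightning::chain::transaction::OutPoint;
use lightning::util::enforcing_trait_impls::EnforcingSigner;

struct AsyncPersister {
	// Updates handed off to a background task (e.g. a remote backup upload)
	// that have not yet been reported back to the ChainMonitor.
	pending: Mutex<Vec<(OutPoint, MonitorUpdateId)>>,
}

impl chainmonitor::Persist<EnforcingSigner> for AsyncPersister {
	fn persist_new_channel(&self, funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor<EnforcingSigner>, update_id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
		// Per the new Persist docs: write the monitor durably to local disk
		// first, then let the slow remote copy continue in the background and
		// tell LDK the update is still in flight.
		self.pending.lock().unwrap().push((funding_txo, update_id));
		Err(chain::ChannelMonitorUpdateErr::TemporaryFailure)
	}

	fn update_persisted_channel(&self, funding_txo: OutPoint, _update: &channelmonitor::ChannelMonitorUpdate, _data: &channelmonitor::ChannelMonitor<EnforcingSigner>, update_id: MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
		self.pending.lock().unwrap().push((funding_txo, update_id));
		Err(chain::ChannelMonitorUpdateErr::TemporaryFailure)
	}
}

Once a queued write has completed everywhere, some other task calls `chain_monitor.channel_monitor_updated(funding_txo, update_id)` for that specific id; with this commit that call is required once per in-flight update rather than once with the highest sequential id.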

lightning-persister/src/lib.rs

+8 -4

@@ -159,13 +159,13 @@ impl FilesystemPersister {
 }
 
 impl<ChannelSigner: Sign> chainmonitor::Persist<ChannelSigner> for FilesystemPersister {
-	fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>) -> Result<(), chain::ChannelMonitorUpdateErr> {
+	fn persist_new_channel(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChannelSigner>, _update_id: chainmonitor::MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
 		let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
 		util::write_to_file(self.path_to_monitor_data(), filename, monitor)
 			.map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure)
 	}
 
-	fn update_persisted_channel(&self, funding_txo: OutPoint, _update: &ChannelMonitorUpdate, monitor: &ChannelMonitor<ChannelSigner>) -> Result<(), chain::ChannelMonitorUpdateErr> {
+	fn update_persisted_channel(&self, funding_txo: OutPoint, _update: &ChannelMonitorUpdate, monitor: &ChannelMonitor<ChannelSigner>, _update_id: chainmonitor::MonitorUpdateId) -> Result<(), chain::ChannelMonitorUpdateErr> {
 		let filename = format!("{}_{}", funding_txo.txid.to_hex(), funding_txo.index);
 		util::write_to_file(self.path_to_monitor_data(), filename, monitor)
 			.map_err(|_| chain::ChannelMonitorUpdateErr::PermanentFailure)
@@ -296,6 +296,8 @@ mod tests {
 		nodes[1].node.force_close_channel(&chan.2).unwrap();
 		check_closed_event!(nodes[1], 1, ClosureReason::HolderForceClosed);
 		let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap();
+		let update_map = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap();
+		let update_id = update_map.get(&added_monitors[0].0.to_channel_id()).unwrap();
 
 		// Set the persister's directory to read-only, which should result in
 		// returning a permanent failure when we then attempt to persist a
@@ -309,7 +311,7 @@
 			txid: Txid::from_hex("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap(),
 			index: 0
 		};
-		match persister.persist_new_channel(test_txo, &added_monitors[0].1) {
+		match persister.persist_new_channel(test_txo, &added_monitors[0].1, update_id.2) {
 			Err(ChannelMonitorUpdateErr::PermanentFailure) => {},
 			_ => panic!("unexpected result from persisting new channel")
 		}
@@ -333,6 +335,8 @@
 		nodes[1].node.force_close_channel(&chan.2).unwrap();
 		check_closed_event!(nodes[1], 1, ClosureReason::HolderForceClosed);
 		let mut added_monitors = nodes[1].chain_monitor.added_monitors.lock().unwrap();
+		let update_map = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap();
+		let update_id = update_map.get(&added_monitors[0].0.to_channel_id()).unwrap();
 
 		// Create the persister with an invalid directory name and test that the
 		// channel fails to open because the directories fail to be created. There
@@ -344,7 +348,7 @@
 			txid: Txid::from_hex("8984484a580b825b9972d7adb15050b3ab624ccd731946b3eeddb92f4e7ef6be").unwrap(),
 			index: 0
 		};
-		match persister.persist_new_channel(test_txo, &added_monitors[0].1) {
+		match persister.persist_new_channel(test_txo, &added_monitors[0].1, update_id.2) {
 			Err(ChannelMonitorUpdateErr::PermanentFailure) => {},
 			_ => panic!("unexpected result from persisting new channel")
 		}

lightning/src/chain/chainmonitor.rs

+93 -35

@@ -41,36 +41,54 @@ use prelude::*;
 use sync::{RwLock, RwLockReadGuard, Mutex};
 use core::ops::Deref;
 
+#[derive(Clone, Copy, Hash, PartialEq, Eq)]
+pub(crate) enum MonitorUpdate {
+	MonitorUpdateId(u64),
+}
+
+/// An opaque identifier describing a specific [`Persist`] method call.
+#[derive(Clone, Copy, Hash, PartialEq, Eq)]
+pub struct MonitorUpdateId {
+	pub(crate) contents: MonitorUpdate,
+}
+
 /// `Persist` defines behavior for persisting channel monitors: this could mean
 /// writing once to disk, and/or uploading to one or more backup services.
 ///
-/// Note that for every new monitor, you **must** persist the new `ChannelMonitor`
-/// to disk/backups. And, on every update, you **must** persist either the
-/// `ChannelMonitorUpdate` or the updated monitor itself. Otherwise, there is risk
-/// of situations such as revoking a transaction, then crashing before this
-/// revocation can be persisted, then unintentionally broadcasting a revoked
-/// transaction and losing money. This is a risk because previous channel states
-/// are toxic, so it's important that whatever channel state is persisted is
-/// kept up-to-date.
+/// Each method can return three possible values:
+///  * If persistence (including any relevant `fsync()` calls) happens immediately, the
+///    implementation should return `Ok(())`, indicating normal channel operation should continue.
+///  * If persistence happens asynchronously, implementations should first ensure the
+///    [`ChannelMonitor`] or [`ChannelMonitorUpdate`] are written durably to disk, and then return
+///    `Err(ChannelMonitorUpdateErr::TemporaryFailure)` while the update continues in the
+///    background. Once the update completes, [`ChainMonitor::channel_monitor_updated`] should be
+///    called with the corresponding [`MonitorUpdateId`].
+///
+///    Note that unlike the direct [`chain::Watch`] interface,
+///    [`ChainMonitor::channel_monitor_updated`] must be called once for *each* update which occurs.
+///
+///  * If persistence fails for some reason, implementations should return
+///    `Err(ChannelMonitorUpdateErr::PermanentFailure)`, in which case the channel will likely be
+///    closed without broadcasting the latest state. See
+///    [`ChannelMonitorUpdateErr::PermanentFailure`] for more details.
 pub trait Persist<ChannelSigner: Sign> {
-	/// Persist a new channel's data. The data can be stored any way you want, but
-	/// the identifier provided by Rust-Lightning is the channel's outpoint (and
-	/// it is up to you to maintain a correct mapping between the outpoint and the
-	/// stored channel data). Note that you **must** persist every new monitor to
-	/// disk. See the `Persist` trait documentation for more details.
+	/// Persist a new channel's data. The data can be stored any way you want, but the identifier
+	/// provided by LDK is the channel's outpoint (and it is up to you to maintain a correct
+	/// mapping between the outpoint and the stored channel data). Note that you **must** persist
+	/// every new monitor to disk.
 	///
 	/// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor`
 	/// and [`ChannelMonitorUpdateErr`] for requirements when returning errors.
 	///
 	/// [`Writeable::write`]: crate::util::ser::Writeable::write
-	fn persist_new_channel(&self, id: OutPoint, data: &ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr>;
+	fn persist_new_channel(&self, id: OutPoint, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>;
 
-	/// Update one channel's data. The provided `ChannelMonitor` has already
-	/// applied the given update.
+	/// Update one channel's data. The provided [`ChannelMonitor`] has already applied the given
+	/// update.
 	///
-	/// Note that on every update, you **must** persist either the
-	/// `ChannelMonitorUpdate` or the updated monitor itself to disk/backups. See
-	/// the `Persist` trait documentation for more details.
+	/// Note that on every update, you **must** persist either the [`ChannelMonitorUpdate`] or the
+	/// updated monitor itself to disk/backups. See the `Persist` trait documentation for more
+	/// details.
 	///
 	/// If an implementer chooses to persist the updates only, they need to make
 	/// sure that all the updates are applied to the `ChannelMonitors` *before*
@@ -89,11 +107,18 @@ pub trait Persist<ChannelSigner: Sign> {
 	/// [`ChannelMonitorUpdateErr`] for requirements when returning errors.
 	///
 	/// [`Writeable::write`]: crate::util::ser::Writeable::write
-	fn update_persisted_channel(&self, id: OutPoint, update: &ChannelMonitorUpdate, data: &ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr>;
+	fn update_persisted_channel(&self, id: OutPoint, update: &ChannelMonitorUpdate, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>;
 }
 
 struct MonitorHolder<ChannelSigner: Sign> {
 	monitor: ChannelMonitor<ChannelSigner>,
+	/// The full set of pending monitor updates for this Channel.
+	///
+	/// Note that this lock must be held during updates to prevent a race where we call
+	/// update_persisted_channel, the user returns a TemporaryFailure, and then calls
+	/// channel_monitor_updated immediately, racing our insertion of the pending update into the
+	/// contained Vec.
+	pending_monitor_updates: Mutex<Vec<MonitorUpdateId>>,
 }
 
 /// A read-only reference to a current ChannelMonitor.
@@ -262,23 +287,43 @@ where C::Target: chain::Filter,
 	/// Indicates the persistence of a [`ChannelMonitor`] has completed after
 	/// [`ChannelMonitorUpdateErr::TemporaryFailure`] was returned from an update operation.
 	///
-	/// All ChannelMonitor updates up to and including highest_applied_update_id must have been
-	/// fully committed in every copy of the given channels' ChannelMonitors.
-	///
-	/// Note that there is no effect to calling with a highest_applied_update_id other than the
-	/// current latest ChannelMonitorUpdate and one call to this function after multiple
-	/// ChannelMonitorUpdateErr::TemporaryFailures is fine. The highest_applied_update_id field
-	/// exists largely only to prevent races between this and concurrent update_monitor calls.
-	///
 	/// Thus, the anticipated use is, at a high level:
 	///  1) This [`ChainMonitor`] calls [`Persist::update_persisted_channel`] which stores the
 	///     update to disk and begins updating any remote (e.g. watchtower/backup) copies,
 	///     returning [`ChannelMonitorUpdateErr::TemporaryFailure`],
 	///  2) once all remote copies are updated, you call this function with the update_id that
-	///     completed, and once it is the latest the Channel will be re-enabled.
-	pub fn channel_monitor_updated(&self, funding_txo: OutPoint, highest_applied_update_id: u64) {
+	///     completed, and once all pending updates have completed the Channel will be re-enabled.
+	pub fn channel_monitor_updated(&self, funding_txo: OutPoint, completed_update_id: MonitorUpdateId) {
+		let monitors = self.monitors.read().unwrap();
+		let monitor_data = if let Some(mon) = monitors.get(&funding_txo) { mon } else { return; };
+		let mut pending_monitor_updates = monitor_data.pending_monitor_updates.lock().unwrap();
+		pending_monitor_updates.retain(|update_id| *update_id != completed_update_id);
+
+		match completed_update_id {
+			MonitorUpdateId { .. } => {
+				let monitor_update_pending_updates = pending_monitor_updates.iter().filter(|update_id|
+					if let MonitorUpdate::MonitorUpdateId(_) = update_id.contents { true } else { false }).count();
+				if monitor_update_pending_updates != 0 {
+					// If there are still monitor updates pending, we cannot yet construct an
+					// UpdateCompleted event.
+					return;
+				}
+				self.user_provided_events.lock().unwrap().push(MonitorEvent::UpdateCompleted(MonitorUpdated {
+					funding_txo,
+					monitor_update_id: monitor_data.monitor.get_latest_update_id(),
+				}));
+			}
+		}
+	}
+
+	/// This wrapper avoids having to update some of our tests for now as they assume the direct
+	/// chain::Watch API wherein we mark a monitor fully-updated by just calling
+	/// channel_monitor_updated once with the higest ID.
+	#[cfg(any(test, feature = "fuzztarget"))]
+	pub fn force_channel_monitor_updated(&self, funding_txo: OutPoint, monitor_update_id: u64) {
 		self.user_provided_events.lock().unwrap().push(MonitorEvent::UpdateCompleted(MonitorUpdated {
-			funding_txo, monitor_update_id: highest_applied_update_id
+			funding_txo,
+			monitor_update_id,
 		}));
 	}
 
@@ -392,12 +437,18 @@ where C::Target: chain::Filter,
 				return Err(ChannelMonitorUpdateErr::PermanentFailure)},
 			hash_map::Entry::Vacant(e) => e,
 		};
-		let update_res = self.persister.persist_new_channel(funding_outpoint, &monitor);
+		let update_id = MonitorUpdateId {
+			contents: MonitorUpdate::MonitorUpdateId(monitor.get_latest_update_id()),
+		};
+		let mut pending_monitor_updates = Vec::new();
+		let update_res = self.persister.persist_new_channel(funding_outpoint, &monitor, update_id);
 		if update_res.is_err() {
 			log_error!(self.logger, "Failed to persist new channel data: {:?}", update_res);
 		}
 		if update_res == Err(ChannelMonitorUpdateErr::PermanentFailure) {
 			return update_res;
+		} else if update_res.is_err() {
+			pending_monitor_updates.push(update_id);
 		}
 		{
 			let funding_txo = monitor.get_funding_txo();
@@ -407,7 +458,7 @@ where C::Target: chain::Filter,
 				monitor.load_outputs_to_watch(chain_source);
 			}
 		}
-		entry.insert(MonitorHolder { monitor });
+		entry.insert(MonitorHolder { monitor, pending_monitor_updates: Mutex::new(pending_monitor_updates) });
 		update_res
 	}
 
@@ -437,8 +488,15 @@ where C::Target: chain::Filter,
 		}
 		// Even if updating the monitor returns an error, the monitor's state will
 		// still be changed. So, persist the updated monitor despite the error.
-		let persist_res = self.persister.update_persisted_channel(funding_txo, &update, monitor);
-		if let Err(ref e) = persist_res {
+		let update_id = MonitorUpdateId {
+			contents: MonitorUpdate::MonitorUpdateId(update.update_id),
+		};
+		let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
+		let persist_res = self.persister.update_persisted_channel(funding_txo, &update, monitor, update_id);
+		if let Err(e) = persist_res {
+			if e == ChannelMonitorUpdateErr::TemporaryFailure {
+				pending_monitor_updates.push(update_id);
+			}
			log_error!(self.logger, "Failed to persist channel monitor update: {:?}", e);
 		}
 		if update_res.is_err() {

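Internally, every update that returns TemporaryFailure is now recorded in the channel's pending_monitor_updates list; each channel_monitor_updated call removes the completed id, and an UpdateCompleted monitor event is only queued once that list is empty. From the caller's side the completion flow, with a hypothetical finished_writes queue that is not part of LDK, looks roughly like:

// Hypothetical completion loop; `finished_writes` is an illustrative queue of
// (OutPoint, MonitorUpdateId) pairs whose background persistence has durably
// completed everywhere. Each pair must be reported individually.
for (funding_txo, update_id) in finished_writes.drain(..) {
	chain_monitor.channel_monitor_updated(funding_txo, update_id);
}

Reporting only the highest update id is no longer sufficient; the test/fuzz-only force_channel_monitor_updated shim above exists precisely to preserve the old one-shot behaviour for existing tests and the chanmon_consistency fuzzer.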