
Commit f7a735d

Add ChainMonitor::archive_fully_resolved_channel_monitors
Archives fully resolved channel monitors by adding them to a backup location and removing them from the primary storage and the monitor set. This is useful for pruning fully resolved monitors from the monitor set and primary storage so they are not reloaded on every new block connection. We also add a new function, `archive_persisted_channel`, to the `Persist` trait that writes the monitor to archive storage and removes it from the primary storage.
1 parent 905ead3 commit f7a735d
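
For context, a minimal usage sketch, not part of this commit: how a node might drive the new method from its block-connection loop. `Monitors` is a hypothetical stand-in trait for whatever handle owns the `lightning::chain::chainmonitor::ChainMonitor`; in a real node the method is called directly on the ChainMonitor instance, and the sweep cadence is an illustrative choice.

	// A minimal sketch, not part of this commit. `Monitors` is a stand-in trait for
	// whatever handle owns the ChainMonitor; a real node calls the method directly
	// on lightning::chain::chainmonitor::ChainMonitor.
	trait Monitors {
		fn archive_fully_resolved_channel_monitors(&self);
	}

	// Sweep resolved monitors occasionally, e.g. from the block-connection loop.
	// The every-six-blocks cadence is illustrative, not prescribed by the commit.
	fn maybe_archive<M: Monitors>(monitors: &M, new_block_height: u32) {
		if new_block_height % 6 == 0 {
			monitors.archive_fully_resolved_channel_monitors();
		}
	}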

File tree

5 files changed: +159 −0 lines changed

fuzz/src/utils/test_persister.rs (+3)

@@ -17,4 +17,7 @@ impl chainmonitor::Persist<TestChannelSigner> for TestPersister {
 	fn update_persisted_channel(&self, _funding_txo: OutPoint, _update: Option<&channelmonitor::ChannelMonitorUpdate>, _data: &channelmonitor::ChannelMonitor<TestChannelSigner>, _update_id: MonitorUpdateId) -> chain::ChannelMonitorUpdateStatus {
 		self.update_ret.lock().unwrap().clone()
 	}
+
+	fn archive_persisted_channel(&self, _: OutPoint) {
+	}
 }

lightning/src/chain/chainmonitor.rs (+25)

@@ -194,6 +194,11 @@ pub trait Persist<ChannelSigner: WriteableEcdsaChannelSigner> {
 	///
 	/// [`Writeable::write`]: crate::util::ser::Writeable::write
 	fn update_persisted_channel(&self, channel_funding_outpoint: OutPoint, update: Option<&ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
+	/// Prevents the channel monitor from being loaded on startup.
+	///
+	/// Archiving the data in a backup location (rather than deleting it fully) is useful for
+	/// hedging against data loss in case of unexpected failure.
+	fn archive_persisted_channel(&self, channel_funding_outpoint: OutPoint);
 }

 struct MonitorHolder<ChannelSigner: WriteableEcdsaChannelSigner> {

@@ -656,6 +661,26 @@ where C::Target: chain::Filter,
 			}
 		}
 	}
+
+	/// Archives fully resolved channel monitors by calling [`Persist::archive_persisted_channel`].
+	///
+	/// This is useful for pruning fully resolved monitors from the monitor set and primary
+	/// storage so they are not kept in memory and reloaded on restart.
+	///
+	/// Depending on the implementation of [`Persist::archive_persisted_channel`], the monitor
+	/// data could be moved to an archive location and/or removed entirely.
+	pub fn archive_fully_resolved_channel_monitors(&self) {
+		let mut monitors = self.monitors.write().unwrap();
+		monitors.retain(|funding_txo, monitor_holder| {
+			if monitor_holder.monitor.is_fully_resolved() {
+				self.persister.archive_persisted_channel(*funding_txo);
+				false
+			} else {
+				true
+			}
+		});
+	}
 }

 impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
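
To illustrate the intent of the new trait method, here is a hedged sketch of what a simple file-backed persister might do: move the monitor file into an archive directory rather than deleting it. The `FilePersister` type, its directory layout, and the string-based key are illustrative assumptions, not LDK types; a real implementation would implement `Persist::archive_persisted_channel(&self, channel_funding_outpoint: OutPoint)` exactly as declared in the trait above.

	use std::fs;
	use std::path::PathBuf;

	// Hypothetical persister with monitors stored at `<data_dir>/monitors/<txid>_<index>`
	// and archived copies kept under `<data_dir>/archived_monitors/`.
	struct FilePersister {
		data_dir: PathBuf,
	}

	impl FilePersister {
		// Sketch of the archive step: move the monitor file to the archive directory
		// instead of deleting it outright, hedging against data loss.
		fn archive_monitor_file(&self, funding_txid: &str, funding_index: u16) {
			let name = format!("{}_{}", funding_txid, funding_index);
			let src = self.data_dir.join("monitors").join(&name);
			let dst_dir = self.data_dir.join("archived_monitors");
			// Best effort: if anything fails, the monitor simply stays in primary
			// storage and can be archived again on a later call.
			if fs::create_dir_all(&dst_dir).is_ok() {
				let _ = fs::rename(&src, dst_dir.join(&name));
			}
		}
	}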

lightning/src/ln/monitor_tests.rs (+53)

@@ -158,6 +158,59 @@ fn revoked_output_htlc_resolution_timing() {
 	expect_payment_failed!(nodes[1], payment_hash_1, false);
 }

+#[test]
+fn archive_fully_resolved_monitors() {
+	// Test we can archive fully resolved channel monitor.
+	let chanmon_cfgs = create_chanmon_cfgs(2);
+	let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+	let mut user_config = test_default_channel_config();
+	let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[Some(user_config), Some(user_config)]);
+	let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+
+	let (_, _, chan_id, funding_tx) =
+		create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 1_000_000);
+
+	nodes[0].node.close_channel(&chan_id, &nodes[1].node.get_our_node_id()).unwrap();
+	let node_0_shutdown = get_event_msg!(nodes[0], MessageSendEvent::SendShutdown, nodes[1].node.get_our_node_id());
+	nodes[1].node.handle_shutdown(&nodes[0].node.get_our_node_id(), &node_0_shutdown);
+	let node_1_shutdown = get_event_msg!(nodes[1], MessageSendEvent::SendShutdown, nodes[0].node.get_our_node_id());
+	nodes[0].node.handle_shutdown(&nodes[1].node.get_our_node_id(), &node_1_shutdown);
+
+	let node_0_closing_signed = get_event_msg!(nodes[0], MessageSendEvent::SendClosingSigned, nodes[1].node.get_our_node_id());
+	nodes[1].node.handle_closing_signed(&nodes[0].node.get_our_node_id(), &node_0_closing_signed);
+	let node_1_closing_signed = get_event_msg!(nodes[1], MessageSendEvent::SendClosingSigned, nodes[0].node.get_our_node_id());
+	nodes[0].node.handle_closing_signed(&nodes[1].node.get_our_node_id(), &node_1_closing_signed);
+	let (_, node_0_2nd_closing_signed) = get_closing_signed_broadcast!(nodes[0].node, nodes[1].node.get_our_node_id());
+	nodes[1].node.handle_closing_signed(&nodes[0].node.get_our_node_id(), &node_0_2nd_closing_signed.unwrap());
+	let (_, _) = get_closing_signed_broadcast!(nodes[1].node, nodes[0].node.get_our_node_id());
+
+	let shutdown_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
+
+	mine_transaction(&nodes[0], &shutdown_tx[0]);
+	mine_transaction(&nodes[1], &shutdown_tx[0]);
+
+	connect_blocks(&nodes[0], 6);
+	connect_blocks(&nodes[1], 6);
+
+	check_closed_event!(nodes[0], 1, ClosureReason::LocallyInitiatedCooperativeClosure, [nodes[1].node.get_our_node_id()], 1000000);
+	check_closed_event!(nodes[1], 1, ClosureReason::CounterpartyInitiatedCooperativeClosure, [nodes[0].node.get_our_node_id()], 1000000);
+
+	assert_eq!(nodes[0].chain_monitor.chain_monitor.list_monitors().len(), 1);
+	// First archive should set balances_empty_height to current block height
+	nodes[0].chain_monitor.chain_monitor.archive_fully_resolved_channel_monitors();
+	connect_blocks(&nodes[0], 2016);
+	// Second call after 2016 blocks, should archive the monitor
+	nodes[0].chain_monitor.chain_monitor.archive_fully_resolved_channel_monitors();
+	// Should have no monitors left
+	assert_eq!(nodes[0].chain_monitor.chain_monitor.list_monitors().len(), 0);
+	// Remove the corresponding outputs and transactions the chain source is
+	// watching. This is to make sure the `Drop` function assertions pass.
+	nodes.get_mut(0).unwrap().chain_source.remove_watched_txn_and_outputs(
+		OutPoint { txid: funding_tx.txid(), index: 0 },
+		funding_tx.output[0].script_pubkey.clone()
+	);
+}
+
 fn do_chanmon_claim_value_coop_close(anchors: bool) {
 	// Tests `get_claimable_balances` returns the correct values across a simple cooperative claim.
 	// Specifically, this tests that the channel non-HTLC balances show up in

lightning/src/util/persist.rs (+58)

@@ -56,6 +56,11 @@ pub const CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
 /// The primary namespace under which [`ChannelMonitorUpdate`]s will be persisted.
 pub const CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE: &str = "monitor_updates";

+/// The primary namespace under which archived [`ChannelMonitor`]s will be persisted.
+pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE: &str = "archived_monitors";
+/// The secondary namespace under which archived [`ChannelMonitor`]s will be persisted.
+pub const ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
+
 /// The primary namespace under which the [`NetworkGraph`] will be persisted.
 pub const NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE: &str = "";
 /// The secondary namespace under which the [`NetworkGraph`] will be persisted.

@@ -212,6 +217,34 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner, K: KVStore + ?Sized> Persist<Ch
 			Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError
 		}
 	}
+
+	fn archive_persisted_channel(&self, funding_txo: OutPoint) {
+		let monitor_name = MonitorName::from(funding_txo);
+		let monitor = match self.read(
+			CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
+			CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
+			monitor_name.as_str(),
+		) {
+			Ok(monitor) => monitor,
+			Err(_) => return
+		};
+		match self.write(
+			ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
+			ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
+			monitor_name.as_str(),
+			&monitor
+		) {
+			Ok(()) => {}
+			Err(_e) => return
+		};
+		let key = format!("{}_{}", funding_txo.txid.to_string(), funding_txo.index);
+		let _ = self.remove(
+			CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
+			CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
+			&key,
+			false
+		);
+	}
 }

 /// Read previously persisted [`ChannelMonitor`]s from the store.

@@ -718,6 +751,31 @@ where
 			self.persist_new_channel(funding_txo, monitor, monitor_update_call_id)
 		}
 	}
+
+	fn archive_persisted_channel(&self, funding_txo: OutPoint) {
+		let monitor_name = MonitorName::from(funding_txo);
+		let monitor = match self.read_monitor(&monitor_name) {
+			Ok((_block_hash, monitor)) => monitor,
+			Err(_) => return
+		};
+		match self.kv_store.write(
+			ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
+			ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
+			monitor_name.as_str(),
+			&monitor.encode()
+		) {
+			Ok(()) => {},
+			Err(_e) => {
+				return
+			}
+		};
+		let _ = self.kv_store.remove(
+			CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
+			CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
+			monitor_name.as_str(),
+			false,
+		);
+	}
 }

 impl<K: Deref, L: Deref, ES: Deref, SP: Deref> MonitorUpdatingPersister<K, L, ES, SP>
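
As a hedged sketch of how the new namespace might be inspected after the fact (not part of this commit): any `KVStore` backend can list the archived monitor keys, which are named `<funding_txid>_<index>` just like live monitors. This assumes the `KVStore::list` method on LDK's `KVStore` trait and the constants added above.

	use lightning::util::persist::{
		KVStore,
		ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
		ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
	};

	// Returns the keys of all archived channel monitors, one `<txid>_<index>` entry
	// per archived channel.
	fn list_archived_monitors<K: KVStore>(store: &K) -> Result<Vec<String>, std::io::Error> {
		store.list(
			ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
			ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
		)
	}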

lightning/src/util/test_utils.rs (+20)

@@ -504,6 +504,10 @@ impl<Signer: sign::ecdsa::WriteableEcdsaChannelSigner> chainmonitor::Persist<Sig
 		}
 		res
 	}
+
+	fn archive_persisted_channel(&self, funding_txo: OutPoint) {
+		<TestPersister as chainmonitor::Persist<TestChannelSigner>>::archive_persisted_channel(&self.persister, funding_txo);
+	}
 }

 pub struct TestPersister {

@@ -552,6 +556,18 @@ impl<Signer: sign::ecdsa::WriteableEcdsaChannelSigner> chainmonitor::Persist<Sig
 		}
 		ret
 	}
+
+	fn archive_persisted_channel(&self, funding_txo: OutPoint) {
+		// remove the channel from the offchain_monitor_updates map
+		match self.offchain_monitor_updates.lock().unwrap().remove(&funding_txo) {
+			Some(_) => {},
+			None => {
+				// If the channel was not in the offchain_monitor_updates map, it should be in the
+				// chain_sync_monitor_persistences map.
+				assert!(self.chain_sync_monitor_persistences.lock().unwrap().remove(&funding_txo).is_some());
+			}
+		};
+	}
 }

 pub struct TestStore {

@@ -1363,6 +1379,10 @@ impl TestChainSource {
 			watched_outputs: Mutex::new(new_hash_set()),
 		}
 	}
+	pub fn remove_watched_txn_and_outputs(&self, outpoint: OutPoint, script_pubkey: ScriptBuf) {
+		self.watched_outputs.lock().unwrap().remove(&(outpoint, script_pubkey.clone()));
+		self.watched_txn.lock().unwrap().remove(&(outpoint.txid, script_pubkey));
+	}
 }

 impl UtxoLookup for TestChainSource {
