Skip to content

Commit 5e3aadc

Browse files
Integrate ChannelDataPersister into ChainMonitor.
- add a test for the LinuxPersister. - update MonitorUpdateErr to have a boolean whether the updated ChannelMonitor should be persisted despite the error - The ChainMonitor should: 1) Call the ChannelDataPersister on startup to retrieve the stored monitors from disk, 2) Whenever a new channel is added or updated, these updates should be conveyed to the DataPersister and persisted to disk.
1 parent c66189c commit 5e3aadc

File tree

10 files changed

+163
-68
lines changed

10 files changed

+163
-68
lines changed

fuzz/src/chanmon_consistency.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use lightning::chain::transaction::OutPoint;
3636
use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
3737
use lightning::chain::keysinterface::{KeysInterface, InMemoryChannelKeys};
3838
use lightning::ln::channelmanager::{ChannelManager, PaymentHash, PaymentPreimage, PaymentSecret, ChannelManagerReadArgs};
39+
use lightning::ln::data_persister::ChannelDataPersister;
3940
use lightning::ln::features::{ChannelFeatures, InitFeatures, NodeFeatures};
4041
use lightning::ln::msgs::{CommitmentUpdate, ChannelMessageHandler, ErrorAction, UpdateAddHTLC, Init};
4142
use lightning::util::enforcing_trait_impls::EnforcingChannelKeys;
@@ -47,6 +48,7 @@ use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
4748
use lightning::routing::router::{Route, RouteHop};
4849

4950

51+
use utils::test_data_persister::TestChanDataPersister;
5052
use utils::test_logger;
5153

5254
use bitcoin::secp256k1::key::{PublicKey,SecretKey};
@@ -84,7 +86,7 @@ impl Writer for VecWriter {
8486

8587
struct TestChainMonitor {
8688
pub logger: Arc<dyn Logger>,
87-
pub chain_monitor: Arc<chainmonitor::ChainMonitor<EnforcingChannelKeys, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
89+
pub chain_monitor: Arc<chainmonitor::ChainMonitor<EnforcingChannelKeys, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<dyn ChannelDataPersister<Keys=EnforcingChannelKeys>>>>,
8890
pub update_ret: Mutex<Result<(), channelmonitor::ChannelMonitorUpdateErr>>,
8991
// If we reload a node with an old copy of ChannelMonitors, the ChannelManager deserialization
9092
// logic will automatically force-close our channels for us (as we don't have an up-to-date
@@ -95,9 +97,9 @@ struct TestChainMonitor {
9597
pub should_update_manager: atomic::AtomicBool,
9698
}
9799
impl TestChainMonitor {
98-
pub fn new(broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>) -> Self {
100+
pub fn new(broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>, data_persister: Arc<dyn ChannelDataPersister<Keys=EnforcingChannelKeys>>) -> Self {
99101
Self {
100-
chain_monitor: Arc::new(chainmonitor::ChainMonitor::new(None, broadcaster, logger.clone(), feeest)),
102+
chain_monitor: Arc::new(chainmonitor::ChainMonitor::new(None, broadcaster, logger.clone(), feeest, data_persister)),
101103
logger,
102104
update_ret: Mutex::new(Ok(())),
103105
latest_monitors: Mutex::new(HashMap::new()),
@@ -127,7 +129,7 @@ impl chain::Watch for TestChainMonitor {
127129
};
128130
let mut deserialized_monitor = <(BlockHash, channelmonitor::ChannelMonitor<EnforcingChannelKeys>)>::
129131
read(&mut Cursor::new(&map_entry.get().1)).unwrap().1;
130-
deserialized_monitor.update_monitor(update.clone(), &&TestBroadcaster {}, &self.logger).unwrap();
132+
deserialized_monitor.update_monitor(&update, &&TestBroadcaster {}, &self.logger).unwrap();
131133
let mut ser = VecWriter(Vec::new());
132134
deserialized_monitor.write_for_disk(&mut ser).unwrap();
133135
map_entry.insert((update.update_id, ser.0));
@@ -192,7 +194,8 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
192194
macro_rules! make_node {
193195
($node_id: expr) => { {
194196
let logger: Arc<dyn Logger> = Arc::new(test_logger::TestLogger::new($node_id.to_string(), out.clone()));
195-
let monitor = Arc::new(TestChainMonitor::new(broadcast.clone(), logger.clone(), fee_est.clone()));
197+
let data_persister = Arc::new(TestChanDataPersister{});
198+
let monitor = Arc::new(TestChainMonitor::new(broadcast.clone(), logger.clone(), fee_est.clone(), data_persister.clone()));
196199

197200
let keys_manager = Arc::new(KeyProvider { node_id: $node_id, rand_bytes_id: atomic::AtomicU8::new(0) });
198201
let mut config = UserConfig::default();
@@ -207,7 +210,8 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
207210
macro_rules! reload_node {
208211
($ser: expr, $node_id: expr, $old_monitors: expr) => { {
209212
let logger: Arc<dyn Logger> = Arc::new(test_logger::TestLogger::new($node_id.to_string(), out.clone()));
210-
let chain_monitor = Arc::new(TestChainMonitor::new(broadcast.clone(), logger.clone(), fee_est.clone()));
213+
let data_persister = Arc::new(TestChanDataPersister{});
214+
let chain_monitor = Arc::new(TestChainMonitor::new(broadcast.clone(), logger.clone(), fee_est.clone(), data_persister.clone()));
211215

212216
let keys_manager = Arc::new(KeyProvider { node_id: $node_id, rand_bytes_id: atomic::AtomicU8::new(0) });
213217
let mut config = UserConfig::default();

fuzz/src/full_stack.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use lightning::chain::chainmonitor;
3131
use lightning::chain::transaction::OutPoint;
3232
use lightning::chain::keysinterface::{InMemoryChannelKeys, KeysInterface};
3333
use lightning::ln::channelmanager::{ChannelManager, PaymentHash, PaymentPreimage, PaymentSecret};
34+
use lightning::ln::data_persister::ChannelDataPersister;
3435
use lightning::ln::peer_handler::{MessageHandler,PeerManager,SocketDescriptor};
3536
use lightning::routing::router::get_route;
3637
use lightning::routing::network_graph::NetGraphMsgHandler;
@@ -39,6 +40,7 @@ use lightning::util::enforcing_trait_impls::EnforcingChannelKeys;
3940
use lightning::util::logger::Logger;
4041
use lightning::util::config::UserConfig;
4142

43+
use utils::test_data_persister::TestChanDataPersister;
4244
use utils::test_logger;
4345

4446
use bitcoin::secp256k1::key::{PublicKey,SecretKey};
@@ -145,13 +147,13 @@ impl<'a> std::hash::Hash for Peer<'a> {
145147

146148
type ChannelMan = ChannelManager<
147149
EnforcingChannelKeys,
148-
Arc<chainmonitor::ChainMonitor<EnforcingChannelKeys, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
150+
Arc<chainmonitor::ChainMonitor<EnforcingChannelKeys, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<dyn ChannelDataPersister<Keys=EnforcingChannelKeys>>>>,
149151
Arc<TestBroadcaster>, Arc<KeyProvider>, Arc<FuzzEstimator>, Arc<dyn Logger>>;
150152
type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan>, Arc<NetGraphMsgHandler<Arc<dyn chain::Access>, Arc<dyn Logger>>>, Arc<dyn Logger>>;
151153

152154
struct MoneyLossDetector<'a> {
153155
manager: Arc<ChannelMan>,
154-
monitor: Arc<chainmonitor::ChainMonitor<EnforcingChannelKeys, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
156+
monitor: Arc<chainmonitor::ChainMonitor<EnforcingChannelKeys, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<dyn ChannelDataPersister<Keys=EnforcingChannelKeys>>>>,
155157
handler: PeerMan<'a>,
156158

157159
peers: &'a RefCell<[bool; 256]>,
@@ -165,7 +167,7 @@ struct MoneyLossDetector<'a> {
165167
impl<'a> MoneyLossDetector<'a> {
166168
pub fn new(peers: &'a RefCell<[bool; 256]>,
167169
manager: Arc<ChannelMan>,
168-
monitor: Arc<chainmonitor::ChainMonitor<EnforcingChannelKeys, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
170+
monitor: Arc<chainmonitor::ChainMonitor<EnforcingChannelKeys, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<dyn ChannelDataPersister<Keys=EnforcingChannelKeys>>>>,
169171
handler: PeerMan<'a>) -> Self {
170172
MoneyLossDetector {
171173
manager,
@@ -333,7 +335,8 @@ pub fn do_test(data: &[u8], logger: &Arc<dyn Logger>) {
333335
};
334336

335337
let broadcast = Arc::new(TestBroadcaster{});
336-
let monitor = Arc::new(chainmonitor::ChainMonitor::new(None, broadcast.clone(), Arc::clone(&logger), fee_est.clone()));
338+
let data_persister: Arc<dyn ChannelDataPersister<Keys = EnforcingChannelKeys>> = Arc::new(TestChanDataPersister{});
339+
let monitor = Arc::new(chainmonitor::ChainMonitor::new(None, broadcast.clone(), Arc::clone(&logger), fee_est.clone(), data_persister.clone()));
337340

338341
let keys_manager = Arc::new(KeyProvider { node_secret: our_network_key.clone(), counter: AtomicU64::new(0) });
339342
let mut config = UserConfig::default();

fuzz/src/utils/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@
77
// You may not use this file except in accordance with one or both of these
88
// licenses.
99

10+
pub mod test_data_persister;
1011
pub mod test_logger;

fuzz/src/utils/test_data_persister.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
use lightning::ln::data_persister::ChannelDataPersister;
2+
use lightning::chain::channelmonitor;
3+
use lightning::chain::transaction::OutPoint;
4+
use lightning::util::enforcing_trait_impls::EnforcingChannelKeys;
5+
6+
pub struct TestChanDataPersister {}
7+
impl ChannelDataPersister for TestChanDataPersister {
8+
type Keys = EnforcingChannelKeys;
9+
10+
fn persist_channel_data(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor<EnforcingChannelKeys>) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> {
11+
Ok(())
12+
}
13+
14+
fn update_channel_data(&self, _funding_txo: OutPoint, _update: &channelmonitor::ChannelMonitorUpdate, _data: &channelmonitor::ChannelMonitor<EnforcingChannelKeys>) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> {
15+
Ok(())
16+
}
17+
}

lightning-net-tokio/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@
3636
//! type Logger = dyn lightning::util::logger::Logger;
3737
//! type ChainAccess = dyn lightning::chain::Access;
3838
//! type ChainFilter = dyn lightning::chain::Filter;
39-
//! type ChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::chain::keysinterface::InMemoryChannelKeys, Arc<ChainFilter>, Arc<TxBroadcaster>, Arc<FeeEstimator>, Arc<Logger>>;
39+
//! type DataPersister = dyn lightning::ln::data_persister::ChannelDataPersister<Keys = lightning::chain::keysinterface::InMemoryChannelKeys>;
40+
//! type ChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::chain::keysinterface::InMemoryChannelKeys, Arc<ChainFilter>, Arc<TxBroadcaster>, Arc<FeeEstimator>, Arc<Logger>, Arc<DataPersister>>;
4041
//! type ChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor, TxBroadcaster, FeeEstimator, Logger>;
4142
//! type PeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<lightning_net_tokio::SocketDescriptor, ChainMonitor, TxBroadcaster, FeeEstimator, ChainAccess, Logger>;
4243
//!

lightning/src/chain/chainmonitor.rs

Lines changed: 66 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
3737
use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, MonitorEvent, MonitorUpdateError};
3838
use chain::transaction::{OutPoint, TransactionData};
3939
use chain::keysinterface::ChannelKeys;
40+
use ln::data_persister::ChannelDataPersister;
4041
use util::logger::Logger;
4142
use util::events;
4243
use util::events::Event;
@@ -55,25 +56,28 @@ use std::ops::Deref;
5556
/// [`chain::Watch`]: ../trait.Watch.html
5657
/// [`ChannelManager`]: ../../ln/channelmanager/struct.ChannelManager.html
5758
/// [module-level documentation]: index.html
58-
pub struct ChainMonitor<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref>
59+
pub struct ChainMonitor<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref, D: Deref>
5960
where C::Target: chain::Filter,
6061
T::Target: BroadcasterInterface,
6162
F::Target: FeeEstimator,
6263
L::Target: Logger,
64+
D::Target: ChannelDataPersister<Keys=ChanSigner>,
6365
{
6466
/// The monitors
6567
pub monitors: Mutex<HashMap<OutPoint, ChannelMonitor<ChanSigner>>>,
6668
chain_source: Option<C>,
6769
broadcaster: T,
6870
logger: L,
69-
fee_estimator: F
71+
fee_estimator: F,
72+
data_persister: D,
7073
}
7174

72-
impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> ChainMonitor<ChanSigner, C, T, F, L>
73-
where C::Target: chain::Filter,
74-
T::Target: BroadcasterInterface,
75-
F::Target: FeeEstimator,
76-
L::Target: Logger,
75+
impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref, D: Deref> ChainMonitor<ChanSigner, C, T, F, L, D>
76+
where C::Target: chain::Filter,
77+
T::Target: BroadcasterInterface,
78+
F::Target: FeeEstimator,
79+
L::Target: Logger,
80+
D::Target: ChannelDataPersister<Keys=ChanSigner>,
7781
{
7882
/// Dispatches to per-channel monitors, which are responsible for updating their on-chain view
7983
/// of a channel and reacting accordingly based on transactions in the connected block. See
@@ -128,13 +132,14 @@ impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> ChainMonit
128132
/// transactions relevant to the watched channels.
129133
///
130134
/// [`chain::Filter`]: ../trait.Filter.html
131-
pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F) -> Self {
135+
pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, data_persister: D) -> Self {
132136
Self {
133137
monitors: Mutex::new(HashMap::new()),
134138
chain_source,
135139
broadcaster,
136140
logger,
137141
fee_estimator: feeest,
142+
data_persister,
138143
}
139144
}
140145

@@ -143,10 +148,12 @@ impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> ChainMonit
143148
/// Calls back to [`chain::Filter`] with the funding transaction and outputs to watch.
144149
///
145150
/// [`chain::Filter`]: ../trait.Filter.html
146-
fn add_monitor(&self, outpoint: OutPoint, monitor: ChannelMonitor<ChanSigner>) -> Result<(), MonitorUpdateError> {
151+
fn add_monitor(&self, outpoint: OutPoint, monitor: ChannelMonitor<ChanSigner>) -> Result<(), ChannelMonitorUpdateErr> {
147152
let mut monitors = self.monitors.lock().unwrap();
148153
let entry = match monitors.entry(outpoint) {
149-
hash_map::Entry::Occupied(_) => return Err(MonitorUpdateError("Channel monitor for given outpoint is already present")),
154+
hash_map::Entry::Occupied(_) => {
155+
log_error!(self.logger, "Failed to add new channel data: channel monitor for given outpoint is already present");
156+
return Err(ChannelMonitorUpdateErr::PermanentFailure)},
150157
hash_map::Entry::Vacant(e) => e,
151158
};
152159
{
@@ -162,43 +169,69 @@ impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> ChainMonit
162169
}
163170
}
164171
}
172+
match self.data_persister.persist_channel_data(outpoint, &monitor) {
173+
Err(e) => {
174+
log_error!(self.logger, "Failed to persist new channel data");
175+
return Err(e);
176+
},
177+
_ => {}
178+
}
165179
entry.insert(monitor);
166180
Ok(())
167181
}
168182

169183
/// Updates the monitor that watches the channel referred to by the given outpoint.
170-
fn update_monitor(&self, outpoint: OutPoint, update: ChannelMonitorUpdate) -> Result<(), MonitorUpdateError> {
184+
fn update_monitor(&self, outpoint: OutPoint, update: ChannelMonitorUpdate) -> Result<(), ChannelMonitorUpdateErr> {
171185
let mut monitors = self.monitors.lock().unwrap();
172-
match monitors.get_mut(&outpoint) {
173-
Some(orig_monitor) => {
174-
log_trace!(self.logger, "Updating Channel Monitor for channel {}", log_funding_info!(orig_monitor));
175-
orig_monitor.update_monitor(update, &self.broadcaster, &self.logger)
176-
},
177-
None => Err(MonitorUpdateError("No such monitor registered"))
178-
}
186+
if let Some(orig_monitor) = monitors.get_mut(&outpoint) {
187+
log_trace!(self.logger, "Updating Channel Monitor for channel {}", log_funding_info!(orig_monitor));
188+
let mut should_persist = true;
189+
let res = orig_monitor.update_monitor(&update, &self.broadcaster, &self.logger);
190+
match res {
191+
Err(MonitorUpdateError::PersistMonitor(msg)) => {
192+
log_error!(self.logger, "{}", msg);
193+
},
194+
Err(MonitorUpdateError::NoPersistMonitor(msg)) => {
195+
log_error!(self.logger, "{}", msg);
196+
should_persist = false;
197+
},
198+
Ok(()) => {},
199+
}
200+
if should_persist {
201+
match self.data_persister.update_channel_data(outpoint, &update, orig_monitor) {
202+
Err(e) => {
203+
if !res.is_err() {
204+
return Err(e);
205+
}
206+
},
207+
_ => {}
208+
}
209+
}
210+
if res.is_err() { return Err(ChannelMonitorUpdateErr::PermanentFailure); }
211+
} else {
212+
log_error!(self.logger, "Failed to update channel monitor: no such monitor registered");
213+
return Err(ChannelMonitorUpdateErr::PermanentFailure);
214+
}
215+
216+
Ok(())
179217
}
180218
}
181219

182-
impl<ChanSigner: ChannelKeys, C: Deref + Sync + Send, T: Deref + Sync + Send, F: Deref + Sync + Send, L: Deref + Sync + Send> chain::Watch for ChainMonitor<ChanSigner, C, T, F, L>
183-
where C::Target: chain::Filter,
184-
T::Target: BroadcasterInterface,
185-
F::Target: FeeEstimator,
186-
L::Target: Logger,
220+
impl<ChanSigner: ChannelKeys, C: Deref + Sync + Send, T: Deref + Sync + Send, F: Deref + Sync + Send, L: Deref + Sync + Send, D: Deref + Sync + Send> chain::Watch for ChainMonitor<ChanSigner, C, T, F, L, D>
221+
where C::Target: chain::Filter,
222+
T::Target: BroadcasterInterface,
223+
F::Target: FeeEstimator,
224+
L::Target: Logger,
225+
D::Target: ChannelDataPersister<Keys=ChanSigner>,
187226
{
188227
type Keys = ChanSigner;
189228

190229
fn watch_channel(&self, funding_txo: OutPoint, monitor: ChannelMonitor<ChanSigner>) -> Result<(), ChannelMonitorUpdateErr> {
191-
match self.add_monitor(funding_txo, monitor) {
192-
Ok(_) => Ok(()),
193-
Err(_) => Err(ChannelMonitorUpdateErr::PermanentFailure),
194-
}
230+
self.add_monitor(funding_txo, monitor)
195231
}
196232

197233
fn update_channel(&self, funding_txo: OutPoint, update: ChannelMonitorUpdate) -> Result<(), ChannelMonitorUpdateErr> {
198-
match self.update_monitor(funding_txo, update) {
199-
Ok(_) => Ok(()),
200-
Err(_) => Err(ChannelMonitorUpdateErr::PermanentFailure),
201-
}
234+
self.update_monitor(funding_txo, update)
202235
}
203236

204237
fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
@@ -210,11 +243,12 @@ impl<ChanSigner: ChannelKeys, C: Deref + Sync + Send, T: Deref + Sync + Send, F:
210243
}
211244
}
212245

213-
impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> events::EventsProvider for ChainMonitor<ChanSigner, C, T, F, L>
246+
impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref, D: Deref> events::EventsProvider for ChainMonitor<ChanSigner, C, T, F, L, D>
214247
where C::Target: chain::Filter,
215248
T::Target: BroadcasterInterface,
216249
F::Target: FeeEstimator,
217250
L::Target: Logger,
251+
D::Target: ChannelDataPersister<Keys=ChanSigner>,
218252
{
219253
fn get_and_clear_pending_events(&self) -> Vec<Event> {
220254
let mut pending_events = Vec::new();

0 commit comments

Comments
 (0)