Skip to content

Commit e241ca4

Browse files
authored
Merge pull request #813 from jkczyz/2021-02-channel-monitor-mutex
Use interior mutability in ChannelMonitor
2 parents 59d2040 + e8ea0d9 commit e241ca4

File tree

10 files changed

+368
-192
lines changed

10 files changed

+368
-192
lines changed

fuzz/src/chanmon_consistency.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ impl chain::Watch<EnforcingSigner> for TestChainMonitor {
126126
hash_map::Entry::Occupied(entry) => entry,
127127
hash_map::Entry::Vacant(_) => panic!("Didn't have monitor on update call"),
128128
};
129-
let mut deserialized_monitor = <(BlockHash, channelmonitor::ChannelMonitor<EnforcingSigner>)>::
129+
let deserialized_monitor = <(BlockHash, channelmonitor::ChannelMonitor<EnforcingSigner>)>::
130130
read(&mut Cursor::new(&map_entry.get().1), &OnlyReadsKeysInterface {}).unwrap().1;
131131
deserialized_monitor.update_monitor(&update, &&TestBroadcaster{}, &&FuzzEstimator{}, &self.logger).unwrap();
132132
let mut ser = VecWriter(Vec::new());

lightning-block-sync/src/init.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ use lightning::chain;
3838
///
3939
/// use lightning_block_sync::*;
4040
///
41-
/// use std::cell::RefCell;
4241
/// use std::io::Cursor;
4342
///
4443
/// async fn init_sync<
@@ -83,7 +82,7 @@ use lightning::chain;
8382
///
8483
/// // Synchronize any channel monitors and the channel manager to be on the best block.
8584
/// let mut cache = UnboundedCache::new();
86-
/// let mut monitor_listener = (RefCell::new(monitor), &*tx_broadcaster, &*fee_estimator, &*logger);
85+
/// let mut monitor_listener = (monitor, &*tx_broadcaster, &*fee_estimator, &*logger);
8786
/// let listeners = vec![
8887
/// (monitor_block_hash, &mut monitor_listener as &mut dyn chain::Listen),
8988
/// (manager_block_hash, &mut manager as &mut dyn chain::Listen),
@@ -92,7 +91,7 @@ use lightning::chain;
9291
/// block_source, Network::Bitcoin, &mut cache, listeners).await.unwrap();
9392
///
9493
/// // Allow the chain monitor to watch any channels.
95-
/// let monitor = monitor_listener.0.into_inner();
94+
/// let monitor = monitor_listener.0;
9695
/// chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor);
9796
///
9897
/// // Create an SPV client to notify the chain monitor and channel manager of block events.

lightning/src/chain/chainmonitor.rs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use util::events;
4343
use util::events::Event;
4444

4545
use std::collections::{HashMap, hash_map};
46-
use std::sync::Mutex;
46+
use std::sync::RwLock;
4747
use std::ops::Deref;
4848

4949
/// An implementation of [`chain::Watch`] for monitoring channels.
@@ -64,7 +64,7 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
6464
P::Target: channelmonitor::Persist<ChannelSigner>,
6565
{
6666
/// The monitors
67-
pub monitors: Mutex<HashMap<OutPoint, ChannelMonitor<ChannelSigner>>>,
67+
pub monitors: RwLock<HashMap<OutPoint, ChannelMonitor<ChannelSigner>>>,
6868
chain_source: Option<C>,
6969
broadcaster: T,
7070
logger: L,
@@ -93,8 +93,8 @@ where C::Target: chain::Filter,
9393
/// [`chain::Watch::release_pending_monitor_events`]: ../trait.Watch.html#tymethod.release_pending_monitor_events
9494
/// [`chain::Filter`]: ../trait.Filter.html
9595
pub fn block_connected(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
96-
let mut monitors = self.monitors.lock().unwrap();
97-
for monitor in monitors.values_mut() {
96+
let monitors = self.monitors.read().unwrap();
97+
for monitor in monitors.values() {
9898
let mut txn_outputs = monitor.block_connected(header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
9999

100100
if let Some(ref chain_source) = self.chain_source {
@@ -113,8 +113,8 @@ where C::Target: chain::Filter,
113113
///
114114
/// [`ChannelMonitor::block_disconnected`]: ../channelmonitor/struct.ChannelMonitor.html#method.block_disconnected
115115
pub fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) {
116-
let mut monitors = self.monitors.lock().unwrap();
117-
for monitor in monitors.values_mut() {
116+
let monitors = self.monitors.read().unwrap();
117+
for monitor in monitors.values() {
118118
monitor.block_disconnected(header, disconnected_height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
119119
}
120120
}
@@ -130,7 +130,7 @@ where C::Target: chain::Filter,
130130
/// [`chain::Filter`]: ../trait.Filter.html
131131
pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P) -> Self {
132132
Self {
133-
monitors: Mutex::new(HashMap::new()),
133+
monitors: RwLock::new(HashMap::new()),
134134
chain_source,
135135
broadcaster,
136136
logger,
@@ -177,7 +177,7 @@ where C::Target: chain::Filter,
177177
///
178178
/// [`chain::Filter`]: ../trait.Filter.html
179179
fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr> {
180-
let mut monitors = self.monitors.lock().unwrap();
180+
let mut monitors = self.monitors.write().unwrap();
181181
let entry = match monitors.entry(funding_outpoint) {
182182
hash_map::Entry::Occupied(_) => {
183183
log_error!(self.logger, "Failed to add new channel data: channel monitor for given outpoint is already present");
@@ -209,8 +209,8 @@ where C::Target: chain::Filter,
209209
/// `ChainMonitor` monitors lock.
210210
fn update_channel(&self, funding_txo: OutPoint, update: ChannelMonitorUpdate) -> Result<(), ChannelMonitorUpdateErr> {
211211
// Update the monitor that watches the channel referred to by the given outpoint.
212-
let mut monitors = self.monitors.lock().unwrap();
213-
match monitors.get_mut(&funding_txo) {
212+
let monitors = self.monitors.read().unwrap();
213+
match monitors.get(&funding_txo) {
214214
None => {
215215
log_error!(self.logger, "Failed to update channel monitor: no such monitor registered");
216216

@@ -222,15 +222,15 @@ where C::Target: chain::Filter,
222222
#[cfg(not(any(test, feature = "fuzztarget")))]
223223
Err(ChannelMonitorUpdateErr::PermanentFailure)
224224
},
225-
Some(orig_monitor) => {
226-
log_trace!(self.logger, "Updating Channel Monitor for channel {}", log_funding_info!(orig_monitor));
227-
let update_res = orig_monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger);
225+
Some(monitor) => {
226+
log_trace!(self.logger, "Updating Channel Monitor for channel {}", log_funding_info!(monitor));
227+
let update_res = monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger);
228228
if let Err(e) = &update_res {
229229
log_error!(self.logger, "Failed to update channel monitor: {:?}", e);
230230
}
231231
// Even if updating the monitor returns an error, the monitor's state will
232232
// still be changed. So, persist the updated monitor despite the error.
233-
let persist_res = self.persister.update_persisted_channel(funding_txo, &update, orig_monitor);
233+
let persist_res = self.persister.update_persisted_channel(funding_txo, &update, monitor);
234234
if let Err(ref e) = persist_res {
235235
log_error!(self.logger, "Failed to persist channel monitor update: {:?}", e);
236236
}
@@ -245,8 +245,8 @@ where C::Target: chain::Filter,
245245

246246
fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
247247
let mut pending_monitor_events = Vec::new();
248-
for chan in self.monitors.lock().unwrap().values_mut() {
249-
pending_monitor_events.append(&mut chan.get_and_clear_pending_monitor_events());
248+
for monitor in self.monitors.read().unwrap().values() {
249+
pending_monitor_events.append(&mut monitor.get_and_clear_pending_monitor_events());
250250
}
251251
pending_monitor_events
252252
}
@@ -261,8 +261,8 @@ impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> even
261261
{
262262
fn get_and_clear_pending_events(&self) -> Vec<Event> {
263263
let mut pending_events = Vec::new();
264-
for chan in self.monitors.lock().unwrap().values_mut() {
265-
pending_events.append(&mut chan.get_and_clear_pending_events());
264+
for monitor in self.monitors.read().unwrap().values() {
265+
pending_events.append(&mut monitor.get_and_clear_pending_events());
266266
}
267267
pending_events
268268
}

0 commit comments

Comments
 (0)