Skip to content

Commit 804e658

Browse files
Add persistence_lock to ChannelManager
This will allow the ChannelManager to signal when it has new updates to persist.
1 parent 6277aa6 commit 804e658

File tree

1 file changed

+66
-2
lines changed

1 file changed

+66
-2
lines changed

lightning/src/ln/channelmanager.rs

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ use util::errors::APIError;
5858
use std::{cmp, mem};
5959
use std::collections::{HashMap, hash_map, HashSet};
6060
use std::io::{Cursor, Read};
61-
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
61+
use std::sync::{Arc, Mutex, MutexGuard, RwLock, Condvar};
6262
use std::sync::atomic::{AtomicUsize, Ordering};
6363
use std::time::Duration;
6464
use std::marker::{Sync, Send};
@@ -439,6 +439,41 @@ pub struct ChannelManager<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref,
439439
/// Taken first everywhere where we are making changes before any other locks.
440440
total_consistency_lock: RwLock<()>,
441441

442+
/// Used to signal to the ChannelManager persister that the manager needs to be re-persisted to
443+
/// disk/backups.
444+
/// Example usage:
445+
/// ```
446+
/// use std::sync::Arc;
447+
/// type TxBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface;
448+
/// type FeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator;
449+
/// type Logger = dyn lightning::util::logger::Logger;
450+
/// type ChainAccess = dyn lightning::chain::Access;
451+
/// type ChainFilter = dyn lightning::chain::Filter;
452+
/// type DataPersister = dyn lightning::chain::channelmonitor::Persist<lightning::chain::keysinterface::InMemoryChannelKeys>;
453+
/// type ChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::chain::keysinterface::InMemoryChannelKeys, Arc<ChainFilter>, Arc<TxBroadcaster>, Arc<FeeEstimator>, Arc<Logger>, Arc<DataPersister>>;
454+
/// type ChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor, TxBroadcaster, FeeEstimator, Logger>;
455+
/// fn start(manager: ChannelManager) {
456+
/// let mutcond = Arc::clone(&manager.persistence_lock);
457+
/// // ChannelManager persistence should generally be run in a background thread since it can be
458+
/// // time-consuming to persist.
459+
/// std::thread::spawn(move || {
460+
/// let &(ref mtx, ref cnd) = &*mutcond;
461+
/// loop {
462+
/// let mut guard = mtx.lock().unwrap();
463+
/// // If there's a new update, we break out of the while loop, reset the condition variable,
464+
/// // and persist the ChannelManager to disk/backups.
465+
/// while !*guard {
466+
/// guard = cnd.wait(guard).unwrap();
467+
/// }
468+
/// *guard = false;
469+
/// std::mem::drop(guard); // Don't hold the lock while persisting the ChannelManager.
470+
/// // Persist the ChannelManager here.
471+
/// }
472+
/// });
473+
/// }
474+
/// ```
475+
pub persistence_lock: Arc<(Mutex<bool>, Condvar)>,
476+
442477
keys_manager: K,
443478

444479
logger: L,
@@ -759,6 +794,8 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
759794
pending_events: Mutex::new(Vec::new()),
760795
total_consistency_lock: RwLock::new(()),
761796

797+
persistence_lock: Arc::new((Mutex::new(false), Condvar::new())),
798+
762799
keys_manager,
763800

764801
logger,
@@ -912,6 +949,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
912949
// the latest local state, which is the best we can do anyway. Thus, it is safe to
913950
// ignore the result here.
914951
let _ = self.chain_monitor.update_channel(funding_txo, monitor_update);
952+
self.persist_updates();
915953
}
916954
}
917955

@@ -1312,6 +1350,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
13121350
commitment_signed,
13131351
},
13141352
});
1353+
self.persist_updates();
13151354
},
13161355
None => {},
13171356
}
@@ -1706,6 +1745,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
17061745
commitment_signed: commitment_msg,
17071746
},
17081747
});
1748+
self.persist_updates();
17091749
}
17101750
} else {
17111751
unreachable!();
@@ -2125,6 +2165,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
21252165
}
21262166
});
21272167
}
2168+
self.persist_updates();
21282169
return Ok(())
21292170
},
21302171
Err(e) => {
@@ -2339,6 +2380,15 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
23392380
Ok(())
23402381
}
23412382

2383+
// Signal to the ChannelManager persister that there are updates necessitating persisting to disk.
2384+
fn persist_updates(&self) {
2385+
let &(ref persist_mtx, ref cnd) = &*self.persistence_lock;
2386+
let mut persistence_lock = persist_mtx.lock().unwrap();
2387+
*persistence_lock = true;
2388+
cnd.notify_all();
2389+
2390+
}
2391+
23422392
fn internal_funding_created(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingCreated) -> Result<(), MsgHandleErrInternal> {
23432393
let ((funding_msg, monitor), mut chan) = {
23442394
let mut channel_lock = self.channel_state.lock().unwrap();
@@ -2373,6 +2423,9 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
23732423
},
23742424
}
23752425
}
2426+
2427+
self.persist_updates();
2428+
23762429
let mut channel_state_lock = self.channel_state.lock().unwrap();
23772430
let channel_state = &mut *channel_state_lock;
23782431
match channel_state.by_id.entry(funding_msg.channel_id) {
@@ -2411,6 +2464,9 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
24112464
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
24122465
}
24132466
};
2467+
2468+
self.persist_updates();
2469+
24142470
let mut pending_events = self.pending_events.lock().unwrap();
24152471
pending_events.push(events::Event::FundingBroadcastSafe {
24162472
funding_txo,
@@ -2706,6 +2762,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
27062762
msg,
27072763
});
27082764
}
2765+
self.persist_updates();
27092766
Ok(())
27102767
},
27112768
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
@@ -2797,9 +2854,13 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
27972854
self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), failure.0, &failure.1, failure.2);
27982855
}
27992856
self.forward_htlcs(&mut [(short_channel_id, channel_outpoint, pending_forwards)]);
2857+
self.persist_updates();
28002858
Ok(())
28012859
},
2802-
Err(e) => Err(e)
2860+
Err(e) => {
2861+
self.persist_updates();
2862+
Err(e)
2863+
}
28032864
}
28042865
}
28052866

@@ -2940,6 +3001,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
29403001
msg,
29413002
});
29423003
}
3004+
self.persist_updates();
29433005
Ok(())
29443006
},
29453007
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
@@ -2989,6 +3051,7 @@ impl<ChanSigner: ChannelKeys, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
29893051
commitment_signed,
29903052
},
29913053
});
3054+
self.persist_updates();
29923055
}
29933056
},
29943057
}
@@ -3988,6 +4051,7 @@ impl<'a, ChanSigner: ChannelKeys + Readable, M: Deref, T: Deref, K: Deref, F: De
39884051

39894052
pending_events: Mutex::new(pending_events_read),
39904053
total_consistency_lock: RwLock::new(()),
4054+
persistence_lock: Arc::new((Mutex::new(false), Condvar::new())),
39914055
keys_manager: args.keys_manager,
39924056
logger: args.logger,
39934057
default_configuration: args.default_config,

0 commit comments

Comments
 (0)