Skip to content

Commit 1d2f5b3

Browse files
committed
WIP: Common ChainListener implementations and example
1 parent f1ca2b1 commit 1d2f5b3

File tree

3 files changed

+187
-26
lines changed

3 files changed

+187
-26
lines changed

lightning-block-sync/src/init.rs

Lines changed: 166 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,18 @@ use bitcoin::blockdata::block::{Block, BlockHeader};
55
use bitcoin::hash_types::BlockHash;
66
use bitcoin::network::constants::Network;
77

8+
use lightning::chain;
9+
use lightning::chain::chainmonitor::ChainMonitor;
10+
use lightning::chain::chaininterface;
11+
use lightning::chain::channelmonitor;
12+
use lightning::chain::channelmonitor::ChannelMonitor;
13+
use lightning::chain::keysinterface;
14+
use lightning::ln::channelmanager::ChannelManager;
15+
use lightning::util::logger;
16+
17+
use std::cell::RefCell;
18+
use std::ops::Deref;
19+
820
/// Performs a one-time sync of chain listeners using a single *trusted* block source, bringing each
921
/// listener's view of the chain from its paired block hash to `block_source`'s best chain tip.
1022
///
@@ -13,7 +25,88 @@ use bitcoin::network::constants::Network;
1325
/// paired with.
1426
///
1527
/// Useful during startup to bring the [`ChannelManager`] and each [`ChannelMonitor`] in sync before
16-
/// switching to [`SpvClient`].
28+
/// switching to [`SpvClient`]. For example:
29+
///
30+
/// ```
31+
/// use bitcoin::hash_types::BlockHash;
32+
/// use bitcoin::network::constants::Network;
33+
///
34+
/// use lightning::chain;
35+
/// use lightning::chain::Watch;
36+
/// use lightning::chain::chainmonitor::ChainMonitor;
37+
/// use lightning::chain::channelmonitor;
38+
/// use lightning::chain::channelmonitor::ChannelMonitor;
39+
/// use lightning::chain::chaininterface::BroadcasterInterface;
40+
/// use lightning::chain::chaininterface::FeeEstimator;
41+
/// use lightning::chain::keysinterface::ChannelKeys;
42+
/// use lightning::chain::keysinterface::KeysInterface;
43+
/// use lightning::ln::channelmanager::ChannelManager;
44+
/// use lightning::ln::channelmanager::ChannelManagerReadArgs;
45+
/// use lightning::util::config::UserConfig;
46+
/// use lightning::util::logger::Logger;
47+
/// use lightning::util::ser::ReadableArgs;
48+
///
49+
/// use lightning_block_sync::*;
50+
///
51+
/// use std::cell::RefCell;
52+
/// use std::io::Cursor;
53+
///
54+
/// async fn init_sync<
55+
/// B: BlockSource,
56+
/// K: KeysInterface<ChanKeySigner = S>,
57+
/// S: ChannelKeys,
58+
/// T: BroadcasterInterface,
59+
/// F: FeeEstimator,
60+
/// L: Logger,
61+
/// C: chain::Filter,
62+
/// P: channelmonitor::Persist<S>,
63+
/// >(
64+
/// block_source: &mut B,
65+
/// chain_monitor: &ChainMonitor<S, &C, &T, &F, &L, &P>,
66+
/// config: UserConfig,
67+
/// keys_manager: &K,
68+
/// tx_broadcaster: &T,
69+
/// fee_estimator: &F,
70+
/// logger: &L,
71+
/// persister: &P,
72+
/// ) {
73+
/// let serialized_monitor = "...";
74+
/// let (monitor_block_hash, mut monitor) = <(BlockHash, ChannelMonitor<S>)>::read(
75+
/// &mut Cursor::new(&serialized_monitor), keys_manager).unwrap();
76+
///
77+
/// let serialized_manager = "...";
78+
/// let (manager_block_hash, mut manager) = {
79+
/// let read_args = ChannelManagerReadArgs::new(
80+
/// keys_manager,
81+
/// fee_estimator,
82+
/// chain_monitor,
83+
/// tx_broadcaster,
84+
/// logger,
85+
/// config,
86+
/// vec![&mut monitor],
87+
/// );
88+
/// <(BlockHash, ChannelManager<S, &ChainMonitor<S, &C, &T, &F, &L, &P>, &T, &K, &F, &L>)>::read(
89+
/// &mut Cursor::new(&serialized_manager), read_args).unwrap()
90+
/// };
91+
///
92+
/// let mut cache = UnboundedCache::new();
93+
/// let mut monitor_listener = (RefCell::new(monitor), tx_broadcaster, fee_estimator, logger);
94+
/// let mut manager_listener = &manager;
95+
/// let listeners = vec![
96+
/// (monitor_block_hash, &mut monitor_listener as &mut dyn ChainListener),
97+
/// (manager_block_hash, &mut manager_listener as &mut dyn ChainListener),
98+
/// ];
99+
/// let chain_tip =
100+
/// init::sync_listeners(block_source, Network::Bitcoin, &mut cache, listeners).await.unwrap();
101+
///
102+
/// let monitor = monitor_listener.0.into_inner();
103+
/// chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor);
104+
///
105+
/// let chain_poller = poll::ChainPoller::new(block_source, Network::Bitcoin);
106+
/// let chain_listener = (&chain_monitor, &manager_listener);
107+
/// let spv_client = SpvClient::new(chain_tip, chain_poller, &mut cache, chain_listener);
108+
/// }
109+
/// ```
17110
///
18111
/// [`SpvClient`]: ../struct.SpvClient.html
19112
/// [`ChannelManager`]: ../../lightning/ln/channelmanager/struct.ChannelManager.html
@@ -104,11 +197,11 @@ impl<'a, C: Cache> Cache for ReadOnlyCache<'a, C> {
104197
struct DynamicChainListener<'a>(&'a mut dyn ChainListener);
105198

106199
impl<'a> ChainListener for DynamicChainListener<'a> {
107-
fn block_connected(&mut self, _block: &Block, _height: u32) {
200+
fn block_connected(&self, _block: &Block, _height: u32) {
108201
unreachable!()
109202
}
110203

111-
fn block_disconnected(&mut self, header: &BlockHeader, height: u32) {
204+
fn block_disconnected(&self, header: &BlockHeader, height: u32) {
112205
self.0.block_disconnected(header, height)
113206
}
114207
}
@@ -117,19 +210,86 @@ impl<'a> ChainListener for DynamicChainListener<'a> {
117210
struct ChainListenerSet<'a>(Vec<(u32, &'a mut dyn ChainListener)>);
118211

119212
impl<'a> ChainListener for ChainListenerSet<'a> {
120-
fn block_connected(&mut self, block: &Block, height: u32) {
121-
for (starting_height, chain_listener) in self.0.iter_mut() {
213+
fn block_connected(&self, block: &Block, height: u32) {
214+
for (starting_height, chain_listener) in self.0.iter() {
122215
if height > *starting_height {
123216
chain_listener.block_connected(block, height);
124217
}
125218
}
126219
}
127220

128-
fn block_disconnected(&mut self, _header: &BlockHeader, _height: u32) {
221+
fn block_disconnected(&self, _header: &BlockHeader, _height: u32) {
129222
unreachable!()
130223
}
131224
}
132225

226+
impl<S, B: Deref, F: Deref, L: Deref> ChainListener for (RefCell<ChannelMonitor<S>>, B, F, L)
227+
where
228+
S: keysinterface::ChannelKeys,
229+
B::Target: chaininterface::BroadcasterInterface,
230+
F::Target: chaininterface::FeeEstimator,
231+
L::Target: logger::Logger,
232+
{
233+
fn block_connected(&self, block: &Block, height: u32) {
234+
let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
235+
self.0.borrow_mut().block_connected(&block.header, &txdata, height, &*self.1, &*self.2, &*self.3);
236+
}
237+
238+
fn block_disconnected(&self, header: &BlockHeader, height: u32) {
239+
self.0.borrow_mut().block_disconnected(header, height, &*self.1, &*self.2, &*self.3);
240+
}
241+
}
242+
243+
impl<S, M: Deref, B: Deref, K: Deref, F: Deref, L: Deref> ChainListener for &ChannelManager<S, M, B, K, F, L>
244+
where
245+
S: keysinterface::ChannelKeys,
246+
M::Target: chain::Watch<Keys = S>,
247+
B::Target: chaininterface::BroadcasterInterface,
248+
K::Target: keysinterface::KeysInterface<ChanKeySigner = S>,
249+
F::Target: chaininterface::FeeEstimator,
250+
L::Target: logger::Logger,
251+
{
252+
fn block_connected(&self, block: &Block, height: u32) {
253+
let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
254+
ChannelManager::block_connected(self, &block.header, &txdata, height);
255+
}
256+
257+
fn block_disconnected(&self, header: &BlockHeader, _height: u32) {
258+
ChannelManager::block_disconnected(self, header);
259+
}
260+
}
261+
262+
impl<S, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainListener for &ChainMonitor<S, C, T, F, L, P>
263+
where
264+
S: keysinterface::ChannelKeys,
265+
C::Target: chain::Filter,
266+
T::Target: chaininterface::BroadcasterInterface,
267+
F::Target: chaininterface::FeeEstimator,
268+
L::Target: logger::Logger,
269+
P::Target: channelmonitor::Persist<S>,
270+
{
271+
fn block_connected(&self, block: &Block, height: u32) {
272+
let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
273+
ChainMonitor::block_connected(self, &block.header, &txdata, height);
274+
}
275+
276+
fn block_disconnected(&self, header: &BlockHeader, height: u32) {
277+
ChainMonitor::block_disconnected(self, header, height);
278+
}
279+
}
280+
281+
impl<T: ChainListener, U: ChainListener> ChainListener for (&T, &U) {
282+
fn block_connected(&self, block: &Block, height: u32) {
283+
self.0.block_connected(block, height);
284+
self.1.block_connected(block, height);
285+
}
286+
287+
fn block_disconnected(&self, header: &BlockHeader, height: u32) {
288+
self.0.block_disconnected(header, height);
289+
self.1.block_disconnected(header, height);
290+
}
291+
}
292+
133293
#[cfg(test)]
134294
mod tests {
135295
use crate::test_utils::{Blockchain, MockChainListener};

lightning-block-sync/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,10 +167,10 @@ pub struct SpvClient<'a, P: Poll, C: Cache, L: ChainListener> {
167167
/// Used when needing to replay chain data upon startup or as new chain events occur.
168168
pub trait ChainListener {
169169
/// Notifies the listener that a block was added at the given height.
170-
fn block_connected(&mut self, block: &Block, height: u32);
170+
fn block_connected(&self, block: &Block, height: u32);
171171

172172
/// Notifies the listener that a block was removed at the given height.
173-
fn block_disconnected(&mut self, header: &BlockHeader, height: u32);
173+
fn block_disconnected(&self, header: &BlockHeader, height: u32);
174174
}
175175

176176
/// The `Cache` trait defines behavior for managing a block header cache, where block headers are

lightning-block-sync/src/test_utils.rs

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use bitcoin::hash_types::BlockHash;
77
use bitcoin::network::constants::Network;
88
use bitcoin::util::uint::Uint256;
99

10+
use std::cell::RefCell;
1011
use std::collections::VecDeque;
1112

1213
#[derive(Default)]
@@ -163,37 +164,37 @@ impl BlockSource for Blockchain {
163164
pub struct NullChainListener;
164165

165166
impl ChainListener for NullChainListener {
166-
fn block_connected(&mut self, _block: &Block, _height: u32) {}
167-
fn block_disconnected(&mut self, _header: &BlockHeader, _height: u32) {}
167+
fn block_connected(&self, _block: &Block, _height: u32) {}
168+
fn block_disconnected(&self, _header: &BlockHeader, _height: u32) {}
168169
}
169170

170171
pub struct MockChainListener {
171-
expected_blocks_connected: VecDeque<BlockHeaderData>,
172-
expected_blocks_disconnected: VecDeque<BlockHeaderData>,
172+
expected_blocks_connected: RefCell<VecDeque<BlockHeaderData>>,
173+
expected_blocks_disconnected: RefCell<VecDeque<BlockHeaderData>>,
173174
}
174175

175176
impl MockChainListener {
176177
pub fn new() -> Self {
177178
Self {
178-
expected_blocks_connected: VecDeque::new(),
179-
expected_blocks_disconnected: VecDeque::new(),
179+
expected_blocks_connected: RefCell::new(VecDeque::new()),
180+
expected_blocks_disconnected: RefCell::new(VecDeque::new()),
180181
}
181182
}
182183

183-
pub fn expect_block_connected(mut self, block: BlockHeaderData) -> Self {
184-
self.expected_blocks_connected.push_back(block);
184+
pub fn expect_block_connected(self, block: BlockHeaderData) -> Self {
185+
self.expected_blocks_connected.borrow_mut().push_back(block);
185186
self
186187
}
187188

188-
pub fn expect_block_disconnected(mut self, block: BlockHeaderData) -> Self {
189-
self.expected_blocks_disconnected.push_back(block);
189+
pub fn expect_block_disconnected(self, block: BlockHeaderData) -> Self {
190+
self.expected_blocks_disconnected.borrow_mut().push_back(block);
190191
self
191192
}
192193
}
193194

194195
impl ChainListener for MockChainListener {
195-
fn block_connected(&mut self, block: &Block, height: u32) {
196-
match self.expected_blocks_connected.pop_front() {
196+
fn block_connected(&self, block: &Block, height: u32) {
197+
match self.expected_blocks_connected.borrow_mut().pop_front() {
197198
None => {
198199
panic!("Unexpected block connected: {:?}", block.block_hash());
199200
},
@@ -204,8 +205,8 @@ impl ChainListener for MockChainListener {
204205
}
205206
}
206207

207-
fn block_disconnected(&mut self, header: &BlockHeader, height: u32) {
208-
match self.expected_blocks_disconnected.pop_front() {
208+
fn block_disconnected(&self, header: &BlockHeader, height: u32) {
209+
match self.expected_blocks_disconnected.borrow_mut().pop_front() {
209210
None => {
210211
panic!("Unexpected block disconnected: {:?}", header.block_hash());
211212
},
@@ -222,11 +223,11 @@ impl Drop for MockChainListener {
222223
if std::thread::panicking() {
223224
return;
224225
}
225-
if !self.expected_blocks_connected.is_empty() {
226-
panic!("Expected blocks connected: {:?}", self.expected_blocks_connected);
226+
if !self.expected_blocks_connected.borrow().is_empty() {
227+
panic!("Expected blocks connected: {:?}", self.expected_blocks_connected.borrow());
227228
}
228-
if !self.expected_blocks_disconnected.is_empty() {
229-
panic!("Expected blocks disconnected: {:?}", self.expected_blocks_disconnected);
229+
if !self.expected_blocks_disconnected.borrow().is_empty() {
230+
panic!("Expected blocks disconnected: {:?}", self.expected_blocks_disconnected.borrow());
230231
}
231232
}
232233
}

0 commit comments

Comments
 (0)