Skip to content

Commit 38cce22

Browse files
committed
Utility for syncing a set of chain listeners
Add a utility for syncing a set of chain listeners to a common chain tip. Required to use before creating an SpvClient when the chain listener used with the client is actually a set of listeners each of which may have had left off at a different block. This would occur when the listeners had been persisted individually at different frequencies (e.g., a ChainMonitor's individual ChannelMonitors).
1 parent 45b1de4 commit 38cce22

File tree

2 files changed

+363
-55
lines changed

2 files changed

+363
-55
lines changed

lightning-block-sync/src/init.rs

Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
use crate::{BlockSource, BlockSourceResult, Cache, ChainListener, ChainNotifier};
2+
use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader};
3+
4+
use bitcoin::blockdata::block::{Block, BlockHeader};
5+
use bitcoin::hash_types::BlockHash;
6+
use bitcoin::network::constants::Network;
7+
8+
/// Performs a one-time sync of chain listeners using a single *trusted* block source, bringing each
9+
/// listener's view of the chain from its paired block hash to `block_source`'s best chain tip.
10+
///
11+
/// Upon success, the returned header can be used to initialize [`SpvClient`]. In the case of
12+
/// failure, each listener may be left at a different block hash than the one it was originally
13+
/// paired with.
14+
///
15+
/// Useful during startup to bring the [`ChannelManager`] and each [`ChannelMonitor`] in sync before
16+
/// switching to [`SpvClient`].
17+
///
18+
/// [`SpvClient`]: ../struct.SpvClient.html
19+
/// [`ChannelManager`]: ../../lightning/ln/channelmanager/struct.ChannelManager.html
20+
/// [`ChannelMonitor`]: ../../lightning/chain/channelmonitor/struct.ChannelMonitor.html
21+
pub async fn sync_listeners<B: BlockSource, C: Cache>(
22+
block_source: &mut B,
23+
network: Network,
24+
header_cache: &mut C,
25+
mut chain_listeners: Vec<(BlockHash, &mut dyn ChainListener)>,
26+
) -> BlockSourceResult<ValidatedBlockHeader> {
27+
let (best_block_hash, best_block_height) = block_source.get_best_block().await?;
28+
let new_header = block_source
29+
.get_header(&best_block_hash, best_block_height).await?
30+
.validate(best_block_hash)?;
31+
32+
// Fetch the header for the block hash paired with each listener.
33+
let mut chain_listeners_with_old_headers = Vec::new();
34+
for (old_block, chain_listener) in chain_listeners.drain(..) {
35+
let old_header = match header_cache.look_up(&old_block) {
36+
Some(header) => *header,
37+
None => block_source
38+
.get_header(&old_block, None).await?
39+
.validate(old_block)?
40+
};
41+
chain_listeners_with_old_headers.push((old_header, chain_listener))
42+
}
43+
44+
// Find differences and disconnect blocks for each listener individually.
45+
let mut chain_poller = ChainPoller::new(block_source, network);
46+
let mut chain_listeners_at_height = Vec::new();
47+
let mut most_common_ancestor = None;
48+
let mut most_connected_blocks = Vec::new();
49+
for (old_header, chain_listener) in chain_listeners_with_old_headers.drain(..) {
50+
// Disconnect any stale blocks, but keep them in the cache for the next iteration.
51+
let header_cache = &mut NonDiscardingCache(header_cache);
52+
let mut chain_notifier = ChainNotifier { header_cache };
53+
let difference =
54+
chain_notifier.find_difference(new_header, &old_header, &mut chain_poller).await?;
55+
chain_notifier.disconnect_blocks(
56+
difference.disconnected_blocks,
57+
&mut DynamicChainListener(chain_listener),
58+
);
59+
60+
// Keep track of the most common ancestor and all blocks connected across all listeners.
61+
chain_listeners_at_height.push((difference.common_ancestor.height, chain_listener));
62+
if difference.connected_blocks.len() > most_connected_blocks.len() {
63+
most_common_ancestor = Some(difference.common_ancestor);
64+
most_connected_blocks = difference.connected_blocks;
65+
}
66+
}
67+
68+
// Connect new blocks for all listeners at once to avoid re-fetching blocks.
69+
if let Some(common_ancestor) = most_common_ancestor {
70+
let mut chain_notifier = ChainNotifier { header_cache };
71+
let mut chain_listener = ChainListenerSet(chain_listeners_at_height);
72+
chain_notifier.connect_blocks(
73+
common_ancestor,
74+
most_connected_blocks,
75+
&mut chain_poller,
76+
&mut chain_listener,
77+
).await.or_else(|(e, _)| Err(e))?;
78+
}
79+
80+
Ok(new_header)
81+
}
82+
83+
/// A cache that won't discard any block headers. Used to prevent losing headers that are needed to
84+
/// disconnect blocks common to more than one listener.
85+
struct NonDiscardingCache<'a, C: Cache>(&'a mut C);
86+
87+
impl<'a, C: Cache> Cache for NonDiscardingCache<'a, C> {
88+
fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> {
89+
self.0.look_up(block_hash)
90+
}
91+
92+
fn block_connected(&mut self, _block_hash: BlockHash, _block_header: ValidatedBlockHeader) {
93+
unreachable!()
94+
}
95+
96+
fn block_disconnected(&mut self, _block_hash: &BlockHash) -> Option<ValidatedBlockHeader> {
97+
None
98+
}
99+
}
100+
101+
/// Wrapper for supporting dynamically sized chain listeners.
102+
struct DynamicChainListener<'a>(&'a mut dyn ChainListener);
103+
104+
impl<'a> ChainListener for DynamicChainListener<'a> {
105+
fn block_connected(&mut self, _block: &Block, _height: u32) {
106+
unreachable!()
107+
}
108+
109+
fn block_disconnected(&mut self, header: &BlockHeader, height: u32) {
110+
self.0.block_disconnected(header, height)
111+
}
112+
}
113+
114+
/// A set of dynamically sized chain listeners, each paired with a starting block height.
115+
struct ChainListenerSet<'a>(Vec<(u32, &'a mut dyn ChainListener)>);
116+
117+
impl<'a> ChainListener for ChainListenerSet<'a> {
118+
fn block_connected(&mut self, block: &Block, height: u32) {
119+
for (starting_height, chain_listener) in self.0.iter_mut() {
120+
if height > *starting_height {
121+
chain_listener.block_connected(block, height);
122+
}
123+
}
124+
}
125+
126+
fn block_disconnected(&mut self, _header: &BlockHeader, _height: u32) {
127+
unreachable!()
128+
}
129+
}
130+
131+
#[cfg(test)]
132+
mod tests {
133+
use crate::test_utils::{Blockchain, MockChainListener};
134+
use super::*;
135+
136+
use bitcoin::network::constants::Network;
137+
138+
#[tokio::test]
139+
async fn sync_from_same_chain() {
140+
let mut chain = Blockchain::default().with_height(4);
141+
let new_tip = chain.tip();
142+
let old_tip_1 = chain.at_height(1);
143+
let old_tip_2 = chain.at_height(2);
144+
let old_tip_3 = chain.at_height(3);
145+
146+
let mut listener_1 = MockChainListener::new()
147+
.expect_block_connected(*old_tip_2)
148+
.expect_block_connected(*old_tip_3)
149+
.expect_block_connected(*new_tip);
150+
let mut listener_2 = MockChainListener::new()
151+
.expect_block_connected(*old_tip_3)
152+
.expect_block_connected(*new_tip);
153+
let mut listener_3 = MockChainListener::new()
154+
.expect_block_connected(*new_tip);
155+
156+
let listeners = vec![
157+
(old_tip_1.block_hash, &mut listener_1 as &mut dyn ChainListener),
158+
(old_tip_2.block_hash, &mut listener_2 as &mut dyn ChainListener),
159+
(old_tip_3.block_hash, &mut listener_3 as &mut dyn ChainListener),
160+
];
161+
let mut cache = chain.header_cache(0..=4);
162+
match sync_listeners(&mut chain, Network::Bitcoin, &mut cache, listeners).await {
163+
Ok(header) => assert_eq!(header, new_tip),
164+
Err(e) => panic!("Unexpected error: {:?}", e),
165+
}
166+
}
167+
168+
#[tokio::test]
169+
async fn sync_from_different_chains() {
170+
let mut main_chain = Blockchain::default().with_height(4);
171+
let fork_chain_1 = main_chain.fork_at_height(1);
172+
let fork_chain_2 = main_chain.fork_at_height(2);
173+
let fork_chain_3 = main_chain.fork_at_height(3);
174+
175+
let new_tip = main_chain.tip();
176+
let old_tip_1 = fork_chain_1.tip();
177+
let old_tip_2 = fork_chain_2.tip();
178+
let old_tip_3 = fork_chain_3.tip();
179+
180+
let mut listener_1 = MockChainListener::new()
181+
.expect_block_disconnected(*fork_chain_1.at_height(4))
182+
.expect_block_disconnected(*fork_chain_1.at_height(3))
183+
.expect_block_disconnected(*fork_chain_1.at_height(2))
184+
.expect_block_connected(*main_chain.at_height(2))
185+
.expect_block_connected(*main_chain.at_height(3))
186+
.expect_block_connected(*main_chain.at_height(4));
187+
let mut listener_2 = MockChainListener::new()
188+
.expect_block_disconnected(*fork_chain_2.at_height(4))
189+
.expect_block_disconnected(*fork_chain_2.at_height(3))
190+
.expect_block_connected(*main_chain.at_height(3))
191+
.expect_block_connected(*main_chain.at_height(4));
192+
let mut listener_3 = MockChainListener::new()
193+
.expect_block_disconnected(*fork_chain_3.at_height(4))
194+
.expect_block_connected(*main_chain.at_height(4));
195+
196+
let listeners = vec![
197+
(old_tip_1.block_hash, &mut listener_1 as &mut dyn ChainListener),
198+
(old_tip_2.block_hash, &mut listener_2 as &mut dyn ChainListener),
199+
(old_tip_3.block_hash, &mut listener_3 as &mut dyn ChainListener),
200+
];
201+
let mut cache = fork_chain_1.header_cache(2..=4);
202+
cache.extend(fork_chain_2.header_cache(3..=4));
203+
cache.extend(fork_chain_3.header_cache(4..=4));
204+
match sync_listeners(&mut main_chain, Network::Bitcoin, &mut cache, listeners).await {
205+
Ok(header) => assert_eq!(header, new_tip),
206+
Err(e) => panic!("Unexpected error: {:?}", e),
207+
}
208+
}
209+
210+
#[tokio::test]
211+
async fn sync_from_overlapping_chains() {
212+
let mut main_chain = Blockchain::default().with_height(4);
213+
let fork_chain_1 = main_chain.fork_at_height(1);
214+
let fork_chain_2 = fork_chain_1.fork_at_height(2);
215+
let fork_chain_3 = fork_chain_2.fork_at_height(3);
216+
217+
let new_tip = main_chain.tip();
218+
let old_tip_1 = fork_chain_1.tip();
219+
let old_tip_2 = fork_chain_2.tip();
220+
let old_tip_3 = fork_chain_3.tip();
221+
222+
let mut listener_1 = MockChainListener::new()
223+
.expect_block_disconnected(*fork_chain_1.at_height(4))
224+
.expect_block_disconnected(*fork_chain_1.at_height(3))
225+
.expect_block_disconnected(*fork_chain_1.at_height(2))
226+
.expect_block_connected(*main_chain.at_height(2))
227+
.expect_block_connected(*main_chain.at_height(3))
228+
.expect_block_connected(*main_chain.at_height(4));
229+
let mut listener_2 = MockChainListener::new()
230+
.expect_block_disconnected(*fork_chain_2.at_height(4))
231+
.expect_block_disconnected(*fork_chain_2.at_height(3))
232+
.expect_block_disconnected(*fork_chain_2.at_height(2))
233+
.expect_block_connected(*main_chain.at_height(2))
234+
.expect_block_connected(*main_chain.at_height(3))
235+
.expect_block_connected(*main_chain.at_height(4));
236+
let mut listener_3 = MockChainListener::new()
237+
.expect_block_disconnected(*fork_chain_3.at_height(4))
238+
.expect_block_disconnected(*fork_chain_3.at_height(3))
239+
.expect_block_disconnected(*fork_chain_3.at_height(2))
240+
.expect_block_connected(*main_chain.at_height(2))
241+
.expect_block_connected(*main_chain.at_height(3))
242+
.expect_block_connected(*main_chain.at_height(4));
243+
244+
let listeners = vec![
245+
(old_tip_1.block_hash, &mut listener_1 as &mut dyn ChainListener),
246+
(old_tip_2.block_hash, &mut listener_2 as &mut dyn ChainListener),
247+
(old_tip_3.block_hash, &mut listener_3 as &mut dyn ChainListener),
248+
];
249+
let mut cache = fork_chain_1.header_cache(2..=4);
250+
cache.extend(fork_chain_2.header_cache(3..=4));
251+
cache.extend(fork_chain_3.header_cache(4..=4));
252+
match sync_listeners(&mut main_chain, Network::Bitcoin, &mut cache, listeners).await {
253+
Ok(header) => assert_eq!(header, new_tip),
254+
Err(e) => panic!("Unexpected error: {:?}", e),
255+
}
256+
}
257+
258+
#[tokio::test]
259+
async fn cache_connected_and_keep_disconnected_blocks() {
260+
let mut main_chain = Blockchain::default().with_height(2);
261+
let fork_chain = main_chain.fork_at_height(1);
262+
263+
let new_tip = main_chain.tip();
264+
let old_tip = fork_chain.tip();
265+
266+
let mut listener = MockChainListener::new()
267+
.expect_block_disconnected(*fork_chain.at_height(2))
268+
.expect_block_connected(*main_chain.at_height(2));
269+
270+
let listeners = vec![(old_tip.block_hash, &mut listener as &mut dyn ChainListener)];
271+
let mut cache = fork_chain.header_cache(2..=2);
272+
match sync_listeners(&mut main_chain, Network::Bitcoin, &mut cache, listeners).await {
273+
Ok(_) => {
274+
assert!(cache.contains_key(&new_tip.block_hash));
275+
assert!(cache.contains_key(&old_tip.block_hash));
276+
},
277+
Err(e) => panic!("Unexpected error: {:?}", e),
278+
}
279+
}
280+
}

0 commit comments

Comments
 (0)