Skip to content

Commit 6ab4b24

Browse files
committed
Proof of concept for BlockSource multiplexing
1 parent d33c7c3 commit 6ab4b24

File tree

2 files changed

+150
-17
lines changed

2 files changed

+150
-17
lines changed

lightning-block-sync/src/lib.rs

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -357,10 +357,10 @@ where P: Poll<'a, B>,
357357
CL: ChainListener + Sized {
358358
chain_tip: (BlockHash, BlockHeaderData),
359359
chain_poller: P,
360-
backup_block_sources: Vec<B>,
361360
header_cache: HeaderCache,
362361
chain_notifier: CL,
363-
mainnet: bool
362+
mainnet: bool,
363+
marker: std::marker::PhantomData<B>,
364364
}
365365

366366
impl<'a, P, B, CL> MicroSPVClient<'a, P, B, CL>
@@ -380,11 +380,12 @@ where P: Poll<'a, B>,
380380
/// useful when you have a block source which is more censorship-resistant than others but
381381
/// which only provides headers. In this case, we can use such source(s) to learn of a censorship
382382
/// attack without giving up privacy by querying a privacy-losing block sources.
383-
pub fn init(chain_tip: BlockHeaderData, chain_poller: P, backup_block_sources: Vec<B>, chain_notifier: CL, mainnet: bool) -> Self {
383+
pub fn init(chain_tip: BlockHeaderData, chain_poller: P, chain_notifier: CL, mainnet: bool) -> Self {
384384
let header_cache = HeaderCache::new();
385385
Self {
386386
chain_tip: (chain_tip.header.block_hash(), chain_tip),
387-
chain_poller, backup_block_sources, header_cache, chain_notifier, mainnet
387+
chain_poller, header_cache, chain_notifier, mainnet,
388+
marker: std::marker::PhantomData
388389
}
389390
}
390391

@@ -420,15 +421,7 @@ where P: Poll<'a, B>,
420421
Ok((ChainTip::Better(new_hash, new_header), block_source)) => {
421422
debug_assert_ne!(new_hash, self.chain_tip.0);
422423
debug_assert!(new_header.chainwork > self.chain_tip.1.chainwork);
423-
let mut blocks_connected = false;
424-
let backup_block_sources = self.backup_block_sources.iter_mut().map(|s| &mut **s);
425-
for source in std::iter::once(block_source).chain(backup_block_sources) {
426-
blocks_connected |= sync_chain_monitor!(new_hash, new_header, source);
427-
if self.chain_tip.0 == new_hash {
428-
break;
429-
}
430-
}
431-
blocks_connected
424+
sync_chain_monitor!(new_hash, new_header, block_source)
432425
},
433426
Ok((ChainTip::Worse(hash, header), _)) => {
434427
debug_assert_ne!(hash, self.chain_tip.0);
@@ -786,8 +779,9 @@ mod tests {
786779
let mut source_three = &header_chain;
787780
let mut source_four = &backup_chain;
788781
let mut client = MicroSPVClient::init((&chain_one).get_header(&block_1a_hash, Some(1)).await.unwrap(),
789-
poller::MultipleChainPoller::new(vec![&mut source_one as &mut dyn BlockSource, &mut source_two as &mut dyn BlockSource, &mut source_three as &mut dyn BlockSource]),
790-
vec![&mut source_four as &mut dyn BlockSource],
782+
poller::ChainMultiplexer::new(
783+
vec![&mut source_one as &mut dyn BlockSource, &mut source_two as &mut dyn BlockSource, &mut source_three as &mut dyn BlockSource],
784+
vec![&mut source_four as &mut dyn BlockSource]),
791785
Arc::clone(&chain_notifier), true);
792786

793787
// Test that we will reorg onto 2b because chain_one knows about 1b + 2b

lightning-block-sync/src/poller.rs

Lines changed: 141 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use crate::{AsyncBlockSourceResult, BlockHeaderData, BlockSource, BlockSourceError, ChainTip, Poll};
22

3+
use bitcoin::blockdata::block::Block;
4+
use bitcoin::hash_types::BlockHash;
5+
36
use std::ops::DerefMut;
47

58
pub struct ChainPoller<'a, B: DerefMut<Target=dyn BlockSource + 'a> + Sized + Sync + Send> {
@@ -13,7 +16,6 @@ impl<'a, B: DerefMut<Target=dyn BlockSource + 'a> + Sized + Sync + Send> ChainPo
1316
}
1417

1518
impl<'b, B: DerefMut<Target=dyn BlockSource + 'b> + Sized + Sync + Send> Poll<'b, B> for ChainPoller<'b, B> {
16-
1719
fn poll_chain_tip<'a>(&'a mut self, best_chain_tip: BlockHeaderData) ->
1820
AsyncBlockSourceResult<'a, (ChainTip, &'a mut B::Target)>
1921
where 'b: 'a {
@@ -59,7 +61,6 @@ impl<'a, B: DerefMut<Target=dyn BlockSource + 'a> + Sized + Sync + Send> Multipl
5961
}
6062

6163
impl<'b, B: DerefMut<Target=dyn BlockSource + 'b> + Sized + Sync + Send> Poll<'b, B> for MultipleChainPoller<'b, B> {
62-
6364
fn poll_chain_tip<'a>(&'a mut self, best_chain_tip: BlockHeaderData) ->
6465
AsyncBlockSourceResult<'a, (ChainTip, &'a mut B::Target)>
6566
where 'b: 'a {
@@ -102,6 +103,144 @@ impl<'b, B: DerefMut<Target=dyn BlockSource + 'b> + Sized + Sync + Send> Poll<'b
102103
}
103104
}
104105

106+
pub struct ChainMultiplexer<'b, B: DerefMut<Target=dyn BlockSource + 'b> + Sized + Sync + Send> {
107+
block_sources: Vec<(B, BlockSourceError)>,
108+
backup_block_sources: Vec<(B, BlockSourceError)>,
109+
best_block_source: usize,
110+
}
111+
112+
impl<'b, B: DerefMut<Target=dyn BlockSource + 'b> + Sized + Sync + Send> ChainMultiplexer<'b, B> {
113+
pub fn new(mut block_sources: Vec<B>, mut backup_block_sources: Vec<B>) -> Self {
114+
assert!(!block_sources.is_empty());
115+
let block_sources = block_sources.drain(..).map(|block_source| {
116+
(block_source, BlockSourceError::Transient)
117+
}).collect();
118+
119+
let backup_block_sources = backup_block_sources.drain(..).map(|block_source| {
120+
(block_source, BlockSourceError::Transient)
121+
}).collect();
122+
123+
Self { block_sources, backup_block_sources, best_block_source: 0 }
124+
}
125+
126+
fn best_and_backup_block_sources(&mut self) -> Vec<&mut (B, BlockSourceError)> {
127+
let best_block_source = self.block_sources.get_mut(self.best_block_source).unwrap();
128+
let backup_block_sources = self.backup_block_sources.iter_mut();
129+
std::iter::once(best_block_source)
130+
.chain(backup_block_sources)
131+
.filter(|(_, e)| e == &BlockSourceError::Transient)
132+
.collect()
133+
}
134+
}
135+
136+
impl<'b, B: 'b + DerefMut<Target=dyn BlockSource + 'b> + Sized + Sync + Send> Poll<'b, B> for ChainMultiplexer<'b, B> {
137+
fn poll_chain_tip<'a>(&'a mut self, best_chain_tip: BlockHeaderData) ->
138+
AsyncBlockSourceResult<'a, (ChainTip, &'a mut B::Target)>
139+
where 'b: 'a {
140+
Box::pin(async move {
141+
let mut heaviest_chain_tip = best_chain_tip;
142+
let mut best_result = Err(BlockSourceError::Persistent);
143+
for (i, (block_source, error)) in self.block_sources.iter_mut().enumerate() {
144+
if let BlockSourceError::Persistent = error {
145+
continue;
146+
}
147+
148+
let result = match block_source.get_best_block().await {
149+
Err(e) => Err(e),
150+
Ok((block_hash, height)) => {
151+
if block_hash == heaviest_chain_tip.header.block_hash() {
152+
Ok(ChainTip::Common)
153+
} else {
154+
match block_source.get_header(&block_hash, height).await {
155+
Err(e) => Err(e),
156+
Ok(chain_tip) => {
157+
crate::stateless_check_header(&chain_tip.header)?;
158+
if chain_tip.header.block_hash() != block_hash {
159+
Err(BlockSourceError::Persistent)
160+
} else if chain_tip.chainwork <= heaviest_chain_tip.chainwork {
161+
Ok(ChainTip::Worse(block_hash, chain_tip))
162+
} else {
163+
Ok(ChainTip::Better(block_hash, chain_tip))
164+
}
165+
},
166+
}
167+
}
168+
},
169+
};
170+
171+
match result {
172+
Err(BlockSourceError::Persistent) => {
173+
*error = BlockSourceError::Persistent;
174+
},
175+
Err(BlockSourceError::Transient) => {
176+
if best_result.is_err() {
177+
best_result = result;
178+
}
179+
},
180+
Ok(ChainTip::Common) => {
181+
if let Ok(ChainTip::Better(_, _)) = best_result {} else {
182+
best_result = result;
183+
}
184+
},
185+
Ok(ChainTip::Better(_, header)) => {
186+
self.best_block_source = i;
187+
best_result = result;
188+
heaviest_chain_tip = header;
189+
},
190+
Ok(ChainTip::Worse(_, _)) => {
191+
if best_result.is_err() {
192+
best_result = result;
193+
}
194+
},
195+
}
196+
}
197+
198+
best_result.map(move |chain_tip| (chain_tip, self as &mut dyn BlockSource))
199+
})
200+
}
201+
}
202+
203+
impl<'b, B: DerefMut<Target=dyn BlockSource + 'b> + Sized + Sync + Send> BlockSource for ChainMultiplexer<'b, B> {
204+
fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, height: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
205+
Box::pin(async move {
206+
for (block_source, error) in self.best_and_backup_block_sources() {
207+
let result = block_source.get_header(header_hash, height).await;
208+
match result {
209+
Err(e) => *error = e,
210+
Ok(_) => return result,
211+
}
212+
}
213+
Err(BlockSourceError::Persistent)
214+
})
215+
}
216+
217+
fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
218+
Box::pin(async move {
219+
for (block_source, error) in self.best_and_backup_block_sources() {
220+
let result = block_source.get_block(header_hash).await;
221+
match result {
222+
Err(e) => *error = e,
223+
Ok(_) => return result,
224+
}
225+
}
226+
Err(BlockSourceError::Persistent)
227+
})
228+
}
229+
230+
fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
231+
Box::pin(async move {
232+
for (block_source, error) in self.best_and_backup_block_sources() {
233+
let result = block_source.get_best_block().await;
234+
match result {
235+
Err(e) => *error = e,
236+
Ok(_) => return result,
237+
}
238+
}
239+
Err(BlockSourceError::Persistent)
240+
})
241+
}
242+
}
243+
105244
#[cfg(test)]
106245
mod tests {
107246
use crate::*;

0 commit comments

Comments
 (0)