Skip to content

Add rustfmt support #2648

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,051 changes: 775 additions & 276 deletions lightning-background-processor/src/lib.rs

Large diffs are not rendered by default.

312 changes: 194 additions & 118 deletions lightning-block-sync/src/convert.rs

Large diffs are not rendered by default.

100 changes: 56 additions & 44 deletions lightning-block-sync/src/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,21 @@ use crate::{AsyncBlockSourceResult, BlockData, BlockSource, BlockSourceError};

use bitcoin::blockdata::block::Block;
use bitcoin::blockdata::constants::ChainHash;
use bitcoin::blockdata::transaction::{TxOut, OutPoint};
use bitcoin::blockdata::transaction::{OutPoint, TxOut};
use bitcoin::hash_types::BlockHash;

use lightning::ln::peer_handler::APeerManager;

use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoResult, UtxoLookupError};
use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoLookupError, UtxoResult};

use lightning::util::logger::Logger;

use std::sync::{Arc, Mutex};
use std::collections::VecDeque;
use std::future::Future;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::Poll;

/// A trait which extends [`BlockSource`] and can be queried to fetch the block at a given height
Expand All @@ -29,12 +29,14 @@ use std::task::Poll;
/// Note that while this is implementable for a [`BlockSource`] which returns filtered block data
/// (i.e. [`BlockData::HeaderOnly`] for [`BlockSource::get_block`] requests), such an
/// implementation will reject all gossip as it is not fully able to verify the UTXOs referenced.
pub trait UtxoSource : BlockSource + 'static {
pub trait UtxoSource: BlockSource + 'static {
/// Fetches the block hash of the block at the given height.
///
/// This will, in turn, be passed to to [`BlockSource::get_block`] to fetch the block needed
/// for gossip validation.
fn get_block_hash_by_height<'a>(&'a self, block_height: u32) -> AsyncBlockSourceResult<'a, BlockHash>;
fn get_block_hash_by_height<'a>(
&'a self, block_height: u32,
) -> AsyncBlockSourceResult<'a, BlockHash>;

/// Returns true if the given output has *not* been spent, i.e. is a member of the current UTXO
/// set.
Expand All @@ -45,7 +47,7 @@ pub trait UtxoSource : BlockSource + 'static {
///
/// If the `tokio` feature is enabled, this is implemented on `TokioSpawner` struct which
/// delegates to `tokio::spawn()`.
pub trait FutureSpawner : Send + Sync + 'static {
pub trait FutureSpawner: Send + Sync + 'static {
/// Spawns the given future as a background task.
///
/// This method MUST NOT block on the given future immediately.
Expand All @@ -65,8 +67,8 @@ impl FutureSpawner for TokioSpawner {
/// A trivial future which joins two other futures and polls them at the same time, returning only
/// once both complete.
pub(crate) struct Joiner<
A: Future<Output=Result<(BlockHash, Option<u32>), BlockSourceError>> + Unpin,
B: Future<Output=Result<BlockHash, BlockSourceError>> + Unpin,
A: Future<Output = Result<(BlockHash, Option<u32>), BlockSourceError>> + Unpin,
B: Future<Output = Result<BlockHash, BlockSourceError>> + Unpin,
> {
pub a: A,
pub b: B,
Expand All @@ -75,16 +77,20 @@ pub(crate) struct Joiner<
}

impl<
A: Future<Output=Result<(BlockHash, Option<u32>), BlockSourceError>> + Unpin,
B: Future<Output=Result<BlockHash, BlockSourceError>> + Unpin,
> Joiner<A, B> {
fn new(a: A, b: B) -> Self { Self { a, b, a_res: None, b_res: None } }
A: Future<Output = Result<(BlockHash, Option<u32>), BlockSourceError>> + Unpin,
B: Future<Output = Result<BlockHash, BlockSourceError>> + Unpin,
> Joiner<A, B>
{
fn new(a: A, b: B) -> Self {
Self { a, b, a_res: None, b_res: None }
}
}

impl<
A: Future<Output=Result<(BlockHash, Option<u32>), BlockSourceError>> + Unpin,
B: Future<Output=Result<BlockHash, BlockSourceError>> + Unpin,
> Future for Joiner<A, B> {
A: Future<Output = Result<(BlockHash, Option<u32>), BlockSourceError>> + Unpin,
B: Future<Output = Result<BlockHash, BlockSourceError>> + Unpin,
> Future for Joiner<A, B>
{
type Output = Result<((BlockHash, Option<u32>), BlockHash), BlockSourceError>;
fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
if self.a_res.is_none() {
Expand All @@ -107,14 +113,13 @@ impl<
} else {
return Poll::Ready(Err(res.unwrap_err()));
}

},
Poll::Pending => {},
}
}
if let Some(b_res) = self.b_res {
if let Some(a_res) = self.a_res {
return Poll::Ready(Ok((a_res, b_res)))
return Poll::Ready(Ok((a_res, b_res)));
}
}
Poll::Pending
Expand All @@ -129,7 +134,8 @@ impl<
/// value of 1024 should more than suffice), and ensure you have sufficient file descriptors
/// available on both Bitcoin Core and your LDK application for each request to hold its own
/// connection.
pub struct GossipVerifier<S: FutureSpawner,
pub struct GossipVerifier<
S: FutureSpawner,
Blocks: Deref + Send + Sync + 'static + Clone,
L: Deref + Send + Sync + 'static,
> where
Expand All @@ -145,10 +151,9 @@ pub struct GossipVerifier<S: FutureSpawner,

const BLOCK_CACHE_SIZE: usize = 5;

impl<S: FutureSpawner,
Blocks: Deref + Send + Sync + Clone,
L: Deref + Send + Sync,
> GossipVerifier<S, Blocks, L> where
impl<S: FutureSpawner, Blocks: Deref + Send + Sync + Clone, L: Deref + Send + Sync>
GossipVerifier<S, Blocks, L>
where
Blocks::Target: UtxoSource,
L::Target: Logger,
{
Expand All @@ -157,27 +162,35 @@ impl<S: FutureSpawner,
/// This is expected to be given to a [`P2PGossipSync`] (initially constructed with `None` for
/// the UTXO lookup) via [`P2PGossipSync::add_utxo_lookup`].
pub fn new<APM: Deref + Send + Sync + Clone + 'static>(
source: Blocks, spawn: S, gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Self, L>>, peer_manager: APM
) -> Self where APM::Target: APeerManager {
source: Blocks, spawn: S, gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Self, L>>,
peer_manager: APM,
) -> Self
where
APM::Target: APeerManager,
{
let peer_manager_wake = Arc::new(move || peer_manager.as_ref().process_events());
Self {
source, spawn, gossiper, peer_manager_wake,
source,
spawn,
gossiper,
peer_manager_wake,
block_cache: Arc::new(Mutex::new(VecDeque::with_capacity(BLOCK_CACHE_SIZE))),
}
}

async fn retrieve_utxo(
source: Blocks, block_cache: Arc<Mutex<VecDeque<(u32, Block)>>>, short_channel_id: u64
source: Blocks, block_cache: Arc<Mutex<VecDeque<(u32, Block)>>>, short_channel_id: u64,
) -> Result<TxOut, UtxoLookupError> {
let block_height = (short_channel_id >> 5 * 8) as u32; // block height is most significant three bytes
let transaction_index = ((short_channel_id >> 2 * 8) & 0xffffff) as u32;
let output_index = (short_channel_id & 0xffff) as u16;

let (outpoint, output);

'tx_found: loop { // Used as a simple goto
'tx_found: loop {
// Used as a simple goto
macro_rules! process_block {
($block: expr) => { {
($block: expr) => {{
if transaction_index as usize >= $block.txdata.len() {
return Err(UtxoLookupError::UnknownTx);
}
Expand All @@ -188,7 +201,7 @@ impl<S: FutureSpawner,

outpoint = OutPoint::new(transaction.txid(), output_index.into());
output = transaction.output[output_index as usize].clone();
} }
}};
}
{
let recent_blocks = block_cache.lock().unwrap();
Expand All @@ -202,8 +215,8 @@ impl<S: FutureSpawner,

let ((_, tip_height_opt), block_hash) =
Joiner::new(source.get_best_block(), source.get_block_hash_by_height(block_height))
.await
.map_err(|_| UtxoLookupError::UnknownTx)?;
.await
.map_err(|_| UtxoLookupError::UnknownTx)?;
if let Some(tip_height) = tip_height_opt {
// If the block doesn't yet have five confirmations, error out.
//
Expand All @@ -214,8 +227,8 @@ impl<S: FutureSpawner,
return Err(UtxoLookupError::UnknownTx);
}
}
let block_data = source.get_block(&block_hash).await
.map_err(|_| UtxoLookupError::UnknownTx)?;
let block_data =
source.get_block(&block_hash).await.map_err(|_| UtxoLookupError::UnknownTx)?;
let block = match block_data {
BlockData::HeaderOnly(_) => return Err(UtxoLookupError::UnknownTx),
BlockData::FullBlock(block) => block,
Expand All @@ -237,7 +250,7 @@ impl<S: FutureSpawner,
}
}
break 'tx_found;
};
}
let outpoint_unspent =
source.is_output_unspent(outpoint).await.map_err(|_| UtxoLookupError::UnknownTx)?;
if outpoint_unspent {
Expand All @@ -248,22 +261,21 @@ impl<S: FutureSpawner,
}
}

impl<S: FutureSpawner,
Blocks: Deref + Send + Sync + Clone,
L: Deref + Send + Sync,
> Deref for GossipVerifier<S, Blocks, L> where
impl<S: FutureSpawner, Blocks: Deref + Send + Sync + Clone, L: Deref + Send + Sync> Deref
for GossipVerifier<S, Blocks, L>
where
Blocks::Target: UtxoSource,
L::Target: Logger,
{
type Target = Self;
fn deref(&self) -> &Self { self }
fn deref(&self) -> &Self {
self
}
}


impl<S: FutureSpawner,
Blocks: Deref + Send + Sync + Clone,
L: Deref + Send + Sync,
> UtxoLookup for GossipVerifier<S, Blocks, L> where
impl<S: FutureSpawner, Blocks: Deref + Send + Sync + Clone, L: Deref + Send + Sync> UtxoLookup
for GossipVerifier<S, Blocks, L>
where
Blocks::Target: UtxoSource,
L::Target: Logger,
{
Expand Down
Loading