|
| 1 | +use std::collections::HashSet; |
| 2 | +use std::str::FromStr; |
| 3 | +use std::time::Instant; |
| 4 | + |
| 5 | +use anyhow::Context; |
| 6 | +use bdk_chain::bitcoin::{secp256k1::Secp256k1, Network}; |
| 7 | +use bdk_chain::indexer::keychain_txout::KeychainTxOutIndex; |
| 8 | +use bdk_chain::miniscript::Descriptor; |
| 9 | +use bdk_chain::{ |
| 10 | + Anchor, BlockId, CanonicalizationParams, CanonicalizationTask, ChainQuery, |
| 11 | + ConfirmationBlockTime, IndexedTxGraph, SpkIterator, |
| 12 | +}; |
| 13 | +use bdk_testenv::anyhow; |
| 14 | +use bip157::chain::{BlockHeaderChanges, ChainState}; |
| 15 | +use bip157::messages::Event; |
| 16 | +use bip157::{error::FetchBlockError, Builder, Client, HeaderCheckpoint, Requester}; |
| 17 | +use tracing::{debug, error, info, warn}; |
| 18 | + |
| 19 | +// This example shows how to use Kyoto (BIP157/158) with ChainOracle |
| 20 | +// to handle canonicalization without storing all chain data locally. |
| 21 | + |
| 22 | +const EXTERNAL: &str = "tr([83737d5e/86'/1'/0']tpubDDR5GgtoxS8fJyjjvdahN4VzV5DV6jtbcyvVXhEKq2XtpxjxBXmxH3r8QrNbQqHg4bJM1EGkxi7Pjfkgnui9jQWqS7kxHvX6rhUeriLDKxz/0/*)"; |
| 23 | +const INTERNAL: &str = "tr([83737d5e/86'/1'/0']tpubDDR5GgtoxS8fJyjjvdahN4VzV5DV6jtbcyvVXhEKq2XtpxjxBXmxH3r8QrNbQqHg4bJM1EGkxi7Pjfkgnui9jQWqS7kxHvX6rhUeriLDKxz/1/*)"; |
| 24 | +const SPK_COUNT: u32 = 25; |
| 25 | + |
| 26 | +const NETWORK: Network = Network::Signet; |
| 27 | +const START_HEIGHT: u32 = 201_000; |
| 28 | +const START_HASH: &str = "0000002238d05b522875f9edc4c9f418dd89ccfde7e4c305e8448a87a5dc71b7"; |
| 29 | + |
| 30 | +/// `KyotoOracle`` uses Kyoto's requester for on-demand chain queries |
| 31 | +/// It doesn't implement `ChainOracle` trait since that's synchronous and we need async |
| 32 | +pub struct KyotoOracle { |
| 33 | + /// Requester to fetch blocks on-demand |
| 34 | + requester: Requester, |
| 35 | + /// Current chain tip |
| 36 | + chain_tip: BlockId, |
| 37 | +} |
| 38 | + |
| 39 | +impl KyotoOracle { |
| 40 | + pub fn new(requester: Requester, chain_tip: BlockId) -> Self { |
| 41 | + Self { |
| 42 | + requester, |
| 43 | + chain_tip, |
| 44 | + } |
| 45 | + } |
| 46 | + |
| 47 | + /// Get the current chain tip |
| 48 | + pub fn get_chain_tip(&self) -> BlockId { |
| 49 | + self.chain_tip |
| 50 | + } |
| 51 | + |
| 52 | + /// Canonicalize a transaction graph using async on-demand queries to Kyoto |
| 53 | + pub async fn canonicalize<A: Anchor>( |
| 54 | + &self, |
| 55 | + mut task: CanonicalizationTask<'_, A>, |
| 56 | + ) -> bdk_chain::CanonicalView<A> { |
| 57 | + // Process all queries from the task |
| 58 | + while let Some(request) = task.next_query() { |
| 59 | + // Check each block_id against the chain to find the best one |
| 60 | + let mut best_block = None; |
| 61 | + |
| 62 | + for block_id in &request.block_ids { |
| 63 | + // Check if block is in chain by fetching it on-demand |
| 64 | + match self.is_block_in_chain(*block_id).await { |
| 65 | + Ok(true) => { |
| 66 | + best_block = Some(*block_id); |
| 67 | + break; // Found a confirmed block |
| 68 | + } |
| 69 | + Ok(false) => continue, // Not in chain, check next |
| 70 | + Err(_) => continue, // Error fetching, skip this one |
| 71 | + } |
| 72 | + } |
| 73 | + |
| 74 | + task.resolve_query(best_block); |
| 75 | + } |
| 76 | + |
| 77 | + // Finish and return the canonical view |
| 78 | + task.finish() |
| 79 | + } |
| 80 | + |
| 81 | + /// Check if a block is in the chain by fetching it on-demand from Kyoto |
| 82 | + async fn is_block_in_chain(&self, block: BlockId) -> Result<bool, FetchBlockError> { |
| 83 | + // Check if the requested block height is within range |
| 84 | + if block.height > self.chain_tip.height { |
| 85 | + return Ok(false); |
| 86 | + } |
| 87 | + |
| 88 | + // Try to fetch the block by its hash |
| 89 | + // If it exists and the height matches, it's in the chain |
| 90 | + match self.requester.get_block(block.hash).await { |
| 91 | + Ok(indexed_block) => { |
| 92 | + // Verify the height matches what we expect |
| 93 | + Ok(indexed_block.height == block.height) |
| 94 | + } |
| 95 | + Err(FetchBlockError::UnknownHash) => Ok(false), |
| 96 | + Err(e) => Err(e), |
| 97 | + } |
| 98 | + } |
| 99 | +} |
| 100 | + |
| 101 | +#[tokio::main] |
| 102 | +async fn main() -> anyhow::Result<()> { |
| 103 | + // Initialize tracing |
| 104 | + tracing_subscriber::fmt::init(); |
| 105 | + |
| 106 | + // Setup descriptors and graph |
| 107 | + let secp = Secp256k1::new(); |
| 108 | + let (descriptor, _) = Descriptor::parse_descriptor(&secp, EXTERNAL)?; |
| 109 | + let (change_descriptor, _) = Descriptor::parse_descriptor(&secp, INTERNAL)?; |
| 110 | + |
| 111 | + let mut graph = IndexedTxGraph::<ConfirmationBlockTime, KeychainTxOutIndex<&str>>::new({ |
| 112 | + let mut index = KeychainTxOutIndex::default(); |
| 113 | + index.insert_descriptor("external", descriptor.clone())?; |
| 114 | + index.insert_descriptor("internal", change_descriptor.clone())?; |
| 115 | + index |
| 116 | + }); |
| 117 | + |
| 118 | + // Collect scripts to watch |
| 119 | + let mut spks = HashSet::new(); |
| 120 | + for (_, desc) in graph.index.keychains() { |
| 121 | + spks.extend(SpkIterator::new_with_range(desc, 0..SPK_COUNT).map(|(_, s)| s)); |
| 122 | + } |
| 123 | + |
| 124 | + // Build Kyoto node with checkpoint |
| 125 | + let checkpoint = HeaderCheckpoint::new( |
| 126 | + START_HEIGHT, |
| 127 | + bitcoin::BlockHash::from_str(START_HASH).context("invalid checkpoint hash")?, |
| 128 | + ); |
| 129 | + |
| 130 | + let builder = Builder::new(NETWORK); |
| 131 | + let (node, client) = builder |
| 132 | + .chain_state(ChainState::Checkpoint(checkpoint)) |
| 133 | + .required_peers(1) |
| 134 | + .build(); |
| 135 | + |
| 136 | + // Run the node in background |
| 137 | + tokio::task::spawn(async move { node.run().await }); |
| 138 | + |
| 139 | + let Client { |
| 140 | + requester, |
| 141 | + mut info_rx, |
| 142 | + mut warn_rx, |
| 143 | + mut event_rx, |
| 144 | + } = client; |
| 145 | + |
| 146 | + let start = Instant::now(); |
| 147 | + #[allow(unused_assignments)] |
| 148 | + let mut chain_tip = BlockId { |
| 149 | + height: 0, |
| 150 | + hash: bitcoin::constants::genesis_block(bitcoin::Network::Signet).block_hash(), |
| 151 | + }; |
| 152 | + let mut matched_blocks_count = 0; |
| 153 | + |
| 154 | + info!("Starting sync with Kyoto..."); |
| 155 | + |
| 156 | + // Event loop to process filters and apply matching blocks immediately |
| 157 | + #[allow(unused_assignments)] |
| 158 | + #[allow(clippy::incompatible_msrv)] |
| 159 | + loop { |
| 160 | + tokio::select! { |
| 161 | + info_msg = info_rx.recv() => { |
| 162 | + if let Some(info_msg) = info_msg { |
| 163 | + info!("Kyoto: {}", info_msg); |
| 164 | + } |
| 165 | + } |
| 166 | + warn_msg = warn_rx.recv() => { |
| 167 | + if let Some(warn_msg) = warn_msg { |
| 168 | + warn!("Kyoto: {}", warn_msg); |
| 169 | + } |
| 170 | + } |
| 171 | + event = event_rx.recv() => { |
| 172 | + if let Some(event) = event { |
| 173 | + match event { |
| 174 | + Event::IndexedFilter(filter) => { |
| 175 | + let height = filter.height(); |
| 176 | + if filter.contains_any(spks.iter()) { |
| 177 | + let hash = filter.block_hash(); |
| 178 | + info!("Matched filter at height {}", height); |
| 179 | + match requester.get_block(hash).await { |
| 180 | + Ok(indexed_block) => { |
| 181 | + // Apply block immediately to the graph |
| 182 | + let _ = graph.apply_block_relevant(&indexed_block.block, indexed_block.height); |
| 183 | + matched_blocks_count += 1; |
| 184 | + debug!("Applied block at height {}", indexed_block.height); |
| 185 | + } |
| 186 | + Err(e) => { |
| 187 | + error!("Failed to fetch block {}: {}", hash, e); |
| 188 | + } |
| 189 | + } |
| 190 | + } |
| 191 | + }, |
| 192 | + Event::ChainUpdate(changes) => { |
| 193 | + match &changes { |
| 194 | + BlockHeaderChanges::Connected(header) => { |
| 195 | + // Update chain tip on each new header |
| 196 | + chain_tip = BlockId { |
| 197 | + height: header.height, |
| 198 | + hash: header.block_hash(), |
| 199 | + }; |
| 200 | + if header.height % 1000 == 0 { |
| 201 | + info!("Synced to height {}", header.height); |
| 202 | + } |
| 203 | + } |
| 204 | + BlockHeaderChanges::Reorganized { accepted, .. } => { |
| 205 | + // On reorg, update to the new tip (last in accepted) |
| 206 | + if let Some(header) = accepted.last() { |
| 207 | + chain_tip = BlockId { |
| 208 | + height: header.height, |
| 209 | + hash: header.block_hash(), |
| 210 | + }; |
| 211 | + warn!("Reorg to height {}", header.height); |
| 212 | + } |
| 213 | + } |
| 214 | + BlockHeaderChanges::ForkAdded(_) => { |
| 215 | + // Ignore forks that are not on the main chain |
| 216 | + debug!("Fork detected, ignoring"); |
| 217 | + } |
| 218 | + } |
| 219 | + } |
| 220 | + Event::FiltersSynced(sync_update) => { |
| 221 | + let tip = sync_update.tip(); |
| 222 | + chain_tip = BlockId { |
| 223 | + height: tip.height, |
| 224 | + hash: tip.hash, |
| 225 | + }; |
| 226 | + info!("Filters synced! Tip: height={}, hash={}", tip.height, tip.hash); |
| 227 | + break; |
| 228 | + } |
| 229 | + _ => (), |
| 230 | + } |
| 231 | + } |
| 232 | + } |
| 233 | + } |
| 234 | + } |
| 235 | + |
| 236 | + info!("Sync completed in {}s", start.elapsed().as_secs()); |
| 237 | + info!("Found and applied {} matching blocks", matched_blocks_count); |
| 238 | + |
| 239 | + info!( |
| 240 | + "Chain tip: height={}, hash={}", |
| 241 | + chain_tip.height, chain_tip.hash |
| 242 | + ); |
| 243 | + |
| 244 | + // Create KyotoOracle with requester for on-demand queries |
| 245 | + let oracle = KyotoOracle::new(requester.clone(), chain_tip); |
| 246 | + |
| 247 | + // Canonicalize TxGraph with KyotoOracle |
| 248 | + info!("Performing canonicalization using KyotoOracle..."); |
| 249 | + let task = graph.canonicalization_task(chain_tip, CanonicalizationParams::default()); |
| 250 | + let canonical_view = oracle.canonicalize(task).await; |
| 251 | + |
| 252 | + // Display unspent outputs |
| 253 | + let unspent: Vec<_> = canonical_view |
| 254 | + .filter_unspent_outpoints(graph.index.outpoints().clone()) |
| 255 | + .collect(); |
| 256 | + |
| 257 | + if !unspent.is_empty() { |
| 258 | + info!("Found {} unspent outputs:", unspent.len()); |
| 259 | + for (index, utxo) in unspent { |
| 260 | + info!("{:?} | {} | {}", index, utxo.txout.value, utxo.outpoint); |
| 261 | + } |
| 262 | + } else { |
| 263 | + info!("No unspent outputs found"); |
| 264 | + } |
| 265 | + |
| 266 | + // Verify all canonical transactions are confirmed |
| 267 | + for canon_tx in canonical_view.txs() { |
| 268 | + if !canon_tx.pos.is_confirmed() { |
| 269 | + error!("Canonical tx should be confirmed: {}", canon_tx.txid); |
| 270 | + } |
| 271 | + } |
| 272 | + |
| 273 | + let _ = requester.shutdown(); |
| 274 | + info!("Shutdown complete"); |
| 275 | + |
| 276 | + Ok(()) |
| 277 | +} |
0 commit comments