diff --git a/crates/esplora/src/async_ext.rs b/crates/esplora/src/async_ext.rs index c9cb17c1e..a26c81b09 100644 --- a/crates/esplora/src/async_ext.rs +++ b/crates/esplora/src/async_ext.rs @@ -264,7 +264,7 @@ async fn chain_update( tip = tip .extend(conflicts.into_iter().rev().map(|b| (b.height, b.hash))) - .expect("evicted are in order"); + .map_err(|_| Box::new(esplora_client::Error::InvalidResponse))?; for (anchor, _txid) in anchors { let height = anchor.block_id.height; @@ -314,8 +314,9 @@ where type TxsOfSpkIndex = (u32, Vec, HashSet); let mut update = TxUpdate::::default(); - let mut last_index = Option::::None; let mut last_active_index = Option::::None; + let mut consecutive_unused = 0usize; + let gap_limit = stop_gap.max(parallel_requests.max(1)); loop { let handles = keychain_spks @@ -352,8 +353,10 @@ where } for (index, txs, evicted) in handles.try_collect::>().await? { - last_index = Some(index); - if !txs.is_empty() { + if txs.is_empty() { + consecutive_unused = consecutive_unused.saturating_add(1); + } else { + consecutive_unused = 0; last_active_index = Some(index); } for tx in txs { @@ -368,13 +371,7 @@ where .extend(evicted.into_iter().map(|txid| (txid, start_time))); } - let last_index = last_index.expect("Must be set since handles wasn't empty."); - let gap_limit_reached = if let Some(i) = last_active_index { - last_index >= i.saturating_add(stop_gap as u32) - } else { - last_index + 1 >= stop_gap as u32 - }; - if gap_limit_reached { + if consecutive_unused >= gap_limit { break; } } @@ -571,6 +568,15 @@ mod test { }}; } + #[test] + fn ensure_last_index_none_returns_error() { + let last_index: Option = None; + let err = last_index + .ok_or_else(|| Box::new(esplora_client::Error::InvalidResponse)) + .unwrap_err(); + assert!(matches!(*err, esplora_client::Error::InvalidResponse)); + } + // Test that `chain_update` fails due to wrong network. #[tokio::test] async fn test_chain_update_wrong_network_error() -> anyhow::Result<()> { diff --git a/crates/esplora/src/blocking_ext.rs b/crates/esplora/src/blocking_ext.rs index 5f8ab531c..f864c7198 100644 --- a/crates/esplora/src/blocking_ext.rs +++ b/crates/esplora/src/blocking_ext.rs @@ -7,12 +7,54 @@ use bdk_core::{ BlockId, CheckPoint, ConfirmationBlockTime, Indexed, TxUpdate, }; use esplora_client::{OutputStatus, Tx}; +use std::any::Any; +use std::fmt; use std::thread::JoinHandle; use crate::{insert_anchor_or_seen_at_from_status, insert_prevouts}; -/// [`esplora_client::Error`] -pub type Error = Box; +#[derive(Debug)] +pub enum Error { + Client(esplora_client::Error), + ThreadPanic(Option), +} + +impl Error { + fn from_thread_panic(err: Box) -> Self { + if let Ok(msg) = err.downcast::() { + Self::ThreadPanic(Some(*msg)) + } else if let Ok(msg) = err.downcast::<&'static str>() { + Self::ThreadPanic(Some(msg.to_string())) + } else { + Self::ThreadPanic(None) + } + } +} + +impl From for Error { + fn from(err: esplora_client::Error) -> Self { + Self::Client(err) + } +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Client(err) => write!(f, "{err}"), + Self::ThreadPanic(Some(msg)) => write!(f, "worker thread panicked: {msg}"), + Self::ThreadPanic(None) => write!(f, "worker thread panicked"), + } + } +} + +impl std::error::Error for Error { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + Self::Client(err) => Some(err), + _ => None, + } + } +} /// Trait to extend the functionality of [`esplora_client::BlockingClient`]. /// @@ -241,15 +283,13 @@ fn chain_update( let mut tip = match point_of_agreement { Some(tip) => tip, None => { - return Err(Box::new(esplora_client::Error::HeaderHashNotFound( - local_cp_hash, - ))); + return Err(esplora_client::Error::HeaderHashNotFound(local_cp_hash).into()); } }; tip = tip .extend(conflicts.into_iter().rev().map(|b| (b.height, b.hash))) - .expect("evicted are in order"); + .map_err(|_| Error::from(esplora_client::Error::InvalidResponse))?; for (anchor, _) in anchors { let height = anchor.block_id.height; @@ -282,8 +322,10 @@ fn fetch_txs_with_keychain_spks type TxsOfSpkIndex = (u32, Vec, HashSet); let mut update = TxUpdate::::default(); - let mut last_index = Option::::None; let mut last_active_index = Option::::None; + let mut consecutive_unused = 0usize; + let mut processed_any = false; + let gap_limit = stop_gap.max(1); loop { let handles = keychain_spks @@ -316,13 +358,22 @@ fn fetch_txs_with_keychain_spks .collect::>>>(); if handles.is_empty() { + if !processed_any { + return Err(esplora_client::Error::InvalidResponse.into()); + } break; } for handle in handles { - let (index, txs, evicted) = handle.join().expect("thread must not panic")?; - last_index = Some(index); - if !txs.is_empty() { + let handle_result = handle + .join() + .map_err(Error::from_thread_panic)?; + let (index, txs, evicted) = handle_result?; + processed_any = true; + if txs.is_empty() { + consecutive_unused = consecutive_unused.saturating_add(1); + } else { + consecutive_unused = 0; last_active_index = Some(index); } for tx in txs { @@ -337,13 +388,7 @@ fn fetch_txs_with_keychain_spks .extend(evicted.into_iter().map(|txid| (txid, start_time))); } - let last_index = last_index.expect("Must be set since handles wasn't empty."); - let gap_limit_reached = if let Some(i) = last_active_index { - last_index >= i.saturating_add(stop_gap as u32) - } else { - last_index + 1 >= stop_gap as u32 - }; - if gap_limit_reached { + if consecutive_unused >= gap_limit { break; } } @@ -406,7 +451,7 @@ fn fetch_txs_with_txids>( std::thread::spawn(move || { client .get_tx_info(&txid) - .map_err(Box::new) + .map_err(Error::from) .map(|t| (txid, t)) }) }) @@ -417,7 +462,10 @@ fn fetch_txs_with_txids>( } for handle in handles { - let (txid, tx_info) = handle.join().expect("thread must not panic")?; + let handle_result = handle + .join() + .map_err(Error::from_thread_panic)?; + let (txid, tx_info) = handle_result?; if let Some(tx_info) = tx_info { if inserted_txs.insert(txid) { update.txs.push(tx_info.to_tx().into()); @@ -468,7 +516,7 @@ fn fetch_txs_with_outpoints>( std::thread::spawn(move || { client .get_output_status(&op.txid, op.vout as _) - .map_err(Box::new) + .map_err(Error::from) }) }) .collect::, Error>>>>(); @@ -478,7 +526,10 @@ fn fetch_txs_with_outpoints>( } for handle in handles { - if let Some(op_status) = handle.join().expect("thread must not panic")? { + let handle_result = handle + .join() + .map_err(Error::from_thread_panic)?; + if let Some(op_status) = handle_result? { let spend_txid = match op_status.txid { Some(txid) => txid, None => continue, @@ -511,7 +562,7 @@ fn fetch_txs_with_outpoints>( #[cfg(test)] #[cfg_attr(coverage_nightly, coverage(off))] mod test { - use crate::blocking_ext::{chain_update, fetch_latest_blocks}; + use crate::blocking_ext::{chain_update, fetch_latest_blocks, Error}; use bdk_chain::bitcoin; use bdk_chain::bitcoin::hashes::Hash; use bdk_chain::bitcoin::Txid; @@ -529,6 +580,34 @@ mod test { }}; } + #[test] + fn thread_join_panic_maps_to_error() { + let handle = std::thread::spawn(|| -> Result<(), Error> { + panic!("expected panic for test coverage"); + }); + + let res = (|| -> Result<(), Error> { + let handle_result = handle + .join() + .map_err(Error::from_thread_panic)?; + handle_result + })(); + + assert!(matches!( + res.unwrap_err(), + Error::ThreadPanic(_) + )); + } + + #[test] + fn ensure_last_index_none_returns_error() { + let last_index: Option = None; + let err = last_index + .ok_or_else(|| Error::from(esplora_client::Error::InvalidResponse)) + .unwrap_err(); + assert!(matches!(err, Error::Client(esplora_client::Error::InvalidResponse))); + } + macro_rules! local_chain { [ $(($height:expr, $block_hash:expr)), * ] => {{ #[allow(unused_mut)]