From 0d4094fe31ff30974cb15d2203a44bcfd16308cf Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Fri, 31 Jan 2025 16:58:11 +0400 Subject: [PATCH] Subgraph Composition: Use Firehose endpoint to load blocks for subgraph triggers --- chain/ethereum/src/adapter.rs | 8 - chain/ethereum/src/chain.rs | 348 ++++++++++++++++++++++--- chain/ethereum/src/ethereum_adapter.rs | 132 ++++------ graph/src/blockchain/mock.rs | 120 ++++++++- graph/src/blockchain/types.rs | 67 ++++- graph/src/firehose/endpoints.rs | 120 ++++++++- 6 files changed, 652 insertions(+), 143 deletions(-) diff --git a/chain/ethereum/src/adapter.rs b/chain/ethereum/src/adapter.rs index 1ccdb3d8f2d..f78ff1b0bec 100644 --- a/chain/ethereum/src/adapter.rs +++ b/chain/ethereum/src/adapter.rs @@ -1,7 +1,6 @@ use anyhow::Error; use ethabi::{Error as ABIError, Function, ParamType, Token}; use graph::blockchain::ChainIdentifier; -use graph::blockchain::ExtendedBlockPtr; use graph::components::subgraph::MappingError; use graph::data::store::ethereum::call; use graph::firehose::CallToFilter; @@ -1110,13 +1109,6 @@ pub trait EthereumAdapter: Send + Sync + 'static { block_hash: H256, ) -> Box + Send>; - async fn load_block_ptrs_by_numbers( - &self, - _logger: Logger, - _chain_store: Arc, - _block_numbers: HashSet, - ) -> Box, Error = Error> + Send>; - /// Load Ethereum blocks in bulk, returning results as they come back as a Stream. /// May use the `chain_store` as a cache. async fn load_blocks( diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index a4a8598e8ed..a4e1357de52 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -11,11 +11,13 @@ use graph::components::store::{DeploymentCursorTracker, SourceableStore}; use graph::data::subgraph::UnifiedMappingApiVersion; use graph::firehose::{FirehoseEndpoint, ForkStep}; use graph::futures03::compat::Future01CompatExt; +use graph::futures03::TryStreamExt; use graph::prelude::{ BlockHash, ComponentLoggerConfig, ElasticComponentLoggerConfig, EthereumBlock, EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, MetricsRegistry, }; use graph::schema::InputSchema; +use graph::slog::{debug, error, trace}; use graph::substreams::Clock; use graph::{ blockchain::{ @@ -38,6 +40,7 @@ use graph::{ }; use prost::Message; use std::collections::HashSet; +use std::future::Future; use std::iter::FromIterator; use std::sync::Arc; use std::time::Duration; @@ -242,7 +245,7 @@ impl BlockRefetcher for EthereumBlockRefetcher { logger: &Logger, cursor: FirehoseCursor, ) -> Result { - let endpoint = chain.chain_client().firehose_endpoint().await?; + let endpoint: Arc = chain.chain_client().firehose_endpoint().await?; let block = endpoint.get_block::(cursor, logger).await?; let ethereum_block: EthereumBlockWithCalls = (&block).try_into()?; Ok(BlockFinality::NonFinal(ethereum_block)) @@ -713,13 +716,17 @@ impl Block for BlockFinality { } fn timestamp(&self) -> BlockTime { - let ts = match self { - BlockFinality::Final(block) => block.timestamp, - BlockFinality::NonFinal(block) => block.ethereum_block.block.timestamp, + match self { + BlockFinality::Final(block) => { + let ts = i64::try_from(block.timestamp.as_u64()).unwrap(); + BlockTime::since_epoch(ts, 0) + } + BlockFinality::NonFinal(block) => { + let ts = i64::try_from(block.ethereum_block.block.timestamp.as_u64()).unwrap(); + BlockTime::since_epoch(ts, 0) + } BlockFinality::Ptr(block) => block.timestamp, - }; - let ts = i64::try_from(ts.as_u64()).unwrap(); - BlockTime::since_epoch(ts, 0) + } } } @@ -734,6 +741,81 @@ pub struct TriggersAdapter { unified_api_version: UnifiedMappingApiVersion, } +/// Fetches blocks from the cache based on block numbers, excluding duplicates +/// (i.e., multiple blocks for the same number), and identifying missing blocks that +/// need to be fetched via RPC/Firehose. Returns a tuple of the found blocks and the missing block numbers. +async fn fetch_unique_blocks_from_cache( + logger: &Logger, + chain_store: Arc, + block_numbers: HashSet, +) -> (Vec>, Vec) { + // Load blocks from the cache + let blocks_map = chain_store + .cheap_clone() + .block_ptrs_by_numbers(block_numbers.iter().map(|&b| b.into()).collect::>()) + .await + .map_err(|e| { + error!(logger, "Error accessing block cache {}", e); + e + }) + .unwrap_or_default(); + + // Collect blocks and filter out ones with multiple entries + let blocks: Vec> = blocks_map + .into_iter() + .filter_map(|(_, values)| { + if values.len() == 1 { + Some(Arc::new(values[0].clone())) + } else { + None + } + }) + .collect(); + + // Identify missing blocks + let missing_blocks: Vec = block_numbers + .into_iter() + .filter(|&number| !blocks.iter().any(|block| block.block_number() == number)) + .collect(); + + if !missing_blocks.is_empty() { + debug!( + logger, + "Loading {} block(s) not in the block cache", + missing_blocks.len() + ); + debug!(logger, "Missing blocks {:?}", missing_blocks); + } + + (blocks, missing_blocks) +} + +/// Fetches blocks by their numbers, first attempting to load from cache. +/// Missing blocks are retrieved from an external source, with all blocks sorted and converted to `BlockFinality` format. +async fn load_blocks( + logger: &Logger, + chain_store: Arc, + block_numbers: HashSet, + fetch_missing: F, +) -> Result> +where + F: FnOnce(Vec) -> Fut, + Fut: Future>>>, +{ + // Fetch cached blocks and identify missing ones + let (mut cached_blocks, missing_block_numbers) = + fetch_unique_blocks_from_cache(logger, chain_store, block_numbers).await; + + // Fetch missing blocks if any + if !missing_block_numbers.is_empty() { + let missing_blocks = fetch_missing(missing_block_numbers).await?; + cached_blocks.extend(missing_blocks); + cached_blocks.sort_by_key(|block| block.number); + } + + Ok(cached_blocks.into_iter().map(BlockFinality::Ptr).collect()) +} + #[async_trait] impl TriggersAdapterTrait for TriggersAdapter { async fn scan_triggers( @@ -763,23 +845,70 @@ impl TriggersAdapterTrait for TriggersAdapter { logger: Logger, block_numbers: HashSet, ) -> Result> { - use graph::futures01::stream::Stream; + match &*self.chain_client { + ChainClient::Firehose(endpoints) => { + trace!( + logger, + "Loading blocks from firehose"; + "block_numbers" => format!("{:?}", block_numbers) + ); - let adapter = self - .chain_client - .rpc()? - .cheapest_with(&self.capabilities) - .await?; + let endpoint = endpoints.endpoint().await?; + let chain_store = self.chain_store.clone(); + let logger_clone = logger.clone(); + + load_blocks( + &logger, + chain_store, + block_numbers, + |missing_numbers| async move { + let blocks = endpoint + .load_blocks_by_numbers::( + missing_numbers.iter().map(|&n| n as u64).collect(), + &logger_clone, + ) + .await? + .into_iter() + .map(|block| { + Arc::new(ExtendedBlockPtr { + hash: block.hash(), + number: block.number(), + parent_hash: block.parent_hash().unwrap_or_default(), + timestamp: block.timestamp(), + }) + }) + .collect::>(); + Ok(blocks) + }, + ) + .await + } - let blocks = adapter - .load_block_ptrs_by_numbers(logger, self.chain_store.clone(), block_numbers) - .await - .map(|block| BlockFinality::Ptr(block)) - .collect() - .compat() - .await?; + ChainClient::Rpc(client) => { + trace!( + logger, + "Loading blocks from RPC"; + "block_numbers" => format!("{:?}", block_numbers) + ); - Ok(blocks) + let adapter = client.cheapest_with(&self.capabilities).await?; + let chain_store = self.chain_store.clone(); + let logger_clone = logger.clone(); + + load_blocks( + &logger, + chain_store, + block_numbers, + |missing_numbers| async move { + adapter + .load_block_ptrs_by_numbers_rpc(logger_clone, missing_numbers) + .try_collect() + .await + }, + ) + .await + } + } } async fn chain_head_ptr(&self) -> Result, Error> { @@ -840,13 +969,25 @@ impl TriggersAdapterTrait for TriggersAdapter { } async fn is_on_main_chain(&self, ptr: BlockPtr) -> Result { - self.chain_client - .rpc()? - .cheapest() - .await - .ok_or(anyhow!("unable to get adapter for is_on_main_chain"))? - .is_on_main_chain(&self.logger, ptr.clone()) - .await + match &*self.chain_client { + ChainClient::Firehose(endpoints) => { + let endpoint = endpoints.endpoint().await?; + let block = endpoint + .get_block_by_number::(ptr.number as u64, &self.logger) + .await + .map_err(|e| anyhow!("Failed to fetch block from firehose: {}", e))?; + + Ok(block.hash() == ptr.hash) + } + ChainClient::Rpc(adapter) => { + let adapter = adapter + .cheapest() + .await + .ok_or_else(|| anyhow!("unable to get adapter for is_on_main_chain"))?; + + adapter.is_on_main_chain(&self.logger, ptr).await + } + } } async fn ancestor_block( @@ -876,10 +1017,18 @@ impl TriggersAdapterTrait for TriggersAdapter { use graph::prelude::LightEthereumBlockExt; let block = match self.chain_client.as_ref() { - ChainClient::Firehose(_) => Some(BlockPtr { - hash: BlockHash::from(vec![0xff; 32]), - number: block.number.saturating_sub(1), - }), + ChainClient::Firehose(endpoints) => { + let endpoint = endpoints.endpoint().await?; + let block = endpoint + .get_block_by_ptr::(block, &self.logger) + .await + .context(format!( + "Failed to fetch block by ptr {} from firehose, backtrace: {}", + block, + std::backtrace::Backtrace::force_capture() + ))?; + block.parent_ptr() + } ChainClient::Rpc(adapters) => { let blocks = adapters .cheapest_with(&self.capabilities) @@ -1044,3 +1193,138 @@ impl FirehoseMapperTrait for FirehoseMapper { .await } } + +#[cfg(test)] +mod tests { + use graph::blockchain::mock::MockChainStore; + use graph::{slog, tokio}; + + use super::*; + use std::collections::HashSet; + use std::sync::Arc; + + // Helper function to create test blocks + fn create_test_block(number: BlockNumber, hash: &str) -> ExtendedBlockPtr { + let hash = BlockHash(hash.as_bytes().to_vec().into_boxed_slice()); + let ptr = BlockPtr::new(hash.clone(), number); + ExtendedBlockPtr { + hash, + number, + parent_hash: BlockHash(vec![0; 32].into_boxed_slice()), + timestamp: BlockTime::for_test(&ptr), + } + } + + #[tokio::test] + async fn test_fetch_unique_blocks_single_block() { + let logger = Logger::root(slog::Discard, o!()); + let mut chain_store = MockChainStore::default(); + + // Add a single block + let block = create_test_block(1, "block1"); + chain_store.blocks.insert(1, vec![block.clone()]); + + let block_numbers: HashSet<_> = vec![1].into_iter().collect(); + + let (blocks, missing) = + fetch_unique_blocks_from_cache(&logger, Arc::new(chain_store), block_numbers).await; + + assert_eq!(blocks.len(), 1); + assert_eq!(blocks[0].number, 1); + assert!(missing.is_empty()); + } + + #[tokio::test] + async fn test_fetch_unique_blocks_duplicate_blocks() { + let logger = Logger::root(slog::Discard, o!()); + let mut chain_store = MockChainStore::default(); + + // Add multiple blocks for the same number + let block1 = create_test_block(1, "block1a"); + let block2 = create_test_block(1, "block1b"); + chain_store + .blocks + .insert(1, vec![block1.clone(), block2.clone()]); + + let block_numbers: HashSet<_> = vec![1].into_iter().collect(); + + let (blocks, missing) = + fetch_unique_blocks_from_cache(&logger, Arc::new(chain_store), block_numbers).await; + + // Should filter out the duplicate block + assert!(blocks.is_empty()); + assert_eq!(missing, vec![1]); + assert_eq!(missing[0], 1); + } + + #[tokio::test] + async fn test_fetch_unique_blocks_missing_blocks() { + let logger = Logger::root(slog::Discard, o!()); + let mut chain_store = MockChainStore::default(); + + // Add block number 1 but not 2 + let block = create_test_block(1, "block1"); + chain_store.blocks.insert(1, vec![block.clone()]); + + let block_numbers: HashSet<_> = vec![1, 2].into_iter().collect(); + + let (blocks, missing) = + fetch_unique_blocks_from_cache(&logger, Arc::new(chain_store), block_numbers).await; + + assert_eq!(blocks.len(), 1); + assert_eq!(blocks[0].number, 1); + assert_eq!(missing, vec![2]); + } + + #[tokio::test] + async fn test_fetch_unique_blocks_multiple_valid_blocks() { + let logger = Logger::root(slog::Discard, o!()); + let mut chain_store = MockChainStore::default(); + + // Add multiple valid blocks + let block1 = create_test_block(1, "block1"); + let block2 = create_test_block(2, "block2"); + chain_store.blocks.insert(1, vec![block1.clone()]); + chain_store.blocks.insert(2, vec![block2.clone()]); + + let block_numbers: HashSet<_> = vec![1, 2].into_iter().collect(); + + let (blocks, missing) = + fetch_unique_blocks_from_cache(&logger, Arc::new(chain_store), block_numbers).await; + + assert_eq!(blocks.len(), 2); + assert!(blocks.iter().any(|b| b.number == 1)); + assert!(blocks.iter().any(|b| b.number == 2)); + assert!(missing.is_empty()); + } + + #[tokio::test] + async fn test_fetch_unique_blocks_mixed_scenario() { + let logger = Logger::root(slog::Discard, o!()); + let mut chain_store = MockChainStore::default(); + + // Add a mix of scenarios: + // - Block 1: Single valid block + // - Block 2: Multiple blocks (duplicate) + // - Block 3: Missing + let block1 = create_test_block(1, "block1"); + let block2a = create_test_block(2, "block2a"); + let block2b = create_test_block(2, "block2b"); + + chain_store.blocks.insert(1, vec![block1.clone()]); + chain_store + .blocks + .insert(2, vec![block2a.clone(), block2b.clone()]); + + let block_numbers: HashSet<_> = vec![1, 2, 3].into_iter().collect(); + + let (blocks, missing) = + fetch_unique_blocks_from_cache(&logger, Arc::new(chain_store), block_numbers).await; + + assert_eq!(blocks.len(), 1); + assert_eq!(blocks[0].number, 1); + assert_eq!(missing.len(), 2); + assert!(missing.contains(&2)); + assert!(missing.contains(&3)); + } +} diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index dc672f1b5d9..71af858fb9f 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -784,50 +784,59 @@ impl EthereumAdapter { } /// Request blocks by number through JSON-RPC. - fn load_block_ptrs_by_numbers_rpc( + pub fn load_block_ptrs_by_numbers_rpc( &self, logger: Logger, numbers: Vec, - ) -> impl Stream, Error = Error> + Send { + ) -> impl futures03::Stream, Error>> + Send { let web3 = self.web3.clone(); - stream::iter_ok::<_, Error>(numbers.into_iter().map(move |number| { + futures03::stream::iter(numbers.into_iter().map(move |number| { let web3 = web3.clone(); - retry(format!("load block {}", number), &logger) - .limit(ENV_VARS.request_retries) - .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) - .run(move || { - Box::pin( - web3.eth() - .block(BlockId::Number(Web3BlockNumber::Number(number.into()))), - ) - .compat() - .from_err::() - .and_then(move |block| { - block - .map(|block| { - let ptr = ExtendedBlockPtr::try_from(( - block.hash, - block.number, - block.parent_hash, - block.timestamp, - )) - .unwrap(); - - Arc::new(ptr) - }) - .ok_or_else(|| { - anyhow::anyhow!( + let logger = logger.clone(); + + async move { + retry(format!("load block {}", number), &logger) + .limit(ENV_VARS.request_retries) + .timeout_secs(ENV_VARS.json_rpc_timeout.as_secs()) + .run(move || { + let web3 = web3.clone(); + + async move { + let block_result = web3 + .eth() + .block(BlockId::Number(Web3BlockNumber::Number(number.into()))) + .await; + + match block_result { + Ok(Some(block)) => { + let ptr = ExtendedBlockPtr::try_from(( + block.hash, + block.number, + block.parent_hash, + block.timestamp, + )) + .map_err(|e| { + anyhow::anyhow!("Failed to convert block: {}", e) + })?; + Ok(Arc::new(ptr)) + } + Ok(None) => Err(anyhow::anyhow!( "Ethereum node did not find block with number {:?}", number - ) - }) + )), + Err(e) => Err(anyhow::anyhow!("Failed to fetch block: {}", e)), + } + } }) - .compat() - }) - .boxed() - .compat() - .from_err() + .await + .map_err(|e| match e { + TimeoutError::Elapsed => { + anyhow::anyhow!("Timeout while fetching block {}", number) + } + TimeoutError::Inner(e) => e, + }) + } })) .buffered(ENV_VARS.block_ptr_batch_size) } @@ -1700,59 +1709,6 @@ impl EthereumAdapterTrait for EthereumAdapter { Ok(decoded) } - /// Load Ethereum blocks in bulk by number, returning results as they come back as a Stream. - async fn load_block_ptrs_by_numbers( - &self, - logger: Logger, - chain_store: Arc, - block_numbers: HashSet, - ) -> Box, Error = Error> + Send> { - let blocks_map = chain_store - .cheap_clone() - .block_ptrs_by_numbers(block_numbers.iter().map(|&b| b.into()).collect::>()) - .await - .map_err(|e| { - error!(&logger, "Error accessing block cache {}", e); - e - }) - .unwrap_or_default(); - - let mut blocks: Vec> = blocks_map - .into_iter() - .filter_map(|(_number, values)| { - if values.len() == 1 { - Arc::new(values[0].clone()).into() - } else { - None - } - }) - .collect::>(); - - let missing_blocks: Vec = block_numbers - .into_iter() - .filter(|&number| !blocks.iter().any(|block| block.block_number() == number)) - .collect(); - - if !missing_blocks.is_empty() { - debug!( - logger, - "Loading {} block(s) not in the block cache", - missing_blocks.len() - ); - } - - Box::new( - self.load_block_ptrs_by_numbers_rpc(logger.clone(), missing_blocks) - .collect() - .map(move |new_blocks| { - blocks.extend(new_blocks); - blocks.sort_by_key(|block| block.number); - stream::iter_ok(blocks) - }) - .flatten_stream(), - ) - } - /// Load Ethereum blocks in bulk, returning results as they come back as a Stream. async fn load_blocks( &self, diff --git a/graph/src/blockchain/mock.rs b/graph/src/blockchain/mock.rs index 2f1480dd46a..41567db15ae 100644 --- a/graph/src/blockchain/mock.rs +++ b/graph/src/blockchain/mock.rs @@ -6,19 +6,29 @@ use crate::{ subgraph::InstanceDSTemplateInfo, }, data::subgraph::UnifiedMappingApiVersion, - prelude::{BlockHash, DataSourceTemplateInfo}, + prelude::{ + transaction_receipt::LightTransactionReceipt, BlockHash, ChainStore, + DataSourceTemplateInfo, StoreError, + }, }; use anyhow::{Error, Result}; use async_trait::async_trait; use serde::Deserialize; +use serde_json::Value; use slog::Logger; -use std::{collections::HashSet, convert::TryFrom, sync::Arc}; +use std::{ + collections::{BTreeMap, HashMap, HashSet}, + convert::TryFrom, + sync::Arc, +}; +use web3::types::H256; use super::{ block_stream::{self, BlockStream, FirehoseCursor}, client::ChainClient, - BlockIngestor, BlockTime, EmptyNodeCapabilities, HostFn, IngestorError, MappingTriggerTrait, - NoopDecoderHook, Trigger, TriggerFilterWrapper, TriggerWithHandler, + BlockIngestor, BlockTime, ChainIdentifier, EmptyNodeCapabilities, ExtendedBlockPtr, HostFn, + IngestorError, MappingTriggerTrait, NoopDecoderHook, Trigger, TriggerFilterWrapper, + TriggerWithHandler, }; use super::{ @@ -431,3 +441,105 @@ impl Blockchain for MockBlockchain { todo!() } } + +// Mock implementation +#[derive(Default)] +pub struct MockChainStore { + pub blocks: BTreeMap>, +} + +#[async_trait] +impl ChainStore for MockChainStore { + async fn block_ptrs_by_numbers( + self: Arc, + numbers: Vec, + ) -> Result>, Error> { + let mut result = BTreeMap::new(); + for num in numbers { + if let Some(blocks) = self.blocks.get(&num) { + result.insert(num, blocks.clone()); + } + } + Ok(result) + } + + // Implement other required methods with minimal implementations + fn genesis_block_ptr(&self) -> Result { + unimplemented!() + } + async fn upsert_block(&self, _block: Arc) -> Result<(), Error> { + unimplemented!() + } + fn upsert_light_blocks(&self, _blocks: &[&dyn Block]) -> Result<(), Error> { + unimplemented!() + } + async fn attempt_chain_head_update( + self: Arc, + _ancestor_count: BlockNumber, + ) -> Result, Error> { + unimplemented!() + } + async fn chain_head_ptr(self: Arc) -> Result, Error> { + unimplemented!() + } + fn chain_head_cursor(&self) -> Result, Error> { + unimplemented!() + } + async fn set_chain_head( + self: Arc, + _block: Arc, + _cursor: String, + ) -> Result<(), Error> { + unimplemented!() + } + async fn blocks(self: Arc, _hashes: Vec) -> Result, Error> { + unimplemented!() + } + async fn ancestor_block( + self: Arc, + _block_ptr: BlockPtr, + _offset: BlockNumber, + _root: Option, + ) -> Result, Error> { + unimplemented!() + } + fn cleanup_cached_blocks( + &self, + _ancestor_count: BlockNumber, + ) -> Result, Error> { + unimplemented!() + } + fn block_hashes_by_block_number(&self, _number: BlockNumber) -> Result, Error> { + unimplemented!() + } + fn confirm_block_hash(&self, _number: BlockNumber, _hash: &BlockHash) -> Result { + unimplemented!() + } + async fn block_number( + &self, + _hash: &BlockHash, + ) -> Result, Option)>, StoreError> { + unimplemented!() + } + async fn block_numbers( + &self, + _hashes: Vec, + ) -> Result, StoreError> { + unimplemented!() + } + async fn transaction_receipts_in_block( + &self, + _block_ptr: &H256, + ) -> Result, StoreError> { + unimplemented!() + } + async fn clear_call_cache(&self, _from: BlockNumber, _to: BlockNumber) -> Result<(), Error> { + unimplemented!() + } + fn chain_identifier(&self) -> Result { + unimplemented!() + } + fn set_chain_identifier(&self, _ident: &ChainIdentifier) -> Result<(), Error> { + unimplemented!() + } +} diff --git a/graph/src/blockchain/types.rs b/graph/src/blockchain/types.rs index 286215fb845..b2b802fbfac 100644 --- a/graph/src/blockchain/types.rs +++ b/graph/src/blockchain/types.rs @@ -355,6 +355,25 @@ where } } +fn deserialize_block_time<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + let value = String::deserialize(deserializer)?; + + if value.starts_with("0x") { + let hex_value = value.trim_start_matches("0x"); + + i64::from_str_radix(hex_value, 16) + .map(|secs| BlockTime::since_epoch(secs, 0)) + .map_err(serde::de::Error::custom) + } else { + value + .parse::() + .map(|secs| BlockTime::since_epoch(secs, 0)) + .map_err(serde::de::Error::custom) + } +} #[derive(Clone, PartialEq, Eq, Hash, Deserialize)] #[serde(rename_all = "camelCase")] pub struct ExtendedBlockPtr { @@ -362,7 +381,8 @@ pub struct ExtendedBlockPtr { #[serde(deserialize_with = "deserialize_block_number")] pub number: BlockNumber, pub parent_hash: BlockHash, - pub timestamp: U256, + #[serde(deserialize_with = "deserialize_block_time")] + pub timestamp: BlockTime, } impl ExtendedBlockPtr { @@ -370,7 +390,7 @@ impl ExtendedBlockPtr { hash: BlockHash, number: BlockNumber, parent_hash: BlockHash, - timestamp: U256, + timestamp: BlockTime, ) -> Self { Self { hash, @@ -464,7 +484,7 @@ impl TryFrom<(Option, Option, H256, U256)> for ExtendedBlockPtr { type Error = anyhow::Error; fn try_from(tuple: (Option, Option, H256, U256)) -> Result { - let (hash_opt, number_opt, parent_hash, timestamp) = tuple; + let (hash_opt, number_opt, parent_hash, timestamp_u256) = tuple; let hash = hash_opt.ok_or_else(|| anyhow!("Block hash is missing"))?; let number = number_opt @@ -474,11 +494,16 @@ impl TryFrom<(Option, Option, H256, U256)> for ExtendedBlockPtr { let block_number = i32::try_from(number).map_err(|_| anyhow!("Block number out of range"))?; + // Convert `U256` to `BlockTime` + let secs = + i64::try_from(timestamp_u256).map_err(|_| anyhow!("Timestamp out of range for i64"))?; + let block_time = BlockTime::since_epoch(secs, 0); + Ok(ExtendedBlockPtr { hash: hash.into(), number: block_number, parent_hash: parent_hash.into(), - timestamp, + timestamp: block_time, }) } } @@ -487,13 +512,18 @@ impl TryFrom<(H256, i32, H256, U256)> for ExtendedBlockPtr { type Error = anyhow::Error; fn try_from(tuple: (H256, i32, H256, U256)) -> Result { - let (hash, block_number, parent_hash, timestamp) = tuple; + let (hash, block_number, parent_hash, timestamp_u256) = tuple; + + // Convert `U256` to `BlockTime` + let secs = + i64::try_from(timestamp_u256).map_err(|_| anyhow!("Timestamp out of range for i64"))?; + let block_time = BlockTime::since_epoch(secs, 0); Ok(ExtendedBlockPtr { hash: hash.into(), number: block_number, parent_hash: parent_hash.into(), - timestamp, + timestamp: block_time, }) } } @@ -543,7 +573,9 @@ impl fmt::Display for ChainIdentifier { /// The timestamp associated with a block. This is used whenever a time /// needs to be connected to data within the block -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, FromSqlRow, AsExpression)] +#[derive( + Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord, FromSqlRow, AsExpression, Deserialize, +)] #[diesel(sql_type = Timestamptz)] pub struct BlockTime(Timestamp); @@ -625,6 +657,12 @@ impl FromSql for BlockTime { } } +impl fmt::Display for BlockTime { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.0.as_microseconds_since_epoch()) + } +} + #[cfg(test)] mod tests { use super::*; @@ -654,8 +692,8 @@ mod tests { { "hash": "0x8186da3ec5590631ae7b9415ce58548cb98c7f1dc68c5ea1c519a3f0f6a25aac", "number": "0x2A", - "parentHash": "0xabc123", - "timestamp": "123456789012345678901234567890" + "parentHash": "0xd71699894d637632dea4d425396086edf033c1ff72b13753e8c4e67700e3eb8e", + "timestamp": "0x673b284f" } "#; @@ -665,6 +703,15 @@ mod tests { // Verify the deserialized values assert_eq!(block_ptr_ext.number, 42); // 0x2A in hex is 42 in decimal + assert_eq!( + block_ptr_ext.hash_hex(), + "8186da3ec5590631ae7b9415ce58548cb98c7f1dc68c5ea1c519a3f0f6a25aac" + ); + assert_eq!( + block_ptr_ext.parent_hash_hex(), + "d71699894d637632dea4d425396086edf033c1ff72b13753e8c4e67700e3eb8e" + ); + assert_eq!(block_ptr_ext.timestamp.0.as_secs_since_epoch(), 1731930191); } #[test] @@ -673,7 +720,7 @@ mod tests { { "hash": "0x8186da3ec5590631ae7b9415ce58548cb98c7f1dc68c5ea1c519a3f0f6a25aac", "number": "invalid_hex_string", - "parentHash": "0xabc123", + "parentHash": "0xd71699894d637632dea4d425396086edf033c1ff72b13753e8c4e67700e3eb8e", "timestamp": "123456789012345678901234567890" } "#; diff --git a/graph/src/firehose/endpoints.rs b/graph/src/firehose/endpoints.rs index 72d3f986c9c..45ff9c045ee 100644 --- a/graph/src/firehose/endpoints.rs +++ b/graph/src/firehose/endpoints.rs @@ -16,7 +16,7 @@ use async_trait::async_trait; use futures03::StreamExt; use http0::uri::{Scheme, Uri}; use itertools::Itertools; -use slog::Logger; +use slog::{error, info, Logger}; use std::{collections::HashMap, fmt::Display, ops::ControlFlow, sync::Arc, time::Duration}; use tokio::sync::OnceCell; use tonic::codegen::InterceptedService; @@ -359,6 +359,124 @@ impl FirehoseEndpoint { } } + pub async fn get_block_by_ptr( + &self, + ptr: &BlockPtr, + logger: &Logger, + ) -> Result + where + M: prost::Message + BlockchainBlock + Default + 'static, + { + debug!( + logger, + "Connecting to firehose to retrieve block for ptr {}", ptr; + "provider" => self.provider.as_str(), + ); + + let req = firehose::SingleBlockRequest { + transforms: [].to_vec(), + reference: Some( + firehose::single_block_request::Reference::BlockHashAndNumber( + firehose::single_block_request::BlockHashAndNumber { + hash: ptr.hash.to_string(), + num: ptr.number as u64, + }, + ), + ), + }; + + let mut client = self.new_fetch_client(); + match client.block(req).await { + Ok(v) => Ok(M::decode( + v.get_ref().block.as_ref().unwrap().value.as_ref(), + )?), + Err(e) => return Err(anyhow::format_err!("firehose error {}", e)), + } + } + + pub async fn get_block_by_number( + &self, + number: u64, + logger: &Logger, + ) -> Result + where + M: prost::Message + BlockchainBlock + Default + 'static, + { + debug!( + logger, + "Connecting to firehose to retrieve block for number {}", number; + "provider" => self.provider.as_str(), + ); + + let req = firehose::SingleBlockRequest { + transforms: [].to_vec(), + reference: Some(firehose::single_block_request::Reference::BlockNumber( + firehose::single_block_request::BlockNumber { num: number }, + )), + }; + + let mut client = self.new_fetch_client(); + match client.block(req).await { + Ok(v) => Ok(M::decode( + v.get_ref().block.as_ref().unwrap().value.as_ref(), + )?), + Err(e) => return Err(anyhow::format_err!("firehose error {}", e)), + } + } + + pub async fn load_blocks_by_numbers( + &self, + numbers: Vec, + logger: &Logger, + ) -> Result, anyhow::Error> + where + M: prost::Message + BlockchainBlock + Default + 'static, + { + let mut blocks = Vec::with_capacity(numbers.len()); + + for number in numbers { + debug!( + logger, + "Loading block for block number {}", number; + "provider" => self.provider.as_str(), + ); + + match self.get_block_by_number::(number, logger).await { + Ok(block) => { + blocks.push(block); + } + Err(e) => { + error!( + logger, + "Failed to load block number {}: {}", number, e; + "provider" => self.provider.as_str(), + ); + return Err(anyhow::format_err!( + "failed to load block number {}: {}", + number, + e + )); + } + } + } + + Ok(blocks) + } + + pub async fn genesis_block_ptr(&self, logger: &Logger) -> Result + where + M: prost::Message + BlockchainBlock + Default + 'static, + { + info!(logger, "Requesting genesis block from firehose"; + "provider" => self.provider.as_str()); + + // We use 0 here to mean the genesis block of the chain. Firehose + // when seeing start block number 0 will always return the genesis + // block of the chain, even if the chain's start block number is + // not starting at block #0. + self.block_ptr_for_number::(logger, 0).await + } + pub async fn block_ptr_for_number( &self, logger: &Logger,