diff --git a/chain/arweave/src/chain.rs b/chain/arweave/src/chain.rs index 8d40408a463..e897e10d8d8 100644 --- a/chain/arweave/src/chain.rs +++ b/chain/arweave/src/chain.rs @@ -3,15 +3,15 @@ use graph::blockchain::client::ChainClient; use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor; use graph::blockchain::{ BasicBlockchainBuilder, Block, BlockIngestor, BlockchainBuilder, BlockchainKind, - EmptyNodeCapabilities, NoopDecoderHook, NoopRuntimeAdapter, + EmptyNodeCapabilities, NoopDecoderHook, NoopRuntimeAdapter, TriggerFilterWrapper, }; use graph::cheap_clone::CheapClone; use graph::components::adapter::ChainId; -use graph::components::store::DeploymentCursorTracker; +use graph::components::store::{DeploymentCursorTracker, WritableStore}; use graph::data::subgraph::UnifiedMappingApiVersion; use graph::env::EnvVars; use graph::firehose::FirehoseEndpoint; -use graph::prelude::MetricsRegistry; +use graph::prelude::{DeploymentHash, MetricsRegistry}; use graph::substreams::Clock; use graph::{ blockchain::{ @@ -27,11 +27,13 @@ use graph::{ prelude::{async_trait, o, BlockNumber, ChainStore, Error, Logger, LoggerFactory}, }; use prost::Message; +use std::collections::HashSet; use std::sync::Arc; use crate::adapter::TriggerFilter; use crate::data_source::{DataSourceTemplate, UnresolvedDataSourceTemplate}; use crate::trigger::{self, ArweaveTrigger}; +use crate::Block as ArweaveBlock; use crate::{ codec, data_source::{DataSource, UnresolvedDataSource}, @@ -119,7 +121,8 @@ impl Blockchain for Chain { deployment: DeploymentLocator, store: impl DeploymentCursorTracker, start_blocks: Vec, - filter: Arc, + _source_subgraph_stores: Vec<(DeploymentHash, Arc)>, + filter: Arc>, unified_api_version: UnifiedMappingApiVersion, ) -> Result>, Error> { let adapter = self @@ -135,7 +138,10 @@ impl Blockchain for Chain { .subgraph_logger(&deployment) .new(o!("component" => "FirehoseBlockStream")); - let firehose_mapper = Arc::new(FirehoseMapper { adapter, filter }); + let firehose_mapper = Arc::new(FirehoseMapper { + adapter, + filter: filter.chain_filter.clone(), + }); Ok(Box::new(FirehoseBlockStream::new( deployment.hash, @@ -199,6 +205,10 @@ impl TriggersAdapterTrait for TriggersAdapter { panic!("Should never be called since not used by FirehoseBlockStream") } + async fn chain_head_ptr(&self) -> Result, Error> { + unimplemented!() + } + async fn triggers_in_block( &self, logger: &Logger, @@ -258,6 +268,14 @@ impl TriggersAdapterTrait for TriggersAdapter { number: block.number.saturating_sub(1), })) } + + async fn load_blocks_by_numbers( + &self, + _logger: Logger, + _block_numbers: HashSet, + ) -> Result, Error> { + todo!() + } } pub struct FirehoseMapper { diff --git a/chain/cosmos/src/chain.rs b/chain/cosmos/src/chain.rs index 955aa7efc3c..83a299b6163 100644 --- a/chain/cosmos/src/chain.rs +++ b/chain/cosmos/src/chain.rs @@ -1,9 +1,10 @@ use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor; -use graph::blockchain::{BlockIngestor, NoopDecoderHook}; +use graph::blockchain::{BlockIngestor, NoopDecoderHook, TriggerFilterWrapper}; use graph::components::adapter::ChainId; use graph::env::EnvVars; -use graph::prelude::MetricsRegistry; +use graph::prelude::{DeploymentHash, MetricsRegistry}; use graph::substreams::Clock; +use std::collections::HashSet; use std::convert::TryFrom; use std::sync::Arc; @@ -11,7 +12,7 @@ use graph::blockchain::block_stream::{BlockStreamError, BlockStreamMapper, Fireh use graph::blockchain::client::ChainClient; use graph::blockchain::{BasicBlockchainBuilder, BlockchainBuilder, NoopRuntimeAdapter}; use graph::cheap_clone::CheapClone; -use graph::components::store::DeploymentCursorTracker; +use graph::components::store::{DeploymentCursorTracker, WritableStore}; use graph::data::subgraph::UnifiedMappingApiVersion; use graph::{ blockchain::{ @@ -33,7 +34,7 @@ use crate::data_source::{ DataSource, DataSourceTemplate, EventOrigin, UnresolvedDataSource, UnresolvedDataSourceTemplate, }; use crate::trigger::CosmosTrigger; -use crate::{codec, TriggerFilter}; +use crate::{codec, Block, TriggerFilter}; pub struct Chain { logger_factory: LoggerFactory, @@ -113,7 +114,8 @@ impl Blockchain for Chain { deployment: DeploymentLocator, store: impl DeploymentCursorTracker, start_blocks: Vec, - filter: Arc, + _source_subgraph_stores: Vec<(DeploymentHash, Arc)>, + filter: Arc>, unified_api_version: UnifiedMappingApiVersion, ) -> Result>, Error> { let adapter = self @@ -129,7 +131,10 @@ impl Blockchain for Chain { .subgraph_logger(&deployment) .new(o!("component" => "FirehoseBlockStream")); - let firehose_mapper = Arc::new(FirehoseMapper { adapter, filter }); + let firehose_mapper = Arc::new(FirehoseMapper { + adapter, + filter: filter.chain_filter.clone(), + }); Ok(Box::new(FirehoseBlockStream::new( deployment.hash, @@ -193,6 +198,18 @@ impl TriggersAdapterTrait for TriggersAdapter { panic!("Should never be called since not used by FirehoseBlockStream") } + async fn load_blocks_by_numbers( + &self, + _logger: Logger, + _block_numbers: HashSet, + ) -> Result, Error> { + unimplemented!() + } + + async fn chain_head_ptr(&self) -> Result, Error> { + unimplemented!() + } + async fn scan_triggers( &self, _from: BlockNumber, @@ -467,9 +484,12 @@ impl FirehoseMapperTrait for FirehoseMapper { #[cfg(test)] mod test { - use graph::prelude::{ - slog::{o, Discard, Logger}, - tokio, + use graph::{ + blockchain::Trigger, + prelude::{ + slog::{o, Discard, Logger}, + tokio, + }, }; use super::*; @@ -600,7 +620,10 @@ mod test { // they may not be in the same order for trigger in expected_triggers { assert!( - triggers.trigger_data.contains(&trigger), + triggers.trigger_data.iter().any(|t| match t { + Trigger::Chain(t) => t == &trigger, + _ => false, + }), "Expected trigger list to contain {:?}, but it only contains: {:?}", trigger, triggers.trigger_data diff --git a/chain/ethereum/src/adapter.rs b/chain/ethereum/src/adapter.rs index f78ff1b0bec..3d4dc00c030 100644 --- a/chain/ethereum/src/adapter.rs +++ b/chain/ethereum/src/adapter.rs @@ -1109,6 +1109,13 @@ pub trait EthereumAdapter: Send + Sync + 'static { block_hash: H256, ) -> Box + Send>; + async fn load_blocks_by_numbers( + &self, + _logger: Logger, + _chain_store: Arc, + _block_numbers: HashSet, + ) -> Box, Error = Error> + Send>; + /// Load Ethereum blocks in bulk, returning results as they come back as a Stream. /// May use the `chain_store` as a cache. async fn load_blocks( diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index 1def8c483cc..9b8e71b0fe2 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -3,15 +3,16 @@ use anyhow::{Context, Error}; use graph::blockchain::client::ChainClient; use graph::blockchain::firehose_block_ingestor::{FirehoseBlockIngestor, Transforms}; use graph::blockchain::{ - BlockIngestor, BlockTime, BlockchainKind, ChainIdentifier, TriggersAdapterSelector, + BlockIngestor, BlockTime, BlockchainKind, ChainIdentifier, TriggerFilterWrapper, + TriggersAdapterSelector, }; use graph::components::adapter::ChainId; -use graph::components::store::DeploymentCursorTracker; +use graph::components::store::{DeploymentCursorTracker, WritableStore}; use graph::data::subgraph::UnifiedMappingApiVersion; use graph::firehose::{FirehoseEndpoint, ForkStep}; use graph::futures03::compat::Future01CompatExt; use graph::prelude::{ - BlockHash, ComponentLoggerConfig, ElasticComponentLoggerConfig, EthereumBlock, + BlockHash, ComponentLoggerConfig, DeploymentHash, ElasticComponentLoggerConfig, EthereumBlock, EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, MetricsRegistry, }; use graph::schema::InputSchema; @@ -61,6 +62,7 @@ use crate::{BufferedCallCache, NodeCapabilities}; use crate::{EthereumAdapter, RuntimeAdapter}; use graph::blockchain::block_stream::{ BlockStream, BlockStreamBuilder, BlockStreamError, BlockStreamMapper, FirehoseCursor, + TriggersAdapterWrapper, }; /// Celo Mainnet: 42220, Testnet Alfajores: 44787, Testnet Baklava: 62320 @@ -121,24 +123,50 @@ impl BlockStreamBuilder for EthereumStreamBuilder { unimplemented!() } + async fn build_subgraph_block_stream( + &self, + chain: &Chain, + deployment: DeploymentLocator, + start_blocks: Vec, + source_subgraph_stores: Vec<(DeploymentHash, Arc)>, + subgraph_current_block: Option, + filter: Arc>, + unified_api_version: UnifiedMappingApiVersion, + ) -> Result>> { + self.build_polling( + chain, + deployment, + start_blocks, + source_subgraph_stores, + subgraph_current_block, + filter, + unified_api_version, + ) + .await + } + async fn build_polling( &self, chain: &Chain, deployment: DeploymentLocator, start_blocks: Vec, + source_subgraph_stores: Vec<(DeploymentHash, Arc)>, subgraph_current_block: Option, - filter: Arc<::TriggerFilter>, + filter: Arc>, unified_api_version: UnifiedMappingApiVersion, ) -> Result>> { - let requirements = filter.node_capabilities(); - let adapter = chain - .triggers_adapter(&deployment, &requirements, unified_api_version.clone()) - .unwrap_or_else(|_| { - panic!( - "no adapter for network {} with capabilities {}", - chain.name, requirements - ) - }); + let requirements = filter.chain_filter.node_capabilities(); + let adapter = TriggersAdapterWrapper::new( + chain + .triggers_adapter(&deployment, &requirements, unified_api_version.clone()) + .unwrap_or_else(|_| { + panic!( + "no adapter for network {} with capabilities {}", + chain.name, requirements + ) + }), + source_subgraph_stores, + ); let logger = chain .logger_factory @@ -172,7 +200,7 @@ impl BlockStreamBuilder for EthereumStreamBuilder { Ok(Box::new(PollingBlockStream::new( chain_store, chain_head_update_stream, - adapter, + Arc::new(adapter), chain.node_id.clone(), deployment.hash, filter, @@ -409,10 +437,27 @@ impl Blockchain for Chain { deployment: DeploymentLocator, store: impl DeploymentCursorTracker, start_blocks: Vec, - filter: Arc, + source_subgraph_stores: Vec<(DeploymentHash, Arc)>, + filter: Arc>, unified_api_version: UnifiedMappingApiVersion, ) -> Result>, Error> { let current_ptr = store.block_ptr(); + + if !filter.subgraph_filter.is_empty() { + return self + .block_stream_builder + .build_subgraph_block_stream( + self, + deployment, + start_blocks, + source_subgraph_stores, + current_ptr, + filter, + unified_api_version, + ) + .await; + } + match self.chain_client().as_ref() { ChainClient::Rpc(_) => { self.block_stream_builder @@ -420,6 +465,7 @@ impl Blockchain for Chain { self, deployment, start_blocks, + source_subgraph_stores, current_ptr, filter, unified_api_version, @@ -434,7 +480,7 @@ impl Blockchain for Chain { store.firehose_cursor(), start_blocks, current_ptr, - filter, + filter.chain_filter.clone(), unified_api_version, ) .await @@ -689,6 +735,35 @@ impl TriggersAdapterTrait for TriggersAdapter { .await } + async fn load_blocks_by_numbers( + &self, + logger: Logger, + block_numbers: HashSet, + ) -> Result> { + use graph::futures01::stream::Stream; + + let adapter = self + .chain_client + .rpc()? + .cheapest_with(&self.capabilities) + .await?; + + let blocks = adapter + .load_blocks_by_numbers(logger, self.chain_store.clone(), block_numbers) + .await + .map(|block| BlockFinality::Final(block)) + .collect() + .compat() + .await?; + + Ok(blocks) + } + + async fn chain_head_ptr(&self) -> Result, Error> { + let chain_store = self.chain_store.clone(); + chain_store.chain_head_ptr().await + } + async fn triggers_in_block( &self, logger: &Logger, diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index c4ea6323c7d..9fe0b8262b2 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -2,6 +2,7 @@ use futures03::{future::BoxFuture, stream::FuturesUnordered}; use graph::blockchain::client::ChainClient; use graph::blockchain::BlockHash; use graph::blockchain::ChainIdentifier; + use graph::components::transaction_receipt::LightTransactionReceipt; use graph::data::store::ethereum::call; use graph::data::store::scalar; @@ -58,6 +59,7 @@ use crate::chain::BlockFinality; use crate::trigger::LogRef; use crate::Chain; use crate::NodeCapabilities; +use crate::TriggerFilter; use crate::{ adapter::{ ContractCall, ContractCallError, EthGetLogsFilter, EthereumAdapter as EthereumAdapterTrait, @@ -66,7 +68,7 @@ use crate::{ }, transport::Transport, trigger::{EthereumBlockTriggerType, EthereumTrigger}, - TriggerFilter, ENV_VARS, + ENV_VARS, }; #[derive(Debug, Clone)] @@ -1648,6 +1650,28 @@ impl EthereumAdapterTrait for EthereumAdapter { Ok(decoded) } + // This is a ugly temporary implementation to get the block ptrs for a range of blocks + async fn load_blocks_by_numbers( + &self, + logger: Logger, + chain_store: Arc, + block_numbers: HashSet, + ) -> Box, Error = Error> + Send> { + let block_hashes = block_numbers + .into_iter() + .map(|number| { + chain_store + .block_hashes_by_block_number(number) + .unwrap() + .first() + .unwrap() + .as_h256() + }) + .collect::>(); + + self.load_blocks(logger, chain_store, block_hashes).await + } + /// Load Ethereum blocks in bulk, returning results as they come back as a Stream. async fn load_blocks( &self, @@ -2077,8 +2101,8 @@ async fn filter_call_triggers_from_unsuccessful_transactions( let transaction_hashes: BTreeSet = block .trigger_data .iter() - .filter_map(|trigger| match trigger { - EthereumTrigger::Call(call_trigger) => Some(call_trigger.transaction_hash), + .filter_map(|trigger| match trigger.as_chain() { + Some(EthereumTrigger::Call(call_trigger)) => Some(call_trigger.transaction_hash), _ => None, }) .collect::>>() @@ -2169,7 +2193,7 @@ async fn filter_call_triggers_from_unsuccessful_transactions( // Filter call triggers from unsuccessful transactions block.trigger_data.retain(|trigger| { - if let EthereumTrigger::Call(call_trigger) = trigger { + if let Some(EthereumTrigger::Call(call_trigger)) = trigger.as_chain() { // Unwrap: We already checked that those values exist transaction_success[&call_trigger.transaction_hash.unwrap()] } else { diff --git a/chain/ethereum/src/tests.rs b/chain/ethereum/src/tests.rs index 455a7c07432..00873f8ea87 100644 --- a/chain/ethereum/src/tests.rs +++ b/chain/ethereum/src/tests.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use graph::{ - blockchain::{block_stream::BlockWithTriggers, BlockPtr}, + blockchain::{block_stream::BlockWithTriggers, BlockPtr, Trigger}, prelude::{ web3::types::{Address, Bytes, Log, H160, H256, U64}, EthereumCall, LightEthereumBlock, @@ -107,10 +107,12 @@ fn test_trigger_ordering() { &logger, ); - assert_eq!( - block_with_triggers.trigger_data, - vec![log1, log2, call1, log3, call2, call4, call3, block2, block1] - ); + let expected = vec![log1, log2, call1, log3, call2, call4, call3, block2, block1] + .into_iter() + .map(|t| Trigger::Chain(t)) + .collect::>(); + + assert_eq!(block_with_triggers.trigger_data, expected); } #[test] @@ -203,8 +205,10 @@ fn test_trigger_dedup() { &logger, ); - assert_eq!( - block_with_triggers.trigger_data, - vec![log1, log2, call1, log3, call2, call3, block2, block1] - ); + let expected = vec![log1, log2, call1, log3, call2, call3, block2, block1] + .into_iter() + .map(|t| Trigger::Chain(t)) + .collect::>(); + + assert_eq!(block_with_triggers.trigger_data, expected); } diff --git a/chain/near/src/chain.rs b/chain/near/src/chain.rs index 283552e7f33..44c0449adce 100644 --- a/chain/near/src/chain.rs +++ b/chain/near/src/chain.rs @@ -4,16 +4,16 @@ use graph::blockchain::firehose_block_ingestor::FirehoseBlockIngestor; use graph::blockchain::substreams_block_stream::SubstreamsBlockStream; use graph::blockchain::{ BasicBlockchainBuilder, BlockIngestor, BlockchainBuilder, BlockchainKind, NoopDecoderHook, - NoopRuntimeAdapter, + NoopRuntimeAdapter, Trigger, TriggerFilterWrapper, }; use graph::cheap_clone::CheapClone; use graph::components::adapter::ChainId; -use graph::components::store::DeploymentCursorTracker; +use graph::components::store::{DeploymentCursorTracker, WritableStore}; use graph::data::subgraph::UnifiedMappingApiVersion; use graph::env::EnvVars; use graph::firehose::FirehoseEndpoint; use graph::futures03::TryFutureExt; -use graph::prelude::MetricsRegistry; +use graph::prelude::{DeploymentHash, MetricsRegistry}; use graph::schema::InputSchema; use graph::substreams::{Clock, Package}; use graph::{ @@ -32,10 +32,12 @@ use graph::{ prelude::{async_trait, o, BlockNumber, ChainStore, Error, Logger, LoggerFactory}, }; use prost::Message; +use std::collections::HashSet; use std::sync::Arc; use crate::adapter::TriggerFilter; use crate::codec::substreams_triggers::BlockAndReceipts; +use crate::codec::Block; use crate::data_source::{DataSourceTemplate, UnresolvedDataSourceTemplate}; use crate::trigger::{self, NearTrigger}; use crate::{ @@ -108,7 +110,6 @@ impl BlockStreamBuilder for NearStreamBuilder { chain.metrics_registry.clone(), ))) } - async fn build_firehose( &self, chain: &Chain, @@ -151,8 +152,9 @@ impl BlockStreamBuilder for NearStreamBuilder { _chain: &Chain, _deployment: DeploymentLocator, _start_blocks: Vec, + _source_subgraph_stores: Vec<(DeploymentHash, Arc)>, _subgraph_current_block: Option, - _filter: Arc<::TriggerFilter>, + _filter: Arc>, _unified_api_version: UnifiedMappingApiVersion, ) -> Result>> { todo!() @@ -230,7 +232,8 @@ impl Blockchain for Chain { deployment: DeploymentLocator, store: impl DeploymentCursorTracker, start_blocks: Vec, - filter: Arc, + _source_subgraph_stores: Vec<(DeploymentHash, Arc)>, + filter: Arc>, unified_api_version: UnifiedMappingApiVersion, ) -> Result>, Error> { if self.prefer_substreams { @@ -242,7 +245,7 @@ impl Blockchain for Chain { deployment, store.firehose_cursor(), store.block_ptr(), - filter, + filter.chain_filter.clone(), ) .await; } @@ -254,7 +257,7 @@ impl Blockchain for Chain { store.firehose_cursor(), start_blocks, store.block_ptr(), - filter, + filter.chain_filter.clone(), unified_api_version, ) .await @@ -322,6 +325,18 @@ impl TriggersAdapterTrait for TriggersAdapter { panic!("Should never be called since not used by FirehoseBlockStream") } + async fn load_blocks_by_numbers( + &self, + _logger: Logger, + _block_numbers: HashSet, + ) -> Result> { + unimplemented!() + } + + async fn chain_head_ptr(&self) -> Result, Error> { + unimplemented!() + } + async fn triggers_in_block( &self, logger: &Logger, @@ -462,11 +477,13 @@ impl BlockStreamMapper for FirehoseMapper { .into_iter() .zip(receipt.into_iter()) .map(|(outcome, receipt)| { - NearTrigger::Receipt(Arc::new(trigger::ReceiptWithOutcome { - outcome, - receipt, - block: arc_block.clone(), - })) + Trigger::Chain(NearTrigger::Receipt(Arc::new( + trigger::ReceiptWithOutcome { + outcome, + receipt, + block: arc_block.clone(), + }, + ))) }) .collect(); @@ -973,8 +990,8 @@ mod test { .trigger_data .clone() .into_iter() - .filter_map(|x| match x { - crate::trigger::NearTrigger::Block(b) => b.header.clone().map(|x| x.height), + .filter_map(|x| match x.as_chain() { + Some(crate::trigger::NearTrigger::Block(b)) => b.header.clone().map(|x| x.height), _ => None, }) .collect() diff --git a/chain/starknet/src/chain.rs b/chain/starknet/src/chain.rs index cd10af5f965..b6ff51a9bcd 100644 --- a/chain/starknet/src/chain.rs +++ b/chain/starknet/src/chain.rs @@ -11,26 +11,26 @@ use graph::{ firehose_block_stream::FirehoseBlockStream, BasicBlockchainBuilder, Block, BlockIngestor, BlockPtr, Blockchain, BlockchainBuilder, BlockchainKind, EmptyNodeCapabilities, IngestorError, NoopDecoderHook, NoopRuntimeAdapter, - RuntimeAdapter as RuntimeAdapterTrait, + RuntimeAdapter as RuntimeAdapterTrait, TriggerFilterWrapper, }, cheap_clone::CheapClone, components::{ adapter::ChainId, - store::{DeploymentCursorTracker, DeploymentLocator}, + store::{DeploymentCursorTracker, DeploymentLocator, WritableStore}, }, data::subgraph::UnifiedMappingApiVersion, env::EnvVars, firehose::{self, FirehoseEndpoint, ForkStep}, futures03::future::TryFutureExt, prelude::{ - async_trait, BlockHash, BlockNumber, ChainStore, Error, Logger, LoggerFactory, - MetricsRegistry, + async_trait, BlockHash, BlockNumber, ChainStore, DeploymentHash, Error, Logger, + LoggerFactory, MetricsRegistry, }, schema::InputSchema, slog::o, }; use prost::Message; -use std::sync::Arc; +use std::{collections::HashSet, sync::Arc}; use crate::{ adapter::TriggerFilter, @@ -39,6 +39,7 @@ use crate::{ DataSource, DataSourceTemplate, UnresolvedDataSource, UnresolvedDataSourceTemplate, }, trigger::{StarknetBlockTrigger, StarknetEventTrigger, StarknetTrigger}, + Block as StarknetBlock, }; pub struct Chain { @@ -115,7 +116,8 @@ impl Blockchain for Chain { deployment: DeploymentLocator, store: impl DeploymentCursorTracker, start_blocks: Vec, - filter: Arc, + _source_subgraph_stores: Vec<(DeploymentHash, Arc)>, + filter: Arc>, unified_api_version: UnifiedMappingApiVersion, ) -> Result>, Error> { self.block_stream_builder @@ -125,7 +127,7 @@ impl Blockchain for Chain { store.firehose_cursor(), start_blocks, store.block_ptr(), - filter, + filter.chain_filter.clone(), unified_api_version, ) .await @@ -238,8 +240,9 @@ impl BlockStreamBuilder for StarknetStreamBuilder { _chain: &Chain, _deployment: DeploymentLocator, _start_blocks: Vec, + _source_subgraph_stores: Vec<(DeploymentHash, Arc)>, _subgraph_current_block: Option, - _filter: Arc, + _filter: Arc>, _unified_api_version: UnifiedMappingApiVersion, ) -> Result>> { panic!("StarkNet does not support polling block stream") @@ -369,6 +372,18 @@ impl TriggersAdapterTrait for TriggersAdapter { panic!("Should never be called since FirehoseBlockStream cannot resolve it") } + async fn load_blocks_by_numbers( + &self, + _logger: Logger, + _block_numbers: HashSet, + ) -> Result, Error> { + unimplemented!() + } + + async fn chain_head_ptr(&self) -> Result, Error> { + unimplemented!() + } + // Returns a sequence of blocks in increasing order of block number. // Each block will include all of its triggers that match the given `filter`. // The sequence may omit blocks that contain no triggers, @@ -379,7 +394,7 @@ impl TriggersAdapterTrait for TriggersAdapter { &self, _from: BlockNumber, _to: BlockNumber, - _filter: &crate::adapter::TriggerFilter, + _filter: &TriggerFilter, ) -> Result<(Vec>, BlockNumber), Error> { panic!("Should never be called since not used by FirehoseBlockStream") } diff --git a/chain/substreams/src/block_stream.rs b/chain/substreams/src/block_stream.rs index 8844df0610e..3687bab0b91 100644 --- a/chain/substreams/src/block_stream.rs +++ b/chain/substreams/src/block_stream.rs @@ -7,11 +7,11 @@ use graph::{ BlockStream, BlockStreamBuilder as BlockStreamBuilderTrait, FirehoseCursor, }, substreams_block_stream::SubstreamsBlockStream, - Blockchain, + Blockchain, TriggerFilterWrapper, }, - components::store::DeploymentLocator, + components::store::{DeploymentLocator, WritableStore}, data::subgraph::UnifiedMappingApiVersion, - prelude::{async_trait, BlockNumber, BlockPtr}, + prelude::{async_trait, BlockNumber, BlockPtr, DeploymentHash}, schema::InputSchema, slog::o, }; @@ -104,8 +104,9 @@ impl BlockStreamBuilderTrait for BlockStreamBuilder { _chain: &Chain, _deployment: DeploymentLocator, _start_blocks: Vec, + _source_subgraph_stores: Vec<(DeploymentHash, Arc)>, _subgraph_current_block: Option, - _filter: Arc, + _filter: Arc>, _unified_api_version: UnifiedMappingApiVersion, ) -> Result>> { unimplemented!("polling block stream is not support for substreams") diff --git a/chain/substreams/src/chain.rs b/chain/substreams/src/chain.rs index 28ef4bdc38b..38ef49bdb5d 100644 --- a/chain/substreams/src/chain.rs +++ b/chain/substreams/src/chain.rs @@ -4,12 +4,14 @@ use anyhow::Error; use graph::blockchain::client::ChainClient; use graph::blockchain::{ BasicBlockchainBuilder, BlockIngestor, BlockTime, EmptyNodeCapabilities, NoopDecoderHook, - NoopRuntimeAdapter, + NoopRuntimeAdapter, TriggerFilterWrapper, }; use graph::components::adapter::ChainId; -use graph::components::store::DeploymentCursorTracker; +use graph::components::store::{DeploymentCursorTracker, WritableStore}; use graph::env::EnvVars; -use graph::prelude::{BlockHash, CheapClone, Entity, LoggerFactory, MetricsRegistry}; +use graph::prelude::{ + BlockHash, CheapClone, DeploymentHash, Entity, LoggerFactory, MetricsRegistry, +}; use graph::schema::EntityKey; use graph::{ blockchain::{ @@ -140,7 +142,8 @@ impl Blockchain for Chain { deployment: DeploymentLocator, store: impl DeploymentCursorTracker, _start_blocks: Vec, - filter: Arc, + _source_subgraph_stores: Vec<(DeploymentHash, Arc)>, + filter: Arc>, _unified_api_version: UnifiedMappingApiVersion, ) -> Result>, Error> { self.block_stream_builder @@ -150,7 +153,7 @@ impl Blockchain for Chain { deployment, store.firehose_cursor(), store.block_ptr(), - filter, + filter.chain_filter.clone(), ) .await } diff --git a/chain/substreams/src/trigger.rs b/chain/substreams/src/trigger.rs index 2b47e4e57b8..db4034cd55c 100644 --- a/chain/substreams/src/trigger.rs +++ b/chain/substreams/src/trigger.rs @@ -16,7 +16,7 @@ use graph::{ }; use graph_runtime_wasm::module::ToAscPtr; use lazy_static::__Deref; -use std::sync::Arc; +use std::{collections::HashSet, sync::Arc}; use crate::{Block, Chain, NoopDataSourceTemplate, ParsedChanges}; @@ -136,6 +136,18 @@ impl blockchain::TriggersAdapter for TriggersAdapter { unimplemented!() } + async fn load_blocks_by_numbers( + &self, + _logger: Logger, + _block_numbers: HashSet, + ) -> Result, Error> { + unimplemented!() + } + + async fn chain_head_ptr(&self) -> Result, Error> { + unimplemented!() + } + async fn scan_triggers( &self, _from: BlockNumber, diff --git a/core/src/subgraph/context/instance/hosts.rs b/core/src/subgraph/context/instance/hosts.rs index 73701fadb29..f9a774246c8 100644 --- a/core/src/subgraph/context/instance/hosts.rs +++ b/core/src/subgraph/context/instance/hosts.rs @@ -57,7 +57,7 @@ impl> OnchainHosts { } pub fn push(&mut self, host: Arc) { - assert!(host.data_source().as_onchain().is_some()); + assert!(host.data_source().is_chain_based()); self.hosts.push(host.cheap_clone()); let idx = self.hosts.len() - 1; diff --git a/core/src/subgraph/context/instance/mod.rs b/core/src/subgraph/context/instance/mod.rs index ed242836a28..6436ea1a56e 100644 --- a/core/src/subgraph/context/instance/mod.rs +++ b/core/src/subgraph/context/instance/mod.rs @@ -29,6 +29,9 @@ pub(crate) struct SubgraphInstance> { /// will return the onchain hosts in the same order as they were inserted. onchain_hosts: OnchainHosts, + // TODO(krishna): Describe subgraph_hosts + subgraph_hosts: OnchainHosts, + offchain_hosts: OffchainHosts, /// Maps the hash of a module to a channel to the thread in which the module is instantiated. @@ -79,6 +82,7 @@ where network, static_data_sources: Arc::new(manifest.data_sources), onchain_hosts: OnchainHosts::new(), + subgraph_hosts: OnchainHosts::new(), offchain_hosts: OffchainHosts::new(), module_cache: HashMap::new(), templates, @@ -138,34 +142,44 @@ where ); } - let is_onchain = data_source.is_onchain(); let Some(host) = self.new_host(logger.clone(), data_source)? else { return Ok(None); }; // Check for duplicates and add the host. - if is_onchain { - // `onchain_hosts` will remain ordered by the creation block. - // See also 8f1bca33-d3b7-4035-affc-fd6161a12448. - ensure!( - self.onchain_hosts - .last() - .and_then(|h| h.creation_block_number()) - <= host.data_source().creation_block(), - ); + match host.data_source() { + DataSource::Onchain(_) => { + // `onchain_hosts` will remain ordered by the creation block. + // See also 8f1bca33-d3b7-4035-affc-fd6161a12448. + ensure!( + self.onchain_hosts + .last() + .and_then(|h| h.creation_block_number()) + <= host.data_source().creation_block(), + ); - if self.onchain_hosts.contains(&host) { - Ok(None) - } else { - self.onchain_hosts.push(host.cheap_clone()); - Ok(Some(host)) + if self.onchain_hosts.contains(&host) { + Ok(None) + } else { + self.onchain_hosts.push(host.cheap_clone()); + Ok(Some(host)) + } } - } else { - if self.offchain_hosts.contains(&host) { - Ok(None) - } else { - self.offchain_hosts.push(host.cheap_clone()); - Ok(Some(host)) + DataSource::Offchain(_) => { + if self.offchain_hosts.contains(&host) { + Ok(None) + } else { + self.offchain_hosts.push(host.cheap_clone()); + Ok(Some(host)) + } + } + DataSource::Subgraph(_) => { + if self.subgraph_hosts.contains(&host) { + Ok(None) + } else { + self.subgraph_hosts.push(host.cheap_clone()); + Ok(Some(host)) + } } } } @@ -226,6 +240,9 @@ where TriggerData::Offchain(trigger) => self .offchain_hosts .matches_by_address(trigger.source.address().as_ref().map(|a| a.as_slice())), + TriggerData::Subgraph(trigger) => self + .subgraph_hosts + .matches_by_address(Some(trigger.source.to_bytes().as_slice())), } } diff --git a/core/src/subgraph/context/mod.rs b/core/src/subgraph/context/mod.rs index 6ffc5a5aa12..7b7686a04fb 100644 --- a/core/src/subgraph/context/mod.rs +++ b/core/src/subgraph/context/mod.rs @@ -6,7 +6,7 @@ use crate::polling_monitor::{ use anyhow::{self, Error}; use bytes::Bytes; use graph::{ - blockchain::{BlockTime, Blockchain}, + blockchain::{BlockTime, Blockchain, TriggerFilterWrapper}, components::{ store::{DeploymentId, SubgraphFork}, subgraph::{HostMetrics, MappingError, RuntimeHost as _, SharedProofOfIndexing}, @@ -73,7 +73,7 @@ where pub(crate) instance: SubgraphInstance, pub instances: SubgraphKeepAlive, pub offchain_monitor: OffchainMonitor, - pub filter: Option, + pub filter: Option>, pub(crate) trigger_processor: Box>, pub(crate) decoder: Box>, } diff --git a/core/src/subgraph/inputs.rs b/core/src/subgraph/inputs.rs index b2e95c753f5..386b1a5e35b 100644 --- a/core/src/subgraph/inputs.rs +++ b/core/src/subgraph/inputs.rs @@ -1,12 +1,12 @@ use graph::{ - blockchain::{Blockchain, TriggersAdapter}, + blockchain::{block_stream::TriggersAdapterWrapper, Blockchain}, components::{ store::{DeploymentLocator, SubgraphFork, WritableStore}, subgraph::ProofOfIndexingVersion, }, data::subgraph::{SubgraphFeature, UnifiedMappingApiVersion}, data_source::DataSourceTemplate, - prelude::BlockNumber, + prelude::{BlockNumber, DeploymentHash}, }; use std::collections::BTreeSet; use std::sync::Arc; @@ -16,10 +16,11 @@ pub struct IndexingInputs { pub features: BTreeSet, pub start_blocks: Vec, pub end_blocks: BTreeSet, + pub source_subgraph_stores: Vec<(DeploymentHash, Arc)>, pub stop_block: Option, pub store: Arc, pub debug_fork: Option>, - pub triggers_adapter: Arc>, + pub triggers_adapter: Arc>, pub chain: Arc, pub templates: Arc>>, pub unified_api_version: UnifiedMappingApiVersion, @@ -39,6 +40,7 @@ impl IndexingInputs { features, start_blocks, end_blocks, + source_subgraph_stores, stop_block, store: _, debug_fork, @@ -56,6 +58,7 @@ impl IndexingInputs { features: features.clone(), start_blocks: start_blocks.clone(), end_blocks: end_blocks.clone(), + source_subgraph_stores: source_subgraph_stores.clone(), stop_block: stop_block.clone(), store, debug_fork: debug_fork.clone(), diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index c98641539d9..20ad464339f 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -6,15 +6,17 @@ use crate::subgraph::Decoder; use std::collections::BTreeSet; use crate::subgraph::runner::SubgraphRunner; -use graph::blockchain::block_stream::BlockStreamMetrics; +use graph::blockchain::block_stream::{BlockStreamMetrics, TriggersAdapterWrapper}; use graph::blockchain::{Blockchain, BlockchainKind, DataSource, NodeCapabilities}; use graph::components::metrics::gas::GasMetrics; +use graph::components::store::WritableStore; use graph::components::subgraph::ProofOfIndexingVersion; use graph::data::subgraph::{UnresolvedSubgraphManifest, SPEC_VERSION_0_0_6}; use graph::data::value::Word; use graph::data_source::causality_region::CausalityRegionSeq; use graph::env::EnvVars; use graph::prelude::{SubgraphInstanceManager as SubgraphInstanceManagerTrait, *}; +use graph::semver::Version; use graph::{blockchain::BlockchainMap, components::store::DeploymentLocator}; use graph_runtime_wasm::module::ToAscPtr; use graph_runtime_wasm::RuntimeHostBuilder; @@ -202,6 +204,52 @@ impl SubgraphInstanceManager { } } + pub async fn hashes_to_writable_store( + &self, + logger: &Logger, + link_resolver: &Arc, + hashes: Vec, + max_spec_version: Version, + is_runner_test: bool, + ) -> anyhow::Result)>> { + let mut writable_stores = Vec::new(); + let subgraph_store = self.subgraph_store.clone(); + + if is_runner_test { + return Ok(writable_stores); + } + + for hash in hashes { + let file_bytes = link_resolver + .cat(logger, &hash.to_ipfs_link()) + .await + .map_err(SubgraphAssignmentProviderError::ResolveError)?; + let raw: serde_yaml::Mapping = serde_yaml::from_slice(&file_bytes) + .map_err(|e| SubgraphAssignmentProviderError::ResolveError(e.into()))?; + let manifest = UnresolvedSubgraphManifest::::parse(hash.cheap_clone(), raw)?; + let manifest = manifest + .resolve(&link_resolver, &logger, max_spec_version.clone()) + .await?; + + let loc = subgraph_store + .active_locator(&hash)? + .ok_or_else(|| anyhow!("no active deployment for hash {}", hash))?; + + let writable_store = subgraph_store + .clone() // Clone the Arc again for each iteration + .writable( + logger.clone(), + loc.id.clone(), + Arc::new(manifest.template_idx_and_name().collect()), + ) + .await?; + + writable_stores.push((loc.hash, writable_store)); + } + + Ok(writable_stores) + } + pub async fn build_subgraph_runner( &self, logger: Logger, @@ -211,6 +259,26 @@ impl SubgraphInstanceManager { stop_block: Option, tp: Box>>, ) -> anyhow::Result>> + where + C: Blockchain, + ::MappingTrigger: ToAscPtr, + { + self.build_subgraph_runner_inner( + logger, env_vars, deployment, manifest, stop_block, tp, false, + ) + .await + } + + pub async fn build_subgraph_runner_inner( + &self, + logger: Logger, + env_vars: Arc, + deployment: DeploymentLocator, + manifest: serde_yaml::Mapping, + stop_block: Option, + tp: Box>>, + is_runner_test: bool, + ) -> anyhow::Result>> where C: Blockchain, ::MappingTrigger: ToAscPtr, @@ -307,6 +375,16 @@ impl SubgraphInstanceManager { .filter_map(|d| d.as_onchain().cloned()) .collect::>(); + let subgraph_data_sources = data_sources + .iter() + .filter_map(|d| d.as_subgraph()) + .collect::>(); + + let subgraph_ds_source_deployments = subgraph_data_sources + .iter() + .map(|d| d.source.address()) + .collect::>(); + let required_capabilities = C::NodeCapabilities::from_data_sources(&onchain_data_sources); let network: Word = manifest.network_name().into(); @@ -318,7 +396,7 @@ impl SubgraphInstanceManager { let start_blocks: Vec = data_sources .iter() - .filter_map(|d| d.as_onchain().map(|d: &C::DataSource| d.start_block())) + .filter_map(|d| d.start_block()) .collect(); let end_blocks: BTreeSet = manifest @@ -413,11 +491,27 @@ impl SubgraphInstanceManager { let decoder = Box::new(Decoder::new(decoder_hook)); + let subgraph_data_source_writables = self + .hashes_to_writable_store::( + &logger, + &link_resolver, + subgraph_ds_source_deployments, + manifest.spec_version.clone(), + is_runner_test, + ) + .await?; + + let triggers_adapter = Arc::new(TriggersAdapterWrapper::new( + triggers_adapter, + subgraph_data_source_writables.clone(), + )); + let inputs = IndexingInputs { deployment: deployment.clone(), features, start_blocks, end_blocks, + source_subgraph_stores: subgraph_data_source_writables, stop_block, store, debug_fork, diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index fe80d118457..e8ad83315e0 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -629,7 +629,6 @@ async fn create_subgraph_version( ) .map_err(SubgraphRegistrarError::ResolveError) .await?; - // Determine if the graft_base should be validated. // Validate the graft_base if there is a pending graft, ensuring its presence. // If the subgraph is new (indicated by DeploymentNotFound), the graft_base should be validated. diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index cd341ce2f99..9712a9095fe 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -7,7 +7,10 @@ use atomic_refcell::AtomicRefCell; use graph::blockchain::block_stream::{ BlockStreamError, BlockStreamEvent, BlockWithTriggers, FirehoseCursor, }; -use graph::blockchain::{Block, BlockTime, Blockchain, DataSource as _, TriggerFilter as _}; +use graph::blockchain::{ + Block, BlockTime, Blockchain, DataSource as _, SubgraphFilter, Trigger, TriggerFilter as _, + TriggerFilterWrapper, +}; use graph::components::store::{EmptyStore, GetScope, ReadStore, StoredDynamicDataSource}; use graph::components::subgraph::InstanceDSTemplate; use graph::components::{ @@ -116,7 +119,7 @@ where self.inputs.static_filters || self.ctx.hosts_len() > ENV_VARS.static_filters_threshold } - fn build_filter(&self) -> C::TriggerFilter { + fn build_filter(&self) -> TriggerFilterWrapper { let current_ptr = self.inputs.store.block_ptr(); let static_filters = self.is_static_filters_enabled(); @@ -128,10 +131,30 @@ where None => true, }; + let data_sources = self.ctx.static_data_sources(); + + let subgraph_filter = data_sources + .iter() + .filter_map(|ds| ds.as_subgraph()) + .map(|ds| SubgraphFilter { + subgraph: ds.source.address(), + start_block: ds.source.start_block, + entities: ds + .mapping + .handlers + .iter() + .map(|handler| handler.entity.clone()) + .collect(), + }) + .collect::>(); + // if static_filters is not enabled we just stick to the filter based on all the data sources. if !static_filters { - return C::TriggerFilter::from_data_sources( - self.ctx.onchain_data_sources().filter(end_block_filter), + return TriggerFilterWrapper::new( + C::TriggerFilter::from_data_sources( + self.ctx.onchain_data_sources().filter(end_block_filter), + ), + subgraph_filter, ); } @@ -158,11 +181,11 @@ where filter.extend_with_template(templates.iter().filter_map(|ds| ds.as_onchain()).cloned()); - filter + TriggerFilterWrapper::new(filter, subgraph_filter) } #[cfg(debug_assertions)] - pub fn build_filter_for_test(&self) -> C::TriggerFilter { + pub fn build_filter_for_test(&self) -> TriggerFilterWrapper { self.build_filter() } @@ -209,7 +232,7 @@ where let mut block_stream = new_block_stream( &self.inputs, - self.ctx.filter.as_ref().unwrap(), // Safe to unwrap as we just called `build_filter` in the previous line + self.ctx.filter.clone().unwrap(), // Safe to unwrap as we just called `build_filter` in the previous line &self.metrics.subgraph, ) .await? @@ -323,7 +346,10 @@ where .match_and_decode_many( &logger, &block, - triggers.into_iter().map(TriggerData::Onchain), + triggers.into_iter().map(|t| match t { + Trigger::Chain(t) => TriggerData::Onchain(t), + Trigger::Subgraph(t) => TriggerData::Subgraph(t), + }), hosts_filter, &self.metrics.subgraph, ) @@ -482,7 +508,10 @@ where .match_and_decode_many( &logger, &block, - triggers.into_iter().map(TriggerData::Onchain), + triggers.into_iter().map(|t| match t { + Trigger::Chain(t) => TriggerData::Onchain(t), + Trigger::Subgraph(_) => unreachable!(), // TODO(krishna): Re-evaulate this + }), |_| Box::new(runtime_hosts.iter().map(Arc::as_ref)), &self.metrics.subgraph, ) diff --git a/core/src/subgraph/stream.rs b/core/src/subgraph/stream.rs index c1d767e3fcf..5547543f13d 100644 --- a/core/src/subgraph/stream.rs +++ b/core/src/subgraph/stream.rs @@ -1,13 +1,13 @@ use crate::subgraph::inputs::IndexingInputs; use anyhow::bail; use graph::blockchain::block_stream::{BlockStream, BufferedBlockStream}; -use graph::blockchain::Blockchain; +use graph::blockchain::{Blockchain, TriggerFilterWrapper}; use graph::prelude::{CheapClone, Error, SubgraphInstanceMetrics}; use std::sync::Arc; pub async fn new_block_stream( inputs: &IndexingInputs, - filter: &C::TriggerFilter, + filter: TriggerFilterWrapper, metrics: &SubgraphInstanceMetrics, ) -> Result>, Error> { let is_firehose = inputs.chain.chain_client().is_firehose(); @@ -18,6 +18,7 @@ pub async fn new_block_stream( inputs.deployment.clone(), inputs.store.cheap_clone(), inputs.start_blocks.clone(), + inputs.source_subgraph_stores.clone(), Arc::new(filter.clone()), inputs.unified_api_version.clone(), ) diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index 25a923dd502..3c5f08851f2 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -1,3 +1,5 @@ +use crate::data::store::scalar; +use crate::data_source::subgraph; use crate::substreams::Clock; use crate::substreams_rpc::response::Message as SubstreamsMessage; use crate::substreams_rpc::BlockScopedData; @@ -5,6 +7,7 @@ use anyhow::Error; use async_stream::stream; use futures03::Stream; use prost_types::Any; +use std::collections::HashSet; use std::fmt; use std::sync::Arc; use std::time::Instant; @@ -12,9 +15,11 @@ use thiserror::Error; use tokio::sync::mpsc::{self, Receiver, Sender}; use super::substreams_block_stream::SubstreamsLogData; -use super::{Block, BlockPtr, BlockTime, Blockchain}; +use super::{ + Block, BlockPtr, BlockTime, Blockchain, SubgraphFilter, Trigger, TriggerFilterWrapper, +}; use crate::anyhow::Result; -use crate::components::store::{BlockNumber, DeploymentLocator}; +use crate::components::store::{BlockNumber, DeploymentLocator, WritableStore}; use crate::data::subgraph::UnifiedMappingApiVersion; use crate::firehose::{self, FirehoseEndpoint}; use crate::futures03::stream::StreamExt as _; @@ -144,10 +149,33 @@ pub trait BlockStreamBuilder: Send + Sync { chain: &C, deployment: DeploymentLocator, start_blocks: Vec, + source_subgraph_stores: Vec<(DeploymentHash, Arc)>, subgraph_current_block: Option, - filter: Arc, + filter: Arc>, unified_api_version: UnifiedMappingApiVersion, ) -> Result>>; + + async fn build_subgraph_block_stream( + &self, + chain: &C, + deployment: DeploymentLocator, + start_blocks: Vec, + source_subgraph_stores: Vec<(DeploymentHash, Arc)>, + subgraph_current_block: Option, + filter: Arc>, + unified_api_version: UnifiedMappingApiVersion, + ) -> Result>> { + self.build_polling( + chain, + deployment, + start_blocks, + source_subgraph_stores, + subgraph_current_block, + filter, + unified_api_version, + ) + .await + } } #[derive(Debug, Clone)] @@ -198,7 +226,7 @@ impl AsRef> for FirehoseCursor { #[derive(Debug)] pub struct BlockWithTriggers { pub block: C::Block, - pub trigger_data: Vec, + pub trigger_data: Vec>, } impl Clone for BlockWithTriggers @@ -216,7 +244,31 @@ where impl BlockWithTriggers { /// Creates a BlockWithTriggers structure, which holds /// the trigger data ordered and without any duplicates. - pub fn new(block: C::Block, mut trigger_data: Vec, logger: &Logger) -> Self { + pub fn new(block: C::Block, trigger_data: Vec, logger: &Logger) -> Self { + Self::new_with_triggers( + block, + trigger_data.into_iter().map(Trigger::Chain).collect(), + logger, + ) + } + + pub fn new_with_subgraph_triggers( + block: C::Block, + trigger_data: Vec, + logger: &Logger, + ) -> Self { + Self::new_with_triggers( + block, + trigger_data.into_iter().map(Trigger::Subgraph).collect(), + logger, + ) + } + + fn new_with_triggers( + block: C::Block, + mut trigger_data: Vec>, + logger: &Logger, + ) -> Self { // This is where triggers get sorted. trigger_data.sort(); @@ -256,6 +308,144 @@ impl BlockWithTriggers { pub fn parent_ptr(&self) -> Option { self.block.parent_ptr() } + + pub fn extend_triggers(&mut self, triggers: Vec>) { + self.trigger_data.extend(triggers); + self.trigger_data.sort(); + } +} + +pub struct TriggersAdapterWrapper { + pub adapter: Arc>, + pub source_subgraph_stores: Vec<(DeploymentHash, Arc)>, +} + +impl TriggersAdapterWrapper { + pub fn new( + adapter: Arc>, + source_subgraph_stores: Vec<(DeploymentHash, Arc)>, + ) -> Self { + Self { + adapter, + source_subgraph_stores, + } + } +} + +impl TriggersAdapterWrapper { + pub async fn ancestor_block( + &self, + ptr: BlockPtr, + offset: BlockNumber, + root: Option, + ) -> Result, Error> { + self.adapter.ancestor_block(ptr, offset, root).await + } + + // TODO: Do a proper implementation, this is a complete mock implementation + pub async fn scan_triggers( + &self, + from: BlockNumber, + to: BlockNumber, + filter: &Arc>, + ) -> Result<(Vec>, BlockNumber), Error> { + if !filter.subgraph_filter.is_empty() { + return self + .subgraph_triggers(Logger::root(slog::Discard, o!()), from, to, filter) + .await; + } + + self.adapter + .scan_triggers(from, to, &filter.chain_filter) + .await + } + + pub async fn triggers_in_block( + &self, + logger: &Logger, + block: C::Block, + filter: &C::TriggerFilter, + ) -> Result, Error> { + self.adapter.triggers_in_block(logger, block, filter).await + } + + pub async fn is_on_main_chain(&self, ptr: BlockPtr) -> Result { + self.adapter.is_on_main_chain(ptr).await + } + + pub async fn parent_ptr(&self, block: &BlockPtr) -> Result, Error> { + self.adapter.parent_ptr(block).await + } + + pub async fn chain_head_ptr(&self) -> Result, Error> { + self.adapter.chain_head_ptr().await + } + + // TODO(krishna): Currently this is a mock implementation of subgraph triggers. + // This will be replaced with the actual implementation which will use the filters to + // query the database of the source subgraph and return the entity triggers. + async fn subgraph_triggers( + &self, + logger: Logger, + from: BlockNumber, + to: BlockNumber, + filter: &Arc>, + ) -> Result<(Vec>, BlockNumber), Error> { + let logger2 = logger.cheap_clone(); + let adapter = self.adapter.clone(); + // let to_ptr = eth.next_existing_ptr_to_number(&logger, to).await?; + // let to = to_ptr.block_number(); + + let first_filter = filter.subgraph_filter.first().unwrap(); + + let blocks = adapter + .load_blocks_by_numbers(logger, HashSet::from_iter(from..=to)) + .await? + .into_iter() + .map(|block| { + let trigger_data = vec![Self::create_mock_subgraph_trigger(first_filter, &block)]; + BlockWithTriggers::new_with_subgraph_triggers(block, trigger_data, &logger2) + }) + .collect(); + + Ok((blocks, to)) + } + + fn create_mock_subgraph_trigger( + filter: &SubgraphFilter, + block: &C::Block, + ) -> subgraph::TriggerData { + let mock_entity = Self::create_mock_entity(block); + subgraph::TriggerData { + source: filter.subgraph.clone(), + entity: mock_entity, + entity_type: filter.entities.first().unwrap().clone(), + } + } + + fn create_mock_entity(block: &C::Block) -> Entity { + let id = DeploymentHash::new("test").unwrap(); + let data_schema = InputSchema::parse_latest( + "type Block @entity { id: Bytes!, number: BigInt!, hash: Bytes! }", + id.clone(), + ) + .unwrap(); + + let block = block.ptr(); + let hash = Value::Bytes(scalar::Bytes::from(block.hash_slice().to_vec())); + let data = data_schema + .make_entity(vec![ + ("id".into(), hash.clone()), + ( + "number".into(), + Value::BigInt(scalar::BigInt::from(block.block_number())), + ), + ("hash".into(), hash), + ]) + .unwrap(); + + data + } } #[async_trait] @@ -298,6 +488,15 @@ pub trait TriggersAdapter: Send + Sync { /// Get pointer to parent of `block`. This is called when reverting `block`. async fn parent_ptr(&self, block: &BlockPtr) -> Result, Error>; + + /// Get pointer to parent of `block`. This is called when reverting `block`. + async fn chain_head_ptr(&self) -> Result, Error>; + + async fn load_blocks_by_numbers( + &self, + logger: Logger, + block_numbers: HashSet, + ) -> Result>; } #[async_trait] diff --git a/graph/src/blockchain/mock.rs b/graph/src/blockchain/mock.rs index c89eca95727..21042921cb6 100644 --- a/graph/src/blockchain/mock.rs +++ b/graph/src/blockchain/mock.rs @@ -2,22 +2,23 @@ use crate::{ bail, components::{ link_resolver::LinkResolver, - store::{BlockNumber, DeploymentCursorTracker, DeploymentLocator}, + store::{BlockNumber, DeploymentCursorTracker, DeploymentLocator, WritableStore}, subgraph::InstanceDSTemplateInfo, }, data::subgraph::UnifiedMappingApiVersion, - prelude::{BlockHash, DataSourceTemplateInfo}, + prelude::{BlockHash, DataSourceTemplateInfo, DeploymentHash}, }; -use anyhow::Error; +use anyhow::{Error, Result}; use async_trait::async_trait; use serde::Deserialize; +use slog::Logger; use std::{collections::HashSet, convert::TryFrom, sync::Arc}; use super::{ block_stream::{self, BlockStream, FirehoseCursor}, client::ChainClient, BlockIngestor, BlockTime, EmptyNodeCapabilities, HostFn, IngestorError, MappingTriggerTrait, - NoopDecoderHook, TriggerWithHandler, + NoopDecoderHook, Trigger, TriggerFilterWrapper, TriggerWithHandler, }; use super::{ @@ -218,31 +219,49 @@ impl UnresolvedDataSourceTemplate for MockUnresolvedDataSource pub struct MockTriggersAdapter; #[async_trait] -impl TriggersAdapter for MockTriggersAdapter { +impl TriggersAdapter for MockTriggersAdapter { async fn ancestor_block( &self, _ptr: BlockPtr, _offset: BlockNumber, _root: Option, - ) -> Result, Error> { + ) -> Result, Error> { todo!() } + async fn load_blocks_by_numbers( + &self, + _logger: Logger, + _block_numbers: HashSet, + ) -> Result> { + unimplemented!() + } + + async fn chain_head_ptr(&self) -> Result, Error> { + unimplemented!() + } + async fn scan_triggers( &self, - _from: crate::components::store::BlockNumber, - _to: crate::components::store::BlockNumber, - _filter: &C::TriggerFilter, - ) -> Result<(Vec>, BlockNumber), Error> { - todo!() + from: crate::components::store::BlockNumber, + to: crate::components::store::BlockNumber, + filter: &MockTriggerFilter, + ) -> Result< + ( + Vec>, + BlockNumber, + ), + Error, + > { + blocks_with_triggers(from, to, filter).await } async fn triggers_in_block( &self, _logger: &slog::Logger, - _block: C::Block, - _filter: &C::TriggerFilter, - ) -> Result, Error> { + _block: MockBlock, + _filter: &MockTriggerFilter, + ) -> Result, Error> { todo!() } @@ -255,6 +274,26 @@ impl TriggersAdapter for MockTriggersAdapter { } } +async fn blocks_with_triggers( + _from: crate::components::store::BlockNumber, + to: crate::components::store::BlockNumber, + _filter: &MockTriggerFilter, +) -> Result< + ( + Vec>, + BlockNumber, + ), + Error, +> { + Ok(( + vec![BlockWithTriggers { + block: MockBlock { number: 0 }, + trigger_data: vec![Trigger::Chain(MockTriggerData)], + }], + to, + )) +} + #[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)] pub struct MockTriggerData; @@ -347,7 +386,8 @@ impl Blockchain for MockBlockchain { _deployment: DeploymentLocator, _store: impl DeploymentCursorTracker, _start_blocks: Vec, - _filter: Arc, + _source_subgraph_stores: Vec<(DeploymentHash, Arc)>, + _filter: Arc>, _unified_api_version: UnifiedMappingApiVersion, ) -> Result>, Error> { todo!() diff --git a/graph/src/blockchain/mod.rs b/graph/src/blockchain/mod.rs index 73cac816728..d1e07d2d36e 100644 --- a/graph/src/blockchain/mod.rs +++ b/graph/src/blockchain/mod.rs @@ -20,13 +20,15 @@ use crate::{ components::{ adapter::ChainId, metrics::subgraph::SubgraphInstanceMetrics, - store::{DeploymentCursorTracker, DeploymentLocator, StoredDynamicDataSource}, + store::{ + DeploymentCursorTracker, DeploymentLocator, StoredDynamicDataSource, WritableStore, + }, subgraph::{HostMetrics, InstanceDSTemplateInfo, MappingError}, trigger_processor::RunnableTriggers, }, data::subgraph::{UnifiedMappingApiVersion, MIN_SPEC_VERSION}, - data_source::{self, DataSourceTemplateInfo}, - prelude::DataSourceContext, + data_source::{self, subgraph, DataSourceTemplateInfo}, + prelude::{DataSourceContext, DeploymentHash}, runtime::{gas::GasCounter, AscHeap, HostExportError}, }; use crate::{ @@ -189,7 +191,8 @@ pub trait Blockchain: Debug + Sized + Send + Sync + Unpin + 'static { deployment: DeploymentLocator, store: impl DeploymentCursorTracker, start_blocks: Vec, - filter: Arc, + source_subgraph_stores: Vec<(DeploymentHash, Arc)>, + filter: Arc>, unified_api_version: UnifiedMappingApiVersion, ) -> Result>, Error>; @@ -247,6 +250,37 @@ impl From for IngestorError { } } +#[derive(Debug)] +pub struct TriggerFilterWrapper { + pub chain_filter: Arc, + pub subgraph_filter: Vec, +} + +#[derive(Clone, Debug)] +pub struct SubgraphFilter { + pub subgraph: DeploymentHash, + pub start_block: BlockNumber, + pub entities: Vec, +} + +impl TriggerFilterWrapper { + pub fn new(filter: C::TriggerFilter, subgraph_filter: Vec) -> Self { + Self { + chain_filter: Arc::new(filter), + subgraph_filter, + } + } +} + +impl Clone for TriggerFilterWrapper { + fn clone(&self) -> Self { + Self { + chain_filter: self.chain_filter.cheap_clone(), + subgraph_filter: self.subgraph_filter.clone(), + } + } +} + pub trait TriggerFilter: Default + Clone + Send + Sync { fn from_data_sources<'a>( data_sources: impl Iterator + Clone, @@ -370,6 +404,76 @@ pub trait UnresolvedDataSource: ) -> Result; } +#[derive(Debug)] +pub enum Trigger { + Chain(C::TriggerData), + Subgraph(subgraph::TriggerData), +} + +impl Trigger { + pub fn as_chain(&self) -> Option<&C::TriggerData> { + match self { + Trigger::Chain(data) => Some(data), + _ => None, + } + } + + pub fn as_subgraph(&self) -> Option<&subgraph::TriggerData> { + match self { + Trigger::Subgraph(data) => Some(data), + _ => None, + } + } +} + +impl Eq for Trigger where C::TriggerData: Eq {} + +impl PartialEq for Trigger +where + C::TriggerData: PartialEq, +{ + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Trigger::Chain(data1), Trigger::Chain(data2)) => data1 == data2, + (Trigger::Subgraph(a), Trigger::Subgraph(b)) => a == b, + _ => false, + } + } +} + +impl Clone for Trigger +where + C::TriggerData: Clone, +{ + fn clone(&self) -> Self { + match self { + Trigger::Chain(data) => Trigger::Chain(data.clone()), + Trigger::Subgraph(data) => Trigger::Subgraph(data.clone()), + } + } +} + +// TODO(krishna): Proper ordering for triggers +impl Ord for Trigger +where + C::TriggerData: Ord, +{ + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + match (self, other) { + (Trigger::Chain(data1), Trigger::Chain(data2)) => data1.cmp(data2), + (Trigger::Subgraph(_), Trigger::Chain(_)) => std::cmp::Ordering::Greater, + (Trigger::Chain(_), Trigger::Subgraph(_)) => std::cmp::Ordering::Less, + (Trigger::Subgraph(_), Trigger::Subgraph(_)) => std::cmp::Ordering::Equal, + } + } +} + +impl PartialOrd for Trigger { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + pub trait TriggerData { /// If there is an error when processing this trigger, this will called to add relevant context. /// For example an useful return is: `"block # (), transaction ". @@ -489,6 +593,7 @@ impl FromStr for BlockchainKind { "cosmos" => Ok(BlockchainKind::Cosmos), "substreams" => Ok(BlockchainKind::Substreams), "starknet" => Ok(BlockchainKind::Starknet), + "subgraph" => Ok(BlockchainKind::Ethereum), // TODO(krishna): We should detect the blockchain kind from the source subgraph _ => Err(anyhow!("unknown blockchain kind {}", s)), } } diff --git a/graph/src/blockchain/polling_block_stream.rs b/graph/src/blockchain/polling_block_stream.rs index ce3fdf2a4ef..5b37cd303b4 100644 --- a/graph/src/blockchain/polling_block_stream.rs +++ b/graph/src/blockchain/polling_block_stream.rs @@ -9,9 +9,9 @@ use std::time::Duration; use super::block_stream::{ BlockStream, BlockStreamError, BlockStreamEvent, BlockWithTriggers, ChainHeadUpdateStream, - FirehoseCursor, TriggersAdapter, BUFFERED_BLOCK_STREAM_SIZE, + FirehoseCursor, TriggersAdapterWrapper, BUFFERED_BLOCK_STREAM_SIZE, }; -use super::{Block, BlockPtr, Blockchain}; +use super::{Block, BlockPtr, Blockchain, TriggerFilterWrapper}; use crate::components::store::BlockNumber; use crate::data::subgraph::UnifiedMappingApiVersion; @@ -79,13 +79,13 @@ where C: Blockchain, { chain_store: Arc, - adapter: Arc>, + adapter: Arc>, node_id: NodeId, subgraph_id: DeploymentHash, // This is not really a block number, but the (unsigned) difference // between two block numbers reorg_threshold: BlockNumber, - filter: Arc, + filter: Arc>, start_blocks: Vec, logger: Logger, previous_triggers_per_block: f64, @@ -146,10 +146,10 @@ where pub fn new( chain_store: Arc, chain_head_update_stream: ChainHeadUpdateStream, - adapter: Arc>, + adapter: Arc>, node_id: NodeId, subgraph_id: DeploymentHash, - filter: Arc, + filter: Arc>, start_blocks: Vec, reorg_threshold: BlockNumber, logger: Logger, @@ -218,7 +218,7 @@ where let max_block_range_size = self.max_block_range_size; // Get pointers from database for comparison - let head_ptr_opt = ctx.chain_store.chain_head_ptr().await?; + let head_ptr_opt = ctx.adapter.chain_head_ptr().await?; let subgraph_ptr = self.current_block.clone(); // If chain head ptr is not set yet @@ -469,7 +469,11 @@ where // Note that head_ancestor is a child of subgraph_ptr. let block = self .adapter - .triggers_in_block(&self.logger, head_ancestor, &self.filter) + .triggers_in_block( + &self.logger, + head_ancestor, + &self.filter.chain_filter.clone(), + ) .await?; Ok(ReconciliationStep::ProcessDescendantBlocks(vec![block], 1)) } else { diff --git a/graph/src/blockchain/types.rs b/graph/src/blockchain/types.rs index 931e52e2dd5..7c670d4cdd6 100644 --- a/graph/src/blockchain/types.rs +++ b/graph/src/blockchain/types.rs @@ -31,6 +31,10 @@ impl BlockHash { &self.0 } + pub fn as_h256(&self) -> H256 { + H256::from_slice(self.as_slice()) + } + /// Encodes the block hash into a hexadecimal string **without** a "0x" /// prefix. Hashes are stored in the database in this format when the /// schema uses `text` columns, which is a legacy and such columns diff --git a/graph/src/components/subgraph/instance.rs b/graph/src/components/subgraph/instance.rs index 470e50334d3..889690c3916 100644 --- a/graph/src/components/subgraph/instance.rs +++ b/graph/src/components/subgraph/instance.rs @@ -20,6 +20,7 @@ impl From<&DataSourceTemplate> for InstanceDSTemplate { match value { DataSourceTemplate::Onchain(ds) => Self::Onchain(ds.info()), DataSourceTemplate::Offchain(ds) => Self::Offchain(ds.clone()), + DataSourceTemplate::Subgraph(_) => todo!(), // TODO(krishna) } } } diff --git a/graph/src/components/subgraph/proof_of_indexing/online.rs b/graph/src/components/subgraph/proof_of_indexing/online.rs index caaa76f0a76..f90fac969cf 100644 --- a/graph/src/components/subgraph/proof_of_indexing/online.rs +++ b/graph/src/components/subgraph/proof_of_indexing/online.rs @@ -146,8 +146,8 @@ impl BlockEventStream { fn write(&mut self, event: &ProofOfIndexingEvent<'_>) { let children = &[ 1, // kvp -> v - 0, // PoICausalityRegion.blocks: Vec - self.block_index, // Vec -> [i] + 0, // PoICausalityRegion.blocks: Result> + self.block_index, // Result> -> [i] 0, // Block.events -> Vec self.vec_length, ]; diff --git a/graph/src/data/subgraph/api_version.rs b/graph/src/data/subgraph/api_version.rs index e626e9f1dbc..43ee639007c 100644 --- a/graph/src/data/subgraph/api_version.rs +++ b/graph/src/data/subgraph/api_version.rs @@ -54,6 +54,9 @@ pub const SPEC_VERSION_1_1_0: Version = Version::new(1, 1, 0); // Enables eth call declarations and indexed arguments(topics) filtering in manifest pub const SPEC_VERSION_1_2_0: Version = Version::new(1, 2, 0); +// Enables subgraphs as datasource +pub const SPEC_VERSION_1_3_0: Version = Version::new(1, 3, 0); + // The latest spec version available pub const LATEST_VERSION: &Version = &SPEC_VERSION_1_2_0; diff --git a/graph/src/data/subgraph/mod.rs b/graph/src/data/subgraph/mod.rs index 52b0f4dfed1..941764b9a12 100644 --- a/graph/src/data/subgraph/mod.rs +++ b/graph/src/data/subgraph/mod.rs @@ -33,7 +33,7 @@ use web3::types::Address; use crate::{ bail, - blockchain::{BlockPtr, Blockchain, DataSource as _}, + blockchain::{BlockPtr, Blockchain}, components::{ link_resolver::LinkResolver, store::{StoreError, SubgraphStore}, @@ -140,6 +140,10 @@ impl DeploymentHash { link: format!("/ipfs/{}", self), } } + + pub fn to_bytes(&self) -> Vec { + self.0.as_bytes().to_vec() + } } impl Deref for DeploymentHash { @@ -719,7 +723,7 @@ impl UnvalidatedSubgraphManifest { .0 .data_sources .iter() - .filter_map(|d| Some(d.as_onchain()?.network()?.to_string())) + .filter_map(|d| Some(d.network()?.to_string())) .collect::>(); networks.sort(); networks.dedup(); @@ -765,11 +769,9 @@ impl SubgraphManifest { max_spec_version: semver::Version, ) -> Result { let unresolved = UnresolvedSubgraphManifest::parse(id, raw)?; - let resolved = unresolved .resolve(resolver, logger, max_spec_version) .await?; - Ok(resolved) } @@ -777,14 +779,14 @@ impl SubgraphManifest { // Assume the manifest has been validated, ensuring network names are homogenous self.data_sources .iter() - .find_map(|d| Some(d.as_onchain()?.network()?.to_string())) + .find_map(|d| Some(d.network()?.to_string())) .expect("Validated manifest does not have a network defined on any datasource") } pub fn start_blocks(&self) -> Vec { self.data_sources .iter() - .filter_map(|d| Some(d.as_onchain()?.start_block())) + .filter_map(|d| d.start_block()) .collect() } diff --git a/graph/src/data_source/mod.rs b/graph/src/data_source/mod.rs index a38148b25fe..1d255b1563c 100644 --- a/graph/src/data_source/mod.rs +++ b/graph/src/data_source/mod.rs @@ -1,6 +1,8 @@ pub mod causality_region; pub mod offchain; +pub mod subgraph; +pub use self::DataSource as DataSourceEnum; pub use causality_region::CausalityRegion; #[cfg(test)] @@ -17,6 +19,7 @@ use crate::{ store::{BlockNumber, StoredDynamicDataSource}, }, data_source::offchain::OFFCHAIN_KINDS, + data_source::subgraph::SUBGRAPH_DS_KIND, prelude::{CheapClone as _, DataSourceContext}, schema::{EntityType, InputSchema}, }; @@ -35,6 +38,7 @@ use thiserror::Error; pub enum DataSource { Onchain(C::DataSource), Offchain(offchain::DataSource), + Subgraph(subgraph::DataSource), } #[derive(Error, Debug)] @@ -89,6 +93,23 @@ impl DataSource { match self { Self::Onchain(ds) => Some(ds), Self::Offchain(_) => None, + Self::Subgraph(_) => None, + } + } + + pub fn as_subgraph(&self) -> Option<&subgraph::DataSource> { + match self { + Self::Onchain(_) => None, + Self::Offchain(_) => None, + Self::Subgraph(ds) => Some(ds), + } + } + + pub fn is_chain_based(&self) -> bool { + match self { + Self::Onchain(_) => true, + Self::Offchain(_) => false, + Self::Subgraph(_) => true, } } @@ -96,6 +117,23 @@ impl DataSource { match self { Self::Onchain(_) => None, Self::Offchain(ds) => Some(ds), + Self::Subgraph(_) => None, + } + } + + pub fn network(&self) -> Option<&str> { + match self { + DataSourceEnum::Onchain(ds) => ds.network(), + DataSourceEnum::Offchain(_) => None, + DataSourceEnum::Subgraph(ds) => ds.network(), + } + } + + pub fn start_block(&self) -> Option { + match self { + DataSourceEnum::Onchain(ds) => Some(ds.start_block()), + DataSourceEnum::Offchain(_) => None, + DataSourceEnum::Subgraph(ds) => Some(ds.source.start_block), } } @@ -111,6 +149,7 @@ impl DataSource { match self { Self::Onchain(ds) => ds.address().map(ToOwned::to_owned), Self::Offchain(ds) => ds.address(), + Self::Subgraph(ds) => ds.address(), } } @@ -118,6 +157,7 @@ impl DataSource { match self { Self::Onchain(ds) => ds.name(), Self::Offchain(ds) => &ds.name, + Self::Subgraph(ds) => &ds.name, } } @@ -125,6 +165,7 @@ impl DataSource { match self { Self::Onchain(ds) => ds.kind().to_owned(), Self::Offchain(ds) => ds.kind.to_string(), + Self::Subgraph(ds) => ds.kind.clone(), } } @@ -132,6 +173,7 @@ impl DataSource { match self { Self::Onchain(ds) => ds.min_spec_version(), Self::Offchain(ds) => ds.min_spec_version(), + Self::Subgraph(ds) => ds.min_spec_version(), } } @@ -139,6 +181,7 @@ impl DataSource { match self { Self::Onchain(ds) => ds.end_block(), Self::Offchain(_) => None, + Self::Subgraph(_) => None, } } @@ -146,6 +189,7 @@ impl DataSource { match self { Self::Onchain(ds) => ds.creation_block(), Self::Offchain(ds) => ds.creation_block, + Self::Subgraph(ds) => ds.creation_block, } } @@ -153,6 +197,7 @@ impl DataSource { match self { Self::Onchain(ds) => ds.context(), Self::Offchain(ds) => ds.context.clone(), + Self::Subgraph(ds) => ds.context.clone(), } } @@ -160,6 +205,7 @@ impl DataSource { match self { Self::Onchain(ds) => ds.api_version(), Self::Offchain(ds) => ds.mapping.api_version.clone(), + Self::Subgraph(ds) => ds.mapping.api_version.clone(), } } @@ -167,6 +213,7 @@ impl DataSource { match self { Self::Onchain(ds) => ds.runtime(), Self::Offchain(ds) => Some(ds.mapping.runtime.cheap_clone()), + Self::Subgraph(ds) => Some(ds.mapping.runtime.cheap_clone()), } } @@ -176,6 +223,7 @@ impl DataSource { // been enforced. Self::Onchain(_) => EntityTypeAccess::Any, Self::Offchain(ds) => EntityTypeAccess::Restriced(ds.mapping.entities.clone()), + Self::Subgraph(_) => EntityTypeAccess::Any, } } @@ -183,6 +231,7 @@ impl DataSource { match self { Self::Onchain(ds) => ds.handler_kinds(), Self::Offchain(ds) => vec![ds.handler_kind()].into_iter().collect(), + Self::Subgraph(ds) => vec![ds.handler_kind()].into_iter().collect(), } } @@ -190,6 +239,7 @@ impl DataSource { match self { Self::Onchain(ds) => ds.has_declared_calls(), Self::Offchain(_) => false, + Self::Subgraph(_) => false, } } @@ -207,8 +257,15 @@ impl DataSource { (Self::Offchain(ds), TriggerData::Offchain(trigger)) => { Ok(ds.match_and_decode(trigger)) } + (Self::Subgraph(ds), TriggerData::Subgraph(trigger)) => { + Ok(ds.match_and_decode(block, trigger)) + } (Self::Onchain(_), TriggerData::Offchain(_)) - | (Self::Offchain(_), TriggerData::Onchain(_)) => Ok(None), + | (Self::Offchain(_), TriggerData::Onchain(_)) + | (Self::Onchain(_), TriggerData::Subgraph(_)) + | (Self::Offchain(_), TriggerData::Subgraph(_)) + | (Self::Subgraph(_), TriggerData::Onchain(_)) + | (Self::Subgraph(_), TriggerData::Offchain(_)) => Ok(None), } } @@ -224,6 +281,7 @@ impl DataSource { match self { Self::Onchain(ds) => ds.as_stored_dynamic_data_source(), Self::Offchain(ds) => ds.as_stored_dynamic_data_source(), + Self::Subgraph(_) => todo!(), // TODO(krishna) } } @@ -240,6 +298,7 @@ impl DataSource { offchain::DataSource::from_stored_dynamic_data_source(template, stored) .map(DataSource::Offchain) } + DataSourceTemplate::Subgraph(_) => todo!(), // TODO(krishna) } } @@ -247,6 +306,7 @@ impl DataSource { match self { Self::Onchain(ds) => ds.validate(spec_version), Self::Offchain(_) => vec![], + Self::Subgraph(_) => vec![], // TODO(krishna) } } @@ -254,6 +314,7 @@ impl DataSource { match self { Self::Onchain(_) => CausalityRegion::ONCHAIN, Self::Offchain(ds) => ds.causality_region, + Self::Subgraph(_) => CausalityRegion::ONCHAIN, } } } @@ -262,6 +323,7 @@ impl DataSource { pub enum UnresolvedDataSource { Onchain(C::UnresolvedDataSource), Offchain(offchain::UnresolvedDataSource), + Subgraph(subgraph::UnresolvedDataSource), } impl UnresolvedDataSource { @@ -276,6 +338,10 @@ impl UnresolvedDataSource { .resolve(resolver, logger, manifest_idx) .await .map(DataSource::Onchain), + Self::Subgraph(unresolved) => unresolved + .resolve(resolver, logger, manifest_idx) + .await + .map(DataSource::Subgraph), Self::Offchain(_unresolved) => { anyhow::bail!( "static file data sources are not yet supported, \\ @@ -299,6 +365,7 @@ pub struct DataSourceTemplateInfo { pub enum DataSourceTemplate { Onchain(C::DataSourceTemplate), Offchain(offchain::DataSourceTemplate), + Subgraph(subgraph::DataSourceTemplate), } impl DataSourceTemplate { @@ -306,6 +373,7 @@ impl DataSourceTemplate { match self { DataSourceTemplate::Onchain(template) => template.info(), DataSourceTemplate::Offchain(template) => template.clone().into(), + DataSourceTemplate::Subgraph(template) => template.clone().into(), } } @@ -313,6 +381,7 @@ impl DataSourceTemplate { match self { Self::Onchain(ds) => Some(ds), Self::Offchain(_) => None, + Self::Subgraph(_) => todo!(), // TODO(krishna) } } @@ -320,6 +389,7 @@ impl DataSourceTemplate { match self { Self::Onchain(_) => None, Self::Offchain(t) => Some(t), + Self::Subgraph(_) => todo!(), // TODO(krishna) } } @@ -327,6 +397,7 @@ impl DataSourceTemplate { match self { Self::Onchain(ds) => Some(ds), Self::Offchain(_) => None, + Self::Subgraph(_) => todo!(), // TODO(krishna) } } @@ -334,6 +405,7 @@ impl DataSourceTemplate { match self { Self::Onchain(ds) => &ds.name(), Self::Offchain(ds) => &ds.name, + Self::Subgraph(ds) => &ds.name, } } @@ -341,6 +413,7 @@ impl DataSourceTemplate { match self { Self::Onchain(ds) => ds.api_version(), Self::Offchain(ds) => ds.mapping.api_version.clone(), + Self::Subgraph(ds) => ds.mapping.api_version.clone(), } } @@ -348,6 +421,7 @@ impl DataSourceTemplate { match self { Self::Onchain(ds) => ds.runtime(), Self::Offchain(ds) => Some(ds.mapping.runtime.clone()), + Self::Subgraph(ds) => Some(ds.mapping.runtime.clone()), } } @@ -355,6 +429,7 @@ impl DataSourceTemplate { match self { Self::Onchain(ds) => ds.manifest_idx(), Self::Offchain(ds) => ds.manifest_idx, + Self::Subgraph(ds) => ds.manifest_idx, } } @@ -362,6 +437,7 @@ impl DataSourceTemplate { match self { Self::Onchain(ds) => ds.kind().to_string(), Self::Offchain(ds) => ds.kind.to_string(), + Self::Subgraph(ds) => ds.kind.clone(), } } } @@ -370,6 +446,7 @@ impl DataSourceTemplate { pub enum UnresolvedDataSourceTemplate { Onchain(C::UnresolvedDataSourceTemplate), Offchain(offchain::UnresolvedDataSourceTemplate), + Subgraph(subgraph::UnresolvedDataSourceTemplate), } impl Default for UnresolvedDataSourceTemplate { @@ -395,6 +472,10 @@ impl UnresolvedDataSourceTemplate { .resolve(resolver, logger, manifest_idx, schema) .await .map(DataSourceTemplate::Offchain), + Self::Subgraph(ds) => ds + .resolve(resolver, logger, manifest_idx) + .await + .map(DataSourceTemplate::Subgraph), } } } @@ -475,6 +556,7 @@ impl TriggerWithHandler { pub enum TriggerData { Onchain(C::TriggerData), Offchain(offchain::TriggerData), + Subgraph(subgraph::TriggerData), } impl TriggerData { @@ -482,6 +564,7 @@ impl TriggerData { match self { Self::Onchain(trigger) => trigger.error_context(), Self::Offchain(trigger) => format!("{:?}", trigger.source), + Self::Subgraph(trigger) => format!("{:?}", trigger.source), } } } @@ -490,6 +573,7 @@ impl TriggerData { pub enum MappingTrigger { Onchain(C::MappingTrigger), Offchain(offchain::TriggerData), + Subgraph(subgraph::TriggerData), } impl MappingTrigger { @@ -497,6 +581,7 @@ impl MappingTrigger { match self { Self::Onchain(trigger) => Some(trigger.error_context()), Self::Offchain(_) => None, // TODO: Add error context for offchain triggers + Self::Subgraph(_) => None, // TODO(krishna) } } @@ -504,6 +589,7 @@ impl MappingTrigger { match self { Self::Onchain(trigger) => Some(trigger), Self::Offchain(_) => None, + Self::Subgraph(_) => None, // TODO(krishna) } } } @@ -515,6 +601,7 @@ macro_rules! clone_data_source { match self { Self::Onchain(ds) => Self::Onchain(ds.clone()), Self::Offchain(ds) => Self::Offchain(ds.clone()), + Self::Subgraph(ds) => Self::Subgraph(ds.clone()), } } } @@ -541,6 +628,10 @@ macro_rules! deserialize_data_source { offchain::$t::deserialize(map.into_deserializer()) .map_err(serde::de::Error::custom) .map($t::Offchain) + } else if SUBGRAPH_DS_KIND == kind { + subgraph::$t::deserialize(map.into_deserializer()) + .map_err(serde::de::Error::custom) + .map($t::Subgraph) } else if (&C::KIND.to_string() == kind) || C::ALIASES.contains(&kind) { C::$t::deserialize(map.into_deserializer()) .map_err(serde::de::Error::custom) diff --git a/graph/src/data_source/subgraph.rs b/graph/src/data_source/subgraph.rs new file mode 100644 index 00000000000..dba43786438 --- /dev/null +++ b/graph/src/data_source/subgraph.rs @@ -0,0 +1,305 @@ +use crate::{ + blockchain::{Block, Blockchain}, + components::{ + link_resolver::LinkResolver, + store::{BlockNumber, Entity}, + }, + data::{subgraph::SPEC_VERSION_1_3_0, value::Word}, + data_source, + prelude::{DataSourceContext, DeploymentHash, Link}, +}; +use anyhow::{Context, Error}; +use serde::Deserialize; +use slog::{info, Logger}; +use std::{fmt, sync::Arc}; + +use super::{DataSourceTemplateInfo, TriggerWithHandler}; + +pub const SUBGRAPH_DS_KIND: &str = "subgraph"; + +const ENTITY_HANDLER_KINDS: &str = "entity"; + +#[derive(Debug, Clone)] +pub struct DataSource { + pub kind: String, + pub name: String, + pub network: String, + pub manifest_idx: u32, + pub source: Source, + pub mapping: Mapping, + pub context: Arc>, + pub creation_block: Option, +} + +impl DataSource { + pub fn new( + kind: String, + name: String, + network: String, + manifest_idx: u32, + source: Source, + mapping: Mapping, + context: Arc>, + creation_block: Option, + ) -> Self { + Self { + kind, + name, + network, + manifest_idx, + source, + mapping, + context, + creation_block, + } + } + + pub fn min_spec_version(&self) -> semver::Version { + SPEC_VERSION_1_3_0 + } + + pub fn handler_kind(&self) -> &str { + ENTITY_HANDLER_KINDS + } + + pub fn network(&self) -> Option<&str> { + Some(&self.network) + } + + pub fn match_and_decode( + &self, + block: &Arc, + trigger: &TriggerData, + ) -> Option>> { + if self.source.address != trigger.source { + return None; + } + + let trigger_ref = self.mapping.handlers.iter().find_map(|handler| { + if handler.entity != trigger.entity_type { + return None; + } + + Some(TriggerWithHandler::new( + data_source::MappingTrigger::Subgraph(trigger.clone()), + handler.handler.clone(), + block.ptr(), + block.timestamp(), + )) + }); + + return trigger_ref; + } + + pub fn address(&self) -> Option> { + Some(self.source.address().to_bytes()) + } + + pub fn source_subgraph(&self) -> DeploymentHash { + self.source.address() + } +} + +pub type Base64 = Word; + +#[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Deserialize)] +pub struct Source { + pub address: DeploymentHash, + #[serde(default)] + pub start_block: BlockNumber, +} + +impl Source { + /// The concept of an address may or not make sense for a subgraph data source, but graph node + /// will use this in a few places where some sort of not necessarily unique id is useful: + /// 1. This is used as the value to be returned to mappings from the `dataSource.address()` host + /// function, so changing this is a breaking change. + /// 2. This is used to match with triggers with hosts in `fn hosts_for_trigger`, so make sure + /// the `source` of the data source is equal the `source` of the `TriggerData`. + pub fn address(&self) -> DeploymentHash { + self.address.clone() + } +} + +#[derive(Clone, Debug)] +pub struct Mapping { + pub language: String, + pub api_version: semver::Version, + pub entities: Vec, + pub handlers: Vec, + pub runtime: Arc>, + pub link: Link, +} + +#[derive(Clone, Debug, Hash, Eq, PartialEq, Deserialize)] +pub struct EntityHandler { + pub handler: String, + pub entity: String, +} + +#[derive(Clone, Debug, Default, Eq, PartialEq, Deserialize)] +pub struct UnresolvedDataSource { + pub kind: String, + pub name: String, + pub network: String, + pub source: UnresolvedSource, + pub mapping: UnresolvedMapping, +} + +#[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Deserialize)] +pub struct UnresolvedSource { + address: DeploymentHash, + #[serde(default)] + start_block: BlockNumber, +} + +#[derive(Clone, Debug, Default, Hash, Eq, PartialEq, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct UnresolvedMapping { + pub api_version: String, + pub language: String, + pub file: Link, + pub handlers: Vec, + pub entities: Vec, +} + +impl UnresolvedDataSource { + #[allow(dead_code)] + pub(super) async fn resolve( + self, + resolver: &Arc, + logger: &Logger, + manifest_idx: u32, + ) -> Result { + info!(logger, "Resolve subgraph data source"; + "name" => &self.name, + "kind" => &self.kind, + "source" => format_args!("{:?}", &self.source), + ); + + let kind = self.kind; + let source = Source { + address: self.source.address, + start_block: self.source.start_block, + }; + + Ok(DataSource { + manifest_idx, + kind, + name: self.name, + network: self.network, + source, + mapping: self.mapping.resolve(resolver, logger).await?, + context: Arc::new(None), + creation_block: None, + }) + } +} + +impl UnresolvedMapping { + pub async fn resolve( + self, + resolver: &Arc, + logger: &Logger, + ) -> Result { + info!(logger, "Resolve subgraph ds mapping"; "link" => &self.file.link); + + Ok(Mapping { + language: self.language, + api_version: semver::Version::parse(&self.api_version)?, + entities: self.entities, + handlers: self.handlers, + runtime: Arc::new(resolver.cat(logger, &self.file).await?), + link: self.file, + }) + } +} + +#[derive(Clone, Debug, Deserialize)] +pub struct UnresolvedDataSourceTemplate { + pub kind: String, + pub network: Option, + pub name: String, + pub mapping: UnresolvedMapping, +} + +#[derive(Clone, Debug)] +pub struct DataSourceTemplate { + pub kind: String, + pub network: Option, + pub name: String, + pub manifest_idx: u32, + pub mapping: Mapping, +} + +impl Into for DataSourceTemplate { + fn into(self) -> DataSourceTemplateInfo { + let DataSourceTemplate { + kind, + network: _, + name, + manifest_idx, + mapping, + } = self; + + DataSourceTemplateInfo { + api_version: mapping.api_version.clone(), + runtime: Some(mapping.runtime), + name, + manifest_idx: Some(manifest_idx), + kind: kind.to_string(), + } + } +} + +impl UnresolvedDataSourceTemplate { + pub async fn resolve( + self, + resolver: &Arc, + logger: &Logger, + manifest_idx: u32, + ) -> Result { + let kind = self.kind; + + let mapping = self + .mapping + .resolve(resolver, logger) + .await + .with_context(|| format!("failed to resolve data source template {}", self.name))?; + + Ok(DataSourceTemplate { + kind, + network: self.network, + name: self.name, + manifest_idx, + mapping, + }) + } +} + +#[derive(Clone, PartialEq, Eq)] +pub struct TriggerData { + pub source: DeploymentHash, + pub entity: Entity, + pub entity_type: String, +} + +impl TriggerData { + pub fn new(source: DeploymentHash, entity: Entity, entity_type: String) -> Self { + Self { + source, + entity, + entity_type, + } + } +} + +impl fmt::Debug for TriggerData { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "TriggerData {{ source: {:?}, entity: {:?} }}", + self.source, self.entity, + ) + } +} diff --git a/graph/src/env/mod.rs b/graph/src/env/mod.rs index af53562528a..3b1ca5a2862 100644 --- a/graph/src/env/mod.rs +++ b/graph/src/env/mod.rs @@ -357,7 +357,7 @@ struct Inner { default = "false" )] allow_non_deterministic_fulltext_search: EnvVarBoolean, - #[envconfig(from = "GRAPH_MAX_SPEC_VERSION", default = "1.2.0")] + #[envconfig(from = "GRAPH_MAX_SPEC_VERSION", default = "1.3.0")] max_spec_version: Version, #[envconfig(from = "GRAPH_LOAD_WINDOW_SIZE", default = "300")] load_window_size_in_secs: u64, diff --git a/runtime/wasm/src/host.rs b/runtime/wasm/src/host.rs index 3ecee7ba753..ebf107fb3ec 100644 --- a/runtime/wasm/src/host.rs +++ b/runtime/wasm/src/host.rs @@ -366,6 +366,7 @@ impl RuntimeHostTrait for RuntimeHost { match self.data_source() { DataSource::Onchain(_) => None, DataSource::Offchain(ds) => ds.done_at(), + DataSource::Subgraph(_) => None, } } @@ -373,6 +374,7 @@ impl RuntimeHostTrait for RuntimeHost { match self.data_source() { DataSource::Onchain(_) => {} DataSource::Offchain(ds) => ds.set_done_at(block), + DataSource::Subgraph(_) => {} } } diff --git a/runtime/wasm/src/module/mod.rs b/runtime/wasm/src/module/mod.rs index ffe4f7aba8e..532f75d2660 100644 --- a/runtime/wasm/src/module/mod.rs +++ b/runtime/wasm/src/module/mod.rs @@ -4,6 +4,7 @@ use std::mem::MaybeUninit; use anyhow::anyhow; use anyhow::Error; use graph::blockchain::Blockchain; +use graph::data_source::subgraph; use graph::util::mem::init_slice; use semver::Version; use wasmtime::AsContext; @@ -69,6 +70,16 @@ impl ToAscPtr for offchain::TriggerData { } } +impl ToAscPtr for subgraph::TriggerData { + fn to_asc_ptr( + self, + heap: &mut H, + gas: &GasCounter, + ) -> Result, HostExportError> { + asc_new(heap, &self.entity.sorted_ref(), gas).map(|ptr| ptr.erase()) + } +} + impl ToAscPtr for MappingTrigger where C::MappingTrigger: ToAscPtr, @@ -81,6 +92,7 @@ where match self { MappingTrigger::Onchain(trigger) => trigger.to_asc_ptr(heap, gas), MappingTrigger::Offchain(trigger) => trigger.to_asc_ptr(heap, gas), + MappingTrigger::Subgraph(trigger) => trigger.to_asc_ptr(heap, gas), } } } diff --git a/store/test-store/tests/chain/ethereum/manifest.rs b/store/test-store/tests/chain/ethereum/manifest.rs index 9089ec4f572..34eaf110f77 100644 --- a/store/test-store/tests/chain/ethereum/manifest.rs +++ b/store/test-store/tests/chain/ethereum/manifest.rs @@ -11,10 +11,10 @@ use graph::data::store::Value; use graph::data::subgraph::schema::SubgraphError; use graph::data::subgraph::{ Prune, LATEST_VERSION, SPEC_VERSION_0_0_4, SPEC_VERSION_0_0_7, SPEC_VERSION_0_0_8, - SPEC_VERSION_0_0_9, SPEC_VERSION_1_0_0, SPEC_VERSION_1_2_0, + SPEC_VERSION_0_0_9, SPEC_VERSION_1_0_0, SPEC_VERSION_1_2_0, SPEC_VERSION_1_3_0, }; use graph::data_source::offchain::OffchainDataSourceKind; -use graph::data_source::DataSourceTemplate; +use graph::data_source::{DataSourceEnum, DataSourceTemplate}; use graph::entity; use graph::env::ENV_VARS; use graph::prelude::web3::types::H256; @@ -166,10 +166,52 @@ specVersion: 0.0.7 let data_source = match &manifest.templates[0] { DataSourceTemplate::Offchain(ds) => ds, DataSourceTemplate::Onchain(_) => unreachable!(), + DataSourceTemplate::Subgraph(_) => unreachable!(), }; assert_eq!(data_source.kind, OffchainDataSourceKind::Ipfs); } +#[tokio::test] +async fn subgraph_ds_manifest() { + let yaml = " +schema: + file: + /: /ipfs/Qmschema +dataSources: + - name: SubgraphSource + kind: subgraph + entities: + - Gravatar + network: mainnet + source: + address: 'QmUVaWpdKgcxBov1jHEa8dr46d2rkVzfHuZFu4fXJ4sFse' + startBlock: 0 + mapping: + apiVersion: 0.0.6 + language: wasm/assemblyscript + entities: + - TestEntity + file: + /: /ipfs/Qmmapping + handlers: + - handler: handleEntity + entity: User +specVersion: 1.3.0 +"; + + let manifest = resolve_manifest(yaml, SPEC_VERSION_1_3_0).await; + + assert_eq!("Qmmanifest", manifest.id.as_str()); + assert_eq!(manifest.data_sources.len(), 1); + let data_source = &manifest.data_sources[0]; + match data_source { + DataSourceEnum::Subgraph(ds) => { + assert_eq!(ds.name, "SubgraphSource"); + } + _ => panic!("Expected a subgraph data source"), + } +} + #[tokio::test] async fn graft_manifest() { const YAML: &str = " diff --git a/tests/runner-tests/subgraph-data-sources/abis/Contract.abi b/tests/runner-tests/subgraph-data-sources/abis/Contract.abi new file mode 100644 index 00000000000..9d9f56b9263 --- /dev/null +++ b/tests/runner-tests/subgraph-data-sources/abis/Contract.abi @@ -0,0 +1,15 @@ +[ + { + "anonymous": false, + "inputs": [ + { + "indexed": false, + "internalType": "string", + "name": "testCommand", + "type": "string" + } + ], + "name": "TestEvent", + "type": "event" + } +] diff --git a/tests/runner-tests/subgraph-data-sources/package.json b/tests/runner-tests/subgraph-data-sources/package.json new file mode 100644 index 00000000000..87537290ad2 --- /dev/null +++ b/tests/runner-tests/subgraph-data-sources/package.json @@ -0,0 +1,13 @@ +{ + "name": "subgraph-data-sources", + "version": "0.1.0", + "scripts": { + "codegen": "graph codegen --skip-migrations", + "create:test": "graph create test/subgraph-data-sources --node $GRAPH_NODE_ADMIN_URI", + "deploy:test": "graph deploy test/subgraph-data-sources --version-label v0.0.1 --ipfs $IPFS_URI --node $GRAPH_NODE_ADMIN_URI" + }, + "devDependencies": { + "@graphprotocol/graph-cli": "0.79.0-alpha-20240711124603-49edf22", + "@graphprotocol/graph-ts": "0.31.0" + } +} diff --git a/tests/runner-tests/subgraph-data-sources/schema.graphql b/tests/runner-tests/subgraph-data-sources/schema.graphql new file mode 100644 index 00000000000..6f97fa65c43 --- /dev/null +++ b/tests/runner-tests/subgraph-data-sources/schema.graphql @@ -0,0 +1,6 @@ +type Data @entity { + id: ID! + foo: String + bar: Int + isTest: Boolean +} diff --git a/tests/runner-tests/subgraph-data-sources/src/mapping.ts b/tests/runner-tests/subgraph-data-sources/src/mapping.ts new file mode 100644 index 00000000000..2e1a5382af3 --- /dev/null +++ b/tests/runner-tests/subgraph-data-sources/src/mapping.ts @@ -0,0 +1,6 @@ +import { Entity, log } from '@graphprotocol/graph-ts'; + +export function handleBlock(content: Entity): void { + let stringContent = content.getString('val'); + log.info('Content: {}', [stringContent]); +} diff --git a/tests/runner-tests/subgraph-data-sources/subgraph.yaml b/tests/runner-tests/subgraph-data-sources/subgraph.yaml new file mode 100644 index 00000000000..1c666e3417e --- /dev/null +++ b/tests/runner-tests/subgraph-data-sources/subgraph.yaml @@ -0,0 +1,19 @@ +specVersion: 1.3.0 +schema: + file: ./schema.graphql +dataSources: + - kind: subgraph + name: Contract + network: test + source: + address: 'QmRFXhvyvbm4z5Lo7z2mN9Ckmo623uuB2jJYbRmAXgYKXJ' + startBlock: 6082461 + mapping: + apiVersion: 0.0.7 + language: wasm/assemblyscript + entities: + - Gravatar + handlers: + - handler: handleBlock + entity: User + file: ./src/mapping.ts diff --git a/tests/runner-tests/yarn.lock b/tests/runner-tests/yarn.lock index 50e0c2b471f..9f3bdae834d 100644 --- a/tests/runner-tests/yarn.lock +++ b/tests/runner-tests/yarn.lock @@ -349,6 +349,40 @@ which "2.0.2" yaml "1.10.2" +"@graphprotocol/graph-cli@0.79.0-alpha-20240711124603-49edf22": + version "0.79.0-alpha-20240711124603-49edf22" + resolved "https://registry.yarnpkg.com/@graphprotocol/graph-cli/-/graph-cli-0.79.0-alpha-20240711124603-49edf22.tgz#4e3f6201932a0b68ce64d6badd8432cf2bead3c2" + integrity sha512-fZrdPiFbbbBVMnvsjfKA+j48WzzquaHQIpozBqnUKRPCV1n1NenIaq2nH16mlMwovRIS7AAIVCpa0QYQuPzw7Q== + dependencies: + "@float-capital/float-subgraph-uncrashable" "^0.0.0-alpha.4" + "@oclif/core" "2.8.6" + "@oclif/plugin-autocomplete" "^2.3.6" + "@oclif/plugin-not-found" "^2.4.0" + "@whatwg-node/fetch" "^0.8.4" + assemblyscript "0.19.23" + binary-install-raw "0.0.13" + chalk "3.0.0" + chokidar "3.5.3" + debug "4.3.4" + docker-compose "0.23.19" + dockerode "2.5.8" + fs-extra "9.1.0" + glob "9.3.5" + gluegun "5.1.6" + graphql "15.5.0" + immutable "4.2.1" + ipfs-http-client "55.0.0" + jayson "4.0.0" + js-yaml "3.14.1" + open "8.4.2" + prettier "3.0.3" + semver "7.4.0" + sync-request "6.1.0" + tmp-promise "3.0.3" + web3-eth-abi "1.7.0" + which "2.0.2" + yaml "1.10.2" + "@graphprotocol/graph-ts@0.30.0": version "0.30.0" resolved "https://registry.npmjs.org/@graphprotocol/graph-ts/-/graph-ts-0.30.0.tgz" @@ -1473,6 +1507,11 @@ defaults@^1.0.3: dependencies: clone "^1.0.2" +define-lazy-prop@^2.0.0: + version "2.0.0" + resolved "https://registry.yarnpkg.com/define-lazy-prop/-/define-lazy-prop-2.0.0.tgz#3f7ae421129bcaaac9bc74905c98a0009ec9ee7f" + integrity sha512-Ds09qNh8yw3khSjiJjiUInaGX9xlqZDY7JVryGxdxV7NPeuqQfplOpQ66yJFZut3jLa5zOwkXw1g9EI2uKh4Og== + delay@^5.0.0: version "5.0.0" resolved "https://registry.npmjs.org/delay/-/delay-5.0.0.tgz" @@ -1545,6 +1584,13 @@ ejs@3.1.6: dependencies: jake "^10.6.1" +ejs@3.1.8: + version "3.1.8" + resolved "https://registry.yarnpkg.com/ejs/-/ejs-3.1.8.tgz#758d32910c78047585c7ef1f92f9ee041c1c190b" + integrity sha512-/sXZeMlhS0ArkfX2Aw780gJzXSMPnKjtspYZv+f3NiKLlubezAHDU5+9xz6gd3/NhG3txQCo6xlglmTS+oTGEQ== + dependencies: + jake "^10.8.5" + ejs@^3.1.8: version "3.1.9" resolved "https://registry.npmjs.org/ejs/-/ejs-3.1.9.tgz" @@ -1996,6 +2042,42 @@ gluegun@5.1.2: which "2.0.2" yargs-parser "^21.0.0" +gluegun@5.1.6: + version "5.1.6" + resolved "https://registry.yarnpkg.com/gluegun/-/gluegun-5.1.6.tgz#74ec13193913dc610f5c1a4039972c70c96a7bad" + integrity sha512-9zbi4EQWIVvSOftJWquWzr9gLX2kaDgPkNR5dYWbM53eVvCI3iKuxLlnKoHC0v4uPoq+Kr/+F569tjoFbA4DSA== + dependencies: + apisauce "^2.1.5" + app-module-path "^2.2.0" + cli-table3 "0.6.0" + colors "1.4.0" + cosmiconfig "7.0.1" + cross-spawn "7.0.3" + ejs "3.1.8" + enquirer "2.3.6" + execa "5.1.1" + fs-jetpack "4.3.1" + lodash.camelcase "^4.3.0" + lodash.kebabcase "^4.1.1" + lodash.lowercase "^4.3.0" + lodash.lowerfirst "^4.3.1" + lodash.pad "^4.5.1" + lodash.padend "^4.6.1" + lodash.padstart "^4.6.1" + lodash.repeat "^4.1.0" + lodash.snakecase "^4.1.1" + lodash.startcase "^4.4.0" + lodash.trim "^4.5.1" + lodash.trimend "^4.5.1" + lodash.trimstart "^4.5.1" + lodash.uppercase "^4.3.0" + lodash.upperfirst "^4.3.1" + ora "4.0.2" + pluralize "^8.0.0" + semver "7.3.5" + which "2.0.2" + yargs-parser "^21.0.0" + graceful-fs@^4.1.6, graceful-fs@^4.2.0: version "4.2.11" resolved "https://registry.yarnpkg.com/graceful-fs/-/graceful-fs-4.2.11.tgz#4183e4e8bf08bb6e05bbb2f7d2e0c8f712ca40e3" @@ -2282,7 +2364,7 @@ is-binary-path@~2.1.0: dependencies: binary-extensions "^2.0.0" -is-docker@^2.0.0: +is-docker@^2.0.0, is-docker@^2.1.1: version "2.2.1" resolved "https://registry.npmjs.org/is-docker/-/is-docker-2.2.1.tgz" integrity sha512-F+i2BKsFrH66iaUFc0woD8sLy8getkwTwtOBjvs56Cx4CgJDeKQeqfz8wAYiSb8JOprWhHH5p77PbmYCvvUuXQ== @@ -2922,6 +3004,15 @@ onetime@^5.1.0, onetime@^5.1.2: dependencies: mimic-fn "^2.1.0" +open@8.4.2: + version "8.4.2" + resolved "https://registry.yarnpkg.com/open/-/open-8.4.2.tgz#5b5ffe2a8f793dcd2aad73e550cb87b59cb084f9" + integrity sha512-7x81NCL719oNbsq/3mh+hVrAWmFuEYUqrq/Iw3kUzH8ReypT9QQ0BLoJS7/G9k6N81XjW4qHWtjWwe/9eLy1EQ== + dependencies: + define-lazy-prop "^2.0.0" + is-docker "^2.1.1" + is-wsl "^2.2.0" + ora@4.0.2: version "4.0.2" resolved "https://registry.npmjs.org/ora/-/ora-4.0.2.tgz" @@ -3042,6 +3133,11 @@ prettier@1.19.1: resolved "https://registry.npmjs.org/prettier/-/prettier-1.19.1.tgz" integrity sha512-s7PoyDv/II1ObgQunCbB9PdLmUcBZcnWOcxDh7O0N/UwDEsHyqkW+Qh28jW+mVuCdx7gLB0BotYI1Y6uI9iyew== +prettier@3.0.3: + version "3.0.3" + resolved "https://registry.yarnpkg.com/prettier/-/prettier-3.0.3.tgz#432a51f7ba422d1469096c0fdc28e235db8f9643" + integrity sha512-L/4pUDMxcNa8R/EthV08Zt42WBO4h1rarVtK0K+QJG0X187OLo7l699jWw0GKuwzkPQ//jMFA/8Xm6Fh3J/DAg== + process-nextick-args@~2.0.0: version "2.0.1" resolved "https://registry.npmjs.org/process-nextick-args/-/process-nextick-args-2.0.1.tgz" diff --git a/tests/src/fixture/ethereum.rs b/tests/src/fixture/ethereum.rs index b20672ce563..5381a530148 100644 --- a/tests/src/fixture/ethereum.rs +++ b/tests/src/fixture/ethereum.rs @@ -7,11 +7,12 @@ use super::{ NoopRuntimeAdapterBuilder, StaticBlockRefetcher, StaticStreamBuilder, Stores, TestChain, }; use graph::blockchain::client::ChainClient; -use graph::blockchain::{BlockPtr, TriggersAdapterSelector}; +use graph::blockchain::{BlockPtr, Trigger, TriggersAdapterSelector}; use graph::cheap_clone::CheapClone; +use graph::data_source::subgraph; use graph::prelude::ethabi::ethereum_types::H256; use graph::prelude::web3::types::{Address, Log, Transaction, H160}; -use graph::prelude::{ethabi, tiny_keccak, LightEthereumBlock, ENV_VARS}; +use graph::prelude::{ethabi, tiny_keccak, DeploymentHash, Entity, LightEthereumBlock, ENV_VARS}; use graph::{blockchain::block_stream::BlockWithTriggers, prelude::ethabi::ethereum_types::U64}; use graph_chain_ethereum::network::EthereumNetworkAdapters; use graph_chain_ethereum::trigger::LogRef; @@ -81,7 +82,10 @@ pub fn genesis() -> BlockWithTriggers { number: Some(U64::from(ptr.number)), ..Default::default() })), - trigger_data: vec![EthereumTrigger::Block(ptr, EthereumBlockTriggerType::End)], + trigger_data: vec![Trigger::Chain(EthereumTrigger::Block( + ptr, + EthereumBlockTriggerType::End, + ))], } } @@ -128,7 +132,10 @@ pub fn empty_block(parent_ptr: BlockPtr, ptr: BlockPtr) -> BlockWithTriggers, payload: impl Into, + source: DeploymentHash, + entity: Entity, + entity_type: &str, +) { + block + .trigger_data + .push(Trigger::Subgraph(subgraph::TriggerData { + source, + entity: entity, + entity_type: entity_type.to_string(), + })); } pub fn push_test_command( @@ -175,12 +199,16 @@ pub fn push_test_command( }); block .trigger_data - .push(EthereumTrigger::Log(LogRef::FullLog(log, None))) + .push(Trigger::Chain(EthereumTrigger::Log(LogRef::FullLog( + log, None, + )))) } pub fn push_test_polling_trigger(block: &mut BlockWithTriggers) { - block.trigger_data.push(EthereumTrigger::Block( - block.ptr(), - EthereumBlockTriggerType::End, - )) + block + .trigger_data + .push(Trigger::Chain(EthereumTrigger::Block( + block.ptr(), + EthereumBlockTriggerType::End, + ))) } diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index ebed1d3a115..b01eb4c7670 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -1,7 +1,7 @@ pub mod ethereum; pub mod substreams; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::marker::PhantomData; use std::sync::Mutex; use std::time::{Duration, Instant}; @@ -14,13 +14,13 @@ use graph::blockchain::block_stream::{ }; use graph::blockchain::{ Block, BlockHash, BlockPtr, Blockchain, BlockchainMap, ChainIdentifier, RuntimeAdapter, - TriggersAdapter, TriggersAdapterSelector, + TriggerFilterWrapper, TriggersAdapter, TriggersAdapterSelector, }; use graph::cheap_clone::CheapClone; use graph::components::adapter::ChainId; use graph::components::link_resolver::{ArweaveClient, ArweaveResolver, FileSizeLimit}; use graph::components::metrics::MetricsRegistry; -use graph::components::store::{BlockStore, DeploymentLocator, EthereumCallCache}; +use graph::components::store::{BlockStore, DeploymentLocator, EthereumCallCache, WritableStore}; use graph::components::subgraph::Settings; use graph::data::graphql::load_manager::LoadManager; use graph::data::query::{Query, QueryTarget}; @@ -211,13 +211,14 @@ impl TestContext { let tp: Box> = Box::new(SubgraphTriggerProcessor {}); self.instance_manager - .build_subgraph_runner( + .build_subgraph_runner_inner( logger, self.env_vars.cheap_clone(), deployment, raw, Some(stop_block.block_number()), tp, + true, ) .await .unwrap() @@ -236,13 +237,14 @@ impl TestContext { ); self.instance_manager - .build_subgraph_runner( + .build_subgraph_runner_inner( logger, self.env_vars.cheap_clone(), deployment, raw, Some(stop_block.block_number()), tp, + true, ) .await .unwrap() @@ -718,14 +720,27 @@ impl BlockStreamBuilder for MutexBlockStreamBuilder { async fn build_polling( &self, - _chain: &C, - _deployment: DeploymentLocator, - _start_blocks: Vec, - _subgraph_current_block: Option, - _filter: Arc<::TriggerFilter>, - _unified_api_version: graph::data::subgraph::UnifiedMappingApiVersion, + chain: &C, + deployment: DeploymentLocator, + start_blocks: Vec, + source_subgraph_stores: Vec<(DeploymentHash, Arc)>, + subgraph_current_block: Option, + filter: Arc>, + unified_api_version: graph::data::subgraph::UnifiedMappingApiVersion, ) -> anyhow::Result>> { - unimplemented!("only firehose mode should be used for tests") + let builder = self.0.lock().unwrap().clone(); + + builder + .build_polling( + chain, + deployment, + start_blocks, + source_subgraph_stores, + subgraph_current_block, + filter, + unified_api_version, + ) + .await } } @@ -783,11 +798,22 @@ where _chain: &C, _deployment: DeploymentLocator, _start_blocks: Vec, - _subgraph_current_block: Option, - _filter: Arc, + _source_subgraph_stores: Vec<(DeploymentHash, Arc)>, + subgraph_current_block: Option, + _filter: Arc>, _unified_api_version: graph::data::subgraph::UnifiedMappingApiVersion, ) -> anyhow::Result>> { - unimplemented!("only firehose mode should be used for tests") + let current_idx = subgraph_current_block.map(|current_block| { + self.chain + .iter() + .enumerate() + .find(|(_, b)| b.ptr() == current_block) + .unwrap() + .0 + }); + Ok(Box::new(StaticStream { + stream: Box::pin(stream_events(self.chain.clone(), current_idx)), + })) } } @@ -952,11 +978,23 @@ impl TriggersAdapter for MockTriggersAdapter { todo!() } + async fn load_blocks_by_numbers( + &self, + _logger: Logger, + _block_numbers: HashSet, + ) -> Result, Error> { + unimplemented!() + } + + async fn chain_head_ptr(&self) -> Result, Error> { + todo!() + } + async fn scan_triggers( &self, _from: BlockNumber, _to: BlockNumber, - _filter: &::TriggerFilter, + _filter: &C::TriggerFilter, ) -> Result<(Vec>, BlockNumber), Error> { todo!() } diff --git a/tests/tests/runner_tests.rs b/tests/tests/runner_tests.rs index 7da707ac7cd..8f01e4a98f2 100644 --- a/tests/tests/runner_tests.rs +++ b/tests/tests/runner_tests.rs @@ -18,11 +18,12 @@ use graph::object; use graph::prelude::ethabi::ethereum_types::H256; use graph::prelude::web3::types::Address; use graph::prelude::{ - hex, CheapClone, DeploymentHash, SubgraphAssignmentProvider, SubgraphName, SubgraphStore, + hex, CheapClone, DeploymentHash, SubgraphAssignmentProvider, SubgraphName, SubgraphStore, Value, }; +use graph::schema::InputSchema; use graph_tests::fixture::ethereum::{ chain, empty_block, generate_empty_blocks_for_range, genesis, push_test_command, push_test_log, - push_test_polling_trigger, + push_test_polling_trigger, push_test_subgraph_trigger, }; use graph_tests::fixture::substreams::chain as substreams_chain; @@ -500,10 +501,19 @@ async fn substreams_trigger_filter_construction() -> anyhow::Result<()> { let runner = ctx.runner_substreams(test_ptr(0)).await; let filter = runner.build_filter_for_test(); - assert_eq!(filter.module_name(), "graph_out"); - assert_eq!(filter.modules().as_ref().unwrap().modules.len(), 2); - assert_eq!(filter.start_block().unwrap(), 0); - assert_eq!(filter.data_sources_len(), 1); + assert_eq!(filter.chain_filter.module_name(), "graph_out"); + assert_eq!( + filter + .chain_filter + .modules() + .as_ref() + .unwrap() + .modules + .len(), + 2 + ); + assert_eq!(filter.chain_filter.start_block().unwrap(), 0); + assert_eq!(filter.chain_filter.data_sources_len(), 1); Ok(()) } @@ -525,7 +535,11 @@ async fn end_block() -> anyhow::Result<()> { let runner = ctx.runner(block_ptr.clone()).await; let runner = runner.run_for_test(false).await.unwrap(); let filter = runner.context().filter.as_ref().unwrap(); - let addresses = filter.log().contract_addresses().collect::>(); + let addresses = filter + .chain_filter + .log() + .contract_addresses() + .collect::>(); if should_contain_addr { assert!(addresses.contains(&addr)); @@ -1077,6 +1091,49 @@ async fn parse_data_source_context() { ); } +#[tokio::test] +async fn subgraph_data_sources() { + let RunnerTestRecipe { stores, test_info } = + RunnerTestRecipe::new("subgraph-data-sources", "subgraph-data-sources").await; + + let schema = InputSchema::parse_latest( + "type User @entity { id: String!, val: String! }", + DeploymentHash::new("test").unwrap(), + ) + .unwrap(); + + let entity = schema + .make_entity(vec![ + ("id".into(), Value::String("id".to_owned())), + ("val".into(), Value::String("DATA".to_owned())), + ]) + .unwrap(); + + let blocks = { + let block_0 = genesis(); + let mut block_1 = empty_block(block_0.ptr(), test_ptr(1)); + push_test_subgraph_trigger( + &mut block_1, + DeploymentHash::new("QmRFXhvyvbm4z5Lo7z2mN9Ckmo623uuB2jJYbRmAXgYKXJ").unwrap(), + entity, + "User", + ); + + let block_2 = empty_block(block_1.ptr(), test_ptr(2)); + vec![block_0, block_1, block_2] + }; + let stop_block = blocks.last().unwrap().block.ptr(); + let chain = chain(&test_info.test_name, blocks, &stores, None).await; + + let ctx = fixture::setup(&test_info, &stores, &chain, None, None).await; + let _ = ctx + .runner(stop_block) + .await + .run_for_test(true) + .await + .unwrap(); +} + #[tokio::test] async fn retry_create_ds() { let RunnerTestRecipe { stores, test_info } =