Skip to content

Commit b5af690

Browse files
committed
Subgraph Composition: Reading the entities for subgraph as a datasource
1 parent 383bbc9 commit b5af690

File tree

20 files changed

+443
-97
lines changed

20 files changed

+443
-97
lines changed

chain/arweave/src/chain.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use graph::blockchain::{
77
};
88
use graph::cheap_clone::CheapClone;
99
use graph::components::network_provider::ChainName;
10-
use graph::components::store::{DeploymentCursorTracker, ReadStore};
10+
use graph::components::store::{DeploymentCursorTracker, SourceableStore};
1111
use graph::data::subgraph::UnifiedMappingApiVersion;
1212
use graph::env::EnvVars;
1313
use graph::firehose::FirehoseEndpoint;
@@ -121,7 +121,7 @@ impl Blockchain for Chain {
121121
deployment: DeploymentLocator,
122122
store: impl DeploymentCursorTracker,
123123
start_blocks: Vec<BlockNumber>,
124-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
124+
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
125125
filter: Arc<TriggerFilterWrapper<Self>>,
126126
unified_api_version: UnifiedMappingApiVersion,
127127
) -> Result<Box<dyn BlockStream<Self>>, Error> {

chain/cosmos/src/chain.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use graph::blockchain::block_stream::{BlockStreamError, BlockStreamMapper, Fireh
1212
use graph::blockchain::client::ChainClient;
1313
use graph::blockchain::{BasicBlockchainBuilder, BlockchainBuilder, NoopRuntimeAdapter};
1414
use graph::cheap_clone::CheapClone;
15-
use graph::components::store::{DeploymentCursorTracker, ReadStore};
15+
use graph::components::store::{DeploymentCursorTracker, SourceableStore};
1616
use graph::data::subgraph::UnifiedMappingApiVersion;
1717
use graph::{
1818
blockchain::{
@@ -114,7 +114,7 @@ impl Blockchain for Chain {
114114
deployment: DeploymentLocator,
115115
store: impl DeploymentCursorTracker,
116116
start_blocks: Vec<BlockNumber>,
117-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
117+
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
118118
filter: Arc<TriggerFilterWrapper<Self>>,
119119
unified_api_version: UnifiedMappingApiVersion,
120120
) -> Result<Box<dyn BlockStream<Self>>, Error> {

chain/ethereum/src/chain.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use graph::blockchain::{
77
TriggersAdapterSelector,
88
};
99
use graph::components::network_provider::ChainName;
10-
use graph::components::store::{DeploymentCursorTracker, ReadStore};
10+
use graph::components::store::{DeploymentCursorTracker, SourceableStore};
1111
use graph::data::subgraph::UnifiedMappingApiVersion;
1212
use graph::firehose::{FirehoseEndpoint, ForkStep};
1313
use graph::futures03::compat::Future01CompatExt;
@@ -128,7 +128,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
128128
chain: &Chain,
129129
deployment: DeploymentLocator,
130130
start_blocks: Vec<BlockNumber>,
131-
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
131+
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
132132
subgraph_current_block: Option<BlockPtr>,
133133
filter: Arc<TriggerFilterWrapper<Chain>>,
134134
unified_api_version: UnifiedMappingApiVersion,
@@ -150,7 +150,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
150150
chain: &Chain,
151151
deployment: DeploymentLocator,
152152
start_blocks: Vec<BlockNumber>,
153-
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
153+
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
154154
subgraph_current_block: Option<BlockPtr>,
155155
filter: Arc<TriggerFilterWrapper<Chain>>,
156156
unified_api_version: UnifiedMappingApiVersion,
@@ -437,7 +437,7 @@ impl Blockchain for Chain {
437437
deployment: DeploymentLocator,
438438
store: impl DeploymentCursorTracker,
439439
start_blocks: Vec<BlockNumber>,
440-
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
440+
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
441441
filter: Arc<TriggerFilterWrapper<Self>>,
442442
unified_api_version: UnifiedMappingApiVersion,
443443
) -> Result<Box<dyn BlockStream<Self>>, Error> {

chain/near/src/chain.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use graph::blockchain::{
88
};
99
use graph::cheap_clone::CheapClone;
1010
use graph::components::network_provider::ChainName;
11-
use graph::components::store::{DeploymentCursorTracker, ReadStore};
11+
use graph::components::store::{DeploymentCursorTracker, SourceableStore};
1212
use graph::data::subgraph::UnifiedMappingApiVersion;
1313
use graph::env::EnvVars;
1414
use graph::firehose::FirehoseEndpoint;
@@ -152,7 +152,7 @@ impl BlockStreamBuilder<Chain> for NearStreamBuilder {
152152
_chain: &Chain,
153153
_deployment: DeploymentLocator,
154154
_start_blocks: Vec<BlockNumber>,
155-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
155+
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
156156
_subgraph_current_block: Option<BlockPtr>,
157157
_filter: Arc<TriggerFilterWrapper<Chain>>,
158158
_unified_api_version: UnifiedMappingApiVersion,
@@ -232,7 +232,7 @@ impl Blockchain for Chain {
232232
deployment: DeploymentLocator,
233233
store: impl DeploymentCursorTracker,
234234
start_blocks: Vec<BlockNumber>,
235-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
235+
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
236236
filter: Arc<TriggerFilterWrapper<Self>>,
237237
unified_api_version: UnifiedMappingApiVersion,
238238
) -> Result<Box<dyn BlockStream<Self>>, Error> {

chain/substreams/src/block_stream.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use graph::{
99
substreams_block_stream::SubstreamsBlockStream,
1010
Blockchain, TriggerFilterWrapper,
1111
},
12-
components::store::{DeploymentLocator, ReadStore},
12+
components::store::{DeploymentLocator, SourceableStore},
1313
data::subgraph::UnifiedMappingApiVersion,
1414
prelude::{async_trait, BlockNumber, BlockPtr, DeploymentHash},
1515
schema::InputSchema,
@@ -104,7 +104,7 @@ impl BlockStreamBuilderTrait<Chain> for BlockStreamBuilder {
104104
_chain: &Chain,
105105
_deployment: DeploymentLocator,
106106
_start_blocks: Vec<BlockNumber>,
107-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
107+
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
108108
_subgraph_current_block: Option<BlockPtr>,
109109
_filter: Arc<TriggerFilterWrapper<Chain>>,
110110
_unified_api_version: UnifiedMappingApiVersion,

chain/substreams/src/chain.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use graph::blockchain::{
77
NoopRuntimeAdapter, TriggerFilterWrapper,
88
};
99
use graph::components::network_provider::ChainName;
10-
use graph::components::store::{DeploymentCursorTracker, ReadStore};
10+
use graph::components::store::{DeploymentCursorTracker, SourceableStore};
1111
use graph::env::EnvVars;
1212
use graph::prelude::{
1313
BlockHash, CheapClone, DeploymentHash, Entity, LoggerFactory, MetricsRegistry,
@@ -142,7 +142,7 @@ impl Blockchain for Chain {
142142
deployment: DeploymentLocator,
143143
store: impl DeploymentCursorTracker,
144144
_start_blocks: Vec<BlockNumber>,
145-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
145+
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
146146
filter: Arc<TriggerFilterWrapper<Self>>,
147147
_unified_api_version: UnifiedMappingApiVersion,
148148
) -> Result<Box<dyn BlockStream<Self>>, Error> {

core/src/subgraph/inputs.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use graph::{
22
blockchain::{block_stream::TriggersAdapterWrapper, Blockchain},
33
components::{
4-
store::{DeploymentLocator, ReadStore, SubgraphFork, WritableStore},
4+
store::{DeploymentLocator, SourceableStore, SubgraphFork, WritableStore},
55
subgraph::ProofOfIndexingVersion,
66
},
77
data::subgraph::{SubgraphFeature, UnifiedMappingApiVersion},
@@ -16,7 +16,7 @@ pub struct IndexingInputs<C: Blockchain> {
1616
pub features: BTreeSet<SubgraphFeature>,
1717
pub start_blocks: Vec<BlockNumber>,
1818
pub end_blocks: BTreeSet<BlockNumber>,
19-
pub source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
19+
pub source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
2020
pub stop_block: Option<BlockNumber>,
2121
pub max_end_block: Option<BlockNumber>,
2222
pub store: Arc<dyn WritableStore>,

core/src/subgraph/instance_manager.rs

Lines changed: 8 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,13 @@ use graph::blockchain::block_stream::{BlockStreamMetrics, TriggersAdapterWrapper
1313
use graph::blockchain::{Blockchain, BlockchainKind, DataSource, NodeCapabilities};
1414
use graph::components::metrics::gas::GasMetrics;
1515
use graph::components::metrics::subgraph::DeploymentStatusMetric;
16-
use graph::components::store::ReadStore;
16+
use graph::components::store::SourceableStore;
1717
use graph::components::subgraph::ProofOfIndexingVersion;
1818
use graph::data::subgraph::{UnresolvedSubgraphManifest, SPEC_VERSION_0_0_6};
1919
use graph::data::value::Word;
2020
use graph::data_source::causality_region::CausalityRegionSeq;
2121
use graph::env::EnvVars;
2222
use graph::prelude::{SubgraphInstanceManager as SubgraphInstanceManagerTrait, *};
23-
use graph::semver::Version;
2423
use graph::{blockchain::BlockchainMap, components::store::DeploymentLocator};
2524
use graph_runtime_wasm::module::ToAscPtr;
2625
use graph_runtime_wasm::RuntimeHostBuilder;
@@ -232,48 +231,27 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
232231

233232
pub async fn hashes_to_read_store<C: Blockchain>(
234233
&self,
235-
logger: &Logger,
236-
link_resolver: &Arc<dyn LinkResolver>,
237234
hashes: Vec<DeploymentHash>,
238-
max_spec_version: Version,
239235
is_runner_test: bool,
240-
) -> anyhow::Result<Vec<(DeploymentHash, Arc<dyn ReadStore>)>> {
241-
let mut writable_stores = Vec::new();
236+
) -> anyhow::Result<Vec<(DeploymentHash, Arc<dyn SourceableStore>)>> {
237+
let mut sourceable_stores = Vec::new();
242238
let subgraph_store = self.subgraph_store.clone();
243239

244240
if is_runner_test {
245-
return Ok(writable_stores);
241+
return Ok(sourceable_stores);
246242
}
247243

248244
for hash in hashes {
249-
let file_bytes = link_resolver
250-
.cat(logger, &hash.to_ipfs_link())
251-
.await
252-
.map_err(SubgraphAssignmentProviderError::ResolveError)?;
253-
let raw: serde_yaml::Mapping = serde_yaml::from_slice(&file_bytes)
254-
.map_err(|e| SubgraphAssignmentProviderError::ResolveError(e.into()))?;
255-
let manifest = UnresolvedSubgraphManifest::<C>::parse(hash.cheap_clone(), raw)?;
256-
let manifest = manifest
257-
.resolve(&link_resolver, &logger, max_spec_version.clone())
258-
.await?;
259-
260245
let loc = subgraph_store
261246
.active_locator(&hash)?
262247
.ok_or_else(|| anyhow!("no active deployment for hash {}", hash))?;
263248

264-
let readable_store = subgraph_store
265-
.clone()
266-
.readable(
267-
logger.clone(),
268-
loc.id.clone(),
269-
Arc::new(manifest.template_idx_and_name().collect()),
270-
)
271-
.await?;
249+
let sourceable_store = subgraph_store.clone().sourceable(loc.id.clone()).await?;
272250

273-
writable_stores.push((loc.hash, readable_store));
251+
sourceable_stores.push((loc.hash, sourceable_store));
274252
}
275253

276-
Ok(writable_stores)
254+
Ok(sourceable_stores)
277255
}
278256

279257
pub async fn build_subgraph_runner<C>(
@@ -540,13 +518,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
540518
let decoder = Box::new(Decoder::new(decoder_hook));
541519

542520
let subgraph_data_source_read_stores = self
543-
.hashes_to_read_store::<C>(
544-
&logger,
545-
&link_resolver,
546-
subgraph_ds_source_deployments,
547-
manifest.spec_version.clone(),
548-
is_runner_test,
549-
)
521+
.hashes_to_read_store::<C>(subgraph_ds_source_deployments, is_runner_test)
550522
.await?;
551523

552524
let triggers_adapter = Arc::new(TriggersAdapterWrapper::new(

graph/src/blockchain/block_stream.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use super::{
1919
Block, BlockPtr, BlockTime, Blockchain, SubgraphFilter, Trigger, TriggerFilterWrapper,
2020
};
2121
use crate::anyhow::Result;
22-
use crate::components::store::{BlockNumber, DeploymentLocator, ReadStore};
22+
use crate::components::store::{BlockNumber, DeploymentLocator, SourceableStore};
2323
use crate::data::subgraph::UnifiedMappingApiVersion;
2424
use crate::firehose::{self, FirehoseEndpoint};
2525
use crate::futures03::stream::StreamExt as _;
@@ -149,7 +149,7 @@ pub trait BlockStreamBuilder<C: Blockchain>: Send + Sync {
149149
chain: &C,
150150
deployment: DeploymentLocator,
151151
start_blocks: Vec<BlockNumber>,
152-
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
152+
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
153153
subgraph_current_block: Option<BlockPtr>,
154154
filter: Arc<TriggerFilterWrapper<C>>,
155155
unified_api_version: UnifiedMappingApiVersion,
@@ -160,7 +160,7 @@ pub trait BlockStreamBuilder<C: Blockchain>: Send + Sync {
160160
chain: &C,
161161
deployment: DeploymentLocator,
162162
start_blocks: Vec<BlockNumber>,
163-
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
163+
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
164164
subgraph_current_block: Option<BlockPtr>,
165165
filter: Arc<TriggerFilterWrapper<C>>,
166166
unified_api_version: UnifiedMappingApiVersion,
@@ -320,13 +320,13 @@ impl<C: Blockchain> BlockWithTriggers<C> {
320320
/// logic for each chain, increasing code repetition.
321321
pub struct TriggersAdapterWrapper<C: Blockchain> {
322322
pub adapter: Arc<dyn TriggersAdapter<C>>,
323-
pub source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
323+
pub source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
324324
}
325325

326326
impl<C: Blockchain> TriggersAdapterWrapper<C> {
327327
pub fn new(
328328
adapter: Arc<dyn TriggersAdapter<C>>,
329-
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
329+
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
330330
) -> Self {
331331
Self {
332332
adapter,

graph/src/blockchain/mock.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::{
22
bail,
33
components::{
44
link_resolver::LinkResolver,
5-
store::{BlockNumber, DeploymentCursorTracker, DeploymentLocator, ReadStore},
5+
store::{BlockNumber, DeploymentCursorTracker, DeploymentLocator, SourceableStore},
66
subgraph::InstanceDSTemplateInfo,
77
},
88
data::subgraph::UnifiedMappingApiVersion,
@@ -386,7 +386,7 @@ impl Blockchain for MockBlockchain {
386386
_deployment: DeploymentLocator,
387387
_store: impl DeploymentCursorTracker,
388388
_start_blocks: Vec<BlockNumber>,
389-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
389+
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn SourceableStore>)>,
390390
_filter: Arc<TriggerFilterWrapper<Self>>,
391391
_unified_api_version: UnifiedMappingApiVersion,
392392
) -> Result<Box<dyn BlockStream<Self>>, Error> {

0 commit comments

Comments
 (0)