Skip to content

Commit 9069486

Browse files
committed
graph: use ReadStore instead of WriteStore for source subgraph stores
1 parent 3aba98f commit 9069486

File tree

15 files changed

+107
-80
lines changed

15 files changed

+107
-80
lines changed

chain/arweave/src/chain.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use graph::blockchain::{
77
};
88
use graph::cheap_clone::CheapClone;
99
use graph::components::adapter::ChainId;
10-
use graph::components::store::{DeploymentCursorTracker, WritableStore};
10+
use graph::components::store::{DeploymentCursorTracker, ReadStore};
1111
use graph::data::subgraph::UnifiedMappingApiVersion;
1212
use graph::env::EnvVars;
1313
use graph::firehose::FirehoseEndpoint;
@@ -121,7 +121,7 @@ impl Blockchain for Chain {
121121
deployment: DeploymentLocator,
122122
store: impl DeploymentCursorTracker,
123123
start_blocks: Vec<BlockNumber>,
124-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
124+
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
125125
filter: Arc<TriggerFilterWrapper<Self>>,
126126
unified_api_version: UnifiedMappingApiVersion,
127127
) -> Result<Box<dyn BlockStream<Self>>, Error> {

chain/cosmos/src/chain.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use graph::blockchain::block_stream::{BlockStreamError, BlockStreamMapper, Fireh
1212
use graph::blockchain::client::ChainClient;
1313
use graph::blockchain::{BasicBlockchainBuilder, BlockchainBuilder, NoopRuntimeAdapter};
1414
use graph::cheap_clone::CheapClone;
15-
use graph::components::store::{DeploymentCursorTracker, WritableStore};
15+
use graph::components::store::{DeploymentCursorTracker, ReadStore};
1616
use graph::data::subgraph::UnifiedMappingApiVersion;
1717
use graph::{
1818
blockchain::{
@@ -114,7 +114,7 @@ impl Blockchain for Chain {
114114
deployment: DeploymentLocator,
115115
store: impl DeploymentCursorTracker,
116116
start_blocks: Vec<BlockNumber>,
117-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
117+
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
118118
filter: Arc<TriggerFilterWrapper<Self>>,
119119
unified_api_version: UnifiedMappingApiVersion,
120120
) -> Result<Box<dyn BlockStream<Self>>, Error> {

chain/ethereum/src/chain.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use graph::blockchain::{
77
TriggersAdapterSelector,
88
};
99
use graph::components::adapter::ChainId;
10-
use graph::components::store::{DeploymentCursorTracker, WritableStore};
10+
use graph::components::store::{DeploymentCursorTracker, ReadStore};
1111
use graph::data::subgraph::UnifiedMappingApiVersion;
1212
use graph::firehose::{FirehoseEndpoint, ForkStep};
1313
use graph::futures03::compat::Future01CompatExt;
@@ -128,7 +128,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
128128
chain: &Chain,
129129
deployment: DeploymentLocator,
130130
start_blocks: Vec<BlockNumber>,
131-
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
131+
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
132132
subgraph_current_block: Option<BlockPtr>,
133133
filter: Arc<TriggerFilterWrapper<Chain>>,
134134
unified_api_version: UnifiedMappingApiVersion,
@@ -150,7 +150,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
150150
chain: &Chain,
151151
deployment: DeploymentLocator,
152152
start_blocks: Vec<BlockNumber>,
153-
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
153+
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
154154
subgraph_current_block: Option<BlockPtr>,
155155
filter: Arc<TriggerFilterWrapper<Chain>>,
156156
unified_api_version: UnifiedMappingApiVersion,
@@ -437,7 +437,7 @@ impl Blockchain for Chain {
437437
deployment: DeploymentLocator,
438438
store: impl DeploymentCursorTracker,
439439
start_blocks: Vec<BlockNumber>,
440-
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
440+
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
441441
filter: Arc<TriggerFilterWrapper<Self>>,
442442
unified_api_version: UnifiedMappingApiVersion,
443443
) -> Result<Box<dyn BlockStream<Self>>, Error> {

chain/near/src/chain.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use graph::blockchain::{
88
};
99
use graph::cheap_clone::CheapClone;
1010
use graph::components::adapter::ChainId;
11-
use graph::components::store::{DeploymentCursorTracker, WritableStore};
11+
use graph::components::store::{DeploymentCursorTracker, ReadStore};
1212
use graph::data::subgraph::UnifiedMappingApiVersion;
1313
use graph::env::EnvVars;
1414
use graph::firehose::FirehoseEndpoint;
@@ -152,7 +152,7 @@ impl BlockStreamBuilder<Chain> for NearStreamBuilder {
152152
_chain: &Chain,
153153
_deployment: DeploymentLocator,
154154
_start_blocks: Vec<BlockNumber>,
155-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
155+
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
156156
_subgraph_current_block: Option<BlockPtr>,
157157
_filter: Arc<TriggerFilterWrapper<Chain>>,
158158
_unified_api_version: UnifiedMappingApiVersion,
@@ -232,7 +232,7 @@ impl Blockchain for Chain {
232232
deployment: DeploymentLocator,
233233
store: impl DeploymentCursorTracker,
234234
start_blocks: Vec<BlockNumber>,
235-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
235+
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
236236
filter: Arc<TriggerFilterWrapper<Self>>,
237237
unified_api_version: UnifiedMappingApiVersion,
238238
) -> Result<Box<dyn BlockStream<Self>>, Error> {

chain/starknet/src/chain.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use graph::{
1616
cheap_clone::CheapClone,
1717
components::{
1818
adapter::ChainId,
19-
store::{DeploymentCursorTracker, DeploymentLocator, WritableStore},
19+
store::{DeploymentCursorTracker, DeploymentLocator, ReadStore},
2020
},
2121
data::subgraph::UnifiedMappingApiVersion,
2222
env::EnvVars,
@@ -116,7 +116,7 @@ impl Blockchain for Chain {
116116
deployment: DeploymentLocator,
117117
store: impl DeploymentCursorTracker,
118118
start_blocks: Vec<BlockNumber>,
119-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
119+
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
120120
filter: Arc<TriggerFilterWrapper<Self>>,
121121
unified_api_version: UnifiedMappingApiVersion,
122122
) -> Result<Box<dyn BlockStream<Self>>, Error> {
@@ -240,7 +240,7 @@ impl BlockStreamBuilder<Chain> for StarknetStreamBuilder {
240240
_chain: &Chain,
241241
_deployment: DeploymentLocator,
242242
_start_blocks: Vec<BlockNumber>,
243-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
243+
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
244244
_subgraph_current_block: Option<BlockPtr>,
245245
_filter: Arc<TriggerFilterWrapper<Chain>>,
246246
_unified_api_version: UnifiedMappingApiVersion,

chain/substreams/src/block_stream.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use graph::{
99
substreams_block_stream::SubstreamsBlockStream,
1010
Blockchain, TriggerFilterWrapper,
1111
},
12-
components::store::{DeploymentLocator, WritableStore},
12+
components::store::{DeploymentLocator, ReadStore},
1313
data::subgraph::UnifiedMappingApiVersion,
1414
prelude::{async_trait, BlockNumber, BlockPtr, DeploymentHash},
1515
schema::InputSchema,
@@ -104,7 +104,7 @@ impl BlockStreamBuilderTrait<Chain> for BlockStreamBuilder {
104104
_chain: &Chain,
105105
_deployment: DeploymentLocator,
106106
_start_blocks: Vec<BlockNumber>,
107-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
107+
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
108108
_subgraph_current_block: Option<BlockPtr>,
109109
_filter: Arc<TriggerFilterWrapper<Chain>>,
110110
_unified_api_version: UnifiedMappingApiVersion,

chain/substreams/src/chain.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use graph::blockchain::{
77
NoopRuntimeAdapter, TriggerFilterWrapper,
88
};
99
use graph::components::adapter::ChainId;
10-
use graph::components::store::{DeploymentCursorTracker, WritableStore};
10+
use graph::components::store::{DeploymentCursorTracker, ReadStore};
1111
use graph::env::EnvVars;
1212
use graph::prelude::{
1313
BlockHash, CheapClone, DeploymentHash, Entity, LoggerFactory, MetricsRegistry,
@@ -142,7 +142,7 @@ impl Blockchain for Chain {
142142
deployment: DeploymentLocator,
143143
store: impl DeploymentCursorTracker,
144144
_start_blocks: Vec<BlockNumber>,
145-
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
145+
_source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
146146
filter: Arc<TriggerFilterWrapper<Self>>,
147147
_unified_api_version: UnifiedMappingApiVersion,
148148
) -> Result<Box<dyn BlockStream<Self>>, Error> {

core/src/subgraph/inputs.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use graph::{
22
blockchain::{block_stream::TriggersAdapterWrapper, Blockchain},
33
components::{
4-
store::{DeploymentLocator, SubgraphFork, WritableStore},
4+
store::{DeploymentLocator, ReadStore, SubgraphFork, WritableStore},
55
subgraph::ProofOfIndexingVersion,
66
},
77
data::subgraph::{SubgraphFeature, UnifiedMappingApiVersion},
@@ -16,7 +16,7 @@ pub struct IndexingInputs<C: Blockchain> {
1616
pub features: BTreeSet<SubgraphFeature>,
1717
pub start_blocks: Vec<BlockNumber>,
1818
pub end_blocks: BTreeSet<BlockNumber>,
19-
pub source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
19+
pub source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
2020
pub stop_block: Option<BlockNumber>,
2121
pub store: Arc<dyn WritableStore>,
2222
pub debug_fork: Option<Arc<dyn SubgraphFork>>,

core/src/subgraph/instance_manager.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::subgraph::runner::SubgraphRunner;
99
use graph::blockchain::block_stream::{BlockStreamMetrics, TriggersAdapterWrapper};
1010
use graph::blockchain::{Blockchain, BlockchainKind, DataSource, NodeCapabilities};
1111
use graph::components::metrics::gas::GasMetrics;
12-
use graph::components::store::WritableStore;
12+
use graph::components::store::ReadStore;
1313
use graph::components::subgraph::ProofOfIndexingVersion;
1414
use graph::data::subgraph::{UnresolvedSubgraphManifest, SPEC_VERSION_0_0_6};
1515
use graph::data::value::Word;
@@ -204,14 +204,14 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
204204
}
205205
}
206206

207-
pub async fn hashes_to_writable_store<C: Blockchain>(
207+
pub async fn hashes_to_read_store<C: Blockchain>(
208208
&self,
209209
logger: &Logger,
210210
link_resolver: &Arc<dyn LinkResolver>,
211211
hashes: Vec<DeploymentHash>,
212212
max_spec_version: Version,
213213
is_runner_test: bool,
214-
) -> anyhow::Result<Vec<(DeploymentHash, Arc<dyn WritableStore>)>> {
214+
) -> anyhow::Result<Vec<(DeploymentHash, Arc<dyn ReadStore>)>> {
215215
let mut writable_stores = Vec::new();
216216
let subgraph_store = self.subgraph_store.clone();
217217

@@ -235,16 +235,16 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
235235
.active_locator(&hash)?
236236
.ok_or_else(|| anyhow!("no active deployment for hash {}", hash))?;
237237

238-
let writable_store = subgraph_store
239-
.clone() // Clone the Arc again for each iteration
240-
.writable(
238+
let readable_store = subgraph_store
239+
.clone()
240+
.readable(
241241
logger.clone(),
242242
loc.id.clone(),
243243
Arc::new(manifest.template_idx_and_name().collect()),
244244
)
245245
.await?;
246246

247-
writable_stores.push((loc.hash, writable_store));
247+
writable_stores.push((loc.hash, readable_store));
248248
}
249249

250250
Ok(writable_stores)
@@ -491,8 +491,8 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
491491

492492
let decoder = Box::new(Decoder::new(decoder_hook));
493493

494-
let subgraph_data_source_writables = self
495-
.hashes_to_writable_store::<C>(
494+
let subgraph_data_source_read_stores = self
495+
.hashes_to_read_store::<C>(
496496
&logger,
497497
&link_resolver,
498498
subgraph_ds_source_deployments,
@@ -503,15 +503,15 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
503503

504504
let triggers_adapter = Arc::new(TriggersAdapterWrapper::new(
505505
triggers_adapter,
506-
subgraph_data_source_writables.clone(),
506+
subgraph_data_source_read_stores.clone(),
507507
));
508508

509509
let inputs = IndexingInputs {
510510
deployment: deployment.clone(),
511511
features,
512512
start_blocks,
513513
end_blocks,
514-
source_subgraph_stores: subgraph_data_source_writables,
514+
source_subgraph_stores: subgraph_data_source_read_stores,
515515
stop_block,
516516
store,
517517
debug_fork,

graph/src/blockchain/block_stream.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use super::{
1919
Block, BlockPtr, BlockTime, Blockchain, SubgraphFilter, Trigger, TriggerFilterWrapper,
2020
};
2121
use crate::anyhow::Result;
22-
use crate::components::store::{BlockNumber, DeploymentLocator, WritableStore};
22+
use crate::components::store::{BlockNumber, DeploymentLocator, ReadStore};
2323
use crate::data::subgraph::UnifiedMappingApiVersion;
2424
use crate::firehose::{self, FirehoseEndpoint};
2525
use crate::futures03::stream::StreamExt as _;
@@ -149,7 +149,7 @@ pub trait BlockStreamBuilder<C: Blockchain>: Send + Sync {
149149
chain: &C,
150150
deployment: DeploymentLocator,
151151
start_blocks: Vec<BlockNumber>,
152-
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
152+
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
153153
subgraph_current_block: Option<BlockPtr>,
154154
filter: Arc<TriggerFilterWrapper<C>>,
155155
unified_api_version: UnifiedMappingApiVersion,
@@ -160,7 +160,7 @@ pub trait BlockStreamBuilder<C: Blockchain>: Send + Sync {
160160
chain: &C,
161161
deployment: DeploymentLocator,
162162
start_blocks: Vec<BlockNumber>,
163-
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
163+
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
164164
subgraph_current_block: Option<BlockPtr>,
165165
filter: Arc<TriggerFilterWrapper<C>>,
166166
unified_api_version: UnifiedMappingApiVersion,
@@ -317,13 +317,13 @@ impl<C: Blockchain> BlockWithTriggers<C> {
317317

318318
pub struct TriggersAdapterWrapper<C: Blockchain> {
319319
pub adapter: Arc<dyn TriggersAdapter<C>>,
320-
pub source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
320+
pub source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
321321
}
322322

323323
impl<C: Blockchain> TriggersAdapterWrapper<C> {
324324
pub fn new(
325325
adapter: Arc<dyn TriggersAdapter<C>>,
326-
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn WritableStore>)>,
326+
source_subgraph_stores: Vec<(DeploymentHash, Arc<dyn ReadStore>)>,
327327
) -> Self {
328328
Self {
329329
adapter,

0 commit comments

Comments
 (0)