Skip to content

Commit 6a47d26

Browse files
author
Zoran Cvetkov
committed
remove get_range() from WritableStore
1 parent e25e9bf commit 6a47d26

File tree

6 files changed

+97
-70
lines changed

6 files changed

+97
-70
lines changed

graph/src/components/store/mod.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ use std::collections::btree_map::Entry;
2121
use std::collections::{BTreeMap, BTreeSet, HashSet};
2222
use std::fmt;
2323
use std::fmt::Display;
24-
use std::ops::Range;
2524
use std::sync::atomic::{AtomicUsize, Ordering};
2625
use std::sync::{Arc, RwLock};
2726
use std::time::Duration;
@@ -1039,14 +1038,6 @@ impl ReadStore for EmptyStore {
10391038
Ok(BTreeMap::new())
10401039
}
10411040

1042-
fn get_range(
1043-
&self,
1044-
_entity_type: &EntityType,
1045-
_block_range: Range<BlockNumber>,
1046-
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
1047-
Ok(BTreeMap::new())
1048-
}
1049-
10501041
fn get_derived(
10511042
&self,
10521043
_query: &DerivedEntityQuery,

graph/src/components/store/traits.rs

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,13 @@ pub trait SubgraphStore: Send + Sync + 'static {
193193
manifest_idx_and_name: Arc<Vec<(u32, String)>>,
194194
) -> Result<Arc<dyn ReadStore>, StoreError>;
195195

196+
async fn sourceable(
197+
self: Arc<Self>,
198+
logger: Logger,
199+
deployment: DeploymentId,
200+
manifest_idx_and_name: Arc<Vec<(u32, String)>>,
201+
) -> Result<Arc<dyn SourceableStore>, StoreError>;
202+
196203
/// Initiate a graceful shutdown of the writable that a previous call to
197204
/// `writable` might have started
198205
async fn stop_subgraph(&self, deployment: &DeploymentLocator) -> Result<(), StoreError>;
@@ -235,14 +242,6 @@ pub trait ReadStore: Send + Sync + 'static {
235242
keys: BTreeSet<EntityKey>,
236243
) -> Result<BTreeMap<EntityKey, Entity>, StoreError>;
237244

238-
/// Returns all versions of entities of the given entity_type that were
239-
/// changed in the given block_range.
240-
fn get_range(
241-
&self,
242-
entity_type: &EntityType,
243-
block_range: Range<BlockNumber>,
244-
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError>;
245-
246245
/// Reverse lookup
247246
fn get_derived(
248247
&self,
@@ -265,14 +264,6 @@ impl<T: ?Sized + ReadStore> ReadStore for Arc<T> {
265264
(**self).get_many(keys)
266265
}
267266

268-
fn get_range(
269-
&self,
270-
entity_type: &EntityType,
271-
block_range: Range<BlockNumber>,
272-
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
273-
(**self).get_range(entity_type, block_range)
274-
}
275-
276267
fn get_derived(
277268
&self,
278269
entity_derived: &DerivedEntityQuery,
@@ -285,6 +276,27 @@ impl<T: ?Sized + ReadStore> ReadStore for Arc<T> {
285276
}
286277
}
287278

279+
pub trait SourceableStore: Send + Sync + 'static {
280+
/// Returns all versions of entities of the given entity_type that were
281+
/// changed in the given block_range.
282+
fn get_range(
283+
&self,
284+
entity_type: &EntityType,
285+
block_range: Range<BlockNumber>,
286+
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError>;
287+
}
288+
289+
// This silly impl is needed until https://github.com/rust-lang/rust/issues/65991 is stable.
290+
impl<T: ?Sized + SourceableStore> SourceableStore for Arc<T> {
291+
fn get_range(
292+
&self,
293+
entity_type: &EntityType,
294+
block_range: Range<BlockNumber>,
295+
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
296+
(**self).get_range(entity_type, block_range)
297+
}
298+
}
299+
288300
pub trait DeploymentCursorTracker: Sync + Send + 'static {
289301
fn input_schema(&self) -> InputSchema;
290302

store/postgres/src/subgraph_store.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use crate::{
4444
index::{IndexList, Method},
4545
Layout,
4646
},
47-
writable::WritableStore,
47+
writable::{SourceableStore, WritableStore},
4848
NotificationSender,
4949
};
5050
use crate::{
@@ -1548,6 +1548,19 @@ impl SubgraphStoreTrait for SubgraphStore {
15481548
.map(|store| store as Arc<dyn store::ReadStore>)
15491549
}
15501550

1551+
async fn sourceable(
1552+
self: Arc<Self>,
1553+
_1logger: Logger,
1554+
deployment: graph::components::store::DeploymentId,
1555+
_manifest_idx_and_name: Arc<Vec<(u32, String)>>,
1556+
) -> Result<Arc<dyn store::SourceableStore>, StoreError> {
1557+
let deployment = deployment.into();
1558+
let site = self.find_site(deployment)?;
1559+
let store = self.for_site(&site)?;
1560+
let s = Arc::new(SourceableStore::new(site, store.clone()));
1561+
Ok(s as Arc<dyn store::SourceableStore>)
1562+
}
1563+
15511564
async fn stop_subgraph(&self, loc: &DeploymentLocator) -> Result<(), StoreError> {
15521565
self.evict(&loc.hash)?;
15531566

store/postgres/src/writable.rs

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -352,17 +352,6 @@ impl SyncStore {
352352
})
353353
}
354354

355-
fn get_range(
356-
&self,
357-
entity_type: &EntityType,
358-
block_range: Range<BlockNumber>,
359-
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
360-
retry::forever(&self.logger, "get_range", || {
361-
self.writable
362-
.get_range(self.site.cheap_clone(), entity_type, block_range.clone())
363-
})
364-
}
365-
366355
fn get_derived(
367356
&self,
368357
key: &DerivedEntityQuery,
@@ -1568,16 +1557,6 @@ impl ReadStore for WritableStore {
15681557
self.writer.get_many(keys)
15691558
}
15701559

1571-
// The entities that are returned are only the ones from the database.
1572-
// The ones in the queue are ignored.
1573-
fn get_range(
1574-
&self,
1575-
entity_type: &EntityType,
1576-
block_range: Range<BlockNumber>,
1577-
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
1578-
self.store.get_range(entity_type, block_range)
1579-
}
1580-
15811560
fn get_derived(
15821561
&self,
15831562
key: &DerivedEntityQuery,
@@ -1590,6 +1569,28 @@ impl ReadStore for WritableStore {
15901569
}
15911570
}
15921571

1572+
pub struct SourceableStore {
1573+
site: Arc<Site>,
1574+
store: Arc<DeploymentStore>,
1575+
}
1576+
1577+
impl SourceableStore {
1578+
pub fn new(site: Arc<Site>, store: Arc<DeploymentStore>) -> Self {
1579+
Self { site, store }
1580+
}
1581+
}
1582+
1583+
impl store::SourceableStore for SourceableStore {
1584+
fn get_range(
1585+
&self,
1586+
entity_type: &EntityType,
1587+
block_range: Range<BlockNumber>,
1588+
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
1589+
self.store
1590+
.get_range(self.site.clone(), entity_type, block_range)
1591+
}
1592+
}
1593+
15931594
impl DeploymentCursorTracker for WritableStore {
15941595
fn block_ptr(&self) -> Option<BlockPtr> {
15951596
self.block_ptr.lock().unwrap().clone()

store/test-store/tests/graph/entity_cache.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use lazy_static::lazy_static;
2020
use slog::Logger;
2121
use std::collections::{BTreeMap, BTreeSet};
2222
use std::marker::PhantomData;
23-
use std::ops::Range;
2423
use std::sync::Arc;
2524
use web3::types::H256;
2625

@@ -67,14 +66,6 @@ impl ReadStore for MockStore {
6766
Ok(self.get_many_res.clone())
6867
}
6968

70-
fn get_range(
71-
&self,
72-
_entity_type: &EntityType,
73-
_block_range: Range<BlockNumber>,
74-
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
75-
todo!()
76-
}
77-
7869
fn get_derived(
7970
&self,
8071
_key: &DerivedEntityQuery,

store/test-store/tests/postgres/writable.rs

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ use std::marker::PhantomData;
99
use std::ops::Range;
1010
use test_store::*;
1111

12-
use graph::components::store::{DeploymentLocator, DerivedEntityQuery, WritableStore};
12+
use graph::components::store::{
13+
DeploymentLocator, DerivedEntityQuery, SourceableStore, WritableStore,
14+
};
1315
use graph::data::subgraph::*;
1416
use graph::semver::Version;
1517
use graph::{entity, prelude::*};
@@ -87,7 +89,14 @@ fn remove_test_data(store: Arc<DieselSubgraphStore>) {
8789
/// Test harness for running database integration tests.
8890
fn run_test<R, F>(test: F)
8991
where
90-
F: FnOnce(Arc<DieselStore>, Arc<dyn WritableStore>, DeploymentLocator) -> R + Send + 'static,
92+
F: FnOnce(
93+
Arc<DieselStore>,
94+
Arc<dyn WritableStore>,
95+
Arc<dyn SourceableStore>,
96+
DeploymentLocator,
97+
) -> R
98+
+ Send
99+
+ 'static,
91100
R: std::future::Future<Output = ()> + Send + 'static,
92101
{
93102
run_test_sequentially(|store| async move {
@@ -102,10 +111,15 @@ where
102111
.writable(LOGGER.clone(), deployment.id, Arc::new(Vec::new()))
103112
.await
104113
.expect("we can get a writable store");
114+
let sourceable = store
115+
.subgraph_store()
116+
.sourceable(LOGGER.clone(), deployment.id, Arc::new(Vec::new()))
117+
.await
118+
.expect("we can get a writable store");
105119

106120
// Run test and wait for the background writer to finish its work so
107121
// it won't conflict with the next test
108-
test(store, writable, deployment).await;
122+
test(store, writable, sourceable, deployment).await;
109123
});
110124
}
111125

@@ -196,7 +210,7 @@ fn get_with_pending<F>(batch: bool, read_count: F)
196210
where
197211
F: Send + Fn(&dyn WritableStore) -> i32 + Sync + 'static,
198212
{
199-
run_test(move |store, writable, deployment| async move {
213+
run_test(move |store, writable, _, deployment| async move {
200214
let subgraph_store = store.subgraph_store();
201215

202216
let read_count = || read_count(writable.as_ref());
@@ -294,7 +308,7 @@ fn get_derived_nobatch() {
294308

295309
#[test]
296310
fn restart() {
297-
run_test(|store, writable, deployment| async move {
311+
run_test(|store, writable, _, deployment| async move {
298312
let subgraph_store = store.subgraph_store();
299313
let schema = subgraph_store.input_schema(&deployment.hash).unwrap();
300314

@@ -346,6 +360,7 @@ fn restart() {
346360
async fn read_range(
347361
store: Arc<Store>,
348362
writable: Arc<dyn WritableStore>,
363+
sourceable: Arc<dyn SourceableStore>,
349364
deployment: DeploymentLocator,
350365
mutable: bool,
351366
) -> usize {
@@ -367,22 +382,26 @@ async fn read_range(
367382
} else {
368383
&COUNTER2_TYPE
369384
};
370-
let e = writable.get_range(et, br).unwrap();
385+
let e = sourceable.get_range(et, br).unwrap();
371386
e.iter().map(|(_, v)| v.iter()).flatten().count()
372387
}
373388

374389
#[test]
375390
fn read_range_mutable() {
376-
run_test(|store, writable, deployment| async move {
377-
let num_entities = read_range(store, writable, deployment, true).await;
378-
assert_eq!(num_entities, 6) // TODO: fix it - it should be 4 as the range is open
379-
})
391+
run_test(
392+
|store, writable, sourceable: Arc<dyn SourceableStore>, deployment| async move {
393+
let num_entities = read_range(store, writable, sourceable, deployment, true).await;
394+
assert_eq!(num_entities, 6) // TODO: fix it - it should be 4 as the range is open
395+
},
396+
)
380397
}
381398

382399
#[test]
383400
fn read_range_immutable() {
384-
run_test(|store, writable, deployment| async move {
385-
let num_entities = read_range(store, writable, deployment, false).await;
386-
assert_eq!(num_entities, 6) // TODO: fix it - it should be 4 as the range is open
387-
})
401+
run_test(
402+
|store, writable, sourceable: Arc<dyn SourceableStore>, deployment| async move {
403+
let num_entities = read_range(store, writable, sourceable, deployment, false).await;
404+
assert_eq!(num_entities, 6) // TODO: fix it - it should be 4 as the range is open
405+
},
406+
)
388407
}

0 commit comments

Comments
 (0)