Skip to content

Commit 81fb399

Browse files
author
Zoran Cvetkov
committed
fetch block from DB
1 parent 8878caf commit 81fb399

File tree

4 files changed

+394
-27
lines changed

4 files changed

+394
-27
lines changed

graph/src/components/store/traits.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,17 +293,24 @@ impl<T: ?Sized + DeploymentCursorTracker> DeploymentCursorTracker for Arc<T> {
293293
}
294294
}
295295

296-
pub trait SourceableStore: DeploymentCursorTracker {
296+
#[async_trait]
297+
pub trait SourceableStore: Sync + Send + 'static {
297298
/// Returns all versions of entities of the given entity_type that were
298299
/// changed in the given block_range.
299300
fn get_range(
300301
&self,
301302
entity_type: &EntityType,
302303
block_range: Range<BlockNumber>,
303304
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError>;
305+
306+
fn input_schema(&self) -> InputSchema;
307+
308+
/// Get a pointer to the most recently processed block in the subgraph.
309+
async fn block_ptr(&self) -> Result<Option<BlockPtr>, StoreError>;
304310
}
305311

306312
// This silly impl is needed until https://github.com/rust-lang/rust/issues/65991 is stable.
313+
#[async_trait]
307314
impl<T: ?Sized + SourceableStore> SourceableStore for Arc<T> {
308315
fn get_range(
309316
&self,
@@ -312,6 +319,14 @@ impl<T: ?Sized + SourceableStore> SourceableStore for Arc<T> {
312319
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
313320
(**self).get_range(entity_type, block_range)
314321
}
322+
323+
fn input_schema(&self) -> InputSchema {
324+
(**self).input_schema()
325+
}
326+
327+
async fn block_ptr(&self) -> Result<Option<BlockPtr>, StoreError> {
328+
(**self).block_ptr().await
329+
}
315330
}
316331

317332
/// A view of the store for indexing. All indexing-related operations need

store/postgres/src/subgraph_store.rs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1545,16 +1545,12 @@ impl SubgraphStoreTrait for SubgraphStore {
15451545
let site = self.find_site(deployment)?;
15461546
let store = self.for_site(&site)?;
15471547
let input_schema = self.input_schema(&site.deployment)?;
1548-
let block_ptr = store.block_ptr(site.clone()).await?;
1549-
let block_cursor = store.block_cursor(site.clone()).await?;
1550-
let s = Arc::new(SourceableStore::new(
1548+
1549+
Ok(Arc::new(SourceableStore::new(
15511550
site,
15521551
store.clone(),
1553-
block_ptr,
1554-
block_cursor,
15551552
input_schema,
1556-
));
1557-
Ok(s as Arc<dyn store::SourceableStore>)
1553+
)))
15581554
}
15591555

15601556
async fn stop_subgraph(&self, loc: &DeploymentLocator) -> Result<(), StoreError> {

store/postgres/src/writable.rs

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::sync::{Mutex, RwLock, TryLockError as RwLockError};
55
use std::time::Instant;
66
use std::{collections::BTreeMap, sync::Arc};
77

8+
use async_trait::async_trait;
89
use graph::blockchain::block_stream::FirehoseCursor;
910
use graph::blockchain::BlockTime;
1011
use graph::components::store::{Batch, DeploymentCursorTracker, DerivedEntityQuery, ReadStore};
@@ -1572,29 +1573,20 @@ impl ReadStore for WritableStore {
15721573
pub struct SourceableStore {
15731574
site: Arc<Site>,
15741575
store: Arc<DeploymentStore>,
1575-
block_ptr: Option<BlockPtr>,
1576-
block_cursor: FirehoseCursor,
15771576
input_schema: InputSchema,
15781577
}
15791578

15801579
impl SourceableStore {
1581-
pub fn new(
1582-
site: Arc<Site>,
1583-
store: Arc<DeploymentStore>,
1584-
block_ptr: Option<BlockPtr>,
1585-
block_cursor: FirehoseCursor,
1586-
input_schema: InputSchema,
1587-
) -> Self {
1580+
pub fn new(site: Arc<Site>, store: Arc<DeploymentStore>, input_schema: InputSchema) -> Self {
15881581
Self {
15891582
site,
15901583
store,
1591-
block_ptr,
1592-
block_cursor,
15931584
input_schema,
15941585
}
15951586
}
15961587
}
15971588

1589+
#[async_trait]
15981590
impl store::SourceableStore for SourceableStore {
15991591
fn get_range(
16001592
&self,
@@ -1604,18 +1596,13 @@ impl store::SourceableStore for SourceableStore {
16041596
self.store
16051597
.get_range(self.site.clone(), entity_type, block_range)
16061598
}
1607-
}
1608-
impl DeploymentCursorTracker for SourceableStore {
1599+
16091600
fn input_schema(&self) -> InputSchema {
16101601
self.input_schema.cheap_clone()
16111602
}
16121603

1613-
fn block_ptr(&self) -> Option<BlockPtr> {
1614-
self.block_ptr.clone()
1615-
}
1616-
1617-
fn firehose_cursor(&self) -> FirehoseCursor {
1618-
self.block_cursor.clone()
1604+
async fn block_ptr(&self) -> Result<Option<BlockPtr>, StoreError> {
1605+
self.store.block_ptr(self.site.cheap_clone()).await
16191606
}
16201607
}
16211608

0 commit comments

Comments
 (0)