Skip to content

Commit 2a6c61c

Browse files
author
Zoran Cvetkov
committed
read entities from database
1 parent 9f70d9a commit 2a6c61c

File tree

1 file changed

+83
-8
lines changed

1 file changed

+83
-8
lines changed

graph/src/blockchain/block_stream.rs

Lines changed: 83 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::blockchain::SubgraphFilter;
12
use crate::data::store::scalar;
23
use crate::data_source::subgraph;
34
use crate::substreams::Clock;
@@ -7,17 +8,16 @@ use anyhow::Error;
78
use async_stream::stream;
89
use futures03::Stream;
910
use prost_types::Any;
10-
use std::collections::HashSet;
11+
use std::collections::{BTreeMap, HashSet};
1112
use std::fmt;
13+
use std::ops::Range;
1214
use std::sync::Arc;
1315
use std::time::Instant;
1416
use thiserror::Error;
1517
use tokio::sync::mpsc::{self, Receiver, Sender};
1618

1719
use super::substreams_block_stream::SubstreamsLogData;
18-
use super::{
19-
Block, BlockPtr, BlockTime, Blockchain, SubgraphFilter, Trigger, TriggerFilterWrapper,
20-
};
20+
use super::{Block, BlockPtr, BlockTime, Blockchain, Trigger, TriggerFilterWrapper};
2121
use crate::anyhow::Result;
2222
use crate::components::store::{BlockNumber, DeploymentLocator, SourceableStore};
2323
use crate::data::subgraph::UnifiedMappingApiVersion;
@@ -353,9 +353,44 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
353353
filter: &Arc<TriggerFilterWrapper<C>>,
354354
) -> Result<(Vec<BlockWithTriggers<C>>, BlockNumber), Error> {
355355
if !filter.subgraph_filter.is_empty() {
356-
return self
357-
.subgraph_triggers(Logger::root(slog::Discard, o!()), from, to, filter)
358-
.await;
356+
// TODO: handle empty range, or empty entity set bellow
357+
if to <= from {
358+
return self
359+
.mock_subgraph_triggers(Logger::root(slog::Discard, o!()), from, to, filter)
360+
.await;
361+
}
362+
363+
if let Some(SubgraphFilter {
364+
subgraph: dh,
365+
start_block: _sb,
366+
entities: ent,
367+
}) = filter.subgraph_filter.first()
368+
{
369+
if let Some((dh2, store)) = self.source_subgraph_stores.first() {
370+
if dh == dh2 {
371+
let schema = crate::components::store::ReadStore::input_schema(store);
372+
if let Some(entity_type) = ent.first() {
373+
let et = schema.entity_type(entity_type).unwrap();
374+
375+
let f: u32 = from as u32;
376+
let t: u32 = to as u32;
377+
let br: Range<u32> = f..t;
378+
let entities = store.get_range(&et, br)?;
379+
if !entities.is_empty() {
380+
return self
381+
.subgraph_triggers(
382+
Logger::root(slog::Discard, o!()),
383+
from,
384+
to,
385+
filter,
386+
entities,
387+
)
388+
.await;
389+
}
390+
}
391+
}
392+
}
393+
}
359394
}
360395

361396
self.adapter
@@ -384,10 +419,50 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
384419
self.adapter.chain_head_ptr().await
385420
}
386421

422+
async fn subgraph_triggers(
423+
&self,
424+
logger: Logger,
425+
from: BlockNumber,
426+
to: BlockNumber,
427+
filter: &Arc<TriggerFilterWrapper<C>>,
428+
entities: BTreeMap<BlockNumber, Entity>,
429+
) -> Result<(Vec<BlockWithTriggers<C>>, BlockNumber), Error> {
430+
let logger2 = logger.cheap_clone();
431+
let adapter = self.adapter.clone();
432+
let first_filter = filter.subgraph_filter.first().unwrap();
433+
let blocks = adapter
434+
.load_blocks_by_numbers(logger, HashSet::from_iter(from..to))
435+
.await?
436+
.into_iter()
437+
.map(|block| {
438+
let key = block.number();
439+
let entity = entities.get(&key).unwrap();
440+
let trigger_data = vec![Self::create_subgraph_trigger_from_entity(
441+
first_filter,
442+
entity,
443+
)];
444+
BlockWithTriggers::new_with_subgraph_triggers(block, trigger_data, &logger2)
445+
})
446+
.collect();
447+
448+
Ok((blocks, to))
449+
}
450+
451+
fn create_subgraph_trigger_from_entity(
452+
filter: &SubgraphFilter,
453+
entity: &Entity,
454+
) -> subgraph::TriggerData {
455+
subgraph::TriggerData {
456+
source: filter.subgraph.clone(),
457+
entity: entity.clone(),
458+
entity_type: filter.entities.first().unwrap().clone(),
459+
}
460+
}
461+
387462
// TODO(krishna): Currently this is a mock implementation of subgraph triggers.
388463
// This will be replaced with the actual implementation which will use the filters to
389464
// query the database of the source subgraph and return the entity triggers.
390-
async fn subgraph_triggers(
465+
async fn mock_subgraph_triggers(
391466
&self,
392467
logger: Logger,
393468
from: BlockNumber,

0 commit comments

Comments
 (0)