Skip to content

Commit 151779b

Browse files
committed
graph: refactor TriggersAdapterWrapper.triggers_in_block to not rely on scan_triggers
1 parent a9ee371 commit 151779b

File tree

1 file changed

+78
-51
lines changed

1 file changed

+78
-51
lines changed

graph/src/blockchain/block_stream.rs

Lines changed: 78 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,24 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
328328
source_subgraph_stores,
329329
}
330330
}
331+
332+
pub async fn blocks_with_subgraph_triggers(
333+
&self,
334+
logger: &Logger,
335+
subgraph_filter: &SubgraphFilter,
336+
range: SubgraphTriggerScanRange<C>,
337+
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
338+
let store = self
339+
.source_subgraph_stores
340+
.get(&subgraph_filter.subgraph)
341+
.unwrap(); // TODO(krishna): Avoid unwrap
342+
343+
let schema = crate::components::store::ReadStore::input_schema(store);
344+
345+
let adapter = self.adapter.clone();
346+
347+
scan_subgraph_triggers::<C>(logger, store, &adapter, &schema, &subgraph_filter, range).await
348+
}
331349
}
332350

333351
fn create_subgraph_trigger_from_entities(
@@ -373,34 +391,60 @@ async fn create_subgraph_triggers<C: Blockchain>(
373391
Ok(blocks)
374392
}
375393

394+
pub enum SubgraphTriggerScanRange<C: Blockchain> {
395+
Single(C::Block),
396+
Range(BlockNumber, BlockNumber),
397+
}
398+
376399
async fn scan_subgraph_triggers<C: Blockchain>(
377400
logger: &Logger,
378401
store: &Arc<dyn WritableStore>,
379402
adapter: &Arc<dyn TriggersAdapter<C>>,
380403
schema: &InputSchema,
381404
filter: &SubgraphFilter,
405+
range: SubgraphTriggerScanRange<C>,
406+
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
407+
match range {
408+
SubgraphTriggerScanRange::Single(block) => {
409+
let entities =
410+
get_entities_for_range(store, filter, schema, block.number(), block.number())
411+
.await?;
412+
create_subgraph_triggers::<C>(logger.clone(), vec![block], filter, entities).await
413+
}
414+
SubgraphTriggerScanRange::Range(from, to) => {
415+
let entities = get_entities_for_range(store, filter, schema, from, to).await?;
416+
let mut block_numbers: HashSet<BlockNumber> = entities.keys().cloned().collect();
417+
// Ensure the 'to' block is included in the block_numbers
418+
block_numbers.insert(to);
419+
420+
let blocks = adapter
421+
.load_blocks_by_numbers(logger.clone(), block_numbers)
422+
.await?;
423+
424+
create_subgraph_triggers::<C>(logger.clone(), blocks, filter, entities).await
425+
}
426+
}
427+
}
428+
429+
async fn get_entities_for_range(
430+
store: &Arc<dyn WritableStore>,
431+
filter: &SubgraphFilter,
432+
schema: &InputSchema,
382433
from: BlockNumber,
383434
to: BlockNumber,
384-
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
435+
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, Error> {
385436
let entity_types: Vec<EntityType> = filter
386437
.entities
387438
.iter()
388439
.map(|e| schema.entity_type(e).unwrap())
389440
.collect();
390-
391-
let entity_type = entity_types.first().unwrap();
392-
let range = from..to;
393-
let entities = store.get_range(&entity_type, range)?;
394-
let mut block_numbers: HashSet<BlockNumber> = entities.keys().cloned().collect();
395-
396-
// Ensure the 'to' block is included in the block_numbers
397-
block_numbers.insert(to);
398-
399-
let blocks = adapter
400-
.load_blocks_by_numbers(logger.clone(), block_numbers)
401-
.await?;
402-
403-
create_subgraph_triggers::<C>(logger.clone(), blocks, filter, entities).await
441+
let mut entities = BTreeMap::new();
442+
for entity_type in entity_types {
443+
let range = from..to;
444+
let mut entities_for_type = store.get_range(&entity_type, range)?;
445+
entities.append(&mut entities_for_type);
446+
}
447+
Ok(entities)
404448
}
405449

406450
impl<C: Blockchain> TriggersAdapterWrapper<C> {
@@ -421,32 +465,13 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
421465
filter: &Arc<TriggerFilterWrapper<C>>,
422466
) -> Result<(Vec<BlockWithTriggers<C>>, BlockNumber), Error> {
423467
if let Some(subgraph_filter) = filter.subgraph_filter.first() {
424-
let store = self
425-
.source_subgraph_stores
426-
.get(&subgraph_filter.subgraph)
427-
.unwrap(); // TODO(krishna): Avoid unwrap
428-
429-
let schema = crate::components::store::ReadStore::input_schema(store);
430-
let adapter = self.adapter.clone();
431-
432-
let blocks_with_triggers = scan_subgraph_triggers::<C>(
433-
logger,
434-
store,
435-
&adapter,
436-
&schema,
437-
&subgraph_filter,
438-
from,
439-
to,
440-
)
441-
.await?;
442-
443-
debug!(
444-
logger,
445-
"Scanned subgraph triggers";
446-
"from" => from,
447-
"to" => to,
448-
"blocks_with_triggers" => blocks_with_triggers.len(),
449-
);
468+
let blocks_with_triggers = self
469+
.blocks_with_subgraph_triggers(
470+
logger,
471+
subgraph_filter,
472+
SubgraphTriggerScanRange::Range(from, to),
473+
)
474+
.await?;
450475

451476
return Ok((blocks_with_triggers, to));
452477
}
@@ -469,19 +494,21 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
469494
"block_hash" => block.hash().hash_hex(),
470495
);
471496

472-
let block_number = block.number();
473-
474-
if filter.subgraph_filter.is_empty() {
475-
trace!(logger, "No subgraph filters, scanning triggers in block");
476-
return self
477-
.adapter
478-
.triggers_in_block(logger, block, &filter.chain_filter)
479-
.await;
497+
if let Some(subgraph_filter) = filter.subgraph_filter.first() {
498+
let blocks_with_triggers = self
499+
.blocks_with_subgraph_triggers(
500+
logger,
501+
subgraph_filter,
502+
SubgraphTriggerScanRange::Single(block),
503+
)
504+
.await?;
505+
506+
return Ok(blocks_with_triggers.into_iter().next().unwrap());
480507
}
481508

482-
self.scan_triggers(logger, block_number, block_number, filter)
509+
self.adapter
510+
.triggers_in_block(logger, block, &filter.chain_filter)
483511
.await
484-
.map(|(mut blocks, _)| blocks.pop().unwrap())
485512
}
486513

487514
pub async fn is_on_main_chain(&self, ptr: BlockPtr) -> Result<bool, Error> {

0 commit comments

Comments
 (0)