1- use crate :: data :: store :: scalar ;
1+ use crate :: blockchain :: SubgraphFilter ;
22use crate :: data_source:: subgraph;
33use crate :: substreams:: Clock ;
44use crate :: substreams_rpc:: response:: Message as SubstreamsMessage ;
@@ -7,17 +7,16 @@ use anyhow::Error;
77use async_stream:: stream;
88use futures03:: Stream ;
99use prost_types:: Any ;
10- use std:: collections:: HashSet ;
10+ use std:: collections:: { BTreeMap , HashSet } ;
1111use std:: fmt;
12+ use std:: ops:: Range ;
1213use std:: sync:: Arc ;
1314use std:: time:: Instant ;
1415use thiserror:: Error ;
1516use tokio:: sync:: mpsc:: { self , Receiver , Sender } ;
1617
1718use super :: substreams_block_stream:: SubstreamsLogData ;
18- use super :: {
19- Block , BlockPtr , BlockTime , Blockchain , SubgraphFilter , Trigger , TriggerFilterWrapper ,
20- } ;
19+ use super :: { Block , BlockPtr , BlockTime , Blockchain , Trigger , TriggerFilterWrapper } ;
2120use crate :: anyhow:: Result ;
2221use crate :: components:: store:: { BlockNumber , DeploymentLocator , SourceableStore } ;
2322use crate :: data:: subgraph:: UnifiedMappingApiVersion ;
@@ -353,11 +352,37 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
353352 filter : & Arc < TriggerFilterWrapper < C > > ,
354353 ) -> Result < ( Vec < BlockWithTriggers < C > > , BlockNumber ) , Error > {
355354 if !filter. subgraph_filter . is_empty ( ) {
356- return self
357- . subgraph_triggers ( Logger :: root ( slog:: Discard , o ! ( ) ) , from, to, filter)
358- . await ;
355+ // TODO: handle empty range, or empty entity set bellow
356+
357+ if let Some ( SubgraphFilter {
358+ subgraph : dh,
359+ start_block : _sb,
360+ entities : ent,
361+ } ) = filter. subgraph_filter . first ( )
362+ {
363+ if let Some ( store) = self . source_subgraph_stores . first ( ) {
364+ let schema = store. input_schema ( ) ;
365+ let dh2 = schema. id ( ) ;
366+ if dh == dh2 {
367+ if let Some ( entity_type) = ent. first ( ) {
368+ let et = schema. entity_type ( entity_type) . unwrap ( ) ;
369+
370+ let br: Range < BlockNumber > = from..to;
371+ let entities = store. get_range ( & et, br) ?;
372+ return self
373+ . subgraph_triggers (
374+ Logger :: root ( slog:: Discard , o ! ( ) ) ,
375+ from,
376+ to,
377+ filter,
378+ entities,
379+ )
380+ . await ;
381+ }
382+ }
383+ }
384+ }
359385 }
360-
361386 self . adapter
362387 . scan_triggers ( from, to, & filter. chain_filter )
363388 . await
@@ -384,70 +409,54 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
384409 self . adapter . chain_head_ptr ( ) . await
385410 }
386411
387- // TODO(krishna): Currently this is a mock implementation of subgraph triggers.
388- // This will be replaced with the actual implementation which will use the filters to
389- // query the database of the source subgraph and return the entity triggers.
390412 async fn subgraph_triggers (
391413 & self ,
392414 logger : Logger ,
393415 from : BlockNumber ,
394416 to : BlockNumber ,
395417 filter : & Arc < TriggerFilterWrapper < C > > ,
418+ entities : BTreeMap < BlockNumber , Vec < Entity > > ,
396419 ) -> Result < ( Vec < BlockWithTriggers < C > > , BlockNumber ) , Error > {
397420 let logger2 = logger. cheap_clone ( ) ;
398421 let adapter = self . adapter . clone ( ) ;
399- // let to_ptr = eth.next_existing_ptr_to_number(&logger, to).await?;
400- // let to = to_ptr.block_number();
401-
402422 let first_filter = filter. subgraph_filter . first ( ) . unwrap ( ) ;
403-
404423 let blocks = adapter
405- . load_blocks_by_numbers ( logger, HashSet :: from_iter ( from..= to) )
424+ . load_blocks_by_numbers ( logger, HashSet :: from_iter ( from..to) )
406425 . await ?
407426 . into_iter ( )
408427 . map ( |block| {
409- let trigger_data = vec ! [ Self :: create_mock_subgraph_trigger( first_filter, & block) ] ;
410- BlockWithTriggers :: new_with_subgraph_triggers ( block, trigger_data, & logger2)
428+ let key = block. number ( ) ;
429+ match entities. get ( & key) {
430+ Some ( e) => {
431+ let trigger_data =
432+ Self :: create_subgraph_trigger_from_entity ( first_filter, e) ;
433+ Some ( BlockWithTriggers :: new_with_subgraph_triggers (
434+ block,
435+ trigger_data,
436+ & logger2,
437+ ) )
438+ }
439+ None => None ,
440+ }
411441 } )
442+ . flatten ( )
412443 . collect ( ) ;
413444
414445 Ok ( ( blocks, to) )
415446 }
416447
417- fn create_mock_subgraph_trigger (
448+ fn create_subgraph_trigger_from_entity (
418449 filter : & SubgraphFilter ,
419- block : & C :: Block ,
420- ) -> subgraph:: TriggerData {
421- let mock_entity = Self :: create_mock_entity ( block) ;
422- subgraph:: TriggerData {
423- source : filter. subgraph . clone ( ) ,
424- entity : mock_entity,
425- entity_type : filter. entities . first ( ) . unwrap ( ) . clone ( ) ,
426- }
427- }
428-
429- fn create_mock_entity ( block : & C :: Block ) -> Entity {
430- let id = DeploymentHash :: new ( "test" ) . unwrap ( ) ;
431- let data_schema = InputSchema :: parse_latest (
432- "type Block @entity { id: Bytes!, number: BigInt!, hash: Bytes! }" ,
433- id. clone ( ) ,
434- )
435- . unwrap ( ) ;
436-
437- let block = block. ptr ( ) ;
438- let hash = Value :: Bytes ( scalar:: Bytes :: from ( block. hash_slice ( ) . to_vec ( ) ) ) ;
439- let data = data_schema
440- . make_entity ( vec ! [
441- ( "id" . into( ) , hash. clone( ) ) ,
442- (
443- "number" . into( ) ,
444- Value :: BigInt ( scalar:: BigInt :: from( block. block_number( ) ) ) ,
445- ) ,
446- ( "hash" . into( ) , hash) ,
447- ] )
448- . unwrap ( ) ;
449-
450- data
450+ entity : & Vec < Entity > ,
451+ ) -> Vec < subgraph:: TriggerData > {
452+ entity
453+ . iter ( )
454+ . map ( |e| subgraph:: TriggerData {
455+ source : filter. subgraph . clone ( ) ,
456+ entity : e. clone ( ) ,
457+ entity_type : filter. entities . first ( ) . unwrap ( ) . clone ( ) ,
458+ } )
459+ . collect ( )
451460 }
452461}
453462
0 commit comments