Skip to content

Commit b0b8283

Browse files
committed
graph, runtime: Support subgraph entity operation detection in composed subgraphs
1 parent ca7824e commit b0b8283

File tree

9 files changed

+135
-46
lines changed

9 files changed

+135
-46
lines changed

graph/src/blockchain/block_stream.rs

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -353,14 +353,13 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
353353

354354
fn create_subgraph_trigger_from_entities(
355355
filter: &SubgraphFilter,
356-
entities: &Vec<EntityWithType>,
356+
entities: Vec<EntityWithType>,
357357
) -> Vec<subgraph::TriggerData> {
358358
entities
359-
.iter()
360-
.map(|e| subgraph::TriggerData {
359+
.into_iter()
360+
.map(|entity| subgraph::TriggerData {
361361
source: filter.subgraph.clone(),
362-
entity: e.entity.clone(),
363-
entity_type: e.entity_type.as_str().to_string(),
362+
entity,
364363
})
365364
.collect()
366365
}
@@ -369,25 +368,20 @@ async fn create_subgraph_triggers<C: Blockchain>(
369368
logger: Logger,
370369
blocks: Vec<C::Block>,
371370
filter: &SubgraphFilter,
372-
entities: BTreeMap<BlockNumber, Vec<EntityWithType>>,
371+
mut entities: BTreeMap<BlockNumber, Vec<EntityWithType>>,
373372
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
374373
let logger_clone = logger.cheap_clone();
375374

376375
let blocks: Vec<BlockWithTriggers<C>> = blocks
377376
.into_iter()
378377
.map(|block| {
379378
let block_number = block.number();
380-
match entities.get(&block_number) {
381-
Some(e) => {
382-
let trigger_data = create_subgraph_trigger_from_entities(filter, e);
383-
BlockWithTriggers::new_with_subgraph_triggers(
384-
block,
385-
trigger_data,
386-
&logger_clone,
387-
)
388-
}
389-
None => BlockWithTriggers::new_with_subgraph_triggers(block, vec![], &logger_clone),
390-
}
379+
let trigger_data = entities
380+
.remove(&block_number)
381+
.map(|e| create_subgraph_trigger_from_entities(filter, e))
382+
.unwrap_or_else(Vec::new);
383+
384+
BlockWithTriggers::new_with_subgraph_triggers(block, trigger_data, &logger_clone)
391385
})
392386
.collect();
393387

@@ -429,14 +423,14 @@ async fn scan_subgraph_triggers<C: Blockchain>(
429423
}
430424
}
431425

432-
#[derive(Debug)]
426+
#[derive(Debug, Clone, Eq, PartialEq)]
433427
pub enum EntitySubgraphOperation {
434428
Create,
435429
Modify,
436430
Delete,
437431
}
438432

439-
#[derive(Debug)]
433+
#[derive(Debug, Clone, Eq, PartialEq)]
440434
pub struct EntityWithType {
441435
pub entity_op: EntitySubgraphOperation,
442436
pub entity_type: EntityType,

graph/src/data_source/subgraph.rs

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
use crate::{
2-
blockchain::{Block, Blockchain},
3-
components::{
4-
link_resolver::LinkResolver,
5-
store::{BlockNumber, Entity},
6-
},
2+
blockchain::{block_stream::EntityWithType, Block, Blockchain},
3+
components::{link_resolver::LinkResolver, store::BlockNumber},
74
data::{subgraph::SPEC_VERSION_1_3_0, value::Word},
85
data_source,
96
prelude::{DataSourceContext, DeploymentHash, Link},
@@ -76,7 +73,7 @@ impl DataSource {
7673
}
7774

7875
let trigger_ref = self.mapping.handlers.iter().find_map(|handler| {
79-
if handler.entity != trigger.entity_type {
76+
if handler.entity != trigger.entity_type() {
8077
return None;
8178
}
8279

@@ -281,17 +278,16 @@ impl UnresolvedDataSourceTemplate {
281278
#[derive(Clone, PartialEq, Eq)]
282279
pub struct TriggerData {
283280
pub source: DeploymentHash,
284-
pub entity: Entity,
285-
pub entity_type: String,
281+
pub entity: EntityWithType,
286282
}
287283

288284
impl TriggerData {
289-
pub fn new(source: DeploymentHash, entity: Entity, entity_type: String) -> Self {
290-
Self {
291-
source,
292-
entity,
293-
entity_type,
294-
}
285+
pub fn new(source: DeploymentHash, entity: EntityWithType) -> Self {
286+
Self { source, entity }
287+
}
288+
289+
pub fn entity_type(&self) -> &str {
290+
self.entity.entity_type.as_str()
295291
}
296292
}
297293

graph/src/runtime/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,11 @@ pub enum IndexForAscTypeId {
368368
// ...
369369
// LastStarknetType = 4499,
370370

371+
372+
// Subgraph Data Source types
373+
AscEntityTrigger = 4500,
374+
375+
371376
// Reserved discriminant space for a future blockchain type IDs: [4,500, 5,499]
372377
//
373378
// Generated with the following shell script:

runtime/wasm/src/module/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ impl ToAscPtr for subgraph::TriggerData {
7676
heap: &mut H,
7777
gas: &GasCounter,
7878
) -> Result<AscPtr<()>, HostExportError> {
79-
asc_new(heap, &self.entity.sorted_ref(), gas).map(|ptr| ptr.erase())
79+
asc_new(heap, &self.entity, gas).map(|ptr| ptr.erase())
8080
}
8181
}
8282

runtime/wasm/src/to_from/external.rs

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
use ethabi;
22

3+
use graph::blockchain::block_stream::{EntitySubgraphOperation, EntityWithType};
34
use graph::data::store::scalar::Timestamp;
45
use graph::data::value::Word;
56
use graph::prelude::{BigDecimal, BigInt};
67
use graph::runtime::gas::GasCounter;
78
use graph::runtime::{
8-
asc_get, asc_new, AscIndexId, AscPtr, AscType, AscValue, HostExportError, ToAscObj,
9+
asc_get, asc_new, AscIndexId, AscPtr, AscType, AscValue, HostExportError, IndexForAscTypeId,
10+
ToAscObj,
911
};
1012
use graph::{data::store, runtime::DeterministicHostError};
1113
use graph::{prelude::serde_json, runtime::FromAscObj};
1214
use graph::{prelude::web3::types as web3, runtime::AscHeap};
15+
use graph_runtime_derive::AscType;
1316

1417
use crate::asc_abi::class::*;
1518

@@ -463,3 +466,43 @@ where
463466
})
464467
}
465468
}
469+
470+
#[derive(Debug, Clone, Eq, PartialEq, AscType)]
471+
pub enum AscSubgraphEntityOp {
472+
Create,
473+
Modify,
474+
Delete,
475+
}
476+
477+
#[derive(AscType)]
478+
pub struct AscEntityTrigger {
479+
pub entity_op: AscSubgraphEntityOp,
480+
pub entity_type: AscPtr<AscString>,
481+
pub entity: AscPtr<AscEntity>,
482+
pub vid: i64,
483+
}
484+
485+
impl ToAscObj<AscEntityTrigger> for EntityWithType {
486+
fn to_asc_obj<H: AscHeap + ?Sized>(
487+
&self,
488+
heap: &mut H,
489+
gas: &GasCounter,
490+
) -> Result<AscEntityTrigger, HostExportError> {
491+
let entity_op = match self.entity_op {
492+
EntitySubgraphOperation::Create => AscSubgraphEntityOp::Create,
493+
EntitySubgraphOperation::Modify => AscSubgraphEntityOp::Modify,
494+
EntitySubgraphOperation::Delete => AscSubgraphEntityOp::Delete,
495+
};
496+
497+
Ok(AscEntityTrigger {
498+
entity_op,
499+
entity_type: asc_new(heap, &self.entity_type.as_str(), gas)?,
500+
entity: asc_new(heap, &self.entity.sorted_ref(), gas)?,
501+
vid: self.vid,
502+
})
503+
}
504+
}
505+
506+
impl AscIndexId for AscEntityTrigger {
507+
const INDEX_ASC_TYPE_ID: IndexForAscTypeId = IndexForAscTypeId::AscEntityTrigger;
508+
}

tests/integration-tests/subgraph-data-sources/src/mapping.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,17 @@
11
import { Entity, log } from '@graphprotocol/graph-ts';
22
import { MirrorBlock } from '../generated/schema';
33

4-
export function handleEntity(blockEntity: Entity): void {
4+
export class EntityTrigger {
5+
constructor(
6+
public entityOp: u32,
7+
public entityType: string,
8+
public entity: Entity,
9+
public vid: i64,
10+
) {}
11+
}
12+
13+
export function handleEntity(trigger: EntityTrigger): void {
14+
let blockEntity = trigger.entity;
515
let blockNumber = blockEntity.getBigInt('number');
616
let blockHash = blockEntity.getBytes('hash');
717
let id = blockEntity.getString('id');
Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,35 @@
11
import { Entity, log } from '@graphprotocol/graph-ts';
22

3-
export function handleBlock(content: Entity): void {
4-
let stringContent = content.getString('val');
3+
export const SubgraphEntityOpCreate: u32 = 0;
4+
export const SubgraphEntityOpModify: u32 = 1;
5+
export const SubgraphEntityOpDelete: u32 = 2;
6+
7+
export class EntityTrigger {
8+
constructor(
9+
public entityOp: u32,
10+
public entityType: string,
11+
public entity: Entity,
12+
public vid: i64,
13+
) {}
14+
}
15+
16+
export function handleBlock(content: EntityTrigger): void {
17+
let stringContent = content.entity.getString('val');
518
log.info('Content: {}', [stringContent]);
19+
log.info('EntityOp: {}', [content.entityOp.toString()]);
20+
21+
switch (content.entityOp) {
22+
case SubgraphEntityOpCreate: {
23+
log.info('Entity created: {}', [content.entityType]);
24+
break
25+
}
26+
case SubgraphEntityOpModify: {
27+
log.info('Entity modified: {}', [content.entityType]);
28+
break;
29+
}
30+
case SubgraphEntityOpDelete: {
31+
log.info('Entity deleted: {}', [content.entityType]);
32+
break;
33+
}
34+
}
635
}

tests/src/fixture/ethereum.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@ use super::{
66
test_ptr, CommonChainConfig, MutexBlockStreamBuilder, NoopAdapterSelector,
77
NoopRuntimeAdapterBuilder, StaticBlockRefetcher, StaticStreamBuilder, Stores, TestChain,
88
};
9+
use graph::blockchain::block_stream::{EntitySubgraphOperation, EntityWithType};
910
use graph::blockchain::client::ChainClient;
1011
use graph::blockchain::{BlockPtr, Trigger, TriggersAdapterSelector};
1112
use graph::cheap_clone::CheapClone;
1213
use graph::data_source::subgraph;
1314
use graph::prelude::ethabi::ethereum_types::H256;
1415
use graph::prelude::web3::types::{Address, Log, Transaction, H160};
1516
use graph::prelude::{ethabi, tiny_keccak, DeploymentHash, Entity, LightEthereumBlock, ENV_VARS};
17+
use graph::schema::EntityType;
1618
use graph::{blockchain::block_stream::BlockWithTriggers, prelude::ethabi::ethereum_types::U64};
1719
use graph_chain_ethereum::network::EthereumNetworkAdapters;
1820
use graph_chain_ethereum::trigger::LogRef;
@@ -164,15 +166,20 @@ pub fn push_test_subgraph_trigger(
164166
block: &mut BlockWithTriggers<Chain>,
165167
source: DeploymentHash,
166168
entity: Entity,
167-
entity_type: &str,
169+
entity_type: EntityType,
170+
entity_op: EntitySubgraphOperation,
171+
vid: i64,
168172
) {
173+
let entity = EntityWithType {
174+
entity: entity,
175+
entity_type: entity_type,
176+
entity_op: entity_op,
177+
vid,
178+
};
179+
169180
block
170181
.trigger_data
171-
.push(Trigger::Subgraph(subgraph::TriggerData {
172-
source,
173-
entity: entity,
174-
entity_type: entity_type.to_string(),
175-
}));
182+
.push(Trigger::Subgraph(subgraph::TriggerData { source, entity }));
176183
}
177184

178185
pub fn push_test_command(

tests/tests/runner_tests.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::sync::Arc;
66
use std::time::Duration;
77

88
use assert_json_diff::assert_json_eq;
9-
use graph::blockchain::block_stream::BlockWithTriggers;
9+
use graph::blockchain::block_stream::{BlockWithTriggers, EntitySubgraphOperation};
1010
use graph::blockchain::{Block, BlockPtr, Blockchain};
1111
use graph::data::store::scalar::Bytes;
1212
use graph::data::subgraph::schema::{SubgraphError, SubgraphHealth};
@@ -1109,14 +1109,19 @@ async fn subgraph_data_sources() {
11091109
])
11101110
.unwrap();
11111111

1112+
let entity_type = schema.entity_type("User").unwrap();
1113+
11121114
let blocks = {
11131115
let block_0 = genesis();
11141116
let mut block_1 = empty_block(block_0.ptr(), test_ptr(1));
1117+
11151118
push_test_subgraph_trigger(
11161119
&mut block_1,
11171120
DeploymentHash::new("QmRFXhvyvbm4z5Lo7z2mN9Ckmo623uuB2jJYbRmAXgYKXJ").unwrap(),
11181121
entity,
1119-
"User",
1122+
entity_type,
1123+
EntitySubgraphOperation::Create,
1124+
1,
11201125
);
11211126

11221127
let block_2 = empty_block(block_1.ptr(), test_ptr(2));

0 commit comments

Comments
 (0)