Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 17 additions & 27 deletions graph/src/blockchain/block_stream.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::blockchain::SubgraphFilter;
use crate::data_source::subgraph;
use crate::data_source::{subgraph, CausalityRegion};
use crate::substreams::Clock;
use crate::substreams_rpc::response::Message as SubstreamsMessage;
use crate::substreams_rpc::BlockScopedData;
Expand Down Expand Up @@ -433,9 +433,19 @@ async fn scan_subgraph_triggers<C: Blockchain>(
}
}

#[derive(Debug)]
pub enum EntitySubgraphOperation {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if this makes sense, but might be better to stick the entity into the operation, i.e. to have Create(Entity), Modify(Entity), Delete(Entity); that way, users of this enum are forced to consider each of these cases, and it would make it easy to, e.g., change to Delete(Id), i.e., not load the entire entity if that's useful.

But I don't understand how all of composition fits together, so I am not sure if this suggestions is really an improvement.

Copy link
Contributor Author

@zorancv zorancv Dec 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess here the naming is poor. The enum EntitySubgraphOperation is a kind of operation, with definition more akin to the C style enums. Where the struct just bellow it EntityWithType contains the whole 'entity event' with all the fields including this operation kind and the entity itself.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did some renaming which I believe is closer to the semantics. I done it in a separate PR in order to save on rebasing.

Create,
Modify,
Delete,
}

#[derive(Debug)]
pub struct EntityWithType {
pub entity_op: EntitySubgraphOperation,
pub entity_type: EntityType,
pub entity: Entity,
pub vid: i64,
}

async fn get_entities_for_range(
Expand All @@ -445,32 +455,12 @@ async fn get_entities_for_range(
from: BlockNumber,
to: BlockNumber,
) -> Result<BTreeMap<BlockNumber, Vec<EntityWithType>>, Error> {
let mut entities_by_block = BTreeMap::new();

for entity_name in &filter.entities {
let entity_type = schema.entity_type(entity_name)?;

let entity_ranges = store.get_range(&entity_type, from..to)?;

for (block_number, entity_vec) in entity_ranges {
let mut entity_vec = entity_vec
.into_iter()
.map(|e| EntityWithType {
entity_type: entity_type.clone(),
entity: e,
})
.collect();

entities_by_block
.entry(block_number)
.and_modify(|existing_vec: &mut Vec<EntityWithType>| {
existing_vec.append(&mut entity_vec);
})
.or_insert(entity_vec);
}
}

Ok(entities_by_block)
let entity_types: Result<Vec<EntityType>> = filter
.entities
.iter()
.map(|name| schema.entity_type(name))
.collect();
Ok(store.get_range(entity_types?, CausalityRegion::ONCHAIN, from..to)?)
}

impl<C: Blockchain> TriggersAdapterWrapper<C> {
Expand Down
14 changes: 8 additions & 6 deletions graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use async_trait::async_trait;
use web3::types::{Address, H256};

use super::*;
use crate::blockchain::block_stream::FirehoseCursor;
use crate::blockchain::block_stream::{EntityWithType, FirehoseCursor};
use crate::blockchain::{BlockTime, ChainIdentifier};
use crate::components::metrics::stopwatch::StopwatchMetrics;
use crate::components::server::index_node::VersionInfo;
Expand Down Expand Up @@ -299,9 +299,10 @@ pub trait SourceableStore: Sync + Send + 'static {
/// changed in the given block_range.
fn get_range(
&self,
entity_type: &EntityType,
entity_types: Vec<EntityType>,
causality_region: CausalityRegion,
block_range: Range<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError>;
) -> Result<BTreeMap<BlockNumber, Vec<EntityWithType>>, StoreError>;

fn input_schema(&self) -> InputSchema;

Expand All @@ -314,10 +315,11 @@ pub trait SourceableStore: Sync + Send + 'static {
impl<T: ?Sized + SourceableStore> SourceableStore for Arc<T> {
fn get_range(
&self,
entity_type: &EntityType,
entity_types: Vec<EntityType>,
causality_region: CausalityRegion,
block_range: Range<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
(**self).get_range(entity_type, block_range)
) -> Result<BTreeMap<BlockNumber, Vec<EntityWithType>>, StoreError> {
(**self).get_range(entity_types, causality_region, block_range)
}

fn input_schema(&self) -> InputSchema {
Expand Down
41 changes: 31 additions & 10 deletions store/postgres/src/block_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,36 +132,52 @@ impl<'a> QueryFragment<Pg> for BlockRangeUpperBoundClause<'a> {
}
}

#[derive(Debug, Clone, Copy)]
pub enum BoundSide {
Lower,
Upper,
}

/// Helper for generating SQL fragments for selecting entities in a specific block range
#[derive(Debug, Clone, Copy)]
pub enum EntityBlockRange {
Mutable(BlockRange), // TODO: check if this is a proper type here (maybe Range<BlockNumber>?)
Mutable((BlockRange, BoundSide)),
Immutable(BlockRange),
}

impl EntityBlockRange {
pub fn new(table: &Table, block_range: std::ops::Range<BlockNumber>) -> Self {
pub fn new(
immutable: bool,
block_range: std::ops::Range<BlockNumber>,
bound_side: BoundSide,
) -> Self {
let start: Bound<BlockNumber> = Bound::Included(block_range.start);
let end: Bound<BlockNumber> = Bound::Excluded(block_range.end);
let block_range: BlockRange = BlockRange(start, end);
if table.immutable {
if immutable {
Self::Immutable(block_range)
} else {
Self::Mutable(block_range)
Self::Mutable((block_range, bound_side))
}
}

/// Output SQL that matches only rows whose block range contains `block`.
/// Outputs SQL that matches only rows whose entities would trigger a change
/// event (Create, Modify, Delete) in a given interval of blocks. Otherwise said
/// a block_range border is contained in an interval of blocks. For instance
/// one of the following:
/// lower(block_range) >= $1 and lower(block_range) <= $2
/// upper(block_range) >= $1 and upper(block_range) <= $2
/// block$ >= $1 and block$ <= $2
pub fn contains<'b>(&'b self, out: &mut AstPass<'_, 'b, Pg>) -> QueryResult<()> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this very confusing. It would help if the comment described what the SQL text is that can be generated by this method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some sensible comment. Hope it's clear now.

out.unsafe_to_cache_prepared();
let block_range = match self {
EntityBlockRange::Mutable(br) => br,
EntityBlockRange::Mutable((br, _)) => br,
EntityBlockRange::Immutable(br) => br,
};
let BlockRange(start, finish) = block_range;

self.compare_column(out);
out.push_sql(" >= ");
out.push_sql(">= ");
match start {
Bound::Included(block) => out.push_bind_param::<Integer, _>(block)?,
Bound::Excluded(block) => {
Expand All @@ -170,9 +186,9 @@ impl EntityBlockRange {
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could just push the bind param block + 1

Copy link
Contributor Author

@zorancv zorancv Dec 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't able to make the compiler borrower happy as the addition creates a temporary variable that can't outlive it's block...

Bound::Unbounded => unimplemented!(),
};
out.push_sql(" AND ");
out.push_sql(" and");
self.compare_column(out);
out.push_sql(" <= ");
out.push_sql("<= ");
match finish {
Bound::Included(block) => {
out.push_bind_param::<Integer, _>(block)?;
Expand All @@ -186,7 +202,12 @@ impl EntityBlockRange {

pub fn compare_column(&self, out: &mut AstPass<Pg>) {
match self {
EntityBlockRange::Mutable(_) => out.push_sql(" lower(block_range) "),
EntityBlockRange::Mutable((_, BoundSide::Lower)) => {
out.push_sql(" lower(block_range) ")
}
EntityBlockRange::Mutable((_, BoundSide::Upper)) => {
out.push_sql(" upper(block_range) ")
}
EntityBlockRange::Immutable(_) => out.push_sql(" block$ "),
}
}
Expand Down
9 changes: 5 additions & 4 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use diesel::pg::PgConnection;
use diesel::r2d2::{ConnectionManager, PooledConnection};
use diesel::{prelude::*, sql_query};
use graph::anyhow::Context;
use graph::blockchain::block_stream::FirehoseCursor;
use graph::blockchain::block_stream::{EntityWithType, FirehoseCursor};
use graph::blockchain::BlockTime;
use graph::components::store::write::RowGroup;
use graph::components::store::{
Expand Down Expand Up @@ -1066,12 +1066,13 @@ impl DeploymentStore {
pub(crate) fn get_range(
&self,
site: Arc<Site>,
entity_type: &EntityType,
entity_types: Vec<EntityType>,
causality_region: CausalityRegion,
block_range: Range<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
) -> Result<BTreeMap<BlockNumber, Vec<EntityWithType>>, StoreError> {
let mut conn = self.get_conn()?;
let layout = self.layout(&mut conn, site)?;
layout.find_range(&mut conn, entity_type, block_range)
layout.find_range(&mut conn, entity_types, causality_region, block_range)
}

pub(crate) fn get_derived(
Expand Down
141 changes: 125 additions & 16 deletions store/postgres/src/relational.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use diesel::{connection::SimpleConnection, Connection};
use diesel::{
debug_query, sql_query, OptionalExtension, PgConnection, QueryDsl, QueryResult, RunQueryDsl,
};
use graph::blockchain::block_stream::{EntitySubgraphOperation, EntityWithType};
use graph::blockchain::BlockTime;
use graph::cheap_clone::CheapClone;
use graph::components::store::write::{RowGroup, WriteChunk};
Expand Down Expand Up @@ -57,8 +58,8 @@ use std::time::{Duration, Instant};

use crate::relational::value::{FromOidRow, OidRow};
use crate::relational_queries::{
ConflictingEntitiesData, ConflictingEntitiesQuery, FindChangesQuery, FindDerivedQuery,
FindPossibleDeletionsQuery, ReturnedEntityData,
ConflictingEntitiesData, ConflictingEntitiesQuery, EntityDataExt, FindChangesQuery,
FindDerivedQuery, FindPossibleDeletionsQuery, ReturnedEntityData,
};
use crate::{
primary::{Namespace, Site},
Expand All @@ -75,7 +76,7 @@ use graph::prelude::{
QueryExecutionError, StoreError, StoreEvent, ValueType, BLOCK_NUMBER_MAX,
};

use crate::block_range::{BLOCK_COLUMN, BLOCK_RANGE_COLUMN};
use crate::block_range::{BoundSide, BLOCK_COLUMN, BLOCK_RANGE_COLUMN};
pub use crate::catalog::Catalog;
use crate::connection_pool::ForeignServer;
use crate::{catalog, deployment};
Expand Down Expand Up @@ -545,21 +546,129 @@ impl Layout {
pub fn find_range(
&self,
conn: &mut PgConnection,
entity_type: &EntityType,
entity_types: Vec<EntityType>,
causality_region: CausalityRegion,
block_range: Range<BlockNumber>,
) -> Result<BTreeMap<BlockNumber, Vec<Entity>>, StoreError> {
let table = self.table_for_entity(entity_type)?;
let mut entities: BTreeMap<BlockNumber, Vec<Entity>> = BTreeMap::new();
if let Some(vec) = FindRangeQuery::new(table.as_ref(), block_range)
.get_results::<EntityData>(conn)
.optional()?
{
for e in vec {
let block = e.clone().deserialize_block_number::<Entity>()?;
let en = e.deserialize_with_layout::<Entity>(self, None)?;
entities.entry(block).or_default().push(en);
}
) -> Result<BTreeMap<BlockNumber, Vec<EntityWithType>>, StoreError> {
let mut tables = vec![];
for et in entity_types {
tables.push(self.table_for_entity(&et)?.as_ref());
}
let mut entities: BTreeMap<BlockNumber, Vec<EntityWithType>> = BTreeMap::new();

// Collect all entities that have their 'lower(block_range)' attribute in the
// interval of blocks defined by the variable block_range. For the immutable
// entities the respective attribute is 'block$'.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This helps, but somehow buries the lede: lower_vec contains all entities that were created or updated in block_range

// Here are all entities that are created or modified in the block_range.
let lower_vec = FindRangeQuery::new(
&tables,
causality_region,
BoundSide::Lower,
block_range.clone(),
)
.get_results::<EntityDataExt>(conn)
.optional()?
.unwrap_or_default();
// Collect all entities that have their 'upper(block_range)' attribute in the
// interval of blocks defined by the variable block_range. For the immutable
// entities no entries are returned.
// Here are all entities that are modified or deleted in the block_range,
// but will have the previous versions, i.e. in the case of an update, it's
// the version before the update, and lower_vec will have a corresponding
// entry with the new version.
let upper_vec =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, upper_vec contains all entitites that were deleted or updated, but will have the previous versions, i.e. in the case of an update, it's the version before the update, and lower_vec will have a corresponding entry with the new version.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll just copy this text.

FindRangeQuery::new(&tables, causality_region, BoundSide::Upper, block_range)
.get_results::<EntityDataExt>(conn)
.optional()?
.unwrap_or_default();
let mut lower_iter = lower_vec.iter().fuse().peekable();
let mut upper_iter = upper_vec.iter().fuse().peekable();
let mut lower_now = lower_iter.next();
let mut upper_now = upper_iter.next();
// A closure to convert the entity data from the database into entity operation.
let transform = |ede: &EntityDataExt,
entity_op: EntitySubgraphOperation|
-> Result<(EntityWithType, BlockNumber), StoreError> {
let e = EntityData::new(ede.entity.clone(), ede.data.clone());
let block = ede.block_number;
let entity_type = e.entity_type(&self.input_schema);
let entity = e.deserialize_with_layout::<Entity>(self, None)?;
let vid = ede.vid;
let ewt = EntityWithType {
entity_op,
entity_type,
entity,
vid,
};
Ok((ewt, block))
};

// The algorithm is a similar to merge sort algorithm and it relays on the fact that both vectors
// are ordered by (block_number, entity_type, entity_id). It advances simultaneously entities from
// both lower_vec and upper_vec and tries to match entities that have entries in both vectors for
// a particular block. The match is successful if an entry in one array has the same values in the
// other one for the number of the block, entity type and the entity id. The comparison operation
// over the EntityDataExt implements that check. If there is a match it’s a modification operation,
// since both sides of a range are present for that block, entity type and id. If one side of the
// range exists and the other is missing it is a creation or deletion depending on which side is
// present. For immutable entities the entries in upper_vec are missing, hence they are considered
// having a lower bound at particular block and upper bound at infinity.
while lower_now.is_some() || upper_now.is_some() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The body of this is pretty intricate. There should be a comment about what the strategy here is and how it works.

let (ewt, block) = match (lower_now, upper_now) {
(Some(lower), Some(upper)) => {
match lower.cmp(&upper) {
std::cmp::Ordering::Greater => {
// we have upper bound at this block, but no lower bounds at the same block so it's deletion
let (ewt, block) = transform(upper, EntitySubgraphOperation::Delete)?;
// advance upper_vec pointer
upper_now = upper_iter.next();
(ewt, block)
}
std::cmp::Ordering::Less => {
// we have lower bound at this block but no upper bound at the same block so its creation
let (ewt, block) = transform(lower, EntitySubgraphOperation::Create)?;
// advance lower_vec pointer
lower_now = lower_iter.next();
(ewt, block)
}
std::cmp::Ordering::Equal => {
let (ewt, block) = transform(lower, EntitySubgraphOperation::Modify)?;
// advance both lower_vec and upper_vec pointers
lower_now = lower_iter.next();
upper_now = upper_iter.next();
(ewt, block)
}
}
}
(Some(lower), None) => {
// we have lower bound at this block but no upper bound at the same block so its creation
let (ewt, block) = transform(lower, EntitySubgraphOperation::Create)?;
// advance lower_vec pointer
lower_now = lower_iter.next();
(ewt, block)
}
(None, Some(upper)) => {
let (ewt, block) = transform(upper, EntitySubgraphOperation::Delete)?;
// advance upper_vec pointer
upper_now = upper_iter.next();
(ewt, block)
}
_ => panic!("Imposible case to happen"),
};

match entities.get_mut(&block) {
Some(vec) => vec.push(ewt),
None => {
let _ = entities.insert(block, vec![ewt]);
}
};
}

// sort the elements in each blocks bucket by vid
for (_, vec) in &mut entities {
vec.sort_by(|a, b| a.vid.cmp(&b.vid));
}

Ok(entities)
}

Expand Down
Loading