Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion chain/substreams/src/trigger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,11 @@ where
logger,
);

state.entity_cache.set(key, entity)?;
state.entity_cache.set(
key,
entity,
Some(&mut state.write_capacity_remaining),
)?;
}
ParsedChanges::Delete(entity_key) => {
let entity_type = entity_key.entity_type.cheap_clone();
Expand Down
2 changes: 1 addition & 1 deletion core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1607,7 +1607,7 @@ async fn update_proof_of_indexing(
data.push((entity_cache.schema.poi_block_time(), block_time));
}
let poi = entity_cache.make_entity(data)?;
entity_cache.set(key, poi)
entity_cache.set(key, poi, None)
}

let _section_guard = stopwatch.start_section("update_proof_of_indexing");
Expand Down
22 changes: 20 additions & 2 deletions graph/src/components/store/entity_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::cheap_clone::CheapClone;
use crate::components::store::write::EntityModification;
use crate::components::store::{self as s, Entity, EntityOperation};
use crate::data::store::{EntityValidationError, Id, IdType, IntoEntityIterator};
use crate::prelude::ENV_VARS;
use crate::prelude::{CacheWeight, ENV_VARS};
use crate::schema::{EntityKey, InputSchema};
use crate::util::intern::Error as InternError;
use crate::util::lfu_cache::{EvictStats, LfuCache};
Expand Down Expand Up @@ -349,10 +349,28 @@ impl EntityCache {
/// with existing data. The entity will be validated against the
/// subgraph schema, and any errors will result in an `Err` being
/// returned.
pub fn set(&mut self, key: EntityKey, entity: Entity) -> Result<(), anyhow::Error> {
pub fn set(
&mut self,
key: EntityKey,
entity: Entity,
write_capacity_remaining: Option<&mut usize>,
) -> Result<(), anyhow::Error> {
// check the validate for derived fields
let is_valid = entity.validate(&key).is_ok();

if let Some(write_capacity_remaining) = write_capacity_remaining {
let weight = entity.weight();

if !self.current.contains_key(&key) && weight > *write_capacity_remaining {
return Err(anyhow!(
"exceeded block write limit when writing entity `{}`",
key.entity_id,
));
}

*write_capacity_remaining -= weight;
}

self.entity_op(key.clone(), EntityOp::Update(entity));

// The updates we were given are not valid by themselves; force a
Expand Down
8 changes: 7 additions & 1 deletion graph/src/components/subgraph/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ pub struct BlockState {
in_handler: bool,

pub metrics: BlockStateMetrics,

pub write_capacity_remaining: usize,
}

impl BlockState {
Expand All @@ -94,6 +96,7 @@ impl BlockState {
processed_data_sources: Vec::new(),
in_handler: false,
metrics: BlockStateMetrics::new(),
write_capacity_remaining: ENV_VARS.block_write_capacity,
}
}
}
Expand All @@ -111,6 +114,7 @@ impl BlockState {
processed_data_sources,
in_handler,
metrics,
write_capacity_remaining,
} = self;

match in_handler {
Expand All @@ -121,7 +125,9 @@ impl BlockState {
entity_cache.extend(other.entity_cache);
processed_data_sources.extend(other.processed_data_sources);
persisted_data_sources.extend(other.persisted_data_sources);
metrics.extend(other.metrics)
metrics.extend(other.metrics);
*write_capacity_remaining =
write_capacity_remaining.saturating_sub(other.write_capacity_remaining);
}

pub fn has_errors(&self) -> bool {
Expand Down
5 changes: 5 additions & 0 deletions graph/src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ pub struct EnvVars {
///
/// Defaults to an empty list, which means that this feature is enabled for all chains;
pub firehose_disable_extended_blocks_for_chains: Vec<String>,

pub block_write_capacity: usize,
}

impl EnvVars {
Expand Down Expand Up @@ -327,6 +329,7 @@ impl EnvVars {
Self::firehose_disable_extended_blocks_for_chains(
inner.firehose_disable_extended_blocks_for_chains,
),
block_write_capacity: inner.block_write_capacity.0,
})
}

Expand Down Expand Up @@ -488,6 +491,8 @@ struct Inner {
graphman_server_auth_token: Option<String>,
#[envconfig(from = "GRAPH_NODE_FIREHOSE_DISABLE_EXTENDED_BLOCKS_FOR_CHAINS")]
firehose_disable_extended_blocks_for_chains: Option<String>,
#[envconfig(from = "GRAPH_NODE_BLOCK_WRITE_CAPACITY", default = "4_000_000_000")]
block_write_capacity: NoUnderscores<usize>,
}

#[derive(Clone, Debug)]
Expand Down
4 changes: 3 additions & 1 deletion runtime/wasm/src/host_exports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,9 @@ impl HostExports {

state.metrics.track_entity_write(&entity_type, &entity);

state.entity_cache.set(key, entity)?;
state
.entity_cache
.set(key, entity, Some(&mut state.write_capacity_remaining))?;

Ok(())
}
Expand Down
3 changes: 1 addition & 2 deletions store/postgres/src/deployment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -874,8 +874,7 @@ impl DeploymentStore {
}
}

/// Methods that back the trait `graph::components::Store`, but have small
/// variations in their signatures
/// Methods that back the trait `WritableStore`, but have small variations in their signatures
impl DeploymentStore {
pub(crate) async fn block_ptr(&self, site: Arc<Site>) -> Result<Option<BlockPtr>, StoreError> {
let site = site.cheap_clone();
Expand Down
22 changes: 13 additions & 9 deletions store/test-store/tests/graph/entity_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,14 @@ fn insert_modifications() {

let mogwai_data = entity! { SCHEMA => id: "mogwai", name: "Mogwai" };
let mogwai_key = make_band_key("mogwai");
cache.set(mogwai_key.clone(), mogwai_data.clone()).unwrap();
cache
.set(mogwai_key.clone(), mogwai_data.clone(), None)
.unwrap();

let sigurros_data = entity! { SCHEMA => id: "sigurros", name: "Sigur Ros" };
let sigurros_key = make_band_key("sigurros");
cache
.set(sigurros_key.clone(), sigurros_data.clone())
.set(sigurros_key.clone(), sigurros_data.clone(), None)
.unwrap();

let result = cache.as_modifications(0);
Expand Down Expand Up @@ -253,12 +255,14 @@ fn overwrite_modifications() {

let mogwai_data = entity! { SCHEMA => id: "mogwai", name: "Mogwai", founded: 1995 };
let mogwai_key = make_band_key("mogwai");
cache.set(mogwai_key.clone(), mogwai_data.clone()).unwrap();
cache
.set(mogwai_key.clone(), mogwai_data.clone(), None)
.unwrap();

let sigurros_data = entity! { SCHEMA => id: "sigurros", name: "Sigur Ros", founded: 1994 };
let sigurros_key = make_band_key("sigurros");
cache
.set(sigurros_key.clone(), sigurros_data.clone())
.set(sigurros_key.clone(), sigurros_data.clone(), None)
.unwrap();

let result = cache.as_modifications(0);
Expand Down Expand Up @@ -289,12 +293,12 @@ fn consecutive_modifications() {
let update_data =
entity! { SCHEMA => id: "mogwai", founded: 1995, label: "Rock Action Records" };
let update_key = make_band_key("mogwai");
cache.set(update_key, update_data).unwrap();
cache.set(update_key, update_data, None).unwrap();

// Then, just reset the "label".
let update_data = entity! { SCHEMA => id: "mogwai", label: Value::Null };
let update_key = make_band_key("mogwai");
cache.set(update_key.clone(), update_data).unwrap();
cache.set(update_key.clone(), update_data, None).unwrap();

// We expect a single overwrite modification for the above that leaves "id"
// and "name" untouched, sets "founded" and removes the "label" field.
Expand Down Expand Up @@ -715,7 +719,7 @@ fn scoped_get() {
let account5 = ACCOUNT_TYPE.parse_id("5").unwrap();
let wallet5 = create_wallet_entity("5", &account5, 100);
let key5 = WALLET_TYPE.parse_key("5").unwrap();
cache.set(key5.clone(), wallet5.clone()).unwrap();
cache.set(key5.clone(), wallet5.clone(), None).unwrap();

// For the new entity, we can retrieve it with either scope
let act5 = cache.get(&key5, GetScope::InBlock).unwrap();
Expand All @@ -736,7 +740,7 @@ fn scoped_get() {
// But if it gets updated, it becomes visible with either scope
let mut wallet1 = wallet1;
wallet1.set("balance", 70).unwrap();
cache.set(key1.clone(), wallet1.clone()).unwrap();
cache.set(key1.clone(), wallet1.clone(), None).unwrap();
let act1 = cache.get(&key1, GetScope::InBlock).unwrap();
assert_eq!(Some(&wallet1), act1.as_ref().map(|e| e.as_ref()));
let act1 = cache.get(&key1, GetScope::Store).unwrap();
Expand Down Expand Up @@ -783,6 +787,6 @@ fn no_interface_mods() {

let entity = entity! { LOAD_RELATED_SUBGRAPH => id: "1", balance: 100 };

cache.set(key, entity).unwrap_err();
cache.set(key, entity, None).unwrap_err();
})
}
Loading