From bd4f6fb28f64ec2020a8026b1080e5cb41b2a845 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 31 Mar 2023 10:32:21 -0700 Subject: [PATCH 1/7] graph, store: Move the entity_cache test to test-store --- Cargo.lock | 3 --- graph/Cargo.toml | 3 --- graph/tests/README.md | 5 +++++ store/test-store/tests/graph.rs | 3 +++ .../tests => store/test-store/tests/graph}/entity_cache.rs | 1 - 5 files changed, 8 insertions(+), 7 deletions(-) create mode 100644 graph/tests/README.md create mode 100644 store/test-store/tests/graph.rs rename {graph/tests => store/test-store/tests/graph}/entity_cache.rs (99%) diff --git a/Cargo.lock b/Cargo.lock index 5f2b4fa8999..c909ae5a88b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1544,8 +1544,6 @@ dependencies = [ "ethabi", "futures 0.1.31", "futures 0.3.16", - "graph-chain-ethereum", - "graph-store-postgres", "graphql-parser", "hex", "hex-literal", @@ -1580,7 +1578,6 @@ dependencies = [ "stable-hash 0.4.2", "strum", "strum_macros", - "test-store", "thiserror", "tiny-keccak 1.5.0", "tokio", diff --git a/graph/Cargo.toml b/graph/Cargo.toml index 9686948ea27..39aba9d540f 100644 --- a/graph/Cargo.toml +++ b/graph/Cargo.toml @@ -66,9 +66,6 @@ web3 = { git = "https://github.com/graphprotocol/rust-web3", branch = "graph-pat serde_plain = "1.0.1" [dev-dependencies] -test-store = { path = "../store/test-store" } -graph-store-postgres = { path = "../store/postgres" } -graph-chain-ethereum = { path = "../chain/ethereum" } clap = { version = "3.2.23", features = ["derive", "env"] } maplit = "1.0.2" hex-literal = "0.3" diff --git a/graph/tests/README.md b/graph/tests/README.md new file mode 100644 index 00000000000..ff99b410d4b --- /dev/null +++ b/graph/tests/README.md @@ -0,0 +1,5 @@ +Put integration tests for this crate into `store/test-store/tests/graph`. +This avoids cyclic dev-dependencies which make rust-analyzer nearly +unusable. Once [this +issue](https://github.com/rust-lang/rust-analyzer/issues/14167) has been +fixed, we can move tests back here diff --git a/store/test-store/tests/graph.rs b/store/test-store/tests/graph.rs new file mode 100644 index 00000000000..6c8d2915540 --- /dev/null +++ b/store/test-store/tests/graph.rs @@ -0,0 +1,3 @@ +pub mod graph { + pub mod entity_cache; +} diff --git a/graph/tests/entity_cache.rs b/store/test-store/tests/graph/entity_cache.rs similarity index 99% rename from graph/tests/entity_cache.rs rename to store/test-store/tests/graph/entity_cache.rs index 38a490a2622..d0fc7dd3b26 100644 --- a/graph/tests/entity_cache.rs +++ b/store/test-store/tests/graph/entity_cache.rs @@ -1,4 +1,3 @@ -use async_trait::async_trait; use graph::blockchain::block_stream::FirehoseCursor; use graph::components::store::{ DeploymentCursorTracker, DerivedEntityQuery, EntityKey, EntityType, LoadRelatedRequest, From e1e943d4d9553afe4fcf56a3de942b6e9ab6c34c Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 31 Mar 2023 10:52:47 -0700 Subject: [PATCH 2/7] chain, store: Move ethereum tests to test-store --- Cargo.lock | 1 - chain/ethereum/Cargo.toml | 1 - chain/ethereum/tests/README.md | 5 +++++ store/test-store/tests/chain.rs | 5 +++++ .../tests/chain/ethereum}/full-text.graphql | 0 .../chain/ethereum}/ipfs-on-ethereum-contracts.ts | 0 .../chain/ethereum}/ipfs-on-ethereum-contracts.wasm | Bin .../test-store/tests/chain/ethereum}/manifest.rs | 2 +- 8 files changed, 11 insertions(+), 3 deletions(-) create mode 100644 chain/ethereum/tests/README.md create mode 100644 store/test-store/tests/chain.rs rename {chain/ethereum/tests => store/test-store/tests/chain/ethereum}/full-text.graphql (100%) rename {chain/ethereum/tests => store/test-store/tests/chain/ethereum}/ipfs-on-ethereum-contracts.ts (100%) rename {chain/ethereum/tests => store/test-store/tests/chain/ethereum}/ipfs-on-ethereum-contracts.wasm (100%) rename {chain/ethereum/tests => store/test-store/tests/chain/ethereum}/manifest.rs (99%) diff --git a/Cargo.lock b/Cargo.lock index c909ae5a88b..c69bea07c66 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1653,7 +1653,6 @@ dependencies = [ "prost-types", "semver", "serde", - "test-store", "tiny-keccak 1.5.0", "tonic-build", "uuid", diff --git a/chain/ethereum/Cargo.toml b/chain/ethereum/Cargo.toml index 170b9f554cc..4963b28ad79 100644 --- a/chain/ethereum/Cargo.toml +++ b/chain/ethereum/Cargo.toml @@ -25,7 +25,6 @@ graph-runtime-wasm = { path = "../../runtime/wasm" } graph-runtime-derive = { path = "../../runtime/derive" } [dev-dependencies] -test-store = { path = "../../store/test-store" } base64 = "0.20.0" uuid = { version = "1.3.0", features = ["v4"] } diff --git a/chain/ethereum/tests/README.md b/chain/ethereum/tests/README.md new file mode 100644 index 00000000000..e0444bc179f --- /dev/null +++ b/chain/ethereum/tests/README.md @@ -0,0 +1,5 @@ +Put integration tests for this crate into +`store/test-store/tests/chain/ethereum`. This avoids cyclic dev-dependencies +which make rust-analyzer nearly unusable. Once [this +issue](https://github.com/rust-lang/rust-analyzer/issues/14167) has been +fixed, we can move tests back here diff --git a/store/test-store/tests/chain.rs b/store/test-store/tests/chain.rs new file mode 100644 index 00000000000..3364791c26e --- /dev/null +++ b/store/test-store/tests/chain.rs @@ -0,0 +1,5 @@ +pub mod chain { + pub mod ethereum { + pub mod manifest; + } +} diff --git a/chain/ethereum/tests/full-text.graphql b/store/test-store/tests/chain/ethereum/full-text.graphql similarity index 100% rename from chain/ethereum/tests/full-text.graphql rename to store/test-store/tests/chain/ethereum/full-text.graphql diff --git a/chain/ethereum/tests/ipfs-on-ethereum-contracts.ts b/store/test-store/tests/chain/ethereum/ipfs-on-ethereum-contracts.ts similarity index 100% rename from chain/ethereum/tests/ipfs-on-ethereum-contracts.ts rename to store/test-store/tests/chain/ethereum/ipfs-on-ethereum-contracts.ts diff --git a/chain/ethereum/tests/ipfs-on-ethereum-contracts.wasm b/store/test-store/tests/chain/ethereum/ipfs-on-ethereum-contracts.wasm similarity index 100% rename from chain/ethereum/tests/ipfs-on-ethereum-contracts.wasm rename to store/test-store/tests/chain/ethereum/ipfs-on-ethereum-contracts.wasm diff --git a/chain/ethereum/tests/manifest.rs b/store/test-store/tests/chain/ethereum/manifest.rs similarity index 99% rename from chain/ethereum/tests/manifest.rs rename to store/test-store/tests/chain/ethereum/manifest.rs index 5d1c7bb3a84..f2278311cf9 100644 --- a/chain/ethereum/tests/manifest.rs +++ b/store/test-store/tests/chain/ethereum/manifest.rs @@ -18,8 +18,8 @@ use graph::{ data::subgraph::SubgraphFeature, }; +use graph::semver::Version; use graph_chain_ethereum::{Chain, NodeCapabilities}; -use semver::Version; use test_store::LOGGER; const GQL_SCHEMA: &str = "type Thing @entity { id: ID! }"; From b74748873edbd5b5a993d8487607aeefa6b96cdc Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 31 Mar 2023 11:30:01 -0700 Subject: [PATCH 3/7] core, store: Move core tests to test-store --- Cargo.lock | 2 +- core/Cargo.toml | 1 - core/tests/README.md | 5 +++++ store/test-store/Cargo.toml | 3 +++ store/test-store/tests/core.rs | 3 +++ store/test-store/tests/core/fixtures/ipfs_folder/hello.txt | 1 + store/test-store/tests/core/fixtures/ipfs_folder/random.txt | 1 + {core/tests => store/test-store/tests/core}/interfaces.rs | 0 8 files changed, 14 insertions(+), 2 deletions(-) create mode 100644 core/tests/README.md create mode 100644 store/test-store/tests/core.rs create mode 100644 store/test-store/tests/core/fixtures/ipfs_folder/hello.txt create mode 100644 store/test-store/tests/core/fixtures/ipfs_folder/random.txt rename {core/tests => store/test-store/tests/core}/interfaces.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index c69bea07c66..b64ebea1d57 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1729,7 +1729,6 @@ dependencies = [ "serde", "serde_json", "serde_yaml", - "test-store", "tower 0.4.12", "tower-test", "uuid", @@ -4367,6 +4366,7 @@ dependencies = [ "graphql-parser", "hex-literal", "lazy_static", + "pretty_assertions", "prost-types", "serde", ] diff --git a/core/Cargo.toml b/core/Cargo.toml index 6100eeb7aa9..5c270e4a98f 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -32,7 +32,6 @@ anyhow = "1.0" [dev-dependencies] tower-test = { git = "https://github.com/tower-rs/tower.git" } -test-store = { path = "../store/test-store" } hex = "0.4.3" graphql-parser = "0.4.0" pretty_assertions = "1.3.0" diff --git a/core/tests/README.md b/core/tests/README.md new file mode 100644 index 00000000000..261623bcccf --- /dev/null +++ b/core/tests/README.md @@ -0,0 +1,5 @@ +Put integration tests for this crate into `store/test-store/tests/core`. +This avoids cyclic dev-dependencies which make rust-analyzer nearly +unusable. Once [this +issue](https://github.com/rust-lang/rust-analyzer/issues/14167) has been +fixed, we can move tests back here diff --git a/store/test-store/Cargo.toml b/store/test-store/Cargo.toml index 2ddd85b0989..fd134344991 100644 --- a/store/test-store/Cargo.toml +++ b/store/test-store/Cargo.toml @@ -18,3 +18,6 @@ hex-literal = "0.3" diesel = { version = "1.4.8", features = ["postgres", "serde_json", "numeric", "r2d2"] } serde = "1.0" prost-types = { workspace = true } + +[dev-dependencies] +pretty_assertions = "1.3.0" diff --git a/store/test-store/tests/core.rs b/store/test-store/tests/core.rs new file mode 100644 index 00000000000..46d45977a1f --- /dev/null +++ b/store/test-store/tests/core.rs @@ -0,0 +1,3 @@ +pub mod core { + pub mod interfaces; +} diff --git a/store/test-store/tests/core/fixtures/ipfs_folder/hello.txt b/store/test-store/tests/core/fixtures/ipfs_folder/hello.txt new file mode 100644 index 00000000000..3b18e512dba --- /dev/null +++ b/store/test-store/tests/core/fixtures/ipfs_folder/hello.txt @@ -0,0 +1 @@ +hello world diff --git a/store/test-store/tests/core/fixtures/ipfs_folder/random.txt b/store/test-store/tests/core/fixtures/ipfs_folder/random.txt new file mode 100644 index 00000000000..87332e5d5cc --- /dev/null +++ b/store/test-store/tests/core/fixtures/ipfs_folder/random.txt @@ -0,0 +1 @@ +20c12d76-0e6a-428c-b6c9-b7e384ccb6fc \ No newline at end of file diff --git a/core/tests/interfaces.rs b/store/test-store/tests/core/interfaces.rs similarity index 100% rename from core/tests/interfaces.rs rename to store/test-store/tests/core/interfaces.rs From c74154d04d5ef5b32cb89161d909eed1f35506ea Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 31 Mar 2023 11:33:26 -0700 Subject: [PATCH 4/7] graphql, store: Move graphql tests to test-store --- Cargo.lock | 2 -- graphql/Cargo.toml | 2 -- graphql/tests/README.md | 5 +++++ store/test-store/tests/graphql.rs | 4 ++++ .../test-store/tests/graphql}/introspection.rs | 3 --- {graphql/tests => store/test-store/tests/graphql}/query.rs | 3 --- 6 files changed, 9 insertions(+), 10 deletions(-) create mode 100644 graphql/tests/README.md create mode 100644 store/test-store/tests/graphql.rs rename {graphql/tests => store/test-store/tests/graphql}/introspection.rs (99%) rename {graphql/tests => store/test-store/tests/graphql}/query.rs (99%) diff --git a/Cargo.lock b/Cargo.lock index b64ebea1d57..bb6368e5cae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1744,7 +1744,6 @@ dependencies = [ "crossbeam", "defer", "graph", - "graph-chain-ethereum", "graphql-parser", "graphql-tools", "indexmap", @@ -1753,7 +1752,6 @@ dependencies = [ "pretty_assertions", "stable-hash 0.3.3", "stable-hash 0.4.2", - "test-store", ] [[package]] diff --git a/graphql/Cargo.toml b/graphql/Cargo.toml index 17bd166e8d1..9d779a39270 100644 --- a/graphql/Cargo.toml +++ b/graphql/Cargo.toml @@ -20,5 +20,3 @@ async-recursion = "1.0.0" [dev-dependencies] pretty_assertions = "1.3.0" -test-store = { path = "../store/test-store" } -graph-chain-ethereum = { path = "../chain/ethereum" } diff --git a/graphql/tests/README.md b/graphql/tests/README.md new file mode 100644 index 00000000000..c2b55fa311e --- /dev/null +++ b/graphql/tests/README.md @@ -0,0 +1,5 @@ +Put integration tests for this crate into `store/test-store/tests/graphql`. +This avoids cyclic dev-dependencies which make rust-analyzer nearly +unusable. Once [this +issue](https://github.com/rust-lang/rust-analyzer/issues/14167) has been +fixed, we can move tests back here diff --git a/store/test-store/tests/graphql.rs b/store/test-store/tests/graphql.rs new file mode 100644 index 00000000000..3ae1fcd2b74 --- /dev/null +++ b/store/test-store/tests/graphql.rs @@ -0,0 +1,4 @@ +pub mod graphql { + pub mod introspection; + pub mod query; +} diff --git a/graphql/tests/introspection.rs b/store/test-store/tests/graphql/introspection.rs similarity index 99% rename from graphql/tests/introspection.rs rename to store/test-store/tests/graphql/introspection.rs index ab2360e2567..43ba9bff433 100644 --- a/graphql/tests/introspection.rs +++ b/store/test-store/tests/graphql/introspection.rs @@ -1,6 +1,3 @@ -#[macro_use] -extern crate pretty_assertions; - use std::sync::Arc; use graph::data::graphql::{object, object_value, ObjectOrInterface}; diff --git a/graphql/tests/query.rs b/store/test-store/tests/graphql/query.rs similarity index 99% rename from graphql/tests/query.rs rename to store/test-store/tests/graphql/query.rs index af3f871fd6a..1f4c02a97ae 100644 --- a/graphql/tests/query.rs +++ b/store/test-store/tests/graphql/query.rs @@ -1,6 +1,3 @@ -#[macro_use] -extern crate pretty_assertions; - use graph::components::store::EntityKey; use graph::data::subgraph::schema::DeploymentCreate; use graph::entity; From 04ba48f6ace36a34759036e8264dd4432b2f8b7f Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 31 Mar 2023 11:39:31 -0700 Subject: [PATCH 5/7] store: Move postgres tests to test-store --- Cargo.lock | 3 +-- store/postgres/Cargo.toml | 2 -- store/postgres/tests/README.md | 5 +++++ store/test-store/Cargo.toml | 1 + store/test-store/tests/postgres.rs | 9 +++++++++ .../tests => test-store/tests/postgres}/chain_head.rs | 2 +- .../tests => test-store/tests/postgres}/graft.rs | 0 .../tests => test-store/tests/postgres}/relational.rs | 0 .../tests/postgres}/relational_bytes.rs | 0 .../tests => test-store/tests/postgres}/store.rs | 0 .../tests => test-store/tests/postgres}/subgraph.rs | 0 .../tests => test-store/tests/postgres}/writable.rs | 0 12 files changed, 17 insertions(+), 5 deletions(-) create mode 100644 store/postgres/tests/README.md create mode 100644 store/test-store/tests/postgres.rs rename store/{postgres/tests => test-store/tests/postgres}/chain_head.rs (99%) rename store/{postgres/tests => test-store/tests/postgres}/graft.rs (100%) rename store/{postgres/tests => test-store/tests/postgres}/relational.rs (100%) rename store/{postgres/tests => test-store/tests/postgres}/relational_bytes.rs (100%) rename store/{postgres/tests => test-store/tests/postgres}/store.rs (100%) rename store/{postgres/tests => test-store/tests/postgres}/subgraph.rs (100%) rename store/{postgres/tests => test-store/tests/postgres}/writable.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index bb6368e5cae..c46f5235f34 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1928,7 +1928,6 @@ dependencies = [ "futures 0.3.16", "git-testament", "graph", - "graph-chain-ethereum", "graph-core", "graph-graphql", "graphql-parser", @@ -1946,7 +1945,6 @@ dependencies = [ "rand", "serde", "stable-hash 0.3.3", - "test-store", "uuid", ] @@ -4362,6 +4360,7 @@ dependencies = [ "graph-node", "graph-store-postgres", "graphql-parser", + "hex", "hex-literal", "lazy_static", "pretty_assertions", diff --git a/store/postgres/Cargo.toml b/store/postgres/Cargo.toml index ee185f9f05f..f407b0aff71 100644 --- a/store/postgres/Cargo.toml +++ b/store/postgres/Cargo.toml @@ -40,6 +40,4 @@ pretty_assertions = "1.3.0" futures = "0.3" clap = "3.2.23" graphql-parser = "0.4.0" -test-store = { path = "../test-store" } hex-literal = "0.3" -graph-chain-ethereum = { path = "../../chain/ethereum" } diff --git a/store/postgres/tests/README.md b/store/postgres/tests/README.md new file mode 100644 index 00000000000..9fa18d53625 --- /dev/null +++ b/store/postgres/tests/README.md @@ -0,0 +1,5 @@ +Put integration tests for this crate into `store/test-store/tests/postgres`. +This avoids cyclic dev-dependencies which make rust-analyzer nearly +unusable. Once [this +issue](https://github.com/rust-lang/rust-analyzer/issues/14167) has been +fixed, we can move tests back here diff --git a/store/test-store/Cargo.toml b/store/test-store/Cargo.toml index fd134344991..c03cdc6ca8c 100644 --- a/store/test-store/Cargo.toml +++ b/store/test-store/Cargo.toml @@ -20,4 +20,5 @@ serde = "1.0" prost-types = { workspace = true } [dev-dependencies] +hex = "0.4.3" pretty_assertions = "1.3.0" diff --git a/store/test-store/tests/postgres.rs b/store/test-store/tests/postgres.rs new file mode 100644 index 00000000000..71c7e3a37c1 --- /dev/null +++ b/store/test-store/tests/postgres.rs @@ -0,0 +1,9 @@ +pub mod postgres { + pub mod chain_head; + pub mod graft; + pub mod relational; + pub mod relational_bytes; + pub mod store; + pub mod subgraph; + pub mod writable; +} diff --git a/store/postgres/tests/chain_head.rs b/store/test-store/tests/postgres/chain_head.rs similarity index 99% rename from store/postgres/tests/chain_head.rs rename to store/test-store/tests/postgres/chain_head.rs index 9614dc0ae79..612333bc411 100644 --- a/store/postgres/tests/chain_head.rs +++ b/store/test-store/tests/postgres/chain_head.rs @@ -1,7 +1,7 @@ //! Test ChainStore implementation of Store, in particular, how //! the chain head pointer gets updated in various situations -use futures::executor; +use graph::prelude::futures03::executor; use std::future::Future; use std::sync::Arc; diff --git a/store/postgres/tests/graft.rs b/store/test-store/tests/postgres/graft.rs similarity index 100% rename from store/postgres/tests/graft.rs rename to store/test-store/tests/postgres/graft.rs diff --git a/store/postgres/tests/relational.rs b/store/test-store/tests/postgres/relational.rs similarity index 100% rename from store/postgres/tests/relational.rs rename to store/test-store/tests/postgres/relational.rs diff --git a/store/postgres/tests/relational_bytes.rs b/store/test-store/tests/postgres/relational_bytes.rs similarity index 100% rename from store/postgres/tests/relational_bytes.rs rename to store/test-store/tests/postgres/relational_bytes.rs diff --git a/store/postgres/tests/store.rs b/store/test-store/tests/postgres/store.rs similarity index 100% rename from store/postgres/tests/store.rs rename to store/test-store/tests/postgres/store.rs diff --git a/store/postgres/tests/subgraph.rs b/store/test-store/tests/postgres/subgraph.rs similarity index 100% rename from store/postgres/tests/subgraph.rs rename to store/test-store/tests/postgres/subgraph.rs diff --git a/store/postgres/tests/writable.rs b/store/test-store/tests/postgres/writable.rs similarity index 100% rename from store/postgres/tests/writable.rs rename to store/test-store/tests/postgres/writable.rs From 8417383525889c6d24695b0a802916358446d105 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 31 Mar 2023 16:08:32 -0700 Subject: [PATCH 6/7] store: Several improvements to Writable - check that the writer thread is still running when accepting items to be queued. It makes no sense to accept requests when the writer thread is not running, and should therefore fail early. - change the mechanism for how tests can single-step a deployment to be specific to that deployment to avoid problems with tests running at the same time. This was a huge footgun, and getting caught in that looks like a deadlock - implement Debug for queued requests --- store/postgres/src/writable.rs | 133 ++++++++++++++++---- store/test-store/tests/postgres/writable.rs | 4 +- 2 files changed, 109 insertions(+), 28 deletions(-) diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index a5127aa715b..cda37b79aeb 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -5,6 +5,7 @@ use std::{collections::BTreeMap, sync::Arc}; use graph::blockchain::block_stream::FirehoseCursor; use graph::components::store::{DeploymentCursorTracker, DerivedEntityQuery, EntityKey, ReadStore}; +use graph::constraint_violation; use graph::data::subgraph::schema; use graph::data_source::CausalityRegion; use graph::prelude::{ @@ -12,6 +13,7 @@ use graph::prelude::{ BLOCK_NUMBER_MAX, }; use graph::slog::info; +use graph::tokio::task::JoinHandle; use graph::util::bounded_queue::BoundedQueue; use graph::{ cheap_clone::CheapClone, @@ -448,6 +450,29 @@ enum Request { Stop, } +impl std::fmt::Debug for Request { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Write { + block_ptr, + mods, + store, + .. + } => write!( + f, + "write[{}, {:p}, {} entities]", + block_ptr.number, + store.as_ref(), + mods.len() + ), + Self::RevertTo { + block_ptr, store, .. + } => write!(f, "revert[{}, {:p}]", block_ptr.number, store.as_ref()), + Self::Stop => write!(f, "stop"), + } + } +} + enum ExecResult { Continue, Stop, @@ -520,28 +545,56 @@ struct Queue { /// allowed to process as many requests as it can #[cfg(debug_assertions)] pub(crate) mod test_support { - use std::sync::atomic::{AtomicBool, Ordering}; + use std::{ + collections::HashMap, + sync::{Arc, Mutex}, + }; - use graph::{prelude::lazy_static, util::bounded_queue::BoundedQueue}; + use graph::{ + components::store::{DeploymentId, DeploymentLocator}, + prelude::lazy_static, + util::bounded_queue::BoundedQueue, + }; lazy_static! { - static ref DO_STEP: AtomicBool = AtomicBool::new(false); - static ref ALLOWED_STEPS: BoundedQueue<()> = BoundedQueue::with_capacity(1_000); + static ref STEPS: Mutex>>> = + Mutex::new(HashMap::new()); } - pub(super) async fn take_step() { - if DO_STEP.load(Ordering::SeqCst) { - ALLOWED_STEPS.pop().await + pub(super) async fn take_step(deployment: &DeploymentLocator) { + let steps = STEPS.lock().unwrap().get(&deployment.id).cloned(); + if let Some(steps) = steps { + steps.pop().await; } } /// Allow the writer to process `steps` requests. After calling this, /// the writer will only process the number of requests it is allowed to - pub async fn allow_steps(steps: usize) { + pub async fn allow_steps(deployment: &DeploymentLocator, steps: usize) { + let queue = { + let mut map = STEPS.lock().unwrap(); + map.entry(deployment.id) + .or_insert_with(|| Arc::new(BoundedQueue::with_capacity(1_000))) + .clone() + }; for _ in 0..steps { - ALLOWED_STEPS.push(()).await + queue.push(()).await } - DO_STEP.store(true, Ordering::SeqCst); + } +} + +impl std::fmt::Debug for Queue { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let reqs = self.queue.fold(vec![], |mut reqs, req| { + reqs.push(req.clone()); + reqs + }); + + write!(f, "reqs[{} : ", self.store.site)?; + for req in reqs { + write!(f, " {:?}", req)?; + } + writeln!(f, "]") } } @@ -552,11 +605,11 @@ impl Queue { store: Arc, capacity: usize, registry: Arc, - ) -> Arc { + ) -> (Arc, JoinHandle<()>) { async fn start_writer(queue: Arc, logger: Logger) { loop { #[cfg(debug_assertions)] - test_support::take_step().await; + test_support::take_step(&queue.store.site.as_ref().into()).await; // We peek at the front of the queue, rather than pop it // right away, so that query methods like `get` have access @@ -623,9 +676,9 @@ impl Queue { }; let queue = Arc::new(queue); - graph::spawn(start_writer(queue.cheap_clone(), logger)); + let handle = graph::spawn(start_writer(queue.cheap_clone(), logger)); - queue + (queue, handle) } /// Add a write request to the queue @@ -637,6 +690,7 @@ impl Queue { /// Wait for the background writer to finish processing queued entries async fn flush(&self) -> Result<(), StoreError> { + self.check_err()?; self.queue.wait_empty().await; self.check_err() } @@ -880,7 +934,10 @@ impl Queue { /// A shim to allow bypassing any pipelined store handling if need be enum Writer { Sync(Arc), - Async(Arc), + Async { + queue: Arc, + join_handle: JoinHandle<()>, + }, } impl Writer { @@ -894,7 +951,24 @@ impl Writer { if capacity == 0 { Self::Sync(store) } else { - Self::Async(Queue::start(logger, store, capacity, registry)) + let (queue, join_handle) = Queue::start(logger, store.clone(), capacity, registry); + Self::Async { queue, join_handle } + } + } + + fn check_queue_running(&self) -> Result<(), StoreError> { + match self { + Writer::Sync(_) => Ok(()), + Writer::Async { join_handle, queue } => { + if join_handle.is_finished() { + Err(constraint_violation!( + "Subgraph writer for {} is not running", + queue.store.site + )) + } else { + Ok(()) + } + } } } @@ -920,7 +994,8 @@ impl Writer { &manifest_idx_and_name, &processed_data_sources, ), - Writer::Async(queue) => { + Writer::Async { queue, .. } => { + self.check_queue_running()?; let req = Request::Write { store: queue.store.cheap_clone(), stopwatch: queue.stopwatch.cheap_clone(), @@ -944,7 +1019,8 @@ impl Writer { ) -> Result<(), StoreError> { match self { Writer::Sync(store) => store.revert_block_operations(block_ptr_to, &firehose_cursor), - Writer::Async(queue) => { + Writer::Async { queue, .. } => { + self.check_queue_running()?; let req = Request::RevertTo { store: queue.store.cheap_clone(), block_ptr: block_ptr_to, @@ -958,14 +1034,17 @@ impl Writer { async fn flush(&self) -> Result<(), StoreError> { match self { Writer::Sync { .. } => Ok(()), - Writer::Async(queue) => queue.flush().await, + Writer::Async { queue, .. } => { + self.check_queue_running()?; + queue.flush().await + } } } fn get(&self, key: &EntityKey) -> Result, StoreError> { match self { Writer::Sync(store) => store.get(key, BLOCK_NUMBER_MAX), - Writer::Async(queue) => queue.get(key), + Writer::Async { queue, .. } => queue.get(key), } } @@ -975,7 +1054,7 @@ impl Writer { ) -> Result, StoreError> { match self { Writer::Sync(store) => store.get_many(keys, BLOCK_NUMBER_MAX), - Writer::Async(queue) => queue.get_many(keys), + Writer::Async { queue, .. } => queue.get_many(keys), } } @@ -985,7 +1064,7 @@ impl Writer { ) -> Result, StoreError> { match self { Writer::Sync(store) => store.get_derived(key, BLOCK_NUMBER_MAX, vec![]), - Writer::Async(queue) => queue.get_derived(key), + Writer::Async { queue, .. } => queue.get_derived(key), } } @@ -999,28 +1078,30 @@ impl Writer { .load_dynamic_data_sources(BLOCK_NUMBER_MAX, manifest_idx_and_name) .await } - Writer::Async(queue) => queue.load_dynamic_data_sources(manifest_idx_and_name).await, + Writer::Async { queue, .. } => { + queue.load_dynamic_data_sources(manifest_idx_and_name).await + } } } fn poisoned(&self) -> bool { match self { Writer::Sync(_) => false, - Writer::Async(queue) => queue.poisoned(), + Writer::Async { queue, .. } => queue.poisoned(), } } async fn stop(&self) -> Result<(), StoreError> { match self { Writer::Sync(_) => Ok(()), - Writer::Async(queue) => queue.stop().await, + Writer::Async { queue, .. } => queue.stop().await, } } fn deployment_synced(&self) { match self { Writer::Sync(_) => {} - Writer::Async(queue) => queue.deployment_synced(), + Writer::Async { queue, .. } => queue.deployment_synced(), } } } diff --git a/store/test-store/tests/postgres/writable.rs b/store/test-store/tests/postgres/writable.rs index a171de104cf..1c0733e0574 100644 --- a/store/test-store/tests/postgres/writable.rs +++ b/store/test-store/tests/postgres/writable.rs @@ -121,11 +121,11 @@ async fn insert_count(store: &Arc, deployment: &DeploymentL async fn pause_writer(deployment: &DeploymentLocator) { flush(deployment).await.unwrap(); - writable::allow_steps(0).await; + writable::allow_steps(deployment, 0).await; } async fn resume_writer(deployment: &DeploymentLocator, steps: usize) { - writable::allow_steps(steps).await; + writable::allow_steps(deployment, steps).await; flush(deployment).await.unwrap(); } From 3ba05c329bc9e35ee63b059ca7a60de368823d85 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 31 Mar 2023 18:11:27 -0700 Subject: [PATCH 7/7] all: Remove some unneeded dev-dependencies --- Cargo.lock | 20 +++++++------------- core/Cargo.toml | 4 ---- graphql/Cargo.toml | 3 --- store/postgres/Cargo.toml | 2 -- 4 files changed, 7 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c46f5235f34..b44deb1a5bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1718,13 +1718,10 @@ dependencies = [ "graph-chain-near", "graph-chain-substreams", "graph-runtime-wasm", - "graphql-parser", - "hex", "ipfs-api", "ipfs-api-backend-hyper", "lazy_static", "lru_time_cache", - "pretty_assertions", "semver", "serde", "serde_json", @@ -1749,7 +1746,6 @@ dependencies = [ "indexmap", "lazy_static", "parking_lot 0.12.1", - "pretty_assertions", "stable-hash 0.3.3", "stable-hash 0.4.2", ] @@ -1925,14 +1921,12 @@ dependencies = [ "diesel_derives", "diesel_migrations", "fallible-iterator", - "futures 0.3.16", "git-testament", "graph", "graph-core", "graph-graphql", "graphql-parser", "hex", - "hex-literal", "itertools", "lazy_static", "lru_time_cache", @@ -2209,9 +2203,9 @@ dependencies = [ [[package]] name = "hyper-rustls" -version = "0.23.0" +version = "0.23.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d87c48c02e0dc5e3b849a2041db3029fd066650f8f717c07bf8ed78ccb895cac" +checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c" dependencies = [ "http", "hyper", @@ -2769,13 +2763,14 @@ checksum = "0debeb9fcf88823ea64d64e4a815ab1643f33127d995978e099942ce38f25238" [[package]] name = "multiaddr" -version = "0.17.0" +version = "0.17.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b53e0cc5907a5c216ba6584bf74be8ab47d6d6289f72793b2dddbf15dc3bf8c" +checksum = "2b36f567c7099511fa8612bbbb52dda2419ce0bdbacf31714e3a5ffdb766d3bd" dependencies = [ "arrayref", "byteorder", "data-encoding", + "log", "multibase", "multihash 0.17.0", "percent-encoding", @@ -5048,12 +5043,11 @@ checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" [[package]] name = "walkdir" -version = "2.3.2" +version = "2.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56" +checksum = "36df944cda56c7d8d8b7496af378e6b16de9284591917d307c9b4d313c44e698" dependencies = [ "same-file", - "winapi", "winapi-util", ] diff --git a/core/Cargo.toml b/core/Cargo.toml index 5c270e4a98f..fbc4e27c723 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -32,10 +32,6 @@ anyhow = "1.0" [dev-dependencies] tower-test = { git = "https://github.com/tower-rs/tower.git" } -hex = "0.4.3" -graphql-parser = "0.4.0" -pretty_assertions = "1.3.0" -anyhow = "1.0" ipfs-api-backend-hyper = "0.6" ipfs-api = { version = "0.17.0", features = [ "with-hyper-rustls", diff --git a/graphql/Cargo.toml b/graphql/Cargo.toml index 9d779a39270..3e247f66988 100644 --- a/graphql/Cargo.toml +++ b/graphql/Cargo.toml @@ -17,6 +17,3 @@ defer = "0.1" parking_lot = "0.12" anyhow = "1.0" async-recursion = "1.0.0" - -[dev-dependencies] -pretty_assertions = "1.3.0" diff --git a/store/postgres/Cargo.toml b/store/postgres/Cargo.toml index f407b0aff71..90c977aa3b2 100644 --- a/store/postgres/Cargo.toml +++ b/store/postgres/Cargo.toml @@ -37,7 +37,5 @@ hex = "0.4.3" pretty_assertions = "1.3.0" [dev-dependencies] -futures = "0.3" clap = "3.2.23" graphql-parser = "0.4.0" -hex-literal = "0.3"