diff --git a/Cargo.lock b/Cargo.lock index fc818c79..bd983444 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1566,6 +1566,8 @@ dependencies = [ "reth-rpc-eth-api", "reth-testing-utils", "reth-tracing", + "revm", + "revm-database", "rollup-boost", "serde", "serde_json", @@ -1585,11 +1587,19 @@ dependencies = [ "alloy-eips", "alloy-genesis", "alloy-primitives", + "alloy-provider", "alloy-rpc-client", + "alloy-rpc-types-engine", + "alloy-rpc-types-eth", + "alloy-sol-types", + "arc-swap", + "base-reth-flashblocks-rpc", "base-reth-test-utils", "eyre", + "hex-literal", "jsonrpsee 0.26.0", "op-alloy-consensus", + "op-alloy-rpc-types", "rand 0.9.2", "reth", "reth-db", @@ -1607,7 +1617,10 @@ dependencies = [ "reth-testing-utils", "reth-tracing", "reth-transaction-pool", + "reth-trie-common", "revm", + "revm-database", + "rollup-boost", "serde", "serde_json", "tips-core", @@ -4103,6 +4116,12 @@ dependencies = [ "arrayvec", ] +[[package]] +name = "hex-literal" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fe2267d4ed49bc07b63801559be28c718ea06c4738b7a03c94df7386d2cde46" + [[package]] name = "hickory-proto" version = "0.25.2" @@ -4341,7 +4360,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.6.1", + "socket2 0.5.10", "system-configuration", "tokio", "tower-service", @@ -4361,7 +4380,7 @@ dependencies = [ "js-sys", "log", "wasm-bindgen", - "windows-core 0.62.2", + "windows-core 0.57.0", ] [[package]] @@ -6761,7 +6780,7 @@ dependencies = [ "quinn-udp", "rustc-hash 2.1.1", "rustls", - "socket2 0.6.1", + "socket2 0.5.10", "thiserror 2.0.17", "tokio", "tracing", @@ -6798,7 +6817,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.6.1", + "socket2 0.5.10", "tracing", "windows-sys 0.60.2", ] @@ -11726,7 +11745,7 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tips-core" version = "0.1.0" -source = "git+https://github.com/base/tips?rev=a21ee492dede17f31eea108c12c669a8190f31aa#a21ee492dede17f31eea108c12c669a8190f31aa" +source = "git+https://github.com/base/tips?rev=c08eaa4fe10c26de8911609b41ddab4918698325#c08eaa4fe10c26de8911609b41ddab4918698325" dependencies = [ "alloy-consensus", "alloy-primitives", @@ -12742,7 +12761,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.48.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 6964da5e..1b76b115 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,7 +47,7 @@ base-reth-transaction-tracing = { path = "crates/transaction-tracing" } # base/tips # Note: default-features = false avoids version conflicts with reth's alloy/op-alloy dependencies -tips-core = { git = "https://github.com/base/tips", rev = "a21ee492dede17f31eea108c12c669a8190f31aa", default-features = false } +tips-core = { git = "https://github.com/base/tips", rev = "c08eaa4fe10c26de8911609b41ddab4918698325", default-features = false } # reth reth = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } @@ -74,10 +74,12 @@ reth-db-common = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } reth-rpc-layer = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } reth-ipc = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } reth-node-core = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } +reth-trie-common = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" } # revm revm = { version = "31.0.2", default-features = false } revm-bytecode = { version = "7.1.1", default-features = false } +revm-database = { version = "9.0.6", default-features = false } # alloy alloy-primitives = { version = "1.4.1", default-features = false, features = [ @@ -89,6 +91,7 @@ alloy-rpc-types = { version = "1.0.41", default-features = false } alloy-rpc-types-engine = { version = "1.0.41", default-features = false } alloy-rpc-types-eth = { version = "1.0.41" } alloy-consensus = { version = "1.0.41" } +alloy-sol-types = { version = "1.4.1" } alloy-trie = { version = "0.9.1", default-features = false } alloy-provider = { version = "1.0.41" } alloy-hardforks = "0.4.4" diff --git a/crates/flashblocks-rpc/Cargo.toml b/crates/flashblocks-rpc/Cargo.toml index d12a57eb..0247e4ff 100644 --- a/crates/flashblocks-rpc/Cargo.toml +++ b/crates/flashblocks-rpc/Cargo.toml @@ -29,6 +29,9 @@ reth-primitives.workspace = true reth-primitives-traits.workspace = true reth-exex.workspace = true +# revm +revm-database.workspace = true + # alloy alloy-primitives.workspace = true alloy-eips.workspace = true @@ -76,6 +79,7 @@ arc-swap.workspace = true [dev-dependencies] base-reth-test-utils.workspace = true rand.workspace = true +revm.workspace = true reth-db.workspace = true reth-testing-utils.workspace = true reth-db-common.workspace = true diff --git a/crates/flashblocks-rpc/src/metrics.rs b/crates/flashblocks-rpc/src/metrics.rs index bd73816a..4c557fae 100644 --- a/crates/flashblocks-rpc/src/metrics.rs +++ b/crates/flashblocks-rpc/src/metrics.rs @@ -66,4 +66,10 @@ pub struct Metrics { #[metric(describe = "Total number of WebSocket reconnection attempts")] pub reconnect_attempts: Counter, + + #[metric(describe = "Time taken to clone bundle state")] + pub bundle_state_clone_duration: Histogram, + + #[metric(describe = "Size of bundle state being cloned (number of accounts)")] + pub bundle_state_clone_size: Histogram, } diff --git a/crates/flashblocks-rpc/src/pending_blocks.rs b/crates/flashblocks-rpc/src/pending_blocks.rs index a0d9ffbc..1c8b22a4 100644 --- a/crates/flashblocks-rpc/src/pending_blocks.rs +++ b/crates/flashblocks-rpc/src/pending_blocks.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Instant}; use alloy_consensus::{Header, Sealed}; use alloy_eips::BlockNumberOrTag; @@ -13,11 +13,14 @@ use arc_swap::Guard; use eyre::eyre; use op_alloy_network::Optimism; use op_alloy_rpc_types::{OpTransactionReceipt, Transaction}; -use reth::revm::{db::Cache, state::EvmState}; +use reth::revm::{ + db::{BundleState, Cache}, + state::EvmState, +}; use reth_rpc_convert::RpcTransaction; use reth_rpc_eth_api::{RpcBlock, RpcReceipt}; -use crate::{rpc::PendingBlocksAPI, subscription::Flashblock}; +use crate::{metrics::Metrics, rpc::PendingBlocksAPI, subscription::Flashblock}; pub struct PendingBlocksBuilder { flashblocks: Vec, @@ -33,6 +36,7 @@ pub struct PendingBlocksBuilder { state_overrides: Option, db_cache: Cache, + bundle_state: BundleState, } impl PendingBlocksBuilder { @@ -49,6 +53,7 @@ impl PendingBlocksBuilder { transaction_senders: HashMap::new(), state_overrides: None, db_cache: Cache::default(), + bundle_state: BundleState::default(), } } @@ -116,6 +121,12 @@ impl PendingBlocksBuilder { self } + #[inline] + pub(crate) fn with_bundle_state(&mut self, bundle_state: BundleState) -> &Self { + self.bundle_state = bundle_state; + self + } + pub(crate) fn build(self) -> eyre::Result { if self.headers.is_empty() { return Err(eyre!("missing headers")); @@ -137,6 +148,7 @@ impl PendingBlocksBuilder { transaction_senders: self.transaction_senders, state_overrides: self.state_overrides, db_cache: self.db_cache, + bundle_state: self.bundle_state, }) } } @@ -156,6 +168,7 @@ pub struct PendingBlocks { state_overrides: Option, db_cache: Cache, + bundle_state: BundleState, } impl PendingBlocks { @@ -195,6 +208,22 @@ impl PendingBlocks { self.db_cache.clone() } + /// Returns a clone of the bundle state. + /// + /// NOTE: This clones the entire BundleState, which contains a HashMap of all touched + /// accounts and their storage slots. The cost scales with the number of accounts and + /// storage slots modified in the flashblock. Monitor `bundle_state_clone_duration` and + /// `bundle_state_clone_size` metrics to track if this becomes a bottleneck. + pub fn get_bundle_state(&self) -> BundleState { + let metrics = Metrics::default(); + let size = self.bundle_state.state.len(); + let start = Instant::now(); + let cloned = self.bundle_state.clone(); + metrics.bundle_state_clone_duration.record(start.elapsed()); + metrics.bundle_state_clone_size.record(size as f64); + cloned + } + pub fn get_transactions_for_block(&self, block_number: BlockNumber) -> Vec { self.transactions .iter() diff --git a/crates/flashblocks-rpc/src/state.rs b/crates/flashblocks-rpc/src/state.rs index b3335062..1383ac79 100644 --- a/crates/flashblocks-rpc/src/state.rs +++ b/crates/flashblocks-rpc/src/state.rs @@ -33,6 +33,7 @@ use reth_optimism_primitives::{DepositReceipt, OpBlock, OpPrimitives}; use reth_optimism_rpc::OpReceiptBuilder; use reth_primitives::RecoveredBlock; use reth_rpc_convert::transaction::ConvertReceiptInput; +use revm_database::states::bundle_state::BundleRetention; use tokio::sync::{ Mutex, broadcast::{self, Sender}, @@ -375,12 +376,26 @@ where let state_provider = self.client.state_by_block_number_or_tag(BlockNumberOrTag::Number(canonical_block))?; let state_provider_db = StateProviderDatabase::new(state_provider); - let state = State::builder().with_database(state_provider_db).with_bundle_update().build(); let mut pending_blocks_builder = PendingBlocksBuilder::new(); + // Cache reads across flashblocks, accumulating caches from previous + // pending blocks if available + let cache_db = match &prev_pending_blocks { + Some(pending_blocks) => { + CacheDB { cache: pending_blocks.get_db_cache(), db: state_provider_db } + } + None => CacheDB::new(state_provider_db), + }; + + // Track state changes across flashblocks, accumulating bundle state + // from previous pending blocks if available let mut db = match &prev_pending_blocks { - Some(pending_blocks) => CacheDB { cache: pending_blocks.get_db_cache(), db: state }, - None => CacheDB::new(state), + Some(pending_blocks) => State::builder() + .with_database(cache_db) + .with_bundle_update() + .with_bundle_prestate(pending_blocks.get_bundle_state()) + .build(), + None => State::builder().with_database(cache_db).with_bundle_update().build(), }; let mut state_overrides = match &prev_pending_blocks { @@ -620,7 +635,9 @@ where last_block_header = block.header.clone(); } - pending_blocks_builder.with_db_cache(db.cache); + db.merge_transitions(BundleRetention::Reverts); + pending_blocks_builder.with_bundle_state(db.take_bundle()); + pending_blocks_builder.with_db_cache(db.database.cache); pending_blocks_builder.with_state_overrides(state_overrides); Ok(Some(Arc::new(pending_blocks_builder.build()?))) } diff --git a/crates/flashblocks-rpc/tests/layering.rs b/crates/flashblocks-rpc/tests/layering.rs new file mode 100644 index 00000000..049198d3 --- /dev/null +++ b/crates/flashblocks-rpc/tests/layering.rs @@ -0,0 +1,410 @@ +// ============================================================================= +// Database Layering Tests +// +// These tests verify that the three-layer state architecture is correct: +// 1. StateProviderDatabase (canonical block base state) +// 2. CacheDB (applies flashblock pending read cache) +// 3. State wrapper (applies bundle_prestate for pending writes + tracks new changes) +// +// The correct layering is State>. Writes go +// through State first (properly tracked for bundle/state root calculation), +// and reads fall through to CacheDB (flashblock read cache) then to +// StateProviderDatabase (canonical state). +// +// The WRONG layering would be CacheDB>. CacheDB intercepts all +// writes into its internal cache and doesn't propagate them to State, so +// State's bundle tracking captures nothing. See the test +// layering_cachedb_wrapping_state_loses_writes for a demonstration. +// ============================================================================= + +use std::sync::Arc; + +use alloy_consensus::crypto::secp256k1::public_key_to_address; +use alloy_genesis::GenesisAccount; +use alloy_primitives::{Address, B256, U256}; +use eyre::Context; +use rand::{SeedableRng, rngs::StdRng}; +use reth::{ + api::NodeTypesWithDBAdapter, + revm::db::{AccountState, BundleState, Cache, CacheDB, DbAccount, State}, +}; +use reth_db::{ + ClientVersion, DatabaseEnv, init_db, + mdbx::{DatabaseArguments, KILOBYTE, MEGABYTE, MaxReadTransactionDuration}, + test_utils::{ERROR_DB_CREATION, TempDatabase, create_test_static_files_dir, tempdir_path}, +}; +use reth_optimism_chainspec::{BASE_MAINNET, OpChainSpec, OpChainSpecBuilder}; +use reth_optimism_node::OpNode; +use reth_primitives_traits::SealedHeader; +use reth_provider::{ + HeaderProvider, ProviderFactory, StateProviderFactory, + providers::{BlockchainProvider, StaticFileProvider}, +}; +use reth_testing_utils::generators::generate_keys; +use revm::{Database, primitives::KECCAK_EMPTY}; + +type NodeTypes = NodeTypesWithDBAdapter>>; + +#[derive(Eq, PartialEq, Debug, Hash, Clone, Copy)] +enum User { + Alice, + Bob, +} + +#[derive(Debug, Clone)] +struct TestHarness { + provider: BlockchainProvider, + header: SealedHeader, + #[allow(dead_code)] + chain_spec: Arc, + user_to_address: std::collections::HashMap, + #[allow(dead_code)] + user_to_private_key: std::collections::HashMap, +} + +impl TestHarness { + fn address(&self, u: User) -> Address { + self.user_to_address[&u] + } +} + +fn create_chain_spec( + seed: u64, +) -> ( + Arc, + std::collections::HashMap, + std::collections::HashMap, +) { + let keys = generate_keys(&mut StdRng::seed_from_u64(seed), 2); + + let mut addresses = std::collections::HashMap::new(); + let mut private_keys = std::collections::HashMap::new(); + + let alice_key = keys[0]; + let alice_address = public_key_to_address(alice_key.public_key()); + let alice_secret = B256::from(alice_key.secret_bytes()); + addresses.insert(User::Alice, alice_address); + private_keys.insert(User::Alice, alice_secret); + + let bob_key = keys[1]; + let bob_address = public_key_to_address(bob_key.public_key()); + let bob_secret = B256::from(bob_key.secret_bytes()); + addresses.insert(User::Bob, bob_address); + private_keys.insert(User::Bob, bob_secret); + + let genesis = BASE_MAINNET + .genesis + .clone() + .extend_accounts(vec![ + (alice_address, GenesisAccount::default().with_balance(U256::from(1_000_000_000_u64))), + (bob_address, GenesisAccount::default().with_balance(U256::from(1_000_000_000_u64))), + ]) + .with_gas_limit(100_000_000); + + let spec = + Arc::new(OpChainSpecBuilder::base_mainnet().genesis(genesis).isthmus_activated().build()); + + (spec, addresses, private_keys) +} + +fn create_provider_factory( + chain_spec: Arc, +) -> ProviderFactory>>> { + let (static_dir, _) = create_test_static_files_dir(); + let db = create_test_db(); + ProviderFactory::new( + db, + chain_spec, + StaticFileProvider::read_write(static_dir.keep()).expect("static file provider"), + ) +} + +fn create_test_db() -> Arc> { + let path = tempdir_path(); + let emsg = format!("{ERROR_DB_CREATION}: {path:?}"); + + let db = init_db( + &path, + DatabaseArguments::new(ClientVersion::default()) + .with_max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded)) + .with_geometry_max_size(Some(4 * MEGABYTE)) + .with_growth_step(Some(4 * KILOBYTE)), + ) + .expect(&emsg); + + Arc::new(TempDatabase::new(db, path)) +} + +fn setup_harness() -> eyre::Result { + let (chain_spec, user_to_address, user_to_private_key) = create_chain_spec(1337); + let factory = create_provider_factory(chain_spec.clone()); + + reth_db_common::init::init_genesis(&factory).context("initializing genesis state")?; + + let provider = BlockchainProvider::new(factory.clone()).context("creating provider")?; + let header = provider + .sealed_header(0) + .context("fetching genesis header")? + .expect("genesis header exists"); + + Ok(TestHarness { provider, header, chain_spec, user_to_address, user_to_private_key }) +} + +/// Demonstrates that State alone cannot see pending state. +/// +/// Without CacheDB or bundle_prestate, State can only see canonical block state. +#[test] +fn layering_old_state_only_cannot_see_pending_state() -> eyre::Result<()> { + let harness = setup_harness()?; + let alice_address = harness.address(User::Alice); + + let state_provider = harness + .provider + .state_by_block_hash(harness.header.hash()) + .context("getting state provider")?; + + // OLD implementation: just State wrapping StateProviderDatabase directly + // No CacheDB, no bundle_prestate - cannot see any pending flashblock state + let state_db = reth::revm::database::StateProviderDatabase::new(state_provider); + let mut db = State::builder().with_database(state_db).with_bundle_update().build(); + + // Read through State - can only see canonical state (nonce 0) + let account = db.basic(alice_address)?.expect("account should exist"); + + // Old implementation sees canonical nonce (0), not any pending state + assert_eq!(account.nonce, 0, "Old State-only layering can only see canonical state"); + + Ok(()) +} + +/// Demonstrates that CacheDB> is the WRONG layering order. +/// +/// CacheDB is a read-through cache. When the EVM writes state changes, those +/// writes go into CacheDB's internal cache and are NOT propagated to the +/// underlying State. As a result, State's bundle tracking captures nothing, +/// and take_bundle() returns an empty bundle - breaking state root calculation. +/// +/// The correct layering is State> where State wraps CacheDB, +/// so all writes go through State first and are properly tracked. +#[test] +fn layering_cachedb_wrapping_state_loses_writes() -> eyre::Result<()> { + use revm::DatabaseCommit; + + let harness = setup_harness()?; + let alice_address = harness.address(User::Alice); + let bob_address = harness.address(User::Bob); + + // ========================================================================= + // WRONG layering: CacheDB> + // ========================================================================= + let state_provider = harness + .provider + .state_by_block_hash(harness.header.hash()) + .context("getting state provider")?; + + let state_db = reth::revm::database::StateProviderDatabase::new(state_provider); + let state = State::builder().with_database(state_db).with_bundle_update().build(); + let mut wrong_db = CacheDB::new(state); + + // Simulate a write: modify Alice's account through the CacheDB + let mut alice_account = wrong_db.basic(alice_address)?.expect("alice should exist"); + alice_account.nonce = 999; + alice_account.balance = U256::from(12345u64); + + // Commit the change through CacheDB + let mut state_changes = revm::state::EvmState::default(); + state_changes.insert( + alice_address, + revm::state::Account { + info: alice_account.clone(), + storage: Default::default(), + status: revm::state::AccountStatus::Touched, + transaction_id: 0, + }, + ); + wrong_db.commit(state_changes); + + // The write went into CacheDB's cache - verify we can read it back + let alice_from_cache = wrong_db.basic(alice_address)?.expect("alice should exist"); + assert_eq!(alice_from_cache.nonce, 999, "CacheDB should have the written nonce"); + + // BUT: The underlying State captured NOTHING! + // CacheDB doesn't propagate writes to its underlying database. + wrong_db.db.merge_transitions(revm_database::states::bundle_state::BundleRetention::Reverts); + let wrong_bundle = wrong_db.db.take_bundle(); + + assert!( + wrong_bundle.state.is_empty(), + "WRONG layering: State's bundle should be EMPTY because CacheDB intercepted all writes. \ + Got {} accounts in bundle.", + wrong_bundle.state.len() + ); + + // ========================================================================= + // CORRECT layering: State> + // ========================================================================= + let state_provider2 = harness + .provider + .state_by_block_hash(harness.header.hash()) + .context("getting state provider")?; + + let state_db2 = reth::revm::database::StateProviderDatabase::new(state_provider2); + let cache_db = CacheDB::new(state_db2); + let mut correct_db = State::builder().with_database(cache_db).with_bundle_update().build(); + + // Simulate the same write through State + let mut bob_account = correct_db.basic(bob_address)?.expect("bob should exist"); + bob_account.nonce = 888; + bob_account.balance = U256::from(54321u64); + + // Commit through State + let mut state_changes2 = revm::state::EvmState::default(); + state_changes2.insert( + bob_address, + revm::state::Account { + info: bob_account.clone(), + storage: Default::default(), + status: revm::state::AccountStatus::Touched, + transaction_id: 0, + }, + ); + correct_db.commit(state_changes2); + + // State properly captures the write + correct_db.merge_transitions(revm_database::states::bundle_state::BundleRetention::Reverts); + let correct_bundle = correct_db.take_bundle(); + + assert!( + !correct_bundle.state.is_empty(), + "CORRECT layering: State's bundle should contain the written account" + ); + assert!( + correct_bundle.state.contains_key(&bob_address), + "Bundle should contain Bob's account changes" + ); + + Ok(()) +} + +/// Verifies that CacheDB layer is required for pending balance visibility. +/// +/// This test demonstrates that without the CacheDB layer, pending balance +/// changes from flashblocks would not be visible to bundle execution. +#[test] +fn layering_cachedb_makes_pending_balance_visible() -> eyre::Result<()> { + let harness = setup_harness()?; + + // Get the canonical balance for Alice from the state provider + let state_provider = harness + .provider + .state_by_block_hash(harness.header.hash()) + .context("getting state provider")?; + + let alice_address = harness.address(User::Alice); + let canonical_balance = state_provider.account_balance(&alice_address)?.unwrap_or(U256::ZERO); + + // Create a cache with a modified balance (simulating a pending flashblock) + let pending_balance = canonical_balance + U256::from(999_999_u64); + let mut cache = Cache::default(); + cache.accounts.insert( + alice_address, + DbAccount { + info: revm::state::AccountInfo { + balance: pending_balance, + nonce: 0, + code_hash: KECCAK_EMPTY, + code: None, + }, + account_state: AccountState::Touched, + storage: Default::default(), + }, + ); + + // Wrap with CacheDB to apply the pending cache + let state_db = reth::revm::database::StateProviderDatabase::new(state_provider); + let mut cache_db = CacheDB { cache, db: state_db }; + + // Read the balance through CacheDB - should see pending value + let account = cache_db.basic(alice_address)?.expect("account should exist"); + assert_eq!( + account.balance, pending_balance, + "CacheDB should return pending balance from cache" + ); + + // Verify the canonical state still has the original balance + let state_provider2 = harness + .provider + .state_by_block_hash(harness.header.hash()) + .context("getting state provider")?; + let canonical_balance2 = state_provider2.account_balance(&alice_address)?.unwrap_or(U256::ZERO); + assert_eq!(canonical_balance, canonical_balance2, "Canonical state should be unchanged"); + + Ok(()) +} + +/// Verifies that bundle_prestate is required for pending state changes visibility. +/// +/// This test demonstrates that without with_bundle_prestate(), the State wrapper +/// would start with empty state and not see pending flashblock state changes. +#[test] +fn layering_bundle_prestate_makes_pending_nonce_visible() -> eyre::Result<()> { + let harness = setup_harness()?; + let alice_address = harness.address(User::Alice); + + let state_provider = harness + .provider + .state_by_block_hash(harness.header.hash()) + .context("getting state provider")?; + + // Create a BundleState with a modified nonce (simulating pending flashblock writes) + let pending_nonce = 42u64; + let bundle_state = BundleState::new( + [( + alice_address, + Some(revm::state::AccountInfo { + balance: U256::from(1_000_000_000u64), + nonce: 0, // original + code_hash: KECCAK_EMPTY, + code: None, + }), + Some(revm::state::AccountInfo { + balance: U256::from(1_000_000_000u64), + nonce: pending_nonce, // pending + code_hash: KECCAK_EMPTY, + code: None, + }), + Default::default(), + )], + Vec::>, Vec<(U256, U256)>)>>::new(), + Vec::<(B256, revm::bytecode::Bytecode)>::new(), + ); + + // Create the three-layer stack WITH bundle_prestate + let state_db = reth::revm::database::StateProviderDatabase::new(state_provider); + let cache_db = CacheDB::new(state_db); + let mut db_with_prestate = State::builder() + .with_database(cache_db) + .with_bundle_update() + .with_bundle_prestate(bundle_state.clone()) + .build(); + + // Read through the State - should see pending nonce from bundle_prestate + let account = db_with_prestate.basic(alice_address)?.expect("account should exist"); + assert_eq!(account.nonce, pending_nonce, "State with bundle_prestate should see pending nonce"); + + // Now create WITHOUT bundle_prestate to show the difference + let state_provider2 = harness + .provider + .state_by_block_hash(harness.header.hash()) + .context("getting state provider")?; + let state_db2 = reth::revm::database::StateProviderDatabase::new(state_provider2); + let cache_db2 = CacheDB::new(state_db2); + let mut db_without_prestate = + State::builder().with_database(cache_db2).with_bundle_update().build(); + + // Read through State without prestate - should see canonical nonce (0) + let account2 = db_without_prestate.basic(alice_address)?.expect("account should exist"); + assert_eq!(account2.nonce, 0, "State without bundle_prestate should see canonical nonce"); + + Ok(()) +} diff --git a/crates/flashblocks-rpc/tests/state.rs b/crates/flashblocks-rpc/tests/state.rs index 5c93c1e8..456d3b7a 100644 --- a/crates/flashblocks-rpc/tests/state.rs +++ b/crates/flashblocks-rpc/tests/state.rs @@ -838,6 +838,131 @@ async fn test_duplicate_flashblock_ignored() { assert_eq!(block, block_two); } +/// Verifies that eth_call targeting pending block sees flashblock state changes. +/// +/// This test catches database layering bugs where pending state from flashblocks +/// isn't visible to RPC callers. After a flashblock transfers ETH to Bob, an +/// eth_call simulating a transfer FROM Bob should succeed because Bob now has +/// more funds from the flashblock. +#[tokio::test] +async fn test_eth_call_sees_flashblock_state_changes() { + use alloy_eips::BlockNumberOrTag; + use alloy_provider::Provider; + use alloy_rpc_types_eth::TransactionInput; + use op_alloy_rpc_types::OpTransactionRequest; + + let test = TestHarness::new().await; + let provider = test.node.provider(); + + let bob_address = test.address(User::Bob); + let charlie_address = test.address(User::Charlie); + + // Get Bob's canonical balance to calculate a transfer amount that exceeds it + let canonical_balance = provider.get_balance(bob_address).await.unwrap(); + + // Send base flashblock + test.send_flashblock(FlashblockBuilder::new_base(&test).build()).await; + + // Flashblock 1: Alice sends a large amount to Bob + let transfer_to_bob = 1_000_000_000_000_000_000u128; // 1 ETH + let tx = + test.build_transaction_to_send_eth_with_nonce(User::Alice, User::Bob, transfer_to_bob, 0); + test.send_flashblock(FlashblockBuilder::new(&test, 1).with_transactions(vec![tx]).build()) + .await; + + // Verify via state overrides that Bob received the funds + let overrides = test + .flashblocks + .get_pending_blocks() + .get_state_overrides() + .expect("state overrides should exist after flashblock execution"); + let bob_override = overrides.get(&bob_address).expect("Bob should have a state override"); + let bob_pending_balance = bob_override.balance.expect("Bob's balance override should be set"); + assert_eq!( + bob_pending_balance, + canonical_balance + U256::from(transfer_to_bob), + "State override should show Bob's increased balance" + ); + + // Now the key test: eth_call from Bob should see this pending balance. + // Try to transfer more than Bob's canonical balance (but less than pending). + // This would fail if eth_call can't see the pending state. + let transfer_amount = canonical_balance + U256::from(100_000u64); + let call_request = OpTransactionRequest::default() + .from(bob_address) + .to(charlie_address) + .value(transfer_amount) + .gas_limit(21_000) + .input(TransactionInput::default()); + + let result = provider.call(call_request).block(BlockNumberOrTag::Pending.into()).await; + assert!( + result.is_ok(), + "eth_call from Bob should succeed because pending state shows increased balance. \ + If this fails, eth_call may not be seeing flashblock state changes. Error: {:?}", + result.err() + ); +} + +/// Verifies that transactions in flashblock N+1 can see state changes from flashblock N. +/// +/// This test catches database layering bugs where writes from earlier flashblocks +/// aren't visible to later flashblock execution. The key is that flashblock 2's +/// transaction uses nonce=1, which only succeeds if the execution layer sees +/// flashblock 1's transaction (which used nonce=0). +#[tokio::test] +async fn test_sequential_nonces_across_flashblocks() { + let test = TestHarness::new().await; + + // Send base flashblock + test.send_flashblock(FlashblockBuilder::new_base(&test).build()).await; + + // Flashblock 1: Alice sends to Bob with nonce 0 + let tx_nonce_0 = test.build_transaction_to_send_eth_with_nonce(User::Alice, User::Bob, 1000, 0); + test.send_flashblock( + FlashblockBuilder::new(&test, 1).with_transactions(vec![tx_nonce_0]).build(), + ) + .await; + + // Verify flashblock 1 was processed - Alice's pending nonce should now be 1 + let alice_state = test.account_state(User::Alice); + assert_eq!(alice_state.nonce, 1, "After flashblock 1, Alice's pending nonce should be 1"); + + // Flashblock 2: Alice sends to Charlie with nonce 1 + // This will FAIL if the execution layer can't see flashblock 1's state change + let tx_nonce_1 = + test.build_transaction_to_send_eth_with_nonce(User::Alice, User::Charlie, 2000, 1); + test.send_flashblock( + FlashblockBuilder::new(&test, 2).with_transactions(vec![tx_nonce_1]).build(), + ) + .await; + + // Verify flashblock 2 was processed - Alice's pending nonce should now be 2 + let alice_state_after = test.account_state(User::Alice); + assert_eq!( + alice_state_after.nonce, 2, + "After flashblock 2, Alice's pending nonce should be 2. \ + If this fails, the database layering may be preventing flashblock 2 \ + from seeing flashblock 1's state changes." + ); + + // Also verify Bob and Charlie received their funds + let overrides = test + .flashblocks + .get_pending_blocks() + .get_state_overrides() + .expect("state overrides should exist"); + + assert!( + overrides.get(&test.address(User::Bob)).is_some(), + "Bob should have received funds from flashblock 1" + ); + assert!( + overrides.get(&test.address(User::Charlie)).is_some(), + "Charlie should have received funds from flashblock 2" + ); +} + #[tokio::test] async fn test_progress_canonical_blocks_without_flashblocks() { let mut test = TestHarness::new().await; diff --git a/crates/metering/Cargo.toml b/crates/metering/Cargo.toml index 27e9590e..d1d10b08 100644 --- a/crates/metering/Cargo.toml +++ b/crates/metering/Cargo.toml @@ -26,17 +26,25 @@ reth-optimism-chainspec.workspace = true reth-optimism-primitives.workspace = true reth-transaction-pool.workspace = true reth-optimism-cli.workspace = true # Enables serde & codec traits for OpReceipt/OpTxEnvelope +reth-trie-common.workspace = true # alloy alloy-primitives.workspace = true alloy-consensus.workspace = true alloy-eips.workspace = true +alloy-sol-types.workspace = true +alloy-rpc-types-eth.workspace = true # op-alloy op-alloy-consensus.workspace = true +op-alloy-rpc-types.workspace = true + +# base +base-reth-flashblocks-rpc = { path = "../flashblocks-rpc" } # revm revm.workspace = true +revm-database.workspace = true # rpc jsonrpsee.workspace = true @@ -45,6 +53,7 @@ jsonrpsee.workspace = true tracing.workspace = true serde.workspace = true eyre.workspace = true +arc-swap.workspace = true [dev-dependencies] alloy-genesis.workspace = true @@ -59,5 +68,8 @@ reth-tracing.workspace = true reth-transaction-pool = { workspace = true, features = ["test-utils"] } serde_json.workspace = true tokio.workspace = true -base-reth-test-utils.workspace = true - +base-reth-test-utils = { path = "../test-utils" } +alloy-rpc-types-engine.workspace = true +rollup-boost.workspace = true +alloy-provider.workspace = true +hex-literal = "0.4" diff --git a/crates/metering/README.md b/crates/metering/README.md index 1cc88343..eefc12f3 100644 --- a/crates/metering/README.md +++ b/crates/metering/README.md @@ -11,7 +11,7 @@ Simulates a bundle of transactions, providing gas usage and execution time metri The method accepts a Bundle object with the following fields: - `txs`: Array of signed, RLP-encoded transactions (hex strings with 0x prefix) -- `block_number`: Target block number for bundle validity (note: simulation always uses the latest available block state) +- `block_number`: Target block number for bundle validity (note: simulation uses pending flashblocks state when available, otherwise latest canonical block) - `min_timestamp` (optional): Minimum timestamp for bundle validity (also used as simulation timestamp if provided) - `max_timestamp` (optional): Maximum timestamp for bundle validity - `reverting_tx_hashes` (optional): Array of transaction hashes allowed to revert @@ -26,7 +26,7 @@ The method accepts a Bundle object with the following fields: - `coinbaseDiff`: Total gas fees paid - `ethSentToCoinbase`: ETH sent directly to coinbase - `gasFees`: Total gas fees -- `stateBlockNumber`: Block number used for state (always the latest available block) +- `stateBlockNumber`: Block number used for state (latest flashblock if pending flashblocks exist, otherwise latest canonical block) - `totalGasUsed`: Total gas consumed - `totalExecutionTimeUs`: Total execution time (μs) - `results`: Array of per-transaction results: diff --git a/crates/metering/src/flashblock_trie_cache.rs b/crates/metering/src/flashblock_trie_cache.rs new file mode 100644 index 00000000..f1322b40 --- /dev/null +++ b/crates/metering/src/flashblock_trie_cache.rs @@ -0,0 +1,92 @@ +use std::sync::Arc; + +use alloy_primitives::B256; +use arc_swap::ArcSwap; +use eyre::Result as EyreResult; +use reth_provider::StateProvider; +use reth_trie_common::{HashedPostState, updates::TrieUpdates}; + +use crate::FlashblocksState; + +/// Trie nodes and hashed state from computing a flashblock state root. +/// +/// When metering bundles, we want each state root calculation to measure only +/// the bundle's incremental I/O, not I/O from previous flashblocks. By caching +/// the flashblock trie once and reusing it for all bundle simulations, we ensure +/// each bundle's state root time reflects only its own I/O cost. +#[derive(Debug, Clone)] +pub struct FlashblockTrieData { + pub trie_updates: TrieUpdates, + pub hashed_state: HashedPostState, +} + +/// Internal cache entry for a single flashblock. +#[derive(Debug, Clone)] +struct CachedFlashblockTrie { + block_hash: B256, + flashblock_index: u64, + trie_data: FlashblockTrieData, +} + +/// Thread-safe single-entry cache for the latest flashblock's trie nodes. +/// +/// This cache stores the intermediate trie nodes computed when calculating +/// the latest flashblock's state root. Subsequent bundle metering operations +/// on the same flashblock can reuse these cached nodes instead of recalculating +/// them, significantly improving performance. +/// +/// **Important**: This cache holds only ONE flashblock's trie at a time. +/// When a new flashblock is cached, it replaces any previously cached flashblock. +#[derive(Debug, Clone)] +pub struct FlashblockTrieCache { + cache: Arc>>, +} + +impl FlashblockTrieCache { + /// Creates a new empty flashblock trie cache. + pub fn new() -> Self { + Self { cache: Arc::new(ArcSwap::from_pointee(None)) } + } + + /// Ensures the trie for the given flashblock is cached and returns it. + /// + /// If the cache already contains an entry for the provided `block_hash` and + /// `flashblock_index`, the cached data is returned immediately. Otherwise the trie is + /// recomputed, cached (replacing any previous entry), and returned. + pub fn ensure_cached( + &self, + block_hash: B256, + flashblock_index: u64, + flashblocks_state: &FlashblocksState, + canonical_state_provider: &dyn StateProvider, + ) -> EyreResult { + let cached_entry = self.cache.load(); + if let Some(cached) = cached_entry.as_ref() { + if cached.block_hash == block_hash && cached.flashblock_index == flashblock_index { + return Ok(cached.trie_data.clone()); + } + } + + let hashed_state = + canonical_state_provider.hashed_post_state(&flashblocks_state.bundle_state); + let (_state_root, trie_updates) = + canonical_state_provider.state_root_with_updates(hashed_state.clone())?; + + let trie_data = FlashblockTrieData { trie_updates, hashed_state }; + + // Store the new entry, replacing any previous flashblock's cached trie + self.cache.store(Arc::new(Some(CachedFlashblockTrie { + block_hash, + flashblock_index, + trie_data: trie_data.clone(), + }))); + + Ok(trie_data) + } +} + +impl Default for FlashblockTrieCache { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/metering/src/lib.rs b/crates/metering/src/lib.rs index 138a3c7d..8d1d65bd 100644 --- a/crates/metering/src/lib.rs +++ b/crates/metering/src/lib.rs @@ -1,8 +1,10 @@ +mod flashblock_trie_cache; mod meter; mod rpc; #[cfg(test)] mod tests; -pub use meter::meter_bundle; +pub use flashblock_trie_cache::{FlashblockTrieCache, FlashblockTrieData}; +pub use meter::{FlashblocksState, MeterBundleOutput, meter_bundle}; pub use rpc::{MeteringApiImpl, MeteringApiServer}; pub use tips_core::types::{Bundle, MeterBundleResponse, TransactionResult}; diff --git a/crates/metering/src/meter.rs b/crates/metering/src/meter.rs index 577b8ee3..33c68047 100644 --- a/crates/metering/src/meter.rs +++ b/crates/metering/src/meter.rs @@ -3,43 +3,102 @@ use std::{sync::Arc, time::Instant}; use alloy_consensus::{BlockHeader, Transaction as _, transaction::SignerRecoverable}; use alloy_primitives::{B256, U256}; use eyre::{Result as EyreResult, eyre}; -use reth::revm::db::State; +use reth::revm::db::{BundleState, Cache, CacheDB, State}; use reth_evm::{ConfigureEvm, execute::BlockBuilder}; use reth_optimism_chainspec::OpChainSpec; use reth_optimism_evm::{OpEvmConfig, OpNextBlockEnvAttributes}; use reth_primitives_traits::SealedHeader; +use reth_trie_common::TrieInput; +use revm_database::states::bundle_state::BundleRetention; use tips_core::types::{BundleExtensions, BundleTxs, ParsedBundle}; -use crate::TransactionResult; +use crate::{FlashblockTrieData, TransactionResult}; + +/// State from pending flashblocks that is used as a base for metering +#[derive(Debug, Clone)] +pub struct FlashblocksState { + /// The cache of account and storage data + pub cache: Cache, + /// The accumulated bundle of state changes + pub bundle_state: BundleState, +} const BLOCK_TIME: u64 = 2; // 2 seconds per block +/// Output from metering a bundle of transactions +#[derive(Debug)] +pub struct MeterBundleOutput { + /// Transaction results with individual metrics + pub results: Vec, + /// Total gas used by all transactions + pub total_gas_used: u64, + /// Total gas fees paid by all transactions + pub total_gas_fees: U256, + /// Bundle hash + pub bundle_hash: B256, + /// Total time in microseconds (includes transaction execution and state root calculation) + pub total_time_us: u128, + /// State root calculation time in microseconds + pub state_root_time_us: u128, +} + /// Simulates and meters a bundle of transactions /// -/// Takes a state provider, chain spec, decoded transactions, block header, and bundle metadata, -/// and executes transactions in sequence to measure gas usage and execution time. +/// Takes a state provider, chain spec, parsed bundle, block header, and optional flashblocks state, +/// then executes transactions in sequence to measure gas usage and execution time. /// -/// Returns a tuple of: -/// - Vector of transaction results -/// - Total gas used -/// - Total gas fees paid -/// - Bundle hash -/// - Total execution time in microseconds +/// Returns [`MeterBundleOutput`] containing transaction results and aggregated metrics. pub fn meter_bundle( state_provider: SP, chain_spec: Arc, bundle: ParsedBundle, header: &SealedHeader, -) -> EyreResult<(Vec, u64, U256, B256, u128)> + flashblocks_state: Option, + cached_flashblock_trie: Option, +) -> EyreResult where SP: reth_provider::StateProvider, { // Get bundle hash let bundle_hash = bundle.bundle_hash(); + // Get flashblock trie data before starting timers. This ensures we only measure + // the bundle's incremental I/O cost, not I/O from previous flashblocks. + let flashblock_trie_data = cached_flashblock_trie + .map(Ok::<_, eyre::Report>) + .or_else(|| { + flashblocks_state.as_ref().map(|fb_state| { + let fb_hashed_state = state_provider.hashed_post_state(&fb_state.bundle_state); + let (_fb_state_root, fb_trie_updates) = + state_provider.state_root_with_updates(fb_hashed_state.clone())?; + Ok(crate::FlashblockTrieData { + trie_updates: fb_trie_updates, + hashed_state: fb_hashed_state, + }) + }) + }) + .transpose()?; + // Create state database let state_db = reth::revm::database::StateProviderDatabase::new(state_provider); - let mut db = State::builder().with_database(state_db).with_bundle_update().build(); + + // Apply flashblocks read cache if available + let cache_db = if let Some(ref flashblocks) = flashblocks_state { + CacheDB { cache: flashblocks.cache.clone(), db: state_db } + } else { + CacheDB::new(state_db) + }; + + // Track bundle state changes. If metering using flashblocks state, include its bundle prestate. + let mut db = if let Some(flashblocks) = flashblocks_state.as_ref() { + State::builder() + .with_database(cache_db) + .with_bundle_update() + .with_bundle_prestate(flashblocks.bundle_state.clone()) + .build() + } else { + State::builder().with_database(cache_db).with_bundle_update().build() + }; // Set up next block attributes // Use bundle.min_timestamp if provided, otherwise use header timestamp + BLOCK_TIME @@ -58,7 +117,7 @@ where let mut total_gas_used = 0u64; let mut total_gas_fees = U256::ZERO; - let execution_start = Instant::now(); + let total_start = Instant::now(); { let evm_config = OpEvmConfig::optimism(chain_spec); let mut builder = evm_config.builder_for_next_block(&mut db, header, attributes)?; @@ -85,7 +144,7 @@ where results.push(TransactionResult { coinbase_diff: gas_fees, - eth_sent_to_coinbase: U256::from(0), + eth_sent_to_coinbase: U256::ZERO, from_address: from, gas_fees, gas_price: U256::from(gas_price), @@ -97,7 +156,36 @@ where }); } } - let total_execution_time = execution_start.elapsed().as_micros(); - Ok((results, total_gas_used, total_gas_fees, bundle_hash, total_execution_time)) + // Calculate state root and measure its calculation time. The bundle already includes + // flashblocks state if it was provided via with_bundle_prestate. + db.merge_transitions(BundleRetention::Reverts); + let bundle_update = db.take_bundle(); + let state_provider = db.database.db.as_ref(); + + let state_root_start = Instant::now(); + let hashed_state = state_provider.hashed_post_state(&bundle_update); + + if let Some(fb_trie_data) = flashblock_trie_data { + // Prepend cached flashblock trie so state root calculation only performs I/O + // for this bundle's changes, not for previous flashblocks. + let mut trie_input = TrieInput::from_state(hashed_state); + trie_input.prepend_cached(fb_trie_data.trie_updates, fb_trie_data.hashed_state); + let _ = state_provider.state_root_from_nodes_with_updates(trie_input)?; + } else { + // No flashblocks, just calculate bundle state root + let _ = state_provider.state_root_with_updates(hashed_state)?; + } + + let state_root_time_us = state_root_start.elapsed().as_micros(); + let total_time_us = total_start.elapsed().as_micros(); + + Ok(MeterBundleOutput { + results, + total_gas_used, + total_gas_fees, + bundle_hash, + total_time_us, + state_root_time_us, + }) } diff --git a/crates/metering/src/rpc.rs b/crates/metering/src/rpc.rs index 19cd2eb4..4c6d40b7 100644 --- a/crates/metering/src/rpc.rs +++ b/crates/metering/src/rpc.rs @@ -1,17 +1,20 @@ -use alloy_consensus::Header; -use alloy_eips::BlockNumberOrTag; +use std::sync::Arc; + +use alloy_consensus::{Header, Sealed}; use alloy_primitives::U256; +use base_reth_flashblocks_rpc::rpc::{FlashblocksAPI, PendingBlocksAPI}; use jsonrpsee::{ core::{RpcResult, async_trait}, proc_macros::rpc, }; use reth::providers::BlockReaderIdExt; use reth_optimism_chainspec::OpChainSpec; +use reth_primitives_traits::SealedHeader; use reth_provider::{ChainSpecProvider, StateProviderFactory}; use tips_core::types::{Bundle, MeterBundleResponse, ParsedBundle}; use tracing::{error, info}; -use crate::meter_bundle; +use crate::{FlashblockTrieCache, meter_bundle}; /// RPC API for transaction metering #[rpc(server, namespace = "base")] @@ -22,25 +25,30 @@ pub trait MeteringApi { } /// Implementation of the metering RPC API -pub struct MeteringApiImpl { +pub struct MeteringApiImpl { provider: Provider, + flashblocks_state: Arc, + /// Cache for the latest flashblock's trie, ensuring each bundle's state root + /// calculation only measures the bundle's incremental I/O. + trie_cache: FlashblockTrieCache, } -impl MeteringApiImpl +impl MeteringApiImpl where Provider: StateProviderFactory + ChainSpecProvider + BlockReaderIdExt
+ Clone, + FB: FlashblocksAPI, { /// Creates a new instance of MeteringApi - pub fn new(provider: Provider) -> Self { - Self { provider } + pub fn new(provider: Provider, flashblocks_state: Arc) -> Self { + Self { provider, flashblocks_state, trie_cache: FlashblockTrieCache::new() } } } #[async_trait] -impl MeteringApiServer for MeteringApiImpl +impl MeteringApiServer for MeteringApiImpl where Provider: StateProviderFactory + ChainSpecProvider @@ -49,6 +57,7 @@ where + Send + Sync + 'static, + FB: FlashblocksAPI + Send + Sync + 'static, { async fn meter_bundle(&self, bundle: Bundle) -> RpcResult { info!( @@ -57,24 +66,56 @@ where "Starting bundle metering" ); - // Get the latest header - let header = self - .provider - .sealed_header_by_number_or_tag(BlockNumberOrTag::Latest) - .map_err(|e| { - jsonrpsee::types::ErrorObjectOwned::owned( - jsonrpsee::types::ErrorCode::InternalError.code(), - format!("Failed to get latest header: {}", e), - None::<()>, - ) - })? - .ok_or_else(|| { - jsonrpsee::types::ErrorObjectOwned::owned( - jsonrpsee::types::ErrorCode::InternalError.code(), - "Latest block not found".to_string(), - None::<()>, - ) - })?; + // Get pending flashblocks state + let pending_blocks = self.flashblocks_state.get_pending_blocks(); + + // Get header and flashblock index from pending blocks + // If no pending blocks exist, fall back to latest canonical block + let (header, flashblock_index, canonical_block_number) = + if let Some(pb) = pending_blocks.as_ref() { + let latest_header: Sealed
= pb.latest_header(); + let flashblock_index = pb.latest_flashblock_index(); + let canonical_block_number = pb.canonical_block_number(); + + info!( + latest_block = latest_header.number, + canonical_block = %canonical_block_number, + flashblock_index = flashblock_index, + "Using latest flashblock state for metering" + ); + + // Convert Sealed
to SealedHeader + let sealed_header = + SealedHeader::new(latest_header.inner().clone(), latest_header.hash()); + (sealed_header, flashblock_index, canonical_block_number) + } else { + // No pending blocks, use latest canonical block + let canonical_block_number = pending_blocks.get_canonical_block_number(); + let header = self + .provider + .sealed_header_by_number_or_tag(canonical_block_number) + .map_err(|e| { + jsonrpsee::types::ErrorObjectOwned::owned( + jsonrpsee::types::ErrorCode::InternalError.code(), + format!("Failed to get canonical block header: {}", e), + None::<()>, + ) + })? + .ok_or_else(|| { + jsonrpsee::types::ErrorObjectOwned::owned( + jsonrpsee::types::ErrorCode::InternalError.code(), + "Canonical block not found".to_string(), + None::<()>, + ) + })?; + + info!( + canonical_block = header.number, + "No flashblocks available, using canonical block state for metering" + ); + + (header, 0, canonical_block_number) + }; let parsed_bundle = ParsedBundle::try_from(bundle).map_err(|e| { jsonrpsee::types::ErrorObjectOwned::owned( @@ -84,59 +125,93 @@ where ) })?; - // Get state provider for the block - let state_provider = self.provider.state_by_block_hash(header.hash()).map_err(|e| { - error!(error = %e, "Failed to get state provider"); - jsonrpsee::types::ErrorObjectOwned::owned( - jsonrpsee::types::ErrorCode::InternalError.code(), - format!("Failed to get state provider: {}", e), - None::<()>, - ) - })?; - - // Meter bundle using utility function - let (results, total_gas_used, total_gas_fees, bundle_hash, total_execution_time) = - meter_bundle( - state_provider, - self.provider.chain_spec().clone(), - parsed_bundle, - &header, - ) - .map_err(|e| { - error!(error = %e, "Bundle metering failed"); + // Get state provider for the canonical block + let state_provider = + self.provider.state_by_block_number_or_tag(canonical_block_number).map_err(|e| { + error!(error = %e, "Failed to get state provider"); jsonrpsee::types::ErrorObjectOwned::owned( jsonrpsee::types::ErrorCode::InternalError.code(), - format!("Bundle metering failed: {}", e), + format!("Failed to get state provider: {}", e), None::<()>, ) })?; + // If we have pending flashblocks, get the state to apply pending changes + let flashblocks_state = pending_blocks.as_ref().map(|pb| crate::FlashblocksState { + cache: pb.get_db_cache(), + bundle_state: pb.get_bundle_state(), + }); + + // Get the flashblock index if we have pending flashblocks + let state_flashblock_index = pending_blocks.as_ref().map(|pb| pb.latest_flashblock_index()); + + // Ensure the flashblock trie is cached for reuse across bundle simulations + let cached_trie = if let Some(ref fb_state) = flashblocks_state { + let fb_index = state_flashblock_index.unwrap(); + Some( + self.trie_cache + .ensure_cached(header.hash(), fb_index, fb_state, &*state_provider) + .map_err(|e| { + error!(error = %e, "Failed to cache flashblock trie"); + jsonrpsee::types::ErrorObjectOwned::owned( + jsonrpsee::types::ErrorCode::InternalError.code(), + format!("Failed to cache flashblock trie: {}", e), + None::<()>, + ) + })?, + ) + } else { + None + }; + + // Meter bundle using utility function + let result = meter_bundle( + state_provider, + self.provider.chain_spec().clone(), + parsed_bundle, + &header, + flashblocks_state, + cached_trie, + ) + .map_err(|e| { + error!(error = %e, "Bundle metering failed"); + jsonrpsee::types::ErrorObjectOwned::owned( + jsonrpsee::types::ErrorCode::InternalError.code(), + format!("Bundle metering failed: {}", e), + None::<()>, + ) + })?; + // Calculate average gas price - let bundle_gas_price = if total_gas_used > 0 { - total_gas_fees / U256::from(total_gas_used) + let bundle_gas_price = if result.total_gas_used > 0 { + result.total_gas_fees / U256::from(result.total_gas_used) } else { U256::from(0) }; info!( - bundle_hash = %bundle_hash, - num_transactions = results.len(), - total_gas_used = total_gas_used, - total_execution_time_us = total_execution_time, + bundle_hash = %result.bundle_hash, + num_transactions = result.results.len(), + total_gas_used = result.total_gas_used, + total_time_us = result.total_time_us, + state_block_number = header.number, + flashblock_index = flashblock_index, "Bundle metering completed successfully" ); Ok(MeterBundleResponse { bundle_gas_price, - bundle_hash, - coinbase_diff: total_gas_fees, + bundle_hash: result.bundle_hash, + coinbase_diff: result.total_gas_fees, eth_sent_to_coinbase: U256::from(0), - gas_fees: total_gas_fees, - results, + gas_fees: result.total_gas_fees, + results: result.results, state_block_number: header.number, - state_flashblock_index: None, - total_gas_used, - total_execution_time_us: total_execution_time, + state_flashblock_index, + total_gas_used: result.total_gas_used, + // TODO: reintroduce state_root_time_us once tips-core exposes it again. + // TODO: rename total_execution_time_us to total_time_us since it includes state root time + total_execution_time_us: result.total_time_us, }) } } diff --git a/crates/metering/src/tests/chain_state.rs b/crates/metering/src/tests/chain_state.rs new file mode 100644 index 00000000..3d383a8b --- /dev/null +++ b/crates/metering/src/tests/chain_state.rs @@ -0,0 +1,395 @@ +use alloy_consensus::Receipt; +use alloy_eips::{BlockNumberOrTag, Encodable2718}; +use alloy_primitives::{B256, Bytes, U256}; +use alloy_provider::Provider; +use alloy_rpc_types_eth::TransactionInput; +use alloy_sol_types::{SolCall, sol}; +use base_reth_flashblocks_rpc::rpc::FlashblocksAPI; +use base_reth_test_utils::{ + flashblocks_harness::FlashblocksHarness, node::BASE_CHAIN_ID, tracing::init_silenced_tracing, +}; +use eyre::{Result, eyre}; +use hex_literal::hex; +use op_alloy_consensus::OpTxEnvelope; +use op_alloy_rpc_types::OpTransactionRequest; +use reth::providers::HeaderProvider; +use reth_optimism_primitives::{OpReceipt, OpTransactionSigned}; +use reth_primitives::TransactionSigned; +use reth_transaction_pool::test_utils::TransactionBuilder; +use tips_core::types::Bundle; + +use super::utils::secret_from_hex; +use crate::rpc::{MeteringApiImpl, MeteringApiServer}; + +#[tokio::test] +async fn meter_bundle_simulation_reflects_pending_state() -> Result<()> { + init_silenced_tracing(); + let harness = FlashblocksHarness::new().await?; + + let provider = harness.provider(); + let alice = &harness.accounts().alice; + let alice_secret = secret_from_hex(alice.private_key); + + // Deploy the Counter contract (nonce 0) + let deploy_signed = TransactionBuilder::default() + .signer(alice_secret) + .chain_id(BASE_CHAIN_ID) + .nonce(0) + .gas_limit(DEPLOY_GAS_LIMIT) + .max_fee_per_gas(GWEI) + .max_priority_fee_per_gas(GWEI) + .input(COUNTER_CREATION_BYTECODE.to_vec()) + .into_eip1559(); + let (deploy_envelope, deploy_bytes) = envelope_from_signed(deploy_signed); + harness.build_block_from_transactions(vec![deploy_bytes]).await?; + + let deploy_receipt = provider + .get_transaction_receipt(deploy_envelope.tx_hash()) + .await? + .ok_or_else(|| eyre!("deployment transaction missing receipt"))?; + let contract_address = deploy_receipt + .inner + .contract_address + .ok_or_else(|| eyre!("deployment receipt missing contract address"))?; + + // Mutate storage on-chain via setNumber (nonce 1) + let set_call = Counter::setNumberCall { newNumber: U256::from(42u64) }; + let set_signed = TransactionBuilder::default() + .signer(alice_secret) + .chain_id(BASE_CHAIN_ID) + .nonce(1) + .gas_limit(CALL_GAS_LIMIT) + .max_fee_per_gas(GWEI) + .max_priority_fee_per_gas(GWEI) + .to(contract_address) + .input(Bytes::from(set_call.abi_encode())) + .into_eip1559(); + let (_set_envelope, set_bytes) = envelope_from_signed(set_signed); + harness.build_block_from_transactions(vec![set_bytes]).await?; + + // Meter an increment call (nonce 2) after the storage change + let increment_call = Counter::incrementCall {}; + let increment_signed = TransactionBuilder::default() + .signer(alice_secret) + .chain_id(BASE_CHAIN_ID) + .nonce(2) + .gas_limit(CALL_GAS_LIMIT) + .max_fee_per_gas(GWEI) + .max_priority_fee_per_gas(GWEI) + .to(contract_address) + .input(Bytes::from(increment_call.abi_encode())) + .into_eip1559(); + let (_increment_envelope, increment_bytes) = envelope_from_signed(increment_signed.clone()); + + let bundle = Bundle { + txs: vec![increment_bytes.clone()], + block_number: provider.get_block_number().await?, + flashblock_number_min: None, + flashblock_number_max: None, + min_timestamp: None, + max_timestamp: None, + reverting_tx_hashes: vec![], + replacement_uuid: None, + dropping_tx_hashes: vec![], + }; + + let metering_api = + MeteringApiImpl::new(harness.blockchain_provider(), harness.flashblocks_state()); + let response = MeteringApiServer::meter_bundle(&metering_api, bundle) + .await + .map_err(|err| eyre!("meter_bundle rpc failed: {}", err))?; + + assert_eq!(response.results.len(), 1); + let result = &response.results[0]; + assert_eq!(result.to_address, Some(contract_address)); + assert!(result.gas_used > 0); + assert!(response.state_flashblock_index.is_none()); + + // Confirm canonical storage remains at 42 (increment transaction only simulated) + let number_call = Counter::numberCall {}; + let call_request = OpTransactionRequest::default() + .from(alice.address) + .to(contract_address) + .input(TransactionInput::new(Bytes::from(number_call.abi_encode()))); + let raw_number = provider.call(call_request).block(BlockNumberOrTag::Latest.into()).await?; + let decoded: U256 = Counter::numberCall::abi_decode_returns(raw_number.as_ref())?; + assert_eq!(decoded, U256::from(42u64)); + + // Execute the increment on-chain to confirm the transaction is valid when mined + harness.build_block_from_transactions(vec![increment_bytes]).await?; + let number_after_increment = provider + .call( + OpTransactionRequest::default() + .from(alice.address) + .to(contract_address) + .input(TransactionInput::new(Bytes::from(number_call.abi_encode()))), + ) + .block(BlockNumberOrTag::Latest.into()) + .await?; + let decoded_after_increment: U256 = + Counter::numberCall::abi_decode_returns(number_after_increment.as_ref())?; + assert_eq!(decoded_after_increment, U256::from(43u64)); + + Ok(()) +} + +#[tokio::test] +async fn meter_bundle_errors_when_beacon_root_missing() -> Result<()> { + init_silenced_tracing(); + let harness = FlashblocksHarness::new().await?; + + let provider = harness.provider(); + let alice = &harness.accounts().alice; + let alice_secret = secret_from_hex(alice.private_key); + + // Deploy the contract so the pending flashblock can interact with storage + let deploy_signed = TransactionBuilder::default() + .signer(alice_secret) + .chain_id(BASE_CHAIN_ID) + .nonce(0) + .gas_limit(DEPLOY_GAS_LIMIT) + .max_fee_per_gas(GWEI) + .max_priority_fee_per_gas(GWEI) + .input(COUNTER_CREATION_BYTECODE.to_vec()) + .into_eip1559(); + let (deploy_envelope, deploy_bytes) = envelope_from_signed(deploy_signed); + harness.build_block_from_transactions(vec![deploy_bytes]).await?; + let contract_address = provider + .get_transaction_receipt(deploy_envelope.tx_hash()) + .await? + .ok_or_else(|| eyre!("deployment transaction missing receipt"))? + .inner + .contract_address + .ok_or_else(|| eyre!("deployment receipt missing contract address"))?; + + let blockchain_provider = harness.blockchain_provider(); + let latest_number = provider.get_block_number().await?; + let latest_header = blockchain_provider + .sealed_header(latest_number)? + .ok_or_else(|| eyre!("missing header for block {}", latest_number))?; + + let pending_block_number = latest_header.number + 1; + let flash_call = Counter::setNumberCall { newNumber: U256::from(99u64) }; + let flash_signed = TransactionBuilder::default() + .signer(alice_secret) + .chain_id(BASE_CHAIN_ID) + .nonce(1) + .gas_limit(CALL_GAS_LIMIT) + .max_fee_per_gas(GWEI) + .max_priority_fee_per_gas(GWEI) + .to(contract_address) + .input(Bytes::from(flash_call.abi_encode())) + .into_eip1559(); + let (flash_envelope, flash_bytes) = envelope_from_signed(flash_signed); + let receipt = OpReceipt::Eip1559(Receipt { + status: true.into(), + cumulative_gas_used: 80_000, + logs: vec![], + }); + + // Pending flashblock omits the beacon root, so metering will report the validation error. + let flashblock = harness.build_flashblock( + pending_block_number, + latest_header.hash(), + B256::ZERO, + latest_header.timestamp + 2, + latest_header.gas_limit, + vec![(flash_bytes.clone(), Some((flash_envelope.tx_hash(), receipt.clone())))], + ); + + harness.send_flashblock(flashblock).await?; + + let bundle = Bundle { + txs: vec![flash_bytes.clone()], + block_number: pending_block_number, + flashblock_number_min: None, + flashblock_number_max: None, + min_timestamp: None, + max_timestamp: None, + reverting_tx_hashes: vec![], + replacement_uuid: None, + dropping_tx_hashes: vec![], + }; + + let metering_api = + MeteringApiImpl::new(blockchain_provider.clone(), harness.flashblocks_state()); + let result = MeteringApiServer::meter_bundle(&metering_api, bundle).await; + let err = result.expect_err("pending flashblock metering should surface missing beacon root"); + assert!( + err.message().contains("parent beacon block root missing"), + "unexpected error: {err:?}" + ); + + let pending_blocks = harness.flashblocks_state().get_pending_blocks(); + assert!(pending_blocks.is_some(), "expected flashblock to populate pending state"); + assert_eq!(pending_blocks.as_ref().unwrap().latest_flashblock_index(), 0); + + // Pending state should reflect the storage change even though the simulation failed. + let number_call = Counter::numberCall {}; + let pending_value = provider + .call( + OpTransactionRequest::default() + .from(alice.address) + .to(contract_address) + .input(TransactionInput::new(Bytes::from(number_call.abi_encode()))), + ) + .block(BlockNumberOrTag::Pending.into()) + .await?; + let decoded_pending: U256 = Counter::numberCall::abi_decode_returns(pending_value.as_ref())?; + assert_eq!(decoded_pending, U256::from(99u64)); + + Ok(()) +} + +sol! { + contract Counter { + function setNumber(uint256 newNumber); + function increment(); + function number() view returns (uint256); + } +} + +const COUNTER_CREATION_BYTECODE: &[u8] = &hex!( + "6080604052348015600e575f5ffd5b506101e18061001c5f395ff3fe608060405234801561000f575f5ffd5b506004361061003f575f3560e01c80633fb5c1cb146100435780638381f58a1461005f578063d09de08a1461007d575b5f5ffd5b61005d600480360381019061005891906100e4565b610087565b005b610067610090565b604051610074919061011e565b60405180910390f35b610085610095565b005b805f8190555050565b5f5481565b5f5f8154809291906100a690610164565b9190505550565b5f5ffd5b5f819050919050565b6100c3816100b1565b81146100cd575f5ffd5b50565b5f813590506100de816100ba565b92915050565b5f602082840312156100f9576100f86100ad565b5b5f610106848285016100d0565b91505092915050565b610118816100b1565b82525050565b5f6020820190506101315f83018461010f565b92915050565b7f4e487b71000000000000000000000000000000000000000000000000000000005f52601160045260245ffd5b5f61016e826100b1565b91507fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff82036101a05761019f610137565b5b60018201905091905056fea26469706673582212204b710430bf5e9541dd320fc4eece1bf270f8b7d4835bba28f79ff7bd29904a2964736f6c634300081e0033" +); +const GWEI: u128 = 1_000_000_000; +const DEPLOY_GAS_LIMIT: u64 = 1_000_000; +const CALL_GAS_LIMIT: u64 = 150_000; + +fn envelope_from_signed(tx: TransactionSigned) -> (OpTxEnvelope, Bytes) { + let op_signed = OpTransactionSigned::Eip1559( + tx.as_eip1559().expect("transaction should be EIP-1559").clone(), + ); + let envelope = OpTxEnvelope::from(op_signed); + let bytes = Bytes::from(envelope.encoded_2718()); + (envelope, bytes) +} + +#[tokio::test] +async fn meter_bundle_reads_canonical_storage_without_mutation() -> Result<()> { + init_silenced_tracing(); + let harness = FlashblocksHarness::new().await?; + let alice = &harness.accounts().alice; + let alice_secret = secret_from_hex(alice.private_key); + let mut nonce = 0u64; + + let deploy_signed = TransactionBuilder::default() + .signer(alice_secret) + .chain_id(BASE_CHAIN_ID) + .nonce(nonce) + .gas_limit(DEPLOY_GAS_LIMIT) + .max_fee_per_gas(GWEI) + .max_priority_fee_per_gas(GWEI) + .input(COUNTER_CREATION_BYTECODE.to_vec()) + .into_eip1559(); + let (deploy_envelope, deploy_bytes) = envelope_from_signed(deploy_signed); + + harness.build_block_from_transactions(vec![deploy_bytes]).await?; + nonce += 1; + + let provider = harness.provider(); + let deploy_receipt = provider + .get_transaction_receipt(deploy_envelope.tx_hash()) + .await? + .ok_or_else(|| eyre!("deployment transaction missing receipt"))?; + let contract_address = deploy_receipt + .inner + .contract_address + .ok_or_else(|| eyre!("deployment receipt missing contract address"))?; + + let set_call = Counter::setNumberCall { newNumber: U256::from(42u64) }; + let set_signed = TransactionBuilder::default() + .signer(alice_secret) + .chain_id(BASE_CHAIN_ID) + .nonce(nonce) + .gas_limit(CALL_GAS_LIMIT) + .max_fee_per_gas(GWEI) + .max_priority_fee_per_gas(GWEI) + .to(contract_address) + .input(Bytes::from(set_call.abi_encode())) + .into_eip1559(); + let (_set_envelope, set_bytes) = envelope_from_signed(set_signed); + harness.build_block_from_transactions(vec![set_bytes]).await?; + nonce += 1; + + let increment_call = Counter::incrementCall {}; + let increment_signed = TransactionBuilder::default() + .signer(alice_secret) + .chain_id(BASE_CHAIN_ID) + .nonce(nonce) + .gas_limit(CALL_GAS_LIMIT) + .max_fee_per_gas(GWEI) + .max_priority_fee_per_gas(GWEI) + .to(contract_address) + .input(Bytes::from(increment_call.abi_encode())) + .into_eip1559(); + let (_increment_envelope, increment_bytes) = envelope_from_signed(increment_signed); + harness.build_block_from_transactions(vec![increment_bytes]).await?; + + nonce += 1; + + let storage_value = provider.get_storage_at(contract_address, U256::ZERO).await?; + assert_eq!(storage_value, U256::from(43u64)); + + let number_call = Counter::numberCall {}; + let call_request = OpTransactionRequest::default() + .from(alice.address) + .to(contract_address) + .input(TransactionInput::new(Bytes::from(number_call.abi_encode()))); + let raw_number = provider.call(call_request).block(BlockNumberOrTag::Latest.into()).await?; + let decoded: U256 = Counter::numberCall::abi_decode_returns(raw_number.as_ref())?; + assert_eq!(decoded, U256::from(43u64)); + + // Meter another increment (nonce 3) to ensure meter_bundle sees the persisted state. + let meter_increment_call = Counter::incrementCall {}; + let meter_increment_signed = TransactionBuilder::default() + .signer(alice_secret) + .chain_id(BASE_CHAIN_ID) + .nonce(nonce) + .gas_limit(CALL_GAS_LIMIT) + .max_fee_per_gas(GWEI) + .max_priority_fee_per_gas(GWEI) + .to(contract_address) + .input(Bytes::from(meter_increment_call.abi_encode())) + .into_eip1559(); + let (_meter_increment_envelope, meter_increment_bytes) = + envelope_from_signed(meter_increment_signed.clone()); + + let bundle = Bundle { + txs: vec![meter_increment_bytes.clone()], + block_number: provider.get_block_number().await?, + flashblock_number_min: None, + flashblock_number_max: None, + min_timestamp: None, + max_timestamp: None, + reverting_tx_hashes: vec![], + replacement_uuid: None, + dropping_tx_hashes: vec![], + }; + let metering_api = + MeteringApiImpl::new(harness.blockchain_provider(), harness.flashblocks_state()); + let response = MeteringApiServer::meter_bundle(&metering_api, bundle) + .await + .map_err(|err| eyre!("meter_bundle rpc failed: {}", err))?; + + assert_eq!(response.results.len(), 1); + let metering_result = &response.results[0]; + assert_eq!(metering_result.to_address, Some(contract_address)); + assert!(metering_result.gas_used > 0); + + // Canonical state remains unchanged by the simulation. + let raw_number_after_sim = provider + .call( + OpTransactionRequest::default() + .from(alice.address) + .to(contract_address) + .input(TransactionInput::new(Bytes::from(number_call.abi_encode()))), + ) + .block(BlockNumberOrTag::Latest.into()) + .await?; + let decoded_after_sim: U256 = + Counter::numberCall::abi_decode_returns(raw_number_after_sim.as_ref())?; + assert_eq!(decoded_after_sim, U256::from(43u64)); + + Ok(()) +} diff --git a/crates/metering/src/tests/meter.rs b/crates/metering/src/tests/meter.rs index 47c2a16a..b8655fd0 100644 --- a/crates/metering/src/tests/meter.rs +++ b/crates/metering/src/tests/meter.rs @@ -7,7 +7,11 @@ use alloy_primitives::{Address, B256, Bytes, U256, keccak256}; use eyre::Context; use op_alloy_consensus::OpTxEnvelope; use rand::{SeedableRng, rngs::StdRng}; -use reth::{api::NodeTypesWithDBAdapter, chainspec::EthChainSpec}; +use reth::{ + api::NodeTypesWithDBAdapter, + chainspec::EthChainSpec, + revm::db::{BundleState, Cache}, +}; use reth_db::{DatabaseEnv, test_utils::TempDatabase}; use reth_optimism_chainspec::{BASE_MAINNET, OpChainSpec, OpChainSpecBuilder}; use reth_optimism_node::OpNode; @@ -16,10 +20,11 @@ use reth_primitives_traits::SealedHeader; use reth_provider::{HeaderProvider, StateProviderFactory, providers::BlockchainProvider}; use reth_testing_utils::generators::generate_keys; use reth_transaction_pool::test_utils::TransactionBuilder; +use revm::primitives::KECCAK_EMPTY; use tips_core::types::{Bundle, ParsedBundle}; use super::utils::create_provider_factory; -use crate::meter_bundle; +use crate::{FlashblocksState, meter_bundle}; type NodeTypes = NodeTypesWithDBAdapter>>; @@ -135,15 +140,22 @@ fn meter_bundle_empty_transactions() -> eyre::Result<()> { let parsed_bundle = create_parsed_bundle(Vec::new())?; - let (results, total_gas_used, total_gas_fees, bundle_hash, total_execution_time) = - meter_bundle(state_provider, harness.chain_spec.clone(), parsed_bundle, &harness.header)?; - - assert!(results.is_empty()); - assert_eq!(total_gas_used, 0); - assert_eq!(total_gas_fees, U256::ZERO); + let output = meter_bundle( + state_provider, + harness.chain_spec.clone(), + parsed_bundle, + &harness.header, + None, + None, + )?; + + assert!(output.results.is_empty()); + assert_eq!(output.total_gas_used, 0); + assert_eq!(output.total_gas_fees, U256::ZERO); // Even empty bundles have some EVM setup overhead - assert!(total_execution_time > 0); - assert_eq!(bundle_hash, keccak256([])); + assert!(output.total_time_us > 0); + assert!(output.state_root_time_us > 0); + assert_eq!(output.bundle_hash, keccak256([])); Ok(()) } @@ -177,12 +189,19 @@ fn meter_bundle_single_transaction() -> eyre::Result<()> { let parsed_bundle = create_parsed_bundle(vec![envelope.clone()])?; - let (results, total_gas_used, total_gas_fees, bundle_hash, total_execution_time) = - meter_bundle(state_provider, harness.chain_spec.clone(), parsed_bundle, &harness.header)?; + let output = meter_bundle( + state_provider, + harness.chain_spec.clone(), + parsed_bundle, + &harness.header, + None, + None, + )?; - assert_eq!(results.len(), 1); - let result = &results[0]; - assert!(total_execution_time > 0); + assert_eq!(output.results.len(), 1); + let result = &output.results[0]; + assert!(output.total_time_us > 0); + assert!(output.state_root_time_us > 0); assert_eq!(result.from_address, harness.address(User::Alice)); assert_eq!(result.to_address, Some(to)); @@ -191,12 +210,12 @@ fn meter_bundle_single_transaction() -> eyre::Result<()> { assert_eq!(result.gas_used, 21_000); assert_eq!(result.coinbase_diff, (U256::from(21_000) * U256::from(10)),); - assert_eq!(total_gas_used, 21_000); - assert_eq!(total_gas_fees, U256::from(21_000) * U256::from(10)); + assert_eq!(output.total_gas_used, 21_000); + assert_eq!(output.total_gas_fees, U256::from(21_000) * U256::from(10)); let mut concatenated = Vec::with_capacity(32); concatenated.extend_from_slice(tx_hash.as_slice()); - assert_eq!(bundle_hash, keccak256(concatenated)); + assert_eq!(output.bundle_hash, keccak256(concatenated)); assert!(result.execution_time_us > 0, "execution_time_us should be greater than zero"); @@ -254,14 +273,21 @@ fn meter_bundle_multiple_transactions() -> eyre::Result<()> { let parsed_bundle = create_parsed_bundle(vec![envelope_1.clone(), envelope_2.clone()])?; - let (results, total_gas_used, total_gas_fees, bundle_hash, total_execution_time) = - meter_bundle(state_provider, harness.chain_spec.clone(), parsed_bundle, &harness.header)?; + let output = meter_bundle( + state_provider, + harness.chain_spec.clone(), + parsed_bundle, + &harness.header, + None, + None, + )?; - assert_eq!(results.len(), 2); - assert!(total_execution_time > 0); + assert_eq!(output.results.len(), 2); + assert!(output.total_time_us > 0); + assert!(output.state_root_time_us > 0); // Check first transaction - let result_1 = &results[0]; + let result_1 = &output.results[0]; assert_eq!(result_1.from_address, harness.address(User::Alice)); assert_eq!(result_1.to_address, Some(to_1)); assert_eq!(result_1.tx_hash, tx_hash_1); @@ -270,7 +296,7 @@ fn meter_bundle_multiple_transactions() -> eyre::Result<()> { assert_eq!(result_1.coinbase_diff, (U256::from(21_000) * U256::from(10)),); // Check second transaction - let result_2 = &results[1]; + let result_2 = &output.results[1]; assert_eq!(result_2.from_address, harness.address(User::Bob)); assert_eq!(result_2.to_address, Some(to_2)); assert_eq!(result_2.tx_hash, tx_hash_2); @@ -279,19 +305,166 @@ fn meter_bundle_multiple_transactions() -> eyre::Result<()> { assert_eq!(result_2.coinbase_diff, U256::from(21_000) * U256::from(15),); // Check aggregated values - assert_eq!(total_gas_used, 42_000); + assert_eq!(output.total_gas_used, 42_000); let expected_total_fees = U256::from(21_000) * U256::from(10) + U256::from(21_000) * U256::from(15); - assert_eq!(total_gas_fees, expected_total_fees); + assert_eq!(output.total_gas_fees, expected_total_fees); // Check bundle hash includes both transactions let mut concatenated = Vec::with_capacity(64); concatenated.extend_from_slice(tx_hash_1.as_slice()); concatenated.extend_from_slice(tx_hash_2.as_slice()); - assert_eq!(bundle_hash, keccak256(concatenated)); + assert_eq!(output.bundle_hash, keccak256(concatenated)); assert!(result_1.execution_time_us > 0, "execution_time_us should be greater than zero"); assert!(result_2.execution_time_us > 0, "execution_time_us should be greater than zero"); Ok(()) } + +#[test] +fn meter_bundle_state_root_time_invariant() -> eyre::Result<()> { + let harness = setup_harness()?; + + let to = Address::random(); + let signed_tx = TransactionBuilder::default() + .signer(harness.signer(User::Alice)) + .chain_id(harness.chain_spec.chain_id()) + .nonce(0) + .to(to) + .value(1_000) + .gas_limit(21_000) + .max_fee_per_gas(10) + .max_priority_fee_per_gas(1) + .into_eip1559(); + + let tx = + OpTransactionSigned::Eip1559(signed_tx.as_eip1559().expect("eip1559 transaction").clone()); + + let envelope = envelope_from_signed(&tx)?; + + let state_provider = harness + .provider + .state_by_block_hash(harness.header.hash()) + .context("getting state provider")?; + + let parsed_bundle = create_parsed_bundle(vec![envelope.clone()])?; + + let output = meter_bundle( + state_provider, + harness.chain_spec.clone(), + parsed_bundle, + &harness.header, + None, + None, + )?; + + // Verify invariant: total time must include state root time + assert!( + output.total_time_us >= output.state_root_time_us, + "total_time_us ({}) should be >= state_root_time_us ({})", + output.total_time_us, + output.state_root_time_us + ); + + // State root time should be non-zero + assert!(output.state_root_time_us > 0, "state_root_time_us should be greater than zero"); + + Ok(()) +} + +/// Integration test: verifies meter_bundle uses flashblocks state correctly. +/// +/// A transaction using nonce=1 should fail without flashblocks state (since +/// canonical nonce is 0), but succeed when flashblocks state indicates nonce=1. +#[test] +fn meter_bundle_requires_correct_layering_for_pending_nonce() -> eyre::Result<()> { + let harness = setup_harness()?; + let alice_address = harness.address(User::Alice); + + // Create a transaction that requires nonce=1 (assuming canonical nonce is 0) + let to = Address::random(); + let signed_tx = TransactionBuilder::default() + .signer(harness.signer(User::Alice)) + .chain_id(harness.chain_spec.chain_id()) + .nonce(1) // Requires pending state to have nonce=1 + .to(to) + .value(100) + .gas_limit(21_000) + .max_fee_per_gas(10) + .max_priority_fee_per_gas(1) + .into_eip1559(); + + let tx = + OpTransactionSigned::Eip1559(signed_tx.as_eip1559().expect("eip1559 transaction").clone()); + let envelope = envelope_from_signed(&tx)?; + let parsed_bundle = create_parsed_bundle(vec![envelope])?; + + // Without flashblocks state, transaction should fail (nonce mismatch) + let state_provider = harness + .provider + .state_by_block_hash(harness.header.hash()) + .context("getting state provider")?; + + let result_without_flashblocks = meter_bundle( + state_provider, + harness.chain_spec.clone(), + parsed_bundle.clone(), + &harness.header, + None, // No flashblocks state + None, + ); + + assert!( + result_without_flashblocks.is_err(), + "Transaction with nonce=1 should fail without pending state (canonical nonce is 0)" + ); + + // Now create flashblocks state with nonce=1 for Alice + // Use BundleState::new() to properly calculate state_size + let bundle_state = BundleState::new( + [( + alice_address, + Some(revm::state::AccountInfo { + balance: U256::from(1_000_000_000u64), + nonce: 0, // original + code_hash: KECCAK_EMPTY, + code: None, + }), + Some(revm::state::AccountInfo { + balance: U256::from(1_000_000_000u64), + nonce: 1, // pending (after first flashblock tx) + code_hash: KECCAK_EMPTY, + code: None, + }), + Default::default(), // no storage changes + )], + Vec::>, Vec<(U256, U256)>)>>::new(), + Vec::<(B256, revm::bytecode::Bytecode)>::new(), + ); + + let flashblocks_state = FlashblocksState { cache: Cache::default(), bundle_state }; + + // With correct flashblocks state, transaction should succeed + let state_provider2 = harness + .provider + .state_by_block_hash(harness.header.hash()) + .context("getting state provider")?; + + let result_with_flashblocks = meter_bundle( + state_provider2, + harness.chain_spec.clone(), + parsed_bundle, + &harness.header, + Some(flashblocks_state), + None, + ); + + assert!( + result_with_flashblocks.is_ok(), + "Transaction with nonce=1 should succeed with pending state showing nonce=1: {:?}", + result_with_flashblocks.err() + ); + + Ok(()) +} diff --git a/crates/metering/src/tests/mod.rs b/crates/metering/src/tests/mod.rs index 80d28813..af2a9605 100644 --- a/crates/metering/src/tests/mod.rs +++ b/crates/metering/src/tests/mod.rs @@ -1,4 +1,6 @@ #[cfg(test)] +mod chain_state; +#[cfg(test)] mod meter; #[cfg(test)] mod rpc; diff --git a/crates/metering/src/tests/rpc.rs b/crates/metering/src/tests/rpc.rs index b77f9534..46491674 100644 --- a/crates/metering/src/tests/rpc.rs +++ b/crates/metering/src/tests/rpc.rs @@ -1,426 +1,316 @@ -#[cfg(test)] -mod tests { - use std::{any::Any, net::SocketAddr, sync::Arc}; - - use alloy_eips::Encodable2718; - use alloy_genesis::Genesis; - use alloy_primitives::{Bytes, U256, address, b256, bytes}; - use alloy_rpc_client::RpcClient; - use base_reth_test_utils::tracing::init_silenced_tracing; - use op_alloy_consensus::OpTxEnvelope; - use reth::{ - args::{DiscoveryArgs, NetworkArgs, RpcServerArgs}, - builder::{Node, NodeBuilder, NodeConfig, NodeHandle}, - chainspec::Chain, - core::exit::NodeExitFuture, - tasks::TaskManager, - }; - use reth_optimism_chainspec::OpChainSpecBuilder; - use reth_optimism_node::{OpNode, args::RollupArgs}; - use reth_optimism_primitives::OpTransactionSigned; - use reth_provider::providers::BlockchainProvider; - use reth_transaction_pool::test_utils::TransactionBuilder; - use serde_json; - use tips_core::types::Bundle; - - use crate::rpc::{MeteringApiImpl, MeteringApiServer}; - - pub struct NodeContext { - http_api_addr: SocketAddr, - _node_exit_future: NodeExitFuture, - _node: Box, +use alloy_eips::Encodable2718; +use alloy_primitives::{Bytes, U256, address}; +use alloy_provider::Provider; +use base_reth_test_utils::{ + flashblocks_harness::FlashblocksHarness, + node::{BASE_CHAIN_ID, LocalFlashblocksState, LocalNodeProvider}, + tracing::init_silenced_tracing, +}; +use eyre::{Result, eyre}; +use op_alloy_consensus::OpTxEnvelope; +use reth_optimism_primitives::OpTransactionSigned; +use reth_transaction_pool::test_utils::TransactionBuilder; +use tips_core::types::Bundle; + +use super::utils::secret_from_hex; +use crate::rpc::{MeteringApiImpl, MeteringApiServer}; + +struct RpcTestContext { + harness: FlashblocksHarness, + api: MeteringApiImpl, +} + +impl RpcTestContext { + async fn new() -> Result { + let harness = FlashblocksHarness::new().await?; + let provider = harness.blockchain_provider(); + let flashblocks_state = harness.flashblocks_state(); + let api = MeteringApiImpl::new(provider, flashblocks_state); + + Ok(Self { harness, api }) } - // Helper function to create a Bundle with default fields - fn create_bundle(txs: Vec, block_number: u64, min_timestamp: Option) -> Bundle { - Bundle { - txs, - block_number, - flashblock_number_min: None, - flashblock_number_max: None, - min_timestamp, - max_timestamp: None, - reverting_tx_hashes: vec![], - replacement_uuid: None, - dropping_tx_hashes: vec![], - } + fn accounts(&self) -> &base_reth_test_utils::accounts::TestAccounts { + self.harness.accounts() } - impl NodeContext { - pub async fn rpc_client(&self) -> eyre::Result { - let url = format!("http://{}", self.http_api_addr); - let client = RpcClient::new_http(url.parse()?); - Ok(client) - } + fn harness(&self) -> &FlashblocksHarness { + &self.harness } - async fn setup_node() -> eyre::Result { - init_silenced_tracing(); - let tasks = TaskManager::current(); - let exec = tasks.executor(); - const BASE_SEPOLIA_CHAIN_ID: u64 = 84532; - - let genesis: Genesis = serde_json::from_str(include_str!("assets/genesis.json")).unwrap(); - let chain_spec = Arc::new( - OpChainSpecBuilder::base_mainnet() - .genesis(genesis) - .ecotone_activated() - .chain(Chain::from(BASE_SEPOLIA_CHAIN_ID)) - .build(), - ); - - let network_config = NetworkArgs { - discovery: DiscoveryArgs { disable_discovery: true, ..DiscoveryArgs::default() }, - ..NetworkArgs::default() - }; - - let node_config = NodeConfig::new(chain_spec.clone()) - .with_network(network_config.clone()) - .with_rpc(RpcServerArgs::default().with_unused_ports().with_http()) - .with_unused_ports(); - - let node = OpNode::new(RollupArgs::default()); - - let NodeHandle { node, node_exit_future } = NodeBuilder::new(node_config.clone()) - .testing_node(exec.clone()) - .with_types_and_provider::>() - .with_components(node.components_builder()) - .with_add_ons(node.add_ons()) - .extend_rpc_modules(move |ctx| { - let metering_api = MeteringApiImpl::new(ctx.provider().clone()); - ctx.modules.merge_configured(metering_api.into_rpc())?; - Ok(()) - }) - .launch() - .await?; - - let http_api_addr = node - .rpc_server_handle() - .http_local_addr() - .ok_or_else(|| eyre::eyre!("Failed to get http api address"))?; - - Ok(NodeContext { - http_api_addr, - _node_exit_future: node_exit_future, - _node: Box::new(node), - }) + async fn meter_bundle(&self, bundle: Bundle) -> Result { + MeteringApiServer::meter_bundle(&self.api, bundle) + .await + .map_err(|err| eyre!("meter_bundle rpc failed: {}", err)) } - #[tokio::test] - async fn test_meter_bundle_empty() -> eyre::Result<()> { - let node = setup_node().await?; - let client = node.rpc_client().await?; + async fn meter_bundle_raw( + &self, + bundle: Bundle, + ) -> jsonrpsee::core::RpcResult { + MeteringApiServer::meter_bundle(&self.api, bundle).await + } +} - let bundle = create_bundle(vec![], 0, None); +fn create_bundle(txs: Vec, block_number: u64, min_timestamp: Option) -> Bundle { + Bundle { + txs, + block_number, + flashblock_number_min: None, + flashblock_number_max: None, + min_timestamp, + max_timestamp: None, + reverting_tx_hashes: vec![], + replacement_uuid: None, + dropping_tx_hashes: vec![], + } +} - let response: crate::MeterBundleResponse = - client.request("base_meterBundle", (bundle,)).await?; +#[tokio::test] +async fn test_meter_bundle_empty() -> Result<()> { + init_silenced_tracing(); + let ctx = RpcTestContext::new().await?; - assert_eq!(response.results.len(), 0); - assert_eq!(response.total_gas_used, 0); - assert_eq!(response.gas_fees, U256::from(0)); - assert_eq!(response.state_block_number, 0); + let bundle = create_bundle(vec![], 0, None); + let response = ctx.meter_bundle(bundle).await?; - Ok(()) - } + assert_eq!(response.results.len(), 0); + assert_eq!(response.total_gas_used, 0); + assert_eq!(response.gas_fees, U256::ZERO); - #[tokio::test] - async fn test_meter_bundle_single_transaction() -> eyre::Result<()> { - let node = setup_node().await?; - let client = node.rpc_client().await?; - - // Use a funded account from genesis.json - // Account: 0xf39fd6e51aad88f6f4ce6ab8827279cfffb92266 - // Private key from common test accounts (Hardhat account #0) - let sender_address = address!("0xf39fd6e51aad88f6f4ce6ab8827279cfffb92266"); - let sender_secret = - b256!("0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80"); - - // Build a transaction - let tx = TransactionBuilder::default() - .signer(sender_secret) - .chain_id(84532) - .nonce(0) - .to(address!("0x1111111111111111111111111111111111111111")) - .value(1000) - .gas_limit(21_000) - .max_fee_per_gas(1_000_000_000) // 1 gwei - .max_priority_fee_per_gas(1_000_000_000) - .into_eip1559(); - - let signed_tx = - OpTransactionSigned::Eip1559(tx.as_eip1559().expect("eip1559 transaction").clone()); - let envelope: OpTxEnvelope = signed_tx.into(); - - // Encode transaction - let tx_bytes = Bytes::from(envelope.encoded_2718()); - - let bundle = create_bundle(vec![tx_bytes], 0, None); - - let response: crate::MeterBundleResponse = - client.request("base_meterBundle", (bundle,)).await?; - - assert_eq!(response.results.len(), 1); - assert_eq!(response.total_gas_used, 21_000); - assert!(response.total_execution_time_us > 0); - - let result = &response.results[0]; - assert_eq!(result.from_address, sender_address); - assert_eq!(result.to_address, Some(address!("0x1111111111111111111111111111111111111111"))); - assert_eq!(result.gas_used, 21_000); - assert_eq!(result.gas_price, 1_000_000_000); - assert!(result.execution_time_us > 0); - - Ok(()) - } + let latest_block = ctx.harness().provider().get_block_number().await?; + assert_eq!(response.state_block_number, latest_block); - #[tokio::test] - async fn test_meter_bundle_multiple_transactions() -> eyre::Result<()> { - let node = setup_node().await?; - let client = node.rpc_client().await?; - - // Use funded accounts from genesis.json - // Hardhat account #0 and #1 - let address1 = address!("0xf39fd6e51aad88f6f4ce6ab8827279cfffb92266"); - let secret1 = b256!("0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80"); - - let tx1_inner = TransactionBuilder::default() - .signer(secret1) - .chain_id(84532) - .nonce(0) - .to(address!("0x1111111111111111111111111111111111111111")) - .value(1000) - .gas_limit(21_000) - .max_fee_per_gas(1_000_000_000) - .max_priority_fee_per_gas(1_000_000_000) - .into_eip1559(); - - let tx1_signed = OpTransactionSigned::Eip1559( - tx1_inner.as_eip1559().expect("eip1559 transaction").clone(), - ); - let tx1_envelope: OpTxEnvelope = tx1_signed.into(); - let tx1_bytes = Bytes::from(tx1_envelope.encoded_2718()); - - // Second transaction from second account - let address2 = address!("0x70997970C51812dc3A010C7d01b50e0d17dc79C8"); - let secret2 = b256!("0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d"); - - let tx2_inner = TransactionBuilder::default() - .signer(secret2) - .chain_id(84532) - .nonce(0) - .to(address!("0x2222222222222222222222222222222222222222")) - .value(2000) - .gas_limit(21_000) - .max_fee_per_gas(2_000_000_000) - .max_priority_fee_per_gas(2_000_000_000) - .into_eip1559(); - - let tx2_signed = OpTransactionSigned::Eip1559( - tx2_inner.as_eip1559().expect("eip1559 transaction").clone(), - ); - let tx2_envelope: OpTxEnvelope = tx2_signed.into(); - let tx2_bytes = Bytes::from(tx2_envelope.encoded_2718()); - - let bundle = create_bundle(vec![tx1_bytes, tx2_bytes], 0, None); - - let response: crate::MeterBundleResponse = - client.request("base_meterBundle", (bundle,)).await?; - - assert_eq!(response.results.len(), 2); - assert_eq!(response.total_gas_used, 42_000); - assert!(response.total_execution_time_us > 0); - - // Check first transaction - let result1 = &response.results[0]; - assert_eq!(result1.from_address, address1); - assert_eq!(result1.gas_used, 21_000); - assert_eq!(result1.gas_price, 1_000_000_000); - - // Check second transaction - let result2 = &response.results[1]; - assert_eq!(result2.from_address, address2); - assert_eq!(result2.gas_used, 21_000); - assert_eq!(result2.gas_price, 2_000_000_000); - - Ok(()) - } + Ok(()) +} + +#[tokio::test] +async fn test_meter_bundle_single_transaction() -> Result<()> { + init_silenced_tracing(); + let ctx = RpcTestContext::new().await?; + + let sender_account = &ctx.accounts().alice; + let sender_address = sender_account.address; + let sender_secret = secret_from_hex(sender_account.private_key); + + let tx = TransactionBuilder::default() + .signer(sender_secret) + .chain_id(BASE_CHAIN_ID) + .nonce(0) + .to(address!("0x1111111111111111111111111111111111111111")) + .value(1000) + .gas_limit(21_000) + .max_fee_per_gas(1_000_000_000) + .max_priority_fee_per_gas(1_000_000_000) + .into_eip1559(); + + let signed_tx = OpTransactionSigned::Eip1559(tx.as_eip1559().unwrap().clone()); + let envelope: OpTxEnvelope = signed_tx.into(); + let tx_bytes = Bytes::from(envelope.encoded_2718()); + + let bundle = create_bundle(vec![tx_bytes], 0, None); + let response = ctx.meter_bundle(bundle).await?; + + assert_eq!(response.results.len(), 1); + assert_eq!(response.total_gas_used, 21_000); + assert!(response.total_execution_time_us > 0); + + let result = &response.results[0]; + assert_eq!(result.from_address, sender_address); + assert_eq!(result.to_address, Some(address!("0x1111111111111111111111111111111111111111"))); + assert_eq!(result.gas_used, 21_000); + assert_eq!(result.gas_price, U256::from(1_000_000_000u64)); + assert!(result.execution_time_us > 0); + + Ok(()) +} - #[tokio::test] - async fn test_meter_bundle_invalid_transaction() -> eyre::Result<()> { - let node = setup_node().await?; - let client = node.rpc_client().await?; +#[tokio::test] +async fn test_meter_bundle_multiple_transactions() -> Result<()> { + init_silenced_tracing(); + let ctx = RpcTestContext::new().await?; + + let secret1 = secret_from_hex(ctx.accounts().alice.private_key); + let secret2 = secret_from_hex(ctx.accounts().bob.private_key); + + let tx1 = TransactionBuilder::default() + .signer(secret1) + .chain_id(BASE_CHAIN_ID) + .nonce(0) + .to(address!("0x1111111111111111111111111111111111111111")) + .value(1000) + .gas_limit(21_000) + .max_fee_per_gas(1_000_000_000) + .max_priority_fee_per_gas(1_000_000_000) + .into_eip1559(); + let tx1_bytes = Bytes::from( + OpTxEnvelope::from(OpTransactionSigned::Eip1559(tx1.as_eip1559().unwrap().clone())) + .encoded_2718(), + ); + + let tx2 = TransactionBuilder::default() + .signer(secret2) + .chain_id(BASE_CHAIN_ID) + .nonce(0) + .to(address!("0x2222222222222222222222222222222222222222")) + .value(2000) + .gas_limit(21_000) + .max_fee_per_gas(2_000_000_000) + .max_priority_fee_per_gas(2_000_000_000) + .into_eip1559(); + let tx2_bytes = Bytes::from( + OpTxEnvelope::from(OpTransactionSigned::Eip1559(tx2.as_eip1559().unwrap().clone())) + .encoded_2718(), + ); + + let bundle = create_bundle(vec![tx1_bytes, tx2_bytes], 0, None); + let response = ctx.meter_bundle(bundle).await?; + + assert_eq!(response.results.len(), 2); + assert_eq!(response.total_gas_used, 42_000); + + let result1 = &response.results[0]; + assert_eq!(result1.from_address, ctx.accounts().alice.address); + assert_eq!(result1.gas_price, U256::from(1_000_000_000u64)); + + let result2 = &response.results[1]; + assert_eq!(result2.from_address, ctx.accounts().bob.address); + assert_eq!(result2.gas_price, U256::from(2_000_000_000u64)); + + Ok(()) +} - let bundle = create_bundle( - vec![bytes!("0xdeadbeef")], // Invalid transaction data - 0, - None, - ); +#[tokio::test] +async fn test_meter_bundle_invalid_transaction() -> Result<()> { + init_silenced_tracing(); + let ctx = RpcTestContext::new().await?; - let result: Result = - client.request("base_meterBundle", (bundle,)).await; + let bundle = create_bundle(vec![Bytes::from_static(b"\xde\xad\xbe\xef")], 0, None); + let result = ctx.meter_bundle_raw(bundle).await; - assert!(result.is_err()); + assert!(result.is_err(), "expected invalid transaction to fail"); + Ok(()) +} - Ok(()) - } +#[tokio::test] +async fn test_meter_bundle_uses_latest_block() -> Result<()> { + init_silenced_tracing(); + let ctx = RpcTestContext::new().await?; - #[tokio::test] - async fn test_meter_bundle_uses_latest_block() -> eyre::Result<()> { - let node = setup_node().await?; - let client = node.rpc_client().await?; + ctx.harness().advance_chain(2).await?; - // Metering always uses the latest block state, regardless of bundle.block_number - let bundle = create_bundle(vec![], 0, None); + let bundle = create_bundle(vec![], 0, None); + let response = ctx.meter_bundle(bundle).await?; - let response: crate::MeterBundleResponse = - client.request("base_meterBundle", (bundle,)).await?; + let latest_block = ctx.harness().provider().get_block_number().await?; + assert_eq!(response.state_block_number, latest_block); - // Should return the latest block number (genesis block 0) - assert_eq!(response.state_block_number, 0); + Ok(()) +} - Ok(()) - } +#[tokio::test] +async fn test_meter_bundle_ignores_bundle_block_number() -> Result<()> { + init_silenced_tracing(); + let ctx = RpcTestContext::new().await?; - #[tokio::test] - async fn test_meter_bundle_ignores_bundle_block_number() -> eyre::Result<()> { - let node = setup_node().await?; - let client = node.rpc_client().await?; - - // Even if bundle.block_number is different, it should use the latest block - // In this test, we specify block_number=0 in the bundle - let bundle1 = create_bundle(vec![], 0, None); - let response1: crate::MeterBundleResponse = - client.request("base_meterBundle", (bundle1,)).await?; - - // Try with a different bundle.block_number (999 - arbitrary value) - // Since we can't create future blocks, we use a different value to show it's ignored - let bundle2 = create_bundle(vec![], 999, None); - let response2: crate::MeterBundleResponse = - client.request("base_meterBundle", (bundle2,)).await?; - - // Both should return the same state_block_number (the latest block) - // because the implementation always uses Latest, not bundle.block_number - assert_eq!(response1.state_block_number, response2.state_block_number); - assert_eq!(response1.state_block_number, 0); // Genesis block - - Ok(()) - } + let bundle1 = create_bundle(vec![], 0, None); + let response1 = ctx.meter_bundle(bundle1).await?; - #[tokio::test] - async fn test_meter_bundle_custom_timestamp() -> eyre::Result<()> { - let node = setup_node().await?; - let client = node.rpc_client().await?; + let bundle2 = create_bundle(vec![], 999, None); + let response2 = ctx.meter_bundle(bundle2).await?; - // Test that bundle.min_timestamp is used for simulation. - // The timestamp affects block.timestamp in the EVM during simulation but is not - // returned in the response. - let custom_timestamp = 1234567890; - let bundle = create_bundle(vec![], 0, Some(custom_timestamp)); + assert_eq!(response1.state_block_number, response2.state_block_number); + let latest_block = ctx.harness().provider().get_block_number().await?; + assert_eq!(response1.state_block_number, latest_block); - let response: crate::MeterBundleResponse = - client.request("base_meterBundle", (bundle,)).await?; + Ok(()) +} - // Verify the request succeeded with custom timestamp - assert_eq!(response.results.len(), 0); - assert_eq!(response.total_gas_used, 0); +#[tokio::test] +async fn test_meter_bundle_custom_timestamp() -> Result<()> { + init_silenced_tracing(); + let ctx = RpcTestContext::new().await?; - Ok(()) - } + let custom_timestamp = 1_234_567_890; + let bundle = create_bundle(vec![], 0, Some(custom_timestamp)); + let response = ctx.meter_bundle(bundle).await?; - #[tokio::test] - async fn test_meter_bundle_arbitrary_block_number() -> eyre::Result<()> { - let node = setup_node().await?; - let client = node.rpc_client().await?; + assert_eq!(response.results.len(), 0); + assert_eq!(response.total_gas_used, 0); - // Since we now ignore bundle.block_number and always use the latest block, - // any block_number value should work (it's only used for bundle validity in TIPS) - let bundle = create_bundle(vec![], 999999, None); + Ok(()) +} - let response: crate::MeterBundleResponse = - client.request("base_meterBundle", (bundle,)).await?; +#[tokio::test] +async fn test_meter_bundle_arbitrary_block_number() -> Result<()> { + init_silenced_tracing(); + let ctx = RpcTestContext::new().await?; - // Should succeed and use the latest block (genesis block 0) - assert_eq!(response.state_block_number, 0); + let bundle = create_bundle(vec![], 999_999, None); + let response = ctx.meter_bundle(bundle).await?; - Ok(()) - } + let latest_block = ctx.harness().provider().get_block_number().await?; + assert_eq!(response.state_block_number, latest_block); - #[tokio::test] - async fn test_meter_bundle_gas_calculations() -> eyre::Result<()> { - let node = setup_node().await?; - let client = node.rpc_client().await?; - - // Use two funded accounts from genesis.json with different gas prices - let secret1 = b256!("0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80"); - let secret2 = b256!("0x59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d"); - - // First transaction with 3 gwei gas price - let tx1_inner = TransactionBuilder::default() - .signer(secret1) - .chain_id(84532) - .nonce(0) - .to(address!("0x1111111111111111111111111111111111111111")) - .value(1000) - .gas_limit(21_000) - .max_fee_per_gas(3_000_000_000) // 3 gwei - .max_priority_fee_per_gas(3_000_000_000) - .into_eip1559(); - - let signed_tx1 = OpTransactionSigned::Eip1559( - tx1_inner.as_eip1559().expect("eip1559 transaction").clone(), - ); - let envelope1: OpTxEnvelope = signed_tx1.into(); - let tx1_bytes = Bytes::from(envelope1.encoded_2718()); - - // Second transaction with 7 gwei gas price - let tx2_inner = TransactionBuilder::default() - .signer(secret2) - .chain_id(84532) - .nonce(0) - .to(address!("0x2222222222222222222222222222222222222222")) - .value(2000) - .gas_limit(21_000) - .max_fee_per_gas(7_000_000_000) // 7 gwei - .max_priority_fee_per_gas(7_000_000_000) - .into_eip1559(); - - let signed_tx2 = OpTransactionSigned::Eip1559( - tx2_inner.as_eip1559().expect("eip1559 transaction").clone(), - ); - let envelope2: OpTxEnvelope = signed_tx2.into(); - let tx2_bytes = Bytes::from(envelope2.encoded_2718()); - - let bundle = create_bundle(vec![tx1_bytes, tx2_bytes], 0, None); - - let response: crate::MeterBundleResponse = - client.request("base_meterBundle", (bundle,)).await?; - - assert_eq!(response.results.len(), 2); - - // Check first transaction (3 gwei) - let result1 = &response.results[0]; - let expected_gas_fees_1 = U256::from(21_000) * U256::from(3_000_000_000u64); - assert_eq!(result1.gas_fees, expected_gas_fees_1); - assert_eq!(result1.gas_price, U256::from(3000000000u64)); - assert_eq!(result1.coinbase_diff, expected_gas_fees_1); - - // Check second transaction (7 gwei) - let result2 = &response.results[1]; - let expected_gas_fees_2 = U256::from(21_000) * U256::from(7_000_000_000u64); - assert_eq!(result2.gas_fees, expected_gas_fees_2); - assert_eq!(result2.gas_price, U256::from(7000000000u64)); - assert_eq!(result2.coinbase_diff, expected_gas_fees_2); - - // Check bundle totals - let total_gas_fees = expected_gas_fees_1 + expected_gas_fees_2; - assert_eq!(response.gas_fees, total_gas_fees); - assert_eq!(response.coinbase_diff, total_gas_fees); - assert_eq!(response.total_gas_used, 42_000); - - // Bundle gas price should be weighted average: (3*21000 + 7*21000) / (21000 + 21000) = 5 gwei - assert_eq!(response.bundle_gas_price, U256::from(5000000000u64)); - - Ok(()) - } + Ok(()) +} + +#[tokio::test] +async fn test_meter_bundle_gas_calculations() -> Result<()> { + init_silenced_tracing(); + let ctx = RpcTestContext::new().await?; + + let secret1 = secret_from_hex(ctx.accounts().alice.private_key); + let secret2 = secret_from_hex(ctx.accounts().bob.private_key); + + let tx1 = TransactionBuilder::default() + .signer(secret1) + .chain_id(BASE_CHAIN_ID) + .nonce(0) + .to(address!("0x1111111111111111111111111111111111111111")) + .value(1000) + .gas_limit(21_000) + .max_fee_per_gas(3_000_000_000) + .max_priority_fee_per_gas(3_000_000_000) + .into_eip1559(); + let tx1_bytes = Bytes::from( + OpTxEnvelope::from(OpTransactionSigned::Eip1559(tx1.as_eip1559().unwrap().clone())) + .encoded_2718(), + ); + + let tx2 = TransactionBuilder::default() + .signer(secret2) + .chain_id(BASE_CHAIN_ID) + .nonce(0) + .to(address!("0x2222222222222222222222222222222222222222")) + .value(2000) + .gas_limit(21_000) + .max_fee_per_gas(7_000_000_000) + .max_priority_fee_per_gas(7_000_000_000) + .into_eip1559(); + let tx2_bytes = Bytes::from( + OpTxEnvelope::from(OpTransactionSigned::Eip1559(tx2.as_eip1559().unwrap().clone())) + .encoded_2718(), + ); + + let bundle = create_bundle(vec![tx1_bytes, tx2_bytes], 0, None); + let response = ctx.meter_bundle(bundle).await?; + + assert_eq!(response.results.len(), 2); + + let expected_fees_1 = U256::from(21_000) * U256::from(3_000_000_000u64); + let expected_fees_2 = U256::from(21_000) * U256::from(7_000_000_000u64); + + assert_eq!(response.results[0].gas_fees, expected_fees_1); + assert_eq!(response.results[0].gas_price, U256::from(3_000_000_000u64)); + assert_eq!(response.results[1].gas_fees, expected_fees_2); + assert_eq!(response.results[1].gas_price, U256::from(7_000_000_000u64)); + + let total_fees = expected_fees_1 + expected_fees_2; + assert_eq!(response.gas_fees, total_fees); + assert_eq!(response.coinbase_diff, total_fees); + assert_eq!(response.total_gas_used, 42_000); + assert_eq!(response.bundle_gas_price, U256::from(5_000_000_000u64)); + + Ok(()) } diff --git a/crates/metering/src/tests/utils.rs b/crates/metering/src/tests/utils.rs index 7bd29fef..7fd3a775 100644 --- a/crates/metering/src/tests/utils.rs +++ b/crates/metering/src/tests/utils.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use alloy_primitives::{B256, hex::FromHex}; use reth::api::{NodeTypes, NodeTypesWithDBAdapter}; use reth_db::{ ClientVersion, DatabaseEnv, init_db, @@ -8,6 +9,10 @@ use reth_db::{ }; use reth_provider::{ProviderFactory, providers::StaticFileProvider}; +pub fn secret_from_hex(hex_key: &str) -> B256 { + B256::from_hex(hex_key).expect("32-byte private key") +} + pub fn create_provider_factory( chain_spec: Arc, ) -> ProviderFactory>>> { diff --git a/crates/node/src/main.rs b/crates/node/src/main.rs index 9162c746..e86d6d3b 100644 --- a/crates/node/src/main.rs +++ b/crates/node/src/main.rs @@ -135,12 +135,6 @@ fn main() { } }) .extend_rpc_modules(move |ctx| { - if metering_enabled { - info!(message = "Starting Metering RPC"); - let metering_api = MeteringApiImpl::new(ctx.provider().clone()); - ctx.modules.merge_configured(metering_api.into_rpc())?; - } - if flashblocks_enabled { info!(message = "Starting Flashblocks"); @@ -166,12 +160,23 @@ fn main() { let api_ext = EthApiExt::new( ctx.registry.eth_api().clone(), ctx.registry.eth_handlers().filter.clone(), - fb, + fb.clone(), ); ctx.modules.replace_configured(api_ext.into_rpc())?; + + if metering_enabled { + info!(message = "Starting Metering RPC with Flashblocks state"); + let metering_api = MeteringApiImpl::new(ctx.provider().clone(), fb); + ctx.modules.merge_configured(metering_api.into_rpc())?; + } } else { info!(message = "flashblocks integration is disabled"); + if metering_enabled { + return Err(eyre::eyre!( + "Metering RPC requires flashblocks to be enabled (--websocket-url)" + )); + } } Ok(()) }) diff --git a/crates/test-utils/README.md b/crates/test-utils/README.md index 70f7f37e..32842b2c 100644 --- a/crates/test-utils/README.md +++ b/crates/test-utils/README.md @@ -226,9 +226,21 @@ async fn test_flashblocks() -> eyre::Result<()> { } ``` -`FlashblocksHarness` derefs to the base `TestHarness`, so you can keep using methods like `provider()`, `build_block_from_transactions`, etc. +Need to craft a flashblock (including intentionally malformed payloads)? Use `build_flashblock`: -Test flashblocks delivery without WebSocket connections by constructing payloads and sending them through `FlashblocksHarness` (or the lower-level `FlashblocksLocalNode`). +```rust +let flashblock = harness.build_flashblock( + next_block_number, + parent_hash, + B256::ZERO, // force missing beacon root to test validation + timestamp, + gas_limit, + vec![(tx_bytes, Some((tx_hash, receipt)))], +); +harness.send_flashblock(flashblock).await?; +``` + +`FlashblocksHarness` derefs to the base `TestHarness`, so you can keep using methods like `provider()`, `build_block_from_transactions`, etc. Test flashblocks delivery without WebSocket connections by constructing payloads and sending them through `FlashblocksHarness` (or the lower-level `FlashblocksLocalNode`). ## Configuration Constants diff --git a/crates/test-utils/src/flashblocks_harness.rs b/crates/test-utils/src/flashblocks_harness.rs index 6ed61caf..68fcf7a4 100644 --- a/crates/test-utils/src/flashblocks_harness.rs +++ b/crates/test-utils/src/flashblocks_harness.rs @@ -1,11 +1,17 @@ -use std::{ops::Deref, sync::Arc}; +use std::{collections::HashMap, ops::Deref, sync::Arc}; -use base_reth_flashblocks_rpc::subscription::Flashblock; +use alloy_consensus::Receipt; +use alloy_primitives::{Address, B256, Bytes, U256, b256, bytes}; +use alloy_rpc_types_engine::PayloadId; +use base_reth_flashblocks_rpc::subscription::{Flashblock, Metadata}; use eyre::Result; use futures_util::Future; +use op_alloy_consensus::OpDepositReceipt; use reth::builder::NodeHandle; use reth_e2e_test_utils::Adapter; use reth_optimism_node::OpNode; +use reth_optimism_primitives::OpReceipt; +use rollup_boost::{ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1}; use crate::{ harness::TestHarness, @@ -16,6 +22,13 @@ use crate::{ tracing::init_silenced_tracing, }; +const FLASHBLOCK_PAYLOAD_ID: [u8; 8] = [0; 8]; +const BLOCK_INFO_DEPOSIT_TX: Bytes = bytes!( + "0x7ef90104a06c0c775b6b492bab9d7e81abdf27f77cafb698551226455a82f559e0f93fea3794deaddeaddeaddeaddeaddeaddeaddeaddead00019442000000000000000000000000000000000000158080830f424080b8b0098999be000008dd00101c1200000000000000020000000068869d6300000000015f277f000000000000000000000000000000000000000000000000000000000d42ac290000000000000000000000000000000000000000000000000000000000000001abf52777e63959936b1bf633a2a643f0da38d63deffe49452fed1bf8a44975d50000000000000000000000005050f69a9786f081509234f1a7f4684b5e5b76c9000000000000000000000000" +); +const BLOCK_INFO_DEPOSIT_TX_HASH: B256 = + b256!("0xba56c8b0deb460ff070f8fca8e2ee01e51a3db27841cc862fdd94cc1a47662b6"); + pub struct FlashblocksHarness { inner: TestHarness, parts: FlashblocksParts, @@ -68,6 +81,61 @@ impl FlashblocksHarness { Ok(()) } + /// Builds a flashblock payload for testing. Callers can intentionally pass invalid + /// values (for example a zeroed beacon root) to assert how downstream components + /// react to malformed flashblocks. + #[allow(clippy::too_many_arguments)] + pub fn build_flashblock( + &self, + block_number: u64, + parent_hash: B256, + parent_beacon_block_root: B256, + timestamp: u64, + gas_limit: u64, + transactions: Vec<(Bytes, Option<(B256, OpReceipt)>)>, + ) -> Flashblock { + let base = ExecutionPayloadBaseV1 { + parent_beacon_block_root, + parent_hash, + fee_recipient: Address::ZERO, + prev_randao: B256::ZERO, + block_number, + gas_limit, + timestamp, + extra_data: Bytes::new(), + base_fee_per_gas: U256::from(1), + }; + + let mut flashblock_txs = vec![BLOCK_INFO_DEPOSIT_TX.clone()]; + let mut receipts = HashMap::default(); + receipts.insert( + BLOCK_INFO_DEPOSIT_TX_HASH, + OpReceipt::Deposit(OpDepositReceipt { + inner: Receipt { status: true.into(), cumulative_gas_used: 10_000, logs: vec![] }, + deposit_nonce: Some(4_012_991u64), + deposit_receipt_version: None, + }), + ); + + for (tx_bytes, maybe_receipt) in transactions { + if let Some((hash, receipt)) = maybe_receipt { + receipts.insert(hash, receipt); + } + flashblock_txs.push(tx_bytes); + } + + Flashblock { + payload_id: PayloadId::new(FLASHBLOCK_PAYLOAD_ID), + index: 0, + base: Some(base), + diff: ExecutionPayloadFlashblockDeltaV1 { + transactions: flashblock_txs, + ..Default::default() + }, + metadata: Metadata { receipts, new_account_balances: Default::default(), block_number }, + } + } + pub fn into_inner(self) -> TestHarness { self.inner }