diff --git a/Cargo.lock b/Cargo.lock index 1d3ec96f..08bf342b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1044,6 +1044,12 @@ dependencies = [ "libc", ] +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + [[package]] name = "anstream" version = "0.6.18" @@ -2373,6 +2379,12 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df8670b8c7b9dae1793364eafadf7239c40d669904660c5960d74cfd80b46a53" +[[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + [[package]] name = "castaway" version = "0.2.3" @@ -2435,6 +2447,33 @@ dependencies = [ "windows-link", ] +[[package]] +name = "ciborium" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half", +] + [[package]] name = "cipher" version = "0.4.4" @@ -2750,6 +2789,40 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "criterion" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bf7af66b0989381bd0be551bd7cc91912a655a58c6918420c9527b1fd8b4679" +dependencies = [ + "anes", + "cast", + "ciborium", + "clap", + "criterion-plot", + "itertools 0.13.0", + "num-traits", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_json", + "tinytemplate", + "tokio", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" +dependencies = [ + "cast", + "itertools 0.10.5", +] + [[package]] name = "critical-section" version = "1.2.0" @@ -3970,6 +4043,16 @@ dependencies = [ "tracing", ] +[[package]] +name = "half" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "459196ed295495a68f7d7fe1d84f6c4b7ff0e21fe3017b2f283c6fac3ad803c9" +dependencies = [ + "cfg-if", + "crunchy", +] + [[package]] name = "hash-db" version = "0.15.2" @@ -5837,6 +5920,12 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" +[[package]] +name = "oorandom" +version = "11.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" + [[package]] name = "op-alloy-consensus" version = "0.18.9" @@ -6276,6 +6365,34 @@ dependencies = [ "crunchy", ] +[[package]] +name = "plotters" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a" + +[[package]] +name = "plotters-svg" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670" +dependencies = [ + "plotters-backend", +] + [[package]] name = "polyval" version = "0.6.2" @@ -9745,7 +9862,7 @@ dependencies = [ [[package]] name = "revm" version = "26.0.1" -source = "git+https://github.com/scroll-tech/revm?branch=feat/reth-v78#64e018f80e65d79505591aacec4f35ec46bca5ff" +source = "git+https://github.com/scroll-tech/revm?branch=feat/reth-v78#c143b332683b41849632cc482bddbcb1be0d8d6f" dependencies = [ "revm-bytecode", "revm-context", @@ -9763,7 +9880,7 @@ dependencies = [ [[package]] name = "revm-bytecode" version = "5.0.0" -source = "git+https://github.com/scroll-tech/revm?branch=feat/reth-v78#64e018f80e65d79505591aacec4f35ec46bca5ff" +source = "git+https://github.com/scroll-tech/revm?branch=feat/reth-v78#c143b332683b41849632cc482bddbcb1be0d8d6f" dependencies = [ "bitvec", "once_cell", @@ -9775,7 +9892,7 @@ dependencies = [ [[package]] name = "revm-context" version = "7.0.1" -source = "git+https://github.com/scroll-tech/revm?branch=feat/reth-v78#64e018f80e65d79505591aacec4f35ec46bca5ff" +source = "git+https://github.com/scroll-tech/revm?branch=feat/reth-v78#c143b332683b41849632cc482bddbcb1be0d8d6f" dependencies = [ "cfg-if", "derive-where", @@ -9790,7 +9907,7 @@ dependencies = [ [[package]] name = "revm-context-interface" version = "7.0.1" -source = "git+https://github.com/scroll-tech/revm?branch=feat/reth-v78#64e018f80e65d79505591aacec4f35ec46bca5ff" +source = "git+https://github.com/scroll-tech/revm?branch=feat/reth-v78#c143b332683b41849632cc482bddbcb1be0d8d6f" dependencies = [ "alloy-eip2930", "alloy-eip7702", @@ -9805,7 +9922,7 @@ dependencies = [ [[package]] name = "revm-database" version = "6.0.0" -source = "git+https://github.com/scroll-tech/revm?branch=feat/reth-v78#64e018f80e65d79505591aacec4f35ec46bca5ff" +source = "git+https://github.com/scroll-tech/revm?branch=feat/reth-v78#c143b332683b41849632cc482bddbcb1be0d8d6f" dependencies = [ "alloy-eips 1.0.20", "revm-bytecode", @@ -9818,7 +9935,7 @@ dependencies = [ [[package]] name = "revm-database-interface" version = "6.0.0" -source = "git+https://github.com/scroll-tech/revm?branch=feat/reth-v78#64e018f80e65d79505591aacec4f35ec46bca5ff" +source = "git+https://github.com/scroll-tech/revm?branch=feat/reth-v78#c143b332683b41849632cc482bddbcb1be0d8d6f" dependencies = [ "auto_impl", "revm-primitives", @@ -9829,7 +9946,7 @@ dependencies = [ [[package]] name = "revm-handler" version = "7.0.1" -source = "git+https://github.com/scroll-tech/revm?branch=feat/reth-v78#64e018f80e65d79505591aacec4f35ec46bca5ff" +source = "git+https://github.com/scroll-tech/revm?branch=feat/reth-v78#c143b332683b41849632cc482bddbcb1be0d8d6f" dependencies = [ "auto_impl", "derive-where", @@ -9847,7 +9964,7 @@ dependencies = [ [[package]] name = "revm-inspector" version = "7.0.1" -source = "git+https://github.com/scroll-tech/revm?branch=feat/reth-v78#64e018f80e65d79505591aacec4f35ec46bca5ff" +source = "git+https://github.com/scroll-tech/revm?branch=feat/reth-v78#c143b332683b41849632cc482bddbcb1be0d8d6f" dependencies = [ "auto_impl", "either", @@ -9882,7 +9999,7 @@ dependencies = [ [[package]] name = "revm-interpreter" version = "22.0.1" -source = "git+https://github.com/scroll-tech/revm?branch=feat/reth-v78#64e018f80e65d79505591aacec4f35ec46bca5ff" +source = "git+https://github.com/scroll-tech/revm?branch=feat/reth-v78#c143b332683b41849632cc482bddbcb1be0d8d6f" dependencies = [ "revm-bytecode", "revm-context-interface", @@ -9893,7 +10010,7 @@ dependencies = [ [[package]] name = "revm-precompile" version = "23.0.0" -source = "git+https://github.com/scroll-tech/revm?branch=feat/reth-v78#64e018f80e65d79505591aacec4f35ec46bca5ff" +source = "git+https://github.com/scroll-tech/revm?branch=feat/reth-v78#c143b332683b41849632cc482bddbcb1be0d8d6f" dependencies = [ "ark-bls12-381", "ark-bn254", @@ -9918,7 +10035,7 @@ dependencies = [ [[package]] name = "revm-primitives" version = "20.0.0" -source = "git+https://github.com/scroll-tech/revm?branch=feat/reth-v78#64e018f80e65d79505591aacec4f35ec46bca5ff" +source = "git+https://github.com/scroll-tech/revm?branch=feat/reth-v78#c143b332683b41849632cc482bddbcb1be0d8d6f" dependencies = [ "alloy-primitives", "num_enum", @@ -9942,7 +10059,7 @@ dependencies = [ [[package]] name = "revm-state" version = "6.0.0" -source = "git+https://github.com/scroll-tech/revm?branch=feat/reth-v78#64e018f80e65d79505591aacec4f35ec46bca5ff" +source = "git+https://github.com/scroll-tech/revm?branch=feat/reth-v78#c143b332683b41849632cc482bddbcb1be0d8d6f" dependencies = [ "bitflags 2.9.1", "revm-bytecode", @@ -10847,6 +10964,7 @@ dependencies = [ "alloy-primitives", "alloy-rpc-types-engine 1.0.20", "async-trait", + "criterion", "eyre", "futures", "metrics", @@ -12128,6 +12246,16 @@ dependencies = [ "zerovec", ] +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "tinyvec" version = "1.9.0" diff --git a/crates/derivation-pipeline/Cargo.toml b/crates/derivation-pipeline/Cargo.toml index da597e2a..3dff6669 100644 --- a/crates/derivation-pipeline/Cargo.toml +++ b/crates/derivation-pipeline/Cargo.toml @@ -35,10 +35,12 @@ tracing.workspace = true [dev-dependencies] async-trait.workspace = true +alloy-primitives = { workspace = true, features = ["getrandom"] } +criterion = { version = "0.6", features = ["async", "async_tokio"] } eyre.workspace = true scroll-db = { workspace = true, features = ["test-utils"] } scroll-codec = { workspace = true, features = ["test-utils"] } -tokio = { workspace = true, features = ["macros"] } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } [features] default = ["std"] @@ -52,3 +54,7 @@ std = [ "scroll-alloy-rpc-types-engine/std", "futures/std", ] + +[[bench]] +name = "pipeline" +harness = false diff --git a/crates/derivation-pipeline/benches/pipeline.rs b/crates/derivation-pipeline/benches/pipeline.rs new file mode 100644 index 00000000..1349d969 --- /dev/null +++ b/crates/derivation-pipeline/benches/pipeline.rs @@ -0,0 +1,106 @@ +//! Benchmarks for the derivation pipeline. + +#![allow(missing_docs)] + +use std::sync::Arc; + +use alloy_primitives::{address, b256, bytes, U256}; +use criterion::{criterion_group, criterion_main, Criterion}; +use futures::StreamExt; +use rollup_node_primitives::{BatchCommitData, BatchInfo, L1MessageEnvelope}; +use rollup_node_providers::{test_utils::NoBlobProvider, DatabaseL1MessageProvider}; +use scroll_alloy_consensus::TxL1Message; +use scroll_codec::decoding::test_utils::read_to_bytes; +use scroll_db::{test_utils::setup_test_db, Database, DatabaseOperations}; +use scroll_derivation_pipeline::DerivationPipeline; +use tokio::runtime::{Handle, Runtime}; + +async fn setup_pipeline( +) -> DerivationPipeline>>> { + // load batch data in the db. + let db = Arc::new(setup_test_db().await); + let raw_calldata = read_to_bytes("./testdata/calldata_v0.bin").unwrap(); + let batch_data = BatchCommitData { + hash: b256!("7f26edf8e3decbc1620b4d2ba5f010a6bdd10d6bb16430c4f458134e36ab3961"), + index: 12, + block_number: 18319648, + block_timestamp: 1696935971, + calldata: Arc::new(raw_calldata), + blob_versioned_hash: None, + finalized_block_number: None, + }; + db.insert_batch(batch_data).await.unwrap(); + + // load messages in db. + let l1_messages = vec![ + L1MessageEnvelope { + l1_block_number: 717, + l2_block_number: None, + queue_hash: None, + transaction: TxL1Message { + queue_index: 33, + gas_limit: 168000, + to: address!("781e90f1c8Fc4611c9b7497C3B47F99Ef6969CbC"), + value: U256::ZERO, + sender: address!("7885BcBd5CeCEf1336b5300fb5186A12DDD8c478"), + input: bytes!("8ef1332e0000000000000000000000007f2b8c31f88b6006c382775eea88297ec1e3e9050000000000000000000000006ea73e05adc79974b931123675ea8f78ffdacdf0000000000000000000000000000000000000000000000000006a94d74f430000000000000000000000000000000000000000000000000000000000000000002100000000000000000000000000000000000000000000000000000000000000a000000000000000000000000000000000000000000000000000000000000000a4232e8748000000000000000000000000ca266224613396a0e8d4c2497dbc4f33dd6cdeff000000000000000000000000ca266224613396a0e8d4c2497dbc4f33dd6cdeff000000000000000000000000000000000000000000000000006a94d74f4300000000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), + }, + }, + L1MessageEnvelope { + l1_block_number: 717, + l2_block_number: None, + queue_hash: None, + transaction: TxL1Message { + queue_index: 34, + gas_limit: 168000, + to: address!("781e90f1c8fc4611c9b7497c3b47f99ef6969cbc"), + value: U256::ZERO, + sender: address!("7885BcBd5CeCEf1336b5300fb5186A12DDD8c478"), + input: bytes!("8ef1332e0000000000000000000000007f2b8c31f88b6006c382775eea88297ec1e3e9050000000000000000000000006ea73e05adc79974b931123675ea8f78ffdacdf000000000000000000000000000000000000000000000000000470de4df820000000000000000000000000000000000000000000000000000000000000000002200000000000000000000000000000000000000000000000000000000000000a000000000000000000000000000000000000000000000000000000000000000a4232e8748000000000000000000000000982fe4a7cbd74bb3422ebe46333c3e8046c12c7f000000000000000000000000982fe4a7cbd74bb3422ebe46333c3e8046c12c7f00000000000000000000000000000000000000000000000000470de4df8200000000000000000000000000000000000000000000000000000000000000000080000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"), + }, + }, + ]; + for message in l1_messages { + db.insert_l1_message(message).await.unwrap(); + } + + // construct the pipeline. + let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0); + let mock_l1_provider = NoBlobProvider { l1_messages_provider }; + DerivationPipeline::new(mock_l1_provider, db) +} + +fn benchmark_pipeline_derivation(c: &mut Criterion) { + let rt = Runtime::new().unwrap(); + + c.bench_function("pipeline_derive_1000_batches", |b| { + b.to_async(&rt).iter_batched( + || { + let (tx, rx) = std::sync::mpsc::channel(); + Handle::current().spawn(async move { + // setup (not measured): create fresh pipeline with 1000 committed batches + let mut pipeline = setup_pipeline().await; + let batch_info = BatchInfo { index: 12, hash: Default::default() }; + + // commit 1000 batches. + for _ in 0..1000 { + pipeline.handle_batch_commit(batch_info, 0); + } + + tx.send(pipeline).unwrap(); + }); + rx.recv().unwrap() + }, + |mut pipeline| async move { + // measured work: derive 1000 batches. + for _ in 0..1000 { + let _ = pipeline.next().await.unwrap(); + } + }, + criterion::BatchSize::SmallInput, + ) + }); +} + +criterion_group!(benches, benchmark_pipeline_derivation); +criterion_main!(benches); diff --git a/crates/derivation-pipeline/src/lib.rs b/crates/derivation-pipeline/src/lib.rs index 8ec64242..bb0d0050 100644 --- a/crates/derivation-pipeline/src/lib.rs +++ b/crates/derivation-pipeline/src/lib.rs @@ -12,7 +12,7 @@ mod metrics; pub use metrics::DerivationPipelineMetrics; use crate::data_source::CodecDataSource; -use std::{boxed::Box, collections::VecDeque, sync::Arc, time::Instant, vec::Vec}; +use std::{boxed::Box, collections::VecDeque, fmt::Formatter, sync::Arc, time::Instant, vec::Vec}; use alloy_primitives::{Address, B256}; use alloy_rpc_types_engine::PayloadAttributes; @@ -22,8 +22,10 @@ use core::{ pin::Pin, task::{Context, Poll, Waker}, }; -use futures::{ready, stream::FuturesOrdered, Stream, StreamExt}; -use rollup_node_primitives::{BatchCommitData, BatchInfo, ScrollPayloadAttributesWithBatchInfo}; +use futures::{FutureExt, Stream}; +use rollup_node_primitives::{ + BatchCommitData, BatchInfo, ScrollPayloadAttributesWithBatchInfo, WithBlockNumber, +}; use rollup_node_providers::{BlockDataProvider, L1Provider}; use scroll_alloy_rpc_types_engine::{BlockDataHint, ScrollPayloadAttributes}; use scroll_codec::Codec; @@ -41,28 +43,41 @@ type DerivationPipelineFuture = Pin< >, >; -/// Limit the amount of pipeline futures allowed to be polled concurrently. -const MAX_CONCURRENT_DERIVATION_PIPELINE_FUTS: usize = 100; - /// A structure holding the current unresolved futures for the derivation pipeline. -#[derive(Debug)] pub struct DerivationPipeline

{ /// The current derivation pipeline futures polled. - pipeline_futures: FuturesOrdered, + pipeline_future: Option>, /// A reference to the database. database: Arc, /// A L1 provider. l1_provider: P, /// The queue of batches to handle. - batch_queue: VecDeque>, + batch_queue: VecDeque>>, /// The queue of polled attributes. - attributes_queue: VecDeque, + attributes_queue: VecDeque>, /// The waker for the pipeline. waker: Option, /// The metrics of the pipeline. metrics: DerivationPipelineMetrics, } +impl Debug for DerivationPipeline

{ + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DerivationPipeline") + .field( + "pipeline_future", + &self.pipeline_future.as_ref().map(|_| "Some( ... )").unwrap_or("None"), + ) + .field("database", &self.database) + .field("l1_provider", &self.l1_provider) + .field("batch_queue", &self.batch_queue) + .field("attributes_queue", &self.attributes_queue) + .field("waker", &self.waker) + .field("metrics", &self.metrics) + .finish() + } +} + impl

DerivationPipeline

where P: L1Provider + Clone + Send + Sync + 'static, @@ -73,7 +88,7 @@ where database, l1_provider, batch_queue: Default::default(), - pipeline_futures: Default::default(), + pipeline_future: None, attributes_queue: Default::default(), waker: None, metrics: DerivationPipelineMetrics::default(), @@ -82,9 +97,9 @@ where /// Handles a new batch commit index by pushing it in its internal queue. /// Wakes the waker in order to trigger a call to poll. - pub fn handle_batch_commit(&mut self, batch_info: BatchInfo) { + pub fn handle_batch_commit(&mut self, batch_info: BatchInfo, l1_block_number: u64) { let block_info = Arc::new(batch_info); - self.batch_queue.push_back(block_info); + self.batch_queue.push_back(WithBlockNumber::new(l1_block_number, block_info)); if let Some(waker) = self.waker.take() { waker.wake() } @@ -92,21 +107,24 @@ where /// Handles the next batch index in the batch index queue, pushing the future in the pipeline /// futures. - fn handle_next_batch(&mut self) -> Option { + fn handle_next_batch(&mut self) -> Option> { let database = self.database.clone(); let metrics = self.metrics.clone(); let provider = self.l1_provider.clone(); if let Some(info) = self.batch_queue.pop_front() { + let block_number = info.number; let fut = Box::pin(async move { let derive_start = Instant::now(); // get the batch commit data. + let index = info.inner.index; + let info = info.inner; let batch = database - .get_batch_by_index(info.index) + .get_batch_by_index(index) .await .map_err(|err| (info.clone(), err.into()))? - .ok_or((info.clone(), DerivationPipelineError::UnknownBatch(info.index)))?; + .ok_or((info.clone(), DerivationPipelineError::UnknownBatch(index)))?; // derive the attributes and attach the corresponding batch info. let attrs = @@ -119,16 +137,28 @@ where Ok(attrs.into_iter().map(|attr| (attr, *info).into()).collect()) }); - return Some(fut); + return Some(WithBlockNumber::new(block_number, fut)); } None } + /// Clear attributes, batches and future for which the associated block number > + /// `l1_block_number`. + pub fn handle_reorg(&mut self, l1_block_number: u64) { + self.batch_queue.retain(|batch| batch.number <= l1_block_number); + if let Some(fut) = &mut self.pipeline_future { + if fut.number > l1_block_number { + self.pipeline_future = None; + } + } + self.attributes_queue.retain(|attr| attr.number <= l1_block_number); + } + /// Flushes all the data in the pipeline. pub fn flush(&mut self) { self.attributes_queue.clear(); self.batch_queue.clear(); - self.pipeline_futures = FuturesOrdered::new(); + self.pipeline_future = None; } } @@ -143,36 +173,37 @@ where // return attributes from the queue if any. if let Some(attribute) = this.attributes_queue.pop_front() { - return Poll::Ready(Some(attribute)) + return Poll::Ready(Some(attribute.inner)) } - // if futures are empty and the batch queue is empty, store the waker - // and return. - if this.pipeline_futures.is_empty() && this.batch_queue.is_empty() { + // if future is None and the batch queue is empty, store the waker and return. + if this.pipeline_future.is_none() && this.batch_queue.is_empty() { this.waker = Some(cx.waker().clone()); return Poll::Pending } - // if the futures can still grow, handle the next batch. - if this.pipeline_futures.len() < MAX_CONCURRENT_DERIVATION_PIPELINE_FUTS { - if let Some(fut) = this.handle_next_batch() { - this.pipeline_futures.push_back(fut) - } + // if the future is None, handle the next batch. + if this.pipeline_future.is_none() { + this.pipeline_future = this.handle_next_batch() } // poll the futures and handle result. - if let Some(res) = ready!(this.pipeline_futures.poll_next_unpin(cx)) { + if let Some(Poll::Ready(res)) = this.pipeline_future.as_mut().map(|fut| fut.poll_unpin(cx)) + { match res { - Ok(attributes) => { + WithBlockNumber { inner: Ok(attributes), number } => { + let attributes = + attributes.into_iter().map(|attr| WithBlockNumber::new(number, attr)); this.attributes_queue.extend(attributes); + this.pipeline_future = None; cx.waker().wake_by_ref(); } - Err((batch_info, err)) => { + WithBlockNumber { inner: Err((batch_info, err)), number } => { tracing::error!(target: "scroll::node::derivation_pipeline", batch_info = ?*batch_info, ?err, "failed to derive payload attributes for batch"); // retry polling the same batch. - this.batch_queue.push_front(batch_info); + this.batch_queue.push_front(WithBlockNumber::new(number, batch_info)); let fut = this.handle_next_batch().expect("Pushed batch info into queue"); - this.pipeline_futures.push_front(fut); + this.pipeline_future = Some(fut) } } } @@ -180,7 +211,6 @@ where } } -/// TODO(bench): add criterion bench. /// Returns a vector of [`ScrollPayloadAttributes`] from the [`BatchCommitData`] and a /// [`L1Provider`]. pub async fn derive( @@ -271,9 +301,11 @@ mod tests { use alloy_eips::{eip4844::Blob, Decodable2718}; use alloy_primitives::{address, b256, bytes, U256}; use core::sync::atomic::{AtomicU64, Ordering}; + use futures::StreamExt; use rollup_node_primitives::L1MessageEnvelope; use rollup_node_providers::{ - DatabaseL1MessageProvider, L1BlobProvider, L1MessageProvider, L1ProviderError, + test_utils::NoBlobProvider, DatabaseL1MessageProvider, L1BlobProvider, L1MessageProvider, + L1ProviderError, }; use scroll_alloy_consensus::TxL1Message; use scroll_alloy_rpc_types_engine::BlockDataHint; @@ -323,42 +355,6 @@ mod tests { } } - #[derive(Clone)] - struct MockL1Provider { - l1_messages_provider: P, - } - - #[async_trait::async_trait] - impl L1BlobProvider for MockL1Provider

{ - async fn blob( - &self, - _block_timestamp: u64, - _hash: B256, - ) -> Result>, L1ProviderError> { - Ok(None) - } - } - - #[async_trait::async_trait] - impl L1MessageProvider for MockL1Provider

{ - type Error = P::Error; - - async fn get_l1_message_with_block_number( - &self, - ) -> Result, Self::Error> { - self.l1_messages_provider.get_l1_message_with_block_number().await - } - fn set_queue_index_cursor(&self, index: u64) { - self.l1_messages_provider.set_queue_index_cursor(index); - } - async fn set_hash_cursor(&self, hash: B256) { - self.l1_messages_provider.set_hash_cursor(hash).await - } - fn increment_cursor(&self) { - self.l1_messages_provider.increment_cursor() - } - } - struct MockL2Provider; #[async_trait::async_trait] @@ -424,11 +420,11 @@ mod tests { // construct the pipeline. let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0); - let mock_l1_provider = MockL1Provider { l1_messages_provider }; + let mock_l1_provider = NoBlobProvider { l1_messages_provider }; let mut pipeline = DerivationPipeline::new(mock_l1_provider, db); // as long as we don't call `handle_commit_batch`, pipeline should not return attributes. - pipeline.handle_batch_commit(BatchInfo { index: 12, hash: Default::default() }); + pipeline.handle_batch_commit(BatchInfo { index: 12, hash: Default::default() }, 0); // we should find some attributes now assert!(pipeline.next().await.is_some()); @@ -614,4 +610,76 @@ mod tests { assert_eq!(expected_l1_messages, derived_l1_messages); Ok(()) } + + async fn new_test_pipeline( + ) -> DerivationPipeline>>> { + let initial_block = 200; + + let batches = (initial_block - 100..initial_block) + .map(|i| WithBlockNumber::new(i, Arc::new(BatchInfo::new(i, B256::random())))); + let attributes = (initial_block..initial_block + 100) + .zip(batches.clone()) + .map(|(i, batch)| { + WithBlockNumber::new( + i, + ScrollPayloadAttributesWithBatchInfo { + batch_info: *batch.inner, + ..Default::default() + }, + ) + }) + .collect(); + + let db = Arc::new(setup_test_db().await); + let l1_messages_provider = DatabaseL1MessageProvider::new(db.clone(), 0); + let mock_l1_provider = NoBlobProvider { l1_messages_provider }; + + DerivationPipeline { + pipeline_future: Some(WithBlockNumber::new( + initial_block, + Box::pin(async { Ok(vec![]) }), + )), + database: db, + l1_provider: mock_l1_provider, + batch_queue: batches.collect(), + attributes_queue: attributes, + waker: None, + metrics: Default::default(), + } + } + + #[tokio::test] + async fn test_should_handle_reorgs() -> eyre::Result<()> { + // set up pipeline. + let mut pipeline = new_test_pipeline().await; + + // reorg at block 0. + pipeline.handle_reorg(0); + // should completely clear the pipeline. + assert!(pipeline.batch_queue.is_empty()); + assert!(pipeline.pipeline_future.is_none()); + assert!(pipeline.attributes_queue.is_empty()); + + // set up pipeline. + let mut pipeline = new_test_pipeline().await; + + // reorg at block 200. + pipeline.handle_reorg(200); + // should clear all but one attribute and retain all batches and the pending future. + assert_eq!(pipeline.batch_queue.len(), 100); + assert!(pipeline.pipeline_future.is_some()); + assert_eq!(pipeline.attributes_queue.len(), 1); + + // set up pipeline. + let mut pipeline = new_test_pipeline().await; + + // reorg at block 300. + pipeline.handle_reorg(300); + // should retain all batches, attributes and the pending future. + assert_eq!(pipeline.batch_queue.len(), 100); + assert!(pipeline.pipeline_future.is_some()); + assert_eq!(pipeline.attributes_queue.len(), 100); + + Ok(()) + } } diff --git a/crates/indexer/src/event.rs b/crates/indexer/src/event.rs index 5b9b7051..64288bae 100644 --- a/crates/indexer/src/event.rs +++ b/crates/indexer/src/event.rs @@ -9,6 +9,8 @@ pub enum IndexerEvent { BatchCommitIndexed { /// The batch info. batch_info: BatchInfo, + /// The L1 block number in which the batch was committed. + l1_block_number: u64, /// The safe L2 block info. safe_head: Option, }, diff --git a/crates/indexer/src/lib.rs b/crates/indexer/src/lib.rs index afadeb1d..0a0afcc8 100644 --- a/crates/indexer/src/lib.rs +++ b/crates/indexer/src/lib.rs @@ -266,6 +266,7 @@ impl Indexer< let event = IndexerEvent::BatchCommitIndexed { batch_info: BatchInfo::new(batch.index, batch.hash), safe_head: new_safe_head, + l1_block_number: batch.block_number, }; // insert the batch and commit the transaction. @@ -377,7 +378,7 @@ mod test { // Verify the event structure match event { - IndexerEvent::BatchCommitIndexed { batch_info, safe_head } => { + IndexerEvent::BatchCommitIndexed { batch_info, safe_head, .. } => { assert_eq!(batch_info.index, batch_commit.index); assert_eq!(batch_info.hash, batch_commit.hash); assert_eq!(safe_head, None); // No safe head since no batch revert @@ -420,7 +421,7 @@ mod test { indexer.handle_l1_notification(L1Notification::BatchCommit(batch_1.clone())); let event = indexer.next().await.unwrap().unwrap(); match event { - IndexerEvent::BatchCommitIndexed { batch_info, safe_head } => { + IndexerEvent::BatchCommitIndexed { batch_info, safe_head, .. } => { assert_eq!(batch_info.index, 100); assert_eq!(safe_head, None); } @@ -431,7 +432,7 @@ mod test { indexer.handle_l1_notification(L1Notification::BatchCommit(batch_2.clone())); let event = indexer.next().await.unwrap().unwrap(); match event { - IndexerEvent::BatchCommitIndexed { batch_info, safe_head } => { + IndexerEvent::BatchCommitIndexed { batch_info, safe_head, .. } => { assert_eq!(batch_info.index, 101); assert_eq!(safe_head, None); } @@ -442,7 +443,7 @@ mod test { indexer.handle_l1_notification(L1Notification::BatchCommit(batch_3.clone())); let event = indexer.next().await.unwrap().unwrap(); match event { - IndexerEvent::BatchCommitIndexed { batch_info, safe_head } => { + IndexerEvent::BatchCommitIndexed { batch_info, safe_head, .. } => { assert_eq!(batch_info.index, 102); assert_eq!(safe_head, None); } @@ -488,7 +489,7 @@ mod test { // Verify the event indicates a batch revert match event { - IndexerEvent::BatchCommitIndexed { batch_info, safe_head } => { + IndexerEvent::BatchCommitIndexed { batch_info, safe_head, .. } => { assert_eq!(batch_info.index, 101); assert_eq!(batch_info.hash, new_batch_2.hash); // Safe head should be the highest block from batch index <= 100 diff --git a/crates/manager/src/manager/mod.rs b/crates/manager/src/manager/mod.rs index 9d3d51ac..f71078ba 100644 --- a/crates/manager/src/manager/mod.rs +++ b/crates/manager/src/manager/mod.rs @@ -237,7 +237,7 @@ where fn handle_indexer_event(&mut self, event: IndexerEvent) { trace!(target: "scroll::node::manager", ?event, "Received indexer event"); match event { - IndexerEvent::BatchCommitIndexed { batch_info, safe_head } => { + IndexerEvent::BatchCommitIndexed { batch_info, safe_head, l1_block_number } => { // if we detected a batch revert event, we reset the pipeline and the engine driver. if let Some(new_safe_head) = safe_head { if let Some(pipeline) = self.derivation_pipeline.as_mut() { @@ -249,7 +249,7 @@ where } // push the batch info into the derivation pipeline. if let Some(pipeline) = &mut self.derivation_pipeline { - pipeline.handle_batch_commit(batch_info); + pipeline.handle_batch_commit(batch_info, l1_block_number); } } IndexerEvent::BatchFinalizationIndexed(_, Some(finalized_block)) => { @@ -284,7 +284,10 @@ where sequencer.handle_reorg(queue_index, l1_block_number); } - // TODO: should clear the derivation pipeline. + // Handle the reorg in the derivation pipeline. + if let Some(pipeline) = self.derivation_pipeline.as_mut() { + pipeline.handle_reorg(l1_block_number); + } } IndexerEvent::L1MessageIndexed(index) => { if let Some(event_sender) = self.event_sender.as_ref() { diff --git a/crates/primitives/src/block.rs b/crates/primitives/src/block.rs index e698a314..2682ae48 100644 --- a/crates/primitives/src/block.rs +++ b/crates/primitives/src/block.rs @@ -1,6 +1,11 @@ use alloy_eips::{BlockNumHash, Decodable2718}; use alloy_primitives::{B256, U256}; use alloy_rpc_types_engine::ExecutionPayload; +use core::{ + future::Future, + pin::Pin, + task::{ready, Context, Poll}, +}; use reth_primitives_traits::transaction::signed::SignedTransaction; use reth_scroll_primitives::{ScrollBlock, ScrollTransactionSigned}; use scroll_alloy_consensus::L1_MESSAGE_TRANSACTION_TYPE; @@ -58,6 +63,32 @@ impl arbitrary::Arbitrary<'_> for BlockInfo { } } +/// A wrapper around a type to which a block number is attached. +#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)] +pub struct WithBlockNumber { + /// The block number. + pub number: u64, + /// The wrapped type. + pub inner: T, +} + +impl WithBlockNumber { + /// Returns a new instance of a [`WithBlockNumber`] wrapper. + pub const fn new(number: u64, inner: T) -> Self { + Self { number, inner } + } +} + +impl Future for WithBlockNumber { + type Output = WithBlockNumber<::Output>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let block_number = self.number; + let inner = ready!(Pin::new(&mut self.get_mut().inner).poll(cx)); + Poll::Ready(WithBlockNumber::new(block_number, inner)) + } +} + /// This struct represents an L2 block with a vector the hashes of the L1 messages included in the /// block. #[derive(Debug, Clone, PartialEq, Eq)] diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index 0b3407a7..241c14fe 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -8,7 +8,7 @@ mod attributes; pub use attributes::ScrollPayloadAttributesWithBatchInfo; mod block; -pub use block::{BlockInfo, L2BlockInfoWithL1Messages, DEFAULT_BLOCK_DIFFICULTY}; +pub use block::{BlockInfo, L2BlockInfoWithL1Messages, WithBlockNumber, DEFAULT_BLOCK_DIFFICULTY}; mod batch; pub use batch::{BatchCommitData, BatchInfo}; diff --git a/crates/providers/src/test_utils.rs b/crates/providers/src/test_utils.rs index 4c6c51ac..4a0a1dc3 100644 --- a/crates/providers/src/test_utils.rs +++ b/crates/providers/src/test_utils.rs @@ -1,9 +1,13 @@ use crate::{ beacon::{APIResponse, ReducedConfigData, ReducedGenesisData}, - BeaconProvider, + BeaconProvider, L1BlobProvider, L1MessageProvider, L1ProviderError, }; +use std::sync::Arc; +use alloy_eips::eip4844::Blob; +use alloy_primitives::B256; use alloy_rpc_types_beacon::sidecar::BlobData; +use rollup_node_primitives::L1MessageEnvelope; /// Mocks all calls to the beacon chain. #[derive(Debug, Default)] @@ -26,3 +30,41 @@ impl BeaconProvider for MockBeaconProvider { Ok(vec![]) } } + +/// Implementation of the [`crate::L1Provider`] that never returns blobs. +#[derive(Clone, Debug)] +pub struct NoBlobProvider { + /// L1 message provider. + pub l1_messages_provider: P, +} + +#[async_trait::async_trait] +impl L1BlobProvider for NoBlobProvider

{ + async fn blob( + &self, + _block_timestamp: u64, + _hash: B256, + ) -> Result>, L1ProviderError> { + Ok(None) + } +} + +#[async_trait::async_trait] +impl L1MessageProvider for NoBlobProvider

{ + type Error = P::Error; + + async fn get_l1_message_with_block_number( + &self, + ) -> Result, Self::Error> { + self.l1_messages_provider.get_l1_message_with_block_number().await + } + fn set_queue_index_cursor(&self, index: u64) { + self.l1_messages_provider.set_queue_index_cursor(index); + } + async fn set_hash_cursor(&self, hash: B256) { + self.l1_messages_provider.set_hash_cursor(hash).await + } + fn increment_cursor(&self) { + self.l1_messages_provider.increment_cursor() + } +}