diff --git a/crates/chain-orchestrator/src/lib.rs b/crates/chain-orchestrator/src/lib.rs index 442431b0..576ba7d4 100644 --- a/crates/chain-orchestrator/src/lib.rs +++ b/crates/chain-orchestrator/src/lib.rs @@ -34,13 +34,7 @@ use scroll_engine::Engine; use scroll_network::{ BlockImportOutcome, NewBlockWithPeer, ScrollNetwork, ScrollNetworkManagerEvent, }; -use std::{ - collections::{HashMap, VecDeque}, - sync::Arc, - time::Instant, - vec, -}; -use strum::IntoEnumIterator; +use std::{collections::VecDeque, sync::Arc, time::Instant, vec}; use tokio::sync::mpsc::{self, Receiver, UnboundedReceiver}; mod config; @@ -50,7 +44,7 @@ mod consensus; pub use consensus::{Consensus, NoopConsensus, SystemContractConsensus}; mod consolidation; -use consolidation::reconcile_batch; +use consolidation::{reconcile_batch, BlockConsolidationAction}; mod event; pub use event::ChainOrchestratorEvent; @@ -62,7 +56,7 @@ mod handle; pub use handle::{ChainOrchestratorCommand, ChainOrchestratorHandle, DatabaseQuery}; mod metrics; -pub use metrics::{ChainOrchestratorItem, ChainOrchestratorMetrics}; +use metrics::{MetricsHandler, Task}; mod sync; pub use sync::{SyncMode, SyncState}; @@ -70,7 +64,18 @@ pub use sync::{SyncMode, SyncState}; mod status; pub use status::ChainOrchestratorStatus; -use crate::consolidation::BlockConsolidationAction; +/// Wraps a future, metering the completion of it. +macro_rules! metered { + ($task:expr, $self:ident, $method:ident($($args:expr),*)) => { + { + let metric = $self.metric_handler.get($task).expect("metric exists").clone(); + let now = Instant::now(); + let res =$self.$method($($args),*).await; + metric.task_duration.record(now.elapsed().as_secs_f64()); + res + } + }; +} /// The mask used to mask the L1 message queue hash. const L1_MESSAGE_QUEUE_HASH_MASK: B256 = @@ -102,8 +107,6 @@ pub struct ChainOrchestrator< l2_client: Arc, /// The reference to database. database: Arc, - /// The metrics for the chain orchestrator. - metrics: HashMap, /// The current sync state of the [`ChainOrchestrator`]. sync_state: SyncState, /// A receiver for [`L1Notification`]s from the [`rollup_node_watcher::L1Watcher`]. @@ -122,6 +125,8 @@ pub struct ChainOrchestrator< derivation_pipeline: DerivationPipeline, /// Optional event sender for broadcasting events to listeners. event_sender: Option>, + /// The metrics handler. + metric_handler: MetricsHandler, } impl< @@ -155,12 +160,6 @@ impl< l2_client: Arc::new(l2_provider), database, config, - metrics: ChainOrchestratorItem::iter() - .map(|i| { - let label = i.as_str(); - (i, ChainOrchestratorMetrics::new_with_labels(&[("item", label)])) - }) - .collect(), sync_state: SyncState::default(), l1_notification_rx, network, @@ -171,6 +170,7 @@ impl< derivation_pipeline, handle_rx, event_sender: None, + metric_handler: MetricsHandler::default(), }, handle, )) @@ -211,7 +211,7 @@ impl< self.handle_outcome(res); } Some(batch) = self.derivation_pipeline.next() => { - let res = self.handle_derived_batch(batch).await; + let res = metered!(Task::BatchReconciliation, self, handle_derived_batch(batch)); self.handle_outcome(res); } Some(event) = self.network.events().next() => { @@ -266,7 +266,7 @@ impl< /// Handles an event from the sequencer. async fn handle_sequencer_event( &mut self, - event: rollup_node_sequencer::SequencerEvent, + event: SequencerEvent, ) -> Result, ChainOrchestratorError> { tracing::info!(target: "scroll::chain_orchestrator", ?event, "Handling sequencer event"); match event { @@ -277,6 +277,7 @@ impl< .map(|s| &s.address) .expect("signer must be set if sequencer is present"), ) { + self.metric_handler.start_block_building_recording(); self.sequencer .as_mut() .expect("sequencer must be present") @@ -300,6 +301,7 @@ impl< .as_mut() .expect("signer must be present") .sign_block(block.clone())?; + self.metric_handler.finish_block_building_recording(); return Ok(Some(ChainOrchestratorEvent::BlockSequenced(block))); } } @@ -508,30 +510,38 @@ impl< self.database.set_processed_l1_block_number(block_number).await?; Ok(None) } - L1Notification::Reorg(block_number) => self.handle_l1_reorg(*block_number).await, + L1Notification::Reorg(block_number) => { + metered!(Task::L1Reorg, self, handle_l1_reorg(*block_number)) + } L1Notification::Consensus(update) => { self.consensus.update_config(update); Ok(None) } L1Notification::NewBlock(block_number) => self.handle_l1_new_block(*block_number).await, L1Notification::Finalized(block_number) => { - self.handle_l1_finalized(*block_number).await + metered!(Task::L1Finalization, self, handle_l1_finalized(*block_number)) + } + L1Notification::BatchCommit(batch) => { + metered!(Task::BatchCommit, self, handle_batch_commit(batch.clone())) } - L1Notification::BatchCommit(batch) => self.handle_batch_commit(batch.clone()).await, L1Notification::L1Message { message, block_number, block_timestamp: _ } => { - self.handle_l1_message(message.clone(), *block_number).await + metered!(Task::L1Message, self, handle_l1_message(message.clone(), *block_number)) } L1Notification::Synced => { tracing::info!(target: "scroll::chain_orchestrator", "L1 is now synced"); self.sync_state.l1_mut().set_synced(); if self.sync_state.is_synced() { - self.consolidate_chain().await?; + metered!(Task::ChainConsolidation, self, consolidate_chain())?; } self.notify(ChainOrchestratorEvent::L1Synced); Ok(None) } L1Notification::BatchFinalization { hash: _hash, index, block_number } => { - self.handle_l1_batch_finalization(*index, *block_number).await + metered!( + Task::BatchFinalization, + self, + handle_batch_finalization(*index, *block_number) + ) } } } @@ -550,8 +560,6 @@ impl< &mut self, block_number: u64, ) -> Result, ChainOrchestratorError> { - let metric = self.metrics.get(&ChainOrchestratorItem::L1Reorg).expect("metric exists"); - let now = Instant::now(); let genesis_hash = self.config.chain_spec().genesis_hash(); let UnwindResult { l1_block_number, queue_index, l2_head_block_number, l2_safe_block_info } = self.database.unwind(genesis_hash, block_number).await?; @@ -594,8 +602,6 @@ impl< self.engine.update_fcs(l2_head_block_info, l2_safe_block_info, None).await?; } - metric.task_duration.record(now.elapsed().as_secs_f64()); - let event = ChainOrchestratorEvent::L1Reorg { l1_block_number, queue_index, @@ -612,10 +618,6 @@ impl< &mut self, block_number: u64, ) -> Result, ChainOrchestratorError> { - let metric = - self.metrics.get(&ChainOrchestratorItem::L1Finalization).expect("metric exists"); - let now = Instant::now(); - let finalized_batches = self .database .tx_mut(move |tx| async move { @@ -632,8 +634,6 @@ impl< self.derivation_pipeline.push_batch(Arc::new(*batch)).await; } - metric.task_duration.record(now.elapsed().as_secs_f64()); - Ok(Some(ChainOrchestratorEvent::L1BlockFinalized(block_number, finalized_batches))) } @@ -642,9 +642,6 @@ impl< &self, batch: BatchCommitData, ) -> Result, ChainOrchestratorError> { - let metric = self.metrics.get(&ChainOrchestratorItem::BatchCommit).expect("metric exists"); - let now = Instant::now(); - let event = self .database .tx_mut(move |tx| { @@ -682,13 +679,11 @@ impl< }) .await?; - metric.task_duration.record(now.elapsed().as_secs_f64()); - Ok(event) } /// Handles a batch finalization event by updating the batch input in the database. - async fn handle_l1_batch_finalization( + async fn handle_batch_finalization( &mut self, batch_index: u64, block_number: u64, @@ -732,9 +727,6 @@ impl< l1_message: TxL1Message, l1_block_number: u64, ) -> Result, ChainOrchestratorError> { - let metric = self.metrics.get(&ChainOrchestratorItem::L1Message).expect("metric exists"); - let now = Instant::now(); - let event = ChainOrchestratorEvent::L1MessageCommitted(l1_message.queue_index); let queue_hash = compute_l1_message_queue_hash( &self.database, @@ -770,27 +762,9 @@ impl< }) .await?; - metric.task_duration.record(now.elapsed().as_secs_f64()); - Ok(Some(event)) } - // /// Wraps a pending chain orchestrator future, metering the completion of it. - // pub fn handle_metered( - // &mut self, - // item: ChainOrchestratorItem, - // chain_orchestrator_fut: PendingChainOrchestratorFuture, - // ) -> PendingChainOrchestratorFuture { - // let metric = self.metrics.get(&item).expect("metric exists").clone(); - // let fut_wrapper = Box::pin(async move { - // let now = Instant::now(); - // let res = chain_orchestrator_fut.await; - // metric.task_duration.record(now.elapsed().as_secs_f64()); - // res - // }); - // fut_wrapper - // } - async fn handle_network_event( &mut self, event: ScrollNetworkManagerEvent, @@ -798,7 +772,7 @@ impl< match event { ScrollNetworkManagerEvent::NewBlock(block_with_peer) => { self.notify(ChainOrchestratorEvent::NewBlockReceived(block_with_peer.clone())); - Ok(self.handle_block_from_peer(block_with_peer).await?) + metered!(Task::L2BlockImport, self, handle_block_from_peer(block_with_peer)) } } } diff --git a/crates/chain-orchestrator/src/metrics.rs b/crates/chain-orchestrator/src/metrics.rs index c71b4841..1e5da569 100644 --- a/crates/chain-orchestrator/src/metrics.rs +++ b/crates/chain-orchestrator/src/metrics.rs @@ -1,48 +1,110 @@ use metrics::Histogram; use metrics_derive::Metrics; -use strum::EnumIter; +use std::{collections::HashMap, time::Instant}; +use strum::{EnumIter, IntoEnumIterator}; -/// An enum representing the items the chain orchestrator can handle. +/// The metric handler for the chain orchestrator. Tracks execution duration of various tasks. +#[derive(Debug)] +pub(crate) struct MetricsHandler { + /// The chain orchestrator metrics. + chain_orchestrator_tasks_metrics: HashMap, + /// The inflight block building meter. + block_building_meter: BlockBuildingMeter, +} + +impl MetricsHandler { + /// Returns the [`ChainOrchestratorMetrics`] for the provided task. + pub(crate) fn get(&self, task: Task) -> Option<&ChainOrchestratorMetrics> { + self.chain_orchestrator_tasks_metrics.get(&task) + } + + /// Starts tracking a new block building task. + pub(crate) fn start_block_building_recording(&mut self) { + if self.block_building_meter.start.is_some() { + tracing::warn!(target: "scroll::chain_orchestrator", "block building recording is already ongoing, overwriting"); + } + self.block_building_meter.start = Some(Instant::now()); + } + + /// The duration of the current block building task if any. + pub(crate) fn finish_block_building_recording(&mut self) { + let duration = self.block_building_meter.start.take().map(|start| start.elapsed()); + if let Some(duration) = duration { + self.block_building_meter.metric.block_building_duration.record(duration.as_secs_f64()); + } + } +} + +impl Default for MetricsHandler { + fn default() -> Self { + Self { + chain_orchestrator_tasks_metrics: Task::iter() + .map(|i| { + let label = i.as_str(); + (i, ChainOrchestratorMetrics::new_with_labels(&[("task", label)])) + }) + .collect(), + block_building_meter: BlockBuildingMeter::default(), + } + } +} + +/// An enum representing the chain orchestrator tasks. #[derive(Debug, PartialEq, Eq, Hash, EnumIter)] -pub enum ChainOrchestratorItem { - /// Handle a block received from the network. - NewBlock, - /// Insert consolidated L2 blocks into the database. - InsertConsolidatedL2Blocks, - /// L2 block. - InsertL2Block, - /// L1 reorg. +pub(crate) enum Task { + /// Batch reconciliation with the unsafe L2 chain. + BatchReconciliation, + /// Import of an L2 block received over p2p. + L2BlockImport, + /// Consolidation of the L2 ledger by validating unsafe blocks. + ChainConsolidation, + /// L1 reorg handling. L1Reorg, - /// L1 finalization. + /// L1 finalization handling. L1Finalization, - /// L1 message. + /// L1 message handling. L1Message, - /// Batch commit. + /// Batch commit event handling. BatchCommit, - /// Batch finalization. + /// Batch finalization event handling. BatchFinalization, } -impl ChainOrchestratorItem { +impl Task { /// Returns the str representation of the [`ChainOrchestratorItem`]. - pub const fn as_str(&self) -> &'static str { + pub(crate) const fn as_str(&self) -> &'static str { match self { - Self::NewBlock => "new_block", - Self::InsertConsolidatedL2Blocks => "insert_consolidated_l2_blocks", - Self::InsertL2Block => "l2_block", Self::L1Reorg => "l1_reorg", Self::L1Finalization => "l1_finalization", Self::L1Message => "l1_message", Self::BatchCommit => "batch_commit", Self::BatchFinalization => "batch_finalization", + Self::BatchReconciliation => "batch_reconciliation", + Self::ChainConsolidation => "chain_consolidation", + Self::L2BlockImport => "l2_block_import", } } } /// The metrics for the [`super::ChainOrchestrator`]. #[derive(Metrics, Clone)] -#[metrics(scope = "indexer")] -pub struct ChainOrchestratorMetrics { +#[metrics(scope = "chain_orchestrator")] +pub(crate) struct ChainOrchestratorMetrics { /// The duration of the task for the chain orchestrator. pub task_duration: Histogram, } + +/// A block building meter. +#[derive(Debug, Default)] +pub(crate) struct BlockBuildingMeter { + metric: BlockBuildingMetric, + start: Option, +} + +/// Block building related metric. +#[derive(Metrics, Clone)] +#[metrics(scope = "chain_orchestrator")] +pub(crate) struct BlockBuildingMetric { + /// The duration of the block building task. + block_building_duration: Histogram, +} diff --git a/crates/derivation-pipeline/src/lib.rs b/crates/derivation-pipeline/src/lib.rs index 2bd4c8d8..461740b6 100644 --- a/crates/derivation-pipeline/src/lib.rs +++ b/crates/derivation-pipeline/src/lib.rs @@ -82,7 +82,7 @@ impl Stream for DerivationPipeline { fn poll_next( self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { + ) -> Poll> { let this = self.get_mut(); match Pin::new(&mut this.result_receiver).poll_recv(cx) { Poll::Ready(Some(result)) => { diff --git a/crates/sequencer/src/lib.rs b/crates/sequencer/src/lib.rs index 6376aefb..286bdca0 100644 --- a/crates/sequencer/src/lib.rs +++ b/crates/sequencer/src/lib.rs @@ -101,6 +101,7 @@ where engine: &mut Engine, ) -> Result<(), SequencerError> { tracing::info!(target: "rollup_node::sequencer", "New payload attributes request received."); + let now = Instant::now(); let timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -114,7 +115,6 @@ where withdrawals: None, }; - let now = Instant::now(); let mut l1_messages = vec![]; let mut cumulative_gas_used = 0; @@ -212,8 +212,8 @@ pub struct PayloadBuildingJob { future: PayloadBuildingJobFuture, } -impl std::fmt::Debug for PayloadBuildingJob { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl fmt::Debug for PayloadBuildingJob { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("PayloadBuildingJob") .field("l1_origin", &self.l1_origin) .field("future", &"PayloadBuildingJobFuture") @@ -265,8 +265,8 @@ impl Stream for Sequencer { } } -impl fmt::Debug for Sequencer { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl fmt::Debug for Sequencer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Sequencer") .field("provider", &"SequencerMessageProvider") .field("config", &self.config)