diff --git a/crates/flashblocks-rpc/src/lib.rs b/crates/flashblocks-rpc/src/lib.rs index 3df51459..cc7d6797 100644 --- a/crates/flashblocks-rpc/src/lib.rs +++ b/crates/flashblocks-rpc/src/lib.rs @@ -1,5 +1,6 @@ mod metrics; mod pending_blocks; +pub mod pubsub; pub mod rpc; pub mod state; pub mod subscription; diff --git a/crates/flashblocks-rpc/src/pubsub.rs b/crates/flashblocks-rpc/src/pubsub.rs new file mode 100644 index 00000000..37770aea --- /dev/null +++ b/crates/flashblocks-rpc/src/pubsub.rs @@ -0,0 +1,152 @@ +//! `base_` PubSub RPC implementation for flashblocks subscriptions + +use std::sync::Arc; + +use jsonrpsee::{ + core::{async_trait, SubscriptionResult}, + proc_macros::rpc, + server::SubscriptionMessage, + PendingSubscriptionSink, SubscriptionSink, +}; +use op_alloy_network::Optimism; +use reth_rpc_eth_api::RpcBlock; +use serde::{Deserialize, Serialize}; +use tokio_stream::{wrappers::BroadcastStream, Stream, StreamExt}; +use tracing::error; + +use crate::rpc::FlashblocksAPI; + +/// Subscription kind for Base-specific subscriptions +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum BaseSubscriptionKind { + /// New flashblocks subscription. + /// + /// Fires a notification each time a new flashblock is processed, providing the current + /// pending block state. Each flashblock represents an incremental update to the pending + /// block, so multiple notifications may be emitted for the same block height as new + /// flashblocks arrive. + NewFlashblocks, +} + +/// Base pub-sub RPC interface for flashblocks subscriptions. +#[rpc(server, namespace = "base")] +pub trait BasePubSubApi { + /// Create a Base subscription for the given kind + #[subscription( + name = "subscribe" => "subscription", + unsubscribe = "unsubscribe", + item = RpcBlock + )] + async fn subscribe(&self, kind: BaseSubscriptionKind) -> SubscriptionResult; +} + +/// `Base` pubsub RPC implementation. +/// +/// This handles `base_subscribe` RPC calls for flashblocks-specific subscriptions. +#[derive(Clone, Debug)] +pub struct BasePubSub { + /// Flashblocks state for accessing pending blocks stream + flashblocks_state: Arc, +} + +impl BasePubSub { + /// Creates a new instance with the given flashblocks state + pub fn new(flashblocks_state: Arc) -> Self { + Self { flashblocks_state } + } + + /// Returns a stream that yields all new flashblocks as RPC blocks + fn new_flashblocks_stream(&self) -> impl Stream> + where + FB: FlashblocksAPI + Send + Sync + 'static, + { + BroadcastStream::new(self.flashblocks_state.subscribe_to_flashblocks()).filter_map( + |result| { + let pending_blocks = match result { + Ok(blocks) => blocks, + Err(err) => { + error!( + message = "Error in flashblocks stream", + error = %err + ); + return None; + } + }; + Some(pending_blocks.get_latest_block(true)) + }, + ) + } +} + +#[async_trait] +impl BasePubSubApiServer for BasePubSub +where + FB: FlashblocksAPI + Send + Sync + 'static, +{ + /// Handler for `base_subscribe` + async fn subscribe( + &self, + pending: PendingSubscriptionSink, + kind: BaseSubscriptionKind, + ) -> SubscriptionResult { + let sink = pending.accept().await?; + + match kind { + BaseSubscriptionKind::NewFlashblocks => { + let stream = self.new_flashblocks_stream(); + + tokio::spawn(async move { + pipe_from_stream(sink, stream).await; + }); + } + } + + Ok(()) + } +} + +/// Pipes all stream items to the subscription sink. +/// +/// This function runs until the stream ends, the client disconnects, or a serialization error occurs. +/// All exit conditions result in graceful termination. +async fn pipe_from_stream(sink: SubscriptionSink, mut stream: St) +where + St: Stream + Unpin, + T: Serialize, +{ + loop { + tokio::select! { + // dropped by client + _ = sink.closed() => return, + + maybe_item = stream.next() => { + // stream ended + let Some(item) = maybe_item else { + return; + }; + + let msg = match SubscriptionMessage::new( + sink.method_name(), + sink.subscription_id(), + &item + ) { + Ok(msg) => msg, + Err(err) => { + error!( + target: "flashblocks_rpc::pubsub", + %err, + "Failed to serialize subscription message" + ); + return; + } + }; + + // if it fails, client disconnected + if sink.send(msg).await.is_err() { + return; + } + } + } + } +} diff --git a/crates/flashblocks-rpc/src/tests/rpc.rs b/crates/flashblocks-rpc/src/tests/rpc.rs index 0ed37d4f..cb5015ec 100644 --- a/crates/flashblocks-rpc/src/tests/rpc.rs +++ b/crates/flashblocks-rpc/src/tests/rpc.rs @@ -1,5 +1,6 @@ #[cfg(test)] mod tests { + use crate::pubsub::{BasePubSub, BasePubSubApiServer}; use crate::rpc::{EthApiExt, EthApiOverrideServer}; use crate::state::FlashblocksState; use crate::subscription::{Flashblock, FlashblocksReceiver, Metadata}; @@ -38,9 +39,15 @@ mod tests { use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; + // ws + use futures_util::{SinkExt, StreamExt}; + use serde_json::json; + use tokio_tungstenite::{connect_async, tungstenite::Message}; + pub struct NodeContext { sender: mpsc::Sender<(Flashblock, oneshot::Sender<()>)>, http_api_addr: SocketAddr, + ws_api_addr: SocketAddr, _node_exit_future: NodeExitFuture, _node: Box, _task_manager: TaskManager, @@ -85,6 +92,10 @@ mod tests { Ok(receipt) } + + pub fn ws_url(&self) -> String { + format!("ws://{}", self.ws_api_addr) + } } async fn setup_node() -> eyre::Result { @@ -112,7 +123,12 @@ mod tests { // Use with_unused_ports() to let Reth allocate random ports and avoid port collisions let node_config = NodeConfig::new(chain_spec.clone()) .with_network(network_config.clone()) - .with_rpc(RpcServerArgs::default().with_unused_ports().with_http()) + .with_rpc( + RpcServerArgs::default() + .with_unused_ports() + .with_http() + .with_ws(), + ) .with_unused_ports(); let node = OpNode::new(RollupArgs::default()); @@ -142,6 +158,10 @@ mod tests { ctx.modules.replace_configured(api_ext.into_rpc())?; + // Register base_subscribe subscription endpoint + let base_pubsub = BasePubSub::new(flashblocks_state.clone()); + ctx.modules.merge_configured(base_pubsub.into_rpc())?; + tokio::spawn(async move { while let Some((payload, tx)) = receiver.recv().await { flashblocks_state.on_flashblock_received(payload); @@ -159,9 +179,15 @@ mod tests { .http_local_addr() .ok_or_else(|| eyre::eyre!("Failed to get http api address"))?; + let ws_api_addr = node + .rpc_server_handle() + .ws_local_addr() + .ok_or_else(|| eyre::eyre!("Failed to get websocket api address"))?; + Ok(NodeContext { sender, http_api_addr, + ws_api_addr, _node_exit_future: node_exit_future, _node: Box::new(node), _task_manager: tasks, @@ -881,4 +907,180 @@ mod tests { Ok(()) } + + // base_ methods + #[tokio::test] + async fn test_base_subscribe_new_flashblocks() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + let node = setup_node().await?; + let ws_url = node.ws_url(); + let (mut ws_stream, _) = connect_async(&ws_url).await?; + + ws_stream + .send(Message::Text( + json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "base_subscribe", + "params": ["newFlashblocks"] + }) + .to_string() + .into(), + )) + .await?; + + let response = ws_stream.next().await.unwrap()?; + let sub: serde_json::Value = serde_json::from_str(response.to_text()?)?; + assert_eq!(sub["jsonrpc"], "2.0"); + assert_eq!(sub["id"], 1); + let subscription_id = sub["result"].as_str().expect("subscription id expected"); + + node.send_payload(create_first_payload()).await?; + + let notification = ws_stream.next().await.unwrap()?; + let notif: serde_json::Value = serde_json::from_str(notification.to_text()?)?; + assert_eq!(notif["method"], "base_subscription"); + assert_eq!(notif["params"]["subscription"], subscription_id); + + let block = ¬if["params"]["result"]; + assert_eq!(block["number"], "0x1"); + assert!(block["hash"].is_string()); + assert!(block["parentHash"].is_string()); + assert!(block["transactions"].is_array()); + assert_eq!(block["transactions"].as_array().unwrap().len(), 1); + + Ok(()) + } + + #[tokio::test] + async fn test_base_subscribe_multiple_flashblocks() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + let node = setup_node().await?; + let ws_url = node.ws_url(); + let (mut ws_stream, _) = connect_async(&ws_url).await?; + + ws_stream + .send(Message::Text( + json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "base_subscribe", + "params": ["newFlashblocks"] + }) + .to_string() + .into(), + )) + .await?; + + let response = ws_stream.next().await.unwrap()?; + let sub: serde_json::Value = serde_json::from_str(response.to_text()?)?; + let subscription_id = sub["result"].as_str().expect("subscription id expected"); + + node.send_payload(create_first_payload()).await?; + + let notif1 = ws_stream.next().await.unwrap()?; + let notif1: serde_json::Value = serde_json::from_str(notif1.to_text()?)?; + assert_eq!(notif1["params"]["subscription"], subscription_id); + + let block1 = ¬if1["params"]["result"]; + assert_eq!(block1["number"], "0x1"); + assert_eq!(block1["transactions"].as_array().unwrap().len(), 1); + + node.send_payload(create_second_payload()).await?; + + let notif2 = ws_stream.next().await.unwrap()?; + let notif2: serde_json::Value = serde_json::from_str(notif2.to_text()?)?; + assert_eq!(notif2["params"]["subscription"], subscription_id); + + let block2 = ¬if2["params"]["result"]; + assert_eq!(block1["number"], block2["number"]); // Same block, incremental updates + assert_eq!(block2["transactions"].as_array().unwrap().len(), 5); + + Ok(()) + } + + #[tokio::test] + async fn test_base_unsubscribe() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + let node = setup_node().await?; + let ws_url = node.ws_url(); + let (mut ws_stream, _) = connect_async(&ws_url).await?; + + ws_stream + .send(Message::Text( + json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "base_subscribe", + "params": ["newFlashblocks"] + }) + .to_string() + .into(), + )) + .await?; + + let response = ws_stream.next().await.unwrap()?; + let sub: serde_json::Value = serde_json::from_str(response.to_text()?)?; + let subscription_id = sub["result"].as_str().expect("subscription id expected"); + + ws_stream + .send(Message::Text( + json!({ + "jsonrpc": "2.0", + "id": 2, + "method": "base_unsubscribe", + "params": [subscription_id] + }) + .to_string() + .into(), + )) + .await?; + + let unsub = ws_stream.next().await.unwrap()?; + let unsub: serde_json::Value = serde_json::from_str(unsub.to_text()?)?; + assert_eq!(unsub["jsonrpc"], "2.0"); + assert_eq!(unsub["id"], 2); + assert_eq!(unsub["result"], true); + + Ok(()) + } + + #[tokio::test] + async fn test_base_subscribe_multiple_clients() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + let node = setup_node().await?; + let ws_url = node.ws_url(); + let (mut ws1, _) = connect_async(&ws_url).await?; + let (mut ws2, _) = connect_async(&ws_url).await?; + + let req = json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "base_subscribe", + "params": ["newFlashblocks"] + }); + ws1.send(Message::Text(req.to_string().into())).await?; + ws2.send(Message::Text(req.to_string().into())).await?; + + let _sub1 = ws1.next().await.unwrap()?; + let _sub2 = ws2.next().await.unwrap()?; + + node.send_payload(create_first_payload()).await?; + + let notif1 = ws1.next().await.unwrap()?; + let notif1: serde_json::Value = serde_json::from_str(notif1.to_text()?)?; + let notif2 = ws2.next().await.unwrap()?; + let notif2: serde_json::Value = serde_json::from_str(notif2.to_text()?)?; + + assert_eq!(notif1["method"], "base_subscription"); + assert_eq!(notif2["method"], "base_subscription"); + + let block1 = ¬if1["params"]["result"]; + let block2 = ¬if2["params"]["result"]; + assert_eq!(block1["number"], "0x1"); + assert_eq!(block1["number"], block2["number"]); + assert_eq!(block1["hash"], block2["hash"]); + + Ok(()) + } } diff --git a/crates/node/src/main.rs b/crates/node/src/main.rs index 5535d251..c7e9e1f3 100644 --- a/crates/node/src/main.rs +++ b/crates/node/src/main.rs @@ -1,3 +1,4 @@ +use base_reth_flashblocks_rpc::pubsub::{BasePubSub, BasePubSubApiServer}; use base_reth_flashblocks_rpc::rpc::EthApiExt; use futures_util::TryStreamExt; use once_cell::sync::OnceCell; @@ -152,10 +153,14 @@ fn main() { let api_ext = EthApiExt::new( ctx.registry.eth_api().clone(), ctx.registry.eth_handlers().filter.clone(), - fb, + fb.clone(), ); ctx.modules.replace_configured(api_ext.into_rpc())?; + + // register the base_subscribe subscription endpoint + let base_pubsub = BasePubSub::new(fb); + ctx.modules.merge_configured(base_pubsub.into_rpc())?; } else { info!(message = "flashblocks integration is disabled"); }