63 changes: 51 additions & 12 deletions src/cli.rs
@@ -1,13 +1,15 @@
use std::{fmt::Display, time::Instant};

use anyhow::Result;
use clap::{Parser, Subcommand};
use clap::{Parser, Subcommand, ValueEnum};
use indicatif::{ProgressBar, ProgressStyle};
use revm::InMemoryDB;
use tokio::sync::mpsc;

use crate::{
evm_map::erc20_contract_to_system_address,
fs::{download_blocks, read_abci_state, read_blocks, read_evm_state},
run::{run_blocks, MAINNET_CHAIN_ID},
run::run_blocks,
state::State,
types::PreprocessedBlock,
};
@@ -17,6 +19,7 @@ use anyhow::anyhow;
const CHUNK_SIZE: u64 = 1000;
// only store this many blocks in memory
const READ_LIMIT: u64 = 100000;
const TESTNET_BLOCK_THRESHOLD: u64 = 26800000;

#[derive(Parser)]
#[command(name = "hyper-evm-sync")]
@@ -25,9 +28,30 @@ pub struct Cli {
commands: Commands,
}

#[derive(Debug, Clone, Copy, ValueEnum)]
pub enum Chain {
Mainnet,
Testnet,
}

impl Display for Chain {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
match self {
Chain::Mainnet => "Mainnet",
Chain::Testnet => "Testnet",
}
)
}
}

#[derive(Subcommand)]
enum Commands {
DownloadBlocks {
#[arg(long)]
chain: Chain,
#[arg(short, long)]
dir: String,
#[arg(short, long, default_value_t = 1)]
@@ -36,6 +60,8 @@ enum Commands {
end_block: u64,
},
SyncFromState {
#[arg(long)]
chain: Chain,
#[arg(long)]
is_abci: bool,
#[arg(short, long)]
@@ -60,12 +86,12 @@
impl Cli {
pub async fn execute(self) -> Result<()> {
match self.commands {
Commands::DownloadBlocks { start_block, end_block, dir } => {
download_blocks(&dir, start_block, end_block).await?;
println!("Downloaded {start_block} -> {end_block}.");
Commands::DownloadBlocks { chain, start_block, end_block, dir } => {
download_blocks(chain, &dir, start_block, end_block).await?;
println!("Downloaded {start_block} -> {end_block} from {chain}.");
}
Commands::SyncFromState { fln, is_abci, snapshot_dir, chunk_size, blocks_dir, end_block } => {
run_from_state(blocks_dir, fln, is_abci, snapshot_dir, chunk_size, end_block).await?
Commands::SyncFromState { chain, fln, is_abci, snapshot_dir, chunk_size, blocks_dir, end_block } => {
run_from_state(chain, blocks_dir, fln, is_abci, snapshot_dir, chunk_size, end_block).await?
}
Commands::NextBlockNumber { abci_state_fln, evm_state_fln } => {
if let Some(fln) = abci_state_fln {
@@ -82,24 +108,34 @@ impl Cli {
}

async fn run_from_state(
chain: Chain,
blocks_dir: String,
state_fln: Option<String>,
is_abci: bool,
snapshot_dir: Option<String>,
chunk_size: u64,
end_block: u64,
) -> Result<()> {
let erc20_contract_to_system_address = erc20_contract_to_system_address(MAINNET_CHAIN_ID).await?;
let erc20_contract_to_system_address = erc20_contract_to_system_address(chain).await?;
let (start_block, mut state) = if let Some(state_fln) = state_fln {
if is_abci {
read_abci_state(state_fln)?
} else {
read_evm_state(state_fln)?
}
} else {
if let Chain::Testnet = chain {
return Err(anyhow!("Testnet must start from a snapshot"));
}
(1, InMemoryDB::genesis())
};
println!("{start_block} -> {end_block}");
if let Chain::Testnet = chain {
if start_block < TESTNET_BLOCK_THRESHOLD {
return Err(anyhow!("Testnet must be run after {TESTNET_BLOCK_THRESHOLD}"));
}
}

println!("{start_block} -> {end_block} on {chain}");
let pb = ProgressBar::new(end_block - start_block + 1);
pb.set_style(
ProgressStyle::default_bar()
@@ -110,10 +146,13 @@ async fn run_from_state(
let (tx, mut rx) = mpsc::channel::<Vec<(u64, Vec<PreprocessedBlock>)>>(1);

let processor = tokio::spawn(async move {
let start = Instant::now();
let hash = state.blake3_hash_slow();
println!("Computed state hash after block={start_block}: {hash:?} in {:?}", start.elapsed());
while let Some(blocks) = rx.recv().await {
run_blocks(
Some(pb.clone()),
MAINNET_CHAIN_ID,
chain,
&mut state,
blocks,
&erc20_contract_to_system_address,
@@ -135,10 +174,10 @@

let (processor_res, reader_res) = tokio::join!(processor, reader);
if let Err(e) = processor_res {
eprintln!("Processor failed: {}", e);
eprintln!("Processor failed: {e}");
}
if let Err(e) = reader_res {
eprintln!("Reader failed: {}", e);
eprintln!("Reader failed: {e}");
}
Ok(())
}
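
A minimal, runnable sketch of how the new --chain flag parses once Chain derives ValueEnum: clap renders variants in kebab-case, so the accepted values are mainnet and testnet. The Chain enum below mirrors the diff; the argument struct and binary name are illustrative assumptions, not code from this PR.

use clap::{Parser, ValueEnum};

// Mirrors the enum added in src/cli.rs.
#[derive(Debug, Clone, Copy, ValueEnum)]
enum Chain {
    Mainnet,
    Testnet,
}

// Hypothetical argument struct, for illustration only.
#[derive(Parser)]
struct Args {
    #[arg(long)]
    chain: Chain,
}

fn main() {
    // clap lowercases ValueEnum variants by default: `--chain mainnet` / `--chain testnet`.
    let args = Args::parse_from(["demo", "--chain", "testnet"]);
    println!("selected chain: {:?}", args.chain);
}

Deriving ValueEnum gives value validation and help text for the flag for free, so the Display impl in the diff only needs to cover human-readable log output such as the "Downloaded ... from {chain}" message.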
23 changes: 13 additions & 10 deletions src/evm_map.rs
@@ -1,6 +1,6 @@
use crate::run::{MAINNET_CHAIN_ID, TESTNET_CHAIN_ID};
use crate::cli::Chain;
use alloy::primitives::Address;
use anyhow::{Error, Result};
use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;

@@ -21,19 +21,22 @@ struct SpotMeta {
tokens: Vec<SpotToken>,
}

async fn fetch_spot_meta(chain_id: u64) -> Result<SpotMeta> {
let url = match chain_id {
MAINNET_CHAIN_ID => "https://api.hyperliquid.xyz/info",
TESTNET_CHAIN_ID => "https://api.hyperliquid-testnet.xyz/info",
_ => return Err(Error::msg("unknown chain id")),
};
fn info_url(chain: Chain) -> &'static str {
match chain {
Chain::Mainnet => "https://api.hyperliquid.xyz/info",
Chain::Testnet => "https://api.hyperliquid-testnet.xyz/info",
}
}

async fn fetch_spot_meta(chain: Chain) -> Result<SpotMeta> {
let url = info_url(chain);
let client = reqwest::Client::new();
let response = client.post(url).json(&serde_json::json!({"type": "spotMeta"})).send().await?;
Ok(response.json().await?)
}

pub async fn erc20_contract_to_system_address(chain_id: u64) -> Result<BTreeMap<Address, Address>> {
let meta = fetch_spot_meta(chain_id).await?;
pub async fn erc20_contract_to_system_address(chain: Chain) -> Result<BTreeMap<Address, Address>> {
let meta = fetch_spot_meta(chain).await?;
let mut map = BTreeMap::new();
for token in &meta.tokens {
if let Some(evm_contract) = &token.evm_contract {
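
Replacing the raw chain_id: u64 with the Chain enum makes the endpoint selection exhaustive at compile time, which is what lets the PR drop the anyhow::Error import and the runtime "unknown chain id" branch. A small sketch of that mapping in isolation; the enum shape and URLs are copied from the diff, while the test harness around them is illustrative.

// The info_url mapping as added in src/evm_map.rs; main() is illustrative only.
#[derive(Clone, Copy)]
enum Chain {
    Mainnet,
    Testnet,
}

fn info_url(chain: Chain) -> &'static str {
    // Exhaustive match: adding a new Chain variant forces an update here,
    // so no "unknown chain id" error path is needed at runtime.
    match chain {
        Chain::Mainnet => "https://api.hyperliquid.xyz/info",
        Chain::Testnet => "https://api.hyperliquid-testnet.xyz/info",
    }
}

fn main() {
    assert_eq!(info_url(Chain::Mainnet), "https://api.hyperliquid.xyz/info");
    assert_eq!(info_url(Chain::Testnet), "https://api.hyperliquid-testnet.xyz/info");
    println!("info endpoints resolve for both chains");
}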
35 changes: 22 additions & 13 deletions src/fs.rs
@@ -1,6 +1,9 @@
use crate::types::{AbciState, BlockAndReceipts, EvmBlock, EvmState, PreprocessedBlock};
use crate::{
cli::Chain,
types::{AbciState, BlockAndReceipts, EvmBlock, EvmState, PreprocessedBlock},
};
use anyhow::Result;
use aws_config::{meta::region::RegionProviderChain, BehaviorVersion};
use aws_config::{BehaviorVersion, Region};
use aws_sdk_s3::{types::RequestPayer, Client};
use futures::{stream, StreamExt, TryStreamExt};
use indicatif::{ProgressBar, ProgressStyle};
@@ -17,7 +20,7 @@ use std::{
};

const DOWNLOAD_CHUNK_SIZE: u64 = 10000;
const CONCURRENCY_LIMIT: usize = 500;
const CONCURRENCY_LIMIT: usize = 1000;

fn decompress(data: &[u8]) -> Result<Vec<u8>, lz4_flex::frame::Error> {
let mut decoder = lz4_flex::frame::FrameDecoder::new(data);
@@ -26,6 +29,13 @@ fn decompress(data: &[u8]) -> Result<Vec<u8>, lz4_flex::frame::Error> {
Ok(decompressed)
}

fn bucket(chain: Chain) -> &'static str {
match chain {
Chain::Mainnet => "hl-mainnet-evm-blocks",
Chain::Testnet => "hl-testnet-evm-blocks",
}
}

fn read_block_and_receipts(file_path: &Path) -> Result<BlockAndReceipts> {
let mut file = File::open(file_path)?;
let mut buffer = Vec::new();
@@ -66,7 +76,6 @@ pub fn read_blocks(dir: &str, start_block: u64, end_block: u64, chunk_size: u64)
PreprocessedBlock { block_num, block_and_receipts, signers }
})
.collect();
// let blocks = stream::iter(futures).buffered(CONCURRENCY_LIMIT).collect().await;
println!("Deserialized blocks {}-{} in {:?}", start_block, end_block, start.elapsed());
all_blocks.push((chunk, blocks));
}
@@ -113,11 +122,6 @@ fn block_key(block_num: u64) -> String {
async fn fetch_block(block_num: u64, dir: PathBuf, s3: Arc<Client>, pb: ProgressBar, bucket: &str) -> Result<()> {
let key = block_key(block_num);
let local_path: PathBuf = dir.join(&key);

if let Some(parent) = local_path.parent() {
create_dir_all(parent)?;
}

if local_path.is_file() {
pb.inc(1);
return Ok(());
@@ -126,26 +130,30 @@
let obj = s3.get_object().bucket(bucket).key(key).request_payer(RequestPayer::Requester).send().await?;

let mut body = obj.body.into_async_read();
if let Some(parent) = local_path.parent() {
create_dir_all(parent)?;
}
let mut file = tokio::fs::File::create(&local_path).await?;
tokio::io::copy(&mut body, &mut file).await?;

pb.inc(1);
Ok(())
}

pub async fn download_blocks(dir: &str, start_block: u64, end_block: u64) -> Result<()> {
pub async fn download_blocks(chain: Chain, dir: &str, start_block: u64, end_block: u64) -> Result<()> {
let pb = ProgressBar::new(end_block - start_block + 1);
pb.set_style(
ProgressStyle::default_bar()
.template("[{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta})")
.unwrap()
.progress_chars("##-"),
);
let region = RegionProviderChain::default_provider().or_else("us-east-1");
let region = Region::new("ap-northeast-1".to_string());
let config = aws_config::defaults(BehaviorVersion::latest()).region(region).load().await;
let s3 = Arc::new(Client::new(&config));

let bucket = "hl-mainnet-evm-blocks";
let bucket = bucket(chain);

let mut cur_block = start_block;
while cur_block <= end_block {
let next_block = (end_block + 1).min(cur_block + DOWNLOAD_CHUNK_SIZE);
@@ -165,6 +173,7 @@ pub async fn download_blocks(dir: &str, start_block: u64, end_block: u64) -> Res
#[cfg(test)]
mod tests {
use crate::{
cli::Chain,
fs::{download_blocks, read_abci_state, read_evm_state, snapshot_evm_state},
state::State,
};
@@ -174,7 +183,7 @@ mod tests {
#[tokio::test]
async fn test_block_download() -> Result<()> {
let time = Instant::now();
download_blocks("hl-mainnet-evm-blocks", 4000000, 4001000).await?;
download_blocks(Chain::Mainnet, "hl-mainnet-evm-blocks", 4000000, 4001000).await?;
println!("downloaded in {:?}", time.elapsed());
Ok(())
}
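
The visible fs.rs hunks raise CONCURRENCY_LIMIT to 1000 and move create_dir_all after the local cache-hit check, but the stream wiring that the limit gates sits outside the shown context. A hedged sketch of the general pattern such a limit typically caps, with placeholder work standing in for the requester-pays S3 fetch; the constant value matches the diff, everything else is illustrative rather than the PR's code.

use futures::{stream, StreamExt};

// Same value as the new constant in src/fs.rs; the rest of this example is a stand-in.
const CONCURRENCY_LIMIT: usize = 1000;

// Placeholder for the real S3 fetch of one block file.
async fn fetch_one(block_num: u64) -> u64 {
    block_num
}

#[tokio::main]
async fn main() {
    let block_nums = 4_000_000u64..4_000_100;
    // buffer_unordered keeps at most CONCURRENCY_LIMIT fetches in flight at once.
    let fetched: Vec<u64> = stream::iter(block_nums.map(fetch_one))
        .buffer_unordered(CONCURRENCY_LIMIT)
        .collect()
        .await;
    println!("fetched {} blocks", fetched.len());
}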