Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
6f72dad
Separate in fkv generator, triedb, trie openers. Pending: apply updates
Arkenan Oct 27, 2025
e982dac
merge with main
Arkenan Oct 28, 2025
f4a30bb
revert some changes to main, move some to apply_trie_updates
Arkenan Oct 28, 2025
89a7b41
fix warning. Separate node writes in accounts and storages
Arkenan Oct 28, 2025
2460e46
fix db test
Arkenan Oct 28, 2025
e1b543f
fix lint
Arkenan Oct 28, 2025
057820e
separate cfs in locked trie
Arkenan Oct 29, 2025
84901c9
Fix account in open direct state
Arkenan Oct 30, 2025
7f89472
merge with stash
Arkenan Nov 3, 2025
e462dba
Merge branch 'main' into separate-accounts-and-storage-rocksdb
Arkenan Nov 3, 2025
99c1ce8
Fix genesis setup so that it doesn't call put_batch
Arkenan Nov 5, 2025
3b431a5
Remove some printlns, tidy some code, fix some tests
Arkenan Nov 6, 2025
4d8903a
remove unnecessary changes in smoke test
Arkenan Nov 6, 2025
64a7192
remove printlns
Arkenan Nov 6, 2025
7a84201
remove trash files
Arkenan Nov 6, 2025
d55c9f4
merge with main
Arkenan Nov 6, 2025
3043984
fix merging bug
Arkenan Nov 6, 2025
6a345c5
Merge branch 'main' into separate-accounts-and-storage-rocksdb
edg-l Nov 10, 2025
7fc56e7
Merge remote-tracking branch 'origin/main' into separate-accounts-and…
edg-l Nov 11, 2025
59533f4
fix
edg-l Nov 11, 2025
53a0402
fmt
edg-l Nov 11, 2025
08eefec
changelog
edg-l Nov 11, 2025
1cd7021
lint
edg-l Nov 11, 2025
d29e669
Merge branch 'main' into separate-accounts-and-storage-rocksdb
jrchatruc Nov 11, 2025
c7e9b76
fmt
jrchatruc Nov 11, 2025
4a8be57
Merge branch 'main' into separate-accounts-and-storage-rocksdb
jrchatruc Nov 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### 2025-11-11

- Separate account and storage column families in RocksDB [#5055](https://github.com/lambdaclass/ethrex/pull/5055)
- Fix `FlatKeyValue` generation on fullsync mode [#5274](https://github.com/lambdaclass/ethrex/pull/5274)

### 2025-11-10
Expand Down
37 changes: 15 additions & 22 deletions crates/storage/store.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::api::StoreEngine;
use crate::error::StoreError;
use crate::store_db::in_memory::Store as InMemoryStore;
#[cfg(feature = "rocksdb")]
use crate::store_db::rocksdb::Store as RocksDBStore;
use crate::{api::StoreEngine, apply_prefix};

use ethereum_types::{Address, H256, U256};
use ethrex_common::{
Expand Down Expand Up @@ -532,50 +532,43 @@ impl Store {
&self,
genesis_accounts: BTreeMap<Address, GenesisAccount>,
) -> Result<H256, StoreError> {
let mut nodes = HashMap::new();
let mut genesis_state_trie = self.engine.open_direct_state_trie(*EMPTY_TRIE_HASH)?;
let mut account_trie = self.engine.open_direct_state_trie(*EMPTY_TRIE_HASH)?;

for (address, account) in genesis_accounts {
let hashed_address = hash_address(&address);

// Store account code (as this won't be stored in the trie)
let code = Code::from_bytecode(account.code);
let code_hash = code.hash;
self.add_account_code(code).await?;

// Store the account's storage in a clean storage trie and compute its root
let mut storage_trie = self
.engine
.open_direct_storage_trie(H256::from_slice(&hashed_address), *EMPTY_TRIE_HASH)?;

for (storage_key, storage_value) in account.storage {
if !storage_value.is_zero() {
let hashed_key = hash_key(&H256(storage_key.to_big_endian()));
storage_trie.insert(hashed_key, storage_value.encode_to_vec())?;
}
}
let (storage_root, new_nodes) = storage_trie.collect_changes_since_last_hash();
nodes.insert(H256::from_slice(&hashed_address), new_nodes);

// TODO(#5195): committing each storage trie individually is inefficient.
// We would benefit from a mass storage-node insertion method.
Comment on lines +557 to +558
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we do it like before, with Trie::collect_changes_since_last_hash()? We can store the changes inside a shared buffer, and dump it in the DB at the end.


// Add account to trie
let account_state = AccountState {
nonce: account.nonce,
balance: account.balance,
storage_root,
storage_root: storage_trie.hash()?,
code_hash,
};
genesis_state_trie.insert(hashed_address, account_state.encode_to_vec())?;

account_trie.insert(hashed_address, account_state.encode_to_vec())?;
}
let (state_root, state_nodes) = genesis_state_trie.collect_changes_since_last_hash();

// TODO: replace this with a Store method
genesis_state_trie.db().put_batch(
nodes
.into_iter()
.flat_map(|(account_hash, nodes)| {
nodes
.into_iter()
.map(move |(path, node)| (apply_prefix(Some(account_hash), path), node))
})
.chain(state_nodes)
.collect(),
)?;
Ok(state_root)

Ok(account_trie.hash()?)
}

pub async fn add_receipt(
Expand Down
112 changes: 80 additions & 32 deletions crates/storage/store_db/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,15 @@ const CF_CHAIN_DATA: &str = "chain_data";
/// - [`Vec<u8>`] = `BlockHashRLP::from(block_hash).bytes().clone()`
const CF_SNAP_STATE: &str = "snap_state";

/// State trie nodes column family: [`Nibbles`] => [`Vec<u8>`]
/// Account State trie nodes column family: [`Nibbles`] => [`Vec<u8>`]
/// - [`Nibbles`] = `node_hash.as_ref()`
/// - [`Vec<u8>`] = `node_data`
const CF_TRIE_NODES: &str = "trie_nodes";
const CF_ACCOUNT_TRIE_NODES: &str = "account_trie_nodes";

/// Storage trie nodes column family: [`Nibbles`] => [`Vec<u8>`]
/// - [`Nibbles`] = `node_hash.as_ref()`
/// - [`Vec<u8>`] = `node_data`
const CF_STORAGE_TRIE_NODES: &str = "storage_trie_nodes";

/// Pending blocks column family: [`Vec<u8>`] => [`Vec<u8>`]
/// - [`Vec<u8>`] = `BlockHashRLP::from(block.hash()).bytes().clone()`
Expand All @@ -115,7 +120,15 @@ const CF_INVALID_ANCESTORS: &str = "invalid_ancestors";
/// - [`Vec<u8>`] = `BlockHeaderRLP::from(block.header.clone()).bytes().clone()`
const CF_FULLSYNC_HEADERS: &str = "fullsync_headers";

pub const CF_FLATKEYVALUE: &str = "flatkeyvalue";
/// Account state flat key-value store: [`Nibbles`] => [`Vec<u8>`]
/// - [`Nibbles`] = `node_hash.as_ref()`
/// - [`Vec<u8>`] = `node_data`
pub const CF_ACCOUNT_FLATKEYVALUE: &str = "account_flatkeyvalue";

/// Storage slots key-value store: [`Nibbles`] => [`Vec<u8>`]
/// - [`Nibbles`] = `node_hash.as_ref()`
/// - [`Vec<u8>`] = `node_data`
pub const CF_STORAGE_FLATKEYVALUE: &str = "storage_flatkeyvalue";

pub const CF_MISC_VALUES: &str = "misc_values";

Expand Down Expand Up @@ -197,11 +210,13 @@ impl Store {
CF_TRANSACTION_LOCATIONS,
CF_CHAIN_DATA,
CF_SNAP_STATE,
CF_TRIE_NODES,
CF_ACCOUNT_TRIE_NODES,
CF_STORAGE_TRIE_NODES,
CF_PENDING_BLOCKS,
CF_INVALID_ANCESTORS,
CF_FULLSYNC_HEADERS,
CF_FLATKEYVALUE,
CF_ACCOUNT_FLATKEYVALUE,
CF_STORAGE_FLATKEYVALUE,
CF_MISC_VALUES,
];

Expand Down Expand Up @@ -263,7 +278,7 @@ impl Store {
block_opts.set_bloom_filter(10.0, false);
cf_opts.set_block_based_table_factory(&block_opts);
}
CF_TRIE_NODES => {
CF_ACCOUNT_TRIE_NODES | CF_STORAGE_TRIE_NODES => {
cf_opts.set_write_buffer_size(512 * 1024 * 1024); // 512MB
cf_opts.set_max_write_buffer_number(6);
cf_opts.set_min_write_buffer_number_to_merge(2);
Expand All @@ -275,7 +290,7 @@ impl Store {
block_opts.set_bloom_filter(10.0, false); // 10 bits per key
cf_opts.set_block_based_table_factory(&block_opts);
}
CF_FLATKEYVALUE => {
CF_ACCOUNT_FLATKEYVALUE | CF_STORAGE_FLATKEYVALUE => {
cf_opts.set_write_buffer_size(512 * 1024 * 1024); // 512MB
cf_opts.set_max_write_buffer_number(6);
cf_opts.set_min_write_buffer_number_to_merge(2);
Expand Down Expand Up @@ -579,7 +594,8 @@ impl Store {
control_rx: &mut std::sync::mpsc::Receiver<FKVGeneratorControlMessage>,
) -> Result<(), StoreError> {
let cf_misc = self.cf_handle(CF_MISC_VALUES)?;
let cf_flatkeyvalue = self.cf_handle(CF_FLATKEYVALUE)?;
let cf_accounts_fkv = self.cf_handle(CF_ACCOUNT_FLATKEYVALUE)?;
let cf_storage_fkv = self.cf_handle(CF_STORAGE_FLATKEYVALUE)?;

let last_written = self
.db
Expand All @@ -590,11 +606,13 @@ impl Store {
}

self.db
.delete_range_cf(&cf_flatkeyvalue, last_written, vec![0xff])?;
.delete_range_cf(&cf_accounts_fkv, &last_written, vec![0xff].as_ref())?;
self.db
.delete_range_cf(&cf_storage_fkv, &last_written, vec![0xff].as_ref())?;

loop {
let root = self
.read_sync(CF_TRIE_NODES, [])?
.read_sync(CF_ACCOUNT_TRIE_NODES, [])?
.ok_or(StoreError::MissingLatestBlockNumber)?;
let root: Node = ethrex_trie::Node::decode(&root)?;
let state_root = root.compute_hash().finalize();
Expand All @@ -616,18 +634,18 @@ impl Store {

let mut ctr = 0;
let mut batch = WriteBatch::default();
let mut iter = self.open_direct_state_trie(state_root)?.into_iter();
let mut account_iter = self.open_direct_state_trie(state_root)?.into_iter();
if last_written_account > Nibbles::default() {
iter.advance(last_written_account.to_bytes())?;
account_iter.advance(last_written_account.to_bytes())?;
}
let res = iter.try_for_each(|(path, node)| -> Result<(), StoreError> {
let Node::Leaf(node) = node else {
let res = account_iter.try_for_each(|(path, account_node)| -> Result<(), StoreError> {
let Node::Leaf(node) = account_node else {
return Ok(());
};
let account_state = AccountState::decode(&node.value)?;
let account_hash = H256::from_slice(&path.to_bytes());
batch.put_cf(&cf_misc, "last_written", path.as_ref());
batch.put_cf(&cf_flatkeyvalue, path.as_ref(), node.value);
batch.put_cf(&cf_accounts_fkv, path.as_ref(), node.value);
ctr += 1;
if ctr > 10_000 {
self.db.write(std::mem::take(&mut batch))?;
Expand All @@ -638,20 +656,20 @@ impl Store {
ctr = 0;
}

let mut iter_inner = self
let mut storage_iter = self
.open_direct_storage_trie(account_hash, account_state.storage_root)?
.into_iter();
if last_written_storage > Nibbles::default() {
iter_inner.advance(last_written_storage.to_bytes())?;
storage_iter.advance(last_written_storage.to_bytes())?;
last_written_storage = Nibbles::default();
}
iter_inner.try_for_each(|(path, node)| -> Result<(), StoreError> {
let Node::Leaf(node) = node else {
storage_iter.try_for_each(|(path, storage_node)| -> Result<(), StoreError> {
let Node::Leaf(node) = storage_node else {
return Ok(());
};
let key = apply_prefix(Some(account_hash), path);
batch.put_cf(&cf_misc, "last_written", key.as_ref());
batch.put_cf(&cf_flatkeyvalue, key.as_ref(), node.value);
batch.put_cf(&cf_storage_fkv, key.as_ref(), node.value);
ctr += 1;
if ctr > 10_000 {
self.db.write(std::mem::take(&mut batch))?;
Expand Down Expand Up @@ -754,22 +772,45 @@ impl Store {
// RCU to remove the bottom layer: update step needs to happen after disk layer is updated.
let mut trie_mut = (*trie).clone();
let mut batch = WriteBatch::default();
let [cf_trie_nodes, cf_flatkeyvalue, cf_misc] =
open_cfs(db, [CF_TRIE_NODES, CF_FLATKEYVALUE, CF_MISC_VALUES])?;
let [
cf_accounts_trie_nodes,
cf_accounts_flatkeyvalue,
cf_storage_trie_nodes,
cf_storage_flatkeyvalue,
cf_misc,
] = open_cfs(
db,
[
CF_ACCOUNT_TRIE_NODES,
CF_ACCOUNT_FLATKEYVALUE,
CF_STORAGE_TRIE_NODES,
CF_STORAGE_FLATKEYVALUE,
CF_MISC_VALUES,
],
)?;

let last_written = db.get_cf(&cf_misc, "last_written")?.unwrap_or_default();
// Commit removes the bottom layer and returns it, this is the mutation step.
let nodes = trie_mut.commit(root).unwrap_or_default();
for (key, value) in nodes {
let is_leaf = key.len() == 65 || key.len() == 131;
// Account paths contain only the hashed account address (32 bytes), while storage
// keys contain the hashed address plus a storage path of up to 32 more bytes.
// NOTE(review): the 65/131 length checks above refer to the nibble-encoded key
// (2 nibbles per byte plus a terminator), not these raw sizes — hence <= 65 means account.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mentioning the pre-encoding sizes before code dealing with encoded sizes might generate confusion (one might think 32 + 32 and 65 are related).

let is_account = key.len() <= 65;

if is_leaf && key > last_written {
continue;
}
let cf = if is_leaf {
&cf_flatkeyvalue
if is_account {
&cf_accounts_flatkeyvalue
} else {
&cf_storage_flatkeyvalue
}
} else if is_account {
&cf_accounts_trie_nodes
} else {
&cf_trie_nodes
&cf_storage_trie_nodes
};
if value.is_empty() {
batch.delete_cf(cf, key);
Expand Down Expand Up @@ -836,6 +877,7 @@ impl StoreEngine for Store {
CF_BODIES,
],
)?;

let mut batch = WriteBatch::default();

let UpdateBatch {
Expand Down Expand Up @@ -1479,7 +1521,8 @@ impl StoreEngine for Store {
// FIXME: use a DB snapshot here
let db = Box::new(RocksDBTrieDB::new(
self.db.clone(),
CF_TRIE_NODES,
CF_STORAGE_TRIE_NODES,
CF_STORAGE_FLATKEYVALUE,
None,
self.last_written()?,
)?);
Expand All @@ -1500,7 +1543,8 @@ impl StoreEngine for Store {
// FIXME: use a DB snapshot here
let db = Box::new(RocksDBTrieDB::new(
self.db.clone(),
CF_TRIE_NODES,
CF_ACCOUNT_TRIE_NODES,
CF_ACCOUNT_FLATKEYVALUE,
None,
self.last_written()?,
)?);
Expand All @@ -1524,7 +1568,8 @@ impl StoreEngine for Store {
) -> Result<Trie, StoreError> {
let db = Box::new(RocksDBTrieDB::new(
self.db.clone(),
CF_TRIE_NODES,
CF_STORAGE_TRIE_NODES,
CF_STORAGE_FLATKEYVALUE,
Some(hashed_address),
self.last_written()?,
)?);
Expand All @@ -1534,7 +1579,8 @@ impl StoreEngine for Store {
fn open_direct_state_trie(&self, state_root: H256) -> Result<Trie, StoreError> {
let db = Box::new(RocksDBTrieDB::new(
self.db.clone(),
CF_TRIE_NODES,
CF_ACCOUNT_TRIE_NODES,
CF_ACCOUNT_FLATKEYVALUE,
None,
self.last_written()?,
)?);
Expand All @@ -1544,7 +1590,8 @@ impl StoreEngine for Store {
fn open_locked_state_trie(&self, state_root: H256) -> Result<Trie, StoreError> {
let db = Box::new(RocksDBLockedTrieDB::new(
self.db.clone(),
CF_TRIE_NODES,
CF_ACCOUNT_TRIE_NODES,
CF_ACCOUNT_FLATKEYVALUE,
None,
self.last_written()?,
)?);
Expand All @@ -1569,7 +1616,8 @@ impl StoreEngine for Store {
) -> Result<Trie, StoreError> {
let db = Box::new(RocksDBLockedTrieDB::new(
self.db.clone(),
CF_TRIE_NODES,
CF_STORAGE_TRIE_NODES,
CF_STORAGE_FLATKEYVALUE,
None,
self.last_written()?,
)?);
Expand Down Expand Up @@ -1845,8 +1893,8 @@ impl StoreEngine for Store {
let db = self.db.clone();
tokio::task::spawn_blocking(move || {
let mut batch = WriteBatch::default();
let cf = db.cf_handle(CF_TRIE_NODES).ok_or_else(|| {
StoreError::Custom("Column family not found: CF_TRIE_NODES".to_string())
let cf = db.cf_handle(CF_STORAGE_TRIE_NODES).ok_or_else(|| {
StoreError::Custom("Column family not found: CF_STORAGE_TRIE_NODES".to_string())
})?;

for (address_hash, nodes) in storage_trie_nodes {
Expand Down
Loading