Skip to content

Message-Based Testing #42

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 17 additions & 15 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,25 @@ members = [
"common",

# Modules
"modules/genesis_bootstrapper", # Genesis bootstrap UTXOs
"modules/mithril_snapshot_fetcher",# Mithril snapshot fetcher
"modules/upstream_chain_fetcher", # Upstream chain fetcher
"modules/block_unpacker", # Block to transaction unpacker
"modules/tx_unpacker", # Tx to UTXO unpacker
"modules/utxo_state", # UTXO state
"modules/spo_state", # SPO state
"modules/drep_state", # DRep state
"modules/governance_state", # Governance state
"modules/parameters_state", # Keeps track of protocol parameters
"modules/stake_delta_filter", # Filters address deltas
"modules/epoch_activity_counter", # Counts fees and block producers for rewards
"modules/accounts_state", # Tracks stake and reward accounts
"modules/genesis_bootstrapper", # Genesis bootstrap UTXOs
"modules/mithril_snapshot_fetcher", # Mithril snapshot fetcher
"modules/snapshot_bootstrapper", # Bootstrap state from a ledger snapshot
"modules/upstream_chain_fetcher", # Upstream chain fetcher
"modules/block_unpacker", # Block to transaction unpacker
"modules/tx_unpacker", # Tx to UTXO unpacker
"modules/utxo_state", # UTXO state
"modules/spo_state", # SPO state
"modules/drep_state", # DRep state
"modules/parameters_state", # Keeps track of protocol parameters
"modules/governance_state", # Governance state
"modules/stake_delta_filter", # Filters address deltas
"modules/epoch_activity_counter", # Counts fees and block producers for rewards
"modules/accounts_state", # Tracks stake and reward accounts

# Process builds
"processes/omnibus", # All-inclusive omnibus process
"processes/replayer", # All-inclusive process to replay messages
"processes/omnibus", # All-inclusive omnibus process
"processes/replayer", # All-inclusive process to replay messages
"processes/golden_tests", #All-inclusive golden tests process
]

resolver = "2"
2 changes: 2 additions & 0 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ serde_with = { version = "3.12.0", features = ["hex"] }
tokio = { version = "1", features = ["full"] }
tracing = "0.1.40"
futures = "0.3.31"
minicbor = { version = "0.26.0", features = ["std", "half", "derive"] }


[lib]
crate-type = ["rlib"]
Expand Down
206 changes: 206 additions & 0 deletions common/src/ledger_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
use crate::{
KeyHash, MultiHostName, PoolRegistration, Ratio, Relay, SingleHostAddr, SingleHostName,
};
use anyhow::{bail, Context, Result};
use minicbor::data::Tag;
use std::{collections::BTreeMap, fs, path::Path};

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Default)]
pub struct LedgerState {
pub spo_state: SPOState,
}

pub struct UTxOState {}

pub struct StakeDistributionState {}

pub struct AccountState {}

pub struct ParametersState {}

#[derive(
Debug,
Clone,
serde::Serialize,
serde::Deserialize,
minicbor::Decode,
minicbor::Encode,
Default,
Eq,
PartialEq,
)]
pub struct SPOState {
#[n(0)]
pub pools: BTreeMap<KeyHash, PoolRegistration>,
#[n(1)]
pub retiring: BTreeMap<KeyHash, u64>,
}

pub struct DRepState {}

pub struct ProposalState {}

pub struct VotingState {}

impl LedgerState {
pub fn from_directory(directory_path: impl AsRef<Path>) -> Result<Self> {
let directory_path = directory_path.as_ref();
if !directory_path.exists() {
bail!("directory does not exist: {}", directory_path.display());
}

if !directory_path.is_dir() {
bail!("path is not a directory: {}", directory_path.display());
}

let mut ledger_state = Self::default();
ledger_state
.load_from_directory(directory_path)
.with_context(|| {
format!(
"Failed to load ledger state from directory: {}",
directory_path.display()
)
})?;

Ok(ledger_state)
}

fn load_from_directory(&mut self, directory_path: impl AsRef<Path>) -> Result<()> {
let directory_path = directory_path.as_ref();
let entries = fs::read_dir(directory_path)
.with_context(|| format!("failed to read directory: {}", directory_path.display()))?;

for entry in entries {
let entry = entry.with_context(|| "failed to read directory entry")?;
let path = entry.path();

if path.is_file() && path.extension().map_or(false, |ext| ext == "cbor") {
self.load_cbor_file(&path)
.with_context(|| format!("failed to load CBOR file: {}", path.display()))?;
}
}

Ok(())
}

fn load_cbor_file(&mut self, file_path: impl AsRef<Path>) -> Result<()> {
let file_path = file_path.as_ref();
let filename = file_path
.file_stem()
.and_then(|s| s.to_str())
.with_context(|| format!("invalid filename: {}", file_path.display()))?;

let bytes = fs::read(file_path)
.with_context(|| format!("failed to read file: {}", file_path.display()))?;

match filename {
"pools" => {
self.spo_state = minicbor::decode(&bytes).with_context(|| {
format!("failed to decode SPO state from: {}", file_path.display())
})?;
}
_ => {
// ignore unknown cbor files
}
}

Ok(())
}
}

impl<'b, C> minicbor::decode::Decode<'b, C> for Ratio {
fn decode(d: &mut minicbor::Decoder<'b>, ctx: &mut C) -> Result<Self, minicbor::decode::Error> {
let tag = d.tag()?;
if tag.as_u64() != 30 {
return Err(minicbor::decode::Error::message("tag must be 30"));
}
let maybe_array_length = d.array()?;
if let Some(length) = maybe_array_length {
if length != 2 {
return Err(minicbor::decode::Error::message(
"array must be of length 2",
));
}
}

Ok(Ratio {
numerator: d.decode_with(ctx)?,
denominator: d.decode_with(ctx)?,
})
}
}

impl<C> minicbor::encode::Encode<C> for Ratio {
fn encode<W: minicbor::encode::Write>(
&self,
e: &mut minicbor::Encoder<W>,
ctx: &mut C,
) -> Result<(), minicbor::encode::Error<W::Error>> {
e.tag(Tag::new(30))?;
e.array(2)?;
e.encode_with(self.numerator, ctx)?;
e.encode_with(self.denominator, ctx)?;
Ok(())
}
}

impl<'b, C> minicbor::decode::Decode<'b, C> for Relay {
fn decode(d: &mut minicbor::Decoder<'b>, ctx: &mut C) -> Result<Self, minicbor::decode::Error> {
d.array()?;
let variant = d.u16()?;

match variant {
0 => Ok(Relay::SingleHostAddr(SingleHostAddr {
port: d.decode_with(ctx)?,
ipv4: d.decode_with(ctx)?,
ipv6: d.decode_with(ctx)?,
})),
1 => Ok(Relay::SingleHostName(SingleHostName {
port: d.decode_with(ctx)?,
dns_name: d.decode_with(ctx)?,
})),
2 => Ok(Relay::MultiHostName(MultiHostName {
dns_name: d.decode_with(ctx)?,
})),
_ => Err(minicbor::decode::Error::message(
"invalid variant id for Relay",
)),
}
}
}

impl<C> minicbor::encode::Encode<C> for Relay {
fn encode<W: minicbor::encode::Write>(
&self,
e: &mut minicbor::Encoder<W>,
ctx: &mut C,
) -> Result<(), minicbor::encode::Error<W::Error>> {
match self {
Relay::SingleHostAddr(SingleHostAddr { port, ipv4, ipv6 }) => {
e.array(4)?;
e.encode_with(0, ctx)?;
e.encode_with(port, ctx)?;
e.encode_with(ipv4, ctx)?;
e.encode_with(ipv6, ctx)?;

Ok(())
}
Relay::SingleHostName(SingleHostName { port, dns_name }) => {
e.array(3)?;
e.encode_with(1, ctx)?;
e.encode_with(port, ctx)?;
e.encode_with(dns_name, ctx)?;

Ok(())
}
Relay::MultiHostName(MultiHostName { dns_name }) => {
e.array(2)?;
e.encode_with(2, ctx)?;
e.encode_with(dns_name, ctx)?;

Ok(())
}
}
}
}
1 change: 1 addition & 0 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod address;
pub mod calculations;
pub mod cip19;
pub mod crypto;
pub mod ledger_state;
pub mod messages;
pub mod params;
pub mod rational_number;
Expand Down
22 changes: 22 additions & 0 deletions common/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
// We don't use these messages in the acropolis_common crate itself
#![allow(dead_code)]

use crate::ledger_state::SPOState;

use crate::types::*;

// Caryatid core messages which we re-export
Expand Down Expand Up @@ -172,6 +174,23 @@ pub enum CardanoMessage {
StakeAddressDeltas(StakeAddressDeltasMessage), // Stake part of address deltas
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum SnapshotMessage {
Bootstrap(SnapshotStateMessage),
DumpRequest(SnapshotDumpMessage),
Dump(SnapshotStateMessage),
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct SnapshotDumpMessage {
pub block_height: u64,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum SnapshotStateMessage {
SPOState(SPOState),
}

// === Global message enum ===
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum Message {
Expand All @@ -188,6 +207,9 @@ pub enum Message {

// Cardano messages with attached BlockInfo
Cardano((BlockInfo, CardanoMessage)),

// Initialize state from a snapshot
Snapshot(SnapshotMessage),
}

impl Default for Message {
Expand Down
Loading
Loading