Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/stateless-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,5 +61,8 @@ tokio.workspace = true
tracing.workspace = true
zstd.workspace = true

[dev-dependencies]
tempfile = "3"

[features]
test-bucket-resize = ["salt/test-bucket-resize"]
29 changes: 19 additions & 10 deletions crates/stateless-core/src/chain_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ use op_alloy_rpc_types::Transaction;
use salt::SaltWitness;
use tracing::{Instrument, debug, error, info, info_span, instrument, warn};

use crate::{RpcClient, ValidatorDB, withdrawals::MptWitness};
use crate::{
RpcClient,
storage_traits::{ChainState, TaskQueue},
withdrawals::MptWitness,
};

/// Default metrics port for Prometheus endpoint.
pub const DEFAULT_METRICS_PORT: u16 = 9090;
Expand Down Expand Up @@ -111,12 +115,15 @@ pub struct FetchResult {
/// * `Ok(FetchResult)` - Result containing fetch statistics
/// * `Err(eyre::Error)` - On critical failures
#[instrument(skip_all, name = "chain_sync")]
pub async fn fetch_blocks_batch(
pub async fn fetch_blocks_batch<DB>(
client: &RpcClient,
db: &ValidatorDB,
db: &DB,
config: &ChainSyncConfig,
block_error_counts: &mut HashMap<u64, usize>,
) -> Result<FetchResult> {
) -> Result<FetchResult>
where
DB: ChainState + TaskQueue,
{
let batch_start = Instant::now();

// Calculate how far behind our local chain is from remote
Expand Down Expand Up @@ -370,7 +377,8 @@ pub async fn fetch_blocks_batch(
let add_tasks_elapsed = db_start.elapsed();

let grow_chain_start = Instant::now();
db.grow_remote_chain(tasks.iter().map(|(block, _, _)| &block.header))?;
let headers: Vec<_> = tasks.iter().map(|(block, _, _)| block.header.clone()).collect();
db.grow_remote_chain(&headers)?;
Comment on lines +380 to +381
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The &[Header] trait signature forces an allocation + full clone of every header on the hot path. The original code passed an iterator with no allocation:

// before
db.grow_remote_chain(tasks.iter().map(|(block, _, _)| &block.header))?;

Consider changing the trait method to accept impl IntoIterator<Item = &Header> — though that isn't directly expressible in a trait method today without a generic parameter. A practical middle ground: keep &[Header] in the trait but change the inherent ValidatorDB::grow_remote_chain to also accept &[Header] so the trait impl is a zero-cost pass-through, and fix the call site to avoid the clone by collecting &block.header into a Vec<&Header>:

Suggested change
let headers: Vec<_> = tasks.iter().map(|(block, _, _)| block.header.clone()).collect();
db.grow_remote_chain(&headers)?;
let headers: Vec<&Header> = tasks.iter().map(|(block, _, _)| &block.header).collect();
db.grow_remote_chain(&headers)?;

This still needs a Vec but at least avoids cloning each Header.

let grow_chain_elapsed = grow_chain_start.elapsed();

info!(
Expand Down Expand Up @@ -436,14 +444,15 @@ pub async fn fetch_blocks_batch(
///
/// # Returns
/// * Never returns under normal operation - runs indefinitely until externally terminated
pub async fn remote_chain_tracker<F, G>(
pub async fn remote_chain_tracker<DB, F, G>(
client: Arc<RpcClient>,
db: Arc<ValidatorDB>,
db: Arc<DB>,
config: Arc<ChainSyncConfig>,
on_reorg: Option<F>,
on_fetch: Option<G>,
) -> Result<()>
where
DB: ChainState + TaskQueue + Send + Sync + 'static,
F: Fn(&[B256]) + Send + Sync,
G: Fn(&FetchResult) + Send + Sync,
{
Expand All @@ -453,7 +462,7 @@ where
let mut block_error_counts: HashMap<u64, usize> = HashMap::new();

loop {
match fetch_blocks_batch(&client, &db, &config, &mut block_error_counts).await {
match fetch_blocks_batch(&client, &*db, &config, &mut block_error_counts).await {
Ok(result) => {
// Call reorg callback if a reorg occurred
if !result.reverted_hashes.is_empty() &&
Expand Down Expand Up @@ -489,9 +498,9 @@ where
/// The algorithm first exponentially expands backward to find a known-matching block,
/// then binary searches in that range.
#[instrument(skip(client, db), name = "find_divergence")]
async fn find_divergence_point(
async fn find_divergence_point<DB: ChainState>(
client: &RpcClient,
db: &ValidatorDB,
db: &DB,
mismatch_block: u64,
) -> Result<u64> {
let earliest_local = db.get_earliest_local_block()?.expect("Local chain cannot be empty");
Expand Down
2 changes: 2 additions & 0 deletions crates/stateless-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ pub use chain_sync::{
pub use light_witness::{LightWitness, LightWitnessExecutor};
pub mod database;
pub use database::{WitnessDatabase, WitnessDatabaseError, WitnessExternalEnv};
pub mod storage_traits;
pub use storage_traits::{BlockDataStore, ChainState, TaskQueue, ValidationResultStore};
pub mod validator_db;
pub use validator_db::{ValidationDbError, ValidationDbResult, ValidatorDB};
pub mod data_types;
Expand Down
141 changes: 141 additions & 0 deletions crates/stateless-core/src/storage_traits.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
//! Storage trait abstractions for `ValidatorDB`.
//!
//! These traits decouple consumers from the concrete `redb`-backed `ValidatorDB` implementation,
//! enabling alternative backends (e.g., in-memory for testing) without changing consumer code.

use std::collections::HashMap;

use alloy_genesis::Genesis;
use alloy_primitives::{B256, BlockHash, BlockNumber};
use alloy_rpc_types_eth::{Block, Header};
use op_alloy_rpc_types::Transaction;
use revm::state::Bytecode;
use salt::SaltWitness;

use crate::{
executor::ValidationResult, light_witness::LightWitness, validator_db::ValidationDbResult,
withdrawals::MptWitness,
};

/// Chain state management: tips, growth, rollback, genesis, anchor, and pruning.
///
/// Encapsulates the full lifecycle of chain state from initialization (genesis/anchor)
/// through synchronization (remote/local chain growth) to maintenance (rollback/pruning).
pub trait ChainState {
/// Returns the highest block in the local canonical chain, or `None` if empty.
fn get_local_tip(&self) -> ValidationDbResult<Option<(BlockNumber, BlockHash)>>;

/// Returns the highest block in the remote (unvalidated) chain, or `None` if empty.
fn get_remote_tip(&self) -> ValidationDbResult<Option<(BlockNumber, BlockHash)>>;

/// Extends the remote chain with a consecutive sequence of unvalidated block headers.
///
/// Each header's parent hash must match the current remote chain tip.
fn grow_remote_chain(&self, headers: &[Header]) -> ValidationDbResult<()>;

/// Extends the canonical chain with the next validated block from the remote chain.
///
/// Returns `true` if a block was advanced, `false` if no work to do.
fn grow_local_chain(&self) -> ValidationDbResult<bool>;

/// Promotes the first remote chain block to canonical without validation.
///
/// Used by debug-trace-server where blocks are trusted from upstream RPC.
/// Returns `true` if a block was promoted, `false` if remote chain is empty.
fn promote_remote_to_canonical(&self) -> ValidationDbResult<bool>;

/// Rolls back both canonical and remote chains to the given block number.
fn rollback_chain(&self, to_block: BlockNumber) -> ValidationDbResult<()>;

/// Looks up a block hash by number in canonical chain first, then remote chain.
fn get_block_hash(&self, block_number: BlockNumber) -> ValidationDbResult<Option<BlockHash>>;

/// Returns the earliest (lowest) block in the canonical chain.
fn get_earliest_local_block(&self) -> ValidationDbResult<Option<(BlockNumber, BlockHash)>>;

/// Resets the chain to start from a trusted anchor block, clearing all chain state.
fn reset_anchor_block(
&self,
block_number: BlockNumber,
block_hash: BlockHash,
post_state_root: B256,
post_withdrawals_root: B256,
) -> ValidationDbResult<()>;

/// Returns the stored anchor block, or `None` if not set.
fn get_anchor_block(&self) -> ValidationDbResult<Option<(BlockNumber, BlockHash)>>;

/// Persists the genesis configuration.
fn store_genesis(&self, genesis: &Genesis) -> ValidationDbResult<()>;

/// Loads the genesis configuration, or `None` if not stored yet.
fn load_genesis(&self) -> ValidationDbResult<Option<Genesis>>;

/// Removes chain data older than the given block number.
///
/// Returns the number of blocks pruned.
fn prune_history(&self, before_block: BlockNumber) -> ValidationDbResult<u64>;
}

/// Validation task lifecycle: creation, claiming, and recovery.
///
/// Manages the queue of blocks pending validation, including both the full-validation
/// path (stateless-validator) and the data-only path (debug-trace-server).
pub trait TaskQueue {
/// Queues blocks with full witness data for validation workers.
fn add_validation_tasks(
&self,
tasks: &[(Block<Transaction>, SaltWitness, MptWitness)],
) -> ValidationDbResult<()>;

/// Stores block data and light witnesses without creating validation tasks.
///
/// Used by debug-trace-server where blocks are served for tracing, not validated.
fn store_block_data(
&self,
tasks: &[(Block<Transaction>, LightWitness)],
) -> ValidationDbResult<()>;

/// Atomically claims the next pending validation task.
///
/// Returns `None` when no tasks are available.
fn get_next_task(
&self,
) -> ValidationDbResult<Option<(Block<Transaction>, SaltWitness, MptWitness)>>;

/// Moves interrupted (in-progress) tasks back to the pending queue.
fn recover_interrupted_tasks(&self) -> ValidationDbResult<()>;
}

/// Block and witness data retrieval, plus contract bytecode cache.
pub trait BlockDataStore {
/// Retrieves block data and light witness for a given block hash.
fn get_block_and_witness(
&self,
block_hash: BlockHash,
) -> ValidationDbResult<(Block<Transaction>, LightWitness)>;

/// Retrieves cached contract bytecodes.
///
/// Returns `(found, missing)` where `found` maps code hashes to bytecodes and
/// `missing` lists code hashes not present in the cache.
fn get_contract_codes(
&self,
code_hashes: &[B256],
) -> ValidationDbResult<(HashMap<B256, Bytecode>, Vec<B256>)>;

/// Stores contract bytecodes in the cache.
fn add_contract_codes(&self, bytecodes: &[(B256, Bytecode)]) -> ValidationDbResult<()>;
}

/// Validation result storage and lookup.
pub trait ValidationResultStore {
/// Records a completed validation and removes the task from the in-progress queue.
fn complete_validation(&self, result: ValidationResult) -> ValidationDbResult<()>;

/// Retrieves the validation result for a block, or `None` if not yet validated.
fn get_validation_result(
&self,
block_hash: BlockHash,
) -> ValidationDbResult<Option<ValidationResult>>;
}
Loading
Loading