From daa53d4181bd2269b1b72ade7de1b3c4c77e67aa Mon Sep 17 00:00:00 2001 From: "liquan.eth" Date: Tue, 31 Mar 2026 14:19:24 +0800 Subject: [PATCH] add storage straits --- Cargo.lock | 1 + crates/stateless-core/Cargo.toml | 3 + crates/stateless-core/src/chain_sync.rs | 29 +- crates/stateless-core/src/lib.rs | 2 + crates/stateless-core/src/storage_traits.rs | 141 ++++++++ crates/stateless-core/src/validator_db.rs | 357 ++++++++++++++++++++ 6 files changed, 523 insertions(+), 10 deletions(-) create mode 100644 crates/stateless-core/src/storage_traits.rs diff --git a/Cargo.lock b/Cargo.lock index 94084b7e..61d6bb9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5952,6 +5952,7 @@ dependencies = [ "salt", "serde", "serde_json", + "tempfile", "thiserror 2.0.17", "tokio", "tracing", diff --git a/crates/stateless-core/Cargo.toml b/crates/stateless-core/Cargo.toml index 0b8f394c..c689c9c5 100644 --- a/crates/stateless-core/Cargo.toml +++ b/crates/stateless-core/Cargo.toml @@ -61,5 +61,8 @@ tokio.workspace = true tracing.workspace = true zstd.workspace = true +[dev-dependencies] +tempfile = "3" + [features] test-bucket-resize = ["salt/test-bucket-resize"] diff --git a/crates/stateless-core/src/chain_sync.rs b/crates/stateless-core/src/chain_sync.rs index b3ac0d97..bf12d55f 100644 --- a/crates/stateless-core/src/chain_sync.rs +++ b/crates/stateless-core/src/chain_sync.rs @@ -17,7 +17,11 @@ use op_alloy_rpc_types::Transaction; use salt::SaltWitness; use tracing::{Instrument, debug, error, info, info_span, instrument, warn}; -use crate::{RpcClient, ValidatorDB, withdrawals::MptWitness}; +use crate::{ + RpcClient, + storage_traits::{ChainState, TaskQueue}, + withdrawals::MptWitness, +}; /// Default metrics port for Prometheus endpoint. pub const DEFAULT_METRICS_PORT: u16 = 9090; @@ -111,12 +115,15 @@ pub struct FetchResult { /// * `Ok(FetchResult)` - Result containing fetch statistics /// * `Err(eyre::Error)` - On critical failures #[instrument(skip_all, name = "chain_sync")] -pub async fn fetch_blocks_batch( +pub async fn fetch_blocks_batch( client: &RpcClient, - db: &ValidatorDB, + db: &DB, config: &ChainSyncConfig, block_error_counts: &mut HashMap, -) -> Result { +) -> Result +where + DB: ChainState + TaskQueue, +{ let batch_start = Instant::now(); // Calculate how far behind our local chain is from remote @@ -370,7 +377,8 @@ pub async fn fetch_blocks_batch( let add_tasks_elapsed = db_start.elapsed(); let grow_chain_start = Instant::now(); - db.grow_remote_chain(tasks.iter().map(|(block, _, _)| &block.header))?; + let headers: Vec<_> = tasks.iter().map(|(block, _, _)| block.header.clone()).collect(); + db.grow_remote_chain(&headers)?; let grow_chain_elapsed = grow_chain_start.elapsed(); info!( @@ -436,14 +444,15 @@ pub async fn fetch_blocks_batch( /// /// # Returns /// * Never returns under normal operation - runs indefinitely until externally terminated -pub async fn remote_chain_tracker( +pub async fn remote_chain_tracker( client: Arc, - db: Arc, + db: Arc, config: Arc, on_reorg: Option, on_fetch: Option, ) -> Result<()> where + DB: ChainState + TaskQueue + Send + Sync + 'static, F: Fn(&[B256]) + Send + Sync, G: Fn(&FetchResult) + Send + Sync, { @@ -453,7 +462,7 @@ where let mut block_error_counts: HashMap = HashMap::new(); loop { - match fetch_blocks_batch(&client, &db, &config, &mut block_error_counts).await { + match fetch_blocks_batch(&client, &*db, &config, &mut block_error_counts).await { Ok(result) => { // Call reorg callback if a reorg occurred if !result.reverted_hashes.is_empty() && @@ -489,9 +498,9 @@ where /// The algorithm first exponentially expands backward to find a known-matching block, /// then binary searches in that range. #[instrument(skip(client, db), name = "find_divergence")] -async fn find_divergence_point( +async fn find_divergence_point( client: &RpcClient, - db: &ValidatorDB, + db: &DB, mismatch_block: u64, ) -> Result { let earliest_local = db.get_earliest_local_block()?.expect("Local chain cannot be empty"); diff --git a/crates/stateless-core/src/lib.rs b/crates/stateless-core/src/lib.rs index 136ca9b9..7206d6c0 100644 --- a/crates/stateless-core/src/lib.rs +++ b/crates/stateless-core/src/lib.rs @@ -25,6 +25,8 @@ pub use chain_sync::{ pub use light_witness::{LightWitness, LightWitnessExecutor}; pub mod database; pub use database::{WitnessDatabase, WitnessDatabaseError, WitnessExternalEnv}; +pub mod storage_traits; +pub use storage_traits::{BlockDataStore, ChainState, TaskQueue, ValidationResultStore}; pub mod validator_db; pub use validator_db::{ValidationDbError, ValidationDbResult, ValidatorDB}; pub mod data_types; diff --git a/crates/stateless-core/src/storage_traits.rs b/crates/stateless-core/src/storage_traits.rs new file mode 100644 index 00000000..77dda42c --- /dev/null +++ b/crates/stateless-core/src/storage_traits.rs @@ -0,0 +1,141 @@ +//! Storage trait abstractions for `ValidatorDB`. +//! +//! These traits decouple consumers from the concrete `redb`-backed `ValidatorDB` implementation, +//! enabling alternative backends (e.g., in-memory for testing) without changing consumer code. + +use std::collections::HashMap; + +use alloy_genesis::Genesis; +use alloy_primitives::{B256, BlockHash, BlockNumber}; +use alloy_rpc_types_eth::{Block, Header}; +use op_alloy_rpc_types::Transaction; +use revm::state::Bytecode; +use salt::SaltWitness; + +use crate::{ + executor::ValidationResult, light_witness::LightWitness, validator_db::ValidationDbResult, + withdrawals::MptWitness, +}; + +/// Chain state management: tips, growth, rollback, genesis, anchor, and pruning. +/// +/// Encapsulates the full lifecycle of chain state from initialization (genesis/anchor) +/// through synchronization (remote/local chain growth) to maintenance (rollback/pruning). +pub trait ChainState { + /// Returns the highest block in the local canonical chain, or `None` if empty. + fn get_local_tip(&self) -> ValidationDbResult>; + + /// Returns the highest block in the remote (unvalidated) chain, or `None` if empty. + fn get_remote_tip(&self) -> ValidationDbResult>; + + /// Extends the remote chain with a consecutive sequence of unvalidated block headers. + /// + /// Each header's parent hash must match the current remote chain tip. + fn grow_remote_chain(&self, headers: &[Header]) -> ValidationDbResult<()>; + + /// Extends the canonical chain with the next validated block from the remote chain. + /// + /// Returns `true` if a block was advanced, `false` if no work to do. + fn grow_local_chain(&self) -> ValidationDbResult; + + /// Promotes the first remote chain block to canonical without validation. + /// + /// Used by debug-trace-server where blocks are trusted from upstream RPC. + /// Returns `true` if a block was promoted, `false` if remote chain is empty. + fn promote_remote_to_canonical(&self) -> ValidationDbResult; + + /// Rolls back both canonical and remote chains to the given block number. + fn rollback_chain(&self, to_block: BlockNumber) -> ValidationDbResult<()>; + + /// Looks up a block hash by number in canonical chain first, then remote chain. + fn get_block_hash(&self, block_number: BlockNumber) -> ValidationDbResult>; + + /// Returns the earliest (lowest) block in the canonical chain. + fn get_earliest_local_block(&self) -> ValidationDbResult>; + + /// Resets the chain to start from a trusted anchor block, clearing all chain state. + fn reset_anchor_block( + &self, + block_number: BlockNumber, + block_hash: BlockHash, + post_state_root: B256, + post_withdrawals_root: B256, + ) -> ValidationDbResult<()>; + + /// Returns the stored anchor block, or `None` if not set. + fn get_anchor_block(&self) -> ValidationDbResult>; + + /// Persists the genesis configuration. + fn store_genesis(&self, genesis: &Genesis) -> ValidationDbResult<()>; + + /// Loads the genesis configuration, or `None` if not stored yet. + fn load_genesis(&self) -> ValidationDbResult>; + + /// Removes chain data older than the given block number. + /// + /// Returns the number of blocks pruned. + fn prune_history(&self, before_block: BlockNumber) -> ValidationDbResult; +} + +/// Validation task lifecycle: creation, claiming, and recovery. +/// +/// Manages the queue of blocks pending validation, including both the full-validation +/// path (stateless-validator) and the data-only path (debug-trace-server). +pub trait TaskQueue { + /// Queues blocks with full witness data for validation workers. + fn add_validation_tasks( + &self, + tasks: &[(Block, SaltWitness, MptWitness)], + ) -> ValidationDbResult<()>; + + /// Stores block data and light witnesses without creating validation tasks. + /// + /// Used by debug-trace-server where blocks are served for tracing, not validated. + fn store_block_data( + &self, + tasks: &[(Block, LightWitness)], + ) -> ValidationDbResult<()>; + + /// Atomically claims the next pending validation task. + /// + /// Returns `None` when no tasks are available. + fn get_next_task( + &self, + ) -> ValidationDbResult, SaltWitness, MptWitness)>>; + + /// Moves interrupted (in-progress) tasks back to the pending queue. + fn recover_interrupted_tasks(&self) -> ValidationDbResult<()>; +} + +/// Block and witness data retrieval, plus contract bytecode cache. +pub trait BlockDataStore { + /// Retrieves block data and light witness for a given block hash. + fn get_block_and_witness( + &self, + block_hash: BlockHash, + ) -> ValidationDbResult<(Block, LightWitness)>; + + /// Retrieves cached contract bytecodes. + /// + /// Returns `(found, missing)` where `found` maps code hashes to bytecodes and + /// `missing` lists code hashes not present in the cache. + fn get_contract_codes( + &self, + code_hashes: &[B256], + ) -> ValidationDbResult<(HashMap, Vec)>; + + /// Stores contract bytecodes in the cache. + fn add_contract_codes(&self, bytecodes: &[(B256, Bytecode)]) -> ValidationDbResult<()>; +} + +/// Validation result storage and lookup. +pub trait ValidationResultStore { + /// Records a completed validation and removes the task from the in-progress queue. + fn complete_validation(&self, result: ValidationResult) -> ValidationDbResult<()>; + + /// Retrieves the validation result for a block, or `None` if not yet validated. + fn get_validation_result( + &self, + block_hash: BlockHash, + ) -> ValidationDbResult>; +} diff --git a/crates/stateless-core/src/validator_db.rs b/crates/stateless-core/src/validator_db.rs index dd5bdbc1..518e0de8 100644 --- a/crates/stateless-core/src/validator_db.rs +++ b/crates/stateless-core/src/validator_db.rs @@ -337,6 +337,7 @@ impl ValidatorDB { let _validation_results = write_txn.open_table(VALIDATION_RESULTS)?; let _block_records = write_txn.open_table(BLOCK_RECORDS)?; let _contracts = write_txn.open_table(CONTRACTS)?; + let _genesis_config = write_txn.open_table(GENESIS_CONFIG)?; let _anchor_block = write_txn.open_table(ANCHOR_BLOCK)?; } write_txn.commit()?; @@ -1326,3 +1327,359 @@ fn decode_block_from_slice(bytes: &[u8]) -> Block { serde_json::from_slice(bytes) .expect("deserialization of previously stored block data must succeed") } + +// --------------------------------------------------------------------------- +// Storage trait implementations +// --------------------------------------------------------------------------- + +use crate::storage_traits::{BlockDataStore, ChainState, TaskQueue, ValidationResultStore}; + +impl ChainState for ValidatorDB { + fn get_local_tip(&self) -> ValidationDbResult> { + self.get_local_tip() + } + + fn get_remote_tip(&self) -> ValidationDbResult> { + self.get_remote_tip() + } + + fn grow_remote_chain(&self, headers: &[Header]) -> ValidationDbResult<()> { + self.grow_remote_chain(headers) + } + + fn grow_local_chain(&self) -> ValidationDbResult { + self.grow_local_chain() + } + + fn promote_remote_to_canonical(&self) -> ValidationDbResult { + self.promote_remote_to_canonical() + } + + fn rollback_chain(&self, to_block: BlockNumber) -> ValidationDbResult<()> { + self.rollback_chain(to_block) + } + + fn get_block_hash(&self, block_number: BlockNumber) -> ValidationDbResult> { + self.get_block_hash(block_number) + } + + fn get_earliest_local_block(&self) -> ValidationDbResult> { + self.get_earliest_local_block() + } + + fn reset_anchor_block( + &self, + block_number: BlockNumber, + block_hash: BlockHash, + post_state_root: B256, + post_withdrawals_root: B256, + ) -> ValidationDbResult<()> { + self.reset_anchor_block(block_number, block_hash, post_state_root, post_withdrawals_root) + } + + fn get_anchor_block(&self) -> ValidationDbResult> { + self.get_anchor_block() + } + + fn store_genesis(&self, genesis: &Genesis) -> ValidationDbResult<()> { + self.store_genesis(genesis) + } + + fn load_genesis(&self) -> ValidationDbResult> { + self.load_genesis() + } + + fn prune_history(&self, before_block: BlockNumber) -> ValidationDbResult { + self.prune_history(before_block) + } +} + +impl TaskQueue for ValidatorDB { + fn add_validation_tasks( + &self, + tasks: &[(Block, SaltWitness, MptWitness)], + ) -> ValidationDbResult<()> { + self.add_validation_tasks(tasks) + } + + fn store_block_data( + &self, + tasks: &[(Block, LightWitness)], + ) -> ValidationDbResult<()> { + self.store_block_data(tasks) + } + + fn get_next_task( + &self, + ) -> ValidationDbResult, SaltWitness, MptWitness)>> { + self.get_next_task() + } + + fn recover_interrupted_tasks(&self) -> ValidationDbResult<()> { + self.recover_interrupted_tasks() + } +} + +impl BlockDataStore for ValidatorDB { + fn get_block_and_witness( + &self, + block_hash: BlockHash, + ) -> ValidationDbResult<(Block, LightWitness)> { + self.get_block_and_witness(block_hash) + } + + fn get_contract_codes( + &self, + code_hashes: &[B256], + ) -> ValidationDbResult<(HashMap, Vec)> { + self.get_contract_codes(code_hashes.iter().copied()) + } + + fn add_contract_codes(&self, bytecodes: &[(B256, Bytecode)]) -> ValidationDbResult<()> { + self.add_contract_codes(bytecodes) + } +} + +impl ValidationResultStore for ValidatorDB { + fn complete_validation(&self, result: ValidationResult) -> ValidationDbResult<()> { + self.complete_validation(result) + } + + fn get_validation_result( + &self, + block_hash: BlockHash, + ) -> ValidationDbResult> { + self.get_validation_result(block_hash) + } +} + +#[cfg(test)] +mod tests { + use alloy_primitives::B256; + use alloy_rpc_types_eth::Header; + use revm::state::Bytecode; + + use super::*; + + /// Creates a ValidatorDB backed by a temporary directory. + fn temp_db() -> ValidatorDB { + let dir = tempfile::tempdir().unwrap(); + let db = ValidatorDB::new(dir.path().join("test.redb")).unwrap(); + std::mem::forget(dir); // OS cleans up when process exits + db + } + + /// Builds a test header with the given number, hash, and parent hash. + fn make_header(number: u64, hash: B256, parent_hash: B256) -> Header { + Header { + hash, + inner: alloy_consensus::Header { + number, + parent_hash, + state_root: B256::with_last_byte(number as u8), + withdrawals_root: Some(B256::with_last_byte(number as u8 + 100)), + ..Default::default() + }, + ..Default::default() + } + } + + /// Deterministic hash for a block number (for tests only). + fn block_hash(n: u64) -> B256 { + B256::with_last_byte(n as u8) + } + + #[test] + fn chain_growth_and_promotion() { + let db = temp_db(); + + // Anchor at block 50 + let anchor_hash = block_hash(50); + db.reset_anchor_block(50, anchor_hash, B256::ZERO, B256::ZERO).unwrap(); + + // Build headers 51, 52 with proper parent linkage + let h51 = make_header(51, block_hash(51), anchor_hash); + let h52 = make_header(52, block_hash(52), block_hash(51)); + db.grow_remote_chain(&[h51, h52]).unwrap(); + + // Remote tip should be at 52 + let remote_tip = db.get_remote_tip().unwrap().unwrap(); + assert_eq!(remote_tip.0, 52); + + // Promote one block + assert!(db.promote_remote_to_canonical().unwrap()); + let local_tip = db.get_local_tip().unwrap().unwrap(); + assert_eq!(local_tip.0, 51); + + // Promote second block + assert!(db.promote_remote_to_canonical().unwrap()); + let local_tip = db.get_local_tip().unwrap().unwrap(); + assert_eq!(local_tip.0, 52); + + // No more to promote + assert!(!db.promote_remote_to_canonical().unwrap()); + } + + #[test] + fn rollback() { + let db = temp_db(); + + // Anchor at block 50 + let anchor_hash = block_hash(50); + db.reset_anchor_block(50, anchor_hash, B256::ZERO, B256::ZERO).unwrap(); + + // Grow remote chain to block 55 + let headers: Vec<_> = + (51..=55).map(|n| make_header(n, block_hash(n), block_hash(n - 1))).collect(); + db.grow_remote_chain(&headers).unwrap(); + + // Promote all to canonical + for _ in 0..5 { + assert!(db.promote_remote_to_canonical().unwrap()); + } + assert_eq!(db.get_local_tip().unwrap().unwrap().0, 55); + + // Rollback to block 52 + db.rollback_chain(52).unwrap(); + assert_eq!(db.get_local_tip().unwrap().unwrap().0, 52); + // Remote chain should also be rolled back + assert!(db.get_remote_tip().unwrap().is_none()); + } + + #[test] + fn genesis_round_trip() { + let db = temp_db(); + + // No genesis initially + assert!(db.load_genesis().unwrap().is_none()); + + // Store a minimal genesis + let genesis = Genesis::default(); + db.store_genesis(&genesis).unwrap(); + + // Load it back + let loaded = db.load_genesis().unwrap().unwrap(); + assert_eq!( + serde_json::to_string(&loaded).unwrap(), + serde_json::to_string(&genesis).unwrap() + ); + } + + #[test] + fn anchor_block_round_trip() { + let db = temp_db(); + + // No anchor initially + assert!(db.get_anchor_block().unwrap().is_none()); + + let hash = block_hash(100); + let state_root = B256::with_last_byte(0xAA); + let withdrawals_root = B256::with_last_byte(0xBB); + db.reset_anchor_block(100, hash, state_root, withdrawals_root).unwrap(); + + // Anchor block stored + let (num, stored_hash) = db.get_anchor_block().unwrap().unwrap(); + assert_eq!(num, 100); + assert_eq!(stored_hash, hash); + + // Local tip matches anchor + let (tip_num, tip_hash) = db.get_local_tip().unwrap().unwrap(); + assert_eq!(tip_num, 100); + assert_eq!(tip_hash, hash); + } + + #[test] + fn contract_code_cache() { + let db = temp_db(); + + // Create two test bytecodes + let code1 = Bytecode::new_raw(alloy_primitives::Bytes::from_static(&[0x60, 0x00])); + let hash1 = code1.hash_slow(); + let code2 = Bytecode::new_raw(alloy_primitives::Bytes::from_static(&[0x60, 0x01])); + let hash2 = code2.hash_slow(); + let unknown_hash = B256::with_last_byte(0xFF); + + // Store first code + db.add_contract_codes(&[(hash1, code1.clone())]).unwrap(); + + // Query: hash1 found, hash2 and unknown missing + let (found, missing) = db.get_contract_codes([hash1, hash2, unknown_hash]).unwrap(); + assert_eq!(found.len(), 1); + assert!(found.contains_key(&hash1)); + assert_eq!(missing.len(), 2); + + // Store second code + db.add_contract_codes(&[(hash2, code2)]).unwrap(); + + // Now both found, only unknown missing + let (found, missing) = db.get_contract_codes([hash1, hash2, unknown_hash]).unwrap(); + assert_eq!(found.len(), 2); + assert_eq!(missing, vec![unknown_hash]); + } + + #[test] + fn get_block_hash_canonical_and_remote() { + let db = temp_db(); + + let anchor_hash = block_hash(10); + db.reset_anchor_block(10, anchor_hash, B256::ZERO, B256::ZERO).unwrap(); + + // Block 10 is in canonical chain + assert_eq!(db.get_block_hash(10).unwrap(), Some(anchor_hash)); + + // Build remote blocks 11, 12 + let h11 = make_header(11, block_hash(11), anchor_hash); + let h12 = make_header(12, block_hash(12), block_hash(11)); + db.grow_remote_chain(&[h11, h12]).unwrap(); + + // Block 11 found in remote chain + assert_eq!(db.get_block_hash(11).unwrap(), Some(block_hash(11))); + // Block 99 not found anywhere + assert!(db.get_block_hash(99).unwrap().is_none()); + } + + #[test] + fn earliest_local_block() { + let db = temp_db(); + + // Empty chain + assert!(db.get_earliest_local_block().unwrap().is_none()); + + let anchor_hash = block_hash(50); + db.reset_anchor_block(50, anchor_hash, B256::ZERO, B256::ZERO).unwrap(); + + let (num, _) = db.get_earliest_local_block().unwrap().unwrap(); + assert_eq!(num, 50); + } + + #[test] + fn prune_history() { + let db = temp_db(); + + // Anchor at block 100 + let anchor_hash = block_hash(100); + db.reset_anchor_block(100, anchor_hash, B256::ZERO, B256::ZERO).unwrap(); + + // Grow and promote 10 blocks (101..=110) + let headers: Vec<_> = + (101..=110).map(|n| make_header(n, block_hash(n), block_hash(n - 1))).collect(); + db.grow_remote_chain(&headers).unwrap(); + for _ in 0..10 { + db.promote_remote_to_canonical().unwrap(); + } + assert_eq!(db.get_local_tip().unwrap().unwrap().0, 110); + + // Verify earliest block before pruning + let (earliest_before, _) = db.get_earliest_local_block().unwrap().unwrap(); + assert_eq!(earliest_before, 100); + + // Prune blocks before 105. + // Note: pruned count reflects BLOCK_RECORDS entries (populated by add_validation_tasks, + // not promote_remote_to_canonical). The canonical chain orphan cleanup still runs. + db.prune_history(105).unwrap(); + + // Earliest block should now be >= 105 (orphaned canonical entries removed) + let (earliest, _) = db.get_earliest_local_block().unwrap().unwrap(); + assert!(earliest >= 105, "Expected earliest >= 105, got {earliest}"); + } +}