diff --git a/crates/chain-state/src/deferred_trie.rs b/crates/chain-state/src/deferred_trie.rs index 6e758a122..f0cdd909b 100644 --- a/crates/chain-state/src/deferred_trie.rs +++ b/crates/chain-state/src/deferred_trie.rs @@ -146,9 +146,8 @@ impl DeferredTrieData { /// /// # Process /// 1. Sort the current block's hashed state and trie updates - /// 2. Reuse parent's cached overlay if available (O(1) - the common case) - /// 3. Otherwise, rebuild overlay from ancestors (rare fallback) - /// 4. Extend the overlay with this block's sorted data + /// 2. Rebuild overlay from ancestors' per-block data + /// 3. Extend the overlay with this block's sorted data /// /// Used by both the async background task and the synchronous fallback path. /// @@ -172,46 +171,25 @@ impl DeferredTrieData { Err(arc) => arc.clone_into_sorted(), }; - // Reuse parent's overlay if available and anchors match. - // We can only reuse the parent's overlay if it was built on top of the same - // persisted anchor. If the anchor has changed (e.g., due to persistence), - // the parent's overlay is relative to an old state and cannot be used. - let overlay = if let Some(parent) = ancestors.last() { - let parent_data = parent.wait_cloned(); - - match &parent_data.anchored_trie_input { - // Case 1: Parent has cached overlay AND anchors match. - Some(AnchoredTrieInput { anchor_hash: parent_anchor, trie_input }) - if *parent_anchor == anchor_hash => - { - // O(1): Reuse parent's overlay, extend with current block's data. - let mut overlay = TrieInputSorted::new( - Arc::clone(&trie_input.nodes), - Arc::clone(&trie_input.state), - Default::default(), // prefix_sets are per-block, not cumulative - ); - // Only trigger COW clone if there's actually data to add. - if !sorted_hashed_state.is_empty() { - Arc::make_mut(&mut overlay.state).extend_ref_and_sort(&sorted_hashed_state); - } - if !sorted_trie_updates.is_empty() { - Arc::make_mut(&mut overlay.nodes).extend_ref_and_sort(&sorted_trie_updates); - } - overlay - } - // Case 2: Parent exists but anchor mismatch or no cached overlay. - // We must rebuild from the ancestors list (which only contains unpersisted blocks). - _ => Self::merge_ancestors_into_overlay( - ancestors, - &sorted_hashed_state, - &sorted_trie_updates, - ), - } - } else { - // Case 3: No in-memory ancestors (first block after persisted anchor). - // Build overlay with just this block's data. - Self::merge_ancestors_into_overlay(&[], &sorted_hashed_state, &sorted_trie_updates) - }; + // Always rebuild the overlay from ancestors' per-block data instead of reusing + // the parent's cached cumulative overlay. + // + // The previous approach cloned the parent's overlay via Arc and then called + // Arc::make_mut to extend it. Because the parent's cached ComputedTrieData also + // holds a reference to the same Arc, strong_count is always >= 2, forcing + // Arc::make_mut to deep-copy the entire cumulative overlay on every block. + // For high-throughput chains with large state (e.g., BSC with 3-second blocks), + // this causes uncontrollable memory growth and OOM. + // + // The rebuild path starts with a fresh TrieInputSorted (strong_count == 1) and + // extends it with each ancestor's per-block hashed_state and trie_updates. + // This is O(sum of ancestors' per-block state) which is bounded by + // persistence_threshold * per_block_state_size, and avoids deep copies entirely. + let overlay = Self::merge_ancestors_into_overlay( + ancestors, + &sorted_hashed_state, + &sorted_trie_updates, + ); ComputedTrieData::with_trie_input( Arc::new(sorted_hashed_state), @@ -223,13 +201,12 @@ impl DeferredTrieData { /// Merge all ancestors and current block's data into a single overlay. /// - /// This is a rare fallback path, only used when no ancestor has a cached - /// `anchored_trie_input` (e.g., blocks created via alternative constructors). - /// In normal operation, the parent always has a cached overlay and this - /// function is never called. + /// Builds a fresh [`TrieInputSorted`] by iterating ancestors oldest -> newest, + /// extending with each ancestor's per-block `hashed_state` and `trie_updates`, + /// then extending with the current block's sorted data (so later state wins). /// - /// Iterates ancestors oldest -> newest, then extends with current block's data, - /// so later state takes precedence. + /// Starts from `TrieInputSorted::default()` whose inner `Arc`s have + /// `strong_count == 1`, so `Arc::make_mut` never triggers a deep copy. fn merge_ancestors_into_overlay( ancestors: &[Self], sorted_hashed_state: &HashedPostStateSorted, diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index d867e91ca..a7e1de823 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -99,9 +99,15 @@ where .sync_metrics_tx .send(MetricEvent::SyncHeight { height: block_number }); + // BSC: skip inline pruning to prevent persistence service stall. + // The pruner can hang on large BSC mainnet data, blocking all + // subsequent SaveBlocks and causing unbounded memory growth (OOM). if self.pruner.is_pruning_needed(block_number) { - // We log `PrunerOutput` inside the `Pruner` - let _ = self.prune_before(block_number)?; + debug!( + target: "engine::persistence", + block_number, + "Pruning needed but skipped to avoid persistence stall" + ); } } } diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 2b0f35fe8..b62041703 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -1287,6 +1287,19 @@ where /// This checks if we need to remove blocks (disk reorg) or save new blocks to disk. /// Persistence completion is handled separately via the `wait_for_event` method. fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> { + let block_count = self.state.tree_state.block_count(); + if block_count > 64 { + warn!( + target: "engine::tree", + block_count, + persistence_in_progress = self.persistence_state.in_progress(), + canonical_head = self.state.tree_state.canonical_block_number(), + last_persisted = self.persistence_state.last_persisted_block.number, + backfill_idle = self.backfill_sync_state.is_idle(), + "In-memory block count exceeds 64, possible persistence stall" + ); + } + if !self.persistence_state.in_progress() { if let Some(new_tip_num) = self.find_disk_reorg()? { self.remove_blocks(new_tip_num) @@ -1366,7 +1379,9 @@ where last_persisted_hash_num: Option, start_time: Instant, ) -> Result<(), AdvancePersistenceError> { - self.metrics.engine.persistence_duration.record(start_time.elapsed()); + let elapsed = start_time.elapsed(); + self.metrics.engine.persistence_duration.record(elapsed); + let block_count_before = self.state.tree_state.block_count(); let Some(BlockNumHash { hash: last_persisted_block_hash, @@ -1378,7 +1393,15 @@ where return Ok(()) }; - debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, elapsed=?start_time.elapsed(), "Finished persisting, calling finish"); + warn!( + target: "engine::tree", + ?last_persisted_block_hash, + last_persisted_block_number, + ?elapsed, + block_count_before, + canonical_head = self.state.tree_state.canonical_block_number(), + "Persistence completed, starting cleanup" + ); self.persistence_state.finish(last_persisted_block_hash, last_persisted_block_number); // Evict trie changesets for blocks below the finalized block, but keep at least 64 blocks @@ -1396,6 +1419,16 @@ where } self.on_new_persisted_block()?; + + let block_count_after = self.state.tree_state.block_count(); + warn!( + target: "engine::tree", + block_count_after, + blocks_removed = block_count_before.saturating_sub(block_count_after), + last_persisted = self.persistence_state.last_persisted_block.number, + "Cleanup after persistence completed" + ); + Ok(()) } diff --git a/crates/rpc/rpc-eth-api/src/core.rs b/crates/rpc/rpc-eth-api/src/core.rs index c2cc42880..4939e53f5 100644 --- a/crates/rpc/rpc-eth-api/src/core.rs +++ b/crates/rpc/rpc-eth-api/src/core.rs @@ -245,22 +245,26 @@ pub trait EthApi< /// Returns the finalized block header. /// - /// The `verified_validator_num` parameter is provided for BSC compatibility but is not used - /// in standard Ethereum. The finalized block is determined by the Beacon Chain consensus - /// (Casper FFG) and requires 2/3+ validator attestations. + /// BSC compatibility: + /// - `-1` uses ceil(validators/2) + /// - `-2` uses ceil(validators*2/3) + /// - `-3` uses all validators + /// - positive values override the threshold directly. #[method(name = "getFinalizedHeader")] - async fn finalized_header(&self, verified_validator_num: u64) -> RpcResult>; + async fn finalized_header(&self, verified_validator_num: i64) -> RpcResult>; /// Returns the finalized block. /// - /// The `verified_validator_num` parameter is provided for BSC compatibility but is not used - /// in standard Ethereum. The finalized block is determined by the Beacon Chain consensus - /// (Casper FFG) and requires 2/3+ validator attestations. + /// BSC compatibility: + /// - `-1` uses ceil(validators/2) + /// - `-2` uses ceil(validators*2/3) + /// - `-3` uses all validators + /// - positive values override the threshold directly. /// /// If `full` is true, the block object will contain all transaction objects, /// otherwise it will only contain the transaction hashes. #[method(name = "getFinalizedBlock")] - async fn finalized_block(&self, verified_validator_num: u64, full: bool) -> RpcResult>; + async fn finalized_block(&self, verified_validator_num: i64, full: bool) -> RpcResult>; /// `eth_simulateV1` executes an arbitrary number of transactions on top of the requested state. /// The transactions are packed into individual blocks. Overrides can be provided. @@ -752,13 +756,13 @@ where } /// Handler for: `eth_getFinalizedHeader` - async fn finalized_header(&self, verified_validator_num: u64) -> RpcResult>> { + async fn finalized_header(&self, verified_validator_num: i64) -> RpcResult>> { trace!(target: "rpc::eth", verified_validator_num, "Serving eth_getFinalizedHeader"); Ok(EthBlocks::rpc_finalized_header(self, verified_validator_num).await?) } /// Handler for: `eth_getFinalizedBlock` - async fn finalized_block(&self, verified_validator_num: u64, full: bool) -> RpcResult>> { + async fn finalized_block(&self, verified_validator_num: i64, full: bool) -> RpcResult>> { trace!(target: "rpc::eth", verified_validator_num, ?full, "Serving eth_getFinalizedBlock"); Ok(EthBlocks::rpc_finalized_block(self, verified_validator_num, full).await?) } @@ -974,4 +978,3 @@ where Ok(EthState::get_account_info(self, address, block).await?) } } - diff --git a/crates/rpc/rpc-eth-api/src/helpers/block.rs b/crates/rpc/rpc-eth-api/src/helpers/block.rs index 505bbd04a..29d7d4c71 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/block.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/block.rs @@ -2,8 +2,8 @@ use super::{LoadPendingBlock, LoadReceipt, SpawnBlocking}; use crate::{ - node::RpcNodeCoreExt, EthApiTypes, FromEthApiError, FullEthApiTypes, RpcBlock, RpcNodeCore, - RpcReceipt, + node::RpcNodeCoreExt, EthApiTypes, FromEthApiError, FullEthApiTypes, RpcBlock, + RpcNodeCore, RpcReceipt, }; use alloy_consensus::{transaction::TxHashRef, TxReceipt}; use alloy_eips::{BlockId, BlockNumberOrTag}; @@ -14,10 +14,13 @@ use futures::Future; use reth_node_api::BlockBody; use reth_primitives_traits::{AlloyBlockHeader, RecoveredBlock, SealedHeader, TransactionMeta}; use reth_rpc_convert::{transaction::ConvertReceiptInput, RpcConvert, RpcHeader}; +use reth_rpc_eth_types::EthApiError; use reth_storage_api::{ - BlockIdReader, BlockReader, HeaderProvider, ProviderHeader, ProviderReceipt, ProviderTx, + BlockIdReader, BlockReader, BlockReaderIdExt, HeaderProvider, ProviderHeader, ProviderReceipt, + ProviderTx, }; use reth_transaction_pool::{PoolTransaction, TransactionPool}; +use std::collections::HashSet; use std::sync::Arc; /// Result type of the fetched block receipts. @@ -31,6 +34,44 @@ pub type BlockAndReceiptsResult = Result< ::Error, >; +const MAX_VALIDATOR_LOOKBACK: usize = 1000; + +fn resolved_validators_threshold( + verified_validator_num: i64, + active_validator_count: Option, +) -> Result { + let missing_validator_count = || -> EthApiError { + EthApiError::InvalidParams(format!( + "Unable to derive validator-count from request value {verified_validator_num} without chain validator set" + )) + }; + + match verified_validator_num { + -1 | -2 | -3 => { + let active_validator_count = + active_validator_count.ok_or_else(|| missing_validator_count())?; + match verified_validator_num { + -1 => Ok((active_validator_count + 1) / 2), + -2 => Ok((active_validator_count * 2 + 2) / 3), + _ => Ok(active_validator_count), + } + } + value if value < 1 => Err(EthApiError::InvalidParams(format!( + "{value} neither within the range [1,{}] nor the range [-3,-1]", + active_validator_count.unwrap_or(0) + ))), + value if active_validator_count + .is_some_and(|validator_count| value > validator_count as i64) => + { + Err(EthApiError::InvalidParams(format!( + "{value} neither within the range [1,{}] nor the range [-3,-1]", + active_validator_count.unwrap_or(0) + ))) + } + value => Ok(value as usize), + } +} + /// Block related functions for the [`EthApiServer`](crate::EthApiServer) trait in the /// `eth_` namespace. pub trait EthBlocks: LoadBlock> { @@ -115,41 +156,134 @@ pub trait EthBlocks: LoadBlock=1`: explicit validator threshold. fn rpc_finalized_header( &self, - _verified_validator_num: u64, + verified_validator_num: i64, ) -> impl Future>, Self::Error>> + Send where Self: FullEthApiTypes, { async move { - // Simply delegate to rpc_block_header with Finalized tag - self.rpc_block_header(BlockNumberOrTag::Finalized.into()).await + let Some(finalized_block_number) = + self.finalized_block_number(verified_validator_num).await? + else { + return Ok(None); + }; + + self.rpc_block_header(BlockNumberOrTag::Number(finalized_block_number).into()).await } } /// Returns the finalized block. /// - /// The `verified_validator_num` parameter is provided for BSC compatibility but is ignored - /// in the implementation. In standard Ethereum, the finalized block is determined by the - /// Beacon Chain consensus (Casper FFG). + /// BSC compatibility: + /// - `verified_validator_num` is required to determine the probabilistic finalized height. + /// Accepted values are: + /// - `-1`: ceil(number_of_validators / 2) + /// - `-2`: ceil(number_of_validators * 2 / 3) + /// - `-3`: number_of_validators + /// - or `>=1`: explicit validator threshold. /// /// If `full` is true, the block object will contain all transaction objects, otherwise it will /// only contain the transaction hashes. fn rpc_finalized_block( &self, - _verified_validator_num: u64, + verified_validator_num: i64, full: bool, ) -> impl Future>, Self::Error>> + Send where Self: FullEthApiTypes, { async move { - // Simply delegate to rpc_block with Finalized tag - self.rpc_block(BlockNumberOrTag::Finalized.into(), full).await + let Some(finalized_block_number) = + self.finalized_block_number(verified_validator_num).await? + else { + return Ok(None); + }; + self.rpc_block(BlockNumberOrTag::Number(finalized_block_number).into(), full).await + } + } + + /// Returns the best-effort finalized block number according to `verified_validator_num`. + /// + /// BSC compatibility: + /// - `verified_validator_num` is required to determine the probabilistic finalized height. + /// Accepted values are: + /// - `-1`: ceil(number_of_validators / 2) + /// - `-2`: ceil(number_of_validators * 2 / 3) + /// - `-3`: number_of_validators + /// - or `>=1`: explicit validator threshold. + fn finalized_block_number( + &self, + verified_validator_num: i64, + ) -> impl Future, Self::Error>> + Send + where + Self: FullEthApiTypes, + { + async move { + let latest_header = self + .provider() + .sealed_header_by_id(BlockNumberOrTag::Latest.into()) + .map_err(Self::Error::from_eth_err)? + .ok_or_else(|| { + Self::Error::from_eth_err(EthApiError::HeaderNotFound( + BlockNumberOrTag::Latest.into(), + )) + })?; + + let fast_finalized_header = self + .provider() + .sealed_header_by_id(BlockNumberOrTag::Finalized.into()) + .map_err(Self::Error::from_eth_err)? + .ok_or_else(|| { + Self::Error::from_eth_err(EthApiError::HeaderNotFound( + BlockNumberOrTag::Finalized.into(), + )) + })?; + + let lower_bound = fast_finalized_header.number().max(1); + let active_validator_count = self.current_validators_len(); + let threshold = resolved_validators_threshold(verified_validator_num, active_validator_count)?; + if threshold == 0 { + return Ok(Some(fast_finalized_header.number())); + } + + let mut cursor = latest_header; + let mut seen_signers = HashSet::with_capacity(threshold.max(1)); + let mut probabilistic_finalized = fast_finalized_header.number(); + for i in 0..=MAX_VALIDATOR_LOOKBACK { + seen_signers.insert(cursor.beneficiary()); + probabilistic_finalized = cursor.number(); + + if seen_signers.len() >= threshold { + break; + } + + let parent_hash = cursor.parent_hash(); + if cursor.number() <= lower_bound { + break; + } + + if i == MAX_VALIDATOR_LOOKBACK { + break; + } + cursor = self + .provider() + .sealed_header_by_hash(parent_hash) + .map_err(Self::Error::from_eth_err)? + .ok_or_else(|| { + Self::Error::from_eth_err(EthApiError::HeaderNotFound(parent_hash.into())) + })?; + } + + Ok(Some(std::cmp::max(fast_finalized_header.number(), probabilistic_finalized))) } } @@ -366,6 +500,170 @@ pub trait EthBlocks: LoadBlock, + ) -> Result { + if headers.is_empty() { + return Ok(fast_finalized_number); + } + + let threshold = resolved_validators_threshold(verified_validator_num, active_validator_count)?; + if threshold == 0 { + return Ok(fast_finalized_number); + } + + let mut seen = HashSet::with_capacity(threshold.max(1)); + let mut probabilistic_finalized = headers[0].0; + for (number, beneficiary) in headers { + probabilistic_finalized = *number; + seen.insert(*beneficiary); + if seen.len() >= threshold { + break; + } + } + + Ok(std::cmp::max(fast_finalized_number, probabilistic_finalized)) + } + + #[test] + fn resolves_validator_threshold_special_cases() { + let validator_count = 9usize; + assert_eq!(resolved_validators_threshold(-1, Some(validator_count)).unwrap(), 5); + assert_eq!(resolved_validators_threshold(-2, Some(validator_count)).unwrap(), 6); + assert_eq!(resolved_validators_threshold(-3, Some(validator_count)).unwrap(), 9); + assert_eq!(resolved_validators_threshold(4, Some(validator_count)).unwrap(), 4); + assert!(resolved_validators_threshold(10, Some(validator_count)).is_err()); + assert!(resolved_validators_threshold(-4, Some(validator_count)).is_err()); + } + + fn header(number: u64, beneficiary: u8) -> (u64, Address) { + (number, Address::repeat_byte(beneficiary)) + } + + #[test] + fn finalized_block_number_uses_ceil_half_for_negative_one() { + let headers = vec![ + header(12, 0x11), + header(11, 0x22), + header(10, 0x33), + header(9, 0x11), + ]; + + // 3 validators => threshold ceil(3/2) = 2, satisfied at header #11. + assert_eq!(resolved_finalized_block_number(8, &headers, -1, Some(3)).unwrap(), 11); + } + + #[test] + fn finalized_block_number_uses_explicit_threshold() { + let headers = vec![ + header(12, 0x11), + header(11, 0x11), + header(10, 0x22), + header(9, 0x33), + header(8, 0x44), + ]; + + // 4 validators and explicit threshold 3, reached at block 9. + assert_eq!(resolved_finalized_block_number(7, &headers, 3, Some(4)).unwrap(), 9); + } + + #[test] + fn finalized_block_number_prefers_fast_finalized_if_newer() { + let headers = vec![ + header(12, 0x11), + header(11, 0x22), + header(10, 0x33), + ]; + // threshold reached at 12 but fast-finalized is higher. + assert_eq!(resolved_finalized_block_number(20, &headers, -1, Some(3)).unwrap(), 20); + } + + #[test] + fn finalized_block_number_prefers_probabilistic_if_newer_than_fast_finalized() { + let headers = vec![ + header(16, 0x11), + header(15, 0x22), + header(14, 0x33), + header(13, 0x11), + ]; + + // 3 validators, threshold ceil(3/2) = 2, reached at block 15. + assert_eq!(resolved_finalized_block_number(10, &headers, -1, Some(3)).unwrap(), 15); + } + + #[test] + fn finalized_block_number_negative_two_and_three_thresholds() { + let headers = vec![ + header(12, 0x11), + header(11, 0x22), + header(10, 0x33), + header(9, 0x44), + ]; + + // ceil(4*2/3) = 3 -> reached at block 10. + assert_eq!(resolved_finalized_block_number(0, &headers, -2, Some(4)).unwrap(), 10); + + // 4 validators, threshold 4 -> reached at block 9. + assert_eq!(resolved_finalized_block_number(0, &headers, -3, Some(4)).unwrap(), 9); + } + + #[test] + fn finalized_block_number_empty_headers_uses_fast_finalized_for_negative_values() { + let headers = Vec::new(); + assert_eq!(resolved_finalized_block_number(42, &headers, -1, Some(4)).unwrap(), 42); + } + + #[test] + fn finalized_block_number_empty_headers_uses_fast_finalized_for_positive_validator_values() { + let headers = Vec::new(); + assert_eq!(resolved_finalized_block_number(42, &headers, 5, Some(3)).unwrap(), 42); + } + + #[test] + fn finalized_block_number_rejects_invalid_threshold() { + let headers = vec![header(1, 0x11)]; + assert!(resolved_finalized_block_number(1, &headers, 0, Some(1)).is_err()); + assert!(resolved_finalized_block_number(1, &headers, -4, Some(1)).is_err()); + } + + #[test] + fn finalized_block_number_uses_active_validator_count_for_negative_values() { + let headers = vec![ + header(12, 0x11), + header(11, 0x22), + header(10, 0x33), + header(9, 0x44), + ]; + + // With active set size 20, -2 => ceil(20*2/3) = 14. Only 4 unique signers seen, so + // no probabilistic finalization should happen beyond fast-finalized height. + assert_eq!( + resolved_finalized_block_number(8, &headers, -2, Some(20)).unwrap(), + 8 + ); + } + + #[test] + fn finalized_block_number_requires_validator_count_for_negative_values() { + let headers = vec![header(3, 0x11), header(2, 0x22), header(1, 0x33)]; + + assert!( + resolved_finalized_block_number(1, &headers, -1, None) + .unwrap_err() + .to_string() + .contains("Unable to derive validator-count") + ); + } +} + /// Loads a block from database. /// /// Behaviour shared by several `eth_` RPC methods, not exclusive to `eth_` blocks RPC methods. diff --git a/crates/rpc/rpc-eth-api/src/node.rs b/crates/rpc/rpc-eth-api/src/node.rs index bde95b9c5..74ca9af3d 100644 --- a/crates/rpc/rpc-eth-api/src/node.rs +++ b/crates/rpc/rpc-eth-api/src/node.rs @@ -98,6 +98,14 @@ where pub trait RpcNodeCoreExt: RpcNodeCore { /// Returns handle to RPC cache service. fn cache(&self) -> &EthStateCache; + + /// Returns the current active validator count for the latest snapshot. + /// + /// Default implementation returns `None` to preserve behavior for non-parlia chains. + /// BSC-compatible chains should override this to provide an active validator count. + fn current_validators_len(&self) -> Option { + None + } } /// An adapter that allows to construct [`RpcNodeCore`] from components. diff --git a/crates/rpc/rpc/src/eth/builder.rs b/crates/rpc/rpc/src/eth/builder.rs index 9642ca97b..cd0cc0651 100644 --- a/crates/rpc/rpc/src/eth/builder.rs +++ b/crates/rpc/rpc/src/eth/builder.rs @@ -19,13 +19,12 @@ use reth_rpc_server_types::constants::{ DEFAULT_PROOF_PERMITS, }; use reth_tasks::{pool::BlockingTaskPool, TaskSpawner, TokioTaskExecutor}; -use std::{sync::Arc, time::Duration}; +use std::{fmt, sync::Arc, time::Duration}; /// A helper to build the `EthApi` handler instance. /// /// This builder type contains all settings to create an [`EthApiInner`] or an [`EthApi`] instance /// directly. -#[derive(Debug)] pub struct EthApiBuilder { components: N, rpc_converter: Rpc, @@ -47,6 +46,13 @@ pub struct EthApiBuilder { raw_tx_forwarder: ForwardConfig, send_raw_transaction_sync_timeout: Duration, evm_memory_limit: u64, + current_validators_len: Option Option + Send + Sync>>, +} + +impl fmt::Debug for EthApiBuilder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("EthApiBuilder").finish_non_exhaustive() + } } impl @@ -99,6 +105,7 @@ impl EthApiBuilder { raw_tx_forwarder, send_raw_transaction_sync_timeout, evm_memory_limit, + current_validators_len, } = self; EthApiBuilder { components, @@ -121,6 +128,7 @@ impl EthApiBuilder { raw_tx_forwarder, send_raw_transaction_sync_timeout, evm_memory_limit, + current_validators_len, } } } @@ -154,6 +162,7 @@ where raw_tx_forwarder: ForwardConfig::default(), send_raw_transaction_sync_timeout: Duration::from_secs(30), evm_memory_limit: (1 << 32) - 1, + current_validators_len: None, } } } @@ -194,6 +203,7 @@ where raw_tx_forwarder, send_raw_transaction_sync_timeout, evm_memory_limit, + current_validators_len, } = self; EthApiBuilder { components, @@ -216,6 +226,7 @@ where raw_tx_forwarder, send_raw_transaction_sync_timeout, evm_memory_limit, + current_validators_len, } } @@ -245,7 +256,62 @@ where raw_tx_forwarder, send_raw_transaction_sync_timeout, evm_memory_limit, + current_validators_len, + } = self; + EthApiBuilder { + components, + rpc_converter, + gas_cap, + max_simulate_blocks, + eth_proof_window, + fee_history_cache_config, + proof_permits, + eth_state_cache_config, + eth_cache, + gas_oracle, + blocking_task_pool, + task_spawner, + gas_oracle_config, + next_env, + max_batch_size, + max_blocking_io_requests, + pending_block_kind, + raw_tx_forwarder, + send_raw_transaction_sync_timeout, + evm_memory_limit, + current_validators_len, + } + } + + /// Sets a callback that resolves the current validator set size. + pub fn with_current_validators_len(self, current_validators_len: F) -> Self + where + F: Fn() -> Option + Send + Sync + 'static, + { + let Self { + components, + rpc_converter, + gas_cap, + max_simulate_blocks, + eth_proof_window, + fee_history_cache_config, + proof_permits, + eth_state_cache_config, + eth_cache, + gas_oracle, + blocking_task_pool, + task_spawner, + gas_oracle_config, + next_env, + max_batch_size, + max_blocking_io_requests, + pending_block_kind, + raw_tx_forwarder, + send_raw_transaction_sync_timeout, + evm_memory_limit, + .. } = self; + EthApiBuilder { components, rpc_converter, @@ -267,6 +333,7 @@ where raw_tx_forwarder, send_raw_transaction_sync_timeout, evm_memory_limit, + current_validators_len: Some(Arc::new(current_validators_len)), } } @@ -502,6 +569,7 @@ where raw_tx_forwarder, send_raw_transaction_sync_timeout, evm_memory_limit, + current_validators_len, } = self; let provider = components.provider().clone(); @@ -544,6 +612,7 @@ where raw_tx_forwarder.forwarder_client(), send_raw_transaction_sync_timeout, evm_memory_limit, + current_validators_len, ) } diff --git a/crates/rpc/rpc/src/eth/core.rs b/crates/rpc/rpc/src/eth/core.rs index 04216f49f..ae57b58de 100644 --- a/crates/rpc/rpc/src/eth/core.rs +++ b/crates/rpc/rpc/src/eth/core.rs @@ -157,6 +157,7 @@ where raw_tx_forwarder: ForwardConfig, send_raw_transaction_sync_timeout: Duration, evm_memory_limit: u64, + current_validators_len: Option Option + Send + Sync>>, ) -> Self { let inner = EthApiInner::new( components, @@ -177,6 +178,7 @@ where raw_tx_forwarder.forwarder_client(), send_raw_transaction_sync_timeout, evm_memory_limit, + current_validators_len, ); Self { inner: Arc::new(inner) } @@ -234,6 +236,11 @@ where fn cache(&self) -> &EthStateCache { self.inner.cache() } + + #[inline] + fn current_validators_len(&self) -> Option { + self.inner.current_validators_len() + } } impl std::fmt::Debug for EthApi @@ -333,6 +340,9 @@ pub struct EthApiInner { /// Maximum memory the EVM can allocate per RPC request. evm_memory_limit: u64, + + /// Optional callback returning the current active validator count. + current_validators_len: Option Option + Send + Sync>>, } impl EthApiInner @@ -361,6 +371,7 @@ where raw_tx_forwarder: Option, send_raw_transaction_sync_timeout: Duration, evm_memory_limit: u64, + current_validators_len: Option Option + Send + Sync>>, ) -> Self { let signers = parking_lot::RwLock::new(Default::default()); // get the block number of the latest block @@ -405,6 +416,7 @@ where send_raw_transaction_sync_timeout, blob_sidecar_converter: BlobSidecarConverter::new(), evm_memory_limit, + current_validators_len, } } } @@ -426,6 +438,11 @@ where &self.converter } + /// Returns the current active validator count. + pub fn current_validators_len(&self) -> Option { + self.current_validators_len.as_ref().and_then(|resolve| resolve()) + } + /// Returns a handle to data in memory. #[inline] pub const fn cache(&self) -> &EthStateCache { diff --git a/crates/tracing/Cargo.toml b/crates/tracing/Cargo.toml index 5d130aff2..b61bc8e37 100644 --- a/crates/tracing/Cargo.toml +++ b/crates/tracing/Cargo.toml @@ -20,7 +20,7 @@ tracing.workspace = true tracing-subscriber = { workspace = true, features = ["env-filter", "fmt", "ansi", "json"] } tracing-appender.workspace = true tracing-journald.workspace = true -tracing-logfmt.workspace = true +tracing-logfmt = { workspace = true, optional = true } tracing-samply = { workspace = true, optional = true } tracing-tracy = { workspace = true, optional = true } tracy-client = { workspace = true, optional = true, features = ["demangle"] } @@ -34,5 +34,6 @@ rolling-file.workspace = true default = ["otlp"] otlp = ["reth-tracing-otlp"] otlp-logs = ["reth-tracing-otlp/otlp-logs"] +logfmt = ["tracing-logfmt"] samply = ["tracing-samply"] tracy = ["tracing-tracy", "tracy-client"] diff --git a/crates/tracing/src/formatter.rs b/crates/tracing/src/formatter.rs index 202a92136..984a8de6d 100644 --- a/crates/tracing/src/formatter.rs +++ b/crates/tracing/src/formatter.rs @@ -18,6 +18,7 @@ pub enum LogFormat { /// Represents logfmt (key=value) formatting for logs. /// This format is concise and human-readable, /// typically used in command-line applications. + #[cfg(feature = "logfmt")] LogFmt, /// Represents terminal-friendly formatting for logs. @@ -67,6 +68,7 @@ impl LogFormat { layer.with_filter(filter).boxed() } } + #[cfg(feature = "logfmt")] Self::LogFmt => tracing_logfmt::layer().with_filter(filter).boxed(), Self::Terminal => { let layer = tracing_subscriber::fmt::layer().with_ansi(ansi).with_target(target); @@ -85,6 +87,7 @@ impl Display for LogFormat { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::Json => write!(f, "json"), + #[cfg(feature = "logfmt")] Self::LogFmt => write!(f, "logfmt"), Self::Terminal => write!(f, "terminal"), } diff --git a/crates/transaction-pool/src/validate/eth.rs b/crates/transaction-pool/src/validate/eth.rs index 381f8c223..74e5123af 100644 --- a/crates/transaction-pool/src/validate/eth.rs +++ b/crates/transaction-pool/src/validate/eth.rs @@ -95,6 +95,10 @@ pub struct EthTransactionValidator { validation_metrics: TxPoolValidationMetrics, /// Bitmap of custom transaction types that are allowed. other_tx_types: U256, + /// Whether EIP-7594 blob sidecars are accepted. + /// When false, EIP-7594 (v1) sidecars are always rejected and EIP-4844 (v0) sidecars + /// are always accepted, regardless of Osaka fork activation. + eip7594: bool, } impl EthTransactionValidator { @@ -701,16 +705,27 @@ where EthBlobTransactionSidecar::Present(sidecar) => { let now = Instant::now(); - if self.fork_tracker.is_osaka_activated() { - if sidecar.is_eip4844() { + // EIP-7594 sidecar version handling + if !self.eip7594 { + // EIP-7594 disabled: always reject v1 sidecars, accept v0 + if sidecar.is_eip7594() { return Err(InvalidPoolTransactionError::Eip4844( - Eip4844PoolTransactionError::UnexpectedEip4844SidecarAfterOsaka, + Eip4844PoolTransactionError::UnexpectedEip7594SidecarBeforeOsaka, + )); + } + } else { + // Standard Ethereum behavior + if self.fork_tracker.is_osaka_activated() { + if sidecar.is_eip4844() { + return Err(InvalidPoolTransactionError::Eip4844( + Eip4844PoolTransactionError::UnexpectedEip4844SidecarAfterOsaka, + )); + } + } else if sidecar.is_eip7594() && !self.allow_7594_sidecars() { + return Err(InvalidPoolTransactionError::Eip4844( + Eip4844PoolTransactionError::UnexpectedEip7594SidecarBeforeOsaka, )); } - } else if sidecar.is_eip7594() && !self.allow_7594_sidecars() { - return Err(InvalidPoolTransactionError::Eip4844( - Eip4844PoolTransactionError::UnexpectedEip7594SidecarBeforeOsaka, - )); } // validate the blob @@ -902,6 +917,10 @@ pub struct EthTransactionValidatorBuilder { disable_balance_check: bool, /// Bitmap of custom transaction types that are allowed. other_tx_types: U256, + /// Whether EIP-7594 blob sidecars are accepted. + /// When false, EIP-7594 (v1) sidecars are always rejected and EIP-4844 (v0) sidecars + /// are always accepted, regardless of Osaka fork activation. + eip7594: bool, } impl EthTransactionValidatorBuilder { @@ -953,6 +972,9 @@ impl EthTransactionValidatorBuilder { // no custom transaction types by default other_tx_types: U256::ZERO, + + // EIP-7594 sidecars are accepted by default (standard Ethereum behavior) + eip7594: true, } } @@ -1009,6 +1031,25 @@ impl EthTransactionValidatorBuilder { self } + /// Disables EIP-7594 blob sidecar support. + /// + /// When disabled, EIP-7594 (v1) blob sidecars are always rejected and EIP-4844 (v0) + /// sidecars are always accepted, regardless of Osaka fork activation. + /// + /// Use this for chains that do not adopt EIP-7594 (PeerDAS). + pub const fn no_eip7594(self) -> Self { + self.set_eip7594(false) + } + + /// Set EIP-7594 blob sidecar support. + /// + /// When true (default), standard Ethereum behavior applies: v0 sidecars before Osaka, + /// v1 sidecars after Osaka. When false, v1 sidecars are always rejected. + pub const fn set_eip7594(mut self, eip7594: bool) -> Self { + self.eip7594 = eip7594; + self + } + /// Disables the support for EIP-2718 transactions. pub const fn no_eip2718(self) -> Self { self.set_eip2718(false) @@ -1149,6 +1190,7 @@ impl EthTransactionValidatorBuilder { max_blob_count, additional_tasks: _, other_tx_types, + eip7594, } = self; let fork_tracker = ForkTracker { @@ -1179,6 +1221,7 @@ impl EthTransactionValidatorBuilder { _marker: Default::default(), validation_metrics: TxPoolValidationMetrics::default(), other_tx_types, + eip7594, } }