From 8105fbb03d340d94988b666cbcee68adf06a1ab1 Mon Sep 17 00:00:00 2001 From: Matus Kysel Date: Fri, 30 Jan 2026 12:38:30 +0100 Subject: [PATCH 1/8] feat(txpool): add EIP-7594 blob sidecar toggle --- crates/transaction-pool/src/validate/eth.rs | 57 ++++++++++++++++++--- 1 file changed, 50 insertions(+), 7 deletions(-) diff --git a/crates/transaction-pool/src/validate/eth.rs b/crates/transaction-pool/src/validate/eth.rs index 381f8c223..74e5123af 100644 --- a/crates/transaction-pool/src/validate/eth.rs +++ b/crates/transaction-pool/src/validate/eth.rs @@ -95,6 +95,10 @@ pub struct EthTransactionValidator { validation_metrics: TxPoolValidationMetrics, /// Bitmap of custom transaction types that are allowed. other_tx_types: U256, + /// Whether EIP-7594 blob sidecars are accepted. + /// When false, EIP-7594 (v1) sidecars are always rejected and EIP-4844 (v0) sidecars + /// are always accepted, regardless of Osaka fork activation. + eip7594: bool, } impl EthTransactionValidator { @@ -701,16 +705,27 @@ where EthBlobTransactionSidecar::Present(sidecar) => { let now = Instant::now(); - if self.fork_tracker.is_osaka_activated() { - if sidecar.is_eip4844() { + // EIP-7594 sidecar version handling + if !self.eip7594 { + // EIP-7594 disabled: always reject v1 sidecars, accept v0 + if sidecar.is_eip7594() { return Err(InvalidPoolTransactionError::Eip4844( - Eip4844PoolTransactionError::UnexpectedEip4844SidecarAfterOsaka, + Eip4844PoolTransactionError::UnexpectedEip7594SidecarBeforeOsaka, + )); + } + } else { + // Standard Ethereum behavior + if self.fork_tracker.is_osaka_activated() { + if sidecar.is_eip4844() { + return Err(InvalidPoolTransactionError::Eip4844( + Eip4844PoolTransactionError::UnexpectedEip4844SidecarAfterOsaka, + )); + } + } else if sidecar.is_eip7594() && !self.allow_7594_sidecars() { + return Err(InvalidPoolTransactionError::Eip4844( + Eip4844PoolTransactionError::UnexpectedEip7594SidecarBeforeOsaka, )); } - } else if sidecar.is_eip7594() && 
!self.allow_7594_sidecars() { - return Err(InvalidPoolTransactionError::Eip4844( - Eip4844PoolTransactionError::UnexpectedEip7594SidecarBeforeOsaka, - )); } // validate the blob @@ -902,6 +917,10 @@ pub struct EthTransactionValidatorBuilder { disable_balance_check: bool, /// Bitmap of custom transaction types that are allowed. other_tx_types: U256, + /// Whether EIP-7594 blob sidecars are accepted. + /// When false, EIP-7594 (v1) sidecars are always rejected and EIP-4844 (v0) sidecars + /// are always accepted, regardless of Osaka fork activation. + eip7594: bool, } impl EthTransactionValidatorBuilder { @@ -953,6 +972,9 @@ impl EthTransactionValidatorBuilder { // no custom transaction types by default other_tx_types: U256::ZERO, + + // EIP-7594 sidecars are accepted by default (standard Ethereum behavior) + eip7594: true, } } @@ -1009,6 +1031,25 @@ impl EthTransactionValidatorBuilder { self } + /// Disables EIP-7594 blob sidecar support. + /// + /// When disabled, EIP-7594 (v1) blob sidecars are always rejected and EIP-4844 (v0) + /// sidecars are always accepted, regardless of Osaka fork activation. + /// + /// Use this for chains that do not adopt EIP-7594 (PeerDAS). + pub const fn no_eip7594(self) -> Self { + self.set_eip7594(false) + } + + /// Set EIP-7594 blob sidecar support. + /// + /// When true (default), standard Ethereum behavior applies: v0 sidecars before Osaka, + /// v1 sidecars after Osaka. When false, v1 sidecars are always rejected. + pub const fn set_eip7594(mut self, eip7594: bool) -> Self { + self.eip7594 = eip7594; + self + } + /// Disables the support for EIP-2718 transactions. 
pub const fn no_eip2718(self) -> Self { self.set_eip2718(false) @@ -1149,6 +1190,7 @@ impl EthTransactionValidatorBuilder { max_blob_count, additional_tasks: _, other_tx_types, + eip7594, } = self; let fork_tracker = ForkTracker { @@ -1179,6 +1221,7 @@ impl EthTransactionValidatorBuilder { _marker: Default::default(), validation_metrics: TxPoolValidationMetrics::default(), other_tx_types, + eip7594, } } From 5e84e9f80106165a91d85e175fa396c9362c1883 Mon Sep 17 00:00:00 2001 From: Matus Kysel Date: Mon, 9 Feb 2026 10:12:16 +0100 Subject: [PATCH 2/8] make tracing-logfmt optional --- crates/tracing/Cargo.toml | 3 ++- crates/tracing/src/formatter.rs | 3 +++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/tracing/Cargo.toml b/crates/tracing/Cargo.toml index 5d130aff2..b61bc8e37 100644 --- a/crates/tracing/Cargo.toml +++ b/crates/tracing/Cargo.toml @@ -20,7 +20,7 @@ tracing.workspace = true tracing-subscriber = { workspace = true, features = ["env-filter", "fmt", "ansi", "json"] } tracing-appender.workspace = true tracing-journald.workspace = true -tracing-logfmt.workspace = true +tracing-logfmt = { workspace = true, optional = true } tracing-samply = { workspace = true, optional = true } tracing-tracy = { workspace = true, optional = true } tracy-client = { workspace = true, optional = true, features = ["demangle"] } @@ -34,5 +34,6 @@ rolling-file.workspace = true default = ["otlp"] otlp = ["reth-tracing-otlp"] otlp-logs = ["reth-tracing-otlp/otlp-logs"] +logfmt = ["tracing-logfmt"] samply = ["tracing-samply"] tracy = ["tracing-tracy", "tracy-client"] diff --git a/crates/tracing/src/formatter.rs b/crates/tracing/src/formatter.rs index 202a92136..984a8de6d 100644 --- a/crates/tracing/src/formatter.rs +++ b/crates/tracing/src/formatter.rs @@ -18,6 +18,7 @@ pub enum LogFormat { /// Represents logfmt (key=value) formatting for logs. /// This format is concise and human-readable, /// typically used in command-line applications. 
+ #[cfg(feature = "logfmt")] LogFmt, /// Represents terminal-friendly formatting for logs. @@ -67,6 +68,7 @@ impl LogFormat { layer.with_filter(filter).boxed() } } + #[cfg(feature = "logfmt")] Self::LogFmt => tracing_logfmt::layer().with_filter(filter).boxed(), Self::Terminal => { let layer = tracing_subscriber::fmt::layer().with_ansi(ansi).with_target(target); @@ -85,6 +87,7 @@ impl Display for LogFormat { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::Json => write!(f, "json"), + #[cfg(feature = "logfmt")] Self::LogFmt => write!(f, "logfmt"), Self::Terminal => write!(f, "terminal"), } From 1e368d333296eddb48c5d04f8117c283b1e3f49b Mon Sep 17 00:00:00 2001 From: Matus Kysel Date: Thu, 19 Feb 2026 08:26:23 +0100 Subject: [PATCH 3/8] feat: align eth_getFinalizedHeader/block with BSC validator-based finalization --- crates/rpc/rpc-eth-api/src/core.rs | 25 +- crates/rpc/rpc-eth-api/src/helpers/block.rs | 324 +++++++++++++++++++- crates/rpc/rpc-eth-api/src/node.rs | 8 + crates/rpc/rpc/src/eth/builder.rs | 72 ++++- crates/rpc/rpc/src/eth/core.rs | 17 + 5 files changed, 420 insertions(+), 26 deletions(-) diff --git a/crates/rpc/rpc-eth-api/src/core.rs b/crates/rpc/rpc-eth-api/src/core.rs index c2cc42880..4939e53f5 100644 --- a/crates/rpc/rpc-eth-api/src/core.rs +++ b/crates/rpc/rpc-eth-api/src/core.rs @@ -245,22 +245,26 @@ pub trait EthApi< /// Returns the finalized block header. /// - /// The `verified_validator_num` parameter is provided for BSC compatibility but is not used - /// in standard Ethereum. The finalized block is determined by the Beacon Chain consensus - /// (Casper FFG) and requires 2/3+ validator attestations. + /// BSC compatibility: + /// - `-1` uses ceil(validators/2) + /// - `-2` uses ceil(validators*2/3) + /// - `-3` uses all validators + /// - positive values override the threshold directly. 
#[method(name = "getFinalizedHeader")] - async fn finalized_header(&self, verified_validator_num: u64) -> RpcResult>; + async fn finalized_header(&self, verified_validator_num: i64) -> RpcResult>; /// Returns the finalized block. /// - /// The `verified_validator_num` parameter is provided for BSC compatibility but is not used - /// in standard Ethereum. The finalized block is determined by the Beacon Chain consensus - /// (Casper FFG) and requires 2/3+ validator attestations. + /// BSC compatibility: + /// - `-1` uses ceil(validators/2) + /// - `-2` uses ceil(validators*2/3) + /// - `-3` uses all validators + /// - positive values override the threshold directly. /// /// If `full` is true, the block object will contain all transaction objects, /// otherwise it will only contain the transaction hashes. #[method(name = "getFinalizedBlock")] - async fn finalized_block(&self, verified_validator_num: u64, full: bool) -> RpcResult>; + async fn finalized_block(&self, verified_validator_num: i64, full: bool) -> RpcResult>; /// `eth_simulateV1` executes an arbitrary number of transactions on top of the requested state. /// The transactions are packed into individual blocks. Overrides can be provided. @@ -752,13 +756,13 @@ where } /// Handler for: `eth_getFinalizedHeader` - async fn finalized_header(&self, verified_validator_num: u64) -> RpcResult>> { + async fn finalized_header(&self, verified_validator_num: i64) -> RpcResult>> { trace!(target: "rpc::eth", verified_validator_num, "Serving eth_getFinalizedHeader"); Ok(EthBlocks::rpc_finalized_header(self, verified_validator_num).await?) 
} /// Handler for: `eth_getFinalizedBlock` - async fn finalized_block(&self, verified_validator_num: u64, full: bool) -> RpcResult>> { + async fn finalized_block(&self, verified_validator_num: i64, full: bool) -> RpcResult>> { trace!(target: "rpc::eth", verified_validator_num, ?full, "Serving eth_getFinalizedBlock"); Ok(EthBlocks::rpc_finalized_block(self, verified_validator_num, full).await?) } @@ -974,4 +978,3 @@ where Ok(EthState::get_account_info(self, address, block).await?) } } - diff --git a/crates/rpc/rpc-eth-api/src/helpers/block.rs b/crates/rpc/rpc-eth-api/src/helpers/block.rs index 505bbd04a..dc852ce4d 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/block.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/block.rs @@ -2,8 +2,8 @@ use super::{LoadPendingBlock, LoadReceipt, SpawnBlocking}; use crate::{ - node::RpcNodeCoreExt, EthApiTypes, FromEthApiError, FullEthApiTypes, RpcBlock, RpcNodeCore, - RpcReceipt, + node::RpcNodeCoreExt, EthApiError, EthApiTypes, FromEthApiError, FullEthApiTypes, RpcBlock, + RpcNodeCore, RpcReceipt, }; use alloy_consensus::{transaction::TxHashRef, TxReceipt}; use alloy_eips::{BlockId, BlockNumberOrTag}; @@ -18,6 +18,7 @@ use reth_storage_api::{ BlockIdReader, BlockReader, HeaderProvider, ProviderHeader, ProviderReceipt, ProviderTx, }; use reth_transaction_pool::{PoolTransaction, TransactionPool}; +use std::collections::HashSet; use std::sync::Arc; /// Result type of the fetched block receipts. 
@@ -31,6 +32,44 @@ pub type BlockAndReceiptsResult = Result< ::Error, >; +const MAX_VALIDATOR_LOOKBACK: usize = 1000; + +fn resolved_validators_threshold( + verified_validator_num: i64, + active_validator_count: Option, +) -> Result { + let missing_validator_count = || -> EthApiError { + Err(EthApiError::InvalidParams(format!( + "Unable to derive validator-count from request value {verified_validator_num} without chain validator set" + ))) + }; + + match verified_validator_num { + -1 | -2 | -3 => { + let active_validator_count = + active_validator_count.ok_or_else(|| missing_validator_count())?; + match verified_validator_num { + -1 => Ok((active_validator_count + 1) / 2), + -2 => Ok((active_validator_count * 2 + 2) / 3), + _ => Ok(active_validator_count), + } + } + value if value < 1 => Err(EthApiError::InvalidParams(format!( + "{value} neither within the range [1,{}] nor the range [-3,-1]", + active_validator_count.unwrap_or(0) + ))), + value if active_validator_count + .is_some_and(|validator_count| value > validator_count as i64) => + { + Err(EthApiError::InvalidParams(format!( + "{value} neither within the range [1,{}] nor the range [-3,-1]", + active_validator_count.unwrap_or(0) + ))) + } + value => Ok(value as usize), + } +} + /// Block related functions for the [`EthApiServer`](crate::EthApiServer) trait in the /// `eth_` namespace. pub trait EthBlocks: LoadBlock> { @@ -115,41 +154,134 @@ pub trait EthBlocks: LoadBlock=1`: explicit validator threshold. fn rpc_finalized_header( &self, - _verified_validator_num: u64, + verified_validator_num: i64, ) -> impl Future>, Self::Error>> + Send where Self: FullEthApiTypes, { async move { - // Simply delegate to rpc_block_header with Finalized tag - self.rpc_block_header(BlockNumberOrTag::Finalized.into()).await + let Some(finalized_block_number) = + self.finalized_block_number(verified_validator_num).await? 
+ else { + return Ok(None); + }; + + self.rpc_block_header(BlockNumberOrTag::Number(finalized_block_number).into()).await } } /// Returns the finalized block. /// - /// The `verified_validator_num` parameter is provided for BSC compatibility but is ignored - /// in the implementation. In standard Ethereum, the finalized block is determined by the - /// Beacon Chain consensus (Casper FFG). + /// BSC compatibility: + /// - `verified_validator_num` is required to determine the probabilistic finalized height. + /// Accepted values are: + /// - `-1`: ceil(number_of_validators / 2) + /// - `-2`: ceil(number_of_validators * 2 / 3) + /// - `-3`: number_of_validators + /// - or `>=1`: explicit validator threshold. /// /// If `full` is true, the block object will contain all transaction objects, otherwise it will /// only contain the transaction hashes. fn rpc_finalized_block( &self, - _verified_validator_num: u64, + verified_validator_num: i64, full: bool, ) -> impl Future>, Self::Error>> + Send where Self: FullEthApiTypes, { async move { - // Simply delegate to rpc_block with Finalized tag - self.rpc_block(BlockNumberOrTag::Finalized.into(), full).await + let Some(finalized_block_number) = + self.finalized_block_number(verified_validator_num).await? + else { + return Ok(None); + }; + self.rpc_block(BlockNumberOrTag::Number(finalized_block_number).into(), full).await + } + } + + /// Returns the best-effort finalized block number according to `verified_validator_num`. + /// + /// BSC compatibility: + /// - `verified_validator_num` is required to determine the probabilistic finalized height. + /// Accepted values are: + /// - `-1`: ceil(number_of_validators / 2) + /// - `-2`: ceil(number_of_validators * 2 / 3) + /// - `-3`: number_of_validators + /// - or `>=1`: explicit validator threshold. 
+ fn finalized_block_number( + &self, + verified_validator_num: i64, + ) -> impl Future, Self::Error>> + Send + where + Self: FullEthApiTypes, + { + async move { + let latest_header = self + .provider() + .sealed_header_by_id(BlockNumberOrTag::Latest.into()) + .map_err(Self::Error::from_eth_err)? + .ok_or_else(|| { + Self::Error::from_eth_err(EthApiError::HeaderNotFound( + BlockNumberOrTag::Latest.into(), + )) + })?; + + let fast_finalized_header = self + .provider() + .sealed_header_by_id(BlockNumberOrTag::Finalized.into()) + .map_err(Self::Error::from_eth_err)? + .ok_or_else(|| { + Self::Error::from_eth_err(EthApiError::HeaderNotFound( + BlockNumberOrTag::Finalized.into(), + )) + })?; + + let lower_bound = fast_finalized_header.number().max(1); + let active_validator_count = self.current_validators_len(); + let threshold = resolved_validators_threshold(verified_validator_num, active_validator_count)?; + if threshold == 0 { + return Ok(Some(fast_finalized_header.number())); + } + + let mut cursor = latest_header; + let mut seen_signers = HashSet::with_capacity(threshold.max(1)); + let mut probabilistic_finalized = fast_finalized_header.number(); + for i in 0..=MAX_VALIDATOR_LOOKBACK { + seen_signers.insert(cursor.beneficiary()); + probabilistic_finalized = cursor.number(); + + if seen_signers.len() >= threshold { + break; + } + + let parent_hash = cursor.parent_hash(); + if cursor.number() <= lower_bound { + break; + } + + if i == MAX_VALIDATOR_LOOKBACK { + break; + } + cursor = self + .provider() + .sealed_header_by_hash(parent_hash) + .map_err(Self::Error::from_eth_err)? 
+ .ok_or_else(|| { + Self::Error::from_eth_err(EthApiError::HeaderNotFound(parent_hash.into())) + })?; + } + + Ok(Some(std::cmp::max(fast_finalized_header.number(), probabilistic_finalized))) } } @@ -366,6 +498,170 @@ pub trait EthBlocks: LoadBlock, + ) -> Result { + if headers.is_empty() { + return Ok(fast_finalized_number); + } + + let threshold = resolved_validators_threshold(verified_validator_num, active_validator_count)?; + if threshold == 0 { + return Ok(fast_finalized_number); + } + + let mut seen = HashSet::with_capacity(threshold.max(1)); + let mut probabilistic_finalized = headers[0].0; + for (number, beneficiary) in headers { + probabilistic_finalized = *number; + seen.insert(*beneficiary); + if seen.len() >= threshold { + break; + } + } + + Ok(std::cmp::max(fast_finalized_number, probabilistic_finalized)) + } + + #[test] + fn resolves_validator_threshold_special_cases() { + let validator_count = 9usize; + assert_eq!(resolved_validators_threshold(-1, Some(validator_count)).unwrap(), 5); + assert_eq!(resolved_validators_threshold(-2, Some(validator_count)).unwrap(), 6); + assert_eq!(resolved_validators_threshold(-3, Some(validator_count)).unwrap(), 9); + assert_eq!(resolved_validators_threshold(4, Some(validator_count)).unwrap(), 4); + assert!(resolved_validators_threshold(10, Some(validator_count)).is_err()); + assert!(resolved_validators_threshold(-4, Some(validator_count)).is_err()); + } + + fn header(number: u64, beneficiary: u8) -> (u64, Address) { + (number, Address::repeat_byte(beneficiary)) + } + + #[test] + fn finalized_block_number_uses_ceil_half_for_negative_one() { + let headers = vec![ + header(12, 0x11), + header(11, 0x22), + header(10, 0x33), + header(9, 0x11), + ]; + + // 3 validators => threshold ceil(3/2) = 2, satisfied at header #11. 
+        assert_eq!(resolved_finalized_block_number(8, &headers, -1, Some(3)).unwrap(), 11);
+    }
+
+    #[test]
+    fn finalized_block_number_uses_explicit_threshold() {
+        let headers = vec![
+            header(12, 0x11),
+            header(11, 0x11),
+            header(10, 0x22),
+            header(9, 0x33),
+            header(8, 0x44),
+        ];
+
+        // 4 validators and explicit threshold 3, reached at block 9.
+        assert_eq!(resolved_finalized_block_number(7, &headers, 3, Some(4)).unwrap(), 9);
+    }
+
+    #[test]
+    fn finalized_block_number_prefers_fast_finalized_if_newer() {
+        let headers = vec![
+            header(12, 0x11),
+            header(11, 0x22),
+            header(10, 0x33),
+        ];
+        // Threshold ceil(3/2) = 2 is reached at block 11 (two distinct signers seen),
+        // but the fast-finalized height (20) is newer and takes precedence.
+        assert_eq!(resolved_finalized_block_number(20, &headers, -1, Some(3)).unwrap(), 20);
+    }
+
+    #[test]
+    fn finalized_block_number_prefers_probabilistic_if_newer_than_fast_finalized() {
+        let headers = vec![
+            header(16, 0x11),
+            header(15, 0x22),
+            header(14, 0x33),
+            header(13, 0x11),
+        ];
+
+        // 3 validators, threshold ceil(3/2) = 2, reached at block 15.
+        assert_eq!(resolved_finalized_block_number(10, &headers, -1, Some(3)).unwrap(), 15);
+    }
+
+    #[test]
+    fn finalized_block_number_negative_two_and_three_thresholds() {
+        let headers = vec![
+            header(12, 0x11),
+            header(11, 0x22),
+            header(10, 0x33),
+            header(9, 0x44),
+        ];
+
+        // ceil(4*2/3) = 3 -> reached at block 10.
+        assert_eq!(resolved_finalized_block_number(0, &headers, -2, Some(4)).unwrap(), 10);
+
+        // 4 validators, threshold 4 -> reached at block 9.
+ assert_eq!(resolved_finalized_block_number(0, &headers, -3, Some(4)).unwrap(), 9); + } + + #[test] + fn finalized_block_number_empty_headers_uses_fast_finalized_for_negative_values() { + let headers = Vec::new(); + assert_eq!(resolved_finalized_block_number(42, &headers, -1, Some(4)).unwrap(), 42); + } + + #[test] + fn finalized_block_number_empty_headers_uses_fast_finalized_for_positive_validator_values() { + let headers = Vec::new(); + assert_eq!(resolved_finalized_block_number(42, &headers, 5, Some(3)).unwrap(), 42); + } + + #[test] + fn finalized_block_number_rejects_invalid_threshold() { + let headers = vec![header(1, 0x11)]; + assert!(resolved_finalized_block_number(1, &headers, 0, Some(1)).is_err()); + assert!(resolved_finalized_block_number(1, &headers, -4, Some(1)).is_err()); + } + + #[test] + fn finalized_block_number_uses_active_validator_count_for_negative_values() { + let headers = vec![ + header(12, 0x11), + header(11, 0x22), + header(10, 0x33), + header(9, 0x44), + ]; + + // With active set size 20, -2 => ceil(20*2/3) = 14. Only 4 unique signers seen, so + // no probabilistic finalization should happen beyond fast-finalized height. + assert_eq!( + resolved_finalized_block_number(8, &headers, -2, Some(20)).unwrap(), + 8 + ); + } + + #[test] + fn finalized_block_number_requires_validator_count_for_negative_values() { + let headers = vec![header(3, 0x11), header(2, 0x22), header(1, 0x33)]; + + assert!( + resolved_finalized_block_number(1, &headers, -1, None) + .unwrap_err() + .to_string() + .contains("Unable to derive validator-count") + ); + } +} + /// Loads a block from database. /// /// Behaviour shared by several `eth_` RPC methods, not exclusive to `eth_` blocks RPC methods. 
diff --git a/crates/rpc/rpc-eth-api/src/node.rs b/crates/rpc/rpc-eth-api/src/node.rs index bde95b9c5..74ca9af3d 100644 --- a/crates/rpc/rpc-eth-api/src/node.rs +++ b/crates/rpc/rpc-eth-api/src/node.rs @@ -98,6 +98,14 @@ where pub trait RpcNodeCoreExt: RpcNodeCore { /// Returns handle to RPC cache service. fn cache(&self) -> &EthStateCache; + + /// Returns the current active validator count for the latest snapshot. + /// + /// Default implementation returns `None` to preserve behavior for non-parlia chains. + /// BSC-compatible chains should override this to provide an active validator count. + fn current_validators_len(&self) -> Option { + None + } } /// An adapter that allows to construct [`RpcNodeCore`] from components. diff --git a/crates/rpc/rpc/src/eth/builder.rs b/crates/rpc/rpc/src/eth/builder.rs index 9642ca97b..ad70fab9b 100644 --- a/crates/rpc/rpc/src/eth/builder.rs +++ b/crates/rpc/rpc/src/eth/builder.rs @@ -19,7 +19,7 @@ use reth_rpc_server_types::constants::{ DEFAULT_PROOF_PERMITS, }; use reth_tasks::{pool::BlockingTaskPool, TaskSpawner, TokioTaskExecutor}; -use std::{sync::Arc, time::Duration}; +use std::{fmt, sync::Arc, time::Duration}; /// A helper to build the `EthApi` handler instance. 
/// @@ -47,6 +47,13 @@ pub struct EthApiBuilder { raw_tx_forwarder: ForwardConfig, send_raw_transaction_sync_timeout: Duration, evm_memory_limit: u64, + current_validators_len: Option Option + Send + Sync>>, +} + +impl fmt::Debug for EthApiBuilder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("EthApiBuilder").finish_non_exhaustive() + } } impl @@ -99,6 +106,7 @@ impl EthApiBuilder { raw_tx_forwarder, send_raw_transaction_sync_timeout, evm_memory_limit, + current_validators_len, } = self; EthApiBuilder { components, @@ -121,6 +129,7 @@ impl EthApiBuilder { raw_tx_forwarder, send_raw_transaction_sync_timeout, evm_memory_limit, + current_validators_len, } } } @@ -154,6 +163,7 @@ where raw_tx_forwarder: ForwardConfig::default(), send_raw_transaction_sync_timeout: Duration::from_secs(30), evm_memory_limit: (1 << 32) - 1, + current_validators_len: None, } } } @@ -194,6 +204,7 @@ where raw_tx_forwarder, send_raw_transaction_sync_timeout, evm_memory_limit, + current_validators_len, } = self; EthApiBuilder { components, @@ -216,6 +227,7 @@ where raw_tx_forwarder, send_raw_transaction_sync_timeout, evm_memory_limit, + current_validators_len, } } @@ -245,7 +257,62 @@ where raw_tx_forwarder, send_raw_transaction_sync_timeout, evm_memory_limit, + current_validators_len, + } = self; + EthApiBuilder { + components, + rpc_converter, + gas_cap, + max_simulate_blocks, + eth_proof_window, + fee_history_cache_config, + proof_permits, + eth_state_cache_config, + eth_cache, + gas_oracle, + blocking_task_pool, + task_spawner, + gas_oracle_config, + next_env, + max_batch_size, + max_blocking_io_requests, + pending_block_kind, + raw_tx_forwarder, + send_raw_transaction_sync_timeout, + evm_memory_limit, + current_validators_len, + } + } + + /// Sets a callback that resolves the current validator set size. 
+ pub fn with_current_validators_len(self, current_validators_len: F) -> Self + where + F: Fn() -> Option + Send + Sync + 'static, + { + let Self { + components, + rpc_converter, + gas_cap, + max_simulate_blocks, + eth_proof_window, + fee_history_cache_config, + proof_permits, + eth_state_cache_config, + eth_cache, + gas_oracle, + blocking_task_pool, + task_spawner, + gas_oracle_config, + next_env, + max_batch_size, + max_blocking_io_requests, + pending_block_kind, + raw_tx_forwarder, + send_raw_transaction_sync_timeout, + evm_memory_limit, + .. } = self; + EthApiBuilder { components, rpc_converter, @@ -267,6 +334,7 @@ where raw_tx_forwarder, send_raw_transaction_sync_timeout, evm_memory_limit, + current_validators_len: Some(Arc::new(current_validators_len)), } } @@ -502,6 +570,7 @@ where raw_tx_forwarder, send_raw_transaction_sync_timeout, evm_memory_limit, + current_validators_len, } = self; let provider = components.provider().clone(); @@ -544,6 +613,7 @@ where raw_tx_forwarder.forwarder_client(), send_raw_transaction_sync_timeout, evm_memory_limit, + current_validators_len, ) } diff --git a/crates/rpc/rpc/src/eth/core.rs b/crates/rpc/rpc/src/eth/core.rs index 04216f49f..ae57b58de 100644 --- a/crates/rpc/rpc/src/eth/core.rs +++ b/crates/rpc/rpc/src/eth/core.rs @@ -157,6 +157,7 @@ where raw_tx_forwarder: ForwardConfig, send_raw_transaction_sync_timeout: Duration, evm_memory_limit: u64, + current_validators_len: Option Option + Send + Sync>>, ) -> Self { let inner = EthApiInner::new( components, @@ -177,6 +178,7 @@ where raw_tx_forwarder.forwarder_client(), send_raw_transaction_sync_timeout, evm_memory_limit, + current_validators_len, ); Self { inner: Arc::new(inner) } @@ -234,6 +236,11 @@ where fn cache(&self) -> &EthStateCache { self.inner.cache() } + + #[inline] + fn current_validators_len(&self) -> Option { + self.inner.current_validators_len() + } } impl std::fmt::Debug for EthApi @@ -333,6 +340,9 @@ pub struct EthApiInner { /// Maximum memory the EVM can 
allocate per RPC request. evm_memory_limit: u64, + + /// Optional callback returning the current active validator count. + current_validators_len: Option Option + Send + Sync>>, } impl EthApiInner @@ -361,6 +371,7 @@ where raw_tx_forwarder: Option, send_raw_transaction_sync_timeout: Duration, evm_memory_limit: u64, + current_validators_len: Option Option + Send + Sync>>, ) -> Self { let signers = parking_lot::RwLock::new(Default::default()); // get the block number of the latest block @@ -405,6 +416,7 @@ where send_raw_transaction_sync_timeout, blob_sidecar_converter: BlobSidecarConverter::new(), evm_memory_limit, + current_validators_len, } } } @@ -426,6 +438,11 @@ where &self.converter } + /// Returns the current active validator count. + pub fn current_validators_len(&self) -> Option { + self.current_validators_len.as_ref().and_then(|resolve| resolve()) + } + /// Returns a handle to data in memory. #[inline] pub const fn cache(&self) -> &EthStateCache { From 3237b7248ec38d8a03bc0c6725691cb31bdcfdd0 Mon Sep 17 00:00:00 2001 From: Matus Kysel Date: Fri, 20 Feb 2026 14:06:08 +0100 Subject: [PATCH 4/8] fix(rpc): resolve broken imports in block.rs - Import EthApiError from reth_rpc_eth_types instead of crate root - Add BlockReaderIdExt to reth_storage_api imports for sealed_header_by_id - Fix closure return type mismatch in resolved_validators_threshold --- crates/rpc/rpc-eth-api/src/helpers/block.rs | 10 ++++++---- crates/rpc/rpc/src/eth/builder.rs | 1 - 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/crates/rpc/rpc-eth-api/src/helpers/block.rs b/crates/rpc/rpc-eth-api/src/helpers/block.rs index dc852ce4d..29d7d4c71 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/block.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/block.rs @@ -2,7 +2,7 @@ use super::{LoadPendingBlock, LoadReceipt, SpawnBlocking}; use crate::{ - node::RpcNodeCoreExt, EthApiError, EthApiTypes, FromEthApiError, FullEthApiTypes, RpcBlock, + node::RpcNodeCoreExt, EthApiTypes, FromEthApiError, 
FullEthApiTypes, RpcBlock, RpcNodeCore, RpcReceipt, }; use alloy_consensus::{transaction::TxHashRef, TxReceipt}; @@ -14,8 +14,10 @@ use futures::Future; use reth_node_api::BlockBody; use reth_primitives_traits::{AlloyBlockHeader, RecoveredBlock, SealedHeader, TransactionMeta}; use reth_rpc_convert::{transaction::ConvertReceiptInput, RpcConvert, RpcHeader}; +use reth_rpc_eth_types::EthApiError; use reth_storage_api::{ - BlockIdReader, BlockReader, HeaderProvider, ProviderHeader, ProviderReceipt, ProviderTx, + BlockIdReader, BlockReader, BlockReaderIdExt, HeaderProvider, ProviderHeader, ProviderReceipt, + ProviderTx, }; use reth_transaction_pool::{PoolTransaction, TransactionPool}; use std::collections::HashSet; @@ -39,9 +41,9 @@ fn resolved_validators_threshold( active_validator_count: Option, ) -> Result { let missing_validator_count = || -> EthApiError { - Err(EthApiError::InvalidParams(format!( + EthApiError::InvalidParams(format!( "Unable to derive validator-count from request value {verified_validator_num} without chain validator set" - ))) + )) }; match verified_validator_num { diff --git a/crates/rpc/rpc/src/eth/builder.rs b/crates/rpc/rpc/src/eth/builder.rs index ad70fab9b..cd0cc0651 100644 --- a/crates/rpc/rpc/src/eth/builder.rs +++ b/crates/rpc/rpc/src/eth/builder.rs @@ -25,7 +25,6 @@ use std::{fmt, sync::Arc, time::Duration}; /// /// This builder type contains all settings to create an [`EthApiInner`] or an [`EthApi`] instance /// directly. 
-#[derive(Debug)] pub struct EthApiBuilder { components: N, rpc_converter: Rpc, From 8b65d2a4b8719e562b0fff1ada4490343070b00a Mon Sep 17 00:00:00 2001 From: cbh876 <3930922419@qq.com> Date: Sat, 28 Feb 2026 17:46:51 +0800 Subject: [PATCH 5/8] fix: deferred trie --- crates/chain-state/src/deferred_trie.rs | 75 +++++++++---------------- 1 file changed, 26 insertions(+), 49 deletions(-) diff --git a/crates/chain-state/src/deferred_trie.rs b/crates/chain-state/src/deferred_trie.rs index 6e758a122..f0cdd909b 100644 --- a/crates/chain-state/src/deferred_trie.rs +++ b/crates/chain-state/src/deferred_trie.rs @@ -146,9 +146,8 @@ impl DeferredTrieData { /// /// # Process /// 1. Sort the current block's hashed state and trie updates - /// 2. Reuse parent's cached overlay if available (O(1) - the common case) - /// 3. Otherwise, rebuild overlay from ancestors (rare fallback) - /// 4. Extend the overlay with this block's sorted data + /// 2. Rebuild overlay from ancestors' per-block data + /// 3. Extend the overlay with this block's sorted data /// /// Used by both the async background task and the synchronous fallback path. /// @@ -172,46 +171,25 @@ impl DeferredTrieData { Err(arc) => arc.clone_into_sorted(), }; - // Reuse parent's overlay if available and anchors match. - // We can only reuse the parent's overlay if it was built on top of the same - // persisted anchor. If the anchor has changed (e.g., due to persistence), - // the parent's overlay is relative to an old state and cannot be used. - let overlay = if let Some(parent) = ancestors.last() { - let parent_data = parent.wait_cloned(); - - match &parent_data.anchored_trie_input { - // Case 1: Parent has cached overlay AND anchors match. - Some(AnchoredTrieInput { anchor_hash: parent_anchor, trie_input }) - if *parent_anchor == anchor_hash => - { - // O(1): Reuse parent's overlay, extend with current block's data. 
- let mut overlay = TrieInputSorted::new( - Arc::clone(&trie_input.nodes), - Arc::clone(&trie_input.state), - Default::default(), // prefix_sets are per-block, not cumulative - ); - // Only trigger COW clone if there's actually data to add. - if !sorted_hashed_state.is_empty() { - Arc::make_mut(&mut overlay.state).extend_ref_and_sort(&sorted_hashed_state); - } - if !sorted_trie_updates.is_empty() { - Arc::make_mut(&mut overlay.nodes).extend_ref_and_sort(&sorted_trie_updates); - } - overlay - } - // Case 2: Parent exists but anchor mismatch or no cached overlay. - // We must rebuild from the ancestors list (which only contains unpersisted blocks). - _ => Self::merge_ancestors_into_overlay( - ancestors, - &sorted_hashed_state, - &sorted_trie_updates, - ), - } - } else { - // Case 3: No in-memory ancestors (first block after persisted anchor). - // Build overlay with just this block's data. - Self::merge_ancestors_into_overlay(&[], &sorted_hashed_state, &sorted_trie_updates) - }; + // Always rebuild the overlay from ancestors' per-block data instead of reusing + // the parent's cached cumulative overlay. + // + // The previous approach cloned the parent's overlay via Arc and then called + // Arc::make_mut to extend it. Because the parent's cached ComputedTrieData also + // holds a reference to the same Arc, strong_count is always >= 2, forcing + // Arc::make_mut to deep-copy the entire cumulative overlay on every block. + // For high-throughput chains with large state (e.g., BSC with 3-second blocks), + // this causes uncontrollable memory growth and OOM. + // + // The rebuild path starts with a fresh TrieInputSorted (strong_count == 1) and + // extends it with each ancestor's per-block hashed_state and trie_updates. + // This is O(sum of ancestors' per-block state) which is bounded by + // persistence_threshold * per_block_state_size, and avoids deep copies entirely. 
+ let overlay = Self::merge_ancestors_into_overlay( + ancestors, + &sorted_hashed_state, + &sorted_trie_updates, + ); ComputedTrieData::with_trie_input( Arc::new(sorted_hashed_state), @@ -223,13 +201,12 @@ impl DeferredTrieData { /// Merge all ancestors and current block's data into a single overlay. /// - /// This is a rare fallback path, only used when no ancestor has a cached - /// `anchored_trie_input` (e.g., blocks created via alternative constructors). - /// In normal operation, the parent always has a cached overlay and this - /// function is never called. + /// Builds a fresh [`TrieInputSorted`] by iterating ancestors oldest -> newest, + /// extending with each ancestor's per-block `hashed_state` and `trie_updates`, + /// then extending with the current block's sorted data (so later state wins). /// - /// Iterates ancestors oldest -> newest, then extends with current block's data, - /// so later state takes precedence. + /// Starts from `TrieInputSorted::default()` whose inner `Arc`s have + /// `strong_count == 1`, so `Arc::make_mut` never triggers a deep copy. fn merge_ancestors_into_overlay( ancestors: &[Self], sorted_hashed_state: &HashedPostStateSorted, From 32440cb2c5b3f0ad66843d8ba37590f65c5f009b Mon Sep 17 00:00:00 2001 From: cbh876 <3930922419@qq.com> Date: Sun, 1 Mar 2026 11:28:48 +0800 Subject: [PATCH 6/8] test: add debug log --- crates/chain-state/src/in_memory.rs | 11 ++- crates/engine/tree/src/persistence.rs | 110 +++++++++++++++++++++++++- crates/engine/tree/src/tree/mod.rs | 63 ++++++++++++++- crates/engine/tree/src/tree/state.rs | 8 ++ 4 files changed, 183 insertions(+), 9 deletions(-) diff --git a/crates/chain-state/src/in_memory.rs b/crates/chain-state/src/in_memory.rs index 311830dbc..23de6eac1 100644 --- a/crates/chain-state/src/in_memory.rs +++ b/crates/chain-state/src/in_memory.rs @@ -323,8 +323,15 @@ impl CanonicalInMemoryState { // // This can happen if the persistence task takes a long time, while a reorg is happening. 
{ - if self.inner.in_memory_state.blocks.read().get(&persisted_num_hash.hash).is_none() { - // do nothing + let blocks = self.inner.in_memory_state.blocks.read(); + if blocks.get(&persisted_num_hash.hash).is_none() { + tracing::warn!( + target: "engine::tree", + persisted_hash = ?persisted_num_hash.hash, + persisted_number = persisted_num_hash.number, + in_memory_block_count = blocks.len(), + "Skipping remove_persisted_blocks: persisted hash NOT found in canonical in-memory state" + ); return } } diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index d867e91ca..3295013b1 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -75,8 +75,22 @@ where /// This is the main loop, that will listen to database events and perform the requested /// database actions pub fn run(mut self) -> Result<(), PersistenceError> { + tracing::warn!(target: "engine::persistence", "Persistence service thread started, waiting for actions"); + // If the receiver errors then senders have disconnected, so the loop should then end. 
while let Ok(action) = self.incoming.recv() { + let action_name = match &action { + PersistenceAction::RemoveBlocksAbove(_, _) => "RemoveBlocksAbove", + PersistenceAction::SaveBlocks(_, _) => "SaveBlocks", + PersistenceAction::SaveFinalizedBlock(_) => "SaveFinalizedBlock", + PersistenceAction::SaveSafeBlock(_) => "SaveSafeBlock", + }; + tracing::warn!( + target: "engine::persistence", + action = action_name, + "Persistence service: received action" + ); + match action { PersistenceAction::RemoveBlocksAbove(new_tip_num, sender) => { let result = self.on_remove_blocks_above(new_tip_num)?; @@ -87,8 +101,25 @@ where let _ = sender.send(result); } PersistenceAction::SaveBlocks(blocks, sender) => { + let block_count = blocks.len(); + let first = blocks.first().map(|b| b.recovered_block.num_hash()); + let last = blocks.last().map(|b| b.recovered_block.num_hash()); + tracing::warn!( + target: "engine::persistence", + block_count, + first = ?first, + last = ?last, + "SaveBlocks: starting on_save_blocks" + ); + let save_start = Instant::now(); let result = self.on_save_blocks(blocks)?; let result_number = result.map(|r| r.number); + tracing::warn!( + target: "engine::persistence", + ?result, + elapsed = ?save_start.elapsed(), + "SaveBlocks: on_save_blocks completed" + ); // we ignore the error because the caller may or may not care about the result let _ = sender.send(result); @@ -100,20 +131,67 @@ where .send(MetricEvent::SyncHeight { height: block_number }); if self.pruner.is_pruning_needed(block_number) { - // We log `PrunerOutput` inside the `Pruner` + tracing::warn!( + target: "engine::persistence", + block_number, + "SaveBlocks: pruning needed, starting prune" + ); + let prune_start = Instant::now(); let _ = self.prune_before(block_number)?; + tracing::warn!( + target: "engine::persistence", + block_number, + elapsed = ?prune_start.elapsed(), + "SaveBlocks: pruning completed" + ); } } } PersistenceAction::SaveFinalizedBlock(finalized_block) => { + tracing::warn!( + 
target: "engine::persistence", + finalized_block, + "SaveFinalizedBlock: acquiring write lock" + ); + let rw_start = Instant::now(); let provider = self.provider.database_provider_rw()?; + tracing::warn!( + target: "engine::persistence", + finalized_block, + elapsed = ?rw_start.elapsed(), + "SaveFinalizedBlock: write lock acquired, saving" + ); provider.save_finalized_block_number(finalized_block)?; provider.commit()?; + tracing::warn!( + target: "engine::persistence", + finalized_block, + total_elapsed = ?rw_start.elapsed(), + "SaveFinalizedBlock: completed" + ); } PersistenceAction::SaveSafeBlock(safe_block) => { + tracing::warn!( + target: "engine::persistence", + safe_block, + "SaveSafeBlock: acquiring write lock" + ); + let rw_start = Instant::now(); let provider = self.provider.database_provider_rw()?; + tracing::warn!( + target: "engine::persistence", + safe_block, + elapsed = ?rw_start.elapsed(), + "SaveSafeBlock: write lock acquired, saving" + ); provider.save_safe_block_number(safe_block)?; provider.commit()?; + tracing::warn!( + target: "engine::persistence", + safe_block, + total_elapsed = ?rw_start.elapsed(), + "SaveSafeBlock: completed" + ); } } } @@ -144,19 +222,43 @@ where let first_block = blocks.first().map(|b| b.recovered_block.num_hash()); let last_block = blocks.last().map(|b| b.recovered_block.num_hash()); let block_count = blocks.len(); - debug!(target: "engine::persistence", ?block_count, first=?first_block, last=?last_block, "Saving range of blocks"); let start_time = Instant::now(); if last_block.is_some() { + tracing::warn!( + target: "engine::persistence", + block_count, + first = ?first_block, + last = ?last_block, + "on_save_blocks: acquiring database_provider_rw" + ); + let rw_start = Instant::now(); let provider_rw = self.provider.database_provider_rw()?; + tracing::warn!( + target: "engine::persistence", + elapsed = ?rw_start.elapsed(), + "on_save_blocks: database_provider_rw acquired" + ); + let save_start = Instant::now(); 
provider_rw.save_blocks(blocks, SaveBlocksMode::Full)?; + tracing::warn!( + target: "engine::persistence", + elapsed = ?save_start.elapsed(), + "on_save_blocks: save_blocks completed, committing" + ); + + let commit_start = Instant::now(); provider_rw.commit()?; + tracing::warn!( + target: "engine::persistence", + commit_elapsed = ?commit_start.elapsed(), + total_elapsed = ?start_time.elapsed(), + "on_save_blocks: commit completed" + ); } - debug!(target: "engine::persistence", first=?first_block, last=?last_block, "Saved range of blocks"); - self.metrics.save_blocks_block_count.record(block_count as f64); self.metrics.save_blocks_duration_seconds.record(start_time.elapsed()); diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 2b0f35fe8..5def77504 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -1275,7 +1275,14 @@ where .map(|b| b.recovered_block().num_hash()) .expect("Checked non-empty persisting blocks"); - debug!(target: "engine::tree", count=blocks_to_persist.len(), blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::>(), "Persisting blocks"); + warn!( + target: "engine::tree", + count = blocks_to_persist.len(), + first_block = blocks_to_persist.first().map(|b| b.recovered_block().number()).unwrap_or(0), + last_block = blocks_to_persist.last().map(|b| b.recovered_block().number()).unwrap_or(0), + in_memory_blocks = self.state.tree_state.block_count(), + "Starting persistence task" + ); let (tx, rx) = crossbeam_channel::bounded(1); let _ = self.persistence.save_blocks(blocks_to_persist, tx); @@ -1287,6 +1294,19 @@ where /// This checks if we need to remove blocks (disk reorg) or save new blocks to disk. /// Persistence completion is handled separately via the `wait_for_event` method. 
fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> { + let block_count = self.state.tree_state.block_count(); + if block_count > 64 { + warn!( + target: "engine::tree", + block_count, + persistence_in_progress = self.persistence_state.in_progress(), + canonical_head = self.state.tree_state.canonical_block_number(), + last_persisted = self.persistence_state.last_persisted_block.number, + backfill_idle = self.backfill_sync_state.is_idle(), + "In-memory block count exceeds 64, possible persistence stall" + ); + } + if !self.persistence_state.in_progress() { if let Some(new_tip_num) = self.find_disk_reorg()? { self.remove_blocks(new_tip_num) @@ -1366,7 +1386,9 @@ where last_persisted_hash_num: Option, start_time: Instant, ) -> Result<(), AdvancePersistenceError> { - self.metrics.engine.persistence_duration.record(start_time.elapsed()); + let elapsed = start_time.elapsed(); + self.metrics.engine.persistence_duration.record(elapsed); + let block_count_before = self.state.tree_state.block_count(); let Some(BlockNumHash { hash: last_persisted_block_hash, @@ -1378,7 +1400,15 @@ where return Ok(()) }; - debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, elapsed=?start_time.elapsed(), "Finished persisting, calling finish"); + warn!( + target: "engine::tree", + ?last_persisted_block_hash, + last_persisted_block_number, + ?elapsed, + block_count_before, + canonical_head = self.state.tree_state.canonical_block_number(), + "Persistence completed, starting cleanup" + ); self.persistence_state.finish(last_persisted_block_hash, last_persisted_block_number); // Evict trie changesets for blocks below the finalized block, but keep at least 64 blocks @@ -1396,6 +1426,16 @@ where } self.on_new_persisted_block()?; + + let block_count_after = self.state.tree_state.block_count(); + warn!( + target: "engine::tree", + block_count_after, + blocks_removed = block_count_before.saturating_sub(block_count_after), + last_persisted = 
self.persistence_state.last_persisted_block.number, + "Cleanup after persistence completed" + ); + Ok(()) } @@ -1851,11 +1891,28 @@ where // If we have an on-disk reorg, we need to handle it first before touching the in-memory // state. if let Some(remove_above) = self.find_disk_reorg()? { + warn!( + target: "engine::tree", + remove_above, + last_persisted = self.persistence_state.last_persisted_block.number, + canonical_head = self.state.tree_state.canonical_block_number(), + "Disk reorg detected in on_new_persisted_block, skipping normal cleanup" + ); self.remove_blocks(remove_above); return Ok(()) } let finalized = self.state.forkchoice_state_tracker.last_valid_finalized(); + let persisted = self.persistence_state.last_persisted_block; + let is_canonical = self.state.tree_state.is_canonical(persisted.hash); + warn!( + target: "engine::tree", + persisted_number = persisted.number, + ?finalized, + is_persisted_hash_canonical = is_canonical, + "on_new_persisted_block: about to remove_before" + ); + self.remove_before(self.persistence_state.last_persisted_block, finalized)?; self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash { number: self.persistence_state.last_persisted_block.number, diff --git a/crates/engine/tree/src/tree/state.rs b/crates/engine/tree/src/tree/state.rs index 0a13207e6..e347924e2 100644 --- a/crates/engine/tree/src/tree/state.rs +++ b/crates/engine/tree/src/tree/state.rs @@ -177,6 +177,14 @@ impl TreeState { // If the last persisted hash is not canonical, then we don't want to remove any canonical // blocks yet. 
if !self.is_canonical(last_persisted_hash) { + tracing::warn!( + target: "engine::tree", + ?upper_bound, + ?last_persisted_hash, + block_count = self.blocks_by_hash.len(), + canonical_head = ?self.current_canonical_head, + "Skipping canonical block removal: last_persisted_hash is NOT canonical" + ); return } From 48a3878ef1d937d606758f17dec60b9b9dd0059a Mon Sep 17 00:00:00 2001 From: cbh876 <3930922419@qq.com> Date: Sun, 1 Mar 2026 13:01:07 +0800 Subject: [PATCH 7/8] feat: skip pruning --- crates/engine/tree/src/persistence.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index 3295013b1..ce0b920e8 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -130,19 +130,15 @@ where .sync_metrics_tx .send(MetricEvent::SyncHeight { height: block_number }); + // BSC: skip inline pruning to prevent persistence service stall. + // The pruner can hang on large BSC mainnet data, blocking all + // subsequent SaveBlocks and causing unbounded memory growth (OOM). + // Pruning should be handled externally or in a non-blocking way. 
if self.pruner.is_pruning_needed(block_number) { tracing::warn!( target: "engine::persistence", block_number, - "SaveBlocks: pruning needed, starting prune" - ); - let prune_start = Instant::now(); - let _ = self.prune_before(block_number)?; - tracing::warn!( - target: "engine::persistence", - block_number, - elapsed = ?prune_start.elapsed(), - "SaveBlocks: pruning completed" + "SaveBlocks: pruning needed but SKIPPED to avoid persistence stall" ); } } From 6b505395de08d256ce9b3ec0f8892191a2a99b27 Mon Sep 17 00:00:00 2001 From: cbh876 <3930922419@qq.com> Date: Sun, 1 Mar 2026 13:48:15 +0800 Subject: [PATCH 8/8] feat: skip prune --- crates/chain-state/src/in_memory.rs | 11 +-- crates/engine/tree/src/persistence.rs | 102 ++------------------------ crates/engine/tree/src/tree/mod.rs | 26 +------ crates/engine/tree/src/tree/state.rs | 8 -- 4 files changed, 8 insertions(+), 139 deletions(-) diff --git a/crates/chain-state/src/in_memory.rs b/crates/chain-state/src/in_memory.rs index 23de6eac1..311830dbc 100644 --- a/crates/chain-state/src/in_memory.rs +++ b/crates/chain-state/src/in_memory.rs @@ -323,15 +323,8 @@ impl CanonicalInMemoryState { // // This can happen if the persistence task takes a long time, while a reorg is happening. 
{ - let blocks = self.inner.in_memory_state.blocks.read(); - if blocks.get(&persisted_num_hash.hash).is_none() { - tracing::warn!( - target: "engine::tree", - persisted_hash = ?persisted_num_hash.hash, - persisted_number = persisted_num_hash.number, - in_memory_block_count = blocks.len(), - "Skipping remove_persisted_blocks: persisted hash NOT found in canonical in-memory state" - ); + if self.inner.in_memory_state.blocks.read().get(&persisted_num_hash.hash).is_none() { + // do nothing return } } diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index ce0b920e8..a7e1de823 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -75,22 +75,8 @@ where /// This is the main loop, that will listen to database events and perform the requested /// database actions pub fn run(mut self) -> Result<(), PersistenceError> { - tracing::warn!(target: "engine::persistence", "Persistence service thread started, waiting for actions"); - // If the receiver errors then senders have disconnected, so the loop should then end. 
while let Ok(action) = self.incoming.recv() { - let action_name = match &action { - PersistenceAction::RemoveBlocksAbove(_, _) => "RemoveBlocksAbove", - PersistenceAction::SaveBlocks(_, _) => "SaveBlocks", - PersistenceAction::SaveFinalizedBlock(_) => "SaveFinalizedBlock", - PersistenceAction::SaveSafeBlock(_) => "SaveSafeBlock", - }; - tracing::warn!( - target: "engine::persistence", - action = action_name, - "Persistence service: received action" - ); - match action { PersistenceAction::RemoveBlocksAbove(new_tip_num, sender) => { let result = self.on_remove_blocks_above(new_tip_num)?; @@ -101,25 +87,8 @@ where let _ = sender.send(result); } PersistenceAction::SaveBlocks(blocks, sender) => { - let block_count = blocks.len(); - let first = blocks.first().map(|b| b.recovered_block.num_hash()); - let last = blocks.last().map(|b| b.recovered_block.num_hash()); - tracing::warn!( - target: "engine::persistence", - block_count, - first = ?first, - last = ?last, - "SaveBlocks: starting on_save_blocks" - ); - let save_start = Instant::now(); let result = self.on_save_blocks(blocks)?; let result_number = result.map(|r| r.number); - tracing::warn!( - target: "engine::persistence", - ?result, - elapsed = ?save_start.elapsed(), - "SaveBlocks: on_save_blocks completed" - ); // we ignore the error because the caller may or may not care about the result let _ = sender.send(result); @@ -133,61 +102,24 @@ where // BSC: skip inline pruning to prevent persistence service stall. // The pruner can hang on large BSC mainnet data, blocking all // subsequent SaveBlocks and causing unbounded memory growth (OOM). - // Pruning should be handled externally or in a non-blocking way. 
if self.pruner.is_pruning_needed(block_number) { - tracing::warn!( + debug!( target: "engine::persistence", block_number, - "SaveBlocks: pruning needed but SKIPPED to avoid persistence stall" + "Pruning needed but skipped to avoid persistence stall" ); } } } PersistenceAction::SaveFinalizedBlock(finalized_block) => { - tracing::warn!( - target: "engine::persistence", - finalized_block, - "SaveFinalizedBlock: acquiring write lock" - ); - let rw_start = Instant::now(); let provider = self.provider.database_provider_rw()?; - tracing::warn!( - target: "engine::persistence", - finalized_block, - elapsed = ?rw_start.elapsed(), - "SaveFinalizedBlock: write lock acquired, saving" - ); provider.save_finalized_block_number(finalized_block)?; provider.commit()?; - tracing::warn!( - target: "engine::persistence", - finalized_block, - total_elapsed = ?rw_start.elapsed(), - "SaveFinalizedBlock: completed" - ); } PersistenceAction::SaveSafeBlock(safe_block) => { - tracing::warn!( - target: "engine::persistence", - safe_block, - "SaveSafeBlock: acquiring write lock" - ); - let rw_start = Instant::now(); let provider = self.provider.database_provider_rw()?; - tracing::warn!( - target: "engine::persistence", - safe_block, - elapsed = ?rw_start.elapsed(), - "SaveSafeBlock: write lock acquired, saving" - ); provider.save_safe_block_number(safe_block)?; provider.commit()?; - tracing::warn!( - target: "engine::persistence", - safe_block, - total_elapsed = ?rw_start.elapsed(), - "SaveSafeBlock: completed" - ); } } } @@ -218,43 +150,19 @@ where let first_block = blocks.first().map(|b| b.recovered_block.num_hash()); let last_block = blocks.last().map(|b| b.recovered_block.num_hash()); let block_count = blocks.len(); + debug!(target: "engine::persistence", ?block_count, first=?first_block, last=?last_block, "Saving range of blocks"); let start_time = Instant::now(); if last_block.is_some() { - tracing::warn!( - target: "engine::persistence", - block_count, - first = ?first_block, - last = 
?last_block, - "on_save_blocks: acquiring database_provider_rw" - ); - let rw_start = Instant::now(); let provider_rw = self.provider.database_provider_rw()?; - tracing::warn!( - target: "engine::persistence", - elapsed = ?rw_start.elapsed(), - "on_save_blocks: database_provider_rw acquired" - ); - let save_start = Instant::now(); provider_rw.save_blocks(blocks, SaveBlocksMode::Full)?; - tracing::warn!( - target: "engine::persistence", - elapsed = ?save_start.elapsed(), - "on_save_blocks: save_blocks completed, committing" - ); - - let commit_start = Instant::now(); provider_rw.commit()?; - tracing::warn!( - target: "engine::persistence", - commit_elapsed = ?commit_start.elapsed(), - total_elapsed = ?start_time.elapsed(), - "on_save_blocks: commit completed" - ); } + debug!(target: "engine::persistence", first=?first_block, last=?last_block, "Saved range of blocks"); + self.metrics.save_blocks_block_count.record(block_count as f64); self.metrics.save_blocks_duration_seconds.record(start_time.elapsed()); diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 5def77504..b62041703 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -1275,14 +1275,7 @@ where .map(|b| b.recovered_block().num_hash()) .expect("Checked non-empty persisting blocks"); - warn!( - target: "engine::tree", - count = blocks_to_persist.len(), - first_block = blocks_to_persist.first().map(|b| b.recovered_block().number()).unwrap_or(0), - last_block = blocks_to_persist.last().map(|b| b.recovered_block().number()).unwrap_or(0), - in_memory_blocks = self.state.tree_state.block_count(), - "Starting persistence task" - ); + debug!(target: "engine::tree", count=blocks_to_persist.len(), blocks = ?blocks_to_persist.iter().map(|block| block.recovered_block().num_hash()).collect::>(), "Persisting blocks"); let (tx, rx) = crossbeam_channel::bounded(1); let _ = self.persistence.save_blocks(blocks_to_persist, tx); @@ -1891,28 +1884,11 
@@ where // If we have an on-disk reorg, we need to handle it first before touching the in-memory // state. if let Some(remove_above) = self.find_disk_reorg()? { - warn!( - target: "engine::tree", - remove_above, - last_persisted = self.persistence_state.last_persisted_block.number, - canonical_head = self.state.tree_state.canonical_block_number(), - "Disk reorg detected in on_new_persisted_block, skipping normal cleanup" - ); self.remove_blocks(remove_above); return Ok(()) } let finalized = self.state.forkchoice_state_tracker.last_valid_finalized(); - let persisted = self.persistence_state.last_persisted_block; - let is_canonical = self.state.tree_state.is_canonical(persisted.hash); - warn!( - target: "engine::tree", - persisted_number = persisted.number, - ?finalized, - is_persisted_hash_canonical = is_canonical, - "on_new_persisted_block: about to remove_before" - ); - self.remove_before(self.persistence_state.last_persisted_block, finalized)?; self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash { number: self.persistence_state.last_persisted_block.number, diff --git a/crates/engine/tree/src/tree/state.rs b/crates/engine/tree/src/tree/state.rs index e347924e2..0a13207e6 100644 --- a/crates/engine/tree/src/tree/state.rs +++ b/crates/engine/tree/src/tree/state.rs @@ -177,14 +177,6 @@ impl TreeState { // If the last persisted hash is not canonical, then we don't want to remove any canonical // blocks yet. if !self.is_canonical(last_persisted_hash) { - tracing::warn!( - target: "engine::tree", - ?upper_bound, - ?last_persisted_hash, - block_count = self.blocks_by_hash.len(), - canonical_head = ?self.current_canonical_head, - "Skipping canonical block removal: last_persisted_hash is NOT canonical" - ); return }