Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 26 additions & 49 deletions crates/chain-state/src/deferred_trie.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,8 @@ impl DeferredTrieData {
///
/// # Process
/// 1. Sort the current block's hashed state and trie updates
/// 2. Reuse parent's cached overlay if available (O(1) - the common case)
/// 3. Otherwise, rebuild overlay from ancestors (rare fallback)
/// 4. Extend the overlay with this block's sorted data
/// 2. Rebuild overlay from ancestors' per-block data
/// 3. Extend the overlay with this block's sorted data
///
/// Used by both the async background task and the synchronous fallback path.
///
Expand All @@ -172,46 +171,25 @@ impl DeferredTrieData {
Err(arc) => arc.clone_into_sorted(),
};

// Reuse parent's overlay if available and anchors match.
// We can only reuse the parent's overlay if it was built on top of the same
// persisted anchor. If the anchor has changed (e.g., due to persistence),
// the parent's overlay is relative to an old state and cannot be used.
let overlay = if let Some(parent) = ancestors.last() {
let parent_data = parent.wait_cloned();

match &parent_data.anchored_trie_input {
// Case 1: Parent has cached overlay AND anchors match.
Some(AnchoredTrieInput { anchor_hash: parent_anchor, trie_input })
if *parent_anchor == anchor_hash =>
{
// O(1): Reuse parent's overlay, extend with current block's data.
let mut overlay = TrieInputSorted::new(
Arc::clone(&trie_input.nodes),
Arc::clone(&trie_input.state),
Default::default(), // prefix_sets are per-block, not cumulative
);
// Only trigger COW clone if there's actually data to add.
if !sorted_hashed_state.is_empty() {
Arc::make_mut(&mut overlay.state).extend_ref_and_sort(&sorted_hashed_state);
}
if !sorted_trie_updates.is_empty() {
Arc::make_mut(&mut overlay.nodes).extend_ref_and_sort(&sorted_trie_updates);
}
overlay
}
// Case 2: Parent exists but anchor mismatch or no cached overlay.
// We must rebuild from the ancestors list (which only contains unpersisted blocks).
_ => Self::merge_ancestors_into_overlay(
ancestors,
&sorted_hashed_state,
&sorted_trie_updates,
),
}
} else {
// Case 3: No in-memory ancestors (first block after persisted anchor).
// Build overlay with just this block's data.
Self::merge_ancestors_into_overlay(&[], &sorted_hashed_state, &sorted_trie_updates)
};
// Always rebuild the overlay from ancestors' per-block data instead of reusing
// the parent's cached cumulative overlay.
//
// The previous approach cloned the parent's overlay via Arc and then called
// Arc::make_mut to extend it. Because the parent's cached ComputedTrieData also
// holds a reference to the same Arc, strong_count is always >= 2, forcing
// Arc::make_mut to deep-copy the entire cumulative overlay on every block.
// For high-throughput chains with large state (e.g., BSC with 3-second blocks),
// this causes uncontrollable memory growth and OOM.
//
// The rebuild path starts with a fresh `TrieInputSorted` (`strong_count == 1`) and
// extends it with each ancestor's per-block `hashed_state` and `trie_updates`.
// This is O(sum of ancestors' per-block state), which is bounded by
// `persistence_threshold * per_block_state_size`, and it avoids deep copies entirely.
let overlay = Self::merge_ancestors_into_overlay(
ancestors,
&sorted_hashed_state,
&sorted_trie_updates,
);

ComputedTrieData::with_trie_input(
Arc::new(sorted_hashed_state),
Expand All @@ -223,13 +201,12 @@ impl DeferredTrieData {

/// Merge all ancestors and current block's data into a single overlay.
///
/// This is a rare fallback path, only used when no ancestor has a cached
/// `anchored_trie_input` (e.g., blocks created via alternative constructors).
/// In normal operation, the parent always has a cached overlay and this
/// function is never called.
/// Builds a fresh [`TrieInputSorted`] by iterating ancestors oldest -> newest,
/// extending with each ancestor's per-block `hashed_state` and `trie_updates`,
/// then extending with the current block's sorted data (so later state wins).
///
/// Iterates ancestors oldest -> newest, then extends with current block's data,
/// so later state takes precedence.
/// Starts from `TrieInputSorted::default()` whose inner `Arc`s have
/// `strong_count == 1`, so `Arc::make_mut` never triggers a deep copy.
fn merge_ancestors_into_overlay(
ancestors: &[Self],
sorted_hashed_state: &HashedPostStateSorted,
Expand Down
10 changes: 8 additions & 2 deletions crates/engine/tree/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,15 @@ where
.sync_metrics_tx
.send(MetricEvent::SyncHeight { height: block_number });

// BSC: skip inline pruning to prevent persistence service stall.
// The pruner can hang on large BSC mainnet data, blocking all
// subsequent SaveBlocks and causing unbounded memory growth (OOM).
if self.pruner.is_pruning_needed(block_number) {
// We log `PrunerOutput` inside the `Pruner`
let _ = self.prune_before(block_number)?;
debug!(
target: "engine::persistence",
block_number,
"Pruning needed but skipped to avoid persistence stall"
);
}
}
}
Expand Down
37 changes: 35 additions & 2 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1287,6 +1287,19 @@ where
/// This checks if we need to remove blocks (disk reorg) or save new blocks to disk.
/// Persistence completion is handled separately via the `wait_for_event` method.
fn advance_persistence(&mut self) -> Result<(), AdvancePersistenceError> {
let block_count = self.state.tree_state.block_count();
if block_count > 64 {
warn!(
target: "engine::tree",
block_count,
persistence_in_progress = self.persistence_state.in_progress(),
canonical_head = self.state.tree_state.canonical_block_number(),
last_persisted = self.persistence_state.last_persisted_block.number,
backfill_idle = self.backfill_sync_state.is_idle(),
"In-memory block count exceeds 64, possible persistence stall"
);
}

if !self.persistence_state.in_progress() {
if let Some(new_tip_num) = self.find_disk_reorg()? {
self.remove_blocks(new_tip_num)
Expand Down Expand Up @@ -1366,7 +1379,9 @@ where
last_persisted_hash_num: Option<BlockNumHash>,
start_time: Instant,
) -> Result<(), AdvancePersistenceError> {
self.metrics.engine.persistence_duration.record(start_time.elapsed());
let elapsed = start_time.elapsed();
self.metrics.engine.persistence_duration.record(elapsed);
let block_count_before = self.state.tree_state.block_count();

let Some(BlockNumHash {
hash: last_persisted_block_hash,
Expand All @@ -1378,7 +1393,15 @@ where
return Ok(())
};

debug!(target: "engine::tree", ?last_persisted_block_hash, ?last_persisted_block_number, elapsed=?start_time.elapsed(), "Finished persisting, calling finish");
warn!(
target: "engine::tree",
?last_persisted_block_hash,
last_persisted_block_number,
?elapsed,
block_count_before,
canonical_head = self.state.tree_state.canonical_block_number(),
"Persistence completed, starting cleanup"
);
self.persistence_state.finish(last_persisted_block_hash, last_persisted_block_number);

// Evict trie changesets for blocks below the finalized block, but keep at least 64 blocks
Expand All @@ -1396,6 +1419,16 @@ where
}

self.on_new_persisted_block()?;

let block_count_after = self.state.tree_state.block_count();
warn!(
target: "engine::tree",
block_count_after,
blocks_removed = block_count_before.saturating_sub(block_count_after),
last_persisted = self.persistence_state.last_persisted_block.number,
"Cleanup after persistence completed"
);

Ok(())
}

Expand Down
25 changes: 14 additions & 11 deletions crates/rpc/rpc-eth-api/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,22 +245,26 @@ pub trait EthApi<

/// Returns the finalized block header.
///
/// The `verified_validator_num` parameter is provided for BSC compatibility but is not used
/// in standard Ethereum. The finalized block is determined by the Beacon Chain consensus
/// (Casper FFG) and requires 2/3+ validator attestations.
/// BSC compatibility: `verified_validator_num` selects the validator threshold:
/// - `-1` uses `ceil(validators / 2)`
/// - `-2` uses `ceil(validators * 2 / 3)`
/// - `-3` uses all validators
/// - positive values override the threshold directly.
#[method(name = "getFinalizedHeader")]
async fn finalized_header(&self, verified_validator_num: u64) -> RpcResult<Option<H>>;
async fn finalized_header(&self, verified_validator_num: i64) -> RpcResult<Option<H>>;

/// Returns the finalized block.
///
/// The `verified_validator_num` parameter is provided for BSC compatibility but is not used
/// in standard Ethereum. The finalized block is determined by the Beacon Chain consensus
/// (Casper FFG) and requires 2/3+ validator attestations.
/// BSC compatibility: `verified_validator_num` selects the validator threshold:
/// - `-1` uses `ceil(validators / 2)`
/// - `-2` uses `ceil(validators * 2 / 3)`
/// - `-3` uses all validators
/// - positive values override the threshold directly.
///
/// If `full` is true, the block object will contain all transaction objects,
/// otherwise it will only contain the transaction hashes.
#[method(name = "getFinalizedBlock")]
async fn finalized_block(&self, verified_validator_num: u64, full: bool) -> RpcResult<Option<B>>;
async fn finalized_block(&self, verified_validator_num: i64, full: bool) -> RpcResult<Option<B>>;

/// `eth_simulateV1` executes an arbitrary number of transactions on top of the requested state.
/// The transactions are packed into individual blocks. Overrides can be provided.
Expand Down Expand Up @@ -752,13 +756,13 @@ where
}

/// Handler for: `eth_getFinalizedHeader`
async fn finalized_header(&self, verified_validator_num: u64) -> RpcResult<Option<RpcHeader<T::NetworkTypes>>> {
async fn finalized_header(&self, verified_validator_num: i64) -> RpcResult<Option<RpcHeader<T::NetworkTypes>>> {
trace!(target: "rpc::eth", verified_validator_num, "Serving eth_getFinalizedHeader");
Ok(EthBlocks::rpc_finalized_header(self, verified_validator_num).await?)
}

/// Handler for: `eth_getFinalizedBlock`
async fn finalized_block(&self, verified_validator_num: u64, full: bool) -> RpcResult<Option<RpcBlock<T::NetworkTypes>>> {
async fn finalized_block(&self, verified_validator_num: i64, full: bool) -> RpcResult<Option<RpcBlock<T::NetworkTypes>>> {
trace!(target: "rpc::eth", verified_validator_num, ?full, "Serving eth_getFinalizedBlock");
Ok(EthBlocks::rpc_finalized_block(self, verified_validator_num, full).await?)
}
Expand Down Expand Up @@ -974,4 +978,3 @@ where
Ok(EthState::get_account_info(self, address, block).await?)
}
}

Loading
Loading