From b7352663fe73e0f4f5d5184f4ab125be7d626982 Mon Sep 17 00:00:00 2001 From: Niven Date: Thu, 15 Jan 2026 17:43:19 +0800 Subject: [PATCH 1/9] Fix payload handler for p2p flashblocks and full built payloads --- .../src/builders/flashblocks/payload.rs | 17 +++++++----- .../builders/flashblocks/payload_handler.rs | 26 ++++++++++++------- .../src/builders/flashblocks/service.rs | 5 ++++ 3 files changed, 33 insertions(+), 15 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 9e894969a..e847f0fe2 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -174,9 +174,12 @@ pub(super) struct OpPayloadBuilder { pub pool: Pool, /// Node client pub client: Client, - /// Sender for sending built payloads to [`PayloadHandler`], - /// which broadcasts outgoing payloads via p2p. - pub payload_tx: mpsc::Sender, + /// Sender for sending built flashblock payloads to [`PayloadHandler`], + /// which broadcasts outgoing flashblock payloads via p2p. + pub built_fb_payload_tx: mpsc::Sender, + /// Sender for sending built full block payloads to [`PayloadHandler`], + /// which updates the engine tree state. + pub built_payload_tx: mpsc::Sender, /// WebSocket publisher for broadcasting flashblocks /// to all connected subscribers. pub ws_pub: Arc, @@ -199,7 +202,8 @@ impl OpPayloadBuilder { client: Client, config: BuilderConfig, builder_tx: BuilderTx, - payload_tx: mpsc::Sender, + built_fb_payload_tx: mpsc::Sender, + built_payload_tx: mpsc::Sender, ws_pub: Arc, metrics: Arc, ) -> Self { @@ -208,7 +212,8 @@ impl OpPayloadBuilder { evm_config, pool, client, - payload_tx, + built_fb_payload_tx, + built_payload_tx, ws_pub, config, metrics, @@ -848,7 +853,7 @@ where .ws_pub .publish(&fb_payload) .wrap_err("failed to publish flashblock via websocket")?; - self.payload_tx + self.built_fb_payload_tx .send(new_payload.clone()) .await .wrap_err("failed to send built payload to handler")?; diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs index 8613abc86..f0a29fcd8 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs @@ -29,8 +29,10 @@ use tracing::warn; /// In the case of a payload built by this node, it is broadcast to peers and an event is sent to the payload builder. /// In the case of a payload received from a peer, it is executed and if successful, an event is sent to the payload builder. pub(crate) struct PayloadHandler { - // receives new payloads built by this builder. - built_rx: mpsc::Receiver, + // receives new flashblock payloads built by this builder. + built_fb_payload_rx: mpsc::Receiver, + // receives new full block payloads built by this builder. + built_payload_rx: mpsc::Receiver, // receives incoming p2p messages from peers. p2p_rx: mpsc::Receiver, // outgoing p2p channel to broadcast new payloads to peers. @@ -50,7 +52,8 @@ where { #[allow(clippy::too_many_arguments)] pub(crate) fn new( - built_rx: mpsc::Receiver, + built_fb_payload_rx: mpsc::Receiver, + built_payload_rx: mpsc::Receiver, p2p_rx: mpsc::Receiver, p2p_tx: mpsc::Sender, payload_events_handle: tokio::sync::broadcast::Sender>, @@ -59,7 +62,8 @@ where cancel: tokio_util::sync::CancellationToken, ) -> Self { Self { - built_rx, + built_fb_payload_rx, + built_payload_rx, p2p_rx, p2p_tx, payload_events_handle, @@ -71,7 +75,8 @@ where pub(crate) async fn run(self) { let Self { - mut built_rx, + mut built_fb_payload_rx, + mut built_payload_rx, mut p2p_rx, p2p_tx, payload_events_handle, @@ -84,12 +89,15 @@ where loop { tokio::select! { - Some(payload) = built_rx.recv() => { + Some(payload) = built_fb_payload_rx.recv() => { + // ignore error here; if p2p was disabled, the channel will be closed. + let _ = p2p_tx.send(payload.into()).await; + } + Some(payload) = built_payload_rx.recv() => { + // Update engine tree state with locally built block payloads if let Err(e) = payload_events_handle.send(Events::BuiltPayload(payload.clone())) { warn!(e = ?e, "failed to send BuiltPayload event"); } - // ignore error here; if p2p was disabled, the channel will be closed. - let _ = p2p_tx.send(payload.into()).await; } Some(message) = p2p_rx.recv() => { match message { @@ -241,7 +249,7 @@ where cancel, ); - let (built_payload, fb_payload) = crate::builders::flashblocks::payload::build_block( + let (built_payload, fb_payload, _) = crate::builders::flashblocks::payload::build_block( &mut state, &builder_ctx, &mut info, diff --git a/crates/op-rbuilder/src/builders/flashblocks/service.rs b/crates/op-rbuilder/src/builders/flashblocks/service.rs index b0563421a..2e67208ec 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/service.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/service.rs @@ -106,6 +106,9 @@ impl FlashblocksServiceBuilder { }; let metrics = Arc::new(OpRBuilderMetrics::default()); + // Channels for built flashblock payloads + let (built_fb_payload_tx, built_fb_payload_rx) = tokio::sync::mpsc::channel(16); + // Channels for built full block payloads let (built_payload_tx, built_payload_rx) = tokio::sync::mpsc::channel(16); let ws_pub: Arc = @@ -118,6 +121,7 @@ impl FlashblocksServiceBuilder { ctx.provider().clone(), self.0.clone(), builder_tx, + built_fb_payload_tx, built_payload_tx, ws_pub.clone(), metrics.clone(), @@ -145,6 +149,7 @@ impl FlashblocksServiceBuilder { .wrap_err("failed to create flashblocks payload builder context")?; let payload_handler = PayloadHandler::new( + built_fb_payload_rx, built_payload_rx, incoming_message_rx, outgoing_message_tx, From 38204c5763f81c31cc5234afbfe261ae57e85c1c Mon Sep 17 00:00:00 2001 From: Niven Date: Thu, 15 Jan 2026 17:53:17 +0800 Subject: [PATCH 2/9] Update --- .../src/builders/flashblocks/payload.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index e847f0fe2..a474eab10 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -407,10 +407,17 @@ where !disable_state_root || ctx.attributes().no_tx_pool, // need to calculate state root for CL sync )?; - self.payload_tx + self.built_fb_payload_tx .send(payload.clone()) .await .map_err(PayloadBuilderError::other)?; + if let Err(e) = self.built_payload_tx.send(payload.clone()).await { + warn!( + target: "payload_builder", + error = %e, + "Failed to send updated payload" + ); + } best_payload.set(payload); info!( @@ -857,6 +864,13 @@ where .send(new_payload.clone()) .await .wrap_err("failed to send built payload to handler")?; + if let Err(e) = self.built_payload_tx.send(new_payload.clone()).await { + warn!( + target: "payload_builder", + error = %e, + "Failed to send updated payload" + ); + } best_payload.set(new_payload); // Record flashblock build duration From 4b582641f2fe4c4a4ec5451b560cc6453e43156a Mon Sep 17 00:00:00 2001 From: Niven Date: Fri, 16 Jan 2026 14:30:48 +0800 Subject: [PATCH 3/9] Fix --- crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs index f0a29fcd8..c8c84ccdb 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs @@ -249,7 +249,7 @@ where cancel, ); - let (built_payload, fb_payload, _) = crate::builders::flashblocks::payload::build_block( + let (built_payload, fb_payload) = crate::builders::flashblocks::payload::build_block( &mut state, &builder_ctx, &mut info, From b90b71c2ddcf0f1d1c67c8b2c3aefcd53f187e50 Mon Sep 17 00:00:00 2001 From: Niven Date: Fri, 16 Jan 2026 14:32:05 +0800 Subject: [PATCH 4/9] Add state root calculation on payload resolution --- .../src/builders/flashblocks/payload.rs | 212 +++++++++++++++--- .../builders/flashblocks/payload_handler.rs | 2 +- 2 files changed, 182 insertions(+), 32 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index a474eab10..f0eb55091 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -34,6 +34,7 @@ use reth_optimism_evm::{OpEvmConfig, OpNextBlockEnvAttributes}; use reth_optimism_forks::OpHardforks; use reth_optimism_node::{OpBuiltPayload, OpPayloadBuilderAttributes}; use reth_optimism_primitives::{OpReceipt, OpTransactionSigned}; +use reth_payload_primitives::BuiltPayload; use reth_payload_util::BestPayloadTransactions; use reth_primitives_traits::RecoveredBlock; use reth_provider::{ @@ -41,7 +42,9 @@ use reth_provider::{ StorageRootProvider, }; use reth_revm::{ - State, database::StateProviderDatabase, db::states::bundle_state::BundleRetention, + State, + database::StateProviderDatabase, + db::{BundleState, states::bundle_state::BundleRetention}, }; use reth_transaction_pool::TransactionPool; use reth_trie::{HashedPostState, updates::TrieUpdates}; @@ -331,7 +334,7 @@ where async fn build_payload( &self, args: BuildArguments, OpBuiltPayload>, - best_payload: BlockCell, + resolve_payload: BlockCell, ) -> Result<(), PayloadBuilderError> { let block_build_start_time = Instant::now(); let BuildArguments { @@ -400,25 +403,14 @@ where ); }; - let (payload, fb_payload) = build_block( - &mut state, - &ctx, - &mut info, - !disable_state_root || ctx.attributes().no_tx_pool, // need to calculate state root for CL sync - )?; - + // We should always calculate state root for fallback payload + let (fallback_payload, fb_payload, bundle_state) = + build_block(&mut state, &ctx, &mut info, true)?; self.built_fb_payload_tx - .send(payload.clone()) + .send(fallback_payload.clone()) .await .map_err(PayloadBuilderError::other)?; - if let Err(e) = self.built_payload_tx.send(payload.clone()).await { - warn!( - target: "payload_builder", - error = %e, - "Failed to send updated payload" - ); - } - best_payload.set(payload); + let mut best_payload = (fallback_payload.clone(), bundle_state); info!( target: "payload_builder", @@ -458,6 +450,14 @@ where .set(info.executed_transactions.len() as f64); // return early since we don't need to build a block with transactions from the pool + self.resolve_best_payload( + &mut state, + &ctx, + best_payload, + fallback_payload, + &resolve_payload, + ) + .await; return Ok(()); } // We adjust our flashblocks timings based on time the fcu block building signal arrived @@ -614,6 +614,14 @@ where ctx = ctx.with_cancel(new_fb_cancel); }, _ = block_cancel.cancelled() => { + self.resolve_best_payload( + &mut state, + &ctx, + best_payload, + fallback_payload, + &resolve_payload, + ) + .await; self.record_flashblocks_metrics( &ctx, &info, @@ -637,6 +645,14 @@ where let _entered = fb_span.enter(); if ctx.flashblock_index() > ctx.target_flashblock_count() { + self.resolve_best_payload( + &mut state, + &ctx, + best_payload, + fallback_payload, + &resolve_payload, + ) + .await; self.record_flashblocks_metrics( &ctx, &info, @@ -656,13 +672,21 @@ where &state_provider, &mut best_txs, &block_cancel, - &best_payload, + &mut best_payload, &fb_span, ) .await { Ok(Some(next_flashblocks_ctx)) => next_flashblocks_ctx, Ok(None) => { + self.resolve_best_payload( + &mut state, + &ctx, + best_payload, + fallback_payload, + &resolve_payload, + ) + .await; self.record_flashblocks_metrics( &ctx, &info, @@ -680,6 +704,14 @@ where ctx.block_number(), err ); + self.resolve_best_payload( + &mut state, + &ctx, + best_payload, + fallback_payload, + &resolve_payload, + ) + .await; return Err(PayloadBuilderError::Other(err.into())); } }; @@ -700,7 +732,7 @@ where state_provider: impl reth::providers::StateProvider + Clone, best_txs: &mut NextBestFlashblocksTxs, block_cancel: &CancellationToken, - best_payload: &BlockCell, + best_payload: &mut (OpBuiltPayload, BundleState), span: &tracing::Span, ) -> eyre::Result> { let flashblock_index = ctx.flashblock_index(); @@ -840,7 +872,7 @@ where ctx.metrics.invalid_built_blocks_count.increment(1); Err(err).wrap_err("failed to build payload") } - Ok((new_payload, mut fb_payload)) => { + Ok((new_payload, mut fb_payload, bundle_state)) => { fb_payload.index = flashblock_index; fb_payload.base = None; @@ -864,14 +896,7 @@ where .send(new_payload.clone()) .await .wrap_err("failed to send built payload to handler")?; - if let Err(e) = self.built_payload_tx.send(new_payload.clone()).await { - warn!( - target: "payload_builder", - error = %e, - "Failed to send updated payload" - ); - } - best_payload.set(new_payload); + *best_payload = (new_payload, bundle_state); // Record flashblock build duration ctx.metrics @@ -925,6 +950,93 @@ where } } + async fn resolve_best_payload< + DB: Database + std::fmt::Debug + AsRef

, + P: StateRootProvider + HashedPostStateProvider + StorageRootProvider, + >( + &self, + state: &mut State, + ctx: &OpPayloadBuilderCtx, + best_payload: (OpBuiltPayload, BundleState), + fallback_payload: OpBuiltPayload, + resolve_payload: &BlockCell, + ) { + if resolve_payload.get().is_some() { + return; + } + + let payload = match best_payload.0.block().header().state_root { + B256::ZERO => { + info!(target: "payload_builder", "Resolving payload with zero state root"); + self.resolve_zero_state_root(state, ctx, best_payload) + .await + .unwrap_or_else(|err| { + warn!( + target: "payload_builder", + error = %err, + "Failed to calculate state root, falling back to fallback payload" + ); + fallback_payload + }) + } + _ => best_payload.0, + }; + resolve_payload.set(payload); + } + + async fn resolve_zero_state_root< + DB: Database + std::fmt::Debug + AsRef

, + P: StateRootProvider + HashedPostStateProvider + StorageRootProvider, + >( + &self, + state: &mut State, + ctx: &OpPayloadBuilderCtx, + best_payload: (OpBuiltPayload, BundleState), + ) -> Result { + let (state_root, trie_updates, hashed_state) = + calculate_state_root_on_resolve(state, ctx, best_payload.1)?; + + let payload_id = best_payload.0.id(); + let fees = best_payload.0.fees(); + let executed_block = best_payload.0.executed_block().ok_or_else(|| { + PayloadBuilderError::Other( + eyre::eyre!("No executed block available in best payload for payload resolution") + .into(), + ) + })?; + let block = best_payload.0.into_sealed_block().into_block(); + let (mut header, body) = block.split(); + header.state_root = state_root; + let updated_block = alloy_consensus::Block::::new(header, body); + let recovered_block = RecoveredBlock::new_unhashed( + updated_block.clone(), + executed_block.recovered_block().senders().to_vec(), + ); + let sealed_block = Arc::new(updated_block.seal_slow()); + + let executed = ExecutedBlock { + recovered_block: Arc::new(recovered_block), + execution_output: executed_block.execution_output.clone(), + hashed_state: Arc::new(hashed_state), + trie_updates: Arc::new(trie_updates), + }; + let updated_payload = OpBuiltPayload::new(payload_id, sealed_block, fees, Some(executed)); + if let Err(e) = self.built_payload_tx.send(updated_payload.clone()).await { + warn!( + target: "payload_builder", + error = %e, + "Failed to send updated payload" + ); + } + debug!( + target: "payload_builder", + state_root = %state_root, + "Updated payload with calculated state root" + ); + + Ok(updated_payload) + } + /// Do some logging and metric recording when we stop build flashblocks fn record_flashblocks_metrics( &self, @@ -1220,7 +1332,7 @@ pub(super) fn build_block( ctx: &OpPayloadBuilderCtx, info: &mut ExecutionInfo, calculate_state_root: bool, -) -> Result<(OpBuiltPayload, OpFlashblockPayload), PayloadBuilderError> +) -> Result<(OpBuiltPayload, OpFlashblockPayload, BundleState), PayloadBuilderError> where DB: Database + AsRef

, P: StateRootProvider + HashedPostStateProvider + StorageRootProvider, @@ -1438,7 +1550,7 @@ where }; // We clean bundle and place initial state transaction back - state.take_bundle(); + let bundle_state = state.take_bundle(); state.transition_state = untouched_transition_state; Ok(( @@ -1449,5 +1561,43 @@ where Some(executed), ), fb_payload, + bundle_state, )) } + +/// Calculates only the state root for an existing payload +fn calculate_state_root_on_resolve( + state: &mut State, + ctx: &OpPayloadBuilderCtx, + bundle_state: BundleState, +) -> Result<(B256, TrieUpdates, HashedPostState), PayloadBuilderError> +where + DB: Database + AsRef

, + P: StateRootProvider + HashedPostStateProvider + StorageRootProvider, + ExtraCtx: std::fmt::Debug + Default, +{ + let state_root_start_time = Instant::now(); + let state_provider = state.database.as_ref(); + let hashed_state = state_provider.hashed_post_state(&bundle_state); + let state_root_updates = state + .database + .as_ref() + .state_root_with_updates(hashed_state.clone()) + .inspect_err(|err| { + warn!(target: "payload_builder", + parent_header=%ctx.parent().hash(), + %err, + "failed to calculate state root for payload" + ); + })?; + + let state_root_calculation_time = state_root_start_time.elapsed(); + ctx.metrics + .state_root_calculation_duration + .record(state_root_calculation_time); + ctx.metrics + .state_root_calculation_gauge + .set(state_root_calculation_time); + + Ok((state_root_updates.0, state_root_updates.1, hashed_state)) +} diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs index c8c84ccdb..f0a29fcd8 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs @@ -249,7 +249,7 @@ where cancel, ); - let (built_payload, fb_payload) = crate::builders::flashblocks::payload::build_block( + let (built_payload, fb_payload, _) = crate::builders::flashblocks::payload::build_block( &mut state, &builder_ctx, &mut info, From cb6408ce0315f4de6da061361e9f9d75cc3de354 Mon Sep 17 00:00:00 2001 From: Niven Date: Fri, 16 Jan 2026 16:27:23 +0800 Subject: [PATCH 5/9] Add async state root calculation --- crates/op-rbuilder/src/args/op.rs | 8 + crates/op-rbuilder/src/builders/context.rs | 2 +- .../src/builders/flashblocks/config.rs | 9 + .../src/builders/flashblocks/payload.rs | 252 ++++++++++-------- .../src/builders/flashblocks/service.rs | 1 + 5 files changed, 162 insertions(+), 110 deletions(-) diff --git a/crates/op-rbuilder/src/args/op.rs b/crates/op-rbuilder/src/args/op.rs index 73bf00625..4b8a8fe8e 100644 --- a/crates/op-rbuilder/src/args/op.rs +++ b/crates/op-rbuilder/src/args/op.rs @@ -157,6 +157,14 @@ pub struct FlashblocksArgs { )] pub flashblocks_disable_state_root: bool, + /// Whether to disable async state root calculation on full payload resolution + #[arg( + long = "flashblocks.disable-async-calculate-state-root", + default_value = "false", + env = "FLASHBLOCKS_DISABLE_ASYNC_CALCULATE_STATE_ROOT" + )] + pub flashblocks_disable_async_calculate_state_root: bool, + /// Flashblocks number contract address /// /// This is the address of the contract that will be used to increment the flashblock number. diff --git a/crates/op-rbuilder/src/builders/context.rs b/crates/op-rbuilder/src/builders/context.rs index 1c042fc48..04326b117 100644 --- a/crates/op-rbuilder/src/builders/context.rs +++ b/crates/op-rbuilder/src/builders/context.rs @@ -49,7 +49,7 @@ use crate::{ }; /// Container type that holds all necessities to build a new payload. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct OpPayloadBuilderCtx { /// The type that knows how to perform system calls and configure the evm. pub evm_config: OpEvmConfig, diff --git a/crates/op-rbuilder/src/builders/flashblocks/config.rs b/crates/op-rbuilder/src/builders/flashblocks/config.rs index 364c68115..475ef2d17 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/config.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/config.rs @@ -34,6 +34,9 @@ pub struct FlashblocksConfig { /// Should we disable state root calculation for each flashblock pub disable_state_root: bool, + /// Should we disable async state root calculation on full payload resolution + pub disable_async_calculate_state_root: bool, + /// The address of the flashblocks number contract. /// /// If set a builder tx will be added to the start of every flashblock instead of the regular builder tx. @@ -77,6 +80,7 @@ impl Default for FlashblocksConfig { leeway_time: Duration::from_millis(0), fixed: false, disable_state_root: false, + disable_async_calculate_state_root: false, number_contract_address: None, number_contract_use_permit: false, build_at_interval_end: false, @@ -108,6 +112,10 @@ impl TryFrom for FlashblocksConfig { let disable_state_root = args.flashblocks.flashblocks_disable_state_root; + let disable_async_calculate_state_root = args + .flashblocks + .flashblocks_disable_async_calculate_state_root; + let number_contract_address = args.flashblocks.flashblocks_number_contract_address; let number_contract_use_permit = args.flashblocks.flashblocks_number_contract_use_permit; @@ -118,6 +126,7 @@ impl TryFrom for FlashblocksConfig { leeway_time, fixed, disable_state_root, + disable_async_calculate_state_root, number_contract_address, number_contract_use_permit, build_at_interval_end: args.flashblocks.flashblocks_build_at_interval_end, diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index f0eb55091..50647a4b8 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -16,14 +16,14 @@ use alloy_consensus::{ BlockBody, EMPTY_OMMER_ROOT_HASH, Header, constants::EMPTY_WITHDRAWALS, proofs, }; use alloy_eips::{Encodable2718, eip7685::EMPTY_REQUESTS_HASH, merge::BEACON_NONCE}; -use alloy_primitives::{Address, B256, U256}; +use alloy_primitives::{Address, B256, BlockHash, U256}; use core::time::Duration; use eyre::WrapErr as _; use op_alloy_rpc_types_engine::{ OpFlashblockPayload, OpFlashblockPayloadBase, OpFlashblockPayloadDelta, OpFlashblockPayloadMetadata, }; -use reth::payload::PayloadBuilderAttributes; +use reth::{payload::PayloadBuilderAttributes, tasks::TaskSpawner}; use reth_basic_payload_builder::BuildOutcome; use reth_chain_state::ExecutedBlock; use reth_chainspec::EthChainSpec; @@ -55,7 +55,7 @@ use std::{ sync::Arc, time::Instant, }; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, metadata::Level, span, warn}; @@ -170,13 +170,15 @@ impl OpPayloadBuilderCtx { /// Optimism's payload builder #[derive(Debug, Clone)] -pub(super) struct OpPayloadBuilder { +pub(super) struct OpPayloadBuilder { /// The type responsible for creating the evm. pub evm_config: OpEvmConfig, /// The transaction pool pub pool: Pool, /// Node client pub client: Client, + /// Task executor + pub task_executor: Tasks, /// Sender for sending built flashblock payloads to [`PayloadHandler`], /// which broadcasts outgoing flashblock payloads via p2p. pub built_fb_payload_tx: mpsc::Sender, @@ -196,13 +198,14 @@ pub(super) struct OpPayloadBuilder { pub address_gas_limiter: AddressGasLimiter, } -impl OpPayloadBuilder { +impl OpPayloadBuilder { /// `OpPayloadBuilder` constructor. #[allow(clippy::too_many_arguments)] pub(super) fn new( evm_config: OpEvmConfig, pool: Pool, client: Client, + task_executor: Tasks, config: BuilderConfig, builder_tx: BuilderTx, built_fb_payload_tx: mpsc::Sender, @@ -215,6 +218,7 @@ impl OpPayloadBuilder { evm_config, pool, client, + task_executor, built_fb_payload_tx, built_payload_tx, ws_pub, @@ -226,12 +230,13 @@ impl OpPayloadBuilder { } } -impl reth_basic_payload_builder::PayloadBuilder - for OpPayloadBuilder +impl reth_basic_payload_builder::PayloadBuilder + for OpPayloadBuilder where Pool: Clone + Send + Sync, Client: Clone + Send + Sync, BuilderTx: Clone + Send + Sync, + Tasks: Clone + Send + Sync, { type Attributes = OpPayloadBuilderAttributes; type BuiltPayload = OpBuiltPayload; @@ -254,11 +259,12 @@ where } } -impl OpPayloadBuilder +impl OpPayloadBuilder where Pool: PoolBounds, Client: ClientBounds, BuilderTx: BuilderTransactions + Send + Sync, + Tasks: TaskSpawner + Clone + Unpin + 'static, { fn get_op_payload_builder_ctx( &self, @@ -450,14 +456,8 @@ where .set(info.executed_transactions.len() as f64); // return early since we don't need to build a block with transactions from the pool - self.resolve_best_payload( - &mut state, - &ctx, - best_payload, - fallback_payload, - &resolve_payload, - ) - .await; + self.resolve_best_payload(&ctx, best_payload, fallback_payload, &resolve_payload) + .await; return Ok(()); } // We adjust our flashblocks timings based on time the fcu block building signal arrived @@ -615,7 +615,6 @@ where }, _ = block_cancel.cancelled() => { self.resolve_best_payload( - &mut state, &ctx, best_payload, fallback_payload, @@ -645,14 +644,8 @@ where let _entered = fb_span.enter(); if ctx.flashblock_index() > ctx.target_flashblock_count() { - self.resolve_best_payload( - &mut state, - &ctx, - best_payload, - fallback_payload, - &resolve_payload, - ) - .await; + self.resolve_best_payload(&ctx, best_payload, fallback_payload, &resolve_payload) + .await; self.record_flashblocks_metrics( &ctx, &info, @@ -680,7 +673,6 @@ where Ok(Some(next_flashblocks_ctx)) => next_flashblocks_ctx, Ok(None) => { self.resolve_best_payload( - &mut state, &ctx, best_payload, fallback_payload, @@ -705,7 +697,6 @@ where err ); self.resolve_best_payload( - &mut state, &ctx, best_payload, fallback_payload, @@ -950,12 +941,8 @@ where } } - async fn resolve_best_payload< - DB: Database + std::fmt::Debug + AsRef

, - P: StateRootProvider + HashedPostStateProvider + StorageRootProvider, - >( + async fn resolve_best_payload( &self, - state: &mut State, ctx: &OpPayloadBuilderCtx, best_payload: (OpBuiltPayload, BundleState), fallback_payload: OpBuiltPayload, @@ -967,76 +954,73 @@ where let payload = match best_payload.0.block().header().state_root { B256::ZERO => { - info!(target: "payload_builder", "Resolving payload with zero state root"); - self.resolve_zero_state_root(state, ctx, best_payload) - .await - .unwrap_or_else(|err| { + let (tx, rx) = oneshot::channel(); + + // Get the fallback payload for payload resolution. Note that if async state root calculation is + // enabled, we use the fallback payload as the best payload contains empty state root since zero + // state root payloads are acceptable. + let fallback_payload_for_resolve = + if self.config.specific.disable_async_calculate_state_root { + // If async state root calculation is disabled, we use the fallback payload with state + // root calcuated to ensure the full payload is valid + fallback_payload.clone() + } else { + best_payload.0.clone() + }; + + let state_root_ctx = CalculateStateRootContext { + best_payload, + parent_hash: ctx.parent().hash(), + built_payload_tx: self.built_payload_tx.clone(), + metrics: self.metrics.clone(), + }; + + // Async calculate state root + match self.client.state_by_block_hash(ctx.parent().hash()) { + Ok(state_provider) => { + self.task_executor.spawn(Box::pin(async move { + let payload_with_state_root = resolve_zero_state_root(state_root_ctx, state_provider).await.unwrap_or_else(|err| { + warn!( + target: "payload_builder", + error = %err, + "Failed to calculate state root, falling back to fallback payload" + ); + // Use the fallback payload with state root calculated on failures as a safety + // net so that the full built payload is assured to be a valid. + fallback_payload + }); + + // Send the payload if aync SR calculation is disabled + let _ = tx.send(payload_with_state_root); + })); + + if self.config.specific.disable_async_calculate_state_root { + rx.await.unwrap_or_else(|_| { + warn!( + target: "payload_builder", + "Failed to calculate state root, state root task stopped. falling back to fallback payload" + ); + fallback_payload_for_resolve + }) + } else { + fallback_payload_for_resolve + } + } + Err(err) => { warn!( target: "payload_builder", error = %err, - "Failed to calculate state root, falling back to fallback payload" + "Failed to get state provider for parent block for SR calculation. falling back to fallback payload" ); - fallback_payload - }) + fallback_payload_for_resolve + } + } } _ => best_payload.0, }; resolve_payload.set(payload); } - async fn resolve_zero_state_root< - DB: Database + std::fmt::Debug + AsRef

, - P: StateRootProvider + HashedPostStateProvider + StorageRootProvider, - >( - &self, - state: &mut State, - ctx: &OpPayloadBuilderCtx, - best_payload: (OpBuiltPayload, BundleState), - ) -> Result { - let (state_root, trie_updates, hashed_state) = - calculate_state_root_on_resolve(state, ctx, best_payload.1)?; - - let payload_id = best_payload.0.id(); - let fees = best_payload.0.fees(); - let executed_block = best_payload.0.executed_block().ok_or_else(|| { - PayloadBuilderError::Other( - eyre::eyre!("No executed block available in best payload for payload resolution") - .into(), - ) - })?; - let block = best_payload.0.into_sealed_block().into_block(); - let (mut header, body) = block.split(); - header.state_root = state_root; - let updated_block = alloy_consensus::Block::::new(header, body); - let recovered_block = RecoveredBlock::new_unhashed( - updated_block.clone(), - executed_block.recovered_block().senders().to_vec(), - ); - let sealed_block = Arc::new(updated_block.seal_slow()); - - let executed = ExecutedBlock { - recovered_block: Arc::new(recovered_block), - execution_output: executed_block.execution_output.clone(), - hashed_state: Arc::new(hashed_state), - trie_updates: Arc::new(trie_updates), - }; - let updated_payload = OpBuiltPayload::new(payload_id, sealed_block, fees, Some(executed)); - if let Err(e) = self.built_payload_tx.send(updated_payload.clone()).await { - warn!( - target: "payload_builder", - error = %e, - "Failed to send updated payload" - ); - } - debug!( - target: "payload_builder", - state_root = %state_root, - "Updated payload with calculated state root" - ); - - Ok(updated_payload) - } - /// Do some logging and metric recording when we stop build flashblocks fn record_flashblocks_metrics( &self, @@ -1288,12 +1272,14 @@ where } #[async_trait::async_trait] -impl PayloadBuilder for OpPayloadBuilder +impl PayloadBuilder + for OpPayloadBuilder where Pool: PoolBounds, Client: ClientBounds, BuilderTx: BuilderTransactions + Clone + Send + Sync, + Tasks: TaskSpawner + Clone + Unpin + 'static, { type Attributes = OpPayloadBuilderAttributes; type BuiltPayload = OpBuiltPayload; @@ -1565,27 +1551,75 @@ where )) } +struct CalculateStateRootContext { + best_payload: (OpBuiltPayload, BundleState), + parent_hash: BlockHash, + built_payload_tx: mpsc::Sender, + metrics: Arc, +} + +async fn resolve_zero_state_root( + ctx: CalculateStateRootContext, + state_provider: Box, +) -> Result { + let (state_root, trie_updates, hashed_state) = + calculate_state_root_on_resolve(&ctx, state_provider)?; + + let payload_id = ctx.best_payload.0.id(); + let fees = ctx.best_payload.0.fees(); + let executed_block = ctx.best_payload.0.executed_block().ok_or_else(|| { + PayloadBuilderError::Other( + eyre::eyre!("No executed block available in best payload for payload resolution") + .into(), + ) + })?; + let block = ctx.best_payload.0.into_sealed_block().into_block(); + let (mut header, body) = block.split(); + header.state_root = state_root; + let updated_block = alloy_consensus::Block::::new(header, body); + let recovered_block = RecoveredBlock::new_unhashed( + updated_block.clone(), + executed_block.recovered_block().senders().to_vec(), + ); + let sealed_block = Arc::new(updated_block.seal_slow()); + + let executed = ExecutedBlock { + recovered_block: Arc::new(recovered_block), + execution_output: executed_block.execution_output.clone(), + hashed_state: Arc::new(hashed_state), + trie_updates: Arc::new(trie_updates), + }; + let updated_payload = OpBuiltPayload::new(payload_id, sealed_block, fees, Some(executed)); + + // Send full built payload with state root calculated to pre-warm local engine state tree + if let Err(e) = ctx.built_payload_tx.send(updated_payload.clone()).await { + warn!( + target: "payload_builder", + error = %e, + "Failed to send updated payload" + ); + } + debug!( + target: "payload_builder", + state_root = %state_root, + "Updated payload with calculated state root" + ); + + Ok(updated_payload) +} + /// Calculates only the state root for an existing payload -fn calculate_state_root_on_resolve( - state: &mut State, - ctx: &OpPayloadBuilderCtx, - bundle_state: BundleState, -) -> Result<(B256, TrieUpdates, HashedPostState), PayloadBuilderError> -where - DB: Database + AsRef

, - P: StateRootProvider + HashedPostStateProvider + StorageRootProvider, - ExtraCtx: std::fmt::Debug + Default, -{ +fn calculate_state_root_on_resolve( + ctx: &CalculateStateRootContext, + state_provider: Box, +) -> Result<(B256, TrieUpdates, HashedPostState), PayloadBuilderError> { let state_root_start_time = Instant::now(); - let state_provider = state.database.as_ref(); - let hashed_state = state_provider.hashed_post_state(&bundle_state); - let state_root_updates = state - .database - .as_ref() + let hashed_state = state_provider.hashed_post_state(&ctx.best_payload.1); + let state_root_updates = state_provider .state_root_with_updates(hashed_state.clone()) .inspect_err(|err| { warn!(target: "payload_builder", - parent_header=%ctx.parent().hash(), + parent_header=%ctx.parent_hash, %err, "failed to calculate state root for payload" ); diff --git a/crates/op-rbuilder/src/builders/flashblocks/service.rs b/crates/op-rbuilder/src/builders/flashblocks/service.rs index 2e67208ec..a92345292 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/service.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/service.rs @@ -119,6 +119,7 @@ impl FlashblocksServiceBuilder { OpEvmConfig::optimism(ctx.chain_spec()), pool, ctx.provider().clone(), + ctx.task_executor().clone(), self.0.clone(), builder_tx, built_fb_payload_tx, From d345bcae36fd0caf3c157233af4ea4038c0463eb Mon Sep 17 00:00:00 2001 From: Niven Date: Fri, 16 Jan 2026 19:43:41 +0800 Subject: [PATCH 6/9] Better refactor --- .../src/builders/flashblocks/payload.rs | 54 +++++++++++-------- 1 file changed, 31 insertions(+), 23 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 50647a4b8..ff476c815 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -954,17 +954,13 @@ where let payload = match best_payload.0.block().header().state_root { B256::ZERO => { - let (tx, rx) = oneshot::channel(); - - // Get the fallback payload for payload resolution. Note that if async state root calculation is - // enabled, we use the fallback payload as the best payload contains empty state root since zero - // state root payloads are acceptable. + // Get the fallback payload for payload resolution let fallback_payload_for_resolve = if self.config.specific.disable_async_calculate_state_root { - // If async state root calculation is disabled, we use the fallback payload with state - // root calcuated to ensure the full payload is valid + // Use the fallback payload with state root calculated to ensure the full payload is valid fallback_payload.clone() } else { + // Use the best payload as empty state root payloads are acceptable best_payload.0.clone() }; @@ -978,27 +974,39 @@ where // Async calculate state root match self.client.state_by_block_hash(ctx.parent().hash()) { Ok(state_provider) => { - self.task_executor.spawn(Box::pin(async move { - let payload_with_state_root = resolve_zero_state_root(state_root_ctx, state_provider).await.unwrap_or_else(|err| { - warn!( - target: "payload_builder", - error = %err, - "Failed to calculate state root, falling back to fallback payload" - ); - // Use the fallback payload with state root calculated on failures as a safety - // net so that the full built payload is assured to be a valid. - fallback_payload - }); + let (sync_tx, sync_rx) = + if self.config.specific.disable_async_calculate_state_root { + let (tx, rx) = oneshot::channel(); + (Some(tx), Some(rx)) + } else { + (None, None) + }; - // Send the payload if aync SR calculation is disabled - let _ = tx.send(payload_with_state_root); + self.task_executor.spawn(Box::pin(async move { + let result = resolve_zero_state_root(state_root_ctx, state_provider) + .await + .unwrap_or_else(|err| { + warn!( + target: "payload_builder", + error = %err, + "Failed to calculate state root, falling back to fallback payload" + ); + // Use the fallback payload with state root calculated on failures as a safety + // net so that the full built payload is assured to be valid. + fallback_payload + }); + + // Only send via channel in sync mode + if let Some(tx) = sync_tx { + let _ = tx.send(result); + } })); - if self.config.specific.disable_async_calculate_state_root { + if let Some(rx) = sync_rx { rx.await.unwrap_or_else(|_| { warn!( target: "payload_builder", - "Failed to calculate state root, state root task stopped. falling back to fallback payload" + "Failed to calculate state root, state root task stopped. Falling back to fallback payload" ); fallback_payload_for_resolve }) @@ -1010,7 +1018,7 @@ where warn!( target: "payload_builder", error = %err, - "Failed to get state provider for parent block for SR calculation. falling back to fallback payload" + "Failed to calculate state root, parent block not found. Falling back to fallback payload" ); fallback_payload_for_resolve } From 4a252112aa7e179c4457e735050f0604ec36f1e9 Mon Sep 17 00:00:00 2001 From: Niven Date: Mon, 19 Jan 2026 14:26:40 +0800 Subject: [PATCH 7/9] Fixes --- crates/op-rbuilder/src/builders/flashblocks/payload.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 3983752c4..c2cb2bdb4 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -180,7 +180,7 @@ pub(super) struct OpPayloadBuilder { pub task_executor: Tasks, /// Sender for sending built flashblock payloads to [`PayloadHandler`], /// which broadcasts outgoing flashblock payloads via p2p. - pub built_fb_payload_tx: mpsc::Sender, + pub built_fb_payload_tx: mpsc::Sender, /// Sender for sending built full block payloads to [`PayloadHandler`], /// which updates the engine tree state. pub built_payload_tx: mpsc::Sender, @@ -207,7 +207,7 @@ impl OpPayloadBuilder, builder_tx: BuilderTx, - built_fb_payload_tx: mpsc::Sender, + built_fb_payload_tx: mpsc::Sender, built_payload_tx: mpsc::Sender, ws_pub: Arc, metrics: Arc, @@ -412,7 +412,7 @@ where let (fallback_payload, fb_payload, bundle_state) = build_block(&mut state, &ctx, &mut info, true)?; self.built_fb_payload_tx - .try_send(fb_payload.clone()) + .try_send(fallback_payload.clone()) .map_err(PayloadBuilderError::other)?; let mut best_payload = (fallback_payload.clone(), bundle_state); @@ -835,7 +835,7 @@ where .publish(&fb_payload) .wrap_err("failed to publish flashblock via websocket")?; self.built_fb_payload_tx - .try_send(fb_payload.clone()) + .try_send(new_payload.clone()) .wrap_err("failed to send built payload to handler")?; *best_payload = (new_payload, bundle_state); From a7e54affff3092c40b007fe96e4802fb82d489a3 Mon Sep 17 00:00:00 2001 From: Niven Date: Mon, 19 Jan 2026 19:15:38 +0800 Subject: [PATCH 8/9] Remove async oneshot channel, better refactor (#60) --- .../src/builders/flashblocks/payload.rs | 40 +++++-------------- 1 file changed, 9 insertions(+), 31 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index c2cb2bdb4..4bc83dc40 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -54,7 +54,7 @@ use std::{ sync::Arc, time::Instant, }; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, metadata::Level, span, warn}; @@ -908,7 +908,7 @@ where let fallback_payload_for_resolve = if self.config.specific.disable_async_calculate_state_root { // Use the fallback payload with state root calculated to ensure the full payload is valid - fallback_payload.clone() + fallback_payload } else { // Use the best payload as empty state root payloads are acceptable best_payload.0.clone() @@ -924,42 +924,20 @@ where // Async calculate state root match self.client.state_by_block_hash(ctx.parent().hash()) { Ok(state_provider) => { - let (sync_tx, sync_rx) = - if self.config.specific.disable_async_calculate_state_root { - let (tx, rx) = oneshot::channel(); - (Some(tx), Some(rx)) - } else { - (None, None) - }; - - self.task_executor.spawn(Box::pin(async move { - let result = resolve_zero_state_root(state_root_ctx, state_provider) + if self.config.specific.disable_async_calculate_state_root { + resolve_zero_state_root(state_root_ctx, state_provider) .unwrap_or_else(|err| { warn!( target: "payload_builder", error = %err, "Failed to calculate state root, falling back to fallback payload" ); - // Use the fallback payload with state root calculated on failures as a safety - // net so that the full built payload is assured to be valid. - fallback_payload - }); - - // Only send via channel in sync mode - if let Some(tx) = sync_tx { - let _ = tx.send(result); - } - })); - - if let Some(rx) = sync_rx { - rx.blocking_recv().unwrap_or_else(|_| { - warn!( - target: "payload_builder", - "Failed to calculate state root, state root task stopped. Falling back to fallback payload" - ); - fallback_payload_for_resolve - }) + fallback_payload_for_resolve + }) } else { + self.task_executor.spawn(Box::pin(async move { + let _ = resolve_zero_state_root(state_root_ctx, state_provider); + })); fallback_payload_for_resolve } } From 3c87e10c5020af162fa5093524a3bef3f2b461b6 Mon Sep 17 00:00:00 2001 From: Niven Date: Tue, 10 Feb 2026 14:46:01 +0800 Subject: [PATCH 9/9] Switch to blocking --- crates/op-rbuilder/src/builders/flashblocks/payload.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 8f2e3b54d..0fd0a4b65 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -958,7 +958,7 @@ where fallback_payload_for_resolve }) } else { - self.task_executor.spawn(Box::pin(async move { + self.task_executor.spawn_blocking(Box::pin(async move { let _ = resolve_zero_state_root(state_root_ctx, state_provider); })); fallback_payload_for_resolve