From 02d91aa391ad6fd677e4f044a6e9ed278d6711db Mon Sep 17 00:00:00 2001 From: Niven Date: Thu, 12 Feb 2026 18:17:30 +0800 Subject: [PATCH 1/4] Add p2p feat to gossip flashblocks payloads --- crates/op-rbuilder/src/args/op.rs | 16 ++++++ .../src/builders/flashblocks/config.rs | 10 ++++ .../src/builders/flashblocks/p2p.rs | 12 ++--- .../src/builders/flashblocks/payload.rs | 8 +-- .../builders/flashblocks/payload_handler.rs | 49 +++++++++++++++++-- .../src/builders/flashblocks/service.rs | 5 +- 6 files changed, 85 insertions(+), 15 deletions(-) diff --git a/crates/op-rbuilder/src/args/op.rs b/crates/op-rbuilder/src/args/op.rs index aed722003..26da47c8f 100644 --- a/crates/op-rbuilder/src/args/op.rs +++ b/crates/op-rbuilder/src/args/op.rs @@ -240,6 +240,22 @@ pub struct FlashblocksP2pArgs { default_value = "50" )] pub p2p_max_peer_count: u32, + + /// Optional flag to send the full payload to peers + #[arg( + long = "flashblocks.p2p_send_full_payload", + env = "FLASHBLOCK_P2P_SEND_FULL_PAYLOAD", + default_value = "false" + )] + pub p2p_send_full_payload: bool, + + /// Optional flag to process the full payload received by peers + #[arg( + long = "flashblocks.p2p_process_full_payload", + env = "FLASHBLOCK_P2P_PROCESS_FULL_PAYLOAD", + default_value = "false" + )] + pub p2p_process_full_payload: bool, } /// Parameters for telemetry configuration diff --git a/crates/op-rbuilder/src/builders/flashblocks/config.rs b/crates/op-rbuilder/src/builders/flashblocks/config.rs index 62fa2ca64..dacb0fdd0 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/config.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/config.rs @@ -52,6 +52,12 @@ pub struct FlashblocksConfig { /// Maximum number of peers for the p2p node pub p2p_max_peer_count: u32, + /// Optional flag to send the full payload to peers + pub p2p_send_full_payload: bool, + + /// Optional flag to process the full payload received by peers + pub p2p_process_full_payload: bool, + /// Maximum number of concurrent WebSocket subscribers pub ws_subscriber_limit: Option, } @@ -70,6 +76,8 @@ impl Default for FlashblocksConfig { p2p_port: 9009, p2p_private_key_file: None, p2p_known_peers: None, + p2p_send_full_payload: false, + p2p_process_full_payload: false, p2p_max_peer_count: 50, ws_subscriber_limit: None, } @@ -106,6 +114,8 @@ impl TryFrom for FlashblocksConfig { p2p_private_key_file: args.flashblocks.p2p.p2p_private_key_file, p2p_known_peers: args.flashblocks.p2p.p2p_known_peers, p2p_max_peer_count: args.flashblocks.p2p.p2p_max_peer_count, + p2p_send_full_payload: args.flashblocks.p2p.p2p_send_full_payload, + p2p_process_full_payload: args.flashblocks.p2p.p2p_process_full_payload, ws_subscriber_limit: args.flashblocks.ws_subscriber_limit, }) } diff --git a/crates/op-rbuilder/src/builders/flashblocks/p2p.rs b/crates/op-rbuilder/src/builders/flashblocks/p2p.rs index 9f947d959..3d3f8dfe0 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/p2p.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/p2p.rs @@ -1,4 +1,5 @@ use alloy_primitives::U256; +use op_alloy_rpc_types_engine::OpFlashblockPayload; use reth::{core::primitives::SealedBlock, payload::PayloadId}; use reth_optimism_payload_builder::OpBuiltPayload as RethOpBuiltPayload; use reth_optimism_primitives::OpBlock; @@ -11,6 +12,7 @@ pub(super) const FLASHBLOCKS_STREAM_PROTOCOL: p2p::StreamProtocol = #[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] pub(super) enum Message { OpBuiltPayload(OpBuiltPayload), + OpFlashblockPayload(OpFlashblockPayload), } impl p2p::Message for Message { @@ -31,15 +33,13 @@ pub(crate) struct OpBuiltPayload { pub(crate) fees: U256, } -impl From for Message { - fn from(value: RethOpBuiltPayload) -> Self { +impl Message { + pub(super) fn from_built_payload(value: RethOpBuiltPayload) -> Self { Message::OpBuiltPayload(value.into()) } -} -impl From for Message { - fn from(value: OpBuiltPayload) -> Self { - Message::OpBuiltPayload(value) + pub(super) fn from_flashblock_payload(value: OpFlashblockPayload) -> Self { + Message::OpFlashblockPayload(value) } } diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 613ab5cfa..493542371 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -163,7 +163,7 @@ pub(super) struct OpPayloadBuilder { pub client: Client, /// Sender for sending built flashblock payloads to [`PayloadHandler`], /// which broadcasts outgoing flashblock payloads via p2p. - pub built_fb_payload_tx: mpsc::Sender, + pub built_fb_payload_tx: mpsc::Sender, /// Sender for sending built full block payloads to [`PayloadHandler`], /// which updates the engine tree state. pub built_payload_tx: mpsc::Sender, @@ -191,7 +191,7 @@ impl OpPayloadBuilder { client: Client, config: BuilderConfig, builder_tx: BuilderTx, - built_fb_payload_tx: mpsc::Sender, + built_fb_payload_tx: mpsc::Sender, built_payload_tx: mpsc::Sender, ws_pub: Arc, metrics: Arc, @@ -398,7 +398,7 @@ where )?; self.built_fb_payload_tx - .try_send(payload.clone()) + .try_send(fb_payload.clone()) .map_err(PayloadBuilderError::other)?; if let Err(e) = self.built_payload_tx.try_send(payload.clone()) { warn!( @@ -746,7 +746,7 @@ where .publish(&fb_payload) .wrap_err("failed to publish flashblock via websocket")?; self.built_fb_payload_tx - .try_send(new_payload.clone()) + .try_send(fb_payload) .wrap_err("failed to send built payload to handler")?; if let Err(e) = self.built_payload_tx.try_send(new_payload.clone()) { warn!( diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs index 59a86ea04..6cd5579c1 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs @@ -1,6 +1,7 @@ use crate::{ builders::flashblocks::{ ctx::OpPayloadSyncerCtx, p2p::Message, payload::FlashblocksExecutionInfo, + wspub::WebSocketPublisher, }, primitives::reth::ExecutionInfo, traits::ClientBounds, @@ -35,7 +36,7 @@ use tracing::warn; /// In the case of a payload received from a peer, it is executed and if successful, an event is sent to the payload builder. pub(crate) struct PayloadHandler { // receives new flashblock payloads built by this builder. - built_fb_payload_rx: mpsc::Receiver, + built_fb_payload_rx: mpsc::Receiver, // receives new full block payloads built by this builder. built_payload_rx: mpsc::Receiver, // receives incoming p2p messages from peers. @@ -44,6 +45,8 @@ pub(crate) struct PayloadHandler { p2p_tx: mpsc::Sender, // sends a `Events::BuiltPayload` to the reth payload builder when a new payload is received. payload_events_handle: tokio::sync::broadcast::Sender>, + // websocket publisher for broadcasting flashblocks to all connected subscribers. + ws_pub: Arc, // context required for execution of blocks during syncing ctx: OpPayloadSyncerCtx, // chain client @@ -51,6 +54,8 @@ pub(crate) struct PayloadHandler { // task executor task_executor: Tasks, cancel: tokio_util::sync::CancellationToken, + p2p_send_full_payload_flag: bool, + p2p_process_full_payload_flag: bool, } impl PayloadHandler @@ -60,15 +65,18 @@ where { #[allow(clippy::too_many_arguments)] pub(crate) fn new( - built_fb_payload_rx: mpsc::Receiver, + built_fb_payload_rx: mpsc::Receiver, built_payload_rx: mpsc::Receiver, p2p_rx: mpsc::Receiver, p2p_tx: mpsc::Sender, payload_events_handle: tokio::sync::broadcast::Sender>, + ws_pub: Arc, ctx: OpPayloadSyncerCtx, client: Client, task_executor: Tasks, cancel: tokio_util::sync::CancellationToken, + p2p_send_full_payload_flag: bool, + p2p_process_full_payload_flag: bool, ) -> Self { Self { built_fb_payload_rx, @@ -76,10 +84,13 @@ where p2p_rx, p2p_tx, payload_events_handle, + ws_pub, ctx, client, task_executor, cancel, + p2p_send_full_payload_flag, + p2p_process_full_payload_flag, } } @@ -90,10 +101,13 @@ where mut p2p_rx, p2p_tx, payload_events_handle, + ws_pub, ctx, client, task_executor, cancel, + p2p_send_full_payload_flag, + p2p_process_full_payload_flag, } = self; tracing::info!(target: "payload_builder", "flashblocks payload handler started"); @@ -102,24 +116,46 @@ where tokio::select! { Some(payload) = built_fb_payload_rx.recv() => { // ignore error here; if p2p was disabled, the channel will be closed. - let _ = p2p_tx.send(payload.into()).await; + let _ = p2p_tx.send(Message::from_flashblock_payload(payload)).await; } Some(payload) = built_payload_rx.recv() => { // Update engine tree state with locally built block payloads if let Err(e) = payload_events_handle.send(Events::BuiltPayload(payload.clone())) { warn!(target: "payload_builder", e = ?e, "failed to send BuiltPayload event"); } + if p2p_send_full_payload_flag { + // ignore error here; if p2p was disabled, the channel will be closed. + let _ = p2p_tx.send(Message::from_built_payload(payload)).await; + } } Some(message) = p2p_rx.recv() => { match message { Message::OpBuiltPayload(payload) => { + if !p2p_process_full_payload_flag { + continue; + } + let payload: OpBuiltPayload = payload.into(); + let block_hash = payload.block().hash(); + // Check if this block is already the pending block in canonical state + if let Ok(Some(pending)) = client.pending_block() + && pending.hash() == block_hash + { + tracing::trace!( + target: "payload_builder", + hash = %block_hash, + block_number = payload.block().header().number, + "skipping flashblock execution - block already pending in canonical state" + ); + continue; + } + let ctx = ctx.clone(); let client = client.clone(); let payload_events_handle = payload_events_handle.clone(); let cancel = cancel.clone(); - // execute the flashblock on a thread where blocking is acceptable, + // execute the built full payload on a thread where blocking is acceptable, // as it's potentially a heavy operation task_executor.spawn_blocking(Box::pin(async move { let res = execute_flashblock( @@ -141,6 +177,11 @@ where } })); } + Message::OpFlashblockPayload(fb_payload) => { + if let Err(e) = ws_pub.publish(&fb_payload) { + warn!(target: "payload_builder", e = ?e, "failed to publish flashblock to websocket publisher"); + } + } } } else => break, diff --git a/crates/op-rbuilder/src/builders/flashblocks/service.rs b/crates/op-rbuilder/src/builders/flashblocks/service.rs index a790cb595..1fe38a43e 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/service.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/service.rs @@ -149,7 +149,7 @@ impl FlashblocksServiceBuilder { let syncer_ctx = crate::builders::flashblocks::ctx::OpPayloadSyncerCtx::new( &ctx.provider().clone(), - self.0, + self.0.clone(), OpEvmConfig::optimism(ctx.chain_spec()), metrics.clone(), ) @@ -161,10 +161,13 @@ impl FlashblocksServiceBuilder { incoming_message_rx, outgoing_message_tx, payload_service.payload_events_handle(), + ws_pub.clone(), syncer_ctx, ctx.provider().clone(), ctx.task_executor().clone(), cancel, + self.0.specific.p2p_send_full_payload, + self.0.specific.p2p_process_full_payload, ); ctx.task_executor().spawn_critical( From cb90c836d78d65be9578b6fc5b1fc0456e9058ab Mon Sep 17 00:00:00 2001 From: Niven Date: Thu, 12 Feb 2026 18:19:39 +0800 Subject: [PATCH 2/4] Fix comments --- crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs index 6cd5579c1..0d7d8ff84 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs @@ -214,7 +214,7 @@ where .wrap_err("failed to get parent header")? .ok_or_else(|| eyre::eyre!("parent header not found"))?; - // For X Layer, validate header and parent relationship before execution + // Validate header and parent relationship before execution let chain_spec = client.chain_spec(); validate_pre_execution(&payload, &parent_header, parent_hash, chain_spec.clone()) .wrap_err("pre-execution validation failed")?; From c1dca13151535a2a0f29835790b600b99928e2e5 Mon Sep 17 00:00:00 2001 From: Niven Date: Thu, 12 Feb 2026 18:30:51 +0800 Subject: [PATCH 3/4] Refactor naming --- .../op-rbuilder/src/builders/flashblocks/payload_handler.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs index 0d7d8ff84..2cd863181 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs @@ -158,7 +158,7 @@ where // execute the built full payload on a thread where blocking is acceptable, // as it's potentially a heavy operation task_executor.spawn_blocking(Box::pin(async move { - let res = execute_flashblock( + let res = execute_built_payload( payload, ctx, client, @@ -190,7 +190,7 @@ where } } -fn execute_flashblock( +fn execute_built_payload( payload: OpBuiltPayload, ctx: OpPayloadSyncerCtx, client: Client, From a11c5c4a99656a70555a97a61010cf4ee5d9c9f6 Mon Sep 17 00:00:00 2001 From: Niven Date: Fri, 20 Feb 2026 22:28:19 +0800 Subject: [PATCH 4/4] Resolve comments --- crates/op-rbuilder/src/args/op.rs | 16 ++++++------ .../src/builders/flashblocks/config.rs | 16 ++++++------ .../src/builders/flashblocks/p2p.rs | 8 +++--- .../builders/flashblocks/payload_handler.rs | 25 ++++++++++--------- .../src/builders/flashblocks/service.rs | 4 +-- 5 files changed, 36 insertions(+), 33 deletions(-) diff --git a/crates/op-rbuilder/src/args/op.rs b/crates/op-rbuilder/src/args/op.rs index 26da47c8f..541e6a4df 100644 --- a/crates/op-rbuilder/src/args/op.rs +++ b/crates/op-rbuilder/src/args/op.rs @@ -241,21 +241,21 @@ pub struct FlashblocksP2pArgs { )] pub p2p_max_peer_count: u32, - /// Optional flag to send the full payload to peers + /// Optional flag to send the payload to peers #[arg( - long = "flashblocks.p2p_send_full_payload", - env = "FLASHBLOCK_P2P_SEND_FULL_PAYLOAD", + long = "flashblocks.p2p_send_payload", + env = "FLASHBLOCK_P2P_SEND_PAYLOAD", default_value = "false" )] - pub p2p_send_full_payload: bool, + pub p2p_send_payload: bool, - /// Optional flag to process the full payload received by peers + /// Optional flag to process the payload received by peers #[arg( - long = "flashblocks.p2p_process_full_payload", - env = "FLASHBLOCK_P2P_PROCESS_FULL_PAYLOAD", + long = "flashblocks.p2p_process_payload", + env = "FLASHBLOCK_P2P_PROCESS_PAYLOAD", default_value = "false" )] - pub p2p_process_full_payload: bool, + pub p2p_process_payload: bool, } /// Parameters for telemetry configuration diff --git a/crates/op-rbuilder/src/builders/flashblocks/config.rs b/crates/op-rbuilder/src/builders/flashblocks/config.rs index dacb0fdd0..c3f880207 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/config.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/config.rs @@ -52,11 +52,11 @@ pub struct FlashblocksConfig { /// Maximum number of peers for the p2p node pub p2p_max_peer_count: u32, - /// Optional flag to send the full payload to peers - pub p2p_send_full_payload: bool, + /// Optional flag to send the payload to peers + pub p2p_send_payload: bool, - /// Optional flag to process the full payload received by peers - pub p2p_process_full_payload: bool, + /// Optional flag to process the payload received by peers + pub p2p_process_payload: bool, /// Maximum number of concurrent WebSocket subscribers pub ws_subscriber_limit: Option, @@ -76,8 +76,8 @@ impl Default for FlashblocksConfig { p2p_port: 9009, p2p_private_key_file: None, p2p_known_peers: None, - p2p_send_full_payload: false, - p2p_process_full_payload: false, + p2p_send_payload: false, + p2p_process_payload: false, p2p_max_peer_count: 50, ws_subscriber_limit: None, } @@ -114,8 +114,8 @@ impl TryFrom for FlashblocksConfig { p2p_private_key_file: args.flashblocks.p2p.p2p_private_key_file, p2p_known_peers: args.flashblocks.p2p.p2p_known_peers, p2p_max_peer_count: args.flashblocks.p2p.p2p_max_peer_count, - p2p_send_full_payload: args.flashblocks.p2p.p2p_send_full_payload, - p2p_process_full_payload: args.flashblocks.p2p.p2p_process_full_payload, + p2p_send_payload: args.flashblocks.p2p.p2p_send_payload, + p2p_process_payload: args.flashblocks.p2p.p2p_process_payload, ws_subscriber_limit: args.flashblocks.ws_subscriber_limit, }) } diff --git a/crates/op-rbuilder/src/builders/flashblocks/p2p.rs b/crates/op-rbuilder/src/builders/flashblocks/p2p.rs index 3d3f8dfe0..5283c9c09 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/p2p.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/p2p.rs @@ -33,12 +33,14 @@ pub(crate) struct OpBuiltPayload { pub(crate) fees: U256, } -impl Message { - pub(super) fn from_built_payload(value: RethOpBuiltPayload) -> Self { +impl From for Message { + fn from(value: RethOpBuiltPayload) -> Self { Message::OpBuiltPayload(value.into()) } +} - pub(super) fn from_flashblock_payload(value: OpFlashblockPayload) -> Self { +impl From for Message { + fn from(value: OpFlashblockPayload) -> Self { Message::OpFlashblockPayload(value) } } diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs index 2cd863181..d2798c183 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs @@ -54,8 +54,8 @@ pub(crate) struct PayloadHandler { // task executor task_executor: Tasks, cancel: tokio_util::sync::CancellationToken, - p2p_send_full_payload_flag: bool, - p2p_process_full_payload_flag: bool, + p2p_send_payload: bool, + p2p_process_payload: bool, } impl PayloadHandler @@ -75,8 +75,8 @@ where client: Client, task_executor: Tasks, cancel: tokio_util::sync::CancellationToken, - p2p_send_full_payload_flag: bool, - p2p_process_full_payload_flag: bool, + p2p_send_payload: bool, + p2p_process_payload: bool, ) -> Self { Self { built_fb_payload_rx, @@ -89,8 +89,8 @@ where client, task_executor, cancel, - p2p_send_full_payload_flag, - p2p_process_full_payload_flag, + p2p_send_payload, + p2p_process_payload, } } @@ -106,8 +106,8 @@ where client, task_executor, cancel, - p2p_send_full_payload_flag, - p2p_process_full_payload_flag, + p2p_send_payload, + p2p_process_payload, } = self; tracing::info!(target: "payload_builder", "flashblocks payload handler started"); @@ -116,22 +116,22 @@ where tokio::select! { Some(payload) = built_fb_payload_rx.recv() => { // ignore error here; if p2p was disabled, the channel will be closed. - let _ = p2p_tx.send(Message::from_flashblock_payload(payload)).await; + let _ = p2p_tx.send(payload.into()).await; } Some(payload) = built_payload_rx.recv() => { // Update engine tree state with locally built block payloads if let Err(e) = payload_events_handle.send(Events::BuiltPayload(payload.clone())) { warn!(target: "payload_builder", e = ?e, "failed to send BuiltPayload event"); } - if p2p_send_full_payload_flag { + if p2p_send_payload { // ignore error here; if p2p was disabled, the channel will be closed. - let _ = p2p_tx.send(Message::from_built_payload(payload)).await; + let _ = p2p_tx.send(payload.into()).await; } } Some(message) = p2p_rx.recv() => { match message { Message::OpBuiltPayload(payload) => { - if !p2p_process_full_payload_flag { + if !p2p_process_payload { continue; } @@ -178,6 +178,7 @@ where })); } Message::OpFlashblockPayload(fb_payload) => { + // Skip validation as flashblock builder p2p is trusted if let Err(e) = ws_pub.publish(&fb_payload) { warn!(target: "payload_builder", e = ?e, "failed to publish flashblock to websocket publisher"); } diff --git a/crates/op-rbuilder/src/builders/flashblocks/service.rs b/crates/op-rbuilder/src/builders/flashblocks/service.rs index 1fe38a43e..c689e011b 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/service.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/service.rs @@ -166,8 +166,8 @@ impl FlashblocksServiceBuilder { ctx.provider().clone(), ctx.task_executor().clone(), cancel, - self.0.specific.p2p_send_full_payload, - self.0.specific.p2p_process_full_payload, + self.0.specific.p2p_send_payload, + self.0.specific.p2p_process_payload, ); ctx.task_executor().spawn_critical(