From b7352663fe73e0f4f5d5184f4ab125be7d626982 Mon Sep 17 00:00:00 2001 From: Niven Date: Thu, 15 Jan 2026 17:43:19 +0800 Subject: [PATCH 1/5] Fix payload handler for p2p flashblocks and full built payloads --- .../src/builders/flashblocks/payload.rs | 17 +++++++----- .../builders/flashblocks/payload_handler.rs | 26 ++++++++++++------- .../src/builders/flashblocks/service.rs | 5 ++++ 3 files changed, 33 insertions(+), 15 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 9e894969a..e847f0fe2 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -174,9 +174,12 @@ pub(super) struct OpPayloadBuilder { pub pool: Pool, /// Node client pub client: Client, - /// Sender for sending built payloads to [`PayloadHandler`], - /// which broadcasts outgoing payloads via p2p. - pub payload_tx: mpsc::Sender, + /// Sender for sending built flashblock payloads to [`PayloadHandler`], + /// which broadcasts outgoing flashblock payloads via p2p. + pub built_fb_payload_tx: mpsc::Sender, + /// Sender for sending built full block payloads to [`PayloadHandler`], + /// which updates the engine tree state. + pub built_payload_tx: mpsc::Sender, /// WebSocket publisher for broadcasting flashblocks /// to all connected subscribers. pub ws_pub: Arc, @@ -199,7 +202,8 @@ impl OpPayloadBuilder { client: Client, config: BuilderConfig, builder_tx: BuilderTx, - payload_tx: mpsc::Sender, + built_fb_payload_tx: mpsc::Sender, + built_payload_tx: mpsc::Sender, ws_pub: Arc, metrics: Arc, ) -> Self { @@ -208,7 +212,8 @@ impl OpPayloadBuilder { evm_config, pool, client, - payload_tx, + built_fb_payload_tx, + built_payload_tx, ws_pub, config, metrics, @@ -848,7 +853,7 @@ where .ws_pub .publish(&fb_payload) .wrap_err("failed to publish flashblock via websocket")?; - self.payload_tx + self.built_fb_payload_tx .send(new_payload.clone()) .await .wrap_err("failed to send built payload to handler")?; diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs index 8613abc86..f0a29fcd8 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs @@ -29,8 +29,10 @@ use tracing::warn; /// In the case of a payload built by this node, it is broadcast to peers and an event is sent to the payload builder. /// In the case of a payload received from a peer, it is executed and if successful, an event is sent to the payload builder. pub(crate) struct PayloadHandler { - // receives new payloads built by this builder. - built_rx: mpsc::Receiver, + // receives new flashblock payloads built by this builder. + built_fb_payload_rx: mpsc::Receiver, + // receives new full block payloads built by this builder. + built_payload_rx: mpsc::Receiver, // receives incoming p2p messages from peers. p2p_rx: mpsc::Receiver, // outgoing p2p channel to broadcast new payloads to peers. @@ -50,7 +52,8 @@ where { #[allow(clippy::too_many_arguments)] pub(crate) fn new( - built_rx: mpsc::Receiver, + built_fb_payload_rx: mpsc::Receiver, + built_payload_rx: mpsc::Receiver, p2p_rx: mpsc::Receiver, p2p_tx: mpsc::Sender, payload_events_handle: tokio::sync::broadcast::Sender>, @@ -59,7 +62,8 @@ where cancel: tokio_util::sync::CancellationToken, ) -> Self { Self { - built_rx, + built_fb_payload_rx, + built_payload_rx, p2p_rx, p2p_tx, payload_events_handle, @@ -71,7 +75,8 @@ where pub(crate) async fn run(self) { let Self { - mut built_rx, + mut built_fb_payload_rx, + mut built_payload_rx, mut p2p_rx, p2p_tx, payload_events_handle, @@ -84,12 +89,15 @@ where loop { tokio::select! { - Some(payload) = built_rx.recv() => { + Some(payload) = built_fb_payload_rx.recv() => { + // ignore error here; if p2p was disabled, the channel will be closed. + let _ = p2p_tx.send(payload.into()).await; + } + Some(payload) = built_payload_rx.recv() => { + // Update engine tree state with locally built block payloads if let Err(e) = payload_events_handle.send(Events::BuiltPayload(payload.clone())) { warn!(e = ?e, "failed to send BuiltPayload event"); } - // ignore error here; if p2p was disabled, the channel will be closed. - let _ = p2p_tx.send(payload.into()).await; } Some(message) = p2p_rx.recv() => { match message { @@ -241,7 +249,7 @@ where cancel, ); - let (built_payload, fb_payload) = crate::builders::flashblocks::payload::build_block( + let (built_payload, fb_payload, _) = crate::builders::flashblocks::payload::build_block( &mut state, &builder_ctx, &mut info, diff --git a/crates/op-rbuilder/src/builders/flashblocks/service.rs b/crates/op-rbuilder/src/builders/flashblocks/service.rs index b0563421a..2e67208ec 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/service.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/service.rs @@ -106,6 +106,9 @@ impl FlashblocksServiceBuilder { }; let metrics = Arc::new(OpRBuilderMetrics::default()); + // Channels for built flashblock payloads + let (built_fb_payload_tx, built_fb_payload_rx) = tokio::sync::mpsc::channel(16); + // Channels for built full block payloads let (built_payload_tx, built_payload_rx) = tokio::sync::mpsc::channel(16); let ws_pub: Arc = @@ -118,6 +121,7 @@ impl FlashblocksServiceBuilder { ctx.provider().clone(), self.0.clone(), builder_tx, + built_fb_payload_tx, built_payload_tx, ws_pub.clone(), metrics.clone(), @@ -145,6 +149,7 @@ impl FlashblocksServiceBuilder { .wrap_err("failed to create flashblocks payload builder context")?; let payload_handler = PayloadHandler::new( + built_fb_payload_rx, built_payload_rx, incoming_message_rx, outgoing_message_tx, From 38204c5763f81c31cc5234afbfe261ae57e85c1c Mon Sep 17 00:00:00 2001 From: Niven Date: Thu, 15 Jan 2026 17:53:17 +0800 Subject: [PATCH 2/5] Update --- .../src/builders/flashblocks/payload.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index e847f0fe2..a474eab10 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -407,10 +407,17 @@ where !disable_state_root || ctx.attributes().no_tx_pool, // need to calculate state root for CL sync )?; - self.payload_tx + self.built_fb_payload_tx .send(payload.clone()) .await .map_err(PayloadBuilderError::other)?; + if let Err(e) = self.built_payload_tx.send(payload.clone()).await { + warn!( + target: "payload_builder", + error = %e, + "Failed to send updated payload" + ); + } best_payload.set(payload); info!( @@ -857,6 +864,13 @@ where .send(new_payload.clone()) .await .wrap_err("failed to send built payload to handler")?; + if let Err(e) = self.built_payload_tx.send(new_payload.clone()).await { + warn!( + target: "payload_builder", + error = %e, + "Failed to send updated payload" + ); + } best_payload.set(new_payload); // Record flashblock build duration From 4b582641f2fe4c4a4ec5451b560cc6453e43156a Mon Sep 17 00:00:00 2001 From: Niven Date: Fri, 16 Jan 2026 14:30:48 +0800 Subject: [PATCH 3/5] Fix --- crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs index f0a29fcd8..c8c84ccdb 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs @@ -249,7 +249,7 @@ where cancel, ); - let (built_payload, fb_payload, _) = crate::builders::flashblocks::payload::build_block( + let (built_payload, fb_payload) = crate::builders::flashblocks::payload::build_block( &mut state, &builder_ctx, &mut info, From 24ab12532f166c2ec7099f4bd18ce3cfc9aa415a Mon Sep 17 00:00:00 2001 From: Niven Date: Wed, 28 Jan 2026 18:04:10 +0800 Subject: [PATCH 4/5] Revert default behaviour --- .../src/builders/flashblocks/payload_handler.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs index 23897afec..bfe1d705d 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs @@ -92,6 +92,11 @@ where loop { tokio::select! { Some(payload) = built_fb_payload_rx.recv() => { + // Update engine tree state with locally built block payloads + if let Err(e) = payload_events_handle.send(Events::BuiltPayload(payload.clone())) { + warn!(e = ?e, "failed to send BuiltPayload event"); + } + // ignore error here; if p2p was disabled, the channel will be closed. let _ = p2p_tx.send(payload.into()).await; } @@ -100,6 +105,9 @@ where if let Err(e) = payload_events_handle.send(Events::BuiltPayload(payload.clone())) { warn!(e = ?e, "failed to send BuiltPayload event"); } + + // ignore error here; if p2p was disabled, the channel will be closed. + let _ = p2p_tx.send(payload.into()).await; } Some(message) = p2p_rx.recv() => { match message { From fa0fb5b337086fe09fc07854d89e463d071b6017 Mon Sep 17 00:00:00 2001 From: Niven Date: Wed, 4 Feb 2026 10:19:15 +0800 Subject: [PATCH 5/5] Revert "Revert default behaviour" This reverts commit 24ab12532f166c2ec7099f4bd18ce3cfc9aa415a. --- .../src/builders/flashblocks/payload_handler.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs index bfe1d705d..23897afec 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs @@ -92,11 +92,6 @@ where loop { tokio::select! { Some(payload) = built_fb_payload_rx.recv() => { - // Update engine tree state with locally built block payloads - if let Err(e) = payload_events_handle.send(Events::BuiltPayload(payload.clone())) { - warn!(e = ?e, "failed to send BuiltPayload event"); - } - // ignore error here; if p2p was disabled, the channel will be closed. let _ = p2p_tx.send(payload.into()).await; } @@ -105,9 +100,6 @@ where if let Err(e) = payload_events_handle.send(Events::BuiltPayload(payload.clone())) { warn!(e = ?e, "failed to send BuiltPayload event"); } - - // ignore error here; if p2p was disabled, the channel will be closed. - let _ = p2p_tx.send(payload.into()).await; } Some(message) = p2p_rx.recv() => { match message {