diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 68b0ad62..ee64a502 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -176,9 +176,12 @@ pub(super) struct OpPayloadBuilder { pub pool: Pool, /// Node client pub client: Client, - /// Sender for sending built payloads to [`PayloadHandler`], - /// which broadcasts outgoing payloads via p2p. - pub payload_tx: mpsc::Sender, + /// Sender for sending built flashblock payloads to [`PayloadHandler`], + /// which broadcasts outgoing flashblock payloads via p2p. + pub built_fb_payload_tx: mpsc::Sender, + /// Sender for sending built full block payloads to [`PayloadHandler`], + /// which updates the engine tree state. + pub built_payload_tx: mpsc::Sender, /// WebSocket publisher for broadcasting flashblocks /// to all connected subscribers. pub ws_pub: Arc, @@ -203,7 +206,8 @@ impl OpPayloadBuilder { client: Client, config: BuilderConfig, builder_tx: BuilderTx, - payload_tx: mpsc::Sender, + built_fb_payload_tx: mpsc::Sender, + built_payload_tx: mpsc::Sender, ws_pub: Arc, metrics: Arc, task_metrics: Arc, @@ -213,7 +217,8 @@ impl OpPayloadBuilder { evm_config, pool, client, - payload_tx, + built_fb_payload_tx, + built_payload_tx, ws_pub, config, metrics, @@ -408,9 +413,16 @@ where !disable_state_root || ctx.attributes().no_tx_pool, // need to calculate state root for CL sync )?; - self.payload_tx + self.built_fb_payload_tx .try_send(payload.clone()) .map_err(PayloadBuilderError::other)?; + if let Err(e) = self.built_payload_tx.try_send(payload.clone()) { + warn!( + target: "payload_builder", + error = %e, + "Failed to send updated payload" + ); + } best_payload.set(payload); info!( @@ -832,9 +844,16 @@ where .ws_pub .publish(&fb_payload) .wrap_err("failed to publish flashblock via websocket")?; - self.payload_tx + self.built_fb_payload_tx .try_send(new_payload.clone()) .wrap_err("failed to send built payload to handler")?; + if let Err(e) = self.built_payload_tx.try_send(new_payload.clone()) { + warn!( + target: "payload_builder", + error = %e, + "Failed to send updated payload" + ); + } best_payload.set(new_payload); // Record flashblock build duration diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs index 00245bed..59a86ea0 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs @@ -34,8 +34,10 @@ use tracing::warn; /// In the case of a payload built by this node, it is broadcast to peers and an event is sent to the payload builder. /// In the case of a payload received from a peer, it is executed and if successful, an event is sent to the payload builder. pub(crate) struct PayloadHandler { - // receives new payloads built by this builder. - built_rx: mpsc::Receiver, + // receives new flashblock payloads built by this builder. + built_fb_payload_rx: mpsc::Receiver, + // receives new full block payloads built by this builder. + built_payload_rx: mpsc::Receiver, // receives incoming p2p messages from peers. p2p_rx: mpsc::Receiver, // outgoing p2p channel to broadcast new payloads to peers. @@ -58,7 +60,8 @@ where { #[allow(clippy::too_many_arguments)] pub(crate) fn new( - built_rx: mpsc::Receiver, + built_fb_payload_rx: mpsc::Receiver, + built_payload_rx: mpsc::Receiver, p2p_rx: mpsc::Receiver, p2p_tx: mpsc::Sender, payload_events_handle: tokio::sync::broadcast::Sender>, @@ -68,7 +71,8 @@ where cancel: tokio_util::sync::CancellationToken, ) -> Self { Self { - built_rx, + built_fb_payload_rx, + built_payload_rx, p2p_rx, p2p_tx, payload_events_handle, @@ -81,7 +85,8 @@ where pub(crate) async fn run(self) { let Self { - mut built_rx, + mut built_fb_payload_rx, + mut built_payload_rx, mut p2p_rx, p2p_tx, payload_events_handle, @@ -95,12 +100,15 @@ where loop { tokio::select! { - Some(payload) = built_rx.recv() => { + Some(payload) = built_fb_payload_rx.recv() => { + // ignore error here; if p2p was disabled, the channel will be closed. + let _ = p2p_tx.send(payload.into()).await; + } + Some(payload) = built_payload_rx.recv() => { + // Update engine tree state with locally built block payloads if let Err(e) = payload_events_handle.send(Events::BuiltPayload(payload.clone())) { warn!(target: "payload_builder", e = ?e, "failed to send BuiltPayload event"); } - // ignore error here; if p2p was disabled, the channel will be closed. - let _ = p2p_tx.send(payload.into()).await; } Some(message) = p2p_rx.recv() => { match message { diff --git a/crates/op-rbuilder/src/builders/flashblocks/service.rs b/crates/op-rbuilder/src/builders/flashblocks/service.rs index 650dcc96..a790cb59 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/service.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/service.rs @@ -108,6 +108,9 @@ impl FlashblocksServiceBuilder { let metrics = Arc::new(OpRBuilderMetrics::default()); let task_metrics = Arc::new(FlashblocksTaskMetrics::new()); + + // Channels for built flashblock payloads + let (built_fb_payload_tx, built_fb_payload_rx) = tokio::sync::mpsc::channel(16); let (built_payload_tx, built_payload_rx) = tokio::sync::mpsc::channel(16); let ws_pub: Arc = WebSocketPublisher::new( @@ -124,6 +127,7 @@ impl FlashblocksServiceBuilder { ctx.provider().clone(), self.0.clone(), builder_tx, + built_fb_payload_tx, built_payload_tx, ws_pub.clone(), metrics.clone(), @@ -152,6 +156,7 @@ impl FlashblocksServiceBuilder { .wrap_err("failed to create flashblocks payload builder context")?; let payload_handler = PayloadHandler::new( + built_fb_payload_rx, built_payload_rx, incoming_message_rx, outgoing_message_tx,