diff --git a/crates/op-rbuilder/src/args/op.rs b/crates/op-rbuilder/src/args/op.rs index aed72200..541e6a4d 100644 --- a/crates/op-rbuilder/src/args/op.rs +++ b/crates/op-rbuilder/src/args/op.rs @@ -240,6 +240,22 @@ pub struct FlashblocksP2pArgs { default_value = "50" )] pub p2p_max_peer_count: u32, + + /// Optional flag to send the payload to peers + #[arg( + long = "flashblocks.p2p_send_payload", + env = "FLASHBLOCK_P2P_SEND_PAYLOAD", + default_value = "false" + )] + pub p2p_send_payload: bool, + + /// Optional flag to process the payload received by peers + #[arg( + long = "flashblocks.p2p_process_payload", + env = "FLASHBLOCK_P2P_PROCESS_PAYLOAD", + default_value = "false" + )] + pub p2p_process_payload: bool, } /// Parameters for telemetry configuration diff --git a/crates/op-rbuilder/src/builders/flashblocks/config.rs b/crates/op-rbuilder/src/builders/flashblocks/config.rs index 62fa2ca6..c3f88020 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/config.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/config.rs @@ -52,6 +52,12 @@ pub struct FlashblocksConfig { /// Maximum number of peers for the p2p node pub p2p_max_peer_count: u32, + /// Optional flag to send the payload to peers + pub p2p_send_payload: bool, + + /// Optional flag to process the payload received by peers + pub p2p_process_payload: bool, + /// Maximum number of concurrent WebSocket subscribers pub ws_subscriber_limit: Option, } @@ -70,6 +76,8 @@ impl Default for FlashblocksConfig { p2p_port: 9009, p2p_private_key_file: None, p2p_known_peers: None, + p2p_send_payload: false, + p2p_process_payload: false, p2p_max_peer_count: 50, ws_subscriber_limit: None, } @@ -106,6 +114,8 @@ impl TryFrom for FlashblocksConfig { p2p_private_key_file: args.flashblocks.p2p.p2p_private_key_file, p2p_known_peers: args.flashblocks.p2p.p2p_known_peers, p2p_max_peer_count: args.flashblocks.p2p.p2p_max_peer_count, + p2p_send_payload: args.flashblocks.p2p.p2p_send_payload, + p2p_process_payload: args.flashblocks.p2p.p2p_process_payload, ws_subscriber_limit: args.flashblocks.ws_subscriber_limit, }) } diff --git a/crates/op-rbuilder/src/builders/flashblocks/p2p.rs b/crates/op-rbuilder/src/builders/flashblocks/p2p.rs index 9f947d95..5283c9c0 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/p2p.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/p2p.rs @@ -1,4 +1,5 @@ use alloy_primitives::U256; +use op_alloy_rpc_types_engine::OpFlashblockPayload; use reth::{core::primitives::SealedBlock, payload::PayloadId}; use reth_optimism_payload_builder::OpBuiltPayload as RethOpBuiltPayload; use reth_optimism_primitives::OpBlock; @@ -11,6 +12,7 @@ pub(super) const FLASHBLOCKS_STREAM_PROTOCOL: p2p::StreamProtocol = #[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] pub(super) enum Message { OpBuiltPayload(OpBuiltPayload), + OpFlashblockPayload(OpFlashblockPayload), } impl p2p::Message for Message { @@ -37,9 +39,9 @@ impl From for Message { } } -impl From for Message { - fn from(value: OpBuiltPayload) -> Self { - Message::OpBuiltPayload(value) +impl From for Message { + fn from(value: OpFlashblockPayload) -> Self { + Message::OpFlashblockPayload(value) } } diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload.rs b/crates/op-rbuilder/src/builders/flashblocks/payload.rs index 613ab5cf..49354237 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload.rs @@ -163,7 +163,7 @@ pub(super) struct OpPayloadBuilder { pub client: Client, /// Sender for sending built flashblock payloads to [`PayloadHandler`], /// which broadcasts outgoing flashblock payloads via p2p. - pub built_fb_payload_tx: mpsc::Sender, + pub built_fb_payload_tx: mpsc::Sender, /// Sender for sending built full block payloads to [`PayloadHandler`], /// which updates the engine tree state. pub built_payload_tx: mpsc::Sender, @@ -191,7 +191,7 @@ impl OpPayloadBuilder { client: Client, config: BuilderConfig, builder_tx: BuilderTx, - built_fb_payload_tx: mpsc::Sender, + built_fb_payload_tx: mpsc::Sender, built_payload_tx: mpsc::Sender, ws_pub: Arc, metrics: Arc, @@ -398,7 +398,7 @@ where )?; self.built_fb_payload_tx - .try_send(payload.clone()) + .try_send(fb_payload.clone()) .map_err(PayloadBuilderError::other)?; if let Err(e) = self.built_payload_tx.try_send(payload.clone()) { warn!( @@ -746,7 +746,7 @@ where .publish(&fb_payload) .wrap_err("failed to publish flashblock via websocket")?; self.built_fb_payload_tx - .try_send(new_payload.clone()) + .try_send(fb_payload) .wrap_err("failed to send built payload to handler")?; if let Err(e) = self.built_payload_tx.try_send(new_payload.clone()) { warn!( diff --git a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs index 59a86ea0..d2798c18 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs @@ -1,6 +1,7 @@ use crate::{ builders::flashblocks::{ ctx::OpPayloadSyncerCtx, p2p::Message, payload::FlashblocksExecutionInfo, + wspub::WebSocketPublisher, }, primitives::reth::ExecutionInfo, traits::ClientBounds, @@ -35,7 +36,7 @@ use tracing::warn; /// In the case of a payload received from a peer, it is executed and if successful, an event is sent to the payload builder. pub(crate) struct PayloadHandler { // receives new flashblock payloads built by this builder. - built_fb_payload_rx: mpsc::Receiver, + built_fb_payload_rx: mpsc::Receiver, // receives new full block payloads built by this builder. built_payload_rx: mpsc::Receiver, // receives incoming p2p messages from peers. @@ -44,6 +45,8 @@ pub(crate) struct PayloadHandler { p2p_tx: mpsc::Sender, // sends a `Events::BuiltPayload` to the reth payload builder when a new payload is received. payload_events_handle: tokio::sync::broadcast::Sender>, + // websocket publisher for broadcasting flashblocks to all connected subscribers. + ws_pub: Arc, // context required for execution of blocks during syncing ctx: OpPayloadSyncerCtx, // chain client @@ -51,6 +54,8 @@ pub(crate) struct PayloadHandler { // task executor task_executor: Tasks, cancel: tokio_util::sync::CancellationToken, + p2p_send_payload: bool, + p2p_process_payload: bool, } impl PayloadHandler @@ -60,15 +65,18 @@ where { #[allow(clippy::too_many_arguments)] pub(crate) fn new( - built_fb_payload_rx: mpsc::Receiver, + built_fb_payload_rx: mpsc::Receiver, built_payload_rx: mpsc::Receiver, p2p_rx: mpsc::Receiver, p2p_tx: mpsc::Sender, payload_events_handle: tokio::sync::broadcast::Sender>, + ws_pub: Arc, ctx: OpPayloadSyncerCtx, client: Client, task_executor: Tasks, cancel: tokio_util::sync::CancellationToken, + p2p_send_payload: bool, + p2p_process_payload: bool, ) -> Self { Self { built_fb_payload_rx, @@ -76,10 +84,13 @@ where p2p_rx, p2p_tx, payload_events_handle, + ws_pub, ctx, client, task_executor, cancel, + p2p_send_payload, + p2p_process_payload, } } @@ -90,10 +101,13 @@ where mut p2p_rx, p2p_tx, payload_events_handle, + ws_pub, ctx, client, task_executor, cancel, + p2p_send_payload, + p2p_process_payload, } = self; tracing::info!(target: "payload_builder", "flashblocks payload handler started"); @@ -109,20 +123,42 @@ where if let Err(e) = payload_events_handle.send(Events::BuiltPayload(payload.clone())) { warn!(target: "payload_builder", e = ?e, "failed to send BuiltPayload event"); } + if p2p_send_payload { + // ignore error here; if p2p was disabled, the channel will be closed. + let _ = p2p_tx.send(payload.into()).await; + } } Some(message) = p2p_rx.recv() => { match message { Message::OpBuiltPayload(payload) => { + if !p2p_process_payload { + continue; + } + let payload: OpBuiltPayload = payload.into(); + let block_hash = payload.block().hash(); + // Check if this block is already the pending block in canonical state + if let Ok(Some(pending)) = client.pending_block() + && pending.hash() == block_hash + { + tracing::trace!( + target: "payload_builder", + hash = %block_hash, + block_number = payload.block().header().number, + "skipping flashblock execution - block already pending in canonical state" + ); + continue; + } + let ctx = ctx.clone(); let client = client.clone(); let payload_events_handle = payload_events_handle.clone(); let cancel = cancel.clone(); - // execute the flashblock on a thread where blocking is acceptable, + // execute the built full payload on a thread where blocking is acceptable, // as it's potentially a heavy operation task_executor.spawn_blocking(Box::pin(async move { - let res = execute_flashblock( + let res = execute_built_payload( payload, ctx, client, @@ -141,6 +177,12 @@ where } })); } + Message::OpFlashblockPayload(fb_payload) => { + // Skip validation as flashblock builder p2p is trusted + if let Err(e) = ws_pub.publish(&fb_payload) { + warn!(target: "payload_builder", e = ?e, "failed to publish flashblock to websocket publisher"); + } + } } } else => break, @@ -149,7 +191,7 @@ where } } -fn execute_flashblock( +fn execute_built_payload( payload: OpBuiltPayload, ctx: OpPayloadSyncerCtx, client: Client, @@ -173,7 +215,7 @@ where .wrap_err("failed to get parent header")? .ok_or_else(|| eyre::eyre!("parent header not found"))?; - // For X Layer, validate header and parent relationship before execution + // Validate header and parent relationship before execution let chain_spec = client.chain_spec(); validate_pre_execution(&payload, &parent_header, parent_hash, chain_spec.clone()) .wrap_err("pre-execution validation failed")?; diff --git a/crates/op-rbuilder/src/builders/flashblocks/service.rs b/crates/op-rbuilder/src/builders/flashblocks/service.rs index a790cb59..c689e011 100644 --- a/crates/op-rbuilder/src/builders/flashblocks/service.rs +++ b/crates/op-rbuilder/src/builders/flashblocks/service.rs @@ -149,7 +149,7 @@ impl FlashblocksServiceBuilder { let syncer_ctx = crate::builders::flashblocks::ctx::OpPayloadSyncerCtx::new( &ctx.provider().clone(), - self.0, + self.0.clone(), OpEvmConfig::optimism(ctx.chain_spec()), metrics.clone(), ) @@ -161,10 +161,13 @@ impl FlashblocksServiceBuilder { incoming_message_rx, outgoing_message_tx, payload_service.payload_events_handle(), + ws_pub.clone(), syncer_ctx, ctx.provider().clone(), ctx.task_executor().clone(), cancel, + self.0.specific.p2p_send_payload, + self.0.specific.p2p_process_payload, ); ctx.task_executor().spawn_critical(