Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions crates/op-rbuilder/src/args/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,22 @@ pub struct FlashblocksP2pArgs {
default_value = "50"
)]
pub p2p_max_peer_count: u32,

/// Optional flag to send the payload to peers
#[arg(
long = "flashblocks.p2p_send_payload",
env = "FLASHBLOCK_P2P_SEND_PAYLOAD",
default_value = "false"
)]
pub p2p_send_payload: bool,

/// Optional flag to process the payload received by peers
#[arg(
long = "flashblocks.p2p_process_payload",
env = "FLASHBLOCK_P2P_PROCESS_PAYLOAD",
default_value = "false"
)]
pub p2p_process_payload: bool,
}

/// Parameters for telemetry configuration
Expand Down
10 changes: 10 additions & 0 deletions crates/op-rbuilder/src/builders/flashblocks/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ pub struct FlashblocksConfig {
/// Maximum number of peers for the p2p node
pub p2p_max_peer_count: u32,

/// Optional flag to send the payload to peers
pub p2p_send_payload: bool,

/// Optional flag to process the payload received by peers
pub p2p_process_payload: bool,

/// Maximum number of concurrent WebSocket subscribers
pub ws_subscriber_limit: Option<u16>,
}
Expand All @@ -70,6 +76,8 @@ impl Default for FlashblocksConfig {
p2p_port: 9009,
p2p_private_key_file: None,
p2p_known_peers: None,
p2p_send_payload: false,
p2p_process_payload: false,
p2p_max_peer_count: 50,
ws_subscriber_limit: None,
}
Expand Down Expand Up @@ -106,6 +114,8 @@ impl TryFrom<OpRbuilderArgs> for FlashblocksConfig {
p2p_private_key_file: args.flashblocks.p2p.p2p_private_key_file,
p2p_known_peers: args.flashblocks.p2p.p2p_known_peers,
p2p_max_peer_count: args.flashblocks.p2p.p2p_max_peer_count,
p2p_send_payload: args.flashblocks.p2p.p2p_send_payload,
p2p_process_payload: args.flashblocks.p2p.p2p_process_payload,
ws_subscriber_limit: args.flashblocks.ws_subscriber_limit,
})
}
Expand Down
8 changes: 5 additions & 3 deletions crates/op-rbuilder/src/builders/flashblocks/p2p.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use alloy_primitives::U256;
use op_alloy_rpc_types_engine::OpFlashblockPayload;
use reth::{core::primitives::SealedBlock, payload::PayloadId};
use reth_optimism_payload_builder::OpBuiltPayload as RethOpBuiltPayload;
use reth_optimism_primitives::OpBlock;
Expand All @@ -11,6 +12,7 @@ pub(super) const FLASHBLOCKS_STREAM_PROTOCOL: p2p::StreamProtocol =
#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
pub(super) enum Message {
OpBuiltPayload(OpBuiltPayload),
OpFlashblockPayload(OpFlashblockPayload),
}

impl p2p::Message for Message {
Expand All @@ -37,9 +39,9 @@ impl From<RethOpBuiltPayload> for Message {
}
}

impl From<OpBuiltPayload> for Message {
fn from(value: OpBuiltPayload) -> Self {
Message::OpBuiltPayload(value)
impl From<OpFlashblockPayload> for Message {
fn from(value: OpFlashblockPayload) -> Self {
Message::OpFlashblockPayload(value)
}
}

Expand Down
8 changes: 4 additions & 4 deletions crates/op-rbuilder/src/builders/flashblocks/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ pub(super) struct OpPayloadBuilder<Pool, Client, BuilderTx> {
pub client: Client,
/// Sender for sending built flashblock payloads to [`PayloadHandler`],
/// which broadcasts outgoing flashblock payloads via p2p.
pub built_fb_payload_tx: mpsc::Sender<OpBuiltPayload>,
pub built_fb_payload_tx: mpsc::Sender<OpFlashblockPayload>,
/// Sender for sending built full block payloads to [`PayloadHandler`],
/// which updates the engine tree state.
pub built_payload_tx: mpsc::Sender<OpBuiltPayload>,
Expand Down Expand Up @@ -191,7 +191,7 @@ impl<Pool, Client, BuilderTx> OpPayloadBuilder<Pool, Client, BuilderTx> {
client: Client,
config: BuilderConfig<FlashblocksConfig>,
builder_tx: BuilderTx,
built_fb_payload_tx: mpsc::Sender<OpBuiltPayload>,
built_fb_payload_tx: mpsc::Sender<OpFlashblockPayload>,
built_payload_tx: mpsc::Sender<OpBuiltPayload>,
ws_pub: Arc<WebSocketPublisher>,
metrics: Arc<OpRBuilderMetrics>,
Expand Down Expand Up @@ -398,7 +398,7 @@ where
)?;

self.built_fb_payload_tx
.try_send(payload.clone())
.try_send(fb_payload.clone())
.map_err(PayloadBuilderError::other)?;
if let Err(e) = self.built_payload_tx.try_send(payload.clone()) {
warn!(
Expand Down Expand Up @@ -746,7 +746,7 @@ where
.publish(&fb_payload)
.wrap_err("failed to publish flashblock via websocket")?;
self.built_fb_payload_tx
.try_send(new_payload.clone())
.try_send(fb_payload)
.wrap_err("failed to send built payload to handler")?;
if let Err(e) = self.built_payload_tx.try_send(new_payload.clone()) {
warn!(
Expand Down
54 changes: 48 additions & 6 deletions crates/op-rbuilder/src/builders/flashblocks/payload_handler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{
builders::flashblocks::{
ctx::OpPayloadSyncerCtx, p2p::Message, payload::FlashblocksExecutionInfo,
wspub::WebSocketPublisher,
},
primitives::reth::ExecutionInfo,
traits::ClientBounds,
Expand Down Expand Up @@ -35,7 +36,7 @@ use tracing::warn;
/// In the case of a payload received from a peer, it is executed and if successful, an event is sent to the payload builder.
pub(crate) struct PayloadHandler<Client, Tasks> {
// receives new flashblock payloads built by this builder.
built_fb_payload_rx: mpsc::Receiver<OpBuiltPayload>,
built_fb_payload_rx: mpsc::Receiver<OpFlashblockPayload>,
// receives new full block payloads built by this builder.
built_payload_rx: mpsc::Receiver<OpBuiltPayload>,
// receives incoming p2p messages from peers.
Expand All @@ -44,13 +45,17 @@ pub(crate) struct PayloadHandler<Client, Tasks> {
p2p_tx: mpsc::Sender<Message>,
// sends a `Events::BuiltPayload` to the reth payload builder when a new payload is received.
payload_events_handle: tokio::sync::broadcast::Sender<Events<OpEngineTypes>>,
// websocket publisher for broadcasting flashblocks to all connected subscribers.
ws_pub: Arc<WebSocketPublisher>,
Comment on lines +48 to +49
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't the payload builder already broadcast flashblocks over websocket?

// context required for execution of blocks during syncing
ctx: OpPayloadSyncerCtx,
// chain client
client: Client,
// task executor
task_executor: Tasks,
cancel: tokio_util::sync::CancellationToken,
p2p_send_payload: bool,
p2p_process_payload: bool,
}

impl<Client, Tasks> PayloadHandler<Client, Tasks>
Expand All @@ -60,26 +65,32 @@ where
{
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
built_fb_payload_rx: mpsc::Receiver<OpBuiltPayload>,
built_fb_payload_rx: mpsc::Receiver<OpFlashblockPayload>,
built_payload_rx: mpsc::Receiver<OpBuiltPayload>,
p2p_rx: mpsc::Receiver<Message>,
p2p_tx: mpsc::Sender<Message>,
payload_events_handle: tokio::sync::broadcast::Sender<Events<OpEngineTypes>>,
ws_pub: Arc<WebSocketPublisher>,
ctx: OpPayloadSyncerCtx,
client: Client,
task_executor: Tasks,
cancel: tokio_util::sync::CancellationToken,
p2p_send_payload: bool,
p2p_process_payload: bool,
) -> Self {
Self {
built_fb_payload_rx,
built_payload_rx,
p2p_rx,
p2p_tx,
payload_events_handle,
ws_pub,
ctx,
client,
task_executor,
cancel,
p2p_send_payload,
p2p_process_payload,
}
}

Expand All @@ -90,10 +101,13 @@ where
mut p2p_rx,
p2p_tx,
payload_events_handle,
ws_pub,
ctx,
client,
task_executor,
cancel,
p2p_send_payload,
p2p_process_payload,
} = self;

tracing::info!(target: "payload_builder", "flashblocks payload handler started");
Expand All @@ -109,20 +123,42 @@ where
if let Err(e) = payload_events_handle.send(Events::BuiltPayload(payload.clone())) {
warn!(target: "payload_builder", e = ?e, "failed to send BuiltPayload event");
}
if p2p_send_payload {
// ignore error here; if p2p was disabled, the channel will be closed.
let _ = p2p_tx.send(payload.into()).await;
}
}
Some(message) = p2p_rx.recv() => {
match message {
Message::OpBuiltPayload(payload) => {
if !p2p_process_payload {
continue;
}

let payload: OpBuiltPayload = payload.into();
let block_hash = payload.block().hash();
// Check if this block is already the pending block in canonical state
if let Ok(Some(pending)) = client.pending_block()
&& pending.hash() == block_hash
{
tracing::trace!(
target: "payload_builder",
hash = %block_hash,
block_number = payload.block().header().number,
"skipping flashblock execution - block already pending in canonical state"
);
continue;
}

let ctx = ctx.clone();
let client = client.clone();
let payload_events_handle = payload_events_handle.clone();
let cancel = cancel.clone();

// execute the flashblock on a thread where blocking is acceptable,
// execute the built full payload on a thread where blocking is acceptable,
// as it's potentially a heavy operation
task_executor.spawn_blocking(Box::pin(async move {
let res = execute_flashblock(
let res = execute_built_payload(
payload,
ctx,
client,
Expand All @@ -141,6 +177,12 @@ where
}
}));
}
Message::OpFlashblockPayload(fb_payload) => {
// Skip validation as flashblock builder p2p is trusted
if let Err(e) = ws_pub.publish(&fb_payload) {
warn!(target: "payload_builder", e = ?e, "failed to publish flashblock to websocket publisher");
}
}
}
}
else => break,
Expand All @@ -149,7 +191,7 @@ where
}
}

fn execute_flashblock<Client>(
fn execute_built_payload<Client>(
payload: OpBuiltPayload,
ctx: OpPayloadSyncerCtx,
client: Client,
Expand All @@ -173,7 +215,7 @@ where
.wrap_err("failed to get parent header")?
.ok_or_else(|| eyre::eyre!("parent header not found"))?;

// For X Layer, validate header and parent relationship before execution
// Validate header and parent relationship before execution
let chain_spec = client.chain_spec();
validate_pre_execution(&payload, &parent_header, parent_hash, chain_spec.clone())
.wrap_err("pre-execution validation failed")?;
Expand Down
5 changes: 4 additions & 1 deletion crates/op-rbuilder/src/builders/flashblocks/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl FlashblocksServiceBuilder {

let syncer_ctx = crate::builders::flashblocks::ctx::OpPayloadSyncerCtx::new(
&ctx.provider().clone(),
self.0,
self.0.clone(),
OpEvmConfig::optimism(ctx.chain_spec()),
metrics.clone(),
)
Expand All @@ -161,10 +161,13 @@ impl FlashblocksServiceBuilder {
incoming_message_rx,
outgoing_message_tx,
payload_service.payload_events_handle(),
ws_pub.clone(),
syncer_ctx,
ctx.provider().clone(),
ctx.task_executor().clone(),
cancel,
self.0.specific.p2p_send_payload,
self.0.specific.p2p_process_payload,
);

ctx.task_executor().spawn_critical(
Expand Down
Loading