Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions crates/builder/src/args/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,14 +172,6 @@ impl Default for FlashblocksArgs {

#[derive(Debug, Clone, PartialEq, Eq, clap::Args)]
pub struct FlashblocksP2pArgs {
/// Enable libp2p networking for flashblock propagation
#[arg(
long = "flashblocks.p2p_enabled",
env = "FLASHBLOCK_P2P_ENABLED",
default_value = "false"
)]
pub p2p_enabled: bool,

/// Port for the flashblocks p2p node
#[arg(long = "flashblocks.p2p_port", env = "FLASHBLOCK_P2P_PORT", default_value = "9009")]
pub p2p_port: u16,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl BehaviourEvent {
continue;
}

tracing::debug!(target: "flashblocks-p2p", "mDNS discovered peer {peer_id} at {multiaddr}");
tracing::debug!(target: "payload_builder::broadcast", "mDNS discovered peer {peer_id} at {multiaddr}");
swarm.add_peer_address(peer_id, multiaddr);
swarm.dial(peer_id).unwrap_or_else(|e| {
tracing::error!("failed to dial mDNS discovered peer {peer_id}: {e}")
Expand All @@ -100,7 +100,7 @@ impl BehaviourEvent {
}
mdns::Event::Expired(list) => {
for (peer_id, multiaddr) in list {
tracing::debug!(target: "flashblocks-p2p", "mDNS expired peer {peer_id} at {multiaddr}");
tracing::debug!(target: "payload_builder::broadcast", "mDNS expired peer {peer_id} at {multiaddr}");
}
}
},
Expand Down
409 changes: 306 additions & 103 deletions crates/builder/src/p2p/mod.rs → crates/builder/src/broadcast/mod.rs

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::Message;
use super::types;
use eyre::Context;
use futures::stream::FuturesUnordered;
use libp2p::{swarm::Stream, PeerId, StreamProtocol};
Expand Down Expand Up @@ -31,38 +31,41 @@ impl StreamsHandler {
self.peers_to_stream.remove(peer);
}

pub(crate) async fn broadcast_message<M: Message>(&mut self, message: M) -> eyre::Result<()> {
pub(crate) async fn broadcast_message(
&mut self,
message: types::Message,
) -> eyre::Result<Vec<PeerId>> {
use futures::{SinkExt as _, StreamExt as _};
use tokio_util::{
codec::{FramedWrite, LinesCodec},
compat::FuturesAsyncReadCompatExt as _,
};

let protocol = message.protocol();
let payload = message.to_string().wrap_err("failed to serialize payload")?;
let payload = serde_json::to_string(&message).wrap_err("failed to serialize payload")?;

let peers = self.peers_to_stream.keys().cloned().collect::<Vec<_>>();
let mut futures = FuturesUnordered::new();
for peer in peers {
let protocol_to_stream =
self.peers_to_stream.get_mut(&peer).expect("stream map must exist for peer");
let Some(stream) = protocol_to_stream.remove(&protocol) else {
warn!(target: "flashblocks-p2p", "no stream for protocol {protocol:?} to peer {peer}");
warn!(target: "payload_builder::broadcast", "no stream for protocol {protocol:?} to peer {peer}");
continue;
};
let stream = stream.compat();
let payload = payload.clone();
let fut = async move {
let mut writer = FramedWrite::new(stream, LinesCodec::new());
writer.send(payload).await.wrap_err("failed to send message to peer")?;
Ok::<(PeerId, libp2p::swarm::Stream), eyre::ErrReport>((
peer,
writer.into_inner().into_inner(),
))
match writer.send(payload).await {
Ok(()) => Ok((peer, writer.into_inner().into_inner())),
Err(e) => Err((peer, eyre::eyre!(e))),
}
};
futures.push(fut);
}

let mut failed_peers = Vec::new();
while let Some(result) = futures.next().await {
match result {
Ok((peer, stream)) => {
Expand All @@ -72,18 +75,20 @@ impl StreamsHandler {
.expect("stream map must exist for peer");
protocol_to_stream.insert(protocol.clone(), stream);
}
Err(e) => {
warn!(target: "flashblocks-p2p", "failed to send payload to peer: {e:?}");
Err((peer, e)) => {
warn!(target: "payload_builder::broadcast", "failed to send payload to peer {peer}: {e:?}");
self.peers_to_stream.remove(&peer);
failed_peers.push(peer);
}
}
}

debug!(
target: "flashblocks-p2p",
target: "payload_builder::broadcast",
"broadcasted message to {} peers",
self.peers_to_stream.len()
);

Ok(())
Ok(failed_peers)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@ use reth_optimism_payload_builder::OpBuiltPayload as RethOpBuiltPayload;
use reth_optimism_primitives::OpBlock;

pub(crate) const AGENT_VERSION: &str = "flashblock-builder/1.0.0";
pub(crate) const FLASHBLOCKS_STREAM_PROTOCOL: crate::p2p::StreamProtocol =
crate::p2p::StreamProtocol::new("/flashblocks/1.0.0");
pub(crate) const FLASHBLOCKS_STREAM_PROTOCOL: crate::broadcast::StreamProtocol =
crate::broadcast::StreamProtocol::new("/flashblocks/1.0.0");

#[derive(Clone, Debug, PartialEq, Deserialize, Serialize)]
pub(crate) enum Message {
OpBuiltPayload(OpBuiltPayload),
OpFlashblockPayload(OpFlashblockPayload),
}

impl crate::p2p::Message for Message {
fn protocol(&self) -> crate::p2p::StreamProtocol {
impl Message {
pub(crate) fn protocol(&self) -> crate::broadcast::StreamProtocol {
FLASHBLOCKS_STREAM_PROTOCOL
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl WebSocketPublisher {
// serialize only once, then just copy around only a pointer
// to the serialized data for each subscription.
info!(
target: "payload_builder",
target: "payload_builder::broadcast",
event = "flashblock_sent",
message = "Sending flashblock to subscribers",
id = %payload.payload_id,
Expand All @@ -93,7 +93,7 @@ impl Drop for WebSocketPublisher {
fn drop(&mut self) {
// Notify the listener loop to terminate
let _ = self.term.send(true);
info!(target: "payload_builder", "WebSocketPublisher dropped, terminating listener loop");
info!(target: "payload_builder::broadcast", "WebSocketPublisher dropped, terminating listener loop");
}
}

Expand All @@ -112,7 +112,7 @@ async fn listener_loop(
.expect("Failed to convert TcpListener to tokio TcpListener");

let listen_addr = listener.local_addr().expect("Failed to get local address of listener");
info!(target: "payload_builder", "Flashblocks WebSocketPublisher listening on {listen_addr}");
info!(target: "payload_builder::broadcast", "Flashblocks WebSocketPublisher listening on {listen_addr}");

let mut term = term;

Expand Down Expand Up @@ -140,25 +140,25 @@ async fn listener_loop(
Ok(mut stream) => {
tokio::spawn(async move {
if let Some(limit) = subscriber_limit && subs.load(Ordering::Relaxed) >= limit as usize {
warn!(target: "payload_builder", "WebSocket connection for {peer_addr} rejected: subscriber limit reached");
warn!(target: "payload_builder::broadcast", "WebSocket connection for {peer_addr} rejected: subscriber limit reached");
let _ = stream.close(Some(CloseFrame {
code: CloseCode::Again,
reason: "subscriber limit reached, please try again later".into(),
})).await;
return;
}
subs.fetch_add(1, Ordering::Relaxed);
debug!(target: "payload_builder", "WebSocket connection established with {}", peer_addr);
debug!(target: "payload_builder::broadcast", "WebSocket connection established with {}", peer_addr);

// Handle the WebSocket connection in a dedicated task
broadcast_loop(stream, metrics, term, receiver_clone, sent).await;

subs.fetch_sub(1, Ordering::Relaxed);
debug!(target: "payload_builder", "WebSocket connection closed for {}", peer_addr);
debug!(target: "payload_builder::broadcast", "WebSocket connection closed for {}", peer_addr);
});
}
Err(e) => {
warn!(target: "payload_builder", "Failed to accept WebSocket connection from {peer_addr}: {e}");
warn!(target: "payload_builder::broadcast", "Failed to accept WebSocket connection from {peer_addr}: {e}");
}
}
}
Expand Down Expand Up @@ -192,7 +192,7 @@ async fn broadcast_loop(
// Check if the publisher is terminated
_ = term.changed() => {
if *term.borrow() {
info!(target: "payload_builder", "WebSocketPublisher is terminating, closing broadcast loop");
info!(target: "payload_builder::broadcast", "WebSocketPublisher is terminating, closing broadcast loop");
return;
}
}
Expand All @@ -205,30 +205,30 @@ async fn broadcast_loop(
sent.fetch_add(1, Ordering::Relaxed);
metrics.messages_sent_count.increment(1);

trace!(target: "payload_builder", "Broadcasted payload: {:?}", payload);
trace!(target: "payload_builder::broadcast", "Broadcasted payload: {:?}", payload);
if let Err(e) = stream.send(Message::Text(payload)).await {
debug!(target: "payload_builder", "Send payload error for flashblocks subscription {peer_addr}: {e}");
debug!(target: "payload_builder::broadcast", "Send payload error for flashblocks subscription {peer_addr}: {e}");
break; // Exit the loop if sending fails
}
}
Err(RecvError::Closed) => {
debug!(target: "payload_builder", "Broadcast channel closed, exiting broadcast loop");
debug!(target: "payload_builder::broadcast", "Broadcast channel closed, exiting broadcast loop");
return;
}
Err(RecvError::Lagged(_)) => {
warn!(target: "payload_builder", "Broadcast channel lagged, some messages were dropped");
warn!(target: "payload_builder::broadcast", "Broadcast channel lagged, some messages were dropped");
}
},

// Ping-pong handled by tokio_tungstenite when you perform read on the socket
message = stream.next() => if let Some(message) = message { match message {
// We handle only close frame to highlight conn closing
Ok(Message::Close(_)) => {
info!(target: "payload_builder", "Closing frame received, stopping connection for {peer_addr}");
info!(target: "payload_builder::broadcast", "Closing frame received, stopping connection for {peer_addr}");
break;
}
Err(e) => {
warn!(target: "payload_builder", "Received error. Closing flashblocks subscription for {peer_addr}: {e}");
warn!(target: "payload_builder::broadcast", "Received error. Closing flashblocks subscription for {peer_addr}: {e}");
break;
}
_ => (),
Expand Down
26 changes: 4 additions & 22 deletions crates/builder/src/flashblocks/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use crate::{
context::FlashblocksBuilderCtx,
generator::{BlockCell, BuildArguments, PayloadBuilder},
timing::FlashblockScheduler,
utils::{
cache::FlashblockPayloadsCache, execution::ExecutionInfo, wspub::WebSocketPublisher,
},
utils::{cache::FlashblockPayloadsCache, execution::ExecutionInfo},
BuilderConfig,
},
metrics::tokio::FlashblocksTaskMetrics,
Expand Down Expand Up @@ -201,9 +199,6 @@ pub(super) struct FlashblocksBuilder<Pool, Client, Tasks> {
pub built_payload_tx: mpsc::Sender<OpBuiltPayload>,
/// Cache for externally received pending flashblocks transactions received via p2p.
pub p2p_cache: FlashblockPayloadsCache,
/// WebSocket publisher for broadcasting flashblocks
/// to all connected subscribers.
pub ws_pub: Arc<WebSocketPublisher>,
/// System configuration for the builder
pub config: BuilderConfig,
/// The metrics for the builder
Expand All @@ -229,7 +224,6 @@ impl<Pool, Client, Tasks> FlashblocksBuilder<Pool, Client, Tasks> {
built_fb_payload_tx: mpsc::Sender<OpFlashblockPayload>,
built_payload_tx: mpsc::Sender<OpBuiltPayload>,
p2p_cache: FlashblockPayloadsCache,
ws_pub: Arc<WebSocketPublisher>,
metrics: Arc<BuilderMetrics>,
task_metrics: Arc<FlashblocksTaskMetrics>,
) -> Self {
Expand All @@ -241,7 +235,6 @@ impl<Pool, Client, Tasks> FlashblocksBuilder<Pool, Client, Tasks> {
built_fb_payload_tx,
built_payload_tx,
p2p_cache,
ws_pub,
config,
metrics,
builder_tx,
Expand Down Expand Up @@ -421,12 +414,6 @@ where
// We should always calculate state root for fallback payload
let (fallback_payload, fb_payload, bundle_state, new_tx_hashes) =
build_block(&mut state, &ctx, &mut info, Some(&mut fb_state), true)?;
// For X Layer - skip if replaying
if !rebuild_external_payload {
self.built_fb_payload_tx
.try_send(fb_payload.clone())
.map_err(PayloadBuilderError::other)?;
}
let mut best_payload = (fallback_payload.clone(), bundle_state);

info!(
Expand All @@ -438,9 +425,9 @@ where
// not emitting flashblock if no_tx_pool in FCU, it's just syncing
// For X Layer - skip if replaying
if !ctx.attributes().no_tx_pool && !rebuild_external_payload {
let flashblock_byte_size =
self.ws_pub.publish(&fb_payload).map_err(PayloadBuilderError::other)?;
ctx.metrics.flashblock_byte_size_histogram.record(flashblock_byte_size as f64);
self.built_fb_payload_tx
.try_send(fb_payload.clone())
.map_err(PayloadBuilderError::other)?;

// For X Layer, full link monitoring support
crate::flashblocks::utils::monitor::monitor(
Expand Down Expand Up @@ -757,18 +744,13 @@ where
fb_payload.index = flashblock_index;
fb_payload.base = None;

let flashblock_byte_size = self
.ws_pub
.publish(&fb_payload)
.wrap_err("failed to publish flashblock via websocket")?;
self.built_fb_payload_tx
.try_send(fb_payload)
.wrap_err("failed to send built payload to handler")?;
*best_payload = (new_payload, bundle_state);

// Record flashblock build duration
ctx.metrics.flashblock_build_duration.record(flashblock_build_start_time.elapsed());
ctx.metrics.flashblock_byte_size_histogram.record(flashblock_byte_size as f64);
ctx.metrics
.flashblock_num_tx_histogram
.record(info.executed_transactions.len() as f64);
Expand Down
10 changes: 4 additions & 6 deletions crates/builder/src/flashblocks/handler.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use crate::{
broadcast::{wspub::WebSocketPublisher, Message},
flashblocks::{
handler_ctx::FlashblockHandlerContext,
utils::{
cache::FlashblockPayloadsCache, execution::ExecutionInfo, p2p::Message,
wspub::WebSocketPublisher,
},
utils::{cache::FlashblockPayloadsCache, execution::ExecutionInfo},
},
traits::ClientBounds,
};
Expand Down Expand Up @@ -124,7 +122,7 @@ where
loop {
tokio::select! {
Some(payload) = built_fb_payload_rx.recv() => {
// ignore error here; if p2p was disabled, the channel will be closed.
// ignore send error (broadcast node may have shut down)
let _ = p2p_tx.send(Message::from_flashblock_payload(payload)).await;
}
Some(payload) = built_payload_rx.recv() => {
Expand All @@ -133,7 +131,7 @@ where
warn!(target: "payload_builder", e = ?e, "failed to send BuiltPayload event");
}
if p2p_send_full_payload_flag {
// ignore error here; if p2p was disabled, the channel will be closed.
// ignore send error (broadcast node may have shut down)
let _ = p2p_tx.send(Message::from_built_payload(payload)).await;
}
}
Expand Down
8 changes: 2 additions & 6 deletions crates/builder/src/flashblocks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ mod service;
mod timing;
pub(crate) mod utils;

pub use crate::broadcast::wspub::WebSocketPublisher;
pub use context::FlashblocksBuilderCtx;
pub use service::FlashblocksServiceBuilder;
pub use utils::{cache::FlashblockPayloadsCache, wspub::WebSocketPublisher};
pub use utils::cache::FlashblockPayloadsCache;

/// Configuration values that are specific to the flashblocks builder.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -54,9 +55,6 @@ pub struct FlashblocksConfig {
/// This serves as a buffer time to account for the last flashblock being delayed.
pub end_buffer_ms: u64,

/// Whether to enable the p2p node for flashblocks
pub p2p_enabled: bool,

/// Port for the p2p node
pub p2p_port: u16,

Expand Down Expand Up @@ -92,7 +90,6 @@ impl Default for FlashblocksConfig {
number_contract_address: None,
send_offset_ms: 0,
end_buffer_ms: 0,
p2p_enabled: false,
p2p_port: 9009,
p2p_private_key_file: None,
p2p_known_peers: None,
Expand Down Expand Up @@ -212,7 +209,6 @@ impl TryFrom<BuilderArgs> for BuilderConfig {
number_contract_address,
send_offset_ms: args.flashblocks.flashblocks_send_offset_ms,
end_buffer_ms: args.flashblocks.flashblocks_end_buffer_ms,
p2p_enabled: args.flashblocks.p2p.p2p_enabled,
p2p_port: args.flashblocks.p2p.p2p_port,
p2p_private_key_file: args.flashblocks.p2p.p2p_private_key_file,
p2p_known_peers: args.flashblocks.p2p.p2p_known_peers,
Expand Down
Loading