diff --git a/crates/builder/src/args/op.rs b/crates/builder/src/args/op.rs index 2ef62099..a0a16275 100644 --- a/crates/builder/src/args/op.rs +++ b/crates/builder/src/args/op.rs @@ -172,14 +172,6 @@ impl Default for FlashblocksArgs { #[derive(Debug, Clone, PartialEq, Eq, clap::Args)] pub struct FlashblocksP2pArgs { - /// Enable libp2p networking for flashblock propagation - #[arg( - long = "flashblocks.p2p_enabled", - env = "FLASHBLOCK_P2P_ENABLED", - default_value = "false" - )] - pub p2p_enabled: bool, - /// Port for the flashblocks p2p node #[arg(long = "flashblocks.p2p_port", env = "FLASHBLOCK_P2P_PORT", default_value = "9009")] pub p2p_port: u16, diff --git a/crates/builder/src/p2p/behaviour.rs b/crates/builder/src/broadcast/behaviour.rs similarity index 92% rename from crates/builder/src/p2p/behaviour.rs rename to crates/builder/src/broadcast/behaviour.rs index 88a2938a..bf834fcd 100644 --- a/crates/builder/src/p2p/behaviour.rs +++ b/crates/builder/src/broadcast/behaviour.rs @@ -91,7 +91,7 @@ impl BehaviourEvent { continue; } - tracing::debug!(target: "flashblocks-p2p", "mDNS discovered peer {peer_id} at {multiaddr}"); + tracing::debug!(target: "payload_builder::broadcast", "mDNS discovered peer {peer_id} at {multiaddr}"); swarm.add_peer_address(peer_id, multiaddr); swarm.dial(peer_id).unwrap_or_else(|e| { tracing::error!("failed to dial mDNS discovered peer {peer_id}: {e}") @@ -100,7 +100,7 @@ impl BehaviourEvent { } mdns::Event::Expired(list) => { for (peer_id, multiaddr) in list { - tracing::debug!(target: "flashblocks-p2p", "mDNS expired peer {peer_id} at {multiaddr}"); + tracing::debug!(target: "payload_builder::broadcast", "mDNS expired peer {peer_id} at {multiaddr}"); } } }, diff --git a/crates/builder/src/p2p/mod.rs b/crates/builder/src/broadcast/mod.rs similarity index 54% rename from crates/builder/src/p2p/mod.rs rename to crates/builder/src/broadcast/mod.rs index 1348fb6a..7fbfc226 100644 --- a/crates/builder/src/p2p/mod.rs +++ b/crates/builder/src/broadcast/mod.rs @@ -1,9 +1,13 @@ mod behaviour; mod outgoing; +pub(crate) mod types; +pub(crate) mod wspub; use behaviour::Behaviour; use libp2p_stream::IncomingStreams; +use wspub::WebSocketPublisher; +use crate::metrics::BuilderMetrics; use eyre::Context; use libp2p::{ dns, @@ -15,6 +19,7 @@ use libp2p::{ use multiaddr::Protocol; use std::{ collections::{HashMap, HashSet}, + sync::Arc, time::Duration, }; use tokio::sync::mpsc; @@ -22,38 +27,26 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, warn}; pub use libp2p::{Multiaddr, StreamProtocol}; +pub(crate) use types::Message; const DEFAULT_MAX_PEER_COUNT: u32 = 50; -const DEFAULT_PEER_RETRY_INTERVAL: Duration = Duration::from_secs(60); +const DEFAULT_PEER_RETRY_INTERVAL: Duration = Duration::from_secs(1); -/// A message that can be sent between peers. -pub trait Message: - serde::Serialize + for<'de> serde::Deserialize<'de> + Send + Sync + Clone + std::fmt::Debug -{ - fn protocol(&self) -> StreamProtocol; - - fn to_string(&self) -> eyre::Result { - serde_json::to_string(self).wrap_err("failed to serialize message to string") - } - - fn from_str(s: &str) -> eyre::Result - where - Self: Sized, - { - serde_json::from_str(s).wrap_err("failed to deserialize message from string") - } -} - -/// The libp2p node. +/// The broadcast node. /// /// The current behaviour of the node regarding messaging protocols is as follows: -/// - for each supported protocol, the node will accept incoming streams from remote peers on that protocol. -/// - when a new connection is established with a peer, the node will open outbound streams to that peer for each supported protocol. -/// - when a new outgoing message is received on `outgoing_message_rx`, the node will broadcast that message to all connected peers that have an outbound stream open for the message's protocol. -/// - incoming messages received on incoming streams are handled by `IncomingStreamsHandler`, which reads messages from the stream and sends them to a channel for processing by the consumer of this library. +/// - for each supported protocol, the node will accept incoming streams from remote peers on that +/// protocol. +/// - when a new connection is established with a peer, the node will open outbound streams to that +/// peer for each supported protocol. +/// - when a new outgoing message is received on `outgoing_message_rx`, the node will broadcast that +/// message to all connected peers that have an outbound stream open for the message's protocol. +/// - incoming messages received on incoming streams are handled by `IncomingStreamsHandler`, which +/// reads messages from the stream and sends them to a channel for processing by the consumer of +/// this library. /// /// Currently, there is no gossip implemented; messages are simply broadcast to connected peers. -pub struct Node { +pub struct Node { /// The peer ID of this node. peer_id: PeerId, @@ -68,7 +61,7 @@ pub struct Node { known_peers: Vec, /// Receiver for outgoing messages to be sent to peers. - outgoing_message_rx: mpsc::Receiver, + outgoing_message_rx: mpsc::Receiver, /// Handler for managing outgoing streams to peers. /// Used to determine what peers to broadcast to when a @@ -76,16 +69,23 @@ pub struct Node { outgoing_streams_handler: outgoing::StreamsHandler, /// Handlers for incoming streams (streams which remote peers have opened with us). - incoming_streams_handlers: Vec>, + incoming_streams_handlers: Vec, /// The protocols this node supports. protocols: Vec, /// Cancellation token to shut down the node. cancellation_token: CancellationToken, + + /// WebSocket publisher for broadcasting flashblocks + /// to all connected subscribers. + ws_pub: Arc, + + /// The metrics for the builder + metrics: Arc, } -impl Node { +impl Node { /// Returns the multiaddresses that this node is listening on, with the peer ID included. pub fn multiaddrs(&self) -> Vec { self.listen_addrs @@ -94,7 +94,9 @@ impl Node { .collect() } - /// Runs the p2p node, dials known peers, and starts listening for incoming connections and messages. + /// Runs the broadcast node. + /// 1. Dials known peers, and starts listening for incoming connections and messages. + /// 2. Publishes flashblocks to websocket subscribers /// /// This function will run until the cancellation token is triggered. /// If an error occurs, it will be logged, but the node will continue running. @@ -111,6 +113,8 @@ impl Node { cancellation_token, incoming_streams_handlers, protocols, + ws_pub, + metrics, } = self; for addr in listen_addrs { @@ -142,7 +146,7 @@ impl Node { tokio::select! { biased; _ = cancellation_token.cancelled() => { - debug!(target: "flashblocks-p2p", "cancellation token triggered, shutting down node"); + debug!(target: "payload_builder::broadcast", "cancellation token triggered, shutting down node"); handles.into_iter().for_each(|h| h.abort()); break Ok(()); } @@ -151,19 +155,76 @@ impl Node { let connected_peers: HashSet = swarm.connected_peers().copied().collect(); for (peer_id, address) in &known_peers_info { if !connected_peers.contains(peer_id) { - debug!(target: "flashblocks-p2p", "retrying connection to disconnected known peer {peer_id} at {address}"); + debug!(target: "payload_builder::broadcast", "retrying connection to disconnected known peer {peer_id} at {address}"); swarm.add_peer_address(*peer_id, address.clone()); if let Err(e) = swarm.dial(address.clone()) { - warn!(target: "flashblocks-p2p", "failed to retry dial to known peer {peer_id} at {address}: {e:?}"); + warn!(target: "payload_builder::broadcast", "failed to retry dial to known peer {peer_id} at {address}: {e:?}"); } } } } Some(message) = outgoing_message_rx.recv() => { let protocol = message.protocol(); - debug!(target: "flashblocks-p2p", "received message to broadcast on protocol {protocol}"); - if let Err(e) = outgoing_streams_handler.broadcast_message(message).await { - warn!(target: "flashblocks-p2p", "failed to broadcast message on protocol {protocol}: {e:?}"); + debug!(target: "payload_builder::broadcast", "received message to broadcast on protocol {protocol}"); + + // NOTE on broadcast ordering and failure semantics: + // `broadcast_message` sends to all connected peers concurrently and + // awaits until every peer's TCP send completes (or fails). This is a + // blocking wait — WS publish below only runs after all peer sends have + // resolved. TCP send success means the kernel accepted the bytes into + // the send buffer. + // + // However this means that networking layer failures are swallowed, as + // only serialization errors are propagated. This means WS publish may + // proceed as networking failures can be silently drop. + // + // This design is intentional and the re-org risk the builder trade-off + // for lower latency - since failures/switches in builder are very small. + // The less strict (no ack of message deliveries) allow a lower latency + // in the gossiping of new flashblock payloads to websocket subscribers. + match outgoing_streams_handler.broadcast_message(message.clone()).await { + Ok(failed_peers) => { + // For each peer whose stream send failed, immediately attempt to + // re-open a new yamux stream on the existing TCP connection. + // This recovers from application-level stream closes without + // requiring a full TCP reconnect. + for &peer_id in &failed_peers { + for proto in &protocols { + match swarm + .behaviour_mut() + .new_control() + .open_stream(peer_id, proto.clone()) + .await + { + Ok(stream) => { + outgoing_streams_handler + .insert_peer_and_stream(peer_id, proto.clone(), stream); + warn!(target: "payload_builder::broadcast", "recovered stream to peer {peer_id} on protocol {proto}"); + } + Err(e) => { + warn!(target: "payload_builder::broadcast", "failed to recover stream to peer {peer_id}: {e:?}"); + } + } + } + } + if !failed_peers.is_empty() { + continue; + } + } + Err(e) => { + warn!(target: "payload_builder::broadcast", "failed to broadcast message on protocol {protocol}: {e:?}"); + continue; + } + } + if let Message::OpFlashblockPayload(ref fb_payload) = message { + match ws_pub.publish(fb_payload) { + Ok(flashblock_byte_size) => { + metrics.flashblock_byte_size_histogram.record(flashblock_byte_size as f64); + } + Err(e) => { + warn!(target: "payload_builder::broadcast", "failed to publish flashblock to ws subscribers: {e:?}"); + } + } } } event = swarm.select_next_some() => { @@ -172,10 +233,10 @@ impl Node { address, .. } => { - debug!(target: "flashblocks-p2p", "new listen address: {address}"); + debug!(target: "payload_builder::broadcast", "new listen address: {address}"); } SwarmEvent::ExternalAddrConfirmed { address } => { - debug!(target: "flashblocks-p2p", "external address confirmed: {address}"); + debug!(target: "payload_builder::broadcast", "external address confirmed: {address}"); } SwarmEvent::ConnectionEstablished { peer_id, @@ -184,7 +245,7 @@ impl Node { } => { // when a new connection is established, open outbound streams for each protocol // and add them to the outgoing streams handler. - debug!(target: "flashblocks-p2p", "fb p2p connection established with peer {peer_id}"); + debug!(target: "payload_builder::broadcast", "fb p2p connection established with peer {peer_id}"); if !outgoing_streams_handler.has_peer(&peer_id) { for protocol in &protocols { match swarm @@ -194,10 +255,10 @@ impl Node { .await { Ok(stream) => { outgoing_streams_handler.insert_peer_and_stream(peer_id, protocol.clone(), stream); - debug!(target: "flashblocks-p2p", "opened outbound stream with peer {peer_id} with protocol {protocol} on connection {connection_id}"); + debug!(target: "payload_builder::broadcast", "opened outbound stream with peer {peer_id} with protocol {protocol} on connection {connection_id}"); } Err(e) => { - warn!(target: "flashblocks-p2p", "failed to open stream with peer {peer_id} on connection {connection_id}: {e:?}"); + warn!(target: "payload_builder::broadcast", "failed to open stream with peer {peer_id} on connection {connection_id}: {e:?}"); } } } @@ -208,7 +269,7 @@ impl Node { cause, .. } => { - debug!(target: "flashblocks-p2p", "connection closed with peer {peer_id}: {cause:?}"); + debug!(target: "payload_builder::broadcast", "connection closed with peer {peer_id}: {cause:?}"); outgoing_streams_handler.remove_peer(&peer_id); } SwarmEvent::Behaviour(event) => event.handle(&mut swarm), @@ -220,10 +281,10 @@ impl Node { } } -pub struct NodeBuildResult { - pub node: Node, - pub outgoing_message_tx: mpsc::Sender, - pub incoming_message_rxs: HashMap>, +pub struct NodeBuildResult { + pub node: Node, + pub outgoing_message_tx: mpsc::Sender, + pub incoming_message_rxs: HashMap>, } pub struct NodeBuilder { @@ -235,16 +296,12 @@ pub struct NodeBuilder { protocols: Vec, max_peer_count: Option, cancellation_token: Option, -} - -impl Default for NodeBuilder { - fn default() -> Self { - Self::new() - } + ws_pub: Arc, + metrics: Arc, } impl NodeBuilder { - pub fn new() -> Self { + pub fn new(ws_pub: Arc, metrics: Arc) -> Self { Self { port: None, listen_addrs: Vec::new(), @@ -254,6 +311,8 @@ impl NodeBuilder { protocols: Vec::new(), max_peer_count: None, cancellation_token: None, + ws_pub, + metrics, } } @@ -307,7 +366,7 @@ impl NodeBuilder { self } - pub fn try_build(self) -> eyre::Result> { + pub fn try_build(self) -> eyre::Result { let Self { port, listen_addrs, @@ -317,6 +376,8 @@ impl NodeBuilder { protocols, max_peer_count, cancellation_token, + ws_pub, + metrics, } = self; // TODO: caller should be forced to provide this @@ -401,6 +462,8 @@ impl NodeBuilder { cancellation_token, incoming_streams_handlers, protocols, + ws_pub, + metrics, }, outgoing_message_tx, incoming_message_rxs, @@ -408,19 +471,19 @@ impl NodeBuilder { } } -struct IncomingStreamsHandler { +struct IncomingStreamsHandler { protocol: StreamProtocol, incoming: IncomingStreams, - tx: mpsc::Sender, + tx: mpsc::Sender, cancellation_token: CancellationToken, } -impl IncomingStreamsHandler { +impl IncomingStreamsHandler { fn new( protocol: StreamProtocol, incoming: IncomingStreams, cancellation_token: CancellationToken, - ) -> (Self, mpsc::Receiver) { + ) -> (Self, mpsc::Receiver) { const CHANNEL_SIZE: usize = 100; let (tx, rx) = mpsc::channel(CHANNEL_SIZE); (Self { protocol, incoming, tx, cancellation_token }, rx) @@ -435,21 +498,21 @@ impl IncomingStreamsHandler { loop { tokio::select! { _ = cancellation_token.cancelled() => { - debug!(target: "flashblocks-p2p", "cancellation token triggered, shutting down incoming streams handler for protocol {protocol}"); + debug!(target: "payload_builder::broadcast", "cancellation token triggered, shutting down incoming streams handler for protocol {protocol}"); return; } Some((from, stream)) = incoming.next() => { - debug!(target: "flashblocks-p2p", "new incoming stream on protocol {protocol} from peer {from}"); + debug!(target: "payload_builder::broadcast", "new incoming stream on protocol {protocol} from peer {from}"); handle_stream_futures.push(tokio::spawn(handle_incoming_stream(from, stream, tx.clone()))); } Some(res) = handle_stream_futures.next() => { match res { Ok(Ok(())) => {} Ok(Err(e)) => { - warn!(target: "flashblocks-p2p", "error handling incoming stream: {e:?}"); + warn!(target: "payload_builder::broadcast", "error handling incoming stream: {e:?}"); } Err(e) => { - warn!(target: "flashblocks-p2p", "task handling incoming stream panicked: {e:?}"); + warn!(target: "payload_builder::broadcast", "task handling incoming stream panicked: {e:?}"); } } } @@ -458,10 +521,10 @@ impl IncomingStreamsHandler { } } -async fn handle_incoming_stream( +async fn handle_incoming_stream( peer_id: PeerId, stream: libp2p::Stream, - payload_tx: mpsc::Sender, + payload_tx: mpsc::Sender, ) -> eyre::Result<()> { use futures::StreamExt as _; use tokio_util::{ @@ -475,8 +538,9 @@ async fn handle_incoming_stream( while let Some(res) = reader.next().await { match res { Ok(str) => { - let payload = M::from_str(&str).wrap_err("failed to decode stream message")?; - debug!(target: "flashblocks-p2p", "got message from peer {peer_id}: {payload:?}"); + let payload = serde_json::from_str::(&str) + .wrap_err("failed to decode stream message")?; + debug!(target: "payload_builder::broadcast", "got message from peer {peer_id}: {payload:?}"); let _ = payload_tx.send(payload).await; } Err(e) => { @@ -507,9 +571,11 @@ fn create_transport( #[cfg(test)] mod test { use super::*; + use crate::broadcast::wspub::WebSocketPublisher; + use crate::metrics::{tokio::FlashblocksTaskMetrics, BuilderMetrics}; + use op_alloy_rpc_types_engine::OpFlashblockPayload; const TEST_AGENT_VERSION: &str = "test/1.0.0"; - const TEST_PROTOCOL: StreamProtocol = StreamProtocol::new("/test/1.0.0"); /// Binds two ephemeral ports on 127.0.0.1, guaranteeing they are distinct. /// Returns `(port1, port2, guard1, guard2)` — callers must hold the guards @@ -532,34 +598,19 @@ mod test { panic!("failed to obtain two distinct ephemeral ports after {MAX_RETRIES} retries"); } - #[derive(Debug, PartialEq, Eq, Clone)] - struct TestMessage { - content: String, - } - - impl Message for TestMessage { - fn protocol(&self) -> StreamProtocol { - TEST_PROTOCOL - } - } - - impl serde::Serialize for TestMessage { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - serializer.serialize_str(&self.content) - } - } - - impl<'de> serde::Deserialize<'de> for TestMessage { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - let s = String::deserialize(deserializer)?; - Ok(TestMessage { content: s }) - } + fn make_test_node_builder() -> NodeBuilder { + let task_metrics = FlashblocksTaskMetrics::new(); + let metrics = Arc::new(BuilderMetrics::default()); + let ws_pub = Arc::new( + WebSocketPublisher::new( + "127.0.0.1:0".parse().unwrap(), + metrics.clone(), + &task_metrics.websocket_publisher, + None, + ) + .expect("can create test WebSocketPublisher"), + ); + NodeBuilder::new(ws_pub, metrics) } #[tokio::test] @@ -572,26 +623,26 @@ mod test { drop(_guard2); let NodeBuildResult { node: node1, outgoing_message_tx: _, incoming_message_rxs: mut rx1 } = - NodeBuilder::new() + make_test_node_builder() .with_listen_addr(format!("/ip4/127.0.0.1/tcp/{port1}").parse().unwrap()) .with_agent_version(TEST_AGENT_VERSION.to_string()) - .with_protocol(TEST_PROTOCOL) - .try_build::() + .with_protocol(types::FLASHBLOCKS_STREAM_PROTOCOL) + .try_build() .unwrap(); let NodeBuildResult { node: node2, outgoing_message_tx: tx2, incoming_message_rxs: _ } = - NodeBuilder::new() + make_test_node_builder() .with_known_peers(node1.multiaddrs()) - .with_protocol(TEST_PROTOCOL) + .with_protocol(types::FLASHBLOCKS_STREAM_PROTOCOL) .with_listen_addr(format!("/ip4/127.0.0.1/tcp/{port2}").parse().unwrap()) .with_agent_version(TEST_AGENT_VERSION.to_string()) - .try_build::() + .try_build() .unwrap(); tokio::spawn(async move { node1.run().await }); tokio::spawn(async move { node2.run().await }); - let message = TestMessage { content: "message".to_string() }; - let mut rx = rx1.remove(&TEST_PROTOCOL).unwrap(); - let recv_message: TestMessage = tokio::time::timeout(Duration::from_secs(10), async { + let message = Message::from_flashblock_payload(OpFlashblockPayload::default()); + let mut rx = rx1.remove(&types::FLASHBLOCKS_STREAM_PROTOCOL).unwrap(); + let recv_message = tokio::time::timeout(Duration::from_secs(10), async { loop { // Use try_send to avoid panicking if the channel is full // (e.g. connection not yet established and messages are buffered). @@ -607,4 +658,156 @@ mod test { .expect("message receive timed out"); assert_eq!(recv_message, message); } + + /// Creates two minimal libp2p swarms (using only `libp2p_stream::Behaviour`), + /// connects them over TCP, and returns the peer ID of A plus `libp2p_stream::Control` + /// handles for both A and B. + /// + /// Both swarms are handed off to background tokio tasks after connection is + /// established; all further interaction uses the returned controls. + async fn connected_stream_swarms() -> (PeerId, libp2p_stream::Control, libp2p_stream::Control) { + use libp2p::{futures::StreamExt as _, identity, swarm::SwarmEvent}; + + let make_swarm = || { + let keypair = identity::Keypair::generate_ed25519(); + let transport = create_transport(&keypair).unwrap(); + libp2p::SwarmBuilder::with_existing_identity(keypair) + .with_tokio() + .with_other_transport(|_| transport) + .unwrap() + .with_behaviour(|_| libp2p_stream::Behaviour::new()) + .unwrap() + .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(30))) + .build() + }; + + let mut swarm_a = make_swarm(); + let mut swarm_b = make_swarm(); + let peer_a = *swarm_a.local_peer_id(); + let control_a = swarm_a.behaviour_mut().new_control(); + let control_b = swarm_b.behaviour_mut().new_control(); + + swarm_a.listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()).unwrap(); + + let addr_a = loop { + if let SwarmEvent::NewListenAddr { address, .. } = swarm_a.next().await.unwrap() { + break address.with(Protocol::P2p(peer_a)); + } + }; + + swarm_b.dial(addr_a).unwrap(); + + // Drive both swarms until connection is established on both sides. + let mut a_conn = false; + let mut b_conn = false; + while !(a_conn && b_conn) { + tokio::select! { + event = swarm_a.next() => { + if let Some(SwarmEvent::ConnectionEstablished { .. }) = event { + a_conn = true; + } + } + event = swarm_b.next() => { + if let Some(SwarmEvent::ConnectionEstablished { .. }) = event { + b_conn = true; + } + } + } + } + + tokio::spawn(async move { + loop { + swarm_a.next().await; + } + }); + tokio::spawn(async move { + loop { + swarm_b.next().await; + } + }); + + (peer_a, control_a, control_b) + } + + /// Verifies that when the remote end of a stream is closed, `broadcast_message` + /// returns the affected peer in `failed_peers` and evicts it from the handler. + #[tokio::test] + async fn broadcast_evicts_peer_on_stream_failure() { + use libp2p::futures::StreamExt as _; + + // A registers the protocol so B can open a stream to it. + let (peer_a, mut control_a, mut control_b) = connected_stream_swarms().await; + let mut incoming_a = control_a.accept(types::FLASHBLOCKS_STREAM_PROTOCOL).unwrap(); + + // B opens an outbound stream to A. + let stream = tokio::time::timeout( + Duration::from_secs(5), + control_b.open_stream(peer_a, types::FLASHBLOCKS_STREAM_PROTOCOL), + ) + .await + .expect("open_stream timed out") + .expect("open_stream failed"); + + // A accepts the stream and immediately drops it, simulating a remote stream close. + let (closed_tx, closed_rx) = tokio::sync::oneshot::channel::<()>(); + tokio::spawn(async move { + if let Some((_, remote_stream)) = incoming_a.next().await { + drop(remote_stream); + let _ = closed_tx.send(()); + } + }); + closed_rx.await.expect("A never accepted the stream"); + + // Give the close a moment to propagate through yamux. + tokio::time::sleep(Duration::from_millis(100)).await; + + let mut handler = outgoing::StreamsHandler::new(); + handler.insert_peer_and_stream(peer_a, types::FLASHBLOCKS_STREAM_PROTOCOL, stream); + assert!(handler.has_peer(&peer_a)); + + let msg = Message::from_flashblock_payload(OpFlashblockPayload::default()); + let failed = handler.broadcast_message(msg).await.expect("serialization must not fail"); + + assert_eq!(failed, vec![peer_a], "peer_a must be returned as a failed peer"); + assert!(!handler.has_peer(&peer_a), "peer_a must be evicted from the handler"); + } + + /// Verifies that a successful broadcast returns an empty `failed_peers` list + /// and that the peer remains in the handler. + #[tokio::test] + async fn broadcast_returns_empty_failed_peers_on_success() { + use libp2p::futures::StreamExt as _; + + // A registers the protocol so B can open a stream to it. + let (peer_a, mut control_a, mut control_b) = connected_stream_swarms().await; + let mut incoming_a = control_a.accept(types::FLASHBLOCKS_STREAM_PROTOCOL).unwrap(); + + let stream = tokio::time::timeout( + Duration::from_secs(5), + control_b.open_stream(peer_a, types::FLASHBLOCKS_STREAM_PROTOCOL), + ) + .await + .expect("open_stream timed out") + .expect("open_stream failed"); + + // A accepts the stream and keeps it alive for the duration of the test. + tokio::spawn(async move { + if let Some((_, stream)) = incoming_a.next().await { + tokio::time::sleep(Duration::from_secs(10)).await; + drop(stream); + } + }); + + let mut handler = outgoing::StreamsHandler::new(); + handler.insert_peer_and_stream(peer_a, types::FLASHBLOCKS_STREAM_PROTOCOL, stream); + + let msg = Message::from_flashblock_payload(OpFlashblockPayload::default()); + let failed = handler.broadcast_message(msg).await.expect("serialization must not fail"); + + assert!(failed.is_empty(), "no peers should fail on a healthy stream"); + assert!( + handler.has_peer(&peer_a), + "peer must remain in the handler after a successful send" + ); + } } diff --git a/crates/builder/src/p2p/outgoing.rs b/crates/builder/src/broadcast/outgoing.rs similarity index 71% rename from crates/builder/src/p2p/outgoing.rs rename to crates/builder/src/broadcast/outgoing.rs index aecb6b24..1ece75ba 100644 --- a/crates/builder/src/p2p/outgoing.rs +++ b/crates/builder/src/broadcast/outgoing.rs @@ -1,4 +1,4 @@ -use super::Message; +use super::types; use eyre::Context; use futures::stream::FuturesUnordered; use libp2p::{swarm::Stream, PeerId, StreamProtocol}; @@ -31,7 +31,10 @@ impl StreamsHandler { self.peers_to_stream.remove(peer); } - pub(crate) async fn broadcast_message(&mut self, message: M) -> eyre::Result<()> { + pub(crate) async fn broadcast_message( + &mut self, + message: types::Message, + ) -> eyre::Result> { use futures::{SinkExt as _, StreamExt as _}; use tokio_util::{ codec::{FramedWrite, LinesCodec}, @@ -39,7 +42,7 @@ impl StreamsHandler { }; let protocol = message.protocol(); - let payload = message.to_string().wrap_err("failed to serialize payload")?; + let payload = serde_json::to_string(&message).wrap_err("failed to serialize payload")?; let peers = self.peers_to_stream.keys().cloned().collect::>(); let mut futures = FuturesUnordered::new(); @@ -47,22 +50,22 @@ impl StreamsHandler { let protocol_to_stream = self.peers_to_stream.get_mut(&peer).expect("stream map must exist for peer"); let Some(stream) = protocol_to_stream.remove(&protocol) else { - warn!(target: "flashblocks-p2p", "no stream for protocol {protocol:?} to peer {peer}"); + warn!(target: "payload_builder::broadcast", "no stream for protocol {protocol:?} to peer {peer}"); continue; }; let stream = stream.compat(); let payload = payload.clone(); let fut = async move { let mut writer = FramedWrite::new(stream, LinesCodec::new()); - writer.send(payload).await.wrap_err("failed to send message to peer")?; - Ok::<(PeerId, libp2p::swarm::Stream), eyre::ErrReport>(( - peer, - writer.into_inner().into_inner(), - )) + match writer.send(payload).await { + Ok(()) => Ok((peer, writer.into_inner().into_inner())), + Err(e) => Err((peer, eyre::eyre!(e))), + } }; futures.push(fut); } + let mut failed_peers = Vec::new(); while let Some(result) = futures.next().await { match result { Ok((peer, stream)) => { @@ -72,18 +75,20 @@ impl StreamsHandler { .expect("stream map must exist for peer"); protocol_to_stream.insert(protocol.clone(), stream); } - Err(e) => { - warn!(target: "flashblocks-p2p", "failed to send payload to peer: {e:?}"); + Err((peer, e)) => { + warn!(target: "payload_builder::broadcast", "failed to send payload to peer {peer}: {e:?}"); + self.peers_to_stream.remove(&peer); + failed_peers.push(peer); } } } debug!( - target: "flashblocks-p2p", + target: "payload_builder::broadcast", "broadcasted message to {} peers", self.peers_to_stream.len() ); - Ok(()) + Ok(failed_peers) } } diff --git a/crates/builder/src/flashblocks/utils/p2p.rs b/crates/builder/src/broadcast/types.rs similarity index 87% rename from crates/builder/src/flashblocks/utils/p2p.rs rename to crates/builder/src/broadcast/types.rs index 6d616984..d842cce3 100644 --- a/crates/builder/src/flashblocks/utils/p2p.rs +++ b/crates/builder/src/broadcast/types.rs @@ -7,8 +7,8 @@ use reth_optimism_payload_builder::OpBuiltPayload as RethOpBuiltPayload; use reth_optimism_primitives::OpBlock; pub(crate) const AGENT_VERSION: &str = "flashblock-builder/1.0.0"; -pub(crate) const FLASHBLOCKS_STREAM_PROTOCOL: crate::p2p::StreamProtocol = - crate::p2p::StreamProtocol::new("/flashblocks/1.0.0"); +pub(crate) const FLASHBLOCKS_STREAM_PROTOCOL: crate::broadcast::StreamProtocol = + crate::broadcast::StreamProtocol::new("/flashblocks/1.0.0"); #[derive(Clone, Debug, PartialEq, Deserialize, Serialize)] pub(crate) enum Message { @@ -16,8 +16,8 @@ pub(crate) enum Message { OpFlashblockPayload(OpFlashblockPayload), } -impl crate::p2p::Message for Message { - fn protocol(&self) -> crate::p2p::StreamProtocol { +impl Message { + pub(crate) fn protocol(&self) -> crate::broadcast::StreamProtocol { FLASHBLOCKS_STREAM_PROTOCOL } } diff --git a/crates/builder/src/flashblocks/utils/wspub.rs b/crates/builder/src/broadcast/wspub.rs similarity index 83% rename from crates/builder/src/flashblocks/utils/wspub.rs rename to crates/builder/src/broadcast/wspub.rs index d3f29e1d..4524caed 100644 --- a/crates/builder/src/flashblocks/utils/wspub.rs +++ b/crates/builder/src/broadcast/wspub.rs @@ -70,7 +70,7 @@ impl WebSocketPublisher { // serialize only once, then just copy around only a pointer // to the serialized data for each subscription. info!( - target: "payload_builder", + target: "payload_builder::broadcast", event = "flashblock_sent", message = "Sending flashblock to subscribers", id = %payload.payload_id, @@ -93,7 +93,7 @@ impl Drop for WebSocketPublisher { fn drop(&mut self) { // Notify the listener loop to terminate let _ = self.term.send(true); - info!(target: "payload_builder", "WebSocketPublisher dropped, terminating listener loop"); + info!(target: "payload_builder::broadcast", "WebSocketPublisher dropped, terminating listener loop"); } } @@ -112,7 +112,7 @@ async fn listener_loop( .expect("Failed to convert TcpListener to tokio TcpListener"); let listen_addr = listener.local_addr().expect("Failed to get local address of listener"); - info!(target: "payload_builder", "Flashblocks WebSocketPublisher listening on {listen_addr}"); + info!(target: "payload_builder::broadcast", "Flashblocks WebSocketPublisher listening on {listen_addr}"); let mut term = term; @@ -140,7 +140,7 @@ async fn listener_loop( Ok(mut stream) => { tokio::spawn(async move { if let Some(limit) = subscriber_limit && subs.load(Ordering::Relaxed) >= limit as usize { - warn!(target: "payload_builder", "WebSocket connection for {peer_addr} rejected: subscriber limit reached"); + warn!(target: "payload_builder::broadcast", "WebSocket connection for {peer_addr} rejected: subscriber limit reached"); let _ = stream.close(Some(CloseFrame { code: CloseCode::Again, reason: "subscriber limit reached, please try again later".into(), @@ -148,17 +148,17 @@ async fn listener_loop( return; } subs.fetch_add(1, Ordering::Relaxed); - debug!(target: "payload_builder", "WebSocket connection established with {}", peer_addr); + debug!(target: "payload_builder::broadcast", "WebSocket connection established with {}", peer_addr); // Handle the WebSocket connection in a dedicated task broadcast_loop(stream, metrics, term, receiver_clone, sent).await; subs.fetch_sub(1, Ordering::Relaxed); - debug!(target: "payload_builder", "WebSocket connection closed for {}", peer_addr); + debug!(target: "payload_builder::broadcast", "WebSocket connection closed for {}", peer_addr); }); } Err(e) => { - warn!(target: "payload_builder", "Failed to accept WebSocket connection from {peer_addr}: {e}"); + warn!(target: "payload_builder::broadcast", "Failed to accept WebSocket connection from {peer_addr}: {e}"); } } } @@ -192,7 +192,7 @@ async fn broadcast_loop( // Check if the publisher is terminated _ = term.changed() => { if *term.borrow() { - info!(target: "payload_builder", "WebSocketPublisher is terminating, closing broadcast loop"); + info!(target: "payload_builder::broadcast", "WebSocketPublisher is terminating, closing broadcast loop"); return; } } @@ -205,18 +205,18 @@ async fn broadcast_loop( sent.fetch_add(1, Ordering::Relaxed); metrics.messages_sent_count.increment(1); - trace!(target: "payload_builder", "Broadcasted payload: {:?}", payload); + trace!(target: "payload_builder::broadcast", "Broadcasted payload: {:?}", payload); if let Err(e) = stream.send(Message::Text(payload)).await { - debug!(target: "payload_builder", "Send payload error for flashblocks subscription {peer_addr}: {e}"); + debug!(target: "payload_builder::broadcast", "Send payload error for flashblocks subscription {peer_addr}: {e}"); break; // Exit the loop if sending fails } } Err(RecvError::Closed) => { - debug!(target: "payload_builder", "Broadcast channel closed, exiting broadcast loop"); + debug!(target: "payload_builder::broadcast", "Broadcast channel closed, exiting broadcast loop"); return; } Err(RecvError::Lagged(_)) => { - warn!(target: "payload_builder", "Broadcast channel lagged, some messages were dropped"); + warn!(target: "payload_builder::broadcast", "Broadcast channel lagged, some messages were dropped"); } }, @@ -224,11 +224,11 @@ async fn broadcast_loop( message = stream.next() => if let Some(message) = message { match message { // We handle only close frame to highlight conn closing Ok(Message::Close(_)) => { - info!(target: "payload_builder", "Closing frame received, stopping connection for {peer_addr}"); + info!(target: "payload_builder::broadcast", "Closing frame received, stopping connection for {peer_addr}"); break; } Err(e) => { - warn!(target: "payload_builder", "Received error. Closing flashblocks subscription for {peer_addr}: {e}"); + warn!(target: "payload_builder::broadcast", "Received error. Closing flashblocks subscription for {peer_addr}: {e}"); break; } _ => (), diff --git a/crates/builder/src/flashblocks/builder.rs b/crates/builder/src/flashblocks/builder.rs index 8afb33bd..e93877e9 100644 --- a/crates/builder/src/flashblocks/builder.rs +++ b/crates/builder/src/flashblocks/builder.rs @@ -5,9 +5,7 @@ use crate::{ context::FlashblocksBuilderCtx, generator::{BlockCell, BuildArguments, PayloadBuilder}, timing::FlashblockScheduler, - utils::{ - cache::FlashblockPayloadsCache, execution::ExecutionInfo, wspub::WebSocketPublisher, - }, + utils::{cache::FlashblockPayloadsCache, execution::ExecutionInfo}, BuilderConfig, }, metrics::tokio::FlashblocksTaskMetrics, @@ -201,9 +199,6 @@ pub(super) struct FlashblocksBuilder { pub built_payload_tx: mpsc::Sender, /// Cache for externally received pending flashblocks transactions received via p2p. pub p2p_cache: FlashblockPayloadsCache, - /// WebSocket publisher for broadcasting flashblocks - /// to all connected subscribers. - pub ws_pub: Arc, /// System configuration for the builder pub config: BuilderConfig, /// The metrics for the builder @@ -229,7 +224,6 @@ impl FlashblocksBuilder { built_fb_payload_tx: mpsc::Sender, built_payload_tx: mpsc::Sender, p2p_cache: FlashblockPayloadsCache, - ws_pub: Arc, metrics: Arc, task_metrics: Arc, ) -> Self { @@ -241,7 +235,6 @@ impl FlashblocksBuilder { built_fb_payload_tx, built_payload_tx, p2p_cache, - ws_pub, config, metrics, builder_tx, @@ -421,12 +414,6 @@ where // We should always calculate state root for fallback payload let (fallback_payload, fb_payload, bundle_state, new_tx_hashes) = build_block(&mut state, &ctx, &mut info, Some(&mut fb_state), true)?; - // For X Layer - skip if replaying - if !rebuild_external_payload { - self.built_fb_payload_tx - .try_send(fb_payload.clone()) - .map_err(PayloadBuilderError::other)?; - } let mut best_payload = (fallback_payload.clone(), bundle_state); info!( @@ -438,9 +425,9 @@ where // not emitting flashblock if no_tx_pool in FCU, it's just syncing // For X Layer - skip if replaying if !ctx.attributes().no_tx_pool && !rebuild_external_payload { - let flashblock_byte_size = - self.ws_pub.publish(&fb_payload).map_err(PayloadBuilderError::other)?; - ctx.metrics.flashblock_byte_size_histogram.record(flashblock_byte_size as f64); + self.built_fb_payload_tx + .try_send(fb_payload.clone()) + .map_err(PayloadBuilderError::other)?; // For X Layer, full link monitoring support crate::flashblocks::utils::monitor::monitor( @@ -757,10 +744,6 @@ where fb_payload.index = flashblock_index; fb_payload.base = None; - let flashblock_byte_size = self - .ws_pub - .publish(&fb_payload) - .wrap_err("failed to publish flashblock via websocket")?; self.built_fb_payload_tx .try_send(fb_payload) .wrap_err("failed to send built payload to handler")?; @@ -768,7 +751,6 @@ where // Record flashblock build duration ctx.metrics.flashblock_build_duration.record(flashblock_build_start_time.elapsed()); - ctx.metrics.flashblock_byte_size_histogram.record(flashblock_byte_size as f64); ctx.metrics .flashblock_num_tx_histogram .record(info.executed_transactions.len() as f64); diff --git a/crates/builder/src/flashblocks/handler.rs b/crates/builder/src/flashblocks/handler.rs index 9ef9c078..353a5aa3 100644 --- a/crates/builder/src/flashblocks/handler.rs +++ b/crates/builder/src/flashblocks/handler.rs @@ -1,10 +1,8 @@ use crate::{ + broadcast::{wspub::WebSocketPublisher, Message}, flashblocks::{ handler_ctx::FlashblockHandlerContext, - utils::{ - cache::FlashblockPayloadsCache, execution::ExecutionInfo, p2p::Message, - wspub::WebSocketPublisher, - }, + utils::{cache::FlashblockPayloadsCache, execution::ExecutionInfo}, }, traits::ClientBounds, }; @@ -124,7 +122,7 @@ where loop { tokio::select! { Some(payload) = built_fb_payload_rx.recv() => { - // ignore error here; if p2p was disabled, the channel will be closed. + // ignore send error (broadcast node may have shut down) let _ = p2p_tx.send(Message::from_flashblock_payload(payload)).await; } Some(payload) = built_payload_rx.recv() => { @@ -133,7 +131,7 @@ where warn!(target: "payload_builder", e = ?e, "failed to send BuiltPayload event"); } if p2p_send_full_payload_flag { - // ignore error here; if p2p was disabled, the channel will be closed. + // ignore send error (broadcast node may have shut down) let _ = p2p_tx.send(Message::from_built_payload(payload)).await; } } diff --git a/crates/builder/src/flashblocks/mod.rs b/crates/builder/src/flashblocks/mod.rs index 17148b3c..635ae222 100644 --- a/crates/builder/src/flashblocks/mod.rs +++ b/crates/builder/src/flashblocks/mod.rs @@ -19,9 +19,10 @@ mod service; mod timing; pub(crate) mod utils; +pub use crate::broadcast::wspub::WebSocketPublisher; pub use context::FlashblocksBuilderCtx; pub use service::FlashblocksServiceBuilder; -pub use utils::{cache::FlashblockPayloadsCache, wspub::WebSocketPublisher}; +pub use utils::cache::FlashblockPayloadsCache; /// Configuration values that are specific to the flashblocks builder. #[derive(Debug, Clone)] @@ -54,9 +55,6 @@ pub struct FlashblocksConfig { /// This serves as a buffer time to account for the last flashblock being delayed. pub end_buffer_ms: u64, - /// Whether to enable the p2p node for flashblocks - pub p2p_enabled: bool, - /// Port for the p2p node pub p2p_port: u16, @@ -92,7 +90,6 @@ impl Default for FlashblocksConfig { number_contract_address: None, send_offset_ms: 0, end_buffer_ms: 0, - p2p_enabled: false, p2p_port: 9009, p2p_private_key_file: None, p2p_known_peers: None, @@ -212,7 +209,6 @@ impl TryFrom for BuilderConfig { number_contract_address, send_offset_ms: args.flashblocks.flashblocks_send_offset_ms, end_buffer_ms: args.flashblocks.flashblocks_end_buffer_ms, - p2p_enabled: args.flashblocks.p2p.p2p_enabled, p2p_port: args.flashblocks.p2p.p2p_port, p2p_private_key_file: args.flashblocks.p2p.p2p_private_key_file, p2p_known_peers: args.flashblocks.p2p.p2p_known_peers, diff --git a/crates/builder/src/flashblocks/service.rs b/crates/builder/src/flashblocks/service.rs index 854211bb..3db2401d 100644 --- a/crates/builder/src/flashblocks/service.rs +++ b/crates/builder/src/flashblocks/service.rs @@ -1,15 +1,12 @@ use crate::{ + broadcast::{ + types::{AGENT_VERSION, FLASHBLOCKS_STREAM_PROTOCOL}, + wspub::WebSocketPublisher, + }, flashblocks::{ - builder::FlashblocksBuilder, - builder_tx::FlashblocksBuilderTx, - generator::BlockPayloadJobGenerator, - handler::FlashblocksPayloadHandler, - handler_ctx::FlashblockHandlerContext, - utils::{ - cache::FlashblockPayloadsCache, - p2p::{Message, AGENT_VERSION, FLASHBLOCKS_STREAM_PROTOCOL}, - wspub::WebSocketPublisher, - }, + builder::FlashblocksBuilder, builder_tx::FlashblocksBuilderTx, + generator::BlockPayloadJobGenerator, handler::FlashblocksPayloadHandler, + handler_ctx::FlashblockHandlerContext, utils::cache::FlashblockPayloadsCache, BuilderConfig, }, metrics::{tokio::FlashblocksTaskMetrics, BuilderMetrics}, @@ -54,62 +51,67 @@ impl FlashblocksServiceBuilder { // this is effectively unused right now due to the usage of reth's `task_executor`. let cancel = tokio_util::sync::CancellationToken::new(); - let (incoming_message_rx, outgoing_message_tx) = if self.config.flashblocks.p2p_enabled { - let mut builder = crate::p2p::NodeBuilder::new(); - - if let Some(ref private_key_file) = self.config.flashblocks.p2p_private_key_file - && !private_key_file.is_empty() - { - let private_key_hex = std::fs::read_to_string(private_key_file) - .wrap_err_with(|| { - format!("failed to read p2p private key file: {private_key_file}") - })? - .trim() - .to_string(); - builder = builder.with_keypair_hex_string(private_key_hex); - } - - let known_peers: Vec = - if let Some(ref p2p_known_peers) = self.config.flashblocks.p2p_known_peers { - p2p_known_peers - .split(',') - .map(|s| s.to_string()) - .filter_map(|s| s.parse().ok()) - .collect() - } else { - vec![] - }; - - let crate::p2p::NodeBuildResult { node, outgoing_message_tx, mut incoming_message_rxs } = - builder - .with_agent_version(AGENT_VERSION.to_string()) - .with_protocol(FLASHBLOCKS_STREAM_PROTOCOL) - .with_known_peers(known_peers) - .with_port(self.config.flashblocks.p2p_port) - .with_cancellation_token(cancel.clone()) - .with_max_peer_count(self.config.flashblocks.p2p_max_peer_count) - .try_build::() - .wrap_err("failed to build flashblocks p2p node")?; - let multiaddrs = node.multiaddrs(); - ctx.task_executor().spawn_task(async move { - if let Err(e) = node.run().await { - tracing::error!(error = %e, "p2p node exited"); - } - }); - tracing::info!(target: "payload_builder", multiaddrs = ?multiaddrs, "flashblocks p2p node started"); - - let incoming_message_rx = incoming_message_rxs - .remove(&FLASHBLOCKS_STREAM_PROTOCOL) - .expect("flashblocks p2p protocol must be found in receiver map"); - (incoming_message_rx, outgoing_message_tx) - } else { - let (_incoming_message_tx, incoming_message_rx) = tokio::sync::mpsc::channel(16); - let (outgoing_message_tx, _outgoing_message_rx) = tokio::sync::mpsc::channel(16); - (incoming_message_rx, outgoing_message_tx) - }; - let metrics = Arc::new(BuilderMetrics::default()); let task_metrics = Arc::new(FlashblocksTaskMetrics::new()); + let ws_pub: Arc = WebSocketPublisher::new( + self.config.flashblocks.ws_addr, + metrics.clone(), + &task_metrics.websocket_publisher, + self.config.flashblocks.ws_subscriber_limit, + ) + .wrap_err("failed to create ws publisher")? + .into(); + + let mut broadcast_builder = + crate::broadcast::NodeBuilder::new(ws_pub.clone(), metrics.clone()); + + if let Some(ref private_key_file) = self.config.flashblocks.p2p_private_key_file + && !private_key_file.is_empty() + { + let private_key_hex = std::fs::read_to_string(private_key_file) + .wrap_err_with(|| { + format!("failed to read p2p private key file: {private_key_file}") + })? + .trim() + .to_string(); + broadcast_builder = broadcast_builder.with_keypair_hex_string(private_key_hex); + } + + let known_peers: Vec = + if let Some(ref p2p_known_peers) = self.config.flashblocks.p2p_known_peers { + p2p_known_peers + .split(',') + .map(|s| s.to_string()) + .filter_map(|s| s.parse().ok()) + .collect() + } else { + vec![] + }; + + let crate::broadcast::NodeBuildResult { + node, + outgoing_message_tx, + mut incoming_message_rxs, + } = broadcast_builder + .with_agent_version(AGENT_VERSION.to_string()) + .with_protocol(FLASHBLOCKS_STREAM_PROTOCOL) + .with_known_peers(known_peers) + .with_port(self.config.flashblocks.p2p_port) + .with_cancellation_token(cancel.clone()) + .with_max_peer_count(self.config.flashblocks.p2p_max_peer_count) + .try_build() + .wrap_err("failed to build flashblocks p2p node")?; + let multiaddrs = node.multiaddrs(); + ctx.task_executor().spawn_task(async move { + if let Err(e) = node.run().await { + tracing::error!(error = %e, "p2p node exited"); + } + }); + tracing::info!(target: "payload_builder", multiaddrs = ?multiaddrs, "flashblocks p2p node started"); + + let incoming_message_rx = incoming_message_rxs + .remove(&FLASHBLOCKS_STREAM_PROTOCOL) + .expect("flashblocks p2p protocol must be found in receiver map"); // Channels for built flashblock payloads let (built_fb_payload_tx, built_fb_payload_rx) = tokio::sync::mpsc::channel(16); @@ -122,14 +124,6 @@ impl FlashblocksServiceBuilder { FlashblockPayloadsCache::new(None) }; - let ws_pub: Arc = WebSocketPublisher::new( - self.config.flashblocks.ws_addr, - metrics.clone(), - &task_metrics.websocket_publisher, - self.config.flashblocks.ws_subscriber_limit, - ) - .wrap_err("failed to create ws publisher")? - .into(); let mut payload_builder = FlashblocksBuilder::new( OpEvmConfig::optimism(ctx.chain_spec()), pool, @@ -140,7 +134,6 @@ impl FlashblocksServiceBuilder { built_fb_payload_tx, built_payload_tx, p2p_cache.clone(), - ws_pub.clone(), metrics.clone(), task_metrics.clone(), ); diff --git a/crates/builder/src/flashblocks/utils/mod.rs b/crates/builder/src/flashblocks/utils/mod.rs index f1171090..b0064414 100644 --- a/crates/builder/src/flashblocks/utils/mod.rs +++ b/crates/builder/src/flashblocks/utils/mod.rs @@ -1,8 +1,6 @@ pub(crate) mod cache; pub(crate) mod execution; pub(crate) mod monitor; -pub(crate) mod p2p; -pub(crate) mod wspub; #[cfg(test)] pub(crate) mod mock; diff --git a/crates/builder/src/lib.rs b/crates/builder/src/lib.rs index 10626fed..7b5c6816 100644 --- a/crates/builder/src/lib.rs +++ b/crates/builder/src/lib.rs @@ -1,7 +1,7 @@ pub mod args; +pub(crate) mod broadcast; pub mod flashblocks; pub mod metrics; -pub(crate) mod p2p; pub(crate) mod signer; #[cfg(any(test, feature = "testing"))] pub mod tests; diff --git a/crates/intercept/src/lib.rs b/crates/intercept/src/lib.rs index e0a69d03..35b7e832 100644 --- a/crates/intercept/src/lib.rs +++ b/crates/intercept/src/lib.rs @@ -1,5 +1,5 @@ use alloy_primitives::{Address, B256, U256}; -use tracing::{debug}; +use tracing::debug; /// keccak256("BridgeEvent(uint8,uint32,address,uint32,address,uint256,bytes,uint32)") pub const BRIDGE_EVENT_SIGNATURE: B256 = B256::new([