From 56a17b2f365103b999a8d07b417fc8033a60fa4f Mon Sep 17 00:00:00 2001 From: Leandro Serra Date: Mon, 15 Sep 2025 12:07:52 -0300 Subject: [PATCH 1/6] fix(l2): proof_coordinator genserver locked by tcp listen loop --- Cargo.lock | 1 + crates/l2/Cargo.toml | 1 + crates/l2/sequencer/proof_coordinator.rs | 244 ++++++++--------------- 3 files changed, 84 insertions(+), 162 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8f1656b5931..4139e825018 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4131,6 +4131,7 @@ dependencies = [ "tabwriter", "thiserror 2.0.16", "tokio", + "tokio-stream", "tokio-util", "tracing", "tui-big-text", diff --git a/crates/l2/Cargo.toml b/crates/l2/Cargo.toml index 3aec43897d6..41ecce1de04 100644 --- a/crates/l2/Cargo.toml +++ b/crates/l2/Cargo.toml @@ -56,6 +56,7 @@ tui-scrollview = "0.5.1" tui-logger.workspace = true tabwriter = "1.4.1" color-eyre = "0.6.5" +tokio-stream = { version = "0.1.17", features = ["net"] } guest_program = { path = "./prover/src/guest_program/" } diff --git a/crates/l2/sequencer/proof_coordinator.rs b/crates/l2/sequencer/proof_coordinator.rs index 7e4c6d810e5..e7bb87e7c27 100644 --- a/crates/l2/sequencer/proof_coordinator.rs +++ b/crates/l2/sequencer/proof_coordinator.rs @@ -1,4 +1,4 @@ -use crate::sequencer::errors::{ConnectionHandlerError, ProofCoordinatorError}; +use crate::sequencer::errors::ProofCoordinatorError; use crate::sequencer::setup::{prepare_quote_prerequisites, register_tdx_key}; use crate::sequencer::utils::get_latest_sent_batch; use crate::{ @@ -17,17 +17,19 @@ use ethrex_metrics::metrics; use ethrex_rpc::clients::eth::EthClient; use ethrex_storage::Store; use ethrex_storage_rollup::StoreRollup; +use futures::StreamExt; use secp256k1::SecretKey; use serde::{Deserialize, Serialize}; use serde_with::serde_as; use spawned_concurrency::messages::Unused; -use spawned_concurrency::tasks::{CastResponse, GenServer, GenServerHandle}; -use std::net::{IpAddr, SocketAddr}; +use spawned_concurrency::tasks::{CastResponse, GenServer, GenServerHandle, spawn_listener}; +use std::net::IpAddr; use std::sync::Arc; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, net::{TcpListener, TcpStream}, }; +use tokio_stream::wrappers::TcpListenerStream; use tracing::{debug, error, info, warn}; #[cfg(feature = "metrics")] @@ -38,7 +40,7 @@ use std::{collections::HashMap, time::SystemTime}; use tokio::sync::Mutex; #[serde_as] -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub struct ProverInputData { pub blocks: Vec, pub db: ExecutionWitness, @@ -53,7 +55,7 @@ pub struct ProverInputData { /// Enum for the ProverServer <--> ProverClient Communication Protocol. #[allow(clippy::large_enum_variant)] -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub enum ProofData { /// 1. /// The client performs any needed setup steps @@ -158,7 +160,7 @@ pub fn get_commit_hash() -> String { #[derive(Clone)] pub enum ProofCordInMessage { - Listen { listener: Arc }, + Data(ProofData, Arc), } #[derive(Clone, PartialEq)] @@ -168,8 +170,6 @@ pub enum ProofCordOutMessage { #[derive(Clone)] pub struct ProofCoordinator { - listen_ip: IpAddr, - port: u16, store: Store, eth_client: EthClient, on_chain_proposer_address: Address, @@ -217,8 +217,6 @@ impl ProofCoordinator { .to_string(); Ok(Self { - listen_ip: config.listen_ip, - port: config.listen_port, store, eth_client, on_chain_proposer_address, @@ -253,39 +251,16 @@ impl ProofCoordinator { needed_proof_types, ) .await?; - let listener = - Arc::new(TcpListener::bind(format!("{}:{}", state.listen_ip, state.port)).await?); - let mut proof_coordinator = ProofCoordinator::start(state); - let _ = proof_coordinator - .cast(ProofCordInMessage::Listen { listener }) - .await; - Ok(()) - } - async fn handle_listens(&mut self, listener: Arc) { - info!("Starting TCP server at {}:{}.", self.listen_ip, self.port); - loop { - let res = listener.accept().await; - match res { - Ok((stream, addr)) => { - // Cloning the ProofCoordinatorState structure to use the handle_connection() fn - // in every spawned task. - // The important fields are `Store` and `EthClient` - // Both fields are wrapped with an Arc, making it possible to clone - // the entire structure. - let _ = ConnectionHandler::spawn(self.clone(), stream, addr) - .await - .inspect_err(|err| { - error!("Error starting ConnectionHandler: {err}"); - }); - } - Err(e) => { - error!("Failed to accept connection: {e}"); - } - } + let proof_coordinator = ProofCoordinator::start(state); + start_prover_listener( + proof_coordinator, + cfg.proof_coordinator.listen_ip, + cfg.proof_coordinator.listen_port, + ) + .await?; - debug!("Connection closed"); - } + Ok(()) } async fn handle_request( @@ -537,133 +512,43 @@ impl GenServer for ProofCoordinator { _handle: &GenServerHandle, ) -> CastResponse { match message { - ProofCordInMessage::Listen { listener } => { - self.handle_listens(listener).await; - } - } - CastResponse::Stop - } -} - -#[derive(Clone)] -struct ConnectionHandler { - proof_coordinator: ProofCoordinator, -} - -impl ConnectionHandler { - fn new(proof_coordinator: ProofCoordinator) -> Self { - Self { proof_coordinator } - } - - async fn spawn( - proof_coordinator: ProofCoordinator, - stream: TcpStream, - addr: SocketAddr, - ) -> Result<(), ConnectionHandlerError> { - let mut connection_handler = Self::new(proof_coordinator).start(); - connection_handler - .cast(ConnInMessage::Connection { - stream: Arc::new(stream), - addr, - }) - .await - .map_err(ConnectionHandlerError::InternalError) - } - - async fn handle_connection( - &mut self, - stream: Arc, - ) -> Result<(), ProofCoordinatorError> { - let mut buffer = Vec::new(); - // TODO: This should be fixed in https://github.com/lambdaclass/ethrex/issues/3316 - // (stream should not be wrapped in an Arc) - if let Some(mut stream) = Arc::into_inner(stream) { - stream.read_to_end(&mut buffer).await?; - - let data: Result = serde_json::from_slice(&buffer); - match data { - Ok(ProofData::BatchRequest { commit_hash }) => { - if let Err(e) = self - .proof_coordinator - .handle_request(&mut stream, commit_hash) - .await - { - error!("Failed to handle BatchRequest: {e}"); + ProofCordInMessage::Data(data, stream) => { + let Some(mut stream) = Arc::into_inner(stream) else { + error!("Failed to send response to prover client failed to get stream"); + return CastResponse::NoReply; + }; + match data { + ProofData::BatchRequest { commit_hash } => { + if let Err(e) = self.handle_request(&mut stream, commit_hash).await { + error!("Failed to handle BatchRequest: {e}"); + } } - } - Ok(ProofData::ProofSubmit { - batch_number, - batch_proof, - }) => { - if let Err(e) = self - .proof_coordinator - .handle_submit(&mut stream, batch_number, batch_proof) - .await - { - error!("Failed to handle ProofSubmit: {e}"); + ProofData::ProofSubmit { + batch_number, + batch_proof, + } => { + if let Err(e) = self + .handle_submit(&mut stream, batch_number, batch_proof) + .await + { + error!("Failed to handle ProofSubmit: {e}"); + } } - } - Ok(ProofData::ProverSetup { - prover_type, - payload, - }) => { - if let Err(e) = self - .proof_coordinator - .handle_setup(&mut stream, prover_type, payload) - .await - { - error!("Failed to handle ProverSetup: {e}"); + ProofData::ProverSetup { + prover_type, + payload, + } => { + if let Err(e) = self.handle_setup(&mut stream, prover_type, payload).await { + error!("Failed to handle ProverSetup: {e}"); + } + } + _ => { + warn!("Invalid request"); } - } - Ok(_) => { - warn!("Invalid request"); - } - Err(e) => { - warn!("Failed to parse request: {e}"); - } - } - debug!("Connection closed"); - } else { - error!("Unable to use stream"); - } - Ok(()) - } -} - -#[derive(Clone)] -pub enum ConnInMessage { - Connection { - stream: Arc, - addr: SocketAddr, - }, -} - -#[derive(Clone, PartialEq)] -pub enum ConnOutMessage { - Done, -} - -impl GenServer for ConnectionHandler { - type CallMsg = Unused; - type CastMsg = ConnInMessage; - type OutMsg = ConnOutMessage; - type Error = ProofCoordinatorError; - - async fn handle_cast( - &mut self, - message: Self::CastMsg, - _handle: &GenServerHandle, - ) -> CastResponse { - match message { - ConnInMessage::Connection { stream, addr } => { - if let Err(err) = self.handle_connection(stream).await { - error!("Error handling connection from {addr}: {err}"); - } else { - debug!("Connection from {addr} handled successfully"); } } } - CastResponse::Stop + CastResponse::NoReply } } @@ -678,3 +563,38 @@ async fn send_response( .map_err(ProofCoordinatorError::ConnectionError)?; Ok(()) } + +async fn start_prover_listener( + proof_coordinator: GenServerHandle, + ip: IpAddr, + port: u16, +) -> std::io::Result<()> { + let listener = TcpListener::bind(format!("{ip}:{port}")).await?; + let stream = TcpListenerStream::new(listener); + let stream = stream.filter_map(async |x| match x { + Ok(mut stream) => { + let mut buffer = Vec::new(); + stream + .read_to_end(&mut buffer) + .await + .inspect_err(|err| error!("Failed to read from tcp stream: {err}")) + .ok()?; + + let data: Result = serde_json::from_slice(&buffer); + + if let Ok(data) = data { + Some(ProofCordInMessage::Data(data, Arc::new(stream))) + } else { + error!("Invalid request data"); + None + } + } + Err(e) => { + error!("{}", e); + None + } + }); + info!("Starting TCP server at {ip}:{port}."); + spawn_listener(proof_coordinator, stream); + Ok(()) +} From 762f6116ae5b0abd5ef6d3eeccd3bdaf17a6b006 Mon Sep 17 00:00:00 2001 From: Leandro Serra Date: Mon, 15 Sep 2025 12:56:22 -0300 Subject: [PATCH 2/6] style(l2): clean up stream.filter_map closure --- crates/l2/sequencer/proof_coordinator.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/crates/l2/sequencer/proof_coordinator.rs b/crates/l2/sequencer/proof_coordinator.rs index e7bb87e7c27..9c98210b92f 100644 --- a/crates/l2/sequencer/proof_coordinator.rs +++ b/crates/l2/sequencer/proof_coordinator.rs @@ -571,7 +571,7 @@ async fn start_prover_listener( ) -> std::io::Result<()> { let listener = TcpListener::bind(format!("{ip}:{port}")).await?; let stream = TcpListenerStream::new(listener); - let stream = stream.filter_map(async |x| match x { + let stream = stream.filter_map(async |result| match result { Ok(mut stream) => { let mut buffer = Vec::new(); stream @@ -580,14 +580,10 @@ async fn start_prover_listener( .inspect_err(|err| error!("Failed to read from tcp stream: {err}")) .ok()?; - let data: Result = serde_json::from_slice(&buffer); - - if let Ok(data) = data { - Some(ProofCordInMessage::Data(data, Arc::new(stream))) - } else { - error!("Invalid request data"); - None - } + serde_json::from_slice(&buffer) + .map(|data| ProofCordInMessage::Data(data, Arc::new(stream))) + .inspect_err(|err| error!("Failed to deserialize data: {}", err)) + .ok() } Err(e) => { error!("{}", e); From 1188074b06b8fd4b27e9a9df05a1dfabe05ee83d Mon Sep 17 00:00:00 2001 From: Leandro Serra Date: Mon, 15 Sep 2025 16:35:29 -0300 Subject: [PATCH 3/6] style(l2): rename ProofCordInMessage Data to Request --- crates/l2/sequencer/proof_coordinator.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/l2/sequencer/proof_coordinator.rs b/crates/l2/sequencer/proof_coordinator.rs index 9c98210b92f..c87407f2cbc 100644 --- a/crates/l2/sequencer/proof_coordinator.rs +++ b/crates/l2/sequencer/proof_coordinator.rs @@ -160,7 +160,7 @@ pub fn get_commit_hash() -> String { #[derive(Clone)] pub enum ProofCordInMessage { - Data(ProofData, Arc), + Request(ProofData, Arc), } #[derive(Clone, PartialEq)] @@ -512,7 +512,7 @@ impl GenServer for ProofCoordinator { _handle: &GenServerHandle, ) -> CastResponse { match message { - ProofCordInMessage::Data(data, stream) => { + ProofCordInMessage::Request(data, stream) => { let Some(mut stream) = Arc::into_inner(stream) else { error!("Failed to send response to prover client failed to get stream"); return CastResponse::NoReply; @@ -581,7 +581,7 @@ async fn start_prover_listener( .ok()?; serde_json::from_slice(&buffer) - .map(|data| ProofCordInMessage::Data(data, Arc::new(stream))) + .map(|data| ProofCordInMessage::Request(data, Arc::new(stream))) .inspect_err(|err| error!("Failed to deserialize data: {}", err)) .ok() } From a3c308bf4faed00c5fc6848b8a2c42eab69228dc Mon Sep 17 00:00:00 2001 From: Leandro Serra Date: Tue, 16 Sep 2025 17:03:19 -0300 Subject: [PATCH 4/6] fix(l2): don't use a single tcp stream for conections --- crates/l2/sequencer/proof_coordinator.rs | 70 ++++++++++++++---------- 1 file changed, 40 insertions(+), 30 deletions(-) diff --git a/crates/l2/sequencer/proof_coordinator.rs b/crates/l2/sequencer/proof_coordinator.rs index c87407f2cbc..68bef8f69c6 100644 --- a/crates/l2/sequencer/proof_coordinator.rs +++ b/crates/l2/sequencer/proof_coordinator.rs @@ -17,19 +17,17 @@ use ethrex_metrics::metrics; use ethrex_rpc::clients::eth::EthClient; use ethrex_storage::Store; use ethrex_storage_rollup::StoreRollup; -use futures::StreamExt; use secp256k1::SecretKey; use serde::{Deserialize, Serialize}; use serde_with::serde_as; use spawned_concurrency::messages::Unused; -use spawned_concurrency::tasks::{CastResponse, GenServer, GenServerHandle, spawn_listener}; +use spawned_concurrency::tasks::{CastResponse, GenServer, GenServerHandle}; use std::net::IpAddr; use std::sync::Arc; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, net::{TcpListener, TcpStream}, }; -use tokio_stream::wrappers::TcpListenerStream; use tracing::{debug, error, info, warn}; #[cfg(feature = "metrics")] @@ -253,12 +251,11 @@ impl ProofCoordinator { .await?; let proof_coordinator = ProofCoordinator::start(state); - start_prover_listener( + spawned_rt::tasks::spawn(start_prover_listener( proof_coordinator, cfg.proof_coordinator.listen_ip, cfg.proof_coordinator.listen_port, - ) - .await?; + )); Ok(()) } @@ -565,32 +562,45 @@ async fn send_response( } async fn start_prover_listener( - proof_coordinator: GenServerHandle, + mut proof_coordinator: GenServerHandle, ip: IpAddr, port: u16, -) -> std::io::Result<()> { - let listener = TcpListener::bind(format!("{ip}:{port}")).await?; - let stream = TcpListenerStream::new(listener); - let stream = stream.filter_map(async |result| match result { - Ok(mut stream) => { - let mut buffer = Vec::new(); - stream - .read_to_end(&mut buffer) - .await - .inspect_err(|err| error!("Failed to read from tcp stream: {err}")) - .ok()?; +) -> Result<(), ProofCoordinatorError> { + info!("Starting TCP server at {ip}:{port}."); + let listener = TcpListener::bind(format!("{ip}:{port}")) + .await + .map_err(|e| { + ProofCoordinatorError::InternalError(format!("Failed to bind tcp socker: {e}")) + })?; + loop { + match listener.accept().await { + Ok((mut stream, _address)) => { + let mut buffer = Vec::new(); + if stream + .read_to_end(&mut buffer) + .await + .inspect_err(|err| error!("Failed to read from tcp stream: {err}")) + .is_err() + { + continue; + } - serde_json::from_slice(&buffer) - .map(|data| ProofCordInMessage::Request(data, Arc::new(stream))) - .inspect_err(|err| error!("Failed to deserialize data: {}", err)) - .ok() - } - Err(e) => { - error!("{}", e); - None + let Ok(message) = serde_json::from_slice(&buffer) + .map(|data| ProofCordInMessage::Request(data, Arc::new(stream))) + .inspect_err(|err| error!("Failed to deserialize data: {}", err)) + else { + continue; + }; + + proof_coordinator + .cast(message) + .await + .inspect_err(|e| error!("Failed to send cast message to proof coordinator {e}")) + .map_err(|e| ProofCoordinatorError::InternalError(e.to_string()))?; + } + Err(err) => { + error!("Error while accepting tpc connection {err}"); + } } - }); - info!("Starting TCP server at {ip}:{port}."); - spawn_listener(proof_coordinator, stream); - Ok(()) + } } From de46a220e19a5242797a36ec13f95b031e3c6301 Mon Sep 17 00:00:00 2001 From: Leandro Serra Date: Wed, 17 Sep 2025 12:02:58 -0300 Subject: [PATCH 5/6] fix(l2): spawn a task to handle the request --- crates/l2/sequencer/proof_coordinator.rs | 47 +++++++++++++----------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/crates/l2/sequencer/proof_coordinator.rs b/crates/l2/sequencer/proof_coordinator.rs index 35f0a5788c1..d4aa6116c0e 100644 --- a/crates/l2/sequencer/proof_coordinator.rs +++ b/crates/l2/sequencer/proof_coordinator.rs @@ -562,7 +562,7 @@ async fn send_response( } async fn start_prover_listener( - mut proof_coordinator: GenServerHandle, + proof_coordinator: GenServerHandle, ip: IpAddr, port: u16, ) -> Result<(), ProofCoordinatorError> { @@ -575,28 +575,31 @@ async fn start_prover_listener( loop { match listener.accept().await { Ok((mut stream, _address)) => { - let mut buffer = Vec::new(); - if stream - .read_to_end(&mut buffer) - .await - .inspect_err(|err| error!("Failed to read from tcp stream: {err}")) - .is_err() - { - continue; - } - - let Ok(message) = serde_json::from_slice(&buffer) - .map(|data| ProofCordInMessage::Request(data, Arc::new(stream))) - .inspect_err(|err| error!("Failed to deserialize data: {}", err)) - else { - continue; - }; + let mut proof_coordinator = proof_coordinator.clone(); + spawned_rt::tasks::spawn(async move { + let mut buffer = Vec::new(); + if stream + .read_to_end(&mut buffer) + .await + .inspect_err(|err: &std::io::Error| { + error!("Failed to read from tcp stream: {err}") + }) + .is_err() + { + return; + } - proof_coordinator - .cast(message) - .await - .inspect_err(|e| error!("Failed to send cast message to proof coordinator {e}")) - .map_err(|e| ProofCoordinatorError::InternalError(e.to_string()))?; + let Ok(message) = serde_json::from_slice(&buffer) + .map(|data| ProofCordInMessage::Request(data, Arc::new(stream))) + .inspect_err(|err| error!("Failed to deserialize data: {}", err)) + else { + return; + }; + + let _ = proof_coordinator.cast(message).await.inspect_err(|e| { + error!("Failed to send cast message to proof coordinator {e}") + }); + }); } Err(err) => { error!("Error while accepting tpc connection {err}"); From 0acac57dbbb3f113cf0173186b7361810cf30e73 Mon Sep 17 00:00:00 2001 From: Leandro Serra Date: Wed, 17 Sep 2025 16:14:33 -0300 Subject: [PATCH 6/6] fix(l2): remove tokio-stream as a dep from l2 --- Cargo.lock | 1 - crates/l2/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 35b107781d0..05b9142329e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4187,7 +4187,6 @@ dependencies = [ "tabwriter", "thiserror 2.0.16", "tokio", - "tokio-stream", "tokio-util", "tracing", "tui-big-text", diff --git a/crates/l2/Cargo.toml b/crates/l2/Cargo.toml index 26241b5f036..dc5e6417e25 100644 --- a/crates/l2/Cargo.toml +++ b/crates/l2/Cargo.toml @@ -56,7 +56,6 @@ tui-scrollview = "0.5.1" tui-logger.workspace = true tabwriter = "1.4.1" color-eyre = "0.6.5" -tokio-stream = { version = "0.1.17", features = ["net"] } axum.workspace = true guest_program = { path = "./prover/src/guest_program/" }