diff --git a/crates/l2/sequencer/proof_coordinator.rs b/crates/l2/sequencer/proof_coordinator.rs index ed8af713494..d4aa6116c0e 100644 --- a/crates/l2/sequencer/proof_coordinator.rs +++ b/crates/l2/sequencer/proof_coordinator.rs @@ -1,4 +1,4 @@ -use crate::sequencer::errors::{ConnectionHandlerError, ProofCoordinatorError}; +use crate::sequencer::errors::ProofCoordinatorError; use crate::sequencer::setup::{prepare_quote_prerequisites, register_tdx_key}; use crate::sequencer::utils::get_latest_sent_batch; use crate::{ @@ -22,7 +22,7 @@ use serde::{Deserialize, Serialize}; use serde_with::serde_as; use spawned_concurrency::messages::Unused; use spawned_concurrency::tasks::{CastResponse, GenServer, GenServerHandle}; -use std::net::{IpAddr, SocketAddr}; +use std::net::IpAddr; use std::sync::Arc; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, @@ -38,7 +38,7 @@ use std::{collections::HashMap, time::SystemTime}; use tokio::sync::Mutex; #[serde_as] -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub struct ProverInputData { pub blocks: Vec, pub execution_witness: ExecutionWitness, @@ -53,7 +53,7 @@ pub struct ProverInputData { /// Enum for the ProverServer <--> ProverClient Communication Protocol. #[allow(clippy::large_enum_variant)] -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub enum ProofData { /// 1. /// The client performs any needed setup steps @@ -158,7 +158,7 @@ pub fn get_commit_hash() -> String { #[derive(Clone)] pub enum ProofCordInMessage { - Listen { listener: Arc }, + Request(ProofData, Arc), } #[derive(Clone, PartialEq)] @@ -168,8 +168,6 @@ pub enum ProofCordOutMessage { #[derive(Clone)] pub struct ProofCoordinator { - listen_ip: IpAddr, - port: u16, store: Store, eth_client: EthClient, on_chain_proposer_address: Address, @@ -217,8 +215,6 @@ impl ProofCoordinator { .to_string(); Ok(Self { - listen_ip: config.listen_ip, - port: config.listen_port, store, eth_client, on_chain_proposer_address, @@ -253,39 +249,15 @@ impl ProofCoordinator { needed_proof_types, ) .await?; - let listener = - Arc::new(TcpListener::bind(format!("{}:{}", state.listen_ip, state.port)).await?); - let mut proof_coordinator = ProofCoordinator::start(state); - let _ = proof_coordinator - .cast(ProofCordInMessage::Listen { listener }) - .await; - Ok(()) - } - async fn handle_listens(&mut self, listener: Arc) { - info!("Starting TCP server at {}:{}.", self.listen_ip, self.port); - loop { - let res = listener.accept().await; - match res { - Ok((stream, addr)) => { - // Cloning the ProofCoordinatorState structure to use the handle_connection() fn - // in every spawned task. - // The important fields are `Store` and `EthClient` - // Both fields are wrapped with an Arc, making it possible to clone - // the entire structure. - let _ = ConnectionHandler::spawn(self.clone(), stream, addr) - .await - .inspect_err(|err| { - error!("Error starting ConnectionHandler: {err}"); - }); - } - Err(e) => { - error!("Failed to accept connection: {e}"); - } - } + let proof_coordinator = ProofCoordinator::start(state); + spawned_rt::tasks::spawn(start_prover_listener( + proof_coordinator, + cfg.proof_coordinator.listen_ip, + cfg.proof_coordinator.listen_port, + )); - debug!("Connection closed"); - } + Ok(()) } async fn handle_request( @@ -537,133 +509,43 @@ impl GenServer for ProofCoordinator { _handle: &GenServerHandle, ) -> CastResponse { match message { - ProofCordInMessage::Listen { listener } => { - self.handle_listens(listener).await; - } - } - CastResponse::Stop - } -} - -#[derive(Clone)] -struct ConnectionHandler { - proof_coordinator: ProofCoordinator, -} - -impl ConnectionHandler { - fn new(proof_coordinator: ProofCoordinator) -> Self { - Self { proof_coordinator } - } - - async fn spawn( - proof_coordinator: ProofCoordinator, - stream: TcpStream, - addr: SocketAddr, - ) -> Result<(), ConnectionHandlerError> { - let mut connection_handler = Self::new(proof_coordinator).start(); - connection_handler - .cast(ConnInMessage::Connection { - stream: Arc::new(stream), - addr, - }) - .await - .map_err(ConnectionHandlerError::InternalError) - } - - async fn handle_connection( - &mut self, - stream: Arc, - ) -> Result<(), ProofCoordinatorError> { - let mut buffer = Vec::new(); - // TODO: This should be fixed in https://github.com/lambdaclass/ethrex/issues/3316 - // (stream should not be wrapped in an Arc) - if let Some(mut stream) = Arc::into_inner(stream) { - stream.read_to_end(&mut buffer).await?; - - let data: Result = serde_json::from_slice(&buffer); - match data { - Ok(ProofData::BatchRequest { commit_hash }) => { - if let Err(e) = self - .proof_coordinator - .handle_request(&mut stream, commit_hash) - .await - { - error!("Failed to handle BatchRequest: {e}"); + ProofCordInMessage::Request(data, stream) => { + let Some(mut stream) = Arc::into_inner(stream) else { + error!("Failed to send response to prover client failed to get stream"); + return CastResponse::NoReply; + }; + match data { + ProofData::BatchRequest { commit_hash } => { + if let Err(e) = self.handle_request(&mut stream, commit_hash).await { + error!("Failed to handle BatchRequest: {e}"); + } } - } - Ok(ProofData::ProofSubmit { - batch_number, - batch_proof, - }) => { - if let Err(e) = self - .proof_coordinator - .handle_submit(&mut stream, batch_number, batch_proof) - .await - { - error!("Failed to handle ProofSubmit: {e}"); + ProofData::ProofSubmit { + batch_number, + batch_proof, + } => { + if let Err(e) = self + .handle_submit(&mut stream, batch_number, batch_proof) + .await + { + error!("Failed to handle ProofSubmit: {e}"); + } } - } - Ok(ProofData::ProverSetup { - prover_type, - payload, - }) => { - if let Err(e) = self - .proof_coordinator - .handle_setup(&mut stream, prover_type, payload) - .await - { - error!("Failed to handle ProverSetup: {e}"); + ProofData::ProverSetup { + prover_type, + payload, + } => { + if let Err(e) = self.handle_setup(&mut stream, prover_type, payload).await { + error!("Failed to handle ProverSetup: {e}"); + } + } + _ => { + warn!("Invalid request"); } - } - Ok(_) => { - warn!("Invalid request"); - } - Err(e) => { - warn!("Failed to parse request: {e}"); - } - } - debug!("Connection closed"); - } else { - error!("Unable to use stream"); - } - Ok(()) - } -} - -#[derive(Clone)] -pub enum ConnInMessage { - Connection { - stream: Arc, - addr: SocketAddr, - }, -} - -#[derive(Clone, PartialEq)] -pub enum ConnOutMessage { - Done, -} - -impl GenServer for ConnectionHandler { - type CallMsg = Unused; - type CastMsg = ConnInMessage; - type OutMsg = ConnOutMessage; - type Error = ProofCoordinatorError; - - async fn handle_cast( - &mut self, - message: Self::CastMsg, - _handle: &GenServerHandle, - ) -> CastResponse { - match message { - ConnInMessage::Connection { stream, addr } => { - if let Err(err) = self.handle_connection(stream).await { - error!("Error handling connection from {addr}: {err}"); - } else { - debug!("Connection from {addr} handled successfully"); } } } - CastResponse::Stop + CastResponse::NoReply } } @@ -678,3 +560,50 @@ async fn send_response( .map_err(ProofCoordinatorError::ConnectionError)?; Ok(()) } + +async fn start_prover_listener( + proof_coordinator: GenServerHandle, + ip: IpAddr, + port: u16, +) -> Result<(), ProofCoordinatorError> { + info!("Starting TCP server at {ip}:{port}."); + let listener = TcpListener::bind(format!("{ip}:{port}")) + .await + .map_err(|e| { + ProofCoordinatorError::InternalError(format!("Failed to bind tcp socker: {e}")) + })?; + loop { + match listener.accept().await { + Ok((mut stream, _address)) => { + let mut proof_coordinator = proof_coordinator.clone(); + spawned_rt::tasks::spawn(async move { + let mut buffer = Vec::new(); + if stream + .read_to_end(&mut buffer) + .await + .inspect_err(|err: &std::io::Error| { + error!("Failed to read from tcp stream: {err}") + }) + .is_err() + { + return; + } + + let Ok(message) = serde_json::from_slice(&buffer) + .map(|data| ProofCordInMessage::Request(data, Arc::new(stream))) + .inspect_err(|err| error!("Failed to deserialize data: {}", err)) + else { + return; + }; + + let _ = proof_coordinator.cast(message).await.inspect_err(|e| { + error!("Failed to send cast message to proof coordinator {e}") + }); + }); + } + Err(err) => { + error!("Error while accepting tpc connection {err}"); + } + } + } +}