Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 90 additions & 161 deletions crates/l2/sequencer/proof_coordinator.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::sequencer::errors::{ConnectionHandlerError, ProofCoordinatorError};
use crate::sequencer::errors::ProofCoordinatorError;
use crate::sequencer::setup::{prepare_quote_prerequisites, register_tdx_key};
use crate::sequencer::utils::get_latest_sent_batch;
use crate::{
Expand All @@ -22,7 +22,7 @@ use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use spawned_concurrency::messages::Unused;
use spawned_concurrency::tasks::{CastResponse, GenServer, GenServerHandle};
use std::net::{IpAddr, SocketAddr};
use std::net::IpAddr;
use std::sync::Arc;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
Expand All @@ -38,7 +38,7 @@ use std::{collections::HashMap, time::SystemTime};
use tokio::sync::Mutex;

#[serde_as]
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub struct ProverInputData {
pub blocks: Vec<Block>,
pub execution_witness: ExecutionWitness,
Expand All @@ -53,7 +53,7 @@ pub struct ProverInputData {

/// Enum for the ProverServer <--> ProverClient Communication Protocol.
#[allow(clippy::large_enum_variant)]
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub enum ProofData {
/// 1.
/// The client performs any needed setup steps
Expand Down Expand Up @@ -158,7 +158,7 @@ pub fn get_commit_hash() -> String {

#[derive(Clone)]
pub enum ProofCordInMessage {
Listen { listener: Arc<TcpListener> },
Request(ProofData, Arc<TcpStream>),
}

#[derive(Clone, PartialEq)]
Expand All @@ -168,8 +168,6 @@ pub enum ProofCordOutMessage {

#[derive(Clone)]
pub struct ProofCoordinator {
listen_ip: IpAddr,
port: u16,
store: Store,
eth_client: EthClient,
on_chain_proposer_address: Address,
Expand Down Expand Up @@ -217,8 +215,6 @@ impl ProofCoordinator {
.to_string();

Ok(Self {
listen_ip: config.listen_ip,
port: config.listen_port,
store,
eth_client,
on_chain_proposer_address,
Expand Down Expand Up @@ -253,39 +249,15 @@ impl ProofCoordinator {
needed_proof_types,
)
.await?;
let listener =
Arc::new(TcpListener::bind(format!("{}:{}", state.listen_ip, state.port)).await?);
let mut proof_coordinator = ProofCoordinator::start(state);
let _ = proof_coordinator
.cast(ProofCordInMessage::Listen { listener })
.await;
Ok(())
}

async fn handle_listens(&mut self, listener: Arc<TcpListener>) {
info!("Starting TCP server at {}:{}.", self.listen_ip, self.port);
loop {
let res = listener.accept().await;
match res {
Ok((stream, addr)) => {
// Cloning the ProofCoordinatorState structure to use the handle_connection() fn
// in every spawned task.
// The important fields are `Store` and `EthClient`
// Both fields are wrapped with an Arc, making it possible to clone
// the entire structure.
let _ = ConnectionHandler::spawn(self.clone(), stream, addr)
.await
.inspect_err(|err| {
error!("Error starting ConnectionHandler: {err}");
});
}
Err(e) => {
error!("Failed to accept connection: {e}");
}
}
let proof_coordinator = ProofCoordinator::start(state);
spawned_rt::tasks::spawn(start_prover_listener(
proof_coordinator,
cfg.proof_coordinator.listen_ip,
cfg.proof_coordinator.listen_port,
));

debug!("Connection closed");
}
Ok(())
}

async fn handle_request(
Expand Down Expand Up @@ -537,133 +509,43 @@ impl GenServer for ProofCoordinator {
_handle: &GenServerHandle<Self>,
) -> CastResponse {
match message {
ProofCordInMessage::Listen { listener } => {
self.handle_listens(listener).await;
}
}
CastResponse::Stop
}
}

#[derive(Clone)]
struct ConnectionHandler {
proof_coordinator: ProofCoordinator,
}

impl ConnectionHandler {
fn new(proof_coordinator: ProofCoordinator) -> Self {
Self { proof_coordinator }
}

async fn spawn(
proof_coordinator: ProofCoordinator,
stream: TcpStream,
addr: SocketAddr,
) -> Result<(), ConnectionHandlerError> {
let mut connection_handler = Self::new(proof_coordinator).start();
connection_handler
.cast(ConnInMessage::Connection {
stream: Arc::new(stream),
addr,
})
.await
.map_err(ConnectionHandlerError::InternalError)
}

async fn handle_connection(
&mut self,
stream: Arc<TcpStream>,
) -> Result<(), ProofCoordinatorError> {
let mut buffer = Vec::new();
// TODO: This should be fixed in https://github.com/lambdaclass/ethrex/issues/3316
// (stream should not be wrapped in an Arc)
if let Some(mut stream) = Arc::into_inner(stream) {
stream.read_to_end(&mut buffer).await?;

let data: Result<ProofData, _> = serde_json::from_slice(&buffer);
match data {
Ok(ProofData::BatchRequest { commit_hash }) => {
if let Err(e) = self
.proof_coordinator
.handle_request(&mut stream, commit_hash)
.await
{
error!("Failed to handle BatchRequest: {e}");
ProofCordInMessage::Request(data, stream) => {
let Some(mut stream) = Arc::into_inner(stream) else {
error!("Failed to send response to prover client failed to get stream");
return CastResponse::NoReply;
};
match data {
ProofData::BatchRequest { commit_hash } => {
if let Err(e) = self.handle_request(&mut stream, commit_hash).await {
error!("Failed to handle BatchRequest: {e}");
}
}
}
Ok(ProofData::ProofSubmit {
batch_number,
batch_proof,
}) => {
if let Err(e) = self
.proof_coordinator
.handle_submit(&mut stream, batch_number, batch_proof)
.await
{
error!("Failed to handle ProofSubmit: {e}");
ProofData::ProofSubmit {
batch_number,
batch_proof,
} => {
if let Err(e) = self
.handle_submit(&mut stream, batch_number, batch_proof)
.await
{
error!("Failed to handle ProofSubmit: {e}");
}
}
}
Ok(ProofData::ProverSetup {
prover_type,
payload,
}) => {
if let Err(e) = self
.proof_coordinator
.handle_setup(&mut stream, prover_type, payload)
.await
{
error!("Failed to handle ProverSetup: {e}");
ProofData::ProverSetup {
prover_type,
payload,
} => {
if let Err(e) = self.handle_setup(&mut stream, prover_type, payload).await {
error!("Failed to handle ProverSetup: {e}");
}
}
_ => {
warn!("Invalid request");
}
}
Ok(_) => {
warn!("Invalid request");
}
Err(e) => {
warn!("Failed to parse request: {e}");
}
}
debug!("Connection closed");
} else {
error!("Unable to use stream");
}
Ok(())
}
}

#[derive(Clone)]
pub enum ConnInMessage {
Connection {
stream: Arc<TcpStream>,
addr: SocketAddr,
},
}

#[derive(Clone, PartialEq)]
pub enum ConnOutMessage {
Done,
}

impl GenServer for ConnectionHandler {
type CallMsg = Unused;
type CastMsg = ConnInMessage;
type OutMsg = ConnOutMessage;
type Error = ProofCoordinatorError;

async fn handle_cast(
&mut self,
message: Self::CastMsg,
_handle: &GenServerHandle<Self>,
) -> CastResponse {
match message {
ConnInMessage::Connection { stream, addr } => {
if let Err(err) = self.handle_connection(stream).await {
error!("Error handling connection from {addr}: {err}");
} else {
debug!("Connection from {addr} handled successfully");
}
}
}
CastResponse::Stop
CastResponse::NoReply
}
}

Expand All @@ -678,3 +560,50 @@ async fn send_response(
.map_err(ProofCoordinatorError::ConnectionError)?;
Ok(())
}

async fn start_prover_listener(
proof_coordinator: GenServerHandle<ProofCoordinator>,
ip: IpAddr,
port: u16,
) -> Result<(), ProofCoordinatorError> {
info!("Starting TCP server at {ip}:{port}.");
let listener = TcpListener::bind(format!("{ip}:{port}"))
.await
.map_err(|e| {
ProofCoordinatorError::InternalError(format!("Failed to bind tcp socker: {e}"))
})?;
loop {
match listener.accept().await {
Ok((mut stream, _address)) => {
let mut proof_coordinator = proof_coordinator.clone();
spawned_rt::tasks::spawn(async move {
let mut buffer = Vec::new();
if stream
.read_to_end(&mut buffer)
.await
.inspect_err(|err: &std::io::Error| {
error!("Failed to read from tcp stream: {err}")
})
.is_err()
{
return;
}

let Ok(message) = serde_json::from_slice(&buffer)
.map(|data| ProofCordInMessage::Request(data, Arc::new(stream)))
.inspect_err(|err| error!("Failed to deserialize data: {}", err))
else {
return;
};

let _ = proof_coordinator.cast(message).await.inspect_err(|e| {
error!("Failed to send cast message to proof coordinator {e}")
});
});
}
Err(err) => {
error!("Error while accepting tpc connection {err}");
}
}
}
}
Loading