Skip to content

chore(l1): preventing errors to propagate to spawned #3840

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Aug 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
d65a999
Errors on connection handshake and initialization now are logged in e…
ElFantasma Jul 25, 2025
ead13d6
Updated tee/quote-gen Cargo.lock
ElFantasma Jul 25, 2025
7a14ce4
Merge branch 'main' into catch_init_errors_on_connection
ElFantasma Jul 25, 2025
8218562
Fixed linting issue
ElFantasma Jul 25, 2025
f36f07d
Merge branch 'main' into catch_init_errors_on_connection
ElFantasma Jul 29, 2025
57c5b41
Bumped spawned version
ElFantasma Jul 29, 2025
04ad563
Handling teardown Broken Pipe error
ElFantasma Jul 30, 2025
9c196cc
Merge branch 'main' into catch_init_errors_on_connection
ElFantasma Jul 30, 2025
e70f24b
Merge branch 'main' into catch_init_errors_on_connection
ElFantasma Jul 31, 2025
b0d92f1
Merge branch 'main' into catch_init_errors_on_connection
ElFantasma Jul 31, 2025
024e9d0
Merge branch 'main' into catch_init_errors_on_connection
ElFantasma Jul 31, 2025
1fc57b4
Merge branch 'main' into catch_init_errors_on_connection
ElFantasma Jul 31, 2025
4660a33
Merge branch 'main' into catch_init_errors_on_connection
ElFantasma Aug 1, 2025
834154e
Merge branch 'main' into catch_init_errors_on_connection
ElFantasma Aug 1, 2025
a1e52a7
Merge branch 'main' into catch_init_errors_on_connection
ElFantasma Aug 2, 2025
a32c49b
addressed review comment
ElFantasma Aug 4, 2025
b71ac27
addressed review comment
ElFantasma Aug 4, 2025
715042b
Merge branch 'main' into catch_init_errors_on_connection
ElFantasma Aug 4, 2025
24b1527
Merge branch 'main' into catch_init_errors_on_connection
ElFantasma Aug 4, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ url = "2.5.4"
kzg-rs = "0.2.6"
libsql = "0.9.10"
futures = "0.3.31"
spawned-concurrency = "0.2.1"
spawned-rt = "0.2.1"
spawned-concurrency = "0.2.2"
spawned-rt = "0.2.2"
lambdaworks-crypto = "0.11.0"
tui-logger = { version = "0.17.3", features = ["tracing-support"] }
rayon = "1.10.0"
Expand Down
11 changes: 2 additions & 9 deletions cmd/ethrex/initializers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use std::{
fs,
net::{Ipv4Addr, SocketAddr},
path::{Path, PathBuf},
str::FromStr,
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
};
Expand All @@ -33,14 +32,8 @@ use tracing_subscriber::{EnvFilter, FmtSubscriber, filter::Directive};

pub fn init_tracing(opts: &Options) {
let log_filter = EnvFilter::builder()
.with_default_directive(
// Filters all spawned logs
// TODO: revert #3467 when error logs are no longer emitted
Directive::from_str("spawned_concurrency::tasks::gen_server=off")
.expect("this can't fail"),
)
.from_env_lossy()
.add_directive(Directive::from(opts.log_level));
.with_default_directive(Directive::from(opts.log_level))
.from_env_lossy();
let subscriber = FmtSubscriber::builder()
.with_env_filter(log_filter)
.finish();
Expand Down
9 changes: 6 additions & 3 deletions crates/l2/monitor/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ use ratatui::{
};
use spawned_concurrency::{
messages::Unused,
tasks::{CastResponse, GenServer, GenServerHandle, send_interval, spawn_listener},
tasks::{
CastResponse, GenServer, GenServerHandle, InitResult, Success, send_interval,
spawn_listener,
},
};
use std::io;
use std::sync::Arc;
Expand Down Expand Up @@ -109,7 +112,7 @@ impl GenServer for EthrexMonitor {
type OutMsg = OutMessage;
type Error = MonitorError;

async fn init(self, handle: &GenServerHandle<Self>) -> Result<Self, Self::Error> {
async fn init(self, handle: &GenServerHandle<Self>) -> Result<InitResult<Self>, Self::Error> {
// Tick handling
send_interval(
Duration::from_millis(self.widget.cfg.tick_rate),
Expand All @@ -122,7 +125,7 @@ impl GenServer for EthrexMonitor {
|event: Event| Self::CastMsg::Event(event),
EventStream::new(),
);
Ok(self)
Ok(Success(self))
}

async fn handle_cast(
Expand Down
8 changes: 5 additions & 3 deletions crates/l2/sequencer/l1_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use ethrex_rpc::{
use ethrex_storage::Store;
use keccak_hash::keccak;
use spawned_concurrency::messages::Unused;
use spawned_concurrency::tasks::{CastResponse, GenServer, GenServerHandle, send_after};
use spawned_concurrency::tasks::{
CastResponse, GenServer, GenServerHandle, InitResult, Success, send_after,
};
use std::{cmp::min, sync::Arc};
use tracing::{debug, error, info, warn};

Expand Down Expand Up @@ -271,14 +273,14 @@ impl GenServer for L1Watcher {
type OutMsg = OutMessage;
type Error = L1WatcherError;

async fn init(self, handle: &GenServerHandle<Self>) -> Result<Self, Self::Error> {
async fn init(self, handle: &GenServerHandle<Self>) -> Result<InitResult<Self>, Self::Error> {
// Perform the check and suscribe a periodic Watch.
handle
.clone()
.cast(Self::CastMsg::Watch)
.await
.map_err(Self::Error::GenServerError)?;
Ok(self)
Ok(Success(self))
}

async fn handle_cast(
Expand Down
8 changes: 4 additions & 4 deletions crates/l2/tee/quote-gen/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions crates/networking/p2p/rlpx/connection/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ pub(crate) async fn perform(
InnerState::Established(_) => {
return Err(RLPxError::StateError("Already established".to_string()));
}
// Shouldn't perform a Handshake on an already failed connection.
// Put it here to complete the match arms
InnerState::HandshakeFailed => {
return Err(RLPxError::StateError("Handshake Failed".to_string()));
}
};
let (sink, stream) = framed.split();
Ok((
Expand Down
78 changes: 56 additions & 22 deletions crates/networking/p2p/rlpx/connection/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ use rand::random;
use secp256k1::{PublicKey, SecretKey};
use spawned_concurrency::{
messages::Unused,
tasks::{CastResponse, GenServer, GenServerHandle, send_interval, spawn_listener},
tasks::{
CastResponse, GenServer, GenServerHandle,
InitResult::{self, NoSuccess, Success},
send_interval, spawn_listener,
},
};
use spawned_rt::tasks::BroadcastStream;
use tokio::{
Expand Down Expand Up @@ -124,8 +128,23 @@ pub struct Established {
pub(crate) l2_state: L2ConnState,
}

impl Established {
async fn teardown(&self) {
// Closing the sink. It may fail if it is already closed (eg. the other side already closed it)
// Just logging a debug line if that's the case.
let _ = self
.sink
.lock()
.await
.close()
.await
.inspect_err(|err| debug!("Could not close the socket: {err}"));
}
}

#[derive(Clone, Debug)]
pub enum InnerState {
HandshakeFailed,
Initiator(Initiator),
Receiver(Receiver),
Established(Established),
Expand Down Expand Up @@ -207,22 +226,38 @@ impl GenServer for RLPxConnection {
type OutMsg = MsgResult;
type Error = RLPxError;

async fn init(mut self, handle: &GenServerHandle<Self>) -> Result<Self, Self::Error> {
let (mut established_state, stream) = handshake::perform(self.inner_state).await?;
log_peer_debug(&established_state.node, "Starting RLPx connection");

if let Err(reason) = initialize_connection(handle, &mut established_state, stream).await {
connection_failed(
&mut established_state,
"Failed to initialize RLPx connection",
reason,
)
.await;
Err(RLPxError::Disconnected())
} else {
// New state
self.inner_state = InnerState::Established(established_state);
Ok(self)
async fn init(
mut self,
handle: &GenServerHandle<Self>,
) -> Result<InitResult<Self>, Self::Error> {
match handshake::perform(self.inner_state).await {
Ok((mut established_state, stream)) => {
log_peer_debug(&established_state.node, "Starting RLPx connection");

if let Err(reason) =
initialize_connection(handle, &mut established_state, stream).await
{
connection_failed(
&mut established_state,
"Failed to initialize RLPx connection",
reason,
)
.await;
self.inner_state = InnerState::Established(established_state);
Ok(NoSuccess(self))
} else {
// New state
self.inner_state = InnerState::Established(established_state);
Ok(Success(self))
}
}
Err(err) => {
// Handshake failed, just log a debug message.
// No connection was established so no need to perform any other action
debug!("Failed Handshake on RLPx connection {err}");
self.inner_state = InnerState::HandshakeFailed;
Ok(NoSuccess(self))
}
}
}

Expand Down Expand Up @@ -333,9 +368,9 @@ impl GenServer for RLPxConnection {
.lock()
.await
.replace_peer(established_state.node.node_id());
established_state.sink.lock().await.close().await?;
established_state.teardown().await;
}
InnerState::Initiator(_) | InnerState::Receiver(_) => {
_ => {
// Nothing to do if the connection was not established
}
};
Expand Down Expand Up @@ -558,8 +593,6 @@ async fn send_disconnect_message(state: &mut Established, reason: Option<Disconn
}

async fn connection_failed(state: &mut Established, error_text: &str, error: RLPxError) {
log_peer_debug(&state.node, &format!("{error_text}: ({error})"));

// Send disconnect message only if error is different than RLPxError::DisconnectRequested
// because if it is a DisconnectRequested error it means that the peer requested the disconnection, not us.
if !matches!(error, RLPxError::DisconnectReceived(_)) {
Expand All @@ -571,6 +604,7 @@ async fn connection_failed(state: &mut Established, error_text: &str, error: RLP
// already connected, don't discard it
RLPxError::DisconnectReceived(DisconnectReason::AlreadyConnected)
| RLPxError::DisconnectSent(DisconnectReason::AlreadyConnected) => {
log_peer_debug(&state.node, &format!("{error_text}: ({error})"));
log_peer_debug(&state.node, "Peer already connected, don't replace it");
}
_ => {
Expand All @@ -583,7 +617,7 @@ async fn connection_failed(state: &mut Established, error_text: &str, error: RLP
}
}

let _ = state.sink.lock().await.close().await;
state.teardown().await;
}

fn match_disconnect_reason(error: &RLPxError) -> Option<DisconnectReason> {
Expand Down