diff --git a/Cargo.lock b/Cargo.lock index 3e25612318..11a65fe51a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11125,9 +11125,9 @@ dependencies = [ [[package]] name = "spawned-concurrency" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36bfc580e2c0cf6ebb73f7103f315309124cbb85e3484d526088d12a466bd354" +checksum = "632387372394c445aeb504bcff82918a5a3963b54168d5efc7531e9db17adc26" dependencies = [ "futures", "spawned-rt", @@ -11137,9 +11137,9 @@ dependencies = [ [[package]] name = "spawned-rt" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81a7fb722e293601d83f2affdedf3b65b4e26a3bd918ae43320a64190d090edb" +checksum = "c29e9b02f1ed35706a607a20da5b5568eadc023ab2372e175cb654cd2347e307" dependencies = [ "crossbeam", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 985e2e2a35..d6c78aa10f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -108,8 +108,8 @@ url = "2.5.4" kzg-rs = "0.2.6" libsql = "0.9.10" futures = "0.3.31" -spawned-concurrency = "0.2.1" -spawned-rt = "0.2.1" +spawned-concurrency = "0.2.2" +spawned-rt = "0.2.2" lambdaworks-crypto = "0.11.0" tui-logger = { version = "0.17.3", features = ["tracing-support"] } rayon = "1.10.0" diff --git a/cmd/ethrex/initializers.rs b/cmd/ethrex/initializers.rs index 03fbb1bbd7..b80e726773 100644 --- a/cmd/ethrex/initializers.rs +++ b/cmd/ethrex/initializers.rs @@ -22,7 +22,6 @@ use std::{ fs, net::{Ipv4Addr, SocketAddr}, path::{Path, PathBuf}, - str::FromStr, sync::Arc, time::{SystemTime, UNIX_EPOCH}, }; @@ -33,14 +32,8 @@ use tracing_subscriber::{EnvFilter, FmtSubscriber, filter::Directive}; pub fn init_tracing(opts: &Options) { let log_filter = EnvFilter::builder() - .with_default_directive( - // Filters all spawned logs - // TODO: revert #3467 when error logs are no longer emitted - Directive::from_str("spawned_concurrency::tasks::gen_server=off") - .expect("this can't fail"), - ) - .from_env_lossy() - .add_directive(Directive::from(opts.log_level)); + .with_default_directive(Directive::from(opts.log_level)) + .from_env_lossy(); let subscriber = FmtSubscriber::builder() .with_env_filter(log_filter) .finish(); diff --git a/crates/l2/monitor/app.rs b/crates/l2/monitor/app.rs index 0fe30558f9..74e138b2b8 100644 --- a/crates/l2/monitor/app.rs +++ b/crates/l2/monitor/app.rs @@ -17,7 +17,10 @@ use ratatui::{ }; use spawned_concurrency::{ messages::Unused, - tasks::{CastResponse, GenServer, GenServerHandle, send_interval, spawn_listener}, + tasks::{ + CastResponse, GenServer, GenServerHandle, InitResult, Success, send_interval, + spawn_listener, + }, }; use std::io; use std::sync::Arc; @@ -109,7 +112,7 @@ impl GenServer for EthrexMonitor { type OutMsg = OutMessage; type Error = MonitorError; - async fn init(self, handle: &GenServerHandle) -> Result { + async fn init(self, handle: &GenServerHandle) -> Result, Self::Error> { // Tick handling send_interval( Duration::from_millis(self.widget.cfg.tick_rate), @@ -122,7 +125,7 @@ impl GenServer for EthrexMonitor { |event: Event| Self::CastMsg::Event(event), EventStream::new(), ); - Ok(self) + Ok(Success(self)) } async fn handle_cast( diff --git a/crates/l2/sequencer/l1_watcher.rs b/crates/l2/sequencer/l1_watcher.rs index a4bfcef1fa..ac56277223 100644 --- a/crates/l2/sequencer/l1_watcher.rs +++ b/crates/l2/sequencer/l1_watcher.rs @@ -16,7 +16,9 @@ use ethrex_rpc::{ use ethrex_storage::Store; use keccak_hash::keccak; use spawned_concurrency::messages::Unused; -use spawned_concurrency::tasks::{CastResponse, GenServer, GenServerHandle, send_after}; +use spawned_concurrency::tasks::{ + CastResponse, GenServer, GenServerHandle, InitResult, Success, send_after, +}; use std::{cmp::min, sync::Arc}; use tracing::{debug, error, info, warn}; @@ -271,14 +273,14 @@ impl GenServer for L1Watcher { type OutMsg = OutMessage; type Error = L1WatcherError; - async fn init(self, handle: &GenServerHandle) -> Result { + async fn init(self, handle: &GenServerHandle) -> Result, Self::Error> { // Perform the check and suscribe a periodic Watch. handle .clone() .cast(Self::CastMsg::Watch) .await .map_err(Self::Error::GenServerError)?; - Ok(self) + Ok(Success(self)) } async fn handle_cast( diff --git a/crates/l2/tee/quote-gen/Cargo.lock b/crates/l2/tee/quote-gen/Cargo.lock index 619fee0a07..b5cedcc550 100644 --- a/crates/l2/tee/quote-gen/Cargo.lock +++ b/crates/l2/tee/quote-gen/Cargo.lock @@ -6366,9 +6366,9 @@ dependencies = [ [[package]] name = "spawned-concurrency" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36bfc580e2c0cf6ebb73f7103f315309124cbb85e3484d526088d12a466bd354" +checksum = "632387372394c445aeb504bcff82918a5a3963b54168d5efc7531e9db17adc26" dependencies = [ "futures", "spawned-rt", @@ -6378,9 +6378,9 @@ dependencies = [ [[package]] name = "spawned-rt" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81a7fb722e293601d83f2affdedf3b65b4e26a3bd918ae43320a64190d090edb" +checksum = "c29e9b02f1ed35706a607a20da5b5568eadc023ab2372e175cb654cd2347e307" dependencies = [ "crossbeam", "tokio", diff --git a/crates/networking/p2p/rlpx/connection/handshake.rs b/crates/networking/p2p/rlpx/connection/handshake.rs index 4d59c76c78..ad279c282c 100644 --- a/crates/networking/p2p/rlpx/connection/handshake.rs +++ b/crates/networking/p2p/rlpx/connection/handshake.rs @@ -111,6 +111,11 @@ pub(crate) async fn perform( InnerState::Established(_) => { return Err(RLPxError::StateError("Already established".to_string())); } + // Shouldn't perform a Handshake on an already failed connection. + // Put it here to complete the match arms + InnerState::HandshakeFailed => { + return Err(RLPxError::StateError("Handshake Failed".to_string())); + } }; let (sink, stream) = framed.split(); Ok(( diff --git a/crates/networking/p2p/rlpx/connection/server.rs b/crates/networking/p2p/rlpx/connection/server.rs index 119b2e358d..3b2402c010 100644 --- a/crates/networking/p2p/rlpx/connection/server.rs +++ b/crates/networking/p2p/rlpx/connection/server.rs @@ -16,7 +16,11 @@ use rand::random; use secp256k1::{PublicKey, SecretKey}; use spawned_concurrency::{ messages::Unused, - tasks::{CastResponse, GenServer, GenServerHandle, send_interval, spawn_listener}, + tasks::{ + CastResponse, GenServer, GenServerHandle, + InitResult::{self, NoSuccess, Success}, + send_interval, spawn_listener, + }, }; use spawned_rt::tasks::BroadcastStream; use tokio::{ @@ -124,8 +128,23 @@ pub struct Established { pub(crate) l2_state: L2ConnState, } +impl Established { + async fn teardown(&self) { + // Closing the sink. It may fail if it is already closed (eg. the other side already closed it) + // Just logging a debug line if that's the case. + let _ = self + .sink + .lock() + .await + .close() + .await + .inspect_err(|err| debug!("Could not close the socket: {err}")); + } +} + #[derive(Clone, Debug)] pub enum InnerState { + HandshakeFailed, Initiator(Initiator), Receiver(Receiver), Established(Established), @@ -207,22 +226,38 @@ impl GenServer for RLPxConnection { type OutMsg = MsgResult; type Error = RLPxError; - async fn init(mut self, handle: &GenServerHandle) -> Result { - let (mut established_state, stream) = handshake::perform(self.inner_state).await?; - log_peer_debug(&established_state.node, "Starting RLPx connection"); - - if let Err(reason) = initialize_connection(handle, &mut established_state, stream).await { - connection_failed( - &mut established_state, - "Failed to initialize RLPx connection", - reason, - ) - .await; - Err(RLPxError::Disconnected()) - } else { - // New state - self.inner_state = InnerState::Established(established_state); - Ok(self) + async fn init( + mut self, + handle: &GenServerHandle, + ) -> Result, Self::Error> { + match handshake::perform(self.inner_state).await { + Ok((mut established_state, stream)) => { + log_peer_debug(&established_state.node, "Starting RLPx connection"); + + if let Err(reason) = + initialize_connection(handle, &mut established_state, stream).await + { + connection_failed( + &mut established_state, + "Failed to initialize RLPx connection", + reason, + ) + .await; + self.inner_state = InnerState::Established(established_state); + Ok(NoSuccess(self)) + } else { + // New state + self.inner_state = InnerState::Established(established_state); + Ok(Success(self)) + } + } + Err(err) => { + // Handshake failed, just log a debug message. + // No connection was established so no need to perform any other action + debug!("Failed Handshake on RLPx connection {err}"); + self.inner_state = InnerState::HandshakeFailed; + Ok(NoSuccess(self)) + } } } @@ -333,9 +368,9 @@ impl GenServer for RLPxConnection { .lock() .await .replace_peer(established_state.node.node_id()); - established_state.sink.lock().await.close().await?; + established_state.teardown().await; } - InnerState::Initiator(_) | InnerState::Receiver(_) => { + _ => { // Nothing to do if the connection was not established } }; @@ -558,8 +593,6 @@ async fn send_disconnect_message(state: &mut Established, reason: Option { + log_peer_debug(&state.node, &format!("{error_text}: ({error})")); log_peer_debug(&state.node, "Peer already connected, don't replace it"); } _ => { @@ -583,7 +617,7 @@ async fn connection_failed(state: &mut Established, error_text: &str, error: RLP } } - let _ = state.sink.lock().await.close().await; + state.teardown().await; } fn match_disconnect_reason(error: &RLPxError) -> Option {