diff --git a/Cargo.lock b/Cargo.lock index 4f26c6c..22e882a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1378,6 +1378,16 @@ dependencies = [ "version_check", ] +[[package]] +name = "core-foundation" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2a6cd9ae233e7f62ba4e9353e81a88df7fc8a5987b8d445b4d90c879bd156f6" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -1686,6 +1696,18 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "enum-as-inner" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1e6a265c649f3f5979b601d26f1d05ada116434c87741c9493cb56218f76cbc" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.110", +] + [[package]] name = "enum-ordinalize" version = "4.3.2" @@ -2752,6 +2774,12 @@ dependencies = [ "thiserror", ] +[[package]] +name = "openssl-probe" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" + [[package]] name = "parity-scale-codec" version = "3.7.5" @@ -3312,6 +3340,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rlimit" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7043b63bd0cd1aaa628e476b80e6d4023a3b50eb32789f2728908107bd0c793a" +dependencies = [ + "libc", +] + [[package]] name = "rlp" version = "0.5.2" @@ -3324,7 +3361,7 @@ dependencies = [ [[package]] name = "rproxy" -version = "0.0.9" +version = "0.0.10" dependencies = [ "actix", "actix-http", @@ -3349,6 +3386,7 @@ dependencies = [ "hex", "http 1.3.1", "humantime", + "libc", "moka", "op-alloy-consensus", "parking_lot", @@ -3356,6 +3394,7 @@ dependencies = [ "pnet", "prometheus-client", "rand 0.9.2", + "rlimit", "rustc-hash", "rustls", "rustls-pemfile", @@ -3363,6 +3402,7 @@ dependencies = [ "serde", "serde_json", "socket2 0.6.1", + "sysctl", "thiserror", "time", "tokio", @@ -3479,6 +3519,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9980d917ebb0c0536119ba501e90834767bffc3d60641457fd84a1f3fd337923" +dependencies = [ + "openssl-probe", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "2.2.0" @@ -3539,6 +3591,15 @@ version = "5.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dfac11c0cd0606aaf7eb9ef66f82c119438a96dc487715abb8b57fdf08ad4fe" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "scc" version = "3.4.2" @@ -3549,6 +3610,15 @@ dependencies = [ "sdd", ] +[[package]] +name = "schannel" +version = "0.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "891d81b926048e76efe18581bf793546b4c0eaf8448d72be8de2bbee5fd166e1" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "schemars" version = "0.9.0" @@ -3619,6 +3689,29 @@ dependencies = [ "cc", ] +[[package]] +name = "security-framework" +version = "3.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3297343eaf830f66ede390ea39da1d462b6b0c1b000f420d0a83f898bbbe6ef" +dependencies = [ + 
"bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc1f0cbffaac4852523ce30d8bd3c5cdc873501d96ff467ca09b6767bb8cd5c0" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "semver" version = "0.11.0" @@ -3928,6 +4021,20 @@ dependencies = [ "syn 2.0.110", ] +[[package]] +name = "sysctl" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cca424247104946a59dacd27eaad296223b7feec3d168a6dd04585183091eb0b" +dependencies = [ + "bitflags", + "byteorder", + "enum-as-inner", + "libc", + "thiserror", + "walkdir", +] + [[package]] name = "tagptr" version = "0.2.0" @@ -4109,7 +4216,11 @@ checksum = "489a59b6730eda1b0171fcfda8b121f4bee2b35cba8645ca35c5f7ba3eb736c1" dependencies = [ "futures-util", "log", + "rustls", + "rustls-native-certs", + "rustls-pki-types", "tokio", + "tokio-rustls", "tungstenite", ] @@ -4324,6 +4435,8 @@ dependencies = [ "httparse", "log", "rand 0.9.2", + "rustls", + "rustls-pki-types", "sha1", "thiserror", "utf-8", @@ -4463,6 +4576,16 @@ dependencies = [ "libc", ] +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -4548,6 +4671,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/crates/rproxy/Cargo.toml b/crates/rproxy/Cargo.toml index 8a6a4dc..fe5fa91 100644 --- a/crates/rproxy/Cargo.toml +++ b/crates/rproxy/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rproxy" -version = "0.0.9" +version = "0.0.10" edition = "2024" default-run = "rproxy" @@ -40,6 +40,7 @@ futures-core = "0.3.31" hex = "0.4.3" http = "1.3.1" humantime = "2.2.0" +libc = "0.2.177" moka = { version = "0.12.11", features = ["sync"] } op-alloy-consensus = "0.20.0" parking_lot = "0.12.4" @@ -47,6 +48,7 @@ pin-project = "1.1.10" pnet = "0.35.0" prometheus-client = { git = "https://github.com/0x416e746f6e/client_rust.git", branch = "nested-labels"} rand = { version = "0.9.2", optional = true } +rlimit = "0.10.2" rustc-hash = "2.1.1" rustls = "0.23.32" rustls-pemfile = "2.2.0" @@ -54,10 +56,11 @@ scc = "3.0.2" serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.143" socket2 = "0.6.0" +sysctl = "0.7.1" thiserror = "2.0.16" time = { version = "0.3.43", features = ["formatting"] } tokio = { version = "1.47.1", features = ["macros", "rt", "rt-multi-thread", "signal"] } -tokio-tungstenite = "0.27.0" +tokio-tungstenite = {version = "0.27.0", features = ["rustls-tls-native-roots"] } tokio-util = "0.7.16" tracing = { version = "0.1.41", features = ["valuable"] } tracing-subscriber = { version = "0.3.20", features = ["env-filter", "json", "valuable"] } diff --git a/crates/rproxy/src/config.rs b/crates/rproxy/src/config.rs index 7933bcd..cf910d9 100644 --- 
a/crates/rproxy/src/config.rs +++ b/crates/rproxy/src/config.rs @@ -1,7 +1,9 @@ use std::{process, sync::LazyLock}; use clap::Parser; +use sysctl::Sysctl; use thiserror::Error; +use x509_parser::asn1_rs::ToStatic; use crate::server::{ config::{ConfigLogging, ConfigLoggingError, ConfigMetrics, ConfigMetricsError}, @@ -26,6 +28,72 @@ pub(crate) static PARALLELISM: LazyLock = pub(crate) static PARALLELISM_STRING: LazyLock = LazyLock::new(|| PARALLELISM.to_string()); +pub(crate) static TCP_KEEPALIVE_INTERVAL: LazyLock = LazyLock::new(|| { + #[cfg(target_os = "linux")] + { + let mut res: libc::c_int = 75; + if let Ok(ctl) = sysctl::Ctl::new("net.ipv4.tcp_keepalive_intvl") && + let Ok(value) = ctl.value() && + let Ok(value) = value.into_int() + { + res = value + } + return res; + } + + #[cfg(target_os = "macos")] + { + let mut res: libc::c_int = 75000; + if let Ok(ctl) = sysctl::Ctl::new("net.inet.tcp.keepintvl") && + let Ok(value) = ctl.value() && + let Ok(value) = value.into_int() + { + res = value / 1000 // millis on macos + } + + return res; + } + + #[allow(unreachable_code)] + 75 +}); + +pub(crate) static TCP_KEEPALIVE_INTERVAL_STRING: LazyLock = + LazyLock::new(|| format!("{}s", TCP_KEEPALIVE_INTERVAL.to_static())); + +pub(crate) static TCP_KEEPALIVE_PROBES: LazyLock = LazyLock::new(|| { + #[cfg(target_os = "linux")] + { + let mut res: libc::c_int = 9; + if let Ok(ctl) = sysctl::Ctl::new("net.ipv4.tcp_keepalive_probes") && + let Ok(value) = ctl.value() && + let Ok(value) = value.into_int() + { + res = value + } + return res; + } + + #[cfg(target_os = "macos")] + { + let mut res: libc::c_int = 8; + if let Ok(ctl) = sysctl::Ctl::new("net.inet.tcp.keepcnt") && + let Ok(value) = ctl.value() && + let Ok(value) = value.into_int() + { + res = value + } + + return res; + } + + #[allow(unreachable_code)] + 8 +}); + +pub(crate) static TCP_KEEPALIVE_PROBES_STRING: LazyLock = + LazyLock::new(|| TCP_KEEPALIVE_PROBES.to_string()); + // Config -------------------------------------------------------------- #[derive(Clone, Parser)] diff --git a/crates/rproxy/src/server.rs b/crates/rproxy/src/server.rs index 5485b0b..054e166 100644 --- a/crates/rproxy/src/server.rs +++ b/crates/rproxy/src/server.rs @@ -1,5 +1,4 @@ pub mod config; - pub(crate) mod metrics; pub(crate) mod proxy; @@ -13,7 +12,7 @@ use tokio::{ task::JoinHandle, }; use tokio_util::sync::CancellationToken; -use tracing::{error, info}; +use tracing::{error, info, warn}; use crate::{ config::Config, @@ -29,6 +28,8 @@ use crate::{ utils::tls_certificate_validity_timestamps, }; +const MAX_OPEN_FILES: u64 = 10240; + // Proxy --------------------------------------------------------------- pub struct Server {} @@ -38,6 +39,30 @@ impl Server { let canceller = Server::wait_for_shutdown_signal(); let resetter = Server::wait_for_reset_signal(canceller.clone()); + // try to set system limits + match rlimit::getrlimit(rlimit::Resource::NOFILE) { + Ok((mut soft, hard)) => { + soft = std::cmp::max(soft, MAX_OPEN_FILES); + soft = std::cmp::min(soft, hard); + + if let Err(err) = rlimit::setrlimit(rlimit::Resource::NOFILE, soft, hard) { + warn!( + error = ?err, + hard = hard, + soft = soft, + "Failed to set max open file limits", + ); + } + } + + Err(err) => { + warn!( + error = ?err, + "Failed to read max open file limits", + ); + } + } + // spawn metrics service let metrics = Arc::new(Metrics::new(config.metrics.clone())); { diff --git a/crates/rproxy/src/server/metrics.rs b/crates/rproxy/src/server/metrics.rs index d1cac6b..638846d 100644 --- 
a/crates/rproxy/src/server/metrics.rs +++ b/crates/rproxy/src/server/metrics.rs @@ -341,15 +341,15 @@ impl Metrics { socket.set_nonblocking(true)?; // allow time to flush buffers on close - socket.set_linger(Some(Duration::from_secs(1)))?; + socket.set_linger(Some(Duration::from_millis(5000)))?; // allow binding to the socket while there are still TIME_WAIT connections socket.set_reuse_address(true)?; socket.set_tcp_keepalive( &TcpKeepalive::new() - .with_time(Duration::from_secs(15)) - .with_interval(Duration::from_secs(15)) + .with_time(Duration::from_millis(15000)) + .with_interval(Duration::from_millis(15000)) .with_retries(4), )?; diff --git a/crates/rproxy/src/server/proxy/circuit_breaker.rs b/crates/rproxy/src/server/proxy/circuit_breaker.rs index f8cb786..d0959f7 100644 --- a/crates/rproxy/src/server/proxy/circuit_breaker.rs +++ b/crates/rproxy/src/server/proxy/circuit_breaker.rs @@ -51,7 +51,7 @@ impl CircuitBreaker { #[inline] fn timeout(config: &ConfigCircuitBreaker) -> Duration { - std::cmp::min(Duration::from_secs(5), config.poll_interval * 3 / 4) + std::cmp::min(Duration::from_millis(5000), config.poll_interval * 3 / 4) } #[inline] diff --git a/crates/rproxy/src/server/proxy/config/authrpc.rs b/crates/rproxy/src/server/proxy/config/authrpc.rs index 054bd28..7a172d0 100644 --- a/crates/rproxy/src/server/proxy/config/authrpc.rs +++ b/crates/rproxy/src/server/proxy/config/authrpc.rs @@ -9,7 +9,7 @@ use tracing::warn; use url::Url; use crate::{ - config::ALREADY_VALIDATED, + config::{ALREADY_VALIDATED, TCP_KEEPALIVE_INTERVAL_STRING, TCP_KEEPALIVE_PROBES_STRING}, server::proxy::http::config::{ConfigProxyHttp, ConfigProxyHttpMirroringStrategy}, utils::get_all_local_ip_addresses, }; @@ -84,6 +84,30 @@ pub(crate) struct ConfigAuthrpc { )] pub(crate) idle_connection_timeout: Duration, + /// interval between tcp keepalive packets on authrpc connections + #[arg( + default_value = TCP_KEEPALIVE_INTERVAL_STRING.as_str(), + env = "RPROXY_AUTHRPC_KEEPALIVE_INTERVAL", + help_heading = "authrpc", + long("authrpc-keepalive-interval"), + name("authrpc_keepalive_interval"), + value_name = "duration", + value_parser = humantime::parse_duration + )] + pub(crate) keepalive_interval: Duration, + + /// maximum number of keepalive probes to send before dropping authrpc + /// connection + #[arg( + default_value = TCP_KEEPALIVE_PROBES_STRING.as_str(), + env = "RPROXY_AUTHRPC_KEEPALIVE_RETRIES", + help_heading = "authrpc", + long("authrpc-keepalive-retries"), + name("authrpc_keepalive_retries"), + value_name = "count" + )] + pub(crate) keepalive_retries: u32, + /// host:port for authrpc proxy #[arg( default_value = "0.0.0.0:8651", @@ -353,6 +377,16 @@ impl ConfigProxyHttp for ConfigAuthrpc { self.idle_connection_timeout } + #[inline] + fn keepalive_interval(&self) -> Duration { + self.keepalive_interval + } + + #[inline] + fn keepalive_retries(&self) -> u32 { + self.keepalive_retries + } + #[inline] fn listen_address(&self) -> SocketAddr { self.listen_address.parse::().expect(ALREADY_VALIDATED) diff --git a/crates/rproxy/src/server/proxy/config/flashblocks.rs b/crates/rproxy/src/server/proxy/config/flashblocks.rs index 54fd650..ad12ae2 100644 --- a/crates/rproxy/src/server/proxy/config/flashblocks.rs +++ b/crates/rproxy/src/server/proxy/config/flashblocks.rs @@ -4,12 +4,28 @@ use awc::http::Uri; use clap::Args; use thiserror::Error; -use crate::{config::ALREADY_VALIDATED, server::proxy::ws::config::ConfigProxyWs}; +use crate::{ + config::{ALREADY_VALIDATED, TCP_KEEPALIVE_INTERVAL_STRING, 
TCP_KEEPALIVE_PROBES_STRING}, + server::proxy::ws::config::ConfigProxyWs, +}; // ConfigFlashblocks --------------------------------------------------- #[derive(Args, Clone, Debug)] pub(crate) struct ConfigFlashblocks { + /// timeout to establish backend connections or to receive pong + /// websocket response + #[arg( + default_value = "30s", + env = "RPROXY_FLASHBLOCKS_BACKEND_TIMEOUT", + help_heading = "flashblocks", + long("flashblocks-backend-timeout"), + name("flashblocks_backend_timeout"), + value_name = "duration", + value_parser = humantime::parse_duration + )] + pub(crate) backend_timeout: Duration, + /// url of flashblocks backend #[arg( + default_value = "ws://127.0.0.1:11111", @@ -29,19 +45,29 @@ pub(crate) struct ConfigFlashblocks { name("flashblocks_enabled") )] pub(crate) enabled: bool, - - /// timeout to establish backend connections of to receive pong - /// websocket response + /// interval between tcp keepalive packets on flashblocks connections #[arg( - default_value = "30s", - env = "RPROXY_FLASHBLOCKS_BACKEND_TIMEOUT", + default_value = TCP_KEEPALIVE_INTERVAL_STRING.as_str(), + env = "RPROXY_FLASHBLOCKS_KEEPALIVE_INTERVAL", help_heading = "flashblocks", - long("flashblocks-backend-timeout"), - name("flashblocks_backend_timeout"), + long("flashblocks-keepalive-interval"), + name("flashblocks_keepalive_interval"), value_name = "duration", value_parser = humantime::parse_duration )] - pub(crate) backend_timeout: Duration, + pub(crate) keepalive_interval: Duration, + + /// maximum number of keepalive probes to send before dropping + /// flashblocks connection + #[arg( + default_value = TCP_KEEPALIVE_PROBES_STRING.as_str(), + env = "RPROXY_FLASHBLOCKS_KEEPALIVE_RETRIES", + help_heading = "flashblocks", + long("flashblocks-keepalive-retries"), + name("flashblocks_keepalive_retries"), + value_name = "count" + )] + pub(crate) keepalive_retries: u32, /// host:port for flashblocks proxy #[arg( @@ -212,6 +238,16 @@ impl ConfigProxyWs for ConfigFlashblocks { self.backend_url.parse::().expect(ALREADY_VALIDATED) } + #[inline] + fn keepalive_interval(&self) -> Duration { + self.keepalive_interval + } + + #[inline] + fn keepalive_retries(&self) -> u32 { + self.keepalive_retries + } + #[inline] fn listen_address(&self) -> SocketAddr { self.listen_address.parse::().expect(ALREADY_VALIDATED) diff --git a/crates/rproxy/src/server/proxy/config/rpc.rs b/crates/rproxy/src/server/proxy/config/rpc.rs index 2a9c2ad..2b17549 100644 --- a/crates/rproxy/src/server/proxy/config/rpc.rs +++ b/crates/rproxy/src/server/proxy/config/rpc.rs @@ -9,7 +9,12 @@ use tracing::warn; use url::Url; use crate::{ - config::{ALREADY_VALIDATED, PARALLELISM_STRING}, + config::{ + ALREADY_VALIDATED, + PARALLELISM_STRING, + TCP_KEEPALIVE_INTERVAL_STRING, + TCP_KEEPALIVE_PROBES_STRING, + }, server::proxy::http::config::{ConfigProxyHttp, ConfigProxyHttpMirroringStrategy}, utils::get_all_local_ip_addresses, }; @@ -73,6 +78,30 @@ pub(crate) struct ConfigRpc { )] pub(crate) idle_connection_timeout: Duration, + /// interval between tcp keepalive packets on rpc connections + #[arg( + default_value = TCP_KEEPALIVE_INTERVAL_STRING.as_str(), + env = "RPROXY_RPC_KEEPALIVE_INTERVAL", + help_heading = "rpc", + long("rpc-keepalive-interval"), + name("rpc_keepalive_interval"), + value_name = "duration", + value_parser = humantime::parse_duration + )] + pub(crate) keepalive_interval: Duration, + + /// maximum number of keepalive probes to send before dropping rpc + /// connection + #[arg( + default_value = 
TCP_KEEPALIVE_PROBES_STRING.as_str(), + env = "RPROXY_RPC_KEEPALIVE_RETRIES", + help_heading = "rpc", + long("rpc-keepalive-retries"), + name("rpc_keepalive_retries"), + value_name = "count" + )] + pub(crate) keepalive_retries: u32, + /// host:port for rpc proxy #[arg( default_value = "0.0.0.0:8645", @@ -349,6 +378,16 @@ impl ConfigProxyHttp for ConfigRpc { self.idle_connection_timeout } + #[inline] + fn keepalive_interval(&self) -> Duration { + self.keepalive_interval + } + + #[inline] + fn keepalive_retries(&self) -> u32 { + self.keepalive_retries + } + #[inline] fn listen_address(&self) -> SocketAddr { self.listen_address.parse::().expect(ALREADY_VALIDATED) diff --git a/crates/rproxy/src/server/proxy/connection_guard.rs b/crates/rproxy/src/server/proxy/connection_guard.rs index d1c04ef..804db32 100644 --- a/crates/rproxy/src/server/proxy/connection_guard.rs +++ b/crates/rproxy/src/server/proxy/connection_guard.rs @@ -4,13 +4,17 @@ use std::{ Arc, atomic::{AtomicI64, Ordering}, }, + time::Duration, }; use actix_web::dev::Extensions; use tracing::{debug, warn}; use uuid::Uuid; -use crate::server::metrics::{LabelsProxy, Metrics}; +use crate::{ + server::metrics::{LabelsProxy, Metrics}, + utils::setup_keepalive, +}; // ProxyConnectionGuard ------------------------------------------------ @@ -47,6 +51,7 @@ impl ConnectionGuard { proxy: &'static str, metrics: Arc, client_connections_count: Arc, + keep_alive_interval: Duration, ) -> impl Fn(&dyn Any, &mut Extensions) { move |connection, extensions| { { @@ -64,12 +69,23 @@ impl ConnectionGuard { Some(stream) } else { warn!( + proxy = proxy, connection_type = std::any::type_name_of_val(connection), "Unexpected connection type", ); None }; + if let Some(stream) = stream && + let Err(err) = setup_keepalive(stream, keep_alive_interval) + { + warn!( + proxy = proxy, + error = ?err, + "Failed to set keepalive interval", + ); + } + if let Some(stream) = stream { let id = Uuid::now_v7(); diff --git a/crates/rproxy/src/server/proxy/http/authrpc.rs b/crates/rproxy/src/server/proxy/http/authrpc.rs index 80edb63..68ad350 100644 --- a/crates/rproxy/src/server/proxy/http/authrpc.rs +++ b/crates/rproxy/src/server/proxy/http/authrpc.rs @@ -36,7 +36,7 @@ impl ProxyHttpInner for ProxyHttpInnerAuthrpc { Self { config, fcu_cache: Cache::builder() - .time_to_live(Duration::from_secs(60)) + .time_to_live(Duration::from_millis(60000)) .max_capacity(4096) .build_with_hasher(FxBuildHasher), } diff --git a/crates/rproxy/src/server/proxy/http/config.rs b/crates/rproxy/src/server/proxy/http/config.rs index f75201b..7985984 100644 --- a/crates/rproxy/src/server/proxy/http/config.rs +++ b/crates/rproxy/src/server/proxy/http/config.rs @@ -9,6 +9,8 @@ pub(crate) trait ConfigProxyHttp: Clone + Send + Unpin + 'static { fn backend_timeout(&self) -> Duration; fn backend_url(&self) -> Url; fn idle_connection_timeout(&self) -> Duration; + fn keepalive_interval(&self) -> Duration; + fn keepalive_retries(&self) -> u32; fn listen_address(&self) -> SocketAddr; fn log_mirrored_requests(&self) -> bool; fn log_mirrored_responses(&self) -> bool; diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index b71354c..2c186e4 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -3,6 +3,7 @@ use std::{ fmt::Debug, marker::PhantomData, mem, + ops::Add, pin::Pin, str::FromStr, sync::{ @@ -10,6 +11,7 @@ use std::{ atomic::{AtomicI64, AtomicUsize, Ordering}, }, task::{Context, Poll}, + 
time::Duration, }; use actix::{Actor, AsyncContext, WrapFuture}; @@ -63,8 +65,6 @@ use crate::{ utils::{Loggable, decompress, is_hop_by_hop_header, raw_transaction_to_hash}, }; -const TCP_KEEPALIVE_ATTEMPTS: u32 = 8; - // ProxyHttp ----------------------------------------------------------- pub(crate) struct ProxyHttp @@ -160,8 +160,6 @@ where } }; - let keep_alive = - config.idle_connection_timeout().div_f64(f64::from(TCP_KEEPALIVE_ATTEMPTS)); let workers_count = std::cmp::min(PARALLELISM.to_static(), config.backend_max_concurrent_requests()); let max_concurrent_requests_per_worker = @@ -194,6 +192,8 @@ where let server = if tls.enabled() { server.listen(P::name(), listener, move || { + let config = shared.config(); + let this = web::Data::new(Self::new(shared.clone(), max_concurrent_requests_per_worker)); @@ -206,10 +206,12 @@ where P::name(), metrics.clone(), client_connections_count.clone(), + config.keepalive_interval(), ); let h1 = actix_http::HttpService::build() - .keep_alive(keep_alive) + .client_disconnect_timeout(Duration::from_millis(1000)) // same as in HttpServer + .keep_alive(config.keepalive_interval()) .on_connect_ext(move |io: &_, ext: _| { (on_connect)(io as &dyn std::any::Any, ext) }) @@ -225,6 +227,8 @@ where }) } else { server.listen(P::name(), listener, move || { + let config = shared.config(); + let this = web::Data::new(Self::new(shared.clone(), max_concurrent_requests_per_worker)); @@ -237,10 +241,12 @@ where P::name(), metrics.clone(), client_connections_count.clone(), + config.keepalive_interval(), ); let h1 = actix_http::HttpService::build() - .keep_alive(keep_alive) + .client_disconnect_timeout(Duration::from_millis(1000)) // same as in HttpServer + .keep_alive(config.keepalive_interval()) .on_connect_ext(move |io: &_, ext: _| { (on_connect)(io as &dyn std::any::Any, ext) }) @@ -289,28 +295,26 @@ where Some(socket2::Protocol::TCP), )?; + // allow keep-alive packets + let keep_alive_timeout = config.idle_connection_timeout().add(Duration::from_millis(1000)); + let keep_alive_interval = keep_alive_timeout.div_f64(f64::from(config.keepalive_retries())); + socket.set_keepalive(true)?; + socket.set_tcp_keepalive( + &socket2::TcpKeepalive::new() + .with_time(keep_alive_interval) + .with_interval(keep_alive_interval) + .with_retries(config.keepalive_retries()), + )?; + // must use non-blocking with tokio socket.set_nonblocking(true)?; // allow time to flush buffers on close - socket.set_linger(Some(config.backend_timeout()))?; + socket.set_linger(Some(Duration::from_millis(5000)))?; // allow binding while there are still residual connections in TIME_WAIT socket.set_reuse_address(true)?; - if !config.idle_connection_timeout().is_zero() { - socket.set_tcp_keepalive( - &socket2::TcpKeepalive::new() - .with_time( - config.idle_connection_timeout().div_f64(f64::from(TCP_KEEPALIVE_ATTEMPTS)), - ) - .with_interval( - config.idle_connection_timeout().div_f64(f64::from(TCP_KEEPALIVE_ATTEMPTS)), - ) - .with_retries(TCP_KEEPALIVE_ATTEMPTS - 1), - )?; - } - socket.bind(&socket2::SockAddr::from(config.listen_address()))?; socket.listen(1024)?; @@ -1513,7 +1517,7 @@ impl ProxyHttpRequestInfo { Self { req_id: Uuid::now_v7(), - conn_id: Uuid::now_v7(), + conn_id: guard.map_or(Uuid::nil(), |guard| guard.id), remote_addr, method: req.method().clone(), path, diff --git a/crates/rproxy/src/server/proxy/ws/config.rs b/crates/rproxy/src/server/proxy/ws/config.rs index af17611..d27914f 100644 --- a/crates/rproxy/src/server/proxy/ws/config.rs +++ 
b/crates/rproxy/src/server/proxy/ws/config.rs @@ -1,9 +1,15 @@ +use std::{net::SocketAddr, time::Duration}; + +use tungstenite::http::Uri; + // ConfigProxyWs ------------------------------------------------------- pub(crate) trait ConfigProxyWs: Clone + Send + Unpin + 'static { - fn backend_timeout(&self) -> std::time::Duration; - fn backend_url(&self) -> tungstenite::http::Uri; - fn listen_address(&self) -> std::net::SocketAddr; + fn backend_timeout(&self) -> Duration; + fn backend_url(&self) -> Uri; + fn keepalive_interval(&self) -> Duration; + fn keepalive_retries(&self) -> u32; + fn listen_address(&self) -> SocketAddr; fn log_backend_messages(&self) -> bool; fn log_client_messages(&self) -> bool; fn log_sanitise(&self) -> bool; diff --git a/crates/rproxy/src/server/proxy/ws/proxy.rs b/crates/rproxy/src/server/proxy/ws/proxy.rs index 5528a29..9aa4688 100644 --- a/crates/rproxy/src/server/proxy/ws/proxy.rs +++ b/crates/rproxy/src/server/proxy/ws/proxy.rs @@ -1,6 +1,7 @@ use std::{ io::Write, marker::PhantomData, + mem, str::FromStr, sync::{ Arc, @@ -30,7 +31,7 @@ use scc::HashMap; use time::{UtcDateTime, format_description::well_known::Iso8601}; use tokio::{net::TcpStream, sync::broadcast}; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, info, warn}; use uuid::Uuid; use x509_parser::asn1_rs::ToStatic; @@ -45,7 +46,7 @@ use crate::{ ws::{ProxyWsInner, config::ConfigProxyWs}, }, }, - utils::{Loggable, raw_transaction_to_hash}, + utils::{Loggable, raw_transaction_to_hash, setup_keepalive}, }; const WS_PING_INTERVAL_SECONDS: u64 = 1; @@ -132,7 +133,7 @@ where let workers_count = PARALLELISM.to_static(); - let shared = ProxyWsSharedState::::new(config, &metrics); + let shared = ProxyWsSharedState::::new(config.clone(), &metrics); let client_connections_count = shared.client_connections_count.clone(); let worker_canceller = canceller.clone(); let worker_resetter = resetter.clone(); @@ -156,7 +157,13 @@ where .wrap(NormalizePath::new(TrailingSlash::Trim)) .default_service(web::route().to(Self::receive)) }) - .on_connect(ConnectionGuard::on_connect(P::name(), metrics, client_connections_count)) + .keep_alive(config.keepalive_interval()) + .on_connect(ConnectionGuard::on_connect( + P::name(), + metrics, + client_connections_count, + config.keepalive_interval(), + )) .shutdown_signal(canceller.cancelled_owned()) .workers(workers_count); @@ -207,11 +214,22 @@ where Some(socket2::Protocol::TCP), )?; + // allow keep-alive packets + let keep_alive_timeout = config.backend_timeout().mul_f64(2.0); + let keep_alive_interval = keep_alive_timeout.div_f64(f64::from(config.keepalive_retries())); + socket.set_keepalive(true)?; + socket.set_tcp_keepalive( + &socket2::TcpKeepalive::new() + .with_time(keep_alive_interval) + .with_interval(keep_alive_interval) + .with_retries(config.keepalive_retries()), + )?; + // must use non-blocking with tokio socket.set_nonblocking(true)?; // allow time to flush buffers on close - socket.set_linger(Some(config.backend_timeout()))?; + socket.set_linger(Some(Duration::from_millis(5000)))?; // allow binding to the socket while there are still TIME_WAIT connections socket.set_reuse_address(true)?; @@ -258,7 +276,7 @@ where info: ProxyHttpRequestInfo, ) { let bknd_uri = this.backend.new_backend_uri(&info); - trace!( + debug!( proxy = P::name(), request_id = %info.req_id(), connection_id = %info.conn_id(), @@ -312,6 +330,37 @@ where } }; + match bknd_stream.get_ref() { + 
tokio_tungstenite::MaybeTlsStream::Plain(tcp_stream) => { + if let Err(err) = setup_keepalive(tcp_stream, this.config().keepalive_interval()) { + warn!( + proxy = P::name(), + request_id = %info.req_id(), + connection_id = %info.conn_id(), + worker_id = %this.id, + error = ?err, + "Failed to set keepalive interval" + ); + } + } + + tokio_tungstenite::MaybeTlsStream::Rustls(tls_stream) => { + let (tcp_stream, _) = tls_stream.get_ref(); + if let Err(err) = setup_keepalive(tcp_stream, this.config().keepalive_interval()) { + warn!( + proxy = P::name(), + request_id = %info.req_id(), + connection_id = %info.conn_id(), + worker_id = %this.id, + error = ?err, + "Failed to set keepalive interval" + ); + } + } + + &_ => {} // there's nothing else + } + let (bknd_tx, bknd_rx) = bknd_stream.split(); let mut pump = ProxyWsPump { @@ -323,8 +372,8 @@ where resetter: this.resetter.clone(), clnt_tx, clnt_rx, - bknd_tx, - bknd_rx, + bknd_tx: Some(bknd_tx), + bknd_rx: Some(bknd_rx), pings: HashMap::new(), ping_balance_bknd: AtomicI64::new(0), ping_balance_clnt: AtomicI64::new(0), @@ -613,8 +662,8 @@ where clnt_tx: Session, clnt_rx: MessageStream, - bknd_tx: SplitSink>, tungstenite::Message>, - bknd_rx: SplitStream>>, + bknd_tx: Option>, tungstenite::Message>>, + bknd_rx: Option>>>, pings: HashMap, ping_balance_clnt: AtomicI64, @@ -637,13 +686,34 @@ where "Starting websocket pump..." ); - let mut heartbeat = tokio::time::interval(Duration::from_secs(WS_PING_INTERVAL_SECONDS)); + let mut heartbeat = + tokio::time::interval(Duration::from_millis(WS_PING_INTERVAL_SECONDS * 1000)); let mut pumping: Result<(), &str> = Ok(()); let mut resetter = self.resetter.subscribe(); while pumping.is_ok() && !self.canceller.is_cancelled() && !resetter.is_closed() { + #[cfg(feature = "chaos")] + if self.chaos.stream_is_blocked.load(Ordering::Relaxed) { + tokio::select! { + _ = self.canceller.cancelled() => { + break; + } + + _ = resetter.recv() => { + break; + } + + // client => backend + clnt_msg = self.clnt_rx.next() => { + pumping = self.pump_clnt_to_bknd(UtcDateTime::now(), clnt_msg).await; + } + } + + continue; + } + tokio::select! 
{ _ = self.canceller.cancelled() => { break; @@ -664,7 +734,12 @@ where } // backend => client - bknd_msg = self.bknd_rx.next() => { + bknd_msg = async { + match &mut self.bknd_rx { + Some(bknd_rx) => bknd_rx.next().await, + None => None, + } + } => { pumping = self.pump_bknd_to_cli(UtcDateTime::now(), bknd_msg).await; } } @@ -758,8 +833,9 @@ where } let bknd_ping = ProxyWsPing::new(self.info.conn_id()); - if let Err(err) = - self.bknd_tx.send(tungstenite::Message::Ping(bknd_ping.to_bytes())).await + if let Some(bknd_tx) = &mut self.bknd_tx && + let Err(err) = + bknd_tx.send(tungstenite::Message::Ping(bknd_ping.to_bytes())).await { error!( proxy = P::name(), @@ -785,13 +861,19 @@ where if !self.chaos.stream_is_blocked.load(Ordering::Relaxed) && rand::random::() < self.shared.config().chaos_probability_stream_blocked() { - debug!( + warn!( proxy = P::name(), connection_id = %self.info.conn_id(), worker_id = %self.worker_id, "Blocking the stream (chaos)" ); self.chaos.stream_is_blocked.store(true, Ordering::Relaxed); + _ = self + .close_bknd_session(tungstenite::protocol::CloseFrame { + code: tungstenite::protocol::frame::coding::CloseCode::Normal, + reason: WS_CLOSE_REASON_NORMAL.into(), + }) + .await; } match clnt_msg { @@ -804,8 +886,9 @@ where return Ok(()); } - if let Err(err) = - self.bknd_tx.send(tungstenite::Message::Binary(bytes.clone())).await + if let Some(bknd_tx) = &mut self.bknd_tx && + let Err(err) = + bknd_tx.send(tungstenite::Message::Binary(bytes.clone())).await { error!( proxy = P::name(), @@ -840,15 +923,15 @@ where return Ok(()); } - if let Err(err) = self - .bknd_tx - .send(tungstenite::Message::Text(unsafe { - // safety: it's from client's ws message => must be valid utf-8 - tungstenite::protocol::frame::Utf8Bytes::from_bytes_unchecked( - text.clone().into_bytes(), - ) - })) - .await + if let Some(bknd_tx) = &mut self.bknd_tx && + let Err(err) = bknd_tx + .send(tungstenite::Message::Text(unsafe { + // safety: it's from client's ws message => must be valid utf-8 + tungstenite::protocol::frame::Utf8Bytes::from_bytes_unchecked( + text.clone().into_bytes(), + ) + })) + .await { error!( proxy = P::name(), @@ -878,6 +961,11 @@ where // ping msg from client actix_ws::Message::Ping(bytes) => { + #[cfg(feature = "chaos")] + if self.chaos.stream_is_blocked.load(Ordering::Relaxed) { + return Ok(()); + } + #[cfg(debug_assertions)] debug!( proxy = P::name(), @@ -886,11 +974,6 @@ where "Handling client's ping..." 
); - #[cfg(feature = "chaos")] - if self.chaos.stream_is_blocked.load(Ordering::Relaxed) { - return Ok(()); - } - #[cfg(feature = "chaos")] if rand::random::() < self.shared.config().chaos_probability_client_ping_ignored() @@ -924,7 +1007,7 @@ where proxy = P::name(), connection_id = %self.info.conn_id(), worker_id = %self.worker_id, - "Received pong form client" + "Received pong from client" ); if let Some(pong) = ProxyWsPing::from_bytes(bytes) && @@ -1012,13 +1095,19 @@ where if !self.chaos.stream_is_blocked.load(Ordering::Relaxed) && rand::random::() < self.shared.config().chaos_probability_stream_blocked() { - debug!( + warn!( proxy = P::name(), connection_id = %self.info.conn_id(), worker_id = %self.worker_id, "Blocking the stream (chaos)" ); self.chaos.stream_is_blocked.store(true, Ordering::Relaxed); + _ = self + .close_bknd_session(tungstenite::protocol::CloseFrame { + code: tungstenite::protocol::frame::coding::CloseCode::Normal, + reason: WS_CLOSE_REASON_NORMAL.into(), + }) + .await; } match bknd_msg { @@ -1094,6 +1183,11 @@ where // ping tungstenite::Message::Ping(bytes) => { + #[cfg(feature = "chaos")] + if self.chaos.stream_is_blocked.load(Ordering::Relaxed) { + return Ok(()); + } + #[cfg(debug_assertions)] debug!( proxy = P::name(), @@ -1102,11 +1196,6 @@ where "Handling backend's ping..." ); - #[cfg(feature = "chaos")] - if self.chaos.stream_is_blocked.load(Ordering::Relaxed) { - return Ok(()); - } - #[cfg(feature = "chaos")] if rand::random::() < self.shared.config().chaos_probability_backend_ping_ignored() @@ -1120,7 +1209,8 @@ where return Ok(()); } - if let Err(err) = self.bknd_tx.send(tungstenite::Message::Pong(bytes)).await + if let Some(bknd_tx) = &mut self.bknd_tx && + let Err(err) = bknd_tx.send(tungstenite::Message::Pong(bytes)).await { error!( proxy = P::name(), @@ -1141,7 +1231,7 @@ where proxy = P::name(), connection_id = %self.info.conn_id(), worker_id = %self.worker_id, - "Received pong form backend" + "Received pong from backend" ); if let Some(pong) = ProxyWsPing::from_bytes(bytes) && @@ -1238,34 +1328,50 @@ where &mut self, frame: tungstenite::protocol::CloseFrame, ) -> Result<(), &'static str> { - debug!( - proxy = P::name(), - connection_id = %self.info.conn_id(), - worker_id = %self.worker_id, - msg = %frame.reason, - "Closing backend websocket session..." - ); + if let Some(mut bknd_tx) = mem::take(&mut self.bknd_tx) { + debug!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + msg = %frame.reason, + "Closing backend websocket session..." 
+ ); - if let Err(err) = self - .bknd_tx - .send(tungstenite::Message::Close(Some( - frame.clone(), // it's cheap to clone - ))) - .await - { - if let tungstenite::error::Error::Protocol(protocol_err) = err { - if protocol_err == tungstenite::error::ProtocolError::SendAfterClosing { + if let Err(err) = bknd_tx + .send(tungstenite::Message::Close(Some( + frame.clone(), // it's cheap to clone + ))) + .await + { + if let tungstenite::error::Error::AlreadyClosed = err { return Ok(()); } - error!( - proxy = P::name(), - connection_id = %self.info.conn_id(), - worker_id = %self.worker_id, - msg = %frame.reason, - error = ?protocol_err, - "Failed to close backend websocket session" - ); - } else { + if let tungstenite::error::Error::Protocol(protocol_err) = err { + if protocol_err == tungstenite::error::ProtocolError::SendAfterClosing { + return Ok(()); + } + error!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + msg = %frame.reason, + error = ?protocol_err, + "Failed to close backend websocket session" + ); + } else { + error!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + msg = %frame.reason, + error = ?err, + "Failed to close backend websocket session" + ); + } + return Err(WS_BKND_ERROR); + } + + if let Err(err) = bknd_tx.close().await { error!( proxy = P::name(), connection_id = %self.info.conn_id(), @@ -1275,13 +1381,30 @@ where "Failed to close backend websocket session" ); } - return Err(WS_BKND_ERROR); } + drop(mem::take(&mut self.bknd_rx)); + Ok(()) } } +#[cfg(debug_assertions)] +impl Drop for ProxyWsPump +where + C: ConfigProxyWs, + P: ProxyWsInner, +{ + fn drop(&mut self) { + debug!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + "Dropping websocket pump" + ); + } +} + // ProxyWsPostprocessor ------------------------------------------------ struct ProxyWsPostprocessor @@ -1393,7 +1516,7 @@ impl ProxyWsPing { let id = Uuid::from_u128(bytes.get_u128()); let conn_id = Uuid::from_u128(bytes.get_u128()); let Ok(timestamp) = UtcDateTime::from_unix_timestamp_nanos(bytes.get_i128()) else { - return None + return None; }; Some(Self { id, conn_id, timestamp }) diff --git a/crates/rproxy/src/utils/utils_net.rs b/crates/rproxy/src/utils/utils_net.rs index 948bd55..2a348b8 100644 --- a/crates/rproxy/src/utils/utils_net.rs +++ b/crates/rproxy/src/utils/utils_net.rs @@ -1,7 +1,9 @@ -use std::net::IpAddr; +use std::{net::IpAddr, os::fd::AsFd, time::Duration}; use pnet::datalink::{self}; +// get_all_local_ip_addresses ------------------------------------------ + pub(crate) fn get_all_local_ip_addresses() -> Vec { let mut ips = Vec::new(); @@ -13,3 +15,61 @@ pub(crate) fn get_all_local_ip_addresses() -> Vec { ips } + +pub(crate) fn setup_keepalive( + stream: &tokio::net::TcpStream, + interval: Duration, +) -> std::io::Result<()> { + let interval_sec = interval.as_secs_f64().ceil() as i32; + + if interval_sec == 0 { + return Ok(()); + } + + #[cfg(target_os = "linux")] + unsafe { + use std::os::fd::AsRawFd; + + if libc::setsockopt( + stream.as_fd().as_raw_fd(), + libc::IPPROTO_TCP, + libc::TCP_KEEPIDLE, + &interval_sec as *const _ as *const libc::c_void, + size_of_val(&interval_sec) as libc::socklen_t, + ) != 0 + { + return std::io::Result::Err(std::io::Error::last_os_error()); + } + + if libc::setsockopt( + stream.as_fd().as_raw_fd(), + libc::IPPROTO_TCP, + libc::TCP_KEEPINTVL, + &interval_sec as *const _ as *const libc::c_void, + size_of_val(&interval_sec) as 
libc::socklen_t, ) != 0 { return std::io::Result::Err(std::io::Error::last_os_error()); } Ok(()) } #[cfg(target_os = "macos")] unsafe { use std::os::fd::AsRawFd; if libc::setsockopt( stream.as_fd().as_raw_fd(), libc::IPPROTO_TCP, libc::TCP_KEEPALIVE, &interval_sec as *const _ as *const _, std::mem::size_of_val(&interval_sec) as libc::socklen_t, ) != 0 { return std::io::Result::Err(std::io::Error::last_os_error()); } Ok(()) } } diff --git a/readme.md b/readme.md index 7b3d229..4a04e10 100644 --- a/readme.md +++ b/readme.md @@ -84,6 +84,12 @@ authrpc: [env: RPROXY_AUTHRPC_IDLE_CONNECTION_TIMEOUT=] [default: 30s] + --authrpc-keepalive-interval + interval between tcp keepalive packets on authrpc connections + + [env: RPROXY_AUTHRPC_KEEPALIVE_INTERVAL=] + [default: 5s] + --authrpc-listen-address host:port for authrpc proxy @@ -191,6 +197,12 @@ circuit-breaker: [default: ] flashblocks: + --flashblocks-backend-timeout + timeout to establish backend connections or to receive pong websocket response + + [env: RPROXY_FLASHBLOCKS_BACKEND_TIMEOUT=] + [default: 30s] + --flashblocks-backend url of flashblocks backend @@ -202,11 +214,11 @@ flashblocks: [env: RPROXY_FLASHBLOCKS_ENABLED=] - --flashblocks-backend-timeout - timeout to establish backend connections of to receive pong websocket response + --flashblocks-keepalive-interval + interval between tcp keepalive packets on flashblocks connections - [env: RPROXY_FLASHBLOCKS_BACKEND_TIMEOUT=] - [default: 30s] + [env: RPROXY_FLASHBLOCKS_KEEPALIVE_INTERVAL=] + [default: 5s] --flashblocks-listen-address host:port for flashblocks proxy @@ -280,6 +292,12 @@ rpc: [env: RPROXY_RPC_IDLE_CONNECTION_TIMEOUT=] [default: 30s] + --rpc-keepalive-interval + interval between tcp keepalive packets on rpc connections + + [env: RPROXY_RPC_KEEPALIVE_INTERVAL=] + [default: 5s] + --rpc-listen-address host:port for rpc proxy
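
Reviewer sketch (not code from the patch): the snippet below condenses the two OS-level mechanisms this change wires into `Server::run` and the listener setup — raising the `NOFILE` soft limit via the `rlimit` crate, and deriving TCP keepalive time/interval/probe settings from an idle-timeout budget before applying them with `socket2`. The function names, the `main` wiring, the `eprintln!` logging, and the 30 s / 8-probe example values are illustrative assumptions; only the `rlimit` and `socket2` calls already used in the diff are relied upon.

```rust
// Illustrative sketch only (names and values are assumptions, not patch code):
// 1) raise the soft RLIMIT_NOFILE so many proxied sockets can stay open, and
// 2) build a listener whose TCP keepalive settings are derived from an idle
//    timeout plus a probe budget, the way the patch derives them from
//    `idle_connection_timeout()` and `keepalive_retries()`.

use std::{net::SocketAddr, time::Duration};

const MAX_OPEN_FILES: u64 = 10240; // same ceiling the patch uses in server.rs

fn raise_nofile_limit() {
    match rlimit::getrlimit(rlimit::Resource::NOFILE) {
        Ok((soft, hard)) => {
            // raise the soft limit, but never above the hard limit
            let soft = soft.max(MAX_OPEN_FILES).min(hard);
            if let Err(err) = rlimit::setrlimit(rlimit::Resource::NOFILE, soft, hard) {
                // the patch reports this via tracing::warn!; eprintln! keeps the sketch standalone
                eprintln!("failed to raise NOFILE (soft={soft}, hard={hard}): {err}");
            }
        }
        Err(err) => eprintln!("failed to read NOFILE limits: {err}"),
    }
}

fn keepalive_listener(
    addr: SocketAddr,
    idle_timeout: Duration,
    retries: u32,
) -> std::io::Result<socket2::Socket> {
    let socket = socket2::Socket::new(
        socket2::Domain::for_address(addr),
        socket2::Type::STREAM,
        Some(socket2::Protocol::TCP),
    )?;

    // spread the probes across the idle window plus a small grace period,
    // mirroring `keep_alive_timeout.div_f64(f64::from(config.keepalive_retries()))`
    let keepalive_interval =
        (idle_timeout + Duration::from_millis(1000)).div_f64(f64::from(retries.max(1)));

    socket.set_keepalive(true)?;
    socket.set_tcp_keepalive(
        &socket2::TcpKeepalive::new()
            .with_time(keepalive_interval)
            .with_interval(keepalive_interval)
            .with_retries(retries),
    )?;

    socket.set_nonblocking(true)?; // required before handing the fd to tokio
    socket.set_reuse_address(true)?; // tolerate lingering TIME_WAIT sockets on restart
    socket.set_linger(Some(Duration::from_millis(5000)))?; // allow buffers to flush on close

    socket.bind(&socket2::SockAddr::from(addr))?;
    socket.listen(1024)?;

    Ok(socket)
}

fn main() -> std::io::Result<()> {
    raise_nofile_limit();
    // hypothetical values: an rpc-style listener with a 30s idle budget and 8 probes
    let _listener =
        keepalive_listener("127.0.0.1:8645".parse().unwrap(), Duration::from_secs(30), 8)?;
    Ok(())
}
```

With an idle timeout of 30 s and 8 probes, keepalive probes go out roughly every 3.9 s, so a silently dead peer is torn down within roughly one idle window — the same budget the patch spreads across `keepalive_interval()` and `keepalive_retries()` instead of the old fixed `TCP_KEEPALIVE_ATTEMPTS` constant.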