From 4aea5d4cde944eac4b8f75ba7779f81115c6959a Mon Sep 17 00:00:00 2001 From: Anton Bronnikov Date: Mon, 1 Dec 2025 14:07:44 +0100 Subject: [PATCH 01/13] fix: ensure tcp keep-alive packets are sent --- Cargo.lock | 56 +++++++++++++ crates/rproxy/Cargo.toml | 2 + .../src/server/proxy/connection_guard.rs | 79 +++++++++++++++++++ crates/rproxy/src/server/proxy/http/proxy.rs | 41 +++++----- crates/rproxy/src/server/proxy/mod.rs | 2 +- crates/rproxy/src/server/proxy/ws/proxy.rs | 42 +++++++++- 6 files changed, 199 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4f26c6c..6d3ffb2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1686,6 +1686,18 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "enum-as-inner" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1e6a265c649f3f5979b601d26f1d05ada116434c87741c9493cb56218f76cbc" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.110", +] + [[package]] name = "enum-ordinalize" version = "4.3.2" @@ -3349,6 +3361,7 @@ dependencies = [ "hex", "http 1.3.1", "humantime", + "libc", "moka", "op-alloy-consensus", "parking_lot", @@ -3363,6 +3376,7 @@ dependencies = [ "serde", "serde_json", "socket2 0.6.1", + "sysctl", "thiserror", "time", "tokio", @@ -3539,6 +3553,15 @@ version = "5.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dfac11c0cd0606aaf7eb9ef66f82c119438a96dc487715abb8b57fdf08ad4fe" +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + [[package]] name = "scc" version = "3.4.2" @@ -3928,6 +3951,20 @@ dependencies = [ "syn 2.0.110", ] +[[package]] +name = "sysctl" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cca424247104946a59dacd27eaad296223b7feec3d168a6dd04585183091eb0b" +dependencies = [ + "bitflags", + "byteorder", + "enum-as-inner", + "libc", + "thiserror", + "walkdir", +] + [[package]] name = "tagptr" version = "0.2.0" @@ -4463,6 +4500,16 @@ dependencies = [ "libc", ] +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + [[package]] name = "want" version = "0.3.1" @@ -4548,6 +4595,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/crates/rproxy/Cargo.toml b/crates/rproxy/Cargo.toml index 8a6a4dc..db0ead9 100644 --- a/crates/rproxy/Cargo.toml +++ b/crates/rproxy/Cargo.toml @@ -40,6 +40,7 @@ futures-core = "0.3.31" hex = "0.4.3" http = "1.3.1" humantime = "2.2.0" +libc = "0.2.177" moka = { version = "0.12.11", features = ["sync"] } op-alloy-consensus = "0.20.0" parking_lot = "0.12.4" @@ -54,6 +55,7 @@ scc = "3.0.2" serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.143" socket2 = "0.6.0" +sysctl = "0.7.1" thiserror = "2.0.16" time = { version = 
"0.3.43", features = ["formatting"] } tokio = { version = "1.47.1", features = ["macros", "rt", "rt-multi-thread", "signal"] } diff --git a/crates/rproxy/src/server/proxy/connection_guard.rs b/crates/rproxy/src/server/proxy/connection_guard.rs index d1c04ef..045b1e8 100644 --- a/crates/rproxy/src/server/proxy/connection_guard.rs +++ b/crates/rproxy/src/server/proxy/connection_guard.rs @@ -1,17 +1,51 @@ use std::{ any::Any, + os::fd::{AsFd, AsRawFd}, sync::{ Arc, + LazyLock, atomic::{AtomicI64, Ordering}, }, + time::Duration, }; use actix_web::dev::Extensions; +use sysctl::Sysctl; use tracing::{debug, warn}; use uuid::Uuid; use crate::server::metrics::{LabelsProxy, Metrics}; +pub(crate) static TCP_KEEPALIVE_ATTEMPTS: LazyLock = LazyLock::new(|| { + #[cfg(target_os = "linux")] + { + let mut res: libc::c_int = 9; + if let Ok(ctl) = sysctl::Ctl::new("net.ipv4.tcp_keepalive_probes") && + let Ok(value) = ctl.value() && + let Ok(value) = value.into_int() + { + res = value + } + return res; + } + + #[cfg(target_os = "macos")] + { + let mut res: libc::c_int = 8; + if let Ok(ctl) = sysctl::Ctl::new("net.ipv4.tcp_keepalive_probes") && + let Ok(value) = ctl.value() && + let Ok(value) = value.into_int() + { + res = value + } + + return res; + } + + #[allow(unreachable_code)] + 8 +}); + // ProxyConnectionGuard ------------------------------------------------ pub(crate) struct ConnectionGuard { @@ -47,7 +81,12 @@ impl ConnectionGuard { proxy: &'static str, metrics: Arc, client_connections_count: Arc, + keep_alive_timeout: Duration, ) -> impl Fn(&dyn Any, &mut Extensions) { + let keep_alive_attempts: libc::c_int = *TCP_KEEPALIVE_ATTEMPTS; + let keep_alive_interval: libc::c_int = + keep_alive_timeout.div_f64(f64::from(keep_alive_attempts)).as_secs_f64().ceil() as i32; + move |connection, extensions| { { let val = client_connections_count.fetch_add(1, Ordering::Relaxed) + 1; @@ -70,6 +109,46 @@ impl ConnectionGuard { None }; + if let Some(stream) = stream { + #[cfg(target_os = "linux")] + { + libc::setsockopt( + stream.as_fd().as_raw_fd(), + libc::IPPROTO_TCP, + libc::TCP_KEEPIDLE, + &keep_alive_interval as *const _ as *const libc::c_void, + size_of_val(&keep_alive_interval) as libc::socklen_t, + ); + libc::setsockopt( + stream.as_fd().as_raw_fd(), + libc::IPPROTO_TCP, + libc::TCP_KEEPINTVL, + &keep_alive_interval as *const _ as *const libc::c_void, + size_of_val(&keep_alive_interval) as libc::socklen_t, + ); + libc::setsockopt( + stream.as_fd().as_raw_fd(), + libc::IPPROTO_TCP, + libc::TCP_KEEPCNT, + &keep_alive_attempts as *const _ as *const libc::c_void, + size_of_val(&keep_alive_attempts) as libc::socklen_t, + ); + } + + #[cfg(target_os = "macos")] + { + unsafe { + libc::setsockopt( + stream.as_fd().as_raw_fd(), + libc::IPPROTO_TCP, + libc::TCP_KEEPALIVE, + &keep_alive_interval as *const _ as *const _, + std::mem::size_of_val(&keep_alive_interval) as libc::socklen_t, + ); + } + } + } + if let Some(stream) = stream { let id = Uuid::now_v7(); diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index b71354c..dcdad56 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -3,6 +3,7 @@ use std::{ fmt::Debug, marker::PhantomData, mem, + ops::Add, pin::Pin, str::FromStr, sync::{ @@ -10,6 +11,7 @@ use std::{ atomic::{AtomicI64, AtomicUsize, Ordering}, }, task::{Context, Poll}, + time::Duration, }; use actix::{Actor, AsyncContext, WrapFuture}; @@ -53,6 +55,7 @@ use crate::{ metrics::{LabelsProxy, 
LabelsProxyClientInfo, LabelsProxyHttpJrpc, Metrics}, proxy::{ ConnectionGuard, + TCP_KEEPALIVE_ATTEMPTS, config::ConfigTls, http::{ ProxyHttpInner, @@ -63,8 +66,6 @@ use crate::{ utils::{Loggable, decompress, is_hop_by_hop_header, raw_transaction_to_hash}, }; -const TCP_KEEPALIVE_ATTEMPTS: u32 = 8; - // ProxyHttp ----------------------------------------------------------- pub(crate) struct ProxyHttp @@ -160,8 +161,8 @@ where } }; - let keep_alive = - config.idle_connection_timeout().div_f64(f64::from(TCP_KEEPALIVE_ATTEMPTS)); + let keep_alive_timeout = config.idle_connection_timeout().add(Duration::from_millis(1000)); + let keep_alive_interval = keep_alive_timeout.div_f64(f64::from(*TCP_KEEPALIVE_ATTEMPTS)); let workers_count = std::cmp::min(PARALLELISM.to_static(), config.backend_max_concurrent_requests()); let max_concurrent_requests_per_worker = @@ -206,10 +207,12 @@ where P::name(), metrics.clone(), client_connections_count.clone(), + keep_alive_timeout, ); let h1 = actix_http::HttpService::build() - .keep_alive(keep_alive) + .client_disconnect_timeout(Duration::from_millis(1000)) // same as in HttpServer + .keep_alive(keep_alive_interval) .on_connect_ext(move |io: &_, ext: _| { (on_connect)(io as &dyn std::any::Any, ext) }) @@ -237,10 +240,12 @@ where P::name(), metrics.clone(), client_connections_count.clone(), + keep_alive_timeout, ); let h1 = actix_http::HttpService::build() - .keep_alive(keep_alive) + .client_disconnect_timeout(Duration::from_millis(1000)) // same as in HttpServer + .keep_alive(keep_alive_interval) .on_connect_ext(move |io: &_, ext: _| { (on_connect)(io as &dyn std::any::Any, ext) }) @@ -289,6 +294,17 @@ where Some(socket2::Protocol::TCP), )?; + // allow keep-alive packets + let keep_alive_timeout = config.idle_connection_timeout().add(Duration::from_millis(1000)); + let keep_alive_interval = keep_alive_timeout.div_f64(f64::from(*TCP_KEEPALIVE_ATTEMPTS)); + socket.set_keepalive(true)?; + socket.set_tcp_keepalive( + &socket2::TcpKeepalive::new() + .with_time(keep_alive_interval) + .with_interval(keep_alive_interval) + .with_retries(*TCP_KEEPALIVE_ATTEMPTS as u32), + )?; + // must use non-blocking with tokio socket.set_nonblocking(true)?; @@ -298,19 +314,6 @@ where // allow binding while there are still residual connections in TIME_WAIT socket.set_reuse_address(true)?; - if !config.idle_connection_timeout().is_zero() { - socket.set_tcp_keepalive( - &socket2::TcpKeepalive::new() - .with_time( - config.idle_connection_timeout().div_f64(f64::from(TCP_KEEPALIVE_ATTEMPTS)), - ) - .with_interval( - config.idle_connection_timeout().div_f64(f64::from(TCP_KEEPALIVE_ATTEMPTS)), - ) - .with_retries(TCP_KEEPALIVE_ATTEMPTS - 1), - )?; - } - socket.bind(&socket2::SockAddr::from(config.listen_address()))?; socket.listen(1024)?; diff --git a/crates/rproxy/src/server/proxy/mod.rs b/crates/rproxy/src/server/proxy/mod.rs index 0683f60..2a47506 100644 --- a/crates/rproxy/src/server/proxy/mod.rs +++ b/crates/rproxy/src/server/proxy/mod.rs @@ -4,4 +4,4 @@ pub(crate) mod http; pub(crate) mod ws; pub(crate) mod connection_guard; -use connection_guard::ConnectionGuard; +use connection_guard::{ConnectionGuard, TCP_KEEPALIVE_ATTEMPTS}; diff --git a/crates/rproxy/src/server/proxy/ws/proxy.rs b/crates/rproxy/src/server/proxy/ws/proxy.rs index 5528a29..9d85e80 100644 --- a/crates/rproxy/src/server/proxy/ws/proxy.rs +++ b/crates/rproxy/src/server/proxy/ws/proxy.rs @@ -40,6 +40,7 @@ use crate::{ metrics::{LabelsProxyWs, Metrics}, proxy::{ ConnectionGuard, + TCP_KEEPALIVE_ATTEMPTS, 
config::ConfigTls, http::ProxyHttpRequestInfo, ws::{ProxyWsInner, config::ConfigProxyWs}, @@ -130,6 +131,8 @@ where } }; + let keep_alive_timeout = config.backend_timeout().mul_f64(2.0); + let keep_alive_interval = keep_alive_timeout.div_f64(f64::from(*TCP_KEEPALIVE_ATTEMPTS)); let workers_count = PARALLELISM.to_static(); let shared = ProxyWsSharedState::::new(config, &metrics); @@ -156,7 +159,13 @@ where .wrap(NormalizePath::new(TrailingSlash::Trim)) .default_service(web::route().to(Self::receive)) }) - .on_connect(ConnectionGuard::on_connect(P::name(), metrics, client_connections_count)) + .keep_alive(keep_alive_interval) + .on_connect(ConnectionGuard::on_connect( + P::name(), + metrics, + client_connections_count, + keep_alive_timeout, + )) .shutdown_signal(canceller.cancelled_owned()) .workers(workers_count); @@ -207,6 +216,17 @@ where Some(socket2::Protocol::TCP), )?; + // allow keep-alive packets + let keep_alive_timeout = config.backend_timeout().mul_f64(2.0); + let keep_alive_interval = keep_alive_timeout.div_f64(f64::from(*TCP_KEEPALIVE_ATTEMPTS)); + socket.set_keepalive(true)?; + socket.set_tcp_keepalive( + &socket2::TcpKeepalive::new() + .with_time(keep_alive_interval) + .with_interval(keep_alive_interval) + .with_retries(*TCP_KEEPALIVE_ATTEMPTS as u32), + )?; + // must use non-blocking with tokio socket.set_nonblocking(true)?; @@ -924,7 +944,7 @@ where proxy = P::name(), connection_id = %self.info.conn_id(), worker_id = %self.worker_id, - "Received pong form client" + "Received pong from client" ); if let Some(pong) = ProxyWsPing::from_bytes(bytes) && @@ -1141,7 +1161,7 @@ where proxy = P::name(), connection_id = %self.info.conn_id(), worker_id = %self.worker_id, - "Received pong form backend" + "Received pong from backend" ); if let Some(pong) = ProxyWsPing::from_bytes(bytes) && @@ -1282,6 +1302,22 @@ where } } +#[cfg(debug_assertions)] +impl Drop for ProxyWsPump +where + C: ConfigProxyWs, + P: ProxyWsInner, +{ + fn drop(&mut self) { + debug!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + "Dropping websocket pump" + ); + } +} + // ProxyWsPostprocessor ------------------------------------------------ struct ProxyWsPostprocessor From cee1087e57f01d960e75be3e676cab76a48686f8 Mon Sep 17 00:00:00 2001 From: Anton Bronnikov Date: Mon, 1 Dec 2025 14:08:10 +0100 Subject: [PATCH 02/13] chore: bump version --- Cargo.lock | 2 +- crates/rproxy/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6d3ffb2..26853e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3336,7 +3336,7 @@ dependencies = [ [[package]] name = "rproxy" -version = "0.0.9" +version = "0.0.10" dependencies = [ "actix", "actix-http", diff --git a/crates/rproxy/Cargo.toml b/crates/rproxy/Cargo.toml index db0ead9..56a220f 100644 --- a/crates/rproxy/Cargo.toml +++ b/crates/rproxy/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rproxy" -version = "0.0.9" +version = "0.0.10" edition = "2024" default-run = "rproxy" From 6a054c269a9a1fc3fd1d0e79d5a3e19581ebeece Mon Sep 17 00:00:00 2001 From: Anton Bronnikov Date: Mon, 1 Dec 2025 15:10:43 +0100 Subject: [PATCH 03/13] fix: wrap `libc::setsockopt` in unsafe block --- .../src/server/proxy/connection_guard.rs | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/crates/rproxy/src/server/proxy/connection_guard.rs b/crates/rproxy/src/server/proxy/connection_guard.rs index 045b1e8..0343be5 100644 --- a/crates/rproxy/src/server/proxy/connection_guard.rs +++ 
b/crates/rproxy/src/server/proxy/connection_guard.rs @@ -111,7 +111,7 @@ impl ConnectionGuard { if let Some(stream) = stream { #[cfg(target_os = "linux")] - { + unsafe { libc::setsockopt( stream.as_fd().as_raw_fd(), libc::IPPROTO_TCP, @@ -136,16 +136,14 @@ impl ConnectionGuard { } #[cfg(target_os = "macos")] - { - unsafe { - libc::setsockopt( - stream.as_fd().as_raw_fd(), - libc::IPPROTO_TCP, - libc::TCP_KEEPALIVE, - &keep_alive_interval as *const _ as *const _, - std::mem::size_of_val(&keep_alive_interval) as libc::socklen_t, - ); - } + unsafe { + libc::setsockopt( + stream.as_fd().as_raw_fd(), + libc::IPPROTO_TCP, + libc::TCP_KEEPALIVE, + &keep_alive_interval as *const _ as *const _, + std::mem::size_of_val(&keep_alive_interval) as libc::socklen_t, + ); } } From 317b3cd4207c345df0a971478bc4ed6b78f1e9cc Mon Sep 17 00:00:00 2001 From: Anton Bronnikov Date: Mon, 1 Dec 2025 19:33:59 +0100 Subject: [PATCH 04/13] fix: don't leak backend connections --- crates/rproxy/src/server/proxy/ws/proxy.rs | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/crates/rproxy/src/server/proxy/ws/proxy.rs b/crates/rproxy/src/server/proxy/ws/proxy.rs index 9d85e80..a57ccbc 100644 --- a/crates/rproxy/src/server/proxy/ws/proxy.rs +++ b/crates/rproxy/src/server/proxy/ws/proxy.rs @@ -1273,6 +1273,9 @@ where ))) .await { + if let tungstenite::error::Error::AlreadyClosed = err { + return Ok(()); + } if let tungstenite::error::Error::Protocol(protocol_err) = err { if protocol_err == tungstenite::error::ProtocolError::SendAfterClosing { return Ok(()); @@ -1298,6 +1301,17 @@ where return Err(WS_BKND_ERROR); } + if let Err(err) = self.bknd_tx.close().await { + error!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + msg = %frame.reason, + error = ?err, + "Failed to close backend websocket session" + ); + } + Ok(()) } } From 961a4d51ec243238a95e40191eb5b0ccbddedd83 Mon Sep 17 00:00:00 2001 From: Anton Bronnikov Date: Wed, 3 Dec 2025 13:52:52 +0100 Subject: [PATCH 05/13] fix: handle libc errors --- Cargo.lock | 66 +++++++++++++++++++ crates/rproxy/Cargo.toml | 2 +- .../rproxy/src/server/proxy/config/authrpc.rs | 17 +++++ .../src/server/proxy/config/flashblocks.rs | 32 ++++++--- crates/rproxy/src/server/proxy/config/rpc.rs | 17 +++++ .../src/server/proxy/connection_guard.rs | 57 ++++------------ crates/rproxy/src/server/proxy/http/config.rs | 1 + crates/rproxy/src/server/proxy/http/proxy.rs | 14 ++-- crates/rproxy/src/server/proxy/ws/config.rs | 11 +++- crates/rproxy/src/server/proxy/ws/proxy.rs | 41 ++++++++++-- crates/rproxy/src/utils/utils_net.rs | 58 +++++++++++++++- 11 files changed, 248 insertions(+), 68 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 26853e2..cbea518 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1378,6 +1378,16 @@ dependencies = [ "version_check", ] +[[package]] +name = "core-foundation" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2a6cd9ae233e7f62ba4e9353e81a88df7fc8a5987b8d445b4d90c879bd156f6" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation-sys" version = "0.8.7" @@ -2764,6 +2774,12 @@ dependencies = [ "thiserror", ] +[[package]] +name = "openssl-probe" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" + [[package]] name = "parity-scale-codec" version = "3.7.5" @@ -3493,6 +3509,18 @@ dependencies = [ "zeroize", ] 
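Note on the lock-file entries added around this point: `rustls-native-certs` and the platform back-ends it pulls in (`openssl-probe`, `schannel`, `security-framework`, `core-foundation`) appear because this patch also enables the `rustls-tls-native-roots` feature on `tokio-tungstenite` (see the Cargo.toml hunk further down). A minimal sketch of what that feature enables, purely illustrative and not part of the diff — the function name and URL are placeholders:

    // Sketch only (not part of this patch): with `rustls-tls-native-roots`,
    // tokio-tungstenite can negotiate TLS for wss:// backends using the
    // certificates in the OS trust store.
    use tokio::net::TcpStream;
    use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite};

    async fn connect_backend(
        url: &str, // placeholder, e.g. "wss://backend.example:11111"
    ) -> tungstenite::Result<WebSocketStream<MaybeTlsStream<TcpStream>>> {
        // connect_async does the TCP connect, the optional TLS handshake and the
        // websocket upgrade in one call; the resulting stream is the same
        // MaybeTlsStream<TcpStream> that the ws proxy later splits into halves.
        let (stream, _response) = connect_async(url).await?;
        Ok(stream)
    }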
+[[package]] +name = "rustls-native-certs" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9980d917ebb0c0536119ba501e90834767bffc3d60641457fd84a1f3fd337923" +dependencies = [ + "openssl-probe", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "2.2.0" @@ -3572,6 +3600,15 @@ dependencies = [ "sdd", ] +[[package]] +name = "schannel" +version = "0.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "891d81b926048e76efe18581bf793546b4c0eaf8448d72be8de2bbee5fd166e1" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "schemars" version = "0.9.0" @@ -3642,6 +3679,29 @@ dependencies = [ "cc", ] +[[package]] +name = "security-framework" +version = "3.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3297343eaf830f66ede390ea39da1d462b6b0c1b000f420d0a83f898bbbe6ef" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "2.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc1f0cbffaac4852523ce30d8bd3c5cdc873501d96ff467ca09b6767bb8cd5c0" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "semver" version = "0.11.0" @@ -4146,7 +4206,11 @@ checksum = "489a59b6730eda1b0171fcfda8b121f4bee2b35cba8645ca35c5f7ba3eb736c1" dependencies = [ "futures-util", "log", + "rustls", + "rustls-native-certs", + "rustls-pki-types", "tokio", + "tokio-rustls", "tungstenite", ] @@ -4361,6 +4425,8 @@ dependencies = [ "httparse", "log", "rand 0.9.2", + "rustls", + "rustls-pki-types", "sha1", "thiserror", "utf-8", diff --git a/crates/rproxy/Cargo.toml b/crates/rproxy/Cargo.toml index 56a220f..263879b 100644 --- a/crates/rproxy/Cargo.toml +++ b/crates/rproxy/Cargo.toml @@ -59,7 +59,7 @@ sysctl = "0.7.1" thiserror = "2.0.16" time = { version = "0.3.43", features = ["formatting"] } tokio = { version = "1.47.1", features = ["macros", "rt", "rt-multi-thread", "signal"] } -tokio-tungstenite = "0.27.0" +tokio-tungstenite = {version = "0.27.0", features = ["rustls-tls-native-roots"] } tokio-util = "0.7.16" tracing = { version = "0.1.41", features = ["valuable"] } tracing-subscriber = { version = "0.3.20", features = ["env-filter", "json", "valuable"] } diff --git a/crates/rproxy/src/server/proxy/config/authrpc.rs b/crates/rproxy/src/server/proxy/config/authrpc.rs index 054bd28..0a10a07 100644 --- a/crates/rproxy/src/server/proxy/config/authrpc.rs +++ b/crates/rproxy/src/server/proxy/config/authrpc.rs @@ -84,6 +84,18 @@ pub(crate) struct ConfigAuthrpc { )] pub(crate) idle_connection_timeout: Duration, + /// interval between tcp keepalive packets on authrpc connections + #[arg( + default_value = "5s", + env = "RPROXY_AUTHRPC_KEEPALIVE_INTERVAL", + help_heading = "authrpc", + long("authrpc-keepalive-interval"), + name("authrpc_keepalive_interval"), + value_name = "duration", + value_parser = humantime::parse_duration + )] + pub(crate) keepalive_interval: Duration, + /// host:port for authrpc proxy #[arg( default_value = "0.0.0.0:8651", @@ -353,6 +365,11 @@ impl ConfigProxyHttp for ConfigAuthrpc { self.idle_connection_timeout } + #[inline] + fn keepalive_interval(&self) -> Duration { + self.keepalive_interval + } + #[inline] fn listen_address(&self) -> SocketAddr { self.listen_address.parse::().expect(ALREADY_VALIDATED) diff --git 
a/crates/rproxy/src/server/proxy/config/flashblocks.rs b/crates/rproxy/src/server/proxy/config/flashblocks.rs index 54fd650..093a807 100644 --- a/crates/rproxy/src/server/proxy/config/flashblocks.rs +++ b/crates/rproxy/src/server/proxy/config/flashblocks.rs @@ -10,6 +10,19 @@ use crate::{config::ALREADY_VALIDATED, server::proxy::ws::config::ConfigProxyWs} #[derive(Args, Clone, Debug)] pub(crate) struct ConfigFlashblocks { + /// timeout to establish backend connections of to receive pong + /// websocket response + #[arg( + default_value = "30s", + env = "RPROXY_FLASHBLOCKS_BACKEND_TIMEOUT", + help_heading = "flashblocks", + long("flashblocks-backend-timeout"), + name("flashblocks_backend_timeout"), + value_name = "duration", + value_parser = humantime::parse_duration + )] + pub(crate) backend_timeout: Duration, + /// url of flashblocks backend #[arg( default_value = "ws://127.0.0.1:11111", @@ -29,19 +42,17 @@ pub(crate) struct ConfigFlashblocks { name("flashblocks_enabled") )] pub(crate) enabled: bool, - - /// timeout to establish backend connections of to receive pong - /// websocket response + /// interval between tcp keepalive packets on flashblocks connections #[arg( - default_value = "30s", - env = "RPROXY_FLASHBLOCKS_BACKEND_TIMEOUT", + default_value = "5s", + env = "RPROXY_FLASHBLOCKS_KEEPALIVE_INTERVAL", help_heading = "flashblocks", - long("flashblocks-backend-timeout"), - name("flashblocks_backend_timeout"), + long("flashblocks-keepalive-interval"), + name("flashblocks_keepalive_interval"), value_name = "duration", value_parser = humantime::parse_duration )] - pub(crate) backend_timeout: Duration, + pub(crate) keepalive_interval: Duration, /// host:port for flashblocks proxy #[arg( @@ -212,6 +223,11 @@ impl ConfigProxyWs for ConfigFlashblocks { self.backend_url.parse::().expect(ALREADY_VALIDATED) } + #[inline] + fn keep_alive_interval(&self) -> Duration { + self.keepalive_interval + } + #[inline] fn listen_address(&self) -> SocketAddr { self.listen_address.parse::().expect(ALREADY_VALIDATED) diff --git a/crates/rproxy/src/server/proxy/config/rpc.rs b/crates/rproxy/src/server/proxy/config/rpc.rs index 2a9c2ad..e6cd20b 100644 --- a/crates/rproxy/src/server/proxy/config/rpc.rs +++ b/crates/rproxy/src/server/proxy/config/rpc.rs @@ -73,6 +73,18 @@ pub(crate) struct ConfigRpc { )] pub(crate) idle_connection_timeout: Duration, + /// interval between tcp keepalive packets on rpc connections + #[arg( + default_value = "5s", + env = "RPROXY_RPC_KEEPALIVE_INTERVAL", + help_heading = "rpc", + long("rpc-keepalive-interval"), + name("rpc_keepalive_interval"), + value_name = "duration", + value_parser = humantime::parse_duration + )] + pub(crate) keepalive_interval: Duration, + /// host:port for rpc proxy #[arg( default_value = "0.0.0.0:8645", @@ -349,6 +361,11 @@ impl ConfigProxyHttp for ConfigRpc { self.idle_connection_timeout } + #[inline] + fn keepalive_interval(&self) -> Duration { + self.keepalive_interval + } + #[inline] fn listen_address(&self) -> SocketAddr { self.listen_address.parse::().expect(ALREADY_VALIDATED) diff --git a/crates/rproxy/src/server/proxy/connection_guard.rs b/crates/rproxy/src/server/proxy/connection_guard.rs index 0343be5..91eac4b 100644 --- a/crates/rproxy/src/server/proxy/connection_guard.rs +++ b/crates/rproxy/src/server/proxy/connection_guard.rs @@ -1,6 +1,5 @@ use std::{ any::Any, - os::fd::{AsFd, AsRawFd}, sync::{ Arc, LazyLock, @@ -14,7 +13,10 @@ use sysctl::Sysctl; use tracing::{debug, warn}; use uuid::Uuid; -use crate::server::metrics::{LabelsProxy, 
Metrics}; +use crate::{ + server::metrics::{LabelsProxy, Metrics}, + utils::setup_keepalive, +}; pub(crate) static TCP_KEEPALIVE_ATTEMPTS: LazyLock = LazyLock::new(|| { #[cfg(target_os = "linux")] @@ -81,12 +83,8 @@ impl ConnectionGuard { proxy: &'static str, metrics: Arc, client_connections_count: Arc, - keep_alive_timeout: Duration, + keep_alive_interval: Duration, ) -> impl Fn(&dyn Any, &mut Extensions) { - let keep_alive_attempts: libc::c_int = *TCP_KEEPALIVE_ATTEMPTS; - let keep_alive_interval: libc::c_int = - keep_alive_timeout.div_f64(f64::from(keep_alive_attempts)).as_secs_f64().ceil() as i32; - move |connection, extensions| { { let val = client_connections_count.fetch_add(1, Ordering::Relaxed) + 1; @@ -103,48 +101,21 @@ impl ConnectionGuard { Some(stream) } else { warn!( + proxy = proxy, connection_type = std::any::type_name_of_val(connection), "Unexpected connection type", ); None }; - if let Some(stream) = stream { - #[cfg(target_os = "linux")] - unsafe { - libc::setsockopt( - stream.as_fd().as_raw_fd(), - libc::IPPROTO_TCP, - libc::TCP_KEEPIDLE, - &keep_alive_interval as *const _ as *const libc::c_void, - size_of_val(&keep_alive_interval) as libc::socklen_t, - ); - libc::setsockopt( - stream.as_fd().as_raw_fd(), - libc::IPPROTO_TCP, - libc::TCP_KEEPINTVL, - &keep_alive_interval as *const _ as *const libc::c_void, - size_of_val(&keep_alive_interval) as libc::socklen_t, - ); - libc::setsockopt( - stream.as_fd().as_raw_fd(), - libc::IPPROTO_TCP, - libc::TCP_KEEPCNT, - &keep_alive_attempts as *const _ as *const libc::c_void, - size_of_val(&keep_alive_attempts) as libc::socklen_t, - ); - } - - #[cfg(target_os = "macos")] - unsafe { - libc::setsockopt( - stream.as_fd().as_raw_fd(), - libc::IPPROTO_TCP, - libc::TCP_KEEPALIVE, - &keep_alive_interval as *const _ as *const _, - std::mem::size_of_val(&keep_alive_interval) as libc::socklen_t, - ); - } + if let Some(stream) = stream && + let Err(err) = setup_keepalive(stream, keep_alive_interval) + { + warn!( + proxy = proxy, + error = ?err, + "Failed to set keepalive interval", + ); } if let Some(stream) = stream { diff --git a/crates/rproxy/src/server/proxy/http/config.rs b/crates/rproxy/src/server/proxy/http/config.rs index f75201b..74c2f75 100644 --- a/crates/rproxy/src/server/proxy/http/config.rs +++ b/crates/rproxy/src/server/proxy/http/config.rs @@ -9,6 +9,7 @@ pub(crate) trait ConfigProxyHttp: Clone + Send + Unpin + 'static { fn backend_timeout(&self) -> Duration; fn backend_url(&self) -> Url; fn idle_connection_timeout(&self) -> Duration; + fn keepalive_interval(&self) -> Duration; fn listen_address(&self) -> SocketAddr; fn log_mirrored_requests(&self) -> bool; fn log_mirrored_responses(&self) -> bool; diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index dcdad56..05db855 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -161,8 +161,6 @@ where } }; - let keep_alive_timeout = config.idle_connection_timeout().add(Duration::from_millis(1000)); - let keep_alive_interval = keep_alive_timeout.div_f64(f64::from(*TCP_KEEPALIVE_ATTEMPTS)); let workers_count = std::cmp::min(PARALLELISM.to_static(), config.backend_max_concurrent_requests()); let max_concurrent_requests_per_worker = @@ -195,6 +193,8 @@ where let server = if tls.enabled() { server.listen(P::name(), listener, move || { + let config = shared.config(); + let this = web::Data::new(Self::new(shared.clone(), max_concurrent_requests_per_worker)); @@ -207,12 +207,12 @@ 
where P::name(), metrics.clone(), client_connections_count.clone(), - keep_alive_timeout, + config.keepalive_interval(), ); let h1 = actix_http::HttpService::build() .client_disconnect_timeout(Duration::from_millis(1000)) // same as in HttpServer - .keep_alive(keep_alive_interval) + .keep_alive(config.keepalive_interval()) .on_connect_ext(move |io: &_, ext: _| { (on_connect)(io as &dyn std::any::Any, ext) }) @@ -228,6 +228,8 @@ where }) } else { server.listen(P::name(), listener, move || { + let config = shared.config(); + let this = web::Data::new(Self::new(shared.clone(), max_concurrent_requests_per_worker)); @@ -240,12 +242,12 @@ where P::name(), metrics.clone(), client_connections_count.clone(), - keep_alive_timeout, + config.keepalive_interval(), ); let h1 = actix_http::HttpService::build() .client_disconnect_timeout(Duration::from_millis(1000)) // same as in HttpServer - .keep_alive(keep_alive_interval) + .keep_alive(config.keepalive_interval()) .on_connect_ext(move |io: &_, ext: _| { (on_connect)(io as &dyn std::any::Any, ext) }) diff --git a/crates/rproxy/src/server/proxy/ws/config.rs b/crates/rproxy/src/server/proxy/ws/config.rs index af17611..a86bae6 100644 --- a/crates/rproxy/src/server/proxy/ws/config.rs +++ b/crates/rproxy/src/server/proxy/ws/config.rs @@ -1,9 +1,14 @@ +use std::{net::SocketAddr, time::Duration}; + +use tungstenite::http::Uri; + // ConfigProxyWs ------------------------------------------------------- pub(crate) trait ConfigProxyWs: Clone + Send + Unpin + 'static { - fn backend_timeout(&self) -> std::time::Duration; - fn backend_url(&self) -> tungstenite::http::Uri; - fn listen_address(&self) -> std::net::SocketAddr; + fn backend_timeout(&self) -> Duration; + fn backend_url(&self) -> Uri; + fn keep_alive_interval(&self) -> Duration; + fn listen_address(&self) -> SocketAddr; fn log_backend_messages(&self) -> bool; fn log_client_messages(&self) -> bool; fn log_sanitise(&self) -> bool; diff --git a/crates/rproxy/src/server/proxy/ws/proxy.rs b/crates/rproxy/src/server/proxy/ws/proxy.rs index a57ccbc..d4630f5 100644 --- a/crates/rproxy/src/server/proxy/ws/proxy.rs +++ b/crates/rproxy/src/server/proxy/ws/proxy.rs @@ -46,7 +46,7 @@ use crate::{ ws::{ProxyWsInner, config::ConfigProxyWs}, }, }, - utils::{Loggable, raw_transaction_to_hash}, + utils::{Loggable, raw_transaction_to_hash, setup_keepalive}, }; const WS_PING_INTERVAL_SECONDS: u64 = 1; @@ -131,11 +131,9 @@ where } }; - let keep_alive_timeout = config.backend_timeout().mul_f64(2.0); - let keep_alive_interval = keep_alive_timeout.div_f64(f64::from(*TCP_KEEPALIVE_ATTEMPTS)); let workers_count = PARALLELISM.to_static(); - let shared = ProxyWsSharedState::::new(config, &metrics); + let shared = ProxyWsSharedState::::new(config.clone(), &metrics); let client_connections_count = shared.client_connections_count.clone(); let worker_canceller = canceller.clone(); let worker_resetter = resetter.clone(); @@ -159,12 +157,12 @@ where .wrap(NormalizePath::new(TrailingSlash::Trim)) .default_service(web::route().to(Self::receive)) }) - .keep_alive(keep_alive_interval) + .keep_alive(config.keep_alive_interval()) .on_connect(ConnectionGuard::on_connect( P::name(), metrics, client_connections_count, - keep_alive_timeout, + config.keep_alive_interval(), )) .shutdown_signal(canceller.cancelled_owned()) .workers(workers_count); @@ -332,6 +330,37 @@ where } }; + match bknd_stream.get_ref() { + tokio_tungstenite::MaybeTlsStream::Plain(tcp_stream) => { + if let Err(err) = setup_keepalive(tcp_stream, this.config().keep_alive_interval()) 
{ + warn!( + proxy = P::name(), + request_id = %info.req_id(), + connection_id = %info.conn_id(), + worker_id = %this.id, + error = ?err, + "Failed to set keepalive interval" + ); + } + } + + tokio_tungstenite::MaybeTlsStream::Rustls(tls_stream) => { + let (tcp_stream, _) = tls_stream.get_ref(); + if let Err(err) = setup_keepalive(tcp_stream, this.config().keep_alive_interval()) { + warn!( + proxy = P::name(), + request_id = %info.req_id(), + connection_id = %info.conn_id(), + worker_id = %this.id, + error = ?err, + "Failed to set keepalive interval" + ); + } + } + + &_ => {} + } + let (bknd_tx, bknd_rx) = bknd_stream.split(); let mut pump = ProxyWsPump { diff --git a/crates/rproxy/src/utils/utils_net.rs b/crates/rproxy/src/utils/utils_net.rs index 948bd55..773866b 100644 --- a/crates/rproxy/src/utils/utils_net.rs +++ b/crates/rproxy/src/utils/utils_net.rs @@ -1,7 +1,9 @@ -use std::net::IpAddr; +use std::{net::IpAddr, os::fd::AsFd, time::Duration}; use pnet::datalink::{self}; +// get_all_local_ip_addresses ------------------------------------------ + pub(crate) fn get_all_local_ip_addresses() -> Vec { let mut ips = Vec::new(); @@ -13,3 +15,57 @@ pub(crate) fn get_all_local_ip_addresses() -> Vec { ips } + +pub(crate) fn setup_keepalive( + stream: &tokio::net::TcpStream, + interval: Duration, +) -> std::io::Result<()> { + let interval_sec = interval.as_secs_f64().ceil() as i32; + + #[cfg(target_os = "linux")] + unsafe { + use std::os::fd::AsRawFd; + + if libc::setsockopt( + stream.as_fd().as_raw_fd(), + libc::IPPROTO_TCP, + libc::TCP_KEEPIDLE, + &interval_sec as *const _ as *const libc::c_void, + size_of_val(&interval_sec) as libc::socklen_t, + ) != 0 + { + return std::io::Result::Err(std::io::Error::last_os_error()); + } + + if libc::setsockopt( + stream.as_fd().as_raw_fd(), + libc::IPPROTO_TCP, + libc::TCP_KEEPINTVL, + &interval_sec as *const _ as *const libc::c_void, + size_of_val(&interval_sec) as libc::socklen_t, + ) != 0 + { + return std::io::Result::Err(std::io::Error::last_os_error()); + } + + Ok(()) + } + + #[cfg(target_os = "macos")] + unsafe { + use std::os::fd::AsRawFd; + + if libc::setsockopt( + stream.as_fd().as_raw_fd(), + libc::IPPROTO_TCP, + libc::TCP_KEEPALIVE, + &interval_sec as *const _ as *const _, + std::mem::size_of_val(&interval_sec) as libc::socklen_t, + ) != 0 + { + return std::io::Result::Err(std::io::Error::last_os_error()); + } + + Ok(()) + } +} From 40ddb8e04d0c6076c02d3eb7e69c95301c22e14c Mon Sep 17 00:00:00 2001 From: Anton Bronnikov Date: Wed, 3 Dec 2025 15:12:36 +0100 Subject: [PATCH 06/13] fix: close backend conn when chaos-blocking client one --- crates/rproxy/src/server/proxy/ws/proxy.rs | 64 ++++++++++++++++------ 1 file changed, 48 insertions(+), 16 deletions(-) diff --git a/crates/rproxy/src/server/proxy/ws/proxy.rs b/crates/rproxy/src/server/proxy/ws/proxy.rs index d4630f5..dc3713e 100644 --- a/crates/rproxy/src/server/proxy/ws/proxy.rs +++ b/crates/rproxy/src/server/proxy/ws/proxy.rs @@ -30,7 +30,7 @@ use scc::HashMap; use time::{UtcDateTime, format_description::well_known::Iso8601}; use tokio::{net::TcpStream, sync::broadcast}; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream}; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, info, warn}; use uuid::Uuid; use x509_parser::asn1_rs::ToStatic; @@ -276,7 +276,7 @@ where info: ProxyHttpRequestInfo, ) { let bknd_uri = this.backend.new_backend_uri(&info); - trace!( + debug!( proxy = P::name(), request_id = %info.req_id(), connection_id = %info.conn_id(), @@ -358,7 +358,7 
@@ where } } - &_ => {} + &_ => {} // there's nothing else } let (bknd_tx, bknd_rx) = bknd_stream.split(); @@ -693,6 +693,26 @@ where let mut resetter = self.resetter.subscribe(); while pumping.is_ok() && !self.canceller.is_cancelled() && !resetter.is_closed() { + #[cfg(feature = "chaos")] + if self.chaos.stream_is_blocked.load(Ordering::Relaxed) { + tokio::select! { + _ = self.canceller.cancelled() => { + break; + } + + _ = resetter.recv() => { + break; + } + + // client => backend + clnt_msg = self.clnt_rx.next() => { + pumping = self.pump_clnt_to_bknd(UtcDateTime::now(), clnt_msg).await; + } + } + + continue; + } + tokio::select! { _ = self.canceller.cancelled() => { break; @@ -834,13 +854,19 @@ where if !self.chaos.stream_is_blocked.load(Ordering::Relaxed) && rand::random::() < self.shared.config().chaos_probability_stream_blocked() { - debug!( + warn!( proxy = P::name(), connection_id = %self.info.conn_id(), worker_id = %self.worker_id, "Blocking the stream (chaos)" ); self.chaos.stream_is_blocked.store(true, Ordering::Relaxed); + _ = self + .close_bknd_session(tungstenite::protocol::CloseFrame { + code: tungstenite::protocol::frame::coding::CloseCode::Normal, + reason: WS_CLOSE_REASON_NORMAL.into(), + }) + .await; } match clnt_msg { @@ -927,6 +953,11 @@ where // ping msg from client actix_ws::Message::Ping(bytes) => { + #[cfg(feature = "chaos")] + if self.chaos.stream_is_blocked.load(Ordering::Relaxed) { + return Ok(()); + } + #[cfg(debug_assertions)] debug!( proxy = P::name(), @@ -935,11 +966,6 @@ where "Handling client's ping..." ); - #[cfg(feature = "chaos")] - if self.chaos.stream_is_blocked.load(Ordering::Relaxed) { - return Ok(()); - } - #[cfg(feature = "chaos")] if rand::random::() < self.shared.config().chaos_probability_client_ping_ignored() @@ -1061,13 +1087,19 @@ where if !self.chaos.stream_is_blocked.load(Ordering::Relaxed) && rand::random::() < self.shared.config().chaos_probability_stream_blocked() { - debug!( + warn!( proxy = P::name(), connection_id = %self.info.conn_id(), worker_id = %self.worker_id, "Blocking the stream (chaos)" ); self.chaos.stream_is_blocked.store(true, Ordering::Relaxed); + _ = self + .close_bknd_session(tungstenite::protocol::CloseFrame { + code: tungstenite::protocol::frame::coding::CloseCode::Normal, + reason: WS_CLOSE_REASON_NORMAL.into(), + }) + .await; } match bknd_msg { @@ -1143,6 +1175,11 @@ where // ping tungstenite::Message::Ping(bytes) => { + #[cfg(feature = "chaos")] + if self.chaos.stream_is_blocked.load(Ordering::Relaxed) { + return Ok(()); + } + #[cfg(debug_assertions)] debug!( proxy = P::name(), @@ -1151,11 +1188,6 @@ where "Handling backend's ping..." 
); - #[cfg(feature = "chaos")] - if self.chaos.stream_is_blocked.load(Ordering::Relaxed) { - return Ok(()); - } - #[cfg(feature = "chaos")] if rand::random::() < self.shared.config().chaos_probability_backend_ping_ignored() @@ -1472,7 +1504,7 @@ impl ProxyWsPing { let id = Uuid::from_u128(bytes.get_u128()); let conn_id = Uuid::from_u128(bytes.get_u128()); let Ok(timestamp) = UtcDateTime::from_unix_timestamp_nanos(bytes.get_i128()) else { - return None + return None; }; Some(Self { id, conn_id, timestamp }) From f9a8fd9ac7185c2c4758bb07f350e77d9b907aad Mon Sep 17 00:00:00 2001 From: Anton Bronnikov Date: Wed, 3 Dec 2025 15:22:59 +0100 Subject: [PATCH 07/13] fix: don't set keepalive if it's zero --- crates/rproxy/src/utils/utils_net.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/rproxy/src/utils/utils_net.rs b/crates/rproxy/src/utils/utils_net.rs index 773866b..2a348b8 100644 --- a/crates/rproxy/src/utils/utils_net.rs +++ b/crates/rproxy/src/utils/utils_net.rs @@ -22,6 +22,10 @@ pub(crate) fn setup_keepalive( ) -> std::io::Result<()> { let interval_sec = interval.as_secs_f64().ceil() as i32; + if interval_sec == 0 { + return Ok(()); + } + #[cfg(target_os = "linux")] unsafe { use std::os::fd::AsRawFd; From 2b6552a8c98e13d388c7c9e22694f34ba5df676f Mon Sep 17 00:00:00 2001 From: Anton Bronnikov Date: Thu, 4 Dec 2025 12:24:00 +0100 Subject: [PATCH 08/13] chore: update docs --- readme.md | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/readme.md b/readme.md index 7b3d229..4a04e10 100644 --- a/readme.md +++ b/readme.md @@ -84,6 +84,12 @@ authrpc: [env: RPROXY_AUTHRPC_IDLE_CONNECTION_TIMEOUT=] [default: 30s] + --authrpc-keepalive-interval + interval between tcp keepalive packets on authrpc connections + + [env: RPROXY_AUTHRPC_KEEPALIVE_INTERVAL=] + [default: 5s] + --authrpc-listen-address host:port for authrpc proxy @@ -191,6 +197,12 @@ circuit-breaker: [default: ] flashblocks: + --flashblocks-backend-timeout + timeout to establish backend connections of to receive pong websocket response + + [env: RPROXY_FLASHBLOCKS_BACKEND_TIMEOUT=] + [default: 30s] + --flashblocks-backend url of flashblocks backend @@ -202,11 +214,11 @@ flashblocks: [env: RPROXY_FLASHBLOCKS_ENABLED=] - --flashblocks-backend-timeout - timeout to establish backend connections of to receive pong websocket response + --flashblocks-keepalive-interval + interval between tcp keepalive packets on flashblocks connections - [env: RPROXY_FLASHBLOCKS_BACKEND_TIMEOUT=] - [default: 30s] + [env: RPROXY_FLASHBLOCKS_KEEPALIVE_INTERVAL=] + [default: 5s] --flashblocks-listen-address host:port for flashblocks proxy @@ -280,6 +292,12 @@ rpc: [env: RPROXY_RPC_IDLE_CONNECTION_TIMEOUT=] [default: 30s] + --rpc-keepalive-interval + interval between tcp keepalive packets on rpc connections + + [env: RPROXY_RPC_KEEPALIVE_INTERVAL=] + [default: 5s] + --rpc-listen-address host:port for rpc proxy From e3fca10d1e46a8da92bb3c4aa72aa65b9bd32966 Mon Sep 17 00:00:00 2001 From: Anton Bronnikov Date: Thu, 4 Dec 2025 12:27:46 +0100 Subject: [PATCH 09/13] fix: bump default max open files limit --- Cargo.lock | 10 ++++++++++ crates/rproxy/Cargo.toml | 1 + crates/rproxy/src/server.rs | 29 +++++++++++++++++++++++++++-- 3 files changed, 38 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cbea518..22e882a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3340,6 +3340,15 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rlimit" +version = "0.10.2" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "7043b63bd0cd1aaa628e476b80e6d4023a3b50eb32789f2728908107bd0c793a" +dependencies = [ + "libc", +] + [[package]] name = "rlp" version = "0.5.2" @@ -3385,6 +3394,7 @@ dependencies = [ "pnet", "prometheus-client", "rand 0.9.2", + "rlimit", "rustc-hash", "rustls", "rustls-pemfile", diff --git a/crates/rproxy/Cargo.toml b/crates/rproxy/Cargo.toml index 263879b..fe5fa91 100644 --- a/crates/rproxy/Cargo.toml +++ b/crates/rproxy/Cargo.toml @@ -48,6 +48,7 @@ pin-project = "1.1.10" pnet = "0.35.0" prometheus-client = { git = "https://github.com/0x416e746f6e/client_rust.git", branch = "nested-labels"} rand = { version = "0.9.2", optional = true } +rlimit = "0.10.2" rustc-hash = "2.1.1" rustls = "0.23.32" rustls-pemfile = "2.2.0" diff --git a/crates/rproxy/src/server.rs b/crates/rproxy/src/server.rs index 5485b0b..054e166 100644 --- a/crates/rproxy/src/server.rs +++ b/crates/rproxy/src/server.rs @@ -1,5 +1,4 @@ pub mod config; - pub(crate) mod metrics; pub(crate) mod proxy; @@ -13,7 +12,7 @@ use tokio::{ task::JoinHandle, }; use tokio_util::sync::CancellationToken; -use tracing::{error, info}; +use tracing::{error, info, warn}; use crate::{ config::Config, @@ -29,6 +28,8 @@ use crate::{ utils::tls_certificate_validity_timestamps, }; +const MAX_OPEN_FILES: u64 = 10240; + // Proxy --------------------------------------------------------------- pub struct Server {} @@ -38,6 +39,30 @@ impl Server { let canceller = Server::wait_for_shutdown_signal(); let resetter = Server::wait_for_reset_signal(canceller.clone()); + // try to set system limits + match rlimit::getrlimit(rlimit::Resource::NOFILE) { + Ok((mut soft, hard)) => { + soft = std::cmp::max(soft, MAX_OPEN_FILES); + soft = std::cmp::min(soft, hard); + + if let Err(err) = rlimit::setrlimit(rlimit::Resource::NOFILE, soft, hard) { + warn!( + error = ?err, + hard = hard, + soft = soft, + "Failed to set max open file limits", + ); + } + } + + Err(err) => { + warn!( + error = ?err, + "Failed to read max open file limits", + ); + } + } + // spawn metrics service let metrics = Arc::new(Metrics::new(config.metrics.clone())); { From 1dfa088a49dd642318acac18681a1ce00b74c9f9 Mon Sep 17 00:00:00 2001 From: Anton Bronnikov Date: Thu, 4 Dec 2025 13:45:02 +0100 Subject: [PATCH 10/13] fix: limit linger to 5s --- crates/rproxy/src/server/metrics.rs | 6 +++--- crates/rproxy/src/server/proxy/circuit_breaker.rs | 2 +- crates/rproxy/src/server/proxy/http/authrpc.rs | 2 +- crates/rproxy/src/server/proxy/http/proxy.rs | 2 +- crates/rproxy/src/server/proxy/ws/proxy.rs | 5 +++-- 5 files changed, 9 insertions(+), 8 deletions(-) diff --git a/crates/rproxy/src/server/metrics.rs b/crates/rproxy/src/server/metrics.rs index d1cac6b..638846d 100644 --- a/crates/rproxy/src/server/metrics.rs +++ b/crates/rproxy/src/server/metrics.rs @@ -341,15 +341,15 @@ impl Metrics { socket.set_nonblocking(true)?; // allow time to flush buffers on close - socket.set_linger(Some(Duration::from_secs(1)))?; + socket.set_linger(Some(Duration::from_millis(5000)))?; // allow binding to the socket while there are still TIME_WAIT connections socket.set_reuse_address(true)?; socket.set_tcp_keepalive( &TcpKeepalive::new() - .with_time(Duration::from_secs(15)) - .with_interval(Duration::from_secs(15)) + .with_time(Duration::from_millis(15000)) + .with_interval(Duration::from_millis(15000)) .with_retries(4), )?; diff --git a/crates/rproxy/src/server/proxy/circuit_breaker.rs b/crates/rproxy/src/server/proxy/circuit_breaker.rs index 
f8cb786..d0959f7 100644 --- a/crates/rproxy/src/server/proxy/circuit_breaker.rs +++ b/crates/rproxy/src/server/proxy/circuit_breaker.rs @@ -51,7 +51,7 @@ impl CircuitBreaker { #[inline] fn timeout(config: &ConfigCircuitBreaker) -> Duration { - std::cmp::min(Duration::from_secs(5), config.poll_interval * 3 / 4) + std::cmp::min(Duration::from_millis(5000), config.poll_interval * 3 / 4) } #[inline] diff --git a/crates/rproxy/src/server/proxy/http/authrpc.rs b/crates/rproxy/src/server/proxy/http/authrpc.rs index 80edb63..68ad350 100644 --- a/crates/rproxy/src/server/proxy/http/authrpc.rs +++ b/crates/rproxy/src/server/proxy/http/authrpc.rs @@ -36,7 +36,7 @@ impl ProxyHttpInner for ProxyHttpInnerAuthrpc { Self { config, fcu_cache: Cache::builder() - .time_to_live(Duration::from_secs(60)) + .time_to_live(Duration::from_millis(60000)) .max_capacity(4096) .build_with_hasher(FxBuildHasher), } diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs index 05db855..8dbd7f5 100644 --- a/crates/rproxy/src/server/proxy/http/proxy.rs +++ b/crates/rproxy/src/server/proxy/http/proxy.rs @@ -311,7 +311,7 @@ where socket.set_nonblocking(true)?; // allow time to flush buffers on close - socket.set_linger(Some(config.backend_timeout()))?; + socket.set_linger(Some(Duration::from_millis(5000)))?; // allow binding while there are still residual connections in TIME_WAIT socket.set_reuse_address(true)?; diff --git a/crates/rproxy/src/server/proxy/ws/proxy.rs b/crates/rproxy/src/server/proxy/ws/proxy.rs index dc3713e..780f29b 100644 --- a/crates/rproxy/src/server/proxy/ws/proxy.rs +++ b/crates/rproxy/src/server/proxy/ws/proxy.rs @@ -229,7 +229,7 @@ where socket.set_nonblocking(true)?; // allow time to flush buffers on close - socket.set_linger(Some(config.backend_timeout()))?; + socket.set_linger(Some(Duration::from_millis(5000)))?; // allow binding to the socket while there are still TIME_WAIT connections socket.set_reuse_address(true)?; @@ -686,7 +686,8 @@ where "Starting websocket pump..." 
); - let mut heartbeat = tokio::time::interval(Duration::from_secs(WS_PING_INTERVAL_SECONDS)); + let mut heartbeat = + tokio::time::interval(Duration::from_millis(WS_PING_INTERVAL_SECONDS * 1000)); let mut pumping: Result<(), &str> = Ok(()); From b9e5385138cfb27128e9559dd2a1fde90532f471 Mon Sep 17 00:00:00 2001 From: Anton Bronnikov Date: Thu, 4 Dec 2025 18:36:26 +0100 Subject: [PATCH 11/13] fix: drop backend connections on close --- crates/rproxy/src/server/proxy/ws/proxy.rs | 128 +++++++++++---------- 1 file changed, 70 insertions(+), 58 deletions(-) diff --git a/crates/rproxy/src/server/proxy/ws/proxy.rs b/crates/rproxy/src/server/proxy/ws/proxy.rs index 780f29b..a1a5993 100644 --- a/crates/rproxy/src/server/proxy/ws/proxy.rs +++ b/crates/rproxy/src/server/proxy/ws/proxy.rs @@ -1,6 +1,7 @@ use std::{ io::Write, marker::PhantomData, + mem, str::FromStr, sync::{ Arc, @@ -372,8 +373,8 @@ where resetter: this.resetter.clone(), clnt_tx, clnt_rx, - bknd_tx, - bknd_rx, + bknd_tx: Some(bknd_tx), + bknd_rx: Some(bknd_rx), pings: HashMap::new(), ping_balance_bknd: AtomicI64::new(0), ping_balance_clnt: AtomicI64::new(0), @@ -662,8 +663,8 @@ where clnt_tx: Session, clnt_rx: MessageStream, - bknd_tx: SplitSink>, tungstenite::Message>, - bknd_rx: SplitStream>>, + bknd_tx: Option>, tungstenite::Message>>, + bknd_rx: Option>>>, pings: HashMap, ping_balance_clnt: AtomicI64, @@ -734,7 +735,12 @@ where } // backend => client - bknd_msg = self.bknd_rx.next() => { + bknd_msg = async { + match &mut self.bknd_rx { + Some(bknd_rx) => bknd_rx.next().await, + None => None, + } + } => { pumping = self.pump_bknd_to_cli(UtcDateTime::now(), bknd_msg).await; } } @@ -828,8 +834,9 @@ where } let bknd_ping = ProxyWsPing::new(self.info.conn_id()); - if let Err(err) = - self.bknd_tx.send(tungstenite::Message::Ping(bknd_ping.to_bytes())).await + if let Some(bknd_tx) = &mut self.bknd_tx && + let Err(err) = + bknd_tx.send(tungstenite::Message::Ping(bknd_ping.to_bytes())).await { error!( proxy = P::name(), @@ -880,8 +887,9 @@ where return Ok(()); } - if let Err(err) = - self.bknd_tx.send(tungstenite::Message::Binary(bytes.clone())).await + if let Some(bknd_tx) = &mut self.bknd_tx && + let Err(err) = + bknd_tx.send(tungstenite::Message::Binary(bytes.clone())).await { error!( proxy = P::name(), @@ -916,15 +924,15 @@ where return Ok(()); } - if let Err(err) = self - .bknd_tx - .send(tungstenite::Message::Text(unsafe { - // safety: it's from client's ws message => must be valid utf-8 - tungstenite::protocol::frame::Utf8Bytes::from_bytes_unchecked( - text.clone().into_bytes(), - ) - })) - .await + if let Some(bknd_tx) = &mut self.bknd_tx && + let Err(err) = bknd_tx + .send(tungstenite::Message::Text(unsafe { + // safety: it's from client's ws message => must be valid utf-8 + tungstenite::protocol::frame::Utf8Bytes::from_bytes_unchecked( + text.clone().into_bytes(), + ) + })) + .await { error!( proxy = P::name(), @@ -1202,7 +1210,8 @@ where return Ok(()); } - if let Err(err) = self.bknd_tx.send(tungstenite::Message::Pong(bytes)).await + if let Some(bknd_tx) = &mut self.bknd_tx && + let Err(err) = bknd_tx.send(tungstenite::Message::Pong(bytes)).await { error!( proxy = P::name(), @@ -1320,37 +1329,50 @@ where &mut self, frame: tungstenite::protocol::CloseFrame, ) -> Result<(), &'static str> { - debug!( - proxy = P::name(), - connection_id = %self.info.conn_id(), - worker_id = %self.worker_id, - msg = %frame.reason, - "Closing backend websocket session..." 
- ); + if let Some(mut bknd_tx) = mem::take(&mut self.bknd_tx) { + debug!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + msg = %frame.reason, + "Closing backend websocket session..." + ); - if let Err(err) = self - .bknd_tx - .send(tungstenite::Message::Close(Some( - frame.clone(), // it's cheap to clone - ))) - .await - { - if let tungstenite::error::Error::AlreadyClosed = err { - return Ok(()); - } - if let tungstenite::error::Error::Protocol(protocol_err) = err { - if protocol_err == tungstenite::error::ProtocolError::SendAfterClosing { + if let Err(err) = bknd_tx + .send(tungstenite::Message::Close(Some( + frame.clone(), // it's cheap to clone + ))) + .await + { + if let tungstenite::error::Error::AlreadyClosed = err { return Ok(()); } - error!( - proxy = P::name(), - connection_id = %self.info.conn_id(), - worker_id = %self.worker_id, - msg = %frame.reason, - error = ?protocol_err, - "Failed to close backend websocket session" - ); - } else { + if let tungstenite::error::Error::Protocol(protocol_err) = err { + if protocol_err == tungstenite::error::ProtocolError::SendAfterClosing { + return Ok(()); + } + error!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + msg = %frame.reason, + error = ?protocol_err, + "Failed to close backend websocket session" + ); + } else { + error!( + proxy = P::name(), + connection_id = %self.info.conn_id(), + worker_id = %self.worker_id, + msg = %frame.reason, + error = ?err, + "Failed to close backend websocket session" + ); + } + return Err(WS_BKND_ERROR); + } + + if let Err(err) = bknd_tx.close().await { error!( proxy = P::name(), connection_id = %self.info.conn_id(), @@ -1360,19 +1382,9 @@ where "Failed to close backend websocket session" ); } - return Err(WS_BKND_ERROR); } - if let Err(err) = self.bknd_tx.close().await { - error!( - proxy = P::name(), - connection_id = %self.info.conn_id(), - worker_id = %self.worker_id, - msg = %frame.reason, - error = ?err, - "Failed to close backend websocket session" - ); - } + drop(mem::take(&mut self.bknd_rx)); Ok(()) } From a35ec2904a870273e4eeb63be0789a1a50016ad3 Mon Sep 17 00:00:00 2001 From: Anton Bronnikov Date: Fri, 5 Dec 2025 11:12:18 +0100 Subject: [PATCH 12/13] chore: use os defaults for keepalive defaults --- crates/rproxy/src/config.rs | 68 +++++++++++++++++++ .../rproxy/src/server/proxy/config/authrpc.rs | 21 +++++- .../src/server/proxy/config/flashblocks.rs | 26 ++++++- crates/rproxy/src/server/proxy/config/rpc.rs | 26 ++++++- .../src/server/proxy/connection_guard.rs | 32 --------- crates/rproxy/src/server/proxy/http/config.rs | 1 + crates/rproxy/src/server/proxy/http/proxy.rs | 5 +- crates/rproxy/src/server/proxy/mod.rs | 2 +- crates/rproxy/src/server/proxy/ws/config.rs | 3 +- crates/rproxy/src/server/proxy/ws/proxy.rs | 13 ++-- 10 files changed, 146 insertions(+), 51 deletions(-) diff --git a/crates/rproxy/src/config.rs b/crates/rproxy/src/config.rs index 7933bcd..cf910d9 100644 --- a/crates/rproxy/src/config.rs +++ b/crates/rproxy/src/config.rs @@ -1,7 +1,9 @@ use std::{process, sync::LazyLock}; use clap::Parser; +use sysctl::Sysctl; use thiserror::Error; +use x509_parser::asn1_rs::ToStatic; use crate::server::{ config::{ConfigLogging, ConfigLoggingError, ConfigMetrics, ConfigMetricsError}, @@ -26,6 +28,72 @@ pub(crate) static PARALLELISM: LazyLock = pub(crate) static PARALLELISM_STRING: LazyLock = LazyLock::new(|| PARALLELISM.to_string()); +pub(crate) static TCP_KEEPALIVE_INTERVAL: LazyLock = 
LazyLock::new(|| { + #[cfg(target_os = "linux")] + { + let mut res: libc::c_int = 75; + if let Ok(ctl) = sysctl::Ctl::new("net.ipv4.tcp_keepalive_intvl") && + let Ok(value) = ctl.value() && + let Ok(value) = value.into_int() + { + res = value + } + return res; + } + + #[cfg(target_os = "macos")] + { + let mut res: libc::c_int = 75000; + if let Ok(ctl) = sysctl::Ctl::new("net.inet.tcp.keepintvl") && + let Ok(value) = ctl.value() && + let Ok(value) = value.into_int() + { + res = value / 1000 // millis on macos + } + + return res; + } + + #[allow(unreachable_code)] + 75 +}); + +pub(crate) static TCP_KEEPALIVE_INTERVAL_STRING: LazyLock = + LazyLock::new(|| format!("{}s", TCP_KEEPALIVE_INTERVAL.to_static())); + +pub(crate) static TCP_KEEPALIVE_PROBES: LazyLock = LazyLock::new(|| { + #[cfg(target_os = "linux")] + { + let mut res: libc::c_int = 9; + if let Ok(ctl) = sysctl::Ctl::new("net.ipv4.tcp_keepalive_probes") && + let Ok(value) = ctl.value() && + let Ok(value) = value.into_int() + { + res = value + } + return res; + } + + #[cfg(target_os = "macos")] + { + let mut res: libc::c_int = 8; + if let Ok(ctl) = sysctl::Ctl::new("net.inet.tcp.keepcnt") && + let Ok(value) = ctl.value() && + let Ok(value) = value.into_int() + { + res = value + } + + return res; + } + + #[allow(unreachable_code)] + 8 +}); + +pub(crate) static TCP_KEEPALIVE_PROBES_STRING: LazyLock = + LazyLock::new(|| TCP_KEEPALIVE_PROBES.to_string()); + // Config -------------------------------------------------------------- #[derive(Clone, Parser)] diff --git a/crates/rproxy/src/server/proxy/config/authrpc.rs b/crates/rproxy/src/server/proxy/config/authrpc.rs index 0a10a07..7a172d0 100644 --- a/crates/rproxy/src/server/proxy/config/authrpc.rs +++ b/crates/rproxy/src/server/proxy/config/authrpc.rs @@ -9,7 +9,7 @@ use tracing::warn; use url::Url; use crate::{ - config::ALREADY_VALIDATED, + config::{ALREADY_VALIDATED, TCP_KEEPALIVE_INTERVAL_STRING, TCP_KEEPALIVE_PROBES_STRING}, server::proxy::http::config::{ConfigProxyHttp, ConfigProxyHttpMirroringStrategy}, utils::get_all_local_ip_addresses, }; @@ -86,7 +86,7 @@ pub(crate) struct ConfigAuthrpc { /// interval between tcp keepalive packets on authrpc connections #[arg( - default_value = "5s", + default_value = TCP_KEEPALIVE_INTERVAL_STRING.as_str(), env = "RPROXY_AUTHRPC_KEEPALIVE_INTERVAL", help_heading = "authrpc", long("authrpc-keepalive-interval"), @@ -96,6 +96,18 @@ pub(crate) struct ConfigAuthrpc { )] pub(crate) keepalive_interval: Duration, + /// maximum number of keepalive probes to send before dropping authrpc + /// connection + #[arg( + default_value = TCP_KEEPALIVE_PROBES_STRING.as_str(), + env = "RPROXY_AUTHRPC_KEEPALIVE_RETRIES", + help_heading = "authrpc", + long("authrpc-keepalive-retries"), + name("authrpc_keepalive_retries"), + value_name = "count" + )] + pub(crate) keepalive_retries: u32, + /// host:port for authrpc proxy #[arg( default_value = "0.0.0.0:8651", @@ -370,6 +382,11 @@ impl ConfigProxyHttp for ConfigAuthrpc { self.keepalive_interval } + #[inline] + fn keepalive_retries(&self) -> u32 { + self.keepalive_retries + } + #[inline] fn listen_address(&self) -> SocketAddr { self.listen_address.parse::().expect(ALREADY_VALIDATED) diff --git a/crates/rproxy/src/server/proxy/config/flashblocks.rs b/crates/rproxy/src/server/proxy/config/flashblocks.rs index 093a807..ad12ae2 100644 --- a/crates/rproxy/src/server/proxy/config/flashblocks.rs +++ b/crates/rproxy/src/server/proxy/config/flashblocks.rs @@ -4,7 +4,10 @@ use awc::http::Uri; use clap::Args; use thiserror::Error; 
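For context on the fall-back values read above (a 75-second interval and 9 probes on Linux): with TCP keepalive, the stack waits out the idle time, then sends up to `retries` probes spaced `interval` apart, and resets the connection after the last unanswered probe. A rough worked example of the resulting detection latency — illustrative only, the function name and numbers are not part of this patch:

    // Sketch only: approximate worst-case time to notice a dead peer, given the
    // TCP_KEEPIDLE / TCP_KEEPINTVL / TCP_KEEPCNT style settings used in this series.
    use std::time::Duration;

    fn worst_case_detection(idle: Duration, interval: Duration, retries: u32) -> Duration {
        // first probe after `idle`, then `retries` probes spaced `interval` apart
        idle + interval * retries
    }

    fn main() {
        // e.g. idle time set equal to the interval (as the listener sockets in this
        // series do), a 75 s interval and 9 probes:
        let d = worst_case_detection(Duration::from_secs(75), Duration::from_secs(75), 9);
        assert_eq!(d, Duration::from_secs(750)); // ~12.5 minutes
    }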
-use crate::{config::ALREADY_VALIDATED, server::proxy::ws::config::ConfigProxyWs};
+use crate::{
+    config::{ALREADY_VALIDATED, TCP_KEEPALIVE_INTERVAL_STRING, TCP_KEEPALIVE_PROBES_STRING},
+    server::proxy::ws::config::ConfigProxyWs,
+};
 
 // ConfigFlashblocks ---------------------------------------------------
 
@@ -44,7 +47,7 @@ pub(crate) struct ConfigFlashblocks {
     pub(crate) enabled: bool,
     /// interval between tcp keepalive packets on flashblocks connections
     #[arg(
-        default_value = "5s",
+        default_value = TCP_KEEPALIVE_INTERVAL_STRING.as_str(),
         env = "RPROXY_FLASHBLOCKS_KEEPALIVE_INTERVAL",
         help_heading = "flashblocks",
         long("flashblocks-keepalive-interval"),
@@ -54,6 +57,18 @@ pub(crate) struct ConfigFlashblocks {
     )]
     pub(crate) keepalive_interval: Duration,
 
+    /// maximum number of keepalive probes to send before dropping
+    /// flashblocks connection
+    #[arg(
+        default_value = TCP_KEEPALIVE_PROBES_STRING.as_str(),
+        env = "RPROXY_FLASHBLOCKS_KEEPALIVE_RETRIES",
+        help_heading = "flashblocks",
+        long("flashblocks-keepalive-retries"),
+        name("flashblocks_keepalive_retries"),
+        value_name = "count"
+    )]
+    pub(crate) keepalive_retries: u32,
+
     /// host:port for flashblocks proxy
     #[arg(
         default_value = "0.0.0.0:1111",
@@ -224,10 +239,15 @@ impl ConfigProxyWs for ConfigFlashblocks {
     }
 
     #[inline]
-    fn keep_alive_interval(&self) -> Duration {
+    fn keepalive_interval(&self) -> Duration {
         self.keepalive_interval
     }
 
+    #[inline]
+    fn keepalive_retries(&self) -> u32 {
+        self.keepalive_retries
+    }
+
     #[inline]
     fn listen_address(&self) -> SocketAddr {
         self.listen_address.parse::<SocketAddr>().expect(ALREADY_VALIDATED)
diff --git a/crates/rproxy/src/server/proxy/config/rpc.rs b/crates/rproxy/src/server/proxy/config/rpc.rs
index e6cd20b..2b17549 100644
--- a/crates/rproxy/src/server/proxy/config/rpc.rs
+++ b/crates/rproxy/src/server/proxy/config/rpc.rs
@@ -9,7 +9,12 @@ use tracing::warn;
 use url::Url;
 
 use crate::{
-    config::{ALREADY_VALIDATED, PARALLELISM_STRING},
+    config::{
+        ALREADY_VALIDATED,
+        PARALLELISM_STRING,
+        TCP_KEEPALIVE_INTERVAL_STRING,
+        TCP_KEEPALIVE_PROBES_STRING,
+    },
     server::proxy::http::config::{ConfigProxyHttp, ConfigProxyHttpMirroringStrategy},
     utils::get_all_local_ip_addresses,
 };
@@ -75,7 +80,7 @@ pub(crate) struct ConfigRpc {
 
     /// interval between tcp keepalive packets on rpc connections
     #[arg(
-        default_value = "5s",
+        default_value = TCP_KEEPALIVE_INTERVAL_STRING.as_str(),
         env = "RPROXY_RPC_KEEPALIVE_INTERVAL",
         help_heading = "rpc",
         long("rpc-keepalive-interval"),
@@ -85,6 +90,18 @@ pub(crate) struct ConfigRpc {
     )]
     pub(crate) keepalive_interval: Duration,
 
+    /// maximum number of keepalive probes to send before dropping rpc
+    /// connection
+    #[arg(
+        default_value = TCP_KEEPALIVE_PROBES_STRING.as_str(),
+        env = "RPROXY_RPC_KEEPALIVE_RETRIES",
+        help_heading = "rpc",
+        long("rpc-keepalive-retries"),
+        name("rpc_keepalive_retries"),
+        value_name = "count"
+    )]
+    pub(crate) keepalive_retries: u32,
+
     /// host:port for rpc proxy
     #[arg(
         default_value = "0.0.0.0:8645",
@@ -366,6 +383,11 @@ impl ConfigProxyHttp for ConfigRpc {
         self.keepalive_interval
     }
 
+    #[inline]
+    fn keepalive_retries(&self) -> u32 {
+        self.keepalive_retries
+    }
+
     #[inline]
     fn listen_address(&self) -> SocketAddr {
         self.listen_address.parse::<SocketAddr>().expect(ALREADY_VALIDATED)
diff --git a/crates/rproxy/src/server/proxy/connection_guard.rs b/crates/rproxy/src/server/proxy/connection_guard.rs
index 91eac4b..804db32 100644
--- a/crates/rproxy/src/server/proxy/connection_guard.rs
+++ b/crates/rproxy/src/server/proxy/connection_guard.rs
@@ -2,14 +2,12 @@ use std::{
     any::Any,
     sync::{
         Arc,
-        LazyLock,
         atomic::{AtomicI64, Ordering},
     },
     time::Duration,
 };
 
 use actix_web::dev::Extensions;
-use sysctl::Sysctl;
 use tracing::{debug, warn};
 use uuid::Uuid;
 
@@ -18,36 +16,6 @@ use crate::{
     server::metrics::{LabelsProxy, Metrics},
     utils::setup_keepalive,
 };
 
-pub(crate) static TCP_KEEPALIVE_ATTEMPTS: LazyLock<libc::c_int> = LazyLock::new(|| {
-    #[cfg(target_os = "linux")]
-    {
-        let mut res: libc::c_int = 9;
-        if let Ok(ctl) = sysctl::Ctl::new("net.ipv4.tcp_keepalive_probes") &&
-            let Ok(value) = ctl.value() &&
-            let Ok(value) = value.into_int()
-        {
-            res = value
-        }
-        return res;
-    }
-
-    #[cfg(target_os = "macos")]
-    {
-        let mut res: libc::c_int = 8;
-        if let Ok(ctl) = sysctl::Ctl::new("net.ipv4.tcp_keepalive_probes") &&
-            let Ok(value) = ctl.value() &&
-            let Ok(value) = value.into_int()
-        {
-            res = value
-        }
-
-        return res;
-    }
-
-    #[allow(unreachable_code)]
-    8
-});
-
 // ProxyConnectionGuard ------------------------------------------------
 
 pub(crate) struct ConnectionGuard {
diff --git a/crates/rproxy/src/server/proxy/http/config.rs b/crates/rproxy/src/server/proxy/http/config.rs
index 74c2f75..7985984 100644
--- a/crates/rproxy/src/server/proxy/http/config.rs
+++ b/crates/rproxy/src/server/proxy/http/config.rs
@@ -10,6 +10,7 @@ pub(crate) trait ConfigProxyHttp: Clone + Send + Unpin + 'static {
     fn backend_url(&self) -> Url;
     fn idle_connection_timeout(&self) -> Duration;
     fn keepalive_interval(&self) -> Duration;
+    fn keepalive_retries(&self) -> u32;
     fn listen_address(&self) -> SocketAddr;
     fn log_mirrored_requests(&self) -> bool;
     fn log_mirrored_responses(&self) -> bool;
diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs
index 8dbd7f5..aa66e31 100644
--- a/crates/rproxy/src/server/proxy/http/proxy.rs
+++ b/crates/rproxy/src/server/proxy/http/proxy.rs
@@ -55,7 +55,6 @@ use crate::{
         metrics::{LabelsProxy, LabelsProxyClientInfo, LabelsProxyHttpJrpc, Metrics},
         proxy::{
             ConnectionGuard,
-            TCP_KEEPALIVE_ATTEMPTS,
             config::ConfigTls,
             http::{
                 ProxyHttpInner,
@@ -298,13 +297,13 @@ where
 
         // allow keep-alive packets
         let keep_alive_timeout = config.idle_connection_timeout().add(Duration::from_millis(1000));
-        let keep_alive_interval = keep_alive_timeout.div_f64(f64::from(*TCP_KEEPALIVE_ATTEMPTS));
+        let keep_alive_interval = keep_alive_timeout.div_f64(f64::from(config.keepalive_retries()));
         socket.set_keepalive(true)?;
         socket.set_tcp_keepalive(
             &socket2::TcpKeepalive::new()
                 .with_time(keep_alive_interval)
                 .with_interval(keep_alive_interval)
-                .with_retries(*TCP_KEEPALIVE_ATTEMPTS as u32),
+                .with_retries(config.keepalive_retries()),
         )?;
 
         // must use non-blocking with tokio
diff --git a/crates/rproxy/src/server/proxy/mod.rs b/crates/rproxy/src/server/proxy/mod.rs
index 2a47506..0683f60 100644
--- a/crates/rproxy/src/server/proxy/mod.rs
+++ b/crates/rproxy/src/server/proxy/mod.rs
@@ -4,4 +4,4 @@ pub(crate) mod http;
 pub(crate) mod ws;
 
 pub(crate) mod connection_guard;
-use connection_guard::{ConnectionGuard, TCP_KEEPALIVE_ATTEMPTS};
+use connection_guard::ConnectionGuard;
diff --git a/crates/rproxy/src/server/proxy/ws/config.rs b/crates/rproxy/src/server/proxy/ws/config.rs
index a86bae6..d27914f 100644
--- a/crates/rproxy/src/server/proxy/ws/config.rs
+++ b/crates/rproxy/src/server/proxy/ws/config.rs
@@ -7,7 +7,8 @@ use tungstenite::http::Uri;
 pub(crate) trait ConfigProxyWs: Clone + Send + Unpin + 'static {
     fn backend_timeout(&self) -> Duration;
     fn backend_url(&self) -> Uri;
-    fn keep_alive_interval(&self) -> Duration;
+    fn keepalive_interval(&self) -> Duration;
+    fn keepalive_retries(&self) -> u32;
     fn listen_address(&self) -> SocketAddr;
     fn log_backend_messages(&self) -> bool;
     fn log_client_messages(&self) -> bool;
diff --git a/crates/rproxy/src/server/proxy/ws/proxy.rs b/crates/rproxy/src/server/proxy/ws/proxy.rs
index a1a5993..9aa4688 100644
--- a/crates/rproxy/src/server/proxy/ws/proxy.rs
+++ b/crates/rproxy/src/server/proxy/ws/proxy.rs
@@ -41,7 +41,6 @@ use crate::{
         metrics::{LabelsProxyWs, Metrics},
         proxy::{
             ConnectionGuard,
-            TCP_KEEPALIVE_ATTEMPTS,
             config::ConfigTls,
             http::ProxyHttpRequestInfo,
             ws::{ProxyWsInner, config::ConfigProxyWs},
@@ -158,12 +157,12 @@ where
                 .wrap(NormalizePath::new(TrailingSlash::Trim))
                 .default_service(web::route().to(Self::receive))
         })
-        .keep_alive(config.keep_alive_interval())
+        .keep_alive(config.keepalive_interval())
        .on_connect(ConnectionGuard::on_connect(
             P::name(),
             metrics,
             client_connections_count,
-            config.keep_alive_interval(),
+            config.keepalive_interval(),
         ))
         .shutdown_signal(canceller.cancelled_owned())
         .workers(workers_count);
@@ -217,13 +216,13 @@ where
 
         // allow keep-alive packets
         let keep_alive_timeout = config.backend_timeout().mul_f64(2.0);
-        let keep_alive_interval = keep_alive_timeout.div_f64(f64::from(*TCP_KEEPALIVE_ATTEMPTS));
+        let keep_alive_interval = keep_alive_timeout.div_f64(f64::from(config.keepalive_retries()));
         socket.set_keepalive(true)?;
         socket.set_tcp_keepalive(
             &socket2::TcpKeepalive::new()
                 .with_time(keep_alive_interval)
                 .with_interval(keep_alive_interval)
-                .with_retries(*TCP_KEEPALIVE_ATTEMPTS as u32),
+                .with_retries(config.keepalive_retries()),
         )?;
 
         // must use non-blocking with tokio
@@ -333,7 +332,7 @@ where
 
         match bknd_stream.get_ref() {
             tokio_tungstenite::MaybeTlsStream::Plain(tcp_stream) => {
-                if let Err(err) = setup_keepalive(tcp_stream, this.config().keep_alive_interval()) {
+                if let Err(err) = setup_keepalive(tcp_stream, this.config().keepalive_interval()) {
                     warn!(
                         proxy = P::name(),
                         request_id = %info.req_id(),
@@ -347,7 +346,7 @@ where
             tokio_tungstenite::MaybeTlsStream::Rustls(tls_stream) => {
                 let (tcp_stream, _) = tls_stream.get_ref();
 
-                if let Err(err) = setup_keepalive(tcp_stream, this.config().keep_alive_interval()) {
+                if let Err(err) = setup_keepalive(tcp_stream, this.config().keepalive_interval()) {
                     warn!(
                         proxy = P::name(),
                         request_id = %info.req_id(),

From bbe062bff8242b281e4abd9c75735fedf217e0cd Mon Sep 17 00:00:00 2001
From: Anton Bronnikov
Date: Wed, 10 Dec 2025 12:41:49 -0400
Subject: [PATCH 13/13] fix: use connection id from the guard

---
 crates/rproxy/src/server/proxy/http/proxy.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/rproxy/src/server/proxy/http/proxy.rs b/crates/rproxy/src/server/proxy/http/proxy.rs
index aa66e31..2c186e4 100644
--- a/crates/rproxy/src/server/proxy/http/proxy.rs
+++ b/crates/rproxy/src/server/proxy/http/proxy.rs
@@ -1517,7 +1517,7 @@ impl ProxyHttpRequestInfo {
 
         Self {
             req_id: Uuid::now_v7(),
-            conn_id: Uuid::now_v7(),
+            conn_id: guard.map_or(Uuid::nil(), |guard| guard.id),
             remote_addr,
             method: req.method().clone(),
             path,