diff --git a/bin/stateless-validator/src/main.rs b/bin/stateless-validator/src/main.rs index 3b12254..3385956 100644 --- a/bin/stateless-validator/src/main.rs +++ b/bin/stateless-validator/src/main.rs @@ -118,6 +118,21 @@ struct CommandLineArgs { #[clap(long, env = "STATELESS_VALIDATOR_METRICS_PORT", default_value_t = metrics::DEFAULT_METRICS_PORT)] metrics_port: u16, + /// Timeout in seconds for block and header RPC fetches (eth_getBlockByNumber, etc.). + /// Defaults to no timeout. + #[clap(long, env = "STATELESS_VALIDATOR_BLOCK_TIMEOUT_SECS")] + block_timeout_secs: Option, + + /// Timeout in seconds for witness RPC fetches (mega_getBlockWitness). + /// Defaults to no timeout. + #[clap(long, env = "STATELESS_VALIDATOR_WITNESS_TIMEOUT_SECS")] + witness_timeout_secs: Option, + + /// Timeout in seconds for contract code RPC fetches (eth_getCodeByHash). + /// Defaults to no timeout. + #[clap(long, env = "STATELESS_VALIDATOR_CODE_TIMEOUT_SECS")] + code_timeout_secs: Option, + /// Logging configuration. #[command(flatten)] log: LogArgs, @@ -148,7 +163,17 @@ async fn main() -> Result<()> { let work_dir = PathBuf::from(args.data_dir); - let rpc_config = RpcClientConfig::validator().with_metrics(Arc::new(metrics::ValidatorMetrics)); + let mut rpc_config = + RpcClientConfig::validator().with_metrics(Arc::new(metrics::ValidatorMetrics)); + if let Some(secs) = args.block_timeout_secs { + rpc_config = rpc_config.with_block_timeout(Duration::from_secs(secs)); + } + if let Some(secs) = args.witness_timeout_secs { + rpc_config = rpc_config.with_witness_timeout(Duration::from_secs(secs)); + } + if let Some(secs) = args.code_timeout_secs { + rpc_config = rpc_config.with_code_timeout(Duration::from_secs(secs)); + } let client = Arc::new(RpcClient::new_with_config( &args.rpc_endpoint, &args.witness_endpoint, diff --git a/crates/stateless-core/src/rpc_client.rs b/crates/stateless-core/src/rpc_client.rs index 59db8cd..82b916d 100644 --- a/crates/stateless-core/src/rpc_client.rs +++ b/crates/stateless-core/src/rpc_client.rs @@ -2,7 +2,11 @@ //! //! Provides methods to fetch blocks, witnesses, and contract bytecode from MegaETH nodes. -use std::{collections::HashMap, sync::Arc, time::Instant}; +use std::{ + collections::HashMap, + sync::Arc, + time::{Duration, Instant}, +}; use alloy_primitives::{B256, Bytes, U64}; use alloy_provider::{Provider, ProviderBuilder, RootProvider}; @@ -101,6 +105,15 @@ pub struct RpcClientConfig { pub skip_block_verification: bool, /// Optional metrics callbacks for tracking RPC performance. pub metrics: Option>, + /// Timeout for block and header fetches (eth_getBlockByNumber, eth_getHeaderByNumber, etc.). + /// `None` means no timeout. + pub block_timeout: Option, + /// Timeout for witness fetches (mega_getBlockWitness). + /// `None` means no timeout. + pub witness_timeout: Option, + /// Timeout for contract code fetches (eth_getCodeByHash). + /// `None` means no timeout. + pub code_timeout: Option, } impl std::fmt::Debug for RpcClientConfig { @@ -115,12 +128,24 @@ impl std::fmt::Debug for RpcClientConfig { impl RpcClientConfig { /// Creates a config for validation mode (full verification). pub fn validator() -> Self { - Self { skip_block_verification: false, metrics: None } + Self { + skip_block_verification: false, + metrics: None, + block_timeout: None, + witness_timeout: None, + code_timeout: None, + } } /// Creates a config for trace/debug mode (skip verification). pub fn trace_server() -> Self { - Self { skip_block_verification: true, metrics: None } + Self { + skip_block_verification: true, + metrics: None, + block_timeout: None, + witness_timeout: None, + code_timeout: None, + } } /// Sets the metrics callbacks. @@ -128,6 +153,24 @@ impl RpcClientConfig { self.metrics = Some(metrics); self } + + /// Sets the timeout for block and header fetches. + pub fn with_block_timeout(mut self, timeout: Duration) -> Self { + self.block_timeout = Some(timeout); + self + } + + /// Sets the timeout for witness fetches. + pub fn with_witness_timeout(mut self, timeout: Duration) -> Self { + self.witness_timeout = Some(timeout); + self + } + + /// Sets the timeout for contract code fetches. + pub fn with_code_timeout(mut self, timeout: Duration) -> Self { + self.code_timeout = Some(timeout); + self + } } /// Response from mega_setValidatedBlocks RPC call @@ -218,15 +261,31 @@ impl RpcClient { } } + /// Applies an optional timeout to a future, returning a timeout error if exceeded. + async fn apply_timeout(timeout: Option, fut: F) -> Result + where + F: std::future::Future>, + { + match timeout { + Some(d) => tokio::time::timeout(d, fut) + .await + .map_err(|_| eyre!("RPC request timed out after {:.1}s", d.as_secs_f64())) + .and_then(|r| r), + None => fut.await, + } + } + /// Gets contract bytecode for a code hash. pub async fn get_code(&self, hash: B256) -> Result { let start = Instant::now(); - let result = self - .data_provider - .client() - .request("eth_getCodeByHash", (hash,)) - .await - .map_err(|e| eyre!("eth_getCodeByHash for hash {hash:?} failed: {e}")); + let result = Self::apply_timeout(self.config.code_timeout, async { + self.data_provider + .client() + .request("eth_getCodeByHash", (hash,)) + .await + .map_err(|e| eyre!("eth_getCodeByHash for hash {hash:?} failed: {e}")) + }) + .await; self.record_rpc( RpcMethod::EthGetCodeByHash, result.is_ok(), @@ -269,11 +328,15 @@ impl RpcClient { block_id: BlockId, full_txs: bool, ) -> Result> { - let block = if full_txs { - self.data_provider.get_block(block_id).full().await? - } else { - self.data_provider.get_block(block_id).await? - }; + let block = Self::apply_timeout(self.config.block_timeout, async { + let block = if full_txs { + self.data_provider.get_block(block_id).full().await? + } else { + self.data_provider.get_block(block_id).await? + }; + Ok(block) + }) + .await?; let block = block.ok_or_else(|| eyre!("Block {:?} not found", block_id))?; @@ -303,8 +366,10 @@ impl RpcClient { /// Gets the current latest block number from the blockchain. pub async fn get_latest_block_number(&self) -> Result { - let result = - self.data_provider.get_block_number().await.context("Failed to get block number"); + let result = Self::apply_timeout(self.config.block_timeout, async { + self.data_provider.get_block_number().await.context("Failed to get block number") + }) + .await; self.record_rpc(RpcMethod::EthBlockNumber, result.is_ok(), None); if let Err(ref e) = result { trace!(error = %e, "eth_blockNumber failed"); @@ -325,20 +390,23 @@ impl RpcClient { /// `verify_block_integrity`) or when running in a trusted context like the trace server. pub async fn get_header(&self, block_id: BlockId, verify_hash: bool) -> Result
{ let start = Instant::now(); - let result = match block_id { - BlockId::Hash(hash) => self - .data_provider - .client() - .request::<_, Header>("eth_getHeaderByHash", (hash.block_hash,)) - .await - .map_err(|e| eyre!("eth_getHeaderByHash for {} failed: {e}", hash.block_hash)), - BlockId::Number(tag) => self - .data_provider - .client() - .request::<_, Header>("eth_getHeaderByNumber", (tag,)) - .await - .map_err(|e| eyre!("eth_getHeaderByNumber for {:?} failed: {e}", tag)), - }; + let result = Self::apply_timeout(self.config.block_timeout, async { + match block_id { + BlockId::Hash(hash) => self + .data_provider + .client() + .request::<_, Header>("eth_getHeaderByHash", (hash.block_hash,)) + .await + .map_err(|e| eyre!("eth_getHeaderByHash for {} failed: {e}", hash.block_hash)), + BlockId::Number(tag) => self + .data_provider + .client() + .request::<_, Header>("eth_getHeaderByNumber", (tag,)) + .await + .map_err(|e| eyre!("eth_getHeaderByNumber for {:?} failed: {e}", tag)), + } + }) + .await; self.record_rpc( RpcMethod::EthGetHeader, result.is_ok(), @@ -451,11 +519,13 @@ impl RpcClient { ) -> Result<(SaltWitness, MptWitness)> { let start = Instant::now(); let keys = WitnessRequestKeys { block_number: U64::from(number), block_hash: hash }; - let result: Result = provider - .client() - .request("mega_getBlockWitness", (keys,)) - .await - .map_err(|e| eyre!("{} witness fetch failed for block {}: {}", source, number, e)); + let result: Result = + Self::apply_timeout(self.config.witness_timeout, async { + provider.client().request("mega_getBlockWitness", (keys,)).await.map_err(|e| { + eyre!("{} witness fetch failed for block {}: {}", source, number, e) + }) + }) + .await; self.record_rpc(rpc_method, result.is_ok(), Some(start.elapsed().as_secs_f64())); @@ -548,15 +618,17 @@ impl RpcClient { &self, tx_hash: B256, ) -> Result> { - let tx = self - .data_provider - .get_transaction_by_hash(tx_hash) - .await - .map_err(|e| { - trace!(%tx_hash, error = %e, "get_transaction_by_hash failed"); - e - }) - .context("Failed to get transaction by hash")?; + let tx = Self::apply_timeout(self.config.block_timeout, async { + self.data_provider + .get_transaction_by_hash(tx_hash) + .await + .map_err(|e| { + trace!(%tx_hash, error = %e, "get_transaction_by_hash failed"); + eyre!(e) + }) + .context("Failed to get transaction by hash") + }) + .await?; match tx { Some(tx) => { @@ -598,6 +670,9 @@ mod tests { let config = RpcClientConfig::validator(); assert!(!config.skip_block_verification); assert!(config.metrics.is_none()); + assert!(config.block_timeout.is_none()); + assert!(config.witness_timeout.is_none()); + assert!(config.code_timeout.is_none()); } #[test] @@ -605,6 +680,20 @@ mod tests { let config = RpcClientConfig::trace_server(); assert!(config.skip_block_verification); assert!(config.metrics.is_none()); + assert!(config.block_timeout.is_none()); + assert!(config.witness_timeout.is_none()); + assert!(config.code_timeout.is_none()); + } + + #[test] + fn test_rpc_client_config_timeouts() { + let config = RpcClientConfig::validator() + .with_block_timeout(Duration::from_secs(5)) + .with_witness_timeout(Duration::from_secs(30)) + .with_code_timeout(Duration::from_secs(10)); + assert_eq!(config.block_timeout, Some(Duration::from_secs(5))); + assert_eq!(config.witness_timeout, Some(Duration::from_secs(30))); + assert_eq!(config.code_timeout, Some(Duration::from_secs(10))); } #[test]