Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion bin/stateless-validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,21 @@ struct CommandLineArgs {
#[clap(long, env = "STATELESS_VALIDATOR_METRICS_PORT", default_value_t = metrics::DEFAULT_METRICS_PORT)]
metrics_port: u16,

/// Timeout in seconds for block and header RPC fetches (eth_getBlockByNumber, etc.).
/// Defaults to no timeout.
#[clap(long, env = "STATELESS_VALIDATOR_BLOCK_TIMEOUT_SECS")]
block_timeout_secs: Option<u64>,

/// Timeout in seconds for witness RPC fetches (mega_getBlockWitness).
/// Defaults to no timeout.
#[clap(long, env = "STATELESS_VALIDATOR_WITNESS_TIMEOUT_SECS")]
witness_timeout_secs: Option<u64>,

/// Timeout in seconds for contract code RPC fetches (eth_getCodeByHash).
/// Defaults to no timeout.
#[clap(long, env = "STATELESS_VALIDATOR_CODE_TIMEOUT_SECS")]
code_timeout_secs: Option<u64>,

/// Logging configuration.
#[command(flatten)]
log: LogArgs,
Expand Down Expand Up @@ -148,7 +163,17 @@ async fn main() -> Result<()> {

let work_dir = PathBuf::from(args.data_dir);

let rpc_config = RpcClientConfig::validator().with_metrics(Arc::new(metrics::ValidatorMetrics));
let mut rpc_config =
RpcClientConfig::validator().with_metrics(Arc::new(metrics::ValidatorMetrics));
if let Some(secs) = args.block_timeout_secs {
rpc_config = rpc_config.with_block_timeout(Duration::from_secs(secs));
}
if let Some(secs) = args.witness_timeout_secs {
rpc_config = rpc_config.with_witness_timeout(Duration::from_secs(secs));
}
if let Some(secs) = args.code_timeout_secs {
rpc_config = rpc_config.with_code_timeout(Duration::from_secs(secs));
}
let client = Arc::new(RpcClient::new_with_config(
&args.rpc_endpoint,
&args.witness_endpoint,
Expand Down
177 changes: 133 additions & 44 deletions crates/stateless-core/src/rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
//!
//! Provides methods to fetch blocks, witnesses, and contract bytecode from MegaETH nodes.

use std::{collections::HashMap, sync::Arc, time::Instant};
use std::{
collections::HashMap,
sync::Arc,
time::{Duration, Instant},
};

use alloy_primitives::{B256, Bytes, U64};
use alloy_provider::{Provider, ProviderBuilder, RootProvider};
Expand Down Expand Up @@ -101,6 +105,15 @@ pub struct RpcClientConfig {
pub skip_block_verification: bool,
/// Optional metrics callbacks for tracking RPC performance.
pub metrics: Option<Arc<dyn RpcMetrics>>,
/// Timeout for block and header fetches (eth_getBlockByNumber, eth_getHeaderByNumber, etc.).
/// `None` means no timeout.
pub block_timeout: Option<Duration>,
/// Timeout for witness fetches (mega_getBlockWitness).
/// `None` means no timeout.
pub witness_timeout: Option<Duration>,
/// Timeout for contract code fetches (eth_getCodeByHash).
/// `None` means no timeout.
pub code_timeout: Option<Duration>,
}

impl std::fmt::Debug for RpcClientConfig {
Expand All @@ -115,19 +128,49 @@ impl std::fmt::Debug for RpcClientConfig {
impl RpcClientConfig {
/// Creates a config for validation mode (full verification).
pub fn validator() -> Self {
Self { skip_block_verification: false, metrics: None }
Self {
skip_block_verification: false,
metrics: None,
block_timeout: None,
witness_timeout: None,
code_timeout: None,
}
}

/// Creates a config for trace/debug mode (skip verification).
pub fn trace_server() -> Self {
Self { skip_block_verification: true, metrics: None }
Self {
skip_block_verification: true,
metrics: None,
block_timeout: None,
witness_timeout: None,
code_timeout: None,
}
}

/// Sets the metrics callbacks.
pub fn with_metrics(mut self, metrics: Arc<dyn RpcMetrics>) -> Self {
self.metrics = Some(metrics);
self
}

/// Sets the timeout for block and header fetches.
pub fn with_block_timeout(mut self, timeout: Duration) -> Self {
self.block_timeout = Some(timeout);
self
}

/// Sets the timeout for witness fetches.
pub fn with_witness_timeout(mut self, timeout: Duration) -> Self {
self.witness_timeout = Some(timeout);
self
}

/// Sets the timeout for contract code fetches.
pub fn with_code_timeout(mut self, timeout: Duration) -> Self {
self.code_timeout = Some(timeout);
self
}
}

/// Response from mega_setValidatedBlocks RPC call
Expand Down Expand Up @@ -218,15 +261,31 @@ impl RpcClient {
}
}

/// Applies an optional timeout to a future, returning a timeout error if exceeded.
async fn apply_timeout<F, T>(timeout: Option<Duration>, fut: F) -> Result<T>
where
F: std::future::Future<Output = Result<T>>,
{
match timeout {
Some(d) => tokio::time::timeout(d, fut)
.await
.map_err(|_| eyre!("RPC request timed out after {:.1}s", d.as_secs_f64()))
.and_then(|r| r),
None => fut.await,
}
}

/// Gets contract bytecode for a code hash.
pub async fn get_code(&self, hash: B256) -> Result<Bytes> {
let start = Instant::now();
let result = self
.data_provider
.client()
.request("eth_getCodeByHash", (hash,))
.await
.map_err(|e| eyre!("eth_getCodeByHash for hash {hash:?} failed: {e}"));
let result = Self::apply_timeout(self.config.code_timeout, async {
self.data_provider
.client()
.request("eth_getCodeByHash", (hash,))
.await
.map_err(|e| eyre!("eth_getCodeByHash for hash {hash:?} failed: {e}"))
})
.await;
self.record_rpc(
RpcMethod::EthGetCodeByHash,
result.is_ok(),
Expand Down Expand Up @@ -269,11 +328,15 @@ impl RpcClient {
block_id: BlockId,
full_txs: bool,
) -> Result<Block<Transaction>> {
let block = if full_txs {
self.data_provider.get_block(block_id).full().await?
} else {
self.data_provider.get_block(block_id).await?
};
let block = Self::apply_timeout(self.config.block_timeout, async {
let block = if full_txs {
self.data_provider.get_block(block_id).full().await?
} else {
self.data_provider.get_block(block_id).await?
};
Ok(block)
})
.await?;

let block = block.ok_or_else(|| eyre!("Block {:?} not found", block_id))?;

Expand Down Expand Up @@ -303,8 +366,10 @@ impl RpcClient {

/// Gets the current latest block number from the blockchain.
pub async fn get_latest_block_number(&self) -> Result<u64> {
let result =
self.data_provider.get_block_number().await.context("Failed to get block number");
let result = Self::apply_timeout(self.config.block_timeout, async {
self.data_provider.get_block_number().await.context("Failed to get block number")
})
.await;
self.record_rpc(RpcMethod::EthBlockNumber, result.is_ok(), None);
if let Err(ref e) = result {
trace!(error = %e, "eth_blockNumber failed");
Expand All @@ -325,20 +390,23 @@ impl RpcClient {
/// `verify_block_integrity`) or when running in a trusted context like the trace server.
pub async fn get_header(&self, block_id: BlockId, verify_hash: bool) -> Result<Header> {
let start = Instant::now();
let result = match block_id {
BlockId::Hash(hash) => self
.data_provider
.client()
.request::<_, Header>("eth_getHeaderByHash", (hash.block_hash,))
.await
.map_err(|e| eyre!("eth_getHeaderByHash for {} failed: {e}", hash.block_hash)),
BlockId::Number(tag) => self
.data_provider
.client()
.request::<_, Header>("eth_getHeaderByNumber", (tag,))
.await
.map_err(|e| eyre!("eth_getHeaderByNumber for {:?} failed: {e}", tag)),
};
let result = Self::apply_timeout(self.config.block_timeout, async {
match block_id {
BlockId::Hash(hash) => self
.data_provider
.client()
.request::<_, Header>("eth_getHeaderByHash", (hash.block_hash,))
.await
.map_err(|e| eyre!("eth_getHeaderByHash for {} failed: {e}", hash.block_hash)),
BlockId::Number(tag) => self
.data_provider
.client()
.request::<_, Header>("eth_getHeaderByNumber", (tag,))
.await
.map_err(|e| eyre!("eth_getHeaderByNumber for {:?} failed: {e}", tag)),
}
})
.await;
self.record_rpc(
RpcMethod::EthGetHeader,
result.is_ok(),
Expand Down Expand Up @@ -451,11 +519,13 @@ impl RpcClient {
) -> Result<(SaltWitness, MptWitness)> {
let start = Instant::now();
let keys = WitnessRequestKeys { block_number: U64::from(number), block_hash: hash };
let result: Result<String> = provider
.client()
.request("mega_getBlockWitness", (keys,))
.await
.map_err(|e| eyre!("{} witness fetch failed for block {}: {}", source, number, e));
let result: Result<String> =
Self::apply_timeout(self.config.witness_timeout, async {
provider.client().request("mega_getBlockWitness", (keys,)).await.map_err(|e| {
eyre!("{} witness fetch failed for block {}: {}", source, number, e)
})
})
.await;

self.record_rpc(rpc_method, result.is_ok(), Some(start.elapsed().as_secs_f64()));

Expand Down Expand Up @@ -548,15 +618,17 @@ impl RpcClient {
&self,
tx_hash: B256,
) -> Result<Option<(Transaction, B256)>> {
let tx = self
.data_provider
.get_transaction_by_hash(tx_hash)
.await
.map_err(|e| {
trace!(%tx_hash, error = %e, "get_transaction_by_hash failed");
e
})
.context("Failed to get transaction by hash")?;
let tx = Self::apply_timeout(self.config.block_timeout, async {
self.data_provider
.get_transaction_by_hash(tx_hash)
.await
.map_err(|e| {
trace!(%tx_hash, error = %e, "get_transaction_by_hash failed");
eyre!(e)
})
.context("Failed to get transaction by hash")
})
.await?;

match tx {
Some(tx) => {
Expand Down Expand Up @@ -598,13 +670,30 @@ mod tests {
let config = RpcClientConfig::validator();
assert!(!config.skip_block_verification);
assert!(config.metrics.is_none());
assert!(config.block_timeout.is_none());
assert!(config.witness_timeout.is_none());
assert!(config.code_timeout.is_none());
}

#[test]
fn test_rpc_client_config_trace_server() {
let config = RpcClientConfig::trace_server();
assert!(config.skip_block_verification);
assert!(config.metrics.is_none());
assert!(config.block_timeout.is_none());
assert!(config.witness_timeout.is_none());
assert!(config.code_timeout.is_none());
}

#[test]
fn test_rpc_client_config_timeouts() {
let config = RpcClientConfig::validator()
.with_block_timeout(Duration::from_secs(5))
.with_witness_timeout(Duration::from_secs(30))
.with_code_timeout(Duration::from_secs(10));
assert_eq!(config.block_timeout, Some(Duration::from_secs(5)));
assert_eq!(config.witness_timeout, Some(Duration::from_secs(30)));
assert_eq!(config.code_timeout, Some(Duration::from_secs(10)));
}

#[test]
Expand Down