From 8b7c84e514a8bf17922f7ba0288c6aaefd01770e Mon Sep 17 00:00:00 2001 From: Leo Date: Tue, 4 Nov 2025 22:00:24 +0900 Subject: [PATCH 01/13] feat: add error in case no rpc providers support pubsub --- src/error.rs | 4 ++++ src/robust_provider.rs | 4 +++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/src/error.rs b/src/error.rs index cc67dc13..72c86c95 100644 --- a/src/error.rs +++ b/src/error.rs @@ -38,6 +38,9 @@ pub enum ScannerError { #[error("RPC call failed after exhausting all retry attempts: {0}")] RetryFailure(Arc>), + + #[error("No RPC providers support pubsub")] + PubSubNotSupported, } impl From for ScannerError { @@ -46,6 +49,7 @@ impl From for ScannerError { RobustProviderError::Timeout => ScannerError::Timeout, RobustProviderError::RetryFailure(err) => ScannerError::RetryFailure(err), RobustProviderError::BlockNotFound(block) => ScannerError::BlockNotFound(block), + RobustProviderError::PubSubNotSupported => ScannerError::PubSubNotSupported, } } } diff --git a/src/robust_provider.rs b/src/robust_provider.rs index 023bb21d..6482c03a 100644 --- a/src/robust_provider.rs +++ b/src/robust_provider.rs @@ -21,6 +21,8 @@ pub enum Error { RetryFailure(Arc>), #[error("Block not found, Block Id: {0}")] BlockNotFound(BlockId), + #[error("No RPC providers support pubsub")] + PubSubNotSupported, } impl From> for Error { @@ -258,8 +260,8 @@ impl RobustProvider { } } - error!("All providers failed or timed out"); // Return the last error encountered + error!("All providers failed or timed out"); Err(last_error.unwrap_or(Error::Timeout)) } From 8d156cc977b5b59ef3549ca1bbde745c56bebb6e Mon Sep 17 00:00:00 2001 From: Leo Date: Tue, 4 Nov 2025 22:14:15 +0900 Subject: [PATCH 02/13] feat: add test --- src/robust_provider.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/robust_provider.rs b/src/robust_provider.rs index 6482c03a..4c89ff87 100644 --- a/src/robust_provider.rs +++ b/src/robust_provider.rs @@ -384,4 +384,19 @@ mod tests { assert!(matches!(result, Err(Error::Timeout))); } + + #[tokio::test] + async fn test_http_provider_skipped_when_pubsub_required() { + let provider = test_provider(100, 3, 10); + + let result: Result<(), Error> = provider + .retry_with_total_timeout( + |_| async { Ok(()) }, + true, // require pubsub + ) + .await; + + // Should get PubSubNotSupported error + assert!(matches!(result, Err(Error::PubSubNotSupported))); + } } From 7b9cdfb4034bb7dbb3ced4b135fc707e06ded8d5 Mon Sep 17 00:00:00 2001 From: Leo Date: Wed, 5 Nov 2025 18:00:05 +0900 Subject: [PATCH 03/13] feat: refactor test and remove unsued error enum --- src/error.rs | 9 +-------- src/robust_provider.rs | 32 +++++++++++++++++--------------- 2 files changed, 18 insertions(+), 23 deletions(-) diff --git a/src/error.rs b/src/error.rs index 72c86c95..69bf846a 100644 --- a/src/error.rs +++ b/src/error.rs @@ -35,21 +35,14 @@ pub enum ScannerError { #[error("Operation timed out")] Timeout, - - #[error("RPC call failed after exhausting all retry attempts: {0}")] - RetryFailure(Arc>), - - #[error("No RPC providers support pubsub")] - PubSubNotSupported, } impl From for ScannerError { fn from(error: RobustProviderError) -> ScannerError { match error { RobustProviderError::Timeout => ScannerError::Timeout, - RobustProviderError::RetryFailure(err) => ScannerError::RetryFailure(err), + RobustProviderError::RpcError(err) => ScannerError::RpcError(err), RobustProviderError::BlockNotFound(block) => ScannerError::BlockNotFound(block), - RobustProviderError::PubSubNotSupported => ScannerError::PubSubNotSupported, } } } diff --git a/src/robust_provider.rs b/src/robust_provider.rs index 4c89ff87..af714f32 100644 --- a/src/robust_provider.rs +++ b/src/robust_provider.rs @@ -18,16 +18,14 @@ pub enum Error { #[error("Operation timed out")] Timeout, #[error("RPC call failed after exhausting all retry attempts: {0}")] - RetryFailure(Arc>), + RpcError(Arc>), #[error("Block not found, Block Id: {0}")] BlockNotFound(BlockId), - #[error("No RPC providers support pubsub")] - PubSubNotSupported, } impl From> for Error { fn from(err: RpcError) -> Self { - Error::RetryFailure(Arc::new(err)) + Error::RpcError(Arc::new(err)) } } @@ -195,8 +193,11 @@ impl RobustProvider { /// after exhausting retries or if the call times out. pub async fn subscribe_blocks(&self) -> Result, Error> { info!("eth_subscribe called"); - // We need this otherwise error is not clear - self.root().client().expect_pubsub_frontend(); + + self.root().client().pubsub_frontend().ok_or_else(|| { + Error::from(RpcError::Transport(TransportErrorKind::PubsubUnavailable)) + })?; + let result = self .retry_with_total_timeout( move |provider| async move { provider.subscribe_blocks().await }, @@ -366,7 +367,7 @@ mod tests { }) .await; - assert!(matches!(result, Err(Error::RetryFailure(_)))); + assert!(matches!(result, Err(Error::RpcError(_)))); assert_eq!(call_count.load(Ordering::SeqCst), 3); } @@ -389,14 +390,15 @@ mod tests { async fn test_http_provider_skipped_when_pubsub_required() { let provider = test_provider(100, 3, 10); - let result: Result<(), Error> = provider - .retry_with_total_timeout( - |_| async { Ok(()) }, - true, // require pubsub - ) - .await; + let result = provider.subscribe_blocks().await; + + let Err(Error::RpcError(err)) = result else { + panic!("Expected Error::RpcError, got: {result:?}"); + }; - // Should get PubSubNotSupported error - assert!(matches!(result, Err(Error::PubSubNotSupported))); + assert!( + matches!(err.as_ref(), RpcError::Transport(TransportErrorKind::PubsubUnavailable)), + "Expected PubsubUnavailable error, got: {err:?}", + ); } } From 45fa778ea323c975901f490c2762ca676660ca06 Mon Sep 17 00:00:00 2001 From: Leo Date: Wed, 5 Nov 2025 18:26:19 +0900 Subject: [PATCH 04/13] feat: increase timeout in test --- tests/block_range_scanner.rs | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/tests/block_range_scanner.rs b/tests/block_range_scanner.rs index 1055e28c..a6971d26 100644 --- a/tests/block_range_scanner.rs +++ b/tests/block_range_scanner.rs @@ -25,16 +25,16 @@ async fn live_mode_processes_all_blocks_respecting_block_confirmations() -> anyh robust_provider.root().anvil_mine(Some(5), None).await?; - assert_next!(stream, 1..=1); - assert_next!(stream, 2..=2); - assert_next!(stream, 3..=3); - assert_next!(stream, 4..=4); - assert_next!(stream, 5..=5); + assert_next!(stream, 1..=1, timeout = 10); + assert_next!(stream, 2..=2, timeout = 10); + assert_next!(stream, 3..=3, timeout = 10); + assert_next!(stream, 4..=4, timeout = 10); + assert_next!(stream, 5..=5, timeout = 10); let mut stream = assert_empty!(stream); robust_provider.root().anvil_mine(Some(1), None).await?; - assert_next!(stream, 6..=6); + assert_next!(stream, 6..=6, timeout = 10); assert_empty!(stream); // --- 1 block confirmation --- @@ -43,16 +43,16 @@ async fn live_mode_processes_all_blocks_respecting_block_confirmations() -> anyh robust_provider.root().anvil_mine(Some(5), None).await?; - assert_next!(stream, 6..=6); - assert_next!(stream, 7..=7); - assert_next!(stream, 8..=8); - assert_next!(stream, 9..=9); - assert_next!(stream, 10..=10); + assert_next!(stream, 6..=6, timeout = 10); + assert_next!(stream, 7..=7, timeout = 10); + assert_next!(stream, 8..=8, timeout = 10); + assert_next!(stream, 9..=9, timeout = 10); + assert_next!(stream, 10..=10, timeout = 10); let mut stream = assert_empty!(stream); robust_provider.root().anvil_mine(Some(1), None).await?; - assert_next!(stream, 11..=11); + assert_next!(stream, 11..=11, timeout = 10); assert_empty!(stream); Ok(()) From 2deb8dfaee4dfb85f79c629f2f41f4d684f8ba29 Mon Sep 17 00:00:00 2001 From: Leo Date: Wed, 5 Nov 2025 21:50:36 +0900 Subject: [PATCH 05/13] feat: better error handling --- src/robust_provider.rs | 61 +++++++++++++++++++++++++++++------------- 1 file changed, 43 insertions(+), 18 deletions(-) diff --git a/src/robust_provider.rs b/src/robust_provider.rs index af714f32..e6436bbc 100644 --- a/src/robust_provider.rs +++ b/src/robust_provider.rs @@ -10,7 +10,7 @@ use alloy::{ }; use backon::{ExponentialBuilder, Retryable}; use thiserror::Error; -use tokio::time::timeout; +use tokio::time::{error as TokioError, timeout}; use tracing::{error, info}; #[derive(Error, Debug, Clone)] @@ -29,6 +29,12 @@ impl From> for Error { } } +impl From for Error { + fn from(_: TokioError::Elapsed) -> Self { + Error::Timeout + } +} + /// Provider wrapper with built-in retry and timeout mechanisms. /// /// This wrapper around Alloy providers automatically handles retries, @@ -44,9 +50,9 @@ pub struct RobustProvider { // RPC retry and timeout settings /// Default timeout used by `RobustProvider` -pub const DEFAULT_MAX_TIMEOUT: Duration = Duration::from_secs(30); +pub const DEFAULT_MAX_TIMEOUT: Duration = Duration::from_secs(60); /// Default maximum number of retry attempts. -pub const DEFAULT_MAX_RETRIES: usize = 5; +pub const DEFAULT_MAX_RETRIES: usize = 3; /// Default base delay between retries. pub const DEFAULT_RETRY_INTERVAL: Duration = Duration::from_secs(1); @@ -193,11 +199,6 @@ impl RobustProvider { /// after exhausting retries or if the call times out. pub async fn subscribe_blocks(&self) -> Result, Error> { info!("eth_subscribe called"); - - self.root().client().pubsub_frontend().ok_or_else(|| { - Error::from(RpcError::Transport(TransportErrorKind::PubsubUnavailable)) - })?; - let result = self .retry_with_total_timeout( move |provider| async move { provider.subscribe_blocks().await }, @@ -250,10 +251,8 @@ impl RobustProvider { } Err(e) => { last_error = Some(e); - if idx == 0 { - if self.providers.len() > 1 { - info!("Primary provider failed, trying fallback provider(s)"); - } + if idx == 0 && self.providers.len() > 1 { + info!("Primary provider failed, trying fallback provider(s)"); } else { error!(provider_num = idx, err = %last_error.as_ref().unwrap(), "Fallback provider failed with error"); } @@ -280,27 +279,29 @@ impl RobustProvider { .with_max_times(self.max_retries) .with_min_delay(self.retry_interval); - match timeout( + timeout( self.max_timeout, (|| operation(provider.clone())) .retry(retry_strategy) + .when(|err: &RpcError| { + // Don't retry if pubsub is unavailable (it won't suddenly become available) + !matches!(err, RpcError::Transport(TransportErrorKind::PubsubUnavailable)) + }) .notify(|err: &RpcError, dur: Duration| { info!(error = %err, "RPC error retrying after {:?}", dur); }) .sleep(tokio::time::sleep), ) .await - { - Ok(res) => res.map_err(Error::from), - Err(_) => Err(Error::Timeout), - } + .map_err(Error::from)? + .map_err(Error::from) } } #[cfg(test)] mod tests { use super::*; - use alloy::network::Ethereum; + use alloy::{network::Ethereum, providers::WsConnect}; use std::sync::atomic::{AtomicUsize, Ordering}; use tokio::time::sleep; @@ -401,4 +402,28 @@ mod tests { "Expected PubsubUnavailable error, got: {err:?}", ); } + + #[tokio::test] + async fn test_fallback_to_ws_provider_when_http_lacks_pubsub() { + use alloy::providers::ProviderBuilder; + use alloy_node_bindings::Anvil; + + let anvil = Anvil::new().try_spawn().expect("Failed to start anvil"); + + let http_provider = ProviderBuilder::new().connect_http(anvil.endpoint_url()); + let ws_provider = ProviderBuilder::new() + .connect_ws(WsConnect::new(anvil.ws_endpoint_url().as_str())) + .await + .expect("Failed to connect to WS"); + + let robust = RobustProvider::new(http_provider) + .fallback(ws_provider.root().to_owned()) + .max_timeout(Duration::from_secs(5)) + .max_retries(10) + .retry_interval(Duration::from_millis(100)); + + let result = robust.subscribe_blocks().await; + + assert!(result.is_ok(), "Expected success with WS fallback, got: {result:?}"); + } } From 572d4e96f44553692c3be6b588ab3a3072bb16fa Mon Sep 17 00:00:00 2001 From: Leo Date: Wed, 5 Nov 2025 21:50:54 +0900 Subject: [PATCH 06/13] ref: add test --- src/robust_provider.rs | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/src/robust_provider.rs b/src/robust_provider.rs index e6436bbc..2e41fa45 100644 --- a/src/robust_provider.rs +++ b/src/robust_provider.rs @@ -426,4 +426,45 @@ mod tests { assert!(result.is_ok(), "Expected success with WS fallback, got: {result:?}"); } + + #[tokio::test] + async fn test_ws_fails_http_fallback_returns_primary_error() { + use alloy::providers::ProviderBuilder; + use alloy_node_bindings::Anvil; + + let anvil_1 = Anvil::new().port(2222_u16).try_spawn().expect("Failed to start anvil"); + + let ws_provider = ProviderBuilder::new() + .connect_ws(WsConnect::new(anvil_1.ws_endpoint_url().as_str())) + .await + .expect("Failed to connect to WS"); + + let anvil = Anvil::new().port(1111_u16).try_spawn().expect("Failed to start anvil"); + let http_provider = ProviderBuilder::new().connect_http(anvil.endpoint_url()); + + let robust = RobustProvider::new(ws_provider) + .fallback(http_provider.root().to_owned()) + .max_timeout(Duration::from_secs(5)) + .max_retries(1) + .retry_interval(Duration::from_millis(10)); + + drop(anvil_1); + let result = robust.subscribe_blocks().await; + + let Err(err) = result else { + panic!("Expected error, got success"); + }; + + if let Error::RpcError(rpc_err) = err { + assert!( + !matches!( + rpc_err.as_ref(), + RpcError::Transport(TransportErrorKind::PubsubUnavailable) + ), + "Should return primary provider's error (timeout/connection), not fallback's PubsubUnavailable error. Got: {rpc_err:?}", + ); + } else { + // + } + } } From 61584fbb03bbcad3ad87242174ba8176a7583e6e Mon Sep 17 00:00:00 2001 From: Leo Date: Wed, 5 Nov 2025 23:08:47 +0900 Subject: [PATCH 07/13] ref: update test --- src/robust_provider.rs | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/src/robust_provider.rs b/src/robust_provider.rs index 2e41fa45..27a2e67e 100644 --- a/src/robust_provider.rs +++ b/src/robust_provider.rs @@ -451,20 +451,18 @@ mod tests { drop(anvil_1); let result = robust.subscribe_blocks().await; - let Err(err) = result else { - panic!("Expected error, got success"); - }; + assert!(result.is_err(), "Expected error when WS fails and HTTP fallback used"); + + let err = result.unwrap_err(); - if let Error::RpcError(rpc_err) = err { - assert!( - !matches!( + match err { + Error::RpcError(rpc_err) => { + assert!(matches!( rpc_err.as_ref(), - RpcError::Transport(TransportErrorKind::PubsubUnavailable) - ), - "Should return primary provider's error (timeout/connection), not fallback's PubsubUnavailable error. Got: {rpc_err:?}", - ); - } else { - // + RpcError::Transport(TransportErrorKind::BackendGone) + ),); + } + _ => panic!("Expected Error::RpcError, got: {err:?}"), } } } From aa7b6654e7c771e680114426106c623421ecc213 Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 6 Nov 2025 19:52:42 +0900 Subject: [PATCH 08/13] feat: revert back to old true / false logic + more tests --- src/robust_provider.rs | 193 ++++++++++++++++++++++++++--------------- 1 file changed, 122 insertions(+), 71 deletions(-) diff --git a/src/robust_provider.rs b/src/robust_provider.rs index 27a2e67e..343e0b80 100644 --- a/src/robust_provider.rs +++ b/src/robust_provider.rs @@ -102,8 +102,8 @@ impl RobustProvider { /// /// Fallback providers are used when the primary provider times out or fails. #[must_use] - pub fn fallback(mut self, provider: RootProvider) -> Self { - self.providers.push(provider); + pub fn fallback(mut self, provider: impl Provider) -> Self { + self.providers.push(provider.root().to_owned()); self } @@ -119,9 +119,10 @@ impl RobustProvider { ) -> Result { info!("eth_getBlockByNumber called"); let result = self - .retry_with_total_timeout(move |provider| async move { - provider.get_block_by_number(number).await - }) + .retry_with_total_timeout( + move |provider| async move { provider.get_block_by_number(number).await }, + false, + ) .await; if let Err(e) = &result { error!(error = %e, "eth_getByBlockNumber failed"); @@ -141,6 +142,7 @@ impl RobustProvider { let result = self .retry_with_total_timeout( move |provider| async move { provider.get_block_number().await }, + false, ) .await; if let Err(e) = &result { @@ -161,9 +163,10 @@ impl RobustProvider { ) -> Result { info!("eth_getBlockByHash called"); let result = self - .retry_with_total_timeout(move |provider| async move { - provider.get_block_by_hash(hash).await - }) + .retry_with_total_timeout( + move |provider| async move { provider.get_block_by_hash(hash).await }, + false, + ) .await; if let Err(e) = &result { error!(error = %e, "eth_getBlockByHash failed"); @@ -183,6 +186,7 @@ impl RobustProvider { let result = self .retry_with_total_timeout( move |provider| async move { provider.get_logs(filter).await }, + false, ) .await; if let Err(e) = &result { @@ -199,9 +203,12 @@ impl RobustProvider { /// after exhausting retries or if the call times out. pub async fn subscribe_blocks(&self) -> Result, Error> { info!("eth_subscribe called"); + // immediately fail if primary does not support pubsub + self.root().client().expect_pubsub_frontend(); let result = self .retry_with_total_timeout( move |provider| async move { provider.subscribe_blocks().await }, + true, ) .await; if let Err(e) = &result { @@ -219,21 +226,41 @@ impl RobustProvider { /// If the timeout is exceeded and fallback providers are available, it will /// attempt to use each fallback provider in sequence. /// + /// If `require_pubsub` is true, providers that don't support pubsub will be skipped. + /// /// # Errors /// /// - Returns [`RpcError`] with message "total operation timeout exceeded /// and all fallback providers failed" if the overall timeout elapses and no fallback /// providers succeed. + /// - Returns [`RpcError::Transport(TransportErrorKind::PubsubUnavailable)`] if `require_pubsub` + /// is true and all providers don't support pubsub. /// - Propagates any [`RpcError`] from the underlying retries. - async fn retry_with_total_timeout(&self, operation: F) -> Result + async fn retry_with_total_timeout( + &self, + operation: F, + require_pubsub: bool, + ) -> Result where F: Fn(RootProvider) -> Fut, Fut: Future>>, { let mut last_error = None; + let mut skipped_count = 0; // Try each provider in sequence (first one is primary) for (idx, provider) in self.providers.iter().enumerate() { + // Skip providers that don't support pubsub if required + if require_pubsub && !Self::supports_pubsub(provider) { + if idx == 0 { + info!("Primary provider doesn't support pubsub, skipping"); + } else { + info!("Fallback provider {} doesn't support pubsub, skipping", idx); + } + skipped_count += 1; + continue; + } + if idx == 0 { info!("Attempting primary provider"); } else { @@ -250,16 +277,23 @@ impl RobustProvider { return Ok(value); } Err(e) => { + println!("errr {e:?}"); last_error = Some(e); if idx == 0 && self.providers.len() > 1 { info!("Primary provider failed, trying fallback provider(s)"); - } else { + } else if idx > 0 { error!(provider_num = idx, err = %last_error.as_ref().unwrap(), "Fallback provider failed with error"); } } } } + // If all providers were skipped due to pubsub requirement + if skipped_count == self.providers.len() { + error!("All providers skipped - none support pubsub"); + return Err(RpcError::Transport(TransportErrorKind::PubsubUnavailable).into()); + } + // Return the last error encountered error!("All providers failed or timed out"); Err(last_error.unwrap_or(Error::Timeout)) @@ -283,11 +317,8 @@ impl RobustProvider { self.max_timeout, (|| operation(provider.clone())) .retry(retry_strategy) - .when(|err: &RpcError| { - // Don't retry if pubsub is unavailable (it won't suddenly become available) - !matches!(err, RpcError::Transport(TransportErrorKind::PubsubUnavailable)) - }) .notify(|err: &RpcError, dur: Duration| { + println!("retry {err:?}"); info!(error = %err, "RPC error retrying after {:?}", dur); }) .sleep(tokio::time::sleep), @@ -296,12 +327,21 @@ impl RobustProvider { .map_err(Error::from)? .map_err(Error::from) } + + /// Check if a provider supports pubsub + fn supports_pubsub(provider: &RootProvider) -> bool { + provider.client().pubsub_frontend().is_some() + } } #[cfg(test)] mod tests { use super::*; - use alloy::{network::Ethereum, providers::WsConnect}; + use alloy::{ + network::Ethereum, + providers::{ProviderBuilder, WsConnect}, + }; + use alloy_node_bindings::Anvil; use std::sync::atomic::{AtomicUsize, Ordering}; use tokio::time::sleep; @@ -325,11 +365,14 @@ mod tests { let call_count = AtomicUsize::new(0); let result = provider - .retry_with_total_timeout(|_| async { - call_count.fetch_add(1, Ordering::SeqCst); - let count = call_count.load(Ordering::SeqCst); - Ok(count) - }) + .retry_with_total_timeout( + |_| async { + call_count.fetch_add(1, Ordering::SeqCst); + let count = call_count.load(Ordering::SeqCst); + Ok(count) + }, + false, + ) .await; assert!(matches!(result, Ok(1))); @@ -342,14 +385,17 @@ mod tests { let call_count = AtomicUsize::new(0); let result = provider - .retry_with_total_timeout(|_| async { - call_count.fetch_add(1, Ordering::SeqCst); - let count = call_count.load(Ordering::SeqCst); - match count { - 3 => Ok(count), - _ => Err(TransportErrorKind::BackendGone.into()), - } - }) + .retry_with_total_timeout( + |_| async { + call_count.fetch_add(1, Ordering::SeqCst); + let count = call_count.load(Ordering::SeqCst); + match count { + 3 => Ok(count), + _ => Err(TransportErrorKind::BackendGone.into()), + } + }, + false, + ) .await; assert!(matches!(result, Ok(3))); @@ -362,10 +408,13 @@ mod tests { let call_count = AtomicUsize::new(0); let result: Result<(), Error> = provider - .retry_with_total_timeout(|_| async { - call_count.fetch_add(1, Ordering::SeqCst); - Err(TransportErrorKind::BackendGone.into()) - }) + .retry_with_total_timeout( + |_| async { + call_count.fetch_add(1, Ordering::SeqCst); + Err(TransportErrorKind::BackendGone.into()) + }, + false, + ) .await; assert!(matches!(result, Err(Error::RpcError(_)))); @@ -378,36 +427,50 @@ mod tests { let provider = test_provider(max_timeout, 10, 1); let result = provider - .retry_with_total_timeout(move |_provider| async move { - sleep(Duration::from_millis(max_timeout + 10)).await; - Ok(42) - }) + .retry_with_total_timeout( + move |_provider| async move { + sleep(Duration::from_millis(max_timeout + 10)).await; + Ok(42) + }, + false, + ) .await; assert!(matches!(result, Err(Error::Timeout))); } #[tokio::test] - async fn test_http_provider_skipped_when_pubsub_required() { - let provider = test_provider(100, 3, 10); + async fn test_subscribe_fails_causes_backup_to_be_used() { + let anvil_1 = Anvil::new().port(2222_u16).try_spawn().expect("Failed to start anvil"); + + let ws_provider_1 = ProviderBuilder::new() + .connect_ws(WsConnect::new(anvil_1.ws_endpoint_url().as_str())) + .await + .expect("Failed to connect to WS"); + + let anvil_2 = Anvil::new().port(1111_u16).try_spawn().expect("Failed to start anvil"); + + let ws_provider_2 = ProviderBuilder::new() + .connect_ws(WsConnect::new(anvil_2.ws_endpoint_url().as_str())) + .await + .expect("Failed to connect to WS"); + + let robust = RobustProvider::new(ws_provider_1) + .fallback(ws_provider_2) + .max_timeout(Duration::from_secs(5)) + .max_retries(10) + .retry_interval(Duration::from_millis(100)); - let result = provider.subscribe_blocks().await; + drop(anvil_1); - let Err(Error::RpcError(err)) = result else { - panic!("Expected Error::RpcError, got: {result:?}"); - }; + let result = robust.subscribe_blocks().await; - assert!( - matches!(err.as_ref(), RpcError::Transport(TransportErrorKind::PubsubUnavailable)), - "Expected PubsubUnavailable error, got: {err:?}", - ); + assert!(result.is_ok(), "Expected subscribe blocks to work"); } #[tokio::test] - async fn test_fallback_to_ws_provider_when_http_lacks_pubsub() { - use alloy::providers::ProviderBuilder; - use alloy_node_bindings::Anvil; - + #[should_panic(expected = "called pubsub_frontend on a non-pubsub transport")] + async fn test_subscribe_fails_if_primary_provider_lacks_pubsub() { let anvil = Anvil::new().try_spawn().expect("Failed to start anvil"); let http_provider = ProviderBuilder::new().connect_http(anvil.endpoint_url()); @@ -417,52 +480,40 @@ mod tests { .expect("Failed to connect to WS"); let robust = RobustProvider::new(http_provider) - .fallback(ws_provider.root().to_owned()) + .fallback(ws_provider) .max_timeout(Duration::from_secs(5)) .max_retries(10) .retry_interval(Duration::from_millis(100)); - let result = robust.subscribe_blocks().await; - - assert!(result.is_ok(), "Expected success with WS fallback, got: {result:?}"); + let _ = robust.subscribe_blocks().await; } #[tokio::test] async fn test_ws_fails_http_fallback_returns_primary_error() { - use alloy::providers::ProviderBuilder; - use alloy_node_bindings::Anvil; - - let anvil_1 = Anvil::new().port(2222_u16).try_spawn().expect("Failed to start anvil"); + let anvil_1 = Anvil::new().port(8111_u16).try_spawn().expect("Failed to start anvil"); let ws_provider = ProviderBuilder::new() .connect_ws(WsConnect::new(anvil_1.ws_endpoint_url().as_str())) .await .expect("Failed to connect to WS"); - let anvil = Anvil::new().port(1111_u16).try_spawn().expect("Failed to start anvil"); - let http_provider = ProviderBuilder::new().connect_http(anvil.endpoint_url()); + let anvil_2 = Anvil::new().port(8222_u16).try_spawn().expect("Failed to start anvil"); + let http_provider = ProviderBuilder::new().connect_http(anvil_2.endpoint_url()); let robust = RobustProvider::new(ws_provider) - .fallback(http_provider.root().to_owned()) - .max_timeout(Duration::from_secs(5)) + .fallback(http_provider) + .max_timeout(Duration::from_secs(1)) .max_retries(1) .retry_interval(Duration::from_millis(10)); drop(anvil_1); + let result = robust.subscribe_blocks().await; assert!(result.is_err(), "Expected error when WS fails and HTTP fallback used"); let err = result.unwrap_err(); - match err { - Error::RpcError(rpc_err) => { - assert!(matches!( - rpc_err.as_ref(), - RpcError::Transport(TransportErrorKind::BackendGone) - ),); - } - _ => panic!("Expected Error::RpcError, got: {err:?}"), - } + assert!(matches!(err, Error::Timeout)); } } From fa7fe6db4d66db0f70ac1e4837fe4ee1bdd7344e Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 6 Nov 2025 20:21:33 +0900 Subject: [PATCH 09/13] fix: rename to min delay --- src/robust_provider.rs | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/src/robust_provider.rs b/src/robust_provider.rs index 7644d46a..d154f702 100644 --- a/src/robust_provider.rs +++ b/src/robust_provider.rs @@ -85,8 +85,8 @@ impl RobustProvider { /// Set the base delay for exponential backoff retries. #[must_use] - pub fn min_delay(mut self, retry_interval: Duration) -> Self { - self.min_delay = retry_interval; + pub fn min_delay(mut self, min_delay: Duration) -> Self { + self.min_delay = min_delay; self } @@ -348,16 +348,12 @@ mod tests { use std::sync::atomic::{AtomicUsize, Ordering}; use tokio::time::sleep; - fn test_provider( - timeout: u64, - max_retries: usize, - retry_interval: u64, - ) -> RobustProvider { + fn test_provider(timeout: u64, max_retries: usize, min_delay: u64) -> RobustProvider { RobustProvider { providers: vec![RootProvider::new_http("http://localhost:8545".parse().unwrap())], max_timeout: Duration::from_millis(timeout), max_retries, - min_delay: Duration::from_millis(retry_interval), + min_delay: Duration::from_millis(min_delay), } } @@ -462,7 +458,7 @@ mod tests { .fallback(ws_provider_2) .max_timeout(Duration::from_secs(5)) .max_retries(10) - .retry_interval(Duration::from_millis(100)); + .min_delay(Duration::from_millis(100)); drop(anvil_1); @@ -486,7 +482,7 @@ mod tests { .fallback(ws_provider) .max_timeout(Duration::from_secs(5)) .max_retries(10) - .retry_interval(Duration::from_millis(100)); + .min_delay(Duration::from_millis(100)); let _ = robust.subscribe_blocks().await; } @@ -507,7 +503,7 @@ mod tests { .fallback(http_provider) .max_timeout(Duration::from_secs(1)) .max_retries(1) - .retry_interval(Duration::from_millis(10)); + .min_delay(Duration::from_millis(10)); drop(anvil_1); From e0d82093c57f20d87c9d5fddd4842320de1c62ef Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 6 Nov 2025 20:57:58 +0900 Subject: [PATCH 10/13] feat: more robust test --- src/robust_provider.rs | 37 +++++++++++++++++++++++++++++++------ 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/src/robust_provider.rs b/src/robust_provider.rs index d154f702..ed995047 100644 --- a/src/robust_provider.rs +++ b/src/robust_provider.rs @@ -280,7 +280,6 @@ impl RobustProvider { return Ok(value); } Err(e) => { - println!("errr {e:?}"); last_error = Some(e); if idx == 0 && self.providers.len() > 1 { info!("Primary provider failed, trying fallback provider(s)"); @@ -321,7 +320,6 @@ impl RobustProvider { (|| operation(provider.clone())) .retry(retry_strategy) .notify(|err: &RpcError, dur: Duration| { - println!("retry {err:?}"); info!(error = %err, "RPC error retrying after {:?}", dur); }) .sleep(tokio::time::sleep), @@ -499,20 +497,47 @@ mod tests { let anvil_2 = Anvil::new().port(8222_u16).try_spawn().expect("Failed to start anvil"); let http_provider = ProviderBuilder::new().connect_http(anvil_2.endpoint_url()); - let robust = RobustProvider::new(ws_provider) + let robust = RobustProvider::new(ws_provider.clone()) .fallback(http_provider) - .max_timeout(Duration::from_secs(1)) - .max_retries(1) + .max_timeout(Duration::from_millis(500)) + .max_retries(0) .min_delay(Duration::from_millis(10)); drop(anvil_1); + // Verify that the WS connection is actually dead before proceeding + // Retry until it fails to ensure the connection is truly dead + let mut retries = 0; + loop { + let verification = ws_provider.get_block_number().await; + if verification.is_err() { + break; + } + sleep(Duration::from_millis(50)).await; + retries += 1; + assert!(retries < 50, "WS provider should have failed after anvil dropped"); + } + let result = robust.subscribe_blocks().await; assert!(result.is_err(), "Expected error when WS fails and HTTP fallback used"); let err = result.unwrap_err(); - assert!(matches!(err, Error::Timeout)); + // The error should be either a Timeout or BackendGone from the primary WS provider, + // NOT a PubsubUnavailable error (which would indicate HTTP fallback was attempted) + match err { + Error::Timeout => { + // Expected - WS provider timed out + } + Error::RpcError(e) => { + if matches!(e.as_ref(), RpcError::Transport(TransportErrorKind::PubsubUnavailable)) + { + panic!("Should not get PubsubUnavailable error"); + } + // All other RPC errors (including BackendGone) are acceptable + } + Error::BlockNotFound(_) => panic!("Unexpected error type: {err:?}"), + } } } From f4b21ba15a684dc156f198984ed2d5f4be39a449 Mon Sep 17 00:00:00 2001 From: 0xNeshi Date: Fri, 7 Nov 2025 08:53:43 +0100 Subject: [PATCH 11/13] test: optimize test_ws_fails_http_fallback_returns_primary_error --- src/robust_provider.rs | 32 ++++---------------------------- 1 file changed, 4 insertions(+), 28 deletions(-) diff --git a/src/robust_provider.rs b/src/robust_provider.rs index c9482060..fb549c51 100644 --- a/src/robust_provider.rs +++ b/src/robust_provider.rs @@ -484,7 +484,7 @@ mod tests { #[tokio::test] async fn test_ws_fails_http_fallback_returns_primary_error() { - let anvil_1 = Anvil::new().port(8111_u16).try_spawn().expect("Failed to start anvil"); + let anvil_1 = Anvil::new().try_spawn().expect("Failed to start anvil"); let ws_provider = ProviderBuilder::new() .connect_ws(WsConnect::new(anvil_1.ws_endpoint_url().as_str())) @@ -502,39 +502,15 @@ mod tests { drop(anvil_1); - // Verify that the WS connection is actually dead before proceeding - // Retry until it fails to ensure the connection is truly dead - let mut retries = 0; - loop { - let verification = ws_provider.get_block_number().await; - if verification.is_err() { - break; - } - sleep(Duration::from_millis(50)).await; - retries += 1; - assert!(retries < 50, "WS provider should have failed after anvil dropped"); - } - - let result = robust.subscribe_blocks().await; - - assert!(result.is_err(), "Expected error when WS fails and HTTP fallback used"); - - let err = result.unwrap_err(); + let err = robust.subscribe_blocks().await.unwrap_err(); // The error should be either a Timeout or BackendGone from the primary WS provider, // NOT a PubsubUnavailable error (which would indicate HTTP fallback was attempted) match err { - Error::Timeout => { - // Expected - WS provider timed out - } Error::RpcError(e) => { - if matches!(e.as_ref(), RpcError::Transport(TransportErrorKind::PubsubUnavailable)) - { - panic!("Should not get PubsubUnavailable error"); - } - // All other RPC errors (including BackendGone) are acceptable + assert!(matches!(e.as_ref(), RpcError::Transport(TransportErrorKind::BackendGone))); } - Error::BlockNotFound(_) => panic!("Unexpected error type: {err:?}"), + other => panic!("Unexpected error type: {other:?}"), } } } From 497f5b42879242eab0d812db57a4638b83411256 Mon Sep 17 00:00:00 2001 From: 0xNeshi Date: Fri, 7 Nov 2025 08:56:37 +0100 Subject: [PATCH 12/13] test: allow timeout to fail test_ws_fails_http_fallback_returns_primary_error --- src/robust_provider.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/robust_provider.rs b/src/robust_provider.rs index fb549c51..6d4681b3 100644 --- a/src/robust_provider.rs +++ b/src/robust_provider.rs @@ -500,6 +500,7 @@ mod tests { .max_retries(0) .min_delay(Duration::from_millis(10)); + // force ws_provider to fail drop(anvil_1); let err = robust.subscribe_blocks().await.unwrap_err(); @@ -507,6 +508,7 @@ mod tests { // The error should be either a Timeout or BackendGone from the primary WS provider, // NOT a PubsubUnavailable error (which would indicate HTTP fallback was attempted) match err { + Error::Timeout => {} Error::RpcError(e) => { assert!(matches!(e.as_ref(), RpcError::Transport(TransportErrorKind::BackendGone))); } From e7289bc86cc482970a8bd34e2f0771de688a0dcc Mon Sep 17 00:00:00 2001 From: 0xNeshi Date: Fri, 7 Nov 2025 08:59:03 +0100 Subject: [PATCH 13/13] test: handle blocknotfound explicitly --- src/robust_provider.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/robust_provider.rs b/src/robust_provider.rs index 6d4681b3..eb277e80 100644 --- a/src/robust_provider.rs +++ b/src/robust_provider.rs @@ -500,7 +500,7 @@ mod tests { .max_retries(0) .min_delay(Duration::from_millis(10)); - // force ws_provider to fail + // force ws_provider to fail and return BackendGone drop(anvil_1); let err = robust.subscribe_blocks().await.unwrap_err(); @@ -512,7 +512,7 @@ mod tests { Error::RpcError(e) => { assert!(matches!(e.as_ref(), RpcError::Transport(TransportErrorKind::BackendGone))); } - other => panic!("Unexpected error type: {other:?}"), + Error::BlockNotFound(id) => panic!("Unexpected error type: BlockNotFound({id})"), } } }