Merged

Commits (21)
8b7c84e
feat: add error in case no rpc providers support pubsub
LeoPatOZ Nov 4, 2025
8d156cc
feat: add test
LeoPatOZ Nov 4, 2025
7b9cdfb
feat: refactor test and remove unused error enum
LeoPatOZ Nov 5, 2025
279eb60
Merge branch 'multiple-providers' into pubsub-checks
LeoPatOZ Nov 5, 2025
45fa778
feat: increase timeout in test
LeoPatOZ Nov 5, 2025
2deb8df
feat: better error handling
LeoPatOZ Nov 5, 2025
572d4e9
ref: add test
LeoPatOZ Nov 5, 2025
25daeb5
Merge branch 'multiple-providers' into pubsub-checks
LeoPatOZ Nov 5, 2025
61584fb
ref: update test
LeoPatOZ Nov 5, 2025
492cc45
Merge branch 'multiple-providers' into pubsub-checks
LeoPatOZ Nov 5, 2025
217b274
Merge branch 'multiple-providers' into pubsub-checks
LeoPatOZ Nov 6, 2025
6c24f0e
Merge branch 'multiple-providers' into pubsub-checks
LeoPatOZ Nov 6, 2025
aa7b665
feat: revert back to old true / false logic + more tests
LeoPatOZ Nov 6, 2025
5915ab7
Merge branch 'multiple-providers' into pubsub-checks
LeoPatOZ Nov 6, 2025
d400ed8
Merge branch 'multiple-providers' into pubsub-checks
LeoPatOZ Nov 6, 2025
fa7fe6d
fix: rename to min delay
LeoPatOZ Nov 6, 2025
e0d8209
feat: more robust test
LeoPatOZ Nov 6, 2025
dc589b0
Merge branch 'multiple-providers' into pubsub-checks
LeoPatOZ Nov 6, 2025
f4b21ba
test: optimize test_ws_fails_http_fallback_returns_primary_error
0xNeshi Nov 7, 2025
497f5b4
test: allow timeout to fail test_ws_fails_http_fallback_returns_prima…
0xNeshi Nov 7, 2025
e7289bc
test: handle blocknotfound explicitly
0xNeshi Nov 7, 2025
207 changes: 167 additions & 40 deletions src/robust_provider.rs
@@ -85,8 +85,8 @@ impl<N: Network> RobustProvider<N> {

/// Set the base delay for exponential backoff retries.
#[must_use]
pub fn min_delay(mut self, retry_interval: Duration) -> Self {
self.min_delay = retry_interval;
pub fn min_delay(mut self, min_delay: Duration) -> Self {
self.min_delay = min_delay;
self
}

@@ -105,8 +105,8 @@ impl<N: Network> RobustProvider<N> {
///
/// Fallback providers are used when the primary provider times out or fails.
#[must_use]
pub fn fallback(mut self, provider: RootProvider<N>) -> Self {
self.providers.push(provider);
pub fn fallback(mut self, provider: impl Provider<N>) -> Self {
self.providers.push(provider.root().to_owned());
self
}

@@ -122,9 +122,10 @@ impl<N: Network> RobustProvider<N> {
) -> Result<N::BlockResponse, Error> {
info!("eth_getBlockByNumber called");
let result = self
.retry_with_total_timeout(move |provider| async move {
provider.get_block_by_number(number).await
})
.retry_with_total_timeout(
move |provider| async move { provider.get_block_by_number(number).await },
false,
)
.await;
if let Err(e) = &result {
error!(error = %e, "eth_getBlockByNumber failed");
@@ -144,6 +145,7 @@ impl<N: Network> RobustProvider<N> {
let result = self
.retry_with_total_timeout(
move |provider| async move { provider.get_block_number().await },
false,
)
.await;
if let Err(e) = &result {
@@ -164,9 +166,10 @@ impl<N: Network> RobustProvider<N> {
) -> Result<N::BlockResponse, Error> {
info!("eth_getBlockByHash called");
let result = self
.retry_with_total_timeout(move |provider| async move {
provider.get_block_by_hash(hash).await
})
.retry_with_total_timeout(
move |provider| async move { provider.get_block_by_hash(hash).await },
false,
)
.await;
if let Err(e) = &result {
error!(error = %e, "eth_getBlockByHash failed");
@@ -186,6 +189,7 @@ impl<N: Network> RobustProvider<N> {
let result = self
.retry_with_total_timeout(
move |provider| async move { provider.get_logs(filter).await },
false,
)
.await;
if let Err(e) = &result {
@@ -202,11 +206,12 @@ impl<N: Network> RobustProvider<N> {
/// after exhausting retries or if the call times out.
pub async fn subscribe_blocks(&self) -> Result<Subscription<N::HeaderResponse>, Error> {
info!("eth_subscribe called");
// We need this otherwise error is not clear
// immediately fail if primary does not support pubsub
self.root().client().expect_pubsub_frontend();
let result = self
.retry_with_total_timeout(
move |provider| async move { provider.subscribe_blocks().await },
true,
)
.await;
if let Err(e) = &result {
@@ -224,17 +229,27 @@ impl<N: Network> RobustProvider<N> {
/// If the timeout is exceeded and fallback providers are available, it will
/// attempt to use each fallback provider in sequence.
///
/// If `require_pubsub` is true, providers that don't support pubsub will be skipped.
///
/// # Errors
///
/// - Returns [`RpcError<TransportErrorKind>`] with message "total operation timeout exceeded
/// and all fallback providers failed" if the overall timeout elapses and no fallback
/// providers succeed.
/// - Returns [`RpcError::Transport(TransportErrorKind::PubsubUnavailable)`] if `require_pubsub`
/// is true and no provider supports pubsub.
/// - Propagates any [`RpcError<TransportErrorKind>`] from the underlying retries.
async fn retry_with_total_timeout<T: Debug, F, Fut>(&self, operation: F) -> Result<T, Error>
async fn retry_with_total_timeout<T: Debug, F, Fut>(
&self,
operation: F,
require_pubsub: bool,
) -> Result<T, Error>
where
F: Fn(RootProvider<N>) -> Fut,
Fut: Future<Output = Result<T, RpcError<TransportErrorKind>>>,
{
let mut skipped_count = 0;

let mut providers = self.providers.iter();
let primary = providers.next().expect("should have primary provider");

@@ -253,6 +268,11 @@ impl<N: Network> RobustProvider<N> {
// This loop starts at index 1 automatically
for (idx, provider) in providers.enumerate() {
let fallback_num = idx + 1;
if require_pubsub && !Self::supports_pubsub(provider) {
info!("Fallback provider {} doesn't support pubsub, skipping", fallback_num);
skipped_count += 1;
continue;
}
info!("Attempting fallback provider {}/{}", fallback_num, self.providers.len() - 1);

match self.try_provider_with_timeout(provider, &operation).await {
@@ -267,6 +287,13 @@ impl<N: Network> RobustProvider<N> {
}
}

// If all providers were skipped due to pubsub requirement
if skipped_count == self.providers.len() {
error!("All providers skipped - none support pubsub");
return Err(RpcError::Transport(TransportErrorKind::PubsubUnavailable).into());
}

// Return the last error encountered
error!("All providers failed or timed out");
Err(last_error)
}
@@ -298,25 +325,30 @@ impl<N: Network> RobustProvider<N> {
.map_err(Error::from)?
.map_err(Error::from)
}

/// Check if a provider supports pubsub
fn supports_pubsub(provider: &RootProvider<N>) -> bool {
provider.client().pubsub_frontend().is_some()
}
}

#[cfg(test)]
mod tests {
use super::*;
use alloy::network::Ethereum;
use alloy::{
network::Ethereum,
providers::{ProviderBuilder, WsConnect},
};
use alloy_node_bindings::Anvil;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::time::sleep;

fn test_provider(
timeout: u64,
max_retries: usize,
retry_interval: u64,
) -> RobustProvider<Ethereum> {
fn test_provider(timeout: u64, max_retries: usize, min_delay: u64) -> RobustProvider<Ethereum> {
RobustProvider {
providers: vec![RootProvider::new_http("http://localhost:8545".parse().unwrap())],
max_timeout: Duration::from_millis(timeout),
max_retries,
min_delay: Duration::from_millis(retry_interval),
min_delay: Duration::from_millis(min_delay),
}
}

@@ -327,11 +359,14 @@ mod tests {
let call_count = AtomicUsize::new(0);

let result = provider
.retry_with_total_timeout(|_| async {
call_count.fetch_add(1, Ordering::SeqCst);
let count = call_count.load(Ordering::SeqCst);
Ok(count)
})
.retry_with_total_timeout(
|_| async {
call_count.fetch_add(1, Ordering::SeqCst);
let count = call_count.load(Ordering::SeqCst);
Ok(count)
},
false,
)
.await;

assert!(matches!(result, Ok(1)));
@@ -344,14 +379,17 @@ mod tests {
let call_count = AtomicUsize::new(0);

let result = provider
.retry_with_total_timeout(|_| async {
call_count.fetch_add(1, Ordering::SeqCst);
let count = call_count.load(Ordering::SeqCst);
match count {
3 => Ok(count),
_ => Err(TransportErrorKind::BackendGone.into()),
}
})
.retry_with_total_timeout(
|_| async {
call_count.fetch_add(1, Ordering::SeqCst);
let count = call_count.load(Ordering::SeqCst);
match count {
3 => Ok(count),
_ => Err(TransportErrorKind::BackendGone.into()),
}
},
false,
)
.await;

assert!(matches!(result, Ok(3)));
@@ -364,10 +402,13 @@ mod tests {
let call_count = AtomicUsize::new(0);

let result: Result<(), Error> = provider
.retry_with_total_timeout(|_| async {
call_count.fetch_add(1, Ordering::SeqCst);
Err(TransportErrorKind::BackendGone.into())
})
.retry_with_total_timeout(
|_| async {
call_count.fetch_add(1, Ordering::SeqCst);
Err(TransportErrorKind::BackendGone.into())
},
false,
)
.await;

assert!(matches!(result, Err(Error::RpcError(_))));
@@ -380,12 +421,98 @@ mod tests {
let provider = test_provider(max_timeout, 10, 1);

let result = provider
.retry_with_total_timeout(move |_provider| async move {
sleep(Duration::from_millis(max_timeout + 10)).await;
Ok(42)
})
.retry_with_total_timeout(
move |_provider| async move {
sleep(Duration::from_millis(max_timeout + 10)).await;
Ok(42)
},
false,
)
.await;

assert!(matches!(result, Err(Error::Timeout)));
}

#[tokio::test]
async fn test_subscribe_fails_causes_backup_to_be_used() {
let anvil_1 = Anvil::new().port(2222_u16).try_spawn().expect("Failed to start anvil");

let ws_provider_1 = ProviderBuilder::new()
.connect_ws(WsConnect::new(anvil_1.ws_endpoint_url().as_str()))
.await
.expect("Failed to connect to WS");

let anvil_2 = Anvil::new().port(1111_u16).try_spawn().expect("Failed to start anvil");

let ws_provider_2 = ProviderBuilder::new()
.connect_ws(WsConnect::new(anvil_2.ws_endpoint_url().as_str()))
.await
.expect("Failed to connect to WS");

let robust = RobustProvider::new(ws_provider_1)
.fallback(ws_provider_2)
.max_timeout(Duration::from_secs(5))
.max_retries(10)
.min_delay(Duration::from_millis(100));

drop(anvil_1);

let result = robust.subscribe_blocks().await;

assert!(result.is_ok(), "Expected subscribe blocks to work");
}

#[tokio::test]
#[should_panic(expected = "called pubsub_frontend on a non-pubsub transport")]
async fn test_subscribe_fails_if_primary_provider_lacks_pubsub() {
let anvil = Anvil::new().try_spawn().expect("Failed to start anvil");

let http_provider = ProviderBuilder::new().connect_http(anvil.endpoint_url());
let ws_provider = ProviderBuilder::new()
.connect_ws(WsConnect::new(anvil.ws_endpoint_url().as_str()))
.await
.expect("Failed to connect to WS");

let robust = RobustProvider::new(http_provider)
.fallback(ws_provider)
.max_timeout(Duration::from_secs(5))
.max_retries(10)
.min_delay(Duration::from_millis(100));

let _ = robust.subscribe_blocks().await;
}

#[tokio::test]
async fn test_ws_fails_http_fallback_returns_primary_error() {
let anvil_1 = Anvil::new().try_spawn().expect("Failed to start anvil");

let ws_provider = ProviderBuilder::new()
.connect_ws(WsConnect::new(anvil_1.ws_endpoint_url().as_str()))
.await
.expect("Failed to connect to WS");

let anvil_2 = Anvil::new().port(8222_u16).try_spawn().expect("Failed to start anvil");
let http_provider = ProviderBuilder::new().connect_http(anvil_2.endpoint_url());

let robust = RobustProvider::new(ws_provider.clone())
.fallback(http_provider)
.max_timeout(Duration::from_millis(500))
.max_retries(0)
.min_delay(Duration::from_millis(10));

// force ws_provider to fail and return BackendGone
drop(anvil_1);

let err = robust.subscribe_blocks().await.unwrap_err();

// The error should be either a Timeout or BackendGone from the primary WS provider,
// NOT a PubsubUnavailable error (which would indicate HTTP fallback was attempted)
match err {
Error::Timeout => {}
Error::RpcError(e) => {
assert!(matches!(e.as_ref(), RpcError::Transport(TransportErrorKind::BackendGone)));
}
Error::BlockNotFound(id) => panic!("Unexpected error type: BlockNotFound({id})"),
}
}
}
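
Taken together, the provider changes above make subscribe_blocks skip fallbacks that cannot serve subscriptions instead of surfacing a confusing transport error once the primary fails. A minimal usage sketch of the builder API touched by this diff (the endpoints and the helper function name are illustrative, and RobustProvider is assumed to be in scope from this crate):

use std::time::Duration;

use alloy::providers::{ProviderBuilder, WsConnect};

// Sketch only: build a RobustProvider with a WS primary and an HTTP fallback.
// The HTTP fallback still serves eth_getLogs, eth_getBlockByNumber, etc., but
// with the new `require_pubsub` handling it is skipped for `subscribe_blocks`.
async fn subscribe_with_fallback() {
    let ws_provider = ProviderBuilder::new()
        .connect_ws(WsConnect::new("ws://localhost:8546"))
        .await
        .expect("failed to connect to WS endpoint");

    let http_provider =
        ProviderBuilder::new().connect_http("http://localhost:8545".parse().unwrap());

    let robust = RobustProvider::new(ws_provider)
        .fallback(http_provider)
        .max_timeout(Duration::from_secs(5))
        .max_retries(3)
        .min_delay(Duration::from_millis(100));

    // Subscribes over WS; with the new handling, non-pubsub fallbacks are
    // skipped rather than producing an unclear error when the WS primary fails.
    let _subscription = robust.subscribe_blocks().await.expect("subscription failed");
}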
4 changes: 2 additions & 2 deletions tests/block_range_scanner.rs
@@ -32,7 +32,7 @@ async fn live_mode_processes_all_blocks_respecting_block_confirmations() -> anyh

robust_provider.root().anvil_mine(Some(1), None).await?;

assert_next!(stream, 6..=6);
assert_next!(stream, 6..=6, timeout = 10);
assert_empty!(stream);

// --- 1 block confirmation ---
@@ -50,7 +50,7 @@

robust_provider.root().anvil_mine(Some(1), None).await?;

assert_next!(stream, 11..=11);
assert_next!(stream, 11..=11, timeout = 10);
assert_empty!(stream);

Ok(())
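
The block_range_scanner test changes only pass an explicit timeout to the assertion macro. As a rough illustration, the helper below is a hypothetical approximation of what such a timeout-bounded assertion does; the real assert_next! macro is defined in this repo's test helpers and is not shown in this diff:

use std::time::Duration;

use tokio_stream::StreamExt;

// Hypothetical stand-in for `assert_next!(stream, expected, timeout = secs)`:
// wait up to `secs` seconds for the next stream item, then compare it.
async fn assert_next_within<S, T>(stream: &mut S, expected: T, secs: u64)
where
    S: tokio_stream::Stream<Item = T> + Unpin,
    T: PartialEq + std::fmt::Debug,
{
    let item = tokio::time::timeout(Duration::from_secs(secs), stream.next())
        .await
        .expect("timed out waiting for the next stream item")
        .expect("stream ended before yielding an item");
    assert_eq!(item, expected);
}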