diff --git a/README.md b/README.md index 4d8737ae..bf74df65 100644 --- a/README.md +++ b/README.md @@ -59,20 +59,24 @@ event-scanner = "0.4.0-alpha" Create an event stream for the given event filters registered with the `EventScanner`: ```rust -use alloy::{network::Ethereum, sol_types::SolEvent}; -use event_scanner::{EventFilter, EventScannerBuilder, Message}; +use alloy::{network::Ethereum, providers::{Provider, ProviderBuilder}, sol_types::SolEvent}; +use event_scanner::{EventFilter, EventScannerBuilder, Message, robust_provider::RobustProviderBuilder}; use tokio_stream::StreamExt; use crate::MyContract; async fn run_scanner( - ws_url: alloy::transports::http::reqwest::Url, + ws_url: &str, contract: alloy::primitives::Address, ) -> Result<(), Box> { + // Connect to provider + let provider = ProviderBuilder::new().connect(ws_url).await?; + let robust_provider = RobustProviderBuilder::new(provider).build().await?; + // Configure scanner with custom batch size (optional) let mut scanner = EventScannerBuilder::live() .max_block_range(500) // Process up to 500 blocks per batch - .connect_ws::(ws_url).await?; + .connect(robust_provider); // Register an event listener let filter = EventFilter::new() @@ -109,48 +113,56 @@ async fn run_scanner( ### Building a Scanner -`EventScannerBuilder` provides mode-specific constructors and a functions to configure settings before connecting. -Once configured, connect using one of: +`EventScannerBuilder` provides mode-specific constructors and functions to configure settings before connecting. +Once configured, connect using: -- `connect_ws::(ws_url)` -- `connect_ipc::(path)` -- `connect::(provider)` +- `connect(provider)` - Connect using a `RobustProvider` wrapping your alloy provider or using an alloy provider directly This will connect the `EventScanner` and allow you to create event streams and start scanning in various [modes](#scanning-modes). ```rust +use alloy::providers::{Provider, ProviderBuilder}; +use event_scanner::robust_provider::RobustProviderBuilder; + +// Connect to provider (example with WebSocket) +let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?; + // Live streaming mode let scanner = EventScannerBuilder::live() .max_block_range(500) // Optional: set max blocks per read (default: 1000) .block_confirmations(12) // Optional: set block confirmations (default: 12) - .connect_ws::(ws_url).await?; + .connect(provider.clone()); // Historical block range mode let scanner = EventScannerBuilder::historic() .from_block(1_000_000) .to_block(2_000_000) .max_block_range(500) - .connect_ws::(ws_url).await?; + .connect(provider.clone()); + +// we can also wrap the provider in a RobustProvider +// for more advanced configurations like retries and fallbacks +let robust_provider = RobustProviderBuilder::new(provider).build().await?; // Latest events mode let scanner = EventScannerBuilder::latest(100) // .from_block(1_000_000) // Optional: set start of search range // .to_block(2_000_000) // Optional: set end of search range .max_block_range(500) - .connect_ws::(ws_url).await?; + .connect(robust_provider.clone()); // Sync from block then switch to live mode let scanner = EventScannerBuilder::sync() .from_block(100) .max_block_range(500) .block_confirmations(12) - .connect_ws::(ws_url).await?; + .connect(robust_provider.clone()); // Sync the latest 60 events then switch to live mode let scanner = EventScannerBuilder::sync() .from_latest(60) .block_confirmations(12) - .connect_ws::(ws_url).await?; + .connect(robust_provider); ``` Invoking `scanner.start()` starts the scanner in the specified mode. diff --git a/examples/historical_scanning/main.rs b/examples/historical_scanning/main.rs index f95476b0..1d2cc039 100644 --- a/examples/historical_scanning/main.rs +++ b/examples/historical_scanning/main.rs @@ -1,7 +1,9 @@ -use alloy::{network::Ethereum, providers::ProviderBuilder, sol, sol_types::SolEvent}; +use alloy::{providers::ProviderBuilder, sol, sol_types::SolEvent}; use alloy_node_bindings::Anvil; -use event_scanner::{EventFilter, EventScannerBuilder, Message}; +use event_scanner::{ + EventFilter, EventScannerBuilder, Message, robust_provider::RobustProviderBuilder, +}; use tokio_stream::StreamExt; use tracing::{error, info}; use tracing_subscriber::EnvFilter; @@ -38,9 +40,11 @@ async fn main() -> anyhow::Result<()> { let anvil = Anvil::new().block_time_f64(0.1).try_spawn()?; let wallet = anvil.wallet(); - let provider = - ProviderBuilder::new().wallet(wallet.unwrap()).connect(anvil.endpoint().as_str()).await?; - let counter_contract = Counter::deploy(provider).await?; + let provider = ProviderBuilder::new() + .wallet(wallet.unwrap()) + .connect(anvil.ws_endpoint_url().as_str()) + .await?; + let counter_contract = Counter::deploy(provider.clone()).await?; let contract_address = counter_contract.address(); @@ -50,8 +54,14 @@ async fn main() -> anyhow::Result<()> { let _ = counter_contract.increase().send().await?.get_receipt().await?; - let mut scanner = - EventScannerBuilder::historic().connect_ws::(anvil.ws_endpoint_url()).await?; + let robust_provider = RobustProviderBuilder::new(provider) + .max_timeout(std::time::Duration::from_secs(30)) + .max_retries(5) + .min_delay(std::time::Duration::from_millis(500)) + .build() + .await?; + + let mut scanner = EventScannerBuilder::historic().connect(robust_provider).await?; let mut stream = scanner.subscribe(increase_filter); diff --git a/examples/latest_events_scanning/main.rs b/examples/latest_events_scanning/main.rs index dad70168..a51c404f 100644 --- a/examples/latest_events_scanning/main.rs +++ b/examples/latest_events_scanning/main.rs @@ -1,6 +1,8 @@ -use alloy::{network::Ethereum, providers::ProviderBuilder, sol, sol_types::SolEvent}; +use alloy::{providers::ProviderBuilder, sol, sol_types::SolEvent}; use alloy_node_bindings::Anvil; -use event_scanner::{EventFilter, EventScannerBuilder, Message}; +use event_scanner::{ + EventFilter, EventScannerBuilder, Message, robust_provider::RobustProviderBuilder, +}; use tokio_stream::StreamExt; use tracing::{error, info}; use tracing_subscriber::EnvFilter; @@ -37,9 +39,11 @@ async fn main() -> anyhow::Result<()> { let anvil = Anvil::new().block_time_f64(0.5).try_spawn()?; let wallet = anvil.wallet(); - let provider = - ProviderBuilder::new().wallet(wallet.unwrap()).connect(anvil.endpoint().as_str()).await?; - let counter_contract = Counter::deploy(provider).await?; + let provider = ProviderBuilder::new() + .wallet(wallet.unwrap()) + .connect(anvil.ws_endpoint_url().as_str()) + .await?; + let counter_contract = Counter::deploy(provider.clone()).await?; let contract_address = counter_contract.address(); @@ -47,8 +51,14 @@ async fn main() -> anyhow::Result<()> { .contract_address(*contract_address) .event(Counter::CountIncreased::SIGNATURE); - let mut scanner = - EventScannerBuilder::latest(5).connect_ws::(anvil.ws_endpoint_url()).await?; + let robust_provider = RobustProviderBuilder::new(provider) + .max_timeout(std::time::Duration::from_secs(30)) + .max_retries(5) + .min_delay(std::time::Duration::from_millis(500)) + .build() + .await?; + + let mut scanner = EventScannerBuilder::latest(5).connect(robust_provider).await?; let mut stream = scanner.subscribe(increase_filter); diff --git a/examples/live_scanning/main.rs b/examples/live_scanning/main.rs index ab081494..dd888e20 100644 --- a/examples/live_scanning/main.rs +++ b/examples/live_scanning/main.rs @@ -1,6 +1,8 @@ -use alloy::{network::Ethereum, providers::ProviderBuilder, sol, sol_types::SolEvent}; +use alloy::{providers::ProviderBuilder, sol, sol_types::SolEvent}; use alloy_node_bindings::Anvil; -use event_scanner::{EventFilter, EventScannerBuilder, Message}; +use event_scanner::{ + EventFilter, EventScannerBuilder, Message, robust_provider::RobustProviderBuilder, +}; use tokio_stream::StreamExt; use tracing::{error, info}; @@ -38,9 +40,11 @@ async fn main() -> anyhow::Result<()> { let anvil = Anvil::new().block_time(1).try_spawn()?; let wallet = anvil.wallet(); - let provider = - ProviderBuilder::new().wallet(wallet.unwrap()).connect(anvil.endpoint().as_str()).await?; - let counter_contract = Counter::deploy(provider).await?; + let provider = ProviderBuilder::new() + .wallet(wallet.unwrap()) + .connect(anvil.ws_endpoint_url().as_str()) + .await?; + let counter_contract = Counter::deploy(provider.clone()).await?; let contract_address = counter_contract.address(); @@ -48,8 +52,14 @@ async fn main() -> anyhow::Result<()> { .contract_address(*contract_address) .event(Counter::CountIncreased::SIGNATURE); - let mut scanner = - EventScannerBuilder::live().connect_ws::(anvil.ws_endpoint_url()).await?; + let robust_provider = RobustProviderBuilder::new(provider) + .max_timeout(std::time::Duration::from_secs(30)) + .max_retries(5) + .min_delay(std::time::Duration::from_millis(500)) + .build() + .await?; + + let mut scanner = EventScannerBuilder::live().connect(robust_provider).await?; let mut stream = scanner.subscribe(increase_filter); diff --git a/examples/sync_from_block_scanning/main.rs b/examples/sync_from_block_scanning/main.rs index 27d18184..35c0c254 100644 --- a/examples/sync_from_block_scanning/main.rs +++ b/examples/sync_from_block_scanning/main.rs @@ -1,8 +1,10 @@ use std::time::Duration; -use alloy::{network::Ethereum, providers::ProviderBuilder, sol, sol_types::SolEvent}; +use alloy::{providers::ProviderBuilder, sol, sol_types::SolEvent}; use alloy_node_bindings::Anvil; -use event_scanner::{EventFilter, EventScannerBuilder, Message}; +use event_scanner::{ + EventFilter, EventScannerBuilder, Message, robust_provider::RobustProviderBuilder, +}; use tokio::time::sleep; use tokio_stream::StreamExt; use tracing::{error, info}; @@ -40,9 +42,11 @@ async fn main() -> anyhow::Result<()> { let anvil = Anvil::new().block_time(1).try_spawn()?; let wallet = anvil.wallet(); - let provider = - ProviderBuilder::new().wallet(wallet.unwrap()).connect(anvil.endpoint().as_str()).await?; - let counter_contract = Counter::deploy(provider).await?; + let provider = ProviderBuilder::new() + .wallet(wallet.unwrap()) + .connect(anvil.ws_endpoint_url().as_str()) + .await?; + let counter_contract = Counter::deploy(provider.clone()).await?; let contract_address = counter_contract.address(); @@ -56,11 +60,15 @@ async fn main() -> anyhow::Result<()> { info!("Historical event {} created", i + 1); } - let mut scanner = EventScannerBuilder::sync() - .from_block(0) - .connect_ws::(anvil.ws_endpoint_url()) + let robust_provider = RobustProviderBuilder::new(provider) + .max_timeout(Duration::from_secs(30)) + .max_retries(5) + .min_delay(Duration::from_millis(500)) + .build() .await?; + let mut scanner = EventScannerBuilder::sync().from_block(0).connect(robust_provider).await?; + let mut stream = scanner.subscribe(increase_filter); info!("Starting sync scanner..."); diff --git a/examples/sync_from_latest_scanning/main.rs b/examples/sync_from_latest_scanning/main.rs index f0886bd5..c8ba8cfb 100644 --- a/examples/sync_from_latest_scanning/main.rs +++ b/examples/sync_from_latest_scanning/main.rs @@ -1,6 +1,8 @@ -use alloy::{network::Ethereum, providers::ProviderBuilder, sol, sol_types::SolEvent}; +use alloy::{providers::ProviderBuilder, sol, sol_types::SolEvent}; use alloy_node_bindings::Anvil; -use event_scanner::{EventFilter, EventScannerBuilder, Message}; +use event_scanner::{ + EventFilter, EventScannerBuilder, Message, robust_provider::RobustProviderBuilder, +}; use tokio_stream::StreamExt; use tracing::{error, info}; @@ -38,9 +40,11 @@ async fn main() -> anyhow::Result<()> { let anvil = Anvil::new().block_time_f64(0.5).try_spawn()?; let wallet = anvil.wallet(); - let provider = - ProviderBuilder::new().wallet(wallet.unwrap()).connect(anvil.endpoint().as_str()).await?; - let counter_contract = Counter::deploy(provider).await?; + let provider = ProviderBuilder::new() + .wallet(wallet.unwrap()) + .connect(anvil.ws_endpoint_url().as_str()) + .await?; + let counter_contract = Counter::deploy(provider.clone()).await?; let contract_address = counter_contract.address(); @@ -48,11 +52,15 @@ async fn main() -> anyhow::Result<()> { .contract_address(*contract_address) .event(Counter::CountIncreased::SIGNATURE); - let mut client = EventScannerBuilder::sync() - .from_latest(5) - .connect_ws::(anvil.ws_endpoint_url()) + let robust_provider = RobustProviderBuilder::new(provider) + .max_timeout(std::time::Duration::from_secs(30)) + .max_retries(5) + .min_delay(std::time::Duration::from_millis(500)) + .build() .await?; + let mut client = EventScannerBuilder::sync().from_latest(5).connect(robust_provider).await?; + let mut stream = client.subscribe(increase_filter); for _ in 0..10 { diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index 7e26c0f9..da742503 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -5,13 +5,14 @@ //! use std::ops::RangeInclusive; //! use tokio_stream::{StreamExt, wrappers::ReceiverStream}; //! -//! use alloy::transports::http::reqwest::Url; +//! use alloy::providers::{Provider, ProviderBuilder}; //! use event_scanner::{ //! ScannerError, //! block_range_scanner::{ //! BlockRangeScanner, BlockRangeScannerClient, DEFAULT_BLOCK_CONFIRMATIONS, //! DEFAULT_MAX_BLOCK_RANGE, Message, //! }, +//! robust_provider::RobustProviderBuilder, //! }; //! use tokio::time::Duration; //! use tracing::{error, info}; @@ -22,9 +23,9 @@ //! tracing_subscriber::fmt::init(); //! //! // Configuration -//! let block_range_scanner = BlockRangeScanner::new() -//! .connect_ws::(Url::parse("ws://localhost:8546").unwrap()) -//! .await?; +//! let provider = ProviderBuilder::new().connect("ws://localhost:8546").await?; +//! let robust_provider = RobustProviderBuilder::new(provider).build().await?; +//! let block_range_scanner = BlockRangeScanner::new().connect(robust_provider).await?; //! //! // Create client to send subscribe command to block scanner //! let client: BlockRangeScannerClient = block_range_scanner.run()?; @@ -58,7 +59,7 @@ //! } //! ``` -use std::{cmp::Ordering, ops::RangeInclusive, time::Duration}; +use std::{cmp::Ordering, ops::RangeInclusive}; use tokio::{ sync::{mpsc, oneshot}, try_join, @@ -68,10 +69,7 @@ use tokio_stream::{StreamExt, wrappers::ReceiverStream}; use crate::{ ScannerMessage, error::ScannerError, - robust_provider::{ - DEFAULT_MAX_RETRIES, DEFAULT_MAX_TIMEOUT, DEFAULT_RETRY_INTERVAL, - Error as RobustProviderError, RobustProvider, - }, + robust_provider::{Error as RobustProviderError, IntoRobustProvider, RobustProvider}, types::{ScannerStatus, TryStream}, }; use alloy::{ @@ -79,12 +77,8 @@ use alloy::{ eips::BlockNumberOrTag, network::{BlockResponse, Network, primitives::HeaderResponse}, primitives::{B256, BlockNumber}, - providers::RootProvider, pubsub::Subscription, - rpc::client::ClientBuilder, - transports::{ - RpcError, TransportErrorKind, TransportResult, http::reqwest::Url, ws::WsConnect, - }, + transports::{RpcError, TransportErrorKind}, }; use tracing::{debug, error, info, warn}; @@ -130,12 +124,9 @@ impl From for Message { } } -#[derive(Clone, Copy)] +#[derive(Clone)] pub struct BlockRangeScanner { pub max_block_range: u64, - pub max_timeout: Duration, - pub max_retries: usize, - pub retry_interval: Duration, } impl Default for BlockRangeScanner { @@ -147,12 +138,7 @@ impl Default for BlockRangeScanner { impl BlockRangeScanner { #[must_use] pub fn new() -> Self { - Self { - max_block_range: DEFAULT_MAX_BLOCK_RANGE, - max_timeout: DEFAULT_MAX_TIMEOUT, - max_retries: DEFAULT_MAX_RETRIES, - retry_interval: DEFAULT_RETRY_INTERVAL, - } + Self { max_block_range: DEFAULT_MAX_BLOCK_RANGE } } #[must_use] @@ -161,62 +147,17 @@ impl BlockRangeScanner { self } - #[must_use] - pub fn with_max_timeout(mut self, rpc_timeout: Duration) -> Self { - self.max_timeout = rpc_timeout; - self - } - - #[must_use] - pub fn with_max_retries(mut self, rpc_max_retries: usize) -> Self { - self.max_retries = rpc_max_retries; - self - } - - #[must_use] - pub fn with_retry_interval(mut self, rpc_retry_interval: Duration) -> Self { - self.retry_interval = rpc_retry_interval; - self - } - - /// Connects to the provider via WebSocket - /// - /// # Errors - /// - /// Returns an error if the connection fails - pub async fn connect_ws( - self, - ws_url: Url, - ) -> TransportResult> { - let provider = - RootProvider::::new(ClientBuilder::default().ws(WsConnect::new(ws_url)).await?); - Ok(self.connect(provider)) - } - - /// Connects to the provider via IPC + /// Connects to an existing provider /// /// # Errors /// - /// Returns an error if the connection fails - pub async fn connect_ipc( + /// Returns an error if the provider connection fails. + pub async fn connect( self, - ipc_path: String, - ) -> Result, RpcError> { - let provider = RootProvider::::new(ClientBuilder::default().ipc(ipc_path.into()).await?); - Ok(self.connect(provider)) - } - - /// Connects to an existing provider - #[must_use] - pub fn connect(self, provider: RootProvider) -> ConnectedBlockRangeScanner { - let robust_provider = RobustProvider::new(provider) - .max_timeout(self.max_timeout) - .max_retries(self.max_retries) - .retry_interval(self.retry_interval); - ConnectedBlockRangeScanner { - provider: robust_provider, - max_block_range: self.max_block_range, - } + provider: impl IntoRobustProvider, + ) -> Result, ScannerError> { + let provider = provider.into_robust_provider().await?; + Ok(ConnectedBlockRangeScanner { provider, max_block_range: self.max_block_range }) } } diff --git a/src/error.rs b/src/error.rs index 829591f1..e7ebedb5 100644 --- a/src/error.rs +++ b/src/error.rs @@ -21,16 +21,13 @@ pub enum ScannerError { #[error("Operation timed out")] Timeout, - - #[error("RPC call failed after exhausting all retry attempts: {0}")] - RetryFailure(Arc>), } impl From for ScannerError { fn from(error: RobustProviderError) -> ScannerError { match error { RobustProviderError::Timeout => ScannerError::Timeout, - RobustProviderError::RetryFailure(err) => ScannerError::RetryFailure(err), + RobustProviderError::RpcError(err) => ScannerError::RpcError(err), RobustProviderError::BlockNotFound(block) => ScannerError::BlockNotFound(block), } } diff --git a/src/event_scanner/scanner/mod.rs b/src/event_scanner/scanner/mod.rs index 98535f1d..1a13a5f8 100644 --- a/src/event_scanner/scanner/mod.rs +++ b/src/event_scanner/scanner/mod.rs @@ -1,19 +1,18 @@ use alloy::{ eips::BlockNumberOrTag, network::{Ethereum, Network}, - providers::RootProvider, - transports::{TransportResult, http::reqwest::Url}, }; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use crate::{ - EventFilter, Message, + EventFilter, Message, ScannerError, block_range_scanner::{ BlockRangeScanner, ConnectedBlockRangeScanner, DEFAULT_BLOCK_CONFIRMATIONS, MAX_BUFFERED_MESSAGES, }, event_scanner::listener::EventListener, + robust_provider::IntoRobustProvider, }; mod common; @@ -78,17 +77,16 @@ impl EventScannerBuilder { /// # Example /// /// ```no_run - /// # use alloy::network::Ethereum; - /// # use event_scanner::{EventFilter, EventScannerBuilder, Message}; + /// # use alloy::{network::Ethereum, providers::{Provider, ProviderBuilder}}; + /// # use event_scanner::{EventFilter, EventScannerBuilder, Message, robust_provider::RobustProviderBuilder}; /// # use tokio_stream::StreamExt; /// # /// # async fn example() -> Result<(), Box> { - /// # let ws_url = "ws://localhost:8545".parse()?; /// # let contract_address = alloy::primitives::address!("0xd8dA6BF26964af9d7eed9e03e53415d37aa96045"); /// // Stream all events from genesis to latest block - /// let mut scanner = EventScannerBuilder::historic() - /// .connect_ws::(ws_url) - /// .await?; + /// let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?; + /// let robust_provider = RobustProviderBuilder::new(provider).build().await?; + /// let mut scanner = EventScannerBuilder::historic().connect(robust_provider).await?; /// /// let filter = EventFilter::new().contract_address(contract_address); /// let mut stream = scanner.subscribe(filter); @@ -105,16 +103,17 @@ impl EventScannerBuilder { /// Specifying a custom block range: /// /// ```no_run - /// # use alloy::network::Ethereum; - /// # use event_scanner::EventScannerBuilder; + /// # use alloy::{network::Ethereum, providers::{Provider, ProviderBuilder}}; + /// # use event_scanner::{EventScannerBuilder, robust_provider::RobustProviderBuilder}; /// # /// # async fn example() -> Result<(), Box> { - /// # let ws_url = "ws://localhost:8545".parse()?; /// // Stream events between blocks [1_000_000, 2_000_000] + /// let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?; + /// let robust_provider = RobustProviderBuilder::new(provider).build().await?; /// let mut scanner = EventScannerBuilder::historic() /// .from_block(1_000_000) /// .to_block(2_000_000) - /// .connect_ws::(ws_url) + /// .connect(robust_provider) /// .await?; /// # Ok(()) /// # } @@ -144,17 +143,18 @@ impl EventScannerBuilder { /// # Example /// /// ```no_run - /// # use alloy::network::Ethereum; - /// # use event_scanner::{EventFilter, EventScannerBuilder, Message}; + /// # use alloy::{network::Ethereum, providers::{Provider, ProviderBuilder}}; + /// # use event_scanner::{EventFilter, EventScannerBuilder, Message, robust_provider::RobustProviderBuilder}; /// # use tokio_stream::StreamExt; /// # /// # async fn example() -> Result<(), Box> { - /// # let ws_url = "ws://localhost:8545".parse()?; /// # let contract_address = alloy::primitives::address!("0xd8dA6BF26964af9d7eed9e03e53415d37aa96045"); /// // Stream new events as they arrive + /// let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?; + /// let robust_provider = RobustProviderBuilder::new(provider).build().await?; /// let mut scanner = EventScannerBuilder::live() /// .block_confirmations(20) - /// .connect_ws::(ws_url) + /// .connect(robust_provider) /// .await?; /// /// let filter = EventFilter::new().contract_address(contract_address); @@ -231,17 +231,16 @@ impl EventScannerBuilder { /// # Example /// /// ```no_run - /// # use alloy::{network::Ethereum, primitives::Address}; - /// # use event_scanner::{EventFilter, EventScannerBuilder, Message}; + /// # use alloy::{network::Ethereum, primitives::Address, providers::{Provider, ProviderBuilder}}; + /// # use event_scanner::{EventFilter, EventScannerBuilder, Message, robust_provider::RobustProviderBuilder}; /// # use tokio_stream::StreamExt; /// # /// # async fn example() -> Result<(), Box> { - /// # let ws_url = "ws://localhost:8545".parse()?; /// # let contract_address = alloy::primitives::address!("0xd8dA6BF26964af9d7eed9e03e53415d37aa96045"); /// // Collect the latest 10 events across Earliest..=Latest - /// let mut scanner = EventScannerBuilder::latest(10) - /// .connect_ws::(ws_url) - /// .await?; + /// let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?; + /// let robust_provider = RobustProviderBuilder::new(provider).build().await?; + /// let mut scanner = EventScannerBuilder::latest(10).connect(robust_provider).await?; /// /// let filter = EventFilter::new().contract_address(contract_address); /// let mut stream = scanner.subscribe(filter); @@ -259,17 +258,17 @@ impl EventScannerBuilder { /// Restricting to a specific block range: /// /// ```no_run - /// # use alloy::network::Ethereum; - /// # use event_scanner::EventScannerBuilder; + /// # use alloy::{network::Ethereum, providers::{Provider, ProviderBuilder}}; + /// # use event_scanner::{EventScannerBuilder, robust_provider::RobustProviderBuilder}; /// # /// # async fn example() -> Result<(), Box> { - /// # let ws_url = "ws://localhost:8545".parse()?; /// // Collect the latest 5 events between blocks [1_000_000, 1_100_000] + /// let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?; + /// let robust_provider = RobustProviderBuilder::new(provider).build().await?; /// let mut scanner = EventScannerBuilder::latest(5) /// .from_block(1_000_000) /// .to_block(1_100_000) - /// .connect_ws::(ws_url) - /// .await?; + /// .connect(robust_provider); /// # Ok(()) /// # } /// ``` @@ -389,45 +388,20 @@ impl EventScannerBuilder { self } - /// Connects to the provider via WebSocket. - /// - /// Final builder method: consumes the builder and returns the built [`EventScanner`]. - /// - /// # Errors - /// - /// Returns an error if the connection fails - pub async fn connect_ws(self, ws_url: Url) -> TransportResult> { - let block_range_scanner = self.block_range_scanner.connect_ws::(ws_url).await?; - Ok(EventScanner { config: self.config, block_range_scanner, listeners: Vec::new() }) - } - - /// Connects to the provider via IPC. + /// Connects to an existing provider. /// /// Final builder method: consumes the builder and returns the built [`EventScanner`]. /// /// # Errors /// - /// Returns an error if the connection fails - pub async fn connect_ipc( + /// Returns an error if the provider connection fails. + pub async fn connect( self, - ipc_path: String, - ) -> TransportResult> { - let block_range_scanner = self.block_range_scanner.connect_ipc::(ipc_path).await?; + provider: impl IntoRobustProvider, + ) -> Result, ScannerError> { + let block_range_scanner = self.block_range_scanner.connect::(provider).await?; Ok(EventScanner { config: self.config, block_range_scanner, listeners: Vec::new() }) } - - /// Connects to an existing provider. - /// - /// Final builder method: consumes the builder and returns the built [`EventScanner`]. - /// - /// # Errors - /// - /// Returns an error if the connection fails - #[must_use] - pub fn connect(self, provider: RootProvider) -> EventScanner { - let block_range_scanner = self.block_range_scanner.connect::(provider); - EventScanner { config: self.config, block_range_scanner, listeners: Vec::new() } - } } impl EventScanner { @@ -441,7 +415,10 @@ impl EventScanner { #[cfg(test)] mod tests { - use alloy::{providers::mock::Asserter, rpc::client::RpcClient}; + use alloy::{ + providers::{RootProvider, mock::Asserter}, + rpc::client::RpcClient, + }; use super::*; @@ -478,10 +455,10 @@ mod tests { assert_eq!(builder.config.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS); } - #[test] - fn test_historic_event_stream_listeners_vector_updates() { + #[tokio::test] + async fn test_historic_event_stream_listeners_vector_updates() -> anyhow::Result<()> { let provider = RootProvider::::new(RpcClient::mocked(Asserter::new())); - let mut scanner = EventScannerBuilder::historic().connect::(provider); + let mut scanner = EventScannerBuilder::historic().connect(provider).await?; assert!(scanner.listeners.is_empty()); @@ -491,16 +468,20 @@ mod tests { let _stream2 = scanner.subscribe(EventFilter::new()); let _stream3 = scanner.subscribe(EventFilter::new()); assert_eq!(scanner.listeners.len(), 3); + + Ok(()) } - #[test] - fn test_historic_event_stream_channel_capacity() { + #[tokio::test] + async fn test_historic_event_stream_channel_capacity() -> anyhow::Result<()> { let provider = RootProvider::::new(RpcClient::mocked(Asserter::new())); - let mut scanner = EventScannerBuilder::historic().connect::(provider); + let mut scanner = EventScannerBuilder::historic().connect(provider).await?; let _ = scanner.subscribe(EventFilter::new()); let sender = &scanner.listeners[0].sender; assert_eq!(sender.capacity(), MAX_BUFFERED_MESSAGES); + + Ok(()) } } diff --git a/src/event_scanner/scanner/sync/from_block.rs b/src/event_scanner/scanner/sync/from_block.rs index 0fa6f69c..9e626809 100644 --- a/src/event_scanner/scanner/sync/from_block.rs +++ b/src/event_scanner/scanner/sync/from_block.rs @@ -10,8 +10,8 @@ use crate::{ impl EventScannerBuilder { #[must_use] - pub fn block_confirmations(mut self, count: u64) -> Self { - self.config.block_confirmations = count; + pub fn block_confirmations(mut self, confirmations: u64) -> Self { + self.config.block_confirmations = confirmations; self } } diff --git a/src/event_scanner/scanner/sync/from_latest.rs b/src/event_scanner/scanner/sync/from_latest.rs index e266b699..27753718 100644 --- a/src/event_scanner/scanner/sync/from_latest.rs +++ b/src/event_scanner/scanner/sync/from_latest.rs @@ -22,8 +22,8 @@ use crate::{ impl EventScannerBuilder { #[must_use] - pub fn block_confirmations(mut self, count: u64) -> Self { - self.config.block_confirmations = count; + pub fn block_confirmations(mut self, confirmations: u64) -> Self { + self.config.block_confirmations = confirmations; self } } diff --git a/src/event_scanner/scanner/sync/mod.rs b/src/event_scanner/scanner/sync/mod.rs index 8d589709..e8467e54 100644 --- a/src/event_scanner/scanner/sync/mod.rs +++ b/src/event_scanner/scanner/sync/mod.rs @@ -24,17 +24,18 @@ impl EventScannerBuilder { /// # Example /// /// ```no_run - /// # use alloy::network::Ethereum; - /// # use event_scanner::{EventFilter, EventScannerBuilder, Message}; + /// # use alloy::{network::Ethereum, providers::{Provider, ProviderBuilder}}; + /// # use event_scanner::{EventFilter, EventScannerBuilder, Message, robust_provider::RobustProviderBuilder}; /// # use tokio_stream::StreamExt; /// # /// # async fn example() -> Result<(), Box> { - /// # let ws_url = "ws://localhost:8545".parse()?; /// # let contract_address = alloy::primitives::address!("0xd8dA6BF26964af9d7eed9e03e53415d37aa96045"); /// // Fetch the latest 10 events, then stream new events continuously + /// let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?; + /// let robust_provider = RobustProviderBuilder::new(provider).build().await?; /// let mut scanner = EventScannerBuilder::sync() /// .from_latest(10) - /// .connect_ws::(ws_url) + /// .connect(robust_provider) /// .await?; /// /// let filter = EventFilter::new().contract_address(contract_address); @@ -120,17 +121,18 @@ impl EventScannerBuilder { /// # Example /// /// ```no_run - /// # use alloy::network::Ethereum; - /// # use event_scanner::{EventFilter, EventScannerBuilder, Message}; + /// # use alloy::{network::Ethereum, providers::{Provider, ProviderBuilder}}; + /// # use event_scanner::{EventFilter, EventScannerBuilder, Message, robust_provider::RobustProviderBuilder}; /// # use tokio_stream::StreamExt; /// # /// # async fn example() -> Result<(), Box> { - /// # let ws_url = "ws://localhost:8545".parse()?; /// # let contract_address = alloy::primitives::address!("0xd8dA6BF26964af9d7eed9e03e53415d37aa96045"); /// // Sync from block 1_000_000 to present, then stream new events + /// let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?; + /// let robust_provider = RobustProviderBuilder::new(provider).build().await?; /// let mut scanner = EventScannerBuilder::sync() /// .from_block(1_000_000) - /// .connect_ws::(ws_url) + /// .connect(robust_provider) /// .await?; /// /// let filter = EventFilter::new().contract_address(contract_address); @@ -159,15 +161,16 @@ impl EventScannerBuilder { /// Using block tags: /// /// ```no_run - /// # use alloy::{network::Ethereum, eips::BlockNumberOrTag}; - /// # use event_scanner::EventScannerBuilder; + /// # use alloy::{network::Ethereum, eips::BlockNumberOrTag, providers::{Provider, ProviderBuilder}}; + /// # use event_scanner::{EventScannerBuilder, robust_provider::RobustProviderBuilder}; /// # /// # async fn example() -> Result<(), Box> { - /// # let ws_url = "ws://localhost:8545".parse()?; /// // Sync from genesis block + /// let provider = ProviderBuilder::new().connect("ws://localhost:8545").await?; + /// let robust_provider = RobustProviderBuilder::new(provider).build().await?; /// let mut scanner = EventScannerBuilder::sync() /// .from_block(BlockNumberOrTag::Earliest) - /// .connect_ws::(ws_url) + /// .connect(robust_provider) /// .await?; /// # Ok(()) /// # } diff --git a/src/lib.rs b/src/lib.rs index 1b374057..496051a7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,6 @@ pub mod block_range_scanner; -mod robust_provider; +pub mod robust_provider; #[cfg(any(test, feature = "test-utils"))] pub mod test_utils; diff --git a/src/robust_provider.rs b/src/robust_provider.rs index f66158d4..bada3fb2 100644 --- a/src/robust_provider.rs +++ b/src/robust_provider.rs @@ -1,15 +1,20 @@ -use std::{future::Future, sync::Arc, time::Duration}; +use std::{fmt::Debug, future::Future, marker::PhantomData, sync::Arc, time::Duration}; use alloy::{ eips::{BlockId, BlockNumberOrTag}, - network::Network, - providers::{Provider, RootProvider}, + network::{Ethereum, Network}, + providers::{ + DynProvider, Provider, RootProvider, + fillers::{FillProvider, TxFiller}, + layers::{CacheProvider, CallBatchProvider}, + }, pubsub::Subscription, rpc::types::{Filter, Log}, - transports::{RpcError, TransportErrorKind}, + transports::{RpcError, TransportErrorKind, http::reqwest::Url}, }; use backon::{ExponentialBuilder, Retryable}; use thiserror::Error; +use tokio::time::{error as TokioError, timeout}; use tracing::{error, info}; #[derive(Error, Debug, Clone)] @@ -17,80 +22,239 @@ pub enum Error { #[error("Operation timed out")] Timeout, #[error("RPC call failed after exhausting all retry attempts: {0}")] - RetryFailure(Arc>), + RpcError(Arc>), #[error("Block not found, Block Id: {0}")] BlockNotFound(BlockId), } impl From> for Error { fn from(err: RpcError) -> Self { - Error::RetryFailure(Arc::new(err)) + Error::RpcError(Arc::new(err)) } } -/// Provider wrapper with built-in retry and timeout mechanisms. -/// -/// This wrapper around Alloy providers automatically handles retries, -/// timeouts, and error logging for RPC calls. -#[derive(Clone)] -pub struct RobustProvider { - provider: RootProvider, - max_timeout: Duration, - max_retries: usize, - retry_interval: Duration, +impl From for Error { + fn from(_: TokioError::Elapsed) -> Self { + Error::Timeout + } +} + +pub trait IntoProvider { + fn into_provider( + self, + ) -> impl std::future::Future, Error>> + Send; +} + +impl IntoProvider for RobustProvider { + async fn into_provider(self) -> Result, Error> { + Ok(self.primary().to_owned()) + } +} + +impl IntoProvider for RootProvider { + async fn into_provider(self) -> Result, Error> { + Ok(self) + } +} + +impl IntoProvider for &str { + async fn into_provider(self) -> Result, Error> { + Ok(RootProvider::connect(self).await?) + } +} + +impl IntoProvider for Url { + async fn into_provider(self) -> Result, Error> { + Ok(RootProvider::connect(self.as_str()).await?) + } +} + +impl IntoProvider for FillProvider +where + F: TxFiller, + P: Provider, + N: Network, +{ + async fn into_provider(self) -> Result, Error> { + Ok(self) + } +} + +impl IntoProvider for CacheProvider +where + P: Provider, + N: Network, +{ + async fn into_provider(self) -> Result, Error> { + Ok(self) + } +} + +impl IntoProvider for DynProvider +where + N: Network, +{ + async fn into_provider(self) -> Result, Error> { + Ok(self) + } +} + +impl IntoProvider for CallBatchProvider +where + P: Provider + 'static, + N: Network, +{ + async fn into_provider(self) -> Result, Error> { + Ok(self) + } +} + +pub trait IntoRobustProvider { + fn into_robust_provider( + self, + ) -> impl std::future::Future, Error>> + Send; +} + +impl + Send> IntoRobustProvider for P { + async fn into_robust_provider(self) -> Result, Error> { + RobustProviderBuilder::new(self).build().await + } } // RPC retry and timeout settings /// Default timeout used by `RobustProvider` -pub const DEFAULT_MAX_TIMEOUT: Duration = Duration::from_secs(30); +pub const DEFAULT_MAX_TIMEOUT: Duration = Duration::from_secs(60); /// Default maximum number of retry attempts. -pub const DEFAULT_MAX_RETRIES: usize = 5; +pub const DEFAULT_MAX_RETRIES: usize = 3; /// Default base delay between retries. -pub const DEFAULT_RETRY_INTERVAL: Duration = Duration::from_secs(1); +pub const DEFAULT_MIN_DELAY: Duration = Duration::from_secs(1); -impl RobustProvider { +#[derive(Clone)] +pub struct RobustProviderBuilder> { + providers: Vec

, + max_timeout: Duration, + max_retries: usize, + min_delay: Duration, + _network: PhantomData, +} + +impl> RobustProviderBuilder { /// Create a new `RobustProvider` with default settings. + /// + /// The provided provider is treated as the primary provider. #[must_use] - pub fn new(provider: RootProvider) -> Self { + pub fn new(provider: P) -> Self { Self { - provider, + providers: vec![provider], max_timeout: DEFAULT_MAX_TIMEOUT, max_retries: DEFAULT_MAX_RETRIES, - retry_interval: DEFAULT_RETRY_INTERVAL, + min_delay: DEFAULT_MIN_DELAY, + _network: PhantomData, } } + /// Create a new `RobustProvider` with no retry attempts and only timeout set. + /// + /// The provided provider is treated as the primary provider. + #[must_use] + pub fn fragile(provider: P) -> Self { + Self::new(provider).max_retries(0).min_delay(Duration::ZERO) + } + + /// Add a fallback provider to the list. + /// + /// Fallback providers are used when the primary provider times out or fails. + #[must_use] + pub fn fallback(mut self, provider: P) -> Self { + self.providers.push(provider); + self + } + + /// Set the maximum timeout for RPC operations. #[must_use] pub fn max_timeout(mut self, timeout: Duration) -> Self { self.max_timeout = timeout; self } + /// Set the maximum number of retry attempts. #[must_use] pub fn max_retries(mut self, max_retries: usize) -> Self { self.max_retries = max_retries; self } + /// Set the base delay for exponential backoff retries. #[must_use] - pub fn retry_interval(mut self, retry_interval: Duration) -> Self { - self.retry_interval = retry_interval; + pub fn min_delay(mut self, min_delay: Duration) -> Self { + self.min_delay = min_delay; self } + /// Build the `RobustProvider`. + /// + /// Final builder method: consumes the builder and returns the built [`RobustProvider`]. + /// + /// # Errors + /// + /// Returns an error if any of the providers fail to connect. + pub async fn build(self) -> Result, Error> { + let mut providers = vec![]; + for p in self.providers { + providers.push(p.into_provider().await?.root().to_owned()); + } + Ok(RobustProvider { + providers, + max_timeout: self.max_timeout, + max_retries: self.max_retries, + min_delay: self.min_delay, + }) + } +} + +/// Provider wrapper with built-in retry and timeout mechanisms. +/// +/// This wrapper around Alloy providers automatically handles retries, +/// timeouts, and error logging for RPC calls. +/// The first provider in the vector is treated as the primary provider. +#[derive(Clone)] +pub struct RobustProvider { + providers: Vec>, + max_timeout: Duration, + max_retries: usize, + min_delay: Duration, +} + +impl RobustProvider { + /// Get a reference to the primary provider (the first provider in the list) + /// + /// # Panics + /// + /// If there are no providers set (this should never happen) + #[must_use] + pub fn primary(&self) -> &RootProvider { + // Safe to unwrap because we always have at least one provider + self.providers.first().expect("providers vector should never be empty") + } + /// Fetch a block by number with retry and timeout. /// /// # Errors /// - /// Returns an error if RPC call fails repeatedly even - /// after exhausting retries or if the call times out. + /// Returns an error if the RPC call fails after exhausting all retry attempts + /// or if the call times out. When fallback providers are configured, the error + /// returned will be from the final provider that was attempted. pub async fn get_block_by_number( &self, number: BlockNumberOrTag, ) -> Result { info!("eth_getBlockByNumber called"); - let operation = async || self.provider.get_block_by_number(number).await; - let result = self.retry_with_total_timeout(operation).await; + let result = self + .retry_with_total_timeout( + move |provider| async move { provider.get_block_by_number(number).await }, + false, + ) + .await; if let Err(e) = &result { error!(error = %e, "eth_getByBlockNumber failed"); } @@ -102,12 +266,17 @@ impl RobustProvider { /// /// # Errors /// - /// Returns an error if RPC call fails repeatedly even - /// after exhausting retries or if the call times out. + /// Returns an error if the RPC call fails after exhausting all retry attempts + /// or if the call times out. When fallback providers are configured, the error + /// returned will be from the final provider that was attempted. pub async fn get_block_number(&self) -> Result { info!("eth_getBlockNumber called"); - let operation = async || self.provider.get_block_number().await; - let result = self.retry_with_total_timeout(operation).await; + let result = self + .retry_with_total_timeout( + move |provider| async move { provider.get_block_number().await }, + false, + ) + .await; if let Err(e) = &result { error!(error = %e, "eth_getBlockNumber failed"); } @@ -118,15 +287,20 @@ impl RobustProvider { /// /// # Errors /// - /// Returns an error if RPC call fails repeatedly even - /// after exhausting retries or if the call times out. + /// Returns an error if the RPC call fails after exhausting all retry attempts + /// or if the call times out. When fallback providers are configured, the error + /// returned will be from the final provider that was attempted. pub async fn get_block_by_hash( &self, hash: alloy::primitives::BlockHash, ) -> Result { info!("eth_getBlockByHash called"); - let operation = async || self.provider.get_block_by_hash(hash).await; - let result = self.retry_with_total_timeout(operation).await; + let result = self + .retry_with_total_timeout( + move |provider| async move { provider.get_block_by_hash(hash).await }, + false, + ) + .await; if let Err(e) = &result { error!(error = %e, "eth_getBlockByHash failed"); } @@ -138,12 +312,17 @@ impl RobustProvider { /// /// # Errors /// - /// Returns an error if RPC call fails repeatedly even - /// after exhausting retries or if the call times out. + /// Returns an error if the RPC call fails after exhausting all retry attempts + /// or if the call times out. When fallback providers are configured, the error + /// returned will be from the final provider that was attempted. pub async fn get_logs(&self, filter: &Filter) -> Result, Error> { info!("eth_getLogs called"); - let operation = async || self.provider.get_logs(filter).await; - let result = self.retry_with_total_timeout(operation).await; + let result = self + .retry_with_total_timeout( + move |provider| async move { provider.get_logs(filter).await }, + false, + ) + .await; if let Err(e) = &result { error!(error = %e, "eth_getLogs failed"); } @@ -154,12 +333,18 @@ impl RobustProvider { /// /// # Errors /// - /// Returns an error if RPC call fails repeatedly even - /// after exhausting retries or if the call times out. + /// Returns an error if the primary provider does not support pubsub, if the RPC + /// call fails after exhausting all retry attempts, or if the call times out. + /// When fallback providers are configured, the error returned will be from the + /// final provider that was attempted. pub async fn subscribe_blocks(&self) -> Result, Error> { info!("eth_subscribe called"); - let operation = async || self.provider.subscribe_blocks().await; - let result = self.retry_with_total_timeout(operation).await; + let result = self + .retry_with_total_timeout( + move |provider| async move { provider.subscribe_blocks().await }, + true, + ) + .await; if let Err(e) = &result { error!(error = %e, "eth_subscribe failed"); } @@ -172,49 +357,121 @@ impl RobustProvider { /// the entire operation (including time spent inside the RPC call) cannot exceed /// `max_timeout`. /// + /// If the timeout is exceeded and fallback providers are available, it will + /// attempt to use each fallback provider in sequence. + /// + /// If `require_pubsub` is true, providers that don't support pubsub will be skipped. + /// /// # Errors /// - /// - Returns [`RpcError`] with message "total operation timeout exceeded" - /// if the overall timeout elapses. + /// - Returns [`RpcError`] with message "total operation timeout exceeded + /// and all fallback providers failed" if the overall timeout elapses and no fallback + /// providers succeed. + /// - Returns [`RpcError::Transport(TransportErrorKind::PubsubUnavailable)`] if `require_pubsub` + /// is true and all providers don't support pubsub. /// - Propagates any [`RpcError`] from the underlying retries. - async fn retry_with_total_timeout(&self, operation: F) -> Result + async fn retry_with_total_timeout( + &self, + operation: F, + require_pubsub: bool, + ) -> Result + where + F: Fn(RootProvider) -> Fut, + Fut: Future>>, + { + let mut providers = self.providers.iter(); + let primary = providers.next().expect("should have primary provider"); + + let result = self.try_provider_with_timeout(primary, &operation).await; + + if result.is_ok() { + return result; + } + + let mut last_error = result.unwrap_err(); + + let num_providers = self.providers.len(); + if num_providers > 1 { + info!("Primary provider failed, trying fallback provider(s)"); + } + + // This loop starts at index 1 automatically + for (idx, provider) in providers.enumerate() { + let fallback_num = idx + 1; + if require_pubsub && !Self::supports_pubsub(provider) { + info!("Fallback provider {} doesn't support pubsub, skipping", fallback_num); + continue; + } + info!("Attempting fallback provider {}/{}", fallback_num, num_providers - 1); + + match self.try_provider_with_timeout(provider, &operation).await { + Ok(value) => { + info!(provider_num = fallback_num, "Fallback provider succeeded"); + return Ok(value); + } + Err(e) => { + error!(provider_num = fallback_num, err = %e, "Fallback provider failed"); + last_error = e; + } + } + } + + // Return the last error encountered + error!("All providers failed or timed out - returning the last providers attempt's error"); + Err(last_error) + } + + /// Try executing an operation with a specific provider with retry and timeout. + async fn try_provider_with_timeout( + &self, + provider: &RootProvider, + operation: F, + ) -> Result where - F: Fn() -> Fut, + F: Fn(RootProvider) -> Fut, Fut: Future>>, { let retry_strategy = ExponentialBuilder::default() .with_max_times(self.max_retries) - .with_min_delay(self.retry_interval); + .with_min_delay(self.min_delay); - match tokio::time::timeout( + timeout( self.max_timeout, - operation.retry(retry_strategy).sleep(tokio::time::sleep), + (|| operation(provider.clone())) + .retry(retry_strategy) + .notify(|err: &RpcError, dur: Duration| { + info!(error = %err, "RPC error retrying after {:?}", dur); + }) + .sleep(tokio::time::sleep), ) .await - { - Ok(res) => res.map_err(Error::from), - Err(_) => Err(Error::Timeout), - } + .map_err(Error::from)? + .map_err(Error::from) + } + + /// Check if a provider supports pubsub + fn supports_pubsub(provider: &RootProvider) -> bool { + provider.client().pubsub_frontend().is_some() } } #[cfg(test)] mod tests { use super::*; - use alloy::network::Ethereum; + use alloy::{ + consensus::BlockHeader, + providers::{ProviderBuilder, WsConnect, ext::AnvilApi}, + }; + use alloy_node_bindings::Anvil; use std::sync::atomic::{AtomicUsize, Ordering}; use tokio::time::sleep; - fn test_provider( - timeout: u64, - max_retries: usize, - retry_interval: u64, - ) -> RobustProvider { + fn test_provider(timeout: u64, max_retries: usize, min_delay: u64) -> RobustProvider { RobustProvider { - provider: RootProvider::new_http("http://localhost:8545".parse().unwrap()), + providers: vec![RootProvider::new_http("http://localhost:8545".parse().unwrap())], max_timeout: Duration::from_millis(timeout), max_retries, - retry_interval: Duration::from_millis(retry_interval), + min_delay: Duration::from_millis(min_delay), } } @@ -225,11 +482,14 @@ mod tests { let call_count = AtomicUsize::new(0); let result = provider - .retry_with_total_timeout(|| async { - call_count.fetch_add(1, Ordering::SeqCst); - let count = call_count.load(Ordering::SeqCst); - Ok(count) - }) + .retry_with_total_timeout( + |_| async { + call_count.fetch_add(1, Ordering::SeqCst); + let count = call_count.load(Ordering::SeqCst); + Ok(count) + }, + false, + ) .await; assert!(matches!(result, Ok(1))); @@ -242,14 +502,17 @@ mod tests { let call_count = AtomicUsize::new(0); let result = provider - .retry_with_total_timeout(|| async { - call_count.fetch_add(1, Ordering::SeqCst); - let count = call_count.load(Ordering::SeqCst); - match count { - 3 => Ok(count), - _ => Err(TransportErrorKind::BackendGone.into()), - } - }) + .retry_with_total_timeout( + |_| async { + call_count.fetch_add(1, Ordering::SeqCst); + let count = call_count.load(Ordering::SeqCst); + match count { + 3 => Ok(count), + _ => Err(TransportErrorKind::BackendGone.into()), + } + }, + false, + ) .await; assert!(matches!(result, Ok(3))); @@ -262,13 +525,16 @@ mod tests { let call_count = AtomicUsize::new(0); let result: Result<(), Error> = provider - .retry_with_total_timeout(|| async { - call_count.fetch_add(1, Ordering::SeqCst); - Err(TransportErrorKind::BackendGone.into()) - }) + .retry_with_total_timeout( + |_| async { + call_count.fetch_add(1, Ordering::SeqCst); + Err(TransportErrorKind::BackendGone.into()) + }, + false, + ) .await; - assert!(matches!(result, Err(Error::RetryFailure(_)))); + assert!(matches!(result, Err(Error::RpcError(_)))); assert_eq!(call_count.load(Ordering::SeqCst), 3); } @@ -278,12 +544,130 @@ mod tests { let provider = test_provider(max_timeout, 10, 1); let result = provider - .retry_with_total_timeout(|| async { - sleep(Duration::from_millis(max_timeout + 10)).await; - Ok(42) - }) + .retry_with_total_timeout( + move |_provider| async move { + sleep(Duration::from_millis(max_timeout + 10)).await; + Ok(42) + }, + false, + ) .await; assert!(matches!(result, Err(Error::Timeout))); } + + #[tokio::test] + async fn test_subscribe_fails_causes_backup_to_be_used() -> anyhow::Result<()> { + let anvil_1 = Anvil::new().try_spawn()?; + + let ws_provider_1 = + ProviderBuilder::new().connect(anvil_1.ws_endpoint_url().as_str()).await?; + + let anvil_2 = Anvil::new().try_spawn()?; + + let ws_provider_2 = + ProviderBuilder::new().connect(anvil_2.ws_endpoint_url().as_str()).await?; + + let robust = RobustProviderBuilder::fragile(ws_provider_1.clone()) + .fallback(ws_provider_2.clone()) + .max_timeout(Duration::from_secs(1)) + .build() + .await?; + + drop(anvil_1); + + let mut subscription = robust.subscribe_blocks().await?; + + ws_provider_2.anvil_mine(Some(2), None).await?; + + assert_eq!(1, subscription.recv().await?.number()); + assert_eq!(2, subscription.recv().await?.number()); + assert!(subscription.is_empty()); + + Ok(()) + } + + #[tokio::test] + async fn test_subscribe_fails_when_all_providers_lack_pubsub() -> anyhow::Result<()> { + let anvil = Anvil::new().try_spawn()?; + + let http_provider = ProviderBuilder::new().connect_http(anvil.endpoint_url()); + + let robust = RobustProviderBuilder::new(http_provider.clone()) + .fallback(http_provider) + .max_timeout(Duration::from_secs(5)) + .min_delay(Duration::from_millis(100)) + .build() + .await?; + + let result = robust.subscribe_blocks().await.unwrap_err(); + + match result { + Error::RpcError(e) => { + assert!(matches!( + e.as_ref(), + RpcError::Transport(TransportErrorKind::PubsubUnavailable) + )); + } + other => panic!("Expected PubsubUnavailable error type, got: {other:?}"), + } + + Ok(()) + } + + #[tokio::test] + async fn test_subscribe_succeeds_if_primary_provider_lacks_pubsub_but_fallback_supports_it() + -> anyhow::Result<()> { + let anvil = Anvil::new().try_spawn()?; + + let http_provider = ProviderBuilder::new().connect_http(anvil.endpoint_url()); + let ws_provider = ProviderBuilder::new() + .connect_ws(WsConnect::new(anvil.ws_endpoint_url().as_str())) + .await?; + + let robust = RobustProviderBuilder::fragile(http_provider) + .fallback(ws_provider) + .max_timeout(Duration::from_secs(5)) + .build() + .await?; + + let result = robust.subscribe_blocks().await; + assert!(result.is_ok()); + + Ok(()) + } + + #[tokio::test] + async fn test_ws_fails_http_fallback_returns_primary_error() -> anyhow::Result<()> { + let anvil_1 = Anvil::new().try_spawn()?; + + let ws_provider = + ProviderBuilder::new().connect(anvil_1.ws_endpoint_url().as_str()).await?; + + let anvil_2 = Anvil::new().try_spawn()?; + let http_provider = ProviderBuilder::new().connect_http(anvil_2.endpoint_url()); + + let robust = RobustProviderBuilder::fragile(ws_provider.clone()) + .fallback(http_provider) + .max_timeout(Duration::from_millis(500)) + .build() + .await?; + + // force ws_provider to fail and return BackendGone + drop(anvil_1); + + let err = robust.subscribe_blocks().await.unwrap_err(); + + // The error should be either a Timeout or BackendGone from the primary WS provider, + // NOT a PubsubUnavailable error (which would indicate HTTP fallback was attempted) + match err { + Error::Timeout => {} + Error::RpcError(e) => { + assert!(matches!(e.as_ref(), RpcError::Transport(TransportErrorKind::BackendGone))); + } + Error::BlockNotFound(id) => panic!("Unexpected error type: BlockNotFound({id})"), + } + + Ok(()) + } } diff --git a/tests/block_range_scanner.rs b/tests/block_range_scanner.rs index 3cba4d0b..7dfe10c4 100644 --- a/tests/block_range_scanner.rs +++ b/tests/block_range_scanner.rs @@ -1,7 +1,6 @@ use alloy::{ eips::{BlockId, BlockNumberOrTag}, - network::Ethereum, - providers::{Provider, ProviderBuilder, ext::AnvilApi}, + providers::{ProviderBuilder, ext::AnvilApi}, rpc::types::anvil::ReorgOptions, }; use alloy_node_bindings::Anvil; @@ -17,7 +16,7 @@ async fn live_mode_processes_all_blocks_respecting_block_confirmations() -> anyh // --- Zero block confirmations -> stream immediately --- - let client = BlockRangeScanner::new().connect::(provider.root().clone()).run()?; + let client = BlockRangeScanner::new().connect(provider.clone()).await?.run()?; let mut stream = client.stream_live(0).await?; @@ -61,7 +60,7 @@ async fn stream_from_latest_starts_at_tip_not_confirmed() -> anyhow::Result<()> let anvil = Anvil::new().try_spawn()?; let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?; - let client = BlockRangeScanner::new().connect::(provider.root().clone()).run()?; + let client = BlockRangeScanner::new().connect(provider.clone()).await?.run()?; provider.anvil_mine(Some(20), None).await?; @@ -88,7 +87,7 @@ async fn continuous_blocks_if_reorg_less_than_block_confirmation() -> anyhow::Re let anvil = Anvil::new().try_spawn()?; let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?; - let client = BlockRangeScanner::new().connect::(provider.root().clone()).run()?; + let client = BlockRangeScanner::new().connect(provider.clone()).await?.run()?; let mut stream = client.stream_live(5).await?; @@ -128,7 +127,7 @@ async fn shallow_block_confirmation_does_not_mitigate_reorg() -> anyhow::Result< let anvil = Anvil::new().try_spawn()?; let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?; - let client = BlockRangeScanner::new().connect::(provider.root().clone()).run()?; + let client = BlockRangeScanner::new().connect(provider.clone()).await?.run()?; let mut stream = client.stream_live(3).await?; @@ -181,10 +180,8 @@ async fn historical_emits_correction_range_when_reorg_below_end() -> anyhow::Res let end_num = 110; - let client = BlockRangeScanner::new() - .max_block_range(30) - .connect::(provider.root().clone()) - .run()?; + let client = + BlockRangeScanner::new().max_block_range(30).connect(provider.clone()).await?.run()?; let mut stream = client .stream_historical(BlockNumberOrTag::Number(0), BlockNumberOrTag::Number(end_num)) @@ -215,10 +212,8 @@ async fn historical_emits_correction_range_when_end_num_reorgs() -> anyhow::Resu let end_num = 120; - let client = BlockRangeScanner::new() - .max_block_range(30) - .connect::(provider.root().clone()) - .run()?; + let client = + BlockRangeScanner::new().max_block_range(30).connect(provider.clone()).await?.run()?; let mut stream = client .stream_historical(BlockNumberOrTag::Number(0), BlockNumberOrTag::Number(end_num)) @@ -248,10 +243,8 @@ async fn historic_mode_respects_blocks_read_per_epoch() -> anyhow::Result<()> { provider.anvil_mine(Some(100), None).await?; - let client = BlockRangeScanner::new() - .max_block_range(5) - .connect::(provider.root().clone()) - .run()?; + let client = + BlockRangeScanner::new().max_block_range(5).connect(provider.clone()).await?.run()?; // ranges where each batch is of max blocks per epoch size let mut stream = client.stream_historical(0, 19).await?; @@ -278,10 +271,7 @@ async fn historic_mode_respects_blocks_read_per_epoch() -> anyhow::Result<()> { assert_closed!(stream); // range where blocks per epoch is larger than the number of blocks on chain - let client = BlockRangeScanner::new() - .max_block_range(200) - .connect::(provider.root().clone()) - .run()?; + let client = BlockRangeScanner::new().max_block_range(200).connect(provider).await?.run()?; let mut stream = client.stream_historical(0, 20).await?; assert_next!(stream, 0..=20); @@ -301,10 +291,7 @@ async fn historic_mode_normalises_start_and_end_block() -> anyhow::Result<()> { provider.anvil_mine(Some(11), None).await?; - let client = BlockRangeScanner::new() - .max_block_range(5) - .connect::(provider.root().clone()) - .run()?; + let client = BlockRangeScanner::new().max_block_range(5).connect(provider).await?.run()?; let mut stream = client.stream_historical(10, 0).await?; assert_next!(stream, 0..=4); @@ -328,10 +315,7 @@ async fn rewind_single_batch_when_epoch_larger_than_range() -> anyhow::Result<() provider.anvil_mine(Some(150), None).await?; - let client = BlockRangeScanner::new() - .max_block_range(100) - .connect::(provider.root().clone()) - .run()?; + let client = BlockRangeScanner::new().max_block_range(100).connect(provider).await?.run()?; let mut stream = client.rewind(100, 150).await?; @@ -349,10 +333,7 @@ async fn rewind_exact_multiple_of_epoch_creates_full_batches_in_reverse() -> any provider.anvil_mine(Some(15), None).await?; - let client = BlockRangeScanner::new() - .max_block_range(5) - .connect::(provider.root().clone()) - .run()?; + let client = BlockRangeScanner::new().max_block_range(5).connect(provider).await?.run()?; let mut stream = client.rewind(0, 14).await?; @@ -372,10 +353,7 @@ async fn rewind_with_remainder_trims_first_batch_to_stream_start() -> anyhow::Re provider.anvil_mine(Some(15), None).await?; - let client = BlockRangeScanner::new() - .max_block_range(4) - .connect::(provider.root().clone()) - .run()?; + let client = BlockRangeScanner::new().max_block_range(4).connect(provider).await?.run()?; let mut stream = client.rewind(3, 12).await?; @@ -395,10 +373,7 @@ async fn rewind_single_block_range() -> anyhow::Result<()> { provider.anvil_mine(Some(15), None).await?; - let client = BlockRangeScanner::new() - .max_block_range(5) - .connect::(provider.root().clone()) - .run()?; + let client = BlockRangeScanner::new().max_block_range(5).connect(provider).await?.run()?; let mut stream = client.rewind(7, 7).await?; @@ -415,10 +390,7 @@ async fn rewind_epoch_of_one_sends_each_block_in_reverse_order() -> anyhow::Resu provider.anvil_mine(Some(15), None).await?; - let client = BlockRangeScanner::new() - .max_block_range(1) - .connect::(provider.root().clone()) - .run()?; + let client = BlockRangeScanner::new().max_block_range(1).connect(provider).await?.run()?; let mut stream = client.rewind(5, 8).await?; @@ -440,10 +412,7 @@ async fn command_rewind_defaults_latest_to_earliest_batches_correctly() -> anyho // Mine 20 blocks, so the total number of blocks is 21 (including 0th block) provider.anvil_mine(Some(20), None).await?; - let client = BlockRangeScanner::new() - .max_block_range(7) - .connect::(provider.root().clone()) - .run()?; + let client = BlockRangeScanner::new().max_block_range(7).connect(provider).await?.run()?; let mut stream = client.rewind(BlockNumberOrTag::Earliest, BlockNumberOrTag::Latest).await?; @@ -463,10 +432,7 @@ async fn command_rewind_handles_start_and_end_in_any_order() -> anyhow::Result<( // Ensure blocks at 3 and 15 exist provider.anvil_mine(Some(16), None).await?; - let client = BlockRangeScanner::new() - .max_block_range(5) - .connect::(provider.root().clone()) - .run()?; + let client = BlockRangeScanner::new().max_block_range(5).connect(provider).await?.run()?; let mut stream = client.rewind(15, 3).await?; @@ -490,11 +456,8 @@ async fn command_rewind_propagates_block_not_found_error() -> anyhow::Result<()> let anvil = Anvil::new().try_spawn()?; // Do not mine up to 999 so start won't exist - let client = BlockRangeScanner::new() - .max_block_range(5) - .connect_ws::(anvil.ws_endpoint_url()) - .await? - .run()?; + let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?; + let client = BlockRangeScanner::new().max_block_range(5).connect(provider).await?.run()?; let stream = client.rewind(0, 999).await; diff --git a/tests/common/mod.rs b/tests/common/mod.rs new file mode 100644 index 00000000..d45a1046 --- /dev/null +++ b/tests/common/mod.rs @@ -0,0 +1,32 @@ +#![allow(clippy::missing_errors_doc)] +#![allow(clippy::missing_panics_doc)] +#![allow(missing_docs)] + +pub mod setup_scanner; +pub mod test_counter; + +pub(crate) use setup_scanner::{ + LiveScannerSetup, SyncScannerSetup, setup_common, setup_historic_scanner, setup_latest_scanner, + setup_live_scanner, setup_sync_from_latest_scanner, setup_sync_scanner, +}; +pub(crate) use test_counter::{TestCounter, deploy_counter}; + +use alloy::{network::Ethereum, providers::ProviderBuilder}; +use alloy_node_bindings::{Anvil, AnvilInstance}; +use event_scanner::robust_provider::{RobustProvider, RobustProviderBuilder}; + +pub fn spawn_anvil(block_time: Option) -> anyhow::Result { + let mut anvil = Anvil::new(); + if let Some(block_time) = block_time { + anvil = anvil.block_time_f64(block_time); + } + Ok(anvil.try_spawn()?) +} + +pub async fn build_provider(anvil: &AnvilInstance) -> anyhow::Result> { + let wallet = anvil.wallet().expect("anvil should return a default wallet"); + let provider = + ProviderBuilder::new().wallet(wallet).connect(anvil.ws_endpoint_url().as_str()).await?; + let robust_provider = RobustProviderBuilder::new(provider).build().await?; + Ok(robust_provider) +} diff --git a/tests/common.rs b/tests/common/setup_scanner.rs similarity index 53% rename from tests/common.rs rename to tests/common/setup_scanner.rs index 5949a773..7e9fdf9e 100644 --- a/tests/common.rs +++ b/tests/common/setup_scanner.rs @@ -1,58 +1,31 @@ -#![allow(clippy::missing_errors_doc)] -#![allow(clippy::missing_panics_doc)] -#![allow(missing_docs)] - use alloy::{ eips::BlockNumberOrTag, network::Ethereum, - providers::{Provider, ProviderBuilder, RootProvider}, - sol, + providers::{Provider, RootProvider}, sol_types::SolEvent, }; -use alloy_node_bindings::{Anvil, AnvilInstance}; +use alloy_node_bindings::AnvilInstance; use event_scanner::{ EventFilter, EventScanner, EventScannerBuilder, Historic, LatestEvents, Live, Message, - SyncFromBlock, SyncFromLatestEvents, + SyncFromBlock, SyncFromLatestEvents, robust_provider::RobustProvider, }; use tokio_stream::wrappers::ReceiverStream; -// Shared test contract used across integration tests -sol! { - // Built directly with solc 0.8.30+commit.73712a01.Darwin.appleclang - #[sol(rpc, bytecode="608080604052346015576101b0908161001a8239f35b5f80fdfe6080806040526004361015610012575f80fd5b5f3560e01c90816306661abd1461016157508063a87d942c14610145578063d732d955146100ad5763e8927fbc14610048575f80fd5b346100a9575f3660031901126100a9575f5460018101809111610095576020817f7ca2ca9527391044455246730762df008a6b47bbdb5d37a890ef78394535c040925f55604051908152a1005b634e487b7160e01b5f52601160045260245ffd5b5f80fd5b346100a9575f3660031901126100a9575f548015610100575f198101908111610095576020817f53a71f16f53e57416424d0d18ccbd98504d42a6f98fe47b09772d8f357c620ce925f55604051908152a1005b60405162461bcd60e51b815260206004820152601860248201527f436f756e742063616e6e6f74206265206e6567617469766500000000000000006044820152606490fd5b346100a9575f3660031901126100a95760205f54604051908152f35b346100a9575f3660031901126100a9576020905f548152f3fea2646970667358221220471585b420a1ad0093820ff10129ec863f6df4bec186546249391fbc3cdbaa7c64736f6c634300081e0033")] - contract TestCounter { - uint256 public count; - - #[derive(Debug)] - event CountIncreased(uint256 newCount); - #[derive(Debug)] - event CountDecreased(uint256 newCount); - - function increase() public { - count += 1; - emit CountIncreased(count); - } - - function decrease() public { - require(count > 0, "Count cannot be negative"); - count -= 1; - emit CountDecreased(count); - } - - function getCount() public view returns (uint256) { - return count; - } - } -} +use crate::common::{ + TestCounter::{self, CountIncreased}, + build_provider, spawn_anvil, + test_counter::deploy_counter, +}; pub struct ScannerSetup where P: Provider + Clone, { - pub provider: RootProvider, + pub provider: RobustProvider, pub contract: TestCounter::TestCounterInstance

, pub scanner: S, pub stream: ReceiverStream, + #[allow(dead_code)] pub anvil: AnvilInstance, } @@ -67,17 +40,16 @@ pub async fn setup_common( filter: Option, ) -> anyhow::Result<( AnvilInstance, - RootProvider, + RobustProvider, TestCounter::TestCounterInstance, EventFilter, )> { let anvil = spawn_anvil(block_interval)?; let provider = build_provider(&anvil).await?; - let contract = deploy_counter(provider.clone()).await?; + let contract = deploy_counter(provider.primary().clone()).await?; - let default_filter = EventFilter::new() - .contract_address(*contract.address()) - .event(TestCounter::CountIncreased::SIGNATURE); + let default_filter = + EventFilter::new().contract_address(*contract.address()).event(CountIncreased::SIGNATURE); let filter = filter.unwrap_or(default_filter); @@ -91,8 +63,10 @@ pub async fn setup_live_scanner( ) -> anyhow::Result + Clone>> { let (anvil, provider, contract, filter) = setup_common(block_interval, filter).await?; - let mut scanner = - EventScannerBuilder::live().block_confirmations(confirmations).connect(provider.clone()); + let mut scanner = EventScannerBuilder::live() + .block_confirmations(confirmations) + .connect(provider.clone()) + .await?; let stream = scanner.subscribe(filter); @@ -110,7 +84,8 @@ pub async fn setup_sync_scanner( let mut scanner = EventScannerBuilder::sync() .from_block(from) .block_confirmations(confirmations) - .connect(provider.clone()); + .connect(provider.clone()) + .await?; let stream = scanner.subscribe(filter); @@ -128,7 +103,8 @@ pub async fn setup_sync_from_latest_scanner( let mut scanner = EventScannerBuilder::sync() .from_latest(latest) .block_confirmations(confirmations) - .connect(provider.clone()); + .connect(provider.clone()) + .await?; let stream = scanner.subscribe(filter); @@ -142,9 +118,11 @@ pub async fn setup_historic_scanner( to: BlockNumberOrTag, ) -> anyhow::Result + Clone>> { let (anvil, provider, contract, filter) = setup_common(block_interval, filter).await?; - - let mut scanner = - EventScannerBuilder::historic().from_block(from).to_block(to).connect(provider.clone()); + let mut scanner = EventScannerBuilder::historic() + .from_block(from) + .to_block(to) + .connect(provider.clone()) + .await?; let stream = scanner.subscribe(filter); @@ -167,32 +145,9 @@ pub async fn setup_latest_scanner( builder = builder.to_block(t); } - let mut scanner = builder.connect_ws(anvil.ws_endpoint_url()).await?; + let mut scanner = builder.connect(provider.clone()).await?; let stream = scanner.subscribe(filter); Ok(ScannerSetup { provider, contract, scanner, stream, anvil }) } - -pub fn spawn_anvil(block_time: Option) -> anyhow::Result { - let mut anvil = Anvil::new(); - if let Some(block_time) = block_time { - anvil = anvil.block_time_f64(block_time); - } - Ok(anvil.try_spawn()?) -} - -pub async fn build_provider(anvil: &AnvilInstance) -> anyhow::Result { - let wallet = anvil.wallet().expect("anvil should return a default wallet"); - let provider = - ProviderBuilder::new().wallet(wallet).connect(anvil.ws_endpoint_url().as_str()).await?; - Ok(provider.root().to_owned()) -} - -pub async fn deploy_counter

(provider: P) -> anyhow::Result> -where - P: alloy::providers::Provider, -{ - let contract = TestCounter::deploy(provider).await?; - Ok(contract) -} diff --git a/tests/common/test_counter.rs b/tests/common/test_counter.rs new file mode 100644 index 00000000..c1f1dee2 --- /dev/null +++ b/tests/common/test_counter.rs @@ -0,0 +1,38 @@ +use alloy::{network::Ethereum, sol}; + +// Shared test contract used across integration tests +sol! { + // Built directly with solc 0.8.30+commit.73712a01.Darwin.appleclang + #[sol(rpc, bytecode="608080604052346015576101b0908161001a8239f35b5f80fdfe6080806040526004361015610012575f80fd5b5f3560e01c90816306661abd1461016157508063a87d942c14610145578063d732d955146100ad5763e8927fbc14610048575f80fd5b346100a9575f3660031901126100a9575f5460018101809111610095576020817f7ca2ca9527391044455246730762df008a6b47bbdb5d37a890ef78394535c040925f55604051908152a1005b634e487b7160e01b5f52601160045260245ffd5b5f80fd5b346100a9575f3660031901126100a9575f548015610100575f198101908111610095576020817f53a71f16f53e57416424d0d18ccbd98504d42a6f98fe47b09772d8f357c620ce925f55604051908152a1005b60405162461bcd60e51b815260206004820152601860248201527f436f756e742063616e6e6f74206265206e6567617469766500000000000000006044820152606490fd5b346100a9575f3660031901126100a95760205f54604051908152f35b346100a9575f3660031901126100a9576020905f548152f3fea2646970667358221220471585b420a1ad0093820ff10129ec863f6df4bec186546249391fbc3cdbaa7c64736f6c634300081e0033")] + contract TestCounter { + uint256 public count; + + #[derive(Debug)] + event CountIncreased(uint256 newCount); + #[derive(Debug)] + event CountDecreased(uint256 newCount); + + function increase() public { + count += 1; + emit CountIncreased(count); + } + + function decrease() public { + require(count > 0, "Count cannot be negative"); + count -= 1; + emit CountDecreased(count); + } + + function getCount() public view returns (uint256) { + return count; + } + } +} + +pub async fn deploy_counter

(provider: P) -> anyhow::Result> +where + P: alloy::providers::Provider, +{ + let contract = TestCounter::deploy(provider).await?; + Ok(contract) +} diff --git a/tests/latest_events/basic.rs b/tests/latest_events/basic.rs index 28e16709..5d07f9ee 100644 --- a/tests/latest_events/basic.rs +++ b/tests/latest_events/basic.rs @@ -1,9 +1,5 @@ use alloy::{ - eips::BlockNumberOrTag, - network::Ethereum, - primitives::U256, - providers::{Provider, ext::AnvilApi}, - sol_types::SolEvent, + eips::BlockNumberOrTag, primitives::U256, providers::ext::AnvilApi, sol_types::SolEvent, }; use crate::common::{TestCounter, deploy_counter, setup_common, setup_latest_scanner}; @@ -86,7 +82,7 @@ async fn latest_scanner_no_events_returns_empty() -> anyhow::Result<()> { #[tokio::test] async fn latest_scanner_respects_range_subset() -> anyhow::Result<()> { - let (anvil, provider, contract, default_filter) = setup_common(None, None).await?; + let (_anvil, provider, contract, default_filter) = setup_common(None, None).await?; // Mine 6 events, one per tx (auto-mined), then manually mint 2 empty blocks to widen range contract.increase().send().await?.watch().await?; contract.increase().send().await?.watch().await?; @@ -96,18 +92,15 @@ async fn latest_scanner_respects_range_subset() -> anyhow::Result<()> { contract.increase().send().await?.watch().await?; // manual empty block minting - provider.anvil_mine(Some(2), None).await?; + provider.primary().anvil_mine(Some(2), None).await?; let head = provider.get_block_number().await?; // Choose a subrange covering last 4 blocks let start = BlockNumberOrTag::from(head - 3); let end = BlockNumberOrTag::from(head); - let mut scanner_with_range = EventScannerBuilder::latest(10) - .from_block(start) - .to_block(end) - .connect_ws::(anvil.ws_endpoint_url()) - .await?; + let mut scanner_with_range = + EventScannerBuilder::latest(10).from_block(start).to_block(end).connect(provider).await?; let mut stream_with_range = scanner_with_range.subscribe(default_filter); scanner_with_range.start().await?; @@ -279,7 +272,7 @@ async fn latest_scanner_ignores_non_tracked_contract() -> anyhow::Result<()> { let scanner = setup.scanner; let contract_a = setup.contract; - let contract_b = deploy_counter(provider).await?; + let contract_b = deploy_counter(provider.primary()).await?; // Listener only for contract A CountIncreased let mut stream_a = setup.stream; @@ -309,14 +302,14 @@ async fn latest_scanner_ignores_non_tracked_contract() -> anyhow::Result<()> { #[tokio::test] async fn latest_scanner_large_gaps_and_empty_ranges() -> anyhow::Result<()> { // Manual setup to mine empty blocks - let (anvil, provider, contract, default_filter) = setup_common(None, None).await?; + let (_anvil, provider, contract, default_filter) = setup_common(None, None).await?; // Emit 2 events contract.increase().send().await?.watch().await?; contract.increase().send().await?.watch().await?; // Mine 10 empty blocks - provider.anvil_mine(Some(10), None).await?; + provider.primary().anvil_mine(Some(10), None).await?; // Emit 1 more event contract.increase().send().await?.watch().await?; @@ -324,11 +317,8 @@ async fn latest_scanner_large_gaps_and_empty_ranges() -> anyhow::Result<()> { let start = BlockNumberOrTag::from(head - 12); let end = BlockNumberOrTag::from(head); - let mut scanner_with_range = EventScannerBuilder::latest(5) - .from_block(start) - .to_block(end) - .connect_ws::(anvil.ws_endpoint_url()) - .await?; + let mut scanner_with_range = + EventScannerBuilder::latest(5).from_block(start).to_block(end).connect(provider).await?; let mut stream_with_range = scanner_with_range.subscribe(default_filter); scanner_with_range.start().await?; @@ -348,7 +338,7 @@ async fn latest_scanner_large_gaps_and_empty_ranges() -> anyhow::Result<()> { #[tokio::test] async fn latest_scanner_boundary_range_single_block() -> anyhow::Result<()> { - let (anvil, _provider, contract, default_filter) = setup_common(None, None).await?; + let (_anvil, provider, contract, default_filter) = setup_common(None, None).await?; contract.increase().send().await?.watch().await?; let receipt = contract.increase().send().await?.get_receipt().await?; @@ -358,11 +348,8 @@ async fn latest_scanner_boundary_range_single_block() -> anyhow::Result<()> { let start = BlockNumberOrTag::from(receipt.block_number.unwrap()); let end = start; - let mut scanner_with_range = EventScannerBuilder::latest(5) - .from_block(start) - .to_block(end) - .connect_ws::(anvil.ws_endpoint_url()) - .await?; + let mut scanner_with_range = + EventScannerBuilder::latest(5).from_block(start).to_block(end).connect(provider).await?; let mut stream_with_range = scanner_with_range.subscribe(default_filter); scanner_with_range.start().await?; diff --git a/tests/live/basic.rs b/tests/live/basic.rs index 7ed53f51..cab23c0d 100644 --- a/tests/live/basic.rs +++ b/tests/live/basic.rs @@ -65,7 +65,7 @@ async fn multiple_contracts_same_event_isolate_callbacks() -> anyhow::Result<()> let setup = setup_live_scanner(Some(0.1), None, 0).await?; let provider = setup.provider.clone(); let a = setup.contract.clone(); - let b = deploy_counter(provider.clone()).await?; + let b = deploy_counter(provider.primary().clone()).await?; let a_filter = EventFilter::new() .contract_address(*a.address()) diff --git a/tests/live/reorg.rs b/tests/live/reorg.rs index 87b0ff1f..30af9e3a 100644 --- a/tests/live/reorg.rs +++ b/tests/live/reorg.rs @@ -34,7 +34,8 @@ async fn reorg_rescans_events_within_same_block() -> anyhow::Result<()> { (TransactionData::JSON(contract.increase().into_transaction_request()), 0), (TransactionData::JSON(contract.increase().into_transaction_request()), 0), ]; - provider.anvil_reorg(ReorgOptions { depth: 4, tx_block_pairs }).await?; + + provider.primary().anvil_reorg(ReorgOptions { depth: 4, tx_block_pairs }).await?; // assert expected messages post-reorg assert_next!(stream, ScannerStatus::ReorgDetected); @@ -77,7 +78,8 @@ async fn reorg_rescans_events_with_ascending_blocks() -> anyhow::Result<()> { (TransactionData::JSON(contract.increase().into_transaction_request()), 1), (TransactionData::JSON(contract.increase().into_transaction_request()), 2), ]; - provider.anvil_reorg(ReorgOptions { depth: 4, tx_block_pairs }).await?; + + provider.primary().anvil_reorg(ReorgOptions { depth: 4, tx_block_pairs }).await?; // assert expected messages post-reorg assert_next!(stream, ScannerStatus::ReorgDetected); @@ -111,7 +113,8 @@ async fn reorg_depth_one() -> anyhow::Result<()> { // reorg the chain let tx_block_pairs = vec![(TransactionData::JSON(contract.increase().into_transaction_request()), 0)]; - provider.anvil_reorg(ReorgOptions { depth: 1, tx_block_pairs }).await?; + + provider.primary().anvil_reorg(ReorgOptions { depth: 1, tx_block_pairs }).await?; // assert expected messages post-reorg assert_next!(stream, ScannerStatus::ReorgDetected); @@ -143,7 +146,8 @@ async fn reorg_depth_two() -> anyhow::Result<()> { // reorg the chain let tx_block_pairs = vec![(TransactionData::JSON(contract.increase().into_transaction_request()), 0)]; - provider.anvil_reorg(ReorgOptions { depth: 2, tx_block_pairs }).await?; + + provider.primary().anvil_reorg(ReorgOptions { depth: 2, tx_block_pairs }).await?; // assert expected messages post-reorg assert_next!(stream, ScannerStatus::ReorgDetected); @@ -162,7 +166,7 @@ async fn block_confirmations_mitigate_reorgs() -> anyhow::Result<()> { scanner.start().await?; // mine some initial blocks - provider.anvil_mine(Some(10), None).await?; + provider.primary().anvil_mine(Some(10), None).await?; // emit initial events for _ in 0..4 { @@ -179,13 +183,14 @@ async fn block_confirmations_mitigate_reorgs() -> anyhow::Result<()> { (TransactionData::JSON(contract.increase().into_transaction_request()), 0), (TransactionData::JSON(contract.increase().into_transaction_request()), 0), ]; - provider.anvil_reorg(ReorgOptions { depth: 2, tx_block_pairs }).await?; + + provider.primary().anvil_reorg(ReorgOptions { depth: 2, tx_block_pairs }).await?; // assert that still no events have been streamed let mut stream = assert_empty!(stream); - // mine some additional post-reorg blocks to confirm previous blocks with logs - provider.anvil_mine(Some(10), None).await?; + // mine some additional post-reorg blocks + provider.primary().anvil_mine(Some(10), None).await?; // no `ReorgDetected` should be emitted assert_next!(stream, &[CountIncreased { newCount: U256::from(1) }]); diff --git a/tests/sync/from_block.rs b/tests/sync/from_block.rs index 45dba8e6..1b146791 100644 --- a/tests/sync/from_block.rs +++ b/tests/sync/from_block.rs @@ -115,13 +115,13 @@ async fn block_confirmations_mitigate_reorgs() -> anyhow::Result<()> { (TransactionData::JSON(contract.increase().into_transaction_request()), 0), (TransactionData::JSON(contract.increase().into_transaction_request()), 0), ]; - provider.anvil_reorg(ReorgOptions { depth: 2, tx_block_pairs }).await?; + provider.primary().anvil_reorg(ReorgOptions { depth: 2, tx_block_pairs }).await?; // assert that still no events have been streamed let mut stream = assert_empty!(stream); // mine some additional post-reorg blocks to confirm previous blocks with logs - provider.anvil_mine(Some(10), None).await?; + provider.primary().anvil_mine(Some(10), None).await?; // no `ReorgDetected` should be emitted assert_next!(stream, &[TestCounter::CountIncreased { newCount: U256::from(5) }]); diff --git a/tests/sync/from_latest.rs b/tests/sync/from_latest.rs index f5b03c49..1cafed26 100644 --- a/tests/sync/from_latest.rs +++ b/tests/sync/from_latest.rs @@ -142,12 +142,12 @@ async fn scan_latest_then_live_boundary_no_duplication() -> anyhow::Result<()> { // Historical: emit 3, mine 1 empty block to form a clear boundary contract.increase().send().await?.watch().await?; - provider.anvil_mine(Some(1), None).await?; + provider.primary().anvil_mine(Some(1), None).await?; contract.increase().send().await?.watch().await?; contract.increase().send().await?.watch().await?; - provider.anvil_mine(Some(1), None).await?; + provider.primary().anvil_mine(Some(1), None).await?; scanner.start().await?;