diff --git a/src/error.rs b/src/error.rs index e7ebedb..c9a83af 100644 --- a/src/error.rs +++ b/src/error.rs @@ -21,6 +21,15 @@ pub enum ScannerError { #[error("Operation timed out")] Timeout, + + #[error("{0} {1} exceeds the latest block {2}")] + BlockExceedsLatest(&'static str, u64, u64), + + #[error("Event count must be greater than 0")] + InvalidEventCount, + + #[error("Max block range must be greater than 0")] + InvalidMaxBlockRange, } impl From for ScannerError { diff --git a/src/event_scanner/scanner/historic.rs b/src/event_scanner/scanner/historic.rs index eeac822..164c503 100644 --- a/src/event_scanner/scanner/historic.rs +++ b/src/event_scanner/scanner/historic.rs @@ -4,6 +4,7 @@ use super::common::{ConsumerMode, handle_stream}; use crate::{ EventScannerBuilder, ScannerError, event_scanner::scanner::{EventScanner, Historic}, + robust_provider::IntoRobustProvider, }; impl EventScannerBuilder { @@ -18,6 +19,40 @@ impl EventScannerBuilder { self.config.to_block = block.into(); self } + + /// Connects to an existing provider with block range validation. + /// + /// Validates that the maximum of `from_block` and `to_block` does not exceed + /// the latest block on the chain. + /// + /// # Errors + /// + /// Returns an error if: + /// * The provider connection fails + /// * The specified block range exceeds the latest block on the chain + /// * The max block range is zero + pub async fn connect( + self, + provider: impl IntoRobustProvider, + ) -> Result, ScannerError> { + let scanner = self.build(provider).await?; + + let provider = scanner.block_range_scanner.provider(); + let latest_block = provider.get_block_number().await?; + + let from_num = scanner.config.from_block.as_number().unwrap_or(0); + let to_num = scanner.config.to_block.as_number().unwrap_or(0); + + if from_num > latest_block { + Err(ScannerError::BlockExceedsLatest("from_block", from_num, latest_block))?; + } + + if to_num > latest_block { + Err(ScannerError::BlockExceedsLatest("to_block", to_num, latest_block))?; + } + + Ok(scanner) + } } impl EventScanner { @@ -52,6 +87,12 @@ impl EventScanner { #[cfg(test)] mod tests { use super::*; + use alloy::{ + network::Ethereum, + providers::{Provider, ProviderBuilder, RootProvider, mock::Asserter}, + rpc::client::RpcClient, + }; + use alloy_node_bindings::Anvil; #[test] fn test_historic_scanner_builder_pattern() { @@ -88,4 +129,81 @@ mod tests { assert!(matches!(builder.config.from_block, BlockNumberOrTag::Number(2))); assert!(matches!(builder.config.to_block, BlockNumberOrTag::Number(200))); } + + #[tokio::test] + async fn test_from_block_above_latest_returns_error() { + let anvil = Anvil::new().try_spawn().unwrap(); + let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url()); + + let latest_block = provider.get_block_number().await.unwrap(); + + let result = EventScannerBuilder::historic() + .from_block(latest_block + 100) + .to_block(latest_block) + .connect(provider) + .await; + + match result { + Err(ScannerError::BlockExceedsLatest("from_block", max, latest)) => { + assert_eq!(max, latest_block + 100); + assert_eq!(latest, latest_block); + } + _ => panic!("Expected BlockExceedsLatest error"), + } + } + + #[tokio::test] + async fn test_to_block_above_latest_returns_error() { + let anvil = Anvil::new().try_spawn().unwrap(); + let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url()); + + let latest_block = provider.get_block_number().await.unwrap(); + + let result = EventScannerBuilder::historic() + .from_block(0) + .to_block(latest_block + 100) + .connect(provider) + .await; + + match result { + Err(ScannerError::BlockExceedsLatest("to_block", max, latest)) => { + assert_eq!(max, latest_block + 100); + assert_eq!(latest, latest_block); + } + _ => panic!("Expected BlockExceedsLatest error"), + } + } + + #[tokio::test] + async fn test_to_and_from_block_above_latest_returns_error() { + let anvil = Anvil::new().try_spawn().unwrap(); + let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url()); + + let latest_block = provider.get_block_number().await.unwrap(); + + let result = EventScannerBuilder::historic() + .from_block(latest_block + 50) + .to_block(latest_block + 100) + .connect(provider) + .await; + + match result { + Err(ScannerError::BlockExceedsLatest("from_block", max, latest)) => { + assert_eq!(max, latest_block + 50); + assert_eq!(latest, latest_block); + } + _ => panic!("Expected BlockExceedsLatest error for 'from_block'"), + } + } + + #[tokio::test] + async fn test_historic_returns_error_with_zero_max_block_range() { + let provider = RootProvider::::new(RpcClient::mocked(Asserter::new())); + let result = EventScannerBuilder::historic().max_block_range(0).connect(provider).await; + + match result { + Err(ScannerError::InvalidMaxBlockRange) => {} + _ => panic!("Expected InvalidMaxBlockRange error"), + } + } } diff --git a/src/event_scanner/scanner/latest.rs b/src/event_scanner/scanner/latest.rs index da5ccea..becf1be 100644 --- a/src/event_scanner/scanner/latest.rs +++ b/src/event_scanner/scanner/latest.rs @@ -4,6 +4,7 @@ use super::common::{ConsumerMode, handle_stream}; use crate::{ EventScannerBuilder, ScannerError, event_scanner::{EventScanner, LatestEvents}, + robust_provider::IntoRobustProvider, }; impl EventScannerBuilder { @@ -24,6 +25,24 @@ impl EventScannerBuilder { self.config.to_block = block.into(); self } + + /// Connects to an existing provider. + /// + /// # Errors + /// + /// Returns an error if: + /// * The provider connection fails + /// * The event count is zero + /// * The max block range is zero + pub async fn connect( + self, + provider: impl IntoRobustProvider, + ) -> Result, ScannerError> { + if self.config.count == 0 { + return Err(ScannerError::InvalidEventCount); + } + self.build(provider).await + } } impl EventScanner { @@ -63,6 +82,12 @@ impl EventScanner { #[cfg(test)] mod tests { + use alloy::{ + network::Ethereum, + providers::{RootProvider, mock::Asserter}, + rpc::client::RpcClient, + }; + use super::*; #[test] @@ -111,4 +136,26 @@ mod tests { assert_eq!(builder.config.block_confirmations, 7); assert_eq!(builder.block_range_scanner.max_block_range, 60); } + + #[tokio::test] + async fn test_latest_returns_error_with_zero_count() { + let provider = RootProvider::::new(RpcClient::mocked(Asserter::new())); + let result = EventScannerBuilder::latest(0).connect(provider).await; + + match result { + Err(ScannerError::InvalidEventCount) => {} + _ => panic!("Expected InvalidEventCount error"), + } + } + + #[tokio::test] + async fn test_latest_returns_error_with_zero_max_block_range() { + let provider = RootProvider::::new(RpcClient::mocked(Asserter::new())); + let result = EventScannerBuilder::latest(10).max_block_range(0).connect(provider).await; + + match result { + Err(ScannerError::InvalidMaxBlockRange) => {} + _ => panic!("Expected InvalidMaxBlockRange error"), + } + } } diff --git a/src/event_scanner/scanner/live.rs b/src/event_scanner/scanner/live.rs index df1fa8d..3df67ae 100644 --- a/src/event_scanner/scanner/live.rs +++ b/src/event_scanner/scanner/live.rs @@ -4,6 +4,7 @@ use super::common::{ConsumerMode, handle_stream}; use crate::{ EventScannerBuilder, ScannerError, event_scanner::{EventScanner, scanner::Live}, + robust_provider::IntoRobustProvider, }; impl EventScannerBuilder { @@ -12,6 +13,20 @@ impl EventScannerBuilder { self.config.block_confirmations = confirmations; self } + + /// Connects to an existing provider. + /// + /// # Errors + /// + /// Returns an error if: + /// * The provider connection fails + /// * The max block range is zero + pub async fn connect( + self, + provider: impl IntoRobustProvider, + ) -> Result, ScannerError> { + self.build(provider).await + } } impl EventScanner { @@ -45,6 +60,12 @@ impl EventScanner { #[cfg(test)] mod tests { + use alloy::{ + network::Ethereum, + providers::{RootProvider, mock::Asserter}, + rpc::client::RpcClient, + }; + use super::*; #[test] @@ -76,4 +97,15 @@ mod tests { assert_eq!(builder.block_range_scanner.max_block_range, 105); assert_eq!(builder.config.block_confirmations, 8); } + + #[tokio::test] + async fn test_live_returns_error_with_zero_max_block_range() { + let provider = RootProvider::::new(RpcClient::mocked(Asserter::new())); + let result = EventScannerBuilder::live().max_block_range(0).connect(provider).await; + + match result { + Err(ScannerError::InvalidMaxBlockRange) => {} + _ => panic!("Expected InvalidMaxBlockRange error"), + } + } } diff --git a/src/event_scanner/scanner/mod.rs b/src/event_scanner/scanner/mod.rs index 1a13a5f..a2d8b6f 100644 --- a/src/event_scanner/scanner/mod.rs +++ b/src/event_scanner/scanner/mod.rs @@ -231,7 +231,7 @@ impl EventScannerBuilder { /// # Example /// /// ```no_run - /// # use alloy::{network::Ethereum, primitives::Address, providers::{Provider, ProviderBuilder}}; + /// # use alloy::{network::Ethereum, providers::{Provider, ProviderBuilder}}; /// # use event_scanner::{EventFilter, EventScannerBuilder, Message, robust_provider::RobustProviderBuilder}; /// # use tokio_stream::StreamExt; /// # @@ -298,7 +298,7 @@ impl EventScannerBuilder { /// /// # Arguments /// - /// * `count` - Maximum number of recent events to collect per listener + /// * `count` - Maximum number of recent events to collect per listener (must be greater than 0) /// /// # Reorg behavior /// @@ -373,7 +373,7 @@ impl EventScannerBuilder { /// /// # Arguments /// - /// * `max_block_range` - Maximum number of blocks to process per batch. + /// * `max_block_range` - Maximum number of blocks to process per batch (must be greater than 0) /// /// # Example /// @@ -388,17 +388,16 @@ impl EventScannerBuilder { self } - /// Connects to an existing provider. - /// - /// Final builder method: consumes the builder and returns the built [`EventScanner`]. - /// - /// # Errors + /// Builds the scanner by connecting to an existing provider. /// - /// Returns an error if the provider connection fails. - pub async fn connect( + /// This is a shared method used internally by scanner-specific `connect()` methods. + async fn build( self, provider: impl IntoRobustProvider, ) -> Result, ScannerError> { + if self.block_range_scanner.max_block_range == 0 { + return Err(ScannerError::InvalidMaxBlockRange); + } let block_range_scanner = self.block_range_scanner.connect::(provider).await?; Ok(EventScanner { config: self.config, block_range_scanner, listeners: Vec::new() }) } @@ -458,7 +457,7 @@ mod tests { #[tokio::test] async fn test_historic_event_stream_listeners_vector_updates() -> anyhow::Result<()> { let provider = RootProvider::::new(RpcClient::mocked(Asserter::new())); - let mut scanner = EventScannerBuilder::historic().connect(provider).await?; + let mut scanner = EventScannerBuilder::historic().build(provider).await?; assert!(scanner.listeners.is_empty()); @@ -475,7 +474,7 @@ mod tests { #[tokio::test] async fn test_historic_event_stream_channel_capacity() -> anyhow::Result<()> { let provider = RootProvider::::new(RpcClient::mocked(Asserter::new())); - let mut scanner = EventScannerBuilder::historic().connect(provider).await?; + let mut scanner = EventScannerBuilder::historic().build(provider).await?; let _ = scanner.subscribe(EventFilter::new()); @@ -484,4 +483,20 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_latest_returns_error_with_zero_count() { + use alloy::{ + providers::{RootProvider, mock::Asserter}, + rpc::client::RpcClient, + }; + + let provider = RootProvider::::new(RpcClient::mocked(Asserter::new())); + let result = EventScannerBuilder::latest(0).connect(provider).await; + + match result { + Err(ScannerError::InvalidEventCount) => {} + _ => panic!("Expected InvalidEventCount error"), + } + } } diff --git a/src/event_scanner/scanner/sync/from_block.rs b/src/event_scanner/scanner/sync/from_block.rs index 9e62680..fbf137f 100644 --- a/src/event_scanner/scanner/sync/from_block.rs +++ b/src/event_scanner/scanner/sync/from_block.rs @@ -6,6 +6,7 @@ use crate::{ EventScanner, SyncFromBlock, scanner::common::{ConsumerMode, handle_stream}, }, + robust_provider::IntoRobustProvider, }; impl EventScannerBuilder { @@ -14,6 +15,20 @@ impl EventScannerBuilder { self.config.block_confirmations = confirmations; self } + + /// Connects to an existing provider. + /// + /// # Errors + /// + /// Returns an error if: + /// * The provider connection fails + /// * The max block range is zero + pub async fn connect( + self, + provider: impl IntoRobustProvider, + ) -> Result, ScannerError> { + self.build(provider).await + } } impl EventScanner { @@ -48,7 +63,12 @@ impl EventScanner { #[cfg(test)] mod tests { - use alloy::eips::BlockNumberOrTag; + use alloy::{ + eips::BlockNumberOrTag, + network::Ethereum, + providers::{RootProvider, mock::Asserter}, + rpc::client::RpcClient, + }; use super::*; @@ -98,4 +118,16 @@ mod tests { assert!(matches!(builder.config.from_block, BlockNumberOrTag::Number(2))); assert_eq!(builder.config.block_confirmations, 7); } + + #[tokio::test] + async fn test_sync_from_block_returns_error_with_zero_max_block_range() { + let provider = RootProvider::::new(RpcClient::mocked(Asserter::new())); + let result = + EventScannerBuilder::sync().from_block(100).max_block_range(0).connect(provider).await; + + match result { + Err(ScannerError::InvalidMaxBlockRange) => {} + _ => panic!("Expected InvalidMaxBlockRange error"), + } + } } diff --git a/src/event_scanner/scanner/sync/from_latest.rs b/src/event_scanner/scanner/sync/from_latest.rs index 2775371..e5623f5 100644 --- a/src/event_scanner/scanner/sync/from_latest.rs +++ b/src/event_scanner/scanner/sync/from_latest.rs @@ -18,6 +18,7 @@ use crate::{ common::{ConsumerMode, handle_stream}, }, }, + robust_provider::IntoRobustProvider, }; impl EventScannerBuilder { @@ -26,6 +27,24 @@ impl EventScannerBuilder { self.config.block_confirmations = confirmations; self } + + /// Connects to an existing provider. + /// + /// # Errors + /// + /// Returns an error if: + /// * The provider connection fails + /// * The event count is zero + /// * The max block range is zero + pub async fn connect( + self, + provider: impl IntoRobustProvider, + ) -> Result, ScannerError> { + if self.config.count == 0 { + return Err(ScannerError::InvalidEventCount); + } + self.build(provider).await + } } impl EventScanner { @@ -103,3 +122,37 @@ impl EventScanner { Ok(()) } } + +#[cfg(test)] +mod tests { + use alloy::{ + network::Ethereum, + providers::{RootProvider, mock::Asserter}, + rpc::client::RpcClient, + }; + + use super::*; + + #[tokio::test] + async fn test_sync_from_latest_returns_error_with_zero_count() { + let provider = RootProvider::::new(RpcClient::mocked(Asserter::new())); + let result = EventScannerBuilder::sync().from_latest(0).connect(provider).await; + + match result { + Err(ScannerError::InvalidEventCount) => {} + _ => panic!("Expected InvalidEventCount error"), + } + } + + #[tokio::test] + async fn test_sync_from_latest_returns_error_with_zero_max_block_range() { + let provider = RootProvider::::new(RpcClient::mocked(Asserter::new())); + let result = + EventScannerBuilder::sync().from_latest(10).max_block_range(0).connect(provider).await; + + match result { + Err(ScannerError::InvalidMaxBlockRange) => {} + _ => panic!("Expected InvalidMaxBlockRange error"), + } + } +} diff --git a/src/event_scanner/scanner/sync/mod.rs b/src/event_scanner/scanner/sync/mod.rs index ffdae09..db351a8 100644 --- a/src/event_scanner/scanner/sync/mod.rs +++ b/src/event_scanner/scanner/sync/mod.rs @@ -82,7 +82,7 @@ impl EventScannerBuilder { /// # Arguments /// /// * `count` - Maximum number of recent events to collect per listener before switching to live - /// streaming + /// streaming (must be greater than 0) /// /// # Important notes ///