diff --git a/crates/fetchkit/src/client.rs b/crates/fetchkit/src/client.rs index 1797f53..80aef6f 100644 --- a/crates/fetchkit/src/client.rs +++ b/crates/fetchkit/src/client.rs @@ -157,6 +157,91 @@ pub async fn fetch_with_options( registry.fetch(req, options).await } +/// Default concurrency limit for batch fetching. +const DEFAULT_BATCH_CONCURRENCY: usize = 5; + +/// Fetch multiple URLs concurrently. +/// +/// Each URL is processed independently — one failure doesn't affect others. +/// Results are returned in the same order as the input requests. +/// Concurrency is limited to `concurrency` (default: 5). +/// +/// # Examples +/// +/// ```no_run +/// use fetchkit::{FetchRequest, batch_fetch}; +/// +/// # async fn example() -> Result<(), fetchkit::FetchError> { +/// let requests = vec![ +/// FetchRequest::new("https://example.com").as_markdown(), +/// FetchRequest::new("https://example.org").as_markdown(), +/// ]; +/// let results = batch_fetch(requests, None).await; +/// for result in &results { +/// match result { +/// Ok(resp) => println!("{}: {}", resp.url, resp.status_code), +/// Err(e) => println!("Error: {}", e), +/// } +/// } +/// # Ok(()) +/// # } +/// ``` +pub async fn batch_fetch( + requests: Vec, + concurrency: Option, +) -> Vec> { + let options = FetchOptions { + enable_markdown: true, + enable_text: true, + ..Default::default() + }; + batch_fetch_with_options(requests, options, concurrency).await +} + +/// Fetch multiple URLs concurrently with custom options. +/// +/// Each URL is processed independently with the shared options. +/// Returns results in the same order as input requests. +pub async fn batch_fetch_with_options( + requests: Vec, + options: FetchOptions, + concurrency: Option, +) -> Vec> { + use futures::stream::{self, StreamExt}; + use std::sync::Arc; + + let concurrency = concurrency.unwrap_or(DEFAULT_BATCH_CONCURRENCY).max(1); + let num_requests = requests.len(); + let options = Arc::new(options); + + let mut indexed_results: Vec<(usize, Result)> = + stream::iter(requests.into_iter().enumerate()) + .map(|(idx, req)| { + let options = Arc::clone(&options); + async move { + let registry = FetcherRegistry::with_defaults(); + let result = registry.fetch(req, (*options).clone()).await; + (idx, result) + } + }) + .buffer_unordered(concurrency) + .collect() + .await; + + // Sort by original index to preserve request order + indexed_results.sort_by_key(|(idx, _)| *idx); + + // Pre-fill with errors, then replace with actual results + let mut results: Vec> = (0..num_requests) + .map(|_| Err(FetchError::MissingUrl)) + .collect(); + for (idx, result) in indexed_results { + results[idx] = result; + } + + results +} + #[cfg(test)] mod tests { use super::*; @@ -229,4 +314,110 @@ mod tests { Err(FetchError::BlockedUrl) )); } + + #[tokio::test] + async fn test_batch_fetch_multiple_urls() { + use wiremock::matchers::{method, path}; + use wiremock::{Mock, MockServer, ResponseTemplate}; + + let server = MockServer::start().await; + Mock::given(method("GET")) + .and(path("/page1")) + .respond_with( + ResponseTemplate::new(200) + .set_body_string("Page 1") + .insert_header("content-type", "text/plain"), + ) + .mount(&server) + .await; + Mock::given(method("GET")) + .and(path("/page2")) + .respond_with( + ResponseTemplate::new(200) + .set_body_string("Page 2") + .insert_header("content-type", "text/plain"), + ) + .mount(&server) + .await; + + let requests = vec![ + FetchRequest::new(format!("{}/page1", server.uri())), + FetchRequest::new(format!("{}/page2", server.uri())), + ]; + + let options = FetchOptions { + enable_markdown: true, + dns_policy: DnsPolicy::allow_all(), + ..Default::default() + }; + let results = batch_fetch_with_options(requests, options, None).await; + + assert_eq!(results.len(), 2); + assert!(results[0] + .as_ref() + .unwrap() + .content + .as_deref() + .unwrap() + .contains("Page 1")); + assert!(results[1] + .as_ref() + .unwrap() + .content + .as_deref() + .unwrap() + .contains("Page 2")); + } + + #[tokio::test] + async fn test_batch_fetch_partial_failure() { + use wiremock::matchers::{method, path}; + use wiremock::{Mock, MockServer, ResponseTemplate}; + + let server = MockServer::start().await; + Mock::given(method("GET")) + .and(path("/ok")) + .respond_with( + ResponseTemplate::new(200) + .set_body_string("OK") + .insert_header("content-type", "text/plain"), + ) + .mount(&server) + .await; + + let requests = vec![ + FetchRequest::new(format!("{}/ok", server.uri())), + FetchRequest::new(""), // Will fail with MissingUrl + ]; + + let options = FetchOptions { + dns_policy: DnsPolicy::allow_all(), + ..Default::default() + }; + let results = batch_fetch_with_options(requests, options, None).await; + + assert_eq!(results.len(), 2); + assert!(results[0].is_ok()); + assert!(results[1].is_err()); + } + + #[tokio::test] + async fn test_batch_fetch_respects_concurrency_limit() { + // Just verify it works with concurrency=1 + let requests = vec![ + FetchRequest::new(""), // Will fail + FetchRequest::new(""), // Will fail + ]; + + let results = batch_fetch(requests, Some(1)).await; + assert_eq!(results.len(), 2); + assert!(results[0].is_err()); + assert!(results[1].is_err()); + } + + #[tokio::test] + async fn test_batch_fetch_empty_input() { + let results = batch_fetch(vec![], None).await; + assert!(results.is_empty()); + } } diff --git a/crates/fetchkit/src/lib.rs b/crates/fetchkit/src/lib.rs index a820f8c..e9f7723 100644 --- a/crates/fetchkit/src/lib.rs +++ b/crates/fetchkit/src/lib.rs @@ -84,7 +84,7 @@ pub mod file_saver; mod tool; mod types; -pub use client::{fetch, fetch_with_options, FetchOptions}; +pub use client::{batch_fetch, batch_fetch_with_options, fetch, fetch_with_options, FetchOptions}; pub use convert::{ extract_headings, extract_metadata, html_to_markdown, html_to_text, strip_boilerplate, };