Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 191 additions & 0 deletions crates/fetchkit/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,91 @@ pub async fn fetch_with_options(
registry.fetch(req, options).await
}

/// Default concurrency limit for batch fetching.
const DEFAULT_BATCH_CONCURRENCY: usize = 5;

/// Fetch multiple URLs concurrently.
///
/// Each URL is processed independently — one failure doesn't affect others.
/// Results are returned in the same order as the input requests.
/// Concurrency is limited to `concurrency` (default: 5).
///
/// # Examples
///
/// ```no_run
/// use fetchkit::{FetchRequest, batch_fetch};
///
/// # async fn example() -> Result<(), fetchkit::FetchError> {
/// let requests = vec![
/// FetchRequest::new("https://example.com").as_markdown(),
/// FetchRequest::new("https://example.org").as_markdown(),
/// ];
/// let results = batch_fetch(requests, None).await;
/// for result in &results {
/// match result {
/// Ok(resp) => println!("{}: {}", resp.url, resp.status_code),
/// Err(e) => println!("Error: {}", e),
/// }
/// }
/// # Ok(())
/// # }
/// ```
pub async fn batch_fetch(
requests: Vec<FetchRequest>,
concurrency: Option<usize>,
) -> Vec<Result<FetchResponse, FetchError>> {
let options = FetchOptions {
enable_markdown: true,
enable_text: true,
..Default::default()
};
batch_fetch_with_options(requests, options, concurrency).await
}

/// Fetch multiple URLs concurrently with custom options.
///
/// Each URL is processed independently with the shared options.
/// Returns results in the same order as input requests.
pub async fn batch_fetch_with_options(
requests: Vec<FetchRequest>,
options: FetchOptions,
concurrency: Option<usize>,
) -> Vec<Result<FetchResponse, FetchError>> {
use futures::stream::{self, StreamExt};
use std::sync::Arc;

let concurrency = concurrency.unwrap_or(DEFAULT_BATCH_CONCURRENCY).max(1);
let num_requests = requests.len();
let options = Arc::new(options);

let mut indexed_results: Vec<(usize, Result<FetchResponse, FetchError>)> =
stream::iter(requests.into_iter().enumerate())
.map(|(idx, req)| {
let options = Arc::clone(&options);
async move {
let registry = FetcherRegistry::with_defaults();
let result = registry.fetch(req, (*options).clone()).await;
(idx, result)
}
})
.buffer_unordered(concurrency)
.collect()
.await;

// Sort by original index to preserve request order
indexed_results.sort_by_key(|(idx, _)| *idx);

// Pre-fill with errors, then replace with actual results
let mut results: Vec<Result<FetchResponse, FetchError>> = (0..num_requests)
.map(|_| Err(FetchError::MissingUrl))
.collect();
for (idx, result) in indexed_results {
results[idx] = result;
}

results
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -229,4 +314,110 @@ mod tests {
Err(FetchError::BlockedUrl)
));
}

#[tokio::test]
async fn test_batch_fetch_multiple_urls() {
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};

let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/page1"))
.respond_with(
ResponseTemplate::new(200)
.set_body_string("Page 1")
.insert_header("content-type", "text/plain"),
)
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path("/page2"))
.respond_with(
ResponseTemplate::new(200)
.set_body_string("Page 2")
.insert_header("content-type", "text/plain"),
)
.mount(&server)
.await;

let requests = vec![
FetchRequest::new(format!("{}/page1", server.uri())),
FetchRequest::new(format!("{}/page2", server.uri())),
];

let options = FetchOptions {
enable_markdown: true,
dns_policy: DnsPolicy::allow_all(),
..Default::default()
};
let results = batch_fetch_with_options(requests, options, None).await;

assert_eq!(results.len(), 2);
assert!(results[0]
.as_ref()
.unwrap()
.content
.as_deref()
.unwrap()
.contains("Page 1"));
assert!(results[1]
.as_ref()
.unwrap()
.content
.as_deref()
.unwrap()
.contains("Page 2"));
}

#[tokio::test]
async fn test_batch_fetch_partial_failure() {
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, ResponseTemplate};

let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/ok"))
.respond_with(
ResponseTemplate::new(200)
.set_body_string("OK")
.insert_header("content-type", "text/plain"),
)
.mount(&server)
.await;

let requests = vec![
FetchRequest::new(format!("{}/ok", server.uri())),
FetchRequest::new(""), // Will fail with MissingUrl
];

let options = FetchOptions {
dns_policy: DnsPolicy::allow_all(),
..Default::default()
};
let results = batch_fetch_with_options(requests, options, None).await;

assert_eq!(results.len(), 2);
assert!(results[0].is_ok());
assert!(results[1].is_err());
}

#[tokio::test]
async fn test_batch_fetch_respects_concurrency_limit() {
// Just verify it works with concurrency=1
let requests = vec![
FetchRequest::new(""), // Will fail
FetchRequest::new(""), // Will fail
];

let results = batch_fetch(requests, Some(1)).await;
assert_eq!(results.len(), 2);
assert!(results[0].is_err());
assert!(results[1].is_err());
}

#[tokio::test]
async fn test_batch_fetch_empty_input() {
let results = batch_fetch(vec![], None).await;
assert!(results.is_empty());
}
}
2 changes: 1 addition & 1 deletion crates/fetchkit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ pub mod file_saver;
mod tool;
mod types;

pub use client::{fetch, fetch_with_options, FetchOptions};
pub use client::{batch_fetch, batch_fetch_with_options, fetch, fetch_with_options, FetchOptions};
pub use convert::{
extract_headings, extract_metadata, html_to_markdown, html_to_text, strip_boilerplate,
};
Expand Down
Loading