diff --git a/oxen-rust/crates/cli/src/cmd/clone.rs b/oxen-rust/crates/cli/src/cmd/clone.rs index b4b3735d8..4c8e267ad 100644 --- a/oxen-rust/crates/cli/src/cmd/clone.rs +++ b/oxen-rust/crates/cli/src/cmd/clone.rs @@ -167,10 +167,14 @@ impl RunCmd for CloneCmd { is_remote, }; - let (scheme, host) = api::client::get_scheme_and_host_from_url(&opts.url)?; + let (scheme, host) = { + let hn = api::client::hostname_from_url_str(&opts.url)?; + let hostname = hn.hostname(); + (hn.scheme, hostname) + }; // TODO: Do I need to worry about this for remote repo? - check_remote_version_blocking(scheme.clone(), host.clone()).await?; + check_remote_version_blocking(&scheme, &host).await?; check_remote_version(scheme, host).await?; repositories::clone(&opts).await?; diff --git a/oxen-rust/crates/cli/src/helpers.rs b/oxen-rust/crates/cli/src/helpers.rs index 58ac02795..0372f5a24 100644 --- a/oxen-rust/crates/cli/src/helpers.rs +++ b/oxen-rust/crates/cli/src/helpers.rs @@ -30,12 +30,14 @@ pub fn get_scheme_and_host_or_default() -> Result<(String, String), OxenError> { pub fn get_scheme_and_host_from_repo( repo: &LocalRepository, ) -> Result<(String, String), OxenError> { - if let Some(remote) = repo.remote() { - let host_and_scheme = api::client::get_scheme_and_host_from_url(&remote.url)?; - return Ok(host_and_scheme); + match repo.remote() { + Some(remote) => { + let hn = api::client::hostname_from_url_str(&remote.url)?; + let hostname = hn.hostname(); + Ok((hn.scheme, hostname)) + } + None => get_scheme_and_host_or_default(), } - - get_scheme_and_host_or_default() } pub async fn check_remote_version( diff --git a/oxen-rust/crates/lib/src/api/client.rs b/oxen-rust/crates/lib/src/api/client.rs index 447a9e027..a2eb32b69 100644 --- a/oxen-rust/crates/lib/src/api/client.rs +++ b/oxen-rust/crates/lib/src/api/client.rs @@ -6,11 +6,10 @@ use crate::config::RuntimeConfig; use crate::config::runtime_config::runtime::Runtime; use crate::constants; use crate::error::OxenError; -use crate::model::RemoteRepository; use crate::view::OxenResponse; use crate::view::http; +use reqwest::IntoUrl; pub use reqwest::Url; -use reqwest::retry; use reqwest::{Client, ClientBuilder, header}; use std::time; @@ -30,6 +29,7 @@ pub mod metadata; pub mod oxen_version; pub mod prune; pub mod repositories; +pub mod retry; pub mod revisions; pub mod schemas; pub mod stats; @@ -40,101 +40,131 @@ pub mod workspaces; const VERSION: &str = crate::constants::OXEN_VERSION; const USER_AGENT: &str = "Oxen"; -pub fn get_scheme_and_host_from_url(url: &str) -> Result<(String, String), OxenError> { - let parsed_url = Url::parse(url)?; - let mut host_str = parsed_url.host_str().unwrap_or_default().to_string(); - if let Some(port) = parsed_url.port() { - host_str = format!("{host_str}:{port}"); +/// Parsed URL components with port-aware hostname. +pub struct Hostname { + pub host: String, + pub port: Option, + pub scheme: String, +} + +impl Hostname { + /// Returns `host:port` when a port is present, otherwise just `host`. + pub fn hostname(&self) -> String { + match self.port { + Some(port) => format!("{}:{}", self.host, port), + None => self.host.clone(), + } + } + + /// Extract scheme, host, and port from a `Url`. + pub fn from_url(url: &Url) -> Result { + let Some(host) = url.host_str() else { + return Err(OxenError::NoHost(url.to_string().into())); + }; + Ok(Self { + host: host.to_string(), + port: url.port(), + scheme: url.scheme().to_string(), + }) } - Ok((parsed_url.scheme().to_owned(), host_str)) } -// TODO: we probably want to create a pool of clients instead of constructing a -// new one for each request so we can take advantage of keep-alive -pub fn new_for_url(url: &str) -> Result { - let (_scheme, host) = get_scheme_and_host_from_url(url)?; - new_for_host(host, true) +/// Parse a URL string into a `Hostname` (scheme + host + optional port). +pub fn hostname_from_url_str(url_str: &str) -> Result { + let url: url::Url = url_str.parse()?; + Hostname::from_url(&url) } -pub fn new_for_url_no_user_agent(url: &str) -> Result { - let (_scheme, host) = get_scheme_and_host_from_url(url)?; - new_for_host(host, false) +// Note: reqwest::Client already maintains an internal HTTP connection pool with keep-alive. +// The 3 upload paths that matter most (push, version chunks, workspace files) already share +// clients via Arc. A global per-host cache (OnceCell + RwLock) would add +// complexity for little gain since per-request client overhead for metadata/query paths is minimal. +pub fn new_for_url(url: U) -> Result { + let url = url.into_url()?; + let hn = Hostname::from_url(&url)?; + new_for_host(&hn.hostname(), true) } -fn new_for_host(host: String, should_add_user_agent: bool) -> Result { - match builder_for_host(host, should_add_user_agent)? - .timeout(time::Duration::from_secs(constants::timeout())) - .build() - { - Ok(client) => Ok(client), - Err(reqwest_err) => Err(OxenError::HTTP(reqwest_err)), - } +pub fn new_for_url_no_user_agent(url: U) -> Result { + let url = url.into_url()?; + let hn = Hostname::from_url(&url)?; + new_for_host(&hn.hostname(), false) } -pub fn new_for_remote_repo(remote_repo: &RemoteRepository) -> Result { - let (_scheme, host) = get_scheme_and_host_from_url(remote_repo.url())?; - new_for_host(host, true) +/// Has connection timeout and TCP keep-alives, but also imposes a per-request timeout. +/// NOT SUITABLE FOR LONG-LIVED TRANSFERS! Use `new_for_url_transfer` instead. +fn new_for_host(host: &str, should_add_user_agent: bool) -> Result { + builder_for_host( + host, + should_add_user_agent, + time::Duration::from_secs(constants::connect_timeout()), + time::Duration::from_secs(constants::tcp_keepalive()), + time::Duration::from_secs(20), + )? + .timeout(time::Duration::from_secs(constants::timeout())) + .build() + .map_err(OxenError::HTTP) } -pub fn builder_for_remote_repo(remote_repo: &RemoteRepository) -> Result { - let (_scheme, host) = get_scheme_and_host_from_url(remote_repo.url())?; - builder_for_host(host, true) +/// Create a client for long-lived transfers (uploads/downloads). +/// No overall timeout; uses connect_timeout + tcp_keepalive + HTTP/2 keep-alive +/// to detect hung connections without capping total transfer time. +pub fn new_for_url_transfer(url: U) -> Result { + let url = url.into_url()?; + let hn = Hostname::from_url(&url)?; + new_for_host_transfer(&hn.hostname()) } -pub fn builder_for_url(url: &str) -> Result { - let (_scheme, host) = get_scheme_and_host_from_url(url)?; - builder_for_host(host, true) +pub(crate) fn new_for_host_transfer(host: &str) -> Result { + builder_for_host( + host, + true, + time::Duration::from_secs(constants::connect_timeout()), + time::Duration::from_secs(constants::tcp_keepalive()), + time::Duration::from_secs(20), + )? + .build() + .map_err(OxenError::HTTP) } -fn builder_for_host(host: String, should_add_user_agent: bool) -> Result { +fn builder_for_host( + host: &str, + should_add_user_agent: bool, + connect_timeout: time::Duration, + keep_alive_interval: time::Duration, + http2_keep_alive_timeout: time::Duration, +) -> Result { let mut builder = Client::builder(); + if should_add_user_agent { let config = RuntimeConfig::get()?; - builder = builder.user_agent(build_user_agent(&config)); + let user_agent = build_user_agent(&config)?; + builder = builder.user_agent(user_agent); } - // Bump max retries for this oxen-server host from 2 to 3. Exponential backoff is used by default. - let retry_policy = retry::for_host(host.clone()) - .max_retries_per_request(3) - .classify_fn(|req_rep| { - // Still retry on low-level network errors - if req_rep.error().is_some() { - return req_rep.retryable(); - } - // Have reqwest retry all application-level server errors*, not just network-level errors - // that reqwest considers retryable by default. This assumes that oxen-server endpoints are - // safe to retry if the server returned any error mid-operation. We can tighten this up - // to only retry specific server errors in the future if that is not true. - // - // * info (100's), success (200's), redirection (300's), and client errors (400's) - // don't make sense to retry. We'll only retry server errors (500's). - match req_rep.status() { - Some(status_code) if status_code.is_server_error() => req_rep.retryable(), // retry - _ => req_rep.success(), // this means don't retry, and is the only other valid return value from the closure - } - }); - builder = builder.retry(retry_policy); + builder = builder + .connect_timeout(connect_timeout) + .tcp_keepalive(keep_alive_interval) + .http2_keep_alive_interval(keep_alive_interval) + .http2_keep_alive_timeout(http2_keep_alive_timeout); // If auth_config.toml isn't found, return without authorizing let config = match AuthConfig::get() { Ok(config) => config, Err(e) => { - log::debug!( - "Error getting config: {}. No auth token found for host {}", - e, - host - ); + log::debug!("Error getting config: {e}. No auth configuration found!"); return Ok(builder); } }; - if let Some(auth_token) = config.auth_token_for_host(host.as_str()) { + + if let Some(auth_token) = config.auth_token_for_host(host) { log::debug!("Setting auth token for host: {}", host); let auth_header = format!("Bearer {auth_token}"); let mut auth_value = match header::HeaderValue::from_str(auth_header.as_str()) { Ok(header) => header, Err(e) => { log::debug!("Invalid header value: {e}"); - return Err(OxenError::basic_str( + return Err(OxenError::authentication( "Error setting request auth. Please check your Oxen config.", )); } @@ -142,14 +172,16 @@ fn builder_for_host(host: String, should_add_user_agent: bool) -> Result String { +fn build_user_agent(config: &RuntimeConfig) -> Result { let host_platform = config.host_platform.display_name(); let runtime_name = match config.runtime_name { Runtime::CLI => config.runtime_name.display_name().to_string(), @@ -159,7 +191,9 @@ fn build_user_agent(config: &RuntimeConfig) -> String { config.runtime_version ), }; - format!("{USER_AGENT}/{VERSION} ({host_platform}; {runtime_name})") + Ok(format!( + "{USER_AGENT}/{VERSION} ({host_platform}; {runtime_name})" + )) } /// Performs an extra parse to validate that the response is success diff --git a/oxen-rust/crates/lib/src/api/client/commits.rs b/oxen-rust/crates/lib/src/api/client/commits.rs index 66e2741ce..c6d5cb8ab 100644 --- a/oxen-rust/crates/lib/src/api/client/commits.rs +++ b/oxen-rust/crates/lib/src/api/client/commits.rs @@ -1,4 +1,4 @@ -use crate::api::client; +use crate::api::client::{self, Hostname}; use crate::constants::{DEFAULT_PAGE_NUM, DIR_HASHES_DIR, DIRS_DIR, HISTORY_DIR}; use crate::error::OxenError; @@ -31,7 +31,6 @@ use flate2::write::GzEncoder; use futures_util::TryStreamExt; use http::header::CONTENT_LENGTH; use indicatif::{ProgressBar, ProgressStyle}; - pub struct ChunkParams { pub chunk_num: usize, pub total_chunks: usize, @@ -719,7 +718,10 @@ pub async fn post_commit_dir_hashes_to_server( let quiet_bar = Arc::new(ProgressBar::hidden()); - let client = client::new_for_remote_repo(remote_repo)?; + let client = { + let hn = Hostname::from_url(&remote_repo.url().parse()?)?; + client::new_for_host_transfer(&hn.hostname())? + }; post_data_to_server_with_client( &client, remote_repo, @@ -763,18 +765,18 @@ pub async fn post_commits_dir_hashes_to_server( let buffer: Vec = tar.into_inner()?.finish()?; - let is_compressed = true; - let filename = None; - let quiet_bar = Arc::new(ProgressBar::hidden()); - let client = client::new_for_remote_repo(remote_repo)?; + let client = { + let hn = Hostname::from_url(&remote_repo.url().parse()?)?; + client::new_for_host_transfer(&hn.hostname())? + }; post_data_to_server_with_client( &client, remote_repo, buffer, - is_compressed, - &filename, + true, // compression + &None, // filename quiet_bar, ) .await @@ -838,33 +840,15 @@ pub async fn upload_single_tarball_to_server_with_client_with_retry( buffer: &[u8], bar: Arc, ) -> Result<(), OxenError> { - let mut total_tries = 0; - - while total_tries < constants::NUM_HTTP_RETRIES { - match upload_single_tarball_to_server_with_client( - client, - remote_repo, - buffer, - bar.to_owned(), - ) - .await - { - Ok(_) => { - return Ok(()); - } - Err(err) => { - total_tries += 1; - // Exponentially back off - let sleep_time = total_tries * total_tries; - log::debug!( - "upload_single_tarball_to_server_with_retry upload failed sleeping {sleep_time}: {err:?}" - ); - std::thread::sleep(std::time::Duration::from_secs(sleep_time)); - } + let config = crate::api::client::retry::RetryConfig::default(); + crate::api::client::retry::with_retry(&config, |_attempt| { + let bar = bar.clone(); + async move { + upload_single_tarball_to_server_with_client(client, remote_repo, buffer, bar).await?; + Ok(()) } - } - - Err(OxenError::basic_str("Upload retry failed.")) + }) + .await } async fn upload_single_tarball_to_server_with_client( @@ -952,10 +936,9 @@ pub async fn upload_data_chunk_to_server_with_retry( is_compressed: bool, filename: &Option, ) -> Result<(), OxenError> { - let mut total_tries = 0; - let mut last_error = String::from(""); - while total_tries < constants::NUM_HTTP_RETRIES { - match upload_data_chunk_to_server( + let config = crate::api::client::retry::RetryConfig::default(); + crate::api::client::retry::with_retry(&config, |_attempt| async move { + upload_data_chunk_to_server( client, remote_repo, chunk, @@ -964,27 +947,10 @@ pub async fn upload_data_chunk_to_server_with_retry( is_compressed, filename, ) - .await - { - Ok(_) => { - return Ok(()); - } - Err(err) => { - total_tries += 1; - // Exponentially back off - let sleep_time = total_tries * total_tries; - log::debug!( - "upload_data_chunk_to_server_with_retry upload failed sleeping {sleep_time}: {err}" - ); - last_error = format!("{err}"); - std::thread::sleep(std::time::Duration::from_secs(sleep_time)); - } - } - } - - Err(OxenError::basic_str(format!( - "Upload chunk retry failed. {last_error}" - ))) + .await?; + Ok(()) + }) + .await } async fn upload_data_chunk_to_server( diff --git a/oxen-rust/crates/lib/src/api/client/compare.rs b/oxen-rust/crates/lib/src/api/client/compare.rs index 2520f9914..0752155cf 100644 --- a/oxen-rust/crates/lib/src/api/client/compare.rs +++ b/oxen-rust/crates/lib/src/api/client/compare.rs @@ -396,8 +396,7 @@ mod tests { let mut commit_ids = Vec::new(); // Create 5 commits with 5 new directories - let total_dirs = 5; - for i in 0..total_dirs { + for i in 0..5 { // Write a file let dir_path = format!("dir_{i}"); let file_path = PathBuf::from(dir_path).join(format!("file_{i}.txt")); @@ -429,12 +428,16 @@ mod tests { let results = api::client::compare::dir_tree(&remote_repo, &base_commit_id, &head_commit_id) .await?; - println!("results: {results:?}"); - assert_eq!(results.len(), 1); + assert_eq!(results.len(), 1, "results: {results:?}"); let first = results.first().unwrap(); - assert_eq!(first.name, PathBuf::from("")); - assert_eq!(first.status, DiffEntryStatus::Modified); - assert_eq!(first.children.len(), 2); + assert_eq!(first.name, PathBuf::from(""), "{:?}", first.name); + assert_eq!( + first.status, + DiffEntryStatus::Modified, + "{:?}", + first.status + ); + assert_eq!(first.children.len(), 2, "{:?}", first.children); Ok(remote_repo) }) .await diff --git a/oxen-rust/crates/lib/src/api/client/file.rs b/oxen-rust/crates/lib/src/api/client/file.rs index 0345e3255..adc2009ce 100644 --- a/oxen-rust/crates/lib/src/api/client/file.rs +++ b/oxen-rust/crates/lib/src/api/client/file.rs @@ -1,9 +1,11 @@ -use crate::api; use crate::api::client; +use crate::api::client::Hostname; +use crate::api::client::retry; use crate::error::OxenError; use crate::model::RemoteRepository; use crate::model::commit::NewCommitBody; use crate::view::CommitResponse; +use crate::{api, util::internal_types::HasLen}; use bytes::{Bytes, BytesMut}; use futures_util::StreamExt; @@ -22,11 +24,12 @@ pub async fn put_file( let directory = directory.as_ref(); put_multipart_file( remote_repo, - format!("/file/{branch}/{directory}"), + &format!("/file/{branch}/{directory}"), "files[]", - file_path, - file_name, - commit_body, + file_path.as_ref(), + file_name.as_ref().map(|s| s.as_ref()), + commit_body.as_ref(), + &retry::RetryConfig::default(), ) .await } @@ -43,53 +46,81 @@ pub async fn put_file_to_path( let file_path_on_repo = file_path_on_repo.as_ref(); put_multipart_file( remote_repo, - format!("/file/{branch}/{file_path_on_repo}"), + &format!("/file/{branch}/{file_path_on_repo}"), "file", - file_path, - file_name, - commit_body, + file_path.as_ref(), + file_name.as_ref().map(|s| s.as_ref()), + commit_body.as_ref(), + &retry::RetryConfig::default(), ) .await } - async fn put_multipart_file( remote_repo: &RemoteRepository, - uri: String, + uri: &str, field_name: &'static str, - file_path: impl AsRef, - file_name: Option>, - commit_body: Option, + file_path: &Path, + file_name: Option<&str>, + commit_body: Option<&NewCommitBody>, + config: &retry::RetryConfig, ) -> Result { - let file_path = file_path.as_ref(); log::debug!("put_multipart_file {uri:?}, file_path {file_path:?}"); - let url = api::endpoint::url_from_repo(remote_repo, &uri)?; - let client = client::new_for_url(&url)?; - let file_part = make_file_part(file_path, file_name).await?; - let form = apply_commit_body(Form::new().part(field_name, file_part), commit_body); - let res = client.put(&url).multipart(form).send().await?; - let body = client::parse_json_body(&url, res).await?; - Ok(serde_json::from_str(&body)?) + let client = { + let hn = Hostname::from_url(&remote_repo.url().parse()?)?; + client::new_for_host_transfer(&hn.hostname())? + }; + + let url = api::endpoint::url_from_repo(remote_repo, uri)?; + + let file_data = bytes::Bytes::from(tokio::fs::read(file_path).await?); + + retry::with_retry(config, |_attempt| { + let file_data = file_data.clone(); // cloning is cheap: it's essentially an Arc<[u8]> + let client = client.clone(); // HTTP client is also an Arc + let url = url.clone(); // it's just the length + pointer + + async move { + let file_part = make_file_part(file_data, file_name).await?; + let form = apply_commit_body(Form::new().part(field_name, file_part), commit_body); + + let res = client.put(&url).multipart(form).send().await?; + let body = client::parse_json_body(&url, res).await?; + + Ok(serde_json::from_str(&body)?) + } + }) + .await } -async fn make_file_part( - file_path: &Path, - file_name: Option>, +/// Create a Part in a multipart Form from the specified data. +/// Intended to be used for uploading a single file. +async fn make_file_part + HasLen>( + file_data: T, + file_name: Option<&str>, ) -> Result { - let file_part = Part::file(file_path).await?; + let file_data_len = file_data.len() as u64; + let file_part = reqwest::multipart::Part::stream_with_length(file_data, file_data_len); Ok(match file_name { - Some(file_name) => file_part.file_name(file_name.as_ref().to_string()), + Some(file_name) => file_part.file_name(file_name.to_string()), None => file_part, }) } -fn apply_commit_body(mut form: Form, commit_body: Option) -> Form { - if let Some(body) = commit_body { - form = form.text("name", body.author); - form = form.text("email", body.email); - form = form.text("message", body.message); +/// Add the commit information as fields to the form. +fn apply_commit_body(form: Form, commit_body: Option<&NewCommitBody>) -> Form { + if let Some(NewCommitBody { + message, + author, + email, + }) = commit_body + { + form.text("name", author.to_string()) + .text("email", email.to_string()) + .text("message", message.to_string()) + } else { + form } - form } pub async fn get_file( diff --git a/oxen-rust/crates/lib/src/api/client/import.rs b/oxen-rust/crates/lib/src/api/client/import.rs index 1611db20d..6f59e329e 100644 --- a/oxen-rust/crates/lib/src/api/client/import.rs +++ b/oxen-rust/crates/lib/src/api/client/import.rs @@ -1,5 +1,6 @@ use crate::api; use crate::api::client; +use crate::api::client::retry; use crate::error::OxenError; use crate::model::RemoteRepository; @@ -18,42 +19,80 @@ pub async fn upload_zip( let branch_name = branch_name.as_ref(); let directory = directory.as_ref(); let zip_path = zip_path.as_ref(); + let name = name.as_ref(); let email = email.as_ref(); - // Read the ZIP file - let zip_data = std::fs::read(zip_path)?; + // Read the ZIP file into memory once for potential retries + // NOTE: bytes::Bytes wraps the Vec into an Arc internally. + // We need to move into `reqwest.multipart::Part::bytes`, but with retries, we don't want + // to actually clone the data. This wrapper makes it cheap to clone as we're just + // incrementing the arc's internal reference count. + let zip_data = bytes::Bytes::from(tokio::fs::read(zip_path).await?); + let len_zip_data = zip_data.len() as u64; let file_name = zip_path .file_name() .ok_or_else(|| OxenError::basic_str("Invalid ZIP file path"))? - .to_string_lossy(); + .to_string_lossy() + .to_string(); - // Create the URL for workspace ZIP upload endpoint let uri = format!("/import/upload/{branch_name}/{directory}"); let url = api::endpoint::url_from_repo(remote_repo, &uri)?; + let commit_msg = commit_message.map(|m| m.as_ref().to_string()); + + let config = retry::RetryConfig::default(); + retry::with_retry(&config, |_attempt| { + let url = url.clone(); + let zip_data = zip_data.clone(); + let file_name = file_name.clone(); + let name = name.to_string(); + let email = email.to_string(); + let directory = directory.to_string(); + let commit_msg = commit_msg.clone(); + async move { + let form = make_multipart_form( + zip_data, + len_zip_data, + file_name, + name, + email, + directory, + commit_msg, + ); + + let client = client::new_for_url_transfer(&url)?; + let response = client.post(&url).multipart(form).send().await?; + let body = client::parse_json_body(&url, response).await?; + let response: crate::view::CommitResponse = serde_json::from_str(&body) + .map_err(|e| OxenError::basic_str(format!("Failed to parse response: {e}")))?; + Ok(response.commit) + } + }) + .await +} + +fn make_multipart_form>( + zip_data: T, + len_zip_data: u64, + file_name: String, + name: String, + email: String, + directory: String, + commit_msg: Option, +) -> reqwest::multipart::Form { + let file_part = + reqwest::multipart::Part::stream_with_length(zip_data, len_zip_data).file_name(file_name); - // Create multipart form - let file_part = reqwest::multipart::Part::bytes(zip_data).file_name(file_name.to_string()); let mut form = reqwest::multipart::Form::new() .part("file", file_part) - .text("name", name.to_string()) - .text("email", email.to_string()) - .text("resource_path", directory.to_string()); + .text("name", name) + .text("email", email) + .text("resource_path", directory); - if let Some(msg) = commit_message { - form = form.text("commit_message", msg.as_ref().to_string()); + if let Some(msg) = commit_msg { + form = form.text("commit_message", msg); } - - // Send the request - let client = client::new_for_url(&url)?; - let response = client.post(&url).multipart(form).send().await?; - let body = client::parse_json_body(&url, response).await?; - - // Parse the response - let response: crate::view::CommitResponse = serde_json::from_str(&body) - .map_err(|e| OxenError::basic_str(format!("Failed to parse response: {e}")))?; - - Ok(response.commit) + form } #[cfg(test)] diff --git a/oxen-rust/crates/lib/src/api/client/repositories.rs b/oxen-rust/crates/lib/src/api/client/repositories.rs index e03a258df..5f118ece3 100644 --- a/oxen-rust/crates/lib/src/api/client/repositories.rs +++ b/oxen-rust/crates/lib/src/api/client/repositories.rs @@ -1,5 +1,6 @@ use crate::api; use crate::api::client; +use crate::api::client::Hostname; use crate::constants::{DEFAULT_HOST, DEFAULT_REMOTE_NAME, DEFAULT_SCHEME}; use crate::error::OxenError; use crate::model::file::{FileContents, FileNew}; @@ -420,14 +421,13 @@ pub async fn transfer_namespace( match response { Ok(response) => { - // Update remote to reflect new namespace - let (scheme, host) = api::client::get_scheme_and_host_from_url(&url)?; + let hn = Hostname::from_url(&url.parse()?)?; let new_remote_url = api::endpoint::remote_url_from_namespace_name_scheme( - &host, + &hn.hostname(), &response.repository.namespace, &repository.name, - &scheme, + &hn.scheme, ); let new_remote = Remote { url: new_remote_url, diff --git a/oxen-rust/crates/lib/src/api/client/retry.rs b/oxen-rust/crates/lib/src/api/client/retry.rs new file mode 100644 index 000000000..9c3da5a55 --- /dev/null +++ b/oxen-rust/crates/lib/src/api/client/retry.rs @@ -0,0 +1,130 @@ +use crate::constants; +use crate::error::OxenError; +use rand::Rng; +use std::future::Future; + +pub struct RetryConfig { + pub max_retries: usize, + pub base_wait_ms: u64, + pub max_wait_ms: u64, +} + +impl Default for RetryConfig { + fn default() -> Self { + Self { + max_retries: constants::max_retries(), + base_wait_ms: 300, + max_wait_ms: 10_000, + } + } +} + +/// True exponential backoff: base_wait_ms * 2^attempt + jitter, clamped to max_wait_ms +pub fn exponential_backoff(base_wait_ms: u64, attempt: usize, max_wait_ms: u64) -> u64 { + let exp = base_wait_ms.saturating_mul(1u64 << attempt.min(16)); + let jitter = rand::thread_rng().gen_range(0..=500u64); + exp.saturating_add(jitter).min(max_wait_ms) +} + +/// Retry an async operation with exponential backoff. +/// Authentication errors are not retried. +pub async fn with_retry(config: &RetryConfig, mut operation: F) -> Result +where + F: FnMut(usize) -> Fut, + Fut: Future>, +{ + let mut last_err = None; + for attempt in 0..=config.max_retries { + match operation(attempt).await { + Ok(val) => return Ok(val), + error @ Err(OxenError::Authentication(_)) => return error, + Err(err) => { + log::warn!( + "Retry attempt {}/{} failed: {err}", + attempt + 1, + config.max_retries + 1 + ); + last_err = Some(err); + if attempt < config.max_retries { + let wait = + exponential_backoff(config.base_wait_ms, attempt, config.max_wait_ms); + tokio::time::sleep(std::time::Duration::from_millis(wait)).await; + } + } + } + } + Err(last_err.unwrap_or_else(|| OxenError::basic_str("Retry failed with no attempts"))) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_exponential_backoff_increases() { + let b0 = exponential_backoff(300, 0, 100_000); + let b1 = exponential_backoff(300, 1, 100_000); + let b2 = exponential_backoff(300, 2, 100_000); + // With jitter, we can't be exact, but the base doubles each time + // 300*1=300, 300*2=600, 300*4=1200 (plus jitter 0..500) + assert!(b0 <= 800, "b0={b0}"); + assert!(b1 >= 300 && b1 <= 1100, "b1={b1}"); + assert!(b2 >= 900 && b2 <= 1700, "b2={b2}"); + } + + #[test] + fn test_exponential_backoff_clamps_to_max() { + let val = exponential_backoff(300, 20, 5000); + assert!(val <= 5000, "val={val}"); + } + + #[tokio::test] + async fn test_with_retry_immediate_success() { + let config = RetryConfig { + max_retries: 3, + base_wait_ms: 10, + max_wait_ms: 100, + }; + let result: Result = with_retry(&config, |_attempt| async { Ok(42) }).await; + assert_eq!(result.unwrap(), 42); + } + + #[tokio::test] + async fn test_with_retry_exhaustion() { + let config = RetryConfig { + max_retries: 2, + base_wait_ms: 10, + max_wait_ms: 50, + }; + let result: Result = with_retry(&config, |_attempt| async { + Err(OxenError::basic_str("always fails")) + }) + .await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_with_retry_succeeds_on_third_attempt() { + let config = RetryConfig { + max_retries: 3, + base_wait_ms: 10, + max_wait_ms: 50, + }; + let counter = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let counter_clone = counter.clone(); + let result: Result = with_retry(&config, move |_attempt| { + let c = counter_clone.clone(); + async move { + let prev = c.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + if prev < 2 { + Err(OxenError::basic_str("not yet")) + } else { + Ok(99) + } + } + }) + .await; + assert_eq!(result.unwrap(), 99); + assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 3); + } +} diff --git a/oxen-rust/crates/lib/src/api/client/tree.rs b/oxen-rust/crates/lib/src/api/client/tree.rs index eb1168590..88b9cdd28 100644 --- a/oxen-rust/crates/lib/src/api/client/tree.rs +++ b/oxen-rust/crates/lib/src/api/client/tree.rs @@ -6,7 +6,6 @@ use futures_util::TryStreamExt; use std::collections::HashSet; use std::path::PathBuf; use std::sync::Arc; -use std::time; use tempfile::TempDir; use crate::api::client; @@ -91,9 +90,7 @@ pub async fn create_nodes( // Upload the node let uri = "/tree/nodes".to_string(); let url = api::endpoint::url_from_repo(remote_repo, &uri)?; - let client = client::builder_for_url(&url)? - .timeout(time::Duration::from_secs(120)) - .build()?; + let client = client::new_for_url_transfer(&url)?; let size = buffer.len() as u64; log::debug!( @@ -357,9 +354,7 @@ async fn node_download_request( ) -> Result<(), OxenError> { let url = url.as_ref(); - let client = client::builder_for_url(url)? - .timeout(time::Duration::from_secs(12000)) - .build()?; + let client = client::new_for_url_transfer(url)?; log::debug!("node_download_request about to send request {url}"); let res = client.get(url).send().await?; let res = client::handle_non_json_response(url, res).await?; diff --git a/oxen-rust/crates/lib/src/api/client/versions.rs b/oxen-rust/crates/lib/src/api/client/versions.rs index e41025878..cbcaf4d59 100644 --- a/oxen-rust/crates/lib/src/api/client/versions.rs +++ b/oxen-rust/crates/lib/src/api/client/versions.rs @@ -1,6 +1,7 @@ use crate::api; -use crate::api::client; use crate::api::client::internal_types::LocalOrBase; +use crate::api::client::retry; +use crate::api::client::{self, Hostname}; use crate::constants::{AVG_CHUNK_SIZE, max_retries}; use crate::error::OxenError; use crate::model::entry::commit_entry::Entry; @@ -21,10 +22,6 @@ use futures_util::StreamExt; use futures_util::stream::FuturesUnordered; use http::Method; use http::header::CONTENT_LENGTH; -use rand::{Rng, thread_rng}; -use tokio_tar::Archive; -use tokio_util::codec::{BytesCodec, FramedRead}; - use std::collections::{HashMap, HashSet}; use std::io::{SeekFrom, Write}; use std::path::{Path, PathBuf}; @@ -34,6 +31,8 @@ use tokio::fs::OpenOptions; use tokio::io::{AsyncReadExt, AsyncSeekExt}; use tokio::sync::Semaphore; use tokio::time::sleep; +use tokio_tar::Archive; +use tokio_util::codec::{BytesCodec, FramedRead}; use crate::repositories; @@ -197,29 +196,15 @@ pub async fn download_data_from_version_paths( hashes: &[String], local_repo: &LocalRepository, ) -> Result { - let total_retries = max_retries().try_into().unwrap_or(max_retries() as u64); - let mut num_retries = 0; - - while num_retries < total_retries { + let config = retry::RetryConfig::default(); + retry::with_retry(&config, |_attempt| async { match try_download_data_from_version_paths(remote_repo, hashes, local_repo).await { - Ok(val) => return Ok(val), - Err(OxenError::Authentication(val)) => return Err(OxenError::Authentication(val)), - Err(err) => { - num_retries += 1; - // Exponentially back off - let sleep_time = num_retries * num_retries; - log::warn!("Could not download content {err:?} sleeping {sleep_time}"); - tokio::time::sleep(std::time::Duration::from_secs(sleep_time)).await; - } + Ok(val) => Ok(val), + Err(OxenError::Authentication(val)) => Err(OxenError::Authentication(val)), + Err(err) => Err(err), } - } - - let err = format!( - "Err: Failed to download {} files after {} retries", - hashes.len(), - total_retries - ); - Err(OxenError::basic_str(err)) + }) + .await } pub async fn try_download_data_from_version_paths( @@ -312,7 +297,10 @@ async fn upload_chunks( max_retries: usize, progress: Option<&Arc>, ) -> Result>, OxenError> { - let client = Arc::new(api::client::builder_for_remote_repo(remote_repo)?.build()?); + let client = { + let hn = Hostname::from_url(&remote_repo.url().parse()?)?; + Arc::new(client::new_for_host_transfer(&hn.hostname())?) + }; // Figure out how many parts we need to upload let file_size = upload.size; @@ -353,8 +341,8 @@ async fn upload_chunks( )) })?; - let wait_time = exponential_backoff(BASE_WAIT_TIME, i, MAX_WAIT_TIME); - sleep(Duration::from_millis(wait_time as u64)).await; + let wait_time = retry::exponential_backoff(BASE_WAIT_TIME as u64, i, MAX_WAIT_TIME as u64); + sleep(Duration::from_millis(wait_time)).await; chunk = upload_chunk(&client, &remote_repo, &upload, start, chunk_size).await; i += 1; @@ -485,30 +473,28 @@ pub async fn multipart_batch_upload_with_retry( chunk: &Vec, client: &reqwest::Client, ) -> Result<(), OxenError> { - let mut files_to_retry: Vec = vec![]; - let mut first_try = true; - let mut retry_count: usize = 0; let max_retries = max_retries(); + let mut files_to_retry: Vec = vec![]; - while (first_try || !files_to_retry.is_empty()) && retry_count < max_retries { - first_try = false; - retry_count += 1; - + for attempt in 0..max_retries { files_to_retry = multipart_batch_upload(local_repo, remote_repo, chunk, client, files_to_retry).await?; - if !files_to_retry.is_empty() { - let wait_time = exponential_backoff(BASE_WAIT_TIME, retry_count, MAX_WAIT_TIME); - sleep(Duration::from_millis(wait_time as u64)).await; + if files_to_retry.is_empty() { + return Ok(()); + } + + // Don't sleep after the last attempt + if attempt + 1 < max_retries { + let wait_time = + retry::exponential_backoff(BASE_WAIT_TIME as u64, attempt, MAX_WAIT_TIME as u64); + sleep(Duration::from_millis(wait_time)).await; } } - if files_to_retry.is_empty() { - Ok(()) - } else { - Err(OxenError::basic_str(format!( - "Failed to upload files: {files_to_retry:#?}" - ))) - } + + Err(OxenError::basic_str(format!( + "Failed to upload files: {files_to_retry:#?}" + ))) } pub async fn multipart_batch_upload( @@ -780,8 +766,12 @@ pub(crate) async fn workspace_multipart_batch_upload_parts_with_retry( }; if !files_to_retry.is_empty() { - let wait_time = exponential_backoff(BASE_WAIT_TIME, retry_count, MAX_WAIT_TIME); - sleep(Duration::from_millis(wait_time as u64)).await; + let wait_time = retry::exponential_backoff( + BASE_WAIT_TIME as u64, + retry_count, + MAX_WAIT_TIME as u64, + ); + sleep(Duration::from_millis(wait_time)).await; } } @@ -794,17 +784,6 @@ pub(crate) async fn workspace_multipart_batch_upload_parts_with_retry( Ok(upload_result.err_files) } -pub fn exponential_backoff(base_wait_time: usize, n: usize, max: usize) -> usize { - log::debug!( - "Exponential backoff got called with base_wait_time {base_wait_time}. n {n}, and max {max}" - ); - (base_wait_time + n.pow(2) + jitter()).min(max) -} - -fn jitter() -> usize { - thread_rng().gen_range(0..=500) -} - #[cfg(test)] mod tests { use std::path::PathBuf; diff --git a/oxen-rust/crates/lib/src/api/client/workspaces/files.rs b/oxen-rust/crates/lib/src/api/client/workspaces/files.rs index fa51b3e86..554d3cb97 100644 --- a/oxen-rust/crates/lib/src/api/client/workspaces/files.rs +++ b/oxen-rust/crates/lib/src/api/client/workspaces/files.rs @@ -1,5 +1,5 @@ -use crate::api::client; use crate::api::client::internal_types::LocalOrBase; +use crate::api::client::{self, Hostname}; use crate::constants::{chunk_size, max_retries}; use crate::core::progress::push_progress::PushProgress; use crate::error::OxenError; @@ -13,6 +13,7 @@ use bytesize::ByteSize; use futures_util::StreamExt; use glob_match::glob_match; +use futures::stream; use parking_lot::Mutex; use rand::{Rng, thread_rng}; use std::collections::HashSet; @@ -21,8 +22,6 @@ use std::sync::Arc; use tokio::io::AsyncWriteExt; use tokio::sync::mpsc; use tokio::time::{Duration, sleep}; - -use futures::stream; use tokio_stream::wrappers::ReceiverStream; use crate::util::hasher; @@ -447,8 +446,10 @@ pub(crate) async fn parallel_batched_small_file_upload( } } - // Create a client for uploading batches - let client = Arc::new(api::client::builder_for_remote_repo(remote_repo)?.build()?); + let client = { + let hn = Hostname::from_url(&remote_repo.url().parse()?)?; + Arc::new(client::new_for_host_transfer(&hn.hostname())?) + }; // For individual files let err_files: Arc>> = Arc::new(Mutex::new(vec![])); diff --git a/oxen-rust/crates/lib/src/config/auth_config.rs b/oxen-rust/crates/lib/src/config/auth_config.rs index 5a29c33f3..08b11a507 100644 --- a/oxen-rust/crates/lib/src/config/auth_config.rs +++ b/oxen-rust/crates/lib/src/config/auth_config.rs @@ -18,7 +18,7 @@ pub struct HostConfig { impl HostConfig { pub fn from_host(host: &str) -> HostConfig { HostConfig { - host: String::from(host), + host: host.to_string(), auth_token: None, } } @@ -126,8 +126,7 @@ impl AuthConfig { }); } - pub fn auth_token_for_host>(&self, host: S) -> Option { - let host = host.as_ref(); + pub fn auth_token_for_host(&self, host: &str) -> Option { if let Some(token) = self.host_configs.get(&HostConfig::from_host(host)) { if token.auth_token.is_none() { log::trace!("no auth_token found for host \"{}\"", token.host); diff --git a/oxen-rust/crates/lib/src/constants.rs b/oxen-rust/crates/lib/src/constants.rs index f5fe965b2..3980dad1e 100644 --- a/oxen-rust/crates/lib/src/constants.rs +++ b/oxen-rust/crates/lib/src/constants.rs @@ -193,7 +193,11 @@ pub const NUM_HTTP_RETRIES: u64 = 5; /// Number of workers pub const DEFAULT_NUM_WORKERS: usize = 8; /// Default timeout for HTTP requests -pub const DEFAULT_TIMEOUT_SECS: u64 = 600; +pub const DEFAULT_TIMEOUT_SECS: u64 = 120; +/// Default connect timeout for HTTP requests +pub const DEFAULT_CONNECT_TIMEOUT_SECS: u64 = 10; +/// Default TCP keep-alive interval +pub const DEFAULT_TCP_KEEPALIVE_SECS: u64 = 30; /// Default vnode size pub const DEFAULT_VNODE_SIZE: u64 = 10_000; @@ -270,3 +274,26 @@ pub fn chunk_size() -> u64 { AVG_CHUNK_SIZE } } + +// Parse the connect timeout from environment variable +pub fn connect_timeout() -> u64 { + if let Ok(val) = std::env::var("OXEN_CONNECT_TIMEOUT_SECS") + && let Ok(val) = val.parse::() + { + return val; + } + DEFAULT_CONNECT_TIMEOUT_SECS +} + +// Parse the TCP keep-alive interval from environment variable +pub fn tcp_keepalive() -> u64 { + if let Ok(val) = std::env::var("OXEN_TCP_KEEPALIVE_SECS") + && let Ok(val) = val.parse::() + { + return val; + } + DEFAULT_TCP_KEEPALIVE_SECS +} + +// Oxen request Id +pub const OXEN_REQUEST_ID: &str = "x-oxen-request-id"; diff --git a/oxen-rust/crates/lib/src/core/v_latest/push.rs b/oxen-rust/crates/lib/src/core/v_latest/push.rs index 40016f514..720193e4e 100644 --- a/oxen-rust/crates/lib/src/core/v_latest/push.rs +++ b/oxen-rust/crates/lib/src/core/v_latest/push.rs @@ -716,7 +716,7 @@ async fn bundle_and_send_small_entries( } // Create a client for uploading chunks - let client = Arc::new(api::client::builder_for_remote_repo(remote_repo)?.build()?); + let client = Arc::new(api::client::new_for_url_transfer(remote_repo.url())?); // Split into chunks, zip up, and post to server use tokio::time::sleep; diff --git a/oxen-rust/crates/lib/src/error.rs b/oxen-rust/crates/lib/src/error.rs index cfafd21d6..46c6a41ad 100644 --- a/oxen-rust/crates/lib/src/error.rs +++ b/oxen-rust/crates/lib/src/error.rs @@ -77,6 +77,7 @@ pub enum OxenError { ResourceNotFound(StringError), PathDoesNotExist(Box), ParsedResourceNotFound(Box), + NoHost(StringError), // Versioning MigrationRequired(StringError), @@ -149,6 +150,7 @@ impl fmt::Display for OxenError { OxenError::OxenUpdateRequired(err) | OxenError::Basic(err) | OxenError::ThumbnailingNotEnabled(err) => write!(f, "{err}"), + Self::NoHost(url) => write!(f, "No host in url: {url}"), OxenError::InvalidRepoName(name) => write!( f, "Invalid repository or namespace name '{name}'. Must match [a-zA-Z0-9][a-zA-Z0-9_.-]+" diff --git a/oxen-rust/crates/lib/src/model/repository/remote_repository.rs b/oxen-rust/crates/lib/src/model/repository/remote_repository.rs index 35c9e0a3d..308d900ae 100644 --- a/oxen-rust/crates/lib/src/model/repository/remote_repository.rs +++ b/oxen-rust/crates/lib/src/model/repository/remote_repository.rs @@ -1,9 +1,9 @@ use crate::api; +use crate::api::client::Hostname; use crate::core::versions::MinOxenVersion; use crate::view::RepositoryView; use crate::view::repository::{RepositoryCreationView, RepositoryDataTypesView}; use crate::{error::OxenError, model::Remote}; -use http::Uri; use serde::{Deserialize, Serialize}; #[derive(Deserialize, Serialize, Debug, Clone)] @@ -67,25 +67,22 @@ impl RemoteRepository { &self.remote.url } - /// Host of the remote repository - pub fn host(&self) -> String { - // parse it from the url - let uri = self.remote.url.parse::().unwrap(); - uri.host().unwrap().to_string() + /// Host of the remote repository (includes port if non-default) + pub fn host(&self) -> Result { + let hn = Hostname::from_url(&self.remote.url.parse()?)?; + Ok(hn.hostname()) } - /// Host of the remote repository - pub fn port(&self) -> String { - // parse it from the url - let uri = self.remote.url.parse::().unwrap(); - uri.port().unwrap().to_string() + /// Port of the remote repository + pub fn port(&self) -> Result, OxenError> { + let hn = Hostname::from_url(&self.remote.url.parse()?)?; + Ok(hn.port) } /// Scheme of the remote repository - pub fn scheme(&self) -> String { - // parse it from the url - let uri = self.remote.url.parse::().unwrap(); - uri.scheme().unwrap().to_string() + pub fn scheme(&self) -> Result { + let hn = Hostname::from_url(&self.remote.url.parse()?)?; + Ok(hn.scheme) } /// Underlying api url for the remote repository diff --git a/oxen-rust/crates/lib/src/model/repository/repo_new.rs b/oxen-rust/crates/lib/src/model/repository/repo_new.rs index c6e05c960..64f89506f 100644 --- a/oxen-rust/crates/lib/src/model/repository/repo_new.rs +++ b/oxen-rust/crates/lib/src/model/repository/repo_new.rs @@ -1,7 +1,7 @@ -use http::Uri; use serde::{Deserialize, Serialize}; use utoipa::ToSchema; +use crate::api::client::Hostname; use crate::constants::DEFAULT_HOST; use crate::error::OxenError; use crate::model::commit::Commit; @@ -167,13 +167,15 @@ impl RepoNew { } pub fn from_url(url: &str) -> Result { - let uri = url.parse::()?; - let mut split_path: Vec<&str> = uri.path().split('/').collect(); + let parsed: url::Url = url.parse()?; + let mut split_path: Vec<&str> = parsed.path().split('/').collect(); if split_path.len() < 3 { return Err(OxenError::basic_str("Invalid repo url")); } + let hn = Hostname::from_url(&parsed)?; + // Pop in reverse to get repo_name then namespace let repo_name = split_path.pop().unwrap(); let namespace = split_path.pop().unwrap(); @@ -181,8 +183,8 @@ impl RepoNew { namespace: namespace.to_string(), name: repo_name.to_string(), is_public: None, - host: Some(uri.host().unwrap().to_string()), - scheme: Some(uri.scheme().unwrap().to_string()), + host: Some(hn.hostname()), + scheme: Some(hn.scheme), root_commit: None, description: None, files: None, diff --git a/oxen-rust/crates/lib/src/repositories/workspaces/upload.rs b/oxen-rust/crates/lib/src/repositories/workspaces/upload.rs index 5ba34f790..434ac29b9 100644 --- a/oxen-rust/crates/lib/src/repositories/workspaces/upload.rs +++ b/oxen-rust/crates/lib/src/repositories/workspaces/upload.rs @@ -70,8 +70,8 @@ mod tests { let opts = UploadOpts { paths: vec![file.to_path_buf()], dst: Path::new("").to_path_buf(), - host: remote_repo.host(), - scheme: remote_repo.scheme(), + host: remote_repo.host()?, + scheme: remote_repo.scheme()?, remote: remote_repo.name.clone(), branch: None, message: "adding new file".to_string(), @@ -125,8 +125,8 @@ mod tests { let opts = UploadOpts { paths: vec![file.to_path_buf()], dst: Path::new("test").join("ing").join("data").to_path_buf(), - host: remote_repo.host(), - scheme: remote_repo.scheme(), + host: remote_repo.host()?, + scheme: remote_repo.scheme()?, remote: remote_repo.name.clone(), branch: None, message: "adding new file".to_string(), @@ -192,8 +192,8 @@ mod tests { let opts = UploadOpts { paths: vec![file.to_path_buf()], dst: Path::new("").to_path_buf(), - host: remote_repo.host(), - scheme: remote_repo.scheme(), + host: remote_repo.host()?, + scheme: remote_repo.scheme()?, remote: remote_repo.name.clone(), branch: Some(branch_name.clone()), message: "adding new file".to_string(), diff --git a/oxen-rust/crates/lib/src/test.rs b/oxen-rust/crates/lib/src/test.rs index d53d41ddf..66b224979 100644 --- a/oxen-rust/crates/lib/src/test.rs +++ b/oxen-rust/crates/lib/src/test.rs @@ -330,19 +330,13 @@ where version_store.init().await?; log::info!(">>>>> run_empty_local_repo_test_async running test"); - let result = match test(repo).await { - Ok(_) => true, - Err(err) => { - eprintln!("Error running test. Err: {err}"); - false - } - }; + let result = test(repo).await; // Remove repo dir maybe_cleanup_repo(&repo_dir)?; // Assert everything okay after we cleanup the repo dir - assert!(result); + assert!(result.is_ok(), "Error running test: {result:?}"); Ok(()) } diff --git a/oxen-rust/crates/lib/src/util.rs b/oxen-rust/crates/lib/src/util.rs index a7d6302e7..992f433cf 100644 --- a/oxen-rust/crates/lib/src/util.rs +++ b/oxen-rust/crates/lib/src/util.rs @@ -6,6 +6,7 @@ pub mod fs; pub mod glob; pub mod hasher; pub mod image; +pub mod internal_types; pub mod logging; pub mod oxen_version; pub mod paginate; diff --git a/oxen-rust/crates/lib/src/util/internal_types.rs b/oxen-rust/crates/lib/src/util/internal_types.rs new file mode 100644 index 000000000..bdbf5e9ec --- /dev/null +++ b/oxen-rust/crates/lib/src/util/internal_types.rs @@ -0,0 +1,49 @@ +use std::collections::{HashMap, HashSet}; + +/// Indicates that the type has a length. +pub(crate) trait HasLen { + fn len(&self) -> usize; +} + +macro_rules! impl_has_len_simple { + ($ty:ty) => { + impl HasLen for $ty { + fn len(&self) -> usize { + self.len() + } + } + }; +} + +macro_rules! impl_has_len_generic1 { + ($($ty:ident),*) => { + $( + impl HasLen for $ty { + fn len(&self) -> usize { + self.len() + } + } + )* + }; +} + +macro_rules! impl_has_len_generic2 { + ($($ty:ident),*) => { + $( + impl HasLen for $ty { + fn len(&self) -> usize { + self.len() + } + } + )* + }; +} + +impl_has_len_simple!(bytes::Bytes); +impl_has_len_simple!(String); +impl_has_len_simple!(str); + +impl_has_len_generic1!(Vec); +impl_has_len_generic1!(HashSet); + +impl_has_len_generic2!(HashMap);