From 4a8f272f1e9589e478a1440eff7c9eba865f1a49 Mon Sep 17 00:00:00 2001 From: Malcolm Greaves Date: Sun, 15 Mar 2026 14:05:51 +0530 Subject: [PATCH 1/5] Reapply "Standardized keep-alive and retry logic for http clients (#325)" (#341) This reverts commit 5080a4b4c5eddf58b76e0a7d802a7bdd748d48af. --- oxen-rust/crates/lib/src/api/client.rs | 127 +++++++++-------- .../crates/lib/src/api/client/commits.rs | 77 +++-------- oxen-rust/crates/lib/src/api/client/file.rs | 91 +++++++----- oxen-rust/crates/lib/src/api/client/import.rs | 83 ++++++++--- oxen-rust/crates/lib/src/api/client/retry.rs | 130 ++++++++++++++++++ oxen-rust/crates/lib/src/api/client/tree.rs | 9 +- .../crates/lib/src/api/client/versions.rs | 87 +++++------- .../lib/src/api/client/workspaces/files.rs | 2 +- oxen-rust/crates/lib/src/constants.rs | 29 +++- .../crates/lib/src/core/v_latest/push.rs | 2 +- oxen-rust/crates/lib/src/util.rs | 1 + .../crates/lib/src/util/internal_types.rs | 49 +++++++ 12 files changed, 451 insertions(+), 236 deletions(-) create mode 100644 oxen-rust/crates/lib/src/api/client/retry.rs create mode 100644 oxen-rust/crates/lib/src/util/internal_types.rs diff --git a/oxen-rust/crates/lib/src/api/client.rs b/oxen-rust/crates/lib/src/api/client.rs index 447a9e027..f3c902ab7 100644 --- a/oxen-rust/crates/lib/src/api/client.rs +++ b/oxen-rust/crates/lib/src/api/client.rs @@ -6,7 +6,6 @@ use crate::config::RuntimeConfig; use crate::config::runtime_config::runtime::Runtime; use crate::constants; use crate::error::OxenError; -use crate::model::RemoteRepository; use crate::view::OxenResponse; use crate::view::http; pub use reqwest::Url; @@ -30,6 +29,7 @@ pub mod metadata; pub mod oxen_version; pub mod prune; pub mod repositories; +pub mod retry; pub mod revisions; pub mod schemas; pub mod stats; @@ -49,92 +49,101 @@ pub fn get_scheme_and_host_from_url(url: &str) -> Result<(String, String), OxenE Ok((parsed_url.scheme().to_owned(), host_str)) } -// TODO: we probably want to create a pool of clients instead of constructing a -// new one for each request so we can take advantage of keep-alive -pub fn new_for_url(url: &str) -> Result { +// Note: reqwest::Client already maintains an internal HTTP connection pool with keep-alive. +// The 3 upload paths that matter most (push, version chunks, workspace files) already share +// clients via Arc. A global per-host cache (OnceCell + RwLock) would add +// complexity for little gain since per-request client overhead for metadata/query paths is minimal. +pub fn new_for_url(url: U) -> Result { let (_scheme, host) = get_scheme_and_host_from_url(url)?; - new_for_host(host, true) + new_for_host(&host, true) } pub fn new_for_url_no_user_agent(url: &str) -> Result { let (_scheme, host) = get_scheme_and_host_from_url(url)?; - new_for_host(host, false) + new_for_host(&host, false) } -fn new_for_host(host: String, should_add_user_agent: bool) -> Result { - match builder_for_host(host, should_add_user_agent)? - .timeout(time::Duration::from_secs(constants::timeout())) - .build() - { - Ok(client) => Ok(client), - Err(reqwest_err) => Err(OxenError::HTTP(reqwest_err)), - } -} - -pub fn new_for_remote_repo(remote_repo: &RemoteRepository) -> Result { - let (_scheme, host) = get_scheme_and_host_from_url(remote_repo.url())?; - new_for_host(host, true) +/// Has connection timeout and TCP keep-alives, but also imposes a per-request timeout. +/// NOT SUITABLE FOR LONG-LIVED TRANSFERS! Use `new_for_url_transfer` instead. +fn new_for_host(host: &str, should_add_user_agent: bool) -> Result { + builder_for_host( + host, + should_add_user_agent, + time::Duration::from_secs(constants::connect_timeout()), + time::Duration::from_secs(constants::tcp_keepalive()), + time::Duration::from_secs(20), + )? + .timeout(time::Duration::from_secs(constants::timeout())) + .build() + .map_err(OxenError::HTTP) } -pub fn builder_for_remote_repo(remote_repo: &RemoteRepository) -> Result { - let (_scheme, host) = get_scheme_and_host_from_url(remote_repo.url())?; - builder_for_host(host, true) +/// Create a client for long-lived transfers (uploads/downloads). +/// No overall timeout; uses connect_timeout + tcp_keepalive + HTTP/2 keep-alive +/// to detect hung connections without capping total transfer time. +pub fn new_for_url_transfer(url: U) -> Result { + let (_scheme, host) = get_scheme_and_host_from_url(url)?; + builder_for_host( + &host, + true, + time::Duration::from_secs(constants::connect_timeout()), + time::Duration::from_secs(constants::tcp_keepalive()), + time::Duration::from_secs(20), + )? + .build() + .map_err(OxenError::HTTP) } -pub fn builder_for_url(url: &str) -> Result { - let (_scheme, host) = get_scheme_and_host_from_url(url)?; - builder_for_host(host, true) +fn new_for_host_transfer(host: &str) -> Result { + builder_for_host( + host, + true, + time::Duration::from_secs(constants::connect_timeout()), + time::Duration::from_secs(constants::tcp_keepalive()), + time::Duration::from_secs(20), + )? + .build() + .map_err(OxenError::HTTP) } -fn builder_for_host(host: String, should_add_user_agent: bool) -> Result { +fn builder_for_host( + host: &str, + should_add_user_agent: bool, + connect_timeout: time::Duration, + keep_alive_interval: time::Duration, + http2_keep_alive_timeout: time::Duration, +) -> Result { let mut builder = Client::builder(); + if should_add_user_agent { let config = RuntimeConfig::get()?; - builder = builder.user_agent(build_user_agent(&config)); + let user_agent = build_user_agent(&config)?; + builder = builder.user_agent(user_agent); } - // Bump max retries for this oxen-server host from 2 to 3. Exponential backoff is used by default. - let retry_policy = retry::for_host(host.clone()) - .max_retries_per_request(3) - .classify_fn(|req_rep| { - // Still retry on low-level network errors - if req_rep.error().is_some() { - return req_rep.retryable(); - } - // Have reqwest retry all application-level server errors*, not just network-level errors - // that reqwest considers retryable by default. This assumes that oxen-server endpoints are - // safe to retry if the server returned any error mid-operation. We can tighten this up - // to only retry specific server errors in the future if that is not true. - // - // * info (100's), success (200's), redirection (300's), and client errors (400's) - // don't make sense to retry. We'll only retry server errors (500's). - match req_rep.status() { - Some(status_code) if status_code.is_server_error() => req_rep.retryable(), // retry - _ => req_rep.success(), // this means don't retry, and is the only other valid return value from the closure - } - }); - builder = builder.retry(retry_policy); + builder = builder + .connect_timeout(connect_timeout) + .tcp_keepalive(keep_alive_interval) + .http2_keep_alive_interval(keep_alive_interval) + .http2_keep_alive_timeout(http2_keep_alive_timeout); // If auth_config.toml isn't found, return without authorizing let config = match AuthConfig::get() { Ok(config) => config, Err(e) => { - log::debug!( - "Error getting config: {}. No auth token found for host {}", - e, - host - ); + log::debug!("Error getting config: {e}. No auth token found for host {host}"); return Ok(builder); } }; - if let Some(auth_token) = config.auth_token_for_host(host.as_str()) { + + if let Some(auth_token) = config.auth_token_for_host(host) { log::debug!("Setting auth token for host: {}", host); let auth_header = format!("Bearer {auth_token}"); let mut auth_value = match header::HeaderValue::from_str(auth_header.as_str()) { Ok(header) => header, Err(e) => { log::debug!("Invalid header value: {e}"); - return Err(OxenError::basic_str( + return Err(OxenError::authentication( "Error setting request auth. Please check your Oxen config.", )); } @@ -142,14 +151,16 @@ fn builder_for_host(host: String, should_add_user_agent: bool) -> Result String { +fn build_user_agent(config: &RuntimeConfig) -> Result { let host_platform = config.host_platform.display_name(); let runtime_name = match config.runtime_name { Runtime::CLI => config.runtime_name.display_name().to_string(), @@ -159,7 +170,7 @@ fn build_user_agent(config: &RuntimeConfig) -> String { config.runtime_version ), }; - format!("{USER_AGENT}/{VERSION} ({host_platform}; {runtime_name})") + Ok(format!("{USER_AGENT}/{VERSION} ({host_platform}; {runtime_name})")) } /// Performs an extra parse to validate that the response is success diff --git a/oxen-rust/crates/lib/src/api/client/commits.rs b/oxen-rust/crates/lib/src/api/client/commits.rs index 66e2741ce..53734d91d 100644 --- a/oxen-rust/crates/lib/src/api/client/commits.rs +++ b/oxen-rust/crates/lib/src/api/client/commits.rs @@ -719,7 +719,7 @@ pub async fn post_commit_dir_hashes_to_server( let quiet_bar = Arc::new(ProgressBar::hidden()); - let client = client::new_for_remote_repo(remote_repo)?; + let client = client::new_for_host_transfer(remote_repo.url())?; post_data_to_server_with_client( &client, remote_repo, @@ -763,18 +763,15 @@ pub async fn post_commits_dir_hashes_to_server( let buffer: Vec = tar.into_inner()?.finish()?; - let is_compressed = true; - let filename = None; - let quiet_bar = Arc::new(ProgressBar::hidden()); - let client = client::new_for_remote_repo(remote_repo)?; + let client = client::new_for_host_transfer(remote_repo.url())?; post_data_to_server_with_client( &client, remote_repo, buffer, - is_compressed, - &filename, + true, // compression + &None, // filename quiet_bar, ) .await @@ -838,33 +835,15 @@ pub async fn upload_single_tarball_to_server_with_client_with_retry( buffer: &[u8], bar: Arc, ) -> Result<(), OxenError> { - let mut total_tries = 0; - - while total_tries < constants::NUM_HTTP_RETRIES { - match upload_single_tarball_to_server_with_client( - client, - remote_repo, - buffer, - bar.to_owned(), - ) - .await - { - Ok(_) => { - return Ok(()); - } - Err(err) => { - total_tries += 1; - // Exponentially back off - let sleep_time = total_tries * total_tries; - log::debug!( - "upload_single_tarball_to_server_with_retry upload failed sleeping {sleep_time}: {err:?}" - ); - std::thread::sleep(std::time::Duration::from_secs(sleep_time)); - } + let config = crate::api::client::retry::RetryConfig::default(); + crate::api::client::retry::with_retry(&config, |_attempt| { + let bar = bar.clone(); + async move { + upload_single_tarball_to_server_with_client(client, remote_repo, buffer, bar).await?; + Ok(()) } - } - - Err(OxenError::basic_str("Upload retry failed.")) + }) + .await } async fn upload_single_tarball_to_server_with_client( @@ -952,10 +931,9 @@ pub async fn upload_data_chunk_to_server_with_retry( is_compressed: bool, filename: &Option, ) -> Result<(), OxenError> { - let mut total_tries = 0; - let mut last_error = String::from(""); - while total_tries < constants::NUM_HTTP_RETRIES { - match upload_data_chunk_to_server( + let config = crate::api::client::retry::RetryConfig::default(); + crate::api::client::retry::with_retry(&config, |_attempt| async move { + upload_data_chunk_to_server( client, remote_repo, chunk, @@ -964,27 +942,10 @@ pub async fn upload_data_chunk_to_server_with_retry( is_compressed, filename, ) - .await - { - Ok(_) => { - return Ok(()); - } - Err(err) => { - total_tries += 1; - // Exponentially back off - let sleep_time = total_tries * total_tries; - log::debug!( - "upload_data_chunk_to_server_with_retry upload failed sleeping {sleep_time}: {err}" - ); - last_error = format!("{err}"); - std::thread::sleep(std::time::Duration::from_secs(sleep_time)); - } - } - } - - Err(OxenError::basic_str(format!( - "Upload chunk retry failed. {last_error}" - ))) + .await?; + Ok(()) + }) + .await } async fn upload_data_chunk_to_server( diff --git a/oxen-rust/crates/lib/src/api/client/file.rs b/oxen-rust/crates/lib/src/api/client/file.rs index 0345e3255..fb271550f 100644 --- a/oxen-rust/crates/lib/src/api/client/file.rs +++ b/oxen-rust/crates/lib/src/api/client/file.rs @@ -1,9 +1,10 @@ -use crate::api; use crate::api::client; +use crate::api::client::retry; use crate::error::OxenError; use crate::model::RemoteRepository; use crate::model::commit::NewCommitBody; use crate::view::CommitResponse; +use crate::{api, util::internal_types::HasLen}; use bytes::{Bytes, BytesMut}; use futures_util::StreamExt; @@ -22,11 +23,12 @@ pub async fn put_file( let directory = directory.as_ref(); put_multipart_file( remote_repo, - format!("/file/{branch}/{directory}"), + &format!("/file/{branch}/{directory}"), "files[]", - file_path, - file_name, - commit_body, + file_path.as_ref(), + file_name.as_ref().map(|s| s.as_ref()), + commit_body.as_ref(), + &retry::RetryConfig::default(), ) .await } @@ -43,53 +45,76 @@ pub async fn put_file_to_path( let file_path_on_repo = file_path_on_repo.as_ref(); put_multipart_file( remote_repo, - format!("/file/{branch}/{file_path_on_repo}"), + &format!("/file/{branch}/{file_path_on_repo}"), "file", - file_path, - file_name, - commit_body, + file_path.as_ref(), + file_name.as_ref().map(|s| s.as_ref()), + commit_body.as_ref(), + &retry::RetryConfig::default(), ) .await } - async fn put_multipart_file( remote_repo: &RemoteRepository, - uri: String, + uri: &str, field_name: &'static str, - file_path: impl AsRef, - file_name: Option>, - commit_body: Option, + file_path: &Path, + file_name: Option<&str>, + commit_body: Option<&NewCommitBody>, + config: &retry::RetryConfig, ) -> Result { - let file_path = file_path.as_ref(); log::debug!("put_multipart_file {uri:?}, file_path {file_path:?}"); - let url = api::endpoint::url_from_repo(remote_repo, &uri)?; - let client = client::new_for_url(&url)?; + let url = api::endpoint::url_from_repo(remote_repo, uri)?; + let client = client::new_for_host_transfer(&url)?; - let file_part = make_file_part(file_path, file_name).await?; - let form = apply_commit_body(Form::new().part(field_name, file_part), commit_body); - let res = client.put(&url).multipart(form).send().await?; - let body = client::parse_json_body(&url, res).await?; - Ok(serde_json::from_str(&body)?) + let file_data = bytes::Bytes::from(tokio::fs::read(file_path).await?); + + retry::with_retry(config, |_attempt| { + let file_data = file_data.clone(); // cloning is cheap: it's essentially an Arc<[u8]> + let client = client.clone(); // HTTP client is also an Arc + let url = url.clone(); // it's just the length + pointer + + async move { + let file_part = make_file_part(file_data, file_name).await?; + let form = apply_commit_body(Form::new().part(field_name, file_part), commit_body); + + let res = client.put(&url).multipart(form).send().await?; + let body = client::parse_json_body(&url, res).await?; + + Ok(serde_json::from_str(&body)?) + } + }) + .await } -async fn make_file_part( - file_path: &Path, - file_name: Option>, +/// Create a Part in a multipart Form from the specified data. +/// Intended to be used for uploading a single file. +async fn make_file_part + HasLen>( + file_data: T, + file_name: Option<&str>, ) -> Result { - let file_part = Part::file(file_path).await?; + let file_data_len = file_data.len() as u64; + let file_part = reqwest::multipart::Part::stream_with_length(file_data, file_data_len); Ok(match file_name { - Some(file_name) => file_part.file_name(file_name.as_ref().to_string()), + Some(file_name) => file_part.file_name(file_name.to_string()), None => file_part, }) } -fn apply_commit_body(mut form: Form, commit_body: Option) -> Form { - if let Some(body) = commit_body { - form = form.text("name", body.author); - form = form.text("email", body.email); - form = form.text("message", body.message); +/// Add the commit information as fields to the form. +fn apply_commit_body(form: Form, commit_body: Option<&NewCommitBody>) -> Form { + if let Some(NewCommitBody { + message, + author, + email, + }) = commit_body + { + form.text("name", author.to_string()) + .text("email", email.to_string()) + .text("message", message.to_string()) + } else { + form } - form } pub async fn get_file( diff --git a/oxen-rust/crates/lib/src/api/client/import.rs b/oxen-rust/crates/lib/src/api/client/import.rs index 1611db20d..6f59e329e 100644 --- a/oxen-rust/crates/lib/src/api/client/import.rs +++ b/oxen-rust/crates/lib/src/api/client/import.rs @@ -1,5 +1,6 @@ use crate::api; use crate::api::client; +use crate::api::client::retry; use crate::error::OxenError; use crate::model::RemoteRepository; @@ -18,42 +19,80 @@ pub async fn upload_zip( let branch_name = branch_name.as_ref(); let directory = directory.as_ref(); let zip_path = zip_path.as_ref(); + let name = name.as_ref(); let email = email.as_ref(); - // Read the ZIP file - let zip_data = std::fs::read(zip_path)?; + // Read the ZIP file into memory once for potential retries + // NOTE: bytes::Bytes wraps the Vec into an Arc internally. + // We need to move into `reqwest.multipart::Part::bytes`, but with retries, we don't want + // to actually clone the data. This wrapper makes it cheap to clone as we're just + // incrementing the arc's internal reference count. + let zip_data = bytes::Bytes::from(tokio::fs::read(zip_path).await?); + let len_zip_data = zip_data.len() as u64; let file_name = zip_path .file_name() .ok_or_else(|| OxenError::basic_str("Invalid ZIP file path"))? - .to_string_lossy(); + .to_string_lossy() + .to_string(); - // Create the URL for workspace ZIP upload endpoint let uri = format!("/import/upload/{branch_name}/{directory}"); let url = api::endpoint::url_from_repo(remote_repo, &uri)?; + let commit_msg = commit_message.map(|m| m.as_ref().to_string()); + + let config = retry::RetryConfig::default(); + retry::with_retry(&config, |_attempt| { + let url = url.clone(); + let zip_data = zip_data.clone(); + let file_name = file_name.clone(); + let name = name.to_string(); + let email = email.to_string(); + let directory = directory.to_string(); + let commit_msg = commit_msg.clone(); + async move { + let form = make_multipart_form( + zip_data, + len_zip_data, + file_name, + name, + email, + directory, + commit_msg, + ); + + let client = client::new_for_url_transfer(&url)?; + let response = client.post(&url).multipart(form).send().await?; + let body = client::parse_json_body(&url, response).await?; + let response: crate::view::CommitResponse = serde_json::from_str(&body) + .map_err(|e| OxenError::basic_str(format!("Failed to parse response: {e}")))?; + Ok(response.commit) + } + }) + .await +} + +fn make_multipart_form>( + zip_data: T, + len_zip_data: u64, + file_name: String, + name: String, + email: String, + directory: String, + commit_msg: Option, +) -> reqwest::multipart::Form { + let file_part = + reqwest::multipart::Part::stream_with_length(zip_data, len_zip_data).file_name(file_name); - // Create multipart form - let file_part = reqwest::multipart::Part::bytes(zip_data).file_name(file_name.to_string()); let mut form = reqwest::multipart::Form::new() .part("file", file_part) - .text("name", name.to_string()) - .text("email", email.to_string()) - .text("resource_path", directory.to_string()); + .text("name", name) + .text("email", email) + .text("resource_path", directory); - if let Some(msg) = commit_message { - form = form.text("commit_message", msg.as_ref().to_string()); + if let Some(msg) = commit_msg { + form = form.text("commit_message", msg); } - - // Send the request - let client = client::new_for_url(&url)?; - let response = client.post(&url).multipart(form).send().await?; - let body = client::parse_json_body(&url, response).await?; - - // Parse the response - let response: crate::view::CommitResponse = serde_json::from_str(&body) - .map_err(|e| OxenError::basic_str(format!("Failed to parse response: {e}")))?; - - Ok(response.commit) + form } #[cfg(test)] diff --git a/oxen-rust/crates/lib/src/api/client/retry.rs b/oxen-rust/crates/lib/src/api/client/retry.rs new file mode 100644 index 000000000..9c3da5a55 --- /dev/null +++ b/oxen-rust/crates/lib/src/api/client/retry.rs @@ -0,0 +1,130 @@ +use crate::constants; +use crate::error::OxenError; +use rand::Rng; +use std::future::Future; + +pub struct RetryConfig { + pub max_retries: usize, + pub base_wait_ms: u64, + pub max_wait_ms: u64, +} + +impl Default for RetryConfig { + fn default() -> Self { + Self { + max_retries: constants::max_retries(), + base_wait_ms: 300, + max_wait_ms: 10_000, + } + } +} + +/// True exponential backoff: base_wait_ms * 2^attempt + jitter, clamped to max_wait_ms +pub fn exponential_backoff(base_wait_ms: u64, attempt: usize, max_wait_ms: u64) -> u64 { + let exp = base_wait_ms.saturating_mul(1u64 << attempt.min(16)); + let jitter = rand::thread_rng().gen_range(0..=500u64); + exp.saturating_add(jitter).min(max_wait_ms) +} + +/// Retry an async operation with exponential backoff. +/// Authentication errors are not retried. +pub async fn with_retry(config: &RetryConfig, mut operation: F) -> Result +where + F: FnMut(usize) -> Fut, + Fut: Future>, +{ + let mut last_err = None; + for attempt in 0..=config.max_retries { + match operation(attempt).await { + Ok(val) => return Ok(val), + error @ Err(OxenError::Authentication(_)) => return error, + Err(err) => { + log::warn!( + "Retry attempt {}/{} failed: {err}", + attempt + 1, + config.max_retries + 1 + ); + last_err = Some(err); + if attempt < config.max_retries { + let wait = + exponential_backoff(config.base_wait_ms, attempt, config.max_wait_ms); + tokio::time::sleep(std::time::Duration::from_millis(wait)).await; + } + } + } + } + Err(last_err.unwrap_or_else(|| OxenError::basic_str("Retry failed with no attempts"))) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_exponential_backoff_increases() { + let b0 = exponential_backoff(300, 0, 100_000); + let b1 = exponential_backoff(300, 1, 100_000); + let b2 = exponential_backoff(300, 2, 100_000); + // With jitter, we can't be exact, but the base doubles each time + // 300*1=300, 300*2=600, 300*4=1200 (plus jitter 0..500) + assert!(b0 <= 800, "b0={b0}"); + assert!(b1 >= 300 && b1 <= 1100, "b1={b1}"); + assert!(b2 >= 900 && b2 <= 1700, "b2={b2}"); + } + + #[test] + fn test_exponential_backoff_clamps_to_max() { + let val = exponential_backoff(300, 20, 5000); + assert!(val <= 5000, "val={val}"); + } + + #[tokio::test] + async fn test_with_retry_immediate_success() { + let config = RetryConfig { + max_retries: 3, + base_wait_ms: 10, + max_wait_ms: 100, + }; + let result: Result = with_retry(&config, |_attempt| async { Ok(42) }).await; + assert_eq!(result.unwrap(), 42); + } + + #[tokio::test] + async fn test_with_retry_exhaustion() { + let config = RetryConfig { + max_retries: 2, + base_wait_ms: 10, + max_wait_ms: 50, + }; + let result: Result = with_retry(&config, |_attempt| async { + Err(OxenError::basic_str("always fails")) + }) + .await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_with_retry_succeeds_on_third_attempt() { + let config = RetryConfig { + max_retries: 3, + base_wait_ms: 10, + max_wait_ms: 50, + }; + let counter = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)); + let counter_clone = counter.clone(); + let result: Result = with_retry(&config, move |_attempt| { + let c = counter_clone.clone(); + async move { + let prev = c.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + if prev < 2 { + Err(OxenError::basic_str("not yet")) + } else { + Ok(99) + } + } + }) + .await; + assert_eq!(result.unwrap(), 99); + assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 3); + } +} diff --git a/oxen-rust/crates/lib/src/api/client/tree.rs b/oxen-rust/crates/lib/src/api/client/tree.rs index eb1168590..88b9cdd28 100644 --- a/oxen-rust/crates/lib/src/api/client/tree.rs +++ b/oxen-rust/crates/lib/src/api/client/tree.rs @@ -6,7 +6,6 @@ use futures_util::TryStreamExt; use std::collections::HashSet; use std::path::PathBuf; use std::sync::Arc; -use std::time; use tempfile::TempDir; use crate::api::client; @@ -91,9 +90,7 @@ pub async fn create_nodes( // Upload the node let uri = "/tree/nodes".to_string(); let url = api::endpoint::url_from_repo(remote_repo, &uri)?; - let client = client::builder_for_url(&url)? - .timeout(time::Duration::from_secs(120)) - .build()?; + let client = client::new_for_url_transfer(&url)?; let size = buffer.len() as u64; log::debug!( @@ -357,9 +354,7 @@ async fn node_download_request( ) -> Result<(), OxenError> { let url = url.as_ref(); - let client = client::builder_for_url(url)? - .timeout(time::Duration::from_secs(12000)) - .build()?; + let client = client::new_for_url_transfer(url)?; log::debug!("node_download_request about to send request {url}"); let res = client.get(url).send().await?; let res = client::handle_non_json_response(url, res).await?; diff --git a/oxen-rust/crates/lib/src/api/client/versions.rs b/oxen-rust/crates/lib/src/api/client/versions.rs index e41025878..7a6e6e79d 100644 --- a/oxen-rust/crates/lib/src/api/client/versions.rs +++ b/oxen-rust/crates/lib/src/api/client/versions.rs @@ -1,6 +1,7 @@ use crate::api; use crate::api::client; use crate::api::client::internal_types::LocalOrBase; +use crate::api::client::retry; use crate::constants::{AVG_CHUNK_SIZE, max_retries}; use crate::error::OxenError; use crate::model::entry::commit_entry::Entry; @@ -21,7 +22,6 @@ use futures_util::StreamExt; use futures_util::stream::FuturesUnordered; use http::Method; use http::header::CONTENT_LENGTH; -use rand::{Rng, thread_rng}; use tokio_tar::Archive; use tokio_util::codec::{BytesCodec, FramedRead}; @@ -197,29 +197,15 @@ pub async fn download_data_from_version_paths( hashes: &[String], local_repo: &LocalRepository, ) -> Result { - let total_retries = max_retries().try_into().unwrap_or(max_retries() as u64); - let mut num_retries = 0; - - while num_retries < total_retries { + let config = retry::RetryConfig::default(); + retry::with_retry(&config, |_attempt| async { match try_download_data_from_version_paths(remote_repo, hashes, local_repo).await { - Ok(val) => return Ok(val), - Err(OxenError::Authentication(val)) => return Err(OxenError::Authentication(val)), - Err(err) => { - num_retries += 1; - // Exponentially back off - let sleep_time = num_retries * num_retries; - log::warn!("Could not download content {err:?} sleeping {sleep_time}"); - tokio::time::sleep(std::time::Duration::from_secs(sleep_time)).await; - } + Ok(val) => Ok(val), + Err(OxenError::Authentication(val)) => Err(OxenError::Authentication(val)), + Err(err) => Err(err), } - } - - let err = format!( - "Err: Failed to download {} files after {} retries", - hashes.len(), - total_retries - ); - Err(OxenError::basic_str(err)) + }) + .await } pub async fn try_download_data_from_version_paths( @@ -312,7 +298,7 @@ async fn upload_chunks( max_retries: usize, progress: Option<&Arc>, ) -> Result>, OxenError> { - let client = Arc::new(api::client::builder_for_remote_repo(remote_repo)?.build()?); + let client = Arc::new(api::client::new_for_host_transfer(remote_repo.url())?); // Figure out how many parts we need to upload let file_size = upload.size; @@ -353,8 +339,8 @@ async fn upload_chunks( )) })?; - let wait_time = exponential_backoff(BASE_WAIT_TIME, i, MAX_WAIT_TIME); - sleep(Duration::from_millis(wait_time as u64)).await; + let wait_time = retry::exponential_backoff(BASE_WAIT_TIME as u64, i, MAX_WAIT_TIME as u64); + sleep(Duration::from_millis(wait_time)).await; chunk = upload_chunk(&client, &remote_repo, &upload, start, chunk_size).await; i += 1; @@ -485,30 +471,28 @@ pub async fn multipart_batch_upload_with_retry( chunk: &Vec, client: &reqwest::Client, ) -> Result<(), OxenError> { - let mut files_to_retry: Vec = vec![]; - let mut first_try = true; - let mut retry_count: usize = 0; let max_retries = max_retries(); + let mut files_to_retry: Vec = vec![]; - while (first_try || !files_to_retry.is_empty()) && retry_count < max_retries { - first_try = false; - retry_count += 1; - + for attempt in 0..max_retries { files_to_retry = multipart_batch_upload(local_repo, remote_repo, chunk, client, files_to_retry).await?; - if !files_to_retry.is_empty() { - let wait_time = exponential_backoff(BASE_WAIT_TIME, retry_count, MAX_WAIT_TIME); - sleep(Duration::from_millis(wait_time as u64)).await; + if files_to_retry.is_empty() { + return Ok(()); + } + + // Don't sleep after the last attempt + if attempt + 1 < max_retries { + let wait_time = + retry::exponential_backoff(BASE_WAIT_TIME as u64, attempt, MAX_WAIT_TIME as u64); + sleep(Duration::from_millis(wait_time)).await; } } - if files_to_retry.is_empty() { - Ok(()) - } else { - Err(OxenError::basic_str(format!( - "Failed to upload files: {files_to_retry:#?}" - ))) - } + + Err(OxenError::basic_str(format!( + "Failed to upload files: {files_to_retry:#?}" + ))) } pub async fn multipart_batch_upload( @@ -780,8 +764,12 @@ pub(crate) async fn workspace_multipart_batch_upload_parts_with_retry( }; if !files_to_retry.is_empty() { - let wait_time = exponential_backoff(BASE_WAIT_TIME, retry_count, MAX_WAIT_TIME); - sleep(Duration::from_millis(wait_time as u64)).await; + let wait_time = retry::exponential_backoff( + BASE_WAIT_TIME as u64, + retry_count, + MAX_WAIT_TIME as u64, + ); + sleep(Duration::from_millis(wait_time)).await; } } @@ -794,17 +782,6 @@ pub(crate) async fn workspace_multipart_batch_upload_parts_with_retry( Ok(upload_result.err_files) } -pub fn exponential_backoff(base_wait_time: usize, n: usize, max: usize) -> usize { - log::debug!( - "Exponential backoff got called with base_wait_time {base_wait_time}. n {n}, and max {max}" - ); - (base_wait_time + n.pow(2) + jitter()).min(max) -} - -fn jitter() -> usize { - thread_rng().gen_range(0..=500) -} - #[cfg(test)] mod tests { use std::path::PathBuf; diff --git a/oxen-rust/crates/lib/src/api/client/workspaces/files.rs b/oxen-rust/crates/lib/src/api/client/workspaces/files.rs index fa51b3e86..8620f5ca6 100644 --- a/oxen-rust/crates/lib/src/api/client/workspaces/files.rs +++ b/oxen-rust/crates/lib/src/api/client/workspaces/files.rs @@ -448,7 +448,7 @@ pub(crate) async fn parallel_batched_small_file_upload( } // Create a client for uploading batches - let client = Arc::new(api::client::builder_for_remote_repo(remote_repo)?.build()?); + let client = Arc::new(api::client::new_for_host_transfer(remote_repo.url())?); // For individual files let err_files: Arc>> = Arc::new(Mutex::new(vec![])); diff --git a/oxen-rust/crates/lib/src/constants.rs b/oxen-rust/crates/lib/src/constants.rs index f5fe965b2..3980dad1e 100644 --- a/oxen-rust/crates/lib/src/constants.rs +++ b/oxen-rust/crates/lib/src/constants.rs @@ -193,7 +193,11 @@ pub const NUM_HTTP_RETRIES: u64 = 5; /// Number of workers pub const DEFAULT_NUM_WORKERS: usize = 8; /// Default timeout for HTTP requests -pub const DEFAULT_TIMEOUT_SECS: u64 = 600; +pub const DEFAULT_TIMEOUT_SECS: u64 = 120; +/// Default connect timeout for HTTP requests +pub const DEFAULT_CONNECT_TIMEOUT_SECS: u64 = 10; +/// Default TCP keep-alive interval +pub const DEFAULT_TCP_KEEPALIVE_SECS: u64 = 30; /// Default vnode size pub const DEFAULT_VNODE_SIZE: u64 = 10_000; @@ -270,3 +274,26 @@ pub fn chunk_size() -> u64 { AVG_CHUNK_SIZE } } + +// Parse the connect timeout from environment variable +pub fn connect_timeout() -> u64 { + if let Ok(val) = std::env::var("OXEN_CONNECT_TIMEOUT_SECS") + && let Ok(val) = val.parse::() + { + return val; + } + DEFAULT_CONNECT_TIMEOUT_SECS +} + +// Parse the TCP keep-alive interval from environment variable +pub fn tcp_keepalive() -> u64 { + if let Ok(val) = std::env::var("OXEN_TCP_KEEPALIVE_SECS") + && let Ok(val) = val.parse::() + { + return val; + } + DEFAULT_TCP_KEEPALIVE_SECS +} + +// Oxen request Id +pub const OXEN_REQUEST_ID: &str = "x-oxen-request-id"; diff --git a/oxen-rust/crates/lib/src/core/v_latest/push.rs b/oxen-rust/crates/lib/src/core/v_latest/push.rs index 40016f514..720193e4e 100644 --- a/oxen-rust/crates/lib/src/core/v_latest/push.rs +++ b/oxen-rust/crates/lib/src/core/v_latest/push.rs @@ -716,7 +716,7 @@ async fn bundle_and_send_small_entries( } // Create a client for uploading chunks - let client = Arc::new(api::client::builder_for_remote_repo(remote_repo)?.build()?); + let client = Arc::new(api::client::new_for_url_transfer(remote_repo.url())?); // Split into chunks, zip up, and post to server use tokio::time::sleep; diff --git a/oxen-rust/crates/lib/src/util.rs b/oxen-rust/crates/lib/src/util.rs index a7d6302e7..34037de58 100644 --- a/oxen-rust/crates/lib/src/util.rs +++ b/oxen-rust/crates/lib/src/util.rs @@ -6,6 +6,7 @@ pub mod fs; pub mod glob; pub mod hasher; pub mod image; +pub(crate) mod internal_types; pub mod logging; pub mod oxen_version; pub mod paginate; diff --git a/oxen-rust/crates/lib/src/util/internal_types.rs b/oxen-rust/crates/lib/src/util/internal_types.rs new file mode 100644 index 000000000..bdbf5e9ec --- /dev/null +++ b/oxen-rust/crates/lib/src/util/internal_types.rs @@ -0,0 +1,49 @@ +use std::collections::{HashMap, HashSet}; + +/// Indicates that the type has a length. +pub(crate) trait HasLen { + fn len(&self) -> usize; +} + +macro_rules! impl_has_len_simple { + ($ty:ty) => { + impl HasLen for $ty { + fn len(&self) -> usize { + self.len() + } + } + }; +} + +macro_rules! impl_has_len_generic1 { + ($($ty:ident),*) => { + $( + impl HasLen for $ty { + fn len(&self) -> usize { + self.len() + } + } + )* + }; +} + +macro_rules! impl_has_len_generic2 { + ($($ty:ident),*) => { + $( + impl HasLen for $ty { + fn len(&self) -> usize { + self.len() + } + } + )* + }; +} + +impl_has_len_simple!(bytes::Bytes); +impl_has_len_simple!(String); +impl_has_len_simple!(str); + +impl_has_len_generic1!(Vec); +impl_has_len_generic1!(HashSet); + +impl_has_len_generic2!(HashMap); From b4063de8ab4e5365cad71c471613c65946bc9c02 Mon Sep 17 00:00:00 2001 From: Malcolm Greaves Date: Tue, 17 Mar 2026 16:07:07 +0530 Subject: [PATCH 2/5] FIX - use Host for building client to get auth correct The problem was that we were incorrectly using the full repository URL instead of supplying just the host. This caused the auth to fail because it was trying to provide an auth token for the repository url instead of the host (i.e. hub.oxen.ai/api/ox/repo instead of hub.oxen.ai). To prevent confusion, `String` is no longer used to indicate the host. Instead, the dedicated `Host` enum from `url::Url` is used. All call-sites have bene updated to properly parse the URL and extract the host. If there's no host in the URL, then the new `OxenError::NoHost` variant is returned. A better design would be to keep `Url`'s present instead of `String`. The best design would be a new wrapper for `Url` that would always have the `scheme` and `host` present, since we never want to use a repository URL that doesn't have a host (and a URL can be valid w/o having a host). --- oxen-rust/crates/cli/src/cmd/clone.rs | 12 ++++-- oxen-rust/crates/cli/src/helpers.rs | 15 +++++--- oxen-rust/crates/lib/src/api/client.rs | 37 ++++++++++--------- .../crates/lib/src/api/client/commits.rs | 19 +++++++++- oxen-rust/crates/lib/src/api/client/file.rs | 12 +++++- .../crates/lib/src/api/client/repositories.rs | 12 ++++-- .../crates/lib/src/api/client/versions.rs | 10 ++++- .../lib/src/api/client/workspaces/files.rs | 11 +++++- .../crates/lib/src/config/auth_config.rs | 12 +++--- oxen-rust/crates/lib/src/error.rs | 2 + .../src/model/repository/remote_repository.rs | 10 +++-- .../lib/src/repositories/workspaces/upload.rs | 6 +-- 12 files changed, 111 insertions(+), 47 deletions(-) diff --git a/oxen-rust/crates/cli/src/cmd/clone.rs b/oxen-rust/crates/cli/src/cmd/clone.rs index b4b3735d8..1fc9c9823 100644 --- a/oxen-rust/crates/cli/src/cmd/clone.rs +++ b/oxen-rust/crates/cli/src/cmd/clone.rs @@ -1,8 +1,8 @@ use async_trait::async_trait; use clap::{Arg, Command, arg}; use std::path::{Component, Path, PathBuf}; +use url::Url; -use liboxen::api; use liboxen::constants::DEFAULT_BRANCH_NAME; use liboxen::error::OxenError; use liboxen::opts::CloneOpts; @@ -167,10 +167,16 @@ impl RunCmd for CloneCmd { is_remote, }; - let (scheme, host) = api::client::get_scheme_and_host_from_url(&opts.url)?; + let (scheme, host) = { + let url: Url = opts.url.parse()?; + let Some(host) = url.host() else { + return Err(OxenError::NoHost(opts.url.into())); + }; + (url.scheme().to_string(), host.to_string()) + }; // TODO: Do I need to worry about this for remote repo? - check_remote_version_blocking(scheme.clone(), host.clone()).await?; + check_remote_version_blocking(&scheme, &host).await?; check_remote_version(scheme, host).await?; repositories::clone(&opts).await?; diff --git a/oxen-rust/crates/cli/src/helpers.rs b/oxen-rust/crates/cli/src/helpers.rs index 58ac02795..ba41a863b 100644 --- a/oxen-rust/crates/cli/src/helpers.rs +++ b/oxen-rust/crates/cli/src/helpers.rs @@ -7,6 +7,7 @@ use liboxen::model::LocalRepository; use liboxen::util::oxen_version::OxenVersion; use colored::Colorize; +use url::Url; use std::collections::HashMap; use std::str::FromStr; @@ -30,12 +31,16 @@ pub fn get_scheme_and_host_or_default() -> Result<(String, String), OxenError> { pub fn get_scheme_and_host_from_repo( repo: &LocalRepository, ) -> Result<(String, String), OxenError> { - if let Some(remote) = repo.remote() { - let host_and_scheme = api::client::get_scheme_and_host_from_url(&remote.url)?; - return Ok(host_and_scheme); + match repo.remote() { + Some(remote) => { + let url: Url = remote.url.parse()?; + let Some(host) = url.host() else { + return Err(OxenError::NoHost(remote.url.into())); + }; + Ok((url.scheme().to_string(), host.to_string())) + } + None => get_scheme_and_host_or_default(), } - - get_scheme_and_host_or_default() } pub async fn check_remote_version( diff --git a/oxen-rust/crates/lib/src/api/client.rs b/oxen-rust/crates/lib/src/api/client.rs index f3c902ab7..a79e4cf42 100644 --- a/oxen-rust/crates/lib/src/api/client.rs +++ b/oxen-rust/crates/lib/src/api/client.rs @@ -6,12 +6,14 @@ use crate::config::RuntimeConfig; use crate::config::runtime_config::runtime::Runtime; use crate::constants; use crate::error::OxenError; +use crate::error::StringError; use crate::view::OxenResponse; use crate::view::http; pub use reqwest::Url; use reqwest::retry; use reqwest::{Client, ClientBuilder, header}; use std::time; +use url::Host; pub mod branches; pub mod commits; @@ -40,32 +42,29 @@ pub mod workspaces; const VERSION: &str = crate::constants::OXEN_VERSION; const USER_AGENT: &str = "Oxen"; -pub fn get_scheme_and_host_from_url(url: &str) -> Result<(String, String), OxenError> { - let parsed_url = Url::parse(url)?; - let mut host_str = parsed_url.host_str().unwrap_or_default().to_string(); - if let Some(port) = parsed_url.port() { - host_str = format!("{host_str}:{port}"); - } - Ok((parsed_url.scheme().to_owned(), host_str)) -} - // Note: reqwest::Client already maintains an internal HTTP connection pool with keep-alive. // The 3 upload paths that matter most (push, version chunks, workspace files) already share // clients via Arc. A global per-host cache (OnceCell + RwLock) would add // complexity for little gain since per-request client overhead for metadata/query paths is minimal. pub fn new_for_url(url: U) -> Result { - let (_scheme, host) = get_scheme_and_host_from_url(url)?; + let url = url.into_url()?; + let Some(host) = url.host() else { + return Err(OxenError::NoHost(StringError::new(url.to_string()))); + }; new_for_host(&host, true) } -pub fn new_for_url_no_user_agent(url: &str) -> Result { - let (_scheme, host) = get_scheme_and_host_from_url(url)?; +pub fn new_for_url_no_user_agent(url: U) -> Result { + let url = url.into_url()?; + let Some(host) = url.host() else { + return Err(OxenError::NoHost(StringError::new(url.to_string()))); + }; new_for_host(&host, false) } /// Has connection timeout and TCP keep-alives, but also imposes a per-request timeout. /// NOT SUITABLE FOR LONG-LIVED TRANSFERS! Use `new_for_url_transfer` instead. -fn new_for_host(host: &str, should_add_user_agent: bool) -> Result { +fn new_for_host(host: &Host<&str>, should_add_user_agent: bool) -> Result { builder_for_host( host, should_add_user_agent, @@ -82,7 +81,11 @@ fn new_for_host(host: &str, should_add_user_agent: bool) -> Result(url: U) -> Result { - let (_scheme, host) = get_scheme_and_host_from_url(url)?; + let url = url.into_url()?; + let Some(host) = url.host() else { + return Err(OxenError::NoHost(url.to_string().into())); + }; + builder_for_host( &host, true, @@ -94,7 +97,7 @@ pub fn new_for_url_transfer(url: U) -> Result { .map_err(OxenError::HTTP) } -fn new_for_host_transfer(host: &str) -> Result { +fn new_for_host_transfer(host: &Host<&str>) -> Result { builder_for_host( host, true, @@ -107,7 +110,7 @@ fn new_for_host_transfer(host: &str) -> Result { } fn builder_for_host( - host: &str, + host: &Host<&str>, should_add_user_agent: bool, connect_timeout: time::Duration, keep_alive_interval: time::Duration, @@ -131,7 +134,7 @@ fn builder_for_host( let config = match AuthConfig::get() { Ok(config) => config, Err(e) => { - log::debug!("Error getting config: {e}. No auth token found for host {host}"); + log::debug!("Error getting config: {e}. No auth configuration found!"); return Ok(builder); } }; diff --git a/oxen-rust/crates/lib/src/api/client/commits.rs b/oxen-rust/crates/lib/src/api/client/commits.rs index 53734d91d..5af44a986 100644 --- a/oxen-rust/crates/lib/src/api/client/commits.rs +++ b/oxen-rust/crates/lib/src/api/client/commits.rs @@ -31,6 +31,7 @@ use flate2::write::GzEncoder; use futures_util::TryStreamExt; use http::header::CONTENT_LENGTH; use indicatif::{ProgressBar, ProgressStyle}; +use url::Url; pub struct ChunkParams { pub chunk_num: usize, @@ -719,7 +720,14 @@ pub async fn post_commit_dir_hashes_to_server( let quiet_bar = Arc::new(ProgressBar::hidden()); - let client = client::new_for_host_transfer(remote_repo.url())?; + let client = { + let raw_url = remote_repo.url(); + let url: Url = raw_url.parse()?; + let Some(host) = url.host() else { + return Err(OxenError::NoHost(raw_url.into())); + }; + client::new_for_host_transfer(&host)? + }; post_data_to_server_with_client( &client, remote_repo, @@ -765,7 +773,14 @@ pub async fn post_commits_dir_hashes_to_server( let quiet_bar = Arc::new(ProgressBar::hidden()); - let client = client::new_for_host_transfer(remote_repo.url())?; + let client = { + let raw_url = remote_repo.url(); + let url: Url = raw_url.parse()?; + let Some(host) = url.host() else { + return Err(OxenError::NoHost(raw_url.into())); + }; + client::new_for_host_transfer(&host)? + }; post_data_to_server_with_client( &client, remote_repo, diff --git a/oxen-rust/crates/lib/src/api/client/file.rs b/oxen-rust/crates/lib/src/api/client/file.rs index fb271550f..7c94f70af 100644 --- a/oxen-rust/crates/lib/src/api/client/file.rs +++ b/oxen-rust/crates/lib/src/api/client/file.rs @@ -10,6 +10,7 @@ use bytes::{Bytes, BytesMut}; use futures_util::StreamExt; use reqwest::multipart::{Form, Part}; use std::path::Path; +use url::Url; pub async fn put_file( remote_repo: &RemoteRepository, @@ -64,8 +65,17 @@ async fn put_multipart_file( config: &retry::RetryConfig, ) -> Result { log::debug!("put_multipart_file {uri:?}, file_path {file_path:?}"); + + let client = { + let raw_url = remote_repo.url(); + let url: Url = raw_url.parse()?; + let Some(host) = url.host() else { + return Err(OxenError::NoHost(raw_url.into())); + }; + client::new_for_host_transfer(&host)? + }; + let url = api::endpoint::url_from_repo(remote_repo, uri)?; - let client = client::new_for_host_transfer(&url)?; let file_data = bytes::Bytes::from(tokio::fs::read(file_path).await?); diff --git a/oxen-rust/crates/lib/src/api/client/repositories.rs b/oxen-rust/crates/lib/src/api/client/repositories.rs index e03a258df..694ff8efb 100644 --- a/oxen-rust/crates/lib/src/api/client/repositories.rs +++ b/oxen-rust/crates/lib/src/api/client/repositories.rs @@ -13,6 +13,7 @@ use reqwest::multipart; use serde_json::json; use serde_json::value; use std::fmt; +use url::Url; const CLONE: &str = "clone"; const PUSH: &str = "push"; @@ -420,14 +421,17 @@ pub async fn transfer_namespace( match response { Ok(response) => { - // Update remote to reflect new namespace - let (scheme, host) = api::client::get_scheme_and_host_from_url(&url)?; + let parsed_url: Url = url.parse()?; + + let Some(host) = parsed_url.host() else { + return Err(OxenError::NoHost(url.into())); + }; let new_remote_url = api::endpoint::remote_url_from_namespace_name_scheme( - &host, + host.to_string().as_str(), &response.repository.namespace, &repository.name, - &scheme, + parsed_url.scheme(), ); let new_remote = Remote { url: new_remote_url, diff --git a/oxen-rust/crates/lib/src/api/client/versions.rs b/oxen-rust/crates/lib/src/api/client/versions.rs index 7a6e6e79d..9ea8dffae 100644 --- a/oxen-rust/crates/lib/src/api/client/versions.rs +++ b/oxen-rust/crates/lib/src/api/client/versions.rs @@ -24,6 +24,7 @@ use http::Method; use http::header::CONTENT_LENGTH; use tokio_tar::Archive; use tokio_util::codec::{BytesCodec, FramedRead}; +use url::Url; use std::collections::{HashMap, HashSet}; use std::io::{SeekFrom, Write}; @@ -298,7 +299,14 @@ async fn upload_chunks( max_retries: usize, progress: Option<&Arc>, ) -> Result>, OxenError> { - let client = Arc::new(api::client::new_for_host_transfer(remote_repo.url())?); + let client = { + let raw_url = remote_repo.url(); + let url: Url = raw_url.parse()?; + let Some(host) = url.host() else { + return Err(OxenError::NoHost(raw_url.into())); + }; + Arc::new(client::new_for_host_transfer(&host)?) + }; // Figure out how many parts we need to upload let file_size = upload.size; diff --git a/oxen-rust/crates/lib/src/api/client/workspaces/files.rs b/oxen-rust/crates/lib/src/api/client/workspaces/files.rs index 8620f5ca6..47668f646 100644 --- a/oxen-rust/crates/lib/src/api/client/workspaces/files.rs +++ b/oxen-rust/crates/lib/src/api/client/workspaces/files.rs @@ -21,6 +21,7 @@ use std::sync::Arc; use tokio::io::AsyncWriteExt; use tokio::sync::mpsc; use tokio::time::{Duration, sleep}; +use url::Url; use futures::stream; use tokio_stream::wrappers::ReceiverStream; @@ -447,8 +448,14 @@ pub(crate) async fn parallel_batched_small_file_upload( } } - // Create a client for uploading batches - let client = Arc::new(api::client::new_for_host_transfer(remote_repo.url())?); + let client = { + let raw_url = remote_repo.url(); + let url: Url = raw_url.parse()?; + let Some(host) = url.host() else { + return Err(OxenError::NoHost(raw_url.into())); + }; + Arc::new(client::new_for_host_transfer(&host)?) + }; // For individual files let err_files: Arc>> = Arc::new(Mutex::new(vec![])); diff --git a/oxen-rust/crates/lib/src/config/auth_config.rs b/oxen-rust/crates/lib/src/config/auth_config.rs index 5a29c33f3..5d1236e74 100644 --- a/oxen-rust/crates/lib/src/config/auth_config.rs +++ b/oxen-rust/crates/lib/src/config/auth_config.rs @@ -6,6 +6,7 @@ use std::collections::HashSet; use std::fs; use std::hash::{Hash, Hasher}; use std::path::Path; +use url::Host; pub const AUTH_CONFIG_FILENAME: &str = "auth_config.toml"; @@ -16,9 +17,9 @@ pub struct HostConfig { } impl HostConfig { - pub fn from_host(host: &str) -> HostConfig { + pub fn from_host(host: &Host<&str>) -> HostConfig { HostConfig { - host: String::from(host), + host: host.to_string(), auth_token: None, } } @@ -126,8 +127,7 @@ impl AuthConfig { }); } - pub fn auth_token_for_host>(&self, host: S) -> Option { - let host = host.as_ref(); + pub fn auth_token_for_host(&self, host: &Host<&str>) -> Option { if let Some(token) = self.host_configs.get(&HostConfig::from_host(host)) { if token.auth_token.is_none() { log::trace!("no auth_token found for host \"{}\"", token.host); @@ -142,6 +142,8 @@ impl AuthConfig { #[cfg(test)] mod tests { + use url::Host; + use crate::config::AuthConfig; use crate::error::OxenError; use crate::test; @@ -159,7 +161,7 @@ mod tests { assert_eq!(auth_config.host_configs.len(), og_num_configs + 1); assert_eq!( - auth_config.auth_token_for_host(host), + auth_config.auth_token_for_host(&Host::Domain(host)), Some(token_2.to_string()) ); diff --git a/oxen-rust/crates/lib/src/error.rs b/oxen-rust/crates/lib/src/error.rs index cfafd21d6..46c6a41ad 100644 --- a/oxen-rust/crates/lib/src/error.rs +++ b/oxen-rust/crates/lib/src/error.rs @@ -77,6 +77,7 @@ pub enum OxenError { ResourceNotFound(StringError), PathDoesNotExist(Box), ParsedResourceNotFound(Box), + NoHost(StringError), // Versioning MigrationRequired(StringError), @@ -149,6 +150,7 @@ impl fmt::Display for OxenError { OxenError::OxenUpdateRequired(err) | OxenError::Basic(err) | OxenError::ThumbnailingNotEnabled(err) => write!(f, "{err}"), + Self::NoHost(url) => write!(f, "No host in url: {url}"), OxenError::InvalidRepoName(name) => write!( f, "Invalid repository or namespace name '{name}'. Must match [a-zA-Z0-9][a-zA-Z0-9_.-]+" diff --git a/oxen-rust/crates/lib/src/model/repository/remote_repository.rs b/oxen-rust/crates/lib/src/model/repository/remote_repository.rs index 35c9e0a3d..13cc9b730 100644 --- a/oxen-rust/crates/lib/src/model/repository/remote_repository.rs +++ b/oxen-rust/crates/lib/src/model/repository/remote_repository.rs @@ -68,10 +68,12 @@ impl RemoteRepository { } /// Host of the remote repository - pub fn host(&self) -> String { - // parse it from the url - let uri = self.remote.url.parse::().unwrap(); - uri.host().unwrap().to_string() + pub fn host(&self) -> Result { + let uri = self.remote.url.parse::()?; + let Some(host) = uri.host() else { + return Err(OxenError::NoHost(self.remote.url.to_string().into())); + }; + Ok(host.to_string()) } /// Host of the remote repository diff --git a/oxen-rust/crates/lib/src/repositories/workspaces/upload.rs b/oxen-rust/crates/lib/src/repositories/workspaces/upload.rs index 5ba34f790..fc88651c1 100644 --- a/oxen-rust/crates/lib/src/repositories/workspaces/upload.rs +++ b/oxen-rust/crates/lib/src/repositories/workspaces/upload.rs @@ -70,7 +70,7 @@ mod tests { let opts = UploadOpts { paths: vec![file.to_path_buf()], dst: Path::new("").to_path_buf(), - host: remote_repo.host(), + host: remote_repo.host()?, scheme: remote_repo.scheme(), remote: remote_repo.name.clone(), branch: None, @@ -125,7 +125,7 @@ mod tests { let opts = UploadOpts { paths: vec![file.to_path_buf()], dst: Path::new("test").join("ing").join("data").to_path_buf(), - host: remote_repo.host(), + host: remote_repo.host()?, scheme: remote_repo.scheme(), remote: remote_repo.name.clone(), branch: None, @@ -192,7 +192,7 @@ mod tests { let opts = UploadOpts { paths: vec![file.to_path_buf()], dst: Path::new("").to_path_buf(), - host: remote_repo.host(), + host: remote_repo.host()?, scheme: remote_repo.scheme(), remote: remote_repo.name.clone(), branch: Some(branch_name.clone()), From 166e664241b432159feb572506c87f4fae671f6f Mon Sep 17 00:00:00 2001 From: Malcolm Greaves Date: Tue, 17 Mar 2026 21:51:20 +0530 Subject: [PATCH 3/5] improve test debugability --- oxen-rust/crates/lib/src/api/client/compare.rs | 17 ++++++++++------- oxen-rust/crates/lib/src/test.rs | 10 ++-------- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/oxen-rust/crates/lib/src/api/client/compare.rs b/oxen-rust/crates/lib/src/api/client/compare.rs index 2520f9914..0752155cf 100644 --- a/oxen-rust/crates/lib/src/api/client/compare.rs +++ b/oxen-rust/crates/lib/src/api/client/compare.rs @@ -396,8 +396,7 @@ mod tests { let mut commit_ids = Vec::new(); // Create 5 commits with 5 new directories - let total_dirs = 5; - for i in 0..total_dirs { + for i in 0..5 { // Write a file let dir_path = format!("dir_{i}"); let file_path = PathBuf::from(dir_path).join(format!("file_{i}.txt")); @@ -429,12 +428,16 @@ mod tests { let results = api::client::compare::dir_tree(&remote_repo, &base_commit_id, &head_commit_id) .await?; - println!("results: {results:?}"); - assert_eq!(results.len(), 1); + assert_eq!(results.len(), 1, "results: {results:?}"); let first = results.first().unwrap(); - assert_eq!(first.name, PathBuf::from("")); - assert_eq!(first.status, DiffEntryStatus::Modified); - assert_eq!(first.children.len(), 2); + assert_eq!(first.name, PathBuf::from(""), "{:?}", first.name); + assert_eq!( + first.status, + DiffEntryStatus::Modified, + "{:?}", + first.status + ); + assert_eq!(first.children.len(), 2, "{:?}", first.children); Ok(remote_repo) }) .await diff --git a/oxen-rust/crates/lib/src/test.rs b/oxen-rust/crates/lib/src/test.rs index d53d41ddf..66b224979 100644 --- a/oxen-rust/crates/lib/src/test.rs +++ b/oxen-rust/crates/lib/src/test.rs @@ -330,19 +330,13 @@ where version_store.init().await?; log::info!(">>>>> run_empty_local_repo_test_async running test"); - let result = match test(repo).await { - Ok(_) => true, - Err(err) => { - eprintln!("Error running test. Err: {err}"); - false - } - }; + let result = test(repo).await; // Remove repo dir maybe_cleanup_repo(&repo_dir)?; // Assert everything okay after we cleanup the repo dir - assert!(result); + assert!(result.is_ok(), "Error running test: {result:?}"); Ok(()) } From df477b5041a3374076bfd9dbfb236b2839308d8d Mon Sep 17 00:00:00 2001 From: Malcolm Greaves Date: Tue, 17 Mar 2026 22:13:38 +0530 Subject: [PATCH 4/5] need to include port in host -- make new function & Hostname struct --- oxen-rust/crates/cli/src/cmd/clone.rs | 10 ++--- oxen-rust/crates/cli/src/helpers.rs | 9 ++-- oxen-rust/crates/lib/src/api/client.rs | 42 +++++++------------ .../crates/lib/src/api/client/commits.rs | 18 ++------ oxen-rust/crates/lib/src/api/client/file.rs | 10 ++--- .../crates/lib/src/api/client/repositories.rs | 12 ++---- .../crates/lib/src/api/client/versions.rs | 14 ++----- .../lib/src/api/client/workspaces/files.rs | 12 ++---- .../crates/lib/src/config/auth_config.rs | 9 ++-- .../src/model/repository/remote_repository.rs | 27 +++++------- .../lib/src/model/repository/repo_new.rs | 12 +++--- .../lib/src/repositories/workspaces/upload.rs | 6 +-- oxen-rust/crates/lib/src/util.rs | 2 +- .../crates/lib/src/util/internal_types.rs | 31 ++++++++++++++ 14 files changed, 97 insertions(+), 117 deletions(-) diff --git a/oxen-rust/crates/cli/src/cmd/clone.rs b/oxen-rust/crates/cli/src/cmd/clone.rs index 1fc9c9823..4c8e267ad 100644 --- a/oxen-rust/crates/cli/src/cmd/clone.rs +++ b/oxen-rust/crates/cli/src/cmd/clone.rs @@ -1,8 +1,8 @@ use async_trait::async_trait; use clap::{Arg, Command, arg}; use std::path::{Component, Path, PathBuf}; -use url::Url; +use liboxen::api; use liboxen::constants::DEFAULT_BRANCH_NAME; use liboxen::error::OxenError; use liboxen::opts::CloneOpts; @@ -168,11 +168,9 @@ impl RunCmd for CloneCmd { }; let (scheme, host) = { - let url: Url = opts.url.parse()?; - let Some(host) = url.host() else { - return Err(OxenError::NoHost(opts.url.into())); - }; - (url.scheme().to_string(), host.to_string()) + let hn = api::client::hostname_from_url_str(&opts.url)?; + let hostname = hn.hostname(); + (hn.scheme, hostname) }; // TODO: Do I need to worry about this for remote repo? diff --git a/oxen-rust/crates/cli/src/helpers.rs b/oxen-rust/crates/cli/src/helpers.rs index ba41a863b..0372f5a24 100644 --- a/oxen-rust/crates/cli/src/helpers.rs +++ b/oxen-rust/crates/cli/src/helpers.rs @@ -7,7 +7,6 @@ use liboxen::model::LocalRepository; use liboxen::util::oxen_version::OxenVersion; use colored::Colorize; -use url::Url; use std::collections::HashMap; use std::str::FromStr; @@ -33,11 +32,9 @@ pub fn get_scheme_and_host_from_repo( ) -> Result<(String, String), OxenError> { match repo.remote() { Some(remote) => { - let url: Url = remote.url.parse()?; - let Some(host) = url.host() else { - return Err(OxenError::NoHost(remote.url.into())); - }; - Ok((url.scheme().to_string(), host.to_string())) + let hn = api::client::hostname_from_url_str(&remote.url)?; + let hostname = hn.hostname(); + Ok((hn.scheme, hostname)) } None => get_scheme_and_host_or_default(), } diff --git a/oxen-rust/crates/lib/src/api/client.rs b/oxen-rust/crates/lib/src/api/client.rs index a79e4cf42..177050943 100644 --- a/oxen-rust/crates/lib/src/api/client.rs +++ b/oxen-rust/crates/lib/src/api/client.rs @@ -6,14 +6,13 @@ use crate::config::RuntimeConfig; use crate::config::runtime_config::runtime::Runtime; use crate::constants; use crate::error::OxenError; -use crate::error::StringError; +use crate::util::internal_types::Hostname; use crate::view::OxenResponse; use crate::view::http; pub use reqwest::Url; use reqwest::retry; use reqwest::{Client, ClientBuilder, header}; use std::time; -use url::Host; pub mod branches; pub mod commits; @@ -42,29 +41,31 @@ pub mod workspaces; const VERSION: &str = crate::constants::OXEN_VERSION; const USER_AGENT: &str = "Oxen"; +/// Parse a URL string into a `Hostname` (scheme + host + optional port). +pub fn hostname_from_url_str(url_str: &str) -> Result { + let url: url::Url = url_str.parse()?; + Hostname::from_url(&url) +} + // Note: reqwest::Client already maintains an internal HTTP connection pool with keep-alive. // The 3 upload paths that matter most (push, version chunks, workspace files) already share // clients via Arc. A global per-host cache (OnceCell + RwLock) would add // complexity for little gain since per-request client overhead for metadata/query paths is minimal. pub fn new_for_url(url: U) -> Result { let url = url.into_url()?; - let Some(host) = url.host() else { - return Err(OxenError::NoHost(StringError::new(url.to_string()))); - }; - new_for_host(&host, true) + let hn = Hostname::from_url(&url)?; + new_for_host(&hn.hostname(), true) } pub fn new_for_url_no_user_agent(url: U) -> Result { let url = url.into_url()?; - let Some(host) = url.host() else { - return Err(OxenError::NoHost(StringError::new(url.to_string()))); - }; - new_for_host(&host, false) + let hn = Hostname::from_url(&url)?; + new_for_host(&hn.hostname(), false) } /// Has connection timeout and TCP keep-alives, but also imposes a per-request timeout. /// NOT SUITABLE FOR LONG-LIVED TRANSFERS! Use `new_for_url_transfer` instead. -fn new_for_host(host: &Host<&str>, should_add_user_agent: bool) -> Result { +fn new_for_host(host: &str, should_add_user_agent: bool) -> Result { builder_for_host( host, should_add_user_agent, @@ -82,22 +83,11 @@ fn new_for_host(host: &Host<&str>, should_add_user_agent: bool) -> Result(url: U) -> Result { let url = url.into_url()?; - let Some(host) = url.host() else { - return Err(OxenError::NoHost(url.to_string().into())); - }; - - builder_for_host( - &host, - true, - time::Duration::from_secs(constants::connect_timeout()), - time::Duration::from_secs(constants::tcp_keepalive()), - time::Duration::from_secs(20), - )? - .build() - .map_err(OxenError::HTTP) + let hn = Hostname::from_url(&url)?; + new_for_host_transfer(&hn.hostname()) } -fn new_for_host_transfer(host: &Host<&str>) -> Result { +pub(crate) fn new_for_host_transfer(host: &str) -> Result { builder_for_host( host, true, @@ -110,7 +100,7 @@ fn new_for_host_transfer(host: &Host<&str>) -> Result { } fn builder_for_host( - host: &Host<&str>, + host: &str, should_add_user_agent: bool, connect_timeout: time::Duration, keep_alive_interval: time::Duration, diff --git a/oxen-rust/crates/lib/src/api/client/commits.rs b/oxen-rust/crates/lib/src/api/client/commits.rs index 5af44a986..42d2182bf 100644 --- a/oxen-rust/crates/lib/src/api/client/commits.rs +++ b/oxen-rust/crates/lib/src/api/client/commits.rs @@ -31,8 +31,6 @@ use flate2::write::GzEncoder; use futures_util::TryStreamExt; use http::header::CONTENT_LENGTH; use indicatif::{ProgressBar, ProgressStyle}; -use url::Url; - pub struct ChunkParams { pub chunk_num: usize, pub total_chunks: usize, @@ -721,12 +719,8 @@ pub async fn post_commit_dir_hashes_to_server( let quiet_bar = Arc::new(ProgressBar::hidden()); let client = { - let raw_url = remote_repo.url(); - let url: Url = raw_url.parse()?; - let Some(host) = url.host() else { - return Err(OxenError::NoHost(raw_url.into())); - }; - client::new_for_host_transfer(&host)? + let hn = crate::util::internal_types::Hostname::from_url(&remote_repo.url().parse()?)?; + client::new_for_host_transfer(&hn.hostname())? }; post_data_to_server_with_client( &client, @@ -774,12 +768,8 @@ pub async fn post_commits_dir_hashes_to_server( let quiet_bar = Arc::new(ProgressBar::hidden()); let client = { - let raw_url = remote_repo.url(); - let url: Url = raw_url.parse()?; - let Some(host) = url.host() else { - return Err(OxenError::NoHost(raw_url.into())); - }; - client::new_for_host_transfer(&host)? + let hn = crate::util::internal_types::Hostname::from_url(&remote_repo.url().parse()?)?; + client::new_for_host_transfer(&hn.hostname())? }; post_data_to_server_with_client( &client, diff --git a/oxen-rust/crates/lib/src/api/client/file.rs b/oxen-rust/crates/lib/src/api/client/file.rs index 7c94f70af..de87b8e6a 100644 --- a/oxen-rust/crates/lib/src/api/client/file.rs +++ b/oxen-rust/crates/lib/src/api/client/file.rs @@ -3,6 +3,7 @@ use crate::api::client::retry; use crate::error::OxenError; use crate::model::RemoteRepository; use crate::model::commit::NewCommitBody; +use crate::util::internal_types::Hostname; use crate::view::CommitResponse; use crate::{api, util::internal_types::HasLen}; @@ -10,7 +11,6 @@ use bytes::{Bytes, BytesMut}; use futures_util::StreamExt; use reqwest::multipart::{Form, Part}; use std::path::Path; -use url::Url; pub async fn put_file( remote_repo: &RemoteRepository, @@ -67,12 +67,8 @@ async fn put_multipart_file( log::debug!("put_multipart_file {uri:?}, file_path {file_path:?}"); let client = { - let raw_url = remote_repo.url(); - let url: Url = raw_url.parse()?; - let Some(host) = url.host() else { - return Err(OxenError::NoHost(raw_url.into())); - }; - client::new_for_host_transfer(&host)? + let hn = Hostname::from_url(&remote_repo.url().parse()?)?; + client::new_for_host_transfer(&hn.hostname())? }; let url = api::endpoint::url_from_repo(remote_repo, uri)?; diff --git a/oxen-rust/crates/lib/src/api/client/repositories.rs b/oxen-rust/crates/lib/src/api/client/repositories.rs index 694ff8efb..ef8ddc5f2 100644 --- a/oxen-rust/crates/lib/src/api/client/repositories.rs +++ b/oxen-rust/crates/lib/src/api/client/repositories.rs @@ -5,6 +5,7 @@ use crate::error::OxenError; use crate::model::file::{FileContents, FileNew}; use crate::model::{Branch, LocalRepository, Remote, RemoteRepository, RepoNew}; use crate::repositories; +use crate::util::internal_types::Hostname; use crate::view::repository::{ RepositoryCreationResponse, RepositoryDataTypesResponse, RepositoryDataTypesView, }; @@ -13,7 +14,6 @@ use reqwest::multipart; use serde_json::json; use serde_json::value; use std::fmt; -use url::Url; const CLONE: &str = "clone"; const PUSH: &str = "push"; @@ -421,17 +421,13 @@ pub async fn transfer_namespace( match response { Ok(response) => { - let parsed_url: Url = url.parse()?; - - let Some(host) = parsed_url.host() else { - return Err(OxenError::NoHost(url.into())); - }; + let hn = Hostname::from_url(&url.parse()?)?; let new_remote_url = api::endpoint::remote_url_from_namespace_name_scheme( - host.to_string().as_str(), + &hn.hostname(), &response.repository.namespace, &repository.name, - parsed_url.scheme(), + &hn.scheme, ); let new_remote = Remote { url: new_remote_url, diff --git a/oxen-rust/crates/lib/src/api/client/versions.rs b/oxen-rust/crates/lib/src/api/client/versions.rs index 9ea8dffae..c6b36a9c1 100644 --- a/oxen-rust/crates/lib/src/api/client/versions.rs +++ b/oxen-rust/crates/lib/src/api/client/versions.rs @@ -22,10 +22,6 @@ use futures_util::StreamExt; use futures_util::stream::FuturesUnordered; use http::Method; use http::header::CONTENT_LENGTH; -use tokio_tar::Archive; -use tokio_util::codec::{BytesCodec, FramedRead}; -use url::Url; - use std::collections::{HashMap, HashSet}; use std::io::{SeekFrom, Write}; use std::path::{Path, PathBuf}; @@ -35,6 +31,8 @@ use tokio::fs::OpenOptions; use tokio::io::{AsyncReadExt, AsyncSeekExt}; use tokio::sync::Semaphore; use tokio::time::sleep; +use tokio_tar::Archive; +use tokio_util::codec::{BytesCodec, FramedRead}; use crate::repositories; @@ -300,12 +298,8 @@ async fn upload_chunks( progress: Option<&Arc>, ) -> Result>, OxenError> { let client = { - let raw_url = remote_repo.url(); - let url: Url = raw_url.parse()?; - let Some(host) = url.host() else { - return Err(OxenError::NoHost(raw_url.into())); - }; - Arc::new(client::new_for_host_transfer(&host)?) + let hn = crate::util::internal_types::Hostname::from_url(&remote_repo.url().parse()?)?; + Arc::new(client::new_for_host_transfer(&hn.hostname())?) }; // Figure out how many parts we need to upload diff --git a/oxen-rust/crates/lib/src/api/client/workspaces/files.rs b/oxen-rust/crates/lib/src/api/client/workspaces/files.rs index 47668f646..471f8f465 100644 --- a/oxen-rust/crates/lib/src/api/client/workspaces/files.rs +++ b/oxen-rust/crates/lib/src/api/client/workspaces/files.rs @@ -13,6 +13,7 @@ use bytesize::ByteSize; use futures_util::StreamExt; use glob_match::glob_match; +use futures::stream; use parking_lot::Mutex; use rand::{Rng, thread_rng}; use std::collections::HashSet; @@ -21,9 +22,6 @@ use std::sync::Arc; use tokio::io::AsyncWriteExt; use tokio::sync::mpsc; use tokio::time::{Duration, sleep}; -use url::Url; - -use futures::stream; use tokio_stream::wrappers::ReceiverStream; use crate::util::hasher; @@ -449,12 +447,8 @@ pub(crate) async fn parallel_batched_small_file_upload( } let client = { - let raw_url = remote_repo.url(); - let url: Url = raw_url.parse()?; - let Some(host) = url.host() else { - return Err(OxenError::NoHost(raw_url.into())); - }; - Arc::new(client::new_for_host_transfer(&host)?) + let hn = crate::util::internal_types::Hostname::from_url(&remote_repo.url().parse()?)?; + Arc::new(client::new_for_host_transfer(&hn.hostname())?) }; // For individual files diff --git a/oxen-rust/crates/lib/src/config/auth_config.rs b/oxen-rust/crates/lib/src/config/auth_config.rs index 5d1236e74..08b11a507 100644 --- a/oxen-rust/crates/lib/src/config/auth_config.rs +++ b/oxen-rust/crates/lib/src/config/auth_config.rs @@ -6,7 +6,6 @@ use std::collections::HashSet; use std::fs; use std::hash::{Hash, Hasher}; use std::path::Path; -use url::Host; pub const AUTH_CONFIG_FILENAME: &str = "auth_config.toml"; @@ -17,7 +16,7 @@ pub struct HostConfig { } impl HostConfig { - pub fn from_host(host: &Host<&str>) -> HostConfig { + pub fn from_host(host: &str) -> HostConfig { HostConfig { host: host.to_string(), auth_token: None, @@ -127,7 +126,7 @@ impl AuthConfig { }); } - pub fn auth_token_for_host(&self, host: &Host<&str>) -> Option { + pub fn auth_token_for_host(&self, host: &str) -> Option { if let Some(token) = self.host_configs.get(&HostConfig::from_host(host)) { if token.auth_token.is_none() { log::trace!("no auth_token found for host \"{}\"", token.host); @@ -142,8 +141,6 @@ impl AuthConfig { #[cfg(test)] mod tests { - use url::Host; - use crate::config::AuthConfig; use crate::error::OxenError; use crate::test; @@ -161,7 +158,7 @@ mod tests { assert_eq!(auth_config.host_configs.len(), og_num_configs + 1); assert_eq!( - auth_config.auth_token_for_host(&Host::Domain(host)), + auth_config.auth_token_for_host(host), Some(token_2.to_string()) ); diff --git a/oxen-rust/crates/lib/src/model/repository/remote_repository.rs b/oxen-rust/crates/lib/src/model/repository/remote_repository.rs index 13cc9b730..de0fe650e 100644 --- a/oxen-rust/crates/lib/src/model/repository/remote_repository.rs +++ b/oxen-rust/crates/lib/src/model/repository/remote_repository.rs @@ -1,9 +1,9 @@ use crate::api; use crate::core::versions::MinOxenVersion; +use crate::util::internal_types::Hostname; use crate::view::RepositoryView; use crate::view::repository::{RepositoryCreationView, RepositoryDataTypesView}; use crate::{error::OxenError, model::Remote}; -use http::Uri; use serde::{Deserialize, Serialize}; #[derive(Deserialize, Serialize, Debug, Clone)] @@ -67,27 +67,22 @@ impl RemoteRepository { &self.remote.url } - /// Host of the remote repository + /// Host of the remote repository (includes port if non-default) pub fn host(&self) -> Result { - let uri = self.remote.url.parse::()?; - let Some(host) = uri.host() else { - return Err(OxenError::NoHost(self.remote.url.to_string().into())); - }; - Ok(host.to_string()) + let hn = Hostname::from_url(&self.remote.url.parse()?)?; + Ok(hn.hostname()) } - /// Host of the remote repository - pub fn port(&self) -> String { - // parse it from the url - let uri = self.remote.url.parse::().unwrap(); - uri.port().unwrap().to_string() + /// Port of the remote repository + pub fn port(&self) -> Result, OxenError> { + let hn = Hostname::from_url(&self.remote.url.parse()?)?; + Ok(hn.port) } /// Scheme of the remote repository - pub fn scheme(&self) -> String { - // parse it from the url - let uri = self.remote.url.parse::().unwrap(); - uri.scheme().unwrap().to_string() + pub fn scheme(&self) -> Result { + let hn = Hostname::from_url(&self.remote.url.parse()?)?; + Ok(hn.scheme) } /// Underlying api url for the remote repository diff --git a/oxen-rust/crates/lib/src/model/repository/repo_new.rs b/oxen-rust/crates/lib/src/model/repository/repo_new.rs index c6e05c960..b0e460a88 100644 --- a/oxen-rust/crates/lib/src/model/repository/repo_new.rs +++ b/oxen-rust/crates/lib/src/model/repository/repo_new.rs @@ -1,4 +1,3 @@ -use http::Uri; use serde::{Deserialize, Serialize}; use utoipa::ToSchema; @@ -7,6 +6,7 @@ use crate::error::OxenError; use crate::model::commit::Commit; use crate::model::file::FileNew; use crate::opts::StorageOpts; +use crate::util::internal_types::Hostname; #[derive(Deserialize, Serialize, Debug, Clone, ToSchema)] pub struct RepoNew { @@ -167,13 +167,15 @@ impl RepoNew { } pub fn from_url(url: &str) -> Result { - let uri = url.parse::()?; - let mut split_path: Vec<&str> = uri.path().split('/').collect(); + let parsed: url::Url = url.parse()?; + let mut split_path: Vec<&str> = parsed.path().split('/').collect(); if split_path.len() < 3 { return Err(OxenError::basic_str("Invalid repo url")); } + let hn = Hostname::from_url(&parsed)?; + // Pop in reverse to get repo_name then namespace let repo_name = split_path.pop().unwrap(); let namespace = split_path.pop().unwrap(); @@ -181,8 +183,8 @@ impl RepoNew { namespace: namespace.to_string(), name: repo_name.to_string(), is_public: None, - host: Some(uri.host().unwrap().to_string()), - scheme: Some(uri.scheme().unwrap().to_string()), + host: Some(hn.hostname()), + scheme: Some(hn.scheme), root_commit: None, description: None, files: None, diff --git a/oxen-rust/crates/lib/src/repositories/workspaces/upload.rs b/oxen-rust/crates/lib/src/repositories/workspaces/upload.rs index fc88651c1..434ac29b9 100644 --- a/oxen-rust/crates/lib/src/repositories/workspaces/upload.rs +++ b/oxen-rust/crates/lib/src/repositories/workspaces/upload.rs @@ -71,7 +71,7 @@ mod tests { paths: vec![file.to_path_buf()], dst: Path::new("").to_path_buf(), host: remote_repo.host()?, - scheme: remote_repo.scheme(), + scheme: remote_repo.scheme()?, remote: remote_repo.name.clone(), branch: None, message: "adding new file".to_string(), @@ -126,7 +126,7 @@ mod tests { paths: vec![file.to_path_buf()], dst: Path::new("test").join("ing").join("data").to_path_buf(), host: remote_repo.host()?, - scheme: remote_repo.scheme(), + scheme: remote_repo.scheme()?, remote: remote_repo.name.clone(), branch: None, message: "adding new file".to_string(), @@ -193,7 +193,7 @@ mod tests { paths: vec![file.to_path_buf()], dst: Path::new("").to_path_buf(), host: remote_repo.host()?, - scheme: remote_repo.scheme(), + scheme: remote_repo.scheme()?, remote: remote_repo.name.clone(), branch: Some(branch_name.clone()), message: "adding new file".to_string(), diff --git a/oxen-rust/crates/lib/src/util.rs b/oxen-rust/crates/lib/src/util.rs index 34037de58..992f433cf 100644 --- a/oxen-rust/crates/lib/src/util.rs +++ b/oxen-rust/crates/lib/src/util.rs @@ -6,7 +6,7 @@ pub mod fs; pub mod glob; pub mod hasher; pub mod image; -pub(crate) mod internal_types; +pub mod internal_types; pub mod logging; pub mod oxen_version; pub mod paginate; diff --git a/oxen-rust/crates/lib/src/util/internal_types.rs b/oxen-rust/crates/lib/src/util/internal_types.rs index bdbf5e9ec..e89653463 100644 --- a/oxen-rust/crates/lib/src/util/internal_types.rs +++ b/oxen-rust/crates/lib/src/util/internal_types.rs @@ -1,4 +1,35 @@ +use crate::error::OxenError; use std::collections::{HashMap, HashSet}; +use url::Url; + +/// Parsed URL components with port-aware hostname. +pub struct Hostname { + pub host: String, + pub port: Option, + pub scheme: String, +} + +impl Hostname { + /// Returns `host:port` when a port is present, otherwise just `host`. + pub fn hostname(&self) -> String { + match self.port { + Some(port) => format!("{}:{}", self.host, port), + None => self.host.clone(), + } + } + + /// Extract scheme, host, and port from a `Url`. + pub fn from_url(url: &Url) -> Result { + let Some(host) = url.host_str() else { + return Err(OxenError::NoHost(url.to_string().into())); + }; + Ok(Self { + host: host.to_string(), + port: url.port(), + scheme: url.scheme().to_string(), + }) + } +} /// Indicates that the type has a length. pub(crate) trait HasLen { From 408be7ab2f1d9426cf7addf044bbb26adc73226f Mon Sep 17 00:00:00 2001 From: Malcolm Greaves Date: Tue, 17 Mar 2026 22:44:07 +0530 Subject: [PATCH 5/5] move Hostname struct --- oxen-rust/crates/lib/src/api/client.rs | 36 +++++++++++++++++-- .../crates/lib/src/api/client/commits.rs | 6 ++-- oxen-rust/crates/lib/src/api/client/file.rs | 2 +- .../crates/lib/src/api/client/repositories.rs | 2 +- .../crates/lib/src/api/client/versions.rs | 4 +-- .../lib/src/api/client/workspaces/files.rs | 4 +-- .../src/model/repository/remote_repository.rs | 2 +- .../lib/src/model/repository/repo_new.rs | 2 +- .../crates/lib/src/util/internal_types.rs | 31 ---------------- 9 files changed, 44 insertions(+), 45 deletions(-) diff --git a/oxen-rust/crates/lib/src/api/client.rs b/oxen-rust/crates/lib/src/api/client.rs index 177050943..a2eb32b69 100644 --- a/oxen-rust/crates/lib/src/api/client.rs +++ b/oxen-rust/crates/lib/src/api/client.rs @@ -6,11 +6,10 @@ use crate::config::RuntimeConfig; use crate::config::runtime_config::runtime::Runtime; use crate::constants; use crate::error::OxenError; -use crate::util::internal_types::Hostname; use crate::view::OxenResponse; use crate::view::http; +use reqwest::IntoUrl; pub use reqwest::Url; -use reqwest::retry; use reqwest::{Client, ClientBuilder, header}; use std::time; @@ -41,6 +40,35 @@ pub mod workspaces; const VERSION: &str = crate::constants::OXEN_VERSION; const USER_AGENT: &str = "Oxen"; +/// Parsed URL components with port-aware hostname. +pub struct Hostname { + pub host: String, + pub port: Option, + pub scheme: String, +} + +impl Hostname { + /// Returns `host:port` when a port is present, otherwise just `host`. + pub fn hostname(&self) -> String { + match self.port { + Some(port) => format!("{}:{}", self.host, port), + None => self.host.clone(), + } + } + + /// Extract scheme, host, and port from a `Url`. + pub fn from_url(url: &Url) -> Result { + let Some(host) = url.host_str() else { + return Err(OxenError::NoHost(url.to_string().into())); + }; + Ok(Self { + host: host.to_string(), + port: url.port(), + scheme: url.scheme().to_string(), + }) + } +} + /// Parse a URL string into a `Hostname` (scheme + host + optional port). pub fn hostname_from_url_str(url_str: &str) -> Result { let url: url::Url = url_str.parse()?; @@ -163,7 +191,9 @@ fn build_user_agent(config: &RuntimeConfig) -> Result { config.runtime_version ), }; - Ok(format!("{USER_AGENT}/{VERSION} ({host_platform}; {runtime_name})")) + Ok(format!( + "{USER_AGENT}/{VERSION} ({host_platform}; {runtime_name})" + )) } /// Performs an extra parse to validate that the response is success diff --git a/oxen-rust/crates/lib/src/api/client/commits.rs b/oxen-rust/crates/lib/src/api/client/commits.rs index 42d2182bf..c6d5cb8ab 100644 --- a/oxen-rust/crates/lib/src/api/client/commits.rs +++ b/oxen-rust/crates/lib/src/api/client/commits.rs @@ -1,4 +1,4 @@ -use crate::api::client; +use crate::api::client::{self, Hostname}; use crate::constants::{DEFAULT_PAGE_NUM, DIR_HASHES_DIR, DIRS_DIR, HISTORY_DIR}; use crate::error::OxenError; @@ -719,7 +719,7 @@ pub async fn post_commit_dir_hashes_to_server( let quiet_bar = Arc::new(ProgressBar::hidden()); let client = { - let hn = crate::util::internal_types::Hostname::from_url(&remote_repo.url().parse()?)?; + let hn = Hostname::from_url(&remote_repo.url().parse()?)?; client::new_for_host_transfer(&hn.hostname())? }; post_data_to_server_with_client( @@ -768,7 +768,7 @@ pub async fn post_commits_dir_hashes_to_server( let quiet_bar = Arc::new(ProgressBar::hidden()); let client = { - let hn = crate::util::internal_types::Hostname::from_url(&remote_repo.url().parse()?)?; + let hn = Hostname::from_url(&remote_repo.url().parse()?)?; client::new_for_host_transfer(&hn.hostname())? }; post_data_to_server_with_client( diff --git a/oxen-rust/crates/lib/src/api/client/file.rs b/oxen-rust/crates/lib/src/api/client/file.rs index de87b8e6a..adc2009ce 100644 --- a/oxen-rust/crates/lib/src/api/client/file.rs +++ b/oxen-rust/crates/lib/src/api/client/file.rs @@ -1,9 +1,9 @@ use crate::api::client; +use crate::api::client::Hostname; use crate::api::client::retry; use crate::error::OxenError; use crate::model::RemoteRepository; use crate::model::commit::NewCommitBody; -use crate::util::internal_types::Hostname; use crate::view::CommitResponse; use crate::{api, util::internal_types::HasLen}; diff --git a/oxen-rust/crates/lib/src/api/client/repositories.rs b/oxen-rust/crates/lib/src/api/client/repositories.rs index ef8ddc5f2..5f118ece3 100644 --- a/oxen-rust/crates/lib/src/api/client/repositories.rs +++ b/oxen-rust/crates/lib/src/api/client/repositories.rs @@ -1,11 +1,11 @@ use crate::api; use crate::api::client; +use crate::api::client::Hostname; use crate::constants::{DEFAULT_HOST, DEFAULT_REMOTE_NAME, DEFAULT_SCHEME}; use crate::error::OxenError; use crate::model::file::{FileContents, FileNew}; use crate::model::{Branch, LocalRepository, Remote, RemoteRepository, RepoNew}; use crate::repositories; -use crate::util::internal_types::Hostname; use crate::view::repository::{ RepositoryCreationResponse, RepositoryDataTypesResponse, RepositoryDataTypesView, }; diff --git a/oxen-rust/crates/lib/src/api/client/versions.rs b/oxen-rust/crates/lib/src/api/client/versions.rs index c6b36a9c1..cbcaf4d59 100644 --- a/oxen-rust/crates/lib/src/api/client/versions.rs +++ b/oxen-rust/crates/lib/src/api/client/versions.rs @@ -1,7 +1,7 @@ use crate::api; -use crate::api::client; use crate::api::client::internal_types::LocalOrBase; use crate::api::client::retry; +use crate::api::client::{self, Hostname}; use crate::constants::{AVG_CHUNK_SIZE, max_retries}; use crate::error::OxenError; use crate::model::entry::commit_entry::Entry; @@ -298,7 +298,7 @@ async fn upload_chunks( progress: Option<&Arc>, ) -> Result>, OxenError> { let client = { - let hn = crate::util::internal_types::Hostname::from_url(&remote_repo.url().parse()?)?; + let hn = Hostname::from_url(&remote_repo.url().parse()?)?; Arc::new(client::new_for_host_transfer(&hn.hostname())?) }; diff --git a/oxen-rust/crates/lib/src/api/client/workspaces/files.rs b/oxen-rust/crates/lib/src/api/client/workspaces/files.rs index 471f8f465..554d3cb97 100644 --- a/oxen-rust/crates/lib/src/api/client/workspaces/files.rs +++ b/oxen-rust/crates/lib/src/api/client/workspaces/files.rs @@ -1,5 +1,5 @@ -use crate::api::client; use crate::api::client::internal_types::LocalOrBase; +use crate::api::client::{self, Hostname}; use crate::constants::{chunk_size, max_retries}; use crate::core::progress::push_progress::PushProgress; use crate::error::OxenError; @@ -447,7 +447,7 @@ pub(crate) async fn parallel_batched_small_file_upload( } let client = { - let hn = crate::util::internal_types::Hostname::from_url(&remote_repo.url().parse()?)?; + let hn = Hostname::from_url(&remote_repo.url().parse()?)?; Arc::new(client::new_for_host_transfer(&hn.hostname())?) }; diff --git a/oxen-rust/crates/lib/src/model/repository/remote_repository.rs b/oxen-rust/crates/lib/src/model/repository/remote_repository.rs index de0fe650e..308d900ae 100644 --- a/oxen-rust/crates/lib/src/model/repository/remote_repository.rs +++ b/oxen-rust/crates/lib/src/model/repository/remote_repository.rs @@ -1,6 +1,6 @@ use crate::api; +use crate::api::client::Hostname; use crate::core::versions::MinOxenVersion; -use crate::util::internal_types::Hostname; use crate::view::RepositoryView; use crate::view::repository::{RepositoryCreationView, RepositoryDataTypesView}; use crate::{error::OxenError, model::Remote}; diff --git a/oxen-rust/crates/lib/src/model/repository/repo_new.rs b/oxen-rust/crates/lib/src/model/repository/repo_new.rs index b0e460a88..64f89506f 100644 --- a/oxen-rust/crates/lib/src/model/repository/repo_new.rs +++ b/oxen-rust/crates/lib/src/model/repository/repo_new.rs @@ -1,12 +1,12 @@ use serde::{Deserialize, Serialize}; use utoipa::ToSchema; +use crate::api::client::Hostname; use crate::constants::DEFAULT_HOST; use crate::error::OxenError; use crate::model::commit::Commit; use crate::model::file::FileNew; use crate::opts::StorageOpts; -use crate::util::internal_types::Hostname; #[derive(Deserialize, Serialize, Debug, Clone, ToSchema)] pub struct RepoNew { diff --git a/oxen-rust/crates/lib/src/util/internal_types.rs b/oxen-rust/crates/lib/src/util/internal_types.rs index e89653463..bdbf5e9ec 100644 --- a/oxen-rust/crates/lib/src/util/internal_types.rs +++ b/oxen-rust/crates/lib/src/util/internal_types.rs @@ -1,35 +1,4 @@ -use crate::error::OxenError; use std::collections::{HashMap, HashSet}; -use url::Url; - -/// Parsed URL components with port-aware hostname. -pub struct Hostname { - pub host: String, - pub port: Option, - pub scheme: String, -} - -impl Hostname { - /// Returns `host:port` when a port is present, otherwise just `host`. - pub fn hostname(&self) -> String { - match self.port { - Some(port) => format!("{}:{}", self.host, port), - None => self.host.clone(), - } - } - - /// Extract scheme, host, and port from a `Url`. - pub fn from_url(url: &Url) -> Result { - let Some(host) = url.host_str() else { - return Err(OxenError::NoHost(url.to_string().into())); - }; - Ok(Self { - host: host.to_string(), - port: url.port(), - scheme: url.scheme().to_string(), - }) - } -} /// Indicates that the type has a length. pub(crate) trait HasLen {