Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions oxen-rust/crates/cli/src/cmd/clone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,14 @@ impl RunCmd for CloneCmd {
is_remote,
};

let (scheme, host) = api::client::get_scheme_and_host_from_url(&opts.url)?;
let (scheme, host) = {
let hn = api::client::hostname_from_url_str(&opts.url)?;
let hostname = hn.hostname();
(hn.scheme, hostname)
};

// TODO: Do I need to worry about this for remote repo?
check_remote_version_blocking(scheme.clone(), host.clone()).await?;
check_remote_version_blocking(&scheme, &host).await?;
check_remote_version(scheme, host).await?;

repositories::clone(&opts).await?;
Expand Down
12 changes: 7 additions & 5 deletions oxen-rust/crates/cli/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ pub fn get_scheme_and_host_or_default() -> Result<(String, String), OxenError> {
pub fn get_scheme_and_host_from_repo(
repo: &LocalRepository,
) -> Result<(String, String), OxenError> {
if let Some(remote) = repo.remote() {
let host_and_scheme = api::client::get_scheme_and_host_from_url(&remote.url)?;
return Ok(host_and_scheme);
match repo.remote() {
Some(remote) => {
let hn = api::client::hostname_from_url_str(&remote.url)?;
let hostname = hn.hostname();
Ok((hn.scheme, hostname))
}
None => get_scheme_and_host_or_default(),
}

get_scheme_and_host_or_default()
}

pub async fn check_remote_version(
Expand Down
166 changes: 100 additions & 66 deletions oxen-rust/crates/lib/src/api/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ use crate::config::RuntimeConfig;
use crate::config::runtime_config::runtime::Runtime;
use crate::constants;
use crate::error::OxenError;
use crate::model::RemoteRepository;
use crate::view::OxenResponse;
use crate::view::http;
use reqwest::IntoUrl;
pub use reqwest::Url;
use reqwest::retry;
use reqwest::{Client, ClientBuilder, header};
use std::time;

Expand All @@ -30,6 +29,7 @@ pub mod metadata;
pub mod oxen_version;
pub mod prune;
pub mod repositories;
pub mod retry;
pub mod revisions;
pub mod schemas;
pub mod stats;
Expand All @@ -40,116 +40,148 @@ pub mod workspaces;
const VERSION: &str = crate::constants::OXEN_VERSION;
const USER_AGENT: &str = "Oxen";

pub fn get_scheme_and_host_from_url(url: &str) -> Result<(String, String), OxenError> {
let parsed_url = Url::parse(url)?;
let mut host_str = parsed_url.host_str().unwrap_or_default().to_string();
if let Some(port) = parsed_url.port() {
host_str = format!("{host_str}:{port}");
/// Parsed URL components with port-aware hostname.
pub struct Hostname {
pub host: String,
pub port: Option<u16>,
pub scheme: String,
}

impl Hostname {
/// Returns `host:port` when a port is present, otherwise just `host`.
pub fn hostname(&self) -> String {
match self.port {
Some(port) => format!("{}:{}", self.host, port),
None => self.host.clone(),
}
}

/// Extract scheme, host, and port from a `Url`.
pub fn from_url(url: &Url) -> Result<Self, OxenError> {
let Some(host) = url.host_str() else {
return Err(OxenError::NoHost(url.to_string().into()));
};
Ok(Self {
host: host.to_string(),
port: url.port(),
scheme: url.scheme().to_string(),
})
}
Ok((parsed_url.scheme().to_owned(), host_str))
}

// TODO: we probably want to create a pool of clients instead of constructing a
// new one for each request so we can take advantage of keep-alive
pub fn new_for_url(url: &str) -> Result<Client, OxenError> {
let (_scheme, host) = get_scheme_and_host_from_url(url)?;
new_for_host(host, true)
/// Parse a URL string into a `Hostname` (scheme + host + optional port).
pub fn hostname_from_url_str(url_str: &str) -> Result<Hostname, OxenError> {
let url: url::Url = url_str.parse()?;
Hostname::from_url(&url)
}

pub fn new_for_url_no_user_agent(url: &str) -> Result<Client, OxenError> {
let (_scheme, host) = get_scheme_and_host_from_url(url)?;
new_for_host(host, false)
// Note: reqwest::Client already maintains an internal HTTP connection pool with keep-alive.
// The 3 upload paths that matter most (push, version chunks, workspace files) already share
// clients via Arc<Client>. A global per-host cache (OnceCell + RwLock<HashMap>) would add
// complexity for little gain since per-request client overhead for metadata/query paths is minimal.
pub fn new_for_url<U: IntoUrl>(url: U) -> Result<Client, OxenError> {
let url = url.into_url()?;
let hn = Hostname::from_url(&url)?;
new_for_host(&hn.hostname(), true)
}

fn new_for_host(host: String, should_add_user_agent: bool) -> Result<Client, OxenError> {
match builder_for_host(host, should_add_user_agent)?
.timeout(time::Duration::from_secs(constants::timeout()))
.build()
{
Ok(client) => Ok(client),
Err(reqwest_err) => Err(OxenError::HTTP(reqwest_err)),
}
pub fn new_for_url_no_user_agent<U: IntoUrl>(url: U) -> Result<Client, OxenError> {
let url = url.into_url()?;
let hn = Hostname::from_url(&url)?;
new_for_host(&hn.hostname(), false)
}

pub fn new_for_remote_repo(remote_repo: &RemoteRepository) -> Result<Client, OxenError> {
let (_scheme, host) = get_scheme_and_host_from_url(remote_repo.url())?;
new_for_host(host, true)
/// Has connection timeout and TCP keep-alives, but also imposes a per-request timeout.
/// NOT SUITABLE FOR LONG-LIVED TRANSFERS! Use `new_for_url_transfer` instead.
fn new_for_host(host: &str, should_add_user_agent: bool) -> Result<Client, OxenError> {
builder_for_host(
host,
should_add_user_agent,
time::Duration::from_secs(constants::connect_timeout()),
time::Duration::from_secs(constants::tcp_keepalive()),
time::Duration::from_secs(20),
)?
.timeout(time::Duration::from_secs(constants::timeout()))
.build()
.map_err(OxenError::HTTP)
}

pub fn builder_for_remote_repo(remote_repo: &RemoteRepository) -> Result<ClientBuilder, OxenError> {
let (_scheme, host) = get_scheme_and_host_from_url(remote_repo.url())?;
builder_for_host(host, true)
/// Create a client for long-lived transfers (uploads/downloads).
/// No overall timeout; uses connect_timeout + tcp_keepalive + HTTP/2 keep-alive
/// to detect hung connections without capping total transfer time.
pub fn new_for_url_transfer<U: IntoUrl>(url: U) -> Result<Client, OxenError> {
let url = url.into_url()?;
let hn = Hostname::from_url(&url)?;
new_for_host_transfer(&hn.hostname())
}

pub fn builder_for_url(url: &str) -> Result<ClientBuilder, OxenError> {
let (_scheme, host) = get_scheme_and_host_from_url(url)?;
builder_for_host(host, true)
pub(crate) fn new_for_host_transfer(host: &str) -> Result<Client, OxenError> {
builder_for_host(
host,
true,
time::Duration::from_secs(constants::connect_timeout()),
time::Duration::from_secs(constants::tcp_keepalive()),
time::Duration::from_secs(20),
)?
.build()
.map_err(OxenError::HTTP)
}

fn builder_for_host(host: String, should_add_user_agent: bool) -> Result<ClientBuilder, OxenError> {
fn builder_for_host(
host: &str,
should_add_user_agent: bool,
connect_timeout: time::Duration,
keep_alive_interval: time::Duration,
http2_keep_alive_timeout: time::Duration,
) -> Result<ClientBuilder, OxenError> {
let mut builder = Client::builder();

if should_add_user_agent {
let config = RuntimeConfig::get()?;
builder = builder.user_agent(build_user_agent(&config));
let user_agent = build_user_agent(&config)?;
builder = builder.user_agent(user_agent);
}

// Bump max retries for this oxen-server host from 2 to 3. Exponential backoff is used by default.
let retry_policy = retry::for_host(host.clone())
.max_retries_per_request(3)
.classify_fn(|req_rep| {
// Still retry on low-level network errors
if req_rep.error().is_some() {
return req_rep.retryable();
}
// Have reqwest retry all application-level server errors*, not just network-level errors
// that reqwest considers retryable by default. This assumes that oxen-server endpoints are
// safe to retry if the server returned any error mid-operation. We can tighten this up
// to only retry specific server errors in the future if that is not true.
//
// * info (100's), success (200's), redirection (300's), and client errors (400's)
// don't make sense to retry. We'll only retry server errors (500's).
match req_rep.status() {
Some(status_code) if status_code.is_server_error() => req_rep.retryable(), // retry
_ => req_rep.success(), // this means don't retry, and is the only other valid return value from the closure
}
});
builder = builder.retry(retry_policy);
builder = builder
.connect_timeout(connect_timeout)
.tcp_keepalive(keep_alive_interval)
.http2_keep_alive_interval(keep_alive_interval)
.http2_keep_alive_timeout(http2_keep_alive_timeout);

// If auth_config.toml isn't found, return without authorizing
let config = match AuthConfig::get() {
Ok(config) => config,
Err(e) => {
log::debug!(
"Error getting config: {}. No auth token found for host {}",
e,
host
);
log::debug!("Error getting config: {e}. No auth configuration found!");
return Ok(builder);
}
};
if let Some(auth_token) = config.auth_token_for_host(host.as_str()) {

if let Some(auth_token) = config.auth_token_for_host(host) {
log::debug!("Setting auth token for host: {}", host);
let auth_header = format!("Bearer {auth_token}");
let mut auth_value = match header::HeaderValue::from_str(auth_header.as_str()) {
Ok(header) => header,
Err(e) => {
log::debug!("Invalid header value: {e}");
return Err(OxenError::basic_str(
return Err(OxenError::authentication(
"Error setting request auth. Please check your Oxen config.",
));
}
};
auth_value.set_sensitive(true);
let mut headers = header::HeaderMap::new();
headers.insert(header::AUTHORIZATION, auth_value);

builder = builder.default_headers(headers);
} else {
log::trace!("No auth token found for host: {}", host);
log::trace!("No auth token found for host: {host}");
}

Ok(builder)
}

fn build_user_agent(config: &RuntimeConfig) -> String {
fn build_user_agent(config: &RuntimeConfig) -> Result<String, OxenError> {
let host_platform = config.host_platform.display_name();
let runtime_name = match config.runtime_name {
Runtime::CLI => config.runtime_name.display_name().to_string(),
Expand All @@ -159,7 +191,9 @@ fn build_user_agent(config: &RuntimeConfig) -> String {
config.runtime_version
),
};
format!("{USER_AGENT}/{VERSION} ({host_platform}; {runtime_name})")
Ok(format!(
"{USER_AGENT}/{VERSION} ({host_platform}; {runtime_name})"
))
}

/// Performs an extra parse to validate that the response is success
Expand Down
Loading
Loading