Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions crates/lib/src/storage/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,16 @@ impl VersionStore for LocalVersionStore {
Ok(data)
}

/// Size in bytes of a derived file, read from the metadata of the file at
/// `version_dir(orig_hash)` joined with `derived_filename`.
///
/// Any error from the metadata lookup is propagated to the caller.
async fn get_version_derived_size(
    &self,
    orig_hash: &str,
    derived_filename: &str,
) -> Result<u64, OxenError> {
    let derived_path = self.version_dir(orig_hash).join(derived_filename);
    let byte_len = fs::metadata(&derived_path).await?.len();
    Ok(byte_len)
}

async fn get_version_stream(
&self,
hash: &str,
Expand Down Expand Up @@ -773,6 +783,13 @@ mod tests {
collected.extend_from_slice(&chunk.unwrap());
}
assert_eq!(collected, derived_data);
assert_eq!(
store
.get_version_derived_size(orig_hash, derived_filename)
.await
.unwrap(),
derived_data.len() as u64
);
}

#[tokio::test]
Expand Down
23 changes: 23 additions & 0 deletions crates/lib/src/storage/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,29 @@ impl VersionStore for S3VersionStore {
Ok(data)
}

/// Get the size in bytes of a derived file stored in S3.
///
/// Issues a `head_object` request against `self.bucket` for the key
/// `<version_dir(orig_hash)>/<derived_filename>` and returns the content length
/// reported in the response.
///
/// # Arguments
/// * `orig_hash` - The content hash of the parent version
/// * `derived_filename` - Filename for the derived artifact
///
/// # Errors
/// Returns `OxenError` if the client cannot be initialized, the `head_object`
/// call fails, the response carries no content length, or the reported length
/// is negative and therefore cannot be represented as a `u64`.
async fn get_version_derived_size(
    &self,
    orig_hash: &str,
    derived_filename: &str,
) -> Result<u64, OxenError> {
    let client = self.init_client().await?;
    let key = format!("{}/{}", self.version_dir(orig_hash), derived_filename);

    let resp = client
        .head_object()
        .bucket(&self.bucket)
        .key(&key)
        .send()
        .await
        .map_err(|e| OxenError::basic_str(format!("S3 head_object failed: {e}")))?;

    let content_length = resp
        .content_length()
        .ok_or_else(|| OxenError::basic_str("S3 object missing content_length"))?;

    // The SDK reports the length as a signed integer. A plain `as u64` cast would
    // silently wrap a negative value into a huge size, so use a checked conversion.
    u64::try_from(content_length).map_err(|_| {
        OxenError::basic_str(format!(
            "S3 object has invalid content_length: {content_length}"
        ))
    })
}

async fn get_version_stream(
&self,
hash: &str,
Expand Down
16 changes: 16 additions & 0 deletions crates/lib/src/storage/version_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,22 @@ pub trait VersionStore: Debug + Send + Sync + 'static {
hash: &str,
) -> Result<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Unpin>, OxenError>;

/// Get the size in bytes of a derived file (e.g., resized image, video thumbnail).
///
/// Returns the content length of the derived artifact as reported by the backing
/// store (for example file metadata for the local store, or a `head_object`
/// request for the S3 store).
///
/// # Arguments
/// * `orig_hash` - The content hash of the parent version
/// * `derived_filename` - Filename for the derived artifact
///
/// # Errors
/// Returns `OxenError` if the derived file does not exist or its size cannot be
/// determined.
async fn get_version_derived_size(
&self,
orig_hash: &str,
derived_filename: &str,
) -> Result<u64, OxenError>;

/// Get a stream of a derived file (resized, video thumbnail, etc.)
///
/// # Arguments
Expand Down
24 changes: 20 additions & 4 deletions crates/lib/src/util/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,13 @@ pub async fn handle_image_resize(
file_hash: String,
file_path: &Path,
img_resize: ImgResize,
) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>>, OxenError> {
) -> Result<
(
Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>>,
u64,
),
OxenError,
> {
log::debug!("img_resize {img_resize:?}");
let derived_filename = resized_filename(file_path, img_resize.width, img_resize.height);

Expand Down Expand Up @@ -1579,16 +1585,25 @@ pub async fn resize_cache_image_version_store(
img_hash: &str,
derived_filename: &str,
resize: ImgResize,
) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>>, OxenError> {
) -> Result<
(
Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>>,
u64,
),
OxenError,
> {
if version_store
.derived_version_exists(img_hash, derived_filename)
.await?
{
log::debug!("In the resize cache! {derived_filename}");
let content_length = version_store
.get_version_derived_size(img_hash, derived_filename)
.await?;
let stream = version_store
.get_version_derived_stream(img_hash, derived_filename)
.await?;
return Ok(stream.boxed());
return Ok((stream.boxed(), content_length));
}

log::debug!("create resized image {derived_filename} from hash {img_hash}");
Expand Down Expand Up @@ -1638,11 +1653,12 @@ pub async fn resize_cache_image_version_store(
version_store
.store_version_derived(img_hash, derived_filename, &buf)
.await?;
let content_length = buf.len() as u64;

let stream: Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send>> =
futures::stream::once(async move { Ok(Bytes::from(buf)) }).boxed();

Ok(stream)
Ok((stream, content_length))
}

/// Generate a video thumbnail using thumbnails crate.
Expand Down
72 changes: 58 additions & 14 deletions crates/server/src/controllers/file.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::errors::OxenHttpError;
use crate::helpers::{create_user_from_options, get_repo};
use crate::helpers::{create_user_from_options, file_stream_response, get_repo};
use crate::params::{app_data, parse_resource, path_param};

use actix_multipart::form::text::Text;
Expand Down Expand Up @@ -177,6 +177,7 @@ pub async fn get(
let file_hash = entry.hash();
let hash_str = file_hash.to_string();
let mime_type = entry.mime_type();
let num_bytes = entry.num_bytes();
let last_commit_id = entry.last_commit_id().to_string();
let query_params = query.into_inner();

Expand All @@ -190,18 +191,18 @@ pub async fn get(
};
log::debug!("img_resize {img_resize:?}");

let file_stream = util::fs::handle_image_resize(
let (file_stream, content_length) = util::fs::handle_image_resize(
version_store.clone(),
hash_str.clone(),
&path,
img_resize,
)
.await?;

return Ok(HttpResponse::Ok()
.content_type(mime_type)
.insert_header(("oxen-revision-id", last_commit_id.as_str()))
.streaming(file_stream));
return Ok(
file_stream_response(mime_type, &last_commit_id, Some(content_length))
.streaming(file_stream),
);
}

// Handle video thumbnail - requires thumbnail=true parameter
Expand All @@ -218,21 +219,15 @@ pub async fn get(
util::fs::handle_video_thumbnail(Arc::clone(&version_store), hash_str, video_thumbnail)
.await?;

return Ok(HttpResponse::Ok()
.content_type("image/jpeg")
.insert_header(("oxen-revision-id", last_commit_id.as_str()))
.streaming(stream));
return Ok(file_stream_response("image/jpeg", &last_commit_id, None).streaming(stream));
}

log::debug!("did not hit the resize or thumbnail cache");

// Stream the file
let stream = version_store.get_version_stream(&hash_str).await?;

Ok(HttpResponse::Ok()
.content_type(mime_type)
.insert_header(("oxen-revision-id", last_commit_id.as_str()))
.streaming(stream))
Ok(file_stream_response(mime_type, &last_commit_id, Some(num_bytes)).streaming(stream))
}

/// Upload files
Expand Down Expand Up @@ -793,6 +788,7 @@ mod tests {
use std::path::{Path, PathBuf};

use actix_multipart_test::MultiPartFormDataBuilder;
use actix_web::http::header;
use actix_web::{App, body, web};
use liboxen::view::CommitResponse;

Expand Down Expand Up @@ -871,6 +867,54 @@ mod tests {
Ok(())
}

/// A plain file GET must report the file's byte length in `Content-Length` and
/// list that header in `Access-Control-Expose-Headers`.
#[actix_web::test]
async fn test_controllers_file_get_exposes_content_length() -> Result<(), OxenError> {
    liboxen::test::init_test_env();

    let namespace = "Testing-Namespace";
    let repo_name = "Testing-Get-Headers";
    let sync_dir = test::get_sync_dir()?;
    let repo = test::create_local_repo(&sync_dir, namespace, repo_name)?;

    // Commit one small text file that is then fetched back through the controller.
    let file_content = "Hello";
    util::fs::create_dir_all(repo.path.join("data"))?;
    let hello_file = repo.path.join("data/hello.txt");
    util::fs::write_to_path(&hello_file, file_content)?;
    repositories::add(&repo, &hello_file).await?;
    let _first_commit = repositories::commit(&repo, "First commit")?;

    // Service exposing only the file GET route.
    let service = actix_web::test::init_service(
        App::new()
            .app_data(OxenAppData::new(sync_dir.clone()))
            .route(
                "/oxen/{namespace}/{repo_name}/file/{resource:.*}",
                web::get().to(controllers::file::get),
            ),
    )
    .await;

    let uri = format!("/oxen/{namespace}/{repo_name}/file/main/data/hello.txt");
    let request = actix_web::test::TestRequest::get()
        .uri(&uri)
        .app_data(OxenAppData::new(sync_dir.to_path_buf()))
        .to_request();

    let resp = actix_web::test::call_service(&service, request).await;
    assert_eq!(resp.status(), actix_web::http::StatusCode::OK);

    let response_headers = resp.headers();
    let expected_length = file_content.len().to_string();
    assert_eq!(
        response_headers.get(header::CONTENT_LENGTH).unwrap(),
        expected_length.as_str()
    );
    assert_eq!(
        response_headers
            .get(header::ACCESS_CONTROL_EXPOSE_HEADERS)
            .unwrap(),
        header::CONTENT_LENGTH.as_str()
    );

    test::cleanup_sync_dir(&sync_dir)?;
    Ok(())
}

#[actix_web::test]
async fn test_controllers_file_put_single_file_to_full_resource_path() -> Result<(), OxenError>
{
Expand Down
Loading
Loading