diff --git a/crates/lib/src/api/client/import.rs b/crates/lib/src/api/client/import.rs index 1611db20d..cd98b96f9 100644 --- a/crates/lib/src/api/client/import.rs +++ b/crates/lib/src/api/client/import.rs @@ -1,7 +1,8 @@ use crate::api; use crate::api::client; use crate::error::OxenError; -use crate::model::RemoteRepository; +use crate::model::{NewCommitBody, RemoteRepository}; +use crate::view::CommitResponse; use std::path::Path; @@ -56,6 +57,45 @@ pub async fn upload_zip( Ok(response.commit) } +/// Import a file from a URL into a repository branch +pub async fn import_url( + remote_repo: &RemoteRepository, + branch_name: impl AsRef, + directory: impl AsRef, + download_url: impl AsRef, + commit: &NewCommitBody, + force_update: bool, +) -> Result { + let branch_name = branch_name.as_ref(); + let directory = directory.as_ref(); + + let uri = format!("/import/{branch_name}/{directory}"); + let url = api::endpoint::url_from_repo(remote_repo, &uri)?; + + let mut body = serde_json::json!({ + "download_url": download_url.as_ref(), + "name": commit.author, + "email": commit.email, + "message": commit.message, + }); + if force_update { + body["force_update"] = serde_json::Value::Bool(true); + } + + let client = client::new_for_url(&url)?; + let response = client + .post(&url) + .header("Content-Type", "application/json") + .json(&body) + .send() + .await?; + let body = client::parse_json_body(&url, response).await?; + let response: CommitResponse = serde_json::from_str(&body) + .map_err(|e| OxenError::basic_str(format!("Failed to parse response: {e}\n\n{body}")))?; + + Ok(response.commit) +} + #[cfg(test)] mod tests { use crate::test; @@ -63,10 +103,12 @@ mod tests { use crate::constants::DEFAULT_BRANCH_NAME; use crate::error::OxenError; + use crate::model::NewCommitBody; use crate::api; use std::io::Write; + use std::path::Path; #[tokio::test] async fn test_upload_zip_file() -> Result<(), OxenError> { @@ -183,4 +225,89 @@ mod tests { }) .await } + + #[tokio::test] + async fn test_import_url_with_force_update() -> Result<(), OxenError> { + test::run_remote_repo_test_bounding_box_csv_pushed(|_local_repo, remote_repo| async move { + let branch_name = DEFAULT_BRANCH_NAME; + let download_url = + "https://hub.oxen.ai/api/repos/ox/Oxen-AI-Assets/file/main/images/bloxy_white_background.png"; + + let commit_body = NewCommitBody { + message: "First import".to_string(), + author: "Test User".to_string(), + email: "test@oxen.ai".to_string(), + }; + + // First import + let first_commit = api::client::import::import_url( + &remote_repo, + branch_name, + "imported", + download_url, + &commit_body, + false, + ) + .await?; + assert!(!first_commit.id.is_empty()); + + // Second import of same file WITHOUT force_update should fail (no changes) + let result = api::client::import::import_url( + &remote_repo, + branch_name, + "imported", + download_url, + &NewCommitBody { + message: "Second import no force".to_string(), + ..commit_body.clone() + }, + false, + ) + .await; + assert!( + result.is_err(), + "Expected import to fail with no changes: {result:?}" + ); + + // Third import of same file WITH force_update should succeed + let third_commit = api::client::import::import_url( + &remote_repo, + branch_name, + "imported", + download_url, + &NewCommitBody { + message: "Force update import".to_string(), + ..commit_body.clone() + }, + true, + ) + .await?; + assert_ne!(first_commit.id, third_commit.id); + + // Verify latest_commit on the entry matches the force_update commit + let entries = api::client::dir::list( + &remote_repo, + branch_name, + Path::new("imported"), + 1, + 100, + ) + .await?; + let file_entry = entries + .entries + .iter() + .find(|e| e.filename() == "bloxy_white_background.png") + .expect("Should find the imported file"); + let latest_commit = file_entry + .latest_commit() + .expect("Entry should have latest_commit"); + assert_eq!( + latest_commit.id, third_commit.id, + "latest_commit should match the force_update commit" + ); + + Ok(remote_repo) + }) + .await + } } diff --git a/crates/lib/src/api/client/versions.rs b/crates/lib/src/api/client/versions.rs index aca98bf84..e081c5da7 100644 --- a/crates/lib/src/api/client/versions.rs +++ b/crates/lib/src/api/client/versions.rs @@ -110,6 +110,7 @@ pub async fn parallel_large_file_upload( file_path: impl AsRef, dst_dir: Option>, // dst_dir is provided for workspace add workflow workspace_id: Option, + force_update: bool, entry: Option, // entry is provided for push workflow progress: Option<&Arc>, // for push workflow ) -> Result { @@ -132,7 +133,14 @@ pub async fn parallel_large_file_upload( .await?; let num_chunks = results.len(); log::debug!("multipart_large_file_upload num_chunks: {num_chunks:?}"); - complete_multipart_large_file_upload(remote_repo, upload, num_chunks, workspace_id).await + complete_multipart_large_file_upload( + remote_repo, + upload, + num_chunks, + workspace_id, + force_update, + ) + .await } /// Creates a new multipart large file upload @@ -441,6 +449,7 @@ async fn complete_multipart_large_file_upload( upload: MultipartLargeFileUpload, num_chunks: usize, workspace_id: Option, + force_update: bool, ) -> Result { let file_hash = &upload.hash.to_string(); @@ -463,6 +472,7 @@ async fn complete_multipart_large_file_upload( upload_results: None, }], workspace_id, + force_update, }; let body = serde_json::to_string(&body)?; @@ -826,6 +836,7 @@ mod tests { path, dst_dir, workspace_id, + false, None, None, ) diff --git a/crates/lib/src/api/client/workspaces/commits.rs b/crates/lib/src/api/client/workspaces/commits.rs index 041f67028..909993aef 100644 --- a/crates/lib/src/api/client/workspaces/commits.rs +++ b/crates/lib/src/api/client/workspaces/commits.rs @@ -513,4 +513,234 @@ mod tests { }) .await } + + #[tokio::test] + async fn test_reupload_same_file_without_force_update_fails() -> Result<(), OxenError> { + test::run_remote_created_and_readme_remote_repo_test(|remote_repo| async move { + let branch_name = "main"; + + // First upload and commit + let workspace_id = UserConfig::identifier()?; + api::client::workspaces::create(&remote_repo, branch_name, &workspace_id).await?; + + let paths = vec![test::test_img_file()]; + let result = api::client::workspaces::files::add( + &remote_repo, + &workspace_id, + "images", + paths, + &None, + ) + .await; + assert!(result.is_ok(), "{result:?}"); + + let body = NewCommitBody { + message: "Add image".to_string(), + author: "Test User".to_string(), + email: "test@oxen.ai".to_string(), + }; + let first_commit = + api::client::workspaces::commit(&remote_repo, branch_name, &workspace_id, &body) + .await?; + assert!(!first_commit.id.is_empty()); + + // Re-upload the same file without force_update + let workspace_id_2 = UserConfig::identifier()? + "_2"; + api::client::workspaces::create(&remote_repo, branch_name, &workspace_id_2).await?; + + let paths = vec![test::test_img_file()]; + let result = api::client::workspaces::files::add( + &remote_repo, + &workspace_id_2, + "images", + paths, + &None, + ) + .await; + assert!(result.is_ok(), "{result:?}"); + + // Commit should fail because there are no changes + let body = NewCommitBody { + message: "Re-add same image".to_string(), + author: "Test User".to_string(), + email: "test@oxen.ai".to_string(), + }; + let result = + api::client::workspaces::commit(&remote_repo, branch_name, &workspace_id_2, &body) + .await; + assert!( + result.is_err(), + "Expected commit to fail with no changes, but got: {result:?}" + ); + + Ok(remote_repo) + }) + .await + } + + #[tokio::test] + async fn test_reupload_same_file_with_force_update_succeeds() -> Result<(), OxenError> { + test::run_remote_created_and_readme_remote_repo_test(|remote_repo| async move { + let branch_name = "main"; + + // First upload and commit + let workspace_id = UserConfig::identifier()?; + api::client::workspaces::create(&remote_repo, branch_name, &workspace_id).await?; + + let paths = vec![test::test_img_file()]; + let result = api::client::workspaces::files::add( + &remote_repo, + &workspace_id, + "images", + paths, + &None, + ) + .await; + assert!(result.is_ok(), "{result:?}"); + + let body = NewCommitBody { + message: "Add image".to_string(), + author: "Test User".to_string(), + email: "test@oxen.ai".to_string(), + }; + let first_commit = + api::client::workspaces::commit(&remote_repo, branch_name, &workspace_id, &body) + .await?; + assert!(!first_commit.id.is_empty()); + + // Re-upload the same file WITH force_update + let workspace_id_2 = UserConfig::identifier()? + "_2"; + api::client::workspaces::create(&remote_repo, branch_name, &workspace_id_2).await?; + + let paths = vec![test::test_img_file()]; + let result = api::client::workspaces::files::add_with_opts( + &remote_repo, + &workspace_id_2, + "images", + paths, + &None, + true, // force_update + ) + .await; + assert!(result.is_ok(), "{result:?}"); + + // Commit should succeed because force_update forces timestamp update + let body = NewCommitBody { + message: "Force update same image".to_string(), + author: "Test User".to_string(), + email: "test@oxen.ai".to_string(), + }; + let second_commit = + api::client::workspaces::commit(&remote_repo, branch_name, &workspace_id_2, &body) + .await?; + assert!(!second_commit.id.is_empty()); + + // The commit IDs should be different + assert_ne!( + first_commit.id, second_commit.id, + "Expected different commit IDs after force update" + ); + + // Verify latest_commit on the file entry matches the second commit + let entries = + api::client::dir::list(&remote_repo, branch_name, Path::new("images"), 1, 100) + .await?; + let file_entry = entries + .entries + .iter() + .find(|e| e.filename() == "dwight_vince.jpeg") + .expect("Should find the uploaded file in the directory listing"); + let latest_commit = file_entry + .latest_commit() + .expect("File entry should have a latest_commit"); + assert_eq!( + latest_commit.id, second_commit.id, + "latest_commit on the file entry should match the force_update commit, got {} expected {}", + latest_commit.id, second_commit.id, + ); + + Ok(remote_repo) + }) + .await + } + + #[tokio::test] + async fn test_reupload_same_large_file_with_force_update_succeeds() -> Result<(), OxenError> { + test::run_remote_created_and_readme_remote_repo_test(|remote_repo| async move { + let branch_name = "main"; + + let workspace_id = UserConfig::identifier()?; + api::client::workspaces::create(&remote_repo, branch_name, &workspace_id).await?; + + let paths = vec![test::test_30k_parquet()]; + let result = api::client::workspaces::files::add( + &remote_repo, + &workspace_id, + "parquet", + paths.clone(), + &None, + ) + .await; + assert!(result.is_ok(), "{result:?}"); + + let body = NewCommitBody { + message: "Add large parquet".to_string(), + author: "Test User".to_string(), + email: "test@oxen.ai".to_string(), + }; + let first_commit = + api::client::workspaces::commit(&remote_repo, branch_name, &workspace_id, &body) + .await?; + assert!(!first_commit.id.is_empty()); + + let workspace_id_2 = UserConfig::identifier()? + "_2"; + api::client::workspaces::create(&remote_repo, branch_name, &workspace_id_2).await?; + + let result = api::client::workspaces::files::add_with_opts( + &remote_repo, + &workspace_id_2, + "parquet", + paths, + &None, + true, + ) + .await; + assert!(result.is_ok(), "{result:?}"); + + let body = NewCommitBody { + message: "Force update same large parquet".to_string(), + author: "Test User".to_string(), + email: "test@oxen.ai".to_string(), + }; + let second_commit = + api::client::workspaces::commit(&remote_repo, branch_name, &workspace_id_2, &body) + .await?; + assert!(!second_commit.id.is_empty()); + + assert_ne!( + first_commit.id, second_commit.id, + "Expected different commit IDs after force update on a large file" + ); + + let entries = + api::client::dir::list(&remote_repo, branch_name, Path::new("parquet"), 1, 100) + .await?; + let file_entry = entries + .entries + .iter() + .find(|e| e.filename() == "wiki_30k.parquet") + .expect("Should find the uploaded large file in the directory listing"); + let latest_commit = file_entry + .latest_commit() + .expect("File entry should have a latest_commit"); + assert_eq!( + latest_commit.id, second_commit.id, + "latest_commit on the large file entry should match the force_update commit, got {} expected {}", + latest_commit.id, second_commit.id, + ); + + Ok(remote_repo) + }) + .await + } } diff --git a/crates/lib/src/api/client/workspaces/files.rs b/crates/lib/src/api/client/workspaces/files.rs index fa51b3e86..457df5e77 100644 --- a/crates/lib/src/api/client/workspaces/files.rs +++ b/crates/lib/src/api/client/workspaces/files.rs @@ -52,6 +52,25 @@ pub async fn add( directory: impl AsRef, paths: Vec, local_repo: &Option, +) -> Result { + add_with_opts( + remote_repo, + workspace_id, + directory, + paths, + local_repo, + false, + ) + .await +} + +pub async fn add_with_opts( + remote_repo: &RemoteRepository, + workspace_id: impl AsRef, + directory: impl AsRef, + paths: Vec, + local_repo: &Option, + force_update: bool, ) -> Result { let workspace_id = workspace_id.as_ref(); let directory = directory.as_ref(); @@ -85,6 +104,7 @@ pub async fn add( .clone() .map(|local| LocalOrBase::Local(local.clone())) .as_ref(), + force_update, ) .await; @@ -192,6 +212,7 @@ pub async fn add_files( // for a single API call. paths, Some(&base_dir_enum), + false, ) .await { @@ -246,6 +267,7 @@ pub async fn upload_single_file( path, Some(directory), Some(workspace_id.as_ref().to_string()), + false, None, None, ) @@ -276,6 +298,7 @@ async fn upload_multiple_files( directory: impl AsRef, paths: Vec, local_or_base: Option<&LocalOrBase>, + force_update: bool, ) -> Result, OxenError> { if paths.is_empty() { return Ok(vec![]); @@ -351,6 +374,7 @@ async fn upload_multiple_files( &path, Some(&dst_dir), Some(workspace_id.to_string()), + force_update, None, None, ) @@ -377,6 +401,7 @@ async fn upload_multiple_files( small_files, small_files_size, local_or_base, + force_update, ) .await?; @@ -392,6 +417,7 @@ pub(crate) async fn parallel_batched_small_file_upload( small_files: Vec<(PathBuf, u64)>, small_files_size: u64, local_or_base: Option<&LocalOrBase>, + force_update: bool, ) -> Result, OxenError> { if small_files.is_empty() { return Ok(vec![]); @@ -512,8 +538,10 @@ pub(crate) async fn parallel_batched_small_file_upload( )?; // In remote-mode repos, skip adding files already present in tree - if let Some((ref head_commit, ref local_repository)) = - head_commit_local_repo_maybe_clone + // unless force_update is set + if !force_update + && let Some((ref head_commit, ref local_repository)) = + head_commit_local_repo_maybe_clone && let Some(file_node) = repositories::tree::get_file_by_path( local_repository, @@ -677,6 +705,7 @@ pub(crate) async fn parallel_batched_small_file_upload( Arc::new(files_to_stage), &directory_str, upload_err_files, + force_update, ) .await { @@ -789,6 +818,7 @@ pub async fn stage_files_to_workspace_with_retry( files_to_add: Arc>, directory_str: impl AsRef, err_files: Vec, + force_update: bool, ) -> Result, OxenError> { let mut retry_count: usize = 0; let directory_str = directory_str.as_ref(); @@ -805,6 +835,7 @@ pub async fn stage_files_to_workspace_with_retry( files_to_add.clone(), directory_str, err_files.clone(), + force_update, ) .await { @@ -843,10 +874,15 @@ pub async fn stage_files_to_workspace( files_to_add: Arc>, directory_str: impl AsRef, err_files: Vec, + force_update: bool, ) -> Result, OxenError> { let workspace_id = workspace_id.as_ref(); let directory_str = directory_str.as_ref(); - let uri = format!("/workspaces/{workspace_id}/versions/{directory_str}"); + let uri = if force_update { + format!("/workspaces/{workspace_id}/versions/{directory_str}?force_update=true") + } else { + format!("/workspaces/{workspace_id}/versions/{directory_str}") + }; let url = api::endpoint::url_from_repo(remote_repo, &uri)?; let files_to_send = if !err_files.is_empty() { diff --git a/crates/lib/src/core/v_latest/add.rs b/crates/lib/src/core/v_latest/add.rs index 801f31e15..2c337c2d0 100644 --- a/crates/lib/src/core/v_latest/add.rs +++ b/crates/lib/src/core/v_latest/add.rs @@ -964,6 +964,7 @@ pub fn stage_file_with_hash( hash: &str, staged_db_manager: &StagedDBManager, seen_dirs: &Arc>>, + force_update: bool, ) -> Result<(), OxenError> { let workspace_repo = &workspace.workspace_repo; let base_repo = &workspace.base_repo; @@ -971,7 +972,12 @@ pub fn stage_file_with_hash( let relative_path = util::fs::path_relative_to_dir(dst_path, base_repo.path.clone())?; let metadata = util::fs::metadata(data_path)?; - let mtime = FileTime::from_last_modification_time(&metadata); + let mtime = if force_update { + // Use current time to force a timestamp update + FileTime::now() + } else { + FileTime::from_last_modification_time(&metadata) + }; let maybe_file_node = repositories::tree::get_file_by_path(base_repo, head_commit, &relative_path)?; @@ -979,6 +985,12 @@ pub fn stage_file_with_hash( let previous_metadata = file_node.metadata(); let status = if util::fs::is_modified_from_node(data_path, &file_node)? { StagedEntryStatus::Modified + } else if force_update { + // Force staging even though the content hasn't changed + log::info!( + "file {data_path:?} has not changed but force_update is set - staging as modified" + ); + StagedEntryStatus::Modified } else { // Don't add the file if it hasn't changed log::info!("file {data_path:?} has not changed - skipping add"); diff --git a/crates/lib/src/core/v_latest/push.rs b/crates/lib/src/core/v_latest/push.rs index c3edee1b2..1e686a86d 100644 --- a/crates/lib/src/core/v_latest/push.rs +++ b/crates/lib/src/core/v_latest/push.rs @@ -624,6 +624,7 @@ async fn chunk_and_send_large_entries( &*version_path, None::, None, + false, Some(entry.clone()), Some(&bar), ) diff --git a/crates/lib/src/core/v_latest/workspaces/files.rs b/crates/lib/src/core/v_latest/workspaces/files.rs index e1d0cc37c..713f2c735 100644 --- a/crates/lib/src/core/v_latest/workspaces/files.rs +++ b/crates/lib/src/core/v_latest/workspaces/files.rs @@ -1,4 +1,5 @@ use bytes::BytesMut; +use filetime::FileTime; use futures::StreamExt; use parking_lot::Mutex; use reqwest::Client; @@ -38,13 +39,28 @@ const MAX_COMPRESSION_RATIO: u64 = 100; // Maximum allowed // TODO: Do we depreciate this, if we always upload to version store? pub async fn add(workspace: &Workspace, filepath: impl AsRef) -> Result { + add_with_opts(workspace, filepath, false).await +} + +pub async fn add_with_opts( + workspace: &Workspace, + filepath: impl AsRef, + force_update: bool, +) -> Result { let filepath = filepath.as_ref(); let workspace_repo = &workspace.workspace_repo; let base_repo = &workspace.base_repo; // Stage the file using the repositories::add method let commit = workspace.commit.clone(); - p_add_file(base_repo, workspace_repo, &Some(commit), filepath).await?; + p_add_file( + base_repo, + workspace_repo, + &Some(commit), + filepath, + force_update, + ) + .await?; // Return the relative path of the file in the workspace let relative_path = util::fs::path_relative_to_dir(filepath, &workspace_repo.path)?; @@ -71,6 +87,7 @@ pub fn add_version_file( version_path: impl AsRef, dst_path: impl AsRef, file_hash: &str, + force_update: bool, ) -> Result { // version_path is where the file is stored, dst_path is the relative path to the repo // let version_path = version_path.as_ref(); @@ -86,6 +103,7 @@ pub fn add_version_file( file_hash, &staged_db_manager, &Arc::new(Mutex::new(HashSet::new())), + force_update, )?; Ok(dst_path.to_path_buf()) @@ -96,6 +114,7 @@ pub async fn add_version_files( workspace: &Workspace, files_with_hash: &[FileWithHash], directory: impl AsRef, + force_update: bool, ) -> Result, OxenError> { let version_store = repo.version_store()?; @@ -127,6 +146,7 @@ pub async fn add_version_files( &item.hash, &staged_db_manager, &seen_dirs, + force_update, ) { Ok(_) => { // Add parents to staged db @@ -342,6 +362,7 @@ pub async fn import( directory: PathBuf, filename: Option, workspace: &Workspace, + force_update: bool, ) -> Result<(), OxenError> { let parsed_url = Url::parse(url).map_err(|_| OxenError::file_import_error(format!("Invalid URL: {url}")))?; @@ -364,6 +385,7 @@ pub async fn import( directory, filename, workspace, + force_update, ) .await?; @@ -434,6 +456,7 @@ async fn fetch_file( directory: PathBuf, caller_filename: Option, workspace: &Workspace, + force_update: bool, ) -> Result<(), OxenError> { let client = Client::builder() .redirect(redirect::Policy::none()) @@ -603,12 +626,16 @@ async fn fetch_file( for file in files.iter() { log::debug!("file::import add file {file:?}"); - let path = repositories::workspaces::files::add(workspace, file).await?; + let path = + repositories::workspaces::files::add_with_opts(workspace, file, force_update) + .await?; log::debug!("file::import add file ✅ success! staged file {path:?}"); } } else { log::debug!("file::import add file {:?}", &filepath); - let path = repositories::workspaces::files::add(workspace, &save_path).await?; + let path = + repositories::workspaces::files::add_with_opts(workspace, &save_path, force_update) + .await?; log::debug!("file::import add file ✅ success! staged file {path:?}"); } @@ -818,6 +845,7 @@ async fn p_add_file( workspace_repo: &LocalRepository, maybe_head_commit: &Option, path: &Path, + force_update: bool, ) -> Result<(), OxenError> { let version_store = base_repo.version_store()?; let mut maybe_dir_node = None; @@ -838,9 +866,19 @@ async fn p_add_file( } // See if this is a new file or a modified file - let file_status = + let mut file_status = core::v_latest::add::determine_file_status(&maybe_dir_node, &file_name, &full_path)?; + // When force_update is set, override Unmodified status to Modified + // and use the current time so the commit gets a new merkle tree hash + if force_update && file_status.status == StagedEntryStatus::Unmodified { + log::info!( + "file {full_path:?} has not changed but force_update is set - staging as modified" + ); + file_status.status = StagedEntryStatus::Modified; + file_status.mtime = FileTime::now(); + } + // Store the file in the version store using the hash as the key let hash_str = file_status.hash.to_string(); version_store diff --git a/crates/lib/src/repositories/commits/commit_writer.rs b/crates/lib/src/repositories/commits/commit_writer.rs index 8e83b16b2..f7bed34af 100644 --- a/crates/lib/src/repositories/commits/commit_writer.rs +++ b/crates/lib/src/repositories/commits/commit_writer.rs @@ -986,13 +986,6 @@ fn r_create_dir_node( file_node.set_last_commit_id(&last_commit_id); file_node.set_name(file_name); - // if let Some(vnode_db) = &mut maybe_vnode_db { - // log::debug!( - // "Adding file {} to vnode {} in commit {}", - // file_name, - // vnode.id, - // commit_id - // ); vnode_db.add_child(&file_node)?; *total_written += 1; // } @@ -1070,14 +1063,13 @@ fn compute_dir_node( let err_msg = format!("compute_dir_node No entries found for directory {path:?}"); return Err(OxenError::basic_str(err_msg)); }; - // log::debug!( - // "Aggregating dir {:?} child {:?} with {} vnodes", - // path, - // child, - // vnodes.len() - // ); for vnode in vnodes.iter() { - // log::debug!("Aggregating vnode entries {:?}", vnode.entries.len()); + // Include VNode hashes in the directory hash so that when a VNode + // gets a new UUID-based hash (because it has modified entries), the + // parent directory hash changes too. Without this, two commits can + // produce the same directory hash even though their VNode children + // differ, causing stale node data to be read from shared storage. + hasher.update(&vnode.id.to_le_bytes()); for entry in vnode.entries.iter() { // log::debug!("Aggregating entry {}", entry.node); match &entry.node.node { diff --git a/crates/lib/src/repositories/workspaces/files.rs b/crates/lib/src/repositories/workspaces/files.rs index 1e791960a..fdcb18643 100644 --- a/crates/lib/src/repositories/workspaces/files.rs +++ b/crates/lib/src/repositories/workspaces/files.rs @@ -23,6 +23,17 @@ pub async fn add(workspace: &Workspace, path: impl AsRef) -> Result, + force_update: bool, +) -> Result { + match workspace.base_repo.min_version() { + MinOxenVersion::V0_10_0 => panic!("v0.10.0 no longer supported"), + _ => core::v_latest::workspaces::files::add_with_opts(workspace, path, force_update).await, + } +} + pub async fn rm( workspace: &Workspace, path: impl AsRef, @@ -46,12 +57,20 @@ pub async fn import( directory: PathBuf, filename: Option, workspace: &Workspace, + force_update: bool, ) -> Result<(), OxenError> { match workspace.base_repo.min_version() { MinOxenVersion::V0_10_0 => panic!("v0.10.0 no longer supported"), _ => { - core::v_latest::workspaces::files::import(url, auth, directory, filename, workspace) - .await?; + core::v_latest::workspaces::files::import( + url, + auth, + directory, + filename, + workspace, + force_update, + ) + .await?; Ok(()) } } @@ -266,6 +285,7 @@ mod tests { std::path::PathBuf::from("data"), None, &workspace, + false, ) .await; assert!(result.is_err(), "should reject {label}"); diff --git a/crates/lib/src/view/versions.rs b/crates/lib/src/view/versions.rs index 7b74e5741..ce67d81b0 100644 --- a/crates/lib/src/view/versions.rs +++ b/crates/lib/src/view/versions.rs @@ -55,6 +55,8 @@ pub struct CompleteVersionUploadRequest { // If the workspace_id is provided, we will add the file to the workspace // otherwise, we will just add the file to the versions store pub workspace_id: Option, + #[serde(default)] + pub force_update: bool, } #[derive(Serialize, Deserialize, Debug)] diff --git a/crates/server/src/controllers/import.rs b/crates/server/src/controllers/import.rs index b2499a5f6..b89d5879f 100644 --- a/crates/server/src/controllers/import.rs +++ b/crates/server/src/controllers/import.rs @@ -180,9 +180,21 @@ pub async fn import( .and_then(|v| v.as_str()) .map(|s| s.to_string()); + let force_update = body + .get("force_update") + .and_then(|v| v.as_bool()) + .unwrap_or(false); + // download and save the file into the workspace - repositories::workspaces::files::import(download_url, auth, directory, filename, &workspace) - .await?; + repositories::workspaces::files::import( + download_url, + auth, + directory, + filename, + &workspace, + force_update, + ) + .await?; // Commit workspace let commit_body = NewCommitBody { diff --git a/crates/server/src/controllers/versions/chunks.rs b/crates/server/src/controllers/versions/chunks.rs index b295a37af..492f98bc0 100644 --- a/crates/server/src/controllers/versions/chunks.rs +++ b/crates/server/src/controllers/versions/chunks.rs @@ -135,6 +135,7 @@ pub async fn complete(req: HttpRequest, body: String) -> Result, } +/// Query parameters for staging operations +#[derive(Deserialize, Debug, Default)] +pub struct StagingQueryParams { + pub force_update: Option, +} + /// Combined query parameters for workspace file operations (image resize and video thumbnail) #[derive(Deserialize, Debug)] pub struct WorkspaceFileQueryParams { @@ -214,11 +220,12 @@ pub async fn add(req: HttpRequest, payload: Multipart) -> Result Result ret_file, Err(e) => { @@ -260,7 +268,8 @@ pub async fn add(req: HttpRequest, payload: Multipart) -> Result, Query, description = "Force staging even if file content has not changed, updating the file timestamp", example = false) ), request_body( content = Vec, @@ -280,6 +289,7 @@ pub async fn add(req: HttpRequest, payload: Multipart) -> Result>, + query: web::Query, ) -> Result { // Add file to staging let app_data = app_data(&req)?; @@ -287,6 +297,7 @@ pub async fn add_version_files( let repo_name = path_param(&req, "repo_name")?; let workspace_id = path_param(&req, "workspace_id")?; let directory = path_param(&req, "directory")?; + let force_update = query.force_update.unwrap_or(false); let repo = get_repo(&app_data.path, namespace, repo_name)?; let Some(workspace) = repositories::workspaces::get(&repo, &workspace_id)? else { @@ -295,14 +306,16 @@ pub async fn add_version_files( }; let files_with_hash: Vec = payload.into_inner(); log::debug!( - "Calling add version files from the core workspace logic with {} files", + "Calling add version files from the core workspace logic with {} files (force_update: {})", files_with_hash.len(), + force_update, ); let err_files = core::v_latest::workspaces::files::add_version_files( &repo, &workspace, &files_with_hash, &directory, + force_update, ) .await?; @@ -469,7 +482,7 @@ pub async fn mv(req: HttpRequest, body: String) -> Result Result<(Vec, Vec), Error> { +) -> Result<(Vec, Vec, bool), Error> { // Receive a multipart request and save the files to the version store let version_store = repo.version_store().map_err(|oxen_err: OxenError| { log::error!("Failed to get version store: {oxen_err:?}"); @@ -479,12 +492,25 @@ pub async fn save_parts( let mut upload_files: Vec = vec![]; let mut err_files: Vec = vec![]; + let mut force_update = false; while let Some(mut field) = payload.try_next().await? { let Some(content_disposition) = field.content_disposition().cloned() else { continue; }; + if let Some(name) = content_disposition.get_name() + && name == "force_update" + { + let mut field_bytes = Vec::new(); + while let Some(chunk) = field.try_next().await? { + field_bytes.extend_from_slice(&chunk); + } + let value = String::from_utf8_lossy(&field_bytes); + force_update = value == "true" || value == "1"; + continue; + } + if let Some(name) = content_disposition.get_name() && (name == "file[]" || name == "file") { @@ -600,7 +626,7 @@ pub async fn save_parts( } } - Ok((upload_files, err_files)) + Ok((upload_files, err_files, force_update)) } // Record the error file info for retry