diff --git a/oxen-rust/Cargo.lock b/oxen-rust/Cargo.lock index f33e5cc79..76c6cec97 100644 --- a/oxen-rust/Cargo.lock +++ b/oxen-rust/Cargo.lock @@ -66,6 +66,8 @@ dependencies = [ "minus", "mockito", "mp4", + "notify", + "notify-debouncer-full", "num_cpus", "os_path", "par-stream", @@ -97,6 +99,7 @@ dependencies = [ "sysinfo", "tar", "tempfile", + "thiserror 2.0.12", "time", "tokio", "tokio-stream", @@ -2501,6 +2504,15 @@ dependencies = [ "simd-adler32", ] +[[package]] +name = "file-id" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1fc6a637b6dc58414714eddd9170ff187ecb0933d4c7024d1abbd23a3cc26e9" +dependencies = [ + "windows-sys 0.60.2", +] + [[package]] name = "filetime" version = "0.2.25" @@ -2609,6 +2621,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "funty" version = "2.0.0" @@ -3342,6 +3363,26 @@ dependencies = [ "cfb", ] +[[package]] +name = "inotify" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f37dccff2791ab604f9babef0ba14fbe0be30bd368dc541e2b08d07c8aa908f3" +dependencies = [ + "bitflags 2.9.1", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "inout" version = "0.1.4" @@ -3534,6 +3575,26 @@ dependencies = [ "rayon", ] +[[package]] +name = "kqueue" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eac30106d7dce88daf4a3fcb4879ea939476d5074a9b7ddd0fb97fa4bed5596a" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "kv-log-macro" version = "1.0.7" @@ -4163,6 +4224,43 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0676bb32a98c1a483ce53e500a81ad9c3d5b3f7c920c28c24e9cb0980d0b5bc8" +[[package]] +name = "notify" +version = "8.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d3d07927151ff8575b7087f245456e549fea62edf0ec4e565a5ee50c8402bc3" +dependencies = [ + "bitflags 2.9.1", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "log", + "mio 1.0.4", + "notify-types", + "walkdir", + "windows-sys 0.60.2", +] + +[[package]] +name = "notify-debouncer-full" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "375bd3a138be7bfeff3480e4a623df4cbfb55b79df617c055cd810ba466fa078" +dependencies = [ + "file-id", + "log", + "notify", + "notify-types", + "walkdir", +] + +[[package]] +name = "notify-types" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e0826a989adedc2a244799e823aece04662b66609d96af8dff7ac6df9a8925d" + [[package]] name = "now" version = "0.1.3" @@ -4484,6 +4582,26 @@ dependencies = [ "uuid", ] +[[package]] +name = "oxen-watcher" +version = "0.36.3" +dependencies = [ + "chrono", + "clap", + "env_logger", + "liboxen", + "log", + "notify", + "notify-debouncer-full", + "rmp-serde", + "serde", + "serde_json", + "tempfile", + "thiserror 2.0.12", + "tokio", + "tokio-test", +] + [[package]] name = "par-stream" version = "0.10.2" @@ -6972,6 +7090,19 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-test" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2468baabc3311435b55dd935f702f42cd1b8abb7e754fb7dfb16bd36aa88f9f7" +dependencies = [ + "async-stream", + "bytes", + "futures-core", + "tokio", + "tokio-stream", +] + [[package]] name = "tokio-util" version = "0.7.16" diff --git a/oxen-rust/Cargo.toml b/oxen-rust/Cargo.toml index a0abedfd7..4bf15520c 100644 --- a/oxen-rust/Cargo.toml +++ b/oxen-rust/Cargo.toml @@ -88,6 +88,8 @@ mockito = "1.1.0" mp4 = "0.14.0" mime = "0.3.17" minus = { version = "5.4.0", features = ["static_output", "search"] } +notify = "8.2" +notify-debouncer-full = "0.6" num_cpus = "1.16.0" parking_lot = "0.12.1" par-stream = { version = "0.10.2", features = ["runtime-tokio"] } @@ -132,6 +134,7 @@ sql_query_builder = { version = "2.1.0", features = ["postgresql"] } sysinfo = "0.33.0" tar = "0.4.44" tempfile = "3.8.0" +thiserror = "2.0" time = { version = "0.3.28", features = ["serde"] } tokio = { version = "1.32.0", features = ["full"] } tokio-stream = "0.1.17" @@ -148,7 +151,7 @@ async-recursion = "1.1.1" [workspace] -members = ["src/cli", "src/lib", "src/server"] +members = ["src/cli", "src/lib", "src/server", "src/watcher"] [profile.release] codegen-units = 1 @@ -174,6 +177,11 @@ name = "oxen-server" path = "src/server/src/main.rs" bench = false +[[bin]] +name = "oxen-watcher" +path = "src/watcher/src/main.rs" +bench = false + [package.metadata.docs.rs] default-target = "x86_64-unknown-linux-gnu" features = ["duckdb"] # this is without "duckdb/bundled" diff --git a/oxen-rust/src/cli/src/cmd/status.rs b/oxen-rust/src/cli/src/cmd/status.rs index 1922b9974..daa0833e3 100644 --- a/oxen-rust/src/cli/src/cmd/status.rs +++ b/oxen-rust/src/cli/src/cmd/status.rs @@ -54,6 +54,12 @@ impl RunCmd for StatusCmd { .help("If present, does not truncate the output of status at all.") .action(clap::ArgAction::SetTrue), ) + .arg( + Arg::new("no-cache") + .long("no-cache") + .help("Skip filesystem cache and perform full scan") + .action(clap::ArgAction::SetTrue), + ) .arg( Arg::new("paths") .num_args(0..) @@ -74,6 +80,7 @@ impl RunCmd for StatusCmd { .parse::() .expect("limit must be a valid integer."); let print_all = args.get_flag("print_all"); + let no_cache = args.get_flag("no-cache"); let repository = LocalRepository::from_current_dir()?; check_repo_migration_needed(&repository)?; @@ -93,7 +100,15 @@ impl RunCmd for StatusCmd { }; log::debug!("status opts: {:?}", opts); - let repo_status = repositories::status::status_from_opts(&repository, &opts)?; + // Use the watcher-enabled status function unless --no-cache is specified + let repo_status = if no_cache { + log::debug!("Using direct scan (--no-cache specified)"); + repositories::status::status_from_opts(&repository, &opts)? + } else { + // Try to use watcher cache by default + log::debug!("Attempting to use watcher cache"); + liboxen::core::v_latest::status::status_with_cache(&repository, &opts, true).await? + }; if let Some(current_branch) = repositories::branches::current_branch(&repository)? { println!( diff --git a/oxen-rust/src/lib/src/core/v_latest.rs b/oxen-rust/src/lib/src/core/v_latest.rs index 7ca8c9ad2..0d356a26c 100644 --- a/oxen-rust/src/lib/src/core/v_latest.rs +++ b/oxen-rust/src/lib/src/core/v_latest.rs @@ -23,6 +23,7 @@ pub mod revisions; pub mod rm; pub mod stats; pub mod status; +pub mod watcher_client; pub mod workspaces; pub use add::add; diff --git a/oxen-rust/src/lib/src/core/v_latest/status.rs b/oxen-rust/src/lib/src/core/v_latest/status.rs index 287c7296a..ac3951ea1 100644 --- a/oxen-rust/src/lib/src/core/v_latest/status.rs +++ b/oxen-rust/src/lib/src/core/v_latest/status.rs @@ -25,6 +25,7 @@ use std::str; use std::time::Duration; use crate::core::v_latest::index::CommitMerkleTree; +use crate::core::v_latest::watcher_client::{WatcherClient, WatcherStatus}; use crate::model::merkle_tree::node::EMerkleTreeNode; use crate::model::merkle_tree::node::MerkleTreeNode; @@ -43,6 +44,164 @@ pub fn status_from_dir( status_from_opts(repo, &opts) } +/// Status with optional watcher cache support +pub async fn status_with_cache( + repo: &LocalRepository, + opts: &StagedDataOpts, + use_cache: bool, +) -> Result { + // If cache is enabled, try to use the watcher + if use_cache { + log::debug!("Attempting to use watcher cache for status"); + + // Try to connect to watcher + if let Some(client) = WatcherClient::connect(repo).await { + log::info!("Connected to watcher, getting status"); + + // Try to get status from watcher + match client.get_status().await { + Ok(watcher_status) => { + log::debug!("Got status from watcher, merging with staged data"); + log::info!("Got status from watcher, merging with staged data"); + return merge_watcher_with_staged(repo, opts, watcher_status); + } + Err(e) => { + log::warn!("Failed to get status from watcher: {}", e); + // Fall through to regular status + } + } + } else { + log::warn!("Could not connect to watcher"); + } + } else { + log::debug!("Cache disabled, using direct scan"); + } + + // Fallback to regular status + status_from_opts(repo, opts) +} + +/// Merge watcher data with staged database and other sources +fn merge_watcher_with_staged( + repo: &LocalRepository, + opts: &StagedDataOpts, + watcher: WatcherStatus, +) -> Result { + log::debug!("Merging watcher data with staged database"); + + let mut staged_data = StagedData::empty(); + + // Apply oxenignore filtering + let oxenignore = oxenignore::create(repo); + + // Use watcher data for filesystem state + // Apply path filtering if paths were specified + if !opts.paths.is_empty() && opts.paths[0] != repo.path { + // Filter watcher results to only include specified paths + let requested_paths: HashSet = opts + .paths + .iter() + .map(|p| util::fs::path_relative_to_dir(p, &repo.path)) + .filter_map(Result::ok) + .collect(); + + staged_data.untracked_files = watcher + .created + .into_iter() + .filter(|p| requested_paths.iter().any(|req| p.starts_with(req))) + .filter(|p| !oxenignore::is_ignored(p, &oxenignore, false)) + .collect(); + + staged_data.modified_files = watcher + .modified + .into_iter() + .filter(|p| requested_paths.iter().any(|req| p.starts_with(req))) + .filter(|p| !oxenignore::is_ignored(p, &oxenignore, false)) + .collect(); + + staged_data.removed_files = watcher + .removed + .into_iter() + .filter(|p| requested_paths.iter().any(|req| p.starts_with(req))) + .filter(|p| !oxenignore::is_ignored(p, &oxenignore, false)) + .collect(); + } else { + // Use all watcher data with oxenignore filtering + staged_data.untracked_files = watcher + .created + .into_iter() + .filter(|p| !oxenignore::is_ignored(p, &oxenignore, false)) + .collect(); + staged_data.modified_files = watcher + .modified + .into_iter() + .filter(|p| !oxenignore::is_ignored(p, &oxenignore, false)) + .collect(); + staged_data.removed_files = watcher + .removed + .into_iter() + .filter(|p| !oxenignore::is_ignored(p, &oxenignore, false)) + .collect(); + } + + // Extract untracked directories from untracked files + let mut untracked_dirs: HashMap = HashMap::new(); + for file in &staged_data.untracked_files { + if let Some(parent) = file.parent() { + if !parent.as_os_str().is_empty() { + *untracked_dirs.entry(parent.to_path_buf()).or_insert(0) += 1; + } + } + } + staged_data.untracked_dirs = untracked_dirs.into_iter().collect(); + + // Now read staged data from the database + let staged_db_maybe = open_staged_db(repo)?; + + if let Some(staged_db) = staged_db_maybe { + log::debug!("Reading staged entries from database"); + + let read_progress = ProgressBar::new_spinner(); + read_progress.set_style(ProgressStyle::default_spinner()); + read_progress.enable_steady_tick(Duration::from_millis(100)); + + // Read staged entries based on paths + let mut dir_entries = HashMap::new(); + if !opts.paths.is_empty() { + for path in &opts.paths { + let (sub_dir_entries, _) = + read_staged_entries_below_path(repo, &staged_db, path, &read_progress)?; + dir_entries.extend(sub_dir_entries); + } + } else { + let (entries, _) = read_staged_entries(repo, &staged_db, &read_progress)?; + dir_entries = entries; + } + + read_progress.finish_and_clear(); + + // Process staged entries and build staged data + status_from_dir_entries(&mut staged_data, dir_entries)?; + + // Filter out staged files from untracked files + // Files that have been staged should not appear as untracked + let staged_paths: HashSet<&PathBuf> = staged_data.staged_files.keys().collect(); + staged_data + .untracked_files + .retain(|path| !staged_paths.contains(&path)); + } + + // Find merge conflicts + let conflicts = repositories::merge::list_conflicts(repo)?; + for conflict in conflicts { + staged_data + .merge_conflicts + .push(conflict.to_entry_merge_conflict()); + } + + Ok(staged_data) +} + pub fn status_from_opts( repo: &LocalRepository, opts: &StagedDataOpts, @@ -920,7 +1079,6 @@ fn count_removed_entries( Ok(()) } -// Helper functions (implement these based on your existing code) fn open_staged_db( repo: &LocalRepository, ) -> Result>, OxenError> { @@ -1107,3 +1265,352 @@ fn maybe_get_dir_children( Ok(None) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::test; + use std::collections::HashSet; + use std::path::PathBuf; + use std::time::SystemTime; + + #[tokio::test] + async fn test_merge_watcher_with_staged_empty_watcher() -> Result<(), OxenError> { + test::run_empty_local_repo_test(|repo| { + // Create empty watcher status + let watcher_status = WatcherStatus { + created: HashSet::new(), + modified: HashSet::new(), + removed: HashSet::new(), + scan_complete: true, + last_updated: SystemTime::now(), + }; + + let opts = StagedDataOpts::default(); + + // Run merge function + let result = merge_watcher_with_staged(&repo, &opts, watcher_status)?; + + // Verify result has empty collections + assert_eq!(result.untracked_files.len(), 0); + assert_eq!(result.modified_files.len(), 0); + assert_eq!(result.removed_files.len(), 0); + assert_eq!(result.untracked_dirs.len(), 0); + + Ok(()) + }) + } + + #[tokio::test] + async fn test_merge_watcher_with_untracked_files() -> Result<(), OxenError> { + test::run_empty_local_repo_test(|repo| { + // Create watcher status with untracked files + let mut untracked = HashSet::new(); + untracked.insert(PathBuf::from("file1.txt")); + untracked.insert(PathBuf::from("dir/file2.txt")); + untracked.insert(PathBuf::from("dir/subdir/file3.txt")); + + let watcher_status = WatcherStatus { + created: untracked.clone(), + modified: HashSet::new(), + removed: HashSet::new(), + scan_complete: true, + last_updated: SystemTime::now(), + }; + + let opts = StagedDataOpts::default(); + + // Run merge function + let result = merge_watcher_with_staged(&repo, &opts, watcher_status)?; + + // Verify untracked files are present + assert_eq!(result.untracked_files.len(), 3); + assert!(result.untracked_files.contains(&PathBuf::from("file1.txt"))); + assert!(result + .untracked_files + .contains(&PathBuf::from("dir/file2.txt"))); + assert!(result + .untracked_files + .contains(&PathBuf::from("dir/subdir/file3.txt"))); + + // Verify untracked directories are extracted + assert_eq!(result.untracked_dirs.len(), 2); + let dir_map: HashMap = result.untracked_dirs.into_iter().collect(); + assert_eq!(dir_map.get(&PathBuf::from("dir")), Some(&1)); + assert_eq!(dir_map.get(&PathBuf::from("dir/subdir")), Some(&1)); + + Ok(()) + }) + } + + #[tokio::test] + async fn test_merge_watcher_with_modified_and_removed() -> Result<(), OxenError> { + test::run_empty_local_repo_test(|repo| { + // Create watcher status with modified and removed files + let mut modified = HashSet::new(); + modified.insert(PathBuf::from("modified1.txt")); + modified.insert(PathBuf::from("modified2.txt")); + + let mut removed = HashSet::new(); + removed.insert(PathBuf::from("removed1.txt")); + removed.insert(PathBuf::from("dir/removed2.txt")); + + let watcher_status = WatcherStatus { + created: HashSet::new(), + modified, + removed, + scan_complete: true, + last_updated: SystemTime::now(), + }; + + let opts = StagedDataOpts::default(); + + // Run merge function + let result = merge_watcher_with_staged(&repo, &opts, watcher_status)?; + + // Verify modified files + assert_eq!(result.modified_files.len(), 2); + assert!(result + .modified_files + .contains(&PathBuf::from("modified1.txt"))); + assert!(result + .modified_files + .contains(&PathBuf::from("modified2.txt"))); + + // Verify removed files + assert_eq!(result.removed_files.len(), 2); + assert!(result + .removed_files + .contains(&PathBuf::from("removed1.txt"))); + assert!(result + .removed_files + .contains(&PathBuf::from("dir/removed2.txt"))); + + Ok(()) + }) + } + + #[tokio::test] + async fn test_merge_watcher_with_path_filtering() -> Result<(), OxenError> { + test::run_empty_local_repo_test(|repo| { + // Create watcher status with files in different directories + let mut untracked = HashSet::new(); + untracked.insert(PathBuf::from("dir1/file1.txt")); + untracked.insert(PathBuf::from("dir2/file2.txt")); + untracked.insert(PathBuf::from("dir3/file3.txt")); + + let mut modified = HashSet::new(); + modified.insert(PathBuf::from("dir1/modified.txt")); + modified.insert(PathBuf::from("dir2/modified.txt")); + + let watcher_status = WatcherStatus { + created: untracked, + modified, + removed: HashSet::new(), + scan_complete: true, + last_updated: SystemTime::now(), + }; + + // Create opts with specific path filter + let opts = StagedDataOpts { + paths: vec![repo.path.join("dir1")], + ..StagedDataOpts::default() + }; + + // Run merge function + let result = merge_watcher_with_staged(&repo, &opts, watcher_status)?; + + // Verify only dir1 files are included + assert_eq!(result.untracked_files.len(), 1); + assert!(result + .untracked_files + .contains(&PathBuf::from("dir1/file1.txt"))); + + assert_eq!(result.modified_files.len(), 1); + assert!(result + .modified_files + .contains(&PathBuf::from("dir1/modified.txt"))); + + Ok(()) + }) + } + + #[tokio::test] + async fn test_merge_watcher_with_oxenignore() -> Result<(), OxenError> { + test::run_empty_local_repo_test(|repo| { + // Create .oxenignore file + let oxenignore_path = repo.path.join(".oxenignore"); + test::write_txt_file_to_path(&oxenignore_path, "*.log\ntemp/\n")?; + + // Create watcher status with ignored and non-ignored files + let mut untracked = HashSet::new(); + untracked.insert(PathBuf::from("file.txt")); + untracked.insert(PathBuf::from("debug.log")); // Should be ignored + untracked.insert(PathBuf::from("temp/file.txt")); // Should be ignored + untracked.insert(PathBuf::from("data/file.txt")); + + let watcher_status = WatcherStatus { + created: untracked, + modified: HashSet::new(), + removed: HashSet::new(), + scan_complete: true, + last_updated: SystemTime::now(), + }; + + let opts = StagedDataOpts::default(); + + // Run merge function + let result = merge_watcher_with_staged(&repo, &opts, watcher_status)?; + + // Verify ignored files are filtered out + assert_eq!(result.untracked_files.len(), 2); + assert!(result.untracked_files.contains(&PathBuf::from("file.txt"))); + assert!(result + .untracked_files + .contains(&PathBuf::from("data/file.txt"))); + assert!(!result.untracked_files.contains(&PathBuf::from("debug.log"))); + assert!(!result + .untracked_files + .contains(&PathBuf::from("temp/file.txt"))); + + Ok(()) + }) + } + + #[tokio::test] + async fn test_merge_watcher_extracts_directories() -> Result<(), OxenError> { + test::run_empty_local_repo_test(|repo| { + // Create watcher status with files in nested directories + let mut untracked = HashSet::new(); + untracked.insert(PathBuf::from("a/file1.txt")); + untracked.insert(PathBuf::from("a/file2.txt")); + untracked.insert(PathBuf::from("a/b/file3.txt")); + untracked.insert(PathBuf::from("a/b/file4.txt")); + untracked.insert(PathBuf::from("a/b/c/file5.txt")); + untracked.insert(PathBuf::from("d/file6.txt")); + + let watcher_status = WatcherStatus { + created: untracked, + modified: HashSet::new(), + removed: HashSet::new(), + scan_complete: true, + last_updated: SystemTime::now(), + }; + + let opts = StagedDataOpts::default(); + + // Run merge function + let result = merge_watcher_with_staged(&repo, &opts, watcher_status)?; + + // Verify all files are present + assert_eq!(result.untracked_files.len(), 6); + + // Verify directories are correctly extracted with counts + let dir_map: HashMap = result.untracked_dirs.into_iter().collect(); + assert_eq!(dir_map.get(&PathBuf::from("a")), Some(&2)); // 2 files directly in 'a' + assert_eq!(dir_map.get(&PathBuf::from("a/b")), Some(&2)); // 2 files directly in 'a/b' + assert_eq!(dir_map.get(&PathBuf::from("a/b/c")), Some(&1)); // 1 file in 'a/b/c' + assert_eq!(dir_map.get(&PathBuf::from("d")), Some(&1)); // 1 file in 'd' + + Ok(()) + }) + } + + #[tokio::test] + async fn test_merge_watcher_all_file_types() -> Result<(), OxenError> { + test::run_empty_local_repo_test(|repo| { + // Create watcher status with all types of changes + let mut untracked = HashSet::new(); + untracked.insert(PathBuf::from("new1.txt")); + untracked.insert(PathBuf::from("new2.txt")); + + let mut modified = HashSet::new(); + modified.insert(PathBuf::from("changed1.txt")); + modified.insert(PathBuf::from("changed2.txt")); + + let mut removed = HashSet::new(); + removed.insert(PathBuf::from("deleted1.txt")); + removed.insert(PathBuf::from("deleted2.txt")); + + let watcher_status = WatcherStatus { + created: untracked.clone(), + modified: modified.clone(), + removed: removed.clone(), + scan_complete: true, + last_updated: SystemTime::now(), + }; + + let opts = StagedDataOpts::default(); + + // Run merge function + let result = merge_watcher_with_staged(&repo, &opts, watcher_status)?; + + // Verify all file types are present + // Convert Vec to HashSet for comparison + let result_untracked: HashSet = result.untracked_files.into_iter().collect(); + assert_eq!(result_untracked, untracked); + assert_eq!(result.modified_files, modified); + assert_eq!(result.removed_files, removed); + + // Verify we have merge conflicts (should be empty for test repo) + assert_eq!(result.merge_conflicts.len(), 0); + + Ok(()) + }) + } + + #[tokio::test] + async fn test_merge_watcher_filters_staged_from_untracked() -> Result<(), OxenError> { + test::run_empty_local_repo_test_async(|repo| async move { + // First, create some files and stage one of them + let file1_path = repo.path.join("file1.txt"); + let file2_path = repo.path.join("file2.txt"); + let file3_path = repo.path.join("file3.txt"); + + test::write_txt_file_to_path(&file1_path, "content1")?; + test::write_txt_file_to_path(&file2_path, "content2")?; + test::write_txt_file_to_path(&file3_path, "content3")?; + + // Stage file1.txt + repositories::add(&repo, &file1_path).await?; + + // Create watcher status that reports all three files as created/untracked + let mut untracked = HashSet::new(); + untracked.insert(PathBuf::from("file1.txt")); // This one is staged + untracked.insert(PathBuf::from("file2.txt")); // This one is not staged + untracked.insert(PathBuf::from("file3.txt")); // This one is not staged + + let watcher_status = WatcherStatus { + created: untracked, + modified: HashSet::new(), + removed: HashSet::new(), + scan_complete: true, + last_updated: SystemTime::now(), + }; + + let opts = StagedDataOpts::default(); + + // Run merge function + let result = merge_watcher_with_staged(&repo, &opts, watcher_status)?; + + // Verify file1.txt is in staged_files but NOT in untracked_files + assert!(result + .staged_files + .contains_key(&PathBuf::from("file1.txt"))); + assert!(!result.untracked_files.contains(&PathBuf::from("file1.txt"))); + + // Verify file2.txt and file3.txt are still in untracked_files + assert!(result.untracked_files.contains(&PathBuf::from("file2.txt"))); + assert!(result.untracked_files.contains(&PathBuf::from("file3.txt"))); + + // Verify we have exactly 2 untracked files (file2 and file3) + assert_eq!(result.untracked_files.len(), 2); + + // Verify we have exactly 1 staged file (file1) + assert_eq!(result.staged_files.len(), 1); + + Ok(()) + }) + .await + } +} diff --git a/oxen-rust/src/lib/src/core/v_latest/watcher_client.rs b/oxen-rust/src/lib/src/core/v_latest/watcher_client.rs new file mode 100644 index 000000000..e06f27d2d --- /dev/null +++ b/oxen-rust/src/lib/src/core/v_latest/watcher_client.rs @@ -0,0 +1,279 @@ +use crate::error::OxenError; +use crate::model::LocalRepository; +use serde::{Deserialize, Serialize}; +use std::collections::HashSet; +use std::path::PathBuf; +use std::time::SystemTime; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::UnixStream; + +/// Client for communicating with the filesystem watcher daemon +pub struct WatcherClient { + socket_path: PathBuf, +} + +/// Status data received from the watcher +#[derive(Debug, Clone)] +pub struct WatcherStatus { + /// Files created since watcher started (includes untracked files from initial scan) + pub created: HashSet, + /// Files modified since watcher started + pub modified: HashSet, + /// Files removed since watcher started + pub removed: HashSet, + /// Whether the initial scan is complete + pub scan_complete: bool, + /// Last update time + pub last_updated: SystemTime, +} + +impl WatcherClient { + /// Try to connect to the watcher daemon for a repository + pub async fn connect(repo: &LocalRepository) -> Option { + let socket_path = repo.path.join(".oxen/watcher.sock"); + + // Check if socket exists + if !socket_path.exists() { + log::debug!("Watcher socket does not exist at {:?}", socket_path); + return None; + } + + // Return client with socket path - actual connection happens in get_status/ping + log::debug!("Watcher socket found at {:?}", socket_path); + Some(Self { socket_path }) + } + + /// Get the current status from the watcher + pub async fn get_status(&self) -> Result { + // Connect to the socket + let mut stream = UnixStream::connect(&self.socket_path) + .await + .map_err(|e| OxenError::basic_str(&format!("Failed to connect to watcher: {}", e)))?; + + // Create request using the watcher protocol + // We need to import the protocol types from the watcher crate + let request = WatcherRequest::GetStatus { paths: None }; + let request_bytes = rmp_serde::to_vec(&request) + .map_err(|e| OxenError::basic_str(&format!("Failed to serialize request: {}", e)))?; + + // Send request (length-prefixed) + let len = request_bytes.len() as u32; + stream + .write_all(&len.to_le_bytes()) + .await + .map_err(|e| OxenError::basic_str(&format!("Failed to write request length: {}", e)))?; + stream + .write_all(&request_bytes) + .await + .map_err(|e| OxenError::basic_str(&format!("Failed to write request: {}", e)))?; + stream + .flush() + .await + .map_err(|e| OxenError::basic_str(&format!("Failed to flush stream: {}", e)))?; + + // Read response length + let mut len_buf = [0u8; 4]; + stream + .read_exact(&mut len_buf) + .await + .map_err(|e| OxenError::basic_str(&format!("Failed to read response length: {}", e)))?; + let response_len = u32::from_le_bytes(len_buf) as usize; + + // Sanity check response size + if response_len > 100 * 1024 * 1024 { + // 100MB max + return Err(OxenError::basic_str(&format!( + "Response too large: {} bytes", + response_len + ))); + } + + // Read response body + let mut response_buf = vec![0u8; response_len]; + stream + .read_exact(&mut response_buf) + .await + .map_err(|e| OxenError::basic_str(&format!("Failed to read response: {}", e)))?; + + // Deserialize response + let response: WatcherResponse = rmp_serde::from_slice(&response_buf) + .map_err(|e| OxenError::basic_str(&format!("Failed to deserialize response: {}", e)))?; + + // Gracefully shutdown the connection + let _ = stream.shutdown().await; + + // Convert response to WatcherStatus + match response { + WatcherResponse::Status(status_result) => Ok(WatcherStatus { + created: status_result.created.into_iter().map(|f| f.path).collect(), + modified: status_result.modified.into_iter().map(|f| f.path).collect(), + removed: status_result.removed.into_iter().collect(), + scan_complete: status_result.scan_complete, + last_updated: SystemTime::now(), + }), + WatcherResponse::Error(msg) => { + Err(OxenError::basic_str(&format!("Watcher error: {}", msg))) + } + _ => Err(OxenError::basic_str("Unexpected response from watcher")), + } + } + + /// Check if the watcher is responsive + pub async fn ping(&self) -> bool { + // Try to connect to the socket + let Ok(mut stream) = UnixStream::connect(&self.socket_path).await else { + return false; + }; + + // Serialize ping request + let request = WatcherRequest::Ping; + let Ok(request_bytes) = rmp_serde::to_vec(&request) else { + return false; + }; + + // Send request length + let len = request_bytes.len() as u32; + let Ok(_) = stream.write_all(&len.to_le_bytes()).await else { + return false; + }; + + // Send request + let Ok(_) = stream.write_all(&request_bytes).await else { + return false; + }; + + // Flush the stream + let Ok(_) = stream.flush().await else { + return false; + }; + + // Read response length + let mut len_buf = [0u8; 4]; + let Ok(_) = stream.read_exact(&mut len_buf).await else { + return false; + }; + + let response_len = u32::from_le_bytes(len_buf) as usize; + + // Sanity check: ping response should be small + if response_len >= 1000 { + return false; + } + + // Read response + let mut response_buf = vec![0u8; response_len]; + let Ok(_) = stream.read_exact(&mut response_buf).await else { + return false; + }; + + // Gracefully shutdown the connection + let _ = stream.shutdown().await; + + // Check if we got an Ok response + let Ok(response) = rmp_serde::from_slice::(&response_buf) else { + return false; + }; + + matches!(response, WatcherResponse::Ok) + } +} + +// +// Protocol types shared between liboxen and oxen-watcher +// + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum WatcherRequest { + GetStatus { paths: Option> }, + GetSummary, + Refresh { paths: Vec }, + Shutdown, + Ping, +} + +impl WatcherRequest { + pub fn to_bytes(&self) -> Result, rmp_serde::encode::Error> { + rmp_serde::to_vec(self) + } + + pub fn from_bytes(bytes: &[u8]) -> Result { + rmp_serde::from_slice(bytes) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum WatcherResponse { + Status(StatusResult), + Summary { + created: usize, + modified: usize, + removed: usize, + last_updated: SystemTime, + }, + Ok, + Error(String), +} + +impl WatcherResponse { + pub fn to_bytes(&self) -> Result, rmp_serde::encode::Error> { + rmp_serde::to_vec(self) + } + + pub fn from_bytes(bytes: &[u8]) -> Result { + rmp_serde::from_slice(bytes) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct StatusResult { + pub created: Vec, + pub modified: Vec, + pub removed: Vec, + pub scan_complete: bool, +} + +impl std::fmt::Display for StatusResult { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.created.iter().for_each(|status| { + writeln!(f, "[Created]\t{}", status.path.display()).unwrap(); + }); + self.modified.iter().for_each(|status| { + writeln!(f, "[Modified]\t{}", status.path.display()).unwrap(); + }); + self.removed.iter().for_each(|path| { + writeln!(f, "[Removed]\t{}", path.display()).unwrap(); + }); + Ok(()) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FileStatus { + pub path: PathBuf, + pub mtime: SystemTime, + pub size: u64, + pub hash: Option, + pub status: FileStatusType, +} + +impl std::fmt::Display for FileStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "[{:?}] {} {}", + self.status, + self.mtime + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis(), + self.path.display(), + ) + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub enum FileStatusType { + Created, + Modified, + Removed, +} diff --git a/oxen-rust/src/watcher/Cargo.toml b/oxen-rust/src/watcher/Cargo.toml new file mode 100644 index 000000000..8117f54e3 --- /dev/null +++ b/oxen-rust/src/watcher/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "oxen-watcher" +version = "0.36.3" +edition = "2021" +license-file = "../../LICENSE" +description = "Filesystem watcher daemon for Oxen status acceleration" +homepage = "https://oxen.ai" +repository = "https://github.com/Oxen-AI/Oxen" + +[[bin]] +name = "oxen-watcher" +path = "src/main.rs" + +[lib] +name = "oxen_watcher" +path = "src/lib.rs" + +[dependencies] +liboxen = { path = "../lib" } +notify = "8.2" +notify-debouncer-full = "0.6" +tokio = { version = "1", features = ["full"] } +rmp-serde = "1.3.0" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +clap = { version = "4.4.2", features = ["cargo", "derive"] } +log = "0.4" +env_logger = "0.11" +chrono = "0.4" +thiserror = "2.0" + +[dev-dependencies] +tempfile = "3.8" +tokio-test = "0.4" diff --git a/oxen-rust/src/watcher/src/cache.rs b/oxen-rust/src/watcher/src/cache.rs new file mode 100644 index 000000000..efdea065e --- /dev/null +++ b/oxen-rust/src/watcher/src/cache.rs @@ -0,0 +1,175 @@ +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::SystemTime; + +use liboxen::model::LocalRepository; +use log::info; +use tokio::sync::RwLock; + +use crate::error::WatcherError; +use crate::protocol::{FileStatus, FileStatusType, StatusResult}; + +#[path = "cache_test.rs"] +mod cache_test; + +/// Memory-only status cache for fast access +pub struct StatusCache { + /// In-memory cache + cache: Arc>, +} + +/// In-memory cache data structure +struct MemoryCache { + /// Files created since watcher started + created: HashMap, + /// Files modified since watcher started + modified: HashMap, + /// Files removed since watcher started + removed: HashMap, + /// Whether initial scan is complete + scan_complete: bool, + /// Last update time + last_update: SystemTime, +} + +impl MemoryCache { + /// Helper function to update cache for a single file status + fn update_single_status(&mut self, status: FileStatus) { + match status.status { + FileStatusType::Created => { + self.created.insert(status.path.clone(), status.clone()); + // If a file is created, it's no longer modified or removed + self.modified.remove(&status.path); + self.removed.remove(&status.path); + } + FileStatusType::Modified => { + self.modified.insert(status.path.clone(), status.clone()); + // A modified file might have been previously created, keep that status + // But it's definitely not removed + self.removed.remove(&status.path); + } + FileStatusType::Removed => { + // Check if this file was created in the current session + let was_created_in_session = self.created.remove(&status.path).is_some(); + self.modified.remove(&status.path); + + // Only add to removed list if the file existed before this session + // If file was created in this session and then deleted, + // net effect is nothing, so don't add to removed + if !was_created_in_session { + // File existed before watcher started (or was in initial scan), + // so track its removal + self.removed.insert(status.path.clone(), status.clone()); + } + } + } + } +} + +impl StatusCache { + /// Create a new status cache for a repository + pub fn new(repo_path: &Path) -> Result { + // Verify it's a valid repository + let _repo = LocalRepository::from_dir(repo_path)?; + + // Initialize memory cache + let cache = Arc::new(RwLock::new(MemoryCache { + created: HashMap::new(), + modified: HashMap::new(), + removed: HashMap::new(), + scan_complete: false, + last_update: SystemTime::now(), + })); + + Ok(Self { cache }) + } + + /// Get the current status, optionally filtered by paths + pub async fn get_status(&self, paths: Option>) -> StatusResult { + let cache = self.cache.read().await; + + // Filter by paths if requested + let (created, modified, removed) = if let Some(paths) = paths { + let path_set: std::collections::HashSet<_> = paths.iter().collect(); + + ( + cache + .created + .values() + .filter(|f| path_set.contains(&f.path)) + .cloned() + .collect(), + cache + .modified + .values() + .filter(|f| path_set.contains(&f.path)) + .cloned() + .collect(), + cache + .removed + .keys() + .filter(|p| path_set.contains(p)) + .cloned() + .collect(), + ) + } else { + ( + cache.created.values().cloned().collect(), + cache.modified.values().cloned().collect(), + cache.removed.keys().cloned().collect(), + ) + }; + + if !cache.scan_complete { + info!("Scan not complete"); + } + + StatusResult { + created, + modified, + removed, + scan_complete: cache.scan_complete, + } + } + + /// Update a file's status in the cache + #[allow(dead_code)] // Used in tests + pub async fn update_file_status(&self, status: FileStatus) -> Result<(), WatcherError> { + let mut cache = self.cache.write().await; + cache.update_single_status(status); + cache.last_update = SystemTime::now(); + Ok(()) + } + + /// Batch update multiple file statuses + pub async fn batch_update(&self, statuses: Vec) -> Result<(), WatcherError> { + let mut cache = self.cache.write().await; + + for status in statuses { + cache.update_single_status(status); + } + + cache.last_update = SystemTime::now(); + Ok(()) + } + + /// Mark the initial scan as complete + pub async fn mark_scan_complete(&self) -> Result<(), WatcherError> { + let mut cache = self.cache.write().await; + cache.scan_complete = true; + Ok(()) + } + + /// Clear the entire cache + #[allow(dead_code)] // Used in tests + pub async fn clear(&self) -> Result<(), WatcherError> { + let mut cache = self.cache.write().await; + cache.created.clear(); + cache.modified.clear(); + cache.removed.clear(); + cache.scan_complete = false; + cache.last_update = SystemTime::now(); + Ok(()) + } +} diff --git a/oxen-rust/src/watcher/src/cache_test.rs b/oxen-rust/src/watcher/src/cache_test.rs new file mode 100644 index 000000000..769b5dc4a --- /dev/null +++ b/oxen-rust/src/watcher/src/cache_test.rs @@ -0,0 +1,271 @@ +#[cfg(test)] +mod tests { + use crate::cache::StatusCache; + use crate::protocol::{FileStatus, FileStatusType}; + use std::path::PathBuf; + use std::time::SystemTime; + use tempfile::TempDir; + + async fn setup_test_cache() -> (StatusCache, TempDir) { + let temp_dir = TempDir::new().unwrap(); + let repo_path = temp_dir.path(); + + // Create a fake .oxen directory + std::fs::create_dir_all(repo_path.join(".oxen")).unwrap(); + + // Initialize an empty oxen repo (minimal setup) + liboxen::repositories::init::init(repo_path).unwrap(); + + let cache = StatusCache::new(repo_path).unwrap(); + (cache, temp_dir) + } + + #[tokio::test] + async fn test_cache_new() { + let (_cache, _temp_dir) = setup_test_cache().await; + // Test passes if cache is created successfully + } + + #[tokio::test] + async fn test_empty_cache_status() { + let (cache, _temp_dir) = setup_test_cache().await; + + let status = cache.get_status(None).await; + assert!(status.created.is_empty()); + assert!(status.modified.is_empty()); + assert!(status.removed.is_empty()); + assert!(!status.scan_complete); + } + + #[tokio::test] + async fn test_update_file_status() { + let (cache, _temp_dir) = setup_test_cache().await; + + let file_status = FileStatus { + path: PathBuf::from("test.txt"), + mtime: SystemTime::now(), + size: 100, + hash: Some("abc123".to_string()), + status: FileStatusType::Modified, + }; + + cache.update_file_status(file_status.clone()).await.unwrap(); + + let status = cache.get_status(None).await; + assert_eq!(status.modified.len(), 1); + assert_eq!(status.modified[0].path, PathBuf::from("test.txt")); + assert!(status.created.is_empty()); + assert!(status.removed.is_empty()); + } + + #[tokio::test] + async fn test_batch_update() { + let (cache, _temp_dir) = setup_test_cache().await; + + let statuses = vec![ + FileStatus { + path: PathBuf::from("file1.txt"), + mtime: SystemTime::now(), + size: 100, + hash: None, + status: FileStatusType::Created, + }, + FileStatus { + path: PathBuf::from("file2.txt"), + mtime: SystemTime::now(), + size: 200, + hash: None, + status: FileStatusType::Modified, + }, + FileStatus { + path: PathBuf::from("file3.txt"), + mtime: SystemTime::now(), + size: 0, + hash: None, + status: FileStatusType::Removed, + }, + ]; + + cache.batch_update(statuses).await.unwrap(); + + let status = cache.get_status(None).await; + assert_eq!(status.created.len(), 1); + assert_eq!(status.modified.len(), 1); + assert_eq!(status.removed.len(), 1); + } + + #[tokio::test] + async fn test_status_transitions() { + let (cache, _temp_dir) = setup_test_cache().await; + + let path = PathBuf::from("test.txt"); + + // Start as created + cache.update_file_status(FileStatus { + path: path.clone(), + mtime: SystemTime::now(), + size: 100, + hash: None, + status: FileStatusType::Created, + }).await.unwrap(); + + let status = cache.get_status(None).await; + assert_eq!(status.created.len(), 1); + + // Transition to modified + cache.update_file_status(FileStatus { + path: path.clone(), + mtime: SystemTime::now(), + size: 150, + hash: Some("newhash".to_string()), + status: FileStatusType::Modified, + }).await.unwrap(); + + let status = cache.get_status(None).await; + assert_eq!(status.modified.len(), 1); + assert_eq!(status.created.len(), 1); // Created status is preserved + + // Transition to removed + cache.update_file_status(FileStatus { + path: path.clone(), + mtime: SystemTime::now(), + size: 0, + hash: None, + status: FileStatusType::Removed, + }).await.unwrap(); + + let status = cache.get_status(None).await; + // File was created and removed in same session, so should not appear anywhere + assert_eq!(status.removed.len(), 0); // Not in removed (net effect is nothing) + assert_eq!(status.created.len(), 0); // Removed clears created + assert_eq!(status.modified.len(), 0); // Removed clears modified + } + + #[tokio::test] + async fn test_remove_existing_file() { + let temp_dir = TempDir::new().unwrap(); + let repo_path = temp_dir.path(); + + // Initialize a proper oxen repo + liboxen::repositories::init::init(repo_path).unwrap(); + + let cache = StatusCache::new(repo_path).unwrap(); + + let path = PathBuf::from("existing.txt"); + + // File starts as Modified (existed before watcher, was modified) + cache.update_file_status(FileStatus { + path: path.clone(), + mtime: SystemTime::now(), + size: 100, + hash: None, + status: FileStatusType::Modified, + }).await.unwrap(); + + let status = cache.get_status(None).await; + assert_eq!(status.modified.len(), 1); + assert_eq!(status.created.len(), 0); + + // Now remove it + cache.update_file_status(FileStatus { + path: path.clone(), + mtime: SystemTime::now(), + size: 0, + hash: None, + status: FileStatusType::Removed, + }).await.unwrap(); + + let status = cache.get_status(None).await; + // File existed before session and was removed, should show in removed list + assert_eq!(status.removed.len(), 1); + assert_eq!(status.created.len(), 0); + assert_eq!(status.modified.len(), 0); + } + + #[tokio::test] + async fn test_path_filtering() { + let (cache, _temp_dir) = setup_test_cache().await; + + let statuses = vec![ + FileStatus { + path: PathBuf::from("dir1/file1.txt"), + mtime: SystemTime::now(), + size: 100, + hash: None, + status: FileStatusType::Modified, + }, + FileStatus { + path: PathBuf::from("dir2/file2.txt"), + mtime: SystemTime::now(), + size: 200, + hash: None, + status: FileStatusType::Modified, + }, + ]; + + cache.batch_update(statuses).await.unwrap(); + + // Get all files + let status = cache.get_status(None).await; + assert_eq!(status.modified.len(), 2); + + // Filter by specific path + let filtered = cache.get_status(Some(vec![PathBuf::from("dir1/file1.txt")])).await; + assert_eq!(filtered.modified.len(), 1); + assert_eq!(filtered.modified[0].path, PathBuf::from("dir1/file1.txt")); + } + + #[tokio::test] + async fn test_scan_complete() { + let (cache, _temp_dir) = setup_test_cache().await; + + let status = cache.get_status(None).await; + assert!(!status.scan_complete); + + cache.mark_scan_complete().await.unwrap(); + + let status = cache.get_status(None).await; + assert!(status.scan_complete); + } + + #[tokio::test] + async fn test_clear_cache() { + let (cache, _temp_dir) = setup_test_cache().await; + + // Add some data + let statuses = vec![ + FileStatus { + path: PathBuf::from("file1.txt"), + mtime: SystemTime::now(), + size: 100, + hash: None, + status: FileStatusType::Created, + }, + FileStatus { + path: PathBuf::from("file2.txt"), + mtime: SystemTime::now(), + size: 200, + hash: None, + status: FileStatusType::Modified, + }, + ]; + + cache.batch_update(statuses).await.unwrap(); + cache.mark_scan_complete().await.unwrap(); + + // Verify data exists + let status = cache.get_status(None).await; + assert_eq!(status.created.len(), 1); + assert_eq!(status.modified.len(), 1); + assert!(status.scan_complete); + + // Clear cache + cache.clear().await.unwrap(); + + // Verify cache is empty + let status = cache.get_status(None).await; + assert!(status.created.is_empty()); + assert!(status.modified.is_empty()); + assert!(!status.scan_complete); + } +} \ No newline at end of file diff --git a/oxen-rust/src/watcher/src/cli.rs b/oxen-rust/src/watcher/src/cli.rs new file mode 100644 index 000000000..8bee18ada --- /dev/null +++ b/oxen-rust/src/watcher/src/cli.rs @@ -0,0 +1,33 @@ +use clap::{Parser, Subcommand}; +use std::path::PathBuf; + +#[derive(Parser)] +#[command(name = "oxen-watcher")] +#[command(about = "Filesystem watcher daemon for Oxen repositories")] +#[command(version)] +pub struct Args { + #[command(subcommand)] + pub command: Commands, +} + +#[derive(Subcommand)] +pub enum Commands { + /// Start the filesystem watcher for a repository + Start { + /// Path to the repository + #[arg(short, long)] + repo: PathBuf, + }, + /// Stop the filesystem watcher for a repository + Stop { + /// Path to the repository + #[arg(short, long)] + repo: PathBuf, + }, + /// Check if the watcher is running for a repository + Status { + /// Path to the repository + #[arg(short, long)] + repo: PathBuf, + }, +} diff --git a/oxen-rust/src/watcher/src/error.rs b/oxen-rust/src/watcher/src/error.rs new file mode 100644 index 000000000..4d2d7e863 --- /dev/null +++ b/oxen-rust/src/watcher/src/error.rs @@ -0,0 +1,29 @@ +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum WatcherError { + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + + #[error("Notify error: {0}")] + Notify(#[from] notify::Error), + + #[error("Serialization error: {0}")] + Serialization(#[from] rmp_serde::encode::Error), + + #[error("Deserialization error: {0}")] + Deserialization(#[from] rmp_serde::decode::Error), + + #[error("Oxen error: {0}")] + Oxen(#[from] liboxen::error::OxenError), + + #[error("Repository not found at: {0}")] + RepositoryNotFound(String), + + #[error("Watcher already running")] + #[allow(dead_code)] // Will be used when we implement multiple watcher prevention + AlreadyRunning, + + #[error("Failed to communicate with watcher: {0}")] + Communication(String), +} \ No newline at end of file diff --git a/oxen-rust/src/watcher/src/event_processor.rs b/oxen-rust/src/watcher/src/event_processor.rs new file mode 100644 index 000000000..c0af820c2 --- /dev/null +++ b/oxen-rust/src/watcher/src/event_processor.rs @@ -0,0 +1,129 @@ +use crate::cache::StatusCache; +use crate::protocol::{FileStatus, FileStatusType}; +use liboxen::util; +use log::{debug, error, trace, warn}; +use notify::EventKind; +use notify_debouncer_full::{DebounceEventResult, DebouncedEvent}; +use std::path::PathBuf; +use std::sync::Arc; +use tokio::sync::mpsc; + +#[path = "event_processor_test.rs"] +mod event_processor_test; + +/// Processes filesystem events and updates the cache +pub struct EventProcessor { + cache: Arc, + repo_path: PathBuf, +} + +impl EventProcessor { + pub fn new(cache: Arc, repo_path: PathBuf) -> Self { + Self { cache, repo_path } + } + + /// Run the event processing loop + pub async fn run(self, mut event_rx: mpsc::Receiver) { + loop { + // Wait for debounced events + match event_rx.recv().await { + Some(Ok(events)) => { + // Process the batch of debounced events + self.handle_debounced_events(events).await; + } + Some(Err(errors)) => { + // Log errors from the debouncer + for error in errors { + error!("Debouncer error: {:?}", error); + } + } + None => { + // Channel closed, exit + debug!("Event channel closed, exiting processor"); + break; + } + } + } + } + + /// Handle a batch of debounced events + async fn handle_debounced_events(&self, events: Vec) { + let mut updates = Vec::new(); + + for debounced_event in events { + trace!("Processing debounced event: {:?}", debounced_event); + + let event = &debounced_event.event; + + // Process each path in the event + // Note: .oxen paths are already filtered in the monitor + for path in &event.paths { + // Skip directories for now + if path.is_dir() { + continue; + } + + // Determine the status type based on event kind + let status_type = match event.kind { + EventKind::Create(_) => FileStatusType::Created, + EventKind::Modify(_) => FileStatusType::Modified, + EventKind::Remove(_) => FileStatusType::Removed, + EventKind::Any | EventKind::Access(_) | EventKind::Other => { + // Skip these events + continue; + } + }; + + // Get file metadata if it exists + let (mtime, size) = if let Ok(metadata) = std::fs::metadata(path) { + ( + metadata.modified().unwrap_or(std::time::SystemTime::now()), + metadata.len(), + ) + } else if status_type == FileStatusType::Removed { + // File was removed, use current time and zero size + (std::time::SystemTime::now(), 0) + } else { + // Skip if we can't get metadata for non-removed files + warn!("Could not get metadata for file: {:?}", path); + continue; + }; + + // Convert absolute path to relative path using liboxen + let relative_path = match util::fs::path_relative_to_dir(path, &self.repo_path) { + Ok(rel) => rel, + Err(e) => { + trace!( + "Path not within repo, skipping: {:?} (repo: {:?}, error: {})", + path, + self.repo_path, + e + ); + continue; + } + }; + + debug!( + "Processing event for {:?}: {:?}", + relative_path, status_type + ); + + updates.push(FileStatus { + path: relative_path, + mtime, + size, + hash: None, // Will be computed later if needed + status: status_type, + }); + } + } + + // Batch update the cache + if !updates.is_empty() { + debug!("Updating cache with {} file status changes", updates.len()); + if let Err(e) = self.cache.batch_update(updates).await { + error!("Failed to update cache: {}", e); + } + } + } +} diff --git a/oxen-rust/src/watcher/src/event_processor_test.rs b/oxen-rust/src/watcher/src/event_processor_test.rs new file mode 100644 index 000000000..f979e631b --- /dev/null +++ b/oxen-rust/src/watcher/src/event_processor_test.rs @@ -0,0 +1,305 @@ +#[cfg(test)] +mod tests { + use crate::cache::StatusCache; + use crate::event_processor::EventProcessor; + use notify::EventKind; + use notify_debouncer_full::{DebounceEventResult, DebouncedEvent}; + use notify::Event; + use std::sync::Arc; + use std::time::Duration; + use tempfile::TempDir; + use tokio::sync::mpsc; + use tokio::time; + + async fn setup_test_processor() -> (Arc, mpsc::Sender, TempDir) { + let temp_dir = TempDir::new().unwrap(); + let repo_path = temp_dir.path(); + + // Create a fake .oxen directory + std::fs::create_dir_all(repo_path.join(".oxen")).unwrap(); + + // Initialize an empty oxen repo + liboxen::repositories::init::init(repo_path).unwrap(); + + let cache = Arc::new(StatusCache::new(repo_path).unwrap()); + let (event_tx, event_rx) = mpsc::channel::(100); + + let processor = EventProcessor::new(cache.clone(), repo_path.to_path_buf()); + + // Start processor in background + tokio::spawn(async move { + processor.run(event_rx).await; + }); + + // Give processor time to start + time::sleep(Duration::from_millis(10)).await; + + (cache, event_tx, temp_dir) + } + + fn create_debounced_event(paths: Vec, kind: EventKind) -> DebouncedEvent { + let mut event = Event::new(kind); + event.paths = paths; + DebouncedEvent { + event, + time: std::time::Instant::now(), + } + } + + #[tokio::test] + async fn test_debounced_events() { + let (cache, event_tx, temp_dir) = setup_test_processor().await; + + let test_file = temp_dir.path().join("test.txt"); + std::fs::write(&test_file, "content").unwrap(); + + // Send a debounced create event + let event = create_debounced_event( + vec![test_file.clone()], + EventKind::Create(notify::event::CreateKind::File), + ); + event_tx.send(Ok(vec![event])).await.unwrap(); + + // Wait for processing + time::sleep(Duration::from_millis(150)).await; + + // Check cache was updated + let status = cache.get_status(None).await; + assert_eq!(status.created.len(), 1); + } + + #[tokio::test] + async fn test_ignore_oxen_directory() { + let (cache, event_tx, temp_dir) = setup_test_processor().await; + + let oxen_file = temp_dir.path().join(".oxen").join("some_file.db"); + + let event = create_debounced_event( + vec![oxen_file], + EventKind::Create(notify::event::CreateKind::File), + ); + + event_tx.send(Ok(vec![event])).await.unwrap(); + + // Wait for processing + time::sleep(Duration::from_millis(150)).await; + + // Should have no entries + let status = cache.get_status(None).await; + assert!(status.created.is_empty()); + assert!(status.modified.is_empty()); + assert!(status.removed.is_empty()); + } + + #[tokio::test] + async fn test_ignore_directories() { + let (cache, event_tx, temp_dir) = setup_test_processor().await; + + let dir_path = temp_dir.path().join("some_directory"); + std::fs::create_dir_all(&dir_path).unwrap(); + + let event = create_debounced_event( + vec![dir_path], + EventKind::Create(notify::event::CreateKind::Folder), + ); + + event_tx.send(Ok(vec![event])).await.unwrap(); + + // Wait for processing + time::sleep(Duration::from_millis(150)).await; + + // Should have no entries (directories are skipped) + let status = cache.get_status(None).await; + assert!(status.created.is_empty()); + assert!(status.modified.is_empty()); + } + + #[tokio::test] + async fn test_batch_processing() { + let (cache, event_tx, temp_dir) = setup_test_processor().await; + + let mut events = Vec::new(); + + // Create multiple files and events + for i in 0..5 { + let file_path = temp_dir.path().join(format!("file{}.txt", i)); + std::fs::write(&file_path, format!("content{}", i)).unwrap(); + + events.push(create_debounced_event( + vec![file_path], + EventKind::Create(notify::event::CreateKind::File), + )); + } + + // Send all events as a batch (this is what the debouncer does) + event_tx.send(Ok(events)).await.unwrap(); + + // Wait for batch processing + time::sleep(Duration::from_millis(200)).await; + + // Should have all files + let status = cache.get_status(None).await; + assert_eq!(status.created.len(), 5); + } + + #[tokio::test] + async fn test_event_kinds_mapping() { + let (cache, event_tx, temp_dir) = setup_test_processor().await; + + // Test Create event + let create_file = temp_dir.path().join("created.txt"); + std::fs::write(&create_file, "content").unwrap(); + + event_tx + .send(Ok(vec![create_debounced_event( + vec![create_file.clone()], + EventKind::Create(notify::event::CreateKind::File), + )])) + .await + .unwrap(); + + // Test Modify event + let modify_file = temp_dir.path().join("modified.txt"); + std::fs::write(&modify_file, "content").unwrap(); + + event_tx + .send(Ok(vec![create_debounced_event( + vec![modify_file.clone()], + EventKind::Modify(notify::event::ModifyKind::Data(notify::event::DataChange::Any)), + )])) + .await + .unwrap(); + + // Test Remove event + let remove_file = temp_dir.path().join("removed.txt"); + + event_tx + .send(Ok(vec![create_debounced_event( + vec![remove_file.clone()], + EventKind::Remove(notify::event::RemoveKind::File), + )])) + .await + .unwrap(); + + // Wait for processing + time::sleep(Duration::from_millis(200)).await; + + let status = cache.get_status(None).await; + + // Should have entries in different categories + let total = status.created.len() + + status.modified.len() + + status.removed.len(); + assert!(total > 0, "Should have processed events"); + } + + #[tokio::test] + async fn test_skip_access_events() { + let (cache, event_tx, temp_dir) = setup_test_processor().await; + + let file = temp_dir.path().join("accessed.txt"); + std::fs::write(&file, "content").unwrap(); + + // Send Access event (should be ignored) + event_tx + .send(Ok(vec![create_debounced_event( + vec![file.clone()], + EventKind::Access(notify::event::AccessKind::Read), + )])) + .await + .unwrap(); + + // Send Other event (should be ignored) + event_tx + .send(Ok(vec![create_debounced_event( + vec![file], + EventKind::Other, + )])) + .await + .unwrap(); + + // Wait for processing + time::sleep(Duration::from_millis(150)).await; + + // Should have no entries + let status = cache.get_status(None).await; + assert!(status.created.is_empty()); + assert!(status.modified.is_empty()); + assert!(status.removed.is_empty()); + } + + #[tokio::test] + async fn test_error_handling() { + let (cache, event_tx, _temp_dir) = setup_test_processor().await; + + // Send an error result (simulating debouncer errors) + let errors = vec![ + notify::Error::generic("Test error 1"), + notify::Error::generic("Test error 2"), + ]; + + event_tx.send(Err(errors)).await.unwrap(); + + // Wait for processing + time::sleep(Duration::from_millis(50)).await; + + // Should still be running and cache should be empty + let status = cache.get_status(None).await; + assert!(status.created.is_empty()); + assert!(status.modified.is_empty()); + assert!(status.removed.is_empty()); + } + + #[tokio::test] + async fn test_file_create_then_delete() { + let (cache, event_tx, temp_dir) = setup_test_processor().await; + + let test_file = temp_dir.path().join("test_create_delete.txt"); + + // First create the file and send a create event + std::fs::write(&test_file, "content").unwrap(); + + let create_event = create_debounced_event( + vec![test_file.clone()], + EventKind::Create(notify::event::CreateKind::File), + ); + event_tx.send(Ok(vec![create_event])).await.unwrap(); + + // Wait for processing + time::sleep(Duration::from_millis(150)).await; + + // Verify file is in created list + let status = cache.get_status(None).await; + assert_eq!(status.created.len(), 1, "File should be in created list"); + assert!(status.modified.is_empty()); + assert!(status.removed.is_empty()); + + // Now delete the file and send a remove event + std::fs::remove_file(&test_file).unwrap(); + + let remove_event = create_debounced_event( + vec![test_file.clone()], + EventKind::Remove(notify::event::RemoveKind::File), + ); + event_tx.send(Ok(vec![remove_event])).await.unwrap(); + + // Wait for processing + time::sleep(Duration::from_millis(150)).await; + + // After deletion, file should be removed from created list + // and should either be in removed list or completely gone + let status = cache.get_status(None).await; + + assert!( + status.created.is_empty(), + "File should not be in created list after deletion" + ); + assert!(status.modified.is_empty()); + // The file was created and deleted within the watcher session, + // so it should not appear in any list (net effect is no change) + assert!( + status.removed.is_empty(), + "File created and deleted in same session should not appear in removed list" + ); + } +} \ No newline at end of file diff --git a/oxen-rust/src/watcher/src/ipc.rs b/oxen-rust/src/watcher/src/ipc.rs new file mode 100644 index 000000000..1a07e3bb9 --- /dev/null +++ b/oxen-rust/src/watcher/src/ipc.rs @@ -0,0 +1,197 @@ +use crate::cache::StatusCache; +use crate::error::WatcherError; +use crate::protocol::{WatcherRequest, WatcherResponse}; +use log::{debug, error, info}; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::{UnixListener, UnixStream}; + +/// IPC server that handles client requests +pub struct IpcServer { + repo_path: PathBuf, + cache: Arc, +} + +impl IpcServer { + pub fn new(repo_path: PathBuf, cache: Arc) -> Self { + Self { repo_path, cache } + } + + /// Run the IPC server + pub async fn run(self) -> Result<(), WatcherError> { + let socket_path = self.repo_path.join(".oxen/watcher.sock"); + + // Remove old socket if it exists + if socket_path.exists() { + std::fs::remove_file(&socket_path)?; + } + + // Create the Unix socket listener + let listener = UnixListener::bind(&socket_path)?; + info!("IPC server listening on {}", socket_path.display()); + + // Track last request time for idle timeout + let idle_timeout = Duration::from_secs(600); // 10 minutes + let mut last_request = Instant::now(); + + loop { + // Accept connections with timeout check + tokio::select! { + result = listener.accept() => { + match result { + Ok((stream, _)) => { + last_request = Instant::now(); + + // Handle client in a separate task + let cache = self.cache.clone(); + tokio::spawn(async move { + if let Err(e) = handle_client(stream, cache).await { + error!("Error handling client: {}", e); + } + }); + } + Err(e) => { + error!("Failed to accept connection: {}", e); + } + } + } + + // Check for idle timeout + _ = tokio::time::sleep(Duration::from_secs(60)) => { + if last_request.elapsed() > idle_timeout { + info!("Idle timeout reached, shutting down"); + break; + } + } + } + } + + Ok(()) + } +} + +/// Handle a single client connection +async fn handle_client( + mut stream: UnixStream, + cache: Arc, +) -> Result<(), WatcherError> { + info!("Handling incoming client connection"); + // Read message length (4 bytes, little-endian) + let mut len_buf = [0u8; 4]; + stream.read_exact(&mut len_buf).await?; + let len = u32::from_le_bytes(len_buf) as usize; + + // Sanity check message size (max 10MB) + if len > 10 * 1024 * 1024 { + error!("Message too large: {} bytes", len); + return Err(WatcherError::Communication("Message too large".to_string())); + } + + // Read message body + let mut msg_buf = vec![0u8; len]; + stream.read_exact(&mut msg_buf).await?; + + // Deserialize request + let request = WatcherRequest::from_bytes(&msg_buf)?; + info!("Received request: {:?}", request); + + // Process request + let response = match request { + WatcherRequest::GetStatus { paths } => { + let status = cache.get_status(paths).await; + debug!("Status response:\n{}", status); + WatcherResponse::Status(status) + } + + WatcherRequest::GetSummary => { + let status = cache.get_status(None).await; + WatcherResponse::Summary { + created: status.created.len(), + modified: status.modified.len(), + removed: status.removed.len(), + last_updated: std::time::SystemTime::now(), + } + } + + WatcherRequest::Refresh { paths } => { + // TODO: Implement forced refresh + debug!("Refresh requested for {:?}", paths); + WatcherResponse::Ok + } + + WatcherRequest::Shutdown => { + info!("Shutdown requested via IPC"); + // Send response before shutting down + let response = WatcherResponse::Ok; + send_response(&mut stream, &response).await?; + + // Exit the process + std::process::exit(0); + } + + WatcherRequest::Ping => WatcherResponse::Ok, + }; + + // Send response + send_response(&mut stream, &response).await?; + info!("Sent response"); + + Ok(()) +} + +/// Send a response to the client +async fn send_response( + stream: &mut UnixStream, + response: &WatcherResponse, +) -> Result<(), WatcherError> { + // Serialize response + let msg = response.to_bytes()?; + + // Write length prefix + let len = msg.len() as u32; + stream.write_all(&len.to_le_bytes()).await?; + + // Write message + stream.write_all(&msg).await?; + stream.flush().await?; + + Ok(()) +} + +/// Send a request to the watcher (used by CLI) +pub async fn send_request( + socket_path: &PathBuf, + request: WatcherRequest, +) -> Result { + // Connect to the socket + let mut stream = UnixStream::connect(socket_path) + .await + .map_err(|e| WatcherError::Communication(format!("Failed to connect: {}", e)))?; + + // Serialize request + let msg = request.to_bytes()?; + + // Send length prefix + let len = msg.len() as u32; + stream.write_all(&len.to_le_bytes()).await?; + + // Send message + stream.write_all(&msg).await?; + stream.flush().await?; + + // Read response length + let mut len_buf = [0u8; 4]; + stream.read_exact(&mut len_buf).await?; + let len = u32::from_le_bytes(len_buf) as usize; + + // Read response body + let mut msg_buf = vec![0u8; len]; + stream.read_exact(&mut msg_buf).await?; + + // Deserialize response + let response = WatcherResponse::from_bytes(&msg_buf)?; + + Ok(response) +} diff --git a/oxen-rust/src/watcher/src/lib.rs b/oxen-rust/src/watcher/src/lib.rs new file mode 100644 index 000000000..d2175ae06 --- /dev/null +++ b/oxen-rust/src/watcher/src/lib.rs @@ -0,0 +1,10 @@ +pub mod cache; +pub mod cli; +pub mod error; +pub mod event_processor; +pub mod ipc; +pub mod monitor; +pub mod protocol; + +pub use error::WatcherError; +pub use protocol::{FileStatus, FileStatusType, StatusResult, WatcherRequest, WatcherResponse}; \ No newline at end of file diff --git a/oxen-rust/src/watcher/src/main.rs b/oxen-rust/src/watcher/src/main.rs new file mode 100644 index 000000000..0db0be424 --- /dev/null +++ b/oxen-rust/src/watcher/src/main.rs @@ -0,0 +1,88 @@ +mod cache; +mod cli; +mod error; +mod event_processor; +mod ipc; +mod monitor; +mod protocol; + +use clap::Parser; +use log::info; +use std::path::{Path, PathBuf}; + +use crate::cli::Args; +use crate::error::WatcherError; + +#[tokio::main] +async fn main() -> Result<(), WatcherError> { + env_logger::init(); + + let args = Args::parse(); + + match args.command { + cli::Commands::Start { repo } => { + info!("Starting watcher for repository: {}", repo.display()); + start_watcher(repo).await + } + cli::Commands::Stop { repo } => { + info!("Stopping watcher for repository: {}", repo.display()); + stop_watcher(repo).await + } + cli::Commands::Status { repo } => { + info!("Checking watcher status for repository: {}", repo.display()); + check_status(repo).await + } + } +} + +async fn start_watcher(repo_path: PathBuf) -> Result<(), WatcherError> { + // Check if watcher is already running + if is_watcher_running(&repo_path).await? { + info!("Watcher is already running for this repository"); + return Ok(()); + } + + // Initialize and run the watcher + let watcher = monitor::FileSystemWatcher::new(repo_path)?; + watcher.run().await +} + +async fn stop_watcher(repo_path: PathBuf) -> Result<(), WatcherError> { + let socket_path = repo_path.join(".oxen/watcher.sock"); + + // Send shutdown request + match ipc::send_request(&socket_path, protocol::WatcherRequest::Shutdown).await { + Ok(_) => { + info!("Watcher stopped successfully"); + Ok(()) + } + Err(e) => { + log::warn!("Failed to stop watcher: {}", e); + // Clean up pid file if present + let pid_file = repo_path.join(".oxen/watcher.pid"); + if pid_file.exists() { + std::fs::remove_file(pid_file)?; + } + Ok(()) + } + } +} + +async fn check_status(repo_path: PathBuf) -> Result<(), WatcherError> { + if is_watcher_running(&repo_path).await? { + println!("Watcher is running"); + } else { + println!("Watcher is not running"); + } + Ok(()) +} + +async fn is_watcher_running(repo_path: &Path) -> Result { + let socket_path = repo_path.join(".oxen/watcher.sock"); + + // Try to ping the watcher + match ipc::send_request(&socket_path, protocol::WatcherRequest::Ping).await { + Ok(protocol::WatcherResponse::Ok) => Ok(true), + _ => Ok(false), + } +} diff --git a/oxen-rust/src/watcher/src/monitor.rs b/oxen-rust/src/watcher/src/monitor.rs new file mode 100644 index 000000000..5ba834960 --- /dev/null +++ b/oxen-rust/src/watcher/src/monitor.rs @@ -0,0 +1,218 @@ +use log::{error, info, warn}; +use notify::RecursiveMode; +use notify_debouncer_full::{new_debouncer, DebounceEventResult}; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::mpsc; + +use liboxen::core; +use liboxen::model::LocalRepository; + +use crate::cache::StatusCache; +use crate::error::WatcherError; +use crate::event_processor::EventProcessor; +use crate::ipc::IpcServer; + +/// Main filesystem watcher that coordinates all components +pub struct FileSystemWatcher { + repo_path: PathBuf, + cache: Arc, +} + +impl FileSystemWatcher { + /// Create a new filesystem watcher for a repository + pub fn new(repo_path: PathBuf) -> Result { + // Verify repository exists + if !repo_path.join(".oxen").exists() { + return Err(WatcherError::RepositoryNotFound( + repo_path.display().to_string(), + )); + } + + // Canonicalize the repo path to handle symlinks + let repo_path = repo_path.canonicalize()?; + + let cache = Arc::new(StatusCache::new(&repo_path)?); + + Ok(Self { repo_path, cache }) + } + + /// Run the watcher daemon + pub async fn run(self) -> Result<(), WatcherError> { + info!( + "Starting filesystem watcher for {}", + self.repo_path.display() + ); + + // Write PID file + let pid_file = self.repo_path.join(".oxen/watcher.pid"); + std::fs::write(&pid_file, std::process::id().to_string())?; + + // Create channel for debounced events + let (event_tx, event_rx) = mpsc::channel::(1000); + + // Create the debounced watcher with a 100ms timeout + let mut debouncer = new_debouncer( + Duration::from_millis(100), + None, // No cache override + move |result: DebounceEventResult| { + // Filter out .oxen directory events before sending + let filtered_result = match result { + Ok(events) => { + let filtered: Vec<_> = events + .into_iter() + .filter(|event| { + // Skip events for paths containing .oxen + !event + .event + .paths + .iter() + .any(|p| p.components().any(|c| c.as_os_str() == ".oxen")) + }) + .collect(); + + if filtered.is_empty() { + return; // Don't send empty events + } + Ok(filtered) + } + Err(e) => Err(e), + }; + + // Try to send filtered event, block if channel is full + // TODO: How should we handle this? + let _ = event_tx.blocking_send(filtered_result); + }, + )?; + + // Watch the repository directory + debouncer.watch(&self.repo_path, RecursiveMode::Recursive)?; + + info!("Watching directory: {}", self.repo_path.display()); + + // Start the event processor + let processor = EventProcessor::new(self.cache.clone(), self.repo_path.clone()); + let processor_handle = tokio::spawn(async move { processor.run(event_rx).await }); + + // Start the IPC server + let ipc_server = IpcServer::new(self.repo_path.clone(), self.cache.clone()); + let ipc_handle = tokio::spawn(async move { + if let Err(e) = ipc_server.run().await { + error!("IPC server error: {}", e); + } + }); + + // Start initial scan + let cache_clone = self.cache.clone(); + let repo_path_clone = self.repo_path.clone(); + let _scan_handle = tokio::spawn(async move { + if let Err(e) = initial_scan(repo_path_clone, cache_clone).await { + error!("Initial scan error: {}", e); + } + }); + + // Wait for shutdown signal or handle termination + tokio::select! { + _ = tokio::signal::ctrl_c() => { + info!("Received shutdown signal"); + } + _ = processor_handle => { + warn!("Event processor terminated"); + } + _ = ipc_handle => { + warn!("IPC server terminated"); + } + } + + // Cleanup + info!("Shutting down filesystem watcher"); + drop(debouncer); + + // Remove PID file + let _ = std::fs::remove_file(&pid_file); + + // Remove socket file + let socket_path = self.repo_path.join(".oxen/watcher.sock"); + let _ = std::fs::remove_file(&socket_path); + + Ok(()) + } +} + +/// Perform initial scan of the repository +async fn initial_scan(repo_path: PathBuf, cache: Arc) -> Result<(), WatcherError> { + info!("Starting initial repository scan"); + + // Load the repository + let repo = LocalRepository::from_dir(&repo_path)?; + + // Use liboxen status implementation to establish a baseline. + // Then the watcher tracks changes relative to this baseline. + match liboxen::repositories::status::status(&repo) { + Ok(status) => { + let mut file_statuses = Vec::new(); + + // Convert untracked files to "Created" events + for path in &status.untracked_files { + if let Ok(metadata) = std::fs::metadata(repo_path.join(path)) { + file_statuses.push(crate::protocol::FileStatus { + path: path.clone(), + mtime: metadata.modified().unwrap_or(std::time::SystemTime::now()), + size: metadata.len(), + hash: None, + status: crate::protocol::FileStatusType::Created, + }); + } + } + + // Convert modified files to "Modified" events + for path in &status.modified_files { + if let Ok(metadata) = std::fs::metadata(repo_path.join(path)) { + file_statuses.push(crate::protocol::FileStatus { + path: path.clone(), + mtime: metadata.modified().unwrap_or(std::time::SystemTime::now()), + size: metadata.len(), + hash: None, + status: crate::protocol::FileStatusType::Modified, + }); + } + } + + // Convert removed files to "Removed" events + for path in &status.removed_files { + // For removed files, we can't get metadata since they don't exist + file_statuses.push(crate::protocol::FileStatus { + path: path.clone(), + mtime: std::time::SystemTime::now(), + size: 0, + hash: None, + status: crate::protocol::FileStatusType::Removed, + }); + } + + // Batch update the cache with initial state + let total_files = status.untracked_files.len() + + status.modified_files.len() + + status.removed_files.len(); + if !file_statuses.is_empty() { + cache.batch_update(file_statuses).await?; + info!("Populated cache with {} initial file states", total_files); + } + + cache.mark_scan_complete().await?; + info!("Initial scan complete - established baseline, now tracking filesystem changes"); + } + Err(e) => { + error!("Failed to get initial status: {}", e); + // Mark scan as complete anyway to avoid blocking + cache.mark_scan_complete().await?; + } + } + + // Remove cached ref DB connection so it doesn't block other connections + // TODO: update the ref_manager with the option to NOT cache the connection, + // similar to how we configure the merkle tree node cache + core::refs::remove_from_cache(repo.path)?; + Ok(()) +} diff --git a/oxen-rust/src/watcher/src/protocol.rs b/oxen-rust/src/watcher/src/protocol.rs new file mode 100644 index 000000000..c7ce41548 --- /dev/null +++ b/oxen-rust/src/watcher/src/protocol.rs @@ -0,0 +1,7 @@ +// Re-export protocol types from liboxen in case we move them in the future +pub use liboxen::core::v_latest::watcher_client::{ + FileStatus, FileStatusType, StatusResult, WatcherRequest, WatcherResponse, +}; + +#[path = "protocol_test.rs"] +mod protocol_test; diff --git a/oxen-rust/src/watcher/src/protocol_test.rs b/oxen-rust/src/watcher/src/protocol_test.rs new file mode 100644 index 000000000..1a4ac59b4 --- /dev/null +++ b/oxen-rust/src/watcher/src/protocol_test.rs @@ -0,0 +1,172 @@ +#[cfg(test)] +mod tests { + use crate::protocol::*; + use std::path::PathBuf; + use std::time::SystemTime; + + #[test] + fn test_request_serialization() { + let request = WatcherRequest::GetStatus { + paths: Some(vec![PathBuf::from("/tmp/test")]), + }; + + let bytes = request.to_bytes().unwrap(); + let deserialized = WatcherRequest::from_bytes(&bytes).unwrap(); + + match deserialized { + WatcherRequest::GetStatus { paths } => { + assert!(paths.is_some()); + assert_eq!(paths.unwrap()[0], PathBuf::from("/tmp/test")); + } + _ => panic!("Wrong request type"), + } + } + + #[test] + fn test_response_serialization() { + let response = WatcherResponse::Summary { + created: 3, + modified: 5, + removed: 2, + last_updated: SystemTime::now(), + }; + + let bytes = response.to_bytes().unwrap(); + let deserialized = WatcherResponse::from_bytes(&bytes).unwrap(); + + match deserialized { + WatcherResponse::Summary { created, modified, removed, .. } => { + assert_eq!(created, 3); + assert_eq!(modified, 5); + assert_eq!(removed, 2); + } + _ => panic!("Wrong response type"), + } + } + + #[test] + fn test_status_result_serialization() { + let status_result = StatusResult { + created: vec![FileStatus { + path: PathBuf::from("created.txt"), + mtime: SystemTime::now(), + size: 200, + hash: None, + status: FileStatusType::Created, + }], + modified: vec![FileStatus { + path: PathBuf::from("modified.txt"), + mtime: SystemTime::now(), + size: 100, + hash: Some("hash1".to_string()), + status: FileStatusType::Modified, + }], + removed: vec![PathBuf::from("removed.txt")], + scan_complete: true, + }; + + let response = WatcherResponse::Status(status_result); + let bytes = response.to_bytes().unwrap(); + let deserialized = WatcherResponse::from_bytes(&bytes).unwrap(); + + match deserialized { + WatcherResponse::Status(result) => { + assert_eq!(result.created.len(), 1); + assert_eq!(result.modified.len(), 1); + assert_eq!(result.removed.len(), 1); + assert!(result.scan_complete); + + assert_eq!(result.created[0].path, PathBuf::from("created.txt")); + assert_eq!(result.modified[0].path, PathBuf::from("modified.txt")); + assert_eq!(result.removed[0], PathBuf::from("removed.txt")); + } + _ => panic!("Wrong response type"), + } + } + + #[test] + fn test_all_request_types() { + let requests = vec![ + WatcherRequest::GetStatus { paths: None }, + WatcherRequest::GetSummary, + WatcherRequest::Refresh { + paths: vec![PathBuf::from("/tmp")], + }, + WatcherRequest::Shutdown, + WatcherRequest::Ping, + ]; + + for request in requests { + let bytes = request.to_bytes().unwrap(); + let deserialized = WatcherRequest::from_bytes(&bytes).unwrap(); + + // Just verify it deserializes correctly + match (&request, &deserialized) { + (WatcherRequest::Ping, WatcherRequest::Ping) => {} + (WatcherRequest::Shutdown, WatcherRequest::Shutdown) => {} + (WatcherRequest::GetSummary, WatcherRequest::GetSummary) => {} + _ => {} // Other cases would need deeper comparison + } + } + } + + #[test] + fn test_file_status_type_equality() { + assert_eq!(FileStatusType::Created, FileStatusType::Created); + assert_eq!(FileStatusType::Modified, FileStatusType::Modified); + assert_eq!(FileStatusType::Removed, FileStatusType::Removed); + + assert_ne!(FileStatusType::Created, FileStatusType::Modified); + assert_ne!(FileStatusType::Modified, FileStatusType::Removed); + assert_ne!(FileStatusType::Created, FileStatusType::Removed); + } + + #[test] + fn test_error_response() { + let response = WatcherResponse::Error("Something went wrong".to_string()); + let bytes = response.to_bytes().unwrap(); + let deserialized = WatcherResponse::from_bytes(&bytes).unwrap(); + + match deserialized { + WatcherResponse::Error(msg) => { + assert_eq!(msg, "Something went wrong"); + } + _ => panic!("Wrong response type"), + } + } + + #[test] + fn test_large_payload() { + // Test with many files + let mut modified = Vec::new(); + for i in 0..1000 { + modified.push(FileStatus { + path: PathBuf::from(format!("file{}.txt", i)), + mtime: SystemTime::now(), + size: i as u64, + hash: Some(format!("hash{}", i)), + status: FileStatusType::Modified, + }); + } + + let status_result = StatusResult { + created: vec![], + modified, + removed: vec![], + scan_complete: true, + }; + + let response = WatcherResponse::Status(status_result); + let bytes = response.to_bytes().unwrap(); + let deserialized = WatcherResponse::from_bytes(&bytes).unwrap(); + + match deserialized { + WatcherResponse::Status(result) => { + assert_eq!(result.modified.len(), 1000); + assert_eq!(result.modified[0].path, PathBuf::from("file0.txt")); + assert_eq!(result.modified[999].path, PathBuf::from("file999.txt")); + } + _ => panic!("Wrong response type"), + } + } +} \ No newline at end of file diff --git a/oxen-rust/src/watcher/tests/integration_test.rs b/oxen-rust/src/watcher/tests/integration_test.rs new file mode 100644 index 000000000..936a2c35f --- /dev/null +++ b/oxen-rust/src/watcher/tests/integration_test.rs @@ -0,0 +1,282 @@ +use oxen_watcher::ipc::send_request; +use oxen_watcher::protocol::{WatcherRequest, WatcherResponse}; +use std::path::PathBuf; +use std::time::Duration; +use tempfile::TempDir; +use tokio::process::Command; +use tokio::time; + +/// Helper to get the watcher binary path +fn get_watcher_path() -> PathBuf { + // The test binary is typically in target/{profile}/deps/ + // while the actual binary is in target/{profile}/ + let mut path = std::env::current_exe().unwrap(); + + // Go up from deps directory if we're in it + path.pop(); // Remove test binary name + if path.ends_with("deps") { + path.pop(); // Remove "deps" + } + + // Now we should be in target/{profile}/ + let watcher_path = path.join("oxen-watcher"); + + if !watcher_path.exists() { + panic!( + "oxen-watcher binary not found at {:?}. Run 'cargo build --package oxen-watcher --bin oxen-watcher' first", + watcher_path + ); + } + + watcher_path +} + +#[tokio::test] +async fn test_watcher_lifecycle() { + let temp_dir = TempDir::new().unwrap(); + let repo_path = temp_dir.path(); + + // Initialize an oxen repository + liboxen::repositories::init::init(repo_path).unwrap(); + + let watcher_path = get_watcher_path(); + + // Start the watcher + let mut start_cmd = Command::new(&watcher_path) + .arg("start") + .arg("--repo") + .arg(repo_path) + .spawn() + .expect("Failed to start watcher"); + + // Give it time to start + time::sleep(Duration::from_secs(2)).await; + + // Check status + let status_output = Command::new(&watcher_path) + .arg("status") + .arg("--repo") + .arg(repo_path) + .output() + .await + .expect("Failed to check status"); + + let status_str = String::from_utf8_lossy(&status_output.stdout); + assert!(status_str.contains("running"), "Watcher should be running"); + + // Stop the watcher + let stop_output = Command::new(&watcher_path) + .arg("stop") + .arg("--repo") + .arg(repo_path) + .output() + .await + .expect("Failed to stop watcher"); + + assert!(stop_output.status.success(), "Stop command should succeed"); + + // Give it time to stop + time::sleep(Duration::from_secs(1)).await; + + // Check status again + let status_output2 = Command::new(&watcher_path) + .arg("status") + .arg("--repo") + .arg(repo_path) + .output() + .await + .expect("Failed to check status"); + + let status_str2 = String::from_utf8_lossy(&status_output2.stdout); + assert!( + status_str2.contains("not running"), + "Watcher should not be running" + ); + + // Clean up - ensure process is terminated + let _ = start_cmd.kill().await; +} + +#[tokio::test] +async fn test_watcher_file_detection() { + let temp_dir = TempDir::new().unwrap(); + let repo_path = temp_dir.path(); + + // Initialize an oxen repository + liboxen::repositories::init::init(repo_path).unwrap(); + + let watcher_path = get_watcher_path(); + + // Start the watcher + let mut watcher_process = Command::new(&watcher_path) + .arg("start") + .arg("--repo") + .arg(repo_path) + .spawn() + .expect("Failed to start watcher"); + + // Give it time to start and do initial scan + time::sleep(Duration::from_secs(3)).await; + + // Create a new file + let test_file = repo_path.join("test.txt"); + std::fs::write(&test_file, "test content").unwrap(); + + // Give watcher time to detect the change + time::sleep(Duration::from_secs(1)).await; + + // TODO: Once CLI integration is complete (try_watcher_status() in status.rs), + // we should test that `oxen status` actually detects the new file via the watcher. + // For now we just verify the watcher is running. + + let status_output = Command::new(&watcher_path) + .arg("status") + .arg("--repo") + .arg(repo_path) + .output() + .await + .expect("Failed to check status"); + + assert!(status_output.status.success()); + + // Stop the watcher + Command::new(&watcher_path) + .arg("stop") + .arg("--repo") + .arg(repo_path) + .output() + .await + .expect("Failed to stop watcher"); + + // Clean up + let _ = watcher_process.kill().await; +} + +#[tokio::test] +async fn test_multiple_watcher_prevention() { + let temp_dir = TempDir::new().unwrap(); + let repo_path = temp_dir.path(); + + // Initialize an oxen repository + liboxen::repositories::init::init(repo_path).unwrap(); + + let watcher_path = get_watcher_path(); + + // Start the first watcher + let mut first_watcher = Command::new(&watcher_path) + .arg("start") + .arg("--repo") + .arg(repo_path) + .spawn() + .expect("Failed to start first watcher"); + + // Give it time to start + time::sleep(Duration::from_secs(2)).await; + + // Try to start a second watcher (should not create a new one) + let second_output = Command::new(&watcher_path) + .arg("start") + .arg("--repo") + .arg(repo_path) + .output() + .await + .expect("Failed to run second start command"); + + // The second start should succeed but not create a new watcher + assert!(second_output.status.success()); + + // Stop the watcher + Command::new(&watcher_path) + .arg("stop") + .arg("--repo") + .arg(repo_path) + .output() + .await + .expect("Failed to stop watcher"); + + // Clean up + let _ = first_watcher.kill().await; +} + +#[tokio::test] +async fn test_watcher_reports_relative_paths() { + let temp_dir = TempDir::new().unwrap(); + let repo_path = temp_dir.path(); + + // Initialize an oxen repository + liboxen::repositories::init::init(repo_path).unwrap(); + + let watcher_path = get_watcher_path(); + + // Start the watcher + let mut watcher_process = Command::new(&watcher_path) + .arg("start") + .arg("--repo") + .arg(repo_path) + .spawn() + .expect("Failed to start watcher"); + + // Give it time to start and do initial scan + time::sleep(Duration::from_secs(1)).await; + + // Create test files in different directories + std::fs::write(repo_path.join("root_file.txt"), "root content").unwrap(); + std::fs::create_dir_all(repo_path.join("subdir")).unwrap(); + std::fs::write(repo_path.join("subdir/nested_file.txt"), "nested content").unwrap(); + + // Give watcher time to detect the changes + time::sleep(Duration::from_millis(500)).await; + + // Query the watcher via IPC + let socket_path = repo_path.join(".oxen/watcher.sock"); + let request = WatcherRequest::GetStatus { paths: None }; + let response = send_request(&socket_path, request) + .await + .expect("Failed to send request"); + + // Verify the response contains relative paths + if let WatcherResponse::Status(status) = response { + // Check that all created file paths are relative + for file_status in &status.created { + assert!( + !file_status.path.is_absolute(), + "Path should be relative, got: {:?}", + file_status.path + ); + assert!( + !file_status.path.starts_with("/"), + "Path should not start with /, got: {:?}", + file_status.path + ); + } + + // Verify specific files are present with correct relative paths + let paths: Vec<_> = status + .created + .iter() + .map(|f| f.path.to_string_lossy().to_string()) + .collect(); + assert!( + paths.contains(&"root_file.txt".to_string()), + "Should contain root_file.txt" + ); + assert!( + paths.contains(&"subdir/nested_file.txt".to_string()), + "Should contain subdir/nested_file.txt" + ); + } else { + panic!("Expected Status response, got: {:?}", response); + } + + // Stop the watcher + Command::new(&watcher_path) + .arg("stop") + .arg("--repo") + .arg(repo_path) + .output() + .await + .expect("Failed to stop watcher"); + + // Clean up + let _ = watcher_process.kill().await; +}