Conversation
|
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the WalkthroughA new Rust-based filesystem watcher daemon for Oxen repositories was introduced under Changes
Sequence Diagram(s)sequenceDiagram
participant CLI
participant IPC_Server
participant StatusCache
participant EventProcessor
participant FileSystem
CLI->>IPC_Server: Send WatcherRequest (e.g., Start, Status, Stop)
alt Start
IPC_Server->>StatusCache: Initialize cache
IPC_Server->>EventProcessor: Start event loop
EventProcessor->>FileSystem: Watch for changes
FileSystem-->>EventProcessor: Notify events
EventProcessor->>StatusCache: Update file statuses
IPC_Server-->>CLI: Watcher started (WatcherResponse)
else Status
IPC_Server->>StatusCache: Query status
StatusCache-->>IPC_Server: StatusResult
IPC_Server-->>CLI: WatcherResponse::Status
else Stop
IPC_Server-->>CLI: WatcherResponse::Ok
IPC_Server->>IPC_Server: Shutdown
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~40 minutes Poem
✨ Finishing touches🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
@CodeRabbit review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Actionable comments posted: 3
🧹 Nitpick comments (19)
oxen-rust/Cargo.toml (1)
149-151: Workspace member addition looks good; consider centralizing shared deps via workspace.dependencies
- The new member "src/watcher" is correctly added. No blockers.
- Optional: To keep versions consistent across crates (tokio, clap, serde, log, env_logger, chrono, notify, thiserror, etc.), define them once in [workspace.dependencies] and consume them from member crates. This reduces duplicate compilations and version drift.
Example addition in this file:
[workspace.dependencies] tokio = { version = "1.32.0", features = ["full"] } clap = { version = "4.4.2", features = ["cargo", "derive"] } serde = { version = "1.0.188", features = ["derive"] } serde_json = "1.0.106" log = "0.4.20" env_logger = "0.11.3" chrono = "0.4.30" notify = "6.1" thiserror = "2.0"oxen-rust/src/watcher/src/lib.rs (1)
1-10: Good crate root; add crate-level docs and consider explicit public surface
- Re-exports of WatcherError/WatcherRequest/WatcherResponse look right.
- Consider a crate-level doc comment to describe the crate and its public API.
- If only the CLI binary should use internal modules (cache, event_processor, ipc, monitor), keep those public only if you intend external crates to consume them. Otherwise, re-export a smaller API and document it.
Example additions:
+//! Filesystem watcher library for Oxen repositories. +//! Exposes protocol types and error for external use; internal modules are used by the binary. pub mod cache; pub mod cli; pub mod error; pub mod event_processor; pub mod ipc; pub mod monitor; pub mod protocol; pub use error::WatcherError; pub use protocol::{WatcherRequest, WatcherResponse};oxen-rust/src/watcher/src/cli.rs (3)
1-3: Minor: add ValueHint for better shell completions on repo pathsProviding a dir path hint improves UX in shells that leverage clap’s completion metadata.
-use clap::{Parser, Subcommand}; +use clap::{Parser, Subcommand, ValueHint};
16-20: Add directory path hints to all repo argsThis helps users with shell completions and conveys intent in help text.
- #[arg(short, long)] + #[arg(short, long, value_hint = ValueHint::DirPath)] repo: PathBuf,Apply the same change to Stop and Status.
Also applies to: 22-26, 28-32
13-33: Optional: de-duplicate the repeated repo argument using a shared Args structThis reduces repetition and keeps arg semantics consistent across subcommands.
+use clap::Args as ClapArgs; + #[derive(Subcommand)] pub enum Commands { - /// Start the filesystem watcher for a repository - Start { - /// Path to the repository - #[arg(short, long)] - repo: PathBuf, - }, - /// Stop the filesystem watcher for a repository - Stop { - /// Path to the repository - #[arg(short, long)] - repo: PathBuf, - }, - /// Check if the watcher is running for a repository - Status { - /// Path to the repository - #[arg(short, long)] - repo: PathBuf, - }, + /// Start the filesystem watcher for a repository + Start(RepoArg), + /// Stop the filesystem watcher for a repository + Stop(RepoArg), + /// Check if the watcher is running for a repository + Status(RepoArg), } + +#[derive(ClapArgs, Debug, Clone)] +pub struct RepoArg { + /// Path to the repository + #[arg(short, long, value_hint = ValueHint::DirPath)] + pub repo: PathBuf, +}oxen-rust/src/watcher/Cargo.toml (1)
18-30: Align dependency versions with workspace to avoid duplication and driftGiven the main workspace already pins versions for several crates, consider consuming them via workspace inheritance.
Two-step change:
- Add these to the root Cargo.toml under [workspace.dependencies] (see prior comment).
- Then update this file to consume them:
-notify = "6.1" -tokio = { version = "1", features = ["full"] } -rmp-serde = "1.3.0" -serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" -clap = { version = "4.4.2", features = ["cargo", "derive"] } -log = "0.4" -env_logger = "0.11" -chrono = "0.4" -thiserror = "2.0" +notify = { workspace = true } +tokio = { workspace = true } +rmp-serde = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +clap = { workspace = true } +log = { workspace = true } +env_logger = { workspace = true } +chrono = { workspace = true } +thiserror = { workspace = true }Additionally, if this crate is internal-only, consider:
[package] # ... publish = falseoxen-rust/src/watcher/src/error.rs (1)
3-29: Strengthen error typing and add common conversions; provide a Result alias
- Prefer RepositoryNotFound(PathBuf) over String for stronger typing.
- Add serde_json and tokio JoinError conversions frequently encountered in IPC/async code.
- Provide a Result alias for ergonomics.
+use std::path::PathBuf; use thiserror::Error; #[derive(Error, Debug)] pub enum WatcherError { @@ - #[error("Repository not found at: {0}")] - RepositoryNotFound(String), + #[error("Repository not found at: {0}")] + RepositoryNotFound(PathBuf), @@ #[error("Failed to communicate with watcher: {0}")] Communication(String), + + #[error("JSON serialization error: {0}")] + SerdeJson(#[from] serde_json::Error), + + #[error("Task join error: {0}")] + Join(#[from] tokio::task::JoinError), } + +pub type Result<T> = std::result::Result<T, WatcherError>;oxen-rust/src/watcher/src/cache_test.rs (1)
23-27: Add meaningful assertions totest_cache_new.The test currently passes by mere compilation and offers no behavioural guarantees.
Consider at minimum asserting that an empty cache returns zero entries andscan_completeisfalse, mirroring the checks you already do intest_empty_cache_status.oxen-rust/src/watcher/src/protocol_test.rs (1)
92-116: Strengthentest_all_request_types.
GetStatusandRefreshvariants aren’t compared after the round-trip, so field integrity isn’t exercised.
Either pattern-match each variant or reuseassert_eq!(request, deserialized)when the enums derivePartialEq.oxen-rust/src/watcher/src/event_processor_test.rs (2)
38-60: Make the coalescing test deterministic.Because the file is never created, the processor may drop the events, yielding
0entries and still passing (<= 1).
Create the file before sending events and assert== 1to confirm true coalescing.
140-192: Tighten assertions in event-kind mapping test.Verify that
added,modified, andremovedeach contain exactly one expected path; otherwise the test could pass with an unexpected distribution.oxen-rust/src/watcher/src/monitor.rs (1)
77-81: Keep or await the initial-scan handle.Dropping the JoinHandle means scan errors are silenced and the task may run after shutdown.
Store the handle and include it in thetokio::select!orjoin!to catch panics and ensure orderly shutdown.oxen-rust/src/watcher/src/main.rs (2)
38-48: Consider daemonisingstartso the CLI returns immediately.Running
FileSystemWatcher::runinline holds the terminal open and prevents further commands.
Options: spawn a detached child process, usetokio::spawnplusstd::process::exit(0), or document that users should run it in the background.
59-66: Clean up stale socket on failed shutdown.If sending
Shutdownfails you remove the PID file but leave.oxen/watcher.sock, causing future connection attempts to hang.
Remove the socket file in the same cleanup block.oxen-rust/src/watcher/src/cache.rs (1)
94-124: Consider narrowing write-lock scope
update_file_statusholds the write lock for the entire match block and timestamp update. Moving the match into a separate block lets the lock drop immediately after mutation, reducing contention for concurrent readers.oxen-rust/src/watcher/src/event_processor.rs (1)
116-128:metadatacall inside tight loop may hurt throughputFetching metadata for every event incurs disk I/O even when the information is unused later. Consider:
- For
Create/Modifyevents, defer metadata retrieval to a background task that hashes files anyway.- Use
symlink_metadatato avoid following symlinks if that matches project policy.[performance]
oxen-rust/src/watcher/src/ipc.rs (1)
24-33: Ensure socket directory exists
UnixListener::bind(&socket_path)will fail if.oxen/does not exist (e.g. corrupted repo). Addstd::fs::create_dir_all(socket_path.parent().unwrap())?before binding.oxen-rust/src/watcher/tests/integration_test.rs (1)
95-97: Suppress potentialkillerrorAfter a successful
stop, the child may already have exited, sokill().awaitcan returnErr(NoSuchProcess). Considerlet _ = start_cmd.try_wait()or ignore the result withif let Err(e)=... { debug!("already exited: {}",e); }.oxen-rust/src/watcher/src/protocol.rs (1)
47-56: Exposelast_updatedinStatusResultfor consumers
StatusCachetrackslast_update, but the field is absent here. Surfacing it would let clients decide if cached data is stale without an extra summary call.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (15)
oxen-rust/Cargo.toml(1 hunks)oxen-rust/src/watcher/Cargo.toml(1 hunks)oxen-rust/src/watcher/src/cache.rs(1 hunks)oxen-rust/src/watcher/src/cache_test.rs(1 hunks)oxen-rust/src/watcher/src/cli.rs(1 hunks)oxen-rust/src/watcher/src/error.rs(1 hunks)oxen-rust/src/watcher/src/event_processor.rs(1 hunks)oxen-rust/src/watcher/src/event_processor_test.rs(1 hunks)oxen-rust/src/watcher/src/ipc.rs(1 hunks)oxen-rust/src/watcher/src/lib.rs(1 hunks)oxen-rust/src/watcher/src/main.rs(1 hunks)oxen-rust/src/watcher/src/monitor.rs(1 hunks)oxen-rust/src/watcher/src/protocol.rs(1 hunks)oxen-rust/src/watcher/src/protocol_test.rs(1 hunks)oxen-rust/src/watcher/tests/integration_test.rs(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: Test Suite / Test Suite (macos-latest)
- GitHub Check: Test Suite / Test Suite (windows-latest)
- GitHub Check: Test Suite / Test Suite (ubuntu-latest)
- GitHub Check: Lint / Cargo check, format, clippy + Ruff
🔇 Additional comments (1)
oxen-rust/src/watcher/src/event_processor.rs (1)
61-64: Correct.oxendirectory filterComparing
OsStrto&str(c.as_os_str() == ".oxen") does not compile on stable Rust.- if path.components().any(|c| c.as_os_str() == ".oxen") { + if path.components().any(|c| c.as_os_str() == std::ffi::OsStr::new(".oxen")) {Likely an incorrect or invalid review comment.
oxen-rust/src/watcher/src/cache.rs
Outdated
| let (modified, added, removed, untracked) = if let Some(paths) = paths { | ||
| let path_set: std::collections::HashSet<_> = paths.iter().collect(); | ||
|
|
||
| ( | ||
| cache.modified.values() | ||
| .filter(|f| path_set.contains(&f.path)) | ||
| .cloned() | ||
| .collect(), |
There was a problem hiding this comment.
Fix HashSet key type to avoid compile-time mismatch
path_set is declared as HashSet<&PathBuf>, yet contains(&f.path) provides a &PathBuf, producing a &&PathBuf comparison that does not implement Eq/Hash. This will not compile.
- let path_set: std::collections::HashSet<_> = paths.iter().collect();
+ let path_set: std::collections::HashSet<PathBuf> =
+ paths.into_iter().collect(); // own the PathBufsSubsequently call path_set.contains(&f.path) unchanged.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let (modified, added, removed, untracked) = if let Some(paths) = paths { | |
| let path_set: std::collections::HashSet<_> = paths.iter().collect(); | |
| ( | |
| cache.modified.values() | |
| .filter(|f| path_set.contains(&f.path)) | |
| .cloned() | |
| .collect(), | |
| let (modified, added, removed, untracked) = if let Some(paths) = paths { | |
| let path_set: std::collections::HashSet<PathBuf> = | |
| paths.into_iter().collect(); // own the PathBufs | |
| ( | |
| cache.modified.values() | |
| .filter(|f| path_set.contains(&f.path)) | |
| .cloned() | |
| .collect(), |
🤖 Prompt for AI Agents
In oxen-rust/src/watcher/src/cache.rs around lines 53 to 60, the HashSet
path_set is created with keys of type &PathBuf, but contains is called with
&f.path which is a &PathBuf, resulting in a &&PathBuf lookup causing a type
mismatch. To fix this, change the HashSet declaration to store PathBuf values
directly instead of references, so that contains(&f.path) works correctly
without double referencing.
| WatcherRequest::Shutdown => { | ||
| info!("Shutdown requested via IPC"); | ||
| // Send response before shutting down | ||
| let response = WatcherResponse::Ok; | ||
| send_response(&mut stream, &response).await?; | ||
|
|
||
| // Exit the process | ||
| std::process::exit(0); | ||
| } |
There was a problem hiding this comment.
🛠️ Refactor suggestion
Avoid process::exit from inside a spawned task
Calling std::process::exit(0) here kills the entire watcher immediately, bypassing graceful shutdown of the IPC listener and event-processing tasks.
Return a shutdown signal to the parent task instead (e.g. via broadcast, oneshot, or Notify) and let main join all tasks before exiting.
🤖 Prompt for AI Agents
In oxen-rust/src/watcher/src/ipc.rs around lines 123 to 131, avoid calling
std::process::exit(0) inside the spawned task as it immediately terminates the
entire watcher. Instead, remove the process exit call and implement a mechanism
to send a shutdown signal (using broadcast, oneshot, or Notify) back to the
parent task. Modify the parent task to listen for this signal and perform a
graceful shutdown by joining all IPC listener and event-processing tasks before
exiting the process.
oxen-rust/src/watcher/src/monitor.rs
Outdated
| let mut watcher = notify::recommended_watcher(move |res: Result<Event, notify::Error>| { | ||
| match res { | ||
| Ok(event) => { | ||
| // Try to send event, drop if channel is full | ||
| let _ = event_tx.blocking_send(event); | ||
| } | ||
| Err(e) => error!("Filesystem watch error: {}", e), | ||
| } | ||
| })?; |
There was a problem hiding this comment.
Replace blocking_send with non-blocking try_send.
blocking_send will wait when the channel is full, potentially stalling the notify thread and losing future events.
If the intent is to drop events when saturated (per the comment), use:
- let _ = event_tx.blocking_send(event);
+ let _ = event_tx.try_send(event);or increase channel capacity and log drops.
🤖 Prompt for AI Agents
In oxen-rust/src/watcher/src/monitor.rs around lines 48 to 56, replace the use
of event_tx.blocking_send with event_tx.try_send to avoid blocking the notify
thread when the channel is full. This change aligns with the intent to drop
events if the channel is saturated. Additionally, handle the Result from
try_send to silently drop events or optionally log when drops occur. This
prevents stalling and potential loss of future events.
2153209 to
40f2b1c
Compare
2b7a0ca to
036241d
Compare
[ci skip]
[ci-skip]
036241d to
cfd4553
Compare
Summary by CodeRabbit
New Features
Bug Fixes
Tests