From 27ead9eff6d8a0402cdc82a2be92906a108bc1b6 Mon Sep 17 00:00:00 2001 From: Vincent Prouillet Date: Tue, 27 Jan 2026 12:24:15 +0100 Subject: [PATCH 1/4] Audit --- README.md | 145 ++--------------------------------------------- dvs/src/audit.rs | 104 +++++++++++++++++++++++++++++++++ dvs/src/file.rs | 3 + dvs/src/lib.rs | 2 + dvs/src/lock.rs | 30 ++++++++++ 5 files changed, 145 insertions(+), 139 deletions(-) create mode 100644 dvs/src/audit.rs create mode 100644 dvs/src/lock.rs diff --git a/README.md b/README.md index 95e25b5..b42513c 100644 --- a/README.md +++ b/README.md @@ -2,145 +2,12 @@ Rewrite of `dvs`, the data-version-control system made by A2-AI. -DVS (Data Version System) is a tool for versioning large or sensitive files under Git without tracking the file content directly. It uses content-addressable storage with blake3 hashing. +DVS (Data Version System) is a tool for versioning large or sensitive files under Git without tracking the file content directly. -## Installation -The CLI binary is named `dvs`. Install from source: +## TODOs -```bash -# Install with locked dependencies (recommended) -cargo install --path dvs-cli --locked - -# Force reinstall if already installed -cargo install --path dvs-cli --locked --force -``` - -Or build directly: - -```bash -cargo build -p dvs-cli --release -# Binary will be at target/release/dvs -``` - -## Usage - -```bash -# Initialize DVS in a repository -dvs init - -# Add files to DVS tracking -dvs add - -# Restore files from storage -dvs get - -# Check file status -dvs status [files...] - -# Push objects to remote -dvs push [--remote URL] - -# Pull objects from remote -dvs pull [--remote URL] - -# Materialize files from manifest -dvs materialize [files...] 
- -# View reflog history -dvs log [-n N] - -# Rollback to previous state -dvs rollback -``` - -### Batch Operations - -Commands that accept file arguments also support `--batch` to read paths from stdin: - -```bash -# Add files listed in a file -cat files.txt | dvs add --batch - -# Process output from find -find . -name "*.csv" | dvs add --batch - -# Batch format supports comments and blank lines -echo "data.csv -# This is a comment -results.json" | dvs add --batch -``` - -### Output Formats - -All commands support `--format json` for machine-readable output: - -```bash -dvs status --format json -dvs add data.csv --format json -``` - -Use `--quiet` to suppress non-error output, or `--output null` to discard output entirely. - -## Development - -### Building - -```bash -# Build workspace (dvs-core, dvs-cli) -just build - -# Build R package -just rpkg-build - -# Build everything -just build-all -``` - -### Testing - -```bash -# Run workspace tests -just test - -# Run R package Rust tests -just rpkg-test - -# Run all tests -just test-all -``` - -### R Package Maintenance - -The R package (`dvsR`) uses vendored miniextendr crates for CRAN compliance. When developing with a local miniextendr checkout, use these commands to keep vendored sources up to date: - -```bash -# Automatic staleness detection (recommended) -# Re-vendors only if miniextendr sources have changed -just rpkg-vendor-detect - -# Force re-vendor (always updates vendored crates) -just rpkg-vendor-force - -# Custom miniextendr path -just rpkg-vendor-with-staleness /path/to/miniextendr - -# Configure R package (generates Cargo.toml, Makevars, etc.) -just rpkg-configure - -# Install R package -just rpkg-install -``` - -### Code Quality - -```bash -# Format code -just fmt - -# Run clippy -just clippy - -# Run all CI checks -just ci -``` +- Azure backend +- GC? +- dvs remove? +- integrity check? 
would need to read the file again after saving it \ No newline at end of file diff --git a/dvs/src/audit.rs b/dvs/src/audit.rs new file mode 100644 index 0000000..4ecf091 --- /dev/null +++ b/dvs/src/audit.rs @@ -0,0 +1,104 @@ +use std::io::{BufRead, BufReader, Write}; +use std::path::{Path, PathBuf}; + +use anyhow::Result; +use fs_err::{File, OpenOptions}; +use jiff::Zoned; +use serde::{Deserialize, Serialize}; + +use crate::file::Hashes; +use crate::lock::FileLock; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum Operation { + Add, + Get, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum Outcome { + Success, + Failure(String), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AuditFile { + pub path: String, + pub hashes: Hashes, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AuditEntry { + pub timestamp: String, + pub operation: Operation, + pub user: String, + pub files: Vec, + pub outcome: Outcome, +} + +impl AuditEntry { + /// Creates a new audit entry with the current timestamp and user. 
+ pub fn new(operation: Operation, files: Vec, outcome: Outcome) -> Self { + let timestamp = Zoned::now() + .timestamp() + .strftime("%Y-%m-%dT%H:%M:%SZ") + .to_string(); + let user = whoami::username().unwrap_or_else(|_| "unknown".to_string()); + + Self { + timestamp, + operation, + user, + files, + outcome, + } + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct AuditLog { + path: PathBuf, +} + +impl AuditLog { + pub fn new(path: impl AsRef) -> Self { + Self { + path: path.as_ref().to_path_buf(), + } + } + + pub fn log(&self, entry: &AuditEntry) -> Result<()> { + let _lock = FileLock::acquire(&self.path)?; + let mut file = OpenOptions::new() + .create(true) + .append(true) + .open(&self.path)?; + + let json = serde_json::to_string(entry)?; + writeln!(file, "{}", json)?; + Ok(()) + } + + pub fn read(&self) -> Result> { + if !self.path.exists() { + return Ok(Vec::new()); + } + + let file = File::open(&self.path)?; + let reader = BufReader::new(file); + let mut entries = Vec::new(); + + for line in reader.lines() { + let line = line?; + if line.trim().is_empty() { + continue; + } + let entry: AuditEntry = serde_json::from_str(&line)?; + entries.push(entry); + } + + Ok(entries) + } +} diff --git a/dvs/src/file.rs b/dvs/src/file.rs index b37744e..cdb6542 100644 --- a/dvs/src/file.rs +++ b/dvs/src/file.rs @@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize}; use walkdir::WalkDir; use crate::backends::Backend; +use crate::lock::FileLock; use crate::paths::DvsPaths; #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] @@ -96,6 +97,8 @@ impl FileMetadata { relative_path: impl AsRef, ) -> Result { let dvs_file_path = paths.metadata_path(relative_path.as_ref()); + // Prevent concurrent edit of the same metadata file + let _lock = FileLock::acquire(&dvs_file_path)?; let dvs_file_exists = dvs_file_path.is_file(); let storage_exists = backend.exists(&self.hashes.md5)?; diff --git a/dvs/src/lib.rs b/dvs/src/lib.rs index 966bed3..79f3419 100644 --- a/dvs/src/lib.rs 
+++ b/dvs/src/lib.rs @@ -1,7 +1,9 @@ +pub mod audit; pub mod backends; pub mod config; pub mod file; pub mod init; +mod lock; pub mod paths; pub use backends::Backend; diff --git a/dvs/src/lock.rs b/dvs/src/lock.rs new file mode 100644 index 0000000..8e26f29 --- /dev/null +++ b/dvs/src/lock.rs @@ -0,0 +1,30 @@ +use std::path::{Path, PathBuf}; +use std::thread; +use std::time::Duration; + +use anyhow::{Result, bail}; +use fs_err as fs; + +pub struct FileLock(PathBuf); + +impl FileLock { + pub fn acquire(path: &Path) -> Result { + let lock_path = path.with_extension("lock"); + + for _ in 0..50 { + if fs::File::create(lock_path.clone()).is_ok() { + return Ok(Self(lock_path.to_path_buf())); + } + + thread::sleep(Duration::from_millis(100)); + } + + bail!("Timeout acquiring lock for {}", path.display()) + } +} + +impl Drop for FileLock { + fn drop(&mut self) { + let _ = std::fs::remove_file(&self.0); + } +} From b445f55cd90baf95dff79af205696418e2b73b70 Mon Sep 17 00:00:00 2001 From: Vincent Prouillet Date: Tue, 27 Jan 2026 15:01:42 +0100 Subject: [PATCH 2/4] Pick hash --- Cargo.lock | 12 +++++ Cargo.toml | 1 + README.md | 4 +- dvs/Cargo.toml | 1 + dvs/README.md | 10 ++-- dvs/src/audit.rs | 94 +++++--------------------------- dvs/src/backends/local.rs | 102 +++++++++++++++++++++++------------ dvs/src/backends/mod.rs | 17 +++--- dvs/src/file.rs | 111 +++++++++++++++++++------------------- dvs/src/hashes.rs | 42 +++++++++++++++ dvs/src/lib.rs | 5 +- dvs/src/lock.rs | 30 ----------- dvs/src/paths.rs | 9 +++- 13 files changed, 223 insertions(+), 215 deletions(-) create mode 100644 dvs/src/hashes.rs delete mode 100644 dvs/src/lock.rs diff --git a/Cargo.lock b/Cargo.lock index 0178e09..4ad0cef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -220,6 +220,7 @@ dependencies = [ "serde_json", "tempfile", "toml", + "uuid", "walkdir", "whoami", ] @@ -704,6 +705,17 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "uuid" +version = "1.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee48d38b119b0cd71fe4141b30f5ba9c7c5d9f4e7a3a8b4a674e4b6ef789976f" +dependencies = [ + "getrandom", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "walkdir" version = "2.5.0" diff --git a/Cargo.toml b/Cargo.toml index 25e28af..de01ac7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,3 +33,4 @@ md5 = "0.8.0" jiff = "0.2.18" anyhow = "1.0.100" whoami = "2" +uuid = "1.20.0" diff --git a/README.md b/README.md index b42513c..752f455 100644 --- a/README.md +++ b/README.md @@ -10,4 +10,6 @@ DVS (Data Version System) is a tool for versioning large or sensitive files unde - Azure backend - GC? - dvs remove? -- integrity check? would need to read the file again after saving it \ No newline at end of file +- integrity check? would need to read the file again after saving it +- compression? +- migrate from dvs1 \ No newline at end of file diff --git a/dvs/Cargo.toml b/dvs/Cargo.toml index 9f84abc..411782a 100644 --- a/dvs/Cargo.toml +++ b/dvs/Cargo.toml @@ -21,6 +21,7 @@ md5.workspace = true jiff.workspace = true anyhow.workspace = true whoami.workspace = true +uuid = { version = "1.20.0", features = ["v4"] } [target.'cfg(unix)'.dependencies] nix = { version = "0.31", features = ["user", "fs"] } diff --git a/dvs/README.md b/dvs/README.md index d78b93c..bf46b20 100644 --- a/dvs/README.md +++ b/dvs/README.md @@ -1,9 +1,9 @@ ``` -rm -rf ../.dvs ../dvs.toml ../.storage && cargo run --features=cli -- init /home/vincent/Code/a2-ai/dvsexperimental/dvs/.storage -cargo run --features=cli -- add README.md -cargo run --features=cli -- status +rm -rf .dvs dvs.toml .storage && cargo run -- init /home/vincent/Code/a2-ai/dvs2/.storage +cargo run -- add README.md +cargo run -- status rm README.md -cargo run --features=cli -- get README.md -cargo run --features=cli -- status +cargo run -- get 
README.md +cargo run -- status ``` \ No newline at end of file diff --git a/dvs/src/audit.rs b/dvs/src/audit.rs index 4ecf091..5e246de 100644 --- a/dvs/src/audit.rs +++ b/dvs/src/audit.rs @@ -1,104 +1,34 @@ -use std::io::{BufRead, BufReader, Write}; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; -use anyhow::Result; -use fs_err::{File, OpenOptions}; -use jiff::Zoned; +use crate::Hashes; +use jiff::Timestamp; use serde::{Deserialize, Serialize}; - -use crate::file::Hashes; -use crate::lock::FileLock; - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] -#[serde(rename_all = "lowercase")] -pub enum Operation { - Add, - Get, -} - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -#[serde(rename_all = "lowercase")] -pub enum Outcome { - Success, - Failure(String), -} +use uuid::Uuid; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AuditFile { - pub path: String, + pub path: PathBuf, pub hashes: Hashes, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AuditEntry { - pub timestamp: String, - pub operation: Operation, + pub operation_id: String, + pub timestamp: i64, pub user: String, - pub files: Vec, - pub outcome: Outcome, + pub file: AuditFile, } impl AuditEntry { - /// Creates a new audit entry with the current timestamp and user. 
- pub fn new(operation: Operation, files: Vec, outcome: Outcome) -> Self { - let timestamp = Zoned::now() - .timestamp() - .strftime("%Y-%m-%dT%H:%M:%SZ") - .to_string(); + pub fn new(operation_id: Uuid, file: AuditFile) -> Self { + let timestamp = Timestamp::now().as_second(); let user = whoami::username().unwrap_or_else(|_| "unknown".to_string()); Self { + operation_id: operation_id.to_string(), timestamp, - operation, user, - files, - outcome, - } - } -} - -#[derive(Debug, Clone, PartialEq)] -pub struct AuditLog { - path: PathBuf, -} - -impl AuditLog { - pub fn new(path: impl AsRef) -> Self { - Self { - path: path.as_ref().to_path_buf(), + file, } } - - pub fn log(&self, entry: &AuditEntry) -> Result<()> { - let _lock = FileLock::acquire(&self.path)?; - let mut file = OpenOptions::new() - .create(true) - .append(true) - .open(&self.path)?; - - let json = serde_json::to_string(entry)?; - writeln!(file, "{}", json)?; - Ok(()) - } - - pub fn read(&self) -> Result> { - if !self.path.exists() { - return Ok(Vec::new()); - } - - let file = File::open(&self.path)?; - let reader = BufReader::new(file); - let mut entries = Vec::new(); - - for line in reader.lines() { - let line = line?; - if line.trim().is_empty() { - continue; - } - let entry: AuditEntry = serde_json::from_str(&line)?; - entries.push(entry); - } - - Ok(entries) - } } diff --git a/dvs/src/backends/local.rs b/dvs/src/backends/local.rs index 8906fcb..3f8bc8c 100644 --- a/dvs/src/backends/local.rs +++ b/dvs/src/backends/local.rs @@ -1,10 +1,17 @@ +use std::fs::OpenOptions; +use std::io::Write; use std::path::{Path, PathBuf}; -use crate::backends::Backend; use anyhow::{Result, anyhow, bail}; use fs_err as fs; use serde::{Deserialize, Serialize}; +use crate::audit::AuditEntry; +use crate::backends::Backend; +use crate::{HashAlg, Hashes}; + +const AUDIT_LOG_FILENAME: &str = "audit.log.jsonl"; + /// Parse a permission string as an octal mode. /// Returns the mode as an u32. 
fn parse_permissions(perms: &str) -> Result { @@ -43,6 +50,7 @@ pub struct LocalBackend { pub path: PathBuf, permissions: Option, group: Option, + hash_alg: HashAlg, } impl LocalBackend { @@ -63,6 +71,7 @@ impl LocalBackend { path: path.as_ref().to_path_buf(), permissions, group, + hash_alg: HashAlg::Blake3, }) } @@ -96,7 +105,8 @@ impl LocalBackend { Ok(()) } - fn hash_to_path(&self, hash: &str) -> Result { + fn hash_to_path(&self, hashes: &Hashes) -> Result { + let hash = hashes.get_by_alg(self.hash_alg); if hash.len() < 3 || !hash.chars().all(|c| c.is_ascii_hexdigit()) { bail!("Invalid hash: {}", hash); } @@ -114,7 +124,7 @@ impl Backend for LocalBackend { Ok(()) } - fn store(&self, hash: &str, source: &Path) -> Result<()> { + fn store(&self, hash: &Hashes, source: &Path) -> Result<()> { let path = self.hash_to_path(hash)?; if let Some(parent) = path.parent() { fs::create_dir_all(parent)?; @@ -125,7 +135,7 @@ impl Backend for LocalBackend { Ok(()) } - fn store_bytes(&self, hash: &str, content: &[u8]) -> Result<()> { + fn store_bytes(&self, hash: &Hashes, content: &[u8]) -> Result<()> { let path = self.hash_to_path(hash)?; log::debug!("Storing {} bytes to {}", content.len(), path.display()); if let Some(parent) = path.parent() { @@ -137,7 +147,7 @@ impl Backend for LocalBackend { Ok(()) } - fn retrieve(&self, hash: &str, target: &Path) -> Result { + fn retrieve(&self, hash: &Hashes, target: &Path) -> Result { let path = self.hash_to_path(hash)?; if path.is_file() { if let Some(parent) = target.parent() { @@ -150,11 +160,11 @@ impl Backend for LocalBackend { } } - fn exists(&self, hash: &str) -> Result { + fn exists(&self, hash: &Hashes) -> Result { Ok(self.hash_to_path(hash)?.is_file()) } - fn remove(&self, hash: &str) -> Result<()> { + fn remove(&self, hash: &Hashes) -> Result<()> { let path = self.hash_to_path(hash)?; if path.is_file() { log::debug!("Removing {} from storage", hash); @@ -163,7 +173,7 @@ impl Backend for LocalBackend { Ok(()) } - fn read(&self, 
hash: &str) -> Result>> { + fn read(&self, hash: &Hashes) -> Result>> { let path = self.hash_to_path(hash)?; if path.is_file() { Ok(Some(fs::read(&path)?)) @@ -171,22 +181,46 @@ impl Backend for LocalBackend { Ok(None) } } + + fn log_audit(&self, entry: &AuditEntry) -> Result<()> { + let audit_path = self.path.join(AUDIT_LOG_FILENAME); + let mut file = OpenOptions::new() + .create(true) + .append(true) + .open(&audit_path)?; + let json = serde_json::to_string(entry)?; + writeln!(file, "{}", json)?; + self.apply_perms(&audit_path)?; + Ok(()) + } } #[cfg(test)] mod tests { use super::*; + use crate::hashes::Hashes; + + fn test_hash(hash: &str) -> Hashes { + Hashes { + blake3: hash.to_string(), + md5: hash.to_string(), + } + } #[test] fn hash_to_path_rejects_bad_hash() { let backend = LocalBackend::new("/tmp/storage", None, None).unwrap(); // These should error or be sanitized - assert!(backend.hash_to_path("../../etc/passwd").is_err()); - assert!(backend.hash_to_path("../escape").is_err()); assert!( backend - .hash_to_path("d41d8cd98f00b204e9800998ecf8427e") + .hash_to_path(&test_hash("../../etc/passwd")) + .is_err() + ); + assert!(backend.hash_to_path(&test_hash("../escape")).is_err()); + assert!( + backend + .hash_to_path(&test_hash("d41d8cd98f00b204e9800998ecf8427e")) .is_ok() ); } @@ -214,8 +248,8 @@ mod tests { let source = tmp.path().join("source.txt"); fs::write(&source, b"test content").unwrap(); - let hash = "d41d8cd98f00b204e9800998ecf8427e"; - backend.store(hash, &source).unwrap(); + let hash = test_hash("d41d8cd98f00b204e9800998ecf8427e"); + backend.store(&hash, &source).unwrap(); let stored = storage.join("d4").join("1d8cd98f00b204e9800998ecf8427e"); assert!(stored.is_file()); @@ -230,12 +264,12 @@ mod tests { backend.init().unwrap(); // Store content - let hash = "abc123def456789012345678901234ab"; - backend.store_bytes(hash, b"stored content").unwrap(); + let hash = test_hash("abc123def456789012345678901234ab"); + backend.store_bytes(&hash, b"stored 
content").unwrap(); // Retrieve to new location let target = tmp.path().join("retrieved.txt"); - let result = backend.retrieve(hash, &target).unwrap(); + let result = backend.retrieve(&hash, &target).unwrap(); // file was copied if result == true assert!(result); @@ -250,7 +284,9 @@ mod tests { backend.init().unwrap(); let target = tmp.path().join("target.txt"); - let result = backend.retrieve("1234567890123456789012", &target).unwrap(); + let result = backend + .retrieve(&test_hash("1234567890123456789012"), &target) + .unwrap(); assert!(!result); assert!(!target.exists()); @@ -263,10 +299,10 @@ mod tests { let backend = LocalBackend::new(&storage, None, None).unwrap(); backend.init().unwrap(); - let hash = "abc123def456789012345678901234ab"; - assert!(!backend.exists(hash).unwrap()); - backend.store_bytes(hash, b"content").unwrap(); - assert!(backend.exists(hash).unwrap()); + let hash = test_hash("abc123def456789012345678901234ab"); + assert!(!backend.exists(&hash).unwrap()); + backend.store_bytes(&hash, b"content").unwrap(); + assert!(backend.exists(&hash).unwrap()); } #[test] @@ -276,14 +312,14 @@ mod tests { let backend = LocalBackend::new(&storage, None, None).unwrap(); backend.init().unwrap(); - let hash = "abc123def456789012345678901234ab"; - backend.store_bytes(hash, b"content").unwrap(); - assert!(backend.exists(hash).unwrap()); + let hash = test_hash("abc123def456789012345678901234ab"); + backend.store_bytes(&hash, b"content").unwrap(); + assert!(backend.exists(&hash).unwrap()); - backend.remove(hash).unwrap(); - assert!(!backend.exists(hash).unwrap()); + backend.remove(&hash).unwrap(); + assert!(!backend.exists(&hash).unwrap()); // removing something that doesn't exist is a noop - backend.remove(hash).unwrap(); + backend.remove(&hash).unwrap(); } #[test] @@ -293,13 +329,13 @@ mod tests { let backend = LocalBackend::new(&storage, None, None).unwrap(); backend.init().unwrap(); - let hash = "abc123def456789012345678901234ab"; - backend.store_bytes(hash, 
b"read me").unwrap(); + let hash = test_hash("abc123def456789012345678901234ab"); + backend.store_bytes(&hash, b"read me").unwrap(); - let content = backend.read(hash).unwrap(); + let content = backend.read(&hash).unwrap(); assert_eq!(content, Some(b"read me".to_vec())); // None if hash is not found - let content = backend.read("1234567890123456789012").unwrap(); + let content = backend.read(&test_hash("1234567890123456789012")).unwrap(); assert_eq!(content, None); } @@ -313,8 +349,8 @@ mod tests { let backend = LocalBackend::new(&storage, Some("750".to_string()), None).unwrap(); backend.init().unwrap(); - let hash = "abc123def456789012345678901234ab"; - backend.store_bytes(hash, b"content").unwrap(); + let hash = test_hash("abc123def456789012345678901234ab"); + backend.store_bytes(&hash, b"content").unwrap(); let stored = storage.join("ab").join("c123def456789012345678901234ab"); let mode = fs::metadata(&stored).unwrap().permissions().mode(); diff --git a/dvs/src/backends/mod.rs b/dvs/src/backends/mod.rs index 888f71e..6c61c6d 100644 --- a/dvs/src/backends/mod.rs +++ b/dvs/src/backends/mod.rs @@ -1,5 +1,7 @@ use std::path::Path; +use crate::Hashes; +use crate::audit::AuditEntry; use anyhow::Result; pub mod local; @@ -9,21 +11,24 @@ pub trait Backend: Send + Sync { fn init(&self) -> Result<()>; /// Store file to backend by hash. - fn store(&self, hash: &str, source: &Path) -> Result<()>; + fn store(&self, hash: &Hashes, source: &Path) -> Result<()>; /// Store raw bytes to backend by hash (for rollback). - fn store_bytes(&self, hash: &str, content: &[u8]) -> Result<()>; + fn store_bytes(&self, hash: &Hashes, content: &[u8]) -> Result<()>; /// Retrieve content by hash to target path. Returns true if the file was copied to the target /// path. 
- fn retrieve(&self, hash: &str, target: &Path) -> Result; + fn retrieve(&self, hash: &Hashes, target: &Path) -> Result; /// Check if the file exists in the backend - fn exists(&self, hash: &str) -> Result; + fn exists(&self, hash: &Hashes) -> Result; /// Remove content by hash (for rollback). Best-effort, may silently fail. - fn remove(&self, hash: &str) -> Result<()>; + fn remove(&self, hash: &Hashes) -> Result<()>; /// Read content by hash. Returns None if not found. - fn read(&self, hash: &str) -> Result>>; + fn read(&self, hash: &Hashes) -> Result>>; + + /// Log an audit entry to the backend's audit log. + fn log_audit(&self, entry: &AuditEntry) -> Result<()>; } diff --git a/dvs/src/file.rs b/dvs/src/file.rs index cdb6542..2e25e72 100644 --- a/dvs/src/file.rs +++ b/dvs/src/file.rs @@ -1,32 +1,15 @@ use std::path::{Path, PathBuf}; +use crate::audit::{AuditEntry, AuditFile}; +use crate::backends::Backend; +use crate::hashes::Hashes; +use crate::paths::DvsPaths; use anyhow::{Context, Result, bail}; use fs_err as fs; use serde::{Deserialize, Serialize}; +use uuid::Uuid; use walkdir::WalkDir; -use crate::backends::Backend; -use crate::lock::FileLock; -use crate::paths::DvsPaths; - -#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] -pub struct Hashes { - pub blake3: String, - pub md5: String, -} - -impl From> for Hashes { - fn from(bytes: Vec) -> Self { - let blake3_hash = format!("{}", blake3::hash(&bytes)); - let md5_hash = format!("{:x}", md5::compute(&bytes)); - - Self { - blake3: blake3_hash, - md5: md5_hash, - } - } -} - /// Outcome of an add or get operation. #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "lowercase")] @@ -91,16 +74,15 @@ impl FileMetadata { /// Copies the source file to storage and saves metadata atomically (both succeed or neither). 
pub fn save( &self, + operation_id: Uuid, source_file: impl AsRef, backend: &dyn Backend, paths: &DvsPaths, relative_path: impl AsRef, ) -> Result { let dvs_file_path = paths.metadata_path(relative_path.as_ref()); - // Prevent concurrent edit of the same metadata file - let _lock = FileLock::acquire(&dvs_file_path)?; let dvs_file_exists = dvs_file_path.is_file(); - let storage_exists = backend.exists(&self.hashes.md5)?; + let storage_exists = backend.exists(&self.hashes)?; log::debug!( "Saving {}: metadata_exists={}, storage_exists={}", @@ -128,10 +110,10 @@ impl FileMetadata { } // 2. Read old storage content for rollback (if any) - let old_storage_content = backend.read(&self.hashes.md5)?; + let old_storage_content = backend.read(&self.hashes)?; // 3. Store file to backend - let storage_res = backend.store(&self.hashes.md5, source_file.as_ref()); + let storage_res = backend.store(&self.hashes, source_file.as_ref()); // 4. Then metadata let old_metadata_content = fs::read(&dvs_file_path).ok(); @@ -142,7 +124,19 @@ impl FileMetadata { ); match (storage_res, metadata_res) { - (Ok(_), Ok(_)) => Ok(Outcome::Copied), + (Ok(_), Ok(_)) => { + let audit_entry = AuditEntry::new( + operation_id, + AuditFile { + path: source_file.as_ref().to_path_buf(), + hashes: self.hashes.clone(), + }, + ); + if let Err(e) = backend.log_audit(&audit_entry) { + log::warn!("Failed to write audit log: {}", e); + } + Ok(Outcome::Copied) + } (Err(e), Ok(_)) => { log::warn!( "Storage failed, rolling back metadata for {}", @@ -161,9 +155,9 @@ impl FileMetadata { relative_path.as_ref().display() ); if let Some(old) = old_storage_content { - backend.store_bytes(&self.hashes.md5, &old)?; + backend.store_bytes(&self.hashes, &old)?; } else { - backend.remove(&self.hashes.md5)?; + backend.remove(&self.hashes)?; } bail!("Failed to write metadata file: {dvs_file_path:?}") } @@ -178,9 +172,9 @@ impl FileMetadata { fs::remove_file(&dvs_file_path)?; } if let Some(old) = old_storage_content { - 
backend.store_bytes(&self.hashes.md5, &old)?; + backend.store_bytes(&self.hashes, &old)?; } else { - backend.remove(&self.hashes.md5)?; + backend.remove(&self.hashes)?; } bail!("Failed to write metadata file: {dvs_file_path:?}: {e}") } @@ -259,13 +253,13 @@ pub fn get_file( let metadata: FileMetadata = serde_json::from_reader(fs::File::open(&dvs_file_path)?)?; log::debug!( - "Read metadata for {}: md5 hash={}", + "Read metadata for {}: {}", relative_path.as_ref().display(), - metadata.hashes.md5 + metadata.hashes ); - if !backend.exists(&metadata.hashes.md5)? { - bail!("Storage file missing for hash: {}", metadata.hashes.md5); + if !backend.exists(&metadata.hashes)? { + bail!("Storage file missing for hash: {}", metadata.hashes); } let target_path = paths.file_path(relative_path.as_ref()); @@ -285,11 +279,11 @@ pub fn get_file( // Retrieve from backend to target path log::debug!( "Copying {} from storage to {}", - metadata.hashes.md5, + metadata.hashes, target_path.display() ); backend - .retrieve(&metadata.hashes.md5, &target_path) + .retrieve(&metadata.hashes, &target_path) .with_context(|| format!("Failed to retrieve {}", relative_path.as_ref().display()))?; let actual = FileMetadata::from_file(&target_path, None)?; if actual.hashes != metadata.hashes { @@ -337,12 +331,13 @@ pub fn add_files( .collect::>() ); let mut results = Vec::new(); + let operation_id = Uuid::new_v4(); for relative_path in matched_paths { let full_path = paths.file_path(&relative_path); let metadata = FileMetadata::from_file(&full_path, message.clone())?; - let outcome = metadata.save(&full_path, backend, paths, &relative_path)?; + let outcome = metadata.save(operation_id, &full_path, backend, paths, &relative_path)?; log::info!( "Successfully added {} ({:?})", relative_path.display(), @@ -446,13 +441,13 @@ mod tests { let metadata = FileMetadata::from_file(&file_path, None).unwrap(); let outcome = metadata - .save(&file_path, backend, &paths, "data.bin") + .save(Uuid::new_v4(), 
&file_path, backend, &paths, "data.bin") .unwrap(); assert_eq!(outcome, Outcome::Copied); // Metadata file should exist assert!(dvs_dir.join("data.bin.dvs").is_file()); - assert!(backend.exists(&metadata.hashes.md5).unwrap()); + assert!(backend.exists(&metadata.hashes).unwrap()); } #[test] @@ -465,12 +460,12 @@ mod tests { let metadata = FileMetadata::from_file(&file_path, None).unwrap(); metadata - .save(&file_path, backend, &paths, "data.bin") + .save(Uuid::new_v4(), &file_path, backend, &paths, "data.bin") .unwrap(); // Second save should return Present let outcome = metadata - .save(&file_path, backend, &paths, "data.bin") + .save(Uuid::new_v4(), &file_path, backend, &paths, "data.bin") .unwrap(); assert_eq!(outcome, Outcome::Present); } @@ -496,7 +491,7 @@ mod tests { let metadata = FileMetadata::from_file(&file_path, None).unwrap(); metadata - .save(&file_path, backend, &paths, "synced.txt") + .save(Uuid::new_v4(), &file_path, backend, &paths, "synced.txt") .unwrap(); let status = get_file_status(&paths, "synced.txt").unwrap(); @@ -513,7 +508,7 @@ mod tests { let metadata = FileMetadata::from_file(&file_path, None).unwrap(); metadata - .save(&file_path, backend, &paths, "deleted.txt") + .save(Uuid::new_v4(), &file_path, backend, &paths, "deleted.txt") .unwrap(); // Delete the original file @@ -533,7 +528,7 @@ mod tests { let metadata = FileMetadata::from_file(&file_path, None).unwrap(); metadata - .save(&file_path, backend, &paths, "modified.txt") + .save(Uuid::new_v4(), &file_path, backend, &paths, "modified.txt") .unwrap(); // Modify the file @@ -553,7 +548,7 @@ mod tests { let metadata = FileMetadata::from_file(&file_path, None).unwrap(); metadata - .save(&file_path, backend, &paths, "retrieve.txt") + .save(Uuid::new_v4(), &file_path, backend, &paths, "retrieve.txt") .unwrap(); // Delete the original file @@ -577,7 +572,7 @@ mod tests { let metadata = FileMetadata::from_file(&file_path, None).unwrap(); metadata - .save(&file_path, backend, &paths, 
"present.txt") + .save(Uuid::new_v4(), &file_path, backend, &paths, "present.txt") .unwrap(); // File still exists and matches - should return Present @@ -608,7 +603,9 @@ mod tests { for name in ["a.txt", "b.txt", "subdir/c.txt"] { let file_path = create_file(&root, name, name.as_bytes()); let metadata = FileMetadata::from_file(&file_path, None).unwrap(); - metadata.save(&file_path, backend, &paths, name).unwrap(); + metadata + .save(Uuid::new_v4(), &file_path, backend, &paths, name) + .unwrap(); } let statuses = get_status(&paths).unwrap(); @@ -775,13 +772,17 @@ mod tests { // Add file A with content "foo" (hash H1) let file_a = create_file(&root, "a.txt", b"foo"); let metadata_a = FileMetadata::from_file(&file_a, None).unwrap(); - metadata_a.save(&file_a, backend, &paths, "a.txt").unwrap(); + metadata_a + .save(Uuid::new_v4(), &file_a, backend, &paths, "a.txt") + .unwrap(); let hash_h1 = metadata_a.hashes.md5.clone(); // Add file B with content "bar" (hash H2) let file_b = create_file(&root, "b.txt", b"bar"); let metadata_b = FileMetadata::from_file(&file_b, None).unwrap(); - metadata_b.save(&file_b, backend, &paths, "b.txt").unwrap(); + metadata_b + .save(Uuid::new_v4(), &file_b, backend, &paths, "b.txt") + .unwrap(); let hash_h2 = metadata_b.hashes.md5.clone(); assert_ne!(hash_h1, hash_h2); @@ -793,7 +794,7 @@ mod tests { assert_eq!(metadata_b_new.hashes.md5, hash_h1); metadata_b_new - .save(&file_b, backend, &paths, "b.txt") + .save(Uuid::new_v4(), &file_b, backend, &paths, "b.txt") .unwrap(); // Verify metadata was updated @@ -821,7 +822,7 @@ mod tests { let file_path = create_file(&root, "data.txt", b"original content"); let metadata = FileMetadata::from_file(&file_path, None).unwrap(); metadata - .save(&file_path, backend, &paths, "data.txt") + .save(Uuid::new_v4(), &file_path, backend, &paths, "data.txt") .unwrap(); // Delete the local file @@ -830,8 +831,8 @@ mod tests { // Corrupt the storage file let storage_path = root .join(".storage") - 
.join(&metadata.hashes.md5[..2]) - .join(&metadata.hashes.md5[2..]); + .join(&metadata.hashes.blake3[..2]) + .join(&metadata.hashes.blake3[2..]); fs::write(&storage_path, b"corrupted content").unwrap(); // get_file should error on hash mismatch diff --git a/dvs/src/hashes.rs b/dvs/src/hashes.rs new file mode 100644 index 0000000..992d3ad --- /dev/null +++ b/dvs/src/hashes.rs @@ -0,0 +1,42 @@ +use serde::{Deserialize, Serialize}; +use std::fmt::Display; + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Copy)] +#[serde(rename_all = "lowercase")] +pub enum HashAlg { + Blake3, + Md5, +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] +pub struct Hashes { + pub blake3: String, + pub md5: String, +} + +impl From> for Hashes { + fn from(bytes: Vec) -> Self { + let blake3_hash = format!("{}", blake3::hash(&bytes)); + let md5_hash = format!("{:x}", md5::compute(&bytes)); + + Self { + blake3: blake3_hash, + md5: md5_hash, + } + } +} + +impl Hashes { + pub fn get_by_alg(&self, alg: HashAlg) -> &str { + match alg { + HashAlg::Blake3 => &self.blake3, + HashAlg::Md5 => &self.md5, + } + } +} + +impl Display for Hashes { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Hashes(md5={}, blake3={})", self.md5, self.blake3) + } +} diff --git a/dvs/src/lib.rs b/dvs/src/lib.rs index 79f3419..3eeffba 100644 --- a/dvs/src/lib.rs +++ b/dvs/src/lib.rs @@ -2,13 +2,14 @@ pub mod audit; pub mod backends; pub mod config; pub mod file; +mod hashes; pub mod init; -mod lock; pub mod paths; pub use backends::Backend; -pub use file::{AddResult, FileMetadata, FileStatus, GetResult, Hashes, Outcome, Status}; +pub use file::{AddResult, FileMetadata, FileStatus, GetResult, Outcome, Status}; pub use file::{add_files, get_file, get_file_status, get_files, get_status}; +pub use hashes::{HashAlg, Hashes}; pub use paths::{DvsPaths, find_repo_root}; #[cfg(test)] diff --git a/dvs/src/lock.rs b/dvs/src/lock.rs deleted file mode 100644 index 
8e26f29..0000000 --- a/dvs/src/lock.rs +++ /dev/null @@ -1,30 +0,0 @@ -use std::path::{Path, PathBuf}; -use std::thread; -use std::time::Duration; - -use anyhow::{Result, bail}; -use fs_err as fs; - -pub struct FileLock(PathBuf); - -impl FileLock { - pub fn acquire(path: &Path) -> Result<Self> { - let lock_path = path.with_extension("lock"); - - for _ in 0..50 { - if fs::File::create(lock_path.clone()).is_ok() { - return Ok(Self(lock_path.to_path_buf())); - } - - thread::sleep(Duration::from_millis(100)); - } - - bail!("Timeout acquiring lock for {}", path.display()) - } -} - -impl Drop for FileLock { - fn drop(&mut self) { - let _ = std::fs::remove_file(&self.0); - } -} diff --git a/dvs/src/paths.rs b/dvs/src/paths.rs index 4dce009..09e0f83 100644 --- a/dvs/src/paths.rs +++ b/dvs/src/paths.rs @@ -193,6 +193,7 @@ impl DvsPaths { mod tests { use super::*; use crate::testutil::create_temp_git_repo; + use uuid::Uuid; #[test] fn find_repo_root_at_root() { @@ -263,7 +264,13 @@ mod tests { let metadata = crate::FileMetadata::from_file(&file_path, None).unwrap(); let paths = DvsPaths::new(root.clone(), root.clone(), config.metadata_folder_name()); metadata - .save(&file_path, config.backend(), &paths, "data.txt") + .save( Uuid::new_v4(), &file_path, config.backend(), &paths, "data.txt", ) .unwrap(); // Also create a regular file in a subdir From 728c99b582446367e82046b8d4b7497259e4dfe3 Mon Sep 17 00:00:00 2001 From: Vincent Prouillet Date: Wed, 28 Jan 2026 11:57:29 +0100 Subject: [PATCH 3/4] Address comments --- dvs/src/backends/local.rs | 8 +++++--- dvs/src/file.rs | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/dvs/src/backends/local.rs b/dvs/src/backends/local.rs index 3f8bc8c..7cf6b28 100644 --- a/dvs/src/backends/local.rs +++ b/dvs/src/backends/local.rs @@ -108,7 +108,7 @@ impl LocalBackend { fn hash_to_path(&self, hashes: &Hashes) -> Result<PathBuf> { let hash = hashes.get_by_alg(self.hash_alg); if hash.len() < 3 || !hash.chars().all(|c| 
c.is_ascii_hexdigit()) { - bail!("Invalid hash: {}", hash); + bail!("Invalid hash: {hash}"); } let (prefix, suffix) = hash.split_at(2); Ok(self.path.join(prefix).join(suffix)) } @@ -137,7 +137,7 @@ impl Backend for LocalBackend { fn store_bytes(&self, hash: &Hashes, content: &[u8]) -> Result<()> { let path = self.hash_to_path(hash)?; - log::debug!("Storing {} bytes to {}", content.len(), path.display()); + log::debug!("Storing {} bytes to {path:?}", content.len()); if let Some(parent) = path.parent() { fs::create_dir_all(parent)?; self.apply_perms(parent)?; @@ -167,7 +167,7 @@ impl Backend for LocalBackend { fn remove(&self, hash: &Hashes) -> Result<()> { let path = self.hash_to_path(hash)?; if path.is_file() { - log::debug!("Removing {} from storage", hash); + log::debug!("Removing {path:?} from storage"); fs::remove_file(path)?; } Ok(()) @@ -176,6 +176,7 @@ impl Backend for LocalBackend { fn read(&self, hash: &Hashes) -> Result<Option<Vec<u8>>> { let path = self.hash_to_path(hash)?; if path.is_file() { + log::debug!("Reading {path:?} from storage"); Ok(Some(fs::read(&path)?)) } else { Ok(None) @@ -183,6 +184,7 @@ } fn log_audit(&self, entry: &AuditEntry) -> Result<()> { + log::debug!("Appending {entry:?} to audit log"); let audit_path = self.path.join(AUDIT_LOG_FILENAME); let mut file = OpenOptions::new() .create(true) diff --git a/dvs/src/file.rs b/dvs/src/file.rs index 2e25e72..d671e70 100644 --- a/dvs/src/file.rs +++ b/dvs/src/file.rs @@ -133,7 +133,7 @@ impl FileMetadata { }, ); if let Err(e) = backend.log_audit(&audit_entry) { - log::warn!("Failed to write audit log: {}", e); + log::error!("Failed to write audit log {audit_entry:?}: {e}"); } Ok(Outcome::Copied) } From 904502040213471319ff3a9a3facfa80f00dd514 Mon Sep 17 00:00:00 2001 From: Vincent Prouillet Date: Wed, 28 Jan 2026 12:11:49 +0100 Subject: [PATCH 4/4] Add test for local backend audit logging --- dvs/src/audit.rs | 10 ++++++++ dvs/src/backends/local.rs | 49 
+++++++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/dvs/src/audit.rs b/dvs/src/audit.rs index 5e246de..52ac6e0 100644 --- a/dvs/src/audit.rs +++ b/dvs/src/audit.rs @@ -1,6 +1,7 @@ use std::path::PathBuf; use crate::Hashes; +use anyhow::Result; use jiff::Timestamp; use serde::{Deserialize, Serialize}; use uuid::Uuid; @@ -32,3 +33,12 @@ impl AuditEntry { } } } + +pub fn parse_audit_log(bytes: &[u8]) -> Result<Vec<AuditEntry>> { + let content = std::str::from_utf8(bytes)?; + content + .lines() + .filter(|line| !line.trim().is_empty()) + .map(|line| Ok(serde_json::from_str(line)?)) + .collect() +} diff --git a/dvs/src/backends/local.rs b/dvs/src/backends/local.rs index 7cf6b28..fdcb484 100644 --- a/dvs/src/backends/local.rs +++ b/dvs/src/backends/local.rs @@ -200,6 +200,7 @@ impl Backend for LocalBackend { #[cfg(test)] mod tests { use super::*; + use crate::audit::{AuditEntry, AuditFile, parse_audit_log}; use crate::hashes::Hashes; fn test_hash(hash: &str) -> Hashes { @@ -358,4 +359,52 @@ mod tests { let mode = fs::metadata(&stored).unwrap().permissions().mode(); assert_eq!(mode & 0o777, 0o750); } + + #[test] + fn log_audit_appends_to_jsonl() { + let tmp = tempfile::tempdir().unwrap(); + let storage = tmp.path().join("storage"); + let backend = LocalBackend::new(&storage, None, None).unwrap(); + backend.init().unwrap(); + + let hash = test_hash("abc123def456789012345678901234ab"); + + let entry1 = AuditEntry { + operation_id: "op-1".to_string(), + timestamp: 1000000000, + user: "alice".to_string(), + file: AuditFile { + path: PathBuf::from("file1.txt"), + hashes: hash.clone(), + }, + }; + + let entry2 = AuditEntry { + operation_id: "op-2".to_string(), + timestamp: 2000000000, + user: "bob".to_string(), + file: AuditFile { + path: PathBuf::from("file2.txt"), + hashes: hash.clone(), + }, + }; + + backend.log_audit(&entry1).unwrap(); + backend.log_audit(&entry2).unwrap(); + + let audit_path = storage.join("audit.log.jsonl"); 
assert!(audit_path.is_file()); + + let content = fs::read(&audit_path).unwrap(); + let entries = parse_audit_log(&content).unwrap(); + assert_eq!(entries.len(), 2); + + assert_eq!(entries[0].operation_id, "op-1"); + assert_eq!(entries[0].timestamp, 1000000000); + assert_eq!(entries[0].user, "alice"); + + assert_eq!(entries[1].operation_id, "op-2"); + assert_eq!(entries[1].timestamp, 2000000000); + assert_eq!(entries[1].user, "bob"); + } }