From ddce43ad86468accfdcbbbd8802ab3c8c91876df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Hanuszczak?= Date: Wed, 28 Jan 2026 15:40:06 +0100 Subject: [PATCH 1/3] Require filestore part content to be non-empty. --- crates/rrg/src/filestore.rs | 112 +++++++++++------------------------- 1 file changed, 32 insertions(+), 80 deletions(-) diff --git a/crates/rrg/src/filestore.rs b/crates/rrg/src/filestore.rs index 0b2c2a5a..f0c4795a 100644 --- a/crates/rrg/src/filestore.rs +++ b/crates/rrg/src/filestore.rs @@ -189,6 +189,16 @@ impl Filestore { /// can't complete. In such cases there is no guarantee about the state of /// the file on disk and it should not be used again. pub fn store(&self, id: Id, part: Part) -> std::io::Result { + if part.content.is_empty() { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + format! { + "empty content for file part at offset {}", + part.offset, + }, + )); + } + log::info!("storing part at {} for '{}'", part.offset, id); // Note that in the code below we do not attempt to do any cleanup upon @@ -782,34 +792,6 @@ mod tests { assert_eq!(foo_contents, b"FOOBARBAZ"); } - #[test] - fn store_single_file_single_part_empty() { - let tempdir = tempfile::tempdir() - .unwrap(); - - let filestore = Filestore::init(tempdir.path(), Duration::MAX) - .unwrap(); - - let foo_id = Id { - flow_id: 0xf00, - file_id: "foo", - }; - - assert_eq! { - filestore.store(foo_id, Part { - offset: 0, - content: b"".to_vec(), - file_len: 0, - file_sha256: sha256(b""), - }).unwrap(), - Status::Complete, - }; - - let foo_contents = std::fs::read(filestore.path(foo_id).unwrap()) - .unwrap(); - assert_eq!(foo_contents, b""); - } - #[test] fn store_single_file_multiple_parts() { let tempdir = tempfile::tempdir() @@ -914,58 +896,6 @@ mod tests { assert_eq!(foo_contents, b"FOOBARBAZ"); } - #[test] - fn store_single_file_multiple_parts_empty() { - let tempdir = tempfile::tempdir() - .unwrap(); - - let filestore = Filestore::init(tempdir.path(), Duration::MAX) - .unwrap(); - - let foo_id = Id { - flow_id: 0xf00, - file_id: "foo", - }; - - assert_eq! { - filestore.store(foo_id, Part { - offset: 0, - content: b"FOO".to_vec(), - file_len: b"FOOBAR".len() as u64, - file_sha256: sha256(b"FOO"), - }).unwrap(), - Status::Pending { - offset: b"FOO".len() as u64, - len: b"BAR".len() as u64, - }, - }; - assert_eq! { - filestore.store(foo_id, Part { - offset: b"FOO".len() as u64, - content: b"".to_vec(), - file_len: b"FOOBAR".len() as u64, - file_sha256: sha256(b"FOOBAR"), - }).unwrap(), - Status::Pending { - offset: b"FOO".len() as u64, - len: b"BAR".len() as u64, - }, - }; - assert_eq! { - filestore.store(foo_id, Part { - offset: b"FOO".len() as u64, - content: b"BAR".to_vec(), - file_len: b"FOOBAR".len() as u64, - file_sha256: sha256(b"FOOBAR"), - }).unwrap(), - Status::Complete, - }; - - let foo_contents = std::fs::read(filestore.path(foo_id).unwrap()) - .unwrap(); - assert_eq!(foo_contents, b"FOOBAR"); - } - #[test] fn store_multiple_files_single_part() { let tempdir = tempfile::tempdir() @@ -1028,6 +958,28 @@ mod tests { assert_eq!(quux_contents, b"QUUX"); } + #[test] + fn store_empty_content() { + let tempdir = tempfile::tempdir() + .unwrap(); + + let filestore = Filestore::init(tempdir.path(), Duration::MAX) + .unwrap(); + + let foo_id = Id { + flow_id: 0xf00, + file_id: "foo", + }; + + let error = filestore.store(foo_id, Part { + offset: 0, + content: b"".to_vec(), + file_len: 0, + file_sha256: sha256(b""), + }).unwrap_err(); + assert_eq!(error.kind(), std::io::ErrorKind::InvalidData); + } + #[test] fn store_overlapping_parts() { let tempdir = tempfile::tempdir() From 15316a418a06387140ae6777f97be3ea56cc7397 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Hanuszczak?= Date: Wed, 28 Jan 2026 15:43:29 +0100 Subject: [PATCH 2/3] Disallow storing many parts at the same offset. --- crates/rrg/src/filestore.rs | 79 +++++++++++++++++++++++++++++++++++-- 1 file changed, 76 insertions(+), 3 deletions(-) diff --git a/crates/rrg/src/filestore.rs b/crates/rrg/src/filestore.rs index f0c4795a..738e2810 100644 --- a/crates/rrg/src/filestore.rs +++ b/crates/rrg/src/filestore.rs @@ -189,6 +189,8 @@ impl Filestore { /// can't complete. In such cases there is no guarantee about the state of /// the file on disk and it should not be used again. pub fn store(&self, id: Id, part: Part) -> std::io::Result { + use std::io::{Seek as _, Write as _}; + if part.content.is_empty() { return Err(std::io::Error::new( std::io::ErrorKind::InvalidData, @@ -230,11 +232,25 @@ impl Filestore { "could not create file parts dir at '{}': {error}", file_parts_dir_path.display(), }))?; - std::fs::write(&part_path, &part.content) + + let mut part_file = std::fs::File::create_new(&part_path) .map_err(|error| std::io::Error::new(error.kind(), format! { - "could not write part to '{}': {error}", + "could not create part file at '{}': {error}", part_path.display(), }))?; + part_file.write_all(&part.content) + .map_err(|error| std::io::Error::new(error.kind(), format! { + "could not write content of part file at '{}': {error}", + part_path.display(), + }))?; + // We cannot account for any errors when closing (`drop` cannot return + // any errors), so we sync it before that. + part_file.sync_all() + .map_err(|error| std::io::Error::new(error.kind(), format! { + "could not sync part file at '{}': {error}", + part_path.display() + }))?; + drop(part_file); log::info!("checking stored parts for '{}'", id); @@ -396,7 +412,6 @@ impl Filestore { log::info!("verifying SHA-256 of '{}' content", id); - use std::io::Seek as _; file.seek(std::io::SeekFrom::Start(0)) .map_err(|error| std::io::Error::new(error.kind(), format! { "could not seek file at '{}' for SHA-256 verification: {error}", @@ -980,6 +995,64 @@ mod tests { assert_eq!(error.kind(), std::io::ErrorKind::InvalidData); } + #[test] + fn store_same_part() { + let tempdir = tempfile::tempdir() + .unwrap(); + + let filestore = Filestore::init(tempdir.path(), Duration::MAX) + .unwrap(); + + let foo_id = Id { + flow_id: 0xf00, + file_id: "foo", + }; + + filestore.store(foo_id, Part { + offset: 0, + content: b"FOO".to_vec(), + file_len: b"FOOBAR".len() as u64, + file_sha256: sha256(b"FOOBAR"), + }).unwrap(); + + let error = filestore.store(foo_id, Part { + offset: 0, + content: b"FOO".to_vec(), + file_len: b"FOOBAR".len() as u64, + file_sha256: sha256(b"FOOBAR"), + }).unwrap_err(); + assert_eq!(error.kind(), std::io::ErrorKind::AlreadyExists); + } + + #[test] + fn store_same_offset() { + let tempdir = tempfile::tempdir() + .unwrap(); + + let filestore = Filestore::init(tempdir.path(), Duration::MAX) + .unwrap(); + + let foo_id = Id { + flow_id: 0xf00, + file_id: "foo", + }; + + filestore.store(foo_id, Part { + offset: 0, + content: b"FOO".to_vec(), + file_len: b"FOOBAR".len() as u64, + file_sha256: sha256(b"FOOBAR"), + }).unwrap(); + + let error = filestore.store(foo_id, Part { + offset: 0, + content: b"FOOBA".to_vec(), + file_len: b"FOOBAR".len() as u64, + file_sha256: sha256(b"FOOBAR"), + }).unwrap_err(); + assert_eq!(error.kind(), std::io::ErrorKind::AlreadyExists); + } + #[test] fn store_overlapping_parts() { let tempdir = tempfile::tempdir() From dfa53cc9c616cd4798fa3c3452f9fce9b28fe92d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Hanuszczak?= Date: Wed, 28 Jan 2026 17:15:41 +0100 Subject: [PATCH 3/3] Add a test for storing part of a complete file. --- crates/rrg/src/filestore.rs | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/crates/rrg/src/filestore.rs b/crates/rrg/src/filestore.rs index 738e2810..bd6ac364 100644 --- a/crates/rrg/src/filestore.rs +++ b/crates/rrg/src/filestore.rs @@ -1148,6 +1148,38 @@ mod tests { assert_eq!(error.kind(), std::io::ErrorKind::InvalidData); } + #[test] + fn store_already_complete() { + let tempdir = tempfile::tempdir() + .unwrap(); + + let filestore = Filestore::init(tempdir.path(), Duration::MAX) + .unwrap(); + + let foo_id = Id { + flow_id: 0xf00, + file_id: "foo", + }; + + assert_eq! { + filestore.store(foo_id, Part { + offset: 0, + content: b"FOOBAR".to_vec(), + file_len: b"FOOBAR".len() as u64, + file_sha256: sha256(b"FOOBAR"), + }).unwrap(), + Status::Complete, + }; + + let error = filestore.store(foo_id, Part { + offset: 0, + content: b"FOOBAR".to_vec(), + file_len: b"FOOBAR".len() as u64, + file_sha256: sha256(b"FOOBAR"), + }).unwrap_err(); + assert_eq!(error.kind(), std::io::ErrorKind::AlreadyExists); + } + #[test] fn delete_single_file() { let tempdir = tempfile::tempdir()