// Patch summary (free-form preamble placed before the `diff --git` header; it
// is not part of any hunk).
//
// `Filestore::store`:
//   * A part whose `content` is empty is now rejected with an `InvalidData`
//     error; the check sits ahead of the initial "storing part" log line.
//   * The part file is written with `File::create_new` + `write_all` +
//     `sync_all` + an explicit `drop`, replacing the single `std::fs::write`
//     call. The in-code comment gives the reason for the sync: `drop` cannot
//     report errors.
//   * `Seek` and `Write` are imported at the top of `store`; a later
//     `use std::io::Seek as _;` inside `impl Filestore` is removed.
//
// Tests:
//   * Removed: `store_single_file_single_part_empty` and
//     `store_single_file_multiple_parts_empty`, both of which expected a part
//     with empty content to be accepted.
//   * Added: `store_empty_content` (expects `InvalidData`), plus
//     `store_same_part`, `store_same_offset` and `store_already_complete`
//     (each expects `AlreadyExists` from the second `store` call).
//
// NOTE(review): the `store` signature below reads `-> std::io::Result {` with
// no type argument, while the tests compare its `Ok` value with `Status::...`;
// a `<Status>` parameter was presumably lost when this patch was extracted.
// Confirm against the real file.
// NOTE(review): line breaks were restored from the `+`/`-` markers and agree
// with every `@@` line count; indentation and blank context lines are inferred
// and should be verified against the target file before applying.
diff --git a/crates/rrg/src/filestore.rs b/crates/rrg/src/filestore.rs
index 0b2c2a5a..bd6ac364 100644
--- a/crates/rrg/src/filestore.rs
+++ b/crates/rrg/src/filestore.rs
@@ -189,6 +189,18 @@ impl Filestore {
     /// can't complete. In such cases there is no guarantee about the state of
     /// the file on disk and it should not be used again.
     pub fn store(&self, id: Id, part: Part) -> std::io::Result {
+        use std::io::{Seek as _, Write as _};
+
+        if part.content.is_empty() {
+            return Err(std::io::Error::new(
+                std::io::ErrorKind::InvalidData,
+                format! {
+                    "empty content for file part at offset {}",
+                    part.offset,
+                },
+            ));
+        }
+
         log::info!("storing part at {} for '{}'", part.offset, id);
 
         // Note that in the code below we do not attempt to do any cleanup upon
@@ -220,11 +232,25 @@ impl Filestore {
                 "could not create file parts dir at '{}': {error}",
                 file_parts_dir_path.display(),
             }))?;
-        std::fs::write(&part_path, &part.content)
+
+        let mut part_file = std::fs::File::create_new(&part_path)
             .map_err(|error| std::io::Error::new(error.kind(), format! {
-                "could not write part to '{}': {error}",
+                "could not create part file at '{}': {error}",
                 part_path.display(),
             }))?;
+        part_file.write_all(&part.content)
+            .map_err(|error| std::io::Error::new(error.kind(), format! {
+                "could not write content of part file at '{}': {error}",
+                part_path.display(),
+            }))?;
+        // We cannot account for any errors when closing (`drop` cannot return
+        // any errors), so we sync it before that.
+        part_file.sync_all()
+            .map_err(|error| std::io::Error::new(error.kind(), format! {
+                "could not sync part file at '{}': {error}",
+                part_path.display()
+            }))?;
+        drop(part_file);
 
         log::info!("checking stored parts for '{}'", id);
 
@@ -386,7 +412,6 @@ impl Filestore {
 
         log::info!("verifying SHA-256 of '{}' content", id);
 
-        use std::io::Seek as _;
         file.seek(std::io::SeekFrom::Start(0))
             .map_err(|error| std::io::Error::new(error.kind(), format! {
                 "could not seek file at '{}' for SHA-256 verification: {error}",
@@ -782,34 +807,6 @@ mod tests {
         assert_eq!(foo_contents, b"FOOBARBAZ");
     }
 
-    #[test]
-    fn store_single_file_single_part_empty() {
-        let tempdir = tempfile::tempdir()
-            .unwrap();
-
-        let filestore = Filestore::init(tempdir.path(), Duration::MAX)
-            .unwrap();
-
-        let foo_id = Id {
-            flow_id: 0xf00,
-            file_id: "foo",
-        };
-
-        assert_eq! {
-            filestore.store(foo_id, Part {
-                offset: 0,
-                content: b"".to_vec(),
-                file_len: 0,
-                file_sha256: sha256(b""),
-            }).unwrap(),
-            Status::Complete,
-        };
-
-        let foo_contents = std::fs::read(filestore.path(foo_id).unwrap())
-            .unwrap();
-        assert_eq!(foo_contents, b"");
-    }
-
     #[test]
     fn store_single_file_multiple_parts() {
         let tempdir = tempfile::tempdir()
@@ -914,58 +911,6 @@ mod tests {
         assert_eq!(foo_contents, b"FOOBARBAZ");
     }
 
-    #[test]
-    fn store_single_file_multiple_parts_empty() {
-        let tempdir = tempfile::tempdir()
-            .unwrap();
-
-        let filestore = Filestore::init(tempdir.path(), Duration::MAX)
-            .unwrap();
-
-        let foo_id = Id {
-            flow_id: 0xf00,
-            file_id: "foo",
-        };
-
-        assert_eq! {
-            filestore.store(foo_id, Part {
-                offset: 0,
-                content: b"FOO".to_vec(),
-                file_len: b"FOOBAR".len() as u64,
-                file_sha256: sha256(b"FOO"),
-            }).unwrap(),
-            Status::Pending {
-                offset: b"FOO".len() as u64,
-                len: b"BAR".len() as u64,
-            },
-        };
-        assert_eq! {
-            filestore.store(foo_id, Part {
-                offset: b"FOO".len() as u64,
-                content: b"".to_vec(),
-                file_len: b"FOOBAR".len() as u64,
-                file_sha256: sha256(b"FOOBAR"),
-            }).unwrap(),
-            Status::Pending {
-                offset: b"FOO".len() as u64,
-                len: b"BAR".len() as u64,
-            },
-        };
-        assert_eq! {
-            filestore.store(foo_id, Part {
-                offset: b"FOO".len() as u64,
-                content: b"BAR".to_vec(),
-                file_len: b"FOOBAR".len() as u64,
-                file_sha256: sha256(b"FOOBAR"),
-            }).unwrap(),
-            Status::Complete,
-        };
-
-        let foo_contents = std::fs::read(filestore.path(foo_id).unwrap())
-            .unwrap();
-        assert_eq!(foo_contents, b"FOOBAR");
-    }
-
     #[test]
     fn store_multiple_files_single_part() {
         let tempdir = tempfile::tempdir()
@@ -1028,6 +973,86 @@ mod tests {
         assert_eq!(quux_contents, b"QUUX");
     }
 
+    #[test]
+    fn store_empty_content() {
+        let tempdir = tempfile::tempdir()
+            .unwrap();
+
+        let filestore = Filestore::init(tempdir.path(), Duration::MAX)
+            .unwrap();
+
+        let foo_id = Id {
+            flow_id: 0xf00,
+            file_id: "foo",
+        };
+
+        let error = filestore.store(foo_id, Part {
+            offset: 0,
+            content: b"".to_vec(),
+            file_len: 0,
+            file_sha256: sha256(b""),
+        }).unwrap_err();
+        assert_eq!(error.kind(), std::io::ErrorKind::InvalidData);
+    }
+
+    #[test]
+    fn store_same_part() {
+        let tempdir = tempfile::tempdir()
+            .unwrap();
+
+        let filestore = Filestore::init(tempdir.path(), Duration::MAX)
+            .unwrap();
+
+        let foo_id = Id {
+            flow_id: 0xf00,
+            file_id: "foo",
+        };
+
+        filestore.store(foo_id, Part {
+            offset: 0,
+            content: b"FOO".to_vec(),
+            file_len: b"FOOBAR".len() as u64,
+            file_sha256: sha256(b"FOOBAR"),
+        }).unwrap();
+
+        let error = filestore.store(foo_id, Part {
+            offset: 0,
+            content: b"FOO".to_vec(),
+            file_len: b"FOOBAR".len() as u64,
+            file_sha256: sha256(b"FOOBAR"),
+        }).unwrap_err();
+        assert_eq!(error.kind(), std::io::ErrorKind::AlreadyExists);
+    }
+
+    #[test]
+    fn store_same_offset() {
+        let tempdir = tempfile::tempdir()
+            .unwrap();
+
+        let filestore = Filestore::init(tempdir.path(), Duration::MAX)
+            .unwrap();
+
+        let foo_id = Id {
+            flow_id: 0xf00,
+            file_id: "foo",
+        };
+
+        filestore.store(foo_id, Part {
+            offset: 0,
+            content: b"FOO".to_vec(),
+            file_len: b"FOOBAR".len() as u64,
+            file_sha256: sha256(b"FOOBAR"),
+        }).unwrap();
+
+        let error = filestore.store(foo_id, Part {
+            offset: 0,
+            content: b"FOOBA".to_vec(),
+            file_len: b"FOOBAR".len() as u64,
+            file_sha256: sha256(b"FOOBAR"),
+        }).unwrap_err();
+        assert_eq!(error.kind(), std::io::ErrorKind::AlreadyExists);
+    }
+
     #[test]
     fn store_overlapping_parts() {
         let tempdir = tempfile::tempdir()
@@ -1123,6 +1148,38 @@ mod tests {
         assert_eq!(error.kind(), std::io::ErrorKind::InvalidData);
     }
 
+    #[test]
+    fn store_already_complete() {
+        let tempdir = tempfile::tempdir()
+            .unwrap();
+
+        let filestore = Filestore::init(tempdir.path(), Duration::MAX)
+            .unwrap();
+
+        let foo_id = Id {
+            flow_id: 0xf00,
+            file_id: "foo",
+        };
+
+        assert_eq! {
+            filestore.store(foo_id, Part {
+                offset: 0,
+                content: b"FOOBAR".to_vec(),
+                file_len: b"FOOBAR".len() as u64,
+                file_sha256: sha256(b"FOOBAR"),
+            }).unwrap(),
+            Status::Complete,
+        };
+
+        let error = filestore.store(foo_id, Part {
+            offset: 0,
+            content: b"FOOBAR".to_vec(),
+            file_len: b"FOOBAR".len() as u64,
+            file_sha256: sha256(b"FOOBAR"),
+        }).unwrap_err();
+        assert_eq!(error.kind(), std::io::ErrorKind::AlreadyExists);
+    }
+
     #[test]
     fn delete_single_file() {
         let tempdir = tempfile::tempdir()