Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 140 additions & 83 deletions crates/rrg/src/filestore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,18 @@ impl Filestore {
/// can't complete. In such cases there is no guarantee about the state of
/// the file on disk and it should not be used again.
pub fn store(&self, id: Id, part: Part) -> std::io::Result<Status> {
use std::io::{Seek as _, Write as _};

if part.content.is_empty() {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format! {
"empty content for file part at offset {}",
part.offset,
},
));
}

log::info!("storing part at {} for '{}'", part.offset, id);

// Note that in the code below we do not attempt to do any cleanup upon
Expand Down Expand Up @@ -220,11 +232,25 @@ impl Filestore {
"could not create file parts dir at '{}': {error}",
file_parts_dir_path.display(),
}))?;
std::fs::write(&part_path, &part.content)

let mut part_file = std::fs::File::create_new(&part_path)
.map_err(|error| std::io::Error::new(error.kind(), format! {
"could not write part to '{}': {error}",
"could not create part file at '{}': {error}",
part_path.display(),
}))?;
part_file.write_all(&part.content)
.map_err(|error| std::io::Error::new(error.kind(), format! {
"could not write content of part file at '{}': {error}",
part_path.display(),
}))?;
// We cannot account for any errors when closing (`drop` cannot return
// any errors), so we sync it before that.
part_file.sync_all()
.map_err(|error| std::io::Error::new(error.kind(), format! {
"could not sync part file at '{}': {error}",
part_path.display()
}))?;
drop(part_file);

log::info!("checking stored parts for '{}'", id);

Expand Down Expand Up @@ -386,7 +412,6 @@ impl Filestore {

log::info!("verifying SHA-256 of '{}' content", id);

use std::io::Seek as _;
file.seek(std::io::SeekFrom::Start(0))
.map_err(|error| std::io::Error::new(error.kind(), format! {
"could not seek file at '{}' for SHA-256 verification: {error}",
Expand Down Expand Up @@ -782,34 +807,6 @@ mod tests {
assert_eq!(foo_contents, b"FOOBARBAZ");
}

#[test]
fn store_single_file_single_part_empty() {
    // `store` refuses parts whose content is empty (it returns `InvalidData`
    // before doing anything else), so even a zero-length file cannot be
    // uploaded through a single empty part.
    let tempdir = tempfile::tempdir()
        .unwrap();

    let filestore = Filestore::init(tempdir.path(), Duration::MAX)
        .unwrap();

    let foo_id = Id {
        flow_id: 0xf00,
        file_id: "foo",
    };

    let error = filestore.store(foo_id, Part {
        offset: 0,
        content: b"".to_vec(),
        file_len: 0,
        file_sha256: sha256(b""),
    }).unwrap_err();
    assert_eq!(error.kind(), std::io::ErrorKind::InvalidData);
}

#[test]
fn store_single_file_multiple_parts() {
let tempdir = tempfile::tempdir()
Expand Down Expand Up @@ -914,58 +911,6 @@ mod tests {
assert_eq!(foo_contents, b"FOOBARBAZ");
}

#[test]
fn store_single_file_multiple_parts_empty() {
    // An empty part sent in the middle of a multi-part upload is rejected with
    // `InvalidData`. The rejection happens at the very top of `store`, so the
    // upload is expected to be completable by the remaining real part.
    let tempdir = tempfile::tempdir()
        .unwrap();

    let filestore = Filestore::init(tempdir.path(), Duration::MAX)
        .unwrap();

    let foo_id = Id {
        flow_id: 0xf00,
        file_id: "foo",
    };

    // First half of the file: the second half is still pending.
    assert_eq! {
        filestore.store(foo_id, Part {
            offset: 0,
            content: b"FOO".to_vec(),
            file_len: b"FOOBAR".len() as u64,
            // Digest of the whole file, as in the other multi-part tests.
            file_sha256: sha256(b"FOOBAR"),
        }).unwrap(),
        Status::Pending {
            offset: b"FOO".len() as u64,
            len: b"BAR".len() as u64,
        },
    };

    // An empty part at the pending offset must be refused.
    let error = filestore.store(foo_id, Part {
        offset: b"FOO".len() as u64,
        content: b"".to_vec(),
        file_len: b"FOOBAR".len() as u64,
        file_sha256: sha256(b"FOOBAR"),
    }).unwrap_err();
    assert_eq!(error.kind(), std::io::ErrorKind::InvalidData);

    // The real second half still completes the file.
    assert_eq! {
        filestore.store(foo_id, Part {
            offset: b"FOO".len() as u64,
            content: b"BAR".to_vec(),
            file_len: b"FOOBAR".len() as u64,
            file_sha256: sha256(b"FOOBAR"),
        }).unwrap(),
        Status::Complete,
    };

    let foo_contents = std::fs::read(filestore.path(foo_id).unwrap())
        .unwrap();
    assert_eq!(foo_contents, b"FOOBAR");
}

#[test]
fn store_multiple_files_single_part() {
let tempdir = tempfile::tempdir()
Expand Down Expand Up @@ -1028,6 +973,86 @@ mod tests {
assert_eq!(quux_contents, b"QUUX");
}

#[test]
fn store_empty_content() {
    // Storing a part that carries no bytes must fail with `InvalidData`.
    let root = tempfile::tempdir().unwrap();
    let files = Filestore::init(root.path(), Duration::MAX).unwrap();

    let id = Id { flow_id: 0xf00, file_id: "foo" };
    let empty_part = Part {
        offset: 0,
        content: b"".to_vec(),
        file_len: 0,
        file_sha256: sha256(b""),
    };

    let outcome = files.store(id, empty_part);
    let error = outcome.unwrap_err();
    assert_eq!(error.kind(), std::io::ErrorKind::InvalidData);
}

#[test]
fn store_same_part() {
    // Sending the exact same part twice must fail the second time with
    // `AlreadyExists`.
    let root = tempfile::tempdir().unwrap();
    let files = Filestore::init(root.path(), Duration::MAX).unwrap();

    let id = Id { flow_id: 0xf00, file_id: "foo" };

    // Builds a fresh copy of the first half of "FOOBAR" on every call.
    let first_half = || Part {
        offset: 0,
        content: b"FOO".to_vec(),
        file_len: b"FOOBAR".len() as u64,
        file_sha256: sha256(b"FOOBAR"),
    };

    files.store(id, first_half()).unwrap();

    let outcome = files.store(id, first_half());
    let error = outcome.unwrap_err();
    assert_eq!(error.kind(), std::io::ErrorKind::AlreadyExists);
}

#[test]
fn store_same_offset() {
    // A second part at an already stored offset must fail with
    // `AlreadyExists`, even when its content differs from the first one.
    let root = tempfile::tempdir().unwrap();
    let files = Filestore::init(root.path(), Duration::MAX).unwrap();

    let id = Id { flow_id: 0xf00, file_id: "foo" };

    // Builds a part of "FOOBAR" that starts at offset 0 with given content.
    let part_at_start = |content: &[u8]| Part {
        offset: 0,
        content: content.to_vec(),
        file_len: b"FOOBAR".len() as u64,
        file_sha256: sha256(b"FOOBAR"),
    };

    files.store(id, part_at_start(b"FOO")).unwrap();

    let outcome = files.store(id, part_at_start(b"FOOBA"));
    let error = outcome.unwrap_err();
    assert_eq!(error.kind(), std::io::ErrorKind::AlreadyExists);
}

#[test]
fn store_overlapping_parts() {
let tempdir = tempfile::tempdir()
Expand Down Expand Up @@ -1123,6 +1148,38 @@ mod tests {
assert_eq!(error.kind(), std::io::ErrorKind::InvalidData);
}

#[test]
fn store_already_complete() {
    // Once a file has been completed, sending its content again must fail
    // with `AlreadyExists`.
    let root = tempfile::tempdir().unwrap();
    let files = Filestore::init(root.path(), Duration::MAX).unwrap();

    let id = Id { flow_id: 0xf00, file_id: "foo" };

    // Builds a single part that covers the entire "FOOBAR" file.
    let whole_file = || Part {
        offset: 0,
        content: b"FOOBAR".to_vec(),
        file_len: b"FOOBAR".len() as u64,
        file_sha256: sha256(b"FOOBAR"),
    };

    let first_status = files.store(id, whole_file()).unwrap();
    assert_eq!(first_status, Status::Complete);

    let second_outcome = files.store(id, whole_file());
    let error = second_outcome.unwrap_err();
    assert_eq!(error.kind(), std::io::ErrorKind::AlreadyExists);
}

#[test]
fn delete_single_file() {
let tempdir = tempfile::tempdir()
Expand Down
Loading