9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -423,6 +423,7 @@ napi = { version = "2", default-features = false, features = [
"napi5",
"compat-mode",
] }
nohash-hasher = "0.2.0"
notify = "8.1.0"
once_cell = "1.17.1"
owo-colors = "4.2.2"
6 changes: 4 additions & 2 deletions turbopack/crates/turbo-persistence-tools/src/main.rs
@@ -16,7 +16,7 @@ fn main() -> Result<()> {
bail!("The provided path does not exist: {}", path.display());
}

let db: TurboPersistence<SerialScheduler> = TurboPersistence::open_read_only(path)?;
let db: TurboPersistence<SerialScheduler, 0> = TurboPersistence::open_read_only(path)?;
let meta_info = db
.meta_info()
.context("Failed to retrieve meta information")?;
@@ -34,12 +34,14 @@ fn main() -> Result<()> {
amqf_size,
amqf_entries,
sst_size,
flags,
key_compression_dictionary_size,
block_count,
} in meta_file.entries
{
println!(
" SST {sequence_number:08}.sst: {min_hash:016x} - {max_hash:016x} (p = 1/{})",
" SST {sequence_number:08}.sst: {flags} {min_hash:016x} - {max_hash:016x} (p = \
1/{})",
u64::MAX / (max_hash - min_hash + 1)
);
println!(" AMQF {amqf_entries} entries = {} KiB", amqf_size / 1024);
3 changes: 3 additions & 0 deletions turbopack/crates/turbo-persistence/Cargo.toml
@@ -14,12 +14,15 @@ verbose_log = []

[dependencies]
anyhow = { workspace = true }
bitfield = { workspace = true }
dashmap = { workspace = true}
either = { workspace = true }
pot = "3.0.0"
byteorder = { workspace = true }
jiff = "0.2.10"
lzzzz = "1.1.0"
memmap2 = "0.9.5"
nohash-hasher = { workspace = true }
parking_lot = { workspace = true }
qfilter = { version = "0.2.4", features = ["serde"] }
quick_cache = { workspace = true }
13 changes: 9 additions & 4 deletions turbopack/crates/turbo-persistence/README.md
@@ -49,9 +49,14 @@ A meta file can contain metadata about multiple SST files. The metadata is store
- 8 bytes min hash
- 8 bytes max hash
- 8 bytes SST file size
- 4 bytes flags
- bit 0: cold (compacted and not recently accessed)
- bit 1: fresh (not yet compacted)
- 4 bytes end of AMQF offset relative to start of all AMQF data
- 4 bytes end of AMQF offset relative to start of all AMQF data of the "used key hashes" AMQF
- foreach described SST file
- serialized AMQF
- serialized "used key hashes" AMQF

### SST file

@@ -169,7 +174,7 @@ Compaction chooses a few SST files and runs the merge step of merge sort on them

Example:

```
``` text
key hash range: | 0 ... u64::MAX |
SST 1: |----------------|
SST 2: |----------------|
@@ -178,7 +183,7 @@ SST 3: |-----|

can be compacted into:

```
``` text
key hash range: | 0 ... u64::MAX |
SST 1': |-------|
SST 2': |------|
@@ -206,7 +211,7 @@ Full example:

Example:

```
``` text
key hash range: | 0 ... u64::MAX | Family
SST 1: |-| 1
SST 2: |----------------| 1
@@ -234,7 +239,7 @@ Then we delete SST files 2, 3, 6 and 4, 5, 8 and 7, 9. The

SST file 1 stays unchanged.

```
``` text
key hash range: | 0 ... u64::MAX | Family
SST 1: |-| 1
SST 10: |-----| 1
37 changes: 28 additions & 9 deletions turbopack/crates/turbo-persistence/src/compaction/selector.rs
@@ -1,5 +1,6 @@
use std::ops::RangeInclusive;

use rustc_hash::FxHashMap;
use smallvec::{SmallVec, smallvec};

use crate::compaction::interval_map::IntervalMap;
@@ -12,6 +13,12 @@ pub trait Compactable {

/// The size of the compactable database segment in bytes.
fn size(&self) -> u64;

/// The category of the compactable. Overlap between different categories is not considered for
/// compaction.
fn category(&self) -> u8 {
0
}
}
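As an illustration of the new hook, a hypothetical implementor might route a key family into `category()`. The sketch below restates the trait in condensed form (the real trait, including the `range()` accessor used further down in this file, lives here) and uses an invented segment type; it is not code from the crate.

```rust
use std::ops::RangeInclusive;

// Condensed restatement of the trait above, for a self-contained example.
trait Compactable {
    fn range(&self) -> RangeInclusive<u64>;
    fn size(&self) -> u64;
    fn category(&self) -> u8 {
        0
    }
}

// Hypothetical segment type used only for this illustration.
struct SegmentInfo {
    min_hash: u64,
    max_hash: u64,
    bytes: u64,
    family: u8,
}

impl Compactable for SegmentInfo {
    fn range(&self) -> RangeInclusive<u64> {
        self.min_hash..=self.max_hash
    }

    fn size(&self) -> u64 {
        self.bytes
    }

    // Segments reporting different categories are never considered together
    // for compaction, even when their key ranges overlap.
    fn category(&self) -> u8 {
        self.family
    }
}

fn main() {
    let a = SegmentInfo { min_hash: 0, max_hash: 100, bytes: 4096, family: 0 };
    let b = SegmentInfo { min_hash: 50, max_hash: 150, bytes: 4096, family: 1 };
    // Overlapping key ranges, but different categories.
    assert_ne!(a.category(), b.category());
}
```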

fn is_overlapping(a: &RangeInclusive<u64>, b: &RangeInclusive<u64>) -> bool {
@@ -37,6 +44,7 @@ fn extend_range(a: &mut RangeInclusive<u64>, b: &RangeInclusive<u64>) -> bool {
extended
}

#[cfg(test)]
#[derive(Debug)]
pub struct CompactableMetrics {
/// The total coverage of the compactables.
@@ -53,6 +61,7 @@ pub struct CompactableMetrics {
}

/// Computes metrics about the compactables.
#[cfg(test)]
pub fn compute_metrics<T: Compactable>(
compactables: &[T],
full_range: RangeInclusive<u64>,
@@ -155,7 +164,7 @@ impl DuplicationInfo {
/// Get a value in the range `0..=u64` that represents the estimated amount of duplication
/// across the given range. The units are arbitrary, but linear.
fn duplication(&self, range: &RangeInclusive<u64>) -> u64 {
if self.total_size == 0 {
if self.max_size == self.total_size {
return 0;
}
// the maximum numerator value is `u64::MAX + 1`
@@ -167,6 +176,7 @@ impl DuplicationInfo {
}

/// The estimated size (in bytes) of a database segment containing `range` keys.
#[cfg(test)]
fn size(&self, range: &RangeInclusive<u64>) -> u64 {
if self.total_size == 0 {
return 0;
@@ -190,11 +200,14 @@ impl DuplicationInfo {
}
}

fn total_duplication_size(duplication: &IntervalMap<Option<DuplicationInfo>>) -> u64 {
fn total_duplication_size(duplication: &IntervalMap<FxHashMap<u8, DuplicationInfo>>) -> u64 {
duplication
.iter()
.flat_map(|(range, info)| Some((range, info.as_ref()?)))
.map(|(range, info)| info.duplication(&range))
.map(|(range, info)| {
info.values()
.map(|info| info.duplication(&range))
.sum::<u64>()
})
.sum()
}
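With the interval map now keyed by category, duplication is estimated separately per category inside each interval and the per-category estimates are summed, so segments from different categories never count as duplicating each other. The sketch below is a simplified, self-contained model of that idea: it uses a plain `HashMap` and approximates duplication as total size minus the largest single segment, consistent with the guard above that returns zero when `max_size == total_size`; the real `IntervalMap`/`DuplicationInfo` arithmetic is range-weighted and more involved.

```rust
use std::collections::HashMap;

// Simplified stand-in for DuplicationInfo: track the total size and the
// largest single segment falling into one key-range interval.
#[derive(Default)]
struct DupEstimate {
    total_size: u64,
    max_size: u64,
}

impl DupEstimate {
    fn add(&mut self, size: u64) {
        self.total_size += size;
        self.max_size = self.max_size.max(size);
    }

    // If one segment accounts for everything, nothing is duplicated.
    fn duplication(&self) -> u64 {
        self.total_size - self.max_size
    }
}

// Sum duplication per category, so overlap across categories is ignored.
fn total_duplication(segments: &[(u8, u64)]) -> u64 {
    let mut per_category: HashMap<u8, DupEstimate> = HashMap::new();
    for &(category, size) in segments {
        per_category.entry(category).or_default().add(size);
    }
    per_category.values().map(DupEstimate::duplication).sum()
}

fn main() {
    // The two category-0 segments duplicate each other; the category-1
    // segment does not add to the estimate.
    assert_eq!(total_duplication(&[(0, 100), (0, 40), (1, 80)]), 40);
}
```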

Expand Down Expand Up @@ -238,16 +251,18 @@ pub fn get_merge_segments<T: Compactable>(
}
let start_compactable_range = start_compactable.range();
let start_compactable_size = start_compactable.size();
let start_compactable_category = start_compactable.category();
let mut current_range = start_compactable_range.clone();

// We might need to restart the search if we need to extend the range.
'search: loop {
let mut current_set = smallvec![start_index];
let mut current_size = start_compactable_size;
let mut duplication = IntervalMap::<Option<DuplicationInfo>>::new();
let mut duplication = IntervalMap::<FxHashMap<u8, DuplicationInfo>>::new();
duplication.update(start_compactable_range.clone(), |dup_info| {
dup_info
.get_or_insert_default()
.entry(start_compactable_category)
.or_default()
.add(start_compactable_size, &start_compactable_range);
});
let mut current_skip = 0;
@@ -337,8 +352,9 @@ pub fn get_merge_segments<T: Compactable>(
// set.
current_set.push(next_index);
current_size += size;
let category = compactable.category();
duplication.update(range.clone(), |dup_info| {
dup_info.get_or_insert_default().add(size, &range);
dup_info.entry(category).or_default().add(size, &range);
});
}
}
@@ -633,13 +649,16 @@ mod tests {

let new_metrics = compute_metrics(&containers, 0..=KEY_RANGE);
println!(
"Compaction done: coverage: {} ({}), overlap: {} ({}), duplication: {} ({})",
"Compaction done: coverage: {} ({}), overlap: {} ({}), duplication: {} ({}), \
duplicated_size: {} ({})",
new_metrics.coverage,
new_metrics.coverage - metrics.coverage,
new_metrics.overlap,
new_metrics.overlap - metrics.overlap,
new_metrics.duplication,
new_metrics.duplication - metrics.duplication
new_metrics.duplication - metrics.duplication,
new_metrics.duplicated_size,
(new_metrics.duplicated_size as f32) - metrics.duplicated_size as f32,
);
} else {
println!("No compaction needed");
1 change: 0 additions & 1 deletion turbopack/crates/turbo-persistence/src/compression.rs
@@ -3,7 +3,6 @@ use std::{mem::MaybeUninit, sync::Arc};
use anyhow::{Context, Result};
use lzzzz::lz4::{ACC_LEVEL_DEFAULT, decompress, decompress_with_dict};

#[tracing::instrument(level = "trace", skip_all, name = "decompress database block")]
pub fn decompress_into_arc(
uncompressed_length: u32,
block: &[u8],