Skip to content

Commit b9c60d3

Browse files
committed
use more flags
1 parent ee2fe0f commit b9c60d3

File tree

9 files changed

+75
-32
lines changed

9 files changed

+75
-32
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

turbopack/crates/turbo-persistence-tools/src/main.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,14 @@ fn main() -> Result<()> {
3434
amqf_size,
3535
amqf_entries,
3636
sst_size,
37-
cold,
37+
flags,
3838
key_compression_dictionary_size,
3939
block_count,
4040
} in meta_file.entries
4141
{
4242
println!(
43-
" {} SST {sequence_number:08}.sst: {min_hash:016x} - {max_hash:016x} (p = 1/{})",
44-
if cold { "COLD" } else { "HOT" },
43+
" SST {sequence_number:08}.sst: {} {min_hash:016x} - {max_hash:016x} (p = 1/{})",
44+
flags,
4545
u64::MAX / (max_hash - min_hash + 1)
4646
);
4747
println!(" AMQF {amqf_entries} entries = {} KiB", amqf_size / 1024);

turbopack/crates/turbo-persistence/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ print_stats = ["stats"]
1313

1414
[dependencies]
1515
anyhow = { workspace = true }
16+
bitfield = { workspace = true }
1617
dashmap = { workspace = true}
1718
either = { workspace = true }
1819
pot = "3.0.0"

turbopack/crates/turbo-persistence/README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@ A meta file can contain metadata about multiple SST files. The metadata is store
4949
- 8 bytes min hash
5050
- 8 bytes max hash
5151
- 8 bytes SST file size
52-
- 4 bytes flags (bit 0: cold)
52+
- 4 bytes flags
53+
- bit 0: cold (compacted and not recently accessed)
54+
- bit 1: fresh (not yet compacted)
5355
- 4 bytes end of AMQF offset relative to start of all AMQF data
5456
- 4 bytes end of AMQF offset relative to start of all AMQF data of the "used key hashes" AMQF
5557
- foreach described SST file

turbopack/crates/turbo-persistence/src/db.rs

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use crate::{
3131
key::{StoreKey, hash_key},
3232
lookup_entry::{LookupEntry, LookupValue},
3333
merge_iter::MergeIter,
34-
meta_file::{AmqfCache, MetaFile, MetaLookupResult, StaticSortedFileRange},
34+
meta_file::{AmqfCache, MetaEntryFlags, MetaFile, MetaLookupResult, StaticSortedFileRange},
3535
meta_file_builder::MetaFileBuilder,
3636
parallel_scheduler::ParallelScheduler,
3737
sst_filter::SstFilter,
@@ -549,8 +549,8 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
549549
let seq = entry.sequence_number();
550550
let range = entry.range();
551551
let size = entry.size();
552-
let cold = entry.cold();
553-
(seq, range.min_hash, range.max_hash, size, cold)
552+
let flags = entry.flags();
553+
(seq, range.min_hash, range.max_hash, size, flags)
554554
})
555555
.collect::<Vec<_>>();
556556
(
@@ -642,12 +642,12 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
642642
writeln!(log, "Commit {seq:08} {keys_written} keys in {span:#}")?;
643643
for (seq, family, ssts, obsolete) in new_meta_info {
644644
writeln!(log, "{seq:08} META family:{family}",)?;
645-
for (seq, min, max, size, cold) in ssts {
645+
for (seq, min, max, size, flags) in ssts {
646646
writeln!(
647647
log,
648648
" {seq:08} SST {min:016x}-{max:016x} {} MiB ({})",
649649
size / 1024 / 1024,
650-
if cold { "cold" } else { "warm" }
650+
flags
651651
)?;
652652
}
653653
for seq in obsolete {
@@ -930,7 +930,7 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
930930
.key_compression_dictionary_length(),
931931
block_count: entry.block_count(),
932932
size: entry.size(),
933-
cold: entry.cold(),
933+
flags: entry.flags(),
934934
entries: 0,
935935
};
936936
return Ok(PartialMergeResult::Move {
@@ -945,7 +945,7 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
945945
total_key_size: usize,
946946
path: &Path,
947947
seq: u32,
948-
cold: bool,
948+
flags: MetaEntryFlags,
949949
) -> Result<(u32, File, StaticSortedFileBuilderMeta<'static>)>
950950
{
951951
let _span = tracing::trace_span!("write merged sst file").entered();
@@ -954,7 +954,7 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
954954
entries,
955955
total_key_size,
956956
&path.join(format!("{seq:08}.sst")),
957-
cold,
957+
flags,
958958
)
959959
})?;
960960
Ok((seq, file, meta))
@@ -1036,13 +1036,16 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
10361036
+ 1;
10371037

10381038
keys_written += collector.entries.len() as u64;
1039+
let mut flags = MetaEntryFlags::default();
1040+
flags.set_cold(!is_used);
1041+
10391042
new_sst_files.push(create_sst_file(
10401043
&self.parallel_scheduler,
10411044
&collector.entries,
10421045
selected_total_key_size,
10431046
path,
10441047
seq,
1045-
!is_used,
1048+
flags,
10461049
)?);
10471050

10481051
collector.entries.clear();
@@ -1073,9 +1076,10 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
10731076
}
10741077

10751078
// If we have one set of entries left, write them to a new SST file
1076-
for (collector, cold) in
1077-
[(&mut used_collector, false), (&mut unused_collector, true)]
1078-
{
1079+
for (collector, flags) in [
1080+
(&mut used_collector, MetaEntryFlags::WARM),
1081+
(&mut unused_collector, MetaEntryFlags::COLD),
1082+
] {
10791083
if collector.last_entries.is_empty()
10801084
&& !collector.entries.is_empty()
10811085
{
@@ -1088,7 +1092,7 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
10881092
collector.total_key_size,
10891093
path,
10901094
seq,
1091-
cold,
1095+
flags,
10921096
)?);
10931097
} else
10941098
// If we have two sets of entries left, merge them and
@@ -1115,7 +1119,7 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
11151119
collector.last_entries_total_key_size / 2,
11161120
path,
11171121
seq1,
1118-
cold,
1122+
flags,
11191123
)?);
11201124

11211125
keys_written += part2.len() as u64;
@@ -1125,7 +1129,7 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
11251129
collector.last_entries_total_key_size / 2,
11261130
path,
11271131
seq2,
1128-
cold,
1132+
flags,
11291133
)?);
11301134
}
11311135
}
@@ -1323,7 +1327,7 @@ impl<S: ParallelScheduler, const FAMILIES: usize> TurboPersistence<S, FAMILIES>
13231327
min_hash: entry.min_hash(),
13241328
max_hash: entry.max_hash(),
13251329
sst_size: entry.size(),
1326-
cold: entry.cold(),
1330+
flags: entry.flags(),
13271331
amqf_size: entry.amqf_size(),
13281332
amqf_entries: amqf.len(),
13291333
key_compression_dictionary_size: entry
@@ -1364,7 +1368,7 @@ pub struct MetaFileEntryInfo {
13641368
pub amqf_size: u32,
13651369
pub amqf_entries: usize,
13661370
pub sst_size: u64,
1367-
pub cold: bool,
1371+
pub flags: MetaEntryFlags,
13681372
pub key_compression_dictionary_size: u16,
13691373
pub block_count: u16,
13701374
}

turbopack/crates/turbo-persistence/src/meta_file.rs

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::{
2+
fmt::Display,
23
fs::File,
34
hash::BuildHasherDefault,
45
io::{BufReader, Seek},
@@ -8,6 +9,7 @@ use std::{
89
};
910

1011
use anyhow::{Context, Result, bail};
12+
use bitfield::bitfield;
1113
use byteorder::{BE, ReadBytesExt};
1214
use either::Either;
1315
use memmap2::{Mmap, MmapOptions};
@@ -31,6 +33,35 @@ impl quick_cache::Weighter<u32, Arc<qfilter::Filter>> for AmqfWeighter {
3133
pub type AmqfCache =
3234
quick_cache::sync::Cache<u32, Arc<qfilter::Filter>, AmqfWeighter, BuildHasherDefault<FxHasher>>;
3335

36+
bitfield! {
37+
#[derive(Clone, Copy, Default)]
38+
pub struct MetaEntryFlags(u32);
39+
impl Debug;
40+
impl From<u32>;
41+
/// The SST file was compacted and none of the entries have been accessed recently.
42+
pub cold, set_cold: 0;
43+
/// The SST file was freshly written and has not been compacted yet.
44+
pub fresh, set_fresh: 1;
45+
}
46+
47+
impl MetaEntryFlags {
48+
pub const FRESH: MetaEntryFlags = MetaEntryFlags(0b10);
49+
pub const COLD: MetaEntryFlags = MetaEntryFlags(0b01);
50+
pub const WARM: MetaEntryFlags = MetaEntryFlags(0b00);
51+
}
52+
53+
impl Display for MetaEntryFlags {
54+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55+
if self.fresh() {
56+
write!(f, "fresh")
57+
} else if self.cold() {
58+
write!(f, "cold")
59+
} else {
60+
write!(f, "warm")
61+
}
62+
}
63+
}
64+
3465
pub struct MetaEntry {
3566
/// The metadata for the static sorted file.
3667
sst_data: StaticSortedFileMetaData,
@@ -42,8 +73,8 @@ pub struct MetaEntry {
4273
max_hash: u64,
4374
/// The size of the SST file in bytes.
4475
size: u64,
45-
/// If the file is cold or warm.
46-
cold: bool,
76+
/// The status flags for this entry.
77+
flags: MetaEntryFlags,
4778
/// The offset of the start of the AMQF data in the meta file relative to the end of the
4879
/// header.
4980
start_of_amqf_data_offset: u32,
@@ -66,8 +97,8 @@ impl MetaEntry {
6697
self.size
6798
}
6899

69-
pub fn cold(&self) -> bool {
70-
self.cold
100+
pub fn flags(&self) -> MetaEntryFlags {
101+
self.flags
71102
}
72103

73104
pub fn amqf_size(&self) -> u32 {
@@ -236,7 +267,7 @@ impl MetaFile {
236267
min_hash: file.read_u64::<BE>()?,
237268
max_hash: file.read_u64::<BE>()?,
238269
size: file.read_u64::<BE>()?,
239-
cold: file.read_u32::<BE>()? != 0,
270+
flags: MetaEntryFlags(file.read_u32::<BE>()?),
240271
start_of_amqf_data_offset,
241272
end_of_amqf_data_offset: file.read_u32::<BE>()?,
242273
amqf: OnceLock::new(),

turbopack/crates/turbo-persistence/src/meta_file_builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ impl<'a> MetaFileBuilder<'a> {
7070
file.write_u64::<BE>(sst.min_hash)?;
7171
file.write_u64::<BE>(sst.max_hash)?;
7272
file.write_u64::<BE>(sst.size)?;
73-
file.write_u32::<BE>(if sst.cold { 1 } else { 0 })?;
73+
file.write_u32::<BE>(sst.flags.0)?;
7474
amqf_offset += sst.amqf.len();
7575
file.write_u32::<BE>(amqf_offset as u32)?;
7676
}

turbopack/crates/turbo-persistence/src/static_sorted_file_builder.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use byteorder::{BE, ByteOrder, WriteBytesExt};
1111

1212
use crate::{
1313
compression::compress_into_buffer,
14+
meta_file::MetaEntryFlags,
1415
static_sorted_file::{
1516
BLOCK_TYPE_INDEX, BLOCK_TYPE_KEY, KEY_BLOCK_ENTRY_TYPE_BLOB, KEY_BLOCK_ENTRY_TYPE_DELETED,
1617
KEY_BLOCK_ENTRY_TYPE_MEDIUM, KEY_BLOCK_ENTRY_TYPE_SMALL,
@@ -88,8 +89,8 @@ pub struct StaticSortedFileBuilderMeta<'a> {
8889
pub block_count: u16,
8990
/// The file size of the SST file
9091
pub size: u64,
91-
/// If the file is cold or warm
92-
pub cold: bool,
92+
/// The status flags for this SST file
93+
pub flags: MetaEntryFlags,
9394
/// The number of entries in the SST file
9495
pub entries: u64,
9596
}
@@ -98,7 +99,7 @@ pub fn write_static_stored_file<E: Entry>(
9899
entries: &[E],
99100
total_key_size: usize,
100101
file: &Path,
101-
cold: bool,
102+
flags: MetaEntryFlags,
102103
) -> Result<(StaticSortedFileBuilderMeta<'static>, File)> {
103104
debug_assert!(entries.iter().map(|e| e.key_hash()).is_sorted());
104105

@@ -144,7 +145,7 @@ pub fn write_static_stored_file<E: Entry>(
144145
key_compression_dictionary_length: key_dict.len().try_into().unwrap(),
145146
block_count,
146147
size: file.stream_position()?,
147-
cold,
148+
flags,
148149
entries: entries.len() as u64,
149150
};
150151
Ok((meta, file.into_inner()?))

turbopack/crates/turbo-persistence/src/write_batch.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use crate::{
2121
compression::compress_into_buffer,
2222
constants::{MAX_MEDIUM_VALUE_SIZE, THREAD_LOCAL_SIZE_SHIFT},
2323
key::StoreKey,
24+
meta_file::MetaEntryFlags,
2425
meta_file_builder::MetaFileBuilder,
2526
parallel_scheduler::ParallelScheduler,
2627
static_sorted_file_builder::{StaticSortedFileBuilderMeta, write_static_stored_file},
@@ -420,7 +421,9 @@ impl<K: StoreKey + Send + Sync, S: ParallelScheduler, const FAMILIES: usize>
420421
let path = self.db_path.join(format!("{seq:08}.sst"));
421422
let (meta, file) = self
422423
.parallel_scheduler
423-
.block_in_place(|| write_static_stored_file(entries, total_key_size, &path, true))
424+
.block_in_place(|| {
425+
write_static_stored_file(entries, total_key_size, &path, MetaEntryFlags::FRESH)
426+
})
424427
.with_context(|| format!("Unable to write SST file {seq:08}.sst"))?;
425428

426429
#[cfg(feature = "verify_sst_content")]

0 commit comments

Comments
 (0)