From 88943de14237b8f12655cd3c26670adb5fd135af Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Fri, 31 Oct 2025 15:45:21 +0100 Subject: [PATCH 01/13] capture used key hashes and store them in the meta file --- Cargo.lock | 8 ++ Cargo.toml | 1 + .../turbo-persistence-tools/src/main.rs | 2 +- turbopack/crates/turbo-persistence/Cargo.toml | 2 + turbopack/crates/turbo-persistence/README.md | 2 + turbopack/crates/turbo-persistence/src/db.rs | 81 +++++++++++++------ .../crates/turbo-persistence/src/meta_file.rs | 11 +++ .../src/meta_file_builder.rs | 20 +++++ .../crates/turbo-persistence/src/tests.rs | 45 ++++++----- .../turbo-persistence/src/write_batch.rs | 11 ++- .../src/database/turbo/mod.rs | 12 ++- 11 files changed, 140 insertions(+), 55 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index dfabf8ac4203e..e1037d83e1a0f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4501,6 +4501,12 @@ dependencies = [ "libc", ] +[[package]] +name = "nohash-hasher" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bf50223579dc7cdcfb3bfcacf7069ff68243f8c363f62ffa99cf000a6b9c451" + [[package]] name = "nom" version = "5.1.3" @@ -9101,10 +9107,12 @@ version = "0.1.0" dependencies = [ "anyhow", "byteorder", + "dashmap 6.1.0", "either", "jiff", "lzzzz", "memmap2 0.9.5", + "nohash-hasher", "parking_lot", "pot", "qfilter", diff --git a/Cargo.toml b/Cargo.toml index 84f520685ad34..2a79dd1fb7372 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -423,6 +423,7 @@ napi = { version = "2", default-features = false, features = [ "napi5", "compat-mode", ] } +nohash-hasher = "0.2.0" notify = "8.1.0" once_cell = "1.17.1" owo-colors = "4.2.2" diff --git a/turbopack/crates/turbo-persistence-tools/src/main.rs b/turbopack/crates/turbo-persistence-tools/src/main.rs index a1b2bb15a1f09..d23e257822e6f 100644 --- a/turbopack/crates/turbo-persistence-tools/src/main.rs +++ b/turbopack/crates/turbo-persistence-tools/src/main.rs @@ -16,7 +16,7 @@ fn main() -> Result<()> { bail!("The provided path does not exist: {}", path.display()); } - let db: TurboPersistence = TurboPersistence::open_read_only(path)?; + let db: TurboPersistence = TurboPersistence::open_read_only(path)?; let meta_info = db .meta_info() .context("Failed to retrieve meta information")?; diff --git a/turbopack/crates/turbo-persistence/Cargo.toml b/turbopack/crates/turbo-persistence/Cargo.toml index 4f0b035eb4e8f..f5d36f510fb41 100644 --- a/turbopack/crates/turbo-persistence/Cargo.toml +++ b/turbopack/crates/turbo-persistence/Cargo.toml @@ -14,12 +14,14 @@ verbose_log = [] [dependencies] anyhow = { workspace = true } +dashmap = { workspace = true} either = { workspace = true } pot = "3.0.0" byteorder = { workspace = true } jiff = "0.2.10" lzzzz = "1.1.0" memmap2 = "0.9.5" +nohash-hasher = { workspace = true } parking_lot = { workspace = true } qfilter = { version = "0.2.4", features = ["serde"] } quick_cache = { workspace = true } diff --git a/turbopack/crates/turbo-persistence/README.md b/turbopack/crates/turbo-persistence/README.md index 51baa4c5a457c..7ee9d2bc09e2a 100644 --- a/turbopack/crates/turbo-persistence/README.md +++ b/turbopack/crates/turbo-persistence/README.md @@ -50,8 +50,10 @@ A meta file can contain metadata about multiple SST files. 
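As a rough illustration (not the actual reader in `meta_file.rs`; the field set is simplified), one per-SST record in the layout below could be decoded with the `byteorder` crate like this:

```rust
use std::io::Read;

use anyhow::Result;
use byteorder::{BE, ReadBytesExt};

// Simplified mirror of one per-SST metadata record; the real reader also
// derives each entry's AMQF start offset from the previous entry's end.
struct EntryRecord {
    min_hash: u64,
    max_hash: u64,
    sst_size: u64,
    end_of_amqf_data_offset: u32,
}

fn read_entry(r: &mut impl Read) -> Result<EntryRecord> {
    Ok(EntryRecord {
        min_hash: r.read_u64::<BE>()?,
        max_hash: r.read_u64::<BE>()?,
        sst_size: r.read_u64::<BE>()?,
        end_of_amqf_data_offset: r.read_u32::<BE>()?,
    })
}
```

After all per-SST records, this patch appends one more 4-byte offset marking the end of the file-level "used key hashes" AMQF data.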
The metadata is store - 8 bytes max hash - 8 bytes SST file size - 4 bytes end of AMQF offset relative to start of all AMQF data + - 4 bytes end of AMQF offset relative to start of all AMQF data of the "used key hashes" AMQF - foreach described SST file - serialized AMQF +- serialized "used key hashes" AMQF ### SST file diff --git a/turbopack/crates/turbo-persistence/src/db.rs b/turbopack/crates/turbo-persistence/src/db.rs index f1c03d1b58cfc..777ed03dd013e 100644 --- a/turbopack/crates/turbo-persistence/src/db.rs +++ b/turbopack/crates/turbo-persistence/src/db.rs @@ -11,8 +11,10 @@ use std::{ use anyhow::{Context, Result, bail}; use byteorder::{BE, ReadBytesExt, WriteBytesExt}; +use dashmap::DashSet; use jiff::Timestamp; use memmap2::Mmap; +use nohash_hasher::BuildNoHashHasher; use parking_lot::{Mutex, RwLock}; use smallvec::SmallVec; @@ -105,7 +107,7 @@ struct TrackedStats { /// TurboPersistence is a persistent key-value store. It is limited to a single writer at a time /// using a single write batch. It allows for concurrent reads. -pub struct TurboPersistence { +pub struct TurboPersistence { parallel_scheduler: S, /// The path to the directory where the database is stored path: PathBuf, @@ -113,7 +115,7 @@ pub struct TurboPersistence { /// no modification on the database is performed. read_only: bool, /// The inner state of the database. Writing will update that. - inner: RwLock, + inner: RwLock>, /// A flag to indicate if a write operation is currently active. Prevents multiple concurrent /// write operations. active_write_operation: AtomicBool, @@ -129,11 +131,15 @@ pub struct TurboPersistence { } /// The inner state of the database. -struct Inner { +struct Inner { /// The list of meta files in the database. This is used to derive the SST files. meta_files: Vec, /// The current sequence number for the database. current_sequence_number: u32, + /// The in progress set of hashes of keys that have been accessed. + /// It will be flushed onto disk (into a meta file) on next commit. + /// It's a dashset to allow modification while only tracking a read lock on Inner. + accessed_key_hashes: [DashSet>; FAMILIES], } pub struct CommitOptions { @@ -146,7 +152,7 @@ pub struct CommitOptions { keys_written: u64, } -impl TurboPersistence { +impl TurboPersistence { /// Open a TurboPersistence database at the given path. /// This will read the directory and might performance cleanup when the database was not closed /// properly. Cleanup only requires to read a few bytes from a few files and to delete @@ -162,7 +168,7 @@ impl TurboPersistence { } } -impl TurboPersistence { +impl TurboPersistence { fn new(path: PathBuf, read_only: bool, parallel_scheduler: S) -> Self { Self { parallel_scheduler, @@ -171,6 +177,8 @@ impl TurboPersistence { inner: RwLock::new(Inner { meta_files: Vec::new(), current_sequence_number: 0, + accessed_key_hashes: [(); FAMILIES] + .map(|_| DashSet::with_hasher(BuildNoHashHasher::default())), }), active_write_operation: AtomicBool::new(false), amqf_cache: AmqfCache::with( @@ -406,7 +414,7 @@ impl TurboPersistence { /// time. The WriteBatch need to be committed with [`TurboPersistence::commit_write_batch`]. /// Note that the WriteBatch might start writing data to disk while it's filled up with data. /// This data will only become visible after the WriteBatch is committed. - pub fn write_batch( + pub fn write_batch( &self, ) -> Result> { if self.read_only { @@ -444,7 +452,7 @@ impl TurboPersistence { /// Commits a WriteBatch to the database. 
This will finish writing the data to disk and make it /// visible to readers. - pub fn commit_write_batch( + pub fn commit_write_batch( &self, mut write_batch: WriteBatch, ) -> Result<()> { @@ -457,7 +465,27 @@ impl TurboPersistence { new_sst_files, new_blob_files, keys_written, - } = write_batch.finish()?; + } = write_batch.finish(|family| { + let inner = self.inner.read(); + let set = &inner.accessed_key_hashes[family as usize]; + // len is only a snapshot at that time and it can change while we create the filter. + // So we give it 5% more space to make resizes less likely. + let initial_capacity = set.len() * 19 / 20; + let mut amqf = + qfilter::Filter::with_fingerprint_size(initial_capacity as u64, u64::BITS as u8) + .unwrap(); + // This drains items from the set. But due to concurrency it might not be empty + // afterwards, but that's fine. It will be part of the next commit. + set.retain(|hash| { + // Performance-wise it would usually be better to insert sorted fingerprints, but we + // assume that hashes are equally distributed, which makes it unnecessary. + // Good for cache locality is that we insert in the order of the dashset's buckets. + amqf.insert_fingerprint(false, *hash) + .expect("Failed to insert fingerprint"); + false + }); + amqf + })?; self.commit(CommitOptions { new_meta_files, new_sst_files, @@ -1269,24 +1297,27 @@ impl TurboPersistence { self.stats.miss_amqf.fetch_add(1, Ordering::Relaxed); } MetaLookupResult::SstLookup(result) => match result { - SstLookupResult::Found(result) => match result { - LookupValue::Deleted => { - #[cfg(feature = "stats")] - self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed); - return Ok(None); - } - LookupValue::Slice { value } => { - #[cfg(feature = "stats")] - self.stats.hits_small.fetch_add(1, Ordering::Relaxed); - return Ok(Some(value)); - } - LookupValue::Blob { sequence_number } => { - #[cfg(feature = "stats")] - self.stats.hits_blob.fetch_add(1, Ordering::Relaxed); - let blob = self.read_blob(sequence_number)?; - return Ok(Some(blob)); + SstLookupResult::Found(result) => { + inner.accessed_key_hashes[family].insert(hash); + match result { + LookupValue::Deleted => { + #[cfg(feature = "stats")] + self.stats.hits_deleted.fetch_add(1, Ordering::Relaxed); + return Ok(None); + } + LookupValue::Slice { value } => { + #[cfg(feature = "stats")] + self.stats.hits_small.fetch_add(1, Ordering::Relaxed); + return Ok(Some(value)); + } + LookupValue::Blob { sequence_number } => { + #[cfg(feature = "stats")] + self.stats.hits_blob.fetch_add(1, Ordering::Relaxed); + let blob = self.read_blob(sequence_number)?; + return Ok(Some(blob)); + } } - }, + } SstLookupResult::NotFound => { #[cfg(feature = "stats")] self.stats.miss_key.fetch_add(1, Ordering::Relaxed); diff --git a/turbopack/crates/turbo-persistence/src/meta_file.rs b/turbopack/crates/turbo-persistence/src/meta_file.rs index 3c0f1b3aa755e..16b291bcd0e1a 100644 --- a/turbopack/crates/turbo-persistence/src/meta_file.rs +++ b/turbopack/crates/turbo-persistence/src/meta_file.rs @@ -183,6 +183,12 @@ pub struct MetaFile { obsolete_entries: Vec, /// The obsolete SST files. obsolete_sst_files: Vec, + /// The offset of the start of the "used keys" AMQF data in the meta file relative to the end + /// of the header. + start_of_used_keys_amqf_data_offset: u32, + /// The offset of the end of the "used keys" AMQF data in the the meta file relative to the end + /// of the header. + end_of_used_keys_amqf_data_offset: u32, /// The memory mapped file. 
mmap: Mmap, } @@ -232,6 +238,9 @@ impl MetaFile { start_of_amqf_data_offset = entry.end_of_amqf_data_offset; entries.push(entry); } + let start_of_used_keys_amqf_data_offset = start_of_amqf_data_offset; + let end_of_used_keys_amqf_data_offset = file.read_u32::()?; + let offset = file.stream_position()?; let file = file.into_inner(); let mut options = MmapOptions::new(); @@ -246,6 +255,8 @@ impl MetaFile { entries, obsolete_entries: Vec::new(), obsolete_sst_files, + start_of_used_keys_amqf_data_offset, + end_of_used_keys_amqf_data_offset, mmap, }; Ok(file) diff --git a/turbopack/crates/turbo-persistence/src/meta_file_builder.rs b/turbopack/crates/turbo-persistence/src/meta_file_builder.rs index 6783175368371..1e34c8f75cd5b 100644 --- a/turbopack/crates/turbo-persistence/src/meta_file_builder.rs +++ b/turbopack/crates/turbo-persistence/src/meta_file_builder.rs @@ -6,6 +6,7 @@ use std::{ use anyhow::{Context, Result}; use byteorder::{BE, WriteBytesExt}; +use qfilter::Filter; use crate::static_sorted_file_builder::StaticSortedFileBuilderMeta; @@ -15,6 +16,8 @@ pub struct MetaFileBuilder<'a> { entries: Vec<(u32, StaticSortedFileBuilderMeta<'a>)>, /// Obsolete SST files, represented by their sequence numbers obsolete_sst_files: Vec, + /// Optional AMQF for used key hashes + used_key_hashes_amqf: Option, } impl<'a> MetaFileBuilder<'a> { @@ -23,6 +26,7 @@ impl<'a> MetaFileBuilder<'a> { family, entries: Vec::new(), obsolete_sst_files: Vec::new(), + used_key_hashes_amqf: None, } } @@ -34,6 +38,10 @@ impl<'a> MetaFileBuilder<'a> { self.obsolete_sst_files.push(sequence_number); } + pub fn set_used_key_hashes_amqf(&mut self, amqf: Filter) { + self.used_key_hashes_amqf = Some(amqf); + } + #[tracing::instrument(level = "trace", skip_all)] pub fn write(self, db_path: &Path, seq: u32) -> Result { let file = db_path.join(format!("{seq:08}.meta")); @@ -65,10 +73,22 @@ impl<'a> MetaFileBuilder<'a> { amqf_offset += sst.amqf.len(); file.write_u32::(amqf_offset as u32)?; } + let serialized_used_key_hashes = self + .used_key_hashes_amqf + .as_ref() + .map(|f| pot::to_vec(f).expect("AMQF serialization failed")); + amqf_offset += serialized_used_key_hashes + .as_ref() + .map(|bytes| bytes.len()) + .unwrap_or(0); + file.write_u32::(amqf_offset as u32)?; for (_, sst) in &self.entries { file.write_all(&sst.amqf)?; } + if let Some(bytes) = &serialized_used_key_hashes { + file.write_all(bytes)?; + } Ok(file.into_inner()?) 
} } diff --git a/turbopack/crates/turbo-persistence/src/tests.rs b/turbopack/crates/turbo-persistence/src/tests.rs index a52f7e9dd44b0..05691f27e8e77 100644 --- a/turbopack/crates/turbo-persistence/src/tests.rs +++ b/turbopack/crates/turbo-persistence/src/tests.rs @@ -109,20 +109,21 @@ fn full_cycle() -> Result<()> { type TestCases = Vec<( &'static str, Box, RayonParallelScheduler, 16>) -> Result<()>>, - Box) -> Result<()>>, + Box) -> Result<()>>, )>; fn test_case( test_cases: &mut TestCases, name: &'static str, write: impl Fn(&mut WriteBatch, RayonParallelScheduler, 16>) -> Result<()> + 'static, - read: impl Fn(&TurboPersistence) -> Result<()> + 'static, + read: impl Fn(&TurboPersistence) -> Result<()> + 'static, ) { test_cases.push(( name, Box::new(write) as Box, RayonParallelScheduler, 16>) -> Result<()>>, - Box::new(read) as Box) -> Result<()>>, + Box::new(read) + as Box) -> Result<()>>, )); } @@ -506,7 +507,7 @@ fn persist_changes() -> Result<()> { } Ok(()) } - fn check(db: &TurboPersistence, key: u8, value: u8) -> Result<()> { + fn check(db: &TurboPersistence, key: u8, value: u8) -> Result<()> { for i in 0..READ_COUNT { // read every 10th item let i = i * 10; @@ -519,11 +520,11 @@ fn persist_changes() -> Result<()> { } { - let db = TurboPersistence::open_with_parallel_scheduler( + let db = TurboPersistence::<_, 1>::open_with_parallel_scheduler( path.to_path_buf(), RayonParallelScheduler, )?; - let b = db.write_batch::<_, 1>()?; + let b = db.write_batch()?; put(&b, 1, 11)?; put(&b, 2, 21)?; put(&b, 3, 31)?; @@ -538,11 +539,11 @@ fn persist_changes() -> Result<()> { println!("---"); { - let db = TurboPersistence::open_with_parallel_scheduler( + let db = TurboPersistence::<_, 1>::open_with_parallel_scheduler( path.to_path_buf(), RayonParallelScheduler, )?; - let b = db.write_batch::<_, 1>()?; + let b = db.write_batch()?; put(&b, 1, 12)?; put(&b, 2, 22)?; db.commit_write_batch(b)?; @@ -555,11 +556,11 @@ fn persist_changes() -> Result<()> { } { - let db = TurboPersistence::open_with_parallel_scheduler( + let db = TurboPersistence::<_, 1>::open_with_parallel_scheduler( path.to_path_buf(), RayonParallelScheduler, )?; - let b = db.write_batch::<_, 1>()?; + let b = db.write_batch()?; put(&b, 1, 13)?; db.commit_write_batch(b)?; @@ -638,7 +639,7 @@ fn partial_compaction() -> Result<()> { } Ok(()) } - fn check(db: &TurboPersistence, key: u8, value: u8) -> Result<()> { + fn check(db: &TurboPersistence, key: u8, value: u8) -> Result<()> { for i in 0..READ_COUNT { // read every 10th item let i = i * 10; @@ -655,11 +656,11 @@ fn partial_compaction() -> Result<()> { println!("--- Iteration {i} ---"); println!("Add more entries"); { - let db = TurboPersistence::open_with_parallel_scheduler( + let db = TurboPersistence::<_, 1>::open_with_parallel_scheduler( path.to_path_buf(), RayonParallelScheduler, )?; - let b = db.write_batch::<_, 1>()?; + let b = db.write_batch()?; put(&b, i, i)?; put(&b, i + 1, i)?; put(&b, i + 2, i)?; @@ -677,7 +678,7 @@ fn partial_compaction() -> Result<()> { println!("Compaction"); { - let db = TurboPersistence::open_with_parallel_scheduler( + let db = TurboPersistence::<_, 1>::open_with_parallel_scheduler( path.to_path_buf(), RayonParallelScheduler, )?; @@ -701,7 +702,7 @@ fn partial_compaction() -> Result<()> { println!("Restore check"); { - let db = TurboPersistence::open_with_parallel_scheduler( + let db = TurboPersistence::<_, 1>::open_with_parallel_scheduler( path.to_path_buf(), RayonParallelScheduler, )?; @@ -742,7 +743,7 @@ fn merge_file_removal() -> Result<()> { } 
Ok(()) } - fn check(db: &TurboPersistence, key: u8, value: u32) -> Result<()> { + fn check(db: &TurboPersistence, key: u8, value: u32) -> Result<()> { for i in 0..READ_COUNT { // read every 10th item let i = i * 10; @@ -760,11 +761,11 @@ fn merge_file_removal() -> Result<()> { { println!("--- Init ---"); - let db = TurboPersistence::open_with_parallel_scheduler( + let db = TurboPersistence::<_, 1>::open_with_parallel_scheduler( path.to_path_buf(), RayonParallelScheduler, )?; - let b = db.write_batch::<_, 1>()?; + let b = db.write_batch()?; for j in 0..=255 { put(&b, j, 0)?; } @@ -779,11 +780,11 @@ fn merge_file_removal() -> Result<()> { let i = i * 37; println!("Add more entries"); { - let db = TurboPersistence::open_with_parallel_scheduler( + let db = TurboPersistence::<_, 1>::open_with_parallel_scheduler( path.to_path_buf(), RayonParallelScheduler, )?; - let b = db.write_batch::<_, 1>()?; + let b = db.write_batch()?; for j in iter_bits(i) { println!("Put {j} = {i}"); expected_values[j as usize] = i; @@ -800,7 +801,7 @@ fn merge_file_removal() -> Result<()> { println!("Compaction"); { - let db = TurboPersistence::open_with_parallel_scheduler( + let db = TurboPersistence::<_, 1>::open_with_parallel_scheduler( path.to_path_buf(), RayonParallelScheduler, )?; @@ -821,7 +822,7 @@ fn merge_file_removal() -> Result<()> { println!("Restore check"); { - let db = TurboPersistence::open_with_parallel_scheduler( + let db = TurboPersistence::<_, 1>::open_with_parallel_scheduler( path.to_path_buf(), RayonParallelScheduler, )?; diff --git a/turbopack/crates/turbo-persistence/src/write_batch.rs b/turbopack/crates/turbo-persistence/src/write_batch.rs index 4426f8795af44..10c51233e50d9 100644 --- a/turbopack/crates/turbo-persistence/src/write_batch.rs +++ b/turbopack/crates/turbo-persistence/src/write_batch.rs @@ -267,8 +267,11 @@ impl /// Finishes the write batch by returning the new sequence number and the new SST files. This /// writes all outstanding thread local data to disk. - #[tracing::instrument(level = "trace", skip(self))] - pub(crate) fn finish(&mut self) -> Result { + #[tracing::instrument(level = "trace", skip_all)] + pub(crate) fn finish( + &mut self, + get_accessed_key_hashes: impl Fn(u32) -> qfilter::Filter + Send + Sync, + ) -> Result { let mut new_blob_files = Vec::new(); // First, we flush all thread local collectors to the global collectors. @@ -343,7 +346,7 @@ impl }, )?; - // Not we need to write the new meta files. + // Now we need to write the new meta files. let new_meta_collectors = [(); FAMILIES].map(|_| Mutex::new(Vec::new())); let meta_collectors = replace(&mut self.meta_collectors, new_meta_collectors); let keys_written = AtomicU64::new(0); @@ -366,6 +369,8 @@ impl builder.add(seq, sst); } keys_written.fetch_add(entries, Ordering::Relaxed); + let accessed_key_hashes = get_accessed_key_hashes(family); + builder.set_used_key_hashes_amqf(accessed_key_hashes); let seq = self.current_sequence_number.fetch_add(1, Ordering::SeqCst) + 1; let file = builder.write(&self.db_path, seq)?; Ok((seq, file)) diff --git a/turbopack/crates/turbo-tasks-backend/src/database/turbo/mod.rs b/turbopack/crates/turbo-tasks-backend/src/database/turbo/mod.rs index f4414f90099cc..aa7de939553e9 100644 --- a/turbopack/crates/turbo-tasks-backend/src/database/turbo/mod.rs +++ b/turbopack/crates/turbo-tasks-backend/src/database/turbo/mod.rs @@ -21,6 +21,9 @@ use crate::database::{ mod parallel_scheduler; +/// Number of key families, see KeySpace enum for their numbers. 
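+/// Keep this in sync with the number of `KeySpace` variants.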
+const FAMILIES: usize = 5; + const MB: u64 = 1024 * 1024; const COMPACT_CONFIG: CompactConfig = CompactConfig { min_merge_count: 3, @@ -33,7 +36,7 @@ const COMPACT_CONFIG: CompactConfig = CompactConfig { }; pub struct TurboKeyValueDatabase { - db: Arc>, + db: Arc>, compact_join_handle: Mutex>>>, is_ci: bool, is_short_session: bool, @@ -129,7 +132,7 @@ impl KeyValueDatabase for TurboKeyValueDatabase { } fn do_compact( - db: &TurboPersistence, + db: &TurboPersistence, message: &'static str, max_merge_segment_count: usize, ) -> Result<()> { @@ -151,8 +154,9 @@ fn do_compact( } pub struct TurboWriteBatch<'a> { - batch: turbo_persistence::WriteBatch, TurboTasksParallelScheduler, 5>, - db: &'a Arc>, + batch: + turbo_persistence::WriteBatch, TurboTasksParallelScheduler, FAMILIES>, + db: &'a Arc>, compact_join_handle: Option<&'a Mutex>>>>, } From cf65d4c15f599a97f4d98fbcc22e1508ba835477 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Fri, 31 Oct 2025 16:50:36 +0100 Subject: [PATCH 02/13] During compaction separate used and unused entries --- turbopack/crates/turbo-persistence/src/db.rs | 196 +++++++++++------- .../crates/turbo-persistence/src/meta_file.rs | 14 ++ 2 files changed, 134 insertions(+), 76 deletions(-) diff --git a/turbopack/crates/turbo-persistence/src/db.rs b/turbopack/crates/turbo-persistence/src/db.rs index 777ed03dd013e..573e1fbe59658 100644 --- a/turbopack/crates/turbo-persistence/src/db.rs +++ b/turbopack/crates/turbo-persistence/src/db.rs @@ -859,15 +859,7 @@ impl TurboPersistence }) .collect::>(); - let families = ssts_with_ranges - .iter() - .map(|s| s.range.family) - .max() - .unwrap() as usize - + 1; - - let mut sst_by_family = Vec::with_capacity(families); - sst_by_family.resize_with(families, Vec::new); + let mut sst_by_family = [(); FAMILIES].map(|_| Vec::new()); for sst in ssts_with_ranges { sst_by_family[sst.range.family as usize].push(sst); @@ -902,6 +894,22 @@ impl TurboPersistence }) .collect::>(); + let mut used_key_hashes = [(); FAMILIES].map(|_| Vec::new()); + + { + for &(family, ..) 
in merge_jobs.iter() { + used_key_hashes[family].extend( + meta_files + .iter() + .filter(|m| m.family() == family as u32) + .filter_map(|meta_file| { + meta_file.deserialize_used_key_hashes_amqf().transpose() + }) + .collect::>>()?, + ); + } + } + let result = self .parallel_scheduler .parallel_map_collect_owned::<_, _, Result>>( @@ -1016,55 +1024,73 @@ impl TurboPersistence let mut keys_written = 0; - let mut total_key_size = 0; - let mut total_value_size = 0; let mut current: Option> = None; - let mut entries = Vec::new(); - let mut last_entries = Vec::new(); - let mut last_entries_total_key_size = 0; + + #[derive(Default)] + struct Collector<'l> { + entries: Vec>, + total_key_size: usize, + total_value_size: usize, + last_entries: Vec>, + last_entries_total_key_size: usize, + } + let mut used_collector = Collector::default(); + let mut unused_collector = Collector::default(); for entry in iter { let entry = entry?; // Remove duplicates if let Some(current) = current.take() { if current.key != entry.key { + let is_used = used_key_hashes[family as usize] + .iter() + .any(|amqf| amqf.contains(current.hash)); + let collector = if is_used { + &mut used_collector + } else { + &mut unused_collector + }; let key_size = current.key.len(); let value_size = current.value.uncompressed_size_in_sst(); - total_key_size += key_size; - total_value_size += value_size; + collector.total_key_size += key_size; + collector.total_value_size += value_size; - if total_key_size + total_value_size + if collector.total_key_size + collector.total_value_size > DATA_THRESHOLD_PER_COMPACTED_FILE - || entries.len() >= MAX_ENTRIES_PER_COMPACTED_FILE + || collector.entries.len() + >= MAX_ENTRIES_PER_COMPACTED_FILE { let selected_total_key_size = - last_entries_total_key_size; - swap(&mut entries, &mut last_entries); - last_entries_total_key_size = - total_key_size - key_size; - total_key_size = key_size; - total_value_size = value_size; - - if !entries.is_empty() { + collector.last_entries_total_key_size; + swap( + &mut collector.entries, + &mut collector.last_entries, + ); + collector.last_entries_total_key_size = + collector.total_key_size - key_size; + collector.total_key_size = key_size; + collector.total_value_size = value_size; + + if !collector.entries.is_empty() { let seq = sequence_number .fetch_add(1, Ordering::SeqCst) + 1; - keys_written += entries.len() as u64; + keys_written += collector.entries.len() as u64; new_sst_files.push(create_sst_file( &self.parallel_scheduler, - &entries, + &collector.entries, selected_total_key_size, path, seq, )?); - entries.clear(); + collector.entries.clear(); } } - entries.push(current); + collector.entries.push(current); } else { // Override value } @@ -1072,57 +1098,75 @@ impl TurboPersistence current = Some(entry); } if let Some(entry) = current { - total_key_size += entry.key.len(); + let is_used = used_key_hashes[family as usize] + .iter() + .any(|amqf| amqf.contains(entry.hash)); + let collector = if is_used { + &mut used_collector + } else { + &mut unused_collector + }; + + collector.total_key_size += entry.key.len(); // Obsolete as we no longer need total_value_size // total_value_size += entry.value.uncompressed_size_in_sst(); - entries.push(entry); + collector.entries.push(entry); } // If we have one set of entries left, write them to a new SST file - if last_entries.is_empty() && !entries.is_empty() { - let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1; - - keys_written += entries.len() as u64; - new_sst_files.push(create_sst_file( - 
&self.parallel_scheduler, - &entries, - total_key_size, - path, - seq, - )?); - } else - // If we have two sets of entries left, merge them and - // split it into two SST files, to avoid having a - // single SST file that is very small. - if !last_entries.is_empty() { - last_entries.append(&mut entries); - - last_entries_total_key_size += total_key_size; - - let (part1, part2) = - last_entries.split_at(last_entries.len() / 2); - - let seq1 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1; - let seq2 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1; - - keys_written += part1.len() as u64; - new_sst_files.push(create_sst_file( - &self.parallel_scheduler, - part1, - // We don't know the exact sizes so we estimate them - last_entries_total_key_size / 2, - path, - seq1, - )?); - - keys_written += part2.len() as u64; - new_sst_files.push(create_sst_file( - &self.parallel_scheduler, - part2, - last_entries_total_key_size / 2, - path, - seq2, - )?); + for collector in [&mut used_collector, &mut unused_collector] { + if collector.last_entries.is_empty() + && !collector.entries.is_empty() + { + let seq = + sequence_number.fetch_add(1, Ordering::SeqCst) + 1; + + keys_written += collector.entries.len() as u64; + new_sst_files.push(create_sst_file( + &self.parallel_scheduler, + &collector.entries, + collector.total_key_size, + path, + seq, + )?); + } else + // If we have two sets of entries left, merge them and + // split it into two SST files, to avoid having a + // single SST file that is very small. + if !collector.last_entries.is_empty() { + collector.last_entries.append(&mut collector.entries); + + collector.last_entries_total_key_size += + collector.total_key_size; + + let (part1, part2) = collector + .last_entries + .split_at(collector.last_entries.len() / 2); + + let seq1 = + sequence_number.fetch_add(1, Ordering::SeqCst) + 1; + let seq2 = + sequence_number.fetch_add(1, Ordering::SeqCst) + 1; + + keys_written += part1.len() as u64; + new_sst_files.push(create_sst_file( + &self.parallel_scheduler, + part1, + // We don't know the exact sizes so we estimate them + collector.last_entries_total_key_size / 2, + path, + seq1, + )?); + + keys_written += part2.len() as u64; + new_sst_files.push(create_sst_file( + &self.parallel_scheduler, + part2, + collector.last_entries_total_key_size / 2, + path, + seq2, + )?); + } } Ok(PartialMergeResult::Merged { new_sst_files, diff --git a/turbopack/crates/turbo-persistence/src/meta_file.rs b/turbopack/crates/turbo-persistence/src/meta_file.rs index 16b291bcd0e1a..721b5e48178db 100644 --- a/turbopack/crates/turbo-persistence/src/meta_file.rs +++ b/turbopack/crates/turbo-persistence/src/meta_file.rs @@ -283,6 +283,20 @@ impl MetaFile { &self.mmap } + pub fn deserialize_used_key_hashes_amqf(&self) -> Result> { + if self.start_of_used_keys_amqf_data_offset == self.end_of_used_keys_amqf_data_offset { + return Ok(None); + } + let amqf = &self.amqf_data()[self.start_of_used_keys_amqf_data_offset as usize + ..self.end_of_used_keys_amqf_data_offset as usize]; + Ok(Some(pot::from_slice(amqf).with_context(|| { + format!( + "Failed to deserialize used key hashes AMQF from {:08}.meta", + self.sequence_number + ) + })?)) + } + pub fn retain_entries(&mut self, mut predicate: impl FnMut(u32) -> bool) -> bool { let old_len = self.entries.len(); self.entries.retain(|entry| { From 015064b022918d02600b9bdfaa889cd5e456f1f1 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Wed, 5 Nov 2025 20:19:21 +0100 Subject: [PATCH 03/13] remove unneeded span --- 
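Recap of patch 02 above: during compaction every de-duplicated entry is routed into a "used" or an "unused" collector by probing the per-family filters deserialized from the meta files. A minimal sketch of that routing, with simplified types and the `contains_fingerprint` call that the later fixup (patch 05) settles on:

```rust
use qfilter::Filter;

/// Returns true if any reader touched this key hash since the filters were
/// built. False positives are possible and only keep an entry "used".
fn is_used(hash: u64, used_key_hashes: &[Filter]) -> bool {
    used_key_hashes
        .iter()
        .any(|amqf| amqf.contains_fingerprint(hash))
}
```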
turbopack/crates/turbo-persistence/src/compression.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/turbopack/crates/turbo-persistence/src/compression.rs b/turbopack/crates/turbo-persistence/src/compression.rs index 378d8b7cabec3..adfa37017ce03 100644 --- a/turbopack/crates/turbo-persistence/src/compression.rs +++ b/turbopack/crates/turbo-persistence/src/compression.rs @@ -3,7 +3,6 @@ use std::{mem::MaybeUninit, sync::Arc}; use anyhow::{Context, Result}; use lzzzz::lz4::{ACC_LEVEL_DEFAULT, decompress, decompress_with_dict}; -#[tracing::instrument(level = "trace", skip_all, name = "decompress database block")] pub fn decompress_into_arc( uncompressed_length: u32, block: &[u8], From bc25ff4cfad8ded47b61f381c5f80a95b84216f9 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 6 Nov 2025 09:05:19 +0100 Subject: [PATCH 04/13] track cold and hot status of SST files --- .../turbo-persistence-tools/src/main.rs | 4 +- turbopack/crates/turbo-persistence/README.md | 9 +++-- .../src/compaction/selector.rs | 10 ++++- turbopack/crates/turbo-persistence/src/db.rs | 37 ++++++++++--------- .../crates/turbo-persistence/src/meta_file.rs | 7 ++++ .../src/meta_file_builder.rs | 1 + .../src/static_sorted_file_builder.rs | 4 ++ .../turbo-persistence/src/write_batch.rs | 2 +- 8 files changed, 49 insertions(+), 25 deletions(-) diff --git a/turbopack/crates/turbo-persistence-tools/src/main.rs b/turbopack/crates/turbo-persistence-tools/src/main.rs index d23e257822e6f..183f9680cdad4 100644 --- a/turbopack/crates/turbo-persistence-tools/src/main.rs +++ b/turbopack/crates/turbo-persistence-tools/src/main.rs @@ -34,12 +34,14 @@ fn main() -> Result<()> { amqf_size, amqf_entries, sst_size, + cold, key_compression_dictionary_size, block_count, } in meta_file.entries { println!( - " SST {sequence_number:08}.sst: {min_hash:016x} - {max_hash:016x} (p = 1/{})", + " {} SST {sequence_number:08}.sst: {min_hash:016x} - {max_hash:016x} (p = 1/{})", + if cold { "COLD" } else { "HOT" }, u64::MAX / (max_hash - min_hash + 1) ); println!(" AMQF {amqf_entries} entries = {} KiB", amqf_size / 1024); diff --git a/turbopack/crates/turbo-persistence/README.md b/turbopack/crates/turbo-persistence/README.md index 7ee9d2bc09e2a..b5b4d67efea86 100644 --- a/turbopack/crates/turbo-persistence/README.md +++ b/turbopack/crates/turbo-persistence/README.md @@ -49,6 +49,7 @@ A meta file can contain metadata about multiple SST files. The metadata is store - 8 bytes min hash - 8 bytes max hash - 8 bytes SST file size + - 4 bytes flags (bit 0: cold) - 4 bytes end of AMQF offset relative to start of all AMQF data - 4 bytes end of AMQF offset relative to start of all AMQF data of the "used key hashes" AMQF - foreach described SST file @@ -171,7 +172,7 @@ Compaction chooses a few SST files and runs the merge step of merge sort on tham Example: -``` +``` text key hash range: | 0 ... u64::MAX | SST 1: |----------------| SST 2: |----------------| @@ -180,7 +181,7 @@ SST 3: |-----| can be compacted into: -``` +``` text key hash range: | 0 ... u64::MAX | SST 1': |-------| SST 2': |------| @@ -208,7 +209,7 @@ Full example: Example: -``` +``` text key hash range: | 0 ... u64::MAX | Family SST 1: |-| 1 SST 2: |----------------| 1 @@ -236,7 +237,7 @@ Then we delete SST files 2, 3, 6 and 4, 5, 8 and 7, 9. The SST files 1 stays unchanged. -``` +``` text key hash range: | 0 ... 
u64::MAX | Family SST 1: |-| 1 SST 10: |-----| 1 diff --git a/turbopack/crates/turbo-persistence/src/compaction/selector.rs b/turbopack/crates/turbo-persistence/src/compaction/selector.rs index 9357ac08afdd1..6b7975ff26dbf 100644 --- a/turbopack/crates/turbo-persistence/src/compaction/selector.rs +++ b/turbopack/crates/turbo-persistence/src/compaction/selector.rs @@ -37,6 +37,7 @@ fn extend_range(a: &mut RangeInclusive, b: &RangeInclusive) -> bool { extended } +#[cfg(test)] #[derive(Debug)] pub struct CompactableMetrics { /// The total coverage of the compactables. @@ -53,6 +54,7 @@ pub struct CompactableMetrics { } /// Computes metrics about the compactables. +#[cfg(test)] pub fn compute_metrics( compactables: &[T], full_range: RangeInclusive, @@ -167,6 +169,7 @@ impl DuplicationInfo { } /// The estimated size (in bytes) of a database segment containing `range` keys. + #[cfg(test)] fn size(&self, range: &RangeInclusive) -> u64 { if self.total_size == 0 { return 0; @@ -633,13 +636,16 @@ mod tests { let new_metrics = compute_metrics(&containers, 0..=KEY_RANGE); println!( - "Compaction done: coverage: {} ({}), overlap: {} ({}), duplication: {} ({})", + "Compaction done: coverage: {} ({}), overlap: {} ({}), duplication: {} ({}), \ + duplicated_size: {} ({})", new_metrics.coverage, new_metrics.coverage - metrics.coverage, new_metrics.overlap, new_metrics.overlap - metrics.overlap, new_metrics.duplication, - new_metrics.duplication - metrics.duplication + new_metrics.duplication - metrics.duplication, + new_metrics.duplicated_size, + (new_metrics.duplicated_size as f32) - metrics.duplicated_size as f32, ); } else { println!("No compaction needed"); diff --git a/turbopack/crates/turbo-persistence/src/db.rs b/turbopack/crates/turbo-persistence/src/db.rs index 573e1fbe59658..a9a5f2a4a6b25 100644 --- a/turbopack/crates/turbo-persistence/src/db.rs +++ b/turbopack/crates/turbo-persistence/src/db.rs @@ -22,7 +22,7 @@ pub use crate::compaction::selector::CompactConfig; use crate::{ QueryKey, arc_slice::ArcSlice, - compaction::selector::{Compactable, compute_metrics, get_merge_segments}, + compaction::selector::{Compactable, get_merge_segments}, compression::decompress_into_arc, constants::{ AMQF_AVG_SIZE, AMQF_CACHE_SIZE, DATA_THRESHOLD_PER_COMPACTED_FILE, KEY_BLOCK_AVG_SIZE, @@ -552,7 +552,8 @@ impl TurboPersistence let seq = entry.sequence_number(); let range = entry.range(); let size = entry.size(); - (seq, range.min_hash, range.max_hash, size) + let cold = entry.cold(); + (seq, range.min_hash, range.max_hash, size, cold) }) .collect::>(); ( @@ -644,12 +645,13 @@ impl TurboPersistence writeln!(log, "Commit {seq:08} {keys_written} keys in {span:#}")?; writeln!(log, "FAM | META SEQ | SST SEQ | RANGE")?; for (meta_seq, family, ssts, obsolete) in new_meta_info { - for (seq, min, max, size) in ssts { + for (seq, min, max, size, cold) in ssts { writeln!( log, - "{family:3} | {meta_seq:08} | {seq:08} SST | {} ({} MiB)", + "{family:3} | {meta_seq:08} | {seq:08} SST | {} ({} MiB, {})", range_to_str(min, max), - size / 1024 / 1024 + size / 1024 / 1024, + if cold { "cold" } else { "warm" } )?; } for obsolete in obsolete.chunks(15) { @@ -927,8 +929,6 @@ impl TurboPersistence }); } - let metrics = compute_metrics(&ssts_with_ranges, 0..=u64::MAX); - // Later we will remove the merged files let sst_seq_numbers_to_delete = merge_jobs .iter() @@ -973,6 +973,7 @@ impl TurboPersistence .key_compression_dictionary_length(), block_count: entry.block_count(), size: entry.size(), + cold: entry.cold(), entries: 0, }; 
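// Move path: the existing SST file is carried over unchanged; its metadata
// (hash range, size, cold flag, dictionary/block counts) is copied into the
// new meta file, and `entries` stays 0 since the file is not re-read here.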
return Ok(PartialMergeResult::Move { @@ -987,6 +988,7 @@ impl TurboPersistence total_key_size: usize, path: &Path, seq: u32, + cold: bool, ) -> Result<(u32, File, StaticSortedFileBuilderMeta<'static>)> { let _span = @@ -996,6 +998,7 @@ impl TurboPersistence entries, total_key_size, &path.join(format!("{seq:08}.sst")), + cold, ) })?; Ok((seq, file, meta)) @@ -1084,6 +1087,7 @@ impl TurboPersistence selected_total_key_size, path, seq, + !is_used, )?); collector.entries.clear(); @@ -1114,7 +1118,9 @@ impl TurboPersistence } // If we have one set of entries left, write them to a new SST file - for collector in [&mut used_collector, &mut unused_collector] { + for (collector, cold) in + [(&mut used_collector, false), (&mut unused_collector, true)] + { if collector.last_entries.is_empty() && !collector.entries.is_empty() { @@ -1128,6 +1134,7 @@ impl TurboPersistence collector.total_key_size, path, seq, + cold, )?); } else // If we have two sets of entries left, merge them and @@ -1156,6 +1163,7 @@ impl TurboPersistence collector.last_entries_total_key_size / 2, path, seq1, + cold, )?); keys_written += part2.len() as u64; @@ -1165,6 +1173,7 @@ impl TurboPersistence collector.last_entries_total_key_size / 2, path, seq2, + cold, )?); } } @@ -1210,15 +1219,7 @@ impl TurboPersistence self.parallel_scheduler.block_in_place(|| { let guard = log_mutex.lock(); let mut log = self.open_log()?; - writeln!( - log, - "{family:3} | {meta_seq:08} | Compaction (coverage: {}, overlap: {}, \ - duplication: {} / {} MiB):", - metrics.coverage, - metrics.overlap, - metrics.duplication, - metrics.duplicated_size / 1024 / 1024 - )?; + writeln!(log, "{family:3} | {meta_seq:08} | Compaction:",)?; for result in merge_result { match result { PartialMergeResult::Merged { @@ -1413,6 +1414,7 @@ impl TurboPersistence min_hash: entry.min_hash(), max_hash: entry.max_hash(), sst_size: entry.size(), + cold: entry.cold(), amqf_size: entry.amqf_size(), amqf_entries: amqf.len(), key_compression_dictionary_size: entry @@ -1477,6 +1479,7 @@ pub struct MetaFileEntryInfo { pub amqf_size: u32, pub amqf_entries: usize, pub sst_size: u64, + pub cold: bool, pub key_compression_dictionary_size: u16, pub block_count: u16, } diff --git a/turbopack/crates/turbo-persistence/src/meta_file.rs b/turbopack/crates/turbo-persistence/src/meta_file.rs index 721b5e48178db..7604558253a7d 100644 --- a/turbopack/crates/turbo-persistence/src/meta_file.rs +++ b/turbopack/crates/turbo-persistence/src/meta_file.rs @@ -42,6 +42,8 @@ pub struct MetaEntry { max_hash: u64, /// The size of the SST file in bytes. size: u64, + /// If the file is cold or warm. + cold: bool, /// The offset of the start of the AMQF data in the meta file relative to the end of the /// header. start_of_amqf_data_offset: u32, @@ -64,6 +66,10 @@ impl MetaEntry { self.size } + pub fn cold(&self) -> bool { + self.cold + } + pub fn amqf_size(&self) -> u32 { self.end_of_amqf_data_offset - self.start_of_amqf_data_offset } @@ -230,6 +236,7 @@ impl MetaFile { min_hash: file.read_u64::()?, max_hash: file.read_u64::()?, size: file.read_u64::()?, + cold: file.read_u32::()? 
!= 0, start_of_amqf_data_offset, end_of_amqf_data_offset: file.read_u32::()?, amqf: OnceLock::new(), diff --git a/turbopack/crates/turbo-persistence/src/meta_file_builder.rs b/turbopack/crates/turbo-persistence/src/meta_file_builder.rs index 1e34c8f75cd5b..157a847d1cb77 100644 --- a/turbopack/crates/turbo-persistence/src/meta_file_builder.rs +++ b/turbopack/crates/turbo-persistence/src/meta_file_builder.rs @@ -70,6 +70,7 @@ impl<'a> MetaFileBuilder<'a> { file.write_u64::(sst.min_hash)?; file.write_u64::(sst.max_hash)?; file.write_u64::(sst.size)?; + file.write_u32::(if sst.cold { 1 } else { 0 })?; amqf_offset += sst.amqf.len(); file.write_u32::(amqf_offset as u32)?; } diff --git a/turbopack/crates/turbo-persistence/src/static_sorted_file_builder.rs b/turbopack/crates/turbo-persistence/src/static_sorted_file_builder.rs index 66d0d043ce25f..5110be7da994e 100644 --- a/turbopack/crates/turbo-persistence/src/static_sorted_file_builder.rs +++ b/turbopack/crates/turbo-persistence/src/static_sorted_file_builder.rs @@ -88,6 +88,8 @@ pub struct StaticSortedFileBuilderMeta<'a> { pub block_count: u16, /// The file size of the SST file pub size: u64, + /// If the file is cold or warm + pub cold: bool, /// The number of entries in the SST file pub entries: u64, } @@ -96,6 +98,7 @@ pub fn write_static_stored_file( entries: &[E], total_key_size: usize, file: &Path, + cold: bool, ) -> Result<(StaticSortedFileBuilderMeta<'static>, File)> { debug_assert!(entries.iter().map(|e| e.key_hash()).is_sorted()); @@ -141,6 +144,7 @@ pub fn write_static_stored_file( key_compression_dictionary_length: key_dict.len().try_into().unwrap(), block_count, size: file.stream_position()?, + cold, entries: entries.len() as u64, }; Ok((meta, file.into_inner()?)) diff --git a/turbopack/crates/turbo-persistence/src/write_batch.rs b/turbopack/crates/turbo-persistence/src/write_batch.rs index 10c51233e50d9..0b0b3da1ac4c3 100644 --- a/turbopack/crates/turbo-persistence/src/write_batch.rs +++ b/turbopack/crates/turbo-persistence/src/write_batch.rs @@ -420,7 +420,7 @@ impl let path = self.db_path.join(format!("{seq:08}.sst")); let (meta, file) = self .parallel_scheduler - .block_in_place(|| write_static_stored_file(entries, total_key_size, &path)) + .block_in_place(|| write_static_stored_file(entries, total_key_size, &path, true)) .with_context(|| format!("Unable to write SST file {seq:08}.sst"))?; #[cfg(feature = "verify_sst_content")] From 304da23323bf1cd31f3282b0851f840fec6ebf81 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 6 Nov 2025 09:41:34 +0100 Subject: [PATCH 05/13] fixup --- turbopack/crates/turbo-persistence/src/db.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/turbopack/crates/turbo-persistence/src/db.rs b/turbopack/crates/turbo-persistence/src/db.rs index a9a5f2a4a6b25..00c4c6856cd1e 100644 --- a/turbopack/crates/turbo-persistence/src/db.rs +++ b/turbopack/crates/turbo-persistence/src/db.rs @@ -1045,9 +1045,10 @@ impl TurboPersistence // Remove duplicates if let Some(current) = current.take() { if current.key != entry.key { - let is_used = used_key_hashes[family as usize] - .iter() - .any(|amqf| amqf.contains(current.hash)); + let is_used = + used_key_hashes[family as usize].iter().any( + |amqf| amqf.contains_fingerprint(current.hash), + ); let collector = if is_used { &mut used_collector } else { @@ -1104,7 +1105,7 @@ impl TurboPersistence if let Some(entry) = current { let is_used = used_key_hashes[family as usize] .iter() - .any(|amqf| amqf.contains(entry.hash)); + .any(|amqf| 
amqf.contains_fingerprint(entry.hash)); let collector = if is_used { &mut used_collector } else { From 61d66bd65c6b4d3805b32adfb74b906226969af2 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 6 Nov 2025 13:48:35 +0100 Subject: [PATCH 06/13] use more flags --- Cargo.lock | 1 + .../turbo-persistence-tools/src/main.rs | 6 +-- turbopack/crates/turbo-persistence/Cargo.toml | 1 + turbopack/crates/turbo-persistence/README.md | 4 +- turbopack/crates/turbo-persistence/src/db.rs | 38 +++++++++-------- .../crates/turbo-persistence/src/meta_file.rs | 41 ++++++++++++++++--- .../src/meta_file_builder.rs | 2 +- .../src/static_sorted_file_builder.rs | 9 ++-- .../turbo-persistence/src/write_batch.rs | 5 ++- 9 files changed, 75 insertions(+), 32 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e1037d83e1a0f..53b041469c54a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9106,6 +9106,7 @@ name = "turbo-persistence" version = "0.1.0" dependencies = [ "anyhow", + "bitfield", "byteorder", "dashmap 6.1.0", "either", diff --git a/turbopack/crates/turbo-persistence-tools/src/main.rs b/turbopack/crates/turbo-persistence-tools/src/main.rs index 183f9680cdad4..1467399722a11 100644 --- a/turbopack/crates/turbo-persistence-tools/src/main.rs +++ b/turbopack/crates/turbo-persistence-tools/src/main.rs @@ -34,14 +34,14 @@ fn main() -> Result<()> { amqf_size, amqf_entries, sst_size, - cold, + flags, key_compression_dictionary_size, block_count, } in meta_file.entries { println!( - " {} SST {sequence_number:08}.sst: {min_hash:016x} - {max_hash:016x} (p = 1/{})", - if cold { "COLD" } else { "HOT" }, + " SST {sequence_number:08}.sst: {} {min_hash:016x} - {max_hash:016x} (p = 1/{})", + flags, u64::MAX / (max_hash - min_hash + 1) ); println!(" AMQF {amqf_entries} entries = {} KiB", amqf_size / 1024); diff --git a/turbopack/crates/turbo-persistence/Cargo.toml b/turbopack/crates/turbo-persistence/Cargo.toml index f5d36f510fb41..e25a637f0ce04 100644 --- a/turbopack/crates/turbo-persistence/Cargo.toml +++ b/turbopack/crates/turbo-persistence/Cargo.toml @@ -14,6 +14,7 @@ verbose_log = [] [dependencies] anyhow = { workspace = true } +bitfield = { workspace = true } dashmap = { workspace = true} either = { workspace = true } pot = "3.0.0" diff --git a/turbopack/crates/turbo-persistence/README.md b/turbopack/crates/turbo-persistence/README.md index b5b4d67efea86..fc50fa2c6352d 100644 --- a/turbopack/crates/turbo-persistence/README.md +++ b/turbopack/crates/turbo-persistence/README.md @@ -49,7 +49,9 @@ A meta file can contain metadata about multiple SST files. 
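The 4-byte flag word in the layout below is interpreted by `MetaEntryFlags` in `meta_file.rs`; a self-contained sketch of the same `bitfield` pattern (names mirror patch 06):

```rust
use bitfield::bitfield;

// Mirrors MetaEntryFlags from meta_file.rs; single-bit fields are bools.
bitfield! {
    #[derive(Clone, Copy, Default)]
    pub struct Flags(u32);
    impl Debug;
    /// compacted and not recently accessed
    pub cold, set_cold: 0;
    /// freshly written, not yet compacted
    pub fresh, set_fresh: 1;
}

fn main() {
    let mut flags = Flags::default(); // 0b00: "warm"
    flags.set_cold(true); // 0b01: "cold"
    assert!(flags.cold() && !flags.fresh());
}
```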
The metadata is store - 8 bytes min hash - 8 bytes max hash - 8 bytes SST file size - - 4 bytes flags (bit 0: cold) + - 4 bytes flags + - bit 0: cold (compacted and not recently accessed) + - bit 1: fresh (not yet compacted) - 4 bytes end of AMQF offset relative to start of all AMQF data - 4 bytes end of AMQF offset relative to start of all AMQF data of the "used key hashes" AMQF - foreach described SST file diff --git a/turbopack/crates/turbo-persistence/src/db.rs b/turbopack/crates/turbo-persistence/src/db.rs index 00c4c6856cd1e..48dfc33a1f39a 100644 --- a/turbopack/crates/turbo-persistence/src/db.rs +++ b/turbopack/crates/turbo-persistence/src/db.rs @@ -32,7 +32,7 @@ use crate::{ key::{StoreKey, hash_key}, lookup_entry::{LookupEntry, LookupValue}, merge_iter::MergeIter, - meta_file::{AmqfCache, MetaFile, MetaLookupResult, StaticSortedFileRange}, + meta_file::{AmqfCache, MetaEntryFlags, MetaFile, MetaLookupResult, StaticSortedFileRange}, meta_file_builder::MetaFileBuilder, parallel_scheduler::ParallelScheduler, sst_filter::SstFilter, @@ -552,8 +552,8 @@ impl TurboPersistence let seq = entry.sequence_number(); let range = entry.range(); let size = entry.size(); - let cold = entry.cold(); - (seq, range.min_hash, range.max_hash, size, cold) + let flags = entry.flags(); + (seq, range.min_hash, range.max_hash, size, flags) }) .collect::>(); ( @@ -645,13 +645,13 @@ impl TurboPersistence writeln!(log, "Commit {seq:08} {keys_written} keys in {span:#}")?; writeln!(log, "FAM | META SEQ | SST SEQ | RANGE")?; for (meta_seq, family, ssts, obsolete) in new_meta_info { - for (seq, min, max, size, cold) in ssts { + for (seq, min, max, size, flags) in ssts { writeln!( log, "{family:3} | {meta_seq:08} | {seq:08} SST | {} ({} MiB, {})", range_to_str(min, max), size / 1024 / 1024, - if cold { "cold" } else { "warm" } + flags )?; } for obsolete in obsolete.chunks(15) { @@ -973,7 +973,7 @@ impl TurboPersistence .key_compression_dictionary_length(), block_count: entry.block_count(), size: entry.size(), - cold: entry.cold(), + flags: entry.flags(), entries: 0, }; return Ok(PartialMergeResult::Move { @@ -988,7 +988,7 @@ impl TurboPersistence total_key_size: usize, path: &Path, seq: u32, - cold: bool, + flags: MetaEntryFlags, ) -> Result<(u32, File, StaticSortedFileBuilderMeta<'static>)> { let _span = @@ -998,7 +998,7 @@ impl TurboPersistence entries, total_key_size, &path.join(format!("{seq:08}.sst")), - cold, + flags, ) })?; Ok((seq, file, meta)) @@ -1082,13 +1082,16 @@ impl TurboPersistence + 1; keys_written += collector.entries.len() as u64; + + let mut flags = MetaEntryFlags::default(); + flags.set_cold(!is_used); new_sst_files.push(create_sst_file( &self.parallel_scheduler, &collector.entries, selected_total_key_size, path, seq, - !is_used, + flags, )?); collector.entries.clear(); @@ -1119,9 +1122,10 @@ impl TurboPersistence } // If we have one set of entries left, write them to a new SST file - for (collector, cold) in - [(&mut used_collector, false), (&mut unused_collector, true)] - { + for (collector, flags) in [ + (&mut used_collector, MetaEntryFlags::WARM), + (&mut unused_collector, MetaEntryFlags::COLD), + ] { if collector.last_entries.is_empty() && !collector.entries.is_empty() { @@ -1135,7 +1139,7 @@ impl TurboPersistence collector.total_key_size, path, seq, - cold, + flags, )?); } else // If we have two sets of entries left, merge them and @@ -1164,7 +1168,7 @@ impl TurboPersistence collector.last_entries_total_key_size / 2, path, seq1, - cold, + flags, )?); keys_written += part2.len() as u64; @@ 
-1174,7 +1178,7 @@ impl TurboPersistence collector.last_entries_total_key_size / 2, path, seq2, - cold, + flags, )?); } } @@ -1415,7 +1419,7 @@ impl TurboPersistence min_hash: entry.min_hash(), max_hash: entry.max_hash(), sst_size: entry.size(), - cold: entry.cold(), + flags: entry.flags(), amqf_size: entry.amqf_size(), amqf_entries: amqf.len(), key_compression_dictionary_size: entry @@ -1480,7 +1484,7 @@ pub struct MetaFileEntryInfo { pub amqf_size: u32, pub amqf_entries: usize, pub sst_size: u64, - pub cold: bool, + pub flags: MetaEntryFlags, pub key_compression_dictionary_size: u16, pub block_count: u16, } diff --git a/turbopack/crates/turbo-persistence/src/meta_file.rs b/turbopack/crates/turbo-persistence/src/meta_file.rs index 7604558253a7d..afc79d7256068 100644 --- a/turbopack/crates/turbo-persistence/src/meta_file.rs +++ b/turbopack/crates/turbo-persistence/src/meta_file.rs @@ -1,4 +1,5 @@ use std::{ + fmt::Display, fs::File, hash::BuildHasherDefault, io::{BufReader, Seek}, @@ -8,6 +9,7 @@ use std::{ }; use anyhow::{Context, Result, bail}; +use bitfield::bitfield; use byteorder::{BE, ReadBytesExt}; use either::Either; use memmap2::{Mmap, MmapOptions}; @@ -31,6 +33,35 @@ impl quick_cache::Weighter> for AmqfWeighter { pub type AmqfCache = quick_cache::sync::Cache, AmqfWeighter, BuildHasherDefault>; +bitfield! { + #[derive(Clone, Copy, Default)] + pub struct MetaEntryFlags(u32); + impl Debug; + impl From; + /// The SST file was compacted and none of the entries have been accessed recently. + pub cold, set_cold: 0; + /// The SST file was freshly written and has not been compacted yet. + pub fresh, set_fresh: 1; +} + +impl MetaEntryFlags { + pub const FRESH: MetaEntryFlags = MetaEntryFlags(0b10); + pub const COLD: MetaEntryFlags = MetaEntryFlags(0b01); + pub const WARM: MetaEntryFlags = MetaEntryFlags(0b00); +} + +impl Display for MetaEntryFlags { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if self.fresh() { + write!(f, "fresh") + } else if self.cold() { + write!(f, "cold") + } else { + write!(f, "warm") + } + } +} + pub struct MetaEntry { /// The metadata for the static sorted file. sst_data: StaticSortedFileMetaData, @@ -42,8 +73,8 @@ pub struct MetaEntry { max_hash: u64, /// The size of the SST file in bytes. size: u64, - /// If the file is cold or warm. - cold: bool, + /// The status flags for this entry. + flags: MetaEntryFlags, /// The offset of the start of the AMQF data in the meta file relative to the end of the /// header. start_of_amqf_data_offset: u32, @@ -66,8 +97,8 @@ impl MetaEntry { self.size } - pub fn cold(&self) -> bool { - self.cold + pub fn flags(&self) -> MetaEntryFlags { + self.flags } pub fn amqf_size(&self) -> u32 { @@ -236,7 +267,7 @@ impl MetaFile { min_hash: file.read_u64::()?, max_hash: file.read_u64::()?, size: file.read_u64::()?, - cold: file.read_u32::()? 
!= 0, + flags: MetaEntryFlags(file.read_u32::()?), start_of_amqf_data_offset, end_of_amqf_data_offset: file.read_u32::()?, amqf: OnceLock::new(), diff --git a/turbopack/crates/turbo-persistence/src/meta_file_builder.rs b/turbopack/crates/turbo-persistence/src/meta_file_builder.rs index 157a847d1cb77..ea68769589213 100644 --- a/turbopack/crates/turbo-persistence/src/meta_file_builder.rs +++ b/turbopack/crates/turbo-persistence/src/meta_file_builder.rs @@ -70,7 +70,7 @@ impl<'a> MetaFileBuilder<'a> { file.write_u64::(sst.min_hash)?; file.write_u64::(sst.max_hash)?; file.write_u64::(sst.size)?; - file.write_u32::(if sst.cold { 1 } else { 0 })?; + file.write_u32::(sst.flags.0)?; amqf_offset += sst.amqf.len(); file.write_u32::(amqf_offset as u32)?; } diff --git a/turbopack/crates/turbo-persistence/src/static_sorted_file_builder.rs b/turbopack/crates/turbo-persistence/src/static_sorted_file_builder.rs index 5110be7da994e..89278ed5e323d 100644 --- a/turbopack/crates/turbo-persistence/src/static_sorted_file_builder.rs +++ b/turbopack/crates/turbo-persistence/src/static_sorted_file_builder.rs @@ -11,6 +11,7 @@ use byteorder::{BE, ByteOrder, WriteBytesExt}; use crate::{ compression::compress_into_buffer, + meta_file::MetaEntryFlags, static_sorted_file::{ BLOCK_TYPE_INDEX, BLOCK_TYPE_KEY, KEY_BLOCK_ENTRY_TYPE_BLOB, KEY_BLOCK_ENTRY_TYPE_DELETED, KEY_BLOCK_ENTRY_TYPE_MEDIUM, KEY_BLOCK_ENTRY_TYPE_SMALL, @@ -88,8 +89,8 @@ pub struct StaticSortedFileBuilderMeta<'a> { pub block_count: u16, /// The file size of the SST file pub size: u64, - /// If the file is cold or warm - pub cold: bool, + /// The status flags for this SST file + pub flags: MetaEntryFlags, /// The number of entries in the SST file pub entries: u64, } @@ -98,7 +99,7 @@ pub fn write_static_stored_file( entries: &[E], total_key_size: usize, file: &Path, - cold: bool, + flags: MetaEntryFlags, ) -> Result<(StaticSortedFileBuilderMeta<'static>, File)> { debug_assert!(entries.iter().map(|e| e.key_hash()).is_sorted()); @@ -144,7 +145,7 @@ pub fn write_static_stored_file( key_compression_dictionary_length: key_dict.len().try_into().unwrap(), block_count, size: file.stream_position()?, - cold, + flags, entries: entries.len() as u64, }; Ok((meta, file.into_inner()?)) diff --git a/turbopack/crates/turbo-persistence/src/write_batch.rs b/turbopack/crates/turbo-persistence/src/write_batch.rs index 0b0b3da1ac4c3..81ca849151b9b 100644 --- a/turbopack/crates/turbo-persistence/src/write_batch.rs +++ b/turbopack/crates/turbo-persistence/src/write_batch.rs @@ -21,6 +21,7 @@ use crate::{ compression::compress_into_buffer, constants::{MAX_MEDIUM_VALUE_SIZE, THREAD_LOCAL_SIZE_SHIFT}, key::StoreKey, + meta_file::MetaEntryFlags, meta_file_builder::MetaFileBuilder, parallel_scheduler::ParallelScheduler, static_sorted_file_builder::{StaticSortedFileBuilderMeta, write_static_stored_file}, @@ -420,7 +421,9 @@ impl let path = self.db_path.join(format!("{seq:08}.sst")); let (meta, file) = self .parallel_scheduler - .block_in_place(|| write_static_stored_file(entries, total_key_size, &path, true)) + .block_in_place(|| { + write_static_stored_file(entries, total_key_size, &path, MetaEntryFlags::FRESH) + }) .with_context(|| format!("Unable to write SST file {seq:08}.sst"))?; #[cfg(feature = "verify_sst_content")] From 4842084ab56d809c5753f6333c27ed59f3d83c09 Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Thu, 6 Nov 2025 16:22:40 +0100 Subject: [PATCH 07/13] fixup --- turbopack/crates/turbo-persistence/src/db.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff 
--git a/turbopack/crates/turbo-persistence/src/db.rs b/turbopack/crates/turbo-persistence/src/db.rs index 48dfc33a1f39a..aa53c4de5bd88 100644 --- a/turbopack/crates/turbo-persistence/src/db.rs +++ b/turbopack/crates/turbo-persistence/src/db.rs @@ -470,7 +470,7 @@ impl TurboPersistence let set = &inner.accessed_key_hashes[family as usize]; // len is only a snapshot at that time and it can change while we create the filter. // So we give it 5% more space to make resizes less likely. - let initial_capacity = set.len() * 19 / 20; + let initial_capacity = set.len() * 20 / 19; let mut amqf = qfilter::Filter::with_fingerprint_size(initial_capacity as u64, u64::BITS as u8) .unwrap(); From aa9acb124c6235a88e189a62b54bbdcfc43e52cd Mon Sep 17 00:00:00 2001 From: Tobias Koppers Date: Mon, 10 Nov 2025 12:13:44 +0100 Subject: [PATCH 08/13] improve log --- turbopack/crates/turbo-persistence/src/db.rs | 15 +++++++++------ .../crates/turbo-persistence/src/meta_file.rs | 6 +++--- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/turbopack/crates/turbo-persistence/src/db.rs b/turbopack/crates/turbo-persistence/src/db.rs index aa53c4de5bd88..a350410897530 100644 --- a/turbopack/crates/turbo-persistence/src/db.rs +++ b/turbopack/crates/turbo-persistence/src/db.rs @@ -643,7 +643,7 @@ impl TurboPersistence writeln!(log, "Time {time}")?; let span = time.until(Timestamp::now())?; writeln!(log, "Commit {seq:08} {keys_written} keys in {span:#}")?; - writeln!(log, "FAM | META SEQ | SST SEQ | RANGE")?; + writeln!(log, "FAM | META SEQ | SST SEQ | RANGE")?; for (meta_seq, family, ssts, obsolete) in new_meta_info { for (seq, min, max, size, flags) in ssts { writeln!( @@ -703,7 +703,7 @@ impl TurboPersistence #[cfg(feature = "verbose_log")] { writeln!(log, "New database state:")?; - writeln!(log, "FAM | META SEQ | SST SEQ | RANGE")?; + writeln!(log, "FAM | META SEQ | SST SEQ FLAGS | RANGE")?; let inner = self.inner.read(); let families = inner.meta_files.iter().map(|meta| meta.family()).filter({ let mut set = HashSet::new(); @@ -720,7 +720,8 @@ impl TurboPersistence let range = entry.range(); writeln!( log, - "{family:3} | {meta_seq:08} | {seq:08} | {}", + "{family:3} | {meta_seq:08} | {seq:08} {:>6} | {}", + entry.flags(), range_to_str(range.min_hash, range.max_hash) )?; } @@ -1243,7 +1244,7 @@ impl TurboPersistence let (min, max) = ssts_with_ranges[*i].range().into_inner(); writeln!( log, - "{family:3} | {meta_seq:08} | {seq:08} INPUT | {} ", + "{family:3} | {meta_seq:08} | {seq:08} INPUT | {}", range_to_str(min, max) )?; } @@ -1252,8 +1253,10 @@ impl TurboPersistence let max = meta.max_hash; writeln!( log, - "{family:3} | {meta_seq:08} | {seq:08} OUTPUT | {}", - range_to_str(min, max) + "{family:3} | {meta_seq:08} | {seq:08} OUTPUT | {} \ + ({})", + range_to_str(min, max), + meta.flags )?; meta_file_builder.add(seq, meta); diff --git a/turbopack/crates/turbo-persistence/src/meta_file.rs b/turbopack/crates/turbo-persistence/src/meta_file.rs index afc79d7256068..7f279c0aec614 100644 --- a/turbopack/crates/turbo-persistence/src/meta_file.rs +++ b/turbopack/crates/turbo-persistence/src/meta_file.rs @@ -53,11 +53,11 @@ impl MetaEntryFlags { impl Display for MetaEntryFlags { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { if self.fresh() { - write!(f, "fresh") + f.pad_integral(true, "", "fresh") } else if self.cold() { - write!(f, "cold") + f.pad_integral(true, "", "cold") } else { - write!(f, "warm") + f.pad_integral(true, "", "warm") } } } From 37dae29f17e805e186ad50ce6fd05c5f498462a6 Mon 
From 37dae29f17e805e186ad50ce6fd05c5f498462a6 Mon Sep 17 00:00:00 2001
From: Tobias Koppers
Date: Mon, 10 Nov 2025 12:23:59 +0100
Subject: [PATCH 09/13] sort cold before warm sst files

---
 turbopack/crates/turbo-persistence/src/db.rs | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/turbopack/crates/turbo-persistence/src/db.rs b/turbopack/crates/turbo-persistence/src/db.rs
index a350410897530..c808eef536ed9 100644
--- a/turbopack/crates/turbo-persistence/src/db.rs
+++ b/turbopack/crates/turbo-persistence/src/db.rs
@@ -3,7 +3,7 @@ use std::{
     collections::HashSet,
     fs::{self, File, OpenOptions, ReadDir},
     io::{BufWriter, Write},
-    mem::swap,
+    mem::{swap, take},
    ops::RangeInclusive,
     path::{Path, PathBuf},
     sync::atomic::{AtomicBool, AtomicU32, Ordering},
@@ -1005,8 +1005,6 @@ impl TurboPersistence
             Ok((seq, file, meta))
         }
 
-        let mut new_sst_files = Vec::new();
-
         // Iterate all SST files
         let iters = indicies
             .iter()
@@ -1037,6 +1035,8 @@ impl TurboPersistence
                             total_value_size: usize,
                             last_entries: Vec<LookupEntry>,
                             last_entries_total_key_size: usize,
+                            new_sst_files:
+                                Vec<(u32, File, StaticSortedFileBuilderMeta<'static>)>,
                         }
                         let mut used_collector = Collector::default();
                         let mut unused_collector = Collector::default();
@@ -1086,7 +1086,7 @@ impl TurboPersistence
                                 let mut flags = MetaEntryFlags::default();
                                 flags.set_cold(!is_used);
 
-                                new_sst_files.push(create_sst_file(
+                                collector.new_sst_files.push(create_sst_file(
                                     &self.parallel_scheduler,
                                     &collector.entries,
                                     selected_total_key_size,
@@ -1134,7 +1134,7 @@ impl TurboPersistence
                                 sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
                             keys_written += collector.entries.len() as u64;
 
-                            new_sst_files.push(create_sst_file(
+                            collector.new_sst_files.push(create_sst_file(
                                 &self.parallel_scheduler,
                                 &collector.entries,
                                 collector.total_key_size,
@@ -1162,7 +1162,7 @@ impl TurboPersistence
                                 sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
                             keys_written += part1.len() as u64;
 
-                            new_sst_files.push(create_sst_file(
+                            collector.new_sst_files.push(create_sst_file(
                                 &self.parallel_scheduler,
                                 part1,
                                 // We don't know the exact sizes so we estimate them
@@ -1173,7 +1173,7 @@ impl TurboPersistence
                             )?);
 
                             keys_written += part2.len() as u64;
-                            new_sst_files.push(create_sst_file(
+                            collector.new_sst_files.push(create_sst_file(
                                 &self.parallel_scheduler,
                                 part2,
                                 collector.last_entries_total_key_size / 2,
@@ -1183,6 +1183,8 @@ impl TurboPersistence
                             )?);
                         }
                     }
+                    let mut new_sst_files = take(&mut unused_collector.new_sst_files);
+                    new_sst_files.append(&mut used_collector.new_sst_files);
                     Ok(PartialMergeResult::Merged {
                         new_sst_files,
                         blob_seq_numbers_to_delete,

From 04e1e5c5482ffa14e863336b777322583f38b3ba Mon Sep 17 00:00:00 2001
From: Tobias Koppers
Date: Mon, 10 Nov 2025 12:28:00 +0100
Subject: [PATCH 10/13] add TODO

---
 turbopack/crates/turbo-persistence/src/db.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/turbopack/crates/turbo-persistence/src/db.rs b/turbopack/crates/turbo-persistence/src/db.rs
index c808eef536ed9..b452b0a93a2f2 100644
--- a/turbopack/crates/turbo-persistence/src/db.rs
+++ b/turbopack/crates/turbo-persistence/src/db.rs
@@ -1102,6 +1102,7 @@ impl TurboPersistence
                                     collector.entries.push(current);
                                 } else {
                                     // Override value
+                                    // TODO delete blob file
                                 }
                             }
                             current = Some(entry);
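Patch 09's per-collector `new_sst_files` vectors exist so the output order can be controlled in one place: the final hunk drains the cold (unused) files first and appends the warm (used) files after them. Since lookups scan SST files newest-to-oldest, this keeps the actually-accessed data in the files that are consulted first. A minimal sketch of the `take`/`append` idiom, with hypothetical sequence numbers:

```rust
use std::mem::take;

fn main() {
    // Hypothetical sequence numbers for the SST files each collector produced.
    let mut unused_sst_files = vec![101u32, 102]; // cold: keys that were never read
    let mut used_sst_files = vec![103u32]; // warm: keys that were read

    // `take` moves the cold list out (leaving an empty Vec behind) and
    // `append` drains the warm list onto the end, so warm files sort
    // after cold ones in the combined result.
    let mut new_sst_files = take(&mut unused_sst_files);
    new_sst_files.append(&mut used_sst_files);

    assert_eq!(new_sst_files, [101, 102, 103]);
    assert!(unused_sst_files.is_empty() && used_sst_files.is_empty());
}
```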
From e3ffe8069df38e19edcbf472dbbe7fb6e9a6a617 Mon Sep 17 00:00:00 2001
From: Tobias Koppers
Date: Wed, 12 Nov 2025 15:23:23 +0100
Subject: [PATCH 11/13] fix compaction for cold and warm files

---
 .../src/compaction/selector.rs               | 27 ++++++++++++++-----
 turbopack/crates/turbo-persistence/src/db.rs |  6 +++++
 2 files changed, 26 insertions(+), 7 deletions(-)

diff --git a/turbopack/crates/turbo-persistence/src/compaction/selector.rs b/turbopack/crates/turbo-persistence/src/compaction/selector.rs
index 6b7975ff26dbf..1e134f3b336fe 100644
--- a/turbopack/crates/turbo-persistence/src/compaction/selector.rs
+++ b/turbopack/crates/turbo-persistence/src/compaction/selector.rs
@@ -1,5 +1,6 @@
 use std::ops::RangeInclusive;
 
+use rustc_hash::FxHashMap;
 use smallvec::{SmallVec, smallvec};
 
 use crate::compaction::interval_map::IntervalMap;
@@ -12,6 +13,12 @@ pub trait Compactable {
 
     /// The size of the compactable database segment in bytes.
     fn size(&self) -> u64;
+
+    /// The category of the compactable. Overlap between different categories is not considered for
+    /// compaction.
+    fn category(&self) -> u8 {
+        0
+    }
 }
 
 fn is_overlapping(a: &RangeInclusive<u64>, b: &RangeInclusive<u64>) -> bool {
@@ -157,7 +164,7 @@ impl DuplicationInfo {
     /// Get a value in the range `0..=u64::MAX` that represents the estimated amount of duplication
     /// across the given range. The units are arbitrary, but linear.
     fn duplication(&self, range: &RangeInclusive<u64>) -> u64 {
-        if self.total_size == 0 {
+        if self.max_size == self.total_size {
             return 0;
         }
         // the maximum numerator value is `u64::MAX + 1`
@@ -193,11 +200,14 @@ impl DuplicationInfo {
     }
 }
 
-fn total_duplication_size(duplication: &IntervalMap<Option<DuplicationInfo>>) -> u64 {
+fn total_duplication_size(duplication: &IntervalMap<FxHashMap<u8, DuplicationInfo>>) -> u64 {
     duplication
         .iter()
-        .flat_map(|(range, info)| Some((range, info.as_ref()?)))
-        .map(|(range, info)| info.duplication(&range))
+        .map(|(range, info)| {
+            info.values()
+                .map(|info| info.duplication(&range))
+                .sum::<u64>()
+        })
         .sum()
 }
 
@@ -241,16 +251,18 @@ pub fn get_merge_segments(
         }
         let start_compactable_range = start_compactable.range();
         let start_compactable_size = start_compactable.size();
+        let start_compactable_category = start_compactable.category();
         let mut current_range = start_compactable_range.clone();
 
         // We might need to restart the search if we need to extend the range.
         'search: loop {
             let mut current_set = smallvec![start_index];
             let mut current_size = start_compactable_size;
-            let mut duplication = IntervalMap::<Option<DuplicationInfo>>::new();
+            let mut duplication = IntervalMap::<FxHashMap<u8, DuplicationInfo>>::new();
             duplication.update(start_compactable_range.clone(), |dup_info| {
                 dup_info
-                    .get_or_insert_default()
+                    .entry(start_compactable_category)
+                    .or_default()
                     .add(start_compactable_size, &start_compactable_range);
             });
             let mut current_skip = 0;
@@ -340,8 +352,9 @@ pub fn get_merge_segments(
                     // set.
                     current_set.push(next_index);
                     current_size += size;
+                    let category = compactable.category();
                     duplication.update(range.clone(), |dup_info| {
-                        dup_info.get_or_insert_default().add(size, &range);
+                        dup_info.entry(category).or_default().add(size, &range);
                     });
                 }
             }
diff --git a/turbopack/crates/turbo-persistence/src/db.rs b/turbopack/crates/turbo-persistence/src/db.rs
index b452b0a93a2f2..37f58caf63df8 100644
--- a/turbopack/crates/turbo-persistence/src/db.rs
+++ b/turbopack/crates/turbo-persistence/src/db.rs
@@ -833,6 +833,7 @@ impl TurboPersistence
             seq: u32,
             range: StaticSortedFileRange,
             size: u64,
+            flags: MetaEntryFlags,
         }
 
         impl Compactable for SstWithRange {
@@ -843,6 +844,10 @@ impl TurboPersistence
             fn size(&self) -> u64 {
                 self.size
             }
+
+            fn category(&self) -> u8 {
+                if self.flags.cold() { 1 } else { 0 }
+            }
         }
 
         let ssts_with_ranges = meta_files
@@ -858,6 +863,7 @@ impl TurboPersistence
                         seq: entry.sequence_number(),
                         range: entry.range(),
                         size: entry.size(),
+                        flags: entry.flags(),
                     })
             })
             .collect::<Vec<_>>();
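The `category` hook above changes what the compaction planner counts as duplication: the per-range `DuplicationInfo` is now keyed by category in an `FxHashMap<u8, DuplicationInfo>`, so ranges that overlap only across categories contribute nothing, and cold and warm files are never planned into the same merge segment. A trimmed-down model of that rule (not the crate's real types):

```rust
use std::ops::RangeInclusive;

// A simplified model: overlap only counts as duplication when both
// segments share a category.
trait Compactable {
    fn range(&self) -> RangeInclusive<u64>;
    /// Default category, mirroring the trait's provided method in the patch.
    fn category(&self) -> u8 {
        0
    }
}

struct Sst {
    range: RangeInclusive<u64>,
    cold: bool,
}

impl Compactable for Sst {
    fn range(&self) -> RangeInclusive<u64> {
        self.range.clone()
    }
    fn category(&self) -> u8 {
        if self.cold { 1 } else { 0 }
    }
}

// Count pairs a planner would treat as duplicated key ranges.
fn duplicated_pairs(ssts: &[Sst]) -> usize {
    let mut count = 0;
    for (i, a) in ssts.iter().enumerate() {
        for b in &ssts[i + 1..] {
            let overlapping =
                a.range().start() <= b.range().end() && b.range().start() <= a.range().end();
            // Cold (category 1) and warm (category 0) files never count
            // against each other, so they are not merged together.
            if overlapping && a.category() == b.category() {
                count += 1;
            }
        }
    }
    count
}

fn main() {
    let ssts = [
        Sst { range: 0..=100, cold: false },
        Sst { range: 50..=150, cold: true },  // overlaps the first, different category
        Sst { range: 80..=200, cold: false }, // overlaps the first, same category
    ];
    assert_eq!(duplicated_pairs(&ssts), 1);
}
```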
From 01eb7744ddb08e9b9b9e8f88e7220d5bfb0767f8 Mon Sep 17 00:00:00 2001
From: Tobias Koppers
Date: Wed, 12 Nov 2025 20:28:56 +0100
Subject: [PATCH 12/13] Avoid compacting a fresh database since we don't have
 any usage info yet

---
 .../src/database/turbo/mod.rs | 33 +++++++++++--------
 1 file changed, 19 insertions(+), 14 deletions(-)

diff --git a/turbopack/crates/turbo-tasks-backend/src/database/turbo/mod.rs b/turbopack/crates/turbo-tasks-backend/src/database/turbo/mod.rs
index aa7de939553e9..e8501ff485219 100644
--- a/turbopack/crates/turbo-tasks-backend/src/database/turbo/mod.rs
+++ b/turbopack/crates/turbo-tasks-backend/src/database/turbo/mod.rs
@@ -40,6 +40,7 @@ pub struct TurboKeyValueDatabase {
     compact_join_handle: Mutex<Option<JoinHandle<Result<()>>>>,
     is_ci: bool,
     is_short_session: bool,
+    is_fresh: bool,
 }
 
 impl TurboKeyValueDatabase {
@@ -50,6 +51,7 @@ impl TurboKeyValueDatabase {
             compact_join_handle: Mutex::new(None),
             is_ci,
             is_short_session,
+            is_fresh: db.is_empty(),
         })
     }
 }
@@ -111,20 +113,23 @@ impl KeyValueDatabase for TurboKeyValueDatabase {
             join_handle.join()?;
         }
         // Compact the database on shutdown
-        if self.is_ci {
-            // Fully compact in CI to reduce cache size
-            do_compact(
-                &self.db,
-                "Finished filesystem cache database compaction",
-                usize::MAX,
-            )?;
-        } else {
-            // Compact with a reasonable limit in non-CI environments
-            do_compact(
-                &self.db,
-                "Finished filesystem cache database compaction",
-                available_parallelism().map_or(4, |c| max(4, c.get())),
-            )?;
+        // (Avoid compacting a fresh database since we don't have any usage info yet)
+        if !self.is_fresh {
+            if self.is_ci {
+                // Fully compact in CI to reduce cache size
+                do_compact(
+                    &self.db,
+                    "Finished filesystem cache database compaction",
+                    usize::MAX,
+                )?;
+            } else {
+                // Compact with a reasonable limit in non-CI environments
+                do_compact(
+                    &self.db,
+                    "Finished filesystem cache database compaction",
+                    available_parallelism().map_or(4, |c| max(4, c.get())),
+                )?;
+            }
         }
         // Shutdown the database
         self.db.shutdown()
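The guard in patch 12 rests on a simple observation: usage flags are only collected while a database is read, so a database created in this very session has nothing to say yet about which keys are cold. A sketch of the shutdown decision as a standalone helper (hypothetical function, not the module's API):

```rust
use std::cmp::max;
use std::thread::available_parallelism;

// Returns None when compaction should be skipped, otherwise the maximum
// number of merge segments to process.
fn compaction_limit(is_fresh: bool, is_ci: bool) -> Option<usize> {
    if is_fresh {
        // Fresh database: no usage info yet, skip compaction entirely.
        return None;
    }
    Some(if is_ci {
        // CI: fully compact to minimize the persisted cache size.
        usize::MAX
    } else {
        // Interactive builds: bound the work by available parallelism, floor 4.
        available_parallelism().map_or(4, |c| max(4, c.get()))
    })
}

fn main() {
    assert_eq!(compaction_limit(true, true), None);
    assert_eq!(compaction_limit(false, true), Some(usize::MAX));
    assert!(compaction_limit(false, false).unwrap() >= 4);
}
```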
From ae9462561735983658b8ba324a50c591a146b850 Mon Sep 17 00:00:00 2001
From: Tobias Koppers
Date: Mon, 1 Dec 2025 11:28:44 +0100
Subject: [PATCH 13/13] review fixes

---
 turbopack/crates/turbo-persistence-tools/src/main.rs | 4 ++--
 turbopack/crates/turbo-persistence/src/db.rs         | 3 +++
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/turbopack/crates/turbo-persistence-tools/src/main.rs b/turbopack/crates/turbo-persistence-tools/src/main.rs
index 1467399722a11..759242caea19a 100644
--- a/turbopack/crates/turbo-persistence-tools/src/main.rs
+++ b/turbopack/crates/turbo-persistence-tools/src/main.rs
@@ -40,8 +40,8 @@ fn main() -> Result<()> {
         } in meta_file.entries
         {
             println!(
-                "  SST {sequence_number:08}.sst: {} {min_hash:016x} - {max_hash:016x} (p = 1/{})",
-                flags,
+                "  SST {sequence_number:08}.sst: {flags} {min_hash:016x} - {max_hash:016x} (p = \
+                 1/{})",
                 u64::MAX / (max_hash - min_hash + 1)
             );
             println!("    AMQF {amqf_entries} entries = {} KiB", amqf_size / 1024);
diff --git a/turbopack/crates/turbo-persistence/src/db.rs b/turbopack/crates/turbo-persistence/src/db.rs
index 37f58caf63df8..e44941b414339 100644
--- a/turbopack/crates/turbo-persistence/src/db.rs
+++ b/turbopack/crates/turbo-persistence/src/db.rs
@@ -846,6 +846,8 @@ impl TurboPersistence
             }
 
             fn category(&self) -> u8 {
+                // Cold and non-cold files are placed separately so we pass different category
+                // values to ensure they are not merged together.
                 if self.flags.cold() { 1 } else { 0 }
             }
         }
@@ -1335,6 +1337,7 @@ impl TurboPersistence
     /// Get a value from the database. Returns None if the key is not found. The returned value
     /// might hold onto a block of the database and it should not be hold long-term.
     pub fn get<K: QueryKey>(&self, family: usize, key: &K) -> Result<Option<ArcSlice<u8>>> {
+        debug_assert!(family < FAMILIES, "Family index out of bounds");
         let hash = hash_key(key);
         let inner = self.inner.read();
         for meta in inner.meta_files.iter().rev() {
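Two details in the review fixes are easy to misread. The `\` at the end of the `println!` format string is Rust's string-literal continuation: it swallows the newline and the following indentation, so the wrapped source still prints a single log line. And `debug_assert!` costs nothing in release builds while catching an out-of-range family index early in debug builds. A small demonstration of both (hypothetical `FAMILIES` value):

```rust
const FAMILIES: usize = 4; // hypothetical family count

fn main() {
    // 1) A trailing `\` in a string literal swallows the newline and the next
    //    line's leading whitespace, so the wrapped source still yields one
    //    uninterrupted line of output.
    let line = "  SST 00000001.sst: fresh (p = \
                1/42)";
    assert_eq!(line, "  SST 00000001.sst: fresh (p = 1/42)");

    // 2) The new guard in `get`: free in release builds, loud in debug builds.
    let family = 3usize;
    debug_assert!(family < FAMILIES, "Family index out of bounds");
}
```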