diff --git a/turbopack/crates/turbo-persistence/Cargo.toml b/turbopack/crates/turbo-persistence/Cargo.toml
index a7901664f5f73..4f0b035eb4e8f 100644
--- a/turbopack/crates/turbo-persistence/Cargo.toml
+++ b/turbopack/crates/turbo-persistence/Cargo.toml
@@ -10,6 +10,7 @@ verify_sst_content = []
 strict_checks = []
 stats = ["quick_cache/stats"]
 print_stats = ["stats"]
+verbose_log = []
 
 [dependencies]
 anyhow = { workspace = true }
diff --git a/turbopack/crates/turbo-persistence/src/db.rs b/turbopack/crates/turbo-persistence/src/db.rs
index 0635965e26576..c2d3eea23e14e 100644
--- a/turbopack/crates/turbo-persistence/src/db.rs
+++ b/turbopack/crates/turbo-persistence/src/db.rs
@@ -14,6 +14,7 @@ use byteorder::{BE, ReadBytesExt, WriteBytesExt};
 use jiff::Timestamp;
 use memmap2::Mmap;
 use parking_lot::{Mutex, RwLock};
+use smallvec::SmallVec;
 
 pub use crate::compaction::selector::CompactConfig;
 use crate::{
@@ -476,7 +477,7 @@ impl TurboPersistence {
         &self,
         CommitOptions {
             mut new_meta_files,
-            mut new_sst_files,
+            new_sst_files,
             mut new_blob_files,
             mut sst_seq_numbers_to_delete,
             mut blob_seq_numbers_to_delete,
@@ -611,35 +612,88 @@ impl TurboPersistence {
             writeln!(log, "Time {time}")?;
             let span = time.until(Timestamp::now())?;
             writeln!(log, "Commit {seq:08} {keys_written} keys in {span:#}")?;
-            for (seq, family, ssts, obsolete) in new_meta_info {
-                writeln!(log, "{seq:08} META family:{family}",)?;
+            writeln!(log, "FAM | META SEQ | SST SEQ | RANGE")?;
+            for (meta_seq, family, ssts, obsolete) in new_meta_info {
                 for (seq, min, max, size) in ssts {
                     writeln!(
                         log,
-                        "  {seq:08} SST {min:016x}-{max:016x} {} MiB",
+                        "{family:3} | {meta_seq:08} | {seq:08} SST | {} ({} MiB)",
+                        range_to_str(min, max),
                         size / 1024 / 1024
                     )?;
                 }
-                for seq in obsolete {
-                    writeln!(log, "  {seq:08} OBSOLETE SST")?;
+                for obsolete in obsolete.chunks(15) {
+                    write!(log, "{family:3} | {meta_seq:08} |")?;
+                    for seq in obsolete {
+                        write!(log, " {seq:08}")?;
+                    }
+                    writeln!(log, " OBSOLETE SST")?;
                 }
             }
-            new_sst_files.sort_unstable_by_key(|(seq, _)| *seq);
-            for (seq, _) in new_sst_files.iter() {
-                writeln!(log, "{seq:08} NEW SST")?;
+
+            fn write_seq_numbers<W: std::io::Write, T, I>(
+                log: &mut W,
+                items: I,
+                label: &str,
+                extract_seq: fn(&T) -> u32,
+            ) -> std::io::Result<()>
+            where
+                I: IntoIterator<Item = T>,
+            {
+                let items: Vec<T> = items.into_iter().collect();
+                for chunk in items.chunks(15) {
+                    write!(log, "    |          |")?;
+                    for item in chunk {
+                        write!(log, " {:08}", extract_seq(item))?;
+                    }
+                    writeln!(log, " {}", label)?;
+                }
+                Ok(())
             }
+
             new_blob_files.sort_unstable_by_key(|(seq, _)| *seq);
-            for (seq, _) in new_blob_files.iter() {
-                writeln!(log, "{seq:08} NEW BLOB")?;
-            }
-            for seq in sst_seq_numbers_to_delete.iter() {
-                writeln!(log, "{seq:08} SST DELETED")?;
-            }
-            for seq in meta_seq_numbers_to_delete.iter() {
-                writeln!(log, "{seq:08} META DELETED")?;
-            }
-            for seq in blob_seq_numbers_to_delete.iter() {
-                writeln!(log, "{seq:08} BLOB DELETED")?;
+            write_seq_numbers(&mut log, new_blob_files, "NEW BLOB", |&(seq, _)| seq)?;
+            write_seq_numbers(
+                &mut log,
+                blob_seq_numbers_to_delete,
+                "BLOB DELETED",
+                |&seq| seq,
+            )?;
+            write_seq_numbers(&mut log, sst_seq_numbers_to_delete, "SST DELETED", |&seq| {
+                seq
+            })?;
+            write_seq_numbers(
+                &mut log,
+                meta_seq_numbers_to_delete,
+                "META DELETED",
+                |&seq| seq,
+            )?;
+            #[cfg(feature = "verbose_log")]
+            {
+                writeln!(log, "New database state:")?;
+                writeln!(log, "FAM | META SEQ | SST SEQ | RANGE")?;
+                let inner = self.inner.read();
+                let families = inner.meta_files.iter().map(|meta| meta.family()).filter({
+                    let mut set = HashSet::new();
+                    move |family| set.insert(*family)
+                });
+                for family in families {
+                    for meta in inner.meta_files.iter() {
+                        if meta.family() != family {
+                            continue;
+                        }
+                        let meta_seq = meta.sequence_number();
+                        for entry in meta.entries().iter() {
+                            let seq = entry.sequence_number();
+                            let range = entry.range();
+                            writeln!(
+                                log,
+                                "{family:3} | {meta_seq:08} | {seq:08} | {}",
+                                range_to_str(range.min_hash, range.max_hash)
+                            )?;
+                        }
+                    }
+                }
+            }
         }
         anyhow::Ok(())
@@ -835,30 +889,7 @@ impl TurboPersistence {
             });
         }
 
-        self.parallel_scheduler.block_in_place(|| {
-            let metrics = compute_metrics(&ssts_with_ranges, 0..=u64::MAX);
-            let guard = log_mutex.lock();
-            let mut log = self.open_log()?;
-            writeln!(
-                log,
-                "Compaction for family {family} (coverage: {}, overlap: {}, \
-                 duplication: {} / {} MiB):",
-                metrics.coverage,
-                metrics.overlap,
-                metrics.duplication,
-                metrics.duplicated_size / 1024 / 1024
-            )?;
-            for job in merge_jobs.iter() {
-                writeln!(log, "  merge")?;
-                for i in job.iter() {
-                    let seq = ssts_with_ranges[*i].seq;
-                    let (min, max) = ssts_with_ranges[*i].range().into_inner();
-                    writeln!(log, "    {seq:08} {min:016x}-{max:016x}")?;
-                }
-            }
-            drop(guard);
-            anyhow::Ok(())
-        })?;
+        let metrics = compute_metrics(&ssts_with_ranges, 0..=u64::MAX);
 
         // Later we will remove the merged files
         let sst_seq_numbers_to_delete = merge_jobs
@@ -875,6 +906,7 @@ impl TurboPersistence {
                 new_sst_files: Vec<(u32, File, StaticSortedFileBuilderMeta<'static>)>,
                 blob_seq_numbers_to_delete: Vec<u32>,
                 keys_written: u64,
+                indicies: SmallVec<[usize; 1]>,
             },
             Move {
                 seq: u32,
@@ -883,185 +915,193 @@ impl TurboPersistence {
         }
         let merge_result = self
             .parallel_scheduler
-            .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(merge_jobs, |indices| {
-                let _span = span.clone().entered();
-                if indices.len() == 1 {
-                    // If we only have one file, we can just move it
-                    let index = indices[0];
-                    let meta_index = ssts_with_ranges[index].meta_index;
-                    let index_in_meta = ssts_with_ranges[index].index_in_meta;
-                    let meta_file = &meta_files[meta_index];
-                    let entry = meta_file.entry(index_in_meta);
-                    let amqf = Cow::Borrowed(entry.raw_amqf(meta_file.amqf_data()));
-                    let meta = StaticSortedFileBuilderMeta {
-                        min_hash: entry.min_hash(),
-                        max_hash: entry.max_hash(),
-                        amqf,
-                        key_compression_dictionary_length: entry
-                            .key_compression_dictionary_length(),
-                        block_count: entry.block_count(),
-                        size: entry.size(),
-                        entries: 0,
-                    };
-                    return Ok(PartialMergeResult::Move {
-                        seq: entry.sequence_number(),
-                        meta,
-                    });
-                }
-
-                fn create_sst_file<'l, S: ParallelScheduler>(
-                    parallel_scheduler: &S,
-                    entries: &[LookupEntry<'l>],
-                    total_key_size: usize,
-                    path: &Path,
-                    seq: u32,
-                ) -> Result<(u32, File, StaticSortedFileBuilderMeta<'static>)>
-                {
-                    let _span = tracing::trace_span!("write merged sst file").entered();
-                    let (meta, file) = parallel_scheduler.block_in_place(|| {
-                        write_static_stored_file(
-                            entries,
-                            total_key_size,
-                            &path.join(format!("{seq:08}.sst")),
-                        )
-                    })?;
-                    Ok((seq, file, meta))
-                }
-
-                let mut new_sst_files = Vec::new();
-
-                // Iterate all SST files
-                let iters = indices
-                    .iter()
-                    .map(|&index| {
+            .parallel_map_collect_owned::<_, _, Result<Vec<_>>>(
+                merge_jobs,
+                |indicies| {
+                    let _span = span.clone().entered();
+                    if indicies.len() == 1 {
+                        // If we only have one file, we can just move it
+                        let index = indicies[0];
                         let meta_index = ssts_with_ranges[index].meta_index;
                         let index_in_meta = ssts_with_ranges[index].index_in_meta;
-                        let meta = &meta_files[meta_index];
-                        meta.entry(index_in_meta)
-                            .sst(meta)?
-                            .iter(key_block_cache, value_block_cache)
-                    })
-                    .collect::<Result<Vec<_>>>()?;
-
-                let iter = MergeIter::new(iters.into_iter())?;
-
-                // TODO figure out how to delete blobs when they are no longer
-                // referenced
-                let blob_seq_numbers_to_delete: Vec<u32> = Vec::new();
-
-                let mut keys_written = 0;
-
-                let mut total_key_size = 0;
-                let mut total_value_size = 0;
-                let mut current: Option<LookupEntry<'_>> = None;
-                let mut entries = Vec::new();
-                let mut last_entries = Vec::new();
-                let mut last_entries_total_key_size = 0;
-                for entry in iter {
-                    let entry = entry?;
-
-                    // Remove duplicates
-                    if let Some(current) = current.take() {
-                        if current.key != entry.key {
-                            let key_size = current.key.len();
-                            let value_size = current.value.uncompressed_size_in_sst();
-                            total_key_size += key_size;
-                            total_value_size += value_size;
-
-                            if total_key_size + total_value_size
-                                > DATA_THRESHOLD_PER_COMPACTED_FILE
-                                || entries.len() >= MAX_ENTRIES_PER_COMPACTED_FILE
-                            {
-                                let selected_total_key_size =
-                                    last_entries_total_key_size;
-                                swap(&mut entries, &mut last_entries);
-                                last_entries_total_key_size = total_key_size - key_size;
-                                total_key_size = key_size;
-                                total_value_size = value_size;
-
-                                if !entries.is_empty() {
-                                    let seq = sequence_number
-                                        .fetch_add(1, Ordering::SeqCst)
-                                        + 1;
-
-                                    keys_written += entries.len() as u64;
-                                    new_sst_files.push(create_sst_file(
-                                        &self.parallel_scheduler,
-                                        &entries,
-                                        selected_total_key_size,
-                                        path,
-                                        seq,
-                                    )?);
-
-                                    entries.clear();
+                        let meta_file = &meta_files[meta_index];
+                        let entry = meta_file.entry(index_in_meta);
+                        let amqf = Cow::Borrowed(entry.raw_amqf(meta_file.amqf_data()));
+                        let meta = StaticSortedFileBuilderMeta {
+                            min_hash: entry.min_hash(),
+                            max_hash: entry.max_hash(),
+                            amqf,
+                            key_compression_dictionary_length: entry
+                                .key_compression_dictionary_length(),
+                            block_count: entry.block_count(),
+                            size: entry.size(),
+                            entries: 0,
+                        };
+                        return Ok(PartialMergeResult::Move {
+                            seq: entry.sequence_number(),
+                            meta,
+                        });
+                    }
+
+                    fn create_sst_file<'l, S: ParallelScheduler>(
+                        parallel_scheduler: &S,
+                        entries: &[LookupEntry<'l>],
+                        total_key_size: usize,
+                        path: &Path,
+                        seq: u32,
+                    ) -> Result<(u32, File, StaticSortedFileBuilderMeta<'static>)>
+                    {
+                        let _span =
+                            tracing::trace_span!("write merged sst file").entered();
+                        let (meta, file) = parallel_scheduler.block_in_place(|| {
+                            write_static_stored_file(
+                                entries,
+                                total_key_size,
+                                &path.join(format!("{seq:08}.sst")),
+                            )
+                        })?;
+                        Ok((seq, file, meta))
+                    }
+
+                    let mut new_sst_files = Vec::new();
+
+                    // Iterate all SST files
+                    let iters = indicies
+                        .iter()
+                        .map(|&index| {
+                            let meta_index = ssts_with_ranges[index].meta_index;
+                            let index_in_meta = ssts_with_ranges[index].index_in_meta;
+                            let meta = &meta_files[meta_index];
+                            meta.entry(index_in_meta)
+                                .sst(meta)?
+                                .iter(key_block_cache, value_block_cache)
+                        })
+                        .collect::<Result<Vec<_>>>()?;
+
+                    let iter = MergeIter::new(iters.into_iter())?;
+
+                    // TODO figure out how to delete blobs when they are no longer
+                    // referenced
+                    let blob_seq_numbers_to_delete: Vec<u32> = Vec::new();
+
+                    let mut keys_written = 0;
+
+                    let mut total_key_size = 0;
+                    let mut total_value_size = 0;
+                    let mut current: Option<LookupEntry<'_>> = None;
+                    let mut entries = Vec::new();
+                    let mut last_entries = Vec::new();
+                    let mut last_entries_total_key_size = 0;
+                    for entry in iter {
+                        let entry = entry?;
+
+                        // Remove duplicates
+                        if let Some(current) = current.take() {
+                            if current.key != entry.key {
+                                let key_size = current.key.len();
+                                let value_size =
+                                    current.value.uncompressed_size_in_sst();
+                                total_key_size += key_size;
+                                total_value_size += value_size;
+
+                                if total_key_size + total_value_size
+                                    > DATA_THRESHOLD_PER_COMPACTED_FILE
+                                    || entries.len() >= MAX_ENTRIES_PER_COMPACTED_FILE
+                                {
+                                    let selected_total_key_size =
+                                        last_entries_total_key_size;
+                                    swap(&mut entries, &mut last_entries);
+                                    last_entries_total_key_size =
+                                        total_key_size - key_size;
+                                    total_key_size = key_size;
+                                    total_value_size = value_size;
+
+                                    if !entries.is_empty() {
+                                        let seq = sequence_number
+                                            .fetch_add(1, Ordering::SeqCst)
+                                            + 1;
+
+                                        keys_written += entries.len() as u64;
+                                        new_sst_files.push(create_sst_file(
+                                            &self.parallel_scheduler,
+                                            &entries,
+                                            selected_total_key_size,
+                                            path,
+                                            seq,
+                                        )?);
+
+                                        entries.clear();
+                                    }
                                 }
-                            entries.push(current);
-                        } else {
-                            // Override value
+                                entries.push(current);
+                            } else {
+                                // Override value
+                            }
                         }
-                    current = Some(entry);
-                }
-                if let Some(entry) = current {
-                    total_key_size += entry.key.len();
-                    // Obsolete as we no longer need total_value_size
-                    // total_value_size += entry.value.uncompressed_size_in_sst();
-                    entries.push(entry);
-                }
+                        current = Some(entry);
+                    }
+                    if let Some(entry) = current {
+                        total_key_size += entry.key.len();
+                        // Obsolete as we no longer need total_value_size
+                        // total_value_size += entry.value.uncompressed_size_in_sst();
+                        entries.push(entry);
+                    }
-                // If we have one set of entries left, write them to a new SST file
-                if last_entries.is_empty() && !entries.is_empty() {
-                    let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
-
-                    keys_written += entries.len() as u64;
-                    new_sst_files.push(create_sst_file(
-                        &self.parallel_scheduler,
-                        &entries,
-                        total_key_size,
-                        path,
-                        seq,
-                    )?);
-                } else
-                // If we have two sets of entries left, merge them and
-                // split it into two SST files, to avoid having a
-                // single SST file that is very small.
-                if !last_entries.is_empty() {
-                    last_entries.append(&mut entries);
-
-                    last_entries_total_key_size += total_key_size;
-
-                    let (part1, part2) = last_entries.split_at(last_entries.len() / 2);
-
-                    let seq1 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
-                    let seq2 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
-
-                    keys_written += part1.len() as u64;
-                    new_sst_files.push(create_sst_file(
-                        &self.parallel_scheduler,
-                        part1,
-                        // We don't know the exact sizes so we estimate them
-                        last_entries_total_key_size / 2,
-                        path,
-                        seq1,
-                    )?);
-
-                    keys_written += part2.len() as u64;
-                    new_sst_files.push(create_sst_file(
-                        &self.parallel_scheduler,
-                        part2,
-                        last_entries_total_key_size / 2,
-                        path,
-                        seq2,
-                    )?);
-                }
-                Ok(PartialMergeResult::Merged {
-                    new_sst_files,
-                    blob_seq_numbers_to_delete,
-                    keys_written,
-                })
-            })
+                    // If we have one set of entries left, write them to a new SST file
+                    if last_entries.is_empty() && !entries.is_empty() {
+                        let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
+
+                        keys_written += entries.len() as u64;
+                        new_sst_files.push(create_sst_file(
+                            &self.parallel_scheduler,
+                            &entries,
+                            total_key_size,
+                            path,
+                            seq,
+                        )?);
+                    } else
+                    // If we have two sets of entries left, merge them and
+                    // split it into two SST files, to avoid having a
+                    // single SST file that is very small.
+                    if !last_entries.is_empty() {
+                        last_entries.append(&mut entries);
+
+                        last_entries_total_key_size += total_key_size;
+
+                        let (part1, part2) =
+                            last_entries.split_at(last_entries.len() / 2);
+
+                        let seq1 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
+                        let seq2 = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
+
+                        keys_written += part1.len() as u64;
+                        new_sst_files.push(create_sst_file(
+                            &self.parallel_scheduler,
+                            part1,
+                            // We don't know the exact sizes so we estimate them
+                            last_entries_total_key_size / 2,
+                            path,
+                            seq1,
+                        )?);
+
+                        keys_written += part2.len() as u64;
+                        new_sst_files.push(create_sst_file(
+                            &self.parallel_scheduler,
+                            part2,
+                            last_entries_total_key_size / 2,
+                            path,
+                            seq2,
+                        )?);
+                    }
+                    Ok(PartialMergeResult::Merged {
+                        new_sst_files,
+                        blob_seq_numbers_to_delete,
+                        keys_written,
+                        indicies,
+                    })
+                },
+            )
             .with_context(|| {
                 format!("Failed to merge database files for family {family}")
             })?;
@@ -1072,6 +1112,7 @@ impl TurboPersistence {
             if let PartialMergeResult::Merged {
                 new_sst_files,
                 blob_seq_numbers_to_delete,
+                indicies: _,
                 keys_written: _,
             } = r
             {
@@ -1088,43 +1129,91 @@ impl TurboPersistence {
         let mut new_sst_files = Vec::with_capacity(sst_files_len);
         let mut blob_seq_numbers_to_delete = Vec::with_capacity(blob_delete_len);
 
+        let meta_seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
         let mut meta_file_builder = MetaFileBuilder::new(family);
         let mut keys_written = 0;
 
-        for result in merge_result {
-            match result {
-                PartialMergeResult::Merged {
-                    new_sst_files: merged_new_sst_files,
-                    blob_seq_numbers_to_delete: merged_blob_seq_numbers_to_delete,
-                    keys_written: merged_keys_written,
-                } => {
-                    for (seq, file, meta) in merged_new_sst_files {
+        self.parallel_scheduler.block_in_place(|| {
+            let guard = log_mutex.lock();
+            let mut log = self.open_log()?;
+            writeln!(
+                log,
+                "{family:3} | {meta_seq:08} | Compaction (coverage: {}, overlap: {}, \
+                 duplication: {} / {} MiB):",
+                metrics.coverage,
+                metrics.overlap,
+                metrics.duplication,
+                metrics.duplicated_size / 1024 / 1024
+            )?;
+            for result in merge_result {
+                match result {
+                    PartialMergeResult::Merged {
+                        new_sst_files: merged_new_sst_files,
+                        blob_seq_numbers_to_delete:
+                            merged_blob_seq_numbers_to_delete,
+                        keys_written: merged_keys_written,
+                        indicies,
+                    } => {
+                        writeln!(
+                            log,
+                            "{family:3} | {meta_seq:08} | MERGE \
+                             ({merged_keys_written} keys):"
+                        )?;
+                        for i in indicies.iter() {
+                            let seq = ssts_with_ranges[*i].seq;
+                            let (min, max) = ssts_with_ranges[*i].range().into_inner();
+                            writeln!(
+                                log,
+                                "{family:3} | {meta_seq:08} | {seq:08} INPUT | {} ",
+                                range_to_str(min, max)
+                            )?;
+                        }
+                        for (seq, file, meta) in merged_new_sst_files {
+                            let min = meta.min_hash;
+                            let max = meta.max_hash;
+                            writeln!(
+                                log,
+                                "{family:3} | {meta_seq:08} | {seq:08} OUTPUT | {}",
+                                range_to_str(min, max)
+                            )?;
+
+                            meta_file_builder.add(seq, meta);
+                            new_sst_files.push((seq, file));
+                        }
+                        blob_seq_numbers_to_delete
+                            .extend(merged_blob_seq_numbers_to_delete);
+                        keys_written += merged_keys_written;
+                    }
+                    PartialMergeResult::Move { seq, meta } => {
+                        let min = meta.min_hash;
+                        let max = meta.max_hash;
+                        writeln!(
+                            log,
+                            "{family:3} | {meta_seq:08} | {seq:08} MOVED | {}",
+                            range_to_str(min, max)
+                        )?;
+
+                        meta_file_builder.add(seq, meta);
-                        new_sst_files.push((seq, file));
-                    }
-                    blob_seq_numbers_to_delete
-                        .extend(merged_blob_seq_numbers_to_delete);
-                    keys_written += merged_keys_written;
-                }
-                PartialMergeResult::Move { seq, meta } => {
-                    meta_file_builder.add(seq, meta);
                     }
                 }
             }
-        }
+            drop(log);
+            drop(guard);
+
+            anyhow::Ok(())
+        })?;
 
         for &seq in sst_seq_numbers_to_delete.iter() {
             meta_file_builder.add_obsolete_sst_file(seq);
         }
 
-        let seq = sequence_number.fetch_add(1, Ordering::SeqCst) + 1;
         let meta_file = {
             let _span = tracing::trace_span!("write meta file").entered();
             self.parallel_scheduler
-                .block_in_place(|| meta_file_builder.write(&self.path, seq))?
+                .block_in_place(|| meta_file_builder.write(&self.path, meta_seq))?
         };
 
         Ok(PartialResultPerFamily {
-            new_meta_file: Some((seq, meta_file)),
+            new_meta_file: Some((meta_seq, meta_file)),
             new_sst_files,
             sst_seq_numbers_to_delete,
             blob_seq_numbers_to_delete,
@@ -1273,6 +1362,30 @@ impl TurboPersistence {
     }
 }
 
+fn range_to_str(min: u64, max: u64) -> String {
+    use std::fmt::Write;
+    const DISPLAY_SIZE: usize = 100;
+    const TOTAL_SIZE: u64 = u64::MAX;
+    let start_pos = (min as u128 * DISPLAY_SIZE as u128 / TOTAL_SIZE as u128) as usize;
+    let end_pos = (max as u128 * DISPLAY_SIZE as u128 / TOTAL_SIZE as u128) as usize;
+    let mut range_str = String::new();
+    for i in 0..DISPLAY_SIZE {
+        if i == start_pos && i == end_pos {
+            range_str.push('O');
+        } else if i == start_pos {
+            range_str.push('[');
+        } else if i == end_pos {
+            range_str.push(']');
+        } else if i > start_pos && i < end_pos {
+            range_str.push('=');
+        } else {
+            range_str.push(' ');
+        }
+    }
+    write!(range_str, " | {min:016x}-{max:016x}").unwrap();
+    range_str
+}
+
 pub struct MetaFileInfo {
     pub sequence_number: u32,
     pub family: u32,