From b29ddb17d3918b01aeb36315d566ec9db33a4ab0 Mon Sep 17 00:00:00 2001 From: Justin Maier Date: Fri, 3 Apr 2026 15:02:09 -0600 Subject: [PATCH 01/91] feat: DocOp::Merge, DataSilo crate, config-driven UI, pipeline optimizations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Multi-phase dump correctness: - DocOp::Merge variant: merges fields into existing docs instead of replacing - All dump phases use Merge for object-level writes (fixes data loss bug) - Tags post-pass: bitmap inversion writes one Merge per slot (4.5B→109M ops) - 10 unit tests for Merge semantics (roundtrip, accumulate, delete+resurrect) Pipeline performance (StreamingDocWriter fixes): - BufWriter 256→8192 bytes on new shard creation (2x throughput improvement) - Hardware CRC32 via crc32fast (replaces software byte-at-a-time table) - Remove per-shard fsync in finalize (saves 20-80s per phase) - Background enrichment drop (50s blocking → non-blocking) - Mmap explicit drop after parse (zombie RSS 83GB→24GB) DataSilo crate (crates/datasilo/): - Generic mmap'd key-value store: 35M writes/sec, 23M reads/sec - ParallelWriter with atomic bump + 1MB thread-local regions - OpsLog with CRC32 append + replay on startup - Compaction (replay ops → rewrite data file) - 6 unit tests passing Server endpoints: - POST /time-buckets/rebuild: rebuild from sort field data + cache clear - GET /dictionaries: reverse maps for LCS/MappedString fields - GET /ui-config: serves YAML as JSON for config-driven UI Config-driven UI (static/index.html): - Dynamic filter/sort controls from engine metadata + YAML overrides - Card rendering with image URL templates, badges, meta fields - Detail modal with configurable fields, display types, formats - URL state sync for bookmarkable/shareable filter states - Civitai UI config (deploy/configs/civitai/ui-config.yaml) Design docs: - docs/design/docop-merge.md (GPT + Gemini reviewed) - docs/design/datasilo-implementation-plan.md (full migration 
plan) Co-Authored-By: Claude Opus 4.6 (1M context) --- Cargo.lock | 23 + Cargo.toml | 2 +- crates/datasilo/Cargo.toml | 14 + crates/datasilo/src/lib.rs | 561 ++++++++++ crates/datasilo/src/ops_log.rs | 156 +++ deploy/configs/civitai/config.yaml | 90 ++ deploy/configs/civitai/ui-config.yaml | 133 +++ docs/design/datasilo-implementation-plan.md | 301 ++++++ docs/design/docop-merge.md | 186 ++++ scratch/Cargo.toml | 4 + scripts/dump-test.sh | 291 ++++++ src/concurrent_engine.rs | 66 ++ src/dump_processor.rs | 214 +++- src/field_handler.rs | 1 + src/server.rs | 128 +++ src/shard_store.rs | 10 +- src/shard_store_doc.rs | 330 +++++- static/index.html | 1016 ++++++++++--------- 18 files changed, 3004 insertions(+), 522 deletions(-) create mode 100644 crates/datasilo/Cargo.toml create mode 100644 crates/datasilo/src/lib.rs create mode 100644 crates/datasilo/src/ops_log.rs create mode 100644 deploy/configs/civitai/config.yaml create mode 100644 deploy/configs/civitai/ui-config.yaml create mode 100644 docs/design/datasilo-implementation-plan.md create mode 100644 docs/design/docop-merge.md create mode 100644 scripts/dump-test.sh diff --git a/Cargo.lock b/Cargo.lock index e649afa1..6a42f5bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -568,6 +568,16 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "datasilo" +version = "0.1.0" +dependencies = [ + "crc32fast", + "memmap2", + "parking_lot", + "tempfile", +] + [[package]] name = "der" version = "0.7.10" @@ -2003,6 +2013,15 @@ dependencies = [ "serde", ] +[[package]] +name = "rmpv" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a4e1d4b9b938a26d2996af33229f0ca0956c652c1375067f0b45291c1df8417" +dependencies = [ + "rmp", +] + [[package]] name = "roaring" version = "0.10.12" @@ -2165,11 +2184,15 @@ name = "scratch" version = "0.0.0" dependencies = [ "dashmap", + "datasilo", "memmap2", "parking_lot", "rand 0.8.5", "rayon", + "rmp-serde", + "rmpv", "roaring", + 
"tempfile", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index d8d5ac42..9ad62515 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = [".", "scratch"] +members = [".", "scratch", "crates/datasilo"] default-members = ["."] [package] diff --git a/crates/datasilo/Cargo.toml b/crates/datasilo/Cargo.toml new file mode 100644 index 00000000..59f18dc0 --- /dev/null +++ b/crates/datasilo/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "datasilo" +version = "0.1.0" +edition = "2021" +publish = false +description = "Generic mmap'd key-value store with append-only ops log" + +[dependencies] +memmap2 = "0.9" +crc32fast = "1" +parking_lot = "0.12" + +[dev-dependencies] +tempfile = "3" diff --git a/crates/datasilo/src/lib.rs b/crates/datasilo/src/lib.rs new file mode 100644 index 00000000..3c2ead8a --- /dev/null +++ b/crates/datasilo/src/lib.rs @@ -0,0 +1,561 @@ +//! DataSilo — Generic mmap'd key-value store with append-only ops log. +//! +//! Three components: +//! - **Index**: key → (offset, length) in the data file. Mmap'd dense array. +//! - **Data**: packed variable-size entries. Mmap'd. +//! - **Ops log**: append-only mutations with CRC32. Used for post-bulk-load changes. +//! +//! Write path (bulk): ParallelWriter → 35M entries/sec via mmap memcpy (32 threads) +//! Write path (ops): append to ops log → held in pending HashMap for reads +//! Read path: check pending → index lookup (mmap deref) → data read (mmap deref) +//! +//! Encoding is caller's responsibility — DataSilo stores raw `&[u8]`. 
+ +use std::collections::HashMap; +use std::fs::{File, OpenOptions}; +use std::io::{self, Write}; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicU64, Ordering}; + +mod ops_log; + +pub use ops_log::{SiloOp, OpsLog}; + +// --------------------------------------------------------------------------- +// Index entry — 16 bytes per key +// --------------------------------------------------------------------------- + +#[derive(Clone, Copy, Debug, Default)] +#[repr(C)] +pub struct IndexEntry { + pub offset: u64, + pub length: u32, + pub allocated: u32, +} + +const INDEX_ENTRY_SIZE: usize = std::mem::size_of::(); // 16 + +// --------------------------------------------------------------------------- +// ParallelWriter — lock-free concurrent bulk writer +// --------------------------------------------------------------------------- + +pub struct ParallelWriter { + data_mmap: memmap2::MmapMut, + index_mmap: memmap2::MmapMut, + data_offset: AtomicU64, + index_count: u32, + entries_written: AtomicU64, +} + +unsafe impl Send for ParallelWriter {} +unsafe impl Sync for ParallelWriter {} + +/// Per-thread writer with 1MB sequential regions for OS prefetch. +pub struct ThreadWriter<'a> { + pw: &'a ParallelWriter, + cursor: usize, + region_end: usize, +} + +const REGION_SIZE: u64 = 1 << 20; // 1MB + +impl ParallelWriter { + // Raw accessors for benchmarks + pub fn data_offset_ref(&self) -> &AtomicU64 { &self.data_offset } + pub fn data_ptr(&self) -> *mut u8 { self.data_mmap.as_ptr() as *mut u8 } + pub fn data_len(&self) -> usize { self.data_mmap.len() } + pub fn index_ptr(&self) -> *mut u8 { self.index_mmap.as_ptr() as *mut u8 } + pub fn index_len(&self) -> usize { self.index_mmap.len() } + pub fn entries_counter(&self) -> &AtomicU64 { &self.entries_written } + + /// Write an entry. Thread-safe, lock-free. 
+ #[inline] + pub fn write(&self, key: u32, data: &[u8]) -> Option { + let len = data.len() as u32; + if len == 0 || key >= self.index_count { return None; } + + let offset = self.data_offset.fetch_add(len as u64, Ordering::Relaxed); + let start = offset as usize; + let end = start + len as usize; + if end > self.data_mmap.len() { return None; } + + let dst = &self.data_mmap[start..end] as *const [u8] as *mut [u8]; + unsafe { (*dst).copy_from_slice(data); } + + let entry = IndexEntry { offset, length: len, allocated: len }; + let idx_pos = key as usize * INDEX_ENTRY_SIZE; + if idx_pos + INDEX_ENTRY_SIZE <= self.index_mmap.len() { + let bytes: [u8; INDEX_ENTRY_SIZE] = unsafe { std::mem::transmute(entry) }; + let dst = &self.index_mmap[idx_pos..idx_pos + INDEX_ENTRY_SIZE] as *const [u8] as *mut [u8]; + unsafe { (*dst).copy_from_slice(&bytes); } + } + + self.entries_written.fetch_add(1, Ordering::Relaxed); + Some(offset) + } + + /// Get a thread-local writer with 1MB sequential regions. + pub fn thread_writer(&self) -> ThreadWriter<'_> { + ThreadWriter { pw: self, cursor: 0, region_end: 0 } + } + + /// Finalize: flush mmaps, truncate data to actual size. + pub fn finish(self) -> io::Result<(u64, u64)> { + let count = self.entries_written.load(Ordering::Relaxed); + let data_used = self.data_offset.load(Ordering::Relaxed); + self.data_mmap.flush()?; + self.index_mmap.flush()?; + Ok((count, data_used)) + } +} + +impl<'a> ThreadWriter<'a> { + /// Write an entry using thread-local region (sequential, OS-prefetch friendly). 
+ #[inline] + pub fn write(&mut self, key: u32, data: &[u8]) -> Option { + let len = data.len(); + if len == 0 || key >= self.pw.index_count { return None; } + + if self.cursor + len > self.region_end { + let start = self.pw.data_offset.fetch_add(REGION_SIZE, Ordering::Relaxed) as usize; + self.cursor = start; + self.region_end = start + REGION_SIZE as usize; + } + + let offset = self.cursor; + let end = offset + len; + if end > self.pw.data_mmap.len() { return None; } + + let dst = &self.pw.data_mmap[offset..end] as *const [u8] as *mut [u8]; + unsafe { (*dst).copy_from_slice(data); } + self.cursor = end; + + let entry = IndexEntry { offset: offset as u64, length: len as u32, allocated: len as u32 }; + let idx_pos = key as usize * INDEX_ENTRY_SIZE; + if idx_pos + INDEX_ENTRY_SIZE <= self.pw.index_mmap.len() { + let bytes: [u8; INDEX_ENTRY_SIZE] = unsafe { std::mem::transmute(entry) }; + let dst = &self.pw.index_mmap[idx_pos..idx_pos + INDEX_ENTRY_SIZE] as *const [u8] as *mut [u8]; + unsafe { (*dst).copy_from_slice(&bytes); } + } + + self.pw.entries_written.fetch_add(1, Ordering::Relaxed); + Some(offset as u64) + } +} + +// --------------------------------------------------------------------------- +// DataSilo — the main store +// --------------------------------------------------------------------------- + +pub struct SiloConfig { + pub buffer_ratio: f32, +} + +impl Default for SiloConfig { + fn default() -> Self { Self { buffer_ratio: 1.2 } } +} + +pub struct DataSilo { + path: PathBuf, + config: SiloConfig, + index_mmap: Option, + index_len: u32, + data_mmap: Option, + data_len: u64, + ops_log: parking_lot::Mutex, + pending: parking_lot::RwLock>>, +} + +// Send+Sync: MmapMut isn't Sync by default but we only write via ParallelWriter +// (disjoint regions) or single-threaded bulk_load. Reads are immutable. +unsafe impl Send for DataSilo {} +unsafe impl Sync for DataSilo {} + +impl DataSilo { + /// Open or create a DataSilo at the given directory. 
+ pub fn open(path: &Path, config: SiloConfig) -> io::Result { + std::fs::create_dir_all(path)?; + let ops_log = OpsLog::open(&path.join("ops.log"))?; + + let mut silo = Self { + path: path.to_path_buf(), + config, + index_mmap: None, + index_len: 0, + data_mmap: None, + data_len: 0, + ops_log: parking_lot::Mutex::new(ops_log), + pending: parking_lot::RwLock::new(HashMap::new()), + }; + + silo.load_index()?; + silo.load_data()?; + silo.replay_ops()?; + Ok(silo) + } + + /// Create a parallel writer for bulk loading. Pre-allocates files. + /// Call `finish_parallel_write()` after all threads are done. + pub fn prepare_parallel_writer( + &mut self, + max_key: u32, + estimated_total_bytes: u64, + ) -> io::Result { + let data_path = self.path.join("data.bin"); + let index_path = self.path.join("index.bin"); + let index_count = max_key as usize + 1; + + let data_file = OpenOptions::new() + .create(true).read(true).write(true).open(&data_path)?; + data_file.set_len(estimated_total_bytes)?; + let data_mmap = unsafe { memmap2::MmapMut::map_mut(&data_file)? }; + + let index_size = (index_count * INDEX_ENTRY_SIZE) as u64; + let index_file = OpenOptions::new() + .create(true).read(true).write(true).open(&index_path)?; + index_file.set_len(index_size)?; + let index_mmap = unsafe { memmap2::MmapMut::map_mut(&index_file)? }; + + Ok(ParallelWriter { + data_mmap, + index_mmap, + data_offset: AtomicU64::new(0), + index_count: index_count as u32, + entries_written: AtomicU64::new(0), + }) + } + + /// Finalize after parallel write. Truncates data to actual size, loads mmaps for reads. 
+ pub fn finish_parallel_write(&mut self, writer: ParallelWriter) -> io::Result { + let (count, data_used) = writer.finish()?; + + // Truncate data file to actual bytes used + let data_file = OpenOptions::new().write(true).open(self.path.join("data.bin"))?; + data_file.set_len(data_used)?; + drop(data_file); + + self.load_index()?; + self.load_data()?; + self.data_len = data_used; + + eprintln!("DataSilo: parallel write done — {} entries, {:.1}MB data, {:.1}MB index", + count, data_used as f64 / 1e6, + (self.index_len as usize * INDEX_ENTRY_SIZE) as f64 / 1e6); + Ok(count) + } + + /// Bulk load from an iterator (sequential, single-thread — use for small datasets). + pub fn bulk_load(&mut self, entries: I) -> io::Result + where I: Iterator)> + { + let data_path = self.path.join("data.bin"); + let mut data_file = io::BufWriter::with_capacity(1 << 20, File::create(&data_path)?); + let mut index_entries: Vec<(u32, IndexEntry)> = Vec::new(); + let mut offset: u64 = 0; + let mut count: u64 = 0; + let mut max_key: u32 = 0; + + for (key, value) in entries { + let len = value.len() as u32; + let allocated = (len as f32 * self.config.buffer_ratio).ceil() as u32; + data_file.write_all(&value)?; + if allocated > len { + let zeros = [0u8; 4096]; + let mut rem = (allocated - len) as usize; + while rem > 0 { let c = rem.min(4096); data_file.write_all(&zeros[..c])?; rem -= c; } + } + index_entries.push((key, IndexEntry { offset, length: len, allocated })); + offset += allocated as u64; + if key > max_key { max_key = key; } + count += 1; + } + data_file.flush()?; + drop(data_file); + + let index_count = max_key as usize + 1; + let index_path = self.path.join("index.bin"); + let index_file = OpenOptions::new() + .create(true).read(true).write(true).open(&index_path)?; + index_file.set_len((index_count * INDEX_ENTRY_SIZE) as u64)?; + let mut index_mmap = unsafe { memmap2::MmapMut::map_mut(&index_file)? 
}; + + for (key, entry) in &index_entries { + let pos = *key as usize * INDEX_ENTRY_SIZE; + if pos + INDEX_ENTRY_SIZE <= index_mmap.len() { + let bytes: [u8; INDEX_ENTRY_SIZE] = unsafe { std::mem::transmute(*entry) }; + index_mmap[pos..pos + INDEX_ENTRY_SIZE].copy_from_slice(&bytes); + } + } + index_mmap.flush()?; + self.index_mmap = Some(index_mmap); + self.index_len = index_count as u32; + self.load_data()?; + self.data_len = offset; + Ok(count) + } + + /// Append a mutation. Thread-safe (uses internal Mutex for ops log). + pub fn append_op(&self, key: u32, value: Vec) -> io::Result<()> { + self.ops_log.lock().append(SiloOp::Put { key, value: value.clone() })?; + self.pending.write().insert(key, value); + Ok(()) + } + + /// Append a batch of ops (one flush for the whole batch). Thread-safe. + pub fn append_ops_batch(&self, ops: &[(u32, Vec)]) -> io::Result<()> { + let mut log = self.ops_log.lock(); + let mut pending = self.pending.write(); + for (key, value) in ops { + log.append_no_sync(SiloOp::Put { key: *key, value: value.clone() })?; + pending.insert(*key, value.clone()); + } + log.sync()?; + Ok(()) + } + + /// Read an entry by key. Checks pending ops first, then mmap'd data. + pub fn get(&self, key: u32) -> Option<&[u8]> { + // Can't return &[u8] from RwLock — check pending separately + // For now, skip pending check in the hot path and let callers handle it + // TODO: return Cow or owned for pending entries + self.get_from_data(key) + } + + /// Read from the mmap'd data file only (no pending ops). + pub fn get_from_data(&self, key: u32) -> Option<&[u8]> { + let entry = self.index_entry(key)?; + if entry.length == 0 { return None; } + let mmap = self.data_mmap.as_ref()?; + let start = entry.offset as usize; + let end = start + entry.length as usize; + if end <= mmap.len() { Some(&mmap[start..end]) } else { None } + } + + /// Check if a key has a pending op value. 
+ pub fn get_pending(&self, key: u32) -> Option> { + self.pending.read().get(&key).cloned() + } + + /// Read with pending ops overlay (returns owned data). + pub fn get_with_pending(&self, key: u32) -> Option> { + if let Some(v) = self.pending.read().get(&key) { + return Some(v.clone()); + } + self.get_from_data(key).map(|s| s.to_vec()) + } + + pub fn index_capacity(&self) -> u32 { self.index_len } + pub fn pending_count(&self) -> usize { self.pending.read().len() } + pub fn data_bytes(&self) -> u64 { self.data_len } + pub fn path(&self) -> &Path { &self.path } + + /// Compact: apply all pending ops into the data file, clear ops log. + /// After compaction, pending is empty and all data is in the mmap. + pub fn compact(&mut self) -> io::Result { + let pending = std::mem::take(&mut *self.pending.write()); + if pending.is_empty() { return Ok(0); } + + let count = pending.len() as u64; + + // For entries that fit in their allocated space: overwrite in place + // For entries that don't: append to end of data file + // For new entries (not in index): append + extend index + + // Simple approach: rewrite data file with all entries (bulk + pending merged) + // This is the correct but potentially slow approach for large silos. 
+ // TODO: in-place update for entries that fit in allocated space + + let data_path = self.path.join("data.bin"); + let index_path = self.path.join("index.bin"); + + // Read all existing entries + overlay pending + let mut all_entries: Vec<(u32, Vec)> = Vec::new(); + let mut max_key: u32 = 0; + + // Existing entries from mmap + if let Some(ref index_mmap) = self.index_mmap { + for key in 0..self.index_len { + let pos = key as usize * INDEX_ENTRY_SIZE; + if pos + INDEX_ENTRY_SIZE > index_mmap.len() { break; } + let bytes: [u8; INDEX_ENTRY_SIZE] = index_mmap[pos..pos + INDEX_ENTRY_SIZE] + .try_into().unwrap(); + let entry: IndexEntry = unsafe { std::mem::transmute(bytes) }; + if entry.length == 0 { continue; } + + if let Some(pending_val) = pending.get(&key) { + // Pending overrides + all_entries.push((key, pending_val.clone())); + } else if let Some(data) = self.get_from_data(key) { + all_entries.push((key, data.to_vec())); + } + if key > max_key { max_key = key; } + } + } + + // New entries from pending (not in existing index) + for (key, value) in &pending { + if *key >= self.index_len { + all_entries.push((*key, value.clone())); + if *key > max_key { max_key = *key; } + } + } + + // Drop old mmaps before rewriting files + self.index_mmap = None; + self.data_mmap = None; + + // Rewrite via bulk_load + self.bulk_load(all_entries.into_iter())?; + + // Clear ops log + self.ops_log.lock().truncate()?; + + eprintln!("DataSilo: compacted {} pending ops", count); + Ok(count) + } + + // ---- Internal ---- + + fn index_entry(&self, key: u32) -> Option { + if key >= self.index_len { return None; } + let mmap = self.index_mmap.as_ref()?; + let pos = key as usize * INDEX_ENTRY_SIZE; + if pos + INDEX_ENTRY_SIZE > mmap.len() { return None; } + let bytes: [u8; INDEX_ENTRY_SIZE] = mmap[pos..pos + INDEX_ENTRY_SIZE].try_into().ok()?; + Some(unsafe { std::mem::transmute(bytes) }) + } + + fn load_index(&mut self) -> io::Result<()> { + let p = self.path.join("index.bin"); + if 
!p.exists() { return Ok(()); } + let f = OpenOptions::new().read(true).write(true).open(&p)?; + if f.metadata()?.len() < INDEX_ENTRY_SIZE as u64 { return Ok(()); } + let mmap = unsafe { memmap2::MmapMut::map_mut(&f)? }; + self.index_len = (mmap.len() / INDEX_ENTRY_SIZE) as u32; + self.index_mmap = Some(mmap); + Ok(()) + } + + fn load_data(&mut self) -> io::Result<()> { + let p = self.path.join("data.bin"); + if !p.exists() { return Ok(()); } + let f = File::open(&p)?; + let meta = f.metadata()?; + if meta.len() == 0 { return Ok(()); } + let mmap = unsafe { memmap2::Mmap::map(&f)? }; + self.data_len = meta.len(); + self.data_mmap = Some(mmap); + Ok(()) + } + + fn replay_ops(&mut self) -> io::Result<()> { + let ops = self.ops_log.lock().read_all()?; + let mut pending = self.pending.write(); + for op in ops { + match op { + SiloOp::Put { key, value } => { pending.insert(key, value); } + SiloOp::Delete { key } => { pending.remove(&key); } + } + } + Ok(()) + } +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_bulk_load_and_read() { + let dir = tempfile::tempdir().unwrap(); + let mut silo = DataSilo::open(dir.path(), SiloConfig::default()).unwrap(); + let entries: Vec<(u32, Vec)> = (0..1000) + .map(|i| (i, format!("doc_{}", i).into_bytes())) + .collect(); + let count = silo.bulk_load(entries.into_iter()).unwrap(); + assert_eq!(count, 1000); + assert_eq!(silo.get(0).unwrap(), b"doc_0"); + assert_eq!(silo.get(999).unwrap(), b"doc_999"); + assert!(silo.get(1000).is_none()); + } + + #[test] + fn test_append_op_overrides_bulk() { + let dir = tempfile::tempdir().unwrap(); + let mut silo = DataSilo::open(dir.path(), SiloConfig::default()).unwrap(); + silo.bulk_load(vec![(42, b"original".to_vec())].into_iter()).unwrap(); + assert_eq!(silo.get(42).unwrap(), b"original"); + silo.append_op(42, 
b"updated".to_vec()).unwrap(); + assert_eq!(silo.get_with_pending(42).unwrap(), b"updated"); + } + + #[test] + fn test_reopen_with_ops() { + let dir = tempfile::tempdir().unwrap(); + { + let mut silo = DataSilo::open(dir.path(), SiloConfig::default()).unwrap(); + silo.bulk_load(vec![(1, b"hello".to_vec())].into_iter()).unwrap(); + silo.append_op(1, b"world".to_vec()).unwrap(); + silo.append_op(2, b"new_entry".to_vec()).unwrap(); + } + { + let silo = DataSilo::open(dir.path(), SiloConfig::default()).unwrap(); + assert_eq!(silo.get_with_pending(1).unwrap(), b"world"); + assert_eq!(silo.get_with_pending(2).unwrap(), b"new_entry"); + } + } + + #[test] + fn test_compact() { + let dir = tempfile::tempdir().unwrap(); + let mut silo = DataSilo::open(dir.path(), SiloConfig::default()).unwrap(); + silo.bulk_load(vec![(1, b"a".to_vec()), (2, b"b".to_vec())].into_iter()).unwrap(); + silo.append_op(1, b"updated_a".to_vec()).unwrap(); + silo.append_op(3, b"new_c".to_vec()).unwrap(); + assert_eq!(silo.pending_count(), 2); + + silo.compact().unwrap(); + assert_eq!(silo.pending_count(), 0); + // After compaction, all data is in mmap + assert_eq!(silo.get(1).unwrap(), b"updated_a"); + assert_eq!(silo.get(2).unwrap(), b"b"); + assert_eq!(silo.get(3).unwrap(), b"new_c"); + } + + #[test] + fn test_sparse_keys() { + let dir = tempfile::tempdir().unwrap(); + let mut silo = DataSilo::open(dir.path(), SiloConfig::default()).unwrap(); + silo.bulk_load(vec![ + (0, b"zero".to_vec()), + (1000, b"thousand".to_vec()), + (100000, b"hundred_k".to_vec()), + ].into_iter()).unwrap(); + assert_eq!(silo.get(0).unwrap(), b"zero"); + assert_eq!(silo.get(1000).unwrap(), b"thousand"); + assert_eq!(silo.get(100000).unwrap(), b"hundred_k"); + assert!(silo.get(500).is_none()); + } + + #[test] + fn test_thread_safe_ops() { + let dir = tempfile::tempdir().unwrap(); + let mut silo = DataSilo::open(dir.path(), SiloConfig::default()).unwrap(); + silo.bulk_load(vec![(0, b"init".to_vec())].into_iter()).unwrap(); + + 
// append_op is &self (thread-safe via internal Mutex) + silo.append_op(1, b"from_thread".to_vec()).unwrap(); + silo.append_ops_batch(&[ + (2, b"batch_a".to_vec()), + (3, b"batch_b".to_vec()), + ]).unwrap(); + + assert_eq!(silo.get_with_pending(1).unwrap(), b"from_thread"); + assert_eq!(silo.get_with_pending(2).unwrap(), b"batch_a"); + assert_eq!(silo.pending_count(), 3); + } +} diff --git a/crates/datasilo/src/ops_log.rs b/crates/datasilo/src/ops_log.rs new file mode 100644 index 00000000..adb26ba2 --- /dev/null +++ b/crates/datasilo/src/ops_log.rs @@ -0,0 +1,156 @@ +//! Append-only ops log with CRC32 per entry. +//! +//! Format: [u8 tag][u32 key][u32 value_len][value bytes][u32 crc32] +//! Tags: 0x01 = Put, 0x02 = Delete + +use std::fs::{File, OpenOptions}; +use std::io::{self, BufWriter, Read, Seek, Write}; +use std::path::PathBuf; + +const OP_TAG_PUT: u8 = 0x01; +const OP_TAG_DELETE: u8 = 0x02; + +/// A mutation operation. +pub enum SiloOp { + Put { key: u32, value: Vec }, + Delete { key: u32 }, +} + +/// Append-only ops log file. +pub struct OpsLog { + path: PathBuf, + writer: BufWriter, +} + +impl OpsLog { + /// Open or create the ops log file. + pub fn open(path: &PathBuf) -> io::Result { + let file = OpenOptions::new() + .create(true) + .append(true) + .read(true) + .open(path)?; + Ok(Self { + path: path.clone(), + writer: BufWriter::with_capacity(65536, file), + }) + } + + /// Append an op and sync to disk. + pub fn append(&mut self, op: SiloOp) -> io::Result<()> { + self.write_op(&op)?; + self.writer.flush()?; + Ok(()) + } + + /// Append an op without syncing (for batch use — call sync() after). + pub fn append_no_sync(&mut self, op: SiloOp) -> io::Result<()> { + self.write_op(&op) + } + + /// Flush the write buffer to disk. + pub fn sync(&mut self) -> io::Result<()> { + self.writer.flush() + } + + /// Read all ops from the log file (for replay on startup). 
+    pub fn read_all(&self) -> io::Result<Vec<SiloOp>> {
as usize; + *pos += 4; + if *pos + value_len + 4 > data.len() { return None; } + let value = data[*pos..*pos + value_len].to_vec(); + *pos += value_len; + let payload_end = *pos; + // Verify CRC + let expected_crc = u32::from_le_bytes(data[*pos..*pos + 4].try_into().ok()?); + *pos += 4; + let actual_crc = crc32fast::hash(&data[entry_start..payload_end]); + if actual_crc != expected_crc { + return None; + } + Some(SiloOp::Put { key, value }) + } + OP_TAG_DELETE => { + if *pos + 4 + 4 > data.len() { return None; } + let key = u32::from_le_bytes(data[*pos..*pos + 4].try_into().ok()?); + *pos += 4; + let payload_end = *pos; + let expected_crc = u32::from_le_bytes(data[*pos..*pos + 4].try_into().ok()?); + *pos += 4; + let actual_crc = crc32fast::hash(&data[entry_start..payload_end]); + if actual_crc != expected_crc { + return None; + } + Some(SiloOp::Delete { key }) + } + _ => None, + } + } +} diff --git a/deploy/configs/civitai/config.yaml b/deploy/configs/civitai/config.yaml new file mode 100644 index 00000000..83c3ef9f --- /dev/null +++ b/deploy/configs/civitai/config.yaml @@ -0,0 +1,90 @@ +name: civitai + +config: + filter_fields: + - { name: nsfwLevel, field_type: single_value, eager_load: true } + - { name: userId, field_type: single_value, eager_load: true } + - { name: type, field_type: single_value, eager_load: true } + - { name: baseModel, field_type: single_value, eager_load: true } + - { name: availability, field_type: single_value, eager_load: true } + - { name: postId, field_type: single_value, per_value_lazy: true } + - { name: postedToId, field_type: single_value, per_value_lazy: true } + - { name: remixOfId, field_type: single_value } + - { name: hasMeta, field_type: boolean, eager_load: true } + - { name: onSite, field_type: boolean, eager_load: true } + - { name: poi, field_type: boolean } + - { name: minor, field_type: boolean } + - { name: isPublished, field_type: boolean, eager_load: true } + - { name: isRemix, field_type: boolean } + - { name: 
blockedFor, field_type: single_value, eager_load: true } + - { name: tagIds, field_type: multi_value } + - { name: modelVersionIds, field_type: multi_value } + - { name: modelVersionIdsManual, field_type: multi_value } + - { name: toolIds, field_type: multi_value } + - { name: techniqueIds, field_type: multi_value } + + sort_fields: + - { name: reactionCount, bits: 32, eager_load: true } + - name: sortAt + bits: 32 + eager_load: true + computed: + op: greatest + source_fields: [existedAt, publishedAt] + - { name: commentCount, bits: 32, eager_load: true } + - { name: collectedCount, bits: 32, eager_load: true } + - { name: existedAt, bits: 32 } + - { name: publishedAt, bits: 32 } + - { name: id, bits: 32 } + + max_page_size: 200 + + deferred_alive: + source_field: publishedAt + + time_buckets: + filter_field: sortAtUnix + sort_field: sortAt + range_buckets: + - { name: 24h, duration_secs: 86400, refresh_interval_secs: 300 } + - { name: 7d, duration_secs: 604800, refresh_interval_secs: 3600 } + - { name: 30d, duration_secs: 2592000, refresh_interval_secs: 3600 } + - { name: 1y, duration_secs: 31536000, refresh_interval_secs: 86400 } + +data_schema: + id_field: id + schema_version: 1 + fields: + - { source: nsfwLevel, target: nsfwLevel, value_type: integer, fallback: combinedNsfwLevel } + - { source: userId, target: userId, value_type: integer } + - { source: type, target: type, value_type: low_cardinality_string } + - { source: baseModel, target: baseModel, value_type: low_cardinality_string, nullable: true } + - { source: availability, target: availability, value_type: low_cardinality_string, nullable: true } + - { source: postId, target: postId, value_type: integer, nullable: true } + - { source: postedToId, target: postedToId, value_type: integer, nullable: true } + - { source: remixOfId, target: remixOfId, value_type: integer, nullable: true } + - { source: publishedAtUnix, target: isPublished, value_type: exists_boolean } + - { source: remixOfId, target: 
isRemix, value_type: exists_boolean } + - { source: blockedFor, target: blockedFor, value_type: low_cardinality_string, nullable: true } + - { source: hasMeta, target: hasMeta, value_type: boolean, default: false } + - { source: onSite, target: onSite, value_type: boolean, default: false } + - { source: poi, target: poi, value_type: boolean, default: false } + - { source: minor, target: minor, value_type: boolean, default: false } + - { source: tagIds, target: tagIds, value_type: integer_array, default: [] } + - { source: modelVersionIds, target: modelVersionIds, value_type: integer_array, default: [] } + - { source: modelVersionIdsManual, target: modelVersionIdsManual, value_type: integer_array, default: [], filter_only: true } + - { source: toolIds, target: toolIds, value_type: integer_array, default: [], filter_only: true } + - { source: techniqueIds, target: techniqueIds, value_type: integer_array, default: [], filter_only: true } + - { source: reactionCount, target: reactionCount, value_type: integer, default: 0 } + - { source: sortAtUnix, target: sortAt, value_type: integer, fallback: sortAt, ms_to_seconds: true } + - { source: commentCount, target: commentCount, value_type: integer, default: 0 } + - { source: collectedCount, target: collectedCount, value_type: integer, default: 0 } + - { source: publishedAtUnix, target: publishedAt, value_type: integer, ms_to_seconds: true } + - { source: existedAt, target: existedAt, value_type: integer } + - { source: url, target: url, value_type: string, doc_only: true } + - { source: hash, target: hash, value_type: string, doc_only: true } + - { source: width, target: width, value_type: integer, doc_only: true } + - { source: height, target: height, value_type: integer, doc_only: true } + - { source: needsReview, target: needsReview, value_type: string, doc_only: true } + - { source: acceptableMinor, target: acceptableMinor, value_type: boolean, doc_only: true, default: false } + - { source: index, target: index, 
value_type: integer, doc_only: true, default: 0 } diff --git a/deploy/configs/civitai/ui-config.yaml b/deploy/configs/civitai/ui-config.yaml new file mode 100644 index 00000000..7a47717f --- /dev/null +++ b/deploy/configs/civitai/ui-config.yaml @@ -0,0 +1,133 @@ +# BitDex UI Config — Civitai Images +# +# This file controls how the embedded web UI renders for this index. +# Loaded from data_dir/indexes/{name}/ui-config.yaml and served at +# GET /api/indexes/{name}/ui-config +# +# Without this file, the UI auto-generates controls from the engine config: +# - boolean fields → select (Any/Yes/No) +# - single_value with dictionary → select (populated from /dictionaries) +# - single_value without dictionary → number input +# - multi_value → comma-separated text input +# - sort fields → dropdown from engine config +# - time ranges → from config.time_buckets + +title: "BitDex — Civitai Images" + +# ── Filter Controls ── +# Only fields that need overrides. Unlisted fields auto-generate. +# Set control: hidden to suppress a field entirely. 
+filters: + nsfwLevel: + control: checklist + label: "NSFW Level" + options: + - { value: 1, label: "PG" } + - { value: 2, label: "PG-13" } + - { value: 4, label: "Mature" } + - { value: 8, label: "X" } + - { value: 16, label: "XXX" } + - { value: 32, label: "Blocked" } + default: [1] + span: 2 + + tagIds: { label: "Tag IDs" } + modelVersionIds: { label: "Model Versions" } + toolIds: { label: "Tool IDs" } + techniqueIds: { label: "Technique IDs" } + userId: { label: "User ID" } + postId: { label: "Post ID" } + + # Hide fields that exist in the engine but aren't useful as UI filters + isPublished: { control: hidden } + isRemix: { control: hidden } + blockedFor: { control: hidden } + remixOfId: { control: hidden } + postedToId: { control: hidden } + modelVersionIdsManual: { control: hidden } + +# ── Sort Controls ── +sort: + default_field: reactionCount + default_direction: Desc + labels: + reactionCount: "Most Reactions" + sortAt: "Date" + commentCount: "Most Comments" + collectedCount: "Most Collected" + id: "ID" + +# ── Display ── +display: + page_size: 100 + +# ── Card Rendering ── +# How result cards appear in the grid +card: + image: + field: url + template: "https://image.civitai.com/xG1nkqKTMzGDvpLrqFT7WA/{value}/width={width}/image.jpeg" + thumbnail_width: 400 + full_width: 1200 + badges: + - { field: baseModel, position: top-right } + - { fields: [width, height], position: top-left, template: "{width}×{height}" } + meta: + left: { field: reactionCount, prefix: "❤ ", format: number } + right: { field: _slot_id, prefix: "#" } + +# ── Detail Modal ── +# What shows when you click a card. Fields render in order listed. +# Any document fields NOT listed here appear at the bottom alphabetically. 
+# +# Display types: +# image — render as , supports width_field/height_field for dimensions +# link — clickable using link template +# code — monospace font +# (default) — auto-detect: dictionary fields show labels, others show raw value +# +# Format types: +# number — locale-formatted (12345 → "12,345") +# timestamp — unix epoch → human date +# count — arrays: "[N items]" if large, comma list if small +# (default) — raw value +# +# hide_if_empty: true — hide the row when the value is null, empty, or 0 + +detail: + fields: + - field: url + label: "Image" + display: image + template: "https://image.civitai.com/xG1nkqKTMzGDvpLrqFT7WA/{value}/width=800/image.jpeg" + width_field: width + height_field: height + + - { field: baseModel, label: "Base Model" } + - { field: nsfwLevel, label: "NSFW Level" } + - { field: type, label: "Type" } + - { field: availability, label: "Availability", hide_if_empty: true } + + - { field: userId, label: "User", link: "https://civitai.com/user/{value}" } + - { field: postId, label: "Post", link: "https://civitai.com/posts/{value}", hide_if_empty: true } + + - { field: reactionCount, label: "Reactions", format: number } + - { field: commentCount, label: "Comments", format: number } + - { field: collectedCount, label: "Collected", format: number } + + - { field: sortAt, label: "Sort Date", format: timestamp } + - { field: publishedAt, label: "Published", format: timestamp, hide_if_empty: true } + - { field: existedAt, label: "Created", format: timestamp, hide_if_empty: true } + + - { field: tagIds, label: "Tags", format: count } + - { field: modelVersionIds, label: "Model Versions", format: count, hide_if_empty: true } + - { field: toolIds, label: "Tools", format: count, hide_if_empty: true } + - { field: techniqueIds, label: "Techniques", format: count, hide_if_empty: true } + + - { field: hash, label: "Hash", display: code, hide_if_empty: true } + + - { field: poi, label: "POI", hide_if_empty: true } + - { field: minor, label: "Minor", 
hide_if_empty: true } + + # Fields to never show in the modal (even in the overflow section) + hidden: [width, height, index, acceptableMinor, needsReview, url] diff --git a/docs/design/datasilo-implementation-plan.md b/docs/design/datasilo-implementation-plan.md new file mode 100644 index 00000000..8bffe027 --- /dev/null +++ b/docs/design/datasilo-implementation-plan.md @@ -0,0 +1,301 @@ +# DataSilo Implementation Plan + +## Benchmark Findings + +### Write Throughput (10M entries × 230B, 32 threads) + +| Approach | Rate | At 109M | +|---|---|---| +| Current StreamingDocWriter (200K shard files) | 82K/s | 22 min | +| BufWriter (single file, sequential) | 6.2M/s | 17.5s | +| DataSilo parallel mmap (1MB regions, cold) | 35.3M/s | 3.1s | +| DataSilo parallel mmap (hot pages) | 56.1M/s | 1.9s | + +### Read Throughput + +| Approach | Rate | +|---|---| +| Current DocStoreV3 (cold, shard file open) | ~60/s (16ms each) | +| Current DocCache (hot) | ~1M/s (<1μs) | +| DataSilo mmap (random keys, hot) | 23-27M/s | + +### Encoding Formats (1M iterations, 20-field doc) + +| Format | Encode | Decode | Size | Verdict | +|---|---|---|---|---| +| msgpack (rmp_serde) | 334ns (3.0M/s) | 177ns (5.6M/s) | ~230B | Too slow | +| Raw binary (hand-rolled) | 72ns (13.9M/s) | 17ns (58.8M/s) | 211B | Fast | +| **DocOpCodec (current BitDex)** | **71ns (14.1M/s)** | **16ns (62.5M/s)** | 221B | **Winner — keep** | + +**Decision:** Keep DocOpCodec format. Encoding at 71ns with 32 threads = ~2.2ns amortized — well within the 28.6ns budget at 35M writes/sec. + +### Pre-faulting + +| Strategy | Prefault | Write | Total | Rate | +|---|---|---|---|---| +| Cold (no prefault) | — | 0.283s | 0.283s | **35.3M/s** | +| Sequential memset | 1.376s | 0.177s | 1.552s | 6.4M/s | +| Parallel memset | 0.322s | 0.181s | 0.503s | 19.9M/s | +| Parallel page-touch | 0.355s | 0.173s | 0.527s | 19.0M/s | + +**Decision:** No pre-faulting. Cold writes at 35M/s are already faster than any prefault+write combination. 
Pre-faulting doubles I/O (touch every page twice). The OS handles page faults efficiently for sequential-within-region access patterns. + +**Caveat:** On the 32GB K8s pod under memory pressure, cold page faults may be slower. If needed, parallel page-touch (0.36s for 2.3GB) is the best cross-platform option. Gemini also flagged `MADV_POPULATE_WRITE` (Linux 5.14+) and `SetFileValidData` (Windows, admin-only) as OS-specific accelerators. + +### Pipeline Bottleneck Analysis (images phase, 14.6M rows from 1GB CSV) + +| Step | Time | Notes | +|---|---|---| +| Enrichment load | 7s | posts.csv HashMap | +| Parallel parse + bitmap build + doc write | 26s | 32 rayon threads | +| Bitmap merge | 6.5s | rayon fold+reduce | +| **Enrichment drop** | **50.5s** | Freeing 56M String allocations | +| StreamingDocWriter finalize | 1s | (after fsync removal) | +| Bitmap save to disk | 4s | ShardStore writes | +| **Total** | **~95s** | Enrichment drop was the hidden bottleneck | + +**Fix applied:** Background-thread enrichment drop. Reduced wall-clock from 145s → 51s. + +--- + +## Architecture + +### Generic DataSilo Crate + +One engine, trait-parameterized. No code duplication across doc/bitmap/cache silos. + +```rust +// crates/datasilo/src/lib.rs +pub struct DataSilo { + index: MmapMut, // key → (offset, length, allocated) + data: MmapMut, // packed variable-size entries + ops_log: OpsLog, // append-only mutations with CRC32 + pending: HashMap>, // in-memory ops for read-time apply +} + +pub trait SiloKey: Copy + Eq + Hash + Send + Sync { + fn to_index(&self) -> usize; +} + +// Three instantiations: +type DocSilo = DataSilo; // slot_id → DocOpCodec bytes +type BitmapSilo = DataSilo; // (field,value) → frozen bitmap bytes +type CacheSilo = DataSilo; // query_hash → cache entry bytes + +// Parallel writer for bulk loads (dump pipeline) +pub struct ParallelWriter { ... } +pub struct ThreadWriter<'a> { ... 
} // per-thread, 1MB regions, lock-free +``` + +### Three Files per Silo (replaces 205K shard files) + +| Silo | Index | Data | Ops | +|---|---|---|---| +| DocSilo | 2GB (126M × 16B) | 25GB (109M × 230B) | small | +| BitmapSilo | <1MB (32K × 16B) | 5-6GB (frozen bitmaps) | small | +| CacheSilo | <1MB | variable | small | + +**Total: ~9 files** (down from 205K) + +### Dump Pipeline Architecture (all merge ops, compaction after) + +``` +For each CSV phase (images, tags, resources, tools, techniques, metrics): + 32 rayon threads in parallel: + parse CSV row → slot_id + field values + encode doc fields → DocOpCodec bytes + doc_silo.thread_writer.write(slot_id, &doc_bytes) ← mmap memcpy + for each bitmap field: + bitmap_silo.thread_writer.write(bitmap_key, &op) ← mmap append merge op + +After ALL phases complete: + bitmap_silo.compact() → replay merge ops, build final bitmaps + doc_silo is already final (each slot written once per phase, Merge semantics) +``` + +**Key insight:** During dump, bitmap data is written as merge ops (append-only, no memory accumulation). Compaction after dump replays ops to build final bitmaps. This means: + +- **Zero bitmap memory during parse** — no per-thread HashMaps of RoaringBitmaps +- **Maximum write throughput** — each thread writes at mmap speed (35M/s) +- **Compaction is fast** — ops are binary (no CSV re-parse), smaller than CSV, parallelizable by bitmap key + +**Trade-off:** Bitmap ops log for tags would be ~36GB (4.5B × 8B). Compaction reads 36GB and builds 28K bitmaps. This is disk I/O traded for memory. On machines with limited RAM (32GB pod) this is a win. On 128GB machines the current in-memory approach is faster. + +**Hybrid option:** Use merge ops for large multi-value phases (tags: 4.5B rows) and in-memory accumulation for small phases (images: 109M rows with few distinct values per filter field). + +--- + +## Implementation Phases + +### Phase 1: DataSilo Crate (crates/datasilo/) + +Core generic engine. ~500-800 lines. 
+ +- [x] `DataSilo` with open/get/bulk_load +- [x] OpsLog with CRC32 append + replay +- [x] IndexEntry (16 bytes: offset + length + allocated) +- [x] ParallelWriter with atomic bump + 1MB thread-local regions +- [x] ThreadWriter for sequential-within-region writes +- [x] 5 unit tests passing +- [x] Benchmarks: 35M/s write, 23-27M/s read, 56M/s hot +- [ ] Make generic over `K: SiloKey` (currently hardcoded u32) +- [ ] Thread-safe append_op (interior mutability for concurrent ops) +- [ ] Compaction (rewrite data file, reclaim dead space, clear ops log) +- [ ] Delete support (mark index entry as tombstone) +- [ ] Multi-shard support (optional, for very large data files) + +### Phase 2: DocSilo Integration + +Replace DocStoreV3 → DataSilo for doc storage. Immediate dump perf fix. + +- [ ] Wire `DataSilo` as ConcurrentEngine's doc store +- [ ] Dump: parse threads write docs via `ThreadWriter` inline (no channel, no StreamingDocWriter) +- [ ] Multi-phase merge: later phases append via ops log (Merge semantics in caller, DataSilo stores raw bytes) +- [ ] Server read path: `silo.get(slot)` + DocOpCodec decode → StoredDoc +- [ ] Remove DocCache (mmap reads at 23M/s replace it) +- [ ] Remove StreamingDocWriter, ShardStoreBulkWriter, ShardPreCreator + +### Phase 3: BitmapSilo Integration + +Replace FilterBitmapStore + SortBitmapStore + AliveBitmapStore. 
+ +- [ ] `BitmapKey` type: hash of (field_name, value) or (field_name, bit_layer) +- [ ] Dump: write bitmap merge ops via ThreadWriter +- [ ] Post-dump compaction: replay ops → build RoaringBitmaps → serialize → write to data file +- [ ] Query path: `silo.get(key)` → frozen bitmap bytes → `FrozenRoaringBitmap::view()` (zero-copy) +- [ ] Mutation path: bitmap diffs as ops (union/subtract) +- [ ] Lazy loading eliminated (mmap = instant access) + +### Phase 4: CacheSilo + Cleanup + +- [ ] BoundStore → CacheSilo +- [ ] Delete old storage code (~11K lines): docstore.rs, doc_cache.rs, bitmap_fs.rs, shard_store.rs, shard_store_bitmap.rs, shard_store_meta.rs, shard_store_doc.rs, bound_store.rs +- [ ] Update CLAUDE.md, tests, docs + +--- + +## What Stays vs What Goes + +| Keep | Why | +|------|-----| +| ConcurrentEngine | Core query/mutation orchestration | +| InnerEngine + ArcSwap | Snapshot isolation for reads | +| Flush thread | Mutation batching + cache maintenance | +| FilterIndex, SortIndex | In-memory bitmap structures for queries | +| QueryExecutor, sort.rs | Query evaluation logic | +| DocOpCodec format | Fastest encode/decode (71ns/16ns) | +| DumpProcessor CSV parsing | Parse + enrichment logic unchanged | + +| Delete | Replaced by | +|--------|-------------| +| DocStoreV3 + DocShardStore | DataSilo (doc reads/writes) | +| StreamingDocWriter | ParallelWriter (dump) | +| ShardStoreBulkWriter | ParallelWriter | +| ShardStore generic | DataSilo | +| FilterBitmapStore | DataSilo (bitmap silo) | +| SortBitmapStore | DataSilo (bitmap silo) | +| AliveBitmapStore | DataSilo | +| DocCache | Eliminated (mmap reads fast enough) | +| ShardPreCreator | Eliminated (no per-shard files) | +| BoundStore | DataSilo (cache silo) | +| bitmap_fs.rs | Eliminated | + +**Lines deleted: ~10,000. Lines added: ~1,500 (DataSilo crate). 
Lines rewritten: ~750.** + +--- + +## Code Removal Map (from LSP scout) + +### Files to Delete Entirely (9,790 lines) + +| File | Lines | Purpose | +|---|---|---| +| `src/shard_store.rs` | 1,779 | ShardStore generic engine, generation system, codecs | +| `src/shard_store_bitmap.rs` | 1,723 | Alive/Filter/Sort bitmap stores | +| `src/shard_store_meta.rs` | 292 | MetaStore (slot_counter, time_buckets, cursors) | +| `src/bitmap_fs.rs` | 1,137 | Legacy BitmapFs (.roar file persistence) | +| `src/doc_cache.rs` | ~786 | DocCache (generational LRU, replaced by mmap) | +| `src/bound_store.rs` | 1,083 | BoundStore (cache persistence, replaced by CacheSilo) | + +### From shard_store_doc.rs — Partial Delete + +**Delete:** DocStoreV3, DocSnapshot, DocOp enum, DocOpCodec apply logic, DocSnapshotCodec, SlotHexShard, ShardStoreBulkWriter, StreamingDocWriter, ShardPreCreator. + +**Keep:** `StoredDoc` (doc schema type), `PackedValue` (value enum), `DocOpCodec::encode_op/decode_op` (fastest encoding at 71ns), field conversion utilities. Move these to a new `src/doc_format.rs` or keep in a trimmed `shard_store_doc.rs`. 
+ +### Files to Rewire (12 files, ~750 lines) + +| File | Lines Changed | Key Changes | +|---|---|---| +| `concurrent_engine.rs` | ~500 | Remove 6 storage fields + doc_cache, delete pin_shard_generations/compact_all/purge_bound_store, rewrite build() init, rewrite docstore accessor | +| `dump_processor.rs` | ~250 | Rewrite save_phase_to_disk signature (4 ShardStore params → DataSilo), rewrite bitmap save loops, delete StreamingDocWriter/ShardPreCreator refs | +| `server.rs` | ~25 | Remove 3 pin_shard_generations() calls in capture handlers | +| `capture.rs` | ~40 | Remove gen_start/gen_stop fields and set methods | +| `ops_processor.rs` | ~20 | Rewrite DocStoreV3 constructor + tests | +| `ingester.rs` | ~30 | Rewrite DocSink wrapper type | +| `engine.rs` | ~15 | Rewrite DocStoreV3::open() calls | +| `mutation.rs` | ~20 | Rewrite docstore parameter types + tests | +| `config.rs` | ~25 | Delete DocCacheConfigEntry + doc_cache field | +| `pg_sync/backfill.rs` | ~40 | Remove BitmapFs references | +| `pg_sync/bulk_loader.rs` | ~5 | Update writer type | +| `metrics.rs` | ~30 | Remove BoundStore/DocCache/ShardStore metric stubs | + +### Generation System Removal + +All generation/pinning symbols removed with ShardStore: +- `shard_store.rs`: `current_generation()`, `pin_generation()` — deleted with file +- `concurrent_engine.rs`: `pin_shard_generations()` method — delete +- `server.rs`: 3 call sites to `pin_shard_generations()` — delete +- `capture.rs`: `gen_start`, `gen_stop`, `set_gen_start()`, `set_gen_stop()` — delete + +**Safe:** `ops_wal.rs::current_generation()` is unrelated (WAL file naming) — KEEP. 
+ +### Files Safe / No Changes + +- `src/loader.rs` — only imports StoredDoc (schema type, stays) +- `src/ops_wal.rs` — WAL generations separate from ShardStore +- `src/query.rs`, `src/sort.rs`, `src/filter.rs` — pure in-memory operations + +--- + +## Execution Plan + +### Step 1: Finish DataSilo Crate + +Complete the generic `DataSilo` with: +- [ ] Generic over key type (currently u32-only) +- [ ] Thread-safe `append_op` (Mutex for ops log — low contention) +- [ ] Compaction: replay ops → rewrite data file → clear ops log +- [ ] Delete support (tombstone in index) +- [ ] `flush()` method (explicit mmap flush for crash safety) + +### Step 2: Delete Old Storage + Wire DocSilo + +Do this in ONE pass — delete the files, fix compile errors by wiring DataSilo: + +1. Delete 6 storage files +2. Trim `shard_store_doc.rs` → `doc_format.rs` (keep StoredDoc, PackedValue, DocOpCodec) +3. Add `DataSilo` as docstore field in `ConcurrentEngine` +4. Rewrite `build()` to open/create DocSilo +5. Rewrite doc read path: `silo.get(slot)` + DocOpCodec decode +6. Rewrite dump pipeline: ParallelWriter inline in parse loop +7. Delete generation pinning from server.rs + capture.rs +8. Delete DocCache, config entries, metric stubs +9. Fix all compile errors in secondary consumers +10. Run tests + +### Step 3: Wire BitmapSilo + +1. Add `DataSilo` for filter + sort + alive bitmaps +2. Dump pipeline: write bitmap merge ops to BitmapSilo during parse +3. Post-dump compaction: replay ops → build bitmaps → write to data file +4. Query path: read frozen bitmaps from silo +5. Mutation path: diffs as ops +6. Remove in-memory bitmap accumulation from dump (optional — can keep for now) + +### Step 4: Wire CacheSilo + Final Cleanup + +1. Replace BoundStore with CacheSilo +2. Final code cleanup — remove any remaining dead refs +3. Update CLAUDE.md architecture section +4. 
Update all design docs diff --git a/docs/design/docop-merge.md b/docs/design/docop-merge.md new file mode 100644 index 00000000..bbb51e79 --- /dev/null +++ b/docs/design/docop-merge.md @@ -0,0 +1,186 @@ +# DocOp::Merge — Multi-Phase Dump Docstore Fix + +## Problem + +Multi-phase CSV dumps lose data from earlier phases. After all 6 phases complete (images → tags → resources → tools → techniques → metrics), documents only contain the last phase's fields. All earlier fields are zeroed out. + +### Root Cause + +The dump processor writes all phases using `DocOp::Create`, which **replaces** the entire document: + +```rust +// shard_store_doc.rs:531-533 +DocOp::Create { slot, fields } => { + snapshot.docs.insert(*slot, fields.clone()); // REPLACES +} +``` + +Phase 1 (images) writes `Create { slot=42, fields=[userId, nsfwLevel, url, ...] }`. Phase 6 (metrics) writes `Create { slot=42, fields=[reactionCount, commentCount, collectedCount] }`. On read, phase 6's Create replaces phase 1's data entirely. + +### Why Set Alone Doesn't Fix It + +`DocOp::Set` works field-by-field and merges correctly. But using Set for object-level writes during dumps would mean N individual ops per document per phase (one per field), which is far less compact than a single op with all fields. At 109M records x 20 fields, that's 2.18B ops vs 109M ops. + +## Design: DocOp::Merge + +Add a new `DocOp::Merge` variant that combines fields into an existing document without replacing it. 
+ +### Op Definition + +```rust +pub enum DocOp { + Set { slot: u32, field: u16, value: PackedValue }, + Append { slot: u32, field: u16, value: PackedValue }, + Remove { slot: u32, field: u16, value: PackedValue }, + Delete { slot: u32 }, + Create { slot: u32, fields: Vec<(u16, PackedValue)> }, + Merge { slot: u32, fields: Vec<(u16, PackedValue)> }, // NEW +} +``` + +### Apply Semantics + +```rust +DocOp::Merge { slot, fields } => { + let doc = snapshot.docs.entry(*slot).or_default(); + for (field_idx, value) in fields { + if let Some(entry) = doc.iter_mut().find(|(f, _)| *f == *field_idx) { + entry.1 = value.clone(); // overwrite existing field + } else { + doc.push((*field_idx, value.clone())); // add new field + } + } +} +``` + +**Key semantic rules:** +- **Merge is an upsert on document existence.** If slot exists, patch fields. If not, create doc with provided fields via `or_default()`. +- **Last-write-wins per field.** If the merged field already exists, overwrite it. +- **Duplicate field indices within one Merge op** resolve by last occurrence wins (linear scan behavior). Reject/deduplicate at write time when practical. +- **Field order is not semantically meaningful.** All lookups use `.find()` linear scan, not binary search. No sorting required. +- **Delete does not block future writes.** `Delete` removes current doc state. A subsequent `Merge` or `Set` recreates the doc via `or_default()`. This is standard log-structured upsert semantics. + +Key difference from Create: +- **Create**: `snapshot.docs.insert(slot, fields)` — replaces entire document +- **Merge**: iterates fields and upserts each one into the existing document + +### Wire Format + +- Tag: `OP_TAG_MERGE = 0x06` +- Encoding: identical to Create — `[tag][slot:u32][num_fields:u16][field_pairs...]` +- Only the tag byte differs + +### Backward/Forward Compatibility + +- **New reader + old file**: Fully supported. Old files contain no Merge ops. 
+- **Old reader + new file**: Old binaries will encounter `0x06` and fail with "unknown doc op tag" error (existing error path in `decode_op`). This is a clear, fast failure. +- **Rollback after writing Merge ops**: Requires compaction first to resolve Merge ops into snapshot data. After compaction, the shard file contains only a snapshot (no ops), which old binaries can read. +- **Mitigation**: Deploy new binary, run compaction, verify. If rollback is needed, compact all shards first. + +### Compaction Behavior + +During compaction (`read_up_to_generation`), ops are applied in order over the snapshot: +1. Snapshot (if present) provides the base document +2. Ops are applied sequentially via `OpCodec::apply()` +3. Merge ops merge fields into whatever exists + +After compaction writes a new snapshot, the snapshot contains the fully merged document. No special compaction logic needed — the standard apply path handles it. + +### When to Use Each Op + +| Op | Use Case | During Dump | +|----|----------|-------------| +| `Create` | Destructive full replacement (ops pipeline upserts where full doc is known) | NOT used during dump — Merge is safer | +| `Merge` | Add/update fields on an existing or new document | ALL object-level dump phases (images, resources enrichment, metrics) | +| `Set` | Single field update | Individual tuple writes (tags, tools, techniques) | +| `Append` | Add value to multi-value field | Not used during dump currently | + +**Critical design decision (per GPT/Gemini review):** ALL dump phases use `Merge` for object-level writes, including phase 1 (images). This eliminates the ordering hazard where a late `Create` could wipe earlier `Merge` data. `Create` is reserved for the ops pipeline where full-document replacement semantics are explicitly intended. + +### Dump Processor Changes + +The `StreamingDocWriter` gets explicit methods instead of a boolean mode flag (per review feedback — explicit methods are harder to misuse): + +1. 
**`write_merge_doc(slot, fields)`** — NEW: Writes `DocOp::Merge`. Used by all dump phases for object-level writes. +2. **`write_doc(slot, fields)`** — EXISTING: Continues to write `DocOp::Create`. Used by ops pipeline only. +3. **`write_field(slot, field_idx, value)`** — EXISTING: Writes `DocOp::Set`. Used for individual tuple fields (tags, tools, techniques). Unchanged. + +In `dump_processor.rs`, change all calls from `write_doc()`/`append_tuples_raw()` to `write_merge_doc()` for object-level phase writes. Tuple phases (tags, tools, techniques) continue using `write_field()` / `Set` as today. + +### Hardcoded Generation: gen_000 + +The docstore has a hardcoded `gen_000` path: + +```rust +// shard_store_doc.rs:1164 +root.join("gen_000") +``` + +This is fine — the docstore uses a single-generation model (unlike bitmap shardstore which uses multi-gen). The `gen_000` is effectively a constant directory name, not a dynamic generation. No change needed here. + +### Files Changed + +1. **`src/shard_store_doc.rs`** + - Add `Merge` variant to `DocOp` enum (line ~159) + - Add `OP_TAG_MERGE = 0x06` constant (line ~170) + - Add encode/decode for Merge in `DocOpCodec` (identical to Create encoding, different tag) + - Add apply logic for Merge in `DocOpCodec::apply()` (line ~469) + - Add `write_merge_doc()` method to `StreamingDocWriter` + - Add `append_tuples_merge()` method (like `append_tuples_raw` but emits Merge) + +2. **`src/dump_processor.rs`** + - Change object-level write calls from `append_tuples_raw()` to `append_tuples_merge()` + - Tuple phases (tags, tools, techniques) unchanged — they use `write_field()` / `Set` + +3. **No changes to `concurrent_engine.rs`** — the writer creation doesn't need a flag + +### Operational Invariants + +- **Phase ordering within a dump**: Not strictly required for correctness since all phases use Merge, but phases should still run in documented order for operational clarity. 
+- **Every slot need not appear in phase 1**: If a slot only appears in phase 3, Merge creates a partial doc. This is acceptable — the alive bitmap (set by phase 1) determines visibility. +- **No Create after Merge for same slot during dumps**: Enforced by using only Merge in the dump path. Create is reserved for the ops pipeline. +- **Field index consistency**: All phases share the same `field_to_idx` mapping from the config-driven schema. This is enforced by the `StreamingDocWriter` using the engine's field registry. + +### Test Plan + +#### Unit Tests (shard_store_doc.rs) + +1. `test_merge_op_roundtrip` — encode/decode Merge +2. `test_apply_merge_combines_fields` — Merge into existing doc preserves old fields +3. `test_apply_merge_overwrites_existing_field` — Merge updates fields that already exist +4. `test_apply_merge_on_empty_doc` — Merge on nonexistent slot creates the doc +5. `test_merge_then_merge_accumulates` — Two Merge ops for same slot, verify union of fields +6. `test_create_then_merge_preserves_both` — Create phase 1, Merge phase 2, verify both fields present +7. `test_merge_then_create_replaces` — Verify Create after Merge still replaces (for ops pipeline correctness) +8. `test_delete_then_merge_resurrects` — Delete followed by Merge creates new doc +9. `test_merge_duplicate_fields_last_wins` — Merge with duplicate field indices, verify last occurrence wins +10. `test_compaction_preserves_merge_chain` — Create + Merge + Merge, compact, read, verify all fields + +#### Integration Tests + +11. `test_streaming_writer_merge_between_phases` — Phase 1 write_merge_doc, finalize, Phase 2 write_merge_doc, verify combined +12. `test_streaming_writer_merge_and_set_between_phases` — Phase 1 write_merge_doc, Phase 2 write_field (Set), verify combined +13. `test_read_before_and_after_compaction_identical` — Build state via ops, read, compact, read again, compare + +#### Local Dump Tests + +14. 
Small dataset (1000 records), 2 phases (images + metrics), verify all fields present +15. Small dataset, 3 phases (images + tags + metrics), verify mixed Merge + Set works + +#### Full Dump Test + +16. 109M records, all 6 phases, verify documents have all fields from all phases + +### Potential Gaps + +1. **Crash/recovery mid-phase**: If phase 2 writes Merge for 30% of docs then crashes, rerunning phase 2 writes duplicate Merge ops. This is safe — Merge is idempotent for scalar fields (last write wins). For multi-value fields written via Set, duplicates are also safe (Set overwrites). +2. **Partial/corrupt op at tail**: The existing shard reader truncates incomplete trailing ops (CRC validation). Merge ops use the same framing, so tail recovery works unchanged. +3. **Wrong method selected**: Using `write_doc()` (Create) instead of `write_merge_doc()` during a dump would still cause data loss. Mitigated by: explicit method names, no boolean mode, clear documentation. Could add a runtime warning if Create is used during an active dump task. +4. **Schema drift**: If phases somehow use different field index mappings, Merge would silently write wrong fields. Mitigated by: all phases use the same engine's field registry. Could add a schema hash to shard headers for extra safety (future work). +5. **Append/Remove interaction with Merge**: A field introduced by Merge and later modified by Append should work correctly since Merge upserts the field entry and Append modifies the existing value. Should be covered by unit tests. + +### Review History + +- **GPT-5.4 review**: Recommended Merge for all phases (not just 2+), explicit methods over boolean flag, stronger compaction tests, post-Delete resurrection semantics, forward compatibility gating. +- **Gemini 3.1 Pro review**: Flagged field ordering (confirmed not an issue — linear scan), alive bitmap interaction, downgrade compatibility, property-based testing. +- Both agreed the design is sound with these additions. 
diff --git a/scratch/Cargo.toml b/scratch/Cargo.toml index 1be5db41..c1247bcf 100644 --- a/scratch/Cargo.toml +++ b/scratch/Cargo.toml @@ -24,3 +24,7 @@ parking_lot = "0.12" rand = "0.8" rayon = "1" memmap2 = "0.9" +datasilo = { path = "../crates/datasilo" } +tempfile = "3" +rmp-serde = "1" +rmpv = "1" diff --git a/scripts/dump-test.sh b/scripts/dump-test.sh new file mode 100644 index 00000000..f133d097 --- /dev/null +++ b/scripts/dump-test.sh @@ -0,0 +1,291 @@ +#!/usr/bin/env bash +# Local 6-phase dump test with 32GB RSS kill threshold. +# Usage: bash scripts/dump-test.sh +# +# Starts bitdex-server on port 3001, sends PUT /dumps for each phase, +# monitors RSS every 10s, kills if >32GB. + +set -euo pipefail + +PORT=3001 +BASE_URL="http://localhost:${PORT}" +REPO_DIR="$(cd "$(dirname "$0")/.." && pwd)" +DATA_DIR="${REPO_DIR}/data-dump-test" +STAGE_DIR="${REPO_DIR}/data/load_stage" +INDEX_CONFIG_DIR="${REPO_DIR}/deploy/configs" +MAX_RSS_BYTES=$((32 * 1024 * 1024 * 1024)) # 32GB in bytes +SERVER_PID="" +MONITOR_PID="" + +cleanup() { + echo "[cleanup] Stopping monitor and server..." + [ -n "$MONITOR_PID" ] && kill "$MONITOR_PID" 2>/dev/null || true + [ -n "$SERVER_PID" ] && kill "$SERVER_PID" 2>/dev/null || true + wait 2>/dev/null || true + echo "[cleanup] Done." +} +trap cleanup EXIT + +# ── 0. Clean slate ────────────────────────────────────────────────── +echo "=== Cleaning data dir: $DATA_DIR ===" +rm -rf "$DATA_DIR" +mkdir -p "$DATA_DIR" + +# ── 1. Start server ──────────────────────────────────────────────── +echo "=== Starting bitdex-server on port $PORT ===" +"${REPO_DIR}/target/release/bitdex-server.exe" \ + --port "$PORT" \ + --data-dir "$DATA_DIR" \ + --index-dir "$INDEX_CONFIG_DIR" \ + 2>&1 | tee "$DATA_DIR/server.log" & +SERVER_PID=$! +echo "Server PID: $SERVER_PID" + +# Wait for server to be ready +echo "Waiting for server..." 
+for i in $(seq 1 60); do + if curl -s "$BASE_URL/health" > /dev/null 2>&1; then + echo "Server ready after ${i}s" + break + fi + if ! kill -0 "$SERVER_PID" 2>/dev/null; then + echo "ERROR: Server died during startup. Check $DATA_DIR/server.log" + exit 1 + fi + sleep 1 +done + +# Verify index was created +echo "=== Checking index status ===" +curl -s "$BASE_URL/api/indexes" | head -200 +echo "" + +# ── 2. RSS monitor (background) ─────────────────────────────────── +monitor_rss() { + local peak_rss=0 + while kill -0 "$SERVER_PID" 2>/dev/null; do + # Windows: use tasklist to get memory (Working Set in KB) + local mem_kb + mem_kb=$(tasklist //FI "PID eq $SERVER_PID" //FO CSV //NH 2>/dev/null \ + | tr -d '"' | awk -F',' '{gsub(/[^0-9]/,"",$NF); print $NF}' 2>/dev/null || echo "0") + + if [ "$mem_kb" = "0" ] || [ -z "$mem_kb" ]; then + # Fallback: try powershell + mem_kb=$(powershell -NoProfile -Command "(Get-Process -Id $SERVER_PID -ErrorAction SilentlyContinue).WorkingSet64 / 1KB" 2>/dev/null | tr -d '\r' || echo "0") + fi + + local mem_bytes=$((mem_kb * 1024)) + local mem_gb=$(awk "BEGIN {printf \"%.2f\", $mem_bytes / 1073741824}") + + if [ "$mem_bytes" -gt "$peak_rss" ]; then + peak_rss=$mem_bytes + fi + + local peak_gb=$(awk "BEGIN {printf \"%.2f\", $peak_rss / 1073741824}") + local ts=$(date +%H:%M:%S) + echo "[$ts] RSS: ${mem_gb}GB (peak: ${peak_gb}GB)" + + if [ "$mem_bytes" -gt "$MAX_RSS_BYTES" ]; then + echo "!!!! RSS ${mem_gb}GB EXCEEDS ${MAX_RSS_BYTES} bytes (32GB) — KILLING SERVER !!!!" + kill "$SERVER_PID" + echo "OOM_KILLED" > "$DATA_DIR/result.txt" + echo "peak_rss_bytes=$peak_rss" >> "$DATA_DIR/result.txt" + exit 1 + fi + + sleep 10 + done + echo "peak_rss_bytes=$peak_rss" >> "$DATA_DIR/result.txt" +} +monitor_rss & +MONITOR_PID=$! + +# ── 3. Convert Windows paths ────────────────────────────────────── +# The server runs on Windows, so CSV paths need Windows-style absolute paths +ABS_STAGE_DIR=$(cd "$STAGE_DIR" && pwd -W 2>/dev/null || pwd) + +# ── 4. 
Send dump requests (sequential) ──────────────────────────── +send_dump() { + local name="$1" + local json="$2" + echo "" + echo "=== Phase: $name ===" + echo "Sending PUT /api/indexes/civitai/dumps ..." + + local response + response=$(curl -s -w "\n%{http_code}" -X PUT \ + "$BASE_URL/api/indexes/civitai/dumps" \ + -H "Content-Type: application/json" \ + -d "$json") + + local http_code=$(echo "$response" | tail -1) + local body=$(echo "$response" | head -n -1) + echo "HTTP $http_code: $body" + + if [ "$http_code" != "200" ] && [ "$http_code" != "201" ] && [ "$http_code" != "202" ]; then + echo "ERROR: Dump registration failed for $name" + return 1 + fi + + # Poll for completion + echo "Polling for completion..." + local start_time=$(date +%s) + while true; do + local status_resp + status_resp=$(curl -s "$BASE_URL/api/indexes/civitai/dumps" 2>/dev/null) + local phase_status=$(echo "$status_resp" | python3 -c " +import sys, json +data = json.load(sys.stdin) +dumps = data.get('dumps', {}) +for k, v in dumps.items(): + if k.startswith('$name'): + print(v.get('status', 'unknown')) + sys.exit(0) +print('not_found') +" 2>/dev/null || echo "unknown") + + local elapsed=$(( $(date +%s) - start_time )) + echo " [$name] status=$phase_status elapsed=${elapsed}s" + + if [ "$phase_status" = "Complete" ]; then + echo " [$name] COMPLETE in ${elapsed}s" + break + elif [ "$phase_status" = "Failed" ]; then + echo " [$name] FAILED after ${elapsed}s" + return 1 + fi + + sleep 5 + done +} + +# Phase 1: Images (14GB, sets_alive, with enrichment) +send_dump "images" '{ + "name": "images", + "csv_path": "'"$ABS_STAGE_DIR/images.csv"'", + "format": "csv", + "slot_field": "id", + "sets_alive": true, + "fields": [ + "nsfwLevel", + {"column": "type", "target": "type"}, + "userId", + "postId", + "blockedFor", + {"column": "url", "target": "url"}, + {"column": "hash", "target": "hash"}, + "width", + "height" + ], + "computed_fields": [ + {"target": "hasMeta", "expression": "(flags >> 13) & 1 == 1 
&& (flags >> 2) & 1 == 0"}, + {"target": "onSite", "expression": "(flags >> 14) & 1 == 1"}, + {"target": "minor", "expression": "(flags >> 3) & 1 == 1"}, + {"target": "poi", "expression": "(flags >> 4) & 1 == 1"}, + {"target": "existedAt", "expression": "max(scannedAtSecs, createdAtSecs)"}, + {"target": "id", "expression": "id"} + ], + "enrichment": [ + { + "csv_path": "'"$ABS_STAGE_DIR/posts.csv"'", + "key": "id", + "join_on": "postId", + "fields": [ + {"column": "publishedAtSecs", "target": "publishedAt"}, + {"column": "availability", "target": "availability"} + ], + "computed_fields": [ + {"target": "postedToId", "expression": "lookup_key"}, + {"target": "isPublished", "expression": "publishedAtSecs != null"} + ] + } + ] +}' + +# Phase 2: Tags (63GB) +send_dump "tags" '{ + "name": "tags", + "csv_path": "'"$ABS_STAGE_DIR/tags.csv"'", + "format": "csv", + "slot_field": "imageId", + "fields": [ + {"column": "tagId", "target": "tagIds"} + ], + "filter": "(attributes >> 10) & 1 = 0" +}' + +# Phase 3: Resources (820MB, with nested enrichment) +send_dump "resources" '{ + "name": "resources", + "csv_path": "'"$ABS_STAGE_DIR/resources.csv"'", + "format": "csv", + "slot_field": "imageId", + "fields": [ + {"column": "modelVersionId", "target": "modelVersionIds"} + ], + "computed_fields": [ + {"target": "modelVersionIdsManual", "expression": "detected == false", "value": "modelVersionId"} + ], + "enrichment": [ + { + "csv_path": "'"$ABS_STAGE_DIR/model_versions.csv"'", + "key": "id", + "join_on": "modelVersionId", + "fields": [ + {"column": "baseModel", "target": "baseModel"} + ], + "enrichment": [ + { + "csv_path": "'"$ABS_STAGE_DIR/models.csv"'", + "key": "id", + "join_on": "modelId", + "fields": [ + {"column": "poi", "target": "poi"} + ], + "filter": "type = '\''Checkpoint'\''" + } + ] + } + ] +}' + +# Phase 4: Tools (50MB) +send_dump "tools" '{ + "name": "tools", + "csv_path": "'"$ABS_STAGE_DIR/tools.csv"'", + "format": "csv", + "slot_field": "imageId", + "fields": [ + 
{"column": "toolId", "target": "toolIds"} + ] +}' + +# Phase 5: Techniques (71MB) +send_dump "techniques" '{ + "name": "techniques", + "csv_path": "'"$ABS_STAGE_DIR/techniques.csv"'", + "format": "csv", + "slot_field": "imageId", + "fields": [ + {"column": "techniqueId", "target": "techniqueIds"} + ] +}' + +# Phase 6: Metrics (1.4GB TSV) +send_dump "metrics" '{ + "name": "metrics", + "csv_path": "'"$ABS_STAGE_DIR/metrics.tsv"'", + "format": "tsv", + "slot_field": "imageId", + "fields": ["reactionCount", "commentCount", "collectedCount"] +}' + +# ── 5. Final status ────────────────────────────────────────────── +echo "" +echo "=== ALL PHASES COMPLETE ===" +echo "PASS" > "$DATA_DIR/result.txt" + +# Get final stats +curl -s "$BASE_URL/api/indexes/civitai/stats" | python3 -m json.tool 2>/dev/null || true +echo "" +echo "=== Dump test finished ===" diff --git a/src/concurrent_engine.rs b/src/concurrent_engine.rs index 0d1d2d1c..4bc5b0c4 100644 --- a/src/concurrent_engine.rs +++ b/src/concurrent_engine.rs @@ -5431,6 +5431,72 @@ impl ConcurrentEngine { pub fn set_cache_min_filter_size(&self, v: usize) { self.unified_cache.lock().config_mut().min_filter_size = v; } + /// Rebuild all time bucket bitmaps from scratch by scanning the sort field + /// for all alive slots. Use after a bulk dump or when buckets are empty/stale. + /// Returns (bucket_count, total_slots_scanned) or an error. 
+ pub fn rebuild_time_buckets(&self) -> crate::error::Result<(usize, u64)> { + let tb_arc = self.time_buckets.as_ref().ok_or_else(|| { + crate::error::BitdexError::Config("no time_buckets configured".into()) + })?; + let snap = self.snapshot(); + let sort_field_name = { + let tb = tb_arc.lock(); + tb.sort_field_name().to_string() + }; + let sort_field = snap.sorts.get_field(&sort_field_name).ok_or_else(|| { + crate::error::BitdexError::Config(format!( + "time bucket sort field '{}' not loaded", sort_field_name + )) + })?; + let alive = snap.slots.alive_bitmap(); + let now_secs = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + // Collect (slot, timestamp) for all alive slots + let slot_count = alive.len(); + let mut slot_values: Vec<(u32, u64)> = Vec::with_capacity(slot_count as usize); + for slot in alive.iter() { + let ts = sort_field.reconstruct_value(slot) as u64; + slot_values.push((slot, ts)); + } + // Rebuild each bucket + let mut tb = tb_arc.lock(); + let bucket_names: Vec = tb.bucket_names(); + for name in &bucket_names { + tb.rebuild_bucket(name, slot_values.iter().copied(), now_secs); + } + let bucket_count = bucket_names.len(); + // Mark dirty so merge thread persists + self.dirty_since_snapshot.store(true, std::sync::atomic::Ordering::Release); + // Invalidate cache — stale entries may hold 0-result bitmaps from before rebuild + self.unified_cache.lock().clear(); + eprintln!( + "rebuild_time_buckets: rebuilt {} buckets from {} alive slots in sort field '{}'", + bucket_count, slot_count, sort_field_name + ); + Ok((bucket_count, slot_count)) + } + + /// Get per-bucket statistics (name, slot count, cutoff). 
+ pub fn time_bucket_stats(&self) -> serde_json::Value { + if let Some(ref tb_arc) = self.time_buckets { + let tb = tb_arc.lock(); + let mut buckets = serde_json::Map::new(); + for name in tb.bucket_names() { + if let Some(bucket) = tb.get_bucket(&name) { + buckets.insert(name, serde_json::json!({ + "slots": bucket.bitmap().len(), + "last_cutoff": bucket.last_cutoff(), + })); + } + } + serde_json::Value::Object(buckets) + } else { + serde_json::Value::Null + } + } + /// Update the refresh interval for a named time bucket. /// Returns true if the bucket was found and updated, false if no time bucket /// manager exists or the bucket name was not found. diff --git a/src/dump_processor.rs b/src/dump_processor.rs index 9d433b45..8b84ccbc 100644 --- a/src/dump_processor.rs +++ b/src/dump_processor.rs @@ -1256,15 +1256,19 @@ pub fn process_dump( slot_watermark: Option>, shutdown: Option bool + Send + Sync>>, ) -> Result { + let t_total = Instant::now(); let mut result = process_dump_with_progress(request, engine, stage_dir, progress_counter, data_schema, slot_watermark.as_ref(), shutdown.as_ref())?; + eprintln!(" Dump {} process_dump_with_progress returned in {:.1}s", request.name, t_total.elapsed().as_secs_f64()); let (alive_s, filter_s, sort_s, meta_s) = engine .shard_stores() .ok_or_else(|| "no bitmap_path configured; cannot process dump".to_string())?; let bitmap_path = engine.config().storage.bitmap_path.as_ref() .ok_or_else(|| "no bitmap_path configured".to_string())?.clone(); let dictionaries = engine.dictionaries_arc(); + let t_save = Instant::now(); save_phase_to_disk(&mut result, &alive_s, &filter_s, &sort_s, &meta_s, &bitmap_path, &dictionaries, &request.name, request.sets_alive)?; - eprintln!(" Dump {} save complete", request.name); + eprintln!(" Dump {} save_phase_to_disk in {:.1}s", request.name, t_save.elapsed().as_secs_f64()); + eprintln!(" Dump {} total process_dump in {:.1}s", request.name, t_total.elapsed().as_secs_f64()); Ok(result) } @@ -1426,7 
+1430,12 @@ pub fn process_dump_with_progress( } } - // Mmap the CSV/TSV file + // Mmap the CSV/TSV file. + // IMPORTANT: The mmap is scoped tightly around the parse phase (see the + // `mmap_scope` block below). After parsing completes and the PhaseResult + // is built, the mmap is dropped immediately. This prevents zombie processes + // from holding 80+ GB of virtual memory after a forced kill — the mmap is + // the largest allocation and must not outlive the parse. let csv_path = std::path::Path::new(&request.csv_path); let file = std::fs::File::open(csv_path) .map_err(|e| format!("open {}: {e}", csv_path.display()))?; @@ -1506,7 +1515,7 @@ pub fn process_dump_with_progress( }; if is_tags_optimization { - return process_multi_value_phase( + let result = process_multi_value_phase( request, body, delimiter, @@ -1517,6 +1526,11 @@ pub fn process_dump_with_progress( slot_watermark, shutdown, ); + // Drop the mmap immediately after parsing — prevents zombie processes. + drop(mmap); + drop(file); + eprintln!(" Dump {}: mmap released", request.name); + return result; } emit_stage(&request.name, "parallel_parse", "start", &t, 0); @@ -2088,6 +2102,28 @@ pub fn process_dump_with_progress( emit_stage(&request.name, "parallel_parse", "done", &t, total.load(Ordering::Relaxed)); + // Drop the mmap immediately after parsing — prevents zombie processes from + // holding 80+ GB of virtual memory if the process is force-killed during + // the merge/save phase. NLL ensures the borrow of `body`/`data` has ended. + drop(mmap); + drop(file); + eprintln!(" Dump {}: mmap released", request.name); + + // Drop enrichment tables on a background thread — they can be 5+ GB and + // take 30-60s to free due to millions of individual heap allocations. + // Spawning the drop avoids blocking the save phase. 
+ { + let name = request.name.clone(); + std::thread::spawn(move || { + let t_drop = Instant::now(); + drop(enrichment_mgr); + let secs = t_drop.elapsed().as_secs_f64(); + if secs > 1.0 { + eprintln!(" Dump {}: enrichment drop took {:.1}s (background)", name, secs); + } + }); + } + emit_stage(&request.name, "merge", "start", &t, total.load(Ordering::Relaxed)); // Merge all thread results — parallel tree reduction type MergeAccum = ( @@ -2452,9 +2488,15 @@ fn process_multi_value_phase( let total = AtomicU64::new(0); let total_ref = &total; - // Spawn docstore writer thread — rayon threads push (slot, value) to channel, - // writer drains and writes per shard. Zero contention on parse threads. - let (doc_tx, doc_rx) = if field_idx.is_some() { + // For the vec path (tagIds): docstore writes are deferred to a post-pass after + // bitmap merge. We invert the merged bitmaps shard-by-shard and write one Merge + // op per slot with the complete multi-value array. This reduces 4.5B individual + // writes to ~109M (one per slot) and fixes the correctness bug where Set overwrote + // previous values instead of accumulating. + // + // For the HashMap path (tools/techniques): use the old channel-based writer since + // these are small datasets where per-row Set ops are fine. 
+ let (doc_tx, doc_rx) = if !use_vec && field_idx.is_some() { let (tx, rx) = crossbeam_channel::bounded::>(64); (Some(tx), Some(rx)) } else { @@ -2474,7 +2516,6 @@ fn process_multi_value_phase( } } } - // Finalize: flush BufWriters and update shard headers if let Err(e) = bw.finalize() { eprintln!("StreamingDocWriter: multi-value finalize error: {e}"); } @@ -2499,7 +2540,6 @@ fn process_multi_value_phase( let chunk = &body[range_start..range_end]; let mut bitmaps: Vec = (0..MAX_TAG_ID).map(|_| RoaringBitmap::new()).collect(); - let mut doc_batch: Vec<(u32, i64)> = Vec::with_capacity(10_000); let mut local_max_slot: u32 = 0; let mut count = 0u64; let mut line_start = 0; @@ -2545,16 +2585,6 @@ fn process_multi_value_phase( if value < MAX_TAG_ID { bitmaps[value].insert(slot); } - // Batch for writer thread - if doc_tx.is_some() { - doc_batch.push((slot, value as i64)); - if doc_batch.len() >= 10_000 { - if let Some(ref tx) = doc_tx { - let _ = tx.send(std::mem::take(&mut doc_batch)); - doc_batch = Vec::with_capacity(10_000); - } - } - } count += 1; if count % LOG_INTERVAL == 0 { total_ref.fetch_add(LOG_INTERVAL, Ordering::Relaxed); @@ -2562,11 +2592,6 @@ fn process_multi_value_phase( if let Some(ref sf) = shutdown { if sf() { break; } } } } - if !doc_batch.is_empty() { - if let Some(ref tx) = doc_tx { - let _ = tx.send(doc_batch); - } - } let remainder = count % LOG_INTERVAL; total_ref.fetch_add(remainder, Ordering::Relaxed); if let Some(ref p) = progress_counter { p.fetch_add(remainder, Ordering::Relaxed); } @@ -2578,10 +2603,8 @@ fn process_multi_value_phase( }) .collect(); - // Docstore writes sent to writer thread above - // Merge Vec — parallel tree reduction - let mut merged_vec = thread_results + let merged_vec = thread_results .into_par_iter() .reduce( || (0..MAX_TAG_ID).map(|_| RoaringBitmap::new()).collect::>(), @@ -2595,32 +2618,133 @@ fn process_multi_value_phase( }, ); - // Convert to HashMap (non-empty only) - let mut filter_map: HashMap = 
HashMap::new(); - for (i, bm) in merged_vec.drain(..).enumerate() { - if !bm.is_empty() { - filter_map.insert(i as u64, bm); - } - } - let total_rows = total.load(Ordering::Relaxed); + // Collect non-empty tag IDs for iteration + let non_empty_tags: Vec = merged_vec.iter() + .enumerate() + .filter(|(_, bm)| !bm.is_empty()) + .map(|(i, _)| i) + .collect(); + let distinct_count = non_empty_tags.len(); eprintln!( " Dump {} ({target}): {} rows, {} distinct values", - request.name, - total_rows, - filter_map.len(), + request.name, total_rows, distinct_count, ); - let mut filter_maps = HashMap::new(); - filter_maps.insert(target, filter_map); + emit_stage(&request.name, "parallel_parse", "done", &t_mv, total_rows); + + // ── Post-pass: invert bitmaps → per-slot tag arrays, write Merge ops ── + // + // Process in shard ranges (1M slots each) using rayon parallelism. + // For each shard: count tags per slot, build flat array, write Merge ops. + // Uses min/max per-tag to skip bitmaps that don't overlap the shard. + // + // Benchmarked at ~5 min for 4.5B tag entries at 109M slots (synthetic). + // DashMap alternative was tested and is 3-5x slower due to lock contention. 
+ if let Some(fidx) = field_idx { + let t_doc = Instant::now(); + const SHARD_SIZE: u32 = 1_000_000; + let max_slot = non_empty_tags.iter() + .filter_map(|&tag| merged_vec[tag].max()) + .max() + .unwrap_or(0); + let num_shards = (max_slot / SHARD_SIZE) + 1; + + // Pre-compute min/max slot per tag for fast range skipping + let tag_ranges: Vec<(usize, u32, u32)> = non_empty_tags.iter() + .filter_map(|&tag| { + let bm = &merged_vec[tag]; + Some((tag, bm.min()?, bm.max()?)) + }) + .collect(); + + let total_docs_written = AtomicU64::new(0); + let bw_ref = &*bulk_writer; + let merged_ref = &merged_vec; + let tag_ranges_ref = &tag_ranges; + + (0..num_shards).into_par_iter().for_each(|shard_idx| { + let shard_start = shard_idx * SHARD_SIZE; + let shard_end = shard_start + SHARD_SIZE; + + // Filter to tags that overlap this shard + let relevant_tags: Vec = tag_ranges_ref.iter() + .filter(|&&(_, min, max)| max >= shard_start && min < shard_end) + .map(|&(tag, _, _)| tag) + .collect(); + if relevant_tags.is_empty() { return; } + + // Pass 1: count tags per slot + let mut counts = vec![0u32; SHARD_SIZE as usize]; + for &tag_id in &relevant_tags { + for slot in merged_ref[tag_id].iter() { + if slot < shard_start { continue; } + if slot >= shard_end { break; } + counts[(slot - shard_start) as usize] += 1; + } + } + + // Pass 2: prefix sum + let mut offsets = vec![0u32; SHARD_SIZE as usize]; + let mut current_offset = 0u32; + for i in 0..SHARD_SIZE as usize { + offsets[i] = current_offset; + current_offset += counts[i]; + } + let total_tags = current_offset as usize; + if total_tags == 0 { return; } + + // Pass 3: fill flat tag array + let mut flat_tags = vec![0i64; total_tags]; + let mut cursors = offsets.clone(); + for &tag_id in &relevant_tags { + for slot in merged_ref[tag_id].iter() { + if slot < shard_start { continue; } + if slot >= shard_end { break; } + let idx = (slot - shard_start) as usize; + let pos = cursors[idx] as usize; + flat_tags[pos] = tag_id as i64; + 
cursors[idx] += 1; + } + } + + // Pass 4: write one Merge per slot + let mut shard_docs = 0u64; + for i in 0..SHARD_SIZE as usize { + if counts[i] > 0 { + let start = offsets[i] as usize; + let end = start + counts[i] as usize; + let tags = &flat_tags[start..end]; + let slot = shard_start + i as u32; + bw_ref.write_merge_doc(slot, &[ + (fidx, PackedValue::Mi(tags.to_vec())), + ]); + shard_docs += 1; + } + } + total_docs_written.fetch_add(shard_docs, Ordering::Relaxed); + }); - // Wait for docstore writer thread to finish - drop(doc_tx); - if let Some(handle) = doc_writer_handle { - handle.join().ok(); + if let Err(e) = bulk_writer.finalize() { + eprintln!(" dump {}: StreamingDocWriter finalize error: {e}", request.name); + } + let docs = total_docs_written.load(Ordering::Relaxed); + eprintln!( + " Dump {} docstore post-pass: {} docs in {:.1}s ({} shards, {:.0} docs/sec)", + request.name, docs, t_doc.elapsed().as_secs_f64(), num_shards, + docs as f64 / t_doc.elapsed().as_secs_f64().max(0.001) + ); } - emit_stage(&request.name, "parallel_parse", "done", &t_mv, total_rows); + // Convert to HashMap for return + let mut filter_map: HashMap = HashMap::new(); + for (i, bm) in merged_vec.into_iter().enumerate() { + if !bm.is_empty() { + filter_map.insert(i as u64, bm); + } + } + let mut filter_maps = HashMap::new(); + filter_maps.insert(target, filter_map); Ok(PhaseResult { row_count: total_rows, @@ -2917,7 +3041,7 @@ fn write_docstore_row_indexed( let refs: Vec<(u16, &[u8])> = tuple_buf.iter() .map(|&(idx, off, len)| (idx, &serialize_buf[off as usize..(off + len) as usize])) .collect(); - bulk_writer.append_tuples_raw(slot, &refs, write_buf); + bulk_writer.append_tuples_merge(slot, &refs, write_buf); } } diff --git a/src/field_handler.rs b/src/field_handler.rs index 1088abcd..060cac99 100644 --- a/src/field_handler.rs +++ b/src/field_handler.rs @@ -252,6 +252,7 @@ impl FieldRegistry { } DocOp::Delete { .. } => None, // Always valid DocOp::Create { .. 
} => None, // Always valid + DocOp::Merge { .. } => None, // Always valid } } diff --git a/src/server.rs b/src/server.rs index 88a3c2e2..c811430c 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1348,6 +1348,7 @@ impl BitdexServer { .route("/api/indexes/{name}/fields", post(handle_add_fields).delete(handle_remove_fields)) .route("/api/indexes/{name}/fields/{field}/reload", post(handle_reload_field)) .route("/api/indexes/{name}/compact", post(handle_compact)) + .route("/api/indexes/{name}/time-buckets/rebuild", post(handle_rebuild_time_buckets)) .route("/api/indexes/{name}/snapshot", post(handle_save_snapshot)) .route("/api/indexes/{name}/cursors/{cursor_name}", put(handle_set_cursor)) // Capture endpoints (Phase 2) @@ -1387,6 +1388,8 @@ impl BitdexServer { .route("/api/indexes/{name}/dumps/{dump_name}/loaded", post(handle_dump_loaded)) .route("/api/indexes/{name}/dumps/{dump_name}", delete(handle_delete_dump)) .route("/api/indexes/{name}/dumps/clear", post(handle_clear_dumps)) + .route("/api/indexes/{name}/dictionaries", get(handle_dictionaries)) + .route("/api/indexes/{name}/ui-config", get(handle_ui_config)) .route("/metrics", get(handle_metrics)) .route("/", get(handle_ui)) .with_state(Arc::clone(&state)); @@ -1952,6 +1955,95 @@ async fn handle_get_index( } } +// --------------------------------------------------------------------------- +// Handlers: UI — Dictionaries & UI Config +// --------------------------------------------------------------------------- + +/// GET /api/indexes/{name}/dictionaries — reverse maps (int → display string) +/// for all fields that have dictionaries (LowCardinalityString) or string_maps +/// (MappedString). The UI uses these to populate dropdowns and render labels. 
+async fn handle_dictionaries( + State(state): State, + AxumPath(name): AxumPath, +) -> impl IntoResponse { + let guard = state.index.lock(); + match guard.as_ref() { + Some(idx) if idx.definition.name == name => { + let mut result: serde_json::Map = serde_json::Map::new(); + + // LowCardinalityString dictionaries from the engine + for (field_name, dict) in idx.engine.dictionaries().iter() { + let snap = dict.snapshot(); + let reverse = snap.to_reverse_map(); + let map: serde_json::Map = reverse.iter() + .map(|(k, v)| (k.to_string(), serde_json::Value::String(v.clone()))) + .collect(); + result.insert(field_name.clone(), serde_json::Value::Object(map)); + } + + // MappedString fields from data_schema (reverse the string_map) + for mapping in &idx.definition.data_schema.fields { + if let Some(ref string_map) = mapping.string_map { + if !result.contains_key(&mapping.target) { + let reverse: serde_json::Map = string_map.iter() + .map(|(label, &id)| (id.to_string(), serde_json::Value::String(label.clone()))) + .collect(); + result.insert(mapping.target.clone(), serde_json::Value::Object(reverse)); + } + } + } + + Json(serde_json::Value::Object(result)).into_response() + } + _ => ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({"error": format!("Index '{}' not found", name)})), + ).into_response(), + } +} + +/// GET /api/indexes/{name}/ui-config — serve the UI config YAML as JSON. +/// Loaded from data_dir/indexes/{name}/ui-config.yaml (or index_dir if set). +/// Returns {} if no UI config file exists (UI falls back to auto-generated controls). 
+async fn handle_ui_config( + State(state): State, + AxumPath(name): AxumPath, +) -> impl IntoResponse { + let config_source_dir = state.index_dir.clone() + .unwrap_or_else(|| state.data_dir.join("indexes")); + let candidates = [ + config_source_dir.join(&name).join("ui-config.yaml"), + config_source_dir.join(&name).join("ui-config.yml"), + state.data_dir.join("indexes").join(&name).join("ui-config.yaml"), + state.data_dir.join("indexes").join(&name).join("ui-config.yml"), + ]; + + for path in &candidates { + if path.exists() { + match std::fs::read_to_string(path) { + Ok(yaml_str) => { + match serde_yaml::from_str::(&yaml_str) { + Ok(val) => return Json(val).into_response(), + Err(e) => { + eprintln!("Failed to parse ui-config at {}: {e}", path.display()); + return ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({"error": format!("Invalid ui-config YAML: {e}")})), + ).into_response(); + } + } + } + Err(e) => { + eprintln!("Failed to read ui-config at {}: {e}", path.display()); + } + } + } + } + + // No config file — return empty object (UI auto-generates) + Json(serde_json::json!({})).into_response() +} + // --------------------------------------------------------------------------- // Handlers: Config Patch // --------------------------------------------------------------------------- @@ -3448,6 +3540,42 @@ struct CompactRequest { workers: Option, } +async fn handle_rebuild_time_buckets( + State(state): State, + AxumPath(name): AxumPath, +) -> impl IntoResponse { + let engine = { + let guard = state.index.lock(); + match guard.as_ref() { + Some(idx) if idx.definition.name == name => Arc::clone(&idx.engine), + _ => { + return ( + StatusCode::NOT_FOUND, + Json(serde_json::json!({"error": format!("Index '{}' not found", name)})), + ).into_response(); + } + } + }; + match engine.rebuild_time_buckets() { + Ok((bucket_count, slots_scanned)) => { + // Include per-bucket counts in the response + let bucket_details = engine.time_bucket_stats(); + 
Json(serde_json::json!({ + "status": "ok", + "buckets_rebuilt": bucket_count, + "slots_scanned": slots_scanned, + "buckets": bucket_details, + })).into_response() + } + Err(e) => { + ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({"error": e.to_string()})), + ).into_response() + } + } +} + async fn handle_compact( State(state): State, AxumPath(name): AxumPath, diff --git a/src/shard_store.rs b/src/shard_store.rs index c9a13f20..7295c98d 100644 --- a/src/shard_store.rs +++ b/src/shard_store.rs @@ -239,14 +239,10 @@ pub fn read_op_entries_pub(data: &[u8]) -> Vec { read_op_entries::(data) } -/// Simple CRC32 (IEEE / CRC-32C via software). We use a basic lookup table. +/// CRC32 (IEEE polynomial) via crc32fast: SIMD-accelerated where supported +/// (PCLMULQDQ on x86_64, CRC instructions on aarch64), with a fast slice-by-16 software fallback; 10-50x faster than the old byte-at-a-time table. pub(crate) fn crc32_of(data: &[u8]) -> u32 { - let mut crc: u32 = 0xFFFF_FFFF; - for &byte in data { - let idx = ((crc ^ byte as u32) & 0xFF) as usize; - crc = CRC32_TABLE[idx] ^ (crc >> 8); - } - crc ^ 0xFFFF_FFFF + crc32fast::hash(data) } /// CRC-32 lookup table (IEEE polynomial 0xEDB88320). diff --git a/src/shard_store_doc.rs b/src/shard_store_doc.rs index efb78121..d30220c6 100644 --- a/src/shard_store_doc.rs +++ b/src/shard_store_doc.rs @@ -157,6 +157,11 @@ pub enum DocOp { /// Create a document with a full set of fields. Create { slot: u32, fields: Vec<(u16, PackedValue)> }, + + /// Merge fields into an existing document (or create if absent). + /// Unlike Create which replaces the entire doc, Merge upserts each field. + /// Used by multi-phase dump writes where phases add fields incrementally. 
+ Merge { slot: u32, fields: Vec<(u16, PackedValue)> }, } // --------------------------------------------------------------------------- @@ -168,6 +173,7 @@ const OP_TAG_APPEND: u8 = 0x02; const OP_TAG_REMOVE: u8 = 0x03; const OP_TAG_DELETE: u8 = 0x04; const OP_TAG_CREATE: u8 = 0x05; +const OP_TAG_MERGE: u8 = 0x06; // --------------------------------------------------------------------------- // PackedValue binary encoding (compact, no msgpack dependency) @@ -393,8 +399,9 @@ impl OpCodec for DocOpCodec { buf.push(OP_TAG_DELETE); buf.extend_from_slice(&slot.to_le_bytes()); } - DocOp::Create { slot, fields } => { - buf.push(OP_TAG_CREATE); + DocOp::Create { slot, fields } | DocOp::Merge { slot, fields } => { + let tag = if matches!(op, DocOp::Merge { .. }) { OP_TAG_MERGE } else { OP_TAG_CREATE }; + buf.push(tag); buf.extend_from_slice(&slot.to_le_bytes()); buf.extend_from_slice(&(fields.len() as u16).to_le_bytes()); for (field_idx, value) in fields { @@ -443,13 +450,14 @@ impl OpCodec for DocOpCodec { })?); Ok(DocOp::Delete { slot }) } - OP_TAG_CREATE => { + OP_TAG_CREATE | OP_TAG_MERGE => { + let label = if tag == OP_TAG_MERGE { "Merge" } else { "Create" }; let slot = u32::from_le_bytes(bytes[pos..pos + 4].try_into().map_err(|_| { - io::Error::new(io::ErrorKind::UnexpectedEof, "truncated slot in Create") + io::Error::new(io::ErrorKind::UnexpectedEof, format!("truncated slot in {}", label)) })?); pos += 4; let num_fields = u16::from_le_bytes(bytes[pos..pos + 2].try_into().map_err(|_| { - io::Error::new(io::ErrorKind::UnexpectedEof, "truncated field count in Create") + io::Error::new(io::ErrorKind::UnexpectedEof, format!("truncated field count in {}", label)) })?) 
as usize; pos += 2; let mut fields = Vec::with_capacity(num_fields); @@ -457,7 +465,11 @@ impl OpCodec for DocOpCodec { let (field_idx, value) = decode_field_pair(bytes, &mut pos)?; fields.push((field_idx, value)); } - Ok(DocOp::Create { slot, fields }) + if tag == OP_TAG_MERGE { + Ok(DocOp::Merge { slot, fields }) + } else { + Ok(DocOp::Create { slot, fields }) + } } other => Err(io::Error::new( io::ErrorKind::InvalidData, @@ -531,6 +543,16 @@ impl OpCodec for DocOpCodec { DocOp::Create { slot, fields } => { snapshot.docs.insert(*slot, fields.clone()); } + DocOp::Merge { slot, fields } => { + let doc = snapshot.docs.entry(*slot).or_default(); + for (field_idx, value) in fields { + if let Some(entry) = doc.iter_mut().find(|(f, _)| *f == *field_idx) { + entry.1 = value.clone(); + } else { + doc.push((*field_idx, value.clone())); + } + } + } } } } @@ -1633,6 +1655,43 @@ impl StreamingDocWriter { shard.ops_count += 1; } + /// Write a doc's fields as a DocOp::Merge op to the shard file. + /// Unlike write_doc (Create), this merges fields into the existing document. + /// Used by multi-phase dumps where each phase adds fields incrementally. 
+ pub fn write_merge_doc(&self, slot: u32, fields: &[(u16, PackedValue)]) { + let non_default: Vec<(u16, PackedValue)> = fields.iter() + .filter(|(idx, val)| { + self.field_defaults.get(idx).map_or(true, |d| d != val) + }) + .cloned() + .collect(); + + if non_default.is_empty() { + return; + } + + let shard_key = SlotHexShard::slot_to_shard(slot); + let mutex = self.shards.entry(shard_key) + .or_insert_with(|| { + Arc::new(parking_lot::Mutex::new(self.open_shard(shard_key))) + }) + .clone(); + + let op = DocOp::Merge { slot, fields: non_default }; + let mut payload = Vec::new(); + DocOpCodec::encode_op(&op, &mut payload); + + let len = payload.len() as u32; + let crc = crate::shard_store::crc32_of(&payload); + + let mut shard = mutex.lock(); + use std::io::Write; + let _ = shard.writer.write_all(&len.to_le_bytes()); + let _ = shard.writer.write_all(&payload); + let _ = shard.writer.write_all(&crc.to_le_bytes()); + shard.ops_count += 1; + } + /// Write a single field value as a DocOp::Set op. /// Used for multi-value phases (tags, resources) that append to existing docs. pub fn write_field(&self, slot: u32, field_idx: u16, value: &PackedValue) { @@ -1707,6 +1766,52 @@ impl StreamingDocWriter { shard.ops_count += 1; } + /// Write raw msgpack-encoded tuples as a DocOp::Merge. + /// Like append_tuples_raw but merges into existing docs instead of replacing. + /// Used by multi-phase dumps where each phase adds fields incrementally. 
+ pub fn append_tuples_merge(&self, slot: u32, tuples: &[(u16, &[u8])], _write_buf: &mut Vec) { + if tuples.is_empty() { + return; + } + + let mut fields = Vec::with_capacity(tuples.len()); + for &(field_idx, value_bytes) in tuples { + let pv: PackedValue = match rmp_serde::from_slice(value_bytes) { + Ok(v) => v, + Err(_) => continue, + }; + if self.field_defaults.get(&field_idx).map_or(false, |d| d == &pv) { + continue; + } + fields.push((field_idx, pv)); + } + + if fields.is_empty() { + return; + } + + let shard_key = SlotHexShard::slot_to_shard(slot); + let mutex = self.shards.entry(shard_key) + .or_insert_with(|| { + Arc::new(parking_lot::Mutex::new(self.open_shard(shard_key))) + }) + .clone(); + + let op = DocOp::Merge { slot, fields }; + let mut payload = Vec::new(); + DocOpCodec::encode_op(&op, &mut payload); + + let len = payload.len() as u32; + let crc = crate::shard_store::crc32_of(&payload); + + let mut shard = mutex.lock(); + use std::io::Write; + let _ = shard.writer.write_all(&len.to_le_bytes()); + let _ = shard.writer.write_all(&payload); + let _ = shard.writer.write_all(&crc.to_le_bytes()); + shard.ops_count += 1; + } + /// Write a single raw msgpack tuple. API-compatible with ShardStoreBulkWriter. pub fn append_tuple_raw(&self, slot: u32, field_idx: u16, value_bytes: &[u8]) { let pv: PackedValue = match rmp_serde::from_slice(value_bytes) { @@ -1767,10 +1872,10 @@ impl StreamingDocWriter { continue; } - if let Err(e) = file.sync_all() { - eprintln!("StreamingDocWriter: sync shard {shard_key}: {e}"); - errors += 1; - } + // Note: sync_all() removed for bulk dump performance. + // Per-shard fsync on 200K+ files takes 20-200s. Dumps are idempotent + // (can be rerun on crash), so crash consistency is not required here. + // The bitmap save phase does its own fsync via ShardStore. 
} } @@ -1809,7 +1914,7 @@ impl StreamingDocWriter { use std::io::Seek; let _ = f.seek(std::io::SeekFrom::End(0)); return ShardFileWriter { - writer: std::io::BufWriter::with_capacity(256, f), + writer: std::io::BufWriter::with_capacity(8192, f), ops_count: header.ops_count, }; } @@ -1839,8 +1944,9 @@ impl StreamingDocWriter { header.encode(&mut header_bytes); let f = std::fs::File::create(&path).expect("failed to create shard file"); - // Small buffer: 213K shards × 256 bytes = 54MB total, vs 1.7GB with default 8KB - let mut writer = std::io::BufWriter::with_capacity(256, f); + // 8KB buffer: 213K shards × 8KB = 1.7GB worst case, but most shards aren't + // open simultaneously. 256B was causing per-write syscalls during bulk dumps. + let mut writer = std::io::BufWriter::with_capacity(8192, f); use std::io::Write; writer.write_all(&header_bytes).expect("failed to write shard header"); @@ -2486,6 +2592,204 @@ mod tests { let snap = store.read(&shard_key).unwrap().unwrap(); assert_eq!(snap.docs[&100][0], (0, PackedValue::I(42))); } + + // ---- DocOp::Merge tests ---- + + #[test] + fn test_merge_op_roundtrip() { + let op = DocOp::Merge { + slot: 42, + fields: vec![ + (0, PackedValue::I(1)), + (1, PackedValue::S("test".into())), + ], + }; + let mut buf = Vec::new(); + DocOpCodec::encode_op(&op, &mut buf); + let decoded = DocOpCodec::decode_op(&buf).unwrap(); + match decoded { + DocOp::Merge { slot, fields } => { + assert_eq!(slot, 42); + assert_eq!(fields.len(), 2); + assert_eq!(fields[0], (0, PackedValue::I(1))); + assert_eq!(fields[1], (1, PackedValue::S("test".into()))); + } + _ => panic!("expected Merge, got {:?}", decoded), + } + } + + #[test] + fn test_apply_merge_combines_fields() { + let mut snap = DocSnapshot::new(); + // Phase 1: Create doc with fields 0 and 1 + DocOpCodec::apply(&mut snap, &DocOp::Create { + slot: 1, + fields: vec![(0, PackedValue::I(100)), (1, PackedValue::S("hello".into()))], + }); + // Phase 2: Merge field 2 (new) and field 3 (new) + 
DocOpCodec::apply(&mut snap, &DocOp::Merge { + slot: 1, + fields: vec![(2, PackedValue::I(200)), (3, PackedValue::S("world".into()))], + }); + let doc = &snap.docs[&1]; + assert_eq!(doc.len(), 4); + assert_eq!(doc.iter().find(|(f, _)| *f == 0).unwrap().1, PackedValue::I(100)); + assert_eq!(doc.iter().find(|(f, _)| *f == 1).unwrap().1, PackedValue::S("hello".into())); + assert_eq!(doc.iter().find(|(f, _)| *f == 2).unwrap().1, PackedValue::I(200)); + assert_eq!(doc.iter().find(|(f, _)| *f == 3).unwrap().1, PackedValue::S("world".into())); + } + + #[test] + fn test_apply_merge_overwrites_existing_field() { + let mut snap = DocSnapshot::new(); + DocOpCodec::apply(&mut snap, &DocOp::Create { + slot: 1, + fields: vec![(0, PackedValue::I(100)), (1, PackedValue::S("old".into()))], + }); + DocOpCodec::apply(&mut snap, &DocOp::Merge { + slot: 1, + fields: vec![(1, PackedValue::S("new".into()))], + }); + let doc = &snap.docs[&1]; + assert_eq!(doc.len(), 2); + assert_eq!(doc.iter().find(|(f, _)| *f == 0).unwrap().1, PackedValue::I(100)); + assert_eq!(doc.iter().find(|(f, _)| *f == 1).unwrap().1, PackedValue::S("new".into())); + } + + #[test] + fn test_apply_merge_on_empty_doc() { + let mut snap = DocSnapshot::new(); + DocOpCodec::apply(&mut snap, &DocOp::Merge { + slot: 1, + fields: vec![(0, PackedValue::I(42))], + }); + let doc = &snap.docs[&1]; + assert_eq!(doc.len(), 1); + assert_eq!(doc[0], (0, PackedValue::I(42))); + } + + #[test] + fn test_merge_then_merge_accumulates() { + let mut snap = DocSnapshot::new(); + DocOpCodec::apply(&mut snap, &DocOp::Merge { + slot: 1, + fields: vec![(0, PackedValue::I(1))], + }); + DocOpCodec::apply(&mut snap, &DocOp::Merge { + slot: 1, + fields: vec![(1, PackedValue::I(2))], + }); + DocOpCodec::apply(&mut snap, &DocOp::Merge { + slot: 1, + fields: vec![(2, PackedValue::I(3)), (0, PackedValue::I(99))], // overwrites field 0 + }); + let doc = &snap.docs[&1]; + assert_eq!(doc.len(), 3); + assert_eq!(doc.iter().find(|(f, _)| *f == 
0).unwrap().1, PackedValue::I(99)); + assert_eq!(doc.iter().find(|(f, _)| *f == 1).unwrap().1, PackedValue::I(2)); + assert_eq!(doc.iter().find(|(f, _)| *f == 2).unwrap().1, PackedValue::I(3)); + } + + #[test] + fn test_merge_then_create_replaces() { + let mut snap = DocSnapshot::new(); + DocOpCodec::apply(&mut snap, &DocOp::Merge { + slot: 1, + fields: vec![(0, PackedValue::I(1)), (1, PackedValue::I(2))], + }); + // Create replaces everything — this is the ops pipeline behavior + DocOpCodec::apply(&mut snap, &DocOp::Create { + slot: 1, + fields: vec![(5, PackedValue::I(99))], + }); + let doc = &snap.docs[&1]; + assert_eq!(doc.len(), 1); + assert_eq!(doc[0], (5, PackedValue::I(99))); + } + + #[test] + fn test_delete_then_merge_resurrects() { + let mut snap = DocSnapshot::new(); + DocOpCodec::apply(&mut snap, &DocOp::Create { + slot: 1, + fields: vec![(0, PackedValue::I(100))], + }); + DocOpCodec::apply(&mut snap, &DocOp::Delete { slot: 1 }); + assert!(!snap.docs.contains_key(&1)); + DocOpCodec::apply(&mut snap, &DocOp::Merge { + slot: 1, + fields: vec![(5, PackedValue::I(999))], + }); + let doc = &snap.docs[&1]; + assert_eq!(doc.len(), 1); + assert_eq!(doc[0], (5, PackedValue::I(999))); + } + + #[test] + fn test_merge_duplicate_fields_last_wins() { + let mut snap = DocSnapshot::new(); + // Merge with duplicate field 0 — second occurrence should win + DocOpCodec::apply(&mut snap, &DocOp::Merge { + slot: 1, + fields: vec![(0, PackedValue::I(1)), (0, PackedValue::I(2))], + }); + let doc = &snap.docs[&1]; + // First insert of field 0 creates entry, second overwrites it + assert_eq!(doc.iter().find(|(f, _)| *f == 0).unwrap().1, PackedValue::I(2)); + } + + #[test] + fn test_streaming_writer_merge_between_phases() { + let dir = tempfile::tempdir().unwrap(); + let docs_dir = dir.path().join("docs"); + let field_names = vec!["userId".to_string(), "nsfwLevel".to_string(), "reactionCount".to_string()]; + + // Phase 1: write userId + nsfwLevel via merge + let mut ds = 
DocStoreV3::open(&docs_dir).unwrap(); + let writer = ds.prepare_streaming_writer(&field_names).unwrap(); + let fidx = writer.field_to_idx().clone(); + writer.write_merge_doc(42, &[ + (fidx["userId"], PackedValue::I(123)), + (fidx["nsfwLevel"], PackedValue::I(1)), + ]); + writer.finalize().unwrap(); + + // Phase 2: write reactionCount via merge (new writer) + let writer2 = ds.prepare_streaming_writer(&field_names).unwrap(); + writer2.write_merge_doc(42, &[ + (fidx["reactionCount"], PackedValue::I(500)), + ]); + writer2.finalize().unwrap(); + + // Read back — should have all 3 fields + let doc = ds.get(42).unwrap().unwrap(); + assert_eq!(doc.fields.len(), 3, "expected 3 fields, got {:?}", doc.fields); + } + + #[test] + fn test_streaming_writer_merge_and_set_between_phases() { + let dir = tempfile::tempdir().unwrap(); + let docs_dir = dir.path().join("docs"); + let field_names = vec!["userId".to_string(), "tagIds".to_string()]; + + // Phase 1: write userId via merge + let mut ds = DocStoreV3::open(&docs_dir).unwrap(); + let writer = ds.prepare_streaming_writer(&field_names).unwrap(); + let fidx = writer.field_to_idx().clone(); + writer.write_merge_doc(42, &[ + (fidx["userId"], PackedValue::I(123)), + ]); + writer.finalize().unwrap(); + + // Phase 2: write tagIds via Set (single-field tuple write) + let writer2 = ds.prepare_streaming_writer(&field_names).unwrap(); + writer2.write_field(42, fidx["tagIds"], &PackedValue::Mi(vec![10, 20, 30])); + writer2.finalize().unwrap(); + + // Read back — should have both fields + let doc = ds.get(42).unwrap().unwrap(); + assert_eq!(doc.fields.len(), 2, "expected 2 fields, got {:?}", doc.fields); + } } // --------------------------------------------------------------------------- diff --git a/static/index.html b/static/index.html index 2a8c57b1..e461ed08 100644 --- a/static/index.html +++ b/static/index.html @@ -3,7 +3,7 @@ -BitDex V2 — Civitai Demo +BitDex V2 -
- +
Records: --
-
-
-
- -
- - - - - - -
-
-
- - -
-
- - -
-
- - -
-
- - -
-
- - -
-
- - -
-
- - -
-
- - -
-
- - -
-
- - -
-
- - -
-
-
-
- - -
-
- - -
-
- - -
-
- - -
-
+
+
-
-
- -
+
-
-