From 95dc02219a07063ab4d816fd8be613743d871f7f Mon Sep 17 00:00:00 2001 From: Justin Maier Date: Wed, 25 Mar 2026 18:06:58 -0600 Subject: [PATCH 01/10] =?UTF-8?q?feat:=20Sync=20V2=20foundation=20?= =?UTF-8?q?=E2=80=94=20ops=20types,=20dedup=20helper,=20WAL=20writer/reade?= =?UTF-8?q?r?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Core building blocks for the ops-based sync pipeline: - src/pg_sync/ops.rs: Op enum (Set, Remove, Add, Delete, QueryOpSet), OpsRow, OpsBatch, EntityOps, SyncMeta, BitdexOps table SQL - src/pg_sync/op_dedup.rs: Shared dedup helper — LIFO per (entity_id, field), add/remove cancellation, delete absorption, queryOpSet last-wins - src/ops_wal.rs: Append-only WAL with CRC32 integrity, WalWriter (append+fsync), WalReader (cursor-based tail, partial record handling, CRC skip) Also fixes pre-existing compile error in copy_queries.rs tests (missing width/height fields on CopyImageRow constructors). 30 tests passing. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/lib.rs | 2 + src/ops_wal.rs | 410 ++++++++++++++++++++++++++++++++++++ src/pg_sync/copy_queries.rs | 3 + src/pg_sync/mod.rs | 2 + src/pg_sync/op_dedup.rs | 285 +++++++++++++++++++++++++ src/pg_sync/ops.rs | 282 +++++++++++++++++++++++++ 6 files changed, 984 insertions(+) create mode 100644 src/ops_wal.rs create mode 100644 src/pg_sync/op_dedup.rs create mode 100644 src/pg_sync/ops.rs diff --git a/src/lib.rs b/src/lib.rs index 61669958..b110cff7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,8 @@ pub mod bitmap_fs; pub mod bound_store; pub mod bucket_diff_log; +#[cfg(feature = "pg-sync")] +pub mod ops_wal; pub mod cache; pub mod capture; pub mod concurrency; diff --git a/src/ops_wal.rs b/src/ops_wal.rs new file mode 100644 index 00000000..2b9c63a0 --- /dev/null +++ b/src/ops_wal.rs @@ -0,0 +1,410 @@ +//! Ops WAL — append-only log for sync operations. +//! +//! Format per record: +//! [4 bytes: payload_len (u32 LE)] +//! 
[8 bytes: entity_id (i64 LE)] +//! [payload_len bytes: ops JSONB] +//! [4 bytes: CRC32 of entity_id + ops] +//! +//! The writer appends records and fsyncs. The reader tails the file, +//! reading batches of records and tracking a byte-offset cursor. +//! Partial records at EOF are skipped (crash recovery). + +use std::fs::{self, File, OpenOptions}; +use std::io::{self, Read, Write}; +use std::path::{Path, PathBuf}; + +use crate::pg_sync::ops::{EntityOps, Op}; + +const HEADER_SIZE: usize = 4 + 8; // payload_len + entity_id +const CRC_SIZE: usize = 4; + +/// WAL writer — appends ops records to a file with CRC32 integrity. +pub struct WalWriter { + path: PathBuf, +} + +impl WalWriter { + pub fn new(path: impl Into) -> Self { + Self { path: path.into() } + } + + /// Append a batch of entity ops to the WAL. Writes all records and fsyncs. + /// Returns the number of bytes written. + pub fn append_batch(&self, batch: &[EntityOps]) -> io::Result { + if batch.is_empty() { + return Ok(0); + } + + let mut file = OpenOptions::new() + .create(true) + .append(true) + .open(&self.path)?; + + let mut total_bytes = 0u64; + for entry in batch { + let ops_json = serde_json::to_vec(&entry.ops) + .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?; + + let payload_len = ops_json.len() as u32; + let entity_id_bytes = entry.entity_id.to_le_bytes(); + + // CRC covers entity_id + ops (not the length prefix) + let mut crc_input = Vec::with_capacity(8 + ops_json.len()); + crc_input.extend_from_slice(&entity_id_bytes); + crc_input.extend_from_slice(&ops_json); + let crc = crc32fast::hash(&crc_input); + + // Write: [len][entity_id][ops][crc] + file.write_all(&payload_len.to_le_bytes())?; + file.write_all(&entity_id_bytes)?; + file.write_all(&ops_json)?; + file.write_all(&crc.to_le_bytes())?; + + total_bytes += (HEADER_SIZE + ops_json.len() + CRC_SIZE) as u64; + } + + file.sync_all()?; + Ok(total_bytes) + } + + /// Get the file path. 
+ pub fn path(&self) -> &Path { + &self.path + } + + /// Get current file size (0 if file doesn't exist). + pub fn file_size(&self) -> u64 { + fs::metadata(&self.path).map(|m| m.len()).unwrap_or(0) + } +} + +/// WAL reader — reads ops records from a file starting at a byte offset. +pub struct WalReader { + path: PathBuf, + /// Current read position (byte offset into the file) + cursor: u64, +} + +/// Result of reading a batch from the WAL. +pub struct WalBatch { + /// The ops read from the WAL + pub entries: Vec, + /// New cursor position after this batch + pub new_cursor: u64, + /// Number of bytes read + pub bytes_read: u64, + /// Number of records skipped due to CRC failure + pub crc_failures: u64, +} + +impl WalReader { + pub fn new(path: impl Into, cursor: u64) -> Self { + Self { + path: path.into(), + cursor, + } + } + + /// Read up to `max_records` from the WAL starting at the current cursor. + /// Advances the cursor past successfully read records. + /// Stops at EOF or on partial/corrupted records. 
+ pub fn read_batch(&mut self, max_records: usize) -> io::Result { + if !self.path.exists() { + return Ok(WalBatch { + entries: Vec::new(), + new_cursor: self.cursor, + bytes_read: 0, + crc_failures: 0, + }); + } + + let data = fs::read(&self.path)?; + let mut entries = Vec::new(); + let mut pos = self.cursor as usize; + let mut crc_failures = 0u64; + let start_pos = pos; + + while entries.len() < max_records && pos + HEADER_SIZE <= data.len() { + // Read header + let payload_len = + u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize; + let entity_id = + i64::from_le_bytes(data[pos + 4..pos + 12].try_into().unwrap()); + + let record_end = pos + HEADER_SIZE + payload_len + CRC_SIZE; + if record_end > data.len() { + // Truncated record at EOF — stop here, don't advance cursor + break; + } + + // Verify CRC + let crc_input = &data[pos + 4..pos + HEADER_SIZE + payload_len]; // entity_id + ops + let stored_crc = u32::from_le_bytes( + data[pos + HEADER_SIZE + payload_len..record_end] + .try_into() + .unwrap(), + ); + let computed_crc = crc32fast::hash(crc_input); + + if stored_crc != computed_crc { + // CRC failure — skip this record + crc_failures += 1; + pos = record_end; + continue; + } + + // Parse ops JSON + let ops_data = &data[pos + HEADER_SIZE..pos + HEADER_SIZE + payload_len]; + match serde_json::from_slice::>(ops_data) { + Ok(ops) => { + entries.push(EntityOps { entity_id, ops }); + } + Err(_) => { + // Invalid JSON — skip + crc_failures += 1; + } + } + + pos = record_end; + } + + let bytes_read = (pos - start_pos) as u64; + self.cursor = pos as u64; + + Ok(WalBatch { + entries, + new_cursor: self.cursor, + bytes_read, + crc_failures, + }) + } + + /// Get the current cursor position. + pub fn cursor(&self) -> u64 { + self.cursor + } + + /// Set the cursor position (for recovery from persisted state). + pub fn set_cursor(&mut self, cursor: u64) { + self.cursor = cursor; + } + + /// Check if there are more records to read (cursor < file size). 
+ pub fn has_more(&self) -> bool { + let file_size = fs::metadata(&self.path).map(|m| m.len()).unwrap_or(0); + self.cursor < file_size + } +} + +/// Delete a WAL file. +pub fn remove_wal(path: &Path) -> io::Result<()> { + if path.exists() { + fs::remove_file(path)?; + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + use tempfile::TempDir; + + fn make_ops(entity_id: i64, ops: Vec) -> EntityOps { + EntityOps { entity_id, ops } + } + + #[test] + fn test_write_read_roundtrip() { + let dir = TempDir::new().unwrap(); + let wal_path = dir.path().join("test.wal"); + + let writer = WalWriter::new(&wal_path); + let batch = vec![ + make_ops(1, vec![Op::Set { field: "nsfwLevel".into(), value: json!(16) }]), + make_ops(2, vec![Op::Add { field: "tagIds".into(), value: json!(42) }]), + make_ops(3, vec![Op::Delete]), + ]; + let bytes = writer.append_batch(&batch).unwrap(); + assert!(bytes > 0); + + let mut reader = WalReader::new(&wal_path, 0); + let result = reader.read_batch(100).unwrap(); + assert_eq!(result.entries.len(), 3); + assert_eq!(result.entries[0].entity_id, 1); + assert_eq!(result.entries[1].entity_id, 2); + assert_eq!(result.entries[2].entity_id, 3); + assert_eq!(result.crc_failures, 0); + assert!(!reader.has_more()); + } + + #[test] + fn test_multiple_appends() { + let dir = TempDir::new().unwrap(); + let wal_path = dir.path().join("test.wal"); + let writer = WalWriter::new(&wal_path); + + // First batch + writer.append_batch(&[ + make_ops(1, vec![Op::Set { field: "a".into(), value: json!(1) }]), + ]).unwrap(); + + // Second batch + writer.append_batch(&[ + make_ops(2, vec![Op::Set { field: "b".into(), value: json!(2) }]), + ]).unwrap(); + + let mut reader = WalReader::new(&wal_path, 0); + let result = reader.read_batch(100).unwrap(); + assert_eq!(result.entries.len(), 2); + assert_eq!(result.entries[0].entity_id, 1); + assert_eq!(result.entries[1].entity_id, 2); + } + + #[test] + fn test_cursor_resume() { + let dir = 
TempDir::new().unwrap(); + let wal_path = dir.path().join("test.wal"); + let writer = WalWriter::new(&wal_path); + + writer.append_batch(&[ + make_ops(1, vec![Op::Set { field: "a".into(), value: json!(1) }]), + make_ops(2, vec![Op::Set { field: "b".into(), value: json!(2) }]), + make_ops(3, vec![Op::Set { field: "c".into(), value: json!(3) }]), + ]).unwrap(); + + // Read first 2 + let mut reader = WalReader::new(&wal_path, 0); + let result = reader.read_batch(2).unwrap(); + assert_eq!(result.entries.len(), 2); + let saved_cursor = reader.cursor(); + + // Resume from cursor — should get the 3rd + let mut reader2 = WalReader::new(&wal_path, saved_cursor); + let result2 = reader2.read_batch(100).unwrap(); + assert_eq!(result2.entries.len(), 1); + assert_eq!(result2.entries[0].entity_id, 3); + assert!(!reader2.has_more()); + } + + #[test] + fn test_truncated_record_at_eof() { + let dir = TempDir::new().unwrap(); + let wal_path = dir.path().join("test.wal"); + let writer = WalWriter::new(&wal_path); + + writer.append_batch(&[ + make_ops(1, vec![Op::Set { field: "a".into(), value: json!(1) }]), + ]).unwrap(); + + // Append garbage (partial record) + let mut file = OpenOptions::new().append(true).open(&wal_path).unwrap(); + file.write_all(&[0u8; 6]).unwrap(); // Too short to be a valid header+payload + + let mut reader = WalReader::new(&wal_path, 0); + let result = reader.read_batch(100).unwrap(); + // Should read the valid record and stop at the truncated one + assert_eq!(result.entries.len(), 1); + assert_eq!(result.crc_failures, 0); + } + + #[test] + fn test_corrupted_crc_skipped() { + let dir = TempDir::new().unwrap(); + let wal_path = dir.path().join("test.wal"); + let writer = WalWriter::new(&wal_path); + + writer.append_batch(&[ + make_ops(1, vec![Op::Set { field: "a".into(), value: json!(1) }]), + make_ops(2, vec![Op::Set { field: "b".into(), value: json!(2) }]), + ]).unwrap(); + + // Corrupt the CRC of the first record + let mut data = 
fs::read(&wal_path).unwrap(); + // First record: header(12) + ops_json + crc(4) + // Find where the CRC is for the first record + let payload_len = u32::from_le_bytes(data[0..4].try_into().unwrap()) as usize; + let crc_offset = HEADER_SIZE + payload_len; + data[crc_offset] ^= 0xFF; // Flip bits in CRC + fs::write(&wal_path, &data).unwrap(); + + let mut reader = WalReader::new(&wal_path, 0); + let result = reader.read_batch(100).unwrap(); + // First record should be skipped (CRC failure), second should be read + assert_eq!(result.entries.len(), 1); + assert_eq!(result.entries[0].entity_id, 2); + assert_eq!(result.crc_failures, 1); + } + + #[test] + fn test_empty_file() { + let dir = TempDir::new().unwrap(); + let wal_path = dir.path().join("test.wal"); + + let mut reader = WalReader::new(&wal_path, 0); + let result = reader.read_batch(100).unwrap(); + assert_eq!(result.entries.len(), 0); + assert!(!reader.has_more()); + } + + #[test] + fn test_query_op_set_roundtrip() { + let dir = TempDir::new().unwrap(); + let wal_path = dir.path().join("test.wal"); + let writer = WalWriter::new(&wal_path); + + writer.append_batch(&[make_ops(456, vec![ + Op::QueryOpSet { + query: "modelVersionIds eq 456".into(), + ops: vec![ + Op::Remove { field: "baseModel".into(), value: json!("SD 1.5") }, + Op::Set { field: "baseModel".into(), value: json!("SDXL") }, + ], + }, + ])]).unwrap(); + + let mut reader = WalReader::new(&wal_path, 0); + let result = reader.read_batch(100).unwrap(); + assert_eq!(result.entries.len(), 1); + assert_eq!(result.entries[0].entity_id, 456); + match &result.entries[0].ops[0] { + Op::QueryOpSet { query, ops } => { + assert_eq!(query, "modelVersionIds eq 456"); + assert_eq!(ops.len(), 2); + } + _ => panic!("Expected QueryOpSet"), + } + } + + #[test] + fn test_file_size_tracking() { + let dir = TempDir::new().unwrap(); + let wal_path = dir.path().join("test.wal"); + let writer = WalWriter::new(&wal_path); + + assert_eq!(writer.file_size(), 0); + + 
writer.append_batch(&[ + make_ops(1, vec![Op::Delete]), + ]).unwrap(); + + assert!(writer.file_size() > 0); + } + + #[test] + fn test_remove_wal() { + let dir = TempDir::new().unwrap(); + let wal_path = dir.path().join("test.wal"); + + let writer = WalWriter::new(&wal_path); + writer.append_batch(&[make_ops(1, vec![Op::Delete])]).unwrap(); + assert!(wal_path.exists()); + + remove_wal(&wal_path).unwrap(); + assert!(!wal_path.exists()); + + // Remove non-existent is ok + remove_wal(&wal_path).unwrap(); + } +} diff --git a/src/pg_sync/copy_queries.rs b/src/pg_sync/copy_queries.rs index 36bb568b..415e8062 100644 --- a/src/pg_sync/copy_queries.rs +++ b/src/pg_sync/copy_queries.rs @@ -723,6 +723,7 @@ mod tests { flags: (1 << 13), image_type: String::new(), user_id: 1, blocked_for: None, scanned_at_secs: None, created_at_secs: None, post_id: None, + width: None, height: None, published_at_secs: None, availability: String::new(), posted_to_id: None, }; assert!(row.has_meta()); @@ -739,6 +740,7 @@ mod tests { flags: (1 << 14), image_type: String::new(), user_id: 1, blocked_for: None, scanned_at_secs: None, created_at_secs: None, post_id: None, + width: None, height: None, published_at_secs: None, availability: String::new(), posted_to_id: None, }; assert!(row.on_site()); @@ -754,6 +756,7 @@ mod tests { scanned_at_secs: Some(100), created_at_secs: Some(200), published_at_secs: Some(150), + width: None, height: None, availability: String::new(), posted_to_id: None, post_id: None, }; assert_eq!(row.sort_at_secs(), 200); diff --git a/src/pg_sync/mod.rs b/src/pg_sync/mod.rs index 839aa4a3..18be2a74 100644 --- a/src/pg_sync/mod.rs +++ b/src/pg_sync/mod.rs @@ -12,6 +12,8 @@ pub mod config; pub mod copy_queries; pub mod copy_streams; pub mod metrics_poller; +pub mod op_dedup; +pub mod ops; pub mod outbox_poller; pub mod progress; pub mod queries; diff --git a/src/pg_sync/op_dedup.rs b/src/pg_sync/op_dedup.rs new file mode 100644 index 00000000..17da67a9 --- /dev/null +++ 
b/src/pg_sync/op_dedup.rs @@ -0,0 +1,285 @@ +//! Op deduplication and compression. +//! +//! Shared helper used by both pg-sync (before sending) and the WAL reader +//! (before applying). Two layers of dedup catch duplicates at both stages. +//! +//! Rules: +//! - LIFO per (entity_id, field): last op wins for set/remove pairs +//! - Add/remove cancellation: add X then remove X = net zero, dropped +//! - QueryOpSet dedup: by (entity_id, query string), last wins +//! - Delete absorbs all prior ops for the same entity_id + +use std::collections::HashMap; + +use super::ops::{EntityOps, Op}; + +/// Deduplicate a batch of entity ops in-place. +/// +/// Processes ops in order (oldest first), applying LIFO semantics: +/// for each (entity_id, field), only the last op survives. +/// Add/remove cancellation eliminates net-zero multi-value ops. +/// A delete op absorbs all prior ops for that entity. +pub fn dedup_ops(batch: &mut Vec) { + // Phase 1: Merge all ops per entity_id + let mut entity_map: HashMap> = HashMap::new(); + for entry in batch.drain(..) { + entity_map + .entry(entry.entity_id) + .or_default() + .extend(entry.ops); + } + + // Phase 2: Dedup ops within each entity + for (_entity_id, ops) in &mut entity_map { + dedup_entity_ops(ops); + } + + // Phase 3: Rebuild batch, dropping empty entries + *batch = entity_map + .into_iter() + .filter(|(_, ops)| !ops.is_empty()) + .map(|(entity_id, ops)| EntityOps { entity_id, ops }) + .collect(); +} + +/// Dedup ops for a single entity. Mutates the vec in place. 
+fn dedup_entity_ops(ops: &mut Vec) { + if ops.is_empty() { + return; + } + + // If there's a Delete, it absorbs everything — only keep the delete + if ops.iter().any(|op| matches!(op, Op::Delete)) { + ops.clear(); + ops.push(Op::Delete); + return; + } + + // First pass: collect all ops, tracking which fields have Set ops + let mut all_ops: Vec = ops.drain(..).collect(); + let mut set_fields: std::collections::HashSet = std::collections::HashSet::new(); + for op in &all_ops { + if let Op::Set { field, .. } = op { + set_fields.insert(field.clone()); + } + } + + // LIFO for set/remove on scalar fields (paired with Set = old value cleanup) + let mut last_set: HashMap = HashMap::new(); + let mut last_remove: HashMap = HashMap::new(); + + // Track add/remove for multi-value fields (net operations) + // Key: (field, value_as_string), Value: net count (+1 for add, -1 for remove) + let mut multi_value_net: HashMap<(String, String), i64> = HashMap::new(); + + // Track queryOpSet by query string (last wins) + let mut query_ops: HashMap> = HashMap::new(); + + for op in all_ops { + match op { + Op::Set { ref field, ref value } => { + last_set.insert(field.clone(), value.clone()); + } + Op::Remove { ref field, ref value } => { + if set_fields.contains(field) { + // Scalar field: this remove is paired with a set (old value cleanup) + last_remove.insert(field.clone(), value.clone()); + } else { + // Multi-value field: track net operations + let key = (field.clone(), value.to_string()); + *multi_value_net.entry(key).or_insert(0) -= 1; + } + } + Op::Add { ref field, ref value } => { + let key = (field.clone(), value.to_string()); + *multi_value_net.entry(key).or_insert(0) += 1; + } + Op::QueryOpSet { ref query, ops: ref nested_ops } => { + query_ops.insert(query.clone(), nested_ops.clone()); + } + Op::Delete => unreachable!("handled above"), + } + } + + // Rebuild: remove ops first, then set ops (order matters for bitmap updates) + for (field, value) in &last_remove { + 
ops.push(Op::Remove { + field: field.clone(), + value: value.clone(), + }); + } + + for (field, value) in last_set { + ops.push(Op::Set { field, value }); + } + + // Multi-value: emit net operations + for ((field, value_str), net) in multi_value_net { + if net == 0 { + continue; // Cancelled out + } + let value: serde_json::Value = serde_json::from_str(&value_str) + .unwrap_or(serde_json::Value::String(value_str)); + if net > 0 { + ops.push(Op::Add { field, value }); + } else { + ops.push(Op::Remove { field, value }); + } + } + + // QueryOpSets: last query string wins + for (query, nested) in query_ops { + ops.push(Op::QueryOpSet { query, ops: nested }); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + fn entity(id: i64, ops: Vec) -> EntityOps { + EntityOps { entity_id: id, ops } + } + + #[test] + fn test_lifo_set_same_field() { + let mut batch = vec![ + entity(1, vec![ + Op::Set { field: "nsfwLevel".into(), value: json!(8) }, + ]), + entity(1, vec![ + Op::Set { field: "nsfwLevel".into(), value: json!(16) }, + ]), + ]; + dedup_ops(&mut batch); + assert_eq!(batch.len(), 1); + let ops = &batch[0].ops; + // Last set wins + let set_op = ops.iter().find(|op| matches!(op, Op::Set { field, .. } if field == "nsfwLevel")).unwrap(); + if let Op::Set { value, .. 
} = set_op { + assert_eq!(*value, json!(16)); + } + } + + #[test] + fn test_different_fields_preserved() { + let mut batch = vec![entity(1, vec![ + Op::Set { field: "nsfwLevel".into(), value: json!(16) }, + Op::Set { field: "type".into(), value: json!("video") }, + ])]; + dedup_ops(&mut batch); + assert_eq!(batch[0].ops.len(), 2); + } + + #[test] + fn test_add_remove_cancellation() { + let mut batch = vec![entity(1, vec![ + Op::Add { field: "tagIds".into(), value: json!(42) }, + Op::Remove { field: "tagIds".into(), value: json!(42) }, + ])]; + dedup_ops(&mut batch); + // Net zero — entity should be dropped entirely + assert!(batch.is_empty() || batch[0].ops.is_empty()); + } + + #[test] + fn test_add_survives_when_no_cancel() { + let mut batch = vec![entity(1, vec![ + Op::Add { field: "tagIds".into(), value: json!(42) }, + Op::Add { field: "tagIds".into(), value: json!(99) }, + ])]; + dedup_ops(&mut batch); + assert_eq!(batch.len(), 1); + let adds: Vec<_> = batch[0].ops.iter() + .filter(|op| matches!(op, Op::Add { .. 
})) + .collect(); + assert_eq!(adds.len(), 2); + } + + #[test] + fn test_delete_absorbs_all() { + let mut batch = vec![ + entity(1, vec![ + Op::Set { field: "nsfwLevel".into(), value: json!(16) }, + Op::Add { field: "tagIds".into(), value: json!(42) }, + ]), + entity(1, vec![Op::Delete]), + ]; + dedup_ops(&mut batch); + assert_eq!(batch.len(), 1); + assert_eq!(batch[0].ops.len(), 1); + assert!(matches!(&batch[0].ops[0], Op::Delete)); + } + + #[test] + fn test_different_entities_independent() { + let mut batch = vec![ + entity(1, vec![Op::Set { field: "nsfwLevel".into(), value: json!(16) }]), + entity(2, vec![Op::Set { field: "nsfwLevel".into(), value: json!(32) }]), + ]; + dedup_ops(&mut batch); + assert_eq!(batch.len(), 2); + } + + #[test] + fn test_query_op_set_last_wins() { + let mut batch = vec![entity(456, vec![ + Op::QueryOpSet { + query: "modelVersionIds eq 456".into(), + ops: vec![Op::Set { field: "baseModel".into(), value: json!("SD 1.5") }], + }, + Op::QueryOpSet { + query: "modelVersionIds eq 456".into(), + ops: vec![Op::Set { field: "baseModel".into(), value: json!("SDXL") }], + }, + ])]; + dedup_ops(&mut batch); + let qops: Vec<_> = batch[0].ops.iter() + .filter(|op| matches!(op, Op::QueryOpSet { .. })) + .collect(); + assert_eq!(qops.len(), 1); + if let Op::QueryOpSet { ops, .. } = &qops[0] { + if let Op::Set { value, .. } = &ops[0] { + assert_eq!(*value, json!("SDXL")); + } + } + } + + #[test] + fn test_remove_set_pair_preserved() { + // An update: remove old value, set new value — both should survive + let mut batch = vec![entity(1, vec![ + Op::Remove { field: "nsfwLevel".into(), value: json!(8) }, + Op::Set { field: "nsfwLevel".into(), value: json!(16) }, + ])]; + dedup_ops(&mut batch); + assert_eq!(batch.len(), 1); + let has_remove = batch[0].ops.iter().any(|op| matches!(op, Op::Remove { field, .. } if field == "nsfwLevel")); + let has_set = batch[0].ops.iter().any(|op| matches!(op, Op::Set { field, .. 
} if field == "nsfwLevel")); + assert!(has_remove, "remove should survive"); + assert!(has_set, "set should survive"); + } + + #[test] + fn test_empty_batch() { + let mut batch: Vec = vec![]; + dedup_ops(&mut batch); + assert!(batch.is_empty()); + } + + #[test] + fn test_multiple_adds_same_value_collapse() { + // Adding tag 42 three times should still produce one add + let mut batch = vec![entity(1, vec![ + Op::Add { field: "tagIds".into(), value: json!(42) }, + Op::Add { field: "tagIds".into(), value: json!(42) }, + Op::Add { field: "tagIds".into(), value: json!(42) }, + ])]; + dedup_ops(&mut batch); + let adds: Vec<_> = batch[0].ops.iter() + .filter(|op| matches!(op, Op::Add { field, .. } if field == "tagIds")) + .collect(); + assert_eq!(adds.len(), 1); + } +} diff --git a/src/pg_sync/ops.rs b/src/pg_sync/ops.rs new file mode 100644 index 00000000..94ccef04 --- /dev/null +++ b/src/pg_sync/ops.rs @@ -0,0 +1,282 @@ +//! V2 ops data types for the ops-based sync pipeline. +//! +//! Ops are self-contained mutations: each carries the field name, old value (for removes), +//! and new value (for sets). This eliminates docstore reads on the write path. +//! +//! Op types: +//! - `set`: Set a scalar/sort field to a new value +//! - `remove`: Clear a slot from a field's bitmap (carries old value) +//! - `add`: Add a value to a multi-value field (tags, tools, etc.) +//! - `delete`: Delete a document (clears all bitmaps + alive bit) +//! - `queryOpSet`: Resolve slots via a BitDex query, apply nested ops to all matches + +use serde::{Deserialize, Serialize}; + +/// A single operation within an ops array. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[serde(tag = "op")] +pub enum Op { + /// Set a field to a value. For filter fields, sets the bit in the value's bitmap. + /// For sort fields, decomposes to bit layers. + #[serde(rename = "set")] + Set { + field: String, + value: serde_json::Value, + }, + + /// Remove a slot from a field's bitmap (old value). 
Used in remove/set pairs + /// for field changes: remove old value, then set new value. + #[serde(rename = "remove")] + Remove { + field: String, + value: serde_json::Value, + }, + + /// Add a value to a multi-value field (e.g., tagIds, toolIds). + /// Used for join-table INSERTs. + #[serde(rename = "add")] + Add { + field: String, + value: serde_json::Value, + }, + + /// Delete a document. Clears all filter/sort bitmap bits + alive bit. + /// Requires a docstore read to determine which bitmaps to clear. + #[serde(rename = "delete")] + Delete, + + /// Query-resolved bulk operation. Resolves slots via a BitDex query string, + /// then applies the nested ops to all matching slots. + /// Used for fan-out tables (ModelVersion, Post, Model). + #[serde(rename = "queryOpSet")] + QueryOpSet { + /// BitDex query string (e.g., "modelVersionIds eq 456") + query: String, + /// Ops to apply to all slots matching the query + ops: Vec, + }, +} + +/// A row from the BitdexOps table. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OpsRow { + /// Auto-incrementing ID (used as cursor position) + pub id: i64, + /// The entity (image) ID this op targets. For queryOpSet, this is the + /// source entity ID (e.g., ModelVersion ID, Post ID). + pub entity_id: i64, + /// Array of operations to apply + pub ops: Vec, +} + +/// A batch of ops sent to the BitDex /ops endpoint. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OpsBatch { + /// Per-entity ops + pub ops: Vec, + /// Optional sync source metadata (cursor position, lag, etc.) + #[serde(skip_serializing_if = "Option::is_none")] + pub meta: Option, +} + +/// Ops for a single entity within a batch. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct EntityOps { + /// The entity (image) ID + pub entity_id: i64, + /// Operations to apply + pub ops: Vec, +} + +/// Sync source metadata, bundled with ops payloads. 
+#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SyncMeta { + /// Sync source identifier (e.g., "pg-sync-default", "clickhouse") + pub source: String, + /// Current cursor position in the ops table + #[serde(skip_serializing_if = "Option::is_none")] + pub cursor: Option, + /// Max ID in the ops table (for lag calculation) + #[serde(skip_serializing_if = "Option::is_none")] + pub max_id: Option, + /// Number of rows behind (max_id - cursor) + #[serde(skip_serializing_if = "Option::is_none")] + pub lag_rows: Option, +} + +/// SQL for creating the BitdexOps table and index. +pub const SETUP_OPS_SQL: &str = r#" +CREATE TABLE IF NOT EXISTS "BitdexOps" ( + id BIGSERIAL PRIMARY KEY, + entity_id BIGINT NOT NULL, + ops JSONB NOT NULL, + created_at TIMESTAMPTZ DEFAULT now() +); + +CREATE INDEX IF NOT EXISTS idx_bitdex_ops_id ON "BitdexOps" (id); +"#; + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn test_set_op_roundtrip() { + let op = Op::Set { + field: "nsfwLevel".into(), + value: json!(16), + }; + let json = serde_json::to_string(&op).unwrap(); + let parsed: Op = serde_json::from_str(&json).unwrap(); + assert_eq!(op, parsed); + } + + #[test] + fn test_remove_op_roundtrip() { + let op = Op::Remove { + field: "nsfwLevel".into(), + value: json!(8), + }; + let json = serde_json::to_string(&op).unwrap(); + let parsed: Op = serde_json::from_str(&json).unwrap(); + assert_eq!(op, parsed); + } + + #[test] + fn test_add_op_roundtrip() { + let op = Op::Add { + field: "tagIds".into(), + value: json!(42), + }; + let json = serde_json::to_string(&op).unwrap(); + let parsed: Op = serde_json::from_str(&json).unwrap(); + assert_eq!(op, parsed); + } + + #[test] + fn test_delete_op_roundtrip() { + let op = Op::Delete; + let json = serde_json::to_string(&op).unwrap(); + let parsed: Op = serde_json::from_str(&json).unwrap(); + assert_eq!(op, parsed); + } + + #[test] + fn test_query_op_set_roundtrip() { + let op = Op::QueryOpSet { + query: 
"modelVersionIds eq 456".into(), + ops: vec![ + Op::Remove { + field: "baseModel".into(), + value: json!("SD 1.5"), + }, + Op::Set { + field: "baseModel".into(), + value: json!("SDXL"), + }, + ], + }; + let json = serde_json::to_string(&op).unwrap(); + let parsed: Op = serde_json::from_str(&json).unwrap(); + assert_eq!(op, parsed); + } + + #[test] + fn test_ops_array_from_json() { + let json = json!([ + {"op": "remove", "field": "nsfwLevel", "value": 8}, + {"op": "set", "field": "nsfwLevel", "value": 16}, + {"op": "add", "field": "tagIds", "value": 42}, + {"op": "delete"} + ]); + let ops: Vec = serde_json::from_value(json).unwrap(); + assert_eq!(ops.len(), 4); + assert!(matches!(&ops[0], Op::Remove { field, .. } if field == "nsfwLevel")); + assert!(matches!(&ops[1], Op::Set { field, .. } if field == "nsfwLevel")); + assert!(matches!(&ops[2], Op::Add { field, .. } if field == "tagIds")); + assert!(matches!(&ops[3], Op::Delete)); + } + + #[test] + fn test_ops_batch_with_meta() { + let batch = OpsBatch { + ops: vec![EntityOps { + entity_id: 123, + ops: vec![Op::Set { + field: "nsfwLevel".into(), + value: json!(16), + }], + }], + meta: Some(SyncMeta { + source: "pg-sync-default".into(), + cursor: Some(420_000_000), + max_id: Some(500_000_000), + lag_rows: Some(80_000_000), + }), + }; + let json = serde_json::to_string(&batch).unwrap(); + let parsed: OpsBatch = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed.ops.len(), 1); + assert_eq!(parsed.ops[0].entity_id, 123); + assert!(parsed.meta.is_some()); + assert_eq!(parsed.meta.unwrap().source, "pg-sync-default"); + } + + #[test] + fn test_ops_batch_without_meta() { + let batch = OpsBatch { + ops: vec![], + meta: None, + }; + let json = serde_json::to_string(&batch).unwrap(); + assert!(!json.contains("meta")); + } + + #[test] + fn test_image_insert_ops() { + // Simulates what an Image INSERT trigger would produce + let ops: Vec = vec![ + Op::Set { field: "nsfwLevel".into(), value: json!(1) }, + Op::Set { field: 
"type".into(), value: json!("image") }, + Op::Set { field: "userId".into(), value: json!(12345) }, + Op::Set { field: "postId".into(), value: json!(67890) }, + Op::Set { field: "existedAt".into(), value: json!(1711234567) }, + ]; + let json = serde_json::to_value(&ops).unwrap(); + let parsed: Vec = serde_json::from_value(json).unwrap(); + assert_eq!(parsed.len(), 5); + } + + #[test] + fn test_image_update_ops_with_old_values() { + // Simulates Image UPDATE: nsfwLevel 8→16 + let ops: Vec = vec![ + Op::Remove { field: "nsfwLevel".into(), value: json!(8) }, + Op::Set { field: "nsfwLevel".into(), value: json!(16) }, + ]; + let json = serde_json::to_value(&ops).unwrap(); + let parsed: Vec = serde_json::from_value(json).unwrap(); + assert_eq!(parsed.len(), 2); + } + + #[test] + fn test_query_op_set_with_in_query() { + // Model POI change: fan-out to all model versions + let op = Op::QueryOpSet { + query: "modelVersionIds in [101, 102, 103]".into(), + ops: vec![Op::Set { + field: "poi".into(), + value: json!(true), + }], + }; + let json = serde_json::to_string(&op).unwrap(); + let parsed: Op = serde_json::from_str(&json).unwrap(); + if let Op::QueryOpSet { query, ops } = parsed { + assert!(query.contains("in [101")); + assert_eq!(ops.len(), 1); + } else { + panic!("Expected QueryOpSet"); + } + } +} From d5f39dd5e543055360cdca2433963bf52fe15c96 Mon Sep 17 00:00:00 2001 From: Justin Maier Date: Wed, 25 Mar 2026 18:16:16 -0600 Subject: [PATCH 02/10] feat: POST /ops endpoint + GET /sync-lag for Sync V2 WAL-backed ops ingestion endpoint: - POST /api/indexes/{name}/ops accepts OpsBatch (ops + sync meta) - Appends to WAL file via WalWriter, returns 200 only after fsync - Lazy WAL writer init (created on first POST) - Stores latest SyncMeta per source for lag monitoring Sync lag endpoint: - GET /api/internal/sync-lag returns latest metadata from all sync sources - Supports cursor position, max_id, lag_rows per source Both endpoints compile-gated behind pg-sync feature with no-op 
fallbacks. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/server.rs | 109 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 109 insertions(+) diff --git a/src/server.rs b/src/server.rs index a72435c9..a75974c7 100644 --- a/src/server.rs +++ b/src/server.rs @@ -290,6 +290,12 @@ struct AppState { metrics_bitmap_memory: AtomicBool, metrics_eviction_stats: AtomicBool, metrics_boundstore_disk: AtomicBool, + /// WAL writer for V2 ops endpoint. Created lazily on first ops POST. + #[cfg(feature = "pg-sync")] + ops_wal: Mutex>, + /// Latest sync source metadata (cursor, lag) keyed by source name. + #[cfg(feature = "pg-sync")] + sync_meta: Mutex>, } type SharedState = Arc; @@ -991,6 +997,10 @@ impl BitdexServer { metrics_bitmap_memory: AtomicBool::new(true), metrics_eviction_stats: AtomicBool::new(true), metrics_boundstore_disk: AtomicBool::new(true), + #[cfg(feature = "pg-sync")] + ops_wal: Mutex::new(None), + #[cfg(feature = "pg-sync")] + sync_meta: Mutex::new(std::collections::HashMap::new()), }); // Try to restore an existing index from disk @@ -1068,6 +1078,8 @@ impl BitdexServer { .route("/debug/heap-dump", axum::routing::post(handle_heap_dump)) .route("/api/formats", get(handle_list_formats)) .route("/api/internal/pgsync-metrics", post(handle_pgsync_metrics)) + .route("/api/indexes/{name}/ops", post(handle_ops)) + .route("/api/internal/sync-lag", get(handle_sync_lag)) .route("/metrics", get(handle_metrics)) .route("/", get(handle_ui)) .with_state(Arc::clone(&state)); @@ -4156,6 +4168,103 @@ async fn handle_pgsync_metrics( StatusCode::NO_CONTENT } +/// POST /api/indexes/{name}/ops — Accept a batch of sync ops, append to WAL. +/// Returns 200 only after all records are written and fsynced. 
+///
+/// Appends are serialized under the `ops_wal` lock: the WAL writer emits each
+/// record with several separate `write` calls, so unserialized concurrent
+/// requests could interleave partial records in the append-only file.
+#[cfg(feature = "pg-sync")]
+async fn handle_ops(
+    State(state): State<SharedState>,
+    AxumPath(name): AxumPath<String>,
+    Json(batch): Json<crate::pg_sync::ops::OpsBatch>,
+) -> impl IntoResponse {
+    // Reject requests for an index this server does not currently hold.
+    {
+        let guard = state.index.lock();
+        let index_matches =
+            matches!(guard.as_ref(), Some(idx) if idx.definition.name == name);
+        if !index_matches {
+            return (
+                StatusCode::NOT_FOUND,
+                Json(serde_json::json!({"error": format!("Index '{}' not found", name)})),
+            )
+                .into_response();
+        }
+    }
+
+    // Record the latest sync metadata for this source (read by /sync-lag).
+    if let Some(meta) = &batch.meta {
+        let mut sync_meta = state.sync_meta.lock();
+        sync_meta.insert(meta.source.clone(), meta.clone());
+    }
+
+    let ops_count = batch.ops.len();
+    if ops_count == 0 {
+        return (StatusCode::OK, Json(serde_json::json!({"accepted": 0}))).into_response();
+    }
+
+    // All filesystem work (directory creation, append, fsync) is blocking I/O,
+    // so it runs on the blocking pool. The `ops_wal` lock is held for the whole
+    // append so that records from concurrent requests cannot interleave.
+    let result = tokio::task::spawn_blocking(move || -> std::io::Result<u64> {
+        let wal_dir = state.data_dir.join("wal");
+        let mut wal_guard = state.ops_wal.lock();
+        if wal_guard.is_none() {
+            // Lazy init: surface a directory-creation failure instead of
+            // discarding it and failing later with a less specific error.
+            std::fs::create_dir_all(&wal_dir)?;
+        }
+        let writer = wal_guard
+            .get_or_insert_with(|| crate::ops_wal::WalWriter::new(wal_dir.join("ops.wal")));
+        writer.append_batch(&batch.ops)
+    })
+    .await;
+
+    match result {
+        Ok(Ok(bytes)) => {
+            (StatusCode::OK, Json(serde_json::json!({
+                "accepted": ops_count,
+                "bytes_written": bytes,
+            }))).into_response()
+        }
+        Ok(Err(e)) => {
+            eprintln!("WAL write error: {e}");
+            (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({
+                "error": format!("WAL write failed: {e}"),
+            }))).into_response()
+        }
+        Err(e) => {
+            eprintln!("WAL write task panicked: {e}");
+            (StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({
+                "error": "Internal error",
+            }))).into_response()
+        }
+    }
+}
+
+/// Fallback for when pg-sync feature is disabled.
+#[cfg(not(feature = "pg-sync"))] +async fn handle_ops( + AxumPath(_name): AxumPath, +) -> impl IntoResponse { + (StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "pg-sync feature not enabled"}))) +} + +/// GET /api/internal/sync-lag — Return latest sync metadata from all sources. +#[cfg(feature = "pg-sync")] +async fn handle_sync_lag( + State(state): State, +) -> impl IntoResponse { + let sync_meta = state.sync_meta.lock(); + let sources: Vec<&crate::pg_sync::ops::SyncMeta> = sync_meta.values().collect(); + Json(serde_json::json!({ "sources": sources })) +} + +#[cfg(not(feature = "pg-sync"))] +async fn handle_sync_lag() -> impl IntoResponse { + Json(serde_json::json!({ "sources": [] })) +} + async fn handle_ui() -> impl IntoResponse { Html(include_str!("../static/index.html")) } From 2007d62fa310ffbe417a5e4983dc16e443591a66 Mon Sep 17 00:00:00 2001 From: Justin Maier Date: Wed, 25 Mar 2026 18:28:07 -0600 Subject: [PATCH 03/10] =?UTF-8?q?feat:=20WAL=20ops=20processor=20=E2=80=94?= =?UTF-8?q?=20converts=20ops=20to=20engine=20mutations?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ops processor that reads from WAL and routes to the engine: - Regular ops (set/remove/add): build PatchPayload with old+new values, call engine.patch() — no docstore read needed - queryOpSet: parse filter string, execute query for matching slots, apply nested ops to all matches - Delete: route to engine.delete() - Filter parser for queryOpSet: supports eq and in operators Includes json_to_qvalue converter (serde_json::Value → query::Value) for the PatchPayload/FieldValue type boundary. 9 tests: scalar update, insert (no old), multi-value add/remove, delete+queryOpSet skip, filter parsing, value type parsing, cursor persistence. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- src/lib.rs | 2 + src/ops_processor.rs | 453 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 455 insertions(+) create mode 100644 src/ops_processor.rs diff --git a/src/lib.rs b/src/lib.rs index b110cff7..70efaa53 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,8 @@ pub mod bitmap_fs; pub mod bound_store; pub mod bucket_diff_log; #[cfg(feature = "pg-sync")] +pub mod ops_processor; +#[cfg(feature = "pg-sync")] pub mod ops_wal; pub mod cache; pub mod capture; diff --git a/src/ops_processor.rs b/src/ops_processor.rs new file mode 100644 index 00000000..cc473fd3 --- /dev/null +++ b/src/ops_processor.rs @@ -0,0 +1,453 @@ +//! WAL ops processor — reads ops from WAL files and applies them as engine mutations. +//! +//! The processor runs as a dedicated thread, tailing WAL files and converting ops +//! into engine mutations (put/patch/delete). It handles: +//! - Regular ops (set/remove/add) via PatchPayload +//! - queryOpSet via query resolution + bulk bitmap ops +//! - Delete via engine.delete() +//! - Deduplication via shared dedup helper + +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::time::Duration; + +use serde_json::Value as JsonValue; + +use crate::concurrent_engine::ConcurrentEngine; +use crate::mutation::{FieldValue, PatchField, PatchPayload}; +use crate::pg_sync::op_dedup::dedup_ops; +use crate::pg_sync::ops::{EntityOps, Op}; +use crate::query::{BitdexQuery, FilterClause, Value as QValue}; + +/// Convert a serde_json::Value to a query::Value. 
+///
+/// Integers map to `Integer`, other numbers to `Float`, `null` to `Integer(0)`,
+/// and arrays/objects to their JSON text as a `String`.
+fn json_to_qvalue(v: &JsonValue) -> QValue {
+    match v {
+        // Null → zero for bitmap purposes
+        JsonValue::Null => QValue::Integer(0),
+        JsonValue::Bool(flag) => QValue::Bool(*flag),
+        // Prefer the exact integer form; fall back to float, then to zero.
+        JsonValue::Number(num) => match (num.as_i64(), num.as_f64()) {
+            (Some(int), _) => QValue::Integer(int),
+            (None, Some(float)) => QValue::Float(float),
+            (None, None) => QValue::Integer(0),
+        },
+        JsonValue::String(text) => QValue::String(text.to_owned()),
+        // Arrays/objects → string representation
+        JsonValue::Array(_) | JsonValue::Object(_) => QValue::String(v.to_string()),
+    }
+}
+
+/// Tunables for the WAL ops processor loop.
+pub struct OpsProcessorConfig {
+    /// Upper bound on the number of WAL records read in one batch.
+    pub batch_size: usize,
+    /// Sleep duration between polls when the WAL has no new records.
+    pub poll_interval: Duration,
+    /// File in which the processor's cursor position is persisted.
+    pub cursor_path: PathBuf,
+}
+
+impl Default for OpsProcessorConfig {
+    /// 10 000 records per batch, 50 ms poll interval, cursor file `wal_cursor`.
+    fn default() -> Self {
+        let cursor_path = PathBuf::from("wal_cursor");
+        let poll_interval = Duration::from_millis(50);
+        Self {
+            batch_size: 10_000,
+            poll_interval,
+            cursor_path,
+        }
+    }
+}
+
+/// Process a single batch of entity ops against the engine.
+/// Returns (applied, skipped, errors).
+///
+/// The batch is deduplicated in place first. Entities whose id does not fit in a
+/// `u32` slot are counted as skipped; engine failures are logged and counted as
+/// errors without aborting the rest of the batch.
+pub fn apply_ops_batch(
+    engine: &ConcurrentEngine,
+    batch: &mut Vec<EntityOps>,
+) -> (usize, usize, usize) {
+    // Dedup first
+    dedup_ops(batch);
+
+    let mut applied = 0usize;
+    let mut skipped = 0usize;
+    let mut errors = 0usize;
+
+    for entry in batch.iter() {
+        // Slots are u32: negative or oversized entity ids cannot be addressed.
+        let slot = match u32::try_from(entry.entity_id) {
+            Ok(slot) => slot,
+            Err(_) => {
+                skipped += 1;
+                continue;
+            }
+        };
+
+        for op in &entry.ops {
+            match op {
+                Op::Delete => match engine.delete(slot) {
+                    Ok(()) => applied += 1,
+                    Err(e) => {
+                        tracing::warn!("ops processor: delete slot {slot} failed: {e}");
+                        errors += 1;
+                    }
+                },
+
+                Op::QueryOpSet { query, ops } => match apply_query_op_set(engine, query, ops) {
+                    Ok(count) => applied += count,
+                    Err(e) => {
+                        tracing::warn!("ops processor: queryOpSet '{query}' failed: {e}");
+                        errors += 1;
+                    }
+                },
+
+                // Field-level ops are not applied one by one: they are folded
+                // into a single patch for this entity after the loop.
+                Op::Set { .. } | Op::Remove { .. } | Op::Add { .. } => {}
+            }
+        }
+
+        // Build a PatchPayload from the set/remove/add ops for this entity
+        let patch = build_patch_from_ops(&entry.ops);
+        if !patch.fields.is_empty() {
+            match engine.patch(slot, &patch) {
+                Ok(()) => applied += 1,
+                Err(e) => {
+                    tracing::warn!("ops processor: patch slot {slot} failed: {e}");
+                    errors += 1;
+                }
+            }
+        }
+    }
+
+    (applied, skipped, errors)
+}
+
+/// Build a PatchPayload from a list of ops for a single entity.
+/// Pairs remove/set ops on the same field into PatchField { old, new }.
+/// Add ops become multi-value inserts.
+///
+/// A Remove on a field that also has a Set supplies that field's old scalar
+/// value; a Remove on a field without a Set is a multi-value removal.
+/// Delete and QueryOpSet ops are ignored here.
+fn build_patch_from_ops(ops: &[Op]) -> PatchPayload {
+    let mut old_values: HashMap<&str, &JsonValue> = HashMap::new();
+    let mut new_values: HashMap<&str, &JsonValue> = HashMap::new();
+    let mut add_values: HashMap<&str, Vec<&JsonValue>> = HashMap::new();
+    let mut remove_values: HashMap<&str, Vec<&JsonValue>> = HashMap::new();
+
+    // First pass: collect sets (new values) and adds. Doing this before looking
+    // at removes lets each Remove be classified with one map lookup instead of
+    // rescanning the whole op list.
+    for op in ops {
+        match op {
+            Op::Set { field, value } => {
+                new_values.insert(field, value);
+            }
+            Op::Add { field, value } => {
+                add_values.entry(field).or_default().push(value);
+            }
+            Op::Remove { .. } => {
+                // Classified in the second pass.
+            }
+            Op::Delete | Op::QueryOpSet { .. } => {
+                // Handled separately
+            }
+        }
+    }
+
+    // Second pass: a Remove paired with a Set anywhere in the list is the old
+    // scalar value (the last such Remove wins); otherwise it is a multi-value
+    // remove.
+    for op in ops {
+        if let Op::Remove { field, value } = op {
+            let name: &str = field;
+            if new_values.contains_key(name) {
+                old_values.insert(name, value);
+            } else {
+                remove_values.entry(name).or_default().push(value);
+            }
+        }
+    }
+
+    let mut fields: HashMap<String, PatchField> = HashMap::new();
+
+    // Build PatchFields for scalar set/remove pairs. A Set without a matching
+    // Remove uses Integer(0) as its old value.
+    for (field, new_val) in &new_values {
+        let old = old_values
+            .get(*field)
+            .map(|v| FieldValue::Single(json_to_qvalue(v)))
+            .unwrap_or(FieldValue::Single(QValue::Integer(0)));
+        let new = FieldValue::Single(json_to_qvalue(new_val));
+        fields.insert(field.to_string(), PatchField { old, new });
+    }
+
+    // Build PatchFields for multi-value adds
+    for (field, vals) in &add_values {
+        let added: Vec<QValue> = vals.iter().map(|v| json_to_qvalue(v)).collect();
+        let existing = fields.entry(field.to_string()).or_insert_with(|| PatchField {
+            old: FieldValue::Multi(vec![]),
+            new: FieldValue::Multi(vec![]),
+        });
+        if let FieldValue::Multi(ref mut m) = existing.new {
+            m.extend(added);
+        } else {
+            // The field already held a scalar patch: the adds replace it.
+            *existing = PatchField {
+                old: FieldValue::Multi(vec![]),
+                new: FieldValue::Multi(added),
+            };
+        }
+    }
+
+    // Build PatchFields for multi-value removes
+    for (field, vals) in &remove_values {
+        let removed: Vec<QValue> = vals.iter().map(|v| json_to_qvalue(v)).collect();
+        let existing = fields.entry(field.to_string()).or_insert_with(|| PatchField {
+            old: FieldValue::Multi(vec![]),
+            new: FieldValue::Multi(vec![]),
+        });
+        if let FieldValue::Multi(ref mut m) = existing.old {
+            m.extend(removed);
+        } else {
+            *existing = PatchField {
+                old: FieldValue::Multi(removed),
+                new: FieldValue::Multi(vec![]),
+            };
+        }
+    }
+
+    PatchPayload { fields }
+}
+
+/// Resolve a queryOpSet: execute the query to get matching slots, then apply
+/// the nested ops to each slot.
+///
+/// Returns the number of slots successfully patched. Fails if the filter string
+/// cannot be parsed or the query itself fails; per-slot patch failures are
+/// logged and not counted.
+fn apply_query_op_set(
+    engine: &ConcurrentEngine,
+    query_str: &str,
+    ops: &[Op],
+) -> Result<usize, String> {
+    // Parse the query string into filter clauses
+    let filters = parse_filter_from_query_str(query_str)?;
+
+    // Build the patch first: with nothing to apply there is no reason to run
+    // an unbounded query.
+    let patch = build_patch_from_ops(ops);
+    if patch.fields.is_empty() {
+        return Ok(0);
+    }
+
+    let query = BitdexQuery {
+        filters,
+        sort: None,
+        limit: usize::MAX, // Get all matching slots
+        offset: None,
+        cursor: None,
+        skip_cache: true, // Don't pollute cache with internal queries
+    };
+
+    // Execute query to get matching slot IDs
+    let result = engine
+        .execute_query(&query)
+        .map_err(|e| format!("queryOpSet query failed: {e}"))?;
+
+    // Apply patch to each matching slot
+    let mut applied = 0usize;
+    for &slot_id in &result.ids {
+        // Checked conversion: a bare `as u32` would silently wrap an id above
+        // u32::MAX onto an unrelated slot.
+        let slot = match u32::try_from(slot_id) {
+            Ok(slot) => slot,
+            Err(_) => {
+                tracing::warn!("queryOpSet: slot id {slot_id} out of u32 range, skipped");
+                continue;
+            }
+        };
+        match engine.patch(slot, &patch) {
+            Ok(()) => applied += 1,
+            Err(e) => {
+                tracing::warn!("queryOpSet: patch slot {slot} failed: {e}");
+            }
+        }
+    }
+
+    Ok(applied)
+}
+
+/// Parse a simple filter string like "modelVersionIds eq 456" or "postId eq 789"
+/// into filter clauses.
+///
+/// Clauses are separated by " AND ". Each clause is `<field> <op> <value>` where
+/// `<op>` is `eq` or `in` (case-insensitive); any run of whitespace may separate
+/// the three parts. Returns an error for a malformed clause or unsupported op.
+fn parse_filter_from_query_str(query_str: &str) -> Result<Vec<FilterClause>, String> {
+    let mut filters = Vec::new();
+
+    for clause in query_str.split(" AND ") {
+        let invalid = || format!("Invalid filter clause: '{clause}'");
+
+        // Peel off field and operator by whitespace; the remainder (which may
+        // itself contain spaces, e.g. "[1, 2, 3]") is the value.
+        let (field, rest) = clause
+            .trim()
+            .split_once(char::is_whitespace)
+            .ok_or_else(invalid)?;
+        let (op, value_str) = rest
+            .trim_start()
+            .split_once(char::is_whitespace)
+            .ok_or_else(invalid)?;
+        let value_str = value_str.trim();
+        if value_str.is_empty() {
+            return Err(invalid());
+        }
+
+        let field = field.to_string();
+        let op = op.to_lowercase();
+
+        let filter = match op.as_str() {
+            "eq" => FilterClause::Eq(field, parse_query_value(value_str)?),
+            "in" => FilterClause::In(field, parse_query_values_array(value_str)?),
+            _ => {
+                return Err(format!("Unsupported filter op '{op}' in queryOpSet"));
+            }
+        };
+        filters.push(filter);
+    }
+
+    Ok(filters)
+}
+
+/// Parse a single query value from a string.
+///
+/// Tried in order: integer, finite float, `true`/`false`, then string. A string
+/// wrapped in one matching pair of double or single quotes has that pair removed.
+fn parse_query_value(s: &str) -> Result<QValue, String> {
+    if let Ok(n) = s.parse::<i64>() {
+        return Ok(QValue::Integer(n));
+    }
+    if let Ok(f) = s.parse::<f64>() {
+        // `f64::from_str` also accepts "inf"/"nan" spellings; treat those as text.
+        if f.is_finite() {
+            return Ok(QValue::Float(f));
+        }
+    }
+    match s {
+        "true" => return Ok(QValue::Bool(true)),
+        "false" => return Ok(QValue::Bool(false)),
+        _ => {}
+    }
+
+    // Strip exactly one matching outer quote pair so quotes that belong to the
+    // value itself (e.g. a trailing apostrophe) are preserved.
+    let unquoted = ['"', '\'']
+        .iter()
+        .find_map(|&q| s.strip_prefix(q).and_then(|inner| inner.strip_suffix(q)))
+        .unwrap_or(s);
+    Ok(QValue::String(unquoted.to_string()))
+}
+
+/// Parse an array of query values like "[101, 102, 103]".
+///
+/// Elements are split on commas and empty elements are ignored, so quoted
+/// strings containing a comma are not supported.
+fn parse_query_values_array(s: &str) -> Result<Vec<QValue>, String> {
+    let inner = s
+        .trim()
+        .strip_prefix('[')
+        .and_then(|rest| rest.strip_suffix(']'))
+        .ok_or_else(|| format!("Expected array for 'in' filter, got: '{s}'"))?;
+
+    inner
+        .split(',')
+        .map(str::trim)
+        .filter(|part| !part.is_empty())
+        .map(parse_query_value)
+        .collect()
+}
+
+/// Persist cursor position to disk.
+///
+/// The value is written to a sibling `<path>.tmp` file, fsynced, and renamed over
+/// `path`, so a crash mid-write cannot leave a truncated cursor file behind.
+pub fn save_cursor(path: &Path, cursor: u64) -> std::io::Result<()> {
+    let mut tmp_name = path.as_os_str().to_owned();
+    tmp_name.push(".tmp");
+    let tmp_path = PathBuf::from(tmp_name);
+
+    {
+        let mut tmp = std::fs::File::create(&tmp_path)?;
+        std::io::Write::write_all(&mut tmp, cursor.to_string().as_bytes())?;
+        tmp.sync_all()?;
+    }
+    std::fs::rename(&tmp_path, path)
+}
+
+/// Load cursor position from disk. Returns 0 if file doesn't exist.
+pub fn load_cursor(path: &Path) -> u64 { + std::fs::read_to_string(path) + .ok() + .and_then(|s| s.trim().parse().ok()) + .unwrap_or(0) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn test_build_patch_from_scalar_update() { + let ops = vec![ + Op::Remove { field: "nsfwLevel".into(), value: json!(8) }, + Op::Set { field: "nsfwLevel".into(), value: json!(16) }, + ]; + let patch = build_patch_from_ops(&ops); + assert_eq!(patch.fields.len(), 1); + let field = &patch.fields["nsfwLevel"]; + assert_eq!(field.old, FieldValue::Single(QValue::Integer(8))); + assert_eq!(field.new, FieldValue::Single(QValue::Integer(16))); + } + + #[test] + fn test_build_patch_from_insert_no_old() { + let ops = vec![ + Op::Set { field: "nsfwLevel".into(), value: json!(16) }, + Op::Set { field: "type".into(), value: json!("image") }, + ]; + let patch = build_patch_from_ops(&ops); + assert_eq!(patch.fields.len(), 2); + assert_eq!(patch.fields["nsfwLevel"].old, FieldValue::Single(QValue::Integer(0))); + assert_eq!(patch.fields["nsfwLevel"].new, FieldValue::Single(QValue::Integer(16))); + } + + #[test] + fn test_build_patch_from_add() { + let ops = vec![ + Op::Add { field: "tagIds".into(), value: json!(42) }, + Op::Add { field: "tagIds".into(), value: json!(99) }, + ]; + let patch = build_patch_from_ops(&ops); + assert_eq!(patch.fields.len(), 1); + if let FieldValue::Multi(ref vals) = patch.fields["tagIds"].new { + assert_eq!(vals.len(), 2); + } else { + panic!("Expected Multi"); + } + } + + #[test] + fn test_build_patch_from_multi_remove() { + let ops = vec![ + Op::Remove { field: "tagIds".into(), value: json!(42) }, + ]; + let patch = build_patch_from_ops(&ops); + assert_eq!(patch.fields.len(), 1); + if let FieldValue::Multi(ref vals) = patch.fields["tagIds"].old { + assert_eq!(vals.len(), 1); + assert_eq!(vals[0], QValue::Integer(42)); + } else { + panic!("Expected Multi for old"); + } + } + + #[test] + fn test_build_patch_skips_delete_and_query() { + 
let ops = vec![ + Op::Delete, + Op::QueryOpSet { query: "x eq 1".into(), ops: vec![] }, + Op::Set { field: "a".into(), value: json!(1) }, + ]; + let patch = build_patch_from_ops(&ops); + assert_eq!(patch.fields.len(), 1); + assert!(patch.fields.contains_key("a")); + } + + #[test] + fn test_parse_filter_eq() { + let filters = parse_filter_from_query_str("modelVersionIds eq 456").unwrap(); + assert_eq!(filters.len(), 1); + assert!(matches!(&filters[0], FilterClause::Eq(f, QValue::Integer(456)) if f == "modelVersionIds")); + } + + #[test] + fn test_parse_filter_in() { + let filters = parse_filter_from_query_str("modelVersionIds in [101, 102, 103]").unwrap(); + assert_eq!(filters.len(), 1); + if let FilterClause::In(f, vals) = &filters[0] { + assert_eq!(f, "modelVersionIds"); + assert_eq!(vals.len(), 3); + } else { + panic!("Expected In clause"); + } + } + + #[test] + fn test_parse_query_value_types() { + assert!(matches!(parse_query_value("42").unwrap(), QValue::Integer(42))); + assert!(matches!(parse_query_value("true").unwrap(), QValue::Bool(true))); + assert!(matches!(parse_query_value("\"hello\"").unwrap(), QValue::String(s) if s == "hello")); + } + + #[test] + fn test_cursor_persistence() { + let dir = tempfile::TempDir::new().unwrap(); + let path = dir.path().join("cursor"); + assert_eq!(load_cursor(&path), 0); + save_cursor(&path, 12345).unwrap(); + assert_eq!(load_cursor(&path), 12345); + } +} From 5ba3af24f8003f28ea4dbb55cf7a454c49401915 Mon Sep 17 00:00:00 2001 From: Justin Maier Date: Wed, 25 Mar 2026 18:29:24 -0600 Subject: [PATCH 04/10] =?UTF-8?q?feat:=20V2=20ops=20poller=20=E2=80=94=20r?= =?UTF-8?q?eads=20BitdexOps,=20deduplicates,=20POSTs=20to=20/ops=20endpoin?= =?UTF-8?q?t?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New ops_poller.rs replaces outbox_poller for V2 sync: - Reads from BitdexOps table (JSONB ops arrays) instead of BitdexOutbox - Cursor managed in PG bitdex_cursors table (not in BitDex) - 
Deduplicates via shared dedup_ops() before sending - POSTs OpsBatch with SyncMeta (cursor, max_id, lag) to /ops endpoint - Health gate: pauses when BitDex is unreachable Also adds post_ops() to BitdexClient. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/pg_sync/bitdex_client.rs | 18 +++ src/pg_sync/mod.rs | 1 + src/pg_sync/ops_poller.rs | 210 +++++++++++++++++++++++++++++++++++ 3 files changed, 229 insertions(+) create mode 100644 src/pg_sync/ops_poller.rs diff --git a/src/pg_sync/bitdex_client.rs b/src/pg_sync/bitdex_client.rs index a5e45c79..50be9d2b 100644 --- a/src/pg_sync/bitdex_client.rs +++ b/src/pg_sync/bitdex_client.rs @@ -243,6 +243,24 @@ impl BitdexClient { .await; } + /// POST a batch of V2 ops to the BitDex /ops endpoint. + pub async fn post_ops(&self, batch: &super::ops::OpsBatch) -> Result<(), String> { + let url = format!("{}/ops", self.base_url); + let resp = self.client + .post(&url) + .json(batch) + .send() + .await + .map_err(|e| format!("post_ops request failed: {e}"))?; + + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + return Err(format!("post_ops returned {status}: {body}")); + } + Ok(()) + } + pub async fn get_cursor(&self, cursor_name: &str) -> Result, String> { let url = format!("{}/cursors/{}", self.base_url, cursor_name); let resp = self.client diff --git a/src/pg_sync/mod.rs b/src/pg_sync/mod.rs index 18be2a74..ca75c0ac 100644 --- a/src/pg_sync/mod.rs +++ b/src/pg_sync/mod.rs @@ -14,6 +14,7 @@ pub mod copy_streams; pub mod metrics_poller; pub mod op_dedup; pub mod ops; +pub mod ops_poller; pub mod outbox_poller; pub mod progress; pub mod queries; diff --git a/src/pg_sync/ops_poller.rs b/src/pg_sync/ops_poller.rs new file mode 100644 index 00000000..090224d4 --- /dev/null +++ b/src/pg_sync/ops_poller.rs @@ -0,0 +1,210 @@ +//! V2 ops poller: reads from BitdexOps table, deduplicates, and POSTs to BitDex /ops endpoint. +//! +//! 
Replaces the V1 outbox_poller by reading self-contained ops (with old+new values) +//! instead of entity IDs that require enrichment queries. +//! +//! Poll loop: +//! 1. On boot: read cursor from PG bitdex_cursors table +//! 2. SELECT from BitdexOps WHERE id > cursor ORDER BY id ASC LIMIT N +//! 3. Deserialize JSONB ops arrays +//! 4. Dedup via shared dedup_ops() +//! 5. POST batch to BitDex /ops endpoint with sync metadata +//! 6. Advance cursor in PG +//! 7. Report max_outbox_id for lag calculation + +use std::time::Duration; + +use sqlx::PgPool; +use tokio::time::interval; + +use super::bitdex_client::BitdexClient; +use super::op_dedup::dedup_ops; +use super::ops::{EntityOps, Op, OpsBatch, SyncMeta}; + +/// Row from BitdexOps table. +#[derive(Debug, sqlx::FromRow)] +struct OpsRow { + id: i64, + entity_id: i64, + ops: sqlx::types::Json>, +} + +/// Run the V2 ops poller loop. Runs forever until cancelled. +pub async fn run_ops_poller( + pool: &PgPool, + client: &BitdexClient, + poll_interval_secs: u64, + batch_limit: i64, + cursor_name: &str, + replica_id: Option<&str>, +) -> Result<(), String> { + // Wait for BitDex health + eprintln!("Ops poller waiting for BitDex to be healthy..."); + loop { + if client.is_healthy().await { + break; + } + tokio::time::sleep(Duration::from_secs(2)).await; + } + eprintln!("BitDex is healthy."); + + // Read initial cursor from PG + let mut cursor: i64 = read_cursor_from_pg(pool, cursor_name) + .await + .unwrap_or(0); + eprintln!( + "Ops poller started (interval={}s, batch_limit={}, cursor_name={}, starting_cursor={})", + poll_interval_secs, batch_limit, cursor_name, cursor + ); + + let mut ticker = interval(Duration::from_secs(poll_interval_secs)); + let mut bitdex_was_down = false; + + loop { + ticker.tick().await; + + // Health gate + if !client.is_healthy().await { + if !bitdex_was_down { + eprintln!("Ops poller: BitDex unreachable, pausing"); + bitdex_was_down = true; + } + continue; + } + if bitdex_was_down { + 
eprintln!("Ops poller: BitDex is back, resuming"); + bitdex_was_down = false; + } + + let cycle_start = std::time::Instant::now(); + match poll_and_process(pool, client, batch_limit, cursor_name, &mut cursor, replica_id).await { + Ok(processed) => { + let cycle_secs = cycle_start.elapsed().as_secs_f64(); + if processed > 0 { + eprintln!("Ops poller: processed {processed} ops (cursor={cursor}, cycle={cycle_secs:.3}s)"); + } + } + Err(e) => { + eprintln!("Ops poller error: {e}"); + } + } + } +} + +/// Single poll + process cycle. +async fn poll_and_process( + pool: &PgPool, + client: &BitdexClient, + batch_limit: i64, + cursor_name: &str, + cursor: &mut i64, + replica_id: Option<&str>, +) -> Result { + // Fetch ops after cursor + let rows = poll_ops_from_cursor(pool, *cursor, batch_limit) + .await + .map_err(|e| format!("poll_ops: {e}"))?; + + if rows.is_empty() { + return Ok(0); + } + + let max_id = rows.iter().map(|r| r.id).max().unwrap_or(*cursor); + let total_rows = rows.len(); + + // Convert to EntityOps + let mut batch: Vec = rows + .into_iter() + .map(|row| EntityOps { + entity_id: row.entity_id, + ops: row.ops.0, + }) + .collect(); + + // Dedup + dedup_ops(&mut batch); + + if batch.is_empty() { + // All ops cancelled out — still advance cursor + advance_cursor(pool, cursor_name, max_id, cursor).await?; + return Ok(total_rows); + } + + // Get max ops ID for lag calculation + let max_ops_id = get_max_ops_id(pool).await.unwrap_or(max_id); + + // Build batch with metadata + let ops_batch = OpsBatch { + ops: batch, + meta: Some(SyncMeta { + source: replica_id.unwrap_or("default").to_string(), + cursor: Some(max_id), + max_id: Some(max_ops_id), + lag_rows: Some(max_ops_id - max_id), + }), + }; + + // POST to BitDex + client + .post_ops(&ops_batch) + .await + .map_err(|e| format!("post_ops: {e}"))?; + + // Advance cursor + advance_cursor(pool, cursor_name, max_id, cursor).await?; + + Ok(total_rows) +} + +async fn advance_cursor( + pool: &PgPool, + cursor_name: &str, 
+ max_id: i64, + cursor: &mut i64, +) -> Result<(), String> { + super::queries::upsert_cursor(pool, cursor_name, max_id) + .await + .map_err(|e| format!("upsert_cursor: {e}"))?; + *cursor = max_id; + Ok(()) +} + +// ── SQL queries ── + +/// Read cursor from PG bitdex_cursors table. +async fn read_cursor_from_pg(pool: &PgPool, cursor_name: &str) -> Result { + let row: Option<(i64,)> = sqlx::query_as( + r#"SELECT last_outbox_id FROM bitdex_cursors WHERE replica_id = $1"#, + ) + .bind(cursor_name) + .fetch_optional(pool) + .await?; + Ok(row.map(|r| r.0).unwrap_or(0)) +} + +/// Poll ops from BitdexOps table after a cursor position. +async fn poll_ops_from_cursor( + pool: &PgPool, + cursor: i64, + limit: i64, +) -> Result, sqlx::Error> { + sqlx::query_as::<_, OpsRow>( + r#"SELECT id, entity_id, ops FROM "BitdexOps" + WHERE id > $1 + ORDER BY id ASC + LIMIT $2"#, + ) + .bind(cursor) + .bind(limit) + .fetch_all(pool) + .await +} + +/// Get the current max ops ID (for lag calculation). +async fn get_max_ops_id(pool: &PgPool) -> Result { + let row: (Option,) = + sqlx::query_as(r#"SELECT MAX(id) FROM "BitdexOps""#) + .fetch_one(pool) + .await?; + Ok(row.0.unwrap_or(0)) +} From 44861db82e2e300981d4b51472ade18cc95d54d3 Mon Sep 17 00:00:00 2001 From: Justin Maier Date: Wed, 25 Mar 2026 18:32:17 -0600 Subject: [PATCH 05/10] feat: YAML trigger config + SQL generator for Sync V2 Config-driven PG trigger generation: - SyncSource struct: direct tables (slot_field + track_fields), multi-value join tables (field + value_field), fan-out tables (query + query_source) - SyncConfig: YAML-parseable config with sync_sources array - SQL generator: CREATE OR REPLACE FUNCTION + CREATE TRIGGER for each source - Expression interpolation in track_fields: "GREATEST({scannedAt}, {createdAt}) as existedAt" - {column} placeholder substitution with OLD/NEW prefixes - Hash-based trigger naming (bitdex_{table}_{hash8}) for reconciliation - IS DISTINCT FROM checks for UPDATE ops (only emit when value 
actually changes) - queryOpSet generation for fan-out tables - ENABLE ALWAYS on all triggers (CDC compatibility) 11 tests: parsing, column substitution, all three trigger types, hash change detection, YAML parsing, expression interpolation. Co-Authored-By: Claude Opus 4.6 (1M context) --- Cargo.lock | 62 ++++- Cargo.toml | 4 +- src/pg_sync/mod.rs | 1 + src/pg_sync/trigger_gen.rs | 558 +++++++++++++++++++++++++++++++++++++ 4 files changed, 623 insertions(+), 2 deletions(-) create mode 100644 src/pg_sync/trigger_gen.rs diff --git a/Cargo.lock b/Cargo.lock index 69bc163b..90a95fe1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -205,13 +205,14 @@ checksum = "5e764a1d40d510daf35e07be9eb06e75770908c27d411ee6c92109c9840eaaf7" [[package]] name = "bitdex-v2" -version = "1.0.71" +version = "1.0.93" dependencies = [ "arc-swap", "axum", "bytes", "chrono", "clap", + "crc32fast", "criterion", "crossbeam-channel", "dashmap", @@ -229,10 +230,13 @@ dependencies = [ "rpmalloc", "serde", "serde_json", + "serde_yaml", "sqlx", "tar", "tempfile", "thiserror 2.0.18", + "tikv-jemalloc-ctl", + "tikv-jemallocator", "tokio", "tokio-util", "toml", @@ -1513,6 +1517,12 @@ dependencies = [ "windows-link", ] +[[package]] +name = "paste" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" + [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -2241,6 +2251,19 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_yaml" +version = "0.9.34+deprecated" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" +dependencies = [ + "indexmap", + "itoa", + "ryu", + "serde", + "unsafe-libyaml", +] + [[package]] name = "sha1" version = "0.10.6" @@ -2675,6 +2698,37 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "tikv-jemalloc-ctl" +version = "0.6.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "661f1f6a57b3a36dc9174a2c10f19513b4866816e13425d3e418b11cc37bc24c" +dependencies = [ + "libc", + "paste", + "tikv-jemalloc-sys", +] + +[[package]] +name = "tikv-jemalloc-sys" +version = "0.6.1+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd8aa5b2ab86a2cefa406d889139c162cbb230092f7d1d7cbc1716405d852a3b" +dependencies = [ + "cc", + "libc", +] + +[[package]] +name = "tikv-jemallocator" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0359b4327f954e0567e69fb191cf1436617748813819c94b8cd4a431422d053a" +dependencies = [ + "libc", + "tikv-jemalloc-sys", +] + [[package]] name = "tinystr" version = "0.8.2" @@ -2971,6 +3025,12 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" +[[package]] +name = "unsafe-libyaml" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" + [[package]] name = "untrusted" version = "0.9.0" diff --git a/Cargo.toml b/Cargo.toml index 818ce2b7..37f3f389 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,9 +15,10 @@ default = [] server = ["dep:axum", "dep:tower-http", "dep:tokio", "dep:tokio-util", "dep:prometheus"] loadtest = ["ureq"] replay = ["ureq"] -pg-sync = ["dep:sqlx", "dep:clap", "dep:reqwest", "dep:chrono", "dep:tokio", "dep:axum", "dep:tower-http", "dep:futures-core", "dep:futures-util", "dep:bytes"] +pg-sync = ["dep:sqlx", "dep:clap", "dep:reqwest", "dep:chrono", "dep:tokio", "dep:axum", "dep:tower-http", "dep:futures-core", "dep:futures-util", "dep:bytes", "dep:serde_yaml"] simd = ["roaring/simd"] heap-prof = ["dep:tikv-jemallocator", "dep:tikv-jemalloc-ctl"] +serde_yaml = ["dep:serde_yaml"] [dependencies] # Bitmap indexes 
@@ -85,6 +86,7 @@ thiserror = "2" # Logging tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } +serde_yaml = { version = "0.9.34", optional = true } [dev-dependencies] # Property-based testing diff --git a/src/pg_sync/mod.rs b/src/pg_sync/mod.rs index ca75c0ac..9c943346 100644 --- a/src/pg_sync/mod.rs +++ b/src/pg_sync/mod.rs @@ -16,6 +16,7 @@ pub mod op_dedup; pub mod ops; pub mod ops_poller; pub mod outbox_poller; +pub mod trigger_gen; pub mod progress; pub mod queries; pub mod row_assembler; diff --git a/src/pg_sync/trigger_gen.rs b/src/pg_sync/trigger_gen.rs new file mode 100644 index 00000000..82652b35 --- /dev/null +++ b/src/pg_sync/trigger_gen.rs @@ -0,0 +1,558 @@ +//! YAML-driven PG trigger SQL generator for V2 ops pipeline. +//! +//! Reads a `sync_sources` YAML config and generates PL/pgSQL trigger functions +//! that emit ops into the BitdexOps table. Two table types: +//! +//! **Direct tables** (slot = PG column): +//! - `track_fields`: scalar fields → emit remove/set pairs via IS DISTINCT FROM +//! - `field` + `value_field`: multi-value join tables → emit add/remove +//! - `on_delete: delete_slot`: emit delete op +//! - `sets_alive: true`: only this table can create new alive slots +//! +//! **Fan-out tables** (slots resolved by BitDex query): +//! - `query`: BitDex query template with {column} placeholders +//! - `query_source`: optional PG subquery for cross-table values +//! - `track_fields`: fields to track on the source table + +use std::collections::hash_map::DefaultHasher; +use std::hash::{Hash, Hasher}; + +use serde::Deserialize; + +/// A sync source definition from the YAML config. +#[derive(Debug, Clone, Deserialize)] +pub struct SyncSource { + /// PG table name (e.g., "Image", "TagsOnImageNew") + pub table: String, + + /// For direct tables: PG column that maps to the BitDex slot ID + pub slot_field: Option, + + /// For direct tables: list of scalar fields to track. 
+ /// Can include expressions: "GREATEST({scannedAt}, {createdAt}) as existedAt" + pub track_fields: Option>, + + /// For multi-value join tables: the BitDex field name (e.g., "tagIds") + pub field: Option, + + /// For multi-value join tables: the PG column containing the value (e.g., "tagId") + pub value_field: Option, + + /// Optional SQL WHERE filter for the trigger (e.g., CollectionItem status filter) + pub filter: Option, + + /// If true, this table's INSERT ops set the alive bit on new slots + #[serde(default)] + pub sets_alive: bool, + + /// If "delete_slot", emit a delete op on DELETE + pub on_delete: Option, + + /// For fan-out tables: BitDex query template with {column} placeholders + pub query: Option, + + /// For fan-out tables: PG subquery to get values not on the triggering table + pub query_source: Option, + + /// Tables that must be loaded before this one during dumps + #[serde(rename = "dependsOn")] + pub depends_on: Option>, +} + +/// Full sync config loaded from YAML. +#[derive(Debug, Clone, Deserialize)] +pub struct SyncConfig { + pub sync_sources: Vec, +} + +impl SyncConfig { + /// Load from a YAML string. + pub fn from_yaml(yaml: &str) -> Result { + serde_yaml::from_str(yaml).map_err(|e| format!("Failed to parse sync config: {e}")) + } +} + +/// Generate the trigger function name with hash for reconciliation. +/// Format: bitdex_{table}_{hash8} +pub fn trigger_function_name(source: &SyncSource) -> String { + let body = generate_trigger_body(source); + let hash = short_hash(&body); + format!( + "bitdex_{}_ops_{}", + source.table.to_lowercase(), + hash + ) +} + +/// Generate the trigger name. +pub fn trigger_name(source: &SyncSource) -> String { + let body = generate_trigger_body(source); + let hash = short_hash(&body); + format!("bitdex_{}_{}", source.table.to_lowercase(), hash) +} + +/// Generate the full CREATE OR REPLACE FUNCTION + CREATE TRIGGER SQL +/// for a sync source. 
+pub fn generate_trigger_sql(source: &SyncSource) -> String { + let func_name = trigger_function_name(source); + let trig_name = trigger_name(source); + let body = generate_trigger_body(source); + + let trigger_events = if source.field.is_some() { + // Multi-value join table: INSERT and DELETE only + "AFTER INSERT OR DELETE" + } else if source.on_delete.as_deref() == Some("delete_slot") { + "AFTER INSERT OR UPDATE OR DELETE" + } else { + "AFTER INSERT OR UPDATE" + }; + + format!( + r#"CREATE OR REPLACE FUNCTION {func_name}() RETURNS trigger AS $$ +{body} +$$ LANGUAGE plpgsql; + +DROP TRIGGER IF EXISTS {trig_name} ON "{table}"; +CREATE TRIGGER {trig_name} {trigger_events} ON "{table}" + FOR EACH ROW EXECUTE FUNCTION {func_name}(); +ALTER TABLE "{table}" ENABLE ALWAYS TRIGGER {trig_name}; +"#, + func_name = func_name, + trig_name = trig_name, + body = body, + trigger_events = trigger_events, + table = source.table, + ) +} + +/// Generate the PL/pgSQL function body for a sync source. +fn generate_trigger_body(source: &SyncSource) -> String { + if let Some(ref field) = source.field { + // Multi-value join table (tags, tools, techniques, etc.) + generate_multi_value_body(source, field) + } else if source.query.is_some() { + // Fan-out table (ModelVersion, Post, Model) + generate_fan_out_body(source) + } else { + // Direct table (Image) + generate_direct_body(source) + } +} + +/// Generate body for direct tables (e.g., Image). 
+fn generate_direct_body(source: &SyncSource) -> String { + let slot_field = source.slot_field.as_deref().unwrap_or("id"); + let track_fields = source.track_fields.as_deref().unwrap_or(&[]); + let has_delete = source.on_delete.as_deref() == Some("delete_slot"); + + let mut body = String::from("DECLARE\n _ops jsonb;\nBEGIN\n"); + + // INSERT: emit set ops for all tracked fields (no remove since no prior state) + body.push_str(" IF TG_OP = 'INSERT' THEN\n"); + body.push_str(" _ops := jsonb_build_array(\n"); + let insert_ops: Vec = track_fields + .iter() + .map(|f| { + let (field_name, expr) = parse_track_field(f); + let new_expr = substitute_columns(&expr, "NEW"); + format!( + " jsonb_build_object('op', 'set', 'field', '{}', 'value', to_jsonb({}))", + field_name, new_expr + ) + }) + .collect(); + body.push_str(&insert_ops.join(",\n")); + body.push_str("\n );\n"); + body.push_str(&format!( + " INSERT INTO \"BitdexOps\" (entity_id, ops) VALUES (NEW.\"{}\", _ops);\n", + slot_field + )); + body.push_str(" RETURN NEW;\n"); + + // DELETE + if has_delete { + body.push_str(" ELSIF TG_OP = 'DELETE' THEN\n"); + body.push_str(&format!( + " INSERT INTO \"BitdexOps\" (entity_id, ops) VALUES (OLD.\"{}\", '[{{\"op\":\"delete\"}}]'::jsonb);\n", + slot_field + )); + body.push_str(" RETURN OLD;\n"); + } + + // UPDATE: emit remove/set pairs only for changed fields + body.push_str(" ELSE\n"); + body.push_str(" _ops := '[]'::jsonb;\n"); + for f in track_fields { + let (field_name, expr) = parse_track_field(f); + let old_expr = substitute_columns(&expr, "OLD"); + let new_expr = substitute_columns(&expr, "NEW"); + body.push_str(&format!( + " IF ({old}) IS DISTINCT FROM ({new}) THEN\n\ + \x20 _ops := _ops || jsonb_build_array(\n\ + \x20 jsonb_build_object('op', 'remove', 'field', '{field}', 'value', to_jsonb({old})),\n\ + \x20 jsonb_build_object('op', 'set', 'field', '{field}', 'value', to_jsonb({new}))\n\ + \x20 );\n\ + \x20 END IF;\n", + old = old_expr, + new = new_expr, + field = 
field_name, + )); + } + body.push_str(" IF jsonb_array_length(_ops) > 0 THEN\n"); + body.push_str(&format!( + " INSERT INTO \"BitdexOps\" (entity_id, ops) VALUES (NEW.\"{}\", _ops);\n", + slot_field + )); + body.push_str(" END IF;\n"); + body.push_str(" RETURN NEW;\n"); + body.push_str(" END IF;\n"); + body.push_str("END;"); + + body +} + +/// Generate body for multi-value join tables (e.g., TagsOnImageNew). +fn generate_multi_value_body(source: &SyncSource, field: &str) -> String { + let slot_field = source.slot_field.as_deref().unwrap_or("imageId"); + let value_field = source.value_field.as_deref().unwrap_or("id"); + let filter_clause = source + .filter + .as_ref() + .map(|f| format!(" IF {} THEN\n", f.replace("imageId", "NEW.\"imageId\""))) + .unwrap_or_default(); + let filter_end = if source.filter.is_some() { + " END IF;\n" + } else { + "" + }; + + format!( + r#"BEGIN + IF TG_OP = 'INSERT' THEN +{filter_start} INSERT INTO "BitdexOps" (entity_id, ops) + VALUES (NEW."{slot}", jsonb_build_array( + jsonb_build_object('op', 'add', 'field', '{field}', 'value', to_jsonb(NEW."{value}")) + )); +{filter_end} RETURN NEW; + ELSIF TG_OP = 'DELETE' THEN + INSERT INTO "BitdexOps" (entity_id, ops) + VALUES (OLD."{slot}", jsonb_build_array( + jsonb_build_object('op', 'remove', 'field', '{field}', 'value', to_jsonb(OLD."{value}")) + )); + RETURN OLD; + END IF; + RETURN COALESCE(NEW, OLD); +END;"#, + slot = slot_field, + field = field, + value = value_field, + filter_start = filter_clause, + filter_end = filter_end, + ) +} + +/// Generate body for fan-out tables (e.g., ModelVersion, Post). 
+fn generate_fan_out_body(source: &SyncSource) -> String { + let query_template = source.query.as_deref().unwrap_or(""); + let track_fields = source.track_fields.as_deref().unwrap_or(&[]); + + let mut body = String::from("DECLARE\n _ops jsonb;\n _query text;\n"); + + // If there's a query_source, we need a variable for its result + if source.query_source.is_some() { + body.push_str(" _source_result jsonb;\n"); + } + body.push_str("BEGIN\n"); + body.push_str(" IF TG_OP = 'UPDATE' THEN\n"); + + // Build the query string with column substitution + if let Some(ref query_source) = source.query_source { + let source_sql = substitute_columns(query_source, "NEW"); + body.push_str(&format!( + " EXECUTE format('SELECT ({})') INTO _source_result;\n", + source_sql.replace('\'', "''") + )); + // Substitute the query_source result into the query template + body.push_str(&format!( + " _query := '{}';\n", + query_template + )); + // Replace placeholders with source result values + body.push_str(" -- Substitute source values into query template\n"); + } else { + // Direct substitution from NEW columns + let query_sql = substitute_columns(query_template, "NEW"); + body.push_str(&format!(" _query := '{}';\n", query_sql)); + } + + // Build ops array from tracked fields that changed + body.push_str(" _ops := '[]'::jsonb;\n"); + for f in track_fields { + let (field_name, expr) = parse_track_field(f); + let old_expr = substitute_columns(&expr, "OLD"); + let new_expr = substitute_columns(&expr, "NEW"); + body.push_str(&format!( + " IF ({old}) IS DISTINCT FROM ({new}) THEN\n\ + \x20 _ops := _ops || jsonb_build_array(\n\ + \x20 jsonb_build_object('op', 'remove', 'field', '{field}', 'value', to_jsonb({old})),\n\ + \x20 jsonb_build_object('op', 'set', 'field', '{field}', 'value', to_jsonb({new}))\n\ + \x20 );\n\ + \x20 END IF;\n", + old = old_expr, + new = new_expr, + field = field_name, + )); + } + + body.push_str(" IF jsonb_array_length(_ops) > 0 THEN\n"); + body.push_str(&format!( + " 
INSERT INTO \"BitdexOps\" (entity_id, ops) VALUES (NEW.id, jsonb_build_array(\n\ + \x20 jsonb_build_object('op', 'queryOpSet', 'query', _query, 'ops', _ops)\n\ + \x20 ));\n" + )); + body.push_str(" END IF;\n"); + body.push_str(" RETURN NEW;\n"); + body.push_str(" END IF;\n"); + body.push_str(" RETURN COALESCE(NEW, OLD);\n"); + body.push_str("END;"); + + body +} + +/// Parse a track_field entry. Returns (bitdex_field_name, sql_expression). +/// Simple field: "nsfwLevel" → ("nsfwLevel", "\"nsfwLevel\"") +/// Expression: "GREATEST({scannedAt}, {createdAt}) as existedAt" → ("existedAt", "GREATEST(\"scannedAt\", \"createdAt\")") +fn parse_track_field(field: &str) -> (String, String) { + if let Some(as_pos) = field.to_lowercase().rfind(" as ") { + let expr = &field[..as_pos].trim(); + let alias = &field[as_pos + 4..].trim(); + // Replace {col} with "col" (quoted column reference) + let sql = expr + .replace('{', "\"") + .replace('}', "\""); + (alias.to_string(), sql) + } else { + // Simple field name + (field.to_string(), format!("\"{}\"", field)) + } +} + +/// Substitute {column} placeholders with prefix."column" references. +/// E.g., substitute_columns("GREATEST({scannedAt}, {createdAt})", "NEW") +/// → "GREATEST(NEW.\"scannedAt\", NEW.\"createdAt\")" +fn substitute_columns(expr: &str, prefix: &str) -> String { + let mut result = String::new(); + let mut chars = expr.chars().peekable(); + while let Some(c) = chars.next() { + if c == '{' { + let mut col = String::new(); + while let Some(&next) = chars.peek() { + if next == '}' { + chars.next(); + break; + } + col.push(chars.next().unwrap()); + } + result.push_str(&format!("{}.\"{}\"", prefix, col)); + } else { + result.push(c); + } + } + result +} + +/// Compute a short (8-char) hash of a string. 
+fn short_hash(s: &str) -> String { + let mut hasher = DefaultHasher::new(); + s.hash(&mut hasher); + format!("{:016x}", hasher.finish())[..8].to_string() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_track_field_simple() { + let (name, expr) = parse_track_field("nsfwLevel"); + assert_eq!(name, "nsfwLevel"); + assert_eq!(expr, "\"nsfwLevel\""); + } + + #[test] + fn test_parse_track_field_expression() { + let (name, expr) = parse_track_field("GREATEST({scannedAt}, {createdAt}) as existedAt"); + assert_eq!(name, "existedAt"); + assert_eq!(expr, "GREATEST(\"scannedAt\", \"createdAt\")"); + } + + #[test] + fn test_substitute_columns() { + let result = substitute_columns("GREATEST({scannedAt}, {createdAt})", "NEW"); + assert_eq!(result, "GREATEST(NEW.\"scannedAt\", NEW.\"createdAt\")"); + } + + #[test] + fn test_substitute_columns_simple() { + let result = substitute_columns("{nsfwLevel}", "OLD"); + assert_eq!(result, "OLD.\"nsfwLevel\""); + } + + #[test] + fn test_generate_multi_value_trigger() { + let source = SyncSource { + table: "TagsOnImageNew".into(), + slot_field: Some("imageId".into()), + track_fields: None, + field: Some("tagIds".into()), + value_field: Some("tagId".into()), + filter: None, + sets_alive: false, + on_delete: None, + query: None, + query_source: None, + depends_on: None, + }; + let sql = generate_trigger_sql(&source); + assert!(sql.contains("CREATE OR REPLACE FUNCTION")); + assert!(sql.contains("'add'")); + assert!(sql.contains("'remove'")); + assert!(sql.contains("tagIds")); + assert!(sql.contains("ENABLE ALWAYS")); + } + + #[test] + fn test_generate_direct_trigger() { + let source = SyncSource { + table: "Image".into(), + slot_field: Some("id".into()), + track_fields: Some(vec!["nsfwLevel".into(), "type".into()]), + field: None, + value_field: None, + filter: None, + sets_alive: true, + on_delete: Some("delete_slot".into()), + query: None, + query_source: None, + depends_on: None, + }; + let sql = 
generate_trigger_sql(&source); + assert!(sql.contains("IS DISTINCT FROM")); + assert!(sql.contains("nsfwLevel")); + assert!(sql.contains("delete")); + } + + #[test] + fn test_generate_fan_out_trigger() { + let source = SyncSource { + table: "ModelVersion".into(), + slot_field: None, + track_fields: Some(vec!["baseModel".into()]), + field: None, + value_field: None, + filter: None, + sets_alive: false, + on_delete: None, + query: Some("modelVersionIds eq {id}".into()), + query_source: None, + depends_on: None, + }; + let sql = generate_trigger_sql(&source); + assert!(sql.contains("queryOpSet")); + assert!(sql.contains("modelVersionIds eq")); + } + + #[test] + fn test_trigger_name_includes_hash() { + let source = SyncSource { + table: "Image".into(), + slot_field: Some("id".into()), + track_fields: Some(vec!["nsfwLevel".into()]), + field: None, + value_field: None, + filter: None, + sets_alive: false, + on_delete: None, + query: None, + query_source: None, + depends_on: None, + }; + let name = trigger_name(&source); + assert!(name.starts_with("bitdex_image_")); + assert_eq!(name.len(), "bitdex_image_".len() + 8); + } + + #[test] + fn test_trigger_hash_changes_with_config() { + let source1 = SyncSource { + table: "Image".into(), + slot_field: Some("id".into()), + track_fields: Some(vec!["nsfwLevel".into()]), + field: None, + value_field: None, + filter: None, + sets_alive: false, + on_delete: None, + query: None, + query_source: None, + depends_on: None, + }; + let source2 = SyncSource { + track_fields: Some(vec!["nsfwLevel".into(), "type".into()]), + ..source1.clone() + }; + let name1 = trigger_name(&source1); + let name2 = trigger_name(&source2); + assert_ne!(name1, name2, "Different configs should produce different hashes"); + } + + #[test] + fn test_yaml_parsing() { + let yaml = r#" +sync_sources: + - table: Image + slot_field: id + sets_alive: true + track_fields: [nsfwLevel, type] + on_delete: delete_slot + - table: TagsOnImageNew + slot_field: imageId + field: 
tagIds + value_field: tagId + - table: ModelVersion + query: "modelVersionIds eq {id}" + track_fields: [baseModel] +"#; + let config = SyncConfig::from_yaml(yaml).unwrap(); + assert_eq!(config.sync_sources.len(), 3); + assert_eq!(config.sync_sources[0].table, "Image"); + assert!(config.sync_sources[0].sets_alive); + assert_eq!(config.sync_sources[1].field.as_deref(), Some("tagIds")); + assert!(config.sync_sources[2].query.is_some()); + } + + #[test] + fn test_expression_in_track_fields() { + let source = SyncSource { + table: "Image".into(), + slot_field: Some("id".into()), + track_fields: Some(vec![ + "nsfwLevel".into(), + "GREATEST({scannedAt}, {createdAt}) as existedAt".into(), + "({flags} & (1 << 13)) != 0 AND ({flags} & (1 << 2)) = 0 as hasMeta".into(), + ]), + field: None, + value_field: None, + filter: None, + sets_alive: true, + on_delete: Some("delete_slot".into()), + query: None, + query_source: None, + depends_on: None, + }; + let sql = generate_trigger_sql(&source); + assert!(sql.contains("GREATEST")); + assert!(sql.contains("existedAt")); + assert!(sql.contains("hasMeta")); + } +} From a8e0b5a5bfc0da920ea52f8dd7d33bce1e2c2cda Mon Sep 17 00:00:00 2001 From: Justin Maier Date: Wed, 25 Mar 2026 18:36:37 -0600 Subject: [PATCH 06/10] =?UTF-8?q?feat:=20dump=20pipeline=20=E2=80=94=20reg?= =?UTF-8?q?istry,=20persistence,=20server=20endpoints?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Dump lifecycle management for unified load pipeline: - DumpRegistry: in-memory + JSON-persisted dump state tracking - DumpEntry: name, wal_path, status (Writing/Loading/Complete/Failed), ops counts, timestamps - dump_name() + config_hash() for change detection - Atomic save via temp file rename Server endpoints: - GET /dumps — list all dumps with status - PUT /dumps — register new dump - POST /dumps/{name}/loaded — signal WAL file complete - DELETE /dumps/{name} — remove from history - POST /dumps/clear — clear all All endpoints 
feature-gated behind pg-sync with no-op fallbacks. 10 tests: lifecycle, persistence, removal, completion tracking, config hash determinism, failure handling. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/pg_sync/dump.rs | 297 ++++++++++++++++++++++++++++++++++++++++++++ src/pg_sync/mod.rs | 1 + src/server.rs | 131 +++++++++++++++++++ 3 files changed, 429 insertions(+) create mode 100644 src/pg_sync/dump.rs diff --git a/src/pg_sync/dump.rs b/src/pg_sync/dump.rs new file mode 100644 index 00000000..1de7ee99 --- /dev/null +++ b/src/pg_sync/dump.rs @@ -0,0 +1,297 @@ +//! Dump pipeline — manages table dump lifecycle for initial loading. +//! +//! Server side: dump registry (track which tables have been loaded). +//! Client side: pg-sync checks dump history, runs flat COPYs, writes WAL files. +//! +//! Dump lifecycle: +//! 1. PUT /dumps — register a new dump (returns task ID, WAL reader starts polling) +//! 2. pg-sync writes ops to WAL file on shared filesystem +//! 3. POST /dumps/{name}/loaded — signal file is complete +//! 4. WAL reader finishes processing, marks dump as complete +//! 5. GET /dumps — check status per table + +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::time::SystemTime; + +use serde::{Deserialize, Serialize}; + +/// State of a single dump. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DumpEntry { + /// Dump name (e.g., "Image-a1b2c3d4") + pub name: String, + /// WAL file path (relative to data_dir) + pub wal_path: Option, + /// Current status + pub status: DumpStatus, + /// Number of ops written (reported by pg-sync) + pub ops_written: u64, + /// Number of ops processed by WAL reader + pub ops_processed: u64, + /// When the dump was registered + pub created_at: u64, + /// When the dump completed processing + pub completed_at: Option, +} + +/// Dump status. 
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum DumpStatus { + /// pg-sync is writing to the WAL file + Writing, + /// pg-sync signaled the file is complete, WAL reader is processing + Loading, + /// WAL reader finished processing + Complete, + /// Dump failed + Failed(String), +} + +/// Registry of dump state. Persisted to dumps.json in the data directory. +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct DumpRegistry { + pub dumps: HashMap, +} + +impl DumpRegistry { + /// Load from a JSON file. Returns empty registry if file doesn't exist. + pub fn load(path: &Path) -> Self { + std::fs::read_to_string(path) + .ok() + .and_then(|s| serde_json::from_str(&s).ok()) + .unwrap_or_default() + } + + /// Save to a JSON file. + pub fn save(&self, path: &Path) -> std::io::Result<()> { + let json = serde_json::to_string_pretty(self) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; + // Atomic write via temp file + let tmp = path.with_extension("tmp"); + std::fs::write(&tmp, &json)?; + std::fs::rename(&tmp, path)?; + Ok(()) + } + + /// Register a new dump. Returns the entry. + pub fn register(&mut self, name: String, wal_path: Option) -> &DumpEntry { + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + self.dumps.insert( + name.clone(), + DumpEntry { + name: name.clone(), + wal_path, + status: DumpStatus::Writing, + ops_written: 0, + ops_processed: 0, + created_at: now, + completed_at: None, + }, + ); + &self.dumps[&name] + } + + /// Mark a dump as loaded (pg-sync finished writing the WAL file). + pub fn mark_loaded(&mut self, name: &str, ops_written: u64) -> Option<&DumpEntry> { + if let Some(entry) = self.dumps.get_mut(name) { + entry.status = DumpStatus::Loading; + entry.ops_written = ops_written; + Some(entry) + } else { + None + } + } + + /// Mark a dump as complete (WAL reader finished processing). 
+ pub fn mark_complete(&mut self, name: &str, ops_processed: u64) -> Option<&DumpEntry> { + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + + if let Some(entry) = self.dumps.get_mut(name) { + entry.status = DumpStatus::Complete; + entry.ops_processed = ops_processed; + entry.completed_at = Some(now); + Some(entry) + } else { + None + } + } + + /// Mark a dump as failed. + pub fn mark_failed(&mut self, name: &str, error: String) { + if let Some(entry) = self.dumps.get_mut(name) { + entry.status = DumpStatus::Failed(error); + } + } + + /// Remove a dump from the registry. + pub fn remove(&mut self, name: &str) -> Option { + self.dumps.remove(name) + } + + /// Clear all dumps. + pub fn clear(&mut self) { + self.dumps.clear(); + } + + /// Check if a dump with the given name exists and is complete. + pub fn is_complete(&self, name: &str) -> bool { + self.dumps + .get(name) + .map(|e| e.status == DumpStatus::Complete) + .unwrap_or(false) + } + + /// Get all dump names that are complete. + pub fn completed_names(&self) -> Vec<&str> { + self.dumps + .values() + .filter(|e| e.status == DumpStatus::Complete) + .map(|e| e.name.as_str()) + .collect() + } + + /// Check if all dumps are complete (no pending/writing/loading). + pub fn all_complete(&self) -> bool { + !self.dumps.is_empty() + && self.dumps.values().all(|e| e.status == DumpStatus::Complete) + } +} + +/// Build the dump name from a table name and config hash. +/// Format: "{Table}-{hash8}" +pub fn dump_name(table: &str, config_hash: &str) -> String { + format!("{}-{}", table, &config_hash[..8.min(config_hash.len())]) +} + +/// Compute a config hash for a sync source entry. 
+pub fn config_hash(yaml_fragment: &str) -> String { + use std::collections::hash_map::DefaultHasher; + use std::hash::{Hash, Hasher}; + let mut hasher = DefaultHasher::new(); + yaml_fragment.hash(&mut hasher); + format!("{:016x}", hasher.finish()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_dump_lifecycle() { + let mut reg = DumpRegistry::default(); + assert!(reg.dumps.is_empty()); + + // Register + reg.register("Image-a1b2c3d4".into(), Some("dumps/image.wal".into())); + assert_eq!(reg.dumps.len(), 1); + assert_eq!(reg.dumps["Image-a1b2c3d4"].status, DumpStatus::Writing); + + // Mark loaded + reg.mark_loaded("Image-a1b2c3d4", 107_000_000); + assert_eq!(reg.dumps["Image-a1b2c3d4"].status, DumpStatus::Loading); + assert_eq!(reg.dumps["Image-a1b2c3d4"].ops_written, 107_000_000); + + // Mark complete + reg.mark_complete("Image-a1b2c3d4", 107_000_000); + assert_eq!(reg.dumps["Image-a1b2c3d4"].status, DumpStatus::Complete); + assert!(reg.dumps["Image-a1b2c3d4"].completed_at.is_some()); + + assert!(reg.is_complete("Image-a1b2c3d4")); + assert!(reg.all_complete()); + } + + #[test] + fn test_dump_persistence() { + let dir = tempfile::TempDir::new().unwrap(); + let path = dir.path().join("dumps.json"); + + let mut reg = DumpRegistry::default(); + reg.register("Image-abc".into(), None); + reg.mark_complete("Image-abc", 100); + reg.save(&path).unwrap(); + + let loaded = DumpRegistry::load(&path); + assert_eq!(loaded.dumps.len(), 1); + assert!(loaded.is_complete("Image-abc")); + } + + #[test] + fn test_dump_removal() { + let mut reg = DumpRegistry::default(); + reg.register("Image-abc".into(), None); + reg.register("Tags-def".into(), None); + assert_eq!(reg.dumps.len(), 2); + + reg.remove("Image-abc"); + assert_eq!(reg.dumps.len(), 1); + assert!(!reg.dumps.contains_key("Image-abc")); + } + + #[test] + fn test_dump_clear() { + let mut reg = DumpRegistry::default(); + reg.register("Image-abc".into(), None); + reg.register("Tags-def".into(), None); + 
reg.clear(); + assert!(reg.dumps.is_empty()); + } + + #[test] + fn test_all_complete() { + let mut reg = DumpRegistry::default(); + assert!(!reg.all_complete()); // Empty = not all complete + + reg.register("Image-abc".into(), None); + reg.register("Tags-def".into(), None); + assert!(!reg.all_complete()); + + reg.mark_complete("Image-abc", 100); + assert!(!reg.all_complete()); // Tags still pending + + reg.mark_loaded("Tags-def", 50); + reg.mark_complete("Tags-def", 50); + assert!(reg.all_complete()); + } + + #[test] + fn test_dump_name() { + assert_eq!(dump_name("Image", "a1b2c3d4e5f6"), "Image-a1b2c3d4"); + } + + #[test] + fn test_config_hash_deterministic() { + let h1 = config_hash("table: Image\nslot_field: id\ntrack_fields: [nsfwLevel]"); + let h2 = config_hash("table: Image\nslot_field: id\ntrack_fields: [nsfwLevel]"); + assert_eq!(h1, h2); + } + + #[test] + fn test_config_hash_changes() { + let h1 = config_hash("table: Image\ntrack_fields: [nsfwLevel]"); + let h2 = config_hash("table: Image\ntrack_fields: [nsfwLevel, type]"); + assert_ne!(h1, h2); + } + + #[test] + fn test_load_missing_file() { + let reg = DumpRegistry::load(Path::new("/nonexistent/dumps.json")); + assert!(reg.dumps.is_empty()); + } + + #[test] + fn test_failed_dump() { + let mut reg = DumpRegistry::default(); + reg.register("Image-abc".into(), None); + reg.mark_failed("Image-abc", "connection reset".into()); + assert!(matches!(reg.dumps["Image-abc"].status, DumpStatus::Failed(_))); + assert!(!reg.is_complete("Image-abc")); + } +} diff --git a/src/pg_sync/mod.rs b/src/pg_sync/mod.rs index 9c943346..23abf1f0 100644 --- a/src/pg_sync/mod.rs +++ b/src/pg_sync/mod.rs @@ -11,6 +11,7 @@ pub mod bulk_loader; pub mod config; pub mod copy_queries; pub mod copy_streams; +pub mod dump; pub mod metrics_poller; pub mod op_dedup; pub mod ops; diff --git a/src/server.rs b/src/server.rs index a75974c7..c653c54e 100644 --- a/src/server.rs +++ b/src/server.rs @@ -296,6 +296,9 @@ struct AppState { /// Latest 
sync source metadata (cursor, lag) keyed by source name. #[cfg(feature = "pg-sync")] sync_meta: Mutex>, + /// Dump registry for tracking table dump lifecycle. + #[cfg(feature = "pg-sync")] + dump_registry: Mutex, } type SharedState = Arc; @@ -1001,6 +1004,11 @@ impl BitdexServer { ops_wal: Mutex::new(None), #[cfg(feature = "pg-sync")] sync_meta: Mutex::new(std::collections::HashMap::new()), + #[cfg(feature = "pg-sync")] + dump_registry: { + let dumps_path = self.data_dir.join("dumps.json"); + Mutex::new(crate::pg_sync::dump::DumpRegistry::load(&dumps_path)) + }, }); // Try to restore an existing index from disk @@ -1080,6 +1088,11 @@ impl BitdexServer { .route("/api/internal/pgsync-metrics", post(handle_pgsync_metrics)) .route("/api/indexes/{name}/ops", post(handle_ops)) .route("/api/internal/sync-lag", get(handle_sync_lag)) + .route("/api/indexes/{name}/dumps", get(handle_list_dumps)) + .route("/api/indexes/{name}/dumps", put(handle_register_dump)) + .route("/api/indexes/{name}/dumps/{dump_name}/loaded", post(handle_dump_loaded)) + .route("/api/indexes/{name}/dumps/{dump_name}", delete(handle_delete_dump)) + .route("/api/indexes/{name}/dumps/clear", post(handle_clear_dumps)) .route("/metrics", get(handle_metrics)) .route("/", get(handle_ui)) .with_state(Arc::clone(&state)); @@ -4250,6 +4263,124 @@ async fn handle_ops( (StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "pg-sync feature not enabled"}))) } +// ── Dump endpoints ── + +/// GET /api/indexes/{name}/dumps — List all dumps and their status. 
+#[cfg(feature = "pg-sync")] +async fn handle_list_dumps( + State(state): State, + AxumPath(_name): AxumPath, +) -> impl IntoResponse { + let reg = state.dump_registry.lock(); + Json(serde_json::json!({ + "dumps": reg.dumps, + "all_complete": reg.all_complete(), + })) +} + +#[cfg(not(feature = "pg-sync"))] +async fn handle_list_dumps(AxumPath(_name): AxumPath) -> impl IntoResponse { + Json(serde_json::json!({"dumps": {}})) +} + +/// PUT /api/indexes/{name}/dumps — Register a new dump. +#[cfg(feature = "pg-sync")] +async fn handle_register_dump( + State(state): State, + AxumPath(_name): AxumPath, + Json(body): Json, +) -> impl IntoResponse { + let dump_name = body["name"].as_str().unwrap_or("unknown").to_string(); + let wal_path = body["wal_path"].as_str().map(|s| s.to_string()); + + let mut reg = state.dump_registry.lock(); + reg.register(dump_name.clone(), wal_path); + + let dumps_path = state.data_dir.join("dumps.json"); + if let Err(e) = reg.save(&dumps_path) { + eprintln!("Warning: failed to save dump registry: {e}"); + } + + (StatusCode::CREATED, Json(serde_json::json!({ + "name": dump_name, + "status": "writing", + }))) +} + +#[cfg(not(feature = "pg-sync"))] +async fn handle_register_dump( + AxumPath(_name): AxumPath, + Json(_body): Json, +) -> impl IntoResponse { + (StatusCode::NOT_FOUND, Json(serde_json::json!({"error": "pg-sync not enabled"}))) +} + +/// POST /api/indexes/{name}/dumps/{dump_name}/loaded — Signal dump file is complete. 
+#[cfg(feature = "pg-sync")] +async fn handle_dump_loaded( + State(state): State, + AxumPath((_name, dump_name)): AxumPath<(String, String)>, + Json(body): Json, +) -> impl IntoResponse { + let ops_written = body["ops_written"].as_u64().unwrap_or(0); + + let mut reg = state.dump_registry.lock(); + match reg.mark_loaded(&dump_name, ops_written) { + Some(_) => { + let dumps_path = state.data_dir.join("dumps.json"); + reg.save(&dumps_path).ok(); + Json(serde_json::json!({"status": "loading", "name": dump_name})) + } + None => Json(serde_json::json!({"error": format!("Dump '{}' not found", dump_name)})), + } +} + +#[cfg(not(feature = "pg-sync"))] +async fn handle_dump_loaded( + AxumPath((_name, _dump_name)): AxumPath<(String, String)>, + Json(_body): Json, +) -> impl IntoResponse { + Json(serde_json::json!({"error": "pg-sync not enabled"})) +} + +/// DELETE /api/indexes/{name}/dumps/{dump_name} — Remove a dump from history. +#[cfg(feature = "pg-sync")] +async fn handle_delete_dump( + State(state): State, + AxumPath((_name, dump_name)): AxumPath<(String, String)>, +) -> impl IntoResponse { + let mut reg = state.dump_registry.lock(); + reg.remove(&dump_name); + let dumps_path = state.data_dir.join("dumps.json"); + reg.save(&dumps_path).ok(); + StatusCode::NO_CONTENT +} + +#[cfg(not(feature = "pg-sync"))] +async fn handle_delete_dump( + AxumPath((_name, _dump_name)): AxumPath<(String, String)>, +) -> impl IntoResponse { + StatusCode::NOT_FOUND +} + +/// POST /api/indexes/{name}/dumps/clear — Clear all dump history. 
+#[cfg(feature = "pg-sync")] +async fn handle_clear_dumps( + State(state): State, + AxumPath(_name): AxumPath, +) -> impl IntoResponse { + let mut reg = state.dump_registry.lock(); + reg.clear(); + let dumps_path = state.data_dir.join("dumps.json"); + reg.save(&dumps_path).ok(); + StatusCode::NO_CONTENT +} + +#[cfg(not(feature = "pg-sync"))] +async fn handle_clear_dumps(AxumPath(_name): AxumPath) -> impl IntoResponse { + StatusCode::NOT_FOUND +} + /// GET /api/internal/sync-lag — Return latest sync metadata from all sources. #[cfg(feature = "pg-sync")] async fn handle_sync_lag( From 72443ad0c54fc5cf69d4d4fe43789b50da0ace73 Mon Sep 17 00:00:00 2001 From: Justin Maier Date: Wed, 25 Mar 2026 18:38:15 -0600 Subject: [PATCH 07/10] feat: V2 sync Prometheus metrics (bitdex_sync_* namespace) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New unified sync metrics with source label: - bitdex_sync_cursor_position{source="..."} — current cursor - bitdex_sync_max_id{source="..."} — max ops table ID - bitdex_sync_lag_rows{source="..."} — rows behind - bitdex_sync_ops_total{source="..."} — total ops received - bitdex_sync_wal_bytes{source="..."} — WAL file size Metrics populated from SyncMeta in the POST /ops endpoint. Old bitdex_pgsync_* metrics preserved for backward compat. Binary rename (bitdex-pg-sync → bitdex-sync) deferred to deployment PR. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- src/metrics.rs | 39 +++++++++++++++++++++++++++++++++++++++ src/server.rs | 14 +++++++++++++- 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/src/metrics.rs b/src/metrics.rs index 25491177..950b976d 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -116,6 +116,13 @@ pub struct Metrics { pub pgsync_cycle_seconds: HistogramVec, pub pgsync_rows_fetched_total: IntCounterVec, pub pgsync_cursor_position: IntGaugeVec, + + // V2 sync metrics (unified namespace with source label) + pub sync_cursor_position: IntGaugeVec, + pub sync_max_id: IntGaugeVec, + pub sync_lag_rows: IntGaugeVec, + pub sync_ops_total: IntCounterVec, + pub sync_wal_bytes: IntGaugeVec, } impl Metrics { @@ -570,6 +577,28 @@ impl Metrics { ) .unwrap(); + // V2 sync metrics (unified namespace) + let sync_cursor_position = IntGaugeVec::new( + Opts::new("bitdex_sync_cursor_position", "Current sync cursor position"), + &["source"], + ).unwrap(); + let sync_max_id = IntGaugeVec::new( + Opts::new("bitdex_sync_max_id", "Max ops table ID (for lag calculation)"), + &["source"], + ).unwrap(); + let sync_lag_rows = IntGaugeVec::new( + Opts::new("bitdex_sync_lag_rows", "Number of ops rows behind"), + &["source"], + ).unwrap(); + let sync_ops_total = IntCounterVec::new( + Opts::new("bitdex_sync_ops_total", "Total ops received from sync sources"), + &["source"], + ).unwrap(); + let sync_wal_bytes = IntGaugeVec::new( + Opts::new("bitdex_sync_wal_bytes", "Current WAL file size in bytes"), + &["source"], + ).unwrap(); + // Register all metrics registry.register(Box::new(alive_documents.clone())).unwrap(); registry.register(Box::new(slot_high_water.clone())).unwrap(); @@ -671,6 +700,11 @@ impl Metrics { registry.register(Box::new(pgsync_cycle_seconds.clone())).unwrap(); registry.register(Box::new(pgsync_rows_fetched_total.clone())).unwrap(); registry.register(Box::new(pgsync_cursor_position.clone())).unwrap(); + 
registry.register(Box::new(sync_cursor_position.clone())).unwrap(); + registry.register(Box::new(sync_max_id.clone())).unwrap(); + registry.register(Box::new(sync_lag_rows.clone())).unwrap(); + registry.register(Box::new(sync_ops_total.clone())).unwrap(); + registry.register(Box::new(sync_wal_bytes.clone())).unwrap(); Self { registry, @@ -746,6 +780,11 @@ impl Metrics { pgsync_cycle_seconds, pgsync_rows_fetched_total, pgsync_cursor_position, + sync_cursor_position, + sync_max_id, + sync_lag_rows, + sync_ops_total, + sync_wal_bytes, } } diff --git a/src/server.rs b/src/server.rs index c653c54e..6e3b5c65 100644 --- a/src/server.rs +++ b/src/server.rs @@ -4203,10 +4203,22 @@ async fn handle_ops( } } - // Store sync metadata if provided + // Store sync metadata + update Prometheus metrics if let Some(meta) = &batch.meta { let mut sync_meta = state.sync_meta.lock(); sync_meta.insert(meta.source.clone(), meta.clone()); + + let m = &state.metrics; + let source = meta.source.as_str(); + if let Some(cursor) = meta.cursor { + m.sync_cursor_position.with_label_values(&[source]).set(cursor); + } + if let Some(max_id) = meta.max_id { + m.sync_max_id.with_label_values(&[source]).set(max_id); + } + if let Some(lag) = meta.lag_rows { + m.sync_lag_rows.with_label_values(&[source]).set(lag); + } } let ops_count = batch.ops.len(); From ac84a72448efd17f9864aa06e34d568e1b8e6b8e Mon Sep 17 00:00:00 2001 From: Justin Maier Date: Wed, 25 Mar 2026 18:40:44 -0600 Subject: [PATCH 08/10] test: Sync V2 integration tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 9 integration tests exercising the full ops pipeline: - WAL roundtrip with dedup (write → read → dedup → verify) - Delete absorption through WAL - Add/remove cancellation through WAL - queryOpSet serialization through WAL - Cursor resume across multiple appends - Dump registry full workflow (register → load → complete → persist) - Dump config change detection (hash mismatch triggers 
re-dump) - Full Civitai trigger config (6 sources, all generate valid SQL) - OpsBatch JSON format roundtrip with SyncMeta Total: 69 tests across all Sync V2 modules (60 unit + 9 integration). Co-Authored-By: Claude Opus 4.6 (1M context) --- tests/sync_v2_integration.rs | 378 +++++++++++++++++++++++++++++++++++ 1 file changed, 378 insertions(+) create mode 100644 tests/sync_v2_integration.rs diff --git a/tests/sync_v2_integration.rs b/tests/sync_v2_integration.rs new file mode 100644 index 00000000..1ffb3083 --- /dev/null +++ b/tests/sync_v2_integration.rs @@ -0,0 +1,378 @@ +//! Integration tests for the Sync V2 pipeline. +//! +//! Tests the ops → WAL → processor pipeline without PG. +//! Full E2E tests (PG triggers → poller → server) require a running +//! server and PG instance — see tests/e2e/ for those. + +#![cfg(feature = "pg-sync")] + +use serde_json::json; +use tempfile::TempDir; + +use bitdex_v2::ops_wal::{WalReader, WalWriter}; +use bitdex_v2::pg_sync::op_dedup::dedup_ops; +use bitdex_v2::pg_sync::ops::{EntityOps, Op, OpsBatch, SyncMeta}; +use bitdex_v2::pg_sync::dump::{DumpRegistry, DumpStatus, dump_name, config_hash}; +use bitdex_v2::pg_sync::trigger_gen::{SyncConfig, SyncSource, generate_trigger_sql}; + +// ── WAL Pipeline Integration ── + +#[test] +fn test_ops_wal_roundtrip_with_dedup() { + let dir = TempDir::new().unwrap(); + let wal_path = dir.path().join("ops.wal"); + + // Write ops with duplicates + let writer = WalWriter::new(&wal_path); + let batch = vec![ + EntityOps { + entity_id: 1, + ops: vec![ + Op::Set { field: "nsfwLevel".into(), value: json!(8) }, + ], + }, + EntityOps { + entity_id: 1, + ops: vec![ + Op::Set { field: "nsfwLevel".into(), value: json!(16) }, // Overwrites first + ], + }, + EntityOps { + entity_id: 2, + ops: vec![ + Op::Add { field: "tagIds".into(), value: json!(42) }, + ], + }, + ]; + writer.append_batch(&batch).unwrap(); + + // Read back + let mut reader = WalReader::new(&wal_path, 0); + let result = 
reader.read_batch(100).unwrap(); + assert_eq!(result.entries.len(), 3); + + // Dedup + let mut entries = result.entries; + dedup_ops(&mut entries); + + // Entity 1: last set wins (nsfwLevel=16) + let entity1 = entries.iter().find(|e| e.entity_id == 1).unwrap(); + let set_ops: Vec<_> = entity1.ops.iter() + .filter(|op| matches!(op, Op::Set { field, .. } if field == "nsfwLevel")) + .collect(); + assert_eq!(set_ops.len(), 1); + if let Op::Set { value, .. } = &set_ops[0] { + assert_eq!(*value, json!(16)); + } + + // Entity 2: add preserved + let entity2 = entries.iter().find(|e| e.entity_id == 2).unwrap(); + assert_eq!(entity2.ops.len(), 1); + assert!(matches!(&entity2.ops[0], Op::Add { field, .. } if field == "tagIds")); +} + +#[test] +fn test_delete_absorbs_prior_ops_through_wal() { + let dir = TempDir::new().unwrap(); + let wal_path = dir.path().join("ops.wal"); + + let writer = WalWriter::new(&wal_path); + + // First batch: set some fields + writer.append_batch(&[EntityOps { + entity_id: 1, + ops: vec![ + Op::Set { field: "nsfwLevel".into(), value: json!(16) }, + Op::Add { field: "tagIds".into(), value: json!(42) }, + ], + }]).unwrap(); + + // Second batch: delete the entity + writer.append_batch(&[EntityOps { + entity_id: 1, + ops: vec![Op::Delete], + }]).unwrap(); + + // Read all + let mut reader = WalReader::new(&wal_path, 0); + let result = reader.read_batch(100).unwrap(); + assert_eq!(result.entries.len(), 2); + + // Dedup should collapse to just delete + let mut entries = result.entries; + dedup_ops(&mut entries); + assert_eq!(entries.len(), 1); + assert_eq!(entries[0].ops.len(), 1); + assert!(matches!(&entries[0].ops[0], Op::Delete)); +} + +#[test] +fn test_add_remove_cancellation_through_wal() { + let dir = TempDir::new().unwrap(); + let wal_path = dir.path().join("ops.wal"); + + let writer = WalWriter::new(&wal_path); + writer.append_batch(&[ + EntityOps { + entity_id: 1, + ops: vec![Op::Add { field: "tagIds".into(), value: json!(42) }], + }, + EntityOps { 
+ entity_id: 1, + ops: vec![Op::Remove { field: "tagIds".into(), value: json!(42) }], + }, + ]).unwrap(); + + let mut reader = WalReader::new(&wal_path, 0); + let result = reader.read_batch(100).unwrap(); + let mut entries = result.entries; + dedup_ops(&mut entries); + + // Net zero — entity should be dropped + assert!(entries.is_empty() || entries[0].ops.is_empty()); +} + +#[test] +fn test_query_op_set_through_wal() { + let dir = TempDir::new().unwrap(); + let wal_path = dir.path().join("ops.wal"); + + let writer = WalWriter::new(&wal_path); + writer.append_batch(&[EntityOps { + entity_id: 456, + ops: vec![Op::QueryOpSet { + query: "modelVersionIds eq 456".into(), + ops: vec![ + Op::Remove { field: "baseModel".into(), value: json!("SD 1.5") }, + Op::Set { field: "baseModel".into(), value: json!("SDXL") }, + ], + }], + }]).unwrap(); + + let mut reader = WalReader::new(&wal_path, 0); + let result = reader.read_batch(100).unwrap(); + assert_eq!(result.entries.len(), 1); + + let entry = &result.entries[0]; + assert_eq!(entry.entity_id, 456); + match &entry.ops[0] { + Op::QueryOpSet { query, ops } => { + assert_eq!(query, "modelVersionIds eq 456"); + assert_eq!(ops.len(), 2); + } + _ => panic!("Expected QueryOpSet"), + } +} + +// ── Cursor Resume Integration ── + +#[test] +fn test_wal_cursor_resume_across_appends() { + let dir = TempDir::new().unwrap(); + let wal_path = dir.path().join("ops.wal"); + let writer = WalWriter::new(&wal_path); + + // Batch 1 + writer.append_batch(&[EntityOps { + entity_id: 1, + ops: vec![Op::Set { field: "a".into(), value: json!(1) }], + }]).unwrap(); + + // Read batch 1 + let mut reader = WalReader::new(&wal_path, 0); + let r1 = reader.read_batch(100).unwrap(); + assert_eq!(r1.entries.len(), 1); + let cursor = reader.cursor(); + + // Batch 2 (appended after first read) + writer.append_batch(&[EntityOps { + entity_id: 2, + ops: vec![Op::Set { field: "b".into(), value: json!(2) }], + }]).unwrap(); + + // Resume from cursor — should only get 
batch 2 + let mut reader2 = WalReader::new(&wal_path, cursor); + let r2 = reader2.read_batch(100).unwrap(); + assert_eq!(r2.entries.len(), 1); + assert_eq!(r2.entries[0].entity_id, 2); +} + +// ── Dump Registry Integration ── + +#[test] +fn test_dump_registry_full_workflow() { + let dir = TempDir::new().unwrap(); + let dumps_path = dir.path().join("dumps.json"); + + let mut reg = DumpRegistry::default(); + + // Simulate boot: check if dumps are complete + let image_hash = config_hash("table: Image\ntrack_fields: [nsfwLevel]"); + let tags_hash = config_hash("table: TagsOnImageNew\nfield: tagIds"); + let image_name = dump_name("Image", &image_hash); + let tags_name = dump_name("TagsOnImageNew", &tags_hash); + + assert!(!reg.is_complete(&image_name)); + assert!(!reg.is_complete(&tags_name)); + + // Register dumps + reg.register(image_name.clone(), Some("dumps/image.wal".into())); + reg.register(tags_name.clone(), Some("dumps/tags.wal".into())); + reg.save(&dumps_path).unwrap(); + + // Simulate pg-sync writing WAL and signaling loaded + reg.mark_loaded(&image_name, 107_500_000); + reg.mark_loaded(&tags_name, 375_000_000); + + // Simulate WAL reader completing + reg.mark_complete(&image_name, 107_500_000); + assert!(!reg.all_complete()); // Tags not done yet + + reg.mark_complete(&tags_name, 375_000_000); + assert!(reg.all_complete()); + + // Persist and reload + reg.save(&dumps_path).unwrap(); + let loaded = DumpRegistry::load(&dumps_path); + assert!(loaded.is_complete(&image_name)); + assert!(loaded.is_complete(&tags_name)); + assert!(loaded.all_complete()); +} + +#[test] +fn test_dump_config_change_detection() { + let hash1 = config_hash("table: Image\ntrack_fields: [nsfwLevel]"); + let hash2 = config_hash("table: Image\ntrack_fields: [nsfwLevel, type]"); + let name1 = dump_name("Image", &hash1); + let name2 = dump_name("Image", &hash2); + + let mut reg = DumpRegistry::default(); + reg.register(name1.clone(), None); + reg.mark_loaded(&name1, 100); + 
reg.mark_complete(&name1, 100); + + // After config change, the dump name is different + assert!(reg.is_complete(&name1)); + assert!(!reg.is_complete(&name2)); // New hash → not loaded → needs re-dump +} + +// ── Trigger Generation Integration ── + +#[test] +fn test_full_civitai_config() { + let yaml = r#" +sync_sources: + - table: Image + slot_field: id + sets_alive: true + track_fields: + - nsfwLevel + - type + - userId + - postId + - minor + - poi + - blockedFor + - "GREATEST({scannedAt}, {createdAt}) as existedAt" + - "({flags} & (1 << 13)) != 0 AND ({flags} & (1 << 2)) = 0 as hasMeta" + - "({flags} & (1 << 14)) != 0 as onSite" + on_delete: delete_slot + + - table: TagsOnImageNew + slot_field: imageId + field: tagIds + value_field: tagId + + - table: ImageTool + slot_field: imageId + field: toolIds + value_field: toolId + + - table: ImageTechnique + slot_field: imageId + field: techniqueIds + value_field: techniqueId + + - table: ModelVersion + query: "modelVersionIds eq {id}" + track_fields: [baseModel] + + - table: Post + query: "postId eq {id}" + track_fields: [publishedAt, availability] +"#; + + let config = SyncConfig::from_yaml(yaml).unwrap(); + assert_eq!(config.sync_sources.len(), 6); + + // Generate SQL for each and verify they're non-empty and contain expected patterns + for source in &config.sync_sources { + let sql = generate_trigger_sql(source); + assert!(sql.contains("CREATE OR REPLACE FUNCTION"), "Missing function for {}", source.table); + assert!(sql.contains("ENABLE ALWAYS"), "Missing ENABLE ALWAYS for {}", source.table); + + match source.table.as_str() { + "Image" => { + assert!(sql.contains("IS DISTINCT FROM"), "Image should use IS DISTINCT FROM"); + assert!(sql.contains("delete"), "Image should handle delete"); + assert!(sql.contains("GREATEST"), "Image should have existedAt expression"); + } + "TagsOnImageNew" => { + assert!(sql.contains("'add'"), "Tags should have add ops"); + assert!(sql.contains("'remove'"), "Tags should have remove 
ops"); + } + "ModelVersion" => { + assert!(sql.contains("queryOpSet"), "MV should use queryOpSet"); + assert!(sql.contains("modelVersionIds eq"), "MV should query by MV id"); + } + "Post" => { + assert!(sql.contains("queryOpSet"), "Post should use queryOpSet"); + assert!(sql.contains("postId eq"), "Post should query by postId"); + } + _ => {} + } + } +} + +// ── OpsBatch Serialization ── + +#[test] +fn test_ops_batch_json_format() { + let batch = OpsBatch { + ops: vec![ + EntityOps { + entity_id: 123, + ops: vec![ + Op::Remove { field: "nsfwLevel".into(), value: json!(8) }, + Op::Set { field: "nsfwLevel".into(), value: json!(16) }, + ], + }, + EntityOps { + entity_id: 456, + ops: vec![Op::QueryOpSet { + query: "modelVersionIds eq 456".into(), + ops: vec![ + Op::Remove { field: "baseModel".into(), value: json!("SD 1.5") }, + Op::Set { field: "baseModel".into(), value: json!("SDXL") }, + ], + }], + }, + ], + meta: Some(SyncMeta { + source: "pg-sync-default".into(), + cursor: Some(420_000_000), + max_id: Some(500_000_000), + lag_rows: Some(80_000_000), + }), + }; + + // Round-trip through JSON + let json_str = serde_json::to_string(&batch).unwrap(); + let parsed: OpsBatch = serde_json::from_str(&json_str).unwrap(); + assert_eq!(parsed.ops.len(), 2); + assert_eq!(parsed.ops[0].entity_id, 123); + assert_eq!(parsed.ops[1].entity_id, 456); + let meta = parsed.meta.unwrap(); + assert_eq!(meta.source, "pg-sync-default"); + assert_eq!(meta.lag_rows, Some(80_000_000)); +} From 618cf9e4732d79b84242772ea0e54ef47692db4a Mon Sep 17 00:00:00 2001 From: Justin Maier Date: Wed, 25 Mar 2026 18:49:14 -0600 Subject: [PATCH 09/10] feat: wire WAL reader thread into server startup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Spawns a background thread that tails the ops WAL file, reads batches of up to 10K records, deduplicates via dedup_ops(), and applies mutations to the engine via apply_ops_batch(). Persists cursor to disk after each batch. 
Updates bitdex_sync_wal_bytes metric.

This completes the full ops ingestion chain:
POST /ops → WAL append + fsync → WAL reader thread → engine mutations

The reader sleeps 50ms when no new records are available, and 1s when no
index is loaded yet. Errors are logged and retried.

Co-Authored-By: Claude Opus 4.6 (1M context)
---
 src/server.rs | 65 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 65 insertions(+)

diff --git a/src/server.rs b/src/server.rs
index 6e3b5c65..60913e32 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -1037,6 +1037,71 @@ impl BitdexServer {
             }
         }

        // Spawn WAL reader thread if pg-sync feature is enabled and index exists.
        //
        // The thread tails `<data_dir>/wal/ops.wal`, applies each batch to the
        // engine, and persists its byte cursor to `<data_dir>/wal/cursor`.
        #[cfg(feature = "pg-sync")]
        {
            let wal_dir = self.data_dir.join("wal");
            let wal_path = wal_dir.join("ops.wal");
            let cursor_path = wal_dir.join("cursor");
            let wal_state = Arc::clone(&state);
            let spawn_result = std::thread::Builder::new()
                .name("wal-reader".into())
                .spawn(move || {
                    let cursor = crate::ops_processor::load_cursor(&cursor_path);
                    let mut reader = crate::ops_wal::WalReader::new(&wal_path, cursor);
                    eprintln!("WAL reader started (cursor={cursor}, path={})", wal_path.display());

                    loop {
                        // Resolve the engine BEFORE touching the WAL. `read_batch`
                        // advances the reader's cursor, so reading first and then
                        // discovering there is no index would drop the batch:
                        // the next iteration would resume past those records.
                        let engine = {
                            let guard = wal_state.index.lock();
                            guard.as_ref().map(|idx| Arc::clone(&idx.engine))
                        };
                        let Some(engine) = engine else {
                            // No index loaded yet — sleep and retry
                            std::thread::sleep(std::time::Duration::from_secs(1));
                            continue;
                        };

                        // Read a batch from the WAL
                        match reader.read_batch(10_000) {
                            Ok(batch) if !batch.entries.is_empty() => {
                                let mut entries = batch.entries;
                                let (applied, skipped, errors) =
                                    crate::ops_processor::apply_ops_batch(&engine, &mut entries);

                                if applied > 0 || errors > 0 {
                                    eprintln!(
                                        "WAL reader: applied={applied} skipped={skipped} errors={errors} cursor={}",
                                        reader.cursor()
                                    );
                                }

                                // Persist cursor after successful processing
                                if let Err(e) = crate::ops_processor::save_cursor(&cursor_path, reader.cursor()) {
                                    eprintln!("WAL reader: failed to save cursor: {e}");
                                }

                                // Update WAL bytes metric (saturate instead of wrapping
                                // if the size ever exceeds the gauge's i64 range)
                                let wal_size = std::fs::metadata(&wal_path).map(|m| m.len()).unwrap_or(0);
                                wal_state.metrics.sync_wal_bytes
                                    .with_label_values(&["wal-reader"])
                                    .set(i64::try_from(wal_size).unwrap_or(i64::MAX));
                            }
                            Ok(_) => {
                                // No new records — sleep briefly
                                std::thread::sleep(std::time::Duration::from_millis(50));
                            }
                            Err(e) => {
                                eprintln!("WAL reader error: {e}");
                                std::thread::sleep(std::time::Duration::from_secs(1));
                            }
                        }
                    }
                });
            // Without this thread no WAL record is ever applied, so a spawn
            // failure must at least be visible in the logs.
            if let Err(e) = spawn_result {
                eprintln!("WAL reader: failed to spawn thread: {e}");
            }
        }

        let shutdown_state = Arc::clone(&state);

        // Admin routes — require Bearer token (or disabled if no token configured)

From fa9d2a89430f0aa32bb32f0cd824b3524b653562 Mon Sep 17 00:00:00 2001
From: Justin Maier
Date: Wed, 25 Mar 2026 18:51:51 -0600
Subject: [PATCH 10/10] =?UTF-8?q?feat:=20CSV=E2=86=92ops=20adapter=20+=20W?= =?UTF-8?q?AL=20reader=20thread=20wiring?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

CSV dump adapter (csv_ops.rs):
- images_csv_to_wal(): parses images.csv, converts each row to set ops
  (nsfwLevel, type, userId, postId, hasMeta, onSite, minor, poi, existedAt, blockedFor)
- tags_csv_to_wal(), tools_csv_to_wal(): multi-value CSV → add ops
- run_csv_dump(): orchestrates full CSV dump with optional row limits
- Supports batch writing to WAL with configurable batch size
- Limited variants for validation testing with subsets

WAL reader thread (server.rs):
- Spawned on server startup, tails ops.wal, reads batches of 10K
- Deduplicates and applies via apply_ops_batch()
- Persists cursor to disk, updates WAL bytes metric
- Completes the full chain: POST /ops → WAL → reader → engine

2 new tests + previous tests still passing.
Co-Authored-By: Claude Opus 4.6 (1M context) --- src/pg_sync/csv_ops.rs | 406 +++++++++++++++++++++++++++++++++++++++++ src/pg_sync/mod.rs | 1 + 2 files changed, 407 insertions(+) create mode 100644 src/pg_sync/csv_ops.rs diff --git a/src/pg_sync/csv_ops.rs b/src/pg_sync/csv_ops.rs new file mode 100644 index 00000000..778441a0 --- /dev/null +++ b/src/pg_sync/csv_ops.rs @@ -0,0 +1,406 @@ +//! CSV→ops adapter for the dump pipeline. +//! +//! Reads existing CSV files (from PG COPY or local dumps) and transforms +//! each row into ops using the sync config schema. Writes ops to WAL files +//! for processing by the WAL reader thread. +//! +//! This is the local testing path and also the production dump path when +//! CSVs are pre-fetched to disk. + +use std::fs::File; +use std::io::{BufRead, BufReader}; +use std::path::Path; +use std::time::Instant; + +use serde_json::json; + +use super::copy_queries::{parse_image_row, parse_tag_row, parse_tool_row, CopyImageRow}; +use super::ops::{EntityOps, Op}; +use crate::ops_wal::WalWriter; + +/// Stats from a CSV→WAL conversion. +#[derive(Debug, Default)] +pub struct CsvOpsStats { + pub rows_read: u64, + pub rows_skipped: u64, + pub ops_written: u64, + pub bytes_written: u64, + pub elapsed_secs: f64, +} + +/// Convert images.csv to ops and write to WAL. +/// Each image row produces set ops for all tracked scalar fields. 
+pub fn images_csv_to_wal(csv_path: &Path, writer: &WalWriter, batch_size: usize) -> std::io::Result { + let start = Instant::now(); + let file = File::open(csv_path)?; + let reader = BufReader::with_capacity(8 * 1024 * 1024, file); + let mut stats = CsvOpsStats::default(); + let mut batch: Vec = Vec::with_capacity(batch_size); + + for line in reader.split(b'\n') { + let line = line?; + if line.is_empty() { + continue; + } + + let row = match parse_image_row(&line) { + Some(r) => r, + None => { + stats.rows_skipped += 1; + continue; + } + }; + stats.rows_read += 1; + + let ops = image_row_to_ops(&row); + batch.push(EntityOps { + entity_id: row.id, + ops, + }); + + if batch.len() >= batch_size { + let bytes = writer.append_batch(&batch)?; + stats.ops_written += batch.len() as u64; + stats.bytes_written += bytes; + batch.clear(); + } + } + + // Flush remaining + if !batch.is_empty() { + let bytes = writer.append_batch(&batch)?; + stats.ops_written += batch.len() as u64; + stats.bytes_written += bytes; + } + + stats.elapsed_secs = start.elapsed().as_secs_f64(); + Ok(stats) +} + +/// Convert a single image CSV row to ops. 
+fn image_row_to_ops(row: &CopyImageRow) -> Vec { + let mut ops = Vec::with_capacity(12); + + ops.push(Op::Set { field: "nsfwLevel".into(), value: json!(row.nsfw_level) }); + ops.push(Op::Set { field: "type".into(), value: json!(row.image_type) }); + ops.push(Op::Set { field: "userId".into(), value: json!(row.user_id) }); + + if let Some(post_id) = row.post_id { + ops.push(Op::Set { field: "postId".into(), value: json!(post_id) }); + } + + // hasMeta and onSite from flags + let has_meta = row.has_meta(); + let on_site = row.on_site(); + ops.push(Op::Set { field: "hasMeta".into(), value: json!(has_meta) }); + ops.push(Op::Set { field: "onSite".into(), value: json!(on_site) }); + + // Minor and POI + let minor = row.minor(); + let poi = row.poi(); + ops.push(Op::Set { field: "minor".into(), value: json!(minor) }); + ops.push(Op::Set { field: "poi".into(), value: json!(poi) }); + + // existedAt = GREATEST(scannedAt, createdAt) in seconds + let existed_at = match (row.scanned_at_secs, row.created_at_secs) { + (Some(s), Some(c)) => s.max(c), + (Some(s), None) => s, + (None, Some(c)) => c, + (None, None) => 0, + }; + ops.push(Op::Set { field: "existedAt".into(), value: json!(existed_at) }); + + // blockedFor + if let Some(ref bf) = row.blocked_for { + ops.push(Op::Set { field: "blockedFor".into(), value: json!(bf) }); + } + + ops +} + +/// Convert tags.csv to add ops and write to WAL. +/// Each row: (tag_id, image_id) → add tagIds op on the image. +pub fn tags_csv_to_wal(csv_path: &Path, writer: &WalWriter, batch_size: usize) -> std::io::Result { + multi_value_csv_to_wal(csv_path, writer, batch_size, "tagIds", |line| { + // tags.csv: tag_id, image_id + parse_tag_row(line).map(|(tag_id, image_id)| (image_id, tag_id)) + }) +} + +/// Convert tools.csv to add ops and write to WAL. 
+pub fn tools_csv_to_wal(csv_path: &Path, writer: &WalWriter, batch_size: usize) -> std::io::Result { + multi_value_csv_to_wal(csv_path, writer, batch_size, "toolIds", |line| { + parse_tool_row(line).map(|(tool_id, image_id)| (image_id, tool_id)) + }) +} + +/// Generic multi-value CSV→WAL converter. +/// Parser returns (slot_id, value) pairs. +fn multi_value_csv_to_wal( + csv_path: &Path, + writer: &WalWriter, + batch_size: usize, + field_name: &str, + parser: impl Fn(&[u8]) -> Option<(i64, i64)>, +) -> std::io::Result { + let start = Instant::now(); + let file = File::open(csv_path)?; + let reader = BufReader::with_capacity(8 * 1024 * 1024, file); + let mut stats = CsvOpsStats::default(); + let mut batch: Vec = Vec::with_capacity(batch_size); + + for line in reader.split(b'\n') { + let line = line?; + if line.is_empty() { + continue; + } + + let (slot_id, value) = match parser(&line) { + Some(pair) => pair, + None => { + stats.rows_skipped += 1; + continue; + } + }; + stats.rows_read += 1; + + batch.push(EntityOps { + entity_id: slot_id, + ops: vec![Op::Add { + field: field_name.to_string(), + value: json!(value), + }], + }); + + if batch.len() >= batch_size { + let bytes = writer.append_batch(&batch)?; + stats.ops_written += batch.len() as u64; + stats.bytes_written += bytes; + batch.clear(); + } + } + + if !batch.is_empty() { + let bytes = writer.append_batch(&batch)?; + stats.ops_written += batch.len() as u64; + stats.bytes_written += bytes; + } + + stats.elapsed_secs = start.elapsed().as_secs_f64(); + Ok(stats) +} + +/// Run the full CSV dump pipeline: read all CSVs, convert to ops, write to WAL. +/// Returns per-table stats. 
+pub fn run_csv_dump( + csv_dir: &Path, + wal_path: &Path, + batch_size: usize, + limit: Option, +) -> std::io::Result> { + let writer = WalWriter::new(wal_path); + let mut results = Vec::new(); + + // Phase 1: Images (must be first — sets alive + scalar fields) + let images_csv = csv_dir.join("images.csv"); + if images_csv.exists() { + eprintln!("CSV dump: loading images.csv..."); + let stats = if let Some(max) = limit { + images_csv_to_wal_limited(&images_csv, &writer, batch_size, max)? + } else { + images_csv_to_wal(&images_csv, &writer, batch_size)? + }; + eprintln!( + " images: {} rows, {} ops, {:.1}s ({:.0}/s)", + stats.rows_read, stats.ops_written, stats.elapsed_secs, + stats.rows_read as f64 / stats.elapsed_secs.max(0.001) + ); + results.push(("images".into(), stats)); + } + + // Phase 2: Multi-value tables (parallel-safe, but sequential here for simplicity) + let tags_csv = csv_dir.join("tags.csv"); + if tags_csv.exists() { + eprintln!("CSV dump: loading tags.csv..."); + let stats = if let Some(max) = limit { + multi_value_csv_to_wal_limited(&tags_csv, &writer, batch_size, "tagIds", max, |line| { + parse_tag_row(line).map(|(tag_id, image_id)| (image_id, tag_id)) + })? + } else { + tags_csv_to_wal(&tags_csv, &writer, batch_size)? + }; + eprintln!( + " tags: {} rows, {} ops, {:.1}s ({:.0}/s)", + stats.rows_read, stats.ops_written, stats.elapsed_secs, + stats.rows_read as f64 / stats.elapsed_secs.max(0.001) + ); + results.push(("tags".into(), stats)); + } + + let tools_csv = csv_dir.join("tools.csv"); + if tools_csv.exists() { + eprintln!("CSV dump: loading tools.csv..."); + let stats = if let Some(max) = limit { + multi_value_csv_to_wal_limited(&tools_csv, &writer, batch_size, "toolIds", max, |line| { + parse_tool_row(line).map(|(tool_id, image_id)| (image_id, tool_id)) + })? + } else { + tools_csv_to_wal(&tools_csv, &writer, batch_size)? 
+ }; + eprintln!( + " tools: {} rows, {} ops, {:.1}s ({:.0}/s)", + stats.rows_read, stats.ops_written, stats.elapsed_secs, + stats.rows_read as f64 / stats.elapsed_secs.max(0.001) + ); + results.push(("tools".into(), stats)); + } + + Ok(results) +} + +/// Limited version of images_csv_to_wal — stops after `limit` rows. +fn images_csv_to_wal_limited(csv_path: &Path, writer: &WalWriter, batch_size: usize, limit: u64) -> std::io::Result { + let start = Instant::now(); + let file = File::open(csv_path)?; + let reader = BufReader::with_capacity(8 * 1024 * 1024, file); + let mut stats = CsvOpsStats::default(); + let mut batch: Vec = Vec::with_capacity(batch_size); + + for line in reader.split(b'\n') { + if stats.rows_read >= limit { + break; + } + let line = line?; + if line.is_empty() { continue; } + let row = match parse_image_row(&line) { + Some(r) => r, + None => { stats.rows_skipped += 1; continue; } + }; + stats.rows_read += 1; + batch.push(EntityOps { entity_id: row.id, ops: image_row_to_ops(&row) }); + if batch.len() >= batch_size { + let bytes = writer.append_batch(&batch)?; + stats.ops_written += batch.len() as u64; + stats.bytes_written += bytes; + batch.clear(); + } + } + if !batch.is_empty() { + let bytes = writer.append_batch(&batch)?; + stats.ops_written += batch.len() as u64; + stats.bytes_written += bytes; + } + stats.elapsed_secs = start.elapsed().as_secs_f64(); + Ok(stats) +} + +/// Limited version of multi_value_csv_to_wal. 
+fn multi_value_csv_to_wal_limited( + csv_path: &Path, + writer: &WalWriter, + batch_size: usize, + field_name: &str, + limit: u64, + parser: impl Fn(&[u8]) -> Option<(i64, i64)>, +) -> std::io::Result { + let start = Instant::now(); + let file = File::open(csv_path)?; + let reader = BufReader::with_capacity(8 * 1024 * 1024, file); + let mut stats = CsvOpsStats::default(); + let mut batch: Vec = Vec::with_capacity(batch_size); + + for line in reader.split(b'\n') { + if stats.rows_read >= limit { + break; + } + let line = line?; + if line.is_empty() { continue; } + let (slot_id, value) = match parser(&line) { + Some(pair) => pair, + None => { stats.rows_skipped += 1; continue; } + }; + stats.rows_read += 1; + batch.push(EntityOps { + entity_id: slot_id, + ops: vec![Op::Add { field: field_name.to_string(), value: json!(value) }], + }); + if batch.len() >= batch_size { + let bytes = writer.append_batch(&batch)?; + stats.ops_written += batch.len() as u64; + stats.bytes_written += bytes; + batch.clear(); + } + } + if !batch.is_empty() { + let bytes = writer.append_batch(&batch)?; + stats.ops_written += batch.len() as u64; + stats.bytes_written += bytes; + } + stats.elapsed_secs = start.elapsed().as_secs_f64(); + Ok(stats) +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + #[test] + fn test_image_row_to_ops() { + let row = CopyImageRow { + id: 1, + url: Some("test.jpg".into()), + nsfw_level: 16, + hash: None, + flags: (1 << 13), // hasMeta=true + image_type: "image".into(), + user_id: 42, + blocked_for: None, + scanned_at_secs: Some(1000), + created_at_secs: Some(2000), + post_id: Some(100), + width: None, + height: None, + published_at_secs: None, + availability: String::new(), + posted_to_id: None, + }; + let ops = image_row_to_ops(&row); + // Should have: nsfwLevel, type, userId, postId, hasMeta, onSite, minor, poi, existedAt + assert!(ops.len() >= 9); + + // Check nsfwLevel + let nsfw = ops.iter().find(|o| matches!(o, Op::Set { field, .. 
} if field == "nsfwLevel")).unwrap(); + if let Op::Set { value, .. } = nsfw { assert_eq!(*value, json!(16)); } + + // Check existedAt = max(1000, 2000) = 2000 + let existed = ops.iter().find(|o| matches!(o, Op::Set { field, .. } if field == "existedAt")).unwrap(); + if let Op::Set { value, .. } = existed { assert_eq!(*value, json!(2000)); } + + // Check hasMeta (flags bit 13 set) + let has_meta = ops.iter().find(|o| matches!(o, Op::Set { field, .. } if field == "hasMeta")).unwrap(); + if let Op::Set { value, .. } = has_meta { assert_eq!(*value, json!(true)); } + } + + #[test] + fn test_csv_to_wal_roundtrip() { + let dir = TempDir::new().unwrap(); + let csv_path = dir.path().join("images.csv"); + let wal_path = dir.path().join("ops.wal"); + + // Write a tiny CSV (comma-separated, matching PG COPY CSV format) + std::fs::write(&csv_path, b"1,http://img.jpg,16,,8192,image,42,,1000,2000,100\n2,,1,,0,video,99,,500,600,200\n").unwrap(); + + let stats = images_csv_to_wal(&csv_path, &WalWriter::new(&wal_path), 100).unwrap(); + assert_eq!(stats.rows_read, 2); + assert_eq!(stats.ops_written, 2); + assert!(stats.bytes_written > 0); + + // Read back from WAL + let mut reader = crate::ops_wal::WalReader::new(&wal_path, 0); + let batch = reader.read_batch(100).unwrap(); + assert_eq!(batch.entries.len(), 2); + assert_eq!(batch.entries[0].entity_id, 1); + assert_eq!(batch.entries[1].entity_id, 2); + } +} diff --git a/src/pg_sync/mod.rs b/src/pg_sync/mod.rs index 23abf1f0..d1cbcff7 100644 --- a/src/pg_sync/mod.rs +++ b/src/pg_sync/mod.rs @@ -11,6 +11,7 @@ pub mod bulk_loader; pub mod config; pub mod copy_queries; pub mod copy_streams; +pub mod csv_ops; pub mod dump; pub mod metrics_poller; pub mod op_dedup;