feat: PG-Sync V2 — ops pipeline with BitmapSink dual-path #85
Merged
Core building blocks for the ops-based sync pipeline:
- src/pg_sync/ops.rs: Op enum (Set, Remove, Add, Delete, QueryOpSet), OpsRow, OpsBatch, EntityOps, SyncMeta, BitdexOps table SQL
- src/pg_sync/op_dedup.rs: shared dedup helper — LIFO per (entity_id, field), add/remove cancellation, delete absorption, queryOpSet last-wins
- src/ops_wal.rs: append-only WAL with CRC32 integrity, WalWriter (append + fsync), WalReader (cursor-based tail, partial-record handling, CRC skip)
Also fixes a pre-existing compile error in copy_queries.rs tests (missing width/height fields on CopyImageRow constructors).
30 tests passing.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
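The dedup rules named above (LIFO per field, add/remove cancellation, delete absorption) can be sketched roughly as follows. The `Op` variants here are simplified stand-ins, not the actual `src/pg_sync/op_dedup.rs` types, and the cancellation semantics shown are one plausible interpretation:

```rust
// Simplified stand-in for the real Op type (assumption, not the actual API).
#[derive(Debug, Clone, PartialEq)]
enum Op {
    Set { field: String, value: i64 },
    Add { field: String, value: i64 },
    Remove { field: String, value: i64 },
    Delete,
}

/// Dedup a stream of ops for a single entity:
/// - last Set per field wins (LIFO)
/// - an Add cancels a pending Remove of the same (field, value), and vice versa
/// - a Delete absorbs everything before it
fn dedup_entity_ops(ops: Vec<Op>) -> Vec<Op> {
    let mut out: Vec<Op> = Vec::new();
    for op in ops {
        match &op {
            Op::Delete => out.clear(), // delete absorption
            Op::Set { field, .. } => {
                // LIFO per field: drop earlier Sets on the same field
                out.retain(|o| !matches!(o, Op::Set { field: f, .. } if f == field));
            }
            Op::Add { field, value } => {
                let before = out.len();
                out.retain(|o| !matches!(o, Op::Remove { field: f, value: v } if f == field && v == value));
                if out.len() < before { continue; } // cancelled a pending Remove
            }
            Op::Remove { field, value } => {
                let before = out.len();
                out.retain(|o| !matches!(o, Op::Add { field: f, value: v } if f == field && v == value));
                if out.len() < before { continue; } // cancelled a pending Add
            }
        }
        out.push(op);
    }
    out
}

fn main() {
    let ops = vec![
        Op::Set { field: "nsfwLevel".into(), value: 1 },
        Op::Add { field: "tags".into(), value: 7 },
        Op::Remove { field: "tags".into(), value: 7 }, // cancels the Add
        Op::Set { field: "nsfwLevel".into(), value: 4 }, // supersedes the first Set
    ];
    println!("{:?}", dedup_entity_ops(ops));
}
```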
WAL-backed ops ingestion endpoint:
- POST /api/indexes/{name}/ops accepts OpsBatch (ops + sync meta)
- Appends to WAL file via WalWriter, returns 200 only after fsync
- Lazy WAL writer init (created on first POST)
- Stores latest SyncMeta per source for lag monitoring
Sync lag endpoint:
- GET /api/internal/sync-lag returns latest metadata from all sync sources
- Supports cursor position, max_id, lag_rows per source
Both endpoints compile-gated behind pg-sync feature with no-op fallbacks.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
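The CRC-checked record framing that WalWriter appends and WalReader tails could look something like the sketch below. The actual `src/ops_wal.rs` layout isn't shown in this PR, so the `[len][crc32][payload]` field order is an assumption:

```rust
// Bitwise CRC32 (IEEE/reflected polynomial) — slow but dependency-free.
fn crc32(data: &[u8]) -> u32 {
    let mut crc = 0xFFFF_FFFFu32;
    for &b in data {
        crc ^= b as u32;
        for _ in 0..8 {
            crc = if crc & 1 != 0 { (crc >> 1) ^ 0xEDB8_8320 } else { crc >> 1 };
        }
    }
    !crc
}

/// Frame a record as [len: u32 LE][crc32: u32 LE][payload].
fn encode_record(payload: &[u8]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(8 + payload.len());
    buf.extend_from_slice(&(payload.len() as u32).to_le_bytes());
    buf.extend_from_slice(&crc32(payload).to_le_bytes());
    buf.extend_from_slice(payload);
    buf
}

/// Decode one record starting at `offset`. Returns (payload, next_offset).
/// None on a partial record (tail not yet fsynced) or a CRC mismatch.
fn decode_record(wal: &[u8], offset: usize) -> Option<(&[u8], usize)> {
    let header = wal.get(offset..offset + 8)?;
    let len = u32::from_le_bytes(header[0..4].try_into().unwrap()) as usize;
    let crc = u32::from_le_bytes(header[4..8].try_into().unwrap());
    let payload = wal.get(offset + 8..offset + 8 + len)?; // partial record → None
    if crc32(payload) != crc { return None; }             // corruption → skip/stop
    Some((payload, offset + 8 + len))
}

fn main() {
    let mut wal = encode_record(b"batch-1");
    wal.extend(encode_record(b"batch-2"));
    let (p1, next) = decode_record(&wal, 0).unwrap();
    let (p2, _) = decode_record(&wal, next).unwrap();
    println!("{} then {}", String::from_utf8_lossy(p1), String::from_utf8_lossy(p2));
}
```

Returning 200 only after the framed record is written and fsynced is what makes the endpoint's acknowledgement durable.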
Ops processor that reads from the WAL and routes to the engine:
- Regular ops (set/remove/add): build PatchPayload with old + new values, call engine.patch() — no docstore read needed
- queryOpSet: parse the filter string, execute the query for matching slots, apply the nested ops to all matches
- Delete: route to engine.delete()
- Filter parser for queryOpSet: supports eq and in operators
Includes a json_to_qvalue converter (serde_json::Value → query::Value) for the PatchPayload/FieldValue type boundary.
9 tests: scalar update, insert (no old), multi-value add/remove, delete + queryOpSet skip, filter parsing, value type parsing, cursor persistence.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
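A minimal sketch of a queryOpSet filter parser supporting eq and in, as described above. The concrete grammar isn't shown in this PR, so `field = value` and `field in (v1, v2)` are assumed forms:

```rust
#[derive(Debug, PartialEq)]
enum Filter {
    Eq(String, String),
    In(String, Vec<String>),
}

/// Parse a tiny queryOpSet-style filter. Grammar is an assumption:
/// `field = value` or `field in (v1, v2, ...)`.
fn parse_filter(input: &str) -> Option<Filter> {
    let input = input.trim();
    if let Some((field, rest)) = input.split_once(" in ") {
        let list = rest.trim().strip_prefix('(')?.strip_suffix(')')?;
        let values = list.split(',').map(|v| v.trim().to_string()).collect();
        return Some(Filter::In(field.trim().to_string(), values));
    }
    if let Some((field, value)) = input.split_once('=') {
        return Some(Filter::Eq(field.trim().to_string(), value.trim().to_string()));
    }
    None // unsupported operator
}

fn main() {
    println!("{:?}", parse_filter("postId = 42"));
    println!("{:?}", parse_filter("userId in (1, 2, 3)"));
}
```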
…dpoint
New ops_poller.rs replaces outbox_poller for V2 sync:
- Reads from the BitdexOps table (JSONB ops arrays) instead of BitdexOutbox
- Cursor managed in the PG bitdex_cursors table (not in BitDex)
- Deduplicates via the shared dedup_ops() before sending
- POSTs OpsBatch with SyncMeta (cursor, max_id, lag) to the /ops endpoint
- Health gate: pauses when BitDex is unreachable
Also adds post_ops() to BitdexClient.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Config-driven PG trigger generation:
- SyncSource struct: direct tables (slot_field + track_fields),
multi-value join tables (field + value_field), fan-out tables
(query + query_source)
- SyncConfig: YAML-parseable config with sync_sources array
- SQL generator: CREATE OR REPLACE FUNCTION + CREATE TRIGGER for each source
- Expression interpolation in track_fields: "GREATEST({scannedAt}, {createdAt}) as existedAt"
- {column} placeholder substitution with OLD/NEW prefixes
- Hash-based trigger naming (bitdex_{table}_{hash8}) for reconciliation
- IS DISTINCT FROM checks for UPDATE ops (only emit when value actually changes)
- queryOpSet generation for fan-out tables
- ENABLE ALWAYS on all triggers (CDC compatibility)
11 tests: parsing, column substitution, all three trigger types,
hash change detection, YAML parsing, expression interpolation.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
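The `{column}` substitution with OLD/NEW prefixes described above could be implemented along these lines. Quoting rules in the real generator are an assumption:

```rust
/// Substitute `{column}` placeholders in a track_fields expression with a
/// trigger row-record prefix, e.g. "GREATEST({scannedAt}, {createdAt})" with
/// prefix "NEW" becomes "GREATEST(NEW.\"scannedAt\", NEW.\"createdAt\")".
/// Sketch only — the real generator's quoting rules are assumed.
fn interpolate(expr: &str, prefix: &str) -> String {
    let mut out = String::new();
    let mut rest = expr;
    while let Some(start) = rest.find('{') {
        out.push_str(&rest[..start]);
        match rest[start + 1..].find('}') {
            Some(end) => {
                let col = &rest[start + 1..start + 1 + end];
                out.push_str(&format!("{}.\"{}\"", prefix, col));
                rest = &rest[start + 1 + end + 1..];
            }
            None => {
                // Unmatched brace: emit the remainder verbatim.
                out.push_str(&rest[start..]);
                rest = "";
            }
        }
    }
    out.push_str(rest);
    out
}

fn main() {
    println!("{}", interpolate("GREATEST({scannedAt}, {createdAt}) as existedAt", "NEW"));
}
```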
Dump lifecycle management for unified load pipeline:
- DumpRegistry: in-memory + JSON-persisted dump state tracking
- DumpEntry: name, wal_path, status (Writing/Loading/Complete/Failed),
ops counts, timestamps
- dump_name() + config_hash() for change detection
- Atomic save via temp file rename
Server endpoints:
- GET /dumps — list all dumps with status
- PUT /dumps — register new dump
- POST /dumps/{name}/loaded — signal WAL file complete
- DELETE /dumps/{name} — remove from history
- POST /dumps/clear — clear all
All endpoints feature-gated behind pg-sync with no-op fallbacks.
10 tests: lifecycle, persistence, removal, completion tracking,
config hash determinism, failure handling.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
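The atomic-save-via-temp-file-rename pattern mentioned above is a standard idiom; a minimal sketch (names illustrative, not the actual DumpRegistry API):

```rust
use std::fs;
use std::io::Write;
use std::path::Path;

/// Persist serialized state atomically: write a sibling temp file, fsync it,
/// then rename over the target. Readers never observe a half-written file.
fn save_atomic(path: &Path, contents: &[u8]) -> std::io::Result<()> {
    let tmp = path.with_extension("tmp");
    {
        let mut f = fs::File::create(&tmp)?;
        f.write_all(contents)?;
        f.sync_all()?; // durable before the rename makes it visible
    }
    fs::rename(&tmp, path) // atomic replacement on POSIX filesystems
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("dumps.json");
    save_atomic(&path, br#"{"dumps": []}"#)?;
    println!("{}", fs::read_to_string(&path)?);
    Ok(())
}
```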
New unified sync metrics with source label:
- bitdex_sync_cursor_position{source="..."} — current cursor
- bitdex_sync_max_id{source="..."} — max ops table ID
- bitdex_sync_lag_rows{source="..."} — rows behind
- bitdex_sync_ops_total{source="..."} — total ops received
- bitdex_sync_wal_bytes{source="..."} — WAL file size
Metrics populated from SyncMeta in the POST /ops endpoint.
Old bitdex_pgsync_* metrics preserved for backward compat.
Binary rename (bitdex-pg-sync → bitdex-sync) deferred to deployment PR.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
9 integration tests exercising the full ops pipeline:
- WAL roundtrip with dedup (write → read → dedup → verify)
- Delete absorption through WAL
- Add/remove cancellation through WAL
- queryOpSet serialization through WAL
- Cursor resume across multiple appends
- Dump registry full workflow (register → load → complete → persist)
- Dump config change detection (hash mismatch triggers re-dump)
- Full Civitai trigger config (6 sources, all generate valid SQL)
- OpsBatch JSON format roundtrip with SyncMeta
Total: 69 tests across all Sync V2 modules (60 unit + 9 integration).
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Spawns a background thread that tails the ops WAL file, reads batches of up to 10K records, deduplicates via dedup_ops(), and applies mutations to the engine via apply_ops_batch(). Persists the cursor to disk after each batch and updates the bitdex_sync_wal_bytes metric.
This completes the full ops ingestion chain: POST /ops → WAL append + fsync → WAL reader thread → engine mutations.
The reader sleeps 50ms when no new records are available, and 1s when no index is loaded yet. Errors are logged and retried.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
CSV dump adapter (csv_ops.rs):
- images_csv_to_wal(): parses images.csv, converts each row to set ops (nsfwLevel, type, userId, postId, hasMeta, onSite, minor, poi, existedAt, blockedFor)
- tags_csv_to_wal(), tools_csv_to_wal(): multi-value CSV → add ops
- run_csv_dump(): orchestrates a full CSV dump with optional row limits
- Supports batch writing to the WAL with configurable batch size
- Limited variants for validation testing with subsets
WAL reader thread (server.rs):
- Spawned on server startup, tails ops.wal, reads batches of 10K
- Deduplicates and applies via apply_ops_batch()
- Persists the cursor to disk, updates the WAL bytes metric
- Completes the full chain: POST /ops → WAL → reader → engine
2 new tests + previous tests still passing.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Rewrites ops_processor.rs per the Sync V2 design doc.
Steady-state path: ops → BitmapSink (CoalescerSink) → coalescer channel. No more engine.put() — ops translate directly to FilterInsert/FilterRemove/SortSet/SortClear/AliveInsert/AliveRemove via the existing mutation helpers (value_to_bitmap_key, value_to_sort_u32).
Dump path: ops → BitmapSink (AccumSink) → BitmapAccum → apply_accum(). Bypasses the coalescer, snapshot publishing, and cache invalidation entirely. process_csv_dump_direct() goes CSV → ops → AccumSink in one pass.
Key changes:
- FieldMeta: precomputed field metadata from Config (filter/sort field types)
- creates_slot flag on EntityOps (persisted in the WAL binary header)
- apply_accum() on ConcurrentEngine for direct staging merge
- mutation_sender() exposed on ConcurrentEngine for CoalescerSink
- WAL format updated: 1-byte flags field after entity_id
- Dedup preserves creates_slot via OR across merged sources
- Validation harness supports --direct, --steady-state, and WAL dump modes
Benchmarks at 1M scale:
- Direct dump: 367K images/s (beats the 345K/s single-pass baseline)
- WAL dump: 41K ops/s
- Steady-state: 2.7K ops/s (expected — per-op channel overhead)
14 unit tests + 9 integration tests passing. CSV validation PASS at 10K, 100K, 1M with zero errors.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
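The dual-path design means both routes emit the same mutations through one interface and differ only in the destination. A minimal sketch, using the names from this PR (BitmapSink, CoalescerSink, AccumSink) but with assumed signatures and a simplified Mutation stand-in:

```rust
use std::sync::mpsc;

// Stand-in for the real mutation variants (FilterInsert/SortSet/etc.).
#[derive(Debug, Clone, PartialEq)]
enum Mutation {
    FilterInsert { field: String, key: u32, slot: u32 },
    FilterRemove { field: String, key: u32, slot: u32 },
}

/// Both ops paths target the same sink interface; only the destination differs.
trait BitmapSink {
    fn emit(&mut self, m: Mutation);
}

/// Steady-state path: forward each mutation into the coalescer channel.
struct CoalescerSink {
    tx: mpsc::Sender<Mutation>,
}
impl BitmapSink for CoalescerSink {
    fn emit(&mut self, m: Mutation) {
        let _ = self.tx.send(m);
    }
}

/// Dump path: accumulate mutations locally, merged into staging at the end.
#[derive(Default)]
struct AccumSink {
    pending: Vec<Mutation>,
}
impl BitmapSink for AccumSink {
    fn emit(&mut self, m: Mutation) {
        self.pending.push(m);
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let mut steady = CoalescerSink { tx };
    let mut dump = AccumSink::default();
    let m = Mutation::FilterInsert { field: "type".into(), key: 1, slot: 42 };
    steady.emit(m.clone());
    dump.emit(m);
    println!("coalescer got: {:?}", rx.recv().unwrap());
    println!("accum pending: {}", dump.pending.len());
}
```

This is why the dump path can skip the coalescer, snapshot publishing, and cache invalidation: nothing downstream of AccumSink runs until apply_accum() merges the accumulated state.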
…rocessing
Reflects the architecture split validated by benchmarks:
- pg-sync: thin data mover (COPY→CSV, cursor management, ops polling)
- BitDex: all processing (CSV parse, ops→AccumSink, bitmap accumulation)
- Dumps skip the WAL entirely — CSV→AccumSink direct path at 367K/s
- Updated throughput table with measured numbers
- Boot sequence uses pre_dump_cursor for gap safety
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Uses rayon fold + reduce for parallel CSV parsing, matching the single-pass loader pattern. Each rayon worker builds a thread-local BitmapAccum, merged at the end via bitmap OR.
1M benchmark: 2.7M ops/s total (images 2.0M/s, tags 5.4M/s, tools 8.8M/s). Previous single-threaded: 931K ops/s. Speedup: 2.9x. vs the single-pass baseline (345K/s): 5.8x faster on images.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
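The fold + reduce shape — each worker folds its shard into a private accumulator with no locks, then the partials are OR-merged — can be sketched as below. The PR uses rayon; std::thread is used here to keep the sketch dependency-free, and the toy `Accum` stands in for BitmapAccum:

```rust
use std::collections::HashSet;
use std::thread;

/// Toy accumulator standing in for BitmapAccum: a set of slot ids.
#[derive(Default)]
struct Accum {
    slots: HashSet<u32>,
}
impl Accum {
    fn insert(&mut self, slot: u32) { self.slots.insert(slot); }
    /// Merge another accumulator (the real code ORs roaring bitmaps).
    fn or(&mut self, other: Accum) { self.slots.extend(other.slots); }
}

/// fold + reduce: each worker folds its shard into a private Accum, then
/// the partials are reduced by OR-merging — no shared mutable state.
fn parallel_accumulate(rows: Vec<u32>, workers: usize) -> Accum {
    let chunk = ((rows.len() + workers - 1) / workers).max(1);
    let partials: Vec<Accum> = thread::scope(|s| {
        rows.chunks(chunk)
            .map(|shard| s.spawn(move || {
                let mut acc = Accum::default();
                for &slot in shard { acc.insert(slot); } // fold
                acc
            }))
            .collect::<Vec<_>>()
            .into_iter()
            .map(|h| h.join().unwrap())
            .collect()
    });
    partials.into_iter().fold(Accum::default(), |mut a, b| { a.or(b); a }) // reduce
}

fn main() {
    let acc = parallel_accumulate((0..100).collect(), 4);
    println!("distinct slots: {}", acc.slots.len());
}
```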
…apply_accum F1 (Critical): WAL reader was reading entire file every poll via fs::read(). Now uses seek(cursor) + read_exact() for incremental reads. O(new_data) per poll instead of O(file_size). F2 (Critical): apply_accum() cloned snapshot without loading mode, triggering Arc clone cascade (94s stalls at 105M). Now enters/exits loading mode automatically — staging refcount=1, no deep clones. Also: chunked block reader for direct dump path (300MB blocks via reader thread). Prevents OOM on 67GB tags.csv at full scale. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The CH poller now emits Op::Set for reactionCount/commentCount/collectedCount instead of V1 full-document patches. Removes the PG dependency (metrics are self-contained sort values from ClickHouse).
Batched at 5K entities per POST /ops request. creates_slot: false (sort-only, no alive bit changes).
6 unit tests for metrics→ops conversion.
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Direct mode reads the CSVs in place — no need to write them through the WAL first. Eliminates ~40s of wasted I/O per run. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ader hang
Reverts to the simple read_lines approach (no chunked block-reader thread). The chunked reader deadlocked due to sync_channel backpressure with rayon.
Loading mode is entered once for the entire dump and exited after all tables. Headless engines get a harmless timeout warning (no flush thread).
1M benchmark: 2.5M ops/s (images 1.35M/s, tags 5.1M/s, tools 6.8M/s).
Reads 10M lines per chunk instead of the entire file. Each chunk is rayon-processed and applied to staging, then freed. Caps memory at ~2GB per chunk instead of 67GB for the full tags file.
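The chunked streaming shape is straightforward: pull a fixed number of lines, hand them to the parser, drop them, repeat. A dependency-free sketch (the chunk-size constant and callback are illustrative):

```rust
use std::io::{BufRead, BufReader, Read};

/// Stream a large CSV in fixed-size line chunks so peak memory is bounded by
/// one chunk, not the whole file. Each chunk would be handed to the parallel
/// parser and applied to staging, then freed. (The PR uses 10M lines per
/// chunk; the tiny chunk size in main() is just for illustration.)
fn for_each_chunk<R: Read>(reader: R, chunk_lines: usize, mut f: impl FnMut(&[String])) {
    let mut lines = BufReader::new(reader).lines();
    loop {
        let chunk: Vec<String> = lines.by_ref().take(chunk_lines).filter_map(Result::ok).collect();
        if chunk.is_empty() { break; }
        f(&chunk); // parse + apply, then the chunk Vec is dropped
    }
}

fn main() {
    let data = "a\nb\nc\nd\ne\n";
    let mut chunks = 0;
    for_each_chunk(data.as_bytes(), 3, |c| {
        chunks += 1;
        println!("chunk of {} lines", c.len());
    });
    println!("{} chunks", chunks);
}
```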
Summary
Key Files
src/ops_processor.rs, src/ingester.rs, src/pg_sync/ops.rs, src/ops_wal.rs, src/pg_sync/op_dedup.rs, src/concurrent_engine.rs, src/server.rs, docs/design/pg-sync-v2-final.md
Benchmarks (1M scale)
Direct dump path: 367K images/s, beating the 345K/s single-pass loader baseline by ~6%.
Test plan
🤖 Generated with Claude Code