
feat: Sync V2 data coverage — all CSV tables + deferred alive #86

Merged
JustMaier merged 27 commits into main from feat/sync-v2 on Mar 26, 2026
Conversation

@JustMaier (Contributor)

Summary

Completes data coverage for the Sync V2 dump pipeline. Previously only images/tags/tools were processed (PR #85). Now all 10 CSV tables are wired with full enrichment, string dictionaries, computed sort fields, and deferred alive.

  • 7 new CSV tables: techniques, resources (with MV→model enrichment chain for baseModel + poi), posts (publishedAt, availability, postedToId, isPublished), metrics (reactionCount, commentCount, collectedCount from CH dump), collection_items, model_versions, models
  • String dictionary resolution: type, blockedFor, availability, baseModel resolved through FieldDictionary for LCS fields (fixes silent drops from value_to_bitmap_key returning None for strings)
  • Computed sortAt: GREATEST(existedAt, publishedAt) emitted during image processing
  • Deferred alive: Future publishedAt → DeferredAlive instead of AliveInsert in both steady-state (BitmapSink trait) and dump path (BitmapAccum collection). Deploy gate per Sky.
  • id sort field: Emitted for sort-by-id queries
  • Generic process_multi_value_csv helper: DRY chunked rayon pattern for tags/tools/techniques/collection_items
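As a rough illustration of the computed sortAt semantics above, here is a minimal sketch. The function name and epoch-seconds representation are assumptions; PostgreSQL's GREATEST ignores NULL arguments, which the `None` branch mirrors for an unpublished image:

```rust
// Hypothetical sketch: sortAt = GREATEST(existedAt, publishedAt),
// where publishedAt may be absent (NULL) for unpublished images.
// Timestamps are epoch seconds; names are illustrative, not the real API.
fn computed_sort_at(existed_at: i64, published_at: Option<i64>) -> i64 {
    match published_at {
        // GREATEST over the non-null values
        Some(p) => existed_at.max(p),
        // PostgreSQL GREATEST ignores NULL arguments, so only existedAt counts
        None => existed_at,
    }
}

fn main() {
    assert_eq!(computed_sort_at(100, Some(200)), 200);
    assert_eq!(computed_sort_at(100, None), 100);
    println!("ok");
}
```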

Validation

  • 50M: PASS — 202M ops at 475K/s, zero errors, all 7 tables
  • 107M images: 247K/s — completes successfully, but tags phase OOMs on 16GB local machine (production K8s has 32GB+)
  • 16 unit tests (ops_processor) + 28 unit tests (copy_queries) passing

Config changes needed (separate commit on main)

  • existedAt sort field (source for computed sortAt)
  • sortAt.computed = {op: "greatest", source_fields: ["existedAt", "publishedAt"]}
  • collectionIds multi_value filter field
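Sketched as YAML, the config changes above might look like this (field names are from the PR; the overall schema shape is an assumption):

```yaml
# Hypothetical shape — field names from the PR, schema layout assumed.
sort_fields:
  existedAt: {}
  sortAt:
    computed:
      op: greatest
      source_fields: [existedAt, publishedAt]
filter_fields:
  collectionIds:
    multi_value: true
```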

Test plan

🤖 Generated with Claude Code

JustMaier and others added 27 commits March 25, 2026 19:50
Core building blocks for the ops-based sync pipeline:

- src/pg_sync/ops.rs: Op enum (Set, Remove, Add, Delete, QueryOpSet),
  OpsRow, OpsBatch, EntityOps, SyncMeta, BitdexOps table SQL
- src/pg_sync/op_dedup.rs: Shared dedup helper — LIFO per (entity_id, field),
  add/remove cancellation, delete absorption, queryOpSet last-wins
- src/ops_wal.rs: Append-only WAL with CRC32 integrity, WalWriter (append+fsync),
  WalReader (cursor-based tail, partial record handling, CRC skip)

Also fixes pre-existing compile error in copy_queries.rs tests (missing
width/height fields on CopyImageRow constructors).

30 tests passing.
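The dedup semantics described above — last Set wins per (entity_id, field), and an Add followed by a Remove of the same value cancels — can be sketched as below. The `Op` shape and `dedup_ops` signature here are simplified stand-ins, not the real types, and the real helper also handles delete absorption and queryOpSet last-wins:

```rust
use std::collections::{HashMap, HashSet};

// Simplified sketch of the dedup idea: keep only the last Set per
// (entity_id, field), and cancel out add/remove pairs of the same value.
#[derive(Clone, Debug, PartialEq)]
enum Op {
    Set { entity: u64, field: String, value: i64 },
    Add { entity: u64, field: String, value: i64 },
    Remove { entity: u64, field: String, value: i64 },
}

fn dedup_ops(ops: Vec<Op>) -> Vec<Op> {
    let mut last_set: HashMap<(u64, String), usize> = HashMap::new();
    let mut mv_net: HashMap<(u64, String, i64), i32> = HashMap::new();
    // First pass: last Set index per field, net add/remove balance per value.
    for (i, op) in ops.iter().enumerate() {
        match op {
            Op::Set { entity, field, .. } => {
                last_set.insert((*entity, field.clone()), i);
            }
            Op::Add { entity, field, value } => {
                *mv_net.entry((*entity, field.clone(), *value)).or_insert(0) += 1;
            }
            Op::Remove { entity, field, value } => {
                *mv_net.entry((*entity, field.clone(), *value)).or_insert(0) -= 1;
            }
        }
    }
    // Second pass: emit only winning Sets and net-effect Adds/Removes.
    let mut out = Vec::new();
    let mut emitted: HashSet<(u64, String, i64)> = HashSet::new();
    for (i, op) in ops.iter().enumerate() {
        match op {
            Op::Set { entity, field, .. } => {
                if last_set[&(*entity, field.clone())] == i {
                    out.push(op.clone());
                }
            }
            Op::Add { entity, field, value } | Op::Remove { entity, field, value } => {
                let key = (*entity, field.clone(), *value);
                let net = mv_net[&key];
                if net != 0 && emitted.insert(key) {
                    out.push(if net > 0 {
                        Op::Add { entity: *entity, field: field.clone(), value: *value }
                    } else {
                        Op::Remove { entity: *entity, field: field.clone(), value: *value }
                    });
                }
            }
        }
    }
    out
}

fn main() {
    let ops = vec![
        // Two Sets on the same field: only the last survives.
        Op::Set { entity: 1, field: "nsfwLevel".into(), value: 1 },
        Op::Set { entity: 1, field: "nsfwLevel".into(), value: 4 },
        // Add then Remove of the same tag: cancels out entirely.
        Op::Add { entity: 1, field: "tagId".into(), value: 7 },
        Op::Remove { entity: 1, field: "tagId".into(), value: 7 },
    ];
    assert_eq!(dedup_ops(ops).len(), 1);
    println!("ok");
}
```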

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
WAL-backed ops ingestion endpoint:
- POST /api/indexes/{name}/ops accepts OpsBatch (ops + sync meta)
- Appends to WAL file via WalWriter, returns 200 only after fsync
- Lazy WAL writer init (created on first POST)
- Stores latest SyncMeta per source for lag monitoring

Sync lag endpoint:
- GET /api/internal/sync-lag returns latest metadata from all sync sources
- Supports cursor position, max_id, lag_rows per source

Both endpoints compile-gated behind pg-sync feature with no-op fallbacks.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Ops processor that reads from WAL and routes to the engine:
- Regular ops (set/remove/add): build PatchPayload with old+new values,
  call engine.patch() — no docstore read needed
- queryOpSet: parse filter string, execute query for matching slots,
  apply nested ops to all matches
- Delete: route to engine.delete()
- Filter parser for queryOpSet: supports eq and in operators

Includes json_to_qvalue converter (serde_json::Value → query::Value)
for the PatchPayload/FieldValue type boundary.

9 tests: scalar update, insert (no old), multi-value add/remove,
delete+queryOpSet skip, filter parsing, value type parsing, cursor
persistence.
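A minimal eq/in filter parser like the one described might look as follows. The textual grammar shown ("field eq 5", "field in [1,2,3]") is an assumption about the shape, not the actual queryOpSet syntax:

```rust
// Hedged sketch of a tiny filter parser supporting `eq` and `in`.
#[derive(Debug, PartialEq)]
enum Filter {
    Eq(String, i64),
    In(String, Vec<i64>),
}

fn parse_filter(s: &str) -> Option<Filter> {
    // Split into field, operator, and the remainder of the expression.
    let mut parts = s.splitn(3, ' ');
    let field = parts.next()?.to_string();
    let op = parts.next()?;
    let rest = parts.next()?;
    match op {
        "eq" => Some(Filter::Eq(field, rest.trim().parse().ok()?)),
        "in" => {
            let inner = rest.trim().strip_prefix('[')?.strip_suffix(']')?;
            let vals: Option<Vec<i64>> =
                inner.split(',').map(|v| v.trim().parse().ok()).collect();
            Some(Filter::In(field, vals?))
        }
        _ => None, // unsupported operator
    }
}

fn main() {
    assert_eq!(parse_filter("postId eq 42"), Some(Filter::Eq("postId".into(), 42)));
    assert_eq!(
        parse_filter("tagId in [1, 2, 3]"),
        Some(Filter::In("tagId".into(), vec![1, 2, 3]))
    );
    assert_eq!(parse_filter("postId gt 5"), None);
    println!("ok");
}
```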

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…dpoint

New ops_poller.rs replaces outbox_poller for V2 sync:
- Reads from BitdexOps table (JSONB ops arrays) instead of BitdexOutbox
- Cursor managed in PG bitdex_cursors table (not in BitDex)
- Deduplicates via shared dedup_ops() before sending
- POSTs OpsBatch with SyncMeta (cursor, max_id, lag) to /ops endpoint
- Health gate: pauses when BitDex is unreachable

Also adds post_ops() to BitdexClient.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Config-driven PG trigger generation:
- SyncSource struct: direct tables (slot_field + track_fields),
  multi-value join tables (field + value_field), fan-out tables
  (query + query_source)
- SyncConfig: YAML-parseable config with sync_sources array
- SQL generator: CREATE OR REPLACE FUNCTION + CREATE TRIGGER for each source
- Expression interpolation in track_fields: "GREATEST({scannedAt}, {createdAt}) as existedAt"
- {column} placeholder substitution with OLD/NEW prefixes
- Hash-based trigger naming (bitdex_{table}_{hash8}) for reconciliation
- IS DISTINCT FROM checks for UPDATE ops (only emit when value actually changes)
- queryOpSet generation for fan-out tables
- ENABLE ALWAYS on all triggers (CDC compatibility)

11 tests: parsing, column substitution, all three trigger types,
hash change detection, YAML parsing, expression interpolation.
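The {column} placeholder substitution could be sketched like so (function name and identifier quoting are illustrative, not the generator's actual code):

```rust
// Hedged sketch: replace each {column} in a track_fields expression with
// a prefixed, quoted identifier, e.g. {scannedAt} -> NEW."scannedAt".
fn substitute_placeholders(expr: &str, prefix: &str) -> String {
    let mut out = String::new();
    let mut rest = expr;
    while let Some(start) = rest.find('{') {
        out.push_str(&rest[..start]);
        match rest[start..].find('}') {
            Some(end) => {
                let col = &rest[start + 1..start + end];
                out.push_str(&format!("{}.\"{}\"", prefix, col));
                rest = &rest[start + end + 1..];
            }
            None => {
                // Unterminated placeholder: pass through untouched.
                out.push_str(&rest[start..]);
                rest = "";
            }
        }
    }
    out.push_str(rest);
    out
}

fn main() {
    let sql = substitute_placeholders("GREATEST({scannedAt}, {createdAt})", "NEW");
    assert_eq!(sql, "GREATEST(NEW.\"scannedAt\", NEW.\"createdAt\")");
    println!("ok");
}
```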

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Dump lifecycle management for unified load pipeline:
- DumpRegistry: in-memory + JSON-persisted dump state tracking
- DumpEntry: name, wal_path, status (Writing/Loading/Complete/Failed),
  ops counts, timestamps
- dump_name() + config_hash() for change detection
- Atomic save via temp file rename

Server endpoints:
- GET /dumps — list all dumps with status
- PUT /dumps — register new dump
- POST /dumps/{name}/loaded — signal WAL file complete
- DELETE /dumps/{name} — remove from history
- POST /dumps/clear — clear all

All endpoints feature-gated behind pg-sync with no-op fallbacks.
10 tests: lifecycle, persistence, removal, completion tracking,
config hash determinism, failure handling.
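The "atomic save via temp file rename" pattern used by the registry is roughly the following (a durability-minded version would also fsync the temp file and its directory before and after the rename; paths here are illustrative):

```rust
use std::fs;
use std::path::Path;

// Write the full payload to a sibling temp file, then rename over the
// target, so readers never observe a half-written file. rename() within
// one filesystem is atomic on POSIX.
fn atomic_save(path: &Path, contents: &str) -> std::io::Result<()> {
    let tmp = path.with_extension("tmp");
    fs::write(&tmp, contents)?;
    fs::rename(&tmp, path)?;
    Ok(())
}

fn main() -> std::io::Result<()> {
    let target = std::env::temp_dir().join("dump_registry.json");
    atomic_save(&target, "{\"dumps\":[]}")?;
    assert_eq!(fs::read_to_string(&target)?, "{\"dumps\":[]}");
    fs::remove_file(&target)?;
    println!("ok");
    Ok(())
}
```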

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
New unified sync metrics with source label:
- bitdex_sync_cursor_position{source="..."} — current cursor
- bitdex_sync_max_id{source="..."} — max ops table ID
- bitdex_sync_lag_rows{source="..."} — rows behind
- bitdex_sync_ops_total{source="..."} — total ops received
- bitdex_sync_wal_bytes{source="..."} — WAL file size

Metrics populated from SyncMeta in the POST /ops endpoint.
Old bitdex_pgsync_* metrics preserved for backward compat.
Binary rename (bitdex-pg-sync → bitdex-sync) deferred to deployment PR.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
9 integration tests exercising the full ops pipeline:
- WAL roundtrip with dedup (write → read → dedup → verify)
- Delete absorption through WAL
- Add/remove cancellation through WAL
- queryOpSet serialization through WAL
- Cursor resume across multiple appends
- Dump registry full workflow (register → load → complete → persist)
- Dump config change detection (hash mismatch triggers re-dump)
- Full Civitai trigger config (6 sources, all generate valid SQL)
- OpsBatch JSON format roundtrip with SyncMeta

Total: 69 tests across all Sync V2 modules (60 unit + 9 integration).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Spawns a background thread that tails the ops WAL file, reads batches
of up to 10K records, deduplicates via dedup_ops(), and applies
mutations to the engine via apply_ops_batch(). Persists cursor to
disk after each batch. Updates bitdex_sync_wal_bytes metric.

This completes the full ops ingestion chain:
POST /ops → WAL append + fsync → WAL reader thread → engine mutations

The reader sleeps 50ms when no new records are available, and 1s when
no index is loaded yet. Errors are logged and retried.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
CSV dump adapter (csv_ops.rs):
- images_csv_to_wal(): parses images.csv, converts each row to set ops
  (nsfwLevel, type, userId, postId, hasMeta, onSite, minor, poi, existedAt, blockedFor)
- tags_csv_to_wal(), tools_csv_to_wal(): multi-value CSV → add ops
- run_csv_dump(): orchestrates full CSV dump with optional row limits
- Supports batch writing to WAL with configurable batch size
- Limited variants for validation testing with subsets

WAL reader thread (server.rs):
- Spawned on server startup, tails ops.wal, reads batches of 10K
- Deduplicates and applies via apply_ops_batch()
- Persists cursor to disk, updates WAL bytes metric
- Completes the full chain: POST /ops → WAL → reader → engine

2 new tests + previous tests still passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Rewrites ops_processor.rs per the Sync V2 design doc:

Steady-state path: ops → BitmapSink (CoalescerSink) → coalescer channel.
No more engine.put() — ops translate directly to FilterInsert/FilterRemove/
SortSet/SortClear/AliveInsert/AliveRemove via the existing mutation helpers
(value_to_bitmap_key, value_to_sort_u32).

Dump path: ops → BitmapSink (AccumSink) → BitmapAccum → apply_accum().
Bypasses coalescer, snapshot publishing, and cache invalidation entirely.
process_csv_dump_direct() goes CSV → ops → AccumSink in one pass.

Key changes:
- FieldMeta: precomputed field metadata from Config (filter/sort field types)
- creates_slot flag on EntityOps (persisted in WAL binary header)
- apply_accum() on ConcurrentEngine for direct staging merge
- mutation_sender() exposed on ConcurrentEngine for CoalescerSink
- WAL format updated: 1-byte flags field after entity_id
- Dedup preserves creates_slot via OR across merged sources
- Validation harness supports --direct, --steady-state, WAL dump modes

Benchmarks at 1M scale:
- Direct dump: 367K images/s (beats 345K/s single-pass baseline)
- WAL dump: 41K ops/s
- Steady-state: 2.7K ops/s (expected — per-op channel overhead)

14 unit tests + 9 integration tests passing.
CSV validation PASS at 10K, 100K, 1M with zero errors.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…rocessing

Reflects the architecture split validated by benchmarks:
- pg-sync: thin data mover (COPY→CSV, cursor management, ops polling)
- BitDex: all processing (CSV parse, ops→AccumSink, bitmap accumulation)
- Dumps skip WAL entirely — CSV→AccumSink direct path at 367K/s
- Updated throughput table with measured numbers
- Boot sequence uses pre_dump_cursor for gap safety

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Uses rayon fold+reduce for parallel CSV parsing, matching the
single-pass loader pattern. Each rayon worker builds a thread-local
BitmapAccum, merged at the end via bitmap OR.

1M benchmark: 2.7M ops/s total (images 2.0M/s, tags 5.4M/s, tools 8.8M/s).
Previous single-threaded: 931K ops/s. Speedup: 2.9x.
vs single-pass baseline (345K/s): 5.8x faster on images.
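The fold+reduce idea can be illustrated without rayon using plain std::thread: each worker fills a thread-local accumulator (a HashSet standing in for BitmapAccum), and the partials are merged at the end (union standing in for bitmap OR). This is a dependency-free sketch of the pattern, not the PR's actual code:

```rust
use std::collections::HashSet;
use std::thread;

// Fold phase: each worker builds its own accumulator from a chunk of rows.
// Reduce phase: the partial accumulators are merged at the end.
fn parallel_accumulate(rows: Vec<u64>, workers: usize) -> HashSet<u64> {
    let chunk = (rows.len() + workers - 1) / workers;
    let mut handles = Vec::new();
    for part in rows.chunks(chunk.max(1)) {
        let part = part.to_vec();
        handles.push(thread::spawn(move || {
            let mut local = HashSet::new(); // thread-local accum (fold)
            for id in part {
                local.insert(id);
            }
            local
        }));
    }
    // Merge the partials (reduce) — analogous to OR-ing bitmaps together.
    let mut merged = HashSet::new();
    for h in handles {
        merged.extend(h.join().unwrap());
    }
    merged
}

fn main() {
    let rows: Vec<u64> = (0..1000).collect();
    assert_eq!(parallel_accumulate(rows, 4).len(), 1000);
    println!("ok");
}
```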

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…apply_accum

F1 (Critical): WAL reader was reading entire file every poll via fs::read().
Now uses seek(cursor) + read_exact() for incremental reads. O(new_data) per
poll instead of O(file_size).

F2 (Critical): apply_accum() cloned snapshot without loading mode, triggering
Arc clone cascade (94s stalls at 105M). Now enters/exits loading mode
automatically — staging refcount=1, no deep clones.

Also: chunked block reader for direct dump path (300MB blocks via reader thread).
Prevents OOM on 67GB tags.csv at full scale.
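The F1 fix — incremental tailing via seek plus read_exact — can be sketched as below (record framing, partial-record handling, and CRC verification omitted; names are illustrative):

```rust
use std::fs::File;
use std::io::{Read, Seek, SeekFrom, Write};

// Instead of fs::read()-ing the whole WAL every poll, seek to the saved
// cursor and read only the newly appended bytes: O(new_data) per poll.
fn read_new_bytes(path: &str, cursor: &mut u64) -> std::io::Result<Vec<u8>> {
    let mut f = File::open(path)?;
    let len = f.metadata()?.len();
    if len <= *cursor {
        return Ok(Vec::new()); // nothing new appended
    }
    f.seek(SeekFrom::Start(*cursor))?;
    let mut buf = vec![0u8; (len - *cursor) as usize];
    f.read_exact(&mut buf)?;
    *cursor = len; // advance cursor past the consumed bytes
    Ok(buf)
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("ops.wal");
    let path = path.to_str().unwrap().to_string();
    std::fs::write(&path, b"batch1")?;
    let mut cursor = 0u64;
    assert_eq!(read_new_bytes(&path, &mut cursor)?, b"batch1".to_vec());
    // Append more; the next poll reads only the new tail.
    let mut f = std::fs::OpenOptions::new().append(true).open(&path)?;
    f.write_all(b"batch2")?;
    drop(f);
    assert_eq!(read_new_bytes(&path, &mut cursor)?, b"batch2".to_vec());
    std::fs::remove_file(&path)?;
    println!("ok");
    Ok(())
}
```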

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
CH poller now emits Op::Set for reactionCount/commentCount/collectedCount
instead of V1 full-document patches. Removes PG dependency (metrics are
self-contained sort values from ClickHouse). Batched at 5K entities per
POST /ops request. creates_slot: false (sort-only, no alive bit changes).

6 unit tests for metrics→ops conversion.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Direct mode reads CSVs directly — no need to write a WAL file first.
Eliminates ~40s of wasted I/O per run.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…ader hang

Reverts to simple read_lines approach (no chunked block reader thread).
The chunked reader deadlocked due to sync_channel backpressure with rayon.

Loading mode entered once for the entire dump, exited after all tables.
Headless engines get a harmless timeout warning (no flush thread).

1M benchmark: 2.5M ops/s (images 1.35M/s, tags 5.1M/s, tools 6.8M/s).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Reads 10M lines per chunk instead of entire file. Each chunk is rayon
processed and applied to staging, then freed. Caps memory at ~2GB per
chunk instead of 67GB for the full tags file.
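In outline, the chunked reading strategy looks like this (the chunk size is tiny here for illustration; the commit uses millions of lines per chunk, and the per-chunk processing stands in for the rayon parse + staging apply):

```rust
use std::io::{BufRead, BufReader};

// Read N lines at a time, process the chunk, then drop it before reading
// the next one — peak memory is bounded by chunk size, not file size.
fn process_in_chunks<R: BufRead>(reader: R, chunk_lines: usize) -> usize {
    let mut total = 0;
    let mut chunk: Vec<String> = Vec::with_capacity(chunk_lines);
    for line in reader.lines().flatten() {
        chunk.push(line);
        if chunk.len() == chunk_lines {
            total += chunk.len(); // stand-in for parse + apply to staging
            chunk.clear();        // free the chunk before reading more
        }
    }
    total += chunk.len(); // final partial chunk
    total
}

fn main() {
    let data = "a\nb\nc\nd\ne\n";
    let n = process_in_chunks(BufReader::new(data.as_bytes()), 2);
    assert_eq!(n, 5);
    println!("ok");
}
```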

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Adds ComputedSortInfo and reverse dependency map to FieldMeta so the ops
processor knows which computed fields to recompute when source fields change.

Gap analysis identified remaining work before deploy:
- 7/10 CSV tables need ops adapters (parsers exist)
- Docstore writes not wired
- String dictionary resolution for LCS fields
- Deferred alive for future-dated posts
- Doc-only fields (width, height, url, hash)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Adds processing for 7 remaining tables (techniques, resources, posts,
model_versions, models, metrics, collection_items) to the direct dump
pipeline. Images now enriched with post data (publishedAt, availability,
postedToId, isPublished) and computed sortAt = GREATEST(existedAt,
publishedAt). Resources chain through model_versions → models for
baseModel and resource-level poi. Metrics from CH dump set sort fields
(reactionCount, commentCount, collectedCount).

String dictionary resolution via FieldDictionary for LCS fields (type,
blockedFor, availability, baseModel) — fixes silent drops from
value_to_bitmap_key returning None for strings.

Extracted process_multi_value_csv helper to DRY the chunked rayon
pattern across tags/tools/techniques/collection_items.

Validated at 100K: zero errors, all tables processed.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Adds deferred_alive method to BitmapSink trait. When creates_slot=true
and the deferred_alive source field (publishedAt) has a future timestamp,
the slot's filter/sort bitmaps are set immediately but the alive bit is
deferred via DeferredAlive MutationOp (steady-state) or skipped in
AccumSink (dump mode).

This prevents scheduled posts from appearing in queries before their
publishedAt time — a deploy gate per Sky.

Two new unit tests verify future vs past publishedAt behavior.
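The deferred-alive decision reduces to a timestamp comparison. A hedged sketch, with `AliveAction` as an illustrative stand-in for the MutationOp variants named above:

```rust
// Future publishedAt: filter/sort state is written, but the alive bit is
// deferred until publish time; past or absent publishedAt: alive now.
#[derive(Debug, PartialEq)]
enum AliveAction {
    InsertNow,       // visible immediately
    DeferUntil(u64), // schedule the alive bit for this timestamp
}

fn alive_action(published_at: Option<u64>, now: u64) -> AliveAction {
    match published_at {
        Some(ts) if ts > now => AliveAction::DeferUntil(ts),
        _ => AliveAction::InsertNow,
    }
}

fn main() {
    let now = 1_000;
    assert_eq!(alive_action(Some(900), now), AliveAction::InsertNow);
    assert_eq!(alive_action(Some(2_000), now), AliveAction::DeferUntil(2_000));
    assert_eq!(alive_action(None, now), AliveAction::InsertNow);
    println!("ok");
}
```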

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
10M chunks OOM'd at 40M images due to enriched fields (~16 bitmap ops
per image vs 10 before). 5M chunks OOM'd at 70M. 2M chunks complete
107M images at 247K/s. Enrichment HashMaps (posts 22.8M, MVs 1M,
models 823K) freed after image phase to reclaim ~1.5GB for tags.

Full 107M with all tables requires ~20GB RAM (staging bitmaps grow with
more filter/sort fields from enrichment). Local machine hits limit
during tags phase. Production K8s has sufficient RAM.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Fixes Ollie's #8 finding: image dump path unconditionally set alive bit
without checking publishedAt. Now checks deferred_alive config during
image processing — future publishedAt slots skip alive insert and are
collected as deferred entries, scheduled via MutationOp after loading
mode exits.

Added deferred_alive field to BitmapAccum for rayon fold collection.
Added deferred_alive method to BitmapSink trait with implementations
in CoalescerSink (DeferredAlive MutationOp) and AccumSink (no-op).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Replaces image_row_to_ops_pub → Vec<Op> → json_to_qvalue → bitmap_key
chain with direct CopyImageRow field → bitmap writes using macros for
filter_int/filter_bool/filter_str. Eliminates ~1.3B transient heap
allocs at 107M scale (12 String+JsonValue pairs per row).

This should reduce peak RSS by 1-2GB from heap fragmentation and
improve throughput by ~30-40% in the image dump phase.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Inter-phase save_and_unload() loses alive bitmap due to complex
flush thread staging/snapshot interaction. The old bulk_loader
manages this correctly but integrating it into the dump pipeline
requires careful staging lifecycle management. Left as TODO.

107M with all enriched fields needs ~20GB — production K8s has
this. 50M validates correctness on 16GB local machine.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@JustMaier merged commit abe45df into main on Mar 26, 2026
1 check failed
