
feat: PG-Sync V2 — ops-based outbox, WAL pipeline, trigger generator#84

Merged
JustMaier merged 10 commits into main from feat/sync-v2 on Mar 26, 2026


@JustMaier
Contributor

Summary

Complete implementation of the PG-Sync V2 pipeline, replacing the enrichment-heavy V1 outbox poller with self-contained ops. This is the Rust foundation — trigger deployment and full E2E server tests are follow-up work.

New Modules (8 files, ~2,800 lines)

  • src/pg_sync/ops.rs — Op enum (Set, Remove, Add, Delete, QueryOpSet), OpsRow, OpsBatch, EntityOps, SyncMeta, BitdexOps table SQL
  • src/pg_sync/op_dedup.rs — Shared dedup helper: LIFO per (entity_id, field), add/remove cancellation, delete absorption, queryOpSet last-wins
  • src/ops_wal.rs — Append-only WAL with CRC32 integrity. WalWriter (append+fsync), WalReader (cursor-based tail, partial record handling)
  • src/ops_processor.rs — Converts ops to engine mutations. PatchPayload builder from ops, queryOpSet resolver (filter parser + query execution), cursor persistence
  • src/pg_sync/ops_poller.rs — V2 poller: reads BitdexOps from PG, deduplicates, POSTs to /ops endpoint with SyncMeta
  • src/pg_sync/trigger_gen.rs — YAML config → PL/pgSQL trigger SQL. Direct tables (IS DISTINCT FROM), multi-value join tables (add/remove), fan-out tables (queryOpSet). Expression interpolation, hash-based reconciliation
  • src/pg_sync/dump.rs — Dump registry: lifecycle tracking (Writing/Loading/Complete/Failed), JSON persistence, config hash change detection
  • tests/sync_v2_integration.rs — 9 integration tests covering the full pipeline
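
The dedup rules listed for op_dedup.rs can be illustrated with a minimal sketch. It is not the code from this PR: the `Op` variants below use simplified, hypothetical payloads (string fields and values), `QueryOpSet` is left out, and the signature of `dedup_ops` is an assumption. It also assumes that an add and a remove of the same value on the same entity cancel in either order.

```rust
use std::collections::HashMap;
use std::mem::discriminant;

// Simplified, hypothetical op shapes; the real Op enum also has QueryOpSet.
#[derive(Debug, Clone, PartialEq)]
enum Op {
    Set { field: String, value: String },
    Add { field: String, value: String },
    Remove { field: String, value: String },
    Delete,
}

/// Deduplicates (entity_id, op) pairs while preserving the order of survivors.
fn dedup_ops(ops: Vec<(u64, Op)>) -> Vec<(u64, Op)> {
    let mut out: Vec<Option<(u64, Op)>> = Vec::with_capacity(ops.len());
    // Index of the latest Set per (entity, field).
    let mut last_set: HashMap<(u64, String), usize> = HashMap::new();
    // Index of the pending Add/Remove per (entity, field, value).
    let mut pending: HashMap<(u64, String, String), usize> = HashMap::new();

    for (entity, op) in ops {
        match &op {
            Op::Set { field, .. } => {
                // Last write wins: blank out the earlier Set for this field.
                if let Some(prev) = last_set.insert((entity, field.clone()), out.len()) {
                    out[prev] = None;
                }
                out.push(Some((entity, op)));
            }
            Op::Add { field, value } | Op::Remove { field, value } => {
                let key = (entity, field.clone(), value.clone());
                match pending.get(&key).copied() {
                    Some(prev) => {
                        let same_kind = out[prev]
                            .as_ref()
                            .map_or(false, |(_, p)| discriminant(p) == discriminant(&op));
                        if !same_kind {
                            // Opposite kinds cancel each other out.
                            out[prev] = None;
                            pending.remove(&key);
                        }
                        // Same kind: a duplicate, so the new op is dropped.
                    }
                    None => {
                        pending.insert(key, out.len());
                        out.push(Some((entity, op)));
                    }
                }
            }
            Op::Delete => {
                // Delete absorbs every earlier op for the same entity.
                for slot in out.iter_mut() {
                    if slot.as_ref().map_or(false, |(e, _)| *e == entity) {
                        *slot = None;
                    }
                }
                last_set.retain(|(e, _), _| *e != entity);
                pending.retain(|(e, _, _), _| *e != entity);
                out.push(Some((entity, op)));
            }
        }
    }
    out.into_iter().flatten().collect()
}
```

In this sketch, two sets of the same field on entity 1 followed by an add and a remove of the same tag collapse to the second set alone, and a set followed by a delete on entity 2 collapses to the delete.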

Server Changes

  • POST /api/indexes/{name}/ops — WAL-backed ops ingestion (fsync before 200)
  • GET /api/internal/sync-lag — Sync source lag from SyncMeta
  • GET/PUT/POST/DELETE /api/indexes/{name}/dumps — Dump lifecycle management
  • bitdex_sync_* Prometheus metrics (cursor, max_id, lag, ops_total, wal_bytes)
  • post_ops() added to BitdexClient

Design Docs

Separate PR #81 has the full design docs (pg-sync-v2-final.md, working doc, computed-sort-fields.md).

Test Plan

  • 60 unit tests across all new modules
  • 9 integration tests (WAL roundtrip with dedup, delete absorption, add/remove cancellation, queryOpSet, cursor resume, dump workflow, config change detection, full Civitai config, OpsBatch JSON roundtrip)
  • E2E tests with running server + PG (follow-up)
  • Trigger deployment to staging PG (follow-up)

🤖 Generated with Claude Code

JustMaier and others added 10 commits March 25, 2026 18:06
Core building blocks for the ops-based sync pipeline:

- src/pg_sync/ops.rs: Op enum (Set, Remove, Add, Delete, QueryOpSet),
  OpsRow, OpsBatch, EntityOps, SyncMeta, BitdexOps table SQL
- src/pg_sync/op_dedup.rs: Shared dedup helper — LIFO per (entity_id, field),
  add/remove cancellation, delete absorption, queryOpSet last-wins
- src/ops_wal.rs: Append-only WAL with CRC32 integrity, WalWriter (append+fsync),
  WalReader (cursor-based tail, partial record handling, CRC skip)
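
As an illustration of the WAL behaviour described above (append plus fsync, cursor-based reads, partial record handling, CRC skip), here is a minimal sketch. The record layout (4-byte little-endian length, 4-byte CRC32, payload) and the function names are assumptions for illustration; the PR does not show the actual on-disk format or the WalWriter/WalReader API.

```rust
use std::fs::OpenOptions;
use std::io::{self, Write};
use std::path::Path;

/// Bitwise CRC-32 (IEEE, reflected polynomial 0xEDB88320).
fn crc32(data: &[u8]) -> u32 {
    let mut crc = 0xFFFF_FFFFu32;
    for &b in data {
        crc ^= b as u32;
        for _ in 0..8 {
            let mask = (crc & 1).wrapping_neg();
            crc = (crc >> 1) ^ (0xEDB8_8320 & mask);
        }
    }
    !crc
}

/// Assumed layout: [len: u32 LE][crc32(payload): u32 LE][payload].
fn encode_record(payload: &[u8]) -> Vec<u8> {
    let mut rec = Vec::with_capacity(8 + payload.len());
    rec.extend_from_slice(&(payload.len() as u32).to_le_bytes());
    rec.extend_from_slice(&crc32(payload).to_le_bytes());
    rec.extend_from_slice(payload);
    rec
}

/// Appends one record and fsyncs before returning.
fn append_record(path: &Path, payload: &[u8]) -> io::Result<()> {
    let mut f = OpenOptions::new().create(true).append(true).open(path)?;
    f.write_all(&encode_record(payload))?;
    f.sync_all()
}

fn read_u32(buf: &[u8], at: usize) -> u32 {
    let mut b = [0u8; 4];
    b.copy_from_slice(&buf[at..at + 4]);
    u32::from_le_bytes(b)
}

/// Reads intact records starting at `cursor` (a byte offset).
/// Stops at a partial trailing record; skips records whose CRC does not match.
/// Returns the payloads and the cursor to resume from.
fn read_from(buf: &[u8], mut cursor: usize) -> (Vec<Vec<u8>>, usize) {
    let mut out = Vec::new();
    while cursor + 8 <= buf.len() {
        let len = read_u32(buf, cursor) as usize;
        let crc = read_u32(buf, cursor + 4);
        let end = cursor + 8 + len;
        if end > buf.len() {
            break; // partial record: the writer has not finished it yet
        }
        let payload = &buf[cursor + 8..end];
        if crc32(payload) == crc {
            out.push(payload.to_vec());
        }
        cursor = end;
    }
    (out, cursor)
}
```

Because the cursor only advances past complete records, a reader that stops at a torn tail can retry from the same offset once the writer has finished the record.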

Also fixes pre-existing compile error in copy_queries.rs tests (missing
width/height fields on CopyImageRow constructors).

30 tests passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
WAL-backed ops ingestion endpoint:
- POST /api/indexes/{name}/ops accepts OpsBatch (ops + sync meta)
- Appends to WAL file via WalWriter, returns 200 only after fsync
- Lazy WAL writer init (created on first POST)
- Stores latest SyncMeta per source for lag monitoring

Sync lag endpoint:
- GET /api/internal/sync-lag returns latest metadata from all sync sources
- Supports cursor position, max_id, lag_rows per source

Both endpoints compile-gated behind pg-sync feature with no-op fallbacks.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Ops processor that reads from WAL and routes to the engine:
- Regular ops (set/remove/add): build PatchPayload with old+new values,
  call engine.patch() — no docstore read needed
- queryOpSet: parse filter string, execute query for matching slots,
  apply nested ops to all matches
- Delete: route to engine.delete()
- Filter parser for queryOpSet: supports eq and in operators

Includes json_to_qvalue converter (serde_json::Value → query::Value)
for the PatchPayload/FieldValue type boundary.
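
A filter parser limited to eq and in could look roughly like the sketch below. The filter syntax shown (`field eq value`, `field in (a, b, c)`) is an assumption, since this PR does not spell out the string format, and the `Filter` and `Value` types are hypothetical stand-ins for the engine's own query types.

```rust
// Hypothetical value and filter types, for illustration only.
#[derive(Debug, PartialEq)]
enum Value {
    Int(i64),
    Bool(bool),
    Str(String),
}

#[derive(Debug, PartialEq)]
enum Filter {
    Eq(String, Value),
    In(String, Vec<Value>),
}

/// Parses a literal as integer, then boolean, then falls back to a string.
fn parse_value(raw: &str) -> Value {
    let raw = raw.trim();
    if let Ok(n) = raw.parse::<i64>() {
        return Value::Int(n);
    }
    match raw {
        "true" => Value::Bool(true),
        "false" => Value::Bool(false),
        _ => Value::Str(raw.trim_matches('"').to_string()),
    }
}

/// Parses the assumed forms `field eq value` and `field in (a, b, c)`.
fn parse_filter(input: &str) -> Result<Filter, String> {
    let mut parts = input.trim().splitn(3, ' ');
    let field = parts
        .next()
        .filter(|f| !f.is_empty())
        .ok_or("missing field")?;
    let op = parts.next().ok_or("missing operator")?;
    let rest = parts.next().ok_or("missing value")?.trim();
    match op {
        "eq" => Ok(Filter::Eq(field.to_string(), parse_value(rest))),
        "in" => {
            let inner = rest
                .strip_prefix('(')
                .and_then(|r| r.strip_suffix(')'))
                .ok_or("expected a parenthesized list after `in`")?;
            Ok(Filter::In(
                field.to_string(),
                inner.split(',').map(parse_value).collect(),
            ))
        }
        other => Err(format!("unsupported operator: {}", other)),
    }
}
```

Rejecting unknown operators with an error, rather than guessing, keeps a malformed queryOpSet from silently matching the wrong slots.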

9 tests: scalar update, insert (no old), multi-value add/remove,
delete+queryOpSet skip, filter parsing, value type parsing, cursor
persistence.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…dpoint

New ops_poller.rs replaces outbox_poller for V2 sync:
- Reads from BitdexOps table (JSONB ops arrays) instead of BitdexOutbox
- Cursor managed in PG bitdex_cursors table (not in BitDex)
- Deduplicates via shared dedup_ops() before sending
- POSTs OpsBatch with SyncMeta (cursor, max_id, lag) to /ops endpoint
- Health gate: pauses when BitDex is unreachable

Also adds post_ops() to BitdexClient.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Config-driven PG trigger generation:
- SyncSource struct: direct tables (slot_field + track_fields),
  multi-value join tables (field + value_field), fan-out tables
  (query + query_source)
- SyncConfig: YAML-parseable config with sync_sources array
- SQL generator: CREATE OR REPLACE FUNCTION + CREATE TRIGGER for each source
- Expression interpolation in track_fields: "GREATEST({scannedAt}, {createdAt}) as existedAt"
- {column} placeholder substitution with OLD/NEW prefixes
- Hash-based trigger naming (bitdex_{table}_{hash8}) for reconciliation
- IS DISTINCT FROM checks for UPDATE ops (only emit when value actually changes)
- queryOpSet generation for fan-out tables
- ENABLE ALWAYS on all triggers (CDC compatibility)
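
The placeholder substitution, IS DISTINCT FROM check, and hash-based naming can be sketched as follows. This is an illustration under assumptions, not the generator from this PR: the function names are hypothetical, the quoting style (`NEW."col"`) is a guess, only the `{column}` form of track_fields is handled, and FNV-1a stands in for whatever hash the real code uses.

```rust
/// Replaces each `{col}` placeholder with `PREFIX."col"` (e.g. NEW."scannedAt").
fn substitute_columns(expr: &str, prefix: &str) -> String {
    let mut out = String::with_capacity(expr.len());
    let mut rest = expr;
    while let Some(open) = rest.find('{') {
        match rest[open..].find('}') {
            Some(close_rel) => {
                let close = open + close_rel;
                out.push_str(&rest[..open]);
                out.push_str(prefix);
                out.push_str(".\"");
                out.push_str(&rest[open + 1..close]);
                out.push('"');
                rest = &rest[close + 1..];
            }
            None => break, // unmatched brace: copy the remainder verbatim
        }
    }
    out.push_str(rest);
    out
}

/// Builds the UPDATE guard for one track_fields entry such as
/// "GREATEST({scannedAt}, {createdAt}) as existedAt".
fn change_condition(track_field: &str) -> String {
    let expr = match track_field.rfind(" as ") {
        Some(i) => &track_field[..i],
        None => track_field,
    };
    format!(
        "({}) IS DISTINCT FROM ({})",
        substitute_columns(expr, "OLD"),
        substitute_columns(expr, "NEW")
    )
}

/// Names a trigger bitdex_{table}_{hash8}; the hash here is 32-bit FNV-1a
/// over the trigger definition (an assumption, chosen for determinism).
fn trigger_name(table: &str, definition: &str) -> String {
    let mut h: u32 = 0x811C_9DC5;
    for b in definition.bytes() {
        h ^= b as u32;
        h = h.wrapping_mul(0x0100_0193);
    }
    format!("bitdex_{}_{:08x}", table.to_lowercase(), h)
}
```

Since the name embeds a hash of the definition, a reconciler can compare the triggers it expects against the names already deployed and replace only those whose definition changed.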

11 tests: parsing, column substitution, all three trigger types,
hash change detection, YAML parsing, expression interpolation.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Dump lifecycle management for unified load pipeline:
- DumpRegistry: in-memory + JSON-persisted dump state tracking
- DumpEntry: name, wal_path, status (Writing/Loading/Complete/Failed),
  ops counts, timestamps
- dump_name() + config_hash() for change detection
- Atomic save via temp file rename
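
The atomic save pattern mentioned above is commonly written as in the sketch below. The function name and the `.tmp` suffix are assumptions; the PR does not show how DumpRegistry names its temp file.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;

/// Writes `contents` to a sibling temp file, fsyncs it, then renames it over
/// `path`, so readers see either the old file or the new one, never a
/// half-written mix.
fn atomic_save(path: &Path, contents: &[u8]) -> io::Result<()> {
    // Hypothetical naming: same path with the extension swapped for "tmp".
    let tmp = path.with_extension("tmp");
    {
        let mut f = File::create(&tmp)?;
        f.write_all(contents)?;
        f.sync_all()?; // make the bytes durable before they become visible
    }
    fs::rename(&tmp, path)
}
```

The rename is only atomic when the temp file lives on the same filesystem as the target, which is why the sketch places it next to the destination rather than in a system temp directory.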

Server endpoints:
- GET /dumps — list all dumps with status
- PUT /dumps — register new dump
- POST /dumps/{name}/loaded — signal WAL file complete
- DELETE /dumps/{name} — remove from history
- POST /dumps/clear — clear all

All endpoints feature-gated behind pg-sync with no-op fallbacks.
10 tests: lifecycle, persistence, removal, completion tracking,
config hash determinism, failure handling.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
New unified sync metrics with source label:
- bitdex_sync_cursor_position{source="..."} — current cursor
- bitdex_sync_max_id{source="..."} — max ops table ID
- bitdex_sync_lag_rows{source="..."} — rows behind
- bitdex_sync_ops_total{source="..."} — total ops received
- bitdex_sync_wal_bytes{source="..."} — WAL file size

Metrics populated from SyncMeta in the POST /ops endpoint.
Old bitdex_pgsync_* metrics preserved for backward compat.
Binary rename (bitdex-pg-sync → bitdex-sync) deferred to deployment PR.
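
For orientation, the gauges above would appear in the Prometheus text format roughly as produced by the sketch below. The `SyncMeta` fields and the rule `lag_rows = max_id - cursor` are assumptions, since the PR names the metrics but not how lag is derived, and label-value escaping is ignored for brevity.

```rust
// Hypothetical subset of SyncMeta, for illustration only.
struct SyncMeta {
    source: String,
    cursor: u64,
    max_id: u64,
}

/// Renders three of the sync gauges as Prometheus text-format lines.
fn render_sync_metrics(meta: &SyncMeta) -> String {
    // Assumed definition of lag: rows between the cursor and the newest op.
    let lag = meta.max_id.saturating_sub(meta.cursor);
    let mut out = String::new();
    for (name, value) in [
        ("bitdex_sync_cursor_position", meta.cursor),
        ("bitdex_sync_max_id", meta.max_id),
        ("bitdex_sync_lag_rows", lag),
    ] {
        out.push_str(&format!("{}{{source=\"{}\"}} {}\n", name, meta.source, value));
    }
    out
}
```

With a cursor of 90 and a max_id of 100 for a source named pg, the last line rendered is `bitdex_sync_lag_rows{source="pg"} 10`.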

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
9 integration tests exercising the full ops pipeline:
- WAL roundtrip with dedup (write → read → dedup → verify)
- Delete absorption through WAL
- Add/remove cancellation through WAL
- queryOpSet serialization through WAL
- Cursor resume across multiple appends
- Dump registry full workflow (register → load → complete → persist)
- Dump config change detection (hash mismatch triggers re-dump)
- Full Civitai trigger config (6 sources, all generate valid SQL)
- OpsBatch JSON format roundtrip with SyncMeta

Total: 69 tests across all Sync V2 modules (60 unit + 9 integration).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Spawns a background thread that tails the ops WAL file, reads batches
of up to 10K records, deduplicates via dedup_ops(), and applies
mutations to the engine via apply_ops_batch(). Persists cursor to
disk after each batch. Updates bitdex_sync_wal_bytes metric.

This completes the full ops ingestion chain:
POST /ops → WAL append + fsync → WAL reader thread → engine mutations

The reader sleeps 50ms when no new records are available, and 1s when
no index is loaded yet. Errors are logged and retried.
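
The shape of that reader loop can be sketched as below. It is a simplified illustration, not the thread in server.rs: the closures stand in for the WAL reader, apply_ops_batch(), and cursor persistence, the stop flag and all names are hypothetical, dedup is omitted, and the 1s wait for a missing index is left out.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::Duration;

const MAX_BATCH: usize = 10_000;
const IDLE_SLEEP: Duration = Duration::from_millis(50);

/// Tails a record source until `stop` is set and returns the final cursor.
/// `read_batch(cursor, max)` returns up to `max` records plus the next cursor.
fn run_reader<R, A, P>(
    stop: &AtomicBool,
    mut cursor: u64,
    mut read_batch: R,
    mut apply: A,
    mut persist: P,
) -> u64
where
    R: FnMut(u64, usize) -> (Vec<String>, u64),
    A: FnMut(Vec<String>) -> Result<(), String>,
    P: FnMut(u64),
{
    while !stop.load(Ordering::Relaxed) {
        let (batch, next) = read_batch(cursor, MAX_BATCH);
        if batch.is_empty() {
            // Nothing new in the WAL yet: back off briefly and poll again.
            thread::sleep(IDLE_SLEEP);
            continue;
        }
        match apply(batch) {
            Ok(()) => {
                // Advance and persist only after the batch was applied.
                cursor = next;
                persist(cursor);
            }
            Err(e) => {
                // Leave the cursor untouched so the same batch is retried.
                eprintln!("apply failed, will retry: {}", e);
                thread::sleep(IDLE_SLEEP);
            }
        }
    }
    cursor
}
```

Persisting the cursor only after a successful apply means a crash can replay a batch but cannot skip one, which is tolerable in this sketch if applying a batch is idempotent.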

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
CSV dump adapter (csv_ops.rs):
- images_csv_to_wal(): parses images.csv, converts each row to set ops
  (nsfwLevel, type, userId, postId, hasMeta, onSite, minor, poi, existedAt, blockedFor)
- tags_csv_to_wal(), tools_csv_to_wal(): multi-value CSV → add ops
- run_csv_dump(): orchestrates full CSV dump with optional row limits
- Supports batch writing to WAL with configurable batch size
- Limited variants for validation testing with subsets

WAL reader thread (server.rs):
- Spawned on server startup, tails ops.wal, reads batches of 10K
- Deduplicates and applies via apply_ops_batch()
- Persists cursor to disk, updates WAL bytes metric
- Completes the full chain: POST /ops → WAL → reader → engine

2 new tests + previous tests still passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@JustMaier JustMaier merged commit fa3ca1c into main Mar 26, 2026
1 check failed
JustMaier added a commit that referenced this pull request Mar 26, 2026