diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md
index 9c539b76..2e5763e0 100644
--- a/docs/ARCHITECTURE.md
+++ b/docs/ARCHITECTURE.md
@@ -200,6 +200,30 @@ When a stream table's defining query references another stream table (rather tha
 
 **Lifecycle.** ST change buffers are created automatically when a stream table gains its first downstream consumer (`create_st_change_buffer_table()`), and dropped when the last downstream consumer is removed (`drop_st_change_buffer_table()`). On upgrade from pre-v0.11.0, existing ST-to-ST dependencies have their buffers auto-created on the first scheduler tick. Consumed rows are cleaned up by `cleanup_st_change_buffers_by_frontier()` after each successful downstream refresh.
 
+#### Frontier Visibility Holdback (Issue #536)
+
+The CDC frontier (`pgt_stream_tables.frontier`) is advanced based on **LSN ordering** while the change buffer is read under standard **MVCC visibility**. These two dimensions are orthogonal: a change buffer row may have an LSN below the new frontier yet still be invisible (uncommitted) at the moment the scheduler queries the buffer.
+
+**Failure scenario (trigger-based CDC only):**
+Without holdback, a transaction that inserts into a tracked table and commits *after* the scheduler has captured the tick watermark (`pg_current_wal_lsn()`) will have its change-buffer row permanently skipped on the next tick, because the frontier advanced past the row's LSN while the row was still uncommitted.
+
+**Fix — `frontier_holdback_mode = 'xmin'` (default):**
+Before computing the tick watermark, the scheduler probes `pg_stat_activity` and `pg_prepared_xacts` for the oldest in-progress transaction xmin. If any transaction from before the previous tick is still running, the frontier is held back to the previous tick's safe watermark rather than advancing to `pg_current_wal_lsn()`. This is a single cheap SPI round-trip per scheduler tick (~µs).
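The holdback rule described above can be sketched in isolation. The following is a minimal, hypothetical Rust illustration and not code from this patch: `parse_lsn`, `safe_frontier`, and `holdback_bytes` are assumed helper names, and the sketch assumes PostgreSQL's textual `pg_lsn` form of two hexadecimal halves separated by `/`. It shows how a held-back frontier stays at the previous watermark and how the lag behind the write LSN could be measured in bytes.

```rust
/// Parse a textual PostgreSQL LSN such as "0/500" into a 64-bit byte position.
/// Hypothetical helper for illustration; returns None on malformed input.
fn parse_lsn(text: &str) -> Option<u64> {
    let (hi, lo) = text.split_once('/')?;
    let hi = u32::from_str_radix(hi, 16).ok()?;
    let lo = u32::from_str_radix(lo, 16).ok()?;
    Some((u64::from(hi) << 32) | u64::from(lo))
}

/// Pick the LSN the frontier may advance to on this tick.
/// `hold` stands for the outcome of the xmin probe: true when a transaction
/// from before the previous tick is still in progress.
fn safe_frontier(prev_watermark: Option<u64>, write_lsn: u64, hold: bool) -> u64 {
    match (hold, prev_watermark) {
        // Hold back: stay at the previous tick's watermark.
        (true, Some(prev)) => prev.min(write_lsn),
        // Assumption: with no previous watermark there is nothing to hold back to.
        (true, None) => write_lsn,
        // No holdback: advance to the current write LSN.
        (false, _) => write_lsn,
    }
}

/// Number of WAL bytes the safe frontier lags behind the write LSN.
fn holdback_bytes(write_lsn: u64, safe_lsn: u64) -> u64 {
    write_lsn.saturating_sub(safe_lsn)
}
```

For example, with a previous watermark of `0/80` and a write LSN of `0/500`, a held-back tick keeps the frontier at `0/80` and the lag is 0x480 (1152) bytes; once the probe stops reporting a holdback, the frontier moves to `0/500` and the lag drops to zero.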
+
+The holdback algorithm (`cdc::classify_holdback`) is purely functional and unit-tested independently of the backend.
+
+**Configuration:**
+- `pg_trickle.frontier_holdback_mode` — `'xmin'` (default, safe), `'none'` (fast but can lose rows), `'lsn:'` (hold back by N bytes, for debugging).
+- `pg_trickle.frontier_holdback_warn_seconds` — emit a `WARNING` (at most once per minute) when holdback has been active longer than this many seconds (default: 60).
+
+**Note:** WAL/logical-replication CDC mode is immune to this issue (commit-LSN ordering is inherently safe). The holdback is skipped when `cdc_mode = 'wal'`.
+
+**Observability:** Two Prometheus gauges are exposed:
+- `pg_trickle_frontier_holdback_lsn_bytes` — how many WAL bytes behind write_lsn the safe frontier currently is.
+- `pg_trickle_frontier_holdback_seconds` — age (in seconds) of the oldest in-progress transaction.
+
+See `plans/safety/PLAN_FRONTIER_VISIBILITY_HOLDBACK.md` for the full design rationale.
+
 ### 4. DVM Engine (`src/dvm/`)
 
 The Differential View Maintenance engine is the core of the system. It transforms the defining SQL query into an executable operator tree that can compute deltas efficiently.
diff --git a/docs/TROUBLESHOOTING.md b/docs/TROUBLESHOOTING.md
index 4aa7f3ee..a3b88303 100644
--- a/docs/TROUBLESHOOTING.md
+++ b/docs/TROUBLESHOOTING.md
@@ -32,6 +32,7 @@ pg_trickle in production.
   - [11. Schema Change Broke Stream Table](#11-schema-change-broke-stream-table)
   - [12. Worker Pool Exhaustion](#12-worker-pool-exhaustion)
   - [13. Fuse Tripped (Circuit Breaker)](#13-fuse-tripped-circuit-breaker)
+  - [14. Stream Table Appears Stuck Behind a Long Transaction](#14-stream-table-appears-stuck-behind-a-long-transaction)
 
 ---
 
@@ -560,6 +561,100 @@ SELECT pgtrickle.reset_fuse('my_stream_table');
 
 See the [Fuse Circuit Breaker tutorial](tutorials/FUSE_CIRCUIT_BREAKER.md) for
 details on fuse thresholds and configuration.
+
+---
+
+### 14. Stream Table Appears Stuck Behind a Long Transaction
+
+**Symptoms:**
+- A stream table's `data_timestamp` is not advancing even though the source
+  table is receiving new inserts.
+- The `pg_trickle_frontier_holdback_lsn_bytes` Prometheus gauge is non-zero.
+- Server log contains: `pg_trickle: frontier holdback active — the oldest in-progress transaction is Ns old`.
+
+**Cause:**
+`frontier_holdback_mode = 'xmin'` (the default) prevents the scheduler from
+advancing the frontier while any in-progress transaction exists that is older
+than the previous tick's xmin baseline. A long-running or forgotten session
+holding an open transaction will pause frontier advancement for all stream
+tables on that PostgreSQL server.
+
+This is intentional: without the holdback, a transaction that inserts into a
+tracked source table and commits *after* the scheduler ticks would have its
+change permanently lost (see Issue #536 and `plans/safety/PLAN_FRONTIER_VISIBILITY_HOLDBACK.md`).
+
+**Diagnosis:**
+
+```sql
+-- Find the oldest in-progress transaction
+SELECT pid, usename, state, application_name,
+       backend_xmin,
+       EXTRACT(EPOCH FROM (now() - xact_start))::int AS xact_age_secs,
+       query
+FROM pg_stat_activity
+WHERE backend_xmin IS NOT NULL
+  AND state <> 'idle'
+ORDER BY xact_start;
+
+-- Check for prepared (2PC) transactions
+SELECT gid, prepared,
+       EXTRACT(EPOCH FROM (now() - prepared))::int AS age_secs,
+       owner, database
+FROM pg_prepared_xacts
+ORDER BY prepared;
+```
+
+**Resolution:**
+
+1. **Identify and terminate the blocking session:**
+
+   ```sql
+   SELECT pg_terminate_backend(pid)
+   FROM pg_stat_activity
+   WHERE state = 'idle in transaction'
+     AND backend_xmin IS NOT NULL
+   ORDER BY xact_start
+   LIMIT 1;
+   ```
+
+2. **Roll back a forgotten 2PC transaction:**
+
+   ```sql
+   ROLLBACK PREPARED 'gid_from_pg_prepared_xacts';
+   ```
+
+3. **For benchmark or known-safe workloads only**, disable holdback to restore
+   the pre-fix fast path (risks silent data loss):
+
+   ```sql
+   ALTER SYSTEM SET pg_trickle.frontier_holdback_mode = 'none';
+   SELECT pg_reload_conf();
+   ```
+
+4. **Suppress the warning** (while keeping holdback active) by raising the
+   threshold:
+
+   ```sql
+   ALTER SYSTEM SET pg_trickle.frontier_holdback_warn_seconds = 300;
+   SELECT pg_reload_conf();
+   ```
+
+5. **On managed PostgreSQL (RDS, Cloud SQL, Aiven, etc.)** where
+   `pg_stat_activity` is restricted to the current user's own sessions,
+   the probe will silently see no other backends and never trigger a
+   holdback. The server log will contain:
+   `pg_trickle: frontier holdback probe cannot see other PostgreSQL backends`.
+
+   Fix by granting the monitoring role to the pg_trickle service account:
+
+   ```sql
+   GRANT pg_monitor TO ;
+   ```
+
+   Then restart the pg_trickle scheduler (or reload PostgreSQL) so the new
+   privilege takes effect.
+
+
 ---
 
 ## General Diagnostic Workflow
@@ -590,3 +685,5 @@ When investigating any issue, follow this sequence:
 | `pg_trickle.fixed_point_max_iterations` | `10` | Circular pipeline iteration limit |
 | `pg_trickle.differential_change_ratio_threshold` | `0.5` | Falls back to FULL above this ratio |
 | `pg_trickle.auto_backoff` | `on` | Stretches intervals up to 8x under load |
+| `pg_trickle.frontier_holdback_mode` | `xmin` | `none` disables holdback (unsafe); `xmin` = safe default |
+| `pg_trickle.frontier_holdback_warn_seconds` | `60` | Warn after holding back for this many seconds |
diff --git a/plans/safety/PLAN_FRONTIER_VISIBILITY_HOLDBACK.md b/plans/safety/PLAN_FRONTIER_VISIBILITY_HOLDBACK.md
new file mode 100644
index 00000000..381dada4
--- /dev/null
+++ b/plans/safety/PLAN_FRONTIER_VISIBILITY_HOLDBACK.md
@@ -0,0 +1,221 @@
+# PLAN — Frontier Visibility Holdback (Issue #536)
+
+**Status:** Implemented
+**Owner:** TBD
+**Tracking issue:** [#536](https://github.com/grove/pg-trickle/issues/536)
+**Related:** [PLAN_OVERALL_ASSESSMENT_2.md](../PLAN_OVERALL_ASSESSMENT_2.md) (frontier/buffer non-atomic commit), ADR-001 / ADR-002 in [plans/adrs/PLAN_ADRS.md](../adrs/PLAN_ADRS.md)
+
+---
+
+## 1. Problem Statement
+
+The CDC frontier (`pgt_stream_tables.frontier`) is advanced based on **LSN
+ordering only**, while the change buffer is read under standard **MVCC
+visibility**. These two dimensions are orthogonal: a change buffer row may
+have an LSN below the new frontier yet still be invisible (uncommitted) at
+the moment the scheduler queries the buffer.
+
+### Failure scenario (verified against current code)
+
+| Step | Actor | Action |
+|------|-------|--------|
+| T1 | User session A | Begins txn, modifies tracked source table. Trigger inserts row into `pgtrickle_changes.changes_` with `lsn = pg_current_wal_insert_lsn()` ≈ `0/100`. **Txn A does not commit.** |
+| T2 | Scheduler tick N | Captures `tick_watermark = pg_current_wal_lsn()` ≈ `0/500` ([src/scheduler.rs#L2634](../../src/scheduler.rs#L2634)). |
+| T3 | Refresh worker | Runs `WHERE lsn > prev_lsn AND lsn <= 0/500` ([src/refresh/mod.rs#L3865](../../src/refresh/mod.rs#L3865)). MVCC hides A's uncommitted row. Frontier advanced and persisted to `0/500`. |
+| T4 | User session A | Commits. Row at `lsn = 0/100` becomes visible. |
+| T5 | Scheduler tick N+1 | Runs `WHERE lsn > 0/500 AND lsn <= …`. **Row at `0/100` is permanently skipped.** |
+
+This is silent data loss, and there are currently **zero safeguards**:
+
+- No `pg_stat_activity.backend_xmin` check
+- No snapshot-based visibility filter
+- No xid/txid column on the change buffer
+- No hold-back margin on the frontier
+- The reporter's `pg_trickle.tick_watermark_enabled` GUC ([src/config.rs#L663](../../src/config.rs#L663)) only enforces *cross-source* consistency within a tick — it does **not** address this race
+
+### Practical likelihood
+
+- Sub-second OLTP transactions: vanishingly rare (txn must straddle a full
+  scheduler tick — typically hundreds of ms to seconds)
+- Long batch jobs / interactive psql sessions / 2PC prepared txns: realistic
+- Logical-decoding CDC mode (`src/wal_decoder.rs`) is **immune** because
+  logical replication only emits committed changes ordered by commit LSN
+
+### Out of scope for this plan
+
+- The reporter's "Executor Hook" suggestion — requires kernel patches; not
+  applicable to a contrib-style extension
+- The `BIGSERIAL` cache-1 contention claim — real but a separate tuning
+  topic; tracked elsewhere
+- The non-atomic frontier-vs-buffer commit window already covered in
+  [PLAN_OVERALL_ASSESSMENT_2.md](../PLAN_OVERALL_ASSESSMENT_2.md)
+
+---
+
+## 2. Goals
+
+1. **Eliminate the silent data-loss path** under default configuration.
+2. Preserve current throughput in the common case (no long transactions).
+3. Provide observability so operators can see when holdback is active.
+4. Keep the fix entirely inside the extension boundary (no kernel patches,
+   no `wal_level = logical` requirement for the trigger path).
+
+---
+
+## 3. Proposed Solution
+
+A two-layer defence:
+
+### Layer A — Snapshot xmin holdback (primary fix, default ON)
+
+Before computing `new_lsn` for a refresh cycle, query the cluster's oldest
+in-progress transaction xmin and translate it into a safe upper-bound LSN.
+The frontier is never allowed to advance past LSNs that could still be
+written by a not-yet-committed transaction.
+
+**Mechanism:**
+
+```sql
+-- One probe per scheduler tick (cheap, ~µs):
+SELECT
+  pg_current_wal_lsn() AS write_lsn,
+  coalesce(min(backend_xmin::text::bigint), txid_current()) AS oldest_xmin
+FROM pg_stat_activity
+WHERE backend_xmin IS NOT NULL
+  AND state <> 'idle'
+  AND pid <> pg_backend_pid();
+```
+
+If `oldest_xmin` has not advanced past the xmin observed at the *previous* tick,
+hold the new frontier at the previous tick's `tick_watermark` instead of
+advancing to the current tick's `write_lsn`. Concretely:
+
+- Track per-tick `(tick_watermark_lsn, oldest_xmin)` in shared memory.
+- For tick N, allowed upper bound =
+  `min(write_lsn_N, last_lsn_with_no_older_xmin)`.
+- This is conservative — it may delay visibility of new changes by one
+  tick when long transactions are active, but it never skips a row.
+
+**Edge cases:**
+
+- 2PC prepared transactions: covered by `pg_prepared_xacts` — must be
+  unioned into the xmin probe.
+- Hot standby feedback / replication slots: their xmin already shows up in
+  `pg_stat_activity`; no extra logic needed.
+- Replication / logical-decoding CDC mode: skip the holdback (commit-LSN
+  ordering is already safe).
+
+### Layer B — Defensive xid stamping (secondary, opt-in)
+
+Add an optional `xmin xid8` column to new change-buffer tables (gated by
+`pg_trickle.cdc_buffer_track_xid`, default `false` for v1). When set, the
+trigger writes `pg_current_xact_id()` alongside `lsn`. The refresh delta
+query then becomes:
+
+```sql
+WHERE lsn > prev_lsn
+  AND lsn <= new_lsn
+  AND pg_xact_status(xmin) = 'committed' -- belt-and-suspenders
+```
+
+This is redundant under READ COMMITTED but provides:
+
+- An audit trail (every change row carries its source xid)
+- A path to point-in-time / snapshot-consistent reads
+- Forward compatibility with a future CSN-based scheme
+
+Layer B is not required to close the bug; it's documented here so we don't
+pick a column layout that would block it later.
+
+### Layer C — Operator escape hatch
+
+GUC: `pg_trickle.frontier_holdback_mode` with values:
+
+| Value | Meaning |
+|-------|---------|
+| `xmin` (default) | Layer A enabled |
+| `none` | Pre-fix behaviour — fast, can lose rows under long txns |
+| `lsn:` | Hold back frontier by a fixed N bytes (debugging) |
+
+This lets benchmark runs disable the probe and lets operators tune for
+known-clean OLTP workloads.
+
+---
+
+## 4. Implementation Steps
+
+1. **Add probe helper** — `src/cdc.rs::compute_safe_upper_bound(write_lsn, prev_oldest_xmin)`
+   - Single SPI roundtrip per tick.
+   - Returns `(safe_lsn, current_oldest_xmin)`.
+   - Pure-logic helper (`classify_holdback`) split out so it's unit-testable
+     without a backend (per AGENTS.md SPI rules).
+
+2. **Wire into scheduler** — modify [src/scheduler.rs#L2630-2636](../../src/scheduler.rs#L2630-L2636)
+   to consult `compute_safe_upper_bound` and feed the result into the
+   existing `tick_watermark` capping path at
+   [src/scheduler.rs#L5037-L5041](../../src/scheduler.rs#L5037-L5041).
+
+3. **Persist last tick xmin** — add `last_tick_oldest_xmin: u64` to
+   `PgTrickleSharedState` in [src/shmem.rs](../../src/shmem.rs).
+
+4. **GUC plumbing** — add `pg_trickle.frontier_holdback_mode` in
+   [src/config.rs](../../src/config.rs) (string GUC parsed once per tick).
+
+5. **Metrics** — emit two gauges via the existing monitoring path:
+   - `pg_trickle_frontier_holdback_lsn_bytes` (gauge: how far behind write_lsn)
+   - `pg_trickle_frontier_holdback_seconds` (gauge: oldest in-progress txn age)
+
+6. **Docs** —
+   - Add ADR-XX explaining the choice of probe-based holdback over
+     xid stamping or executor hooks.
+   - Update [docs/ARCHITECTURE.md](../../docs/ARCHITECTURE.md) CDC section.
+   - Add troubleshooting entry in [docs/TROUBLESHOOTING.md](../../docs/TROUBLESHOOTING.md)
+     for "stream table appears stuck behind a long transaction".
+
+7. **Tests**
+   - **Unit:** `classify_holdback` logic tables (xmin advances, xmin frozen,
+     new oldest xmin appears, prepared xact present).
+   - **Integration (Testcontainers):** spawn a backend that opens a
+     transaction, performs DML on a tracked table, sleeps; verify
+     scheduler does not advance frontier past the row's LSN; commit;
+     verify next tick consumes it.
+   - **E2E:** add `tests/e2e_long_txn_visibility_tests.rs` covering:
+     - Standard READ COMMITTED txn straddling a tick
+     - REPEATABLE READ txn straddling multiple ticks
+     - 2PC prepared transaction held across many ticks
+     - GUC `frontier_holdback_mode = none` reproduces the data loss
+       (regression guard documenting the unsafe mode)
+   - **Bench:** measure overhead of the per-tick probe (expect <100 µs).
+
+---
+
+## 5. Risks & Trade-offs
+
+| Risk | Mitigation |
+|------|-----------|
+| Long-lived backend xmin (e.g. forgotten psql session) freezes frontier indefinitely | Emit `WARNING` once per minute when holdback exceeds `pg_trickle.frontier_holdback_warn_seconds` (default 60s); expose as metric |
+| `pg_stat_activity` scan cost on busy clusters | One probe per scheduler tick (default ≥1s); negligible. Cache result for tick duration |
+| Replication slots holding xmin | Same effect as a long txn — correct behaviour is to wait; document it |
+| Bench results regress slightly | Layer C `none` mode preserves the old fast path for benchmarks |
+
+---
+
+## 6. Acceptance Criteria
+
+- [ ] New E2E test `e2e_long_txn_visibility_tests.rs` passes with default GUCs.
+- [ ] Same test demonstrably fails with `frontier_holdback_mode = none`.
+- [ ] No regression on `e2e_bench_cdc_overhead` workload (≤2% throughput delta).
+- [ ] `just test-all` green.
+- [ ] ADR added; ARCHITECTURE.md and TROUBLESHOOTING.md updated.
+- [ ] Issue #536 closed with link to release notes.
+
+---
+
+## 7. Out-of-scope follow-ups (separate issues)
+
+1. **Sequence cache contention on `change_id`** — evaluate `CACHE 32+`
+   default; document that gaps are harmless.
+2. **Atomic frontier+buffer commit** — covered by
+   [PLAN_OVERALL_ASSESSMENT_2.md](../PLAN_OVERALL_ASSESSMENT_2.md).
+3. **WAL/logical-decoding CDC as default** — already on roadmap; this fix
+   is for the trigger path that will remain the fallback.
diff --git a/src/cdc.rs b/src/cdc.rs
index 80b6f292..dd925f35 100644
--- a/src/cdc.rs
+++ b/src/cdc.rs
@@ -3016,6 +3016,192 @@ pub fn estimate_pending_changes(pgt_id: i64) -> Option {
     })
 }
 
+// ── #536: Frontier visibility holdback ────────────────────────────────────
+
+/// Pure-logic holdback classifier — no SPI calls, fully unit-testable.
+///
+/// Returns `true` when the frontier should be held back to prevent
+/// silently skipping change-buffer rows from a long-running transaction.
+///
+/// # Arguments
+/// - `prev_oldest_xmin`: the minimum `backend_xmin` observed at the
+///   **previous** scheduler tick. `0` means "no baseline yet" (first tick
+///   or holdback was just enabled).
+/// - `current_oldest_xmin`: the minimum `backend_xmin` across all
+///   currently in-progress transactions (regular + 2PC). `0` means
+///   there are no in-progress transactions right now.
+///
+/// # Decision logic
+/// - No in-progress transactions → safe to advance → returns `false`.
+/// - First tick (no baseline) and in-progress transaction exists → hold
+///   back conservatively → returns `true`.
+/// - `current_oldest_xmin <= prev_oldest_xmin` → the same (or an older)
+///   transaction from before the last tick is still running → returns `true`.
+/// - `current_oldest_xmin > prev_oldest_xmin` → all pre-baseline
+///   transactions committed; new ones are safe → returns `false`.
+pub fn classify_holdback(prev_oldest_xmin: u64, current_oldest_xmin: u64) -> bool {
+    if current_oldest_xmin == 0 {
+        // No in-progress transactions — always safe to advance.
+        return false;
+    }
+    if prev_oldest_xmin == 0 {
+        // No baseline from previous tick; be conservative.
+        return true;
+    }
+    // Hold back if the oldest still-running xmin is at or before the baseline.
+    //
+    // Note: xids are 32-bit and wrap around at ~4 billion. We treat them as
+    // linear u64 here. True wraparound between two consecutive scheduler ticks
+    // (100ms–10s apart) would require ~4 billion transactions to commit in that
+    // window, which is impossible in practice. This assumption holds for PG18.
+    current_oldest_xmin <= prev_oldest_xmin
+}
+
+/// Set to `true` after the first time we emit a warning about restricted
+/// `pg_stat_activity` access (e.g. RDS / Cloud SQL without `pg_monitor`).
+/// Prevents log spam -- warn once per server process lifetime.
+static WARNED_PG_MONITOR_ACCESS: std::sync::atomic::AtomicBool =
+    std::sync::atomic::AtomicBool::new(false);
+
+/// Probe the cluster for the current write LSN and the oldest in-progress
+/// transaction xmin, then compute the safe frontier upper bound.
+///
+/// This performs a **single SPI round-trip** per scheduler tick.
+/// The call must be made inside a `BackgroundWorker::transaction` block.
+///
+/// # Arguments
+/// - `prev_oldest_xmin`: value from `shmem::last_tick_oldest_xmin()` —
+///   the oldest xmin seen at the previous tick.
+///
+/// # Returns
+/// `(safe_lsn, write_lsn, current_oldest_xmin, oldest_txn_age_secs)`
+/// - `safe_lsn`: the LSN the frontier may safely advance to.
+/// - `write_lsn`: the actual current write LSN (for holdback metric).
+/// - `current_oldest_xmin`: value to persist via
+///   `shmem::set_last_tick_holdback_state()` for the next tick.
+/// - `oldest_txn_age_secs`: age of the oldest in-progress txn in seconds
+///   (0 when no holdback is active, for the warning threshold check).
+pub fn compute_safe_upper_bound(
+    prev_watermark_lsn: Option<&str>,
+    prev_oldest_xmin: u64,
+) -> Result<(String, String, u64, u64), PgTrickleError> {
+    // One query fetches everything: write LSN, min xmin from active backends,
+    // min xmin from 2PC prepared transactions, and age of the oldest txn.
+    // The fourth column counts all other backends visible to this role.
+    // When it is 0 the role cannot see other sessions -- typical on
+    // managed services (RDS, Cloud SQL) where pg_stat_activity is
+    // restricted to the current user's own connections. We emit a
+    // one-time WARNING so operators can grant pg_monitor.
+    let result = Spi::connect(|client| {
+        let rows = client
+            .select(
+                // xid (type oid 28) is 32-bit in PostgreSQL up to and including
+                // PG18. Casting via ::text::bigint is safe because 2^32 fits
+                // comfortably in a signed bigint. If a future PG version exposes
+                // xid8 (64-bit) here, this cast will still work but the 32-bit
+                // wraparound assumption in classify_holdback() should be revisited.
+                "WITH active_xmins AS (
+                    SELECT
+                        backend_xmin::text::bigint AS xmin,
+                        EXTRACT(EPOCH FROM (now() - xact_start))::bigint AS age_secs
+                    FROM pg_stat_activity
+                    WHERE backend_xmin IS NOT NULL
+                      AND state <> 'idle'
+                      AND pid <> pg_backend_pid()
+                    UNION ALL
+                    SELECT
+                        transaction::text::bigint AS xmin,
+                        EXTRACT(EPOCH FROM (now() - prepared))::bigint AS age_secs
+                    FROM pg_prepared_xacts
+                )
+                SELECT
+                    pg_current_wal_lsn()::text,
+                    COALESCE(MIN(xmin), 0)::bigint,
+                    COALESCE(MAX(age_secs), 0)::bigint,
+                    (SELECT COUNT(*) FROM pg_stat_activity
+                     WHERE pid <> pg_backend_pid())::bigint AS visible_other_backends
+                FROM active_xmins",
+                None,
+                &[],
+            )
+            .map_err(|e| PgTrickleError::SpiError(e.to_string()))?;
+
+        let mut write_lsn = String::from("0/0");
+        let mut min_xmin: i64 = 0;
+        let mut max_age: i64 = 0;
+        let mut visible_other_backends: i64 = 0;
+
+        for row in rows {
+            write_lsn = row
+                .get::<String>(1)
+                .unwrap_or(None)
+                .unwrap_or_else(|| "0/0".to_string());
+            min_xmin = row.get::<i64>(2).unwrap_or(None).unwrap_or(0);
+            max_age = row.get::<i64>(3).unwrap_or(None).unwrap_or(0);
+            visible_other_backends = row.get::<i64>(4).unwrap_or(None).unwrap_or(0);
+        }
+
+        Ok::<_, PgTrickleError>((write_lsn, min_xmin, max_age, visible_other_backends))
+    })?;
+
+    let (write_lsn, min_xmin_i64, age_secs_i64, visible_other_backends) = result;
+
+    // Detect restricted pg_stat_activity access. A healthy PostgreSQL server
+    // always has background processes (checkpointer, autovacuum launcher, etc.)
+    // visible to superusers / pg_monitor members. If we see 0 other backends,
+    // the role likely cannot read other sessions -- warn once so operators can
+    // grant pg_monitor to the pg_trickle service account.
+    if visible_other_backends == 0
+        && WARNED_PG_MONITOR_ACCESS
+            .compare_exchange(
+                false,
+                true,
+                std::sync::atomic::Ordering::Relaxed,
+                std::sync::atomic::Ordering::Relaxed,
+            )
+            .is_ok()
+    {
+        pgrx::warning!(
+            "pg_trickle: frontier holdback probe cannot see other PostgreSQL backends \
+             in pg_stat_activity. On managed services (RDS, Cloud SQL) this means \
+             long-running transactions from other sessions will NOT trigger a holdback, \
+             risking silent data loss. \
+             Fix: GRANT pg_monitor TO ;"
+        );
+    }
+    let current_oldest_xmin = if min_xmin_i64 > 0 {
+        min_xmin_i64 as u64
+    } else {
+        0
+    };
+    let oldest_txn_age_secs = if age_secs_i64 > 0 {
+        age_secs_i64 as u64
+    } else {
+        0
+    };
+
+    let should_hold = classify_holdback(prev_oldest_xmin, current_oldest_xmin);
+
+    let safe_lsn = if should_hold {
+        // Hold back to the previous watermark when one exists.
+        match prev_watermark_lsn {
+            Some(prev) if !prev.is_empty() && prev != "0/0" => prev.to_string(),
+            // First tick or no previous watermark: advance anyway to avoid
+            // stalling indefinitely.
+            _ => write_lsn.clone(),
+        }
+    } else {
+        write_lsn.clone()
+    };
+
+    Ok((
+        safe_lsn,
+        write_lsn,
+        current_oldest_xmin,
+        oldest_txn_age_secs,
+    ))
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -3560,4 +3746,44 @@ mod tests {
     fn test_promote_negative_threshold_returns_false() {
         assert!(!should_promote_inner(999_999, false, "auto", -1));
     }
+
+    // ── #536: classify_holdback unit tests ─────────────────────────
+
+    #[test]
+    fn test_classify_holdback_no_active_txns_never_holds() {
+        // current_oldest_xmin == 0 means no in-progress transactions.
+        assert!(!classify_holdback(0, 0));
+        assert!(!classify_holdback(100, 0));
+        assert!(!classify_holdback(u64::MAX, 0));
+    }
+
+    #[test]
+    fn test_classify_holdback_first_tick_with_active_txn_holds() {
+        // prev_oldest_xmin == 0 means no baseline yet.
+        assert!(classify_holdback(0, 50));
+        assert!(classify_holdback(0, 1));
+        assert!(classify_holdback(0, u64::MAX));
+    }
+
+    #[test]
+    fn test_classify_holdback_same_xmin_holds() {
+        // Same long-running transaction still active.
+        assert!(classify_holdback(100, 100));
+    }
+
+    #[test]
+    fn test_classify_holdback_xmin_advanced_safe() {
+        // All pre-baseline transactions committed; new ones are newer.
+        assert!(!classify_holdback(100, 101));
+        assert!(!classify_holdback(100, 200));
+        assert!(!classify_holdback(100, u64::MAX));
+    }
+
+    #[test]
+    fn test_classify_holdback_xmin_retreated_holds() {
+        // current xmin smaller than prev (defensive — xids are monotone
+        // but we handle it safely).
+        assert!(classify_holdback(200, 100));
+        assert!(classify_holdback(200, 1));
+    }
 }
diff --git a/src/config.rs b/src/config.rs
index 39fc94e7..980876e2 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -927,6 +927,75 @@ fn normalize_diff_output_format(value: Option) -> DiffOutputFormat {
     }
 }
 
+// ── Issue #536: Frontier Visibility Holdback ───────────────────────────────
+
+/// #536: Frontier holdback mode for the trigger-based CDC path.
+///
+/// Controls whether the scheduler holds back the frontier LSN to avoid
+/// silently skipping change-buffer rows from long-running transactions
+/// that committed after the previous tick captured the watermark.
+///
+/// | Value | Meaning |
+/// |-------|---------|
+/// | `"xmin"` (default) | Probe `pg_stat_activity` + `pg_prepared_xacts` once per tick and cap the frontier to the safe upper bound. |
+/// | `"none"` | No holdback — current fast behaviour. Can silently lose rows under long-running transactions. |
+/// | `"lsn:"` | Hold back the frontier by exactly N bytes for debugging. |
+pub static PGS_FRONTIER_HOLDBACK_MODE: GucSetting> =
+    GucSetting::>::new(Some(c"xmin"));
+
+/// #536: Emit a WARNING when the frontier holdback has been active for
+/// longer than this many seconds.
+///
+/// A holdback occurs when a long-running (or forgotten) transaction keeps
+/// the scheduler from advancing the frontier. When this threshold is
+/// exceeded, a WARNING is emitted at most once per minute so operators
+/// can identify the blocking session.
+///
+/// Set to 0 to disable the warning (not recommended for production).
+pub static PGS_FRONTIER_HOLDBACK_WARN_SECONDS: GucSetting<i32> = GucSetting::<i32>::new(60);
+
+/// #536: Frontier holdback mode enum.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum FrontierHoldbackMode {
+    /// Probe pg_stat_activity + pg_prepared_xacts and cap to safe LSN (default).
+    Xmin,
+    /// No holdback — fast but can lose rows under long transactions.
+    None,
+    /// Hold back the frontier by exactly N bytes (debugging only).
+    LsnBytes(u64),
+    /// Sentinel: `lsn:` was present but the number failed to parse.
+    /// The accessor converts this to `Xmin` after emitting a WARNING.
+    InvalidLsn,
+}
+
+impl FrontierHoldbackMode {
+    /// Return a human-readable representation of the mode.
+    /// Unlike `as_str()` on simpler enums, this allocates for `LsnBytes`
+    /// to include the actual byte count (e.g. `"lsn:1048576"`).
+    pub fn display_string(&self) -> String {
+        match self {
+            FrontierHoldbackMode::Xmin => "xmin".to_string(),
+            FrontierHoldbackMode::None => "none".to_string(),
+            FrontierHoldbackMode::LsnBytes(n) => format!("lsn:{n}"),
+            FrontierHoldbackMode::InvalidLsn => "invalid".to_string(),
+        }
+    }
+}
+
+pub fn normalize_frontier_holdback_mode(value: Option<String>) -> FrontierHoldbackMode {
+    match value.as_deref().map(str::to_ascii_lowercase).as_deref() {
+        Some("none") => FrontierHoldbackMode::None,
+        Some(s) if s.starts_with("lsn:") => {
+            let tail = &s["lsn:".len()..];
+            match tail.parse::<u64>() {
+                Ok(bytes) => FrontierHoldbackMode::LsnBytes(bytes),
+                Err(_) => FrontierHoldbackMode::InvalidLsn,
+            }
+        }
+        _ => FrontierHoldbackMode::Xmin,
+    }
+}
+
 /// Register all GUC variables. Called from `_PG_init()`.
 pub fn register_gucs() {
     GucRegistry::define_bool_guc(
@@ -1878,6 +1947,34 @@ pub fn register_gucs() {
         GucContext::Suset,
         GucFlags::default(),
     );
+
+    // #536: Frontier visibility holdback GUCs.
+    GucRegistry::define_string_guc(
+        c"pg_trickle.frontier_holdback_mode",
+        c"Frontier holdback mode to prevent silent data loss from long-running transactions.",
+        c"'xmin' (default): probe pg_stat_activity + pg_prepared_xacts once per tick and \
+          cap the frontier to the safe upper bound, preventing change-buffer rows from \
+          uncommitted transactions from being silently skipped. \
+          'none': no holdback (fast but can lose rows under long-lived transactions). \
+          'lsn:': hold back by exactly N bytes (debugging only).",
+        &PGS_FRONTIER_HOLDBACK_MODE,
+        GucContext::Suset,
+        GucFlags::default(),
+    );
+
+    GucRegistry::define_int_guc(
+        c"pg_trickle.frontier_holdback_warn_seconds",
+        c"Emit a WARNING when frontier holdback exceeds this many seconds (0 = disabled).",
+        c"When a long-running or forgotten transaction keeps the scheduler from advancing \
+          the frontier for longer than this many seconds, a WARNING is emitted at most \
+          once per minute to help operators identify the blocking session. \
+          Set to 0 to disable the warning.",
+        &PGS_FRONTIER_HOLDBACK_WARN_SECONDS,
+        0,    // min (0 = disabled)
+        3600, // max (1 hour)
+        GucContext::Suset,
+        GucFlags::default(),
+    );
 }
 
 // ── Convenience accessors ──────────────────────────────────────────────────
@@ -2352,12 +2449,35 @@ pub fn pg_trickle_diff_output_format() -> DiffOutputFormat {
     )
 }
 
+/// #536: Returns the current frontier holdback mode.
+pub fn pg_trickle_frontier_holdback_mode() -> FrontierHoldbackMode {
+    let raw = PGS_FRONTIER_HOLDBACK_MODE
+        .get()
+        .and_then(|cs| cs.to_str().ok().map(str::to_owned));
+    let mode = normalize_frontier_holdback_mode(raw.clone());
+    if matches!(mode, FrontierHoldbackMode::InvalidLsn) {
+        pgrx::warning!(
+            "pg_trickle: invalid frontier_holdback_mode '{}' — \
+             expected 'lsn:' with a valid integer; defaulting to 'xmin'",
+            raw.as_deref().unwrap_or("")
+        );
+        return FrontierHoldbackMode::Xmin;
+    }
+    mode
+}
+
+/// #536: Returns the frontier holdback warning threshold in seconds (0 = disabled).
+pub fn pg_trickle_frontier_holdback_warn_seconds() -> i32 {
+    PGS_FRONTIER_HOLDBACK_WARN_SECONDS.get()
+}
+
 #[cfg(test)]
 mod tests {
     use super::{
-        CdcTriggerMode, DiffOutputFormat, DogFeedingAutoApply, MergeJoinStrategy, MergeStrategy,
-        ParallelRefreshMode, RefreshStrategy, UserTriggersMode, VolatileFunctionPolicy,
-        normalize_cdc_trigger_mode, normalize_diff_output_format, normalize_dog_feeding_auto_apply,
+        CdcTriggerMode, DiffOutputFormat, DogFeedingAutoApply, FrontierHoldbackMode,
+        MergeJoinStrategy, MergeStrategy, ParallelRefreshMode, RefreshStrategy, UserTriggersMode,
+        VolatileFunctionPolicy, normalize_cdc_trigger_mode, normalize_diff_output_format,
+        normalize_dog_feeding_auto_apply, normalize_frontier_holdback_mode,
         normalize_merge_join_strategy, normalize_merge_strategy, normalize_parallel_refresh_mode,
         normalize_recursive_max_depth, normalize_refresh_strategy, normalize_user_triggers_mode,
         normalize_volatile_function_policy, threshold_mb_to_bytes,
@@ -2873,4 +2993,55 @@ mod tests {
             );
         }
     }
+
+    // ── #536: FrontierHoldbackMode normalizer tests ──────────────────
+
+    #[test]
+    fn test_normalize_frontier_holdback_mode_defaults_to_xmin() {
+        assert_eq!(
+            normalize_frontier_holdback_mode(None),
+            FrontierHoldbackMode::Xmin
+        );
+        assert_eq!(
+            normalize_frontier_holdback_mode(Some("xmin".to_string())),
+            FrontierHoldbackMode::Xmin
+        );
+        assert_eq!(
+            normalize_frontier_holdback_mode(Some("XMIN".to_string())),
+            FrontierHoldbackMode::Xmin
+        );
+        assert_eq!(
+            normalize_frontier_holdback_mode(Some("unexpected".to_string())),
+            FrontierHoldbackMode::Xmin
+        );
+    }
+
+    #[test]
+    fn test_normalize_frontier_holdback_mode_none() {
+        assert_eq!(
+            normalize_frontier_holdback_mode(Some("none".to_string())),
+            FrontierHoldbackMode::None
+        );
+        assert_eq!(
+            normalize_frontier_holdback_mode(Some("NONE".to_string())),
+            FrontierHoldbackMode::None
+        );
+    }
+
+    #[test]
+    fn test_normalize_frontier_holdback_mode_lsn_bytes() {
+        assert_eq!(
+            normalize_frontier_holdback_mode(Some("lsn:1048576".to_string())),
+            FrontierHoldbackMode::LsnBytes(1_048_576)
+        );
+        assert_eq!(
+            normalize_frontier_holdback_mode(Some("lsn:0".to_string())),
+            FrontierHoldbackMode::LsnBytes(0)
+        );
+        // Invalid number → returns InvalidLsn sentinel (accessor converts to Xmin + warns)
+        assert_eq!(
+            normalize_frontier_holdback_mode(Some("lsn:notanumber".to_string())),
+            FrontierHoldbackMode::InvalidLsn
+        );
+    }
 }
diff --git a/src/monitor.rs b/src/monitor.rs
index 544b7c58..2cf4696c 100644
--- a/src/monitor.rs
+++ b/src/monitor.rs
@@ -696,6 +696,25 @@ pub(crate) fn collect_metrics_text() -> String {
         out.push_str(&format!("pg_trickle_active{{{labels}}} {is_active}\n"));
     }
 
+    // #536: Frontier holdback gauges
+    let (holdback_lsn, holdback_age) = crate::shmem::read_holdback_metrics();
+    out.push_str(
+        "# HELP pg_trickle_frontier_holdback_lsn_bytes \
+         How many WAL bytes behind the write LSN the safe frontier currently is (0 = no holdback)\n",
+    );
+    out.push_str("# TYPE pg_trickle_frontier_holdback_lsn_bytes gauge\n");
+    out.push_str(&format!(
+        "pg_trickle_frontier_holdback_lsn_bytes {holdback_lsn}\n"
+    ));
+    out.push_str(
+        "# HELP pg_trickle_frontier_holdback_seconds \
+         Age in seconds of the oldest in-progress transaction causing a holdback (0 = no holdback)\n",
+    );
+    out.push_str("# TYPE pg_trickle_frontier_holdback_seconds gauge\n");
+    out.push_str(&format!(
+        "pg_trickle_frontier_holdback_seconds {holdback_age}\n"
+    ));
+
     // OpenMetrics requires the exposition to end with # EOF
     out.push_str("# EOF\n");
     out
diff --git a/src/scheduler.rs b/src/scheduler.rs
index 9c4a9de8..5e867854 100644
--- a/src/scheduler.rs
+++ b/src/scheduler.rs
@@ -715,6 +715,210 @@ fn parse_worker_extra(extra: &str) -> Option<(String, i64)> {
     Some((db_name, job_id))
 }
 
+// ── #536: Frontier holdback tick watermark helpers ─────────────────────────
+
+/// Unix-epoch timestamp of the last holdback-active WARNING, used to
+/// rate-limit warnings to at most one per minute.
+static LAST_HOLDBACK_WARN_SECS: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
+
+/// Compute the tick watermark for the **coordinator** (main scheduler loop).
+///
+/// Applies the `frontier_holdback_mode` GUC logic:
+/// - `"none"` / watermark disabled: use raw `pg_current_wal_lsn()`.
+/// - `"xmin"`: probe `pg_stat_activity` + `pg_prepared_xacts` and hold back
+///   if a long-running transaction would cause data loss.
+/// - `"lsn:"`: hold back by exactly N bytes.
+///
+/// Side effects (when holdback fires):
+/// - Updates `shmem::last_tick_oldest_xmin` for the next tick.
+/// - Updates `shmem::last_tick_safe_lsn_u64` for dynamic workers.
+/// - Updates the holdback gauge metrics.
+/// - Emits a WARNING when holdback age exceeds the warn threshold.
+///
+/// # Arguments
+/// - `prev_watermark_lsn`: the safe LSN from the previous tick, if any.
+/// +/// # Returns +/// `(tick_watermark, current_oldest_xmin, oldest_txn_age_secs)` +fn compute_coordinator_tick_watermark( + prev_watermark_lsn: Option<&str>, +) -> (Option<String>, u64, u64) { + if !config::pg_trickle_tick_watermark_enabled() { + return (None, 0, 0); + } + + let mode = config::pg_trickle_frontier_holdback_mode(); + + match mode { + config::FrontierHoldbackMode::None => { + let lsn = Spi::get_one::<String>("SELECT pg_current_wal_lsn()::text").unwrap_or(None); + // Store raw write LSN for workers. + if let Some(ref l) = lsn { + shmem::set_last_tick_safe_lsn(version::lsn_to_u64(l)); + } + shmem::update_holdback_metrics(0, 0); + (lsn, 0, 0) + } + + config::FrontierHoldbackMode::Xmin | config::FrontierHoldbackMode::InvalidLsn => { + // Skip the probe when CDC mode is WAL -- commit-LSN ordering + // is already safe in logical-replication mode. + if config::pg_trickle_cdc_mode() == "wal" { + let lsn = + Spi::get_one::<String>("SELECT pg_current_wal_lsn()::text").unwrap_or(None); + if let Some(ref l) = lsn { + shmem::set_last_tick_safe_lsn(version::lsn_to_u64(l)); + } + shmem::update_holdback_metrics(0, 0); + return (lsn, 0, 0); + } + + let prev_oldest_xmin = shmem::last_tick_oldest_xmin(); + + match cdc::compute_safe_upper_bound(prev_watermark_lsn, prev_oldest_xmin) { + Ok((safe_lsn, write_lsn, current_oldest_xmin, age_secs)) => { + // Persist for next tick and for dynamic workers under a + // single lock so workers never see xmin/LSN out of sync. + let safe_u64 = version::lsn_to_u64(&safe_lsn); + shmem::set_last_tick_holdback_state(current_oldest_xmin, safe_u64); + + // Update holdback gauge metrics. + let write_u64 = version::lsn_to_u64(&write_lsn); + let holdback_bytes = write_u64.saturating_sub(safe_u64); + shmem::update_holdback_metrics(holdback_bytes, age_secs); + + // Warn when holdback has been active longer than the threshold. 
+ if holdback_bytes > 0 { + emit_holdback_warning_if_needed(age_secs); + } + + (Some(safe_lsn), current_oldest_xmin, age_secs) + } + Err(e) => { + // On probe failure, hold at the previous watermark (if known) + // rather than advancing to the raw write LSN. Advancing on + // failure is the exact unsafe behaviour the holdback is meant + // to prevent — the probe may have failed precisely because a + // long-running transaction exists. + warning!( + "pg_trickle: holdback probe failed ({}); holding at previous watermark", + e + ); + let safe_lsn = match prev_watermark_lsn { + Some(prev) => { + // Re-use last known-safe watermark. + let u = version::lsn_to_u64(prev); + shmem::set_last_tick_safe_lsn(u); + Some(prev.to_string()) + } + None => { + // First tick — no previous watermark; fall back to + // write LSN to avoid stalling forever on startup. + let lsn = Spi::get_one::<String>("SELECT pg_current_wal_lsn()::text") + .unwrap_or(None); + if let Some(ref l) = lsn { + shmem::set_last_tick_safe_lsn(version::lsn_to_u64(l)); + } + lsn + } + }; + shmem::update_holdback_metrics(0, 0); + (safe_lsn, 0, 0) + } + } + } + + config::FrontierHoldbackMode::LsnBytes(offset_bytes) => { + let write_lsn_str = Spi::get_one::<String>("SELECT pg_current_wal_lsn()::text") + .unwrap_or(None) + .unwrap_or_else(|| "0/0".to_string()); + let write_u64 = version::lsn_to_u64(&write_lsn_str); + let safe_u64 = write_u64.saturating_sub(offset_bytes); + let safe_lsn = version::u64_to_lsn(safe_u64); + shmem::set_last_tick_safe_lsn(safe_u64); + shmem::update_holdback_metrics(offset_bytes.min(write_u64), 0); + (Some(safe_lsn), 0, 0) + } + } +} + +/// Compute the tick watermark for a **dynamic refresh worker**. +/// +/// Dynamic workers run after the coordinator and do not have access to +/// the previous tick's `prev_watermark_lsn`. They read the coordinator- +/// computed safe watermark from shared memory and cap it with the current +/// write LSN (in case the worker starts significantly after the tick). 
+/// +/// When holdback is disabled or shmem is unavailable, falls back to +/// `pg_current_wal_lsn()`. +fn compute_worker_tick_watermark() -> Option<String> { + if !config::pg_trickle_tick_watermark_enabled() { + return None; + } + + let mode = config::pg_trickle_frontier_holdback_mode(); + + match mode { + config::FrontierHoldbackMode::None => { + Spi::get_one::<String>("SELECT pg_current_wal_lsn()::text").unwrap_or(None) + } + + config::FrontierHoldbackMode::Xmin + | config::FrontierHoldbackMode::LsnBytes(_) + | config::FrontierHoldbackMode::InvalidLsn => { + // Read the safe watermark the coordinator stored in shmem. + let safe_lsn_u64 = shmem::last_tick_safe_lsn_u64(); + + if safe_lsn_u64 == 0 { + // No coordinator value yet — fall back to raw write LSN. + return Spi::get_one::<String>("SELECT pg_current_wal_lsn()::text").unwrap_or(None); + } + + // Cap with current write LSN: don't advance past what's available now. + let write_lsn_str = Spi::get_one::<String>("SELECT pg_current_wal_lsn()::text") + .unwrap_or(None) + .unwrap_or_else(|| "0/0".to_string()); + let write_u64 = version::lsn_to_u64(&write_lsn_str); + let effective_u64 = safe_lsn_u64.min(write_u64); + Some(version::u64_to_lsn(effective_u64)) + } + } +} + +/// Rate-limited WARNING for when frontier holdback has been active longer +/// than `pg_trickle.frontier_holdback_warn_seconds`. +/// +/// Emits at most one WARNING per minute. +fn emit_holdback_warning_if_needed(oldest_txn_age_secs: u64) { + let warn_secs = config::pg_trickle_frontier_holdback_warn_seconds(); + if warn_secs <= 0 { + return; + } + if oldest_txn_age_secs < warn_secs as u64 { + return; + } + + // Rate-limit: emit at most once per minute. 
+ let now_secs = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + let last_warn = LAST_HOLDBACK_WARN_SECS.load(std::sync::atomic::Ordering::Relaxed); + if now_secs.saturating_sub(last_warn) < 60 { + return; + } + LAST_HOLDBACK_WARN_SECS.store(now_secs, std::sync::atomic::Ordering::Relaxed); + + pgrx::warning!( + "pg_trickle: frontier holdback active — the oldest in-progress transaction is {}s old \ + (threshold: {}s). Stream tables may lag behind. \ + Check pg_stat_activity for long-running sessions. \ + To suppress: SET pg_trickle.frontier_holdback_warn_seconds = 0.", + oldest_txn_age_secs, + warn_secs, + ); +} + /// Execute a singleton unit: refresh a single stream table using the existing inline path. fn execute_worker_singleton(job: &SchedulerJob) -> RefreshOutcome { let pgt_id = job.root_pgt_id; @@ -794,11 +998,8 @@ fn execute_worker_singleton(job: &SchedulerJob) -> RefreshOutcome { return RefreshOutcome::Success; } - let tick_watermark: Option<String> = if config::pg_trickle_tick_watermark_enabled() { - Spi::get_one::<String>("SELECT pg_current_wal_lsn()::text").unwrap_or(None) - } else { - None - }; + // #536: Use holdback-aware watermark for dynamic workers. + let tick_watermark: Option<String> = compute_worker_tick_watermark(); let has_changes = has_table_source_changes(&st) || has_stream_table_source_changes(&st); let action = refresh::determine_refresh_action(&st, has_changes); @@ -836,11 +1037,8 @@ fn execute_worker_atomic_group(job: &SchedulerJob, is_repeatable_read: bool) -> let subtxn = SubTransaction::begin(); - let tick_watermark: Option<String> = if config::pg_trickle_tick_watermark_enabled() { - Spi::get_one::<String>("SELECT pg_current_wal_lsn()::text").unwrap_or(None) - } else { - None - }; + // #536: Use holdback-aware watermark for dynamic workers. + let tick_watermark: Option<String> = compute_worker_tick_watermark(); let mut refreshed_count: usize = 0; // BOOT-4: Build gated-source set once for the whole group. 
@@ -994,11 +1192,8 @@ fn execute_worker_immediate_closure(job: &SchedulerJob) -> RefreshOutcome { return RefreshOutcome::Success; } - let tick_watermark: Option<String> = if config::pg_trickle_tick_watermark_enabled() { - Spi::get_one::<String>("SELECT pg_current_wal_lsn()::text").unwrap_or(None) - } else { - None - }; + // #536: Use holdback-aware watermark for dynamic workers. + let tick_watermark: Option<String> = compute_worker_tick_watermark(); let has_changes = has_table_source_changes(&st) || has_stream_table_source_changes(&st); let action = refresh::determine_refresh_action(&st, has_changes); @@ -1038,11 +1233,8 @@ fn execute_worker_cyclic_scc(job: &SchedulerJob) -> RefreshOutcome { } } - let tick_watermark: Option<String> = if config::pg_trickle_tick_watermark_enabled() { - Spi::get_one::<String>("SELECT pg_current_wal_lsn()::text").unwrap_or(None) - } else { - None - }; + // #536: Use holdback-aware watermark for dynamic workers. + let tick_watermark: Option<String> = compute_worker_tick_watermark(); let mut prev_row_counts: HashMap = member_ids .iter() @@ -1161,11 +1353,8 @@ fn execute_worker_fused_chain(job: &SchedulerJob) -> RefreshOutcome { job.job_id, ); - let tick_watermark: Option<String> = if config::pg_trickle_tick_watermark_enabled() { - Spi::get_one::<String>("SELECT pg_current_wal_lsn()::text").unwrap_or(None) - } else { - None - }; + // #536: Use holdback-aware watermark for dynamic workers. + let tick_watermark: Option<String> = compute_worker_tick_watermark(); // BOOT-4: Build gated-source set once for the whole group. let gated_oids = load_gated_source_oids(); @@ -2146,6 +2335,21 @@ pub extern "C-unwind" fn pg_trickle_scheduler_main(_arg: pg_sys::Datum) { // with a fresh snapshot captures the committed edge changes. let mut pending_full_rebuild = false; + // #536: Previous tick's safe frontier watermark, used by the holdback + // algorithm to determine whether any long-running transaction spans a + // tick boundary. 
Seeded from shared memory on scheduler startup so + // that the first post-restart tick preserves the last known-safe LSN + // (avoids a one-tick window where the frontier could advance past an + // in-flight transaction that was already open before the restart). + let mut prev_tick_watermark: Option<String> = { + let last_safe = crate::shmem::last_tick_safe_lsn_u64(); + if last_safe != 0 { + Some(crate::version::u64_to_lsn(last_safe)) + } else { + None + } + }; + // Per-ST retry state (in-memory only, reset on scheduler restart) let mut retry_states: HashMap = HashMap::new(); let retry_policy = RetryPolicy::default(); @@ -2627,14 +2831,13 @@ pub extern "C-unwind" fn pg_trickle_scheduler_main(_arg: pg_sys::Datum) { // Run the scheduler tick inside a transaction BackgroundWorker::transaction(AssertUnwindSafe(|| { - // CSS1: Capture tick watermark for cross-source snapshot consistency. - // All refreshes in this tick will cap their LSN consumption to this value, - // ensuring every stream table in the tick shares the same consistent LSN view. - let tick_watermark: Option<String> = if config::pg_trickle_tick_watermark_enabled() { - Spi::get_one::<String>("SELECT pg_current_wal_lsn()::text").unwrap_or(None) - } else { - None - }; + // CSS1 / #536: Capture tick watermark for cross-source snapshot consistency + // with frontier holdback to prevent silent data loss from long-running + // transactions that span a tick boundary. + let (tick_watermark, _current_oldest_xmin, _holdback_age_secs) = + compute_coordinator_tick_watermark(prev_tick_watermark.as_deref()); + // Persist this tick's safe watermark for the next tick's holdback comparison. 
+ prev_tick_watermark.clone_from(&tick_watermark); // Step A: Check if DAG needs rebuild let current_version = shmem::current_dag_version(); diff --git a/src/shmem.rs b/src/shmem.rs index 79dbb21f..73a13294 100644 --- a/src/shmem.rs +++ b/src/shmem.rs @@ -38,6 +38,21 @@ pub struct PgTrickleSharedState { /// When true, more DDL events arrived than the ring can hold. /// The scheduler must do a full O(V+E) DAG rebuild. inv_overflow: bool, + + // ── #536: Frontier visibility holdback ─────────────────────────── + /// The oldest `backend_xmin` (including 2PC) seen at the previous + /// scheduler tick. Used by the xmin holdback algorithm to detect + /// long-running transactions that span a tick boundary. + /// + /// 0 means "not yet recorded" (first tick or holdback disabled). + pub last_tick_oldest_xmin: u64, + + /// The safe frontier LSN upper bound computed at the last scheduler tick, + /// stored as a raw u64 (see `version::lsn_to_u64` / `version::u64_to_lsn`). + /// + /// Dynamic refresh workers read this value and use it (capped with their + /// own current write_lsn) as their tick watermark. 0 means unset. + pub last_tick_safe_lsn_u64: u64, } impl Default for PgTrickleSharedState { @@ -50,6 +65,8 @@ impl Default for PgTrickleSharedState { inv_ring: [0; INVALIDATION_RING_CAPACITY], inv_count: 0, inv_overflow: false, + last_tick_oldest_xmin: 0, + last_tick_safe_lsn_u64: 0, } } } @@ -136,6 +153,23 @@ pub static TEMPLATE_CACHE_L1_HITS: PgAtomic = pub static TEMPLATE_CACHE_EVICTIONS: PgAtomic = unsafe { PgAtomic::new(c"pg_trickle_template_cache_evictions") }; +/// #536: Current frontier holdback in LSN bytes (gauge). +/// +/// Set to 0 when the frontier is not held back. +/// Set to `write_lsn - safe_lsn` in bytes when a long-running transaction +/// is preventing the frontier from advancing. +// SAFETY: PgAtomic::new requires a static CStr name. 
+pub static FRONTIER_HOLDBACK_LSN_BYTES: PgAtomic = + unsafe { PgAtomic::new(c"pg_trickle_frontier_holdback_lsn") }; + +/// #536: Age (in seconds) of the oldest in-progress transaction contributing +/// to a frontier holdback (gauge). +/// +/// Set to 0 when no holdback is active. +// SAFETY: PgAtomic::new requires a static CStr name. +pub static FRONTIER_HOLDBACK_AGE_SECS: PgAtomic = + unsafe { PgAtomic::new(c"pg_trickle_frontier_holdback_age") }; + /// Register shared memory allocations. Called from `_PG_init()`. pub fn init_shared_memory() { pg_shmem_init!(PGS_STATE); @@ -149,6 +183,8 @@ pub fn init_shared_memory() { pg_shmem_init!(TEMPLATE_CACHE_MISSES); pg_shmem_init!(TEMPLATE_CACHE_L1_HITS); pg_shmem_init!(TEMPLATE_CACHE_EVICTIONS); + pg_shmem_init!(FRONTIER_HOLDBACK_LSN_BYTES); + pg_shmem_init!(FRONTIER_HOLDBACK_AGE_SECS); SHMEM_INITIALIZED.store(true, std::sync::atomic::Ordering::Relaxed); } @@ -442,6 +478,92 @@ pub fn current_reconcile_epoch() -> u64 { .load(std::sync::atomic::Ordering::Relaxed) } +// ── #536: Frontier visibility holdback helpers ───────────────────────────── + +/// Read the oldest-xmin seen at the previous scheduler tick. +/// +/// Returns 0 when shmem is unavailable or no baseline has been recorded. +pub fn last_tick_oldest_xmin() -> u64 { + if !is_shmem_available() { + return 0; + } + PGS_STATE.share().last_tick_oldest_xmin +} + +/// Persist the oldest-xmin from the current tick so next tick can compare. +pub fn set_last_tick_oldest_xmin(xmin: u64) { + if !is_shmem_available() { + return; + } + PGS_STATE.exclusive().last_tick_oldest_xmin = xmin; +} + +/// Persist both the oldest-xmin and the safe frontier LSN **atomically** under +/// a single exclusive lock acquisition so that dynamic workers never see a +/// state where the xmin has advanced but the safe LSN has not yet been updated +/// (or vice-versa). 
+pub fn set_last_tick_holdback_state(xmin: u64, lsn_u64: u64) { + if !is_shmem_available() { + return; + } + let mut state = PGS_STATE.exclusive(); + state.last_tick_oldest_xmin = xmin; + state.last_tick_safe_lsn_u64 = lsn_u64; +} + +/// Read the safe frontier LSN (u64) written by the coordinator at the last tick. +/// +/// Dynamic refresh workers use this as a conservative upper bound. +/// Returns 0 when shmem is unavailable or unset. +pub fn last_tick_safe_lsn_u64() -> u64 { + if !is_shmem_available() { + return 0; + } + PGS_STATE.share().last_tick_safe_lsn_u64 +} + +/// Persist the safe frontier LSN (u64) so dynamic workers can read it. +pub fn set_last_tick_safe_lsn(lsn_u64: u64) { + if !is_shmem_available() { + return; + } + // Update only the cached safe LSN under the shared-memory lock. + // When both xmin and LSN must be updated together, use set_last_tick_holdback_state(). + PGS_STATE.exclusive().last_tick_safe_lsn_u64 = lsn_u64; +} + +/// Update the holdback gauge metrics. +/// +/// - `lsn_bytes`: how many bytes behind write_lsn the safe frontier is. +/// - `age_secs`: age (seconds) of the oldest in-progress transaction. +pub fn update_holdback_metrics(lsn_bytes: u64, age_secs: u64) { + if !is_shmem_available() { + return; + } + FRONTIER_HOLDBACK_LSN_BYTES + .get() + .store(lsn_bytes, std::sync::atomic::Ordering::Relaxed); + FRONTIER_HOLDBACK_AGE_SECS + .get() + .store(age_secs, std::sync::atomic::Ordering::Relaxed); +} + +/// Read the current holdback gauge metrics. +/// +/// Returns `(lsn_bytes, age_secs)`. +pub fn read_holdback_metrics() -> (u64, u64) { + if !is_shmem_available() { + return (0, 0); + } + let lsn = FRONTIER_HOLDBACK_LSN_BYTES + .get() + .load(std::sync::atomic::Ordering::Relaxed); + let age = FRONTIER_HOLDBACK_AGE_SECS + .get() + .load(std::sync::atomic::Ordering::Relaxed); + (lsn, age) +} + /// Flag indicating whether shared memory was initialized via _PG_init. 
static SHMEM_INITIALIZED: std::sync::atomic::AtomicBool = std::sync::atomic::AtomicBool::new(false); diff --git a/src/version.rs b/src/version.rs index fe23d5cb..d2c4df1b 100644 --- a/src/version.rs +++ b/src/version.rs @@ -140,6 +140,20 @@ pub fn lsn_gt(a: &str, b: &str) -> bool { parse_lsn(a) > parse_lsn(b) } +/// Parse a PostgreSQL LSN string (`"X/Y"`) into a `u64`. +#[inline] +pub fn lsn_to_u64(s: &str) -> u64 { + parse_lsn(s) +} + +/// Format a `u64` LSN value back into PostgreSQL `"X/Y"` notation. +#[inline] +pub fn u64_to_lsn(v: u64) -> String { + let hi = (v >> 32) as u32; + let lo = v as u32; + format!("{hi:X}/{lo:08X}") +} + /// Parse a PostgreSQL LSN string (`"X/Y"`) into a `u64`. #[inline] fn parse_lsn(s: &str) -> u64 { @@ -472,4 +486,30 @@ mod tests { ts ); } + + #[test] + fn test_lsn_to_u64_round_trip() { + // Round-trip: parse then format + assert_eq!(u64_to_lsn(lsn_to_u64("1/00000500")), "1/00000500"); + assert_eq!(u64_to_lsn(lsn_to_u64("0/00000001")), "0/00000001"); + assert_eq!(u64_to_lsn(lsn_to_u64("FF/FFFFFFFF")), "FF/FFFFFFFF"); + assert_eq!(u64_to_lsn(lsn_to_u64("0/0")), "0/00000000"); + } + + #[test] + fn test_lsn_to_u64_known_values() { + // High segment 1, low 0x500 = u64 value 0x1_0000_0500 + assert_eq!(lsn_to_u64("1/00000500"), 0x1_0000_0500); + assert_eq!(lsn_to_u64("0/00000001"), 1); + assert_eq!(lsn_to_u64("0/0"), 0); + } + + #[test] + fn test_u64_to_lsn_format() { + // u64_to_lsn uses uppercase hex with zero-padded 8-char low half, + // matching PostgreSQL's pg_lsn output format. + assert_eq!(u64_to_lsn(0x1_0000_0500), "1/00000500"); + assert_eq!(u64_to_lsn(1), "0/00000001"); + assert_eq!(u64_to_lsn(0), "0/00000000"); + } } diff --git a/tests/e2e_long_txn_visibility_tests.rs b/tests/e2e_long_txn_visibility_tests.rs new file mode 100644 index 00000000..a63adbfb --- /dev/null +++ b/tests/e2e_long_txn_visibility_tests.rs @@ -0,0 +1,453 @@ +//! E2E tests for frontier visibility holdback (Issue #536). +//! +//! 
These tests verify that the `pg_trickle.frontier_holdback_mode` GUC +//! prevents silent data loss when a long-running transaction inserts into +//! a tracked source table while the scheduler is advancing the frontier. +//! +//! # Background +//! +//! The CDC trigger path records changes with `lsn = pg_current_wal_insert_lsn()` +//! at trigger-fire time. If a transaction inserts a row at LSN 100 but does +//! not commit before the scheduler captures `write_lsn = 500` and sets the +//! frontier to 500, the next tick queries `lsn > 500` and permanently misses +//! the row at LSN 100. +//! +//! With `frontier_holdback_mode = 'xmin'` (the default), the scheduler probes +//! `pg_stat_activity` + `pg_prepared_xacts` and refuses to advance the frontier +//! past the last safe LSN while any in-progress transaction exists. +//! +//! **Test matrix:** +//! 1. `test_holdback_gucs_registered` — GUC defaults are correct. +//! 2. `test_holdback_read_committed_long_txn` — READ COMMITTED txn spanning a tick. +//! 3. `test_holdback_repeatable_read_long_txn` — REPEATABLE READ txn spanning ticks. +//! 4. `test_holdback_prepared_transaction` — 2PC PREPARE spanning many ticks. +//! 5. `test_holdback_none_mode_regression_guard` — demonstrates data loss with +//! `frontier_holdback_mode = 'none'` (regression guard). +//! +//! These tests require the full E2E Docker image (scheduler background worker). + +mod e2e; + +use e2e::E2eDb; +use std::time::Duration; + +// ── Shared setup helper ──────────────────────────────────────────────────── + +/// Set up a fast scheduler (100 ms tick, 1 s minimum schedule) and wait for +/// the pg_trickle scheduler BGW to appear in pg_stat_activity. 
+async fn configure_fast_scheduler(db: &E2eDb) { + db.execute("ALTER SYSTEM SET pg_trickle.scheduler_interval_ms = 100") + .await; + db.execute("ALTER SYSTEM SET pg_trickle.min_schedule_seconds = 1") + .await; + db.execute("ALTER SYSTEM SET pg_trickle.auto_backoff = off") + .await; + // Force trigger-based CDC so the holdback logic is exercised. + // WAL CDC (the default when wal_level=logical) is already immune + // to this race and would make these tests pass trivially. + db.execute("ALTER SYSTEM SET pg_trickle.cdc_mode = 'trigger'") + .await; + db.reload_config_and_wait().await; + db.wait_for_setting("pg_trickle.scheduler_interval_ms", "100") + .await; + db.wait_for_setting("pg_trickle.min_schedule_seconds", "1") + .await; + db.wait_for_setting("pg_trickle.cdc_mode", "trigger").await; + + let ok = db.wait_for_scheduler(Duration::from_secs(90)).await; + assert!(ok, "pg_trickle scheduler did not start within 90 s"); +} + +// ── Tests ────────────────────────────────────────────────────────────────── + +/// Verify that the holdback GUCs are registered with the correct defaults. +#[tokio::test] +async fn test_holdback_gucs_registered() { + let db = E2eDb::new_on_postgres_db().await.with_extension().await; + + let mode = db.show_setting("pg_trickle.frontier_holdback_mode").await; + assert_eq!( + mode, "xmin", + "frontier_holdback_mode default should be 'xmin'" + ); + + let warn_secs = db + .show_setting("pg_trickle.frontier_holdback_warn_seconds") + .await; + assert_eq!( + warn_secs, "60", + "frontier_holdback_warn_seconds default should be '60'" + ); +} + +/// Verify that a READ COMMITTED transaction straddling a scheduler tick +/// does NOT cause its row to be permanently skipped. +/// +/// Scenario: +/// 1. Begin transaction A on a second connection, insert a row (CDC records +/// the change at some LSN X). Do NOT commit. +/// 2. Wait for 3+ scheduler ticks (300+ ms with 100 ms interval). 
+/// With holdback enabled, the scheduler should NOT advance the frontier +/// past X because transaction A is still in-progress. +/// 3. Commit transaction A. +/// 4. Wait for the next tick to consume the row. +/// 5. Assert the stream table contains the inserted row. +#[tokio::test] +async fn test_holdback_read_committed_long_txn() { + let db = E2eDb::new_on_postgres_db().await.with_extension().await; + configure_fast_scheduler(&db).await; + + // Ensure holdback mode is enabled (the default). + db.execute("ALTER SYSTEM SET pg_trickle.frontier_holdback_mode = 'xmin'") + .await; + db.reload_config_and_wait().await; + db.wait_for_setting("pg_trickle.frontier_holdback_mode", "xmin") + .await; + + // Create source table + stream table. + db.execute("CREATE TABLE ltv_rc_src (id INT PRIMARY KEY, val TEXT NOT NULL)") + .await; + db.execute("INSERT INTO ltv_rc_src VALUES (1, 'initial')") + .await; + db.create_st( + "ltv_rc_st", + "SELECT id, val FROM ltv_rc_src", + "1s", + "DIFFERENTIAL", + ) + .await; + + // Wait for the ST to be initialized (populated with initial row). + let ok = db + .wait_for_condition( + "ltv_rc_st initial population", + "SELECT is_populated FROM pgtrickle.pgt_stream_tables \ + WHERE pgt_name = 'ltv_rc_st'", + Duration::from_secs(30), + Duration::from_millis(200), + ) + .await; + assert!(ok, "ST should be populated within 30 s"); + assert_eq!(db.count("public.ltv_rc_st").await, 1); + + // --- Phase 1: begin transaction, insert row (do NOT commit yet) --- + let pool = db.pool.clone(); + let mut long_txn_conn = pool.acquire().await.expect("acquire long-txn connection"); + sqlx::query("BEGIN") + .execute(&mut *long_txn_conn) + .await + .unwrap(); + sqlx::query("INSERT INTO ltv_rc_src VALUES (2, 'long_txn_row')") + .execute(&mut *long_txn_conn) + .await + .unwrap(); + + // --- Phase 2: wait for 3+ scheduler ticks while the txn is open --- + // With frontier_holdback_mode = 'xmin', the scheduler must NOT advance + // the frontier past the row's LSN. 
+ tokio::time::sleep(Duration::from_millis(500)).await; + + // Stream table should still have only 1 row (txn not committed). + let count_before_commit: i64 = db.count("public.ltv_rc_st").await; + assert_eq!( + count_before_commit, 1, + "uncommitted row must not appear in stream table" + ); + + // --- Phase 3: commit the transaction --- + sqlx::query("COMMIT") + .execute(&mut *long_txn_conn) + .await + .unwrap(); + drop(long_txn_conn); + + // --- Phase 4: wait for the row to appear --- + let ok = db + .wait_for_condition( + "ltv_rc_st long-txn row", + "SELECT count(*) = 2 FROM public.ltv_rc_st", + Duration::from_secs(15), + Duration::from_millis(200), + ) + .await; + assert!( + ok, + "committed row from long-running READ COMMITTED transaction \ + must appear in stream table within 15 s (holdback must not lose it)" + ); + assert_eq!(db.count("public.ltv_rc_st").await, 2); +} + +/// Same as the READ COMMITTED test but uses REPEATABLE READ isolation, +/// which holds the snapshot open for longer and exercises the xmin-tracking +/// code path across multiple ticks. 
+#[tokio::test] +async fn test_holdback_repeatable_read_long_txn() { + let db = E2eDb::new_on_postgres_db().await.with_extension().await; + configure_fast_scheduler(&db).await; + + db.execute("ALTER SYSTEM SET pg_trickle.frontier_holdback_mode = 'xmin'") + .await; + db.reload_config_and_wait().await; + db.wait_for_setting("pg_trickle.frontier_holdback_mode", "xmin") + .await; + + db.execute("CREATE TABLE ltv_rr_src (id INT PRIMARY KEY, val TEXT NOT NULL)") + .await; + db.execute("INSERT INTO ltv_rr_src VALUES (1, 'initial')") + .await; + db.create_st( + "ltv_rr_st", + "SELECT id, val FROM ltv_rr_src", + "1s", + "DIFFERENTIAL", + ) + .await; + + let ok = db + .wait_for_condition( + "ltv_rr_st initial population", + "SELECT is_populated FROM pgtrickle.pgt_stream_tables \ + WHERE pgt_name = 'ltv_rr_st'", + Duration::from_secs(30), + Duration::from_millis(200), + ) + .await; + assert!(ok, "ST should be populated within 30 s"); + assert_eq!(db.count("public.ltv_rr_st").await, 1); + + // Open REPEATABLE READ transaction — holds xmin open for the full duration. + let pool = db.pool.clone(); + let mut long_txn_conn = pool.acquire().await.expect("acquire long-txn connection"); + sqlx::query("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ") + .execute(&mut *long_txn_conn) + .await + .unwrap(); + // Touch the table to materialise the xmin. + sqlx::query("SELECT count(*) FROM ltv_rr_src") + .execute(&mut *long_txn_conn) + .await + .unwrap(); + // On the MAIN connection, insert the row that the long txn would cause to race. + db.execute("INSERT INTO ltv_rr_src VALUES (2, 'rr_race_row')") + .await; + + // Let 4+ ticks pass with the RR transaction still holding its snapshot. + tokio::time::sleep(Duration::from_millis(600)).await; + + // Commit the REPEATABLE READ transaction (no DML needed; just releasing the xmin). 
+ sqlx::query("COMMIT") + .execute(&mut *long_txn_conn) + .await + .unwrap(); + drop(long_txn_conn); + + // Row inserted on the main connection should appear within a few ticks. + let ok = db + .wait_for_condition( + "ltv_rr_st race row", + "SELECT count(*) = 2 FROM public.ltv_rr_st", + Duration::from_secs(15), + Duration::from_millis(200), + ) + .await; + assert!( + ok, + "row inserted while REPEATABLE READ transaction held xmin \ + must appear in stream table after txn commits" + ); +} + +/// Verify that a 2PC PREPARE TRANSACTION spanning many ticks does not +/// cause data loss once the transaction is committed. +/// +/// Requires `max_prepared_transactions > 0` (set via ALTER SYSTEM). +#[tokio::test] +async fn test_holdback_prepared_transaction() { + let db = E2eDb::new_on_postgres_db().await.with_extension().await; + + // 2PC requires max_prepared_transactions > 0; restart-safe setting. + db.execute("ALTER SYSTEM SET max_prepared_transactions = 10") + .await; + // Need a server restart — use pg_reload_conf for GUC-level changes + // (max_prepared_transactions is a postmaster GUC, so we skip if not + // effective; the test will be a no-op if 2PC is unavailable). + db.reload_config_and_wait().await; + + let mpt: i32 = db + .query_scalar("SELECT current_setting('max_prepared_transactions')::int") + .await; + if mpt == 0 { + // Skip: max_prepared_transactions requires a server restart to take + // effect and cannot be changed online. This test is a best-effort + // check; full validation is done in the nightly soak suite. 
+ eprintln!( + "test_holdback_prepared_transaction: skipping — \ + max_prepared_transactions = 0 (requires server restart to change)" + ); + return; + } + + configure_fast_scheduler(&db).await; + db.execute("ALTER SYSTEM SET pg_trickle.frontier_holdback_mode = 'xmin'") + .await; + db.reload_config_and_wait().await; + db.wait_for_setting("pg_trickle.frontier_holdback_mode", "xmin") + .await; + + db.execute("CREATE TABLE ltv_2pc_src (id INT PRIMARY KEY, val TEXT NOT NULL)") + .await; + db.execute("INSERT INTO ltv_2pc_src VALUES (1, 'initial')") + .await; + db.create_st( + "ltv_2pc_st", + "SELECT id, val FROM ltv_2pc_src", + "1s", + "DIFFERENTIAL", + ) + .await; + + let ok = db + .wait_for_condition( + "ltv_2pc_st initial population", + "SELECT is_populated FROM pgtrickle.pgt_stream_tables \ + WHERE pgt_name = 'ltv_2pc_st'", + Duration::from_secs(30), + Duration::from_millis(200), + ) + .await; + assert!(ok, "ST should be populated within 30 s"); + assert_eq!(db.count("public.ltv_2pc_st").await, 1); + + // Prepare a 2PC transaction — this holds xmin in pg_prepared_xacts. + let pool = db.pool.clone(); + let mut prep_conn = pool.acquire().await.expect("acquire 2pc connection"); + sqlx::query("BEGIN").execute(&mut *prep_conn).await.unwrap(); + sqlx::query("INSERT INTO ltv_2pc_src VALUES (2, '2pc_row')") + .execute(&mut *prep_conn) + .await + .unwrap(); + sqlx::query("PREPARE TRANSACTION 'ltv_holdback_test_2pc'") + .execute(&mut *prep_conn) + .await + .unwrap(); + drop(prep_conn); + + // Wait for several scheduler ticks while the prepared transaction holds xmin. + tokio::time::sleep(Duration::from_millis(600)).await; + + // Commit the prepared transaction. + db.execute("COMMIT PREPARED 'ltv_holdback_test_2pc'").await; + + // The row should appear after the next tick. 
+ let ok = db + .wait_for_condition( + "ltv_2pc_st committed row", + "SELECT count(*) = 2 FROM public.ltv_2pc_st", + Duration::from_secs(15), + Duration::from_millis(200), + ) + .await; + assert!( + ok, + "row from COMMIT PREPARED transaction must appear in stream table \ + after the 2PC transaction commits" + ); +} + +/// Regression guard: with `frontier_holdback_mode = 'none'`, the bug +/// described in Issue #536 (silent data loss) can still occur. +/// +/// This test demonstrates the unsafe behaviour so the fix can be compared +/// against the baseline: +/// - With `mode = 'none'`, a long-running transaction that spans a tick +/// can cause its change-buffer row to be silently skipped. +/// +/// **This test does not hard-assert the outcome, because the race is +/// timing-dependent.** It logs the final row count (1 means the row was lost, +/// 2 means it was captured) and is kept as a regression guard to check that +/// `mode = 'none'` truly reverts to the old unsafe behaviour (the unsafe +/// escape hatch must remain unsafe so that benchmark operators know what +/// they're opting into). +/// +/// The test is marked `#[ignore]` so it does not run in normal CI. Run it +/// explicitly to confirm the unsafe mode still behaves as documented: +/// +/// ```bash +/// cargo test --test e2e_long_txn_visibility_tests test_holdback_none_mode_regression_guard -- --ignored --nocapture +/// ``` +#[tokio::test] +#[ignore] +async fn test_holdback_none_mode_regression_guard() { + let db = E2eDb::new_on_postgres_db().await.with_extension().await; + configure_fast_scheduler(&db).await; + + // Disable holdback: revert to the pre-fix unsafe behaviour.
+ db.execute("ALTER SYSTEM SET pg_trickle.frontier_holdback_mode = 'none'") + .await; + db.reload_config_and_wait().await; + db.wait_for_setting("pg_trickle.frontier_holdback_mode", "none") + .await; + + db.execute("CREATE TABLE ltv_none_src (id INT PRIMARY KEY, val TEXT NOT NULL)") + .await; + db.execute("INSERT INTO ltv_none_src VALUES (1, 'initial')") + .await; + db.create_st( + "ltv_none_st", + "SELECT id, val FROM ltv_none_src", + "1s", + "DIFFERENTIAL", + ) + .await; + + let ok = db + .wait_for_condition( + "ltv_none_st initial population", + "SELECT is_populated FROM pgtrickle.pgt_stream_tables \ + WHERE pgt_name = 'ltv_none_st'", + Duration::from_secs(30), + Duration::from_millis(200), + ) + .await; + assert!(ok, "ST should be populated within 30 s"); + assert_eq!(db.count("public.ltv_none_st").await, 1); + + // Begin transaction, insert row (CDC fires, records LSN X), hold open. + let pool = db.pool.clone(); + let mut long_txn_conn = pool.acquire().await.expect("acquire long-txn connection"); + sqlx::query("BEGIN") + .execute(&mut *long_txn_conn) + .await + .unwrap(); + sqlx::query("INSERT INTO ltv_none_src VALUES (2, 'lost_row')") + .execute(&mut *long_txn_conn) + .await + .unwrap(); + + // With mode = 'none', the scheduler fires multiple ticks here and + // advances the frontier PAST the row's LSN (the row is still uncommitted + // so MVCC hides it from the delta query). + tokio::time::sleep(Duration::from_millis(600)).await; + + // Commit the transaction. + sqlx::query("COMMIT") + .execute(&mut *long_txn_conn) + .await + .unwrap(); + drop(long_txn_conn); + + // Wait for a couple more ticks. + tokio::time::sleep(Duration::from_millis(400)).await; + + // With mode = 'none', the row may be permanently skipped (data loss). + // The check below only logs the outcome: the count may be 1 OR 2 + // depending on exact timing, but it demonstrates the risk.
+ let final_count: i64 = db.count("public.ltv_none_st").await; + // If the row was lost (expected with mode=none), count = 1. + // If timing was lucky and no tick happened during the window, count = 2. + eprintln!( + "test_holdback_none_mode_regression_guard: final row count = {final_count} \ + (expected 1 with data loss, 2 if timing allowed capture)" + ); + // We don't hard-assert here because the race is timing-dependent. + // The intent is to show operators what 'none' mode implies. +}