Operational improvements: labels, checkpoints, structured errors, production dashboard#67
Merged
- Pipeline metadata: labels + annotations (`HashMap<String, String>`).
- Labels enable filtering via `GET /pipelines?label=env:prod` (AND logic).
- Checkpoint inspection: `GET /pipelines/{name}/checkpoints` returns per-sink checkpoint positions with age.
- Per-table replication lag: `deltaforge_source_table_lag_seconds{table}` gauge emitted per table in each batch alongside pipeline-level lag.
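The per-table gauge can be pictured in Prometheus text exposition format. A minimal sketch, assuming a plain string renderer; the function name and the `pipeline` label are illustrative, and the real exporter presumably goes through a metrics crate rather than hand-formatting:

```rust
// Render one lag gauge line per table in Prometheus exposition format.
// `render_table_lag` is a hypothetical helper, not the project's exporter.
fn render_table_lag(pipeline: &str, lags: &[(&str, f64)]) -> String {
    lags.iter()
        .map(|(table, lag)| {
            format!(
                "deltaforge_source_table_lag_seconds{{pipeline=\"{pipeline}\",table=\"{table}\"}} {lag}"
            )
        })
        .collect::<Vec<_>>()
        .join("\n")
}

fn main() {
    let out = render_table_lag(
        "orders-cdc",
        &[("public.orders", 1.5), ("public.items", 0.2)],
    );
    println!("{out}");
}
```

Emitting one series per table is what lets the dashboard show a top-10 "laggiest tables" panel instead of a single pipeline-wide number.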
- Correctness test matrix added to guarantees.md: maps every guarantee to its test with status (exists/planned).
Metadata: `labels` and `annotations` (both `HashMap<String, String>`, with `serde(default)`). `GET /pipelines?label=env:prod` filters with AND logic and accepts `key:value` or key-only selectors. Labels enable Grafana variables, operator selection, and fleet management.
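The selector semantics can be sketched in a few lines of std-only Rust. This is a hypothetical illustration of the AND logic and the two selector forms, not the actual handler code:

```rust
use std::collections::HashMap;

// One selector is either "key:value" (exact match) or "key" (presence check).
fn matches_selector(labels: &HashMap<String, String>, selector: &str) -> bool {
    match selector.split_once(':') {
        Some((key, value)) => labels.get(key).map_or(false, |v| v == value),
        None => labels.contains_key(selector),
    }
}

// AND logic: a pipeline is returned only if every selector matches.
fn matches_all(labels: &HashMap<String, String>, selectors: &[&str]) -> bool {
    selectors.iter().all(|s| matches_selector(labels, s))
}

fn main() {
    let labels: HashMap<String, String> = [("env", "prod"), ("team", "platform")]
        .into_iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect();
    assert!(matches_all(&labels, &["env:prod", "team"]));
    assert!(!matches_all(&labels, &["env:staging"]));
}
```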
- `GET /pipelines/{name}`: `ops` field with DLQ count, per-sink checkpoints.
- `GET /health`: returns JSON with `status` + `failed_pipelines` list (was plain text).
- `GET /pipelines/{name}/checkpoints`: per-sink positions and ages.
- `deltaforge_pipeline_info{pipeline,tenant}` gauge for Grafana joins.
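A checkpoint entry's "age" is simply the distance from its last commit time to now. A minimal sketch of that shape; the struct and field names are assumptions, not the actual API schema:

```rust
use std::time::{Duration, SystemTime};

// Hypothetical shape of one entry in the /checkpoints response: the
// stored sink position plus an age derived from the last commit time.
struct SinkCheckpoint {
    sink: String,
    position: String,
    committed_at: SystemTime,
}

impl SinkCheckpoint {
    // Saturate at zero rather than erroring if the clock skews backwards.
    fn age(&self, now: SystemTime) -> Duration {
        now.duration_since(self.committed_at)
            .unwrap_or(Duration::ZERO)
    }
}

fn main() {
    let cp = SinkCheckpoint {
        sink: "kafka".to_string(),
        position: "0/16B3748".to_string(), // illustrative position string
        committed_at: SystemTime::now() - Duration::from_secs(42),
    };
    println!(
        "{} @ {}: {}s old",
        cp.sink,
        cp.position,
        cp.age(SystemTime::now()).as_secs()
    );
}
```

A stale age on an otherwise healthy pipeline is the signal this endpoint exists to surface: the sink has stopped committing even though events may still be flowing.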
Rebuilt dashboard optimized for fleet operations:
- Fleet overview: aggregate totals (events/s, data/s, max lag, DLQ, errors)
- Top-N panels: top 10 laggiest, throughput, DLQ backlogs (readable at scale)
- Tenant variable for filtering by tenant label
- Per-table lag (top 10 tables)
- All DLQ metrics: entries, events/s, saturation, overflow
- All EOS metrics: checkpoint status, commit rate, txn commits/aborts
- Table legends with sortable values (replaces unreadable list legends)
- Collapsed sections for batching and infrastructure (reduce noise)
All error responses now return a structured body, e.g. `{"code": "PIPELINE_NOT_FOUND", "message": "..."}`.
Shared ApiResult type across pipelines, schemas, and sensing modules.
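A minimal sketch of what a shared structured-error type can look like. The `ApiError` fields match the documented response shape, but everything else (the alias signature, the hand-rolled JSON, the `find_pipeline` example) is illustrative; the real type presumably derives serde's `Serialize` and carries an HTTP status:

```rust
// Structured API error matching {"code": ..., "message": ...}.
#[derive(Debug, PartialEq)]
struct ApiError {
    code: &'static str,
    message: String,
}

// Shared result alias used by every handler module in this sketch.
type ApiResult<T> = Result<T, ApiError>;

// Hand-rolled serialization to stay dependency-free here.
fn to_json(err: &ApiError) -> String {
    format!("{{\"code\": \"{}\", \"message\": \"{}\"}}", err.code, err.message)
}

// Hypothetical lookup returning the structured error on a miss.
fn find_pipeline(name: &str) -> ApiResult<&'static str> {
    match name {
        "orders-cdc" => Ok("running"),
        _ => Err(ApiError {
            code: "PIPELINE_NOT_FOUND",
            message: format!("no pipeline named {name}"),
        }),
    }
}

fn main() {
    if let Err(e) = find_pipeline("missing") {
        println!("{}", to_json(&e));
    }
}
```

Stable machine-readable codes are what make the errors automatable: tooling can branch on `code` without parsing a human-oriented `message`.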
- Pipeline uptime: `started_at` tracked in runtime, exposed as `ops.uptime_seconds`.
- `GET /log-level`: returns current `RUST_LOG` value.
- `POST /validate`: dry-run config validation without creating a pipeline.
- `GET /health`: returns JSON with `status` + `failed_pipelines` list.
Added: DLQ (peek/count/ack/purge), checkpoint inspection, label filtering, log-level, config validation. Updated: health (JSON response), get pipeline (ops field), error responses (structured codes).
What
Operational improvements for production fleet management at scale.
Pipeline metadata:
- `labels` (e.g. `{env: prod, team: platform}`) on each pipeline
- `GET /pipelines?label=env:prod` - filter by label with AND logic
- `deltaforge_pipeline_info{pipeline, tenant}` gauge for Grafana joins

Enriched APIs:
- `GET /pipelines/{name}` includes `ops` field: uptime, DLQ count, per-sink checkpoints
- `GET /pipelines/{name}/checkpoints` - per-sink positions with age
- `GET /health` returns JSON with `status` + `failed_pipelines` (was plain text)
- `GET /log-level` - current `RUST_LOG` value
- `POST /validate` - dry-run config validation without creating a pipeline
- Structured error responses: `{"code": "PIPELINE_NOT_FOUND", "message": "..."}`

Per-table lag:
- `deltaforge_source_table_lag_seconds{pipeline, table}` gauge

Grafana dashboard rebuilt for 300+ pipelines:
Documentation:
Why
Operators managing hundreds of pipelines need: label-based filtering, one-call status with all operational data, structured errors for automation, top-N dashboards instead of 300 unreadable series, and config validation before deployment.
Testing
- `cargo test --workspace --lib` - all tests pass
- `cargo clippy --all-targets --all-features -- -D warnings` - clean
- `mdbook build docs/` - builds

Checklist
- Tests pass (`cargo test`)
- Formatted (`cargo fmt`)