diff --git a/chaos/grafana/dashboards/deltaforge.json b/chaos/grafana/dashboards/deltaforge.json index 85bfb9b..1b46409 100644 --- a/chaos/grafana/dashboards/deltaforge.json +++ b/chaos/grafana/dashboards/deltaforge.json @@ -2,8 +2,8 @@ "title": "DeltaForge CDC", "uid": "deltaforge-cdc", "schemaVersion": 39, - "refresh": "5s", - "time": { "from": "now-30m", "to": "now" }, + "refresh": "10s", + "time": { "from": "now-1h", "to": "now" }, "editable": true, "templating": { @@ -21,11 +21,11 @@ "sort": 1 }, { - "name": "pipeline", - "label": "Pipeline", + "name": "tenant", + "label": "Tenant", "type": "query", "datasource": "Prometheus", - "query": "label_values(deltaforge_pipeline_status{instance=~\"$instance\"}, pipeline)", + "query": "label_values(deltaforge_pipeline_info{instance=~\"$instance\"}, tenant)", "multi": true, "includeAll": true, "current": { "text": "All", "value": "$__all" }, @@ -33,11 +33,11 @@ "sort": 1 }, { - "name": "source", - "label": "Source", + "name": "pipeline", + "label": "Pipeline", "type": "query", "datasource": "Prometheus", - "query": "label_values(deltaforge_source_events_total{instance=~\"$instance\", pipeline=~\"$pipeline\", source=~\"$source\"}, source)", + "query": "label_values(deltaforge_pipeline_status{instance=~\"$instance\"}, pipeline)", "multi": true, "includeAll": true, "current": { "text": "All", "value": "$__all" }, @@ -55,18 +55,6 @@ "current": { "text": "All", "value": "$__all" }, "refresh": 2, "sort": 1 - }, - { - "name": "processor", - "label": "Processor", - "type": "query", - "datasource": "Prometheus", - "query": "label_values(deltaforge_processor_latency_seconds_count{instance=~\"$instance\", pipeline=~\"$pipeline\", processor=~\"$processor\"}, processor)", - "multi": true, - "includeAll": true, - "current": { "text": "All", "value": "$__all" }, - "refresh": 2, - "sort": 1 } ] }, @@ -75,244 +63,223 @@ { "id": 100, "type": "row", "title": "Fleet Overview", "gridPos": { "x": 0, "y": 0, "w": 24, "h": 1 }, "collapsed": 
false, "panels": [] }, - { "id": 1, "title": "Pipeline States", - "type": "bargauge", - "gridPos": { "x": 0, "y": 1, "w": 5, "h": 4 }, + { "id": 1, "title": "Pipelines", + "type": "stat", + "gridPos": { "x": 0, "y": 1, "w": 3, "h": 4 }, "targets": [ - { "datasource": "Prometheus", "expr": "count(deltaforge_pipeline_status{instance=~\"$instance\", pipeline=~\"$pipeline\"} == 1) or vector(0)", "legendFormat": "RUNNING" }, - { "datasource": "Prometheus", "expr": "count(deltaforge_pipeline_status{instance=~\"$instance\", pipeline=~\"$pipeline\"} == 0.5) or vector(0)", "legendFormat": "PAUSED" }, - { "datasource": "Prometheus", "expr": "count(deltaforge_pipeline_status{instance=~\"$instance\", pipeline=~\"$pipeline\"} == 0) or vector(0)", "legendFormat": "STOPPED" }, - { "datasource": "Prometheus", "expr": "count(deltaforge_pipeline_status{instance=~\"$instance\", pipeline=~\"$pipeline\"} < 0) or vector(0)", "legendFormat": "FAILED" } + { "datasource": "Prometheus", "expr": "count(deltaforge_pipeline_status{instance=~\"$instance\", pipeline=~\"$pipeline\"} == 1) or vector(0)", "legendFormat": "Running" }, + { "datasource": "Prometheus", "expr": "count(deltaforge_pipeline_status{instance=~\"$instance\", pipeline=~\"$pipeline\"} < 1) or vector(0)", "legendFormat": "Unhealthy" } ], - "options": { "orientation": "horizontal", "reduceOptions": { "calcs": ["lastNotNull"] }, "displayMode": "basic" }, - "fieldConfig": { - "defaults": { "unit": "short", "min": 0, "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": 0 }] } }, - "overrides": [ - { "matcher": { "id": "byName", "options": "RUNNING" }, "properties": [{ "id": "color", "value": { "fixedColor": "green", "mode": "fixed" } }] }, - { "matcher": { "id": "byName", "options": "PAUSED" }, "properties": [{ "id": "color", "value": { "fixedColor": "orange", "mode": "fixed" } }] }, - { "matcher": { "id": "byName", "options": "STOPPED" }, "properties": [{ "id": "color", "value": { "fixedColor": "gray", "mode": "fixed" } }] 
}, - { "matcher": { "id": "byName", "options": "FAILED" }, "properties": [{ "id": "color", "value": { "fixedColor": "red", "mode": "fixed" } }] } - ] - } + "fieldConfig": { "defaults": { "unit": "short" } }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"] }, "textMode": "value_and_name", "colorMode": "background" } }, - { "id": 2, "title": "Events Delivered", + { "id": 2, "title": "Total Events/s", "type": "stat", - "gridPos": { "x": 5, "y": 1, "w": 4, "h": 4 }, - "targets": [{ "datasource": "Prometheus", "expr": "sum by(pipeline)(deltaforge_sink_events_total{instance=~\"$instance\", pipeline=~\"$pipeline\", sink=~\"$sink\"})", "legendFormat": "{{pipeline}}" }], - "fieldConfig": { "defaults": { "unit": "short" } }, + "gridPos": { "x": 3, "y": 1, "w": 3, "h": 4 }, + "targets": [{ "datasource": "Prometheus", "expr": "sum(rate(deltaforge_sink_events_total{instance=~\"$instance\", pipeline=~\"$pipeline\", sink=~\"$sink\"}[30s]))", "legendFormat": "events/s" }], + "fieldConfig": { "defaults": { "unit": "ops" } }, "options": { "reduceOptions": { "calcs": ["lastNotNull"] }, "textMode": "value_and_name" } }, - { "id": 32, "title": "Data Moved", + { "id": 3, "title": "Total Data/s", "type": "stat", - "gridPos": { "x": 9, "y": 1, "w": 4, "h": 4 }, - "targets": [{ "datasource": "Prometheus", "expr": "sum by(pipeline)(deltaforge_bytes_total{instance=~\"$instance\", pipeline=~\"$pipeline\"})", "legendFormat": "{{pipeline}}" }], - "fieldConfig": { "defaults": { "unit": "bytes" } }, + "gridPos": { "x": 6, "y": 1, "w": 3, "h": 4 }, + "targets": [{ "datasource": "Prometheus", "expr": "sum(rate(deltaforge_bytes_total{instance=~\"$instance\", pipeline=~\"$pipeline\"}[30s]))", "legendFormat": "bytes/s" }], + "fieldConfig": { "defaults": { "unit": "Bps" } }, "options": { "reduceOptions": { "calcs": ["lastNotNull"] }, "textMode": "value_and_name" } }, - { "id": 3, "title": "Checkpoint Age", + { "id": 4, "title": "Max Lag", "type": "stat", - "gridPos": { "x": 13, "y": 1, "w": 
4, "h": 4 }, - "targets": [{ "datasource": "Prometheus", "expr": "(time() - deltaforge_last_checkpoint_ts{instance=~\"$instance\", pipeline=~\"$pipeline\"}) and on(pipeline, instance) (deltaforge_pipeline_status{instance=~\"$instance\", pipeline=~\"$pipeline\"} == 1)", "legendFormat": "{{pipeline}}" }], + "gridPos": { "x": 9, "y": 1, "w": 3, "h": 4 }, + "targets": [{ "datasource": "Prometheus", "expr": "max(deltaforge_source_lag_seconds{instance=~\"$instance\", pipeline=~\"$pipeline\"})", "legendFormat": "worst lag" }], "fieldConfig": { - "defaults": { - "unit": "s", - "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": 0 }, { "color": "yellow", "value": 30 }, { "color": "red", "value": 120 }] } - } + "defaults": { "unit": "s", "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": 0 }, { "color": "yellow", "value": 5 }, { "color": "red", "value": 30 }] } } }, - "options": { "reduceOptions": { "calcs": ["lastNotNull"] }, "textMode": "value_and_name", "colorMode": "background" } + "options": { "reduceOptions": { "calcs": ["lastNotNull"] }, "colorMode": "background" } }, - { "id": 6, "title": "Replication Lag", + { "id": 5, "title": "DLQ Total", "type": "stat", - "gridPos": { "x": 17, "y": 1, "w": 3, "h": 4 }, - "targets": [{ "datasource": "Prometheus", "expr": "deltaforge_source_lag_seconds{instance=~\"$instance\", pipeline=~\"$pipeline\", source=~\"$source\"} and on(pipeline, instance) (deltaforge_pipeline_status{instance=~\"$instance\", pipeline=~\"$pipeline\"} == 1)", "legendFormat": "{{pipeline}}" }], + "gridPos": { "x": 12, "y": 1, "w": 3, "h": 4 }, + "targets": [{ "datasource": "Prometheus", "expr": "sum(deltaforge_dlq_entries{instance=~\"$instance\", pipeline=~\"$pipeline\"}) or vector(0)", "legendFormat": "DLQ entries" }], "fieldConfig": { - "defaults": { - "unit": "s", "decimals": 1, - "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": 0 }, { "color": "yellow", "value": 5 }, { "color": 
"red", "value": 30 }] } - } + "defaults": { "unit": "short", "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": 0 }, { "color": "yellow", "value": 10 }, { "color": "red", "value": 100 }] } } }, - "options": { "reduceOptions": { "calcs": ["lastNotNull"] }, "textMode": "value_and_name", "colorMode": "background" } + "options": { "reduceOptions": { "calcs": ["lastNotNull"] }, "colorMode": "background" } }, - { "id": 4, "title": "Reconnects", + { "id": 6, "title": "Reconnects", "type": "stat", - "gridPos": { "x": 20, "y": 1, "w": 4, "h": 4 }, - "targets": [ - { "datasource": "Prometheus", "expr": "sum by(pipeline)(deltaforge_source_reconnects_total{instance=~\"$instance\", pipeline=~\"$pipeline\", source=~\"$source\"})", "legendFormat": "src {{pipeline}}" }, - { "datasource": "Prometheus", "expr": "sum by(pipeline)(deltaforge_sink_reconnects_total{instance=~\"$instance\", pipeline=~\"$pipeline\"})", "legendFormat": "sink {{pipeline}}" } - ], - "fieldConfig": { "defaults": { "unit": "short" } }, - "options": { "reduceOptions": { "calcs": ["lastNotNull"] }, "textMode": "value_and_name" } + "gridPos": { "x": 15, "y": 1, "w": 3, "h": 4 }, + "targets": [{ "datasource": "Prometheus", "expr": "sum(rate(deltaforge_source_reconnects_total{instance=~\"$instance\", pipeline=~\"$pipeline\"}[5m]))", "legendFormat": "reconnects/s" }], + "fieldConfig": { "defaults": { "unit": "ops" } }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"] } } }, - { "id": 101, "type": "row", "title": "Throughput", "gridPos": { "x": 0, "y": 5, "w": 24, "h": 1 }, "collapsed": false, "panels": [] }, + { "id": 7, "title": "Txn Aborts", + "type": "stat", + "gridPos": { "x": 18, "y": 1, "w": 3, "h": 4 }, + "targets": [{ "datasource": "Prometheus", "expr": "sum(rate(deltaforge_sink_txn_aborts_total{instance=~\"$instance\", pipeline=~\"$pipeline\"}[5m])) or vector(0)", "legendFormat": "aborts/s" }], + "fieldConfig": { + "defaults": { "unit": "ops", "thresholds": { "mode": 
"absolute", "steps": [{ "color": "green", "value": 0 }, { "color": "red", "value": 0.01 }] } } + }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"] }, "colorMode": "background" } + }, - { "id": 5, "title": "Event Throughput (events/s)", - "type": "timeseries", - "gridPos": { "x": 0, "y": 6, "w": 8, "h": 8 }, - "targets": [ - { "datasource": "Prometheus", "expr": "sum by(pipeline)(rate(deltaforge_source_events_total{instance=~\"$instance\", pipeline=~\"$pipeline\", source=~\"$source\"}[30s]))", "legendFormat": "source {{pipeline}}" }, - { "datasource": "Prometheus", "expr": "sum by(pipeline)(rate(deltaforge_sink_events_total{instance=~\"$instance\", pipeline=~\"$pipeline\", sink=~\"$sink\"}[30s]))", "legendFormat": "sink {{pipeline}}" } - ], - "fieldConfig": { "defaults": { "unit": "ops", "custom": { "lineWidth": 2, "fillOpacity": 15 } } }, - "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } + { "id": 8, "title": "Sink Errors", + "type": "stat", + "gridPos": { "x": 21, "y": 1, "w": 3, "h": 4 }, + "targets": [{ "datasource": "Prometheus", "expr": "sum(rate(deltaforge_sink_errors_total{instance=~\"$instance\", pipeline=~\"$pipeline\"}[5m])) or vector(0)", "legendFormat": "errors/s" }], + "fieldConfig": { + "defaults": { "unit": "ops", "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": 0 }, { "color": "red", "value": 0.01 }] } } + }, + "options": { "reduceOptions": { "calcs": ["lastNotNull"] }, "colorMode": "background" } }, - { "id": 23, "title": "Source-Sink Delta (events/s)", + { "id": 101, "type": "row", "title": "Top Pipelines (worst first)", "gridPos": { "x": 0, "y": 5, "w": 24, "h": 1 }, "collapsed": false, "panels": [] }, + + { "id": 10, "title": "Top 10 Laggiest Pipelines", "type": "timeseries", - "gridPos": { "x": 8, "y": 6, "w": 8, "h": 8 }, - "targets": [ - { "datasource": "Prometheus", "expr": "sum by(pipeline)(rate(deltaforge_source_events_total{instance=~\"$instance\", 
pipeline=~\"$pipeline\", source=~\"$source\"}[30s]))", "legendFormat": "source {{pipeline}}" }, - { "datasource": "Prometheus", "expr": "-sum by(pipeline)(rate(deltaforge_sink_events_total{instance=~\"$instance\", pipeline=~\"$pipeline\", sink=~\"$sink\"}[30s]))", "legendFormat": "sink {{pipeline}}", "hide": true }, - { "datasource": "Prometheus", "expr": "sum by(pipeline)(rate(deltaforge_source_events_total{instance=~\"$instance\", pipeline=~\"$pipeline\", source=~\"$source\"}[30s])) - sum by(pipeline)(rate(deltaforge_sink_events_total{instance=~\"$instance\", pipeline=~\"$pipeline\", sink=~\"$sink\"}[30s]))", "legendFormat": "delta {{pipeline}}" } - ], + "gridPos": { "x": 0, "y": 6, "w": 8, "h": 8 }, + "targets": [{ "datasource": "Prometheus", "expr": "topk(10, deltaforge_source_lag_seconds{instance=~\"$instance\", pipeline=~\"$pipeline\"})", "legendFormat": "{{pipeline}}" }], "fieldConfig": { - "defaults": { - "unit": "ops", - "custom": { "lineWidth": 2, "fillOpacity": 15, "gradientMode": "scheme" }, - "color": { "mode": "thresholds" }, - "thresholds": { "mode": "absolute", "steps": [ - { "color": "#73BF69", "value": null }, - { "color": "#FF9830", "value": 0 }, - { "color": "#F2495C", "value": 2000 } - ]} + "defaults": { "unit": "s", "custom": { "lineWidth": 2 }, + "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": 0 }, { "color": "yellow", "value": 5 }, { "color": "red", "value": 30 }] } } }, - "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } + "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "table", "placement": "right", "calcs": ["lastNotNull"] } } + }, + + { "id": 11, "title": "Top 10 by Throughput (events/s)", + "type": "timeseries", + "gridPos": { "x": 8, "y": 6, "w": 8, "h": 8 }, + "targets": [{ "datasource": "Prometheus", "expr": "topk(10, sum by(pipeline)(rate(deltaforge_sink_events_total{instance=~\"$instance\", pipeline=~\"$pipeline\", 
sink=~\"$sink\"}[30s])))", "legendFormat": "{{pipeline}}" }], + "fieldConfig": { "defaults": { "unit": "ops", "custom": { "lineWidth": 2 } } }, + "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "table", "placement": "right", "calcs": ["lastNotNull"] } } }, - { "id": 31, "title": "Data Throughput (bytes/s)", + { "id": 12, "title": "Top 10 DLQ Backlogs", "type": "timeseries", "gridPos": { "x": 16, "y": 6, "w": 8, "h": 8 }, - "targets": [ - { "datasource": "Prometheus", "expr": "sum by(pipeline)(rate(deltaforge_source_bytes_total{instance=~\"$instance\", pipeline=~\"$pipeline\", source=~\"$source\"}[30s]))", "legendFormat": "source {{pipeline}}" }, - { "datasource": "Prometheus", "expr": "sum by(pipeline)(rate(deltaforge_bytes_total{instance=~\"$instance\", pipeline=~\"$pipeline\"}[30s]))", "legendFormat": "pipeline {{pipeline}}" } - ], - "fieldConfig": { "defaults": { "unit": "Bps", "custom": { "lineWidth": 2, "fillOpacity": 15 } } }, - "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } + "targets": [{ "datasource": "Prometheus", "expr": "topk(10, deltaforge_dlq_entries{instance=~\"$instance\", pipeline=~\"$pipeline\"})", "legendFormat": "{{pipeline}}" }], + "fieldConfig": { + "defaults": { "unit": "short", "custom": { "lineWidth": 2 }, + "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": 0 }, { "color": "yellow", "value": 100 }, { "color": "red", "value": 1000 }] } + } + }, + "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "table", "placement": "right", "calcs": ["lastNotNull"] } } }, - { "id": 102, "type": "row", "title": "Latency", "gridPos": { "x": 0, "y": 14, "w": 24, "h": 1 }, "collapsed": false, "panels": [] }, + { "id": 102, "type": "row", "title": "Throughput", "gridPos": { "x": 0, "y": 14, "w": 24, "h": 1 }, "collapsed": false, "panels": [] }, - { "id": 9, "title": "E2E Latency p50 / p95", + { "id": 20, "title": "Aggregate Event 
Throughput (events/s)", "type": "timeseries", "gridPos": { "x": 0, "y": 15, "w": 8, "h": 8 }, "targets": [ - { "datasource": "Prometheus", "expr": "histogram_quantile(0.50, rate(deltaforge_e2e_latency_seconds_bucket{instance=~\"$instance\", pipeline=~\"$pipeline\"}[60s]))", "legendFormat": "p50 {{pipeline}}" }, - { "datasource": "Prometheus", "expr": "histogram_quantile(0.95, rate(deltaforge_e2e_latency_seconds_bucket{instance=~\"$instance\", pipeline=~\"$pipeline\"}[60s]))", "legendFormat": "p95 {{pipeline}}" } + { "datasource": "Prometheus", "expr": "sum(rate(deltaforge_source_events_total{instance=~\"$instance\", pipeline=~\"$pipeline\"}[30s]))", "legendFormat": "source total" }, + { "datasource": "Prometheus", "expr": "sum(rate(deltaforge_sink_events_total{instance=~\"$instance\", pipeline=~\"$pipeline\", sink=~\"$sink\"}[30s]))", "legendFormat": "sink total" } ], - "fieldConfig": { "defaults": { "unit": "s", "custom": { "lineWidth": 2 } } }, + "fieldConfig": { "defaults": { "unit": "ops", "custom": { "lineWidth": 2, "fillOpacity": 15 } } }, "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } }, - { "id": 10, "title": "Sink Latency (avg ms)", + { "id": 21, "title": "Per-Pipeline Throughput (events/s)", "type": "timeseries", "gridPos": { "x": 8, "y": 15, "w": 8, "h": 8 }, - "targets": [{ "datasource": "Prometheus", "expr": "rate(deltaforge_sink_latency_seconds_sum{instance=~\"$instance\", pipeline=~\"$pipeline\", sink=~\"$sink\"}[30s]) / rate(deltaforge_sink_latency_seconds_count{instance=~\"$instance\", pipeline=~\"$pipeline\", sink=~\"$sink\"}[30s]) * 1000", "legendFormat": "{{pipeline}} / {{sink}}" }], - "fieldConfig": { "defaults": { "unit": "ms", "custom": { "lineWidth": 2 } } }, - "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } + "targets": [{ "datasource": "Prometheus", "expr": "sum 
by(pipeline)(rate(deltaforge_sink_events_total{instance=~\"$instance\", pipeline=~\"$pipeline\", sink=~\"$sink\"}[30s]))", "legendFormat": "{{pipeline}}" }], + "fieldConfig": { "defaults": { "unit": "ops", "custom": { "lineWidth": 2, "fillOpacity": 10 } } }, + "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "table", "placement": "right", "calcs": ["lastNotNull"] } } }, - { "id": 15, "title": "Stage Latency (avg ms)", + { "id": 22, "title": "Data Throughput (bytes/s)", "type": "timeseries", "gridPos": { "x": 16, "y": 15, "w": 8, "h": 8 }, - "targets": [{ "datasource": "Prometheus", "expr": "sum by(pipeline, stage)(rate(deltaforge_stage_latency_seconds_sum{instance=~\"$instance\", pipeline=~\"$pipeline\"}[30s])) / sum by(pipeline, stage)(rate(deltaforge_stage_latency_seconds_count{instance=~\"$instance\", pipeline=~\"$pipeline\"}[30s])) * 1000", "legendFormat": "{{pipeline}} {{stage}}" }], - "fieldConfig": { "defaults": { "unit": "ms", "custom": { "lineWidth": 2 } } }, - "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } + "targets": [ + { "datasource": "Prometheus", "expr": "sum(rate(deltaforge_bytes_total{instance=~\"$instance\", pipeline=~\"$pipeline\"}[30s]))", "legendFormat": "total" }, + { "datasource": "Prometheus", "expr": "sum by(pipeline)(rate(deltaforge_bytes_total{instance=~\"$instance\", pipeline=~\"$pipeline\"}[30s]))", "legendFormat": "{{pipeline}}" } + ], + "fieldConfig": { "defaults": { "unit": "Bps", "custom": { "lineWidth": 2, "fillOpacity": 15 } } }, + "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "table", "placement": "right", "calcs": ["lastNotNull"] } } }, - { "id": 108, "type": "row", "title": "Checkpoints & Exactly-Once", "gridPos": { "x": 0, "y": 23, "w": 24, "h": 1 }, "collapsed": false, "panels": [] }, + { "id": 103, "type": "row", "title": "Latency & Lag", "gridPos": { "x": 0, "y": 23, "w": 24, "h": 1 }, "collapsed": false, "panels": 
[] }, - { "id": 50, "title": "Per-Sink Checkpoint Status", - "description": "1 = caught up, 0 = behind/failed. Breaks down by sink within each pipeline.", + { "id": 30, "title": "E2E Latency p50 / p95", "type": "timeseries", - "gridPos": { "x": 0, "y": 24, "w": 6, "h": 8 }, - "targets": [{ "datasource": "Prometheus", "expr": "deltaforge_sink_checkpoint_status{instance=~\"$instance\", pipeline=~\"$pipeline\", sink=~\"$sink\"}", "legendFormat": "{{pipeline}} / {{sink}}" }], - "fieldConfig": { - "defaults": { "unit": "short", "min": 0, "max": 1, "custom": { "lineWidth": 2 }, - "thresholds": { "mode": "absolute", "steps": [{ "color": "red", "value": 0 }, { "color": "green", "value": 1 }] } - } - }, + "gridPos": { "x": 0, "y": 24, "w": 8, "h": 8 }, + "targets": [ + { "datasource": "Prometheus", "expr": "histogram_quantile(0.50, rate(deltaforge_e2e_latency_seconds_bucket{instance=~\"$instance\", pipeline=~\"$pipeline\"}[60s]))", "legendFormat": "p50 {{pipeline}}" }, + { "datasource": "Prometheus", "expr": "histogram_quantile(0.95, rate(deltaforge_e2e_latency_seconds_bucket{instance=~\"$instance\", pipeline=~\"$pipeline\"}[60s]))", "legendFormat": "p95 {{pipeline}}" } + ], + "fieldConfig": { "defaults": { "unit": "s", "custom": { "lineWidth": 2 } } }, "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } }, - { "id": 51, "title": "Checkpoint Age (seconds)", + { "id": 31, "title": "Source Lag (seconds)", "type": "timeseries", - "gridPos": { "x": 6, "y": 24, "w": 6, "h": 8 }, - "targets": [ - { "datasource": "Prometheus", "expr": "(time() - deltaforge_last_checkpoint_ts{instance=~\"$instance\", pipeline=~\"$pipeline\"}) and on(pipeline, instance) (deltaforge_pipeline_status{instance=~\"$instance\", pipeline=~\"$pipeline\"} == 1)", "legendFormat": "pipeline {{pipeline}}" }, - { "datasource": "Prometheus", "expr": "(time() - deltaforge_sink_last_checkpoint_ts{instance=~\"$instance\", pipeline=~\"$pipeline\", 
sink=~\"$sink\"}) and on(pipeline, instance) (deltaforge_pipeline_status{instance=~\"$instance\", pipeline=~\"$pipeline\"} == 1)", "legendFormat": "{{pipeline}} / {{sink}}" } - ], + "gridPos": { "x": 8, "y": 24, "w": 8, "h": 8 }, + "targets": [{ "datasource": "Prometheus", "expr": "deltaforge_source_lag_seconds{instance=~\"$instance\", pipeline=~\"$pipeline\"}", "legendFormat": "{{pipeline}}" }], "fieldConfig": { "defaults": { "unit": "s", "custom": { "lineWidth": 2 }, - "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": 0 }, { "color": "yellow", "value": 30 }, { "color": "red", "value": 120 }] } + "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": 0 }, { "color": "yellow", "value": 5 }, { "color": "red", "value": 30 }] } } }, - "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } + "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "table", "placement": "right", "calcs": ["lastNotNull"] } } }, - { "id": 52, "title": "Txn Commits (commits/s)", + { "id": 32, "title": "Per-Table Lag (Top 10)", "type": "timeseries", - "gridPos": { "x": 12, "y": 24, "w": 6, "h": 8 }, - "targets": [{ "datasource": "Prometheus", "expr": "rate(deltaforge_sink_txn_commits_total{instance=~\"$instance\", pipeline=~\"$pipeline\", sink=~\"$sink\"}[30s])", "legendFormat": "{{pipeline}} / {{sink}}" }], - "fieldConfig": { "defaults": { "unit": "ops", "custom": { "lineWidth": 2 }, "color": { "fixedColor": "green", "mode": "fixed" } } }, - "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } + "gridPos": { "x": 16, "y": 24, "w": 8, "h": 8 }, + "targets": [{ "datasource": "Prometheus", "expr": "topk(10, deltaforge_source_table_lag_seconds{instance=~\"$instance\", pipeline=~\"$pipeline\"})", "legendFormat": "{{pipeline}} / {{table}}" }], + "fieldConfig": { + "defaults": { "unit": "s", "custom": { "lineWidth": 2 }, + 
"thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": 0 }, { "color": "yellow", "value": 5 }, { "color": "red", "value": 30 }] } + } + }, + "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "table", "placement": "right", "calcs": ["lastNotNull"] } } }, - { "id": 53, "title": "Txn Aborts (aborts/s)", - "description": "Should be near zero. Spikes indicate Kafka transaction failures or producer fencing.", + { "id": 104, "type": "row", "title": "Checkpoints & Exactly-Once", "gridPos": { "x": 0, "y": 32, "w": 24, "h": 1 }, "collapsed": false, "panels": [] }, + + { "id": 40, "title": "Per-Sink Checkpoint Status", "type": "timeseries", - "gridPos": { "x": 18, "y": 24, "w": 6, "h": 8 }, - "targets": [{ "datasource": "Prometheus", "expr": "rate(deltaforge_sink_txn_aborts_total{instance=~\"$instance\", pipeline=~\"$pipeline\", sink=~\"$sink\"}[30s])", "legendFormat": "{{pipeline}} / {{sink}}" }], - "fieldConfig": { "defaults": { "unit": "ops", "custom": { "lineWidth": 2 }, "color": { "fixedColor": "red", "mode": "fixed" } } }, + "gridPos": { "x": 0, "y": 33, "w": 6, "h": 8 }, + "targets": [{ "datasource": "Prometheus", "expr": "deltaforge_sink_checkpoint_status{instance=~\"$instance\", pipeline=~\"$pipeline\", sink=~\"$sink\"}", "legendFormat": "{{pipeline}} / {{sink}}" }], + "fieldConfig": { "defaults": { "unit": "short", "min": 0, "max": 1, "custom": { "lineWidth": 2 } } }, "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } }, - { "id": 12, "title": "Checkpoint Rate (commits/s)", + { "id": 41, "title": "Checkpoint Rate (commits/s)", "type": "timeseries", - "gridPos": { "x": 0, "y": 32, "w": 8, "h": 8 }, + "gridPos": { "x": 6, "y": 33, "w": 6, "h": 8 }, "targets": [{ "datasource": "Prometheus", "expr": "rate(deltaforge_checkpoints_total{instance=~\"$instance\", pipeline=~\"$pipeline\"}[30s])", "legendFormat": "{{pipeline}}" }], "fieldConfig": { "defaults": { "unit": "ops", 
"custom": { "lineWidth": 2 } } }, "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } }, - { "id": 40, "title": "Replication Lag (seconds)", + { "id": 42, "title": "Txn Commits (commits/s)", "type": "timeseries", - "gridPos": { "x": 8, "y": 32, "w": 8, "h": 8 }, - "targets": [{ "datasource": "Prometheus", "expr": "deltaforge_source_lag_seconds{instance=~\"$instance\", pipeline=~\"$pipeline\", source=~\"$source\"}", "legendFormat": "{{pipeline}}" }], - "fieldConfig": { - "defaults": { "unit": "s", "custom": { "lineWidth": 2 }, - "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": 0 }, { "color": "yellow", "value": 5 }, { "color": "red", "value": 30 }] } - } - }, + "gridPos": { "x": 12, "y": 33, "w": 6, "h": 8 }, + "targets": [{ "datasource": "Prometheus", "expr": "rate(deltaforge_sink_txn_commits_total{instance=~\"$instance\", pipeline=~\"$pipeline\", sink=~\"$sink\"}[30s])", "legendFormat": "{{pipeline}} / {{sink}}" }], + "fieldConfig": { "defaults": { "unit": "ops", "custom": { "lineWidth": 2 }, "color": { "fixedColor": "green", "mode": "fixed" } } }, "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } }, - { "id": 30, "title": "Pipeline State Timeline", + { "id": 43, "title": "Txn Aborts (aborts/s)", "type": "timeseries", - "gridPos": { "x": 16, "y": 32, "w": 8, "h": 8 }, - "targets": [{ "datasource": "Prometheus", "expr": "deltaforge_pipeline_status{instance=~\"$instance\", pipeline=~\"$pipeline\"}", "legendFormat": "{{pipeline}}" }], - "fieldConfig": { - "defaults": { "unit": "short", "custom": { "lineWidth": 2, "fillOpacity": 10 }, - "thresholds": { "mode": "absolute", "steps": [ - { "color": "red", "value": -1 }, { "color": "gray", "value": 0 }, - { "color": "orange", "value": 0.5 }, { "color": "green", "value": 1 } - ]} - } - }, + "gridPos": { "x": 18, "y": 33, "w": 6, "h": 8 }, + "targets": [{ "datasource": "Prometheus", "expr": 
"rate(deltaforge_sink_txn_aborts_total{instance=~\"$instance\", pipeline=~\"$pipeline\", sink=~\"$sink\"}[30s])", "legendFormat": "{{pipeline}} / {{sink}}" }], + "fieldConfig": { "defaults": { "unit": "ops", "custom": { "lineWidth": 2 }, "color": { "fixedColor": "red", "mode": "fixed" } } }, "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } }, - { "id": 110, "type": "row", "title": "Dead Letter Queue", "gridPos": { "x": 0, "y": 40, "w": 24, "h": 1 }, "collapsed": false, "panels": [] }, + { "id": 105, "type": "row", "title": "Dead Letter Queue", "gridPos": { "x": 0, "y": 41, "w": 24, "h": 1 }, "collapsed": false, "panels": [] }, - { "id": 70, "title": "DLQ Entries", - "description": "Current unacked entries in the DLQ per pipeline.", + { "id": 50, "title": "DLQ Entries", "type": "timeseries", - "gridPos": { "x": 0, "y": 41, "w": 6, "h": 8 }, + "gridPos": { "x": 0, "y": 42, "w": 6, "h": 8 }, "targets": [{ "datasource": "Prometheus", "expr": "deltaforge_dlq_entries{instance=~\"$instance\", pipeline=~\"$pipeline\"}", "legendFormat": "{{pipeline}}" }], "fieldConfig": { "defaults": { "unit": "short", "custom": { "lineWidth": 2 }, @@ -322,19 +289,17 @@ "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } }, - { "id": 71, "title": "DLQ Events/s", - "description": "Rate of events being routed to the DLQ.", + { "id": 51, "title": "DLQ Events/s", "type": "timeseries", - "gridPos": { "x": 6, "y": 41, "w": 6, "h": 8 }, + "gridPos": { "x": 6, "y": 42, "w": 6, "h": 8 }, "targets": [{ "datasource": "Prometheus", "expr": "rate(deltaforge_dlq_events_total{instance=~\"$instance\", pipeline=~\"$pipeline\"}[30s])", "legendFormat": "{{pipeline}} / {{sink}} / {{error_kind}}" }], "fieldConfig": { "defaults": { "unit": "ops", "custom": { "lineWidth": 2 }, "color": { "fixedColor": "orange", "mode": "fixed" } } }, "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": 
"list", "placement": "bottom" } } }, - { "id": 72, "title": "DLQ Saturation", - "description": "DLQ fullness ratio (current/max). >80% triggers warning, >95% error.", + { "id": 52, "title": "DLQ Saturation", "type": "timeseries", - "gridPos": { "x": 12, "y": 41, "w": 6, "h": 8 }, + "gridPos": { "x": 12, "y": 42, "w": 6, "h": 8 }, "targets": [{ "datasource": "Prometheus", "expr": "deltaforge_dlq_saturation_ratio{instance=~\"$instance\", pipeline=~\"$pipeline\"}", "legendFormat": "{{pipeline}}" }], "fieldConfig": { "defaults": { "unit": "percentunit", "min": 0, "max": 1, "custom": { "lineWidth": 2 }, @@ -344,10 +309,9 @@ "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } }, - { "id": 73, "title": "DLQ Overflow (evicted + rejected)", - "description": "Events lost due to DLQ overflow. Evicted = drop_oldest, Rejected = reject policy.", + { "id": 53, "title": "DLQ Overflow (evicted + rejected)", "type": "timeseries", - "gridPos": { "x": 18, "y": 41, "w": 6, "h": 8 }, + "gridPos": { "x": 18, "y": 42, "w": 6, "h": 8 }, "targets": [ { "datasource": "Prometheus", "expr": "rate(deltaforge_dlq_evicted_total{instance=~\"$instance\", pipeline=~\"$pipeline\"}[30s])", "legendFormat": "evicted {{pipeline}}" }, { "datasource": "Prometheus", "expr": "rate(deltaforge_dlq_rejected_total{instance=~\"$instance\", pipeline=~\"$pipeline\"}[30s])", "legendFormat": "rejected {{pipeline}}" } @@ -356,207 +320,85 @@ "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } }, - { "id": 103, "type": "row", "title": "Errors & Reliability", "gridPos": { "x": 0, "y": 49, "w": 24, "h": 1 }, "collapsed": false, "panels": [] }, + { "id": 106, "type": "row", "title": "Errors & Reliability", "gridPos": { "x": 0, "y": 50, "w": 24, "h": 1 }, "collapsed": false, "panels": [] }, - { "id": 7, "title": "Sink Errors (errors/s)", + { "id": 60, "title": "Sink Errors (errors/s)", "type": "timeseries", - 
"gridPos": { "x": 0, "y": 41, "w": 8, "h": 8 }, + "gridPos": { "x": 0, "y": 51, "w": 8, "h": 8 }, "targets": [{ "datasource": "Prometheus", "expr": "rate(deltaforge_sink_errors_total{instance=~\"$instance\", pipeline=~\"$pipeline\", sink=~\"$sink\"}[30s])", "legendFormat": "{{pipeline}} / {{sink}}" }], "fieldConfig": { "defaults": { "unit": "ops", "color": { "fixedColor": "red", "mode": "fixed" }, "custom": { "lineWidth": 2 } } }, "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } }, - { "id": 8, "title": "Source Errors by Kind (errors/s)", + { "id": 61, "title": "Reconnects (cumulative)", "type": "timeseries", - "gridPos": { "x": 8, "y": 41, "w": 8, "h": 8 }, - "targets": [{ "datasource": "Prometheus", "expr": "rate(deltaforge_source_errors_total{instance=~\"$instance\", pipeline=~\"$pipeline\", source=~\"$source\"}[30s])", "legendFormat": "{{pipeline}} / {{kind}}" }], - "fieldConfig": { "defaults": { "unit": "ops", "color": { "fixedColor": "dark-red", "mode": "fixed" }, "custom": { "lineWidth": 2 } } }, - "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } - }, - - { "id": 13, "title": "Reconnects (cumulative)", - "type": "timeseries", - "gridPos": { "x": 16, "y": 41, "w": 8, "h": 8 }, + "gridPos": { "x": 8, "y": 51, "w": 8, "h": 8 }, "targets": [ - { "datasource": "Prometheus", "expr": "deltaforge_source_reconnects_total{instance=~\"$instance\", pipeline=~\"$pipeline\", source=~\"$source\"}", "legendFormat": "source {{pipeline}}" }, + { "datasource": "Prometheus", "expr": "deltaforge_source_reconnects_total{instance=~\"$instance\", pipeline=~\"$pipeline\"}", "legendFormat": "source {{pipeline}}" }, { "datasource": "Prometheus", "expr": "deltaforge_sink_reconnects_total{instance=~\"$instance\", pipeline=~\"$pipeline\", sink=~\"$sink\"}", "legendFormat": "sink {{pipeline}} / {{sink}}" } ], "fieldConfig": { "defaults": { "unit": "short", "color": { 
"fixedColor": "orange", "mode": "fixed" }, "custom": { "lineWidth": 2 } } }, "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } }, - { "id": 109, "type": "row", "title": "Pipeline Detail: $pipeline", - "gridPos": { "x": 0, "y": 49, "w": 24, "h": 1 }, - "collapsed": true, - "repeat": "pipeline", - "repeatDirection": "h", - "panels": [ - - { "id": 60, "title": "[$pipeline] Events by Op (events/s)", - "type": "timeseries", - "gridPos": { "x": 0, "y": 50, "w": 8, "h": 8 }, - "targets": [{ "datasource": "Prometheus", "expr": "sum by(op)(rate(deltaforge_source_events_total{instance=~\"$instance\", pipeline=~\"$pipeline\", source=~\"$source\"}[30s]))", "legendFormat": "{{op}}" }], - "fieldConfig": { "defaults": { "unit": "ops", "custom": { "lineWidth": 1, "fillOpacity": 60, "stacking": { "mode": "normal", "group": "A" } } } }, - "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } - }, - - { "id": 61, "title": "[$pipeline] Sink Throughput (events/s)", - "type": "timeseries", - "gridPos": { "x": 8, "y": 50, "w": 8, "h": 8 }, - "targets": [{ "datasource": "Prometheus", "expr": "rate(deltaforge_sink_events_total{instance=~\"$instance\", pipeline=~\"$pipeline\", sink=~\"$sink\"}[30s])", "legendFormat": "{{sink}}" }], - "fieldConfig": { "defaults": { "unit": "ops", "custom": { "lineWidth": 2, "fillOpacity": 15 } } }, - "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } - }, - - { "id": 62, "title": "[$pipeline] Sink Latency (avg ms)", - "type": "timeseries", - "gridPos": { "x": 16, "y": 50, "w": 8, "h": 8 }, - "targets": [{ "datasource": "Prometheus", "expr": "rate(deltaforge_sink_latency_seconds_sum{instance=~\"$instance\", pipeline=~\"$pipeline\", sink=~\"$sink\"}[30s]) / rate(deltaforge_sink_latency_seconds_count{instance=~\"$instance\", pipeline=~\"$pipeline\", sink=~\"$sink\"}[30s]) * 1000", "legendFormat": 
"{{sink}}" }], - "fieldConfig": { "defaults": { "unit": "ms", "custom": { "lineWidth": 2 } } }, - "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } - }, - - { "id": 63, "title": "[$pipeline] Batch Size (events/batch)", - "type": "timeseries", - "gridPos": { "x": 0, "y": 58, "w": 8, "h": 8 }, - "targets": [{ "datasource": "Prometheus", "expr": "rate(deltaforge_batch_events_sum{instance=~\"$instance\", pipeline=~\"$pipeline\"}[30s]) / rate(deltaforge_batch_events_count{instance=~\"$instance\", pipeline=~\"$pipeline\"}[30s])", "legendFormat": "avg batch" }], - "fieldConfig": { "defaults": { "unit": "short", "custom": { "lineWidth": 2 } } }, - "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } - }, - - { "id": 64, "title": "[$pipeline] Sink Errors + Aborts", - "type": "timeseries", - "gridPos": { "x": 8, "y": 58, "w": 8, "h": 8 }, - "targets": [ - { "datasource": "Prometheus", "expr": "rate(deltaforge_sink_errors_total{instance=~\"$instance\", pipeline=~\"$pipeline\", sink=~\"$sink\"}[30s])", "legendFormat": "errors {{sink}}" }, - { "datasource": "Prometheus", "expr": "rate(deltaforge_sink_txn_aborts_total{instance=~\"$instance\", pipeline=~\"$pipeline\", sink=~\"$sink\"}[30s])", "legendFormat": "txn aborts {{sink}}" } - ], - "fieldConfig": { "defaults": { "unit": "ops", "color": { "fixedColor": "red", "mode": "fixed" }, "custom": { "lineWidth": 2 } } }, - "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } - }, - - { "id": 65, "title": "[$pipeline] Checkpoint Age per Sink (s)", - "type": "timeseries", - "gridPos": { "x": 16, "y": 58, "w": 8, "h": 8 }, - "targets": [{ "datasource": "Prometheus", "expr": "time() - deltaforge_sink_last_checkpoint_ts{instance=~\"$instance\", pipeline=~\"$pipeline\", sink=~\"$sink\"}", "legendFormat": "{{sink}}" }], - "fieldConfig": { - "defaults": { "unit": "s", "custom": { 
"lineWidth": 2 }, - "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": 0 }, { "color": "yellow", "value": 30 }, { "color": "red", "value": 120 }] } - } - }, - "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } + { "id": 62, "title": "Pipeline State Timeline", + "type": "timeseries", + "gridPos": { "x": 16, "y": 51, "w": 8, "h": 8 }, + "targets": [{ "datasource": "Prometheus", "expr": "deltaforge_pipeline_status{instance=~\"$instance\", pipeline=~\"$pipeline\"}", "legendFormat": "{{pipeline}}" }], + "fieldConfig": { + "defaults": { "unit": "short", "custom": { "lineWidth": 2, "fillOpacity": 10 }, + "thresholds": { "mode": "absolute", "steps": [ + { "color": "red", "value": -1 }, { "color": "gray", "value": 0 }, + { "color": "orange", "value": 0.5 }, { "color": "green", "value": 1 } + ]} } - - ] + }, + "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } }, - { "id": 104, "type": "row", "title": "Batching & Kafka", "gridPos": { "x": 0, "y": 66, "w": 24, "h": 1 }, "collapsed": true, "panels": [ + { "id": 107, "type": "row", "title": "Batching & Kafka", "gridPos": { "x": 0, "y": 59, "w": 24, "h": 1 }, "collapsed": true, "panels": [ - { "id": 16, "title": "Avg Batch Size (events/batch)", + { "id": 70, "title": "Avg Batch Size (events/batch)", "type": "timeseries", - "gridPos": { "x": 0, "y": 67, "w": 8, "h": 8 }, + "gridPos": { "x": 0, "y": 60, "w": 8, "h": 8 }, "targets": [{ "datasource": "Prometheus", "expr": "rate(deltaforge_batch_events_sum{instance=~\"$instance\", pipeline=~\"$pipeline\"}[30s]) / rate(deltaforge_batch_events_count{instance=~\"$instance\", pipeline=~\"$pipeline\"}[30s])", "legendFormat": "{{pipeline}}" }], "fieldConfig": { "defaults": { "unit": "short", "custom": { "lineWidth": 2 } } }, "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } }, - { "id": 33, "title": 
"Avg Batch Bytes", + { "id": 71, "title": "Avg Batch Bytes", "type": "timeseries", - "gridPos": { "x": 8, "y": 67, "w": 8, "h": 8 }, + "gridPos": { "x": 8, "y": 60, "w": 8, "h": 8 }, "targets": [{ "datasource": "Prometheus", "expr": "rate(deltaforge_batch_bytes_sum{instance=~\"$instance\", pipeline=~\"$pipeline\"}[30s]) / rate(deltaforge_batch_bytes_count{instance=~\"$instance\", pipeline=~\"$pipeline\"}[30s])", "legendFormat": "{{pipeline}}" }], "fieldConfig": { "defaults": { "unit": "bytes", "custom": { "lineWidth": 2 } } }, "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } }, - { "id": 14, "title": "Kafka Producer Queue (messages)", - "type": "timeseries", - "gridPos": { "x": 16, "y": 67, "w": 8, "h": 8 }, - "targets": [{ "datasource": "Prometheus", "expr": "deltaforge_kafka_producer_queue_messages{instance=~\"$instance\", pipeline=~\"$pipeline\", sink=~\"$sink\"}", "legendFormat": "{{pipeline}} / {{sink}}" }], - "fieldConfig": { - "defaults": { "unit": "short", "custom": { "lineWidth": 2 }, - "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": 0 }, { "color": "yellow", "value": 5000 }, { "color": "red", "value": 50000 }] } - } - }, - "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } - } - - ]}, - - { "id": 105, "type": "row", "title": "Schema & Processors", "gridPos": { "x": 0, "y": 75, "w": 24, "h": 1 }, "collapsed": true, "panels": [ - - { "id": 17, "title": "Source Schema Cache Hit Ratio", - "type": "timeseries", - "gridPos": { "x": 0, "y": 76, "w": 8, "h": 8 }, - "targets": [{ "datasource": "Prometheus", "expr": "sum by(pipeline)(rate(deltaforge_source_schema_cache_hits_total{instance=~\"$instance\", pipeline=~\"$pipeline\", source=~\"$source\"}[60s])) / (sum by(pipeline)(rate(deltaforge_source_schema_cache_hits_total{instance=~\"$instance\", pipeline=~\"$pipeline\", source=~\"$source\"}[60s])) + sum 
by(pipeline)(rate(deltaforge_source_schema_cache_misses_total{instance=~\"$instance\", pipeline=~\"$pipeline\", source=~\"$source\"}[60s])))", "legendFormat": "{{pipeline}}" }], - "fieldConfig": { "defaults": { "unit": "percentunit", "min": 0, "max": 1, "custom": { "lineWidth": 2 } } }, - "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } - }, - - { "id": 25, "title": "Schema Sensing Cache Hit Ratio", - "type": "timeseries", - "gridPos": { "x": 8, "y": 76, "w": 8, "h": 8 }, - "targets": [{ "datasource": "Prometheus", "expr": "sum by(pipeline)(rate(deltaforge_schema_sensing_cache_hits_total{instance=~\"$instance\", pipeline=~\"$pipeline\"}[60s])) / (sum by(pipeline)(rate(deltaforge_schema_sensing_cache_hits_total{instance=~\"$instance\", pipeline=~\"$pipeline\"}[60s])) + sum by(pipeline)(rate(deltaforge_schema_sensing_cache_misses_total{instance=~\"$instance\", pipeline=~\"$pipeline\"}[60s])))", "legendFormat": "{{pipeline}}" }], - "fieldConfig": { "defaults": { "unit": "percentunit", "min": 0, "max": 1, "custom": { "lineWidth": 2 } } }, - "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } - }, - - { "id": 18, "title": "Schema Drift Detected", - "type": "timeseries", - "gridPos": { "x": 16, "y": 76, "w": 8, "h": 8 }, - "targets": [{ "datasource": "Prometheus", "expr": "deltaforge_schema_drift_detected{instance=~\"$instance\", pipeline=~\"$pipeline\"}", "legendFormat": "{{pipeline}}" }], - "fieldConfig": { "defaults": { "unit": "short", "color": { "fixedColor": "purple", "mode": "fixed" }, "custom": { "lineWidth": 2 } } }, - "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } - }, - - { "id": 24, "title": "Processor Latency (avg ms)", + { "id": 72, "title": "Sink Latency (avg ms)", "type": "timeseries", - "gridPos": { "x": 0, "y": 84, "w": 8, "h": 8 }, - "targets": [{ "datasource": "Prometheus", "expr": 
"rate(deltaforge_processor_latency_seconds_sum{instance=~\"$instance\", pipeline=~\"$pipeline\", processor=~\"$processor\"}[30s]) / rate(deltaforge_processor_latency_seconds_count{instance=~\"$instance\", pipeline=~\"$pipeline\", processor=~\"$processor\"}[30s]) * 1000", "legendFormat": "{{pipeline}} {{processor}}" }], + "gridPos": { "x": 16, "y": 60, "w": 8, "h": 8 }, + "targets": [{ "datasource": "Prometheus", "expr": "rate(deltaforge_sink_latency_seconds_sum{instance=~\"$instance\", pipeline=~\"$pipeline\", sink=~\"$sink\"}[30s]) / rate(deltaforge_sink_latency_seconds_count{instance=~\"$instance\", pipeline=~\"$pipeline\", sink=~\"$sink\"}[30s]) * 1000", "legendFormat": "{{pipeline}} / {{sink}}" }], "fieldConfig": { "defaults": { "unit": "ms", "custom": { "lineWidth": 2 } } }, "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } - }, - - { "id": 19, "title": "Processor Drop Rate (events/s)", - "type": "timeseries", - "gridPos": { "x": 8, "y": 84, "w": 8, "h": 8 }, - "targets": [{ "datasource": "Prometheus", "expr": "rate(deltaforge_processor_events_in_total{instance=~\"$instance\", pipeline=~\"$pipeline\", processor=~\"$processor\"}[30s]) - rate(deltaforge_processor_events_out_total{instance=~\"$instance\", pipeline=~\"$pipeline\", processor=~\"$processor\"}[30s])", "legendFormat": "{{pipeline}} / {{processor}}" }], - "fieldConfig": { "defaults": { "unit": "ops", "color": { "fixedColor": "orange", "mode": "fixed" }, "custom": { "lineWidth": 2 } } }, - "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } - }, - - { "id": 20, "title": "Processor Errors (errors/s)", - "type": "timeseries", - "gridPos": { "x": 16, "y": 84, "w": 8, "h": 8 }, - "targets": [{ "datasource": "Prometheus", "expr": "rate(deltaforge_processor_errors_total{instance=~\"$instance\", pipeline=~\"$pipeline\", processor=~\"$processor\"}[30s])", "legendFormat": "{{pipeline}} / {{processor}}" 
}], - "fieldConfig": { "defaults": { "unit": "ops", "color": { "fixedColor": "dark-orange", "mode": "fixed" }, "custom": { "lineWidth": 2 } } }, - "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } } ]}, - { "id": 107, "type": "row", "title": "Infrastructure", "gridPos": { "x": 0, "y": 92, "w": 24, "h": 1 }, "collapsed": true, "panels": [ + { "id": 108, "type": "row", "title": "Infrastructure", "gridPos": { "x": 0, "y": 68, "w": 24, "h": 1 }, "collapsed": true, "panels": [ - { "id": 21, "title": "Container CPU Usage", + { "id": 80, "title": "Container CPU Usage", "type": "timeseries", - "gridPos": { "x": 0, "y": 93, "w": 12, "h": 8 }, + "gridPos": { "x": 0, "y": 69, "w": 12, "h": 8 }, "targets": [{ "datasource": "Prometheus", "expr": "rate(container_cpu_usage_seconds_total{container_label_com_docker_compose_service=~\"deltaforge.*\"}[30s]) * 100", "legendFormat": "{{container_label_com_docker_compose_service}}" }], - "fieldConfig": { - "defaults": { "unit": "percent", "min": 0, "custom": { "lineWidth": 2 }, - "thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": 0 }, { "color": "yellow", "value": 70 }, { "color": "red", "value": 90 }] } - } - }, + "fieldConfig": { "defaults": { "unit": "percent", "min": 0, "custom": { "lineWidth": 2 } } }, "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } }, - { "id": 22, "title": "Container Memory (working set)", + { "id": 81, "title": "Container Memory (working set)", "type": "timeseries", - "gridPos": { "x": 12, "y": 93, "w": 12, "h": 8 }, + "gridPos": { "x": 12, "y": 69, "w": 12, "h": 8 }, "targets": [{ "datasource": "Prometheus", "expr": "container_memory_working_set_bytes{container_label_com_docker_compose_service=~\"deltaforge.*\"}", "legendFormat": "{{container_label_com_docker_compose_service}}" }], - "fieldConfig": { - "defaults": { "unit": "bytes", "custom": { "lineWidth": 2 }, - 
"thresholds": { "mode": "absolute", "steps": [{ "color": "green", "value": 0 }, { "color": "yellow", "value": 536870912 }, { "color": "red", "value": 1073741824 }] } - } - }, + "fieldConfig": { "defaults": { "unit": "bytes", "custom": { "lineWidth": 2 } } }, "options": { "tooltip": { "mode": "multi" }, "legend": { "displayMode": "list", "placement": "bottom" } } } diff --git a/crates/deltaforge-config/src/lib.rs b/crates/deltaforge-config/src/lib.rs index c7e2707..28ce3df 100644 --- a/crates/deltaforge-config/src/lib.rs +++ b/crates/deltaforge-config/src/lib.rs @@ -83,6 +83,16 @@ pub struct Metadata { /// Business oriented tenant identifier pub tenant: String, + + /// Key-value labels for filtering, grouping, and Grafana variables. + /// Example: `{"env": "prod", "team": "platform", "tier": "critical"}` + #[serde(default)] + pub labels: std::collections::HashMap, + + /// Free-form annotations for non-filtering metadata (docs, links, ownership). + /// Example: `{"owner": "team-platform@company.com", "runbook": "https://..."}` + #[serde(default)] + pub annotations: std::collections::HashMap, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/crates/rest-api/src/errors.rs b/crates/rest-api/src/errors.rs index 2d54d24..01e5415 100644 --- a/crates/rest-api/src/errors.rs +++ b/crates/rest-api/src/errors.rs @@ -1,4 +1,6 @@ +use axum::Json; use axum::http::StatusCode; +use serde::Serialize; use tracing::error; #[derive(Debug)] @@ -9,6 +11,16 @@ pub enum PipelineAPIError { Failed(anyhow::Error), } +/// Structured error response — parseable by automation and CLIs. +#[derive(Serialize)] +pub struct ApiError { + pub code: &'static str, + pub message: String, +} + +/// Standard API result type used across all endpoint modules. 
+pub type ApiResult = Result, (StatusCode, Json)>; + impl std::fmt::Display for PipelineAPIError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { @@ -44,14 +56,28 @@ impl From for PipelineAPIError { } } -pub fn pipeline_error(err: PipelineAPIError) -> (StatusCode, String) { +pub fn pipeline_error(err: PipelineAPIError) -> (StatusCode, Json) { error!(error=?err, "pipeline lifecycle operation failed"); - let status = match err { - PipelineAPIError::NotFound(_) => StatusCode::NOT_FOUND, - PipelineAPIError::AlreadyExists(_) => StatusCode::CONFLICT, - PipelineAPIError::NameMismatch { .. } => StatusCode::BAD_REQUEST, - PipelineAPIError::Failed(_) => StatusCode::INTERNAL_SERVER_ERROR, + let (status, code) = match &err { + PipelineAPIError::NotFound(_) => { + (StatusCode::NOT_FOUND, "PIPELINE_NOT_FOUND") + } + PipelineAPIError::AlreadyExists(_) => { + (StatusCode::CONFLICT, "PIPELINE_ALREADY_EXISTS") + } + PipelineAPIError::NameMismatch { .. } => { + (StatusCode::BAD_REQUEST, "PIPELINE_NAME_MISMATCH") + } + PipelineAPIError::Failed(_) => { + (StatusCode::INTERNAL_SERVER_ERROR, "INTERNAL_ERROR") + } }; - (status, err.to_string()) + ( + status, + Json(ApiError { + code, + message: err.to_string(), + }), + ) } diff --git a/crates/rest-api/src/health.rs b/crates/rest-api/src/health.rs index 01b3d86..11500e5 100644 --- a/crates/rest-api/src/health.rs +++ b/crates/rest-api/src/health.rs @@ -10,16 +10,31 @@ pub fn router(state: AppState) -> Router { Router::new() .route("/health", get(healthz)) .route("/ready", get(readyz)) + .route("/log-level", get(get_log_level)) + .route("/validate", axum::routing::post(validate_config)) .with_state(state) } async fn healthz(State(st): State) -> impl IntoResponse { let pipelines = st.controller.list().await; - if pipelines.iter().any(|p| p.status == "failed") { - return (StatusCode::SERVICE_UNAVAILABLE, "pipeline failed\n") - .into_response(); + let failed: Vec<_> = pipelines + .iter() + .filter(|p| p.status 
== "failed") + .map(|p| p.name.clone()) + .collect(); + + if !failed.is_empty() { + let body = serde_json::json!({ + "status": "unhealthy", + "failed_pipelines": failed, + }); + return (StatusCode::SERVICE_UNAVAILABLE, Json(body)).into_response(); } - (StatusCode::OK, "ok\n").into_response() + let body = serde_json::json!({ + "status": "healthy", + "pipelines": pipelines.len(), + }); + (StatusCode::OK, Json(body)).into_response() } #[derive(Serialize)] @@ -37,3 +52,56 @@ async fn readyz(State(st): State) -> Json { pipelines, }) } + +// ── Log level ──────────────────────────────────────────────────────────────── + +#[derive(Serialize)] +struct LogLevelResponse { + level: String, +} + +async fn get_log_level() -> Json { + let level = + std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()); + Json(LogLevelResponse { level }) +} + +// ── Config validation ──────────────────────────────────────────────────────── + +/// Validate a pipeline config without creating it. Accepts JSON body. +/// Returns {"valid": true, ...} or {"valid": false, "error": "..."}. 
+async fn validate_config( + Json(body): Json, +) -> impl IntoResponse { + match serde_json::from_value::(body) { + Ok(spec) => { + let name = &spec.metadata.name; + let source_type = match &spec.spec.source { + deltaforge_config::SourceCfg::Mysql(_) => "mysql", + deltaforge_config::SourceCfg::Postgres(_) => "postgres", + #[allow(unreachable_patterns)] + _ => "other", + }; + let sink_count = spec.spec.sinks.len(); + + ( + StatusCode::OK, + Json(serde_json::json!({ + "valid": true, + "pipeline": name, + "source_type": source_type, + "sink_count": sink_count, + })), + ) + .into_response() + } + Err(e) => ( + StatusCode::BAD_REQUEST, + Json(serde_json::json!({ + "valid": false, + "error": e.to_string(), + })), + ) + .into_response(), + } +} diff --git a/crates/rest-api/src/lib.rs b/crates/rest-api/src/lib.rs index 4fb1f4e..8a7ee51 100644 --- a/crates/rest-api/src/lib.rs +++ b/crates/rest-api/src/lib.rs @@ -1,7 +1,7 @@ use axum::Router; mod errors; mod health; -mod pipelines; +pub mod pipelines; mod schemas; mod sensing; @@ -196,6 +196,8 @@ mod tests { metadata: Metadata { name: "demo".to_string(), tenant: "acme".to_string(), + labels: Default::default(), + annotations: Default::default(), }, spec: Spec { sharding: None, @@ -229,6 +231,7 @@ mod tests { journal: None, }, }, + ops: None, } } @@ -259,7 +262,8 @@ mod tests { assert_eq!(StatusCode::OK, resp.status()); let body = to_bytes(resp.into_body(), usize::MAX).await.unwrap(); - assert_eq!(&body[..], b"ok\n"); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["status"], "healthy"); let ready = app .oneshot( diff --git a/crates/rest-api/src/pipelines.rs b/crates/rest-api/src/pipelines.rs index b7a2fd6..b517b07 100644 --- a/crates/rest-api/src/pipelines.rs +++ b/crates/rest-api/src/pipelines.rs @@ -22,6 +22,24 @@ pub struct PipeInfo { pub name: String, pub status: String, pub spec: PipelineSpec, + /// Operational status — populated by the controller, optional for backward compat. 
+ #[serde(skip_serializing_if = "Option::is_none")] + pub ops: Option, +} + +/// Operational status fields — everything an operator needs in one response. +#[derive(Clone, Serialize, Deserialize, Default)] +pub struct PipelineOpsStatus { + /// Replication lag in seconds (source event time vs wall clock). + pub lag_seconds: Option, + /// DLQ entry count (0 if journal not enabled). + pub dlq_entries: u64, + /// Last error per sink (empty if all healthy). + pub sink_errors: std::collections::HashMap, + /// Pipeline uptime in seconds since last start/restart. + pub uptime_seconds: Option, + /// Per-sink checkpoint positions. + pub checkpoints: Vec, } #[async_trait] @@ -98,6 +116,25 @@ pub trait PipelineController: Send + Sync { "DLQ not enabled for this pipeline" ))) } + + // ── Checkpoint inspection ────────────────────────────────────────── + + /// Get per-sink checkpoint positions for a pipeline. + async fn checkpoints( + &self, + name: &str, + ) -> Result, PipelineAPIError> { + let _ = name; + Ok(vec![]) + } +} + +/// Per-sink checkpoint position returned by the inspection API. +#[derive(Clone, Serialize, Deserialize)] +pub struct CheckpointInfo { + pub sink_id: String, + pub position: Value, + pub age_seconds: f64, } pub fn router(state: AppState) -> Router { @@ -119,13 +156,43 @@ pub fn router(state: AppState) -> Router { ) .route("/pipelines/{name}/journal/dlq/count", get(handle_dlq_count)) .route("/pipelines/{name}/journal/dlq/ack", post(handle_dlq_ack)) + // Checkpoint inspection + .route("/pipelines/{name}/checkpoints", get(handle_checkpoints)) .with_state(state) } -type ApiResult = Result, (StatusCode, String)>; +use crate::errors::ApiResult; + +#[derive(Deserialize, Default)] +struct ListPipelinesParams { + /// Filter by label: `?label=env:prod` or `?label=team:platform`. + /// Multiple labels: `?label=env:prod&label=team:platform` (AND logic). 
+ #[serde(default)] + label: Vec, +} + +async fn list_pipelines( + State(st): State, + Query(params): Query, +) -> Json> { + let mut pipelines = st.controller.list().await; + + // Filter by labels (AND logic — all specified labels must match). + if !params.label.is_empty() { + pipelines.retain(|p| { + let meta_labels = &p.spec.metadata.labels; + params.label.iter().all(|filter| { + if let Some((key, value)) = filter.split_once(':') { + meta_labels.get(key).map(|v| v == value).unwrap_or(false) + } else { + // Key-only filter: label exists regardless of value + meta_labels.contains_key(filter) + } + }) + }); + } -async fn list_pipelines(State(st): State) -> Json> { - Json(st.controller.list().await) + Json(pipelines) } async fn get_pipeline( @@ -198,7 +265,8 @@ async fn stop_pipeline( async fn delete_pipeline( State(st): State, Path(name): Path, -) -> Result { +) -> Result)> { + // delete returns StatusCode directly, not wrapped in ApiResult st.controller .delete(&name) .await @@ -314,6 +382,19 @@ async fn handle_dlq_purge( Ok(Json(DlqPurgeResponse { purged })) } +// ── Checkpoint inspection handler ──────────────────────────────────────────── + +async fn handle_checkpoints( + State(st): State, + Path(name): Path, +) -> ApiResult> { + st.controller + .checkpoints(&name) + .await + .map(Json) + .map_err(pipeline_error) +} + #[cfg(test)] mod tests { use super::*; @@ -335,6 +416,8 @@ mod tests { metadata: Metadata { name: "demo".to_string(), tenant: "acme".to_string(), + labels: Default::default(), + annotations: Default::default(), }, spec: Spec { sharding: None, @@ -368,6 +451,7 @@ mod tests { journal: None, }, }, + ops: None, } } diff --git a/crates/rest-api/src/schemas.rs b/crates/rest-api/src/schemas.rs index ec4ac8a..9b22a86 100644 --- a/crates/rest-api/src/schemas.rs +++ b/crates/rest-api/src/schemas.rs @@ -2,7 +2,6 @@ use async_trait::async_trait; use axum::{ Json, Router, extract::{Path, State}, - http::StatusCode, routing::{get, post}, }; use 
chrono::{DateTime, Utc}; @@ -140,7 +139,7 @@ pub fn router(state: SchemaState) -> Router { .with_state(state) } -type ApiResult = Result, (StatusCode, String)>; +use crate::errors::ApiResult; async fn list_schemas( State(st): State, diff --git a/crates/rest-api/src/sensing.rs b/crates/rest-api/src/sensing.rs index de28be5..6528d87 100644 --- a/crates/rest-api/src/sensing.rs +++ b/crates/rest-api/src/sensing.rs @@ -6,7 +6,6 @@ use async_trait::async_trait; use axum::{ Json, Router, extract::{Path, State}, - http::StatusCode, routing::get, }; use chrono::{DateTime, Utc}; @@ -207,7 +206,7 @@ pub fn router(state: SensingState) -> Router { .with_state(state) } -type ApiResult = Result, (StatusCode, String)>; +use crate::errors::ApiResult; async fn list_inferred_schemas( State(st): State, @@ -280,7 +279,7 @@ mod tests { use super::*; use axum::{ body::{Body, to_bytes}, - http::{Method, Request}, + http::{Method, Request, StatusCode}, }; use tower::ServiceExt; diff --git a/crates/runner/src/coordinator.rs b/crates/runner/src/coordinator.rs index 94be08f..8acf7e6 100644 --- a/crates/runner/src/coordinator.rs +++ b/crates/runner/src/coordinator.rs @@ -811,6 +811,22 @@ impl Coordinator { ) .set(lag_secs); + // Per-table lag: track the last event timestamp per table in this batch. + let mut table_lag: HashMap = HashMap::new(); + for ev in &events { + let table_key = format!("{}.{}", ev.source.db, ev.source.table); + let ev_lag = ((now_ms - ev.ts_ms).max(0) as f64) / 1000.0; + table_lag.insert(table_key, ev_lag); + } + for (table, lag) in &table_lag { + gauge!( + "deltaforge_source_table_lag_seconds", + "pipeline" => self.pipeline_name.to_string(), + "table" => table.clone(), + ) + .set(*lag); + } + // E2E latency: first event's pipeline-receive time → now (before sink // delivery). 
Uses received_at_ms (wall-clock at parse time) rather than // ts_ms (binlog header timestamp, second-precision) so the metric reflects diff --git a/crates/runner/src/pipeline_manager.rs b/crates/runner/src/pipeline_manager.rs index 23e5ffd..79477ff 100644 --- a/crates/runner/src/pipeline_manager.rs +++ b/crates/runner/src/pipeline_manager.rs @@ -136,6 +136,7 @@ pub(crate) struct PipelineRuntime { pub(crate) table_patterns: Vec, pub(crate) sensor_state: Option>, pub(crate) dlq_writer: Option>, + pub(crate) started_at: std::time::Instant, } impl PipelineRuntime { @@ -171,6 +172,7 @@ impl PipelineRuntime { name: self.spec.metadata.name.clone(), status: status.to_string(), spec: self.spec.clone(), + ops: None, // populated async by controller.get() } } } @@ -445,9 +447,28 @@ impl PipelineManager { result }); - gauge!("deltaforge_pipeline_status", "pipeline" => pipeline_name) + gauge!("deltaforge_pipeline_status", "pipeline" => pipeline_name.clone()) .set(1.0); + // Emit pipeline info metric with labels for Grafana joins. + // This is a constant gauge (always 1) that carries metadata as labels. + let tenant = spec.metadata.tenant.clone(); + let mut info_labels = vec![ + ("pipeline".to_string(), pipeline_name.clone()), + ("tenant".to_string(), tenant), + ]; + for (k, v) in &spec.metadata.labels { + info_labels.push((k.clone(), v.clone())); + } + // Build gauge with dynamic labels — use the pipeline + tenant as fixed, + // and emit user labels as part of the metric name context. 
+ gauge!( + "deltaforge_pipeline_info", + "pipeline" => pipeline_name.clone(), + "tenant" => spec.metadata.tenant.clone(), + ) + .set(1.0); + Ok(PipelineRuntime { spec, status: PipelineStatus::Running, @@ -460,6 +481,7 @@ impl PipelineManager { table_patterns, sensor_state: sensor_for_runtime, dlq_writer, + started_at: std::time::Instant::now(), }) } @@ -529,8 +551,30 @@ impl PipelineController for PipelineManager { } async fn get(&self, name: &str) -> Result { - self.get_pipeline(name) - .ok_or_else(|| PipelineAPIError::NotFound(name.to_string())) + let (mut info, uptime) = { + let guard = self.pipelines.read(); + let runtime = guard + .get(name) + .ok_or_else(|| PipelineAPIError::NotFound(name.to_string()))?; + (runtime.info(), runtime.started_at.elapsed().as_secs_f64()) + }; + + // Enrich with operational status. + let checkpoints = self.checkpoints(name).await.unwrap_or_default(); + let dlq_count = match self.get_dlq_writer(name) { + Ok(dlq) => dlq.len().await.unwrap_or(0), + Err(_) => 0, + }; + + info.ops = Some(rest_api::pipelines::PipelineOpsStatus { + lag_seconds: None, + dlq_entries: dlq_count, + sink_errors: Default::default(), + uptime_seconds: Some(uptime), + checkpoints, + }); + + Ok(info) } async fn create( @@ -760,6 +804,61 @@ impl PipelineController for PipelineManager { let dlq = self.get_dlq_writer(name)?; dlq.purge().await.map_err(PipelineAPIError::Failed) } + + async fn checkpoints( + &self, + name: &str, + ) -> Result, PipelineAPIError> + { + let (source_id, prefix) = { + let guard = self.pipelines.read(); + let runtime = guard + .get(name) + .ok_or_else(|| PipelineAPIError::NotFound(name.to_string()))?; + let sid = runtime.spec.spec.source.source_id().to_string(); + let pfx = format!("{}::sink::", sid); + (sid, pfx) + }; // guard dropped here + let _ = source_id; // used for future expansion + + let keys = self + .ckpt_store + .list_with_prefix(&prefix) + .await + .map_err(|e| PipelineAPIError::Failed(e.into()))?; + + let now = 
std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_secs_f64()) + .unwrap_or(0.0); + + let mut result = Vec::with_capacity(keys.len()); + for key in &keys { + let sink_id = key.strip_prefix(&prefix).unwrap_or(key).to_string(); + + let position = match self.ckpt_store.get_raw(key).await { + Ok(Some(bytes)) => serde_json::from_slice(&bytes) + .unwrap_or(serde_json::Value::Null), + _ => serde_json::Value::Null, + }; + + // Checkpoint age: time since last write. We use the checkpoint + // timestamp if available, otherwise report 0. + let age = position + .get("ts_ms") + .and_then(|v| v.as_f64()) + .map(|ts| now - ts / 1000.0) + .unwrap_or(0.0); + + result.push(rest_api::pipelines::CheckpointInfo { + sink_id, + position, + age_seconds: age, + }); + } + + Ok(result) + } } // ============================================================================ @@ -819,6 +918,8 @@ mod tests { metadata: Metadata { name: name.to_string(), tenant: "acme".to_string(), + labels: Default::default(), + annotations: Default::default(), }, spec: Spec { sharding: None, diff --git a/docs/src/apireference.md b/docs/src/apireference.md index 9d56708..0edbd39 100644 --- a/docs/src/apireference.md +++ b/docs/src/apireference.md @@ -25,16 +25,14 @@ GET /health Returns `ok` when the process is running and all pipelines are healthy. Returns `503` if any pipeline has entered a failed state (e.g. position lost after failover, binlog purged, unrecoverable source error). Use for Kubernetes liveness probes — a `503` indicates the process should be restarted. **Response:** `200 OK` — all pipelines healthy -``` -ok +```json +{"status": "healthy", "pipelines": 3} ``` **Response:** `503 Service Unavailable` — one or more pipelines failed +```json +{"status": "unhealthy", "failed_pipelines": ["orders-cdc"]} ``` -pipeline failed -``` - -Pipeline status can be inspected via `/ready` or `GET /pipelines` to identify which pipeline failed and why. 
### Readiness Probe @@ -66,9 +64,11 @@ Returns pipeline states. Use for Kubernetes readiness probes. ```http GET /pipelines +GET /pipelines?label=env:prod +GET /pipelines?label=env:prod&label=team:platform ``` -Returns all pipelines with current status. +Returns all pipelines with current status. Filter by labels with AND logic. Key-only filter (`?label=env`) matches any value. **Response:** `200 OK` ```json @@ -90,14 +90,22 @@ Returns all pipelines with current status. GET /pipelines/{name} ``` -Returns a single pipeline by name. +Returns a single pipeline by name with operational status. **Response:** `200 OK` ```json { "name": "orders-cdc", "status": "running", - "spec": { ... } + "spec": { ... }, + "ops": { + "uptime_seconds": 3600.5, + "dlq_entries": 0, + "sink_errors": {}, + "checkpoints": [ + {"sink_id": "kafka-primary", "position": {"file": "mysql-bin.000005", "pos": 12345}, "age_seconds": 0.3} + ] + } } ``` @@ -538,19 +546,128 @@ Returns drift detection results for a specific table. --- +## Dead Letter Queue + +See the [DLQ page](dlq.md) for full documentation. + +### Peek DLQ Entries + +```http +GET /pipelines/{name}/journal/dlq?limit=50&sink_id=kafka-primary&error_kind=serialization +``` + +Returns DLQ entries (oldest first). All query params are optional. + +### DLQ Count + +```http +GET /pipelines/{name}/journal/dlq/count +``` + +**Response:** `200 OK` +```json +{"count": 42} +``` + +### Acknowledge DLQ Entries + +```http +POST /pipelines/{name}/journal/dlq/ack +Content-Type: application/json + +{"up_to_seq": 42} +``` + +Permanently removes entries from the head up to the given sequence number. + +**Response:** `200 OK` +```json +{"acked": 12} +``` + +### Purge DLQ + +```http +DELETE /pipelines/{name}/journal/dlq +``` + +**Response:** `200 OK` +```json +{"purged": 42} +``` + +--- + +## Checkpoint Inspection + +### Get Checkpoints + +```http +GET /pipelines/{name}/checkpoints +``` + +Returns per-sink checkpoint positions and ages. 
+ +**Response:** `200 OK` +```json +[ + {"sink_id": "kafka-primary", "position": {"file": "mysql-bin.000005", "pos": 12345}, "age_seconds": 0.3}, + {"sink_id": "redis-cache", "position": {"file": "mysql-bin.000005", "pos": 11000}, "age_seconds": 2.1} +] +``` + +--- + +## System Endpoints + +### Log Level + +```http +GET /log-level +``` + +Returns the current `RUST_LOG` value. + +**Response:** `200 OK` +```json +{"level": "deltaforge=info,sources=info,sinks=info,warn"} +``` + +### Validate Config + +```http +POST /validate +Content-Type: application/json +``` + +Dry-run validation of a pipeline config without creating it. + +**Response:** `200 OK` — config is valid +```json +{"valid": true, "pipeline": "orders-cdc", "source_type": "mysql", "sink_count": 2} +``` + +**Response:** `400 Bad Request` — config has errors +```json +{"valid": false, "error": "spec: missing field `processors` at line 7 column 3"} +``` + +--- + ## Error Responses -All error responses follow this format: +All error responses return structured JSON: ```json { - "error": "Description of the error" + "code": "PIPELINE_NOT_FOUND", + "message": "pipeline orders-cdc not found" } ``` -| Status Code | Meaning | -|-------------|---------| -| `400 Bad Request` | Invalid request body or parameters | -| `404 Not Found` | Resource doesn't exist | -| `409 Conflict` | Resource already exists | -| `500 Internal Server Error` | Unexpected server error | \ No newline at end of file +| Status Code | Code | Meaning | +|-------------|------|---------| +| `400 Bad Request` | `PIPELINE_NAME_MISMATCH` | Invalid request body or name mismatch | +| `404 Not Found` | `PIPELINE_NOT_FOUND` | Resource doesn't exist | +| `409 Conflict` | `PIPELINE_ALREADY_EXISTS` | Resource already exists | +| `500 Internal Server Error` | `INTERNAL_ERROR` | Unexpected server error | \ No newline at end of file diff --git a/docs/src/guarantees.md b/docs/src/guarantees.md index e0d70e5..8f61be0 100644 --- a/docs/src/guarantees.md +++ 
b/docs/src/guarantees.md @@ -260,6 +260,38 @@ How long should consumers remember processed event IDs? Match your maximum expec | Unplanned crashes with auto-restart | 15-30 minutes | | Disaster recovery | Match your RPO | +## Correctness Test Matrix + +Every guarantee is backed by an existing or planned test. This matrix maps each guarantee to its verification; rows marked `Planned` are not yet covered by an automated test: + +| Guarantee | Test | Type | Status | +|-----------|------|------|--------| +| No data loss (at-least-once) | `crash_recovery` chaos scenario | Chaos | Exists | +| Kafka end-to-end exactly-once | `exactly_once` chaos scenario + `kafka_sink_exactly_once_*` | Chaos + Integration | Exists | +| Producer fencing detection | `kafka_sink_exactly_once_producer_fencing` | Integration | Exists | +| Per-primary-key ordering | Events keyed by PK → same Kafka partition | By design | Verified via Kafka partition assignment | +| Transaction boundary preservation | `respect_source_tx` + `check_and_split` coordinator logic | Unit | Exists | +| Per-sink checkpoint independence | `test_per_sink_checkpoint_only_advances_on_success` | Unit | Exists | +| Per-sink checkpoint legacy fallback | `per_sink_proxy_falls_back_to_legacy_key` | Unit | Exists | +| Commit policy gate before checkpoint | `test_per_sink_checkpoint_only_advances_on_success` | Unit | Exists | +| DLQ routes per-event failures | `test_dlq_routes_failed_events_and_pipeline_continues` | Unit | Exists | +| DLQ all-fail batch | `test_dlq_all_events_fail_no_send` | Unit | Exists | +| DLQ overflow (drop_oldest) | `dlq::overflow_drop_oldest` | Unit | Exists | +| DLQ overflow (reject) | `dlq::overflow_reject_drops_new` | Unit | Exists | +| DLQ overflow (block) | `dlq::overflow_block_waits_for_ack` | Unit | Exists | +| DLQ cleanup expired | `dlq::cleanup_expired_removes_old_entries` | Unit | Exists | +| Partial batch timer flush | `test_partial_batch_flushed_by_timer` | Unit | Exists | +| Network partition recovery | `network_partition` chaos scenario | Chaos | Exists | +| Sink outage recovery 
| `sink_outage` chaos scenario | Chaos | Exists | +| Schema drift handling | `schema_drift` chaos scenario | Chaos | Exists | +| MySQL failover detection | `failover` chaos scenario | Chaos | Exists | +| Postgres failover detection | `pg_failover` chaos scenario | Chaos | Exists | +| Binlog purge detection | `binlog_purge` chaos scenario | Chaos | Exists | +| Replication slot drop detection | `slot_dropped` chaos scenario | Chaos | Exists | +| NATS dedup within window | Verify `Nats-Msg-Id` prevents duplicates | Integration | Planned | +| Redis idempotency key | Verify consumer-side dedup via key | Integration | Planned | +| Snapshot → CDC handoff | No gaps or duplicates at boundary | Integration | Planned | + ## Limitations These are **not guaranteed** and are documented honestly: diff --git a/docs/src/observability.md b/docs/src/observability.md index 46bc727..4d6ad3a 100644 --- a/docs/src/observability.md +++ b/docs/src/observability.md @@ -18,7 +18,8 @@ The sections below call out concrete metrics and log events to add per component | --- | --- | --- | | ✅ Implemented | `deltaforge_source_events_total{pipeline,source,table}` counter increments when MySQL events are handed to the coordinator. | Surfaces ingress per table and pipeline. | | ✅ Implemented | `deltaforge_source_reconnects_total{pipeline,source}` counter when binlog reads reconnect. | Makes retry storms visible. | -| 🚧 Gap | `deltaforge_source_lag_seconds{pipeline,source}` gauge based on binlog/WAL position vs. server time. | Alert when sources fall behind. | +| ✅ Implemented | `deltaforge_source_lag_seconds{pipeline}` gauge — replication lag based on last event timestamp vs. wall clock. | Alert when sources fall behind. | +| ✅ Implemented | `deltaforge_source_table_lag_seconds{pipeline,table}` gauge — per-table replication lag within each batch. | Identify which tables are lagging. 
| | 🚧 Gap | `deltaforge_source_idle_seconds{pipeline,source}` gauge updated when no events arrive within the inactivity window. | Catch stuck readers before downstream backlogs form. | ### Coordinator and batching @@ -31,7 +32,7 @@ The sections below call out concrete metrics and log events to add per component | ✅ Implemented | `deltaforge_stage_latency_seconds{pipeline,stage,trigger}` histogram for processor stage. | Provides batch timing per trigger (timer/limits/shutdown). | | ✅ Implemented | `deltaforge_processor_latency_seconds{pipeline,processor}` histogram around every processor invocation. | Identify slow user functions. | | 🚧 Gap | `deltaforge_pipeline_channel_depth{pipeline}` gauge from `mpsc::Sender::capacity()`/`len()`. | Detect backpressure between sources and coordinator. | -| 🚧 Gap | Checkpoint outcome counters/logs (`deltaforge_checkpoint_success_total` / `_failure_total`). | Alert on persistence regressions and correlate to data loss risk. | +| ✅ Implemented | `deltaforge_checkpoints_total{pipeline}` counter — successful checkpoint commits. | Monitor checkpoint throughput. | ### Sinks (Kafka/Redis/custom) @@ -39,9 +40,12 @@ The sections below call out concrete metrics and log events to add per component | --- | --- | --- | | ✅ Implemented | `deltaforge_sink_events_total{pipeline,sink}` counter and `deltaforge_sink_latency_seconds{pipeline,sink}` histogram around each send. | Throughput and responsiveness per sink. | | ✅ Implemented | `deltaforge_sink_batch_total{pipeline,sink}` counter for send. | Number of batches sent per sink. | -| 🚧 Gap | Error taxonomy in `deltaforge_sink_failures_total` (add `kind`/`details`). | Easier alerting on specific failure classes (auth, timeout, schema). | +| ✅ Implemented | `deltaforge_sink_errors_total{pipeline,sink}` counter with per-sink error tracking. | Alert on sink failures. | +| ✅ Implemented | `deltaforge_sink_txn_commits_total{pipeline,sink}` counter — Kafka transaction commits/s. 
| Track exactly-once throughput. | +| ✅ Implemented | `deltaforge_sink_txn_aborts_total{pipeline,sink}` counter — Kafka transaction aborts/s. Should be ~0. | Detect fencing or broker issues. | +| ✅ Implemented | `deltaforge_sink_checkpoint_status{pipeline,sink}` gauge (1=ok, 0=behind). | Per-sink checkpoint health. | +| ✅ Implemented | `deltaforge_sink_last_checkpoint_ts{pipeline,sink}` epoch timestamp. | Per-sink checkpoint age. | | 🚧 Gap | Backpressure gauge for client buffers (rdkafka queue, Redis pipeline depth). | Early signal before errors occur. | -| 🚧 Gap | Drop/skip counters from processors/sinks. | Auditing and reconciliation. | ### Pipeline lifecycle @@ -49,7 +53,9 @@ The sections below call out concrete metrics and log events to add per component | --- | --- | --- | | ✅ Implemented | `deltaforge_pipeline_status{pipeline}` gauge reflecting the current lifecycle state of each pipeline. | Single gauge to alert on stopped or failed pipelines and drive dashboards. | | ✅ Implemented | `deltaforge_e2e_latency_seconds{pipeline}` histogram measuring wall-clock time from when an event was received by DeltaForge to when it was delivered to the sink. | Measures pipeline delivery latency independently of source clock precision. | -| 🚧 Gap | `deltaforge_replication_lag_seconds{pipeline,source}` gauge based on binlog/WAL event timestamp vs. wall clock. | Alert when the source is behind real time (slow producers, network issues). | +| ✅ Implemented | `deltaforge_source_lag_seconds{pipeline}` gauge — replication lag based on event timestamp vs. wall clock. | Alert when the source is behind real time. | +| ✅ Implemented | `deltaforge_checkpoints_total{pipeline}` counter — checkpoint commits/s. | Monitor checkpoint throughput. | +| ✅ Implemented | `deltaforge_last_checkpoint_ts{pipeline}` epoch timestamp — pipeline-level checkpoint age. | Alert on stale checkpoints. 
| #### `deltaforge_pipeline_status` value semantics @@ -81,6 +87,17 @@ E2E latency is measured from the wall-clock time the event was **received and pa The **replication lag** metric (separate from E2E latency) uses the binlog timestamp and measures how far behind the source is relative to real time — that one-second precision is acceptable for lag alerting. +### Dead Letter Queue + +| Status | Metric/log | Rationale | +| --- | --- | --- | +| ✅ Implemented | `deltaforge_dlq_events_total{pipeline,sink,error_kind}` counter. | Track rate of events routed to DLQ. | +| ✅ Implemented | `deltaforge_dlq_entries{pipeline}` gauge — current unacked entries. | Monitor DLQ backlog size. | +| ✅ Implemented | `deltaforge_dlq_saturation_ratio{pipeline}` gauge (0.0-1.0). | Alert at 80% (warning) and 95% (critical). | +| ✅ Implemented | `deltaforge_dlq_evicted_total{pipeline}` counter — entries lost to drop_oldest overflow. | Track data loss from overflow. | +| ✅ Implemented | `deltaforge_dlq_rejected_total{pipeline}` counter — entries lost to reject overflow. | Track data loss from rejection. | +| ✅ Implemented | `deltaforge_dlq_write_failures_total{pipeline}` counter — DLQ storage failures. | Alert on DLQ infrastructure issues. | + ### Control plane and health endpoints | Need | Suggested metric/log | Rationale | @@ -89,3 +106,40 @@ The **replication lag** metric (separate from E2E latency) uses the binlog times | Ready/Liveness transitions | Logs with pipeline counts and per-pipeline status when readiness changes. | Explain probe failures in log aggregation. | | Pipeline lifecycle counters | Counters for create/patch/stop/resume actions with success/error labels. | Auditable control-plane operations. 
| +## Grafana Dashboard + +A production-ready Grafana dashboard is included in the repository, optimized for fleet operations with hundreds of pipelines: + +**[Download: deltaforge.json](https://github.com/vnvo/deltaforge/blob/main/chaos/grafana/dashboards/deltaforge.json)** + +Import it via Grafana UI → Dashboards → Import → Upload JSON file. + +### What's included + +| Row | Panels | Purpose | +|-----|--------|---------| +| **Fleet Overview** | Running/unhealthy count, total events/s, total data/s, max lag, DLQ total, reconnects, txn aborts, sink errors | One-glance health across all pipelines | +| **Top Pipelines** | Top 10 laggiest, top 10 throughput, top 10 DLQ backlogs | Identify outliers without drowning in 300 series | +| **Throughput** | Aggregate events/s, per-pipeline events/s, data throughput | Capacity planning and anomaly detection | +| **Latency & Lag** | E2E latency p50/p95, source lag, per-table lag (top 10) | SLA monitoring, identify slow tables | +| **Checkpoints & EOS** | Per-sink status, commit rate, txn commits/aborts | Exactly-once health, checkpoint freshness | +| **Dead Letter Queue** | Entries, events/s, saturation, overflow rate | DLQ monitoring and alerting | +| **Errors & Reliability** | Sink errors, reconnects, pipeline state timeline | Incident detection | +| **Batching & Kafka** | Batch size, batch bytes, sink latency (collapsed) | Tuning reference | +| **Infrastructure** | Container CPU, memory (collapsed) | Resource monitoring | + +### Template variables + +The dashboard includes dropdown filters at the top: + +- **Instance** — select DeltaForge instances +- **Tenant** — filter by tenant (from `deltaforge_pipeline_info` metric) +- **Pipeline** — select specific pipelines +- **Sink** — filter by sink + +### Prerequisites + +- Prometheus scraping DeltaForge metrics on port 9000 (`/metrics`) +- Prometheus datasource configured in Grafana as "Prometheus" +- For container metrics: cAdvisor scraping enabled +