Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
508 changes: 175 additions & 333 deletions chaos/grafana/dashboards/deltaforge.json

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions crates/deltaforge-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ pub struct Metadata {

/// Business oriented tenant identifier
pub tenant: String,

/// Key-value labels for filtering, grouping, and Grafana variables.
/// Example: `{"env": "prod", "team": "platform", "tier": "critical"}`
#[serde(default)]
pub labels: std::collections::HashMap<String, String>,

/// Free-form annotations for non-filtering metadata (docs, links, ownership).
/// Example: `{"owner": "team-platform@company.com", "runbook": "https://..."}`
#[serde(default)]
pub annotations: std::collections::HashMap<String, String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
40 changes: 33 additions & 7 deletions crates/rest-api/src/errors.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use axum::Json;
use axum::http::StatusCode;
use serde::Serialize;
use tracing::error;

#[derive(Debug)]
Expand All @@ -9,6 +11,16 @@ pub enum PipelineAPIError {
Failed(anyhow::Error),
}

/// Structured error response — parseable by automation and CLIs.
#[derive(Serialize)]
pub struct ApiError {
pub code: &'static str,
pub message: String,
}

/// Standard API result type used across all endpoint modules.
pub type ApiResult<T> = Result<Json<T>, (StatusCode, Json<ApiError>)>;

impl std::fmt::Display for PipelineAPIError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Expand Down Expand Up @@ -44,14 +56,28 @@ impl From<anyhow::Error> for PipelineAPIError {
}
}

pub fn pipeline_error(err: PipelineAPIError) -> (StatusCode, String) {
pub fn pipeline_error(err: PipelineAPIError) -> (StatusCode, Json<ApiError>) {
error!(error=?err, "pipeline lifecycle operation failed");
let status = match err {
PipelineAPIError::NotFound(_) => StatusCode::NOT_FOUND,
PipelineAPIError::AlreadyExists(_) => StatusCode::CONFLICT,
PipelineAPIError::NameMismatch { .. } => StatusCode::BAD_REQUEST,
PipelineAPIError::Failed(_) => StatusCode::INTERNAL_SERVER_ERROR,
let (status, code) = match &err {
PipelineAPIError::NotFound(_) => {
(StatusCode::NOT_FOUND, "PIPELINE_NOT_FOUND")
}
PipelineAPIError::AlreadyExists(_) => {
(StatusCode::CONFLICT, "PIPELINE_ALREADY_EXISTS")
}
PipelineAPIError::NameMismatch { .. } => {
(StatusCode::BAD_REQUEST, "PIPELINE_NAME_MISMATCH")
}
PipelineAPIError::Failed(_) => {
(StatusCode::INTERNAL_SERVER_ERROR, "INTERNAL_ERROR")
}
};

(status, err.to_string())
(
status,
Json(ApiError {
code,
message: err.to_string(),
}),
)
}
76 changes: 72 additions & 4 deletions crates/rest-api/src/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,31 @@ pub fn router(state: AppState) -> Router {
Router::new()
.route("/health", get(healthz))
.route("/ready", get(readyz))
.route("/log-level", get(get_log_level))
.route("/validate", axum::routing::post(validate_config))
.with_state(state)
}

async fn healthz(State(st): State<AppState>) -> impl IntoResponse {
let pipelines = st.controller.list().await;
if pipelines.iter().any(|p| p.status == "failed") {
return (StatusCode::SERVICE_UNAVAILABLE, "pipeline failed\n")
.into_response();
let failed: Vec<_> = pipelines
.iter()
.filter(|p| p.status == "failed")
.map(|p| p.name.clone())
.collect();

if !failed.is_empty() {
let body = serde_json::json!({
"status": "unhealthy",
"failed_pipelines": failed,
});
return (StatusCode::SERVICE_UNAVAILABLE, Json(body)).into_response();
}
(StatusCode::OK, "ok\n").into_response()
let body = serde_json::json!({
"status": "healthy",
"pipelines": pipelines.len(),
});
(StatusCode::OK, Json(body)).into_response()
}

#[derive(Serialize)]
Expand All @@ -37,3 +52,56 @@ async fn readyz(State(st): State<AppState>) -> Json<ReadyStatus> {
pipelines,
})
}

// ── Log level ────────────────────────────────────────────────────────────────

#[derive(Serialize)]
struct LogLevelResponse {
level: String,
}

async fn get_log_level() -> Json<LogLevelResponse> {
let level =
std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string());
Json(LogLevelResponse { level })
}

// ── Config validation ────────────────────────────────────────────────────────

/// Validate a pipeline config without creating it. Accepts JSON body.
/// Returns {"valid": true, ...} or {"valid": false, "error": "..."}.
async fn validate_config(
Json(body): Json<serde_json::Value>,
) -> impl IntoResponse {
match serde_json::from_value::<deltaforge_config::PipelineSpec>(body) {
Ok(spec) => {
let name = &spec.metadata.name;
let source_type = match &spec.spec.source {
deltaforge_config::SourceCfg::Mysql(_) => "mysql",
deltaforge_config::SourceCfg::Postgres(_) => "postgres",
#[allow(unreachable_patterns)]
_ => "other",
};
let sink_count = spec.spec.sinks.len();

(
StatusCode::OK,
Json(serde_json::json!({
"valid": true,
"pipeline": name,
"source_type": source_type,
"sink_count": sink_count,
})),
)
.into_response()
}
Err(e) => (
StatusCode::BAD_REQUEST,
Json(serde_json::json!({
"valid": false,
"error": e.to_string(),
})),
)
.into_response(),
}
}
8 changes: 6 additions & 2 deletions crates/rest-api/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use axum::Router;
mod errors;
mod health;
mod pipelines;
pub mod pipelines;
mod schemas;
mod sensing;

Expand Down Expand Up @@ -196,6 +196,8 @@ mod tests {
metadata: Metadata {
name: "demo".to_string(),
tenant: "acme".to_string(),
labels: Default::default(),
annotations: Default::default(),
},
spec: Spec {
sharding: None,
Expand Down Expand Up @@ -229,6 +231,7 @@ mod tests {
journal: None,
},
},
ops: None,
}
}

Expand Down Expand Up @@ -259,7 +262,8 @@ mod tests {

assert_eq!(StatusCode::OK, resp.status());
let body = to_bytes(resp.into_body(), usize::MAX).await.unwrap();
assert_eq!(&body[..], b"ok\n");
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert_eq!(json["status"], "healthy");

let ready = app
.oneshot(
Expand Down
92 changes: 88 additions & 4 deletions crates/rest-api/src/pipelines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,24 @@ pub struct PipeInfo {
pub name: String,
pub status: String,
pub spec: PipelineSpec,
/// Operational status — populated by the controller, optional for backward compat.
#[serde(skip_serializing_if = "Option::is_none")]
pub ops: Option<PipelineOpsStatus>,
}

/// Operational status fields — everything an operator needs in one response.
#[derive(Clone, Serialize, Deserialize, Default)]
pub struct PipelineOpsStatus {
/// Replication lag in seconds (source event time vs wall clock).
pub lag_seconds: Option<f64>,
/// DLQ entry count (0 if journal not enabled).
pub dlq_entries: u64,
/// Last error per sink (empty if all healthy).
pub sink_errors: std::collections::HashMap<String, String>,
/// Pipeline uptime in seconds since last start/restart.
pub uptime_seconds: Option<f64>,
/// Per-sink checkpoint positions.
pub checkpoints: Vec<CheckpointInfo>,
}

#[async_trait]
Expand Down Expand Up @@ -98,6 +116,25 @@ pub trait PipelineController: Send + Sync {
"DLQ not enabled for this pipeline"
)))
}

// ── Checkpoint inspection ──────────────────────────────────────────

/// Get per-sink checkpoint positions for a pipeline.
async fn checkpoints(
&self,
name: &str,
) -> Result<Vec<CheckpointInfo>, PipelineAPIError> {
let _ = name;
Ok(vec![])
}
}

/// Per-sink checkpoint position returned by the inspection API.
#[derive(Clone, Serialize, Deserialize)]
pub struct CheckpointInfo {
pub sink_id: String,
pub position: Value,
pub age_seconds: f64,
}

pub fn router(state: AppState) -> Router {
Expand All @@ -119,13 +156,43 @@ pub fn router(state: AppState) -> Router {
)
.route("/pipelines/{name}/journal/dlq/count", get(handle_dlq_count))
.route("/pipelines/{name}/journal/dlq/ack", post(handle_dlq_ack))
// Checkpoint inspection
.route("/pipelines/{name}/checkpoints", get(handle_checkpoints))
.with_state(state)
}

type ApiResult<T> = Result<Json<T>, (StatusCode, String)>;
use crate::errors::ApiResult;

#[derive(Deserialize, Default)]
struct ListPipelinesParams {
/// Filter by label: `?label=env:prod` or `?label=team:platform`.
/// Multiple labels: `?label=env:prod&label=team:platform` (AND logic).
#[serde(default)]
label: Vec<String>,
}

async fn list_pipelines(
State(st): State<AppState>,
Query(params): Query<ListPipelinesParams>,
) -> Json<Vec<PipeInfo>> {
let mut pipelines = st.controller.list().await;

// Filter by labels (AND logic — all specified labels must match).
if !params.label.is_empty() {
pipelines.retain(|p| {
let meta_labels = &p.spec.metadata.labels;
params.label.iter().all(|filter| {
if let Some((key, value)) = filter.split_once(':') {
meta_labels.get(key).map(|v| v == value).unwrap_or(false)
} else {
// Key-only filter: label exists regardless of value
meta_labels.contains_key(filter)
}
})
});
}

async fn list_pipelines(State(st): State<AppState>) -> Json<Vec<PipeInfo>> {
Json(st.controller.list().await)
Json(pipelines)
}

async fn get_pipeline(
Expand Down Expand Up @@ -198,7 +265,8 @@ async fn stop_pipeline(
async fn delete_pipeline(
State(st): State<AppState>,
Path(name): Path<String>,
) -> Result<StatusCode, (StatusCode, String)> {
) -> Result<StatusCode, (StatusCode, Json<crate::errors::ApiError>)> {
// delete returns StatusCode directly, not wrapped in ApiResult
st.controller
.delete(&name)
.await
Expand Down Expand Up @@ -314,6 +382,19 @@ async fn handle_dlq_purge(
Ok(Json(DlqPurgeResponse { purged }))
}

// ── Checkpoint inspection handler ────────────────────────────────────────────

async fn handle_checkpoints(
State(st): State<AppState>,
Path(name): Path<String>,
) -> ApiResult<Vec<CheckpointInfo>> {
st.controller
.checkpoints(&name)
.await
.map(Json)
.map_err(pipeline_error)
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -335,6 +416,8 @@ mod tests {
metadata: Metadata {
name: "demo".to_string(),
tenant: "acme".to_string(),
labels: Default::default(),
annotations: Default::default(),
},
spec: Spec {
sharding: None,
Expand Down Expand Up @@ -368,6 +451,7 @@ mod tests {
journal: None,
},
},
ops: None,
}
}

Expand Down
3 changes: 1 addition & 2 deletions crates/rest-api/src/schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use async_trait::async_trait;
use axum::{
Json, Router,
extract::{Path, State},
http::StatusCode,
routing::{get, post},
};
use chrono::{DateTime, Utc};
Expand Down Expand Up @@ -140,7 +139,7 @@ pub fn router(state: SchemaState) -> Router {
.with_state(state)
}

type ApiResult<T> = Result<Json<T>, (StatusCode, String)>;
use crate::errors::ApiResult;

async fn list_schemas(
State(st): State<SchemaState>,
Expand Down
5 changes: 2 additions & 3 deletions crates/rest-api/src/sensing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use async_trait::async_trait;
use axum::{
Json, Router,
extract::{Path, State},
http::StatusCode,
routing::get,
};
use chrono::{DateTime, Utc};
Expand Down Expand Up @@ -207,7 +206,7 @@ pub fn router(state: SensingState) -> Router {
.with_state(state)
}

type ApiResult<T> = Result<Json<T>, (StatusCode, String)>;
use crate::errors::ApiResult;

async fn list_inferred_schemas(
State(st): State<SensingState>,
Expand Down Expand Up @@ -280,7 +279,7 @@ mod tests {
use super::*;
use axum::{
body::{Body, to_bytes},
http::{Method, Request},
http::{Method, Request, StatusCode},
};
use tower::ServiceExt;

Expand Down
Loading
Loading