Skip to content
Open
10 changes: 10 additions & 0 deletions migrations/postgres/v7.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- Per-minute rollup of HTTP request metrics, keyed by (minute, path).
-- Rows are upserted additively by CatalogManager::record_request_rollup_minute.
CREATE TABLE metrics_request_rollup_minute (
    -- Start of the minute bucket.
    minute TIMESTAMPTZ NOT NULL,
    -- Matched route template, e.g. "/query".
    path TEXT NOT NULL,
    request_count INTEGER NOT NULL DEFAULT 0,
    -- Requests counted as errors (classification happens in the metrics worker).
    error_count INTEGER NOT NULL DEFAULT 0,
    -- Sum of latencies in ms; divide by request_count for the mean.
    total_latency_ms INTEGER NOT NULL DEFAULT 0,
    -- Nullable: NULL until a latency has been recorded for this bucket.
    min_latency_ms INTEGER,
    max_latency_ms INTEGER,
    PRIMARY KEY (minute, path)
);
10 changes: 10 additions & 0 deletions migrations/sqlite/v7.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- Per-minute rollup of HTTP request metrics, keyed by (minute, path).
-- SQLite counterpart of the Postgres migration; rows are upserted additively.
CREATE TABLE metrics_request_rollup_minute (
    -- ISO-8601 text truncated to the minute, e.g. "2026-01-01T12:00:00Z".
    minute TEXT NOT NULL,
    -- Matched route template, e.g. "/query".
    path TEXT NOT NULL,
    request_count INTEGER NOT NULL DEFAULT 0,
    -- Requests counted as errors (classification happens in the metrics worker).
    error_count INTEGER NOT NULL DEFAULT 0,
    -- Sum of latencies in ms; divide by request_count for the mean.
    total_latency_ms INTEGER NOT NULL DEFAULT 0,
    -- Nullable: NULL until a latency has been recorded for this bucket.
    min_latency_ms INTEGER,
    max_latency_ms INTEGER,
    PRIMARY KEY (minute, path)
);
11 changes: 11 additions & 0 deletions src/catalog/caching_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1304,4 +1304,15 @@ impl CatalogManager for CachingCatalogManager {
self.cached_read(&key, || self.inner().list_dataset_table_names(&schema))
.await
}

/// Forward a rollup write straight to the wrapped manager.
///
/// This is a write path: it never consults the read cache, and it touches no
/// cached keys, so no invalidation is required here.
async fn record_request_rollup_minute(
    &self,
    minute: &str,
    path: &str,
    bucket: &crate::metrics::RollupBucket,
) -> Result<()> {
    let delegate = self.inner();
    delegate
        .record_request_rollup_minute(minute, path, bucket)
        .await
}
}
13 changes: 13 additions & 0 deletions src/catalog/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -872,4 +872,17 @@ pub trait CatalogManager: Debug + Send + Sync {

/// Delete a dataset by ID. Returns the deleted dataset if it existed,
/// `None` if no dataset with that ID was found.
async fn delete_dataset(&self, id: &str) -> Result<Option<DatasetInfo>>;

// Metrics methods

/// Insert or additively update a per-minute HTTP request rollup row.
///
/// `minute` is an ISO-8601 string truncated to the minute, e.g. `"2026-01-01T12:00:00Z"`.
/// Counter and latency-sum fields are added to any existing row for the same
/// (minute, path); `min_latency_ms`/`max_latency_ms` are combined with MIN/MAX
/// rather than summed (see the Postgres/SQLite implementations).
async fn record_request_rollup_minute(
    &self,
    minute: &str,
    path: &str,
    bucket: &crate::metrics::RollupBucket,
) -> Result<()>;
}
9 changes: 9 additions & 0 deletions src/catalog/mock_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,4 +443,13 @@ impl CatalogManager for MockCatalog {
async fn delete_dataset(&self, _id: &str) -> Result<Option<DatasetInfo>> {
    // Mock: no datasets exist, so there is never anything to delete.
    Ok(None)
}

async fn record_request_rollup_minute(
    &self,
    _minute: &str,
    _path: &str,
    _bucket: &crate::metrics::RollupBucket,
) -> Result<()> {
    // Mock: metrics are accepted and silently discarded.
    Ok(())
}
}
29 changes: 29 additions & 0 deletions src/catalog/postgres_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1336,6 +1336,35 @@ impl CatalogManager for PostgresCatalogManager {

Ok(dataset)
}

/// Additively upsert one (minute, path) rollup row in Postgres.
///
/// Counters and the latency sum accumulate into the existing row; min/max
/// latency are combined with LEAST/GREATEST, which skip NULL arguments.
async fn record_request_rollup_minute(
    &self,
    minute: &str,
    path: &str,
    bucket: &crate::metrics::RollupBucket,
) -> Result<()> {
    // The rollup columns are 32-bit INTEGERs while `RollupBucket` fields are
    // i64. A bare `as i32` cast silently wraps on overflow; saturate into the
    // i32 range instead so oversized values pin at i32::MAX rather than
    // corrupting the rollup with wrapped/negative counts.
    fn saturate(v: i64) -> i32 {
        v.clamp(i64::from(i32::MIN), i64::from(i32::MAX)) as i32
    }

    sqlx::query(
        "INSERT INTO metrics_request_rollup_minute
            (minute, path, request_count, error_count, total_latency_ms, min_latency_ms, max_latency_ms)
         VALUES ($1::timestamptz, $2, $3, $4, $5, $6, $7)
         ON CONFLICT (minute, path) DO UPDATE SET
            request_count = metrics_request_rollup_minute.request_count + EXCLUDED.request_count,
            error_count = metrics_request_rollup_minute.error_count + EXCLUDED.error_count,
            total_latency_ms = metrics_request_rollup_minute.total_latency_ms + EXCLUDED.total_latency_ms,
            min_latency_ms = LEAST(metrics_request_rollup_minute.min_latency_ms, EXCLUDED.min_latency_ms),
            max_latency_ms = GREATEST(metrics_request_rollup_minute.max_latency_ms, EXCLUDED.max_latency_ms)",
    )
    .bind(minute)
    .bind(path)
    .bind(saturate(bucket.request_count))
    .bind(saturate(bucket.error_count))
    .bind(saturate(bucket.total_latency_ms))
    .bind(saturate(bucket.min_latency_ms))
    .bind(saturate(bucket.max_latency_ms))
    .execute(self.backend.pool())
    .await?;
    Ok(())
}
}

impl Debug for PostgresCatalogManager {
Expand Down
29 changes: 29 additions & 0 deletions src/catalog/sqlite_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1447,6 +1447,35 @@ impl CatalogManager for SqliteCatalogManager {

Ok(dataset)
}

/// Additively upsert one (minute, path) rollup row in SQLite.
///
/// Counters and the latency sum accumulate into the existing row; min/max
/// latency combine via SQLite's two-argument scalar MIN/MAX functions.
async fn record_request_rollup_minute(
    &self,
    minute: &str,
    path: &str,
    bucket: &crate::metrics::RollupBucket,
) -> Result<()> {
    const UPSERT: &str = "INSERT INTO metrics_request_rollup_minute
(minute, path, request_count, error_count, total_latency_ms, min_latency_ms, max_latency_ms)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (minute, path) DO UPDATE SET
request_count = request_count + EXCLUDED.request_count,
error_count = error_count + EXCLUDED.error_count,
total_latency_ms = total_latency_ms + EXCLUDED.total_latency_ms,
min_latency_ms = MIN(min_latency_ms, EXCLUDED.min_latency_ms),
max_latency_ms = MAX(max_latency_ms, EXCLUDED.max_latency_ms)";

    // Bind positionally in column order; the i64 bucket fields map directly
    // onto SQLite's 64-bit INTEGER storage class.
    let statement = sqlx::query(UPSERT)
        .bind(minute)
        .bind(path)
        .bind(bucket.request_count)
        .bind(bucket.error_count)
        .bind(bucket.total_latency_ms)
        .bind(bucket.min_latency_ms)
        .bind(bucket.max_latency_ms);

    statement.execute(self.backend.pool()).await?;
    Ok(())
}
}

impl CatalogMigrations for SqliteMigrationBackend {
Expand Down
22 changes: 22 additions & 0 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::http::models::{
ConnectionRefreshResult, ConnectionSchemaError, RefreshWarning, SchemaRefreshResult,
TableRefreshError, TableRefreshResult,
};
use crate::metrics::{worker::MetricsWorker, MetricsEvent, METRICS_CHANNEL_CAPACITY};
use crate::secrets::{EncryptedCatalogBackend, SecretManager, ENCRYPTED_PROVIDER_TYPE};
use crate::source::Source;
use crate::storage::{FilesystemStorage, StorageManager};
Expand Down Expand Up @@ -173,6 +174,10 @@ pub struct RuntimeEngine {
persistence_tasks: Mutex<JoinSet<()>>,
/// Handle for the stale result cleanup worker task.
stale_result_cleanup_handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
/// Sender half of the metrics event channel. Shared with the HTTP middleware.
pub metrics_sender: crate::metrics::MetricsSender,
/// Handle for the metrics worker background task.
metrics_worker_handle: Mutex<Option<tokio::task::JoinHandle<()>>>,
}

/// Build a reader-compatible schema where geometry (Binary) columns are replaced with Utf8.
Expand Down Expand Up @@ -1400,6 +1405,13 @@ impl RuntimeEngine {
let _ = tokio::time::timeout(Duration::from_secs(5), handle).await;
}

// Wait for the metrics worker to finish its final flush.
// NOTE(review): the worker only exits once every MetricsSender is dropped, but
// nothing in this scope drops `self.metrics_sender` — so this await is likely to
// hit the 5s timeout instead of observing a clean close. Consider dropping or
// closing the sender before waiting; TODO confirm against worker::run.
if let Some(handle) = self.metrics_worker_handle.lock().await.take() {
let _ = tokio::time::timeout(Duration::from_secs(5), handle).await;
}

// Wait for in-flight persistence tasks to complete
let mut tasks = self.persistence_tasks.lock().await;
let task_count = tasks.len();
Expand Down Expand Up @@ -3075,6 +3087,14 @@ impl RuntimeEngineBuilder {
self.stale_result_timeout,
);

// Start background metrics worker
let (metrics_sender, metrics_rx) =
tokio::sync::mpsc::channel::<MetricsEvent>(METRICS_CHANNEL_CAPACITY);
let metrics_worker_handle = {
let worker = MetricsWorker::new(metrics_rx, catalog.clone());
tokio::spawn(worker.run())
};

// Initialize catalog (starts warmup loop if configured)
catalog.init().await?;

Expand All @@ -3101,6 +3121,8 @@ impl RuntimeEngineBuilder {
persistence_semaphore: Arc::new(Semaphore::new(self.max_concurrent_persistence)),
persistence_tasks: Mutex::new(JoinSet::new()),
stale_result_cleanup_handle: Mutex::new(stale_result_cleanup_handle),
metrics_sender,
metrics_worker_handle: Mutex::new(Some(metrics_worker_handle)),
};

// Note: All catalogs (connections, datasets, runtimedb) are now resolved on-demand
Expand Down
5 changes: 5 additions & 0 deletions src/http/app_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::http::controllers::{
refresh_handler, update_dataset, update_saved_query, update_secret_handler, upload_file,
MAX_UPLOAD_SIZE,
};
use crate::http::metrics_middleware::metrics_middleware;
use crate::RuntimeEngine;
use axum::extract::DefaultBodyLimit;
use axum::http::{HeaderName, HeaderValue, Request};
Expand Down Expand Up @@ -160,6 +161,10 @@ impl AppServer {
.route(PATH_SAVED_QUERY_VERSIONS, get(list_saved_query_versions))
.route(PATH_SAVED_QUERY_EXECUTE, post(execute_saved_query))
.with_state(engine.clone())
.layer(middleware::from_fn_with_state(
engine.clone(),
metrics_middleware,
))
.layer(middleware::from_fn(trace_id_response_header))
.layer(
TraceLayer::new_for_http()
Expand Down
40 changes: 40 additions & 0 deletions src/http/metrics_middleware.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use crate::metrics::MetricsEvent;
use crate::RuntimeEngine;
use axum::body::Body;
use axum::extract::{MatchedPath, Request, State};
use axum::middleware::Next;
use axum::response::Response;
use chrono::Utc;
use std::sync::Arc;
use std::time::Instant;

/// Axum middleware that captures per-request HTTP metrics.
///
/// Captures the method and matched route before the handler consumes the
/// request, times the handler, and then hands a [`MetricsEvent::HttpRequest`]
/// to the metrics worker with a non-blocking `try_send`. When the channel is
/// full the event is silently dropped, so this middleware never stalls the
/// request hot path.
pub async fn metrics_middleware(
    State(engine): State<Arc<RuntimeEngine>>,
    matched_path: Option<MatchedPath>,
    request: Request<Body>,
    next: Next,
) -> Response {
    // Route/method must be read up front: `next.run` takes ownership of the request.
    let method = request.method().to_string();
    let path = match matched_path {
        Some(matched) => matched.as_str().to_owned(),
        // Requests that matched no route template share a single bucket.
        None => "/unknown".to_owned(),
    };
    let started_at = Instant::now();

    let response = next.run(request).await;

    let event = MetricsEvent::HttpRequest {
        timestamp: Utc::now(),
        path,
        method,
        status_code: response.status().as_u16(),
        latency_ms: started_at.elapsed().as_millis() as u64,
    };
    // Fire-and-forget; a full channel drops the event rather than blocking.
    let _ = engine.metrics_sender.try_send(event);

    response
}
1 change: 1 addition & 0 deletions src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod app_server;
pub mod controllers;
pub mod error;
pub mod metrics_middleware;
pub mod models;
pub mod serialization;
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod datasets;
mod engine;
pub mod http;
pub mod id;
pub mod metrics;
pub mod secrets;
pub mod source;
pub mod storage;
Expand Down
62 changes: 62 additions & 0 deletions src/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
pub mod worker;

use chrono::{DateTime, Utc};
use tokio::sync::mpsc;

/// Capacity of the bounded metrics event channel created by the engine builder.
/// The HTTP middleware publishes with `try_send`, so events are silently
/// dropped (never blocked on) when the channel is full.
pub const METRICS_CHANNEL_CAPACITY: usize = 1_000;

/// An event captured by the metrics middleware and consumed by the worker.
///
/// Derives `Debug` (public types should be inspectable in logs/tests) and
/// `Clone` so events can be duplicated for fan-out or testing; both are
/// backward-compatible additions.
#[derive(Debug, Clone)]
pub enum MetricsEvent {
    HttpRequest {
        /// When the response became ready (taken via `Utc::now()` after the handler ran).
        timestamp: DateTime<Utc>,
        /// Matched route template, e.g. "/query" or "/connections/{connection_id}"
        path: String,
        /// HTTP method, e.g. "GET".
        method: String,
        /// Response status code, e.g. 200 or 500.
        status_code: u16,
        /// Wall-clock time from request receipt to response ready, in milliseconds
        latency_ms: u64,
    },
}

/// Convenience type alias for the sender half of the metrics event channel
/// (bounded at [`METRICS_CHANNEL_CAPACITY`]); shared with the HTTP middleware.
pub type MetricsSender = mpsc::Sender<MetricsEvent>;

/// Accumulated metrics for a single (minute, path) bucket.
///
/// Fields are `i64` so they bind directly to SQL integer columns; every value
/// is non-negative by construction (`new`/`accumulate` only add non-negative
/// quantities). `PartialEq`/`Eq` are derived so buckets can be compared in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RollupBucket {
    /// Total requests observed in this bucket.
    pub request_count: i64,
    /// Requests flagged as errors by the caller.
    pub error_count: i64,
    /// Sum of latencies in ms; divide by `request_count` for the mean.
    pub total_latency_ms: i64,
    /// Smallest single-request latency observed, in ms.
    pub min_latency_ms: i64,
    /// Largest single-request latency observed, in ms.
    pub max_latency_ms: i64,
}

impl RollupBucket {
    /// Create a bucket from its first observation.
    pub fn new(latency_ms: u64, is_error: bool) -> Self {
        let lat = latency_ms as i64;
        Self {
            request_count: 1,
            error_count: i64::from(is_error),
            total_latency_ms: lat,
            min_latency_ms: lat,
            max_latency_ms: lat,
        }
    }

    /// Fold one more observation into the bucket.
    pub fn accumulate(&mut self, latency_ms: u64, is_error: bool) {
        let lat = latency_ms as i64;
        self.request_count += 1;
        self.error_count += i64::from(is_error);
        self.total_latency_ms += lat;
        self.min_latency_ms = self.min_latency_ms.min(lat);
        self.max_latency_ms = self.max_latency_ms.max(lat);
    }
}
Loading
Loading