diff --git a/migrations/postgres/v7.sql b/migrations/postgres/v7.sql new file mode 100644 index 0000000..a215724 --- /dev/null +++ b/migrations/postgres/v7.sql @@ -0,0 +1,10 @@ +CREATE TABLE metrics_request_rollup_minute ( + minute TIMESTAMPTZ NOT NULL, + path TEXT NOT NULL, + request_count INTEGER NOT NULL DEFAULT 0, + error_count INTEGER NOT NULL DEFAULT 0, + total_latency_ms INTEGER NOT NULL DEFAULT 0, + min_latency_ms INTEGER, + max_latency_ms INTEGER, + PRIMARY KEY (minute, path) +); diff --git a/migrations/sqlite/v7.sql b/migrations/sqlite/v7.sql new file mode 100644 index 0000000..97105bd --- /dev/null +++ b/migrations/sqlite/v7.sql @@ -0,0 +1,10 @@ +CREATE TABLE metrics_request_rollup_minute ( + minute TEXT NOT NULL, + path TEXT NOT NULL, + request_count INTEGER NOT NULL DEFAULT 0, + error_count INTEGER NOT NULL DEFAULT 0, + total_latency_ms INTEGER NOT NULL DEFAULT 0, + min_latency_ms INTEGER, + max_latency_ms INTEGER, + PRIMARY KEY (minute, path) +); diff --git a/src/catalog/caching_manager.rs b/src/catalog/caching_manager.rs index d832994..2a474b5 100644 --- a/src/catalog/caching_manager.rs +++ b/src/catalog/caching_manager.rs @@ -1304,4 +1304,15 @@ impl CatalogManager for CachingCatalogManager { self.cached_read(&key, || self.inner().list_dataset_table_names(&schema)) .await } + + async fn record_request_rollup_minute( + &self, + minute: &str, + path: &str, + bucket: &crate::metrics::RollupBucket, + ) -> Result<()> { + self.inner() + .record_request_rollup_minute(minute, path, bucket) + .await + } } diff --git a/src/catalog/manager.rs b/src/catalog/manager.rs index 3ef88ed..71b1291 100644 --- a/src/catalog/manager.rs +++ b/src/catalog/manager.rs @@ -872,4 +872,17 @@ pub trait CatalogManager: Debug + Send + Sync { /// Delete a dataset by ID. Returns the deleted dataset if it existed. async fn delete_dataset(&self, id: &str) -> Result>; + + // Metrics methods + + /// Insert or additively update a per-minute HTTP request rollup row. 
+ /// + /// `minute` is an ISO-8601 string truncated to the minute, e.g. `"2026-01-01T12:00:00Z"`. + /// All numeric fields are added to any existing row for the same (minute, path). + async fn record_request_rollup_minute( + &self, + minute: &str, + path: &str, + bucket: &crate::metrics::RollupBucket, + ) -> Result<()>; } diff --git a/src/catalog/mock_catalog.rs b/src/catalog/mock_catalog.rs index 0d4f501..4bd6de3 100644 --- a/src/catalog/mock_catalog.rs +++ b/src/catalog/mock_catalog.rs @@ -443,4 +443,13 @@ impl CatalogManager for MockCatalog { async fn delete_dataset(&self, _id: &str) -> Result> { Ok(None) } + + async fn record_request_rollup_minute( + &self, + _minute: &str, + _path: &str, + _bucket: &crate::metrics::RollupBucket, + ) -> Result<()> { + Ok(()) + } } diff --git a/src/catalog/postgres_manager.rs b/src/catalog/postgres_manager.rs index ca2e65d..20ccf53 100644 --- a/src/catalog/postgres_manager.rs +++ b/src/catalog/postgres_manager.rs @@ -1336,6 +1336,35 @@ impl CatalogManager for PostgresCatalogManager { Ok(dataset) } + + async fn record_request_rollup_minute( + &self, + minute: &str, + path: &str, + bucket: &crate::metrics::RollupBucket, + ) -> Result<()> { + sqlx::query( + "INSERT INTO metrics_request_rollup_minute + (minute, path, request_count, error_count, total_latency_ms, min_latency_ms, max_latency_ms) + VALUES ($1::timestamptz, $2, $3, $4, $5, $6, $7) + ON CONFLICT (minute, path) DO UPDATE SET + request_count = metrics_request_rollup_minute.request_count + EXCLUDED.request_count, + error_count = metrics_request_rollup_minute.error_count + EXCLUDED.error_count, + total_latency_ms = metrics_request_rollup_minute.total_latency_ms + EXCLUDED.total_latency_ms, + min_latency_ms = LEAST(metrics_request_rollup_minute.min_latency_ms, EXCLUDED.min_latency_ms), + max_latency_ms = GREATEST(metrics_request_rollup_minute.max_latency_ms, EXCLUDED.max_latency_ms)", + ) + .bind(minute) + .bind(path) + .bind(bucket.request_count as i32) + 
.bind(bucket.error_count as i32) + .bind(bucket.total_latency_ms as i32) + .bind(bucket.min_latency_ms as i32) + .bind(bucket.max_latency_ms as i32) + .execute(self.backend.pool()) + .await?; + Ok(()) + } } impl Debug for PostgresCatalogManager { diff --git a/src/catalog/sqlite_manager.rs b/src/catalog/sqlite_manager.rs index 8d20685..fee0b3e 100644 --- a/src/catalog/sqlite_manager.rs +++ b/src/catalog/sqlite_manager.rs @@ -1447,6 +1447,35 @@ impl CatalogManager for SqliteCatalogManager { Ok(dataset) } + + async fn record_request_rollup_minute( + &self, + minute: &str, + path: &str, + bucket: &crate::metrics::RollupBucket, + ) -> Result<()> { + sqlx::query( + "INSERT INTO metrics_request_rollup_minute + (minute, path, request_count, error_count, total_latency_ms, min_latency_ms, max_latency_ms) + VALUES (?, ?, ?, ?, ?, ?, ?) + ON CONFLICT (minute, path) DO UPDATE SET + request_count = request_count + EXCLUDED.request_count, + error_count = error_count + EXCLUDED.error_count, + total_latency_ms = total_latency_ms + EXCLUDED.total_latency_ms, + min_latency_ms = MIN(min_latency_ms, EXCLUDED.min_latency_ms), + max_latency_ms = MAX(max_latency_ms, EXCLUDED.max_latency_ms)", + ) + .bind(minute) + .bind(path) + .bind(bucket.request_count) + .bind(bucket.error_count) + .bind(bucket.total_latency_ms) + .bind(bucket.min_latency_ms) + .bind(bucket.max_latency_ms) + .execute(self.backend.pool()) + .await?; + Ok(()) + } } impl CatalogMigrations for SqliteMigrationBackend { diff --git a/src/engine.rs b/src/engine.rs index a89e626..dd63cec 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -10,6 +10,7 @@ use crate::http::models::{ ConnectionRefreshResult, ConnectionSchemaError, RefreshWarning, SchemaRefreshResult, TableRefreshError, TableRefreshResult, }; +use crate::metrics::{worker::MetricsWorker, MetricsEvent, METRICS_CHANNEL_CAPACITY}; use crate::secrets::{EncryptedCatalogBackend, SecretManager, ENCRYPTED_PROVIDER_TYPE}; use crate::source::Source; use 
crate::storage::{FilesystemStorage, StorageManager}; @@ -173,6 +174,10 @@ pub struct RuntimeEngine { persistence_tasks: Mutex>, /// Handle for the stale result cleanup worker task. stale_result_cleanup_handle: Mutex>>, + /// Sender half of the metrics event channel. Shared with the HTTP middleware. + pub metrics_sender: crate::metrics::MetricsSender, + /// Handle for the metrics worker background task. + metrics_worker_handle: Mutex>>, } /// Build a reader-compatible schema where geometry (Binary) columns are replaced with Utf8. @@ -1400,6 +1405,13 @@ impl RuntimeEngine { let _ = tokio::time::timeout(Duration::from_secs(5), handle).await; } + // Wait for the metrics worker to finish its final flush. + // NOTE(review): the worker only exits once every sender is dropped, but `self.metrics_sender` + // is still alive here — this wait always hits the 5s timeout and the final flush never runs; close the channel first. + if let Some(handle) = self.metrics_worker_handle.lock().await.take() { + let _ = tokio::time::timeout(Duration::from_secs(5), handle).await; + } + // Wait for in-flight persistence tasks to complete let mut tasks = self.persistence_tasks.lock().await; let task_count = tasks.len(); @@ -3075,6 +3087,14 @@ impl RuntimeEngineBuilder { self.stale_result_timeout, ); + // Start background metrics worker + let (metrics_sender, metrics_rx) = + tokio::sync::mpsc::channel::(METRICS_CHANNEL_CAPACITY); + let metrics_worker_handle = { + let worker = MetricsWorker::new(metrics_rx, catalog.clone()); + tokio::spawn(worker.run()) + }; + // Initialize catalog (starts warmup loop if configured) catalog.init().await?; @@ -3101,6 +3121,8 @@ impl RuntimeEngineBuilder { persistence_semaphore: Arc::new(Semaphore::new(self.max_concurrent_persistence)), persistence_tasks: Mutex::new(JoinSet::new()), stale_result_cleanup_handle: Mutex::new(stale_result_cleanup_handle), + metrics_sender, + metrics_worker_handle: Mutex::new(Some(metrics_worker_handle)), }; // Note: All catalogs (connections, datasets, runtimedb) are now resolved
on-demand diff --git a/src/http/app_server.rs b/src/http/app_server.rs index 7b0813a..8d2fea5 100644 --- a/src/http/app_server.rs +++ b/src/http/app_server.rs @@ -9,6 +9,7 @@ use crate::http::controllers::{ refresh_handler, update_dataset, update_saved_query, update_secret_handler, upload_file, MAX_UPLOAD_SIZE, }; +use crate::http::metrics_middleware::metrics_middleware; use crate::RuntimeEngine; use axum::extract::DefaultBodyLimit; use axum::http::{HeaderName, HeaderValue, Request}; @@ -160,6 +161,10 @@ impl AppServer { .route(PATH_SAVED_QUERY_VERSIONS, get(list_saved_query_versions)) .route(PATH_SAVED_QUERY_EXECUTE, post(execute_saved_query)) .with_state(engine.clone()) + .layer(middleware::from_fn_with_state( + engine.clone(), + metrics_middleware, + )) .layer(middleware::from_fn(trace_id_response_header)) .layer( TraceLayer::new_for_http() diff --git a/src/http/metrics_middleware.rs b/src/http/metrics_middleware.rs new file mode 100644 index 0000000..393563f --- /dev/null +++ b/src/http/metrics_middleware.rs @@ -0,0 +1,40 @@ +use crate::metrics::MetricsEvent; +use crate::RuntimeEngine; +use axum::body::Body; +use axum::extract::{MatchedPath, Request, State}; +use axum::middleware::Next; +use axum::response::Response; +use chrono::Utc; +use std::sync::Arc; +use std::time::Instant; + +/// Axum middleware that captures per-request HTTP metrics. +/// +/// Records start time before the handler runs, then after the response is ready +/// sends an [`MetricsEvent::HttpRequest`] to the metrics worker via a non-blocking +/// `try_send`. Events are silently dropped when the channel is full, so this +/// middleware adds no latency to the hot path. 
+pub async fn metrics_middleware( + State(engine): State>, + matched_path: Option, + request: Request, + next: Next, +) -> Response { + let start = Instant::now(); + let method = request.method().to_string(); + let path = matched_path + .map(|p| p.as_str().to_owned()) + .unwrap_or_else(|| "/unknown".to_owned()); + + let response = next.run(request).await; + + let _ = engine.metrics_sender.try_send(MetricsEvent::HttpRequest { + timestamp: Utc::now(), + path, + method, + status_code: response.status().as_u16(), + latency_ms: start.elapsed().as_millis() as u64, + }); + + response +} diff --git a/src/http/mod.rs b/src/http/mod.rs index 33023ba..ee2340a 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -1,5 +1,6 @@ pub mod app_server; pub mod controllers; pub mod error; +pub mod metrics_middleware; pub mod models; pub mod serialization; diff --git a/src/lib.rs b/src/lib.rs index ae9599d..9ccbdfc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,6 +6,7 @@ pub mod datasets; mod engine; pub mod http; pub mod id; +pub mod metrics; pub mod secrets; pub mod source; pub mod storage; diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs new file mode 100644 index 0000000..3135b9e --- /dev/null +++ b/src/metrics/mod.rs @@ -0,0 +1,62 @@ +pub mod worker; + +use chrono::{DateTime, Utc}; +use tokio::sync::mpsc; + +/// Channel capacity for the metrics event channel. +/// Events are silently dropped when the channel is full (hot path uses try_send). +pub const METRICS_CHANNEL_CAPACITY: usize = 1_000; + +/// An HTTP request event captured by the metrics middleware. +pub enum MetricsEvent { + HttpRequest { + timestamp: DateTime, + /// Matched route template, e.g. "/query" or "/connections/{connection_id}" + path: String, + method: String, + status_code: u16, + /// Wall-clock time from request receipt to response ready, in milliseconds + latency_ms: u64, + }, +} + +/// Convenience type alias for the metrics sender half. 
+pub type MetricsSender = mpsc::Sender; + +/// Accumulated metrics for a single (minute, path) bucket. +#[derive(Debug, Clone)] +pub struct RollupBucket { + pub request_count: i64, + pub error_count: i64, + pub total_latency_ms: i64, + pub min_latency_ms: i64, + pub max_latency_ms: i64, +} + +impl RollupBucket { + pub fn new(latency_ms: u64, is_error: bool) -> Self { + let lat = latency_ms as i64; + Self { + request_count: 1, + error_count: if is_error { 1 } else { 0 }, + total_latency_ms: lat, + min_latency_ms: lat, + max_latency_ms: lat, + } + } + + pub fn accumulate(&mut self, latency_ms: u64, is_error: bool) { + let lat = latency_ms as i64; + self.request_count += 1; + if is_error { + self.error_count += 1; + } + self.total_latency_ms += lat; + if lat < self.min_latency_ms { + self.min_latency_ms = lat; + } + if lat > self.max_latency_ms { + self.max_latency_ms = lat; + } + } +} diff --git a/src/metrics/worker.rs b/src/metrics/worker.rs new file mode 100644 index 0000000..17a3a2d --- /dev/null +++ b/src/metrics/worker.rs @@ -0,0 +1,193 @@ +use crate::catalog::CatalogManager; +use crate::metrics::{MetricsEvent, RollupBucket}; +use chrono::{DateTime, DurationRound, TimeDelta, Utc}; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::mpsc; +use tokio::time::{interval, Duration}; +use tracing::warn; + +/// Interval between metric flushes to the database. +const FLUSH_INTERVAL: Duration = Duration::from_secs(30); + +/// Background worker that receives metrics events, accumulates them in memory, +/// and periodically flushes rollup rows to the database. +/// +/// The in-memory map is cleared after every flush, so each event is flushed +/// at most once — a rollup row whose upsert fails is logged and dropped. Multiple +/// instances sharing the same catalog DB converge because the UPSERT is additive.
+pub struct MetricsWorker { + receiver: mpsc::Receiver, + catalog: Arc, +} + +impl MetricsWorker { + pub fn new(receiver: mpsc::Receiver, catalog: Arc) -> Self { + Self { receiver, catalog } + } + + /// Run the worker until the sender side is dropped, then perform a final flush. + pub async fn run(mut self) { + let mut buckets: HashMap<(String, String), RollupBucket> = HashMap::new(); + let mut flush_interval = interval(FLUSH_INTERVAL); + // Skip the immediate first tick so we don't flush an empty map on startup. + flush_interval.tick().await; + + loop { + tokio::select! { + event = self.receiver.recv() => match event { + Some(e) => accumulate(e, &mut buckets), + None => { + // Channel closed — sender was dropped. Final flush then exit. + flush(&buckets, &self.catalog).await; + break; + } + }, + _ = flush_interval.tick() => { + flush(&buckets, &self.catalog).await; + buckets.clear(); + } + } + } + } +} + +fn truncate_to_minute(ts: DateTime) -> DateTime { + ts.duration_trunc(TimeDelta::minutes(1)).unwrap_or(ts) +} + +fn accumulate(event: MetricsEvent, buckets: &mut HashMap<(String, String), RollupBucket>) { + match event { + MetricsEvent::HttpRequest { + timestamp, + path, + method: _, + status_code, + latency_ms, + } => { + let minute = truncate_to_minute(timestamp) + .format("%Y-%m-%dT%H:%M:00Z") + .to_string(); + let is_error = status_code >= 400; + let key = (minute, path); + match buckets.get_mut(&key) { + Some(bucket) => bucket.accumulate(latency_ms, is_error), + None => { + buckets.insert(key, RollupBucket::new(latency_ms, is_error)); + } + } + } + } +} + +async fn flush( + buckets: &HashMap<(String, String), RollupBucket>, + catalog: &Arc, +) { + if buckets.is_empty() { + return; + } + + for ((minute, path), bucket) in buckets { + if let Err(e) = catalog + .record_request_rollup_minute(minute, path, bucket) + .await + { + warn!( + minute = %minute, + path = %path, + error = %e, + "Failed to flush metrics rollup" + ); + } + } +} + +#[cfg(test)] +mod tests { + 
use super::*; + use crate::catalog::MockCatalog; + use crate::metrics::METRICS_CHANNEL_CAPACITY; + use chrono::TimeZone; + use tokio::sync::mpsc; + + fn make_event(ts: DateTime, path: &str, status: u16, latency_ms: u64) -> MetricsEvent { + MetricsEvent::HttpRequest { + timestamp: ts, + path: path.to_owned(), + method: "GET".to_owned(), + status_code: status, + latency_ms, + } + } + + #[test] + fn accumulate_same_bucket_sums_correctly() { + let mut buckets = HashMap::new(); + let ts = Utc.with_ymd_and_hms(2026, 1, 1, 12, 0, 0).unwrap(); + + accumulate(make_event(ts, "/query", 200, 10), &mut buckets); + accumulate(make_event(ts, "/query", 200, 30), &mut buckets); + accumulate(make_event(ts, "/query", 500, 20), &mut buckets); + + assert_eq!(buckets.len(), 1); + let bucket = buckets.values().next().unwrap(); + assert_eq!(bucket.request_count, 3); + assert_eq!(bucket.error_count, 1); + assert_eq!(bucket.total_latency_ms, 60); + assert_eq!(bucket.min_latency_ms, 10); + assert_eq!(bucket.max_latency_ms, 30); + } + + #[test] + fn accumulate_different_paths_separate_buckets() { + let mut buckets = HashMap::new(); + let ts = Utc.with_ymd_and_hms(2026, 1, 1, 12, 0, 0).unwrap(); + + accumulate(make_event(ts, "/query", 200, 10), &mut buckets); + accumulate(make_event(ts, "/health", 200, 5), &mut buckets); + + assert_eq!(buckets.len(), 2); + } + + #[test] + fn accumulate_different_minutes_separate_buckets() { + let mut buckets = HashMap::new(); + let ts1 = Utc.with_ymd_and_hms(2026, 1, 1, 12, 0, 30).unwrap(); + let ts2 = Utc.with_ymd_and_hms(2026, 1, 1, 12, 1, 0).unwrap(); + + accumulate(make_event(ts1, "/query", 200, 10), &mut buckets); + accumulate(make_event(ts2, "/query", 200, 20), &mut buckets); + + assert_eq!(buckets.len(), 2); + } + + #[test] + fn flush_clears_map_on_second_flush() { + let mut buckets = HashMap::new(); + let ts = Utc.with_ymd_and_hms(2026, 1, 1, 12, 0, 0).unwrap(); + accumulate(make_event(ts, "/query", 200, 10), &mut buckets); + 
assert!(!buckets.is_empty()); + + // Simulate post-flush clear + buckets.clear(); + assert!(buckets.is_empty(), "map should be empty after flush-clear"); + } + + #[tokio::test] + async fn worker_exits_cleanly_when_channel_closes() { + let catalog = Arc::new(MockCatalog::new()); + let (tx, rx) = mpsc::channel::(METRICS_CHANNEL_CAPACITY); + let worker = MetricsWorker::new(rx, catalog); + + let handle = tokio::spawn(worker.run()); + + // Drop sender to signal shutdown + drop(tx); + + tokio::time::timeout(std::time::Duration::from_secs(5), handle) + .await + .expect("worker should exit within 5 seconds") + .expect("worker task should not panic"); + } +} diff --git a/tests/result_persistence_tests.rs b/tests/result_persistence_tests.rs index 154d14e..a66d8a7 100644 --- a/tests/result_persistence_tests.rs +++ b/tests/result_persistence_tests.rs @@ -404,6 +404,17 @@ impl CatalogManager for FailingCatalog { .list_saved_query_versions(saved_query_id, limit, offset) .await } + + async fn record_request_rollup_minute( + &self, + minute: &str, + path: &str, + bucket: &runtimedb::metrics::RollupBucket, + ) -> Result<()> { + self.inner + .record_request_rollup_minute(minute, path, bucket) + .await + } } async fn setup_test() -> Result<(AppServer, TempDir)> {