Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changelog/pr-180-clickhouse-query-safety.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
tidx: patch
---

Validate public ClickHouse queries, block system catalogs and dangerous table functions, enforce ClickHouse request timeouts, and validate view SELECT SQL before execution.
2 changes: 1 addition & 1 deletion src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ async fn handle_query_once(
})?;

clickhouse
.query(&params.sql, &sigs)
.query_user(&params.sql, &sigs, options.timeout_ms)
.await
.map(|r| QueryResult {
columns: r.columns,
Expand Down
4 changes: 3 additions & 1 deletion src/api/views.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize};
use std::net::SocketAddr;

use super::{ApiError, AppState};
use crate::query::EventSignature;
use crate::query::{EventSignature, validate_clickhouse_query};

/// Validate view name (alphanumeric + underscore only)
fn is_valid_view_name(name: &str) -> bool {
Expand Down Expand Up @@ -248,6 +248,8 @@ pub async fn create_view(
} else {
req.sql.clone()
};
validate_clickhouse_query(&sql)
.map_err(|e| ApiError::BadRequest(format!("Unsafe view SQL: {e}")))?;

// 1. Ensure database exists
let create_db = format!("CREATE DATABASE IF NOT EXISTS {}", database);
Expand Down
119 changes: 108 additions & 11 deletions src/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use tracing::{error, warn};

use crate::config::ClickHouseConfig;
use crate::query::{EventSignature, extract_raw_column_predicates};
use crate::query::{EventSignature, extract_raw_column_predicates, validate_clickhouse_query};

/// A single ClickHouse instance (connection + URL).
struct Instance {
Expand Down Expand Up @@ -90,6 +90,34 @@ impl ClickHouseEngine {
/// ClickHouse query errors (syntax, missing table, etc.) are returned
/// immediately.
pub async fn query(&self, sql: &str, signatures: &[&str]) -> Result<QueryResult> {
let sql = Self::prepare_query(sql, signatures)?;
self.execute_prepared_query(&sql, None).await
}

/// Execute a public user query after applying signature rewrites, SQL
/// validation, and caller-provided timeout limits.
pub async fn query_user(
&self,
sql: &str,
signatures: &[&str],
timeout_ms: u64,
) -> Result<QueryResult> {
let sql = Self::prepare_query(sql, signatures)?;
validate_clickhouse_query(&sql)?;
self.execute_prepared_query(&sql, Some(timeout_ms)).await
}

pub async fn query_with_timeout(
&self,
sql: &str,
signatures: &[&str],
timeout_ms: u64,
) -> Result<QueryResult> {
let sql = Self::prepare_query(sql, signatures)?;
self.execute_prepared_query(&sql, Some(timeout_ms)).await
}

fn prepare_query(sql: &str, signatures: &[&str]) -> Result<String> {
let sql = if !signatures.is_empty() {
let sigs: Vec<EventSignature> = signatures
.iter()
Expand All @@ -112,6 +140,14 @@ impl ClickHouseEngine {
sql.to_string()
};

Ok(sql)
}

async fn execute_prepared_query(
&self,
sql: &str,
timeout_ms: Option<u64>,
) -> Result<QueryResult> {
let start = std::time::Instant::now();
let n = self.instances.len();
let starting = self.active.load(Ordering::Relaxed);
Expand All @@ -120,7 +156,7 @@ impl ClickHouseEngine {
let idx = (starting + attempt) % n;
let inst = &self.instances[idx];

match self.try_query(inst, &sql, start).await {
match self.try_query(inst, sql, start, timeout_ms).await {
Ok(result) => {
if attempt > 0 {
self.active.store(idx, Ordering::Relaxed);
Expand Down Expand Up @@ -148,29 +184,51 @@ impl ClickHouseEngine {
Err(anyhow!("All ClickHouse instances unreachable"))
}

fn query_url(&self, inst: &Instance, timeout_ms: Option<u64>) -> String {
let base = format!(
"{}/?database={}&default_format=JSON",
inst.url.trim_end_matches('/'),
self.database
);
if let Some(timeout_ms) = timeout_ms {
let max_execution_time = timeout_ms.div_ceil(1000).max(1);
format!("{base}&max_execution_time={max_execution_time}")
} else {
base
}
}

async fn try_query(
&self,
inst: &Instance,
sql: &str,
start: std::time::Instant,
timeout_ms: Option<u64>,
) -> Result<QueryResult> {
let url = format!(
"{}/?database={}&default_format=JSON",
inst.url.trim_end_matches('/'),
self.database
);
let url = self.query_url(inst, timeout_ms);

let request_timeout = timeout_ms
.map(|timeout_ms| std::time::Duration::from_millis(timeout_ms.saturating_add(100)));
let mut req = inst.http_client.post(&url).body(sql.to_string());
if let Some(timeout) = request_timeout {
req = req.timeout(timeout);
}
if let Some(ref user) = inst.user {
req = req.header("X-ClickHouse-User", user);
}
if let Some(ref password) = inst.password {
req = req.header("X-ClickHouse-Key", password);
}
let resp = req
.send()
.await
.map_err(|e| anyhow!("ClickHouse HTTP request failed: {e}"))?;
let send = req.send();
let resp = if let Some(timeout) = request_timeout {
tokio::time::timeout(timeout, send)
.await
.map_err(|_| anyhow!("ClickHouse query timed out"))?
.map_err(|e| anyhow!("ClickHouse HTTP request failed: {e}"))?
} else {
send.await
.map_err(|e| anyhow!("ClickHouse HTTP request failed: {e}"))?
};

if !resp.status().is_success() {
let error_text = resp.text().await.unwrap_or_default();
Expand Down Expand Up @@ -338,4 +396,43 @@ mod tests {
let engine = ClickHouseEngine::new(&config, 4217).unwrap();
assert_eq!(engine.database(), "tidx_4217");
}

#[test]
fn test_internal_query_url_omits_timeout() {
let config = ClickHouseConfig {
enabled: true,
url: "http://clickhouse-1:8123".to_string(),
failover_urls: vec![],
database: None,
..Default::default()
};

let engine = ClickHouseEngine::new(&config, 4217).unwrap();
let url = engine.query_url(&engine.instances[0], None);

assert_eq!(
url,
"http://clickhouse-1:8123/?database=tidx_4217&default_format=JSON"
);
assert!(!url.contains("max_execution_time"));
}

#[test]
fn test_user_query_url_sets_ceiled_timeout_seconds() {
let config = ClickHouseConfig {
enabled: true,
url: "http://clickhouse-1:8123".to_string(),
failover_urls: vec![],
database: None,
..Default::default()
};

let engine = ClickHouseEngine::new(&config, 4217).unwrap();
let url = engine.query_url(&engine.instances[0], Some(1_001));

assert_eq!(
url,
"http://clickhouse-1:8123/?database=tidx_4217&default_format=JSON&max_execution_time=2"
);
}
}
2 changes: 1 addition & 1 deletion src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub use parser::{
extract_group_by_columns, extract_order_by_columns, extract_raw_column_predicates,
};
pub use router::QueryEngine;
pub use validator::{HARD_LIMIT_MAX, validate_query};
pub use validator::{HARD_LIMIT_MAX, validate_clickhouse_query, validate_query};

use regex_lite::Regex;
use std::sync::LazyLock;
Expand Down
Loading
Loading