From 692d208c2712c2fab1f8374dd9132eb7ed676ab4 Mon Sep 17 00:00:00 2001 From: Brendan Ryan <1572504+brendanjryan@users.noreply.github.com> Date: Sun, 3 May 2026 13:40:43 -0700 Subject: [PATCH 1/5] fix: validate clickhouse queries --- src/api/mod.rs | 2 +- src/api/views.rs | 4 +- src/clickhouse.rs | 54 ++++- src/query/mod.rs | 2 +- src/query/validator.rs | 453 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 505 insertions(+), 10 deletions(-) diff --git a/src/api/mod.rs b/src/api/mod.rs index 4c1dc552..a7a781aa 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -412,7 +412,7 @@ async fn handle_query_once( })?; clickhouse - .query(¶ms.sql, &sigs) + .query_user(¶ms.sql, &sigs, options.timeout_ms) .await .map(|r| QueryResult { columns: r.columns, diff --git a/src/api/views.rs b/src/api/views.rs index 43559315..66ad0574 100644 --- a/src/api/views.rs +++ b/src/api/views.rs @@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize}; use std::net::SocketAddr; use super::{ApiError, AppState}; -use crate::query::EventSignature; +use crate::query::{EventSignature, validate_clickhouse_query}; /// Validate view name (alphanumeric + underscore only) fn is_valid_view_name(name: &str) -> bool { @@ -248,6 +248,8 @@ pub async fn create_view( } else { req.sql.clone() }; + validate_clickhouse_query(&sql) + .map_err(|e| ApiError::BadRequest(format!("Unsafe view SQL: {e}")))?; // 1. Ensure database exists let create_db = format!("CREATE DATABASE IF NOT EXISTS {}", database); diff --git a/src/clickhouse.rs b/src/clickhouse.rs index 440ee208..8bb5cec0 100644 --- a/src/clickhouse.rs +++ b/src/clickhouse.rs @@ -12,7 +12,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use tracing::{error, warn}; use crate::config::ClickHouseConfig; -use crate::query::{EventSignature, extract_raw_column_predicates}; +use crate::query::{EventSignature, extract_raw_column_predicates, validate_clickhouse_query}; /// A single ClickHouse instance (connection + URL). struct Instance { @@ -69,6 +69,7 @@ impl ClickHouseEngine { ) -> Result { let http_client = reqwest::Client::builder() .pool_max_idle_per_host(4) + .timeout(std::time::Duration::from_secs(35)) .build() .map_err(|e| anyhow!("Failed to create HTTP client: {e}"))?; Ok(Instance { @@ -90,6 +91,33 @@ impl ClickHouseEngine { /// ClickHouse query errors (syntax, missing table, etc.) are returned /// immediately. pub async fn query(&self, sql: &str, signatures: &[&str]) -> Result { + self.query_with_timeout(sql, signatures, 30_000).await + } + + /// Execute a public user query after applying signature rewrites, SQL + /// validation, and caller-provided timeout limits. + pub async fn query_user( + &self, + sql: &str, + signatures: &[&str], + timeout_ms: u64, + ) -> Result { + let sql = self.prepare_query(sql, signatures)?; + validate_clickhouse_query(&sql)?; + self.execute_prepared_query(&sql, timeout_ms).await + } + + pub async fn query_with_timeout( + &self, + sql: &str, + signatures: &[&str], + timeout_ms: u64, + ) -> Result { + let sql = self.prepare_query(sql, signatures)?; + self.execute_prepared_query(&sql, timeout_ms).await + } + + fn prepare_query(&self, sql: &str, signatures: &[&str]) -> Result { let sql = if !signatures.is_empty() { let sigs: Vec = signatures .iter() @@ -112,6 +140,10 @@ impl ClickHouseEngine { sql.to_string() }; + Ok(sql) + } + + async fn execute_prepared_query(&self, sql: &str, timeout_ms: u64) -> Result { let start = std::time::Instant::now(); let n = self.instances.len(); let starting = self.active.load(Ordering::Relaxed); @@ -120,7 +152,7 @@ impl ClickHouseEngine { let idx = (starting + attempt) % n; let inst = &self.instances[idx]; - match self.try_query(inst, &sql, start).await { + match self.try_query(inst, sql, start, timeout_ms).await { Ok(result) => { if attempt > 0 { self.active.store(idx, Ordering::Relaxed); @@ -153,23 +185,31 @@ impl ClickHouseEngine { inst: &Instance, sql: &str, start: std::time::Instant, + timeout_ms: u64, ) -> Result { + let max_execution_time = ((timeout_ms + 999) / 1000).max(1); let url = format!( - "{}/?database={}&default_format=JSON", + "{}/?database={}&default_format=JSON&max_execution_time={}", inst.url.trim_end_matches('/'), - self.database + self.database, + max_execution_time ); - let mut req = inst.http_client.post(&url).body(sql.to_string()); + let request_timeout = std::time::Duration::from_millis(timeout_ms + 100); + let mut req = inst + .http_client + .post(&url) + .timeout(request_timeout) + .body(sql.to_string()); if let Some(ref user) = inst.user { req = req.header("X-ClickHouse-User", user); } if let Some(ref password) = inst.password { req = req.header("X-ClickHouse-Key", password); } - let resp = req - .send() + let resp = tokio::time::timeout(request_timeout, req.send()) .await + .map_err(|_| anyhow!("ClickHouse query timed out"))? .map_err(|e| anyhow!("ClickHouse HTTP request failed: {e}"))?; if !resp.status().is_success() { diff --git a/src/query/mod.rs b/src/query/mod.rs index 53b5fe1c..6ebdd602 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -7,7 +7,7 @@ pub use parser::{ extract_group_by_columns, extract_order_by_columns, extract_raw_column_predicates, }; pub use router::QueryEngine; -pub use validator::{HARD_LIMIT_MAX, validate_query}; +pub use validator::{HARD_LIMIT_MAX, validate_clickhouse_query, validate_query}; use regex_lite::Regex; use std::sync::LazyLock; diff --git a/src/query/validator.rs b/src/query/validator.rs index 43a9db29..27a4f4b0 100644 --- a/src/query/validator.rs +++ b/src/query/validator.rs @@ -56,6 +56,50 @@ pub fn validate_query(sql: &str) -> Result<()> { } } +/// Validates that a ClickHouse user query is safe to execute. +/// +/// This path is deny-list oriented because ClickHouse analytics commonly use +/// engine-specific scalar functions, but table functions, system catalogs, and +/// multi-statement/non-SELECT queries are never needed for public queries. +pub fn validate_clickhouse_query(sql: &str) -> Result<()> { + if sql.len() > MAX_QUERY_LENGTH { + return Err(anyhow!( + "Query too large ({} bytes, max {})", + sql.len(), + MAX_QUERY_LENGTH + )); + } + + let dialect = sqlparser::dialect::ClickHouseDialect {}; + let statements = + Parser::parse_sql(&dialect, sql).map_err(|e| anyhow!("SQL parse error: {e}"))?; + + if statements.is_empty() { + return Err(anyhow!("Empty query")); + } + if statements.len() > 1 { + return Err(anyhow!("Multiple statements not allowed")); + } + + match &statements[0] { + Statement::Query(query) => { + let cte_names = HashSet::new(); + validate_clickhouse_query_ast(query, &cte_names, 0) + } + _ => Err(anyhow!("Only SELECT queries are allowed")), + } +} + +fn extract_cte_names(query: &Query) -> HashSet { + let mut names = HashSet::new(); + if let Some(with) = &query.with { + for cte in &with.cte_tables { + names.insert(cte.alias.name.value.to_lowercase()); + } + } + names +} + fn validate_query_ast(query: &Query, cte_names: &HashSet, depth: usize) -> Result<()> { if depth > MAX_SUBQUERY_DEPTH { return Err(anyhow!( @@ -163,6 +207,368 @@ fn validate_limit_expr(expr: &Expr, context: &str) -> Result<()> { } } +const CLICKHOUSE_BLOCKED_SCHEMAS: &[&str] = &["system", "information_schema"]; +const CLICKHOUSE_DANGEROUS_FUNCTIONS: &[&str] = &[ + "url", + "file", + "s3", + "remote", + "remotesecure", + "mysql", + "postgresql", + "jdbc", + "odbc", + "hdfs", + "mongodb", + "redis", +]; + +fn validate_clickhouse_query_ast( + query: &Query, + cte_names: &HashSet, + depth: usize, +) -> Result<()> { + if depth > MAX_SUBQUERY_DEPTH { + return Err(anyhow!( + "Subquery nesting too deep (max {} levels)", + MAX_SUBQUERY_DEPTH + )); + } + + if query.fetch.is_some() || !query.locks.is_empty() { + return Err(anyhow!("Unsupported query clause")); + } + + let mut all_cte_names = cte_names.clone(); + if let Some(with) = &query.with { + if with.recursive { + return Err(anyhow!("Recursive CTEs are not allowed")); + } + for cte in &with.cte_tables { + validate_clickhouse_query_ast(&cte.query, &all_cte_names, depth + 1)?; + all_cte_names.insert(cte.alias.name.value.to_lowercase()); + } + } + + validate_clickhouse_set_expr(&query.body, &all_cte_names, depth)?; + + if let Some(order_by) = &query.order_by { + if let sqlparser::ast::OrderByKind::Expressions(exprs) = &order_by.kind { + for order_expr in exprs { + validate_clickhouse_expr(&order_expr.expr, &all_cte_names, depth)?; + } + } + } + + Ok(()) +} + +fn validate_clickhouse_set_expr( + set_expr: &SetExpr, + cte_names: &HashSet, + depth: usize, +) -> Result<()> { + match set_expr { + SetExpr::Select(select) => { + if select.into.is_some() { + return Err(anyhow!("SELECT INTO is not allowed")); + } + for table in &select.from { + validate_clickhouse_table_with_joins(table, cte_names, depth)?; + } + for item in &select.projection { + if let sqlparser::ast::SelectItem::UnnamedExpr(expr) + | sqlparser::ast::SelectItem::ExprWithAlias { expr, .. } = item + { + validate_clickhouse_expr(expr, cte_names, depth)?; + } + } + if let Some(selection) = &select.selection { + validate_clickhouse_expr(selection, cte_names, depth)?; + } + if let sqlparser::ast::GroupByExpr::Expressions(exprs, _) = &select.group_by { + for expr in exprs { + validate_clickhouse_expr(expr, cte_names, depth)?; + } + } + if let Some(having) = &select.having { + validate_clickhouse_expr(having, cte_names, depth)?; + } + Ok(()) + } + SetExpr::Query(query) => validate_clickhouse_query_ast(query, cte_names, depth + 1), + SetExpr::SetOperation { left, right, .. } => { + validate_clickhouse_set_expr(left, cte_names, depth + 1)?; + validate_clickhouse_set_expr(right, cte_names, depth + 1) + } + SetExpr::Values(values) => { + for row in &values.rows { + for expr in row { + validate_clickhouse_expr(expr, cte_names, depth)?; + } + } + Ok(()) + } + _ => Err(anyhow!("Only SELECT queries are allowed")), + } +} + +fn validate_clickhouse_table_with_joins( + table: &TableWithJoins, + cte_names: &HashSet, + depth: usize, +) -> Result<()> { + validate_clickhouse_table_factor(&table.relation, cte_names, depth)?; + for join in &table.joins { + validate_clickhouse_table_factor(&join.relation, cte_names, depth)?; + match &join.join_operator { + sqlparser::ast::JoinOperator::Join(sqlparser::ast::JoinConstraint::On(expr)) + | sqlparser::ast::JoinOperator::Inner(sqlparser::ast::JoinConstraint::On(expr)) + | sqlparser::ast::JoinOperator::Left(sqlparser::ast::JoinConstraint::On(expr)) + | sqlparser::ast::JoinOperator::LeftOuter(sqlparser::ast::JoinConstraint::On(expr)) + | sqlparser::ast::JoinOperator::Right(sqlparser::ast::JoinConstraint::On(expr)) + | sqlparser::ast::JoinOperator::RightOuter(sqlparser::ast::JoinConstraint::On(expr)) + | sqlparser::ast::JoinOperator::FullOuter(sqlparser::ast::JoinConstraint::On(expr)) + | sqlparser::ast::JoinOperator::CrossJoin(sqlparser::ast::JoinConstraint::On(expr)) => { + validate_clickhouse_expr(expr, cte_names, depth)?; + } + _ => {} + } + } + Ok(()) +} + +fn validate_clickhouse_table_factor( + factor: &TableFactor, + cte_names: &HashSet, + depth: usize, +) -> Result<()> { + match factor { + TableFactor::Table { + name, + args, + sample, + with_ordinality, + .. + } => { + if args.is_some() || sample.is_some() || *with_ordinality { + return Err(anyhow!("Table functions are not allowed")); + } + validate_clickhouse_table_name(name, cte_names) + } + TableFactor::Derived { subquery, .. } => { + validate_clickhouse_query_ast(subquery, cte_names, depth + 1) + } + TableFactor::NestedJoin { + table_with_joins, .. + } => validate_clickhouse_table_with_joins(table_with_joins, cte_names, depth), + TableFactor::TableFunction { .. } | TableFactor::Function { .. } => { + Err(anyhow!("Table functions are not allowed")) + } + _ => Err(anyhow!("Unsupported FROM clause type")), + } +} + +fn validate_clickhouse_table_name(name: &ObjectName, cte_names: &HashSet) -> Result<()> { + let parts: Vec = name + .0 + .iter() + .map(|part| { + part.as_ident() + .map(|ident| ident.value.to_lowercase()) + .ok_or_else(|| anyhow!("Unsupported table identifier")) + }) + .collect::>()?; + + if parts.len() > 1 && CLICKHOUSE_BLOCKED_SCHEMAS.contains(&parts[0].as_str()) { + return Err(anyhow!( + "Access to ClickHouse system schema '{}' is not allowed", + parts[0] + )); + } + + let bare_name = parts.last().cloned().unwrap_or_default(); + if ALLOWED_TABLES.contains(&bare_name.as_str()) { + return Ok(()); + } + if parts.len() == 1 && cte_names.contains(&bare_name) { + return Ok(()); + } + + Err(anyhow!("Access to table '{bare_name}' is not allowed")) +} + +fn validate_clickhouse_expr(expr: &Expr, cte_names: &HashSet, depth: usize) -> Result<()> { + match expr { + Expr::Identifier(_) + | Expr::CompoundIdentifier(_) + | Expr::Value(_) + | Expr::TypedString(_) + | Expr::Wildcard(_) + | Expr::QualifiedWildcard(_, _) => Ok(()), + Expr::Function(func) => validate_clickhouse_function(func, cte_names, depth), + Expr::Subquery(query) => validate_clickhouse_query_ast(query, cte_names, depth + 1), + Expr::InSubquery { expr, subquery, .. } => { + validate_clickhouse_expr(expr, cte_names, depth)?; + validate_clickhouse_query_ast(subquery, cte_names, depth + 1) + } + Expr::Exists { subquery, .. } => { + validate_clickhouse_query_ast(subquery, cte_names, depth + 1) + } + Expr::BinaryOp { left, right, .. } => { + validate_clickhouse_expr(left, cte_names, depth)?; + validate_clickhouse_expr(right, cte_names, depth) + } + Expr::UnaryOp { expr, .. } | Expr::Nested(expr) | Expr::Cast { expr, .. } => { + validate_clickhouse_expr(expr, cte_names, depth) + } + Expr::Between { + expr, low, high, .. + } => { + validate_clickhouse_expr(expr, cte_names, depth)?; + validate_clickhouse_expr(low, cte_names, depth)?; + validate_clickhouse_expr(high, cte_names, depth) + } + Expr::InList { expr, list, .. } => { + validate_clickhouse_expr(expr, cte_names, depth)?; + for item in list { + validate_clickhouse_expr(item, cte_names, depth)?; + } + Ok(()) + } + Expr::Case { + operand, + conditions, + else_result, + .. + } => { + if let Some(operand) = operand { + validate_clickhouse_expr(operand, cte_names, depth)?; + } + for case_when in conditions { + validate_clickhouse_expr(&case_when.condition, cte_names, depth)?; + validate_clickhouse_expr(&case_when.result, cte_names, depth)?; + } + if let Some(else_result) = else_result { + validate_clickhouse_expr(else_result, cte_names, depth)?; + } + Ok(()) + } + Expr::Like { expr, pattern, .. } | Expr::ILike { expr, pattern, .. } => { + validate_clickhouse_expr(expr, cte_names, depth)?; + validate_clickhouse_expr(pattern, cte_names, depth) + } + Expr::Substring { + expr, + substring_from, + substring_for, + .. + } => { + validate_clickhouse_expr(expr, cte_names, depth)?; + if let Some(from) = substring_from { + validate_clickhouse_expr(from, cte_names, depth)?; + } + if let Some(for_expr) = substring_for { + validate_clickhouse_expr(for_expr, cte_names, depth)?; + } + Ok(()) + } + Expr::Trim { + expr, trim_what, .. + } => { + validate_clickhouse_expr(expr, cte_names, depth)?; + if let Some(trim_what) = trim_what { + validate_clickhouse_expr(trim_what, cte_names, depth)?; + } + Ok(()) + } + Expr::Extract { expr, .. } + | Expr::Ceil { expr, .. } + | Expr::Floor { expr, .. } + | Expr::Collate { expr, .. } => validate_clickhouse_expr(expr, cte_names, depth), + Expr::IsNull(expr) + | Expr::IsNotNull(expr) + | Expr::IsTrue(expr) + | Expr::IsFalse(expr) + | Expr::IsNotTrue(expr) + | Expr::IsNotFalse(expr) + | Expr::IsUnknown(expr) + | Expr::IsNotUnknown(expr) => validate_clickhouse_expr(expr, cte_names, depth), + Expr::Tuple(exprs) => { + for expr in exprs { + validate_clickhouse_expr(expr, cte_names, depth)?; + } + Ok(()) + } + Expr::Array(arr) => { + for expr in &arr.elem { + validate_clickhouse_expr(expr, cte_names, depth)?; + } + Ok(()) + } + _ => Err(anyhow!("Unsupported expression type")), + } +} + +fn validate_clickhouse_function( + func: &Function, + cte_names: &HashSet, + depth: usize, +) -> Result<()> { + let func_name = func.name.to_string().to_lowercase(); + let bare_name = func_name.rsplit('.').next().unwrap_or(&func_name); + if CLICKHOUSE_DANGEROUS_FUNCTIONS.contains(&bare_name) { + return Err(anyhow!("Function '{}' is not allowed", func_name)); + } + + if let FunctionArguments::List(arg_list) = &func.args { + for arg in &arg_list.args { + if let FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) + | FunctionArg::Named { + arg: FunctionArgExpr::Expr(expr), + .. + } = arg + { + validate_clickhouse_expr(expr, cte_names, depth)?; + } + } + for clause in &arg_list.clauses { + match clause { + sqlparser::ast::FunctionArgumentClause::OrderBy(order_by) => { + for order_expr in order_by { + validate_clickhouse_expr(&order_expr.expr, cte_names, depth)?; + } + } + sqlparser::ast::FunctionArgumentClause::Limit(expr) => { + validate_limit_expr(expr, "FUNCTION LIMIT")?; + } + sqlparser::ast::FunctionArgumentClause::Having(bound) => { + validate_clickhouse_expr(&bound.1, cte_names, depth)?; + } + sqlparser::ast::FunctionArgumentClause::IgnoreOrRespectNulls(_) => {} + _ => return Err(anyhow!("Unsupported function clause")), + } + } + } + + if let Some(filter) = &func.filter { + validate_clickhouse_expr(filter, cte_names, depth)?; + } + for order_expr in &func.within_group { + validate_clickhouse_expr(&order_expr.expr, cte_names, depth)?; + } + if let Some(sqlparser::ast::WindowType::WindowSpec(spec)) = &func.over { + for expr in &spec.partition_by { + validate_clickhouse_expr(expr, cte_names, depth)?; + } + for order_expr in &spec.order_by { + validate_clickhouse_expr(&order_expr.expr, cte_names, depth)?; + } + } + + Ok(()) +} + fn validate_set_expr(set_expr: &SetExpr, cte_names: &HashSet, depth: usize) -> Result<()> { if depth > MAX_SUBQUERY_DEPTH { return Err(anyhow!( @@ -1184,4 +1590,51 @@ mod tests { let sql = format!("SELECT * FROM {sql} nested"); assert!(validate_query(&sql).is_err()); } + + #[test] + fn test_clickhouse_allows_expected_analytics_query() { + assert!( + validate_clickhouse_query( + "SELECT lower(substring(tx_hash, 3)), count() FROM logs GROUP BY tx_hash LIMIT 10" + ) + .is_ok() + ); + } + + #[test] + fn test_clickhouse_rejects_non_select_and_multiple_statements() { + assert!(validate_clickhouse_query("DROP TABLE logs").is_err()); + assert!(validate_clickhouse_query("SELECT * FROM logs; SELECT * FROM txs").is_err()); + } + + #[test] + fn test_clickhouse_rejects_system_tables() { + assert!(validate_clickhouse_query("SELECT * FROM system.tables").is_err()); + assert!(validate_clickhouse_query(r#"SELECT * FROM "system".tables"#).is_err()); + } + + #[test] + fn test_clickhouse_rejects_table_functions() { + assert!(validate_clickhouse_query("SELECT * FROM url('http://169.254.169.254/')").is_err()); + assert!(validate_clickhouse_query("SELECT * FROM file('/etc/passwd')").is_err()); + assert!(validate_clickhouse_query("SELECT * FROM s3('s3://bucket/key')").is_err()); + } + + #[test] + fn test_clickhouse_rejects_dangerous_scalar_functions() { + assert!(validate_clickhouse_query("SELECT url('http://example.com') FROM logs").is_err()); + assert!( + validate_clickhouse_query("SELECT remote('host', 'db', 'table') FROM logs").is_err() + ); + } + + #[test] + fn test_clickhouse_rejects_schema_qualified_cte_shadowing() { + assert!( + validate_clickhouse_query( + "WITH system AS (SELECT * FROM logs) SELECT * FROM system.tables" + ) + .is_err() + ); + } } From f440842e32ce9686ee4610af13f42142aa59f5b2 Mon Sep 17 00:00:00 2001 From: Brendan Ryan <1572504+brendanjryan@users.noreply.github.com> Date: Sun, 3 May 2026 13:54:52 -0700 Subject: [PATCH 2/5] docs: add clickhouse query changelog --- .changelog/pr-180-clickhouse-query-safety.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changelog/pr-180-clickhouse-query-safety.md diff --git a/.changelog/pr-180-clickhouse-query-safety.md b/.changelog/pr-180-clickhouse-query-safety.md new file mode 100644 index 00000000..54089b2e --- /dev/null +++ b/.changelog/pr-180-clickhouse-query-safety.md @@ -0,0 +1,5 @@ +--- +tidx: patch +--- + +Validate public ClickHouse queries, block system catalogs and dangerous table functions, enforce ClickHouse request timeouts, and validate view SELECT SQL before execution. From 3f37bfdc0d9b6a6706eb7f7665fbc5877e1b907d Mon Sep 17 00:00:00 2001 From: Brendan Ryan <1572504+brendanjryan@users.noreply.github.com> Date: Sun, 3 May 2026 17:33:22 -0700 Subject: [PATCH 3/5] fix: satisfy strict clickhouse clippy --- src/clickhouse.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/clickhouse.rs b/src/clickhouse.rs index 8bb5cec0..ea6d9d89 100644 --- a/src/clickhouse.rs +++ b/src/clickhouse.rs @@ -102,7 +102,7 @@ impl ClickHouseEngine { signatures: &[&str], timeout_ms: u64, ) -> Result { - let sql = self.prepare_query(sql, signatures)?; + let sql = Self::prepare_query(sql, signatures)?; validate_clickhouse_query(&sql)?; self.execute_prepared_query(&sql, timeout_ms).await } @@ -113,11 +113,11 @@ impl ClickHouseEngine { signatures: &[&str], timeout_ms: u64, ) -> Result { - let sql = self.prepare_query(sql, signatures)?; + let sql = Self::prepare_query(sql, signatures)?; self.execute_prepared_query(&sql, timeout_ms).await } - fn prepare_query(&self, sql: &str, signatures: &[&str]) -> Result { + fn prepare_query(sql: &str, signatures: &[&str]) -> Result { let sql = if !signatures.is_empty() { let sigs: Vec = signatures .iter() @@ -187,7 +187,7 @@ impl ClickHouseEngine { start: std::time::Instant, timeout_ms: u64, ) -> Result { - let max_execution_time = ((timeout_ms + 999) / 1000).max(1); + let max_execution_time = timeout_ms.div_ceil(1000).max(1); let url = format!( "{}/?database={}&default_format=JSON&max_execution_time={}", inst.url.trim_end_matches('/'), From d23e3feefc74cb3fa21828d9524df48db239d918 Mon Sep 17 00:00:00 2001 From: Brendan Ryan <1572504+brendanjryan@users.noreply.github.com> Date: Sun, 3 May 2026 18:16:38 -0700 Subject: [PATCH 4/5] fix: preserve internal clickhouse timeouts --- src/clickhouse.rs | 103 +++++++++++++++++++++++++++++++++++----------- 1 file changed, 80 insertions(+), 23 deletions(-) diff --git a/src/clickhouse.rs b/src/clickhouse.rs index ea6d9d89..b57dacb0 100644 --- a/src/clickhouse.rs +++ b/src/clickhouse.rs @@ -69,7 +69,6 @@ impl ClickHouseEngine { ) -> Result { let http_client = reqwest::Client::builder() .pool_max_idle_per_host(4) - .timeout(std::time::Duration::from_secs(35)) .build() .map_err(|e| anyhow!("Failed to create HTTP client: {e}"))?; Ok(Instance { @@ -91,7 +90,8 @@ impl ClickHouseEngine { /// ClickHouse query errors (syntax, missing table, etc.) are returned /// immediately. pub async fn query(&self, sql: &str, signatures: &[&str]) -> Result { - self.query_with_timeout(sql, signatures, 30_000).await + let sql = Self::prepare_query(sql, signatures)?; + self.execute_prepared_query(&sql, None).await } /// Execute a public user query after applying signature rewrites, SQL @@ -104,7 +104,7 @@ impl ClickHouseEngine { ) -> Result { let sql = Self::prepare_query(sql, signatures)?; validate_clickhouse_query(&sql)?; - self.execute_prepared_query(&sql, timeout_ms).await + self.execute_prepared_query(&sql, Some(timeout_ms)).await } pub async fn query_with_timeout( @@ -114,7 +114,7 @@ impl ClickHouseEngine { timeout_ms: u64, ) -> Result { let sql = Self::prepare_query(sql, signatures)?; - self.execute_prepared_query(&sql, timeout_ms).await + self.execute_prepared_query(&sql, Some(timeout_ms)).await } fn prepare_query(sql: &str, signatures: &[&str]) -> Result { @@ -143,7 +143,11 @@ impl ClickHouseEngine { Ok(sql) } - async fn execute_prepared_query(&self, sql: &str, timeout_ms: u64) -> Result { + async fn execute_prepared_query( + &self, + sql: &str, + timeout_ms: Option, + ) -> Result { let start = std::time::Instant::now(); let n = self.instances.len(); let starting = self.active.load(Ordering::Relaxed); @@ -180,37 +184,51 @@ impl ClickHouseEngine { Err(anyhow!("All ClickHouse instances unreachable")) } + fn query_url(&self, inst: &Instance, timeout_ms: Option) -> String { + let base = format!( + "{}/?database={}&default_format=JSON", + inst.url.trim_end_matches('/'), + self.database + ); + if let Some(timeout_ms) = timeout_ms { + let max_execution_time = timeout_ms.div_ceil(1000).max(1); + format!("{base}&max_execution_time={max_execution_time}") + } else { + base + } + } + async fn try_query( &self, inst: &Instance, sql: &str, start: std::time::Instant, - timeout_ms: u64, + timeout_ms: Option, ) -> Result { - let max_execution_time = timeout_ms.div_ceil(1000).max(1); - let url = format!( - "{}/?database={}&default_format=JSON&max_execution_time={}", - inst.url.trim_end_matches('/'), - self.database, - max_execution_time - ); + let url = self.query_url(inst, timeout_ms); - let request_timeout = std::time::Duration::from_millis(timeout_ms + 100); - let mut req = inst - .http_client - .post(&url) - .timeout(request_timeout) - .body(sql.to_string()); + let request_timeout = timeout_ms + .map(|timeout_ms| std::time::Duration::from_millis(timeout_ms.saturating_add(100))); + let mut req = inst.http_client.post(&url).body(sql.to_string()); + if let Some(timeout) = request_timeout { + req = req.timeout(timeout); + } if let Some(ref user) = inst.user { req = req.header("X-ClickHouse-User", user); } if let Some(ref password) = inst.password { req = req.header("X-ClickHouse-Key", password); } - let resp = tokio::time::timeout(request_timeout, req.send()) - .await - .map_err(|_| anyhow!("ClickHouse query timed out"))? - .map_err(|e| anyhow!("ClickHouse HTTP request failed: {e}"))?; + let send = req.send(); + let resp = if let Some(timeout) = request_timeout { + tokio::time::timeout(timeout, send) + .await + .map_err(|_| anyhow!("ClickHouse query timed out"))? + .map_err(|e| anyhow!("ClickHouse HTTP request failed: {e}"))? + } else { + send.await + .map_err(|e| anyhow!("ClickHouse HTTP request failed: {e}"))? + }; if !resp.status().is_success() { let error_text = resp.text().await.unwrap_or_default(); @@ -378,4 +396,43 @@ mod tests { let engine = ClickHouseEngine::new(&config, 4217).unwrap(); assert_eq!(engine.database(), "tidx_4217"); } + + #[test] + fn test_internal_query_url_omits_timeout() { + let config = ClickHouseConfig { + enabled: true, + url: "http://clickhouse-1:8123".to_string(), + failover_urls: vec![], + database: None, + ..Default::default() + }; + + let engine = ClickHouseEngine::new(&config, 4217).unwrap(); + let url = engine.query_url(&engine.instances[0], None); + + assert_eq!( + url, + "http://clickhouse-1:8123/?database=tidx_4217&default_format=JSON" + ); + assert!(!url.contains("max_execution_time")); + } + + #[test] + fn test_user_query_url_sets_ceiled_timeout_seconds() { + let config = ClickHouseConfig { + enabled: true, + url: "http://clickhouse-1:8123".to_string(), + failover_urls: vec![], + database: None, + ..Default::default() + }; + + let engine = ClickHouseEngine::new(&config, 4217).unwrap(); + let url = engine.query_url(&engine.instances[0], Some(1_001)); + + assert_eq!( + url, + "http://clickhouse-1:8123/?database=tidx_4217&default_format=JSON&max_execution_time=2" + ); + } } From 0eab9faf939d904efa01b5f8a2457825185e920a Mon Sep 17 00:00:00 2001 From: Brendan Ryan <1572504+brendanjryan@users.noreply.github.com> Date: Sun, 3 May 2026 18:19:52 -0700 Subject: [PATCH 5/5] fix: remove unused validator helper --- src/query/validator.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/query/validator.rs b/src/query/validator.rs index 27a4f4b0..79559eba 100644 --- a/src/query/validator.rs +++ b/src/query/validator.rs @@ -90,16 +90,6 @@ pub fn validate_clickhouse_query(sql: &str) -> Result<()> { } } -fn extract_cte_names(query: &Query) -> HashSet { - let mut names = HashSet::new(); - if let Some(with) = &query.with { - for cte in &with.cte_tables { - names.insert(cte.alias.name.value.to_lowercase()); - } - } - names -} - fn validate_query_ast(query: &Query, cte_names: &HashSet, depth: usize) -> Result<()> { if depth > MAX_SUBQUERY_DEPTH { return Err(anyhow!(