diff --git a/src/engine.rs b/src/engine.rs index e442561..ba9cb32 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -334,7 +334,7 @@ impl RuntimeEngine { let server = config.liquid_cache.server_address.as_ref().ok_or_else(|| { anyhow::anyhow!("liquid cache enabled but server_address not set") })?; - info!("Enabling liquid cache with server: {}", server); + info!(server = %server, "Enabling liquid cache"); builder = builder.liquid_cache_server(server.clone()); } @@ -477,7 +477,7 @@ impl RuntimeEngine { let cache_prefix = self.storage.cache_prefix(connection_id); if let Err(e) = self.storage.delete_prefix(&cache_prefix).await { - warn!("Failed to delete cache prefix {}: {}", cache_prefix, e); + warn!(prefix = %cache_prefix, error = %e, "Failed to delete cache prefix"); } Ok(()) @@ -488,7 +488,7 @@ impl RuntimeEngine { // Delete versioned cache directory if it exists if let Some(parquet_path) = &table_info.parquet_path { if let Err(e) = self.storage.delete_prefix(parquet_path).await { - warn!("Failed to delete cache directory {}: {}", parquet_path, e); + warn!(path = %parquet_path, error = %e, "Failed to delete cache directory"); } } @@ -543,7 +543,7 @@ impl RuntimeEngine { // via the async ConnectionsCatalogList. No pre-registration with DataFusion needed. tracing::Span::current().record("runtimedb.connection_id", &connection_id); - info!("Connection '{}' registered (discovery pending)", name); + info!(connection = %name, id = %connection_id, "Connection registered (discovery pending)"); Ok(connection_id) } @@ -616,7 +616,7 @@ impl RuntimeEngine { ) )] pub async fn execute_query(&self, sql: &str) -> Result { - info!("Executing query: {}", sql); + info!(sql = %sql, "Executing query"); let start = Instant::now(); if crate::telemetry::include_sql_in_traces() { tracing::Span::current().record("runtimedb.sql", sql); @@ -627,7 +627,7 @@ impl RuntimeEngine { // Step 1: Parse SQL into a statement let dialect = session_state.config().options().sql_parser.dialect; let statement = session_state.sql_to_statement(sql, &dialect).map_err(|e| { - error!("SQL parse error: {}", e); + error!(error = %e, "SQL parse error"); e })?; @@ -635,7 +635,7 @@ impl RuntimeEngine { let references = session_state .resolve_table_references(&statement) .map_err(|e| { - error!("Failed to resolve table references: {}", e); + error!(error = %e, "Failed to resolve table references"); e })?; @@ -649,7 +649,7 @@ impl RuntimeEngine { .instrument(tracing::info_span!("resolve_catalogs")) .await .map_err(|e| { - error!("Failed to resolve catalogs: {}", e); + error!(error = %e, "Failed to resolve catalogs"); e })?; @@ -666,7 +666,7 @@ impl RuntimeEngine { .instrument(tracing::info_span!("statement_to_plan")) .await .map_err(|e| { - error!("Planning error: {}", e); + error!(error = %e, "Planning error"); e })?; @@ -676,7 +676,7 @@ impl RuntimeEngine { // Execute physical plan and collect results let results = async { let results = df.collect().await.map_err(|e| { - error!("Error getting query result: {}", e); + error!(error = %e, "Error getting query result"); e })?; tracing::Span::current().record("runtimedb.batches_collected", results.len()); @@ -688,7 +688,7 @@ impl RuntimeEngine { )) .await?; - info!("Execution completed in {:?}", start.elapsed()); + info!(elapsed = ?start.elapsed(), "Execution completed"); let row_count: usize = results.iter().map(|b| b.num_rows()).sum(); tracing::Span::current().record("runtimedb.rows_returned", row_count); @@ -3000,7 +3000,8 @@ impl RuntimeEngineBuilder { // Build liquid-cache config if server is configured let liquid_cache_config = self.liquid_cache_server.map(|server| { - let store_configs: Vec<_> = storage.get_object_store_config().into_iter().collect(); + let store_configs: Vec<_> = + storage.get_object_store_config().into_iter().collect(); (server, store_configs) });