From 50f429598296bf49a1750a2f6c577a4bfa581349 Mon Sep 17 00:00:00 2001 From: AndreaBozzo Date: Mon, 22 Dec 2025 13:36:01 +0100 Subject: [PATCH 1/2] feat(sqllogictest): use serde derived structs for schedule parsing Replace manual TOML parsing with serde-derived structs to improve maintainability and separate parsing from engine instantiation. Changes: - Add ScheduleConfig, EngineConfig, and EngineType structs with serde derives - Use #[serde(flatten)] for forward-compatibility with future config fields - Refactor Schedule::from_file() to use toml::from_str() directly - Add instantiate_engines() to separate parsing from engine creation - Remove manual parse_engines() and parse_steps() functions - Update tests to verify deserialization behavior Closes #1952 --- crates/sqllogictest/src/engine/datafusion.rs | 11 +- crates/sqllogictest/src/engine/mod.rs | 77 +++++--- crates/sqllogictest/src/schedule.rs | 183 +++++++++++-------- 3 files changed, 168 insertions(+), 103 deletions(-) diff --git a/crates/sqllogictest/src/engine/datafusion.rs b/crates/sqllogictest/src/engine/datafusion.rs index e3402dfa97..021fd30054 100644 --- a/crates/sqllogictest/src/engine/datafusion.rs +++ b/crates/sqllogictest/src/engine/datafusion.rs @@ -27,9 +27,8 @@ use iceberg::spec::{NestedField, PrimitiveType, Schema, Transform, Type, Unbound use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation}; use iceberg_datafusion::IcebergCatalogProvider; use indicatif::ProgressBar; -use toml::Table as TomlTable; -use crate::engine::{EngineRunner, run_slt_with_runner}; +use crate::engine::{EngineConfig, EngineRunner, run_slt_with_runner}; use crate::error::Result; pub struct DataFusionEngine { @@ -59,12 +58,12 @@ impl EngineRunner for DataFusionEngine { } impl DataFusionEngine { - pub async fn new(config: TomlTable) -> Result { + pub async fn new(_name: &str, config: &EngineConfig) -> Result { let session_config = SessionConfig::new() .with_target_partitions(4) .with_information_schema(true); let ctx = SessionContext::new_with_config(session_config); - ctx.register_catalog("default", Self::create_catalog(&config).await?); + ctx.register_catalog("default", Self::create_catalog(&config.extra).await?); Ok(Self { test_data_path: PathBuf::from("testdata"), @@ -72,7 +71,9 @@ impl DataFusionEngine { }) } - async fn create_catalog(_: &TomlTable) -> anyhow::Result> { + async fn create_catalog( + _extra: &HashMap, + ) -> anyhow::Result> { // TODO: support dynamic catalog configuration // See: https://github.com/apache/iceberg-rust/issues/1780 let catalog = MemoryCatalogBuilder::default() diff --git a/crates/sqllogictest/src/engine/mod.rs b/crates/sqllogictest/src/engine/mod.rs index 724359fbe5..b2773bcbd5 100644 --- a/crates/sqllogictest/src/engine/mod.rs +++ b/crates/sqllogictest/src/engine/mod.rs @@ -17,29 +17,44 @@ mod datafusion; +use std::collections::HashMap; use std::path::Path; use anyhow::anyhow; +use serde::Deserialize; use sqllogictest::{AsyncDB, MakeConnection, Runner, parse_file}; -use toml::Table as TomlTable; use crate::engine::datafusion::DataFusionEngine; use crate::error::{Error, Result}; -const TYPE_DATAFUSION: &str = "datafusion"; +/// Supported engine types for sqllogictest +#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum EngineType { + Datafusion, +} + +/// Configuration for a single engine instance +#[derive(Debug, Clone, Deserialize)] +pub struct EngineConfig { + /// The type of engine + #[serde(rename = "type")] + pub engine_type: EngineType, + + /// Additional configuration fields for extensibility + /// This allows forward-compatibility with future fields like catalog_type, catalog_properties + #[serde(flatten)] + pub extra: HashMap, +} #[async_trait::async_trait] pub trait EngineRunner: Send { async fn run_slt_file(&mut self, path: &Path) -> Result<()>; } -pub async fn load_engine_runner( - engine_type: &str, - cfg: TomlTable, -) -> Result> { - match engine_type { - TYPE_DATAFUSION => Ok(Box::new(DataFusionEngine::new(cfg).await?)), - _ => Err(anyhow::anyhow!("Unsupported engine type: {engine_type}").into()), +pub async fn load_engine_runner(name: &str, config: EngineConfig) -> Result> { + match config.engine_type { + EngineType::Datafusion => Ok(Box::new(DataFusionEngine::new(name, &config).await?)), } } @@ -65,29 +80,45 @@ where #[cfg(test)] mod tests { - use crate::engine::{TYPE_DATAFUSION, load_engine_runner}; + use std::collections::HashMap; - #[tokio::test] - async fn test_engine_invalid_type() { + use crate::engine::{EngineConfig, EngineType, load_engine_runner}; + + #[test] + fn test_deserialize_engine_config() { let input = r#" - [engines] - random = { type = "random_engine", url = "http://localhost:8181" } + type = "datafusion" "#; - let tbl = toml::from_str(input).unwrap(); - let result = load_engine_runner("random_engine", tbl).await; - assert!(result.is_err()); + let config: EngineConfig = toml::from_str(input).unwrap(); + assert_eq!(config.engine_type, EngineType::Datafusion); + assert!(config.extra.is_empty()); } - #[tokio::test] - async fn test_load_datafusion() { + #[test] + fn test_deserialize_engine_config_with_extras() { let input = r#" - [engines] - df = { type = "datafusion" } + type = "datafusion" + catalog_type = "rest" + + [catalog_properties] + uri = "http://localhost:8181" "#; - let tbl = toml::from_str(input).unwrap(); - let result = load_engine_runner(TYPE_DATAFUSION, tbl).await; + let config: EngineConfig = toml::from_str(input).unwrap(); + assert_eq!(config.engine_type, EngineType::Datafusion); + assert!(config.extra.contains_key("catalog_type")); + assert!(config.extra.contains_key("catalog_properties")); + } + + #[tokio::test] + async fn test_load_datafusion() { + let config = EngineConfig { + engine_type: EngineType::Datafusion, + extra: HashMap::new(), + }; + + let result = load_engine_runner("df", config).await; assert!(result.is_ok()); } } diff --git a/crates/sqllogictest/src/schedule.rs b/crates/sqllogictest/src/schedule.rs index 7c13ad4d12..2e359d7427 100644 --- a/crates/sqllogictest/src/schedule.rs +++ b/crates/sqllogictest/src/schedule.rs @@ -21,10 +21,18 @@ use std::path::{Path, PathBuf}; use anyhow::{Context, anyhow}; use serde::{Deserialize, Serialize}; -use toml::{Table as TomlTable, Value}; use tracing::info; -use crate::engine::{EngineRunner, load_engine_runner}; +use crate::engine::{EngineConfig, EngineRunner, load_engine_runner}; + +/// Raw configuration parsed from the schedule TOML file +#[derive(Debug, Clone, Deserialize)] +pub struct ScheduleConfig { + /// Engine name to engine configuration + pub engines: HashMap, + /// List of test steps to run + pub steps: Vec, +} pub struct Schedule { /// Engine names to engine instances @@ -59,15 +67,27 @@ impl Schedule { pub async fn from_file>(path: P) -> anyhow::Result { let path_str = path.as_ref().to_string_lossy().to_string(); let content = read_to_string(path)?; - let toml_value = content.parse::()?; - let toml_table = toml_value - .as_table() - .ok_or_else(|| anyhow!("Schedule file must be a TOML table"))?; - let engines = Schedule::parse_engines(toml_table).await?; - let steps = Schedule::parse_steps(toml_table)?; + let config: ScheduleConfig = toml::from_str(&content) + .with_context(|| format!("Failed to parse schedule file: {path_str}"))?; + + let engines = Self::instantiate_engines(config.engines).await?; + + Ok(Self::new(engines, config.steps, path_str)) + } + + /// Instantiate engine runners from their configurations + async fn instantiate_engines( + configs: HashMap, + ) -> anyhow::Result>> { + let mut engines = HashMap::new(); + + for (name, config) in configs { + let engine = load_engine_runner(&name, config).await?; + engines.insert(name, engine); + } - Ok(Self::new(engines, steps, path_str)) + Ok(engines) } pub async fn run(mut self) -> anyhow::Result<()> { @@ -105,103 +125,116 @@ impl Schedule { } Ok(()) } +} - async fn parse_engines( - table: &TomlTable, - ) -> anyhow::Result>> { - let engines_tbl = table - .get("engines") - .with_context(|| "Schedule file must have an 'engines' table")? - .as_table() - .ok_or_else(|| anyhow!("'engines' must be a table"))?; - - let mut engines = HashMap::new(); - - for (name, engine_val) in engines_tbl { - let cfg_tbl = engine_val - .as_table() - .ok_or_else(|| anyhow!("Config of engine '{name}' is not a table"))? - .clone(); - - let engine_type = cfg_tbl - .get("type") - .ok_or_else(|| anyhow::anyhow!("Engine {name} doesn't have a 'type' field"))? - .as_str() - .ok_or_else(|| anyhow::anyhow!("Engine {name} type must be a string"))?; - - let engine = load_engine_runner(engine_type, cfg_tbl.clone()).await?; - - if engines.insert(name.clone(), engine).is_some() { - return Err(anyhow!("Duplicate engine '{name}'")); - } - } +#[cfg(test)] +mod tests { + use crate::engine::EngineType; + use crate::schedule::ScheduleConfig; - Ok(engines) - } + #[test] + fn test_deserialize_schedule_config() { + let input = r#" + [engines] + df = { type = "datafusion" } - fn parse_steps(table: &TomlTable) -> anyhow::Result> { - let steps_val = table - .get("steps") - .with_context(|| "Schedule file must have a 'steps' array")?; + [[steps]] + engine = "df" + slt = "test.slt" + "#; - let steps: Vec = steps_val - .clone() - .try_into() - .with_context(|| "Failed to deserialize steps")?; + let config: ScheduleConfig = toml::from_str(input).unwrap(); - Ok(steps) + assert_eq!(config.engines.len(), 1); + assert!(config.engines.contains_key("df")); + assert_eq!(config.engines["df"].engine_type, EngineType::Datafusion); + assert_eq!(config.steps.len(), 1); + assert_eq!(config.steps[0].engine, "df"); + assert_eq!(config.steps[0].slt, "test.slt"); } -} - -#[cfg(test)] -mod tests { - use toml::Table as TomlTable; - - use crate::schedule::Schedule; #[test] - fn test_parse_steps() { + fn test_deserialize_multiple_steps() { let input = r#" + [engines] + datafusion = { type = "datafusion" } + [[steps]] engine = "datafusion" slt = "test.slt" [[steps]] - engine = "spark" + engine = "datafusion" slt = "test2.slt" "#; - let tbl: TomlTable = toml::from_str(input).unwrap(); - let steps = Schedule::parse_steps(&tbl).unwrap(); + let config: ScheduleConfig = toml::from_str(input).unwrap(); - assert_eq!(steps.len(), 2); - assert_eq!(steps[0].engine, "datafusion"); - assert_eq!(steps[0].slt, "test.slt"); - assert_eq!(steps[1].engine, "spark"); - assert_eq!(steps[1].slt, "test2.slt"); + assert_eq!(config.steps.len(), 2); + assert_eq!(config.steps[0].engine, "datafusion"); + assert_eq!(config.steps[0].slt, "test.slt"); + assert_eq!(config.steps[1].engine, "datafusion"); + assert_eq!(config.steps[1].slt, "test2.slt"); } #[test] - fn test_parse_steps_empty() { + fn test_deserialize_with_extra_fields() { + // Test forward-compatibility with extra fields (for PR #1943) let input = r#" + [engines] + df = { type = "datafusion", catalog_type = "rest", some_future_field = "value" } + [[steps]] + engine = "df" + slt = "test.slt" "#; - let tbl: TomlTable = toml::from_str(input).unwrap(); - let steps = Schedule::parse_steps(&tbl); + let config: ScheduleConfig = toml::from_str(input).unwrap(); - assert!(steps.is_err()); + assert!(config.engines["df"].extra.contains_key("catalog_type")); + assert!(config.engines["df"].extra.contains_key("some_future_field")); } - #[tokio::test] - async fn test_parse_engines_invalid_table() { - let toml_content = r#" - engines = "not_a_table" + #[test] + fn test_deserialize_missing_engine_type() { + let input = r#" + [engines] + df = { } + + [[steps]] + engine = "df" + slt = "test.slt" "#; - let table: TomlTable = toml::from_str(toml_content).unwrap(); - let result = Schedule::parse_engines(&table).await; + let result: Result = toml::from_str(input); + assert!(result.is_err()); + } + + #[test] + fn test_deserialize_invalid_engine_type() { + let input = r#" + [engines] + df = { type = "unknown_engine" } + + [[steps]] + engine = "df" + slt = "test.slt" + "#; + + let result: Result = toml::from_str(input); + assert!(result.is_err()); + } + + #[test] + fn test_deserialize_missing_step_fields() { + let input = r#" + [engines] + df = { type = "datafusion" } + + [[steps]] + "#; + let result: Result = toml::from_str(input); assert!(result.is_err()); } } From f09e5e77c387b5ea309ffd61ab809e94a9b67307 Mon Sep 17 00:00:00 2001 From: AndreaBozzo Date: Tue, 23 Dec 2025 10:14:26 +0100 Subject: [PATCH 2/2] refactor(sqllogictest): use tagged enum for EngineConfig with CatalogConfig - Replace EngineConfig struct with tagged enum for better type safety - Add CatalogConfig struct with catalog_type and props fields - Remove unused _name parameter from DataFusionEngine::new() - Prepare structure for future catalog loader integration --- crates/sqllogictest/src/engine/datafusion.rs | 15 ++-- crates/sqllogictest/src/engine/mod.rs | 93 ++++++++++++-------- crates/sqllogictest/src/schedule.rs | 33 +++++-- 3 files changed, 89 insertions(+), 52 deletions(-) diff --git a/crates/sqllogictest/src/engine/datafusion.rs b/crates/sqllogictest/src/engine/datafusion.rs index 021fd30054..e66dc890d1 100644 --- a/crates/sqllogictest/src/engine/datafusion.rs +++ b/crates/sqllogictest/src/engine/datafusion.rs @@ -28,7 +28,7 @@ use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation}; use iceberg_datafusion::IcebergCatalogProvider; use indicatif::ProgressBar; -use crate::engine::{EngineConfig, EngineRunner, run_slt_with_runner}; +use crate::engine::{CatalogConfig, EngineRunner, run_slt_with_runner}; use crate::error::Result; pub struct DataFusionEngine { @@ -58,12 +58,15 @@ impl EngineRunner for DataFusionEngine { } impl DataFusionEngine { - pub async fn new(_name: &str, config: &EngineConfig) -> Result { + pub async fn new(catalog_config: Option) -> Result { let session_config = SessionConfig::new() .with_target_partitions(4) .with_information_schema(true); let ctx = SessionContext::new_with_config(session_config); - ctx.register_catalog("default", Self::create_catalog(&config.extra).await?); + ctx.register_catalog( + "default", + Self::create_catalog(catalog_config.as_ref()).await?, + ); Ok(Self { test_data_path: PathBuf::from("testdata"), @@ -72,10 +75,10 @@ impl DataFusionEngine { } async fn create_catalog( - _extra: &HashMap, + _catalog_config: Option<&CatalogConfig>, ) -> anyhow::Result> { - // TODO: support dynamic catalog configuration - // See: https://github.com/apache/iceberg-rust/issues/1780 + // TODO: Use catalog_config to load different catalog types via iceberg-catalog-loader + // See: https://github.com/apache/iceberg-rust/issues/1780 let catalog = MemoryCatalogBuilder::default() .load( "memory", diff --git a/crates/sqllogictest/src/engine/mod.rs b/crates/sqllogictest/src/engine/mod.rs index b2773bcbd5..9b7c4b0063 100644 --- a/crates/sqllogictest/src/engine/mod.rs +++ b/crates/sqllogictest/src/engine/mod.rs @@ -27,24 +27,25 @@ use sqllogictest::{AsyncDB, MakeConnection, Runner, parse_file}; use crate::engine::datafusion::DataFusionEngine; use crate::error::{Error, Result}; -/// Supported engine types for sqllogictest -#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] -#[serde(rename_all = "lowercase")] -pub enum EngineType { - Datafusion, -} - -/// Configuration for a single engine instance +/// Configuration for the catalog used by an engine #[derive(Debug, Clone, Deserialize)] -pub struct EngineConfig { - /// The type of engine +pub struct CatalogConfig { + /// Catalog type: "memory", "rest", "glue", "hms", "s3tables", "sql" #[serde(rename = "type")] - pub engine_type: EngineType, + pub catalog_type: String, + /// Catalog properties passed to the catalog loader + #[serde(default)] + pub props: HashMap, +} - /// Additional configuration fields for extensibility - /// This allows forward-compatibility with future fields like catalog_type, catalog_properties - #[serde(flatten)] - pub extra: HashMap, +/// Engine configuration as a tagged enum +#[derive(Debug, Clone, Deserialize)] +#[serde(tag = "type", rename_all = "lowercase")] +pub enum EngineConfig { + Datafusion { + #[serde(default)] + catalog: Option, + }, } #[async_trait::async_trait] @@ -52,9 +53,9 @@ pub trait EngineRunner: Send { async fn run_slt_file(&mut self, path: &Path) -> Result<()>; } -pub async fn load_engine_runner(name: &str, config: EngineConfig) -> Result> { - match config.engine_type { - EngineType::Datafusion => Ok(Box::new(DataFusionEngine::new(name, &config).await?)), +pub async fn load_engine_runner(config: EngineConfig) -> Result> { + match config { + EngineConfig::Datafusion { catalog } => Ok(Box::new(DataFusionEngine::new(catalog).await?)), } } @@ -80,45 +81,63 @@ where #[cfg(test)] mod tests { - use std::collections::HashMap; - - use crate::engine::{EngineConfig, EngineType, load_engine_runner}; + use crate::engine::{CatalogConfig, EngineConfig, load_engine_runner}; #[test] fn test_deserialize_engine_config() { - let input = r#" - type = "datafusion" - "#; + let input = r#"type = "datafusion""#; let config: EngineConfig = toml::from_str(input).unwrap(); - assert_eq!(config.engine_type, EngineType::Datafusion); - assert!(config.extra.is_empty()); + assert!(matches!(config, EngineConfig::Datafusion { catalog: None })); } #[test] - fn test_deserialize_engine_config_with_extras() { + fn test_deserialize_engine_config_with_catalog() { let input = r#" type = "datafusion" - catalog_type = "rest" - [catalog_properties] + [catalog] + type = "rest" + + [catalog.props] uri = "http://localhost:8181" "#; let config: EngineConfig = toml::from_str(input).unwrap(); - assert_eq!(config.engine_type, EngineType::Datafusion); - assert!(config.extra.contains_key("catalog_type")); - assert!(config.extra.contains_key("catalog_properties")); + match config { + EngineConfig::Datafusion { catalog: Some(cat) } => { + assert_eq!(cat.catalog_type, "rest"); + assert_eq!( + cat.props.get("uri"), + Some(&"http://localhost:8181".to_string()) + ); + } + _ => panic!("Expected Datafusion with catalog"), + } + } + + #[test] + fn test_deserialize_catalog_config() { + let input = r#" + type = "memory" + + [props] + warehouse = "file:///tmp/warehouse" + "#; + + let config: CatalogConfig = toml::from_str(input).unwrap(); + assert_eq!(config.catalog_type, "memory"); + assert_eq!( + config.props.get("warehouse"), + Some(&"file:///tmp/warehouse".to_string()) + ); } #[tokio::test] async fn test_load_datafusion() { - let config = EngineConfig { - engine_type: EngineType::Datafusion, - extra: HashMap::new(), - }; + let config = EngineConfig::Datafusion { catalog: None }; - let result = load_engine_runner("df", config).await; + let result = load_engine_runner(config).await; assert!(result.is_ok()); } } diff --git a/crates/sqllogictest/src/schedule.rs b/crates/sqllogictest/src/schedule.rs index 2e359d7427..25728a2968 100644 --- a/crates/sqllogictest/src/schedule.rs +++ b/crates/sqllogictest/src/schedule.rs @@ -83,7 +83,7 @@ impl Schedule { let mut engines = HashMap::new(); for (name, config) in configs { - let engine = load_engine_runner(&name, config).await?; + let engine = load_engine_runner(config).await?; engines.insert(name, engine); } @@ -129,7 +129,7 @@ impl Schedule { #[cfg(test)] mod tests { - use crate::engine::EngineType; + use crate::engine::EngineConfig; use crate::schedule::ScheduleConfig; #[test] @@ -147,7 +147,9 @@ mod tests { assert_eq!(config.engines.len(), 1); assert!(config.engines.contains_key("df")); - assert_eq!(config.engines["df"].engine_type, EngineType::Datafusion); + assert!(matches!(config.engines["df"], EngineConfig::Datafusion { + catalog: None + })); assert_eq!(config.steps.len(), 1); assert_eq!(config.steps[0].engine, "df"); assert_eq!(config.steps[0].slt, "test.slt"); @@ -178,11 +180,16 @@ mod tests { } #[test] - fn test_deserialize_with_extra_fields() { - // Test forward-compatibility with extra fields (for PR #1943) + fn test_deserialize_with_catalog_config() { let input = r#" - [engines] - df = { type = "datafusion", catalog_type = "rest", some_future_field = "value" } + [engines.df] + type = "datafusion" + + [engines.df.catalog] + type = "rest" + + [engines.df.catalog.props] + uri = "http://localhost:8181" [[steps]] engine = "df" @@ -191,8 +198,16 @@ mod tests { let config: ScheduleConfig = toml::from_str(input).unwrap(); - assert!(config.engines["df"].extra.contains_key("catalog_type")); - assert!(config.engines["df"].extra.contains_key("some_future_field")); + match &config.engines["df"] { + EngineConfig::Datafusion { catalog: Some(cat) } => { + assert_eq!(cat.catalog_type, "rest"); + assert_eq!( + cat.props.get("uri"), + Some(&"http://localhost:8181".to_string()) + ); + } + _ => panic!("Expected Datafusion with catalog config"), + } } #[test]