From 4239a20b6d3e68f3b2094f673ad63a9d70d39b9e Mon Sep 17 00:00:00 2001 From: AndreaBozzo Date: Thu, 18 Dec 2025 13:39:17 +0100 Subject: [PATCH] feat(sqllogictest): support dynamic catalog configuration per engine This PR implements issue #1780 by allowing each engine in the sqllogictest framework to configure its own catalog. Changes: - Remove global [catalog] section from schedule parsing - Each engine now creates its own catalog based on engine-specific config - DataFusionEngine reads 'catalog_type' and 'catalog_properties' from config - Default catalog type is 'memory' with a temp warehouse for testing - Support for all catalog types via iceberg-catalog-loader (rest, glue, hms, sql, s3tables) Example configuration: ```toml [engines] df = { type = "datafusion", catalog_type = "rest", catalog_properties = { uri = "http://localhost:8181" } } ``` Closes #1780 --- Cargo.lock | 1 + Cargo.toml | 1 + crates/sqllogictest/Cargo.toml | 1 + crates/sqllogictest/src/engine/datafusion.rs | 123 +++++++++++++++---- crates/sqllogictest/src/engine/mod.rs | 19 ++- crates/sqllogictest/src/schedule.rs | 27 ++++ 6 files changed, 148 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 97ee25d658..4bd2622559 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3597,6 +3597,7 @@ dependencies = [ "enum-ordinalize", "env_logger", "iceberg", + "iceberg-catalog-loader", "iceberg-datafusion", "indicatif", "libtest-mimic", diff --git a/Cargo.toml b/Cargo.toml index d099398dbd..4b0669da1c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -81,6 +81,7 @@ http = "1.2" iceberg = { version = "0.8.0", path = "./crates/iceberg" } iceberg-catalog-glue = { version = "0.8.0", path = "./crates/catalog/glue" } iceberg-catalog-hms = { version = "0.8.0", path = "./crates/catalog/hms" } +iceberg-catalog-loader = { version = "0.8.0", path = "./crates/catalog/loader" } iceberg-catalog-rest = { version = "0.8.0", path = "./crates/catalog/rest" } iceberg-catalog-s3tables = { version = "0.8.0", path = "./crates/catalog/s3tables" } iceberg-catalog-sql = { version = "0.8.0", path = "./crates/catalog/sql" } diff --git a/crates/sqllogictest/Cargo.toml b/crates/sqllogictest/Cargo.toml index e826ad7ae0..0c3280ee70 100644 --- a/crates/sqllogictest/Cargo.toml +++ b/crates/sqllogictest/Cargo.toml @@ -32,6 +32,7 @@ datafusion-sqllogictest = { workspace = true } enum-ordinalize = { workspace = true } env_logger = { workspace = true } iceberg = { workspace = true } +iceberg-catalog-loader = { workspace = true } iceberg-datafusion = { workspace = true } indicatif = { workspace = true } log = { workspace = true } diff --git a/crates/sqllogictest/src/engine/datafusion.rs b/crates/sqllogictest/src/engine/datafusion.rs index e3402dfa97..5db02b6cb8 100644 --- a/crates/sqllogictest/src/engine/datafusion.rs +++ b/crates/sqllogictest/src/engine/datafusion.rs @@ -19,12 +19,12 @@ use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::Arc; -use datafusion::catalog::CatalogProvider; use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_sqllogictest::DataFusion; use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; use iceberg::spec::{NestedField, PrimitiveType, Schema, Transform, Type, UnboundPartitionSpec}; use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation}; +use iceberg_catalog_loader::CatalogLoader; use iceberg_datafusion::IcebergCatalogProvider; use indicatif::ProgressBar; use toml::Table as TomlTable; @@ -32,6 +32,8 @@ use toml::Table as TomlTable; use crate::engine::{EngineRunner, run_slt_with_runner}; use crate::error::Result; +const DEFAULT_CATALOG_TYPE: &str = "memory"; + pub struct DataFusionEngine { test_data_path: PathBuf, session_context: SessionContext, @@ -59,12 +61,38 @@ impl EngineRunner for DataFusionEngine { } impl DataFusionEngine { + /// Create a new DataFusion engine with catalog configuration from the TOML config. + /// + /// # Configuration + /// + /// The engine reads catalog configuration from the TOML config: + /// - `catalog_type`: The type of catalog to use (e.g., "memory", "rest"). Defaults to "memory". + /// - `catalog_properties`: Additional properties for the catalog (optional). + /// + /// # Example configuration + /// + /// ```toml + /// [engines] + /// df = { type = "datafusion", catalog_type = "rest", catalog_properties = { uri = "http://localhost:8181" } } + /// ``` pub async fn new(config: TomlTable) -> Result { + let catalog = Self::create_catalog(&config).await?; + let session_config = SessionConfig::new() .with_target_partitions(4) .with_information_schema(true); let ctx = SessionContext::new_with_config(session_config); - ctx.register_catalog("default", Self::create_catalog(&config).await?); + + // Create test namespace and tables in the catalog + Self::setup_test_data(&catalog).await?; + + // Register the catalog with DataFusion + let catalog_provider = IcebergCatalogProvider::try_new(catalog) + .await + .map_err(|e| { + crate::error::Error(anyhow::anyhow!("Failed to create catalog provider: {e}")) + })?; + ctx.register_catalog("default", Arc::new(catalog_provider)); Ok(Self { test_data_path: PathBuf::from("testdata"), @@ -72,36 +100,87 @@ impl DataFusionEngine { }) } - async fn create_catalog(_: &TomlTable) -> anyhow::Result> { - // TODO: support dynamic catalog configuration - // See: https://github.com/apache/iceberg-rust/issues/1780 - let catalog = MemoryCatalogBuilder::default() - .load( - "memory", - HashMap::from([( + /// Create a catalog from the engine configuration. + /// + /// Supported catalog types: + /// - "memory": In-memory catalog (default), useful for testing + /// - "rest": REST catalog + /// - "glue": AWS Glue catalog + /// - "hms": Hive Metastore catalog + /// - "s3tables": S3 Tables catalog + /// - "sql": SQL catalog + async fn create_catalog(config: &TomlTable) -> Result> { + let catalog_type = config + .get("catalog_type") + .and_then(|v| v.as_str()) + .unwrap_or(DEFAULT_CATALOG_TYPE); + + let catalog_properties: HashMap = config + .get("catalog_properties") + .and_then(|v| v.as_table()) + .map(|t| { + t.iter() + .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string()))) + .collect() + }) + .unwrap_or_default(); + + if catalog_type == "memory" { + // Memory catalog is built-in to iceberg crate, not in catalog-loader + // Ensure warehouse is set for memory catalog + let mut props = catalog_properties; + if !props.contains_key(MEMORY_CATALOG_WAREHOUSE) { + // Use a temp directory as default warehouse for testing + props.insert( MEMORY_CATALOG_WAREHOUSE.to_string(), - "memory://".to_string(), - )]), - ) - .await?; + std::env::temp_dir() + .join("iceberg-sqllogictest") + .to_string_lossy() + .to_string(), + ); + } + let catalog = MemoryCatalogBuilder::default() + .load("default", props) + .await + .map_err(|e| { + crate::error::Error(anyhow::anyhow!("Failed to load memory catalog: {e}")) + })?; + Ok(Arc::new(catalog)) + } else { + // Use catalog-loader for other catalog types + let catalog = CatalogLoader::from(catalog_type) + .load("default".to_string(), catalog_properties) + .await + .map_err(|e| crate::error::Error(anyhow::anyhow!("Failed to load catalog: {e}")))?; + Ok(catalog) + } + } + /// Set up the test namespace and tables in the catalog. + async fn setup_test_data(catalog: &Arc) -> anyhow::Result<()> { // Create a test namespace for INSERT INTO tests let namespace = NamespaceIdent::new("default".to_string()); - catalog.create_namespace(&namespace, HashMap::new()).await?; - // Create test tables - Self::create_unpartitioned_table(&catalog, &namespace).await?; - Self::create_partitioned_table(&catalog, &namespace).await?; + // Try to create the namespace, ignore if it already exists + if catalog + .create_namespace(&namespace, HashMap::new()) + .await + .is_err() + { + // Namespace might already exist, that's ok + } - Ok(Arc::new( - IcebergCatalogProvider::try_new(Arc::new(catalog)).await?, - )) + // Create test tables (ignore errors if they already exist) + let _ = Self::create_unpartitioned_table(catalog, &namespace).await; + let _ = Self::create_partitioned_table(catalog, &namespace).await; + + Ok(()) } /// Create an unpartitioned test table with id and name columns /// TODO: this can be removed when we support CREATE TABLE async fn create_unpartitioned_table( - catalog: &impl Catalog, + catalog: &Arc, namespace: &NamespaceIdent, ) -> anyhow::Result<()> { let schema = Schema::builder() @@ -128,7 +207,7 @@ impl DataFusionEngine { /// Partitioned by category using identity transform /// TODO: this can be removed when we support CREATE TABLE async fn create_partitioned_table( - catalog: &impl Catalog, + catalog: &Arc, namespace: &NamespaceIdent, ) -> anyhow::Result<()> { let schema = Schema::builder() diff --git a/crates/sqllogictest/src/engine/mod.rs b/crates/sqllogictest/src/engine/mod.rs index 724359fbe5..2a05491e17 100644 --- a/crates/sqllogictest/src/engine/mod.rs +++ b/crates/sqllogictest/src/engine/mod.rs @@ -33,6 +33,9 @@ pub trait EngineRunner: Send { async fn run_slt_file(&mut self, path: &Path) -> Result<()>; } +/// Load an engine runner based on the engine type and configuration. +/// Each engine is responsible for creating its own catalog based on the +/// `catalog_type` and `catalog_properties` fields in the config. pub async fn load_engine_runner( engine_type: &str, cfg: TomlTable, @@ -80,7 +83,7 @@ mod tests { } #[tokio::test] - async fn test_load_datafusion() { + async fn test_load_datafusion_default_catalog() { let input = r#" [engines] df = { type = "datafusion" } @@ -88,6 +91,18 @@ mod tests { let tbl = toml::from_str(input).unwrap(); let result = load_engine_runner(TYPE_DATAFUSION, tbl).await; - assert!(result.is_ok()); + assert!(result.is_ok(), "Failed to load engine: {:?}", result.err()); + } + + #[tokio::test] + async fn test_load_datafusion_with_memory_catalog() { + let input = r#" + [engines] + df = { type = "datafusion", catalog_type = "memory" } + "#; + let tbl = toml::from_str(input).unwrap(); + let result = load_engine_runner(TYPE_DATAFUSION, tbl).await; + + assert!(result.is_ok(), "Failed to load engine: {:?}", result.err()); } } diff --git a/crates/sqllogictest/src/schedule.rs b/crates/sqllogictest/src/schedule.rs index 7c13ad4d12..9b72c77d18 100644 --- a/crates/sqllogictest/src/schedule.rs +++ b/crates/sqllogictest/src/schedule.rs @@ -204,4 +204,31 @@ mod tests { assert!(result.is_err()); } + + #[tokio::test] + async fn test_parse_engines_with_catalog_config() { + let toml_content = r#" + [engines] + df = { type = "datafusion", catalog_type = "memory" } + "#; + + let table: TomlTable = toml::from_str(toml_content).unwrap(); + let result = Schedule::parse_engines(&table).await; + + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_parse_engines_default_catalog() { + let toml_content = r#" + [engines] + df = { type = "datafusion" } + "#; + + let table: TomlTable = toml::from_str(toml_content).unwrap(); + let result = Schedule::parse_engines(&table).await; + + // Should default to memory catalog + assert!(result.is_ok()); + } }