@@ -19,19 +19,21 @@ use std::collections::HashMap;
1919use std:: path:: { Path , PathBuf } ;
2020use std:: sync:: Arc ;
2121
22- use datafusion:: catalog:: CatalogProvider ;
2322use datafusion:: prelude:: { SessionConfig , SessionContext } ;
2423use datafusion_sqllogictest:: DataFusion ;
2524use iceberg:: memory:: { MEMORY_CATALOG_WAREHOUSE , MemoryCatalogBuilder } ;
2625use iceberg:: spec:: { NestedField , PrimitiveType , Schema , Transform , Type , UnboundPartitionSpec } ;
2726use iceberg:: { Catalog , CatalogBuilder , NamespaceIdent , TableCreation } ;
27+ use iceberg_catalog_loader:: CatalogLoader ;
2828use iceberg_datafusion:: IcebergCatalogProvider ;
2929use indicatif:: ProgressBar ;
3030use toml:: Table as TomlTable ;
3131
3232use crate :: engine:: { EngineRunner , run_slt_with_runner} ;
3333use crate :: error:: Result ;
3434
35+ const DEFAULT_CATALOG_TYPE : & str = "memory" ;
36+
3537pub struct DataFusionEngine {
3638 test_data_path : PathBuf ,
3739 session_context : SessionContext ,
@@ -59,49 +61,126 @@ impl EngineRunner for DataFusionEngine {
5961}
6062
6163impl DataFusionEngine {
64+ /// Create a new DataFusion engine with catalog configuration from the TOML config.
65+ ///
66+ /// # Configuration
67+ ///
68+ /// The engine reads catalog configuration from the TOML config:
69+ /// - `catalog_type`: The type of catalog to use (e.g., "memory", "rest"). Defaults to "memory".
70+ /// - `catalog_properties`: Additional properties for the catalog (optional).
71+ ///
72+ /// # Example configuration
73+ ///
74+ /// ```toml
75+ /// [engines]
76+ /// df = { type = "datafusion", catalog_type = "rest", catalog_properties = { uri = "http://localhost:8181" } }
77+ /// ```
6278 pub async fn new ( config : TomlTable ) -> Result < Self > {
79+ let catalog = Self :: create_catalog ( & config) . await ?;
80+
6381 let session_config = SessionConfig :: new ( )
6482 . with_target_partitions ( 4 )
6583 . with_information_schema ( true ) ;
6684 let ctx = SessionContext :: new_with_config ( session_config) ;
67- ctx. register_catalog ( "default" , Self :: create_catalog ( & config) . await ?) ;
85+
86+ // Create test namespace and tables in the catalog
87+ Self :: setup_test_data ( & catalog) . await ?;
88+
89+ // Register the catalog with DataFusion
90+ let catalog_provider = IcebergCatalogProvider :: try_new ( catalog)
91+ . await
92+ . map_err ( |e| {
93+ crate :: error:: Error ( anyhow:: anyhow!( "Failed to create catalog provider: {e}" ) )
94+ } ) ?;
95+ ctx. register_catalog ( "default" , Arc :: new ( catalog_provider) ) ;
6896
6997 Ok ( Self {
7098 test_data_path : PathBuf :: from ( "testdata" ) ,
7199 session_context : ctx,
72100 } )
73101 }
74102
75- async fn create_catalog ( _: & TomlTable ) -> anyhow:: Result < Arc < dyn CatalogProvider > > {
76- // TODO: support dynamic catalog configuration
77- // See: https://github.com/apache/iceberg-rust/issues/1780
78- let catalog = MemoryCatalogBuilder :: default ( )
79- . load (
80- "memory" ,
81- HashMap :: from ( [ (
103+ /// Create a catalog from the engine configuration.
104+ ///
105+ /// Supported catalog types:
106+ /// - "memory": In-memory catalog (default), useful for testing
107+ /// - "rest": REST catalog
108+ /// - "glue": AWS Glue catalog
109+ /// - "hms": Hive Metastore catalog
110+ /// - "s3tables": S3 Tables catalog
111+ /// - "sql": SQL catalog
112+ async fn create_catalog ( config : & TomlTable ) -> Result < Arc < dyn Catalog > > {
113+ let catalog_type = config
114+ . get ( "catalog_type" )
115+ . and_then ( |v| v. as_str ( ) )
116+ . unwrap_or ( DEFAULT_CATALOG_TYPE ) ;
117+
118+ let catalog_properties: HashMap < String , String > = config
119+ . get ( "catalog_properties" )
120+ . and_then ( |v| v. as_table ( ) )
121+ . map ( |t| {
122+ t. iter ( )
123+ . filter_map ( |( k, v) | v. as_str ( ) . map ( |s| ( k. clone ( ) , s. to_string ( ) ) ) )
124+ . collect ( )
125+ } )
126+ . unwrap_or_default ( ) ;
127+
128+ if catalog_type == "memory" {
129+ // Memory catalog is built-in to iceberg crate, not in catalog-loader
130+ // Ensure warehouse is set for memory catalog
131+ let mut props = catalog_properties;
132+ if !props. contains_key ( MEMORY_CATALOG_WAREHOUSE ) {
133+ // Use a temp directory as default warehouse for testing
134+ props. insert (
82135 MEMORY_CATALOG_WAREHOUSE . to_string ( ) ,
83- "memory://" . to_string ( ) ,
84- ) ] ) ,
85- )
86- . await ?;
136+ std:: env:: temp_dir ( )
137+ . join ( "iceberg-sqllogictest" )
138+ . to_string_lossy ( )
139+ . to_string ( ) ,
140+ ) ;
141+ }
142+ let catalog = MemoryCatalogBuilder :: default ( )
143+ . load ( "default" , props)
144+ . await
145+ . map_err ( |e| {
146+ crate :: error:: Error ( anyhow:: anyhow!( "Failed to load memory catalog: {e}" ) )
147+ } ) ?;
148+ Ok ( Arc :: new ( catalog) )
149+ } else {
150+ // Use catalog-loader for other catalog types
151+ let catalog = CatalogLoader :: from ( catalog_type)
152+ . load ( "default" . to_string ( ) , catalog_properties)
153+ . await
154+ . map_err ( |e| crate :: error:: Error ( anyhow:: anyhow!( "Failed to load catalog: {e}" ) ) ) ?;
155+ Ok ( catalog)
156+ }
157+ }
87158
159+ /// Set up the test namespace and tables in the catalog.
160+ async fn setup_test_data ( catalog : & Arc < dyn Catalog > ) -> anyhow:: Result < ( ) > {
88161 // Create a test namespace for INSERT INTO tests
89162 let namespace = NamespaceIdent :: new ( "default" . to_string ( ) ) ;
90- catalog. create_namespace ( & namespace, HashMap :: new ( ) ) . await ?;
91163
92- // Create test tables
93- Self :: create_unpartitioned_table ( & catalog, & namespace) . await ?;
94- Self :: create_partitioned_table ( & catalog, & namespace) . await ?;
164+ // Try to create the namespace, ignore if it already exists
165+ if catalog
166+ . create_namespace ( & namespace, HashMap :: new ( ) )
167+ . await
168+ . is_err ( )
169+ {
170+ // Namespace might already exist, that's ok
171+ }
95172
96- Ok ( Arc :: new (
97- IcebergCatalogProvider :: try_new ( Arc :: new ( catalog) ) . await ?,
98- ) )
173+ // Create test tables (ignore errors if they already exist)
174+ let _ = Self :: create_unpartitioned_table ( catalog, & namespace) . await ;
175+ let _ = Self :: create_partitioned_table ( catalog, & namespace) . await ;
176+
177+ Ok ( ( ) )
99178 }
100179
101180 /// Create an unpartitioned test table with id and name columns
102181 /// TODO: this can be removed when we support CREATE TABLE
103182 async fn create_unpartitioned_table (
104- catalog : & impl Catalog ,
183+ catalog : & Arc < dyn Catalog > ,
105184 namespace : & NamespaceIdent ,
106185 ) -> anyhow:: Result < ( ) > {
107186 let schema = Schema :: builder ( )
@@ -128,7 +207,7 @@ impl DataFusionEngine {
128207 /// Partitioned by category using identity transform
129208 /// TODO: this can be removed when we support CREATE TABLE
130209 async fn create_partitioned_table (
131- catalog : & impl Catalog ,
210+ catalog : & Arc < dyn Catalog > ,
132211 namespace : & NamespaceIdent ,
133212 ) -> anyhow:: Result < ( ) > {
134213 let schema = Schema :: builder ( )
0 commit comments