diff --git a/crates/modelardb_embedded/src/operations/data_folder.rs b/crates/modelardb_embedded/src/operations/data_folder.rs index cf056765..7f9a5c7b 100644 --- a/crates/modelardb_embedded/src/operations/data_folder.rs +++ b/crates/modelardb_embedded/src/operations/data_folder.rs @@ -125,9 +125,6 @@ impl Operations for DataFolder { match table_type { TableType::NormalTable(schema) => { let delta_table = self.create_normal_table(table_name, &schema).await?; - - self.save_normal_table_metadata(table_name).await?; - let data_sink = Arc::new(DataFolderDataSink::new()); modelardb_storage::register_normal_table( @@ -149,9 +146,6 @@ impl Operations for DataFolder { .create_time_series_table(&time_series_table_metadata) .await?; - self.save_time_series_table_metadata(&time_series_table_metadata) - .await?; - let data_sink = Arc::new(DataFolderDataSink::new()); modelardb_storage::register_time_series_table( @@ -517,9 +511,6 @@ impl Operations for DataFolder { // Drop the table from the Apache Arrow DataFusion session. self.session_context().deregister_table(table_name)?; - // Delete the table metadata from the Delta Lake. - self.drop_table_metadata(table_name).await?; - // Drop the table from the Delta Lake. self.drop_table(table_name).await?; diff --git a/crates/modelardb_server/src/cluster.rs b/crates/modelardb_server/src/cluster.rs index 8589437d..77b8f8a8 100644 --- a/crates/modelardb_server/src/cluster.rs +++ b/crates/modelardb_server/src/cluster.rs @@ -147,10 +147,6 @@ impl Cluster { .create_normal_table(table_name, schema) .await?; - self.remote_data_folder - .save_normal_table_metadata(table_name) - .await?; - // Create the normal table in each peer node. let protobuf_bytes = modelardb_types::flight::encode_and_serialize_normal_table_metadata( table_name, schema, @@ -178,10 +174,6 @@ impl Cluster { .create_time_series_table(time_series_table_metadata) .await?; - self.remote_data_folder - .save_time_series_table_metadata(time_series_table_metadata) - .await?; - // Create the time series table in each peer node. let protobuf_bytes = modelardb_types::flight::encode_and_serialize_time_series_table_metadata( @@ -203,10 +195,6 @@ impl Cluster { pub(crate) async fn drop_cluster_tables(&self, table_names: &[String]) -> Result<()> { // Drop the tables from the remote data folder. for table_name in table_names { - self.remote_data_folder - .drop_table_metadata(table_name) - .await?; - self.remote_data_folder.drop_table(table_name).await?; } @@ -663,11 +651,6 @@ mod test { .create_normal_table(table_name, &schema) .await .unwrap(); - - data_folder - .save_normal_table_metadata(table_name) - .await - .unwrap(); } /// Create a time series table named `table_name` with a field column named `column_name` in @@ -694,11 +677,6 @@ mod test { .create_time_series_table(&time_series_table_metadata) .await .unwrap(); - - data_folder - .save_time_series_table_metadata(&time_series_table_metadata) - .await - .unwrap(); } /// Call [`Cluster::retrieve_and_create_tables`] if the [`ClusterMode`] of `context` is diff --git a/crates/modelardb_server/src/context.rs b/crates/modelardb_server/src/context.rs index f21e7439..c015e925 100644 --- a/crates/modelardb_server/src/context.rs +++ b/crates/modelardb_server/src/context.rs @@ -78,7 +78,7 @@ impl Context { table_name: &str, schema: &Schema, ) -> Result<()> { - // Create an empty Delta Lake table. + // Create an empty Delta Lake table and save the normal table metadata to the Delta Lake. self.data_folders .local_data_folder .create_normal_table(table_name, schema) @@ -87,12 +87,6 @@ impl Context { // Register the normal table with Apache DataFusion. self.register_normal_table(table_name).await?; - // Persist the new normal table to the Delta Lake. - self.data_folders - .local_data_folder - .save_normal_table_metadata(table_name) - .await?; - info!("Created normal table '{}'.", table_name); Ok(()) @@ -119,7 +113,7 @@ impl Context { &self, time_series_table_metadata: &TimeSeriesTableMetadata, ) -> Result<()> { - // Create an empty Delta Lake table. + // Create an empty Delta Lake table and save the time series table metadata to the Delta Lake. self.data_folders .local_data_folder .create_time_series_table(time_series_table_metadata) @@ -129,12 +123,6 @@ impl Context { self.register_time_series_table(Arc::new(time_series_table_metadata.clone())) .await?; - // Persist the new time series table to the Delta Lake. - self.data_folders - .local_data_folder - .save_time_series_table_metadata(time_series_table_metadata) - .await?; - info!( "Created time series table '{}'.", time_series_table_metadata.name @@ -266,12 +254,6 @@ impl Context { self.drop_table_from_storage_engine(table_name).await?; - // Drop the table metadata from the Delta Lake. - self.data_folders - .local_data_folder - .drop_table_metadata(table_name) - .await?; - // Drop the table from the Delta Lake. self.data_folders .local_data_folder diff --git a/crates/modelardb_server/src/storage/compressed_data_manager.rs b/crates/modelardb_server/src/storage/compressed_data_manager.rs index e8667cab..88931d12 100644 --- a/crates/modelardb_server/src/storage/compressed_data_manager.rs +++ b/crates/modelardb_server/src/storage/compressed_data_manager.rs @@ -388,7 +388,7 @@ mod tests { let local_data_folder = data_manager.local_data_folder.clone(); let mut delta_table = local_data_folder - .create_time_series_table(&table::time_series_table_metadata()) + .delta_table(TIME_SERIES_TABLE_NAME) .await .unwrap(); @@ -443,14 +443,8 @@ mod tests { #[tokio::test] async fn test_remaining_memory_incremented_when_saving_compressed_segments() { let (_temp_dir, data_manager) = create_compressed_data_manager().await; - let local_data_folder = data_manager.local_data_folder.clone(); let segments = compressed_segments_record_batch(); - local_data_folder - .create_time_series_table(&segments.time_series_table_metadata) - .await - .unwrap(); - data_manager .insert_compressed_segments(segments.clone()) .await @@ -497,14 +491,9 @@ mod tests { #[tokio::test] async fn test_decrease_compressed_remaining_memory_in_bytes() { let (_temp_dir, data_manager) = create_compressed_data_manager().await; - let local_data_folder = data_manager.local_data_folder.clone(); // Insert data that should be saved when the remaining memory is decreased. let segments = compressed_segments_record_batch(); - local_data_folder - .create_time_series_table(&segments.time_series_table_metadata) - .await - .unwrap(); data_manager .insert_compressed_segments(segments) .await @@ -559,7 +548,7 @@ mod tests { let time_series_table_metadata = table::time_series_table_metadata(); local_data_folder - .save_time_series_table_metadata(&time_series_table_metadata) + .create_time_series_table(&time_series_table_metadata) .await .unwrap(); diff --git a/crates/modelardb_server/src/storage/data_transfer.rs b/crates/modelardb_server/src/storage/data_transfer.rs index b61b1557..31d8471f 100644 --- a/crates/modelardb_server/src/storage/data_transfer.rs +++ b/crates/modelardb_server/src/storage/data_transfer.rs @@ -474,11 +474,6 @@ mod tests { .await .unwrap(); - local_data_folder - .save_normal_table_metadata(NORMAL_TABLE_NAME) - .await - .unwrap(); - // Create a time series table. let time_series_table_metadata = table::time_series_table_metadata(); local_data_folder @@ -486,11 +481,6 @@ mod tests { .await .unwrap(); - local_data_folder - .save_time_series_table_metadata(&time_series_table_metadata) - .await - .unwrap(); - (temp_dir, local_data_folder) } diff --git a/crates/modelardb_server/src/storage/uncompressed_data_manager.rs b/crates/modelardb_server/src/storage/uncompressed_data_manager.rs index a7274e81..c00800c7 100644 --- a/crates/modelardb_server/src/storage/uncompressed_data_manager.rs +++ b/crates/modelardb_server/src/storage/uncompressed_data_manager.rs @@ -1285,7 +1285,7 @@ mod tests { let time_series_table_metadata = table::time_series_table_metadata(); local_data_folder - .save_time_series_table_metadata(&time_series_table_metadata) + .create_time_series_table(&time_series_table_metadata) .await .unwrap(); diff --git a/crates/modelardb_storage/src/data_folder/cluster.rs b/crates/modelardb_storage/src/data_folder/cluster.rs index e21dbd9b..0c76e9ef 100644 --- a/crates/modelardb_storage/src/data_folder/cluster.rs +++ b/crates/modelardb_storage/src/data_folder/cluster.rs @@ -28,7 +28,7 @@ use uuid::Uuid; use crate::data_folder::DataFolder; use crate::error::Result; -use crate::{register_metadata_table, sql_and_concat}; +use crate::sql_and_concat; /// Trait that extends [`DataFolder`] to provide management of the Delta Lake for the cluster. #[allow(async_fn_in_trait)] @@ -51,27 +51,21 @@ impl ClusterMetadata for DataFolder { /// [`ModelarDbStorageError`](crate::error::ModelarDbStorageError). async fn create_and_register_cluster_metadata_tables(&self) -> Result<()> { // Create and register the cluster_metadata table if it does not exist. - let delta_table = self - .create_metadata_table( - "cluster_metadata", - &Schema::new(vec![Field::new("key", DataType::Utf8, false)]), - ) - .await?; - - register_metadata_table(self.session_context(), "cluster_metadata", delta_table)?; + self.create_and_register_metadata_table( + "cluster_metadata", + &Schema::new(vec![Field::new("key", DataType::Utf8, false)]), + ) + .await?; // Create and register the nodes table if it does not exist. - let delta_table = self - .create_metadata_table( - "nodes", - &Schema::new(vec![ - Field::new("url", DataType::Utf8, false), - Field::new("mode", DataType::Utf8, false), - ]), - ) - .await?; - - register_metadata_table(self.session_context(), "nodes", delta_table)?; + self.create_and_register_metadata_table( + "nodes", + &Schema::new(vec![ + Field::new("url", DataType::Utf8, false), + Field::new("mode", DataType::Utf8, false), + ]), + ) + .await?; Ok(()) } diff --git a/crates/modelardb_storage/src/data_folder/mod.rs b/crates/modelardb_storage/src/data_folder/mod.rs index 2d4ec8cd..ef3be87f 100644 --- a/crates/modelardb_storage/src/data_folder/mod.rs +++ b/crates/modelardb_storage/src/data_folder/mod.rs @@ -30,7 +30,7 @@ use arrow::datatypes::{DataType, Field, Schema}; use chrono::TimeDelta; use dashmap::DashMap; use datafusion::catalog::TableProvider; -use datafusion::common::{DFSchema, ToDFSchema}; +use datafusion::common::{DFSchema, TableReference, ToDFSchema}; use datafusion::datasource::sink::DataSink; use datafusion::logical_expr::{Expr, lit}; use datafusion::parquet::file::properties::WriterProperties; @@ -62,10 +62,8 @@ use url::Url; use uuid::Uuid; use crate::error::{ModelarDbStorageError, Result}; -use crate::{ - METADATA_FOLDER, TABLE_FOLDER, apache_parquet_writer_properties, register_metadata_table, - sql_and_concat, -}; +use crate::query::metadata_table::MetadataTable; +use crate::{METADATA_FOLDER, TABLE_FOLDER, apache_parquet_writer_properties, sql_and_concat}; /// Types of tables supported by ModelarDB. enum TableType { @@ -285,54 +283,37 @@ impl DataFolder { /// [`ModelarDbStorageError`]. async fn create_and_register_metadata_tables(&self) -> Result<()> { // Create and register the normal_table_metadata table if it does not exist. - let delta_table = self - .create_metadata_table( - "normal_table_metadata", - &Schema::new(vec![Field::new("table_name", DataType::Utf8, false)]), - ) - .await?; - - register_metadata_table(&self.session_context, "normal_table_metadata", delta_table)?; + self.create_and_register_metadata_table( + "normal_table_metadata", + &Schema::new(vec![Field::new("table_name", DataType::Utf8, false)]), + ) + .await?; // Create and register the time_series_table_metadata table if it does not exist. - let delta_table = self - .create_metadata_table( - "time_series_table_metadata", - &Schema::new(vec![ - Field::new("table_name", DataType::Utf8, false), - Field::new("query_schema", DataType::Binary, false), - ]), - ) - .await?; - - register_metadata_table( - &self.session_context, + self.create_and_register_metadata_table( "time_series_table_metadata", - delta_table, - )?; + &Schema::new(vec![ + Field::new("table_name", DataType::Utf8, false), + Field::new("query_schema", DataType::Binary, false), + ]), + ) + .await?; // Create and register the time_series_table_field_columns table if it does not exist. Note // that column_index will only use a maximum of 10 bits. generated_column_expr is NULL if // the fields are stored as segments. - let delta_table = self - .create_metadata_table( - "time_series_table_field_columns", - &Schema::new(vec![ - Field::new("table_name", DataType::Utf8, false), - Field::new("column_name", DataType::Utf8, false), - Field::new("column_index", DataType::Int16, false), - Field::new("error_bound_value", DataType::Float32, false), - Field::new("error_bound_is_relative", DataType::Boolean, false), - Field::new("generated_column_expr", DataType::Binary, true), - ]), - ) - .await?; - - register_metadata_table( - &self.session_context, + self.create_and_register_metadata_table( "time_series_table_field_columns", - delta_table, - )?; + &Schema::new(vec![ + Field::new("table_name", DataType::Utf8, false), + Field::new("column_name", DataType::Utf8, false), + Field::new("column_index", DataType::Int16, false), + Field::new("error_bound_value", DataType::Float32, false), + Field::new("error_bound_is_relative", DataType::Boolean, false), + Field::new("generated_column_expr", DataType::Binary, true), + ]), + ) + .await?; Ok(()) } @@ -512,10 +493,7 @@ impl DataFolder { /// Return a [`DeltaTableWriter`] for writing to the time series table corresponding to /// `delta_table` in the Delta Lake, or a [`ModelarDbStorageError`] if a connection to the Delta /// Lake cannot be established or the table does not exist. - pub async fn time_series_table_writer( - &self, - delta_table: DeltaTable, - ) -> Result { + async fn time_series_table_writer(&self, delta_table: DeltaTable) -> Result { let partition_columns = vec![FIELD_COLUMN.to_owned()]; // Specify that the file must be sorted by the tag columns and then by start_time. @@ -547,7 +525,7 @@ impl DataFolder { /// Return a [`DeltaTableWriter`] for writing to the table corresponding to `delta_table` in the /// Delta Lake, or a [`ModelarDbStorageError`] if a connection to the Delta Lake cannot be /// established or the table does not exist. - pub async fn normal_or_metadata_table_writer( + async fn normal_or_metadata_table_writer( &self, delta_table: DeltaTable, ) -> Result { @@ -555,22 +533,31 @@ impl DataFolder { DeltaTableWriter::try_new(delta_table, vec![], writer_properties) } - /// Create a Delta Lake table for a metadata table with `table_name` and `schema` if it does not - /// already exist. If the metadata table could not be created, [`ModelarDbStorageError`] is - /// returned. An error is not returned if the metadata table already exists. - pub async fn create_metadata_table( + /// Create a Delta Lake table for a metadata table with `table_name` and `schema` and register + /// it in the [`SessionContext`] in the `metadata` schema. If the table already exists, it is + /// reused. Return [`ModelarDbStorageError`] if the table cannot be created or cannot be + /// registered. + pub async fn create_and_register_metadata_table( &self, table_name: &str, schema: &Schema, - ) -> Result { - self.create_table( - table_name, - schema, - &[], - self.location_of_metadata_table(table_name), - SaveMode::Ignore, - ) - .await + ) -> Result<()> { + let delta_table = self + .create_delta_lake_table( + table_name, + schema, + &[], + self.location_of_metadata_table(table_name), + SaveMode::Ignore, + ) + .await?; + + let table_reference = TableReference::partial("metadata", table_name); + let metadata_table = Arc::new(MetadataTable::new(delta_table)); + self.session_context + .register_table(table_reference, metadata_table)?; + + Ok(()) } /// Return the location of the metadata table with `table_name`. @@ -578,39 +565,65 @@ impl DataFolder { format!("{}/{METADATA_FOLDER}/{table_name}", self.location) } - /// Create a Delta Lake table for a normal table with `table_name` and `schema` if it does not - /// already exist. If the normal table could not be created, e.g., because it already exists, - /// [`ModelarDbStorageError`] is returned. + /// Create a Delta Lake table for a normal table with `table_name` and `schema` and save the + /// table metadata to the `normal_table_metadata` table. If the table already exists or + /// the metadata could not be saved, return [`ModelarDbStorageError`], otherwise return + /// the created [`DeltaTable`]. pub async fn create_normal_table( &self, table_name: &str, schema: &Schema, ) -> Result { - self.create_table( - table_name, - schema, - &[], - self.location_of_table(table_name), - SaveMode::ErrorIfExists, + let delta_table = self + .create_delta_lake_table( + table_name, + schema, + &[], + self.location_of_table(table_name), + SaveMode::ErrorIfExists, + ) + .await?; + + self.save_normal_table_metadata(table_name).await?; + + Ok(delta_table) + } + + /// Save the created normal table to the Delta Lake. This consists of adding a row to the + /// `normal_table_metadata` table with the `name` of the table. If the normal table metadata was + /// saved, return [`Ok`], otherwise return [`ModelarDbStorageError`]. + async fn save_normal_table_metadata(&self, name: &str) -> Result<()> { + self.write_columns_to_metadata_table( + "normal_table_metadata", + vec![Arc::new(StringArray::from(vec![name]))], ) - .await + .await?; + + Ok(()) } - /// Create a Delta Lake table for a time series table with `time_series_table_metadata` if it - /// does not already exist. Returns [`DeltaTable`] if the table could be created and - /// [`ModelarDbStorageError`] if it could not. + /// Create a Delta Lake table for a time series table with `time_series_table_metadata` and + /// save the table metadata to the `time_series_table_metadata` table. If the table already + /// exists or the metadata could not be saved, return [`ModelarDbStorageError`], otherwise + /// return the created [`DeltaTable`]. pub async fn create_time_series_table( &self, time_series_table_metadata: &TimeSeriesTableMetadata, ) -> Result { - self.create_table( - &time_series_table_metadata.name, - &time_series_table_metadata.compressed_schema, - &[FIELD_COLUMN.to_owned()], - self.location_of_table(&time_series_table_metadata.name), - SaveMode::ErrorIfExists, - ) - .await + let delta_table = self + .create_delta_lake_table( + &time_series_table_metadata.name, + &time_series_table_metadata.compressed_schema, + &[FIELD_COLUMN.to_owned()], + self.location_of_table(&time_series_table_metadata.name), + SaveMode::ErrorIfExists, + ) + .await?; + + self.save_time_series_table_metadata(time_series_table_metadata) + .await?; + + Ok(delta_table) } /// Return the location of the table with `table_name`. @@ -618,10 +631,89 @@ impl DataFolder { format!("{}/{TABLE_FOLDER}/{table_name}", self.location) } + /// Save the created time series table to the Delta Lake. This includes adding a row to the + /// `time_series_table_metadata` table and adding a row to the `time_series_table_field_columns` + /// table for each field column. + async fn save_time_series_table_metadata( + &self, + time_series_table_metadata: &TimeSeriesTableMetadata, + ) -> Result<()> { + // Convert the query schema to bytes, so it can be saved in the Delta Lake. + let query_schema_bytes = + try_convert_schema_to_bytes(&time_series_table_metadata.query_schema)?; + + // Add a new row in the time_series_table_metadata table to persist the time series table. + self.write_columns_to_metadata_table( + "time_series_table_metadata", + vec![ + Arc::new(StringArray::from(vec![ + time_series_table_metadata.name.clone(), + ])), + Arc::new(BinaryArray::from_vec(vec![&query_schema_bytes])), + ], + ) + .await?; + + // Add a row for each field column to the time_series_table_field_columns table. + for (query_schema_index, field) in time_series_table_metadata + .query_schema + .fields() + .iter() + .enumerate() + { + if field.data_type() == &ArrowValue::DATA_TYPE { + // Convert the generated column expression to bytes, if it exists. + let maybe_generated_column_expr = match time_series_table_metadata + .generated_columns + .get(query_schema_index) + { + Some(Some(generated_column)) => { + Some(generated_column.expr.to_bytes()?.to_vec()) + } + _ => None, + }; + + // error_bounds matches schema and not query_schema to simplify looking up the error + // bound during ingestion as it occurs far more often than creation of time series tables. + let (error_bound_value, error_bound_is_relative) = if let Ok(schema_index) = + time_series_table_metadata.schema.index_of(field.name()) + { + match time_series_table_metadata.error_bounds[schema_index] { + ErrorBound::Absolute(value) => (value, false), + ErrorBound::Relative(value) => (value, true), + ErrorBound::Lossless => (0.0, false), + } + } else { + (0.0, false) + }; + + // query_schema_index is simply cast as a time series table contains at most 32767 columns. + self.write_columns_to_metadata_table( + "time_series_table_field_columns", + vec![ + Arc::new(StringArray::from(vec![ + time_series_table_metadata.name.clone(), + ])), + Arc::new(StringArray::from(vec![field.name().clone()])), + Arc::new(Int16Array::from(vec![query_schema_index as i16])), + Arc::new(Float32Array::from(vec![error_bound_value])), + Arc::new(BooleanArray::from(vec![error_bound_is_relative])), + Arc::new(BinaryArray::from_opt_vec(vec![ + maybe_generated_column_expr.as_deref(), + ])), + ], + ) + .await?; + } + } + + Ok(()) + } + /// Create a Delta Lake table with `table_name`, `schema`, and `partition_columns` if it does /// not already exist. Returns [`DeltaTable`] if the table could be created and /// [`ModelarDbStorageError`] if it could not. - async fn create_table( + async fn create_delta_lake_table( &self, table_name: &str, schema: &Schema, @@ -663,26 +755,52 @@ impl DataFolder { Ok(delta_table) } - /// Drop the metadata table with `table_name` from the Delta Lake by deleting every file related - /// to the table. The table folder cannot be deleted directly since folders do not exist in - /// object stores and therefore cannot be operated upon. If the table was dropped successfully, - /// the paths to the deleted files are returned, otherwise a [`ModelarDbStorageError`] is - /// returned. - pub async fn drop_metadata_table(&self, table_name: &str) -> Result> { - let table_path = format!("{METADATA_FOLDER}/{table_name}"); - self.delete_table_files(&table_path).await - } - - /// Drop the Delta Lake table with `table_name` from the Delta Lake by deleting every file - /// related to the table. The table folder cannot be deleted directly since folders do not exist - /// in object stores and therefore cannot be operated upon. If the table was dropped - /// successfully, the paths to the deleted files are returned, otherwise a - /// [`ModelarDbStorageError`] is returned. + /// Drop the Delta Lake table with `table_name` from the Delta Lake by deleting the table + /// metadata and deleting every file related to the table. The table folder cannot be deleted + /// directly since folders do not exist in object stores and therefore cannot be operated upon. + /// If the table was dropped successfully, the paths to the deleted files are returned, + /// otherwise a [`ModelarDbStorageError`] is returned. pub async fn drop_table(&self, table_name: &str) -> Result> { + self.delete_table_metadata(table_name).await?; + let table_path = format!("{TABLE_FOLDER}/{table_name}"); self.delete_table_files(&table_path).await } + /// Depending on the type of the table with `table_name`, delete either the normal table metadata + /// or the time series table metadata from the Delta Lake. If the table does not exist or the + /// metadata could not be deleted, [`ModelarDbStorageError`] is returned. + async fn delete_table_metadata(&self, table_name: &str) -> Result<()> { + if self.is_normal_table(table_name).await? { + let delta_table = self.metadata_delta_table("normal_table_metadata").await?; + + delta_table + .delete() + .with_predicate(col("table_name").eq(lit(table_name))) + .await?; + } else if self.is_time_series_table(table_name).await? { + // Delete the table metadata from the time_series_table_metadata table. + self.metadata_delta_table("time_series_table_metadata") + .await? + .delete() + .with_predicate(col("table_name").eq(lit(table_name))) + .await?; + + // Delete the column metadata from the time_series_table_field_columns table. + self.metadata_delta_table("time_series_table_field_columns") + .await? + .delete() + .with_predicate(col("table_name").eq(lit(table_name))) + .await?; + } else { + return Err(ModelarDbStorageError::InvalidArgument(format!( + "Table with name '{table_name}' does not exist." + ))); + } + + Ok(()) + } + /// Delete all files in the folder at `table_path` using bulk operations if available. If the /// files were deleted successfully, the paths to the deleted files are returned. async fn delete_table_files(&self, table_path: &str) -> Result> { @@ -744,98 +862,6 @@ impl DataFolder { Ok(()) } - /// Save the created normal table to the Delta Lake. This consists of adding a row to the - /// `normal_table_metadata` table with the `name` of the table. If the normal table metadata was - /// saved, return [`Ok`], otherwise return [`ModelarDbStorageError`]. - pub async fn save_normal_table_metadata(&self, name: &str) -> Result<()> { - self.write_columns_to_metadata_table( - "normal_table_metadata", - vec![Arc::new(StringArray::from(vec![name]))], - ) - .await?; - - Ok(()) - } - - /// Save the created time series table to the Delta Lake. This includes adding a row to the - /// `time_series_table_metadata` table and adding a row to the `time_series_table_field_columns` - /// table for each field column. - pub async fn save_time_series_table_metadata( - &self, - time_series_table_metadata: &TimeSeriesTableMetadata, - ) -> Result<()> { - // Convert the query schema to bytes, so it can be saved in the Delta Lake. - let query_schema_bytes = - try_convert_schema_to_bytes(&time_series_table_metadata.query_schema)?; - - // Add a new row in the time_series_table_metadata table to persist the time series table. - self.write_columns_to_metadata_table( - "time_series_table_metadata", - vec![ - Arc::new(StringArray::from(vec![ - time_series_table_metadata.name.clone(), - ])), - Arc::new(BinaryArray::from_vec(vec![&query_schema_bytes])), - ], - ) - .await?; - - // Add a row for each field column to the time_series_table_field_columns table. - for (query_schema_index, field) in time_series_table_metadata - .query_schema - .fields() - .iter() - .enumerate() - { - if field.data_type() == &ArrowValue::DATA_TYPE { - // Convert the generated column expression to bytes, if it exists. - let maybe_generated_column_expr = match time_series_table_metadata - .generated_columns - .get(query_schema_index) - { - Some(Some(generated_column)) => { - Some(generated_column.expr.to_bytes()?.to_vec()) - } - _ => None, - }; - - // error_bounds matches schema and not query_schema to simplify looking up the error - // bound during ingestion as it occurs far more often than creation of time series tables. - let (error_bound_value, error_bound_is_relative) = if let Ok(schema_index) = - time_series_table_metadata.schema.index_of(field.name()) - { - match time_series_table_metadata.error_bounds[schema_index] { - ErrorBound::Absolute(value) => (value, false), - ErrorBound::Relative(value) => (value, true), - ErrorBound::Lossless => (0.0, false), - } - } else { - (0.0, false) - }; - - // query_schema_index is simply cast as a time series table contains at most 32767 columns. - self.write_columns_to_metadata_table( - "time_series_table_field_columns", - vec![ - Arc::new(StringArray::from(vec![ - time_series_table_metadata.name.clone(), - ])), - Arc::new(StringArray::from(vec![field.name().clone()])), - Arc::new(Int16Array::from(vec![query_schema_index as i16])), - Arc::new(Float32Array::from(vec![error_bound_value])), - Arc::new(BooleanArray::from(vec![error_bound_is_relative])), - Arc::new(BinaryArray::from_opt_vec(vec![ - maybe_generated_column_expr.as_deref(), - ])), - ], - ) - .await?; - } - } - - Ok(()) - } - /// Write `columns` to a Delta Lake table with `table_name`. Returns an updated [`DeltaTable`] /// version if the file was written successfully, otherwise returns [`ModelarDbStorageError`]. pub async fn write_columns_to_metadata_table( @@ -895,57 +921,6 @@ impl DataFolder { } } - /// Depending on the type of the table with `table_name`, drop either the normal table metadata - /// or the time series table metadata from the Delta Lake. If the table does not exist or the - /// metadata could not be dropped, [`ModelarDbStorageError`] is returned. - pub async fn drop_table_metadata(&self, table_name: &str) -> Result<()> { - if self.is_normal_table(table_name).await? { - self.drop_normal_table_metadata(table_name).await - } else if self.is_time_series_table(table_name).await? { - self.drop_time_series_table_metadata(table_name).await - } else { - Err(ModelarDbStorageError::InvalidArgument(format!( - "Table with name '{table_name}' does not exist." - ))) - } - } - - /// Drop the metadata for the normal table with `table_name` from the `normal_table_metadata` - /// table in the Delta Lake. If the metadata could not be dropped, [`ModelarDbStorageError`] is - /// returned. - async fn drop_normal_table_metadata(&self, table_name: &str) -> Result<()> { - let delta_table = self.metadata_delta_table("normal_table_metadata").await?; - - delta_table - .delete() - .with_predicate(col("table_name").eq(lit(table_name))) - .await?; - - Ok(()) - } - - /// Drop the metadata for the time series table with `table_name` from the Delta Lake. This - /// includes deleting a row from the `time_series_table_metadata` table and deleting a row from - /// the `time_series_table_field_columns` table for each field column. If the metadata could not - /// be dropped, [`ModelarDbStorageError`] is returned. - async fn drop_time_series_table_metadata(&self, table_name: &str) -> Result<()> { - // Delete the table metadata from the time_series_table_metadata table. - self.metadata_delta_table("time_series_table_metadata") - .await? - .delete() - .with_predicate(col("table_name").eq(lit(table_name))) - .await?; - - // Delete the column metadata from the time_series_table_field_columns table. - self.metadata_delta_table("time_series_table_field_columns") - .await? - .delete() - .with_predicate(col("table_name").eq(lit(table_name))) - .await?; - - Ok(()) - } - /// Return the [`TimeSeriesTableMetadata`] of each time series table currently in the metadata /// Delta Lake. If the [`TimeSeriesTableMetadata`] cannot be retrieved, /// [`ModelarDbStorageError`] is returned. @@ -1320,13 +1295,13 @@ mod tests { #[tokio::test] async fn test_normal_table_is_normal_table() { - let (_temp_dir, data_folder) = create_data_folder_and_save_normal_tables().await; + let (_temp_dir, data_folder) = create_data_folder_and_create_normal_tables().await; assert!(data_folder.is_normal_table("normal_table_1").await.unwrap()); } #[tokio::test] async fn test_time_series_table_is_not_normal_table() { - let (_temp_dir, data_folder) = create_data_folder_and_save_time_series_table().await; + let (_temp_dir, data_folder) = create_data_folder_and_create_time_series_table().await; assert!( !data_folder .is_normal_table(test::TIME_SERIES_TABLE_NAME) @@ -1337,7 +1312,7 @@ mod tests { #[tokio::test] async fn test_time_series_table_is_time_series_table() { - let (_temp_dir, data_folder) = create_data_folder_and_save_time_series_table().await; + let (_temp_dir, data_folder) = create_data_folder_and_create_time_series_table().await; assert!( data_folder .is_time_series_table(test::TIME_SERIES_TABLE_NAME) @@ -1348,7 +1323,7 @@ mod tests { #[tokio::test] async fn test_normal_table_is_not_time_series_table() { - let (_temp_dir, data_folder) = create_data_folder_and_save_normal_tables().await; + let (_temp_dir, data_folder) = create_data_folder_and_create_normal_tables().await; assert!( !data_folder .is_time_series_table("normal_table_1") @@ -1359,11 +1334,11 @@ mod tests { #[tokio::test] async fn test_table_names() { - let (_temp_dir, data_folder) = create_data_folder_and_save_normal_tables().await; + let (_temp_dir, data_folder) = create_data_folder_and_create_normal_tables().await; let time_series_table_metadata = test::time_series_table_metadata(); data_folder - .save_time_series_table_metadata(&time_series_table_metadata) + .create_time_series_table(&time_series_table_metadata) .await .unwrap(); @@ -1380,7 +1355,7 @@ mod tests { #[tokio::test] async fn test_normal_table_names() { - let (_temp_dir, data_folder) = create_data_folder_and_save_normal_tables().await; + let (_temp_dir, data_folder) = create_data_folder_and_create_normal_tables().await; let normal_table_names = data_folder.normal_table_names().await.unwrap(); assert_eq!(normal_table_names, vec!["normal_table_2", "normal_table_1"]); @@ -1388,17 +1363,19 @@ mod tests { #[tokio::test] async fn test_time_series_table_names() { - let (_temp_dir, data_folder) = create_data_folder_and_save_time_series_table().await; + let (_temp_dir, data_folder) = create_data_folder_and_create_time_series_table().await; let time_series_table_names = data_folder.time_series_table_names().await.unwrap(); assert_eq!(time_series_table_names, vec![test::TIME_SERIES_TABLE_NAME]); } #[tokio::test] - async fn test_save_normal_table_metadata() { - let (_temp_dir, data_folder) = create_data_folder_and_save_normal_tables().await; + async fn test_create_normal_table() { + let (_temp_dir, data_folder) = create_data_folder_and_create_normal_tables().await; + + assert!(data_folder.delta_table("normal_table_1").await.is_ok()); - // Retrieve the normal table from the Delta Lake. + // Retrieve the normal table metadata from the Delta Lake. let sql = "SELECT table_name FROM metadata.normal_table_metadata ORDER BY table_name"; let batch = sql_and_concat(&data_folder.session_context, sql) .await @@ -1411,8 +1388,15 @@ mod tests { } #[tokio::test] - async fn test_save_time_series_table_metadata() { - let (_temp_dir, data_folder) = create_data_folder_and_save_time_series_table().await; + async fn test_create_time_series_table() { + let (_temp_dir, data_folder) = create_data_folder_and_create_time_series_table().await; + + assert!( + data_folder + .delta_table(test::TIME_SERIES_TABLE_NAME) + .await + .is_ok() + ); // Check that a row has been added to the time_series_table_metadata table. let sql = "SELECT table_name, query_schema FROM metadata.time_series_table_metadata"; @@ -1460,13 +1444,12 @@ mod tests { } #[tokio::test] - async fn test_drop_normal_table_metadata() { - let (_temp_dir, data_folder) = create_data_folder_and_save_normal_tables().await; + async fn test_drop_normal_table() { + let (_temp_dir, data_folder) = create_data_folder_and_create_normal_tables().await; - data_folder - .drop_table_metadata("normal_table_2") - .await - .unwrap(); + data_folder.drop_table("normal_table_2").await.unwrap(); + + assert!(data_folder.delta_table("normal_table_2").await.is_err()); // Verify that normal_table_2 was deleted from the normal_table_metadata table. let sql = "SELECT table_name FROM metadata.normal_table_metadata"; @@ -1478,14 +1461,21 @@ mod tests { } #[tokio::test] - async fn test_drop_time_series_table_metadata() { - let (_temp_dir, data_folder) = create_data_folder_and_save_time_series_table().await; + async fn test_drop_time_series_table() { + let (_temp_dir, data_folder) = create_data_folder_and_create_time_series_table().await; data_folder - .drop_table_metadata(test::TIME_SERIES_TABLE_NAME) + .drop_table(test::TIME_SERIES_TABLE_NAME) .await .unwrap(); + assert!( + data_folder + .delta_table(test::TIME_SERIES_TABLE_NAME) + .await + .is_err() + ); + // Verify that the time series table was deleted from the time_series_table_metadata table. let sql = "SELECT table_name FROM metadata.time_series_table_metadata"; let batch = sql_and_concat(&data_folder.session_context, sql) @@ -1504,28 +1494,24 @@ mod tests { } #[tokio::test] - async fn test_drop_table_metadata_for_missing_table() { - let (_temp_dir, data_folder) = create_data_folder_and_save_normal_tables().await; + async fn test_drop_missing_table() { + let (_temp_dir, data_folder) = create_data_folder_and_create_normal_tables().await; - assert!( - data_folder - .drop_table_metadata("missing_table") - .await - .is_err() - ); + assert!(data_folder.drop_table("missing_table").await.is_err()); } - async fn create_data_folder_and_save_normal_tables() -> (TempDir, DataFolder) { + async fn create_data_folder_and_create_normal_tables() -> (TempDir, DataFolder) { let temp_dir = tempfile::tempdir().unwrap(); let data_folder = DataFolder::open_local(temp_dir.path()).await.unwrap(); + let normal_table_schema = test::normal_table_schema(); data_folder - .save_normal_table_metadata("normal_table_1") + .create_normal_table("normal_table_1", &normal_table_schema) .await .unwrap(); data_folder - .save_normal_table_metadata("normal_table_2") + .create_normal_table("normal_table_2", &normal_table_schema) .await .unwrap(); @@ -1534,7 +1520,7 @@ mod tests { #[tokio::test] async fn test_time_series_table_metadata() { - let (_temp_dir, data_folder) = create_data_folder_and_save_time_series_table().await; + let (_temp_dir, data_folder) = create_data_folder_and_create_time_series_table().await; let time_series_table_metadata = data_folder.time_series_table_metadata().await.unwrap(); @@ -1546,7 +1532,7 @@ mod tests { #[tokio::test] async fn test_time_series_table_metadata_for_existing_time_series_table() { - let (_temp_dir, data_folder) = create_data_folder_and_save_time_series_table().await; + let (_temp_dir, data_folder) = create_data_folder_and_create_time_series_table().await; let time_series_table_metadata = data_folder .time_series_table_metadata_for_time_series_table(test::TIME_SERIES_TABLE_NAME) @@ -1561,7 +1547,7 @@ mod tests { #[tokio::test] async fn test_time_series_table_metadata_for_missing_time_series_table() { - let (_temp_dir, data_folder) = create_data_folder_and_save_time_series_table().await; + let (_temp_dir, data_folder) = create_data_folder_and_create_time_series_table().await; let time_series_table_metadata = data_folder .time_series_table_metadata_for_time_series_table("missing_table") @@ -1572,7 +1558,7 @@ mod tests { #[tokio::test] async fn test_error_bound() { - let (_temp_dir, data_folder) = create_data_folder_and_save_time_series_table().await; + let (_temp_dir, data_folder) = create_data_folder_and_create_time_series_table().await; let error_bounds = data_folder .error_bounds(test::TIME_SERIES_TABLE_NAME, 4) @@ -1629,7 +1615,7 @@ mod tests { .unwrap(); data_folder - .save_time_series_table_metadata(&time_series_table_metadata) + .create_time_series_table(&time_series_table_metadata) .await .unwrap(); @@ -1657,14 +1643,13 @@ mod tests { ); } - async fn create_data_folder_and_save_time_series_table() -> (TempDir, DataFolder) { + async fn create_data_folder_and_create_time_series_table() -> (TempDir, DataFolder) { let temp_dir = tempfile::tempdir().unwrap(); let data_folder = DataFolder::open_local(temp_dir.path()).await.unwrap(); - // Save a time series table to the Delta Lake. let time_series_table_metadata = test::time_series_table_metadata(); data_folder - .save_time_series_table_metadata(&time_series_table_metadata) + .create_time_series_table(&time_series_table_metadata) .await .unwrap(); diff --git a/crates/modelardb_storage/src/lib.rs b/crates/modelardb_storage/src/lib.rs index ad1cd3e7..6ff9f993 100644 --- a/crates/modelardb_storage/src/lib.rs +++ b/crates/modelardb_storage/src/lib.rs @@ -41,7 +41,6 @@ use datafusion::parquet::basic::{Compression, Encoding, ZstdLevel}; use datafusion::parquet::errors::ParquetError; use datafusion::parquet::file::properties::{EnabledStatistics, WriterProperties}; use datafusion::prelude::SessionContext; -use datafusion::sql::TableReference; use datafusion::sql::parser::Statement as DFStatement; use deltalake::DeltaTable; use deltalake::parquet::file::metadata::SortingColumn; @@ -52,7 +51,6 @@ use object_store::path::Path; use sqlparser::ast::Statement; use crate::error::Result; -use crate::query::metadata_table::MetadataTable; use crate::query::normal_table::NormalTable; use crate::query::time_series_table::TimeSeriesTable; @@ -88,21 +86,6 @@ pub fn create_session_context() -> SessionContext { session_context } -/// Register the metadata table stored in `delta_table` with `table_name` in `session_context`. If -/// the metadata table could not be registered with Apache DataFusion, return -/// [`ModelarDbStorageError`](error::ModelarDbStorageError). -pub fn register_metadata_table( - session_context: &SessionContext, - table_name: &str, - delta_table: DeltaTable, -) -> Result<()> { - let table_reference = TableReference::partial("metadata", table_name); - let metadata_table = Arc::new(MetadataTable::new(delta_table)); - session_context.register_table(table_reference, metadata_table)?; - - Ok(()) -} - /// Register the normal table stored in `delta_table` with `table_name` and `data_sink` in /// `session_context`. If the normal table could not be registered with Apache DataFusion, return /// [`ModelarDbStorageError`](error::ModelarDbStorageError).