diff --git a/Cargo.lock b/Cargo.lock index 899f7205..aced4a3f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3403,6 +3403,8 @@ name = "modelardb_test" version = "0.1.0" dependencies = [ "arrow", + "async-trait", + "datafusion", "modelardb_types", "rand 0.9.2", ] diff --git a/crates/modelardb_bulkloader/src/main.rs b/crates/modelardb_bulkloader/src/main.rs index 0a6f9fcc..5e153eb4 100644 --- a/crates/modelardb_bulkloader/src/main.rs +++ b/crates/modelardb_bulkloader/src/main.rs @@ -38,7 +38,8 @@ use deltalake::{ObjectStore, Path}; use futures::stream::StreamExt; use modelardb_embedded::error::{ModelarDbEmbeddedError, Result}; use modelardb_embedded::operations::Operations; -use modelardb_storage::data_folder::{DataFolder, DeltaTableWriter}; +use modelardb_storage::data_folder::DataFolder; +use modelardb_storage::data_folder::delta_table_writer::DeltaTableWriter; use modelardb_types::types::TimeSeriesTableMetadata; use sysinfo::System; diff --git a/crates/modelardb_embedded/src/operations/data_folder.rs b/crates/modelardb_embedded/src/operations/data_folder.rs index 7f9a5c7b..92700c89 100644 --- a/crates/modelardb_embedded/src/operations/data_folder.rs +++ b/crates/modelardb_embedded/src/operations/data_folder.rs @@ -215,7 +215,7 @@ impl Operations for DataFolder { &uncompressed_data, )?; - self.write_compressed_segments_to_time_series_table(table_name, compressed_data) + self.write_record_batches(table_name, compressed_data) .await?; } else if let Some(normal_table_schema) = self.normal_table_schema(table_name).await { // Normal table. @@ -223,7 +223,7 @@ impl Operations for DataFolder { return Err(schema_mismatch_error); } - self.write_record_batches_to_normal_table(table_name, vec![uncompressed_data]) + self.write_record_batches(table_name, vec![uncompressed_data]) .await?; } else { return Err(ModelarDbEmbeddedError::InvalidArgument(format!( @@ -282,7 +282,7 @@ impl Operations for DataFolder { let record_batches = common::collect(record_batch_stream).await?; target_data_folder - .write_record_batches_to_normal_table(target_table_name, record_batches) + .write_record_batches(target_table_name, record_batches) .await?; Ok(()) @@ -407,7 +407,7 @@ impl Operations for DataFolder { // Write read data to target_table_name in target. target_data_folder - .write_compressed_segments_to_time_series_table(target_table_name, record_batches) + .write_record_batches(target_table_name, record_batches) .await?; Ok(()) @@ -455,7 +455,7 @@ impl Operations for DataFolder { let record_batches: Vec = stream.try_collect().await?; target_data_folder - .write_compressed_segments_to_time_series_table(target_table_name, record_batches) + .write_record_batches(target_table_name, record_batches) .await?; } else if let (Some(source_normal_table_schema), Some(target_normal_table_schema)) = ( self.normal_table_schema(source_table_name).await, @@ -474,7 +474,7 @@ impl Operations for DataFolder { let record_batches: Vec = stream.try_collect().await?; target_data_folder - .write_record_batches_to_normal_table(target_table_name, record_batches) + .write_record_batches(target_table_name, record_batches) .await?; } else { return Err(ModelarDbEmbeddedError::InvalidArgument(format!( diff --git a/crates/modelardb_server/src/context.rs b/crates/modelardb_server/src/context.rs index c015e925..c88b7be6 100644 --- a/crates/modelardb_server/src/context.rs +++ b/crates/modelardb_server/src/context.rs @@ -747,10 +747,7 @@ mod tests { // Write data to the normal table. let local_data_folder = &context.data_folders.local_data_folder; local_data_folder - .write_record_batches_to_normal_table( - NORMAL_TABLE_NAME, - vec![table::normal_table_record_batch()], - ) + .write_record_batches(NORMAL_TABLE_NAME, vec![table::normal_table_record_batch()]) .await .unwrap(); @@ -818,7 +815,7 @@ mod tests { // Write data to the time series table. let local_data_folder = &context.data_folders.local_data_folder; local_data_folder - .write_compressed_segments_to_time_series_table( + .write_record_batches( TIME_SERIES_TABLE_NAME, vec![table::compressed_segments_record_batch()], ) diff --git a/crates/modelardb_server/src/storage/compressed_data_manager.rs b/crates/modelardb_server/src/storage/compressed_data_manager.rs index 88931d12..52a42869 100644 --- a/crates/modelardb_server/src/storage/compressed_data_manager.rs +++ b/crates/modelardb_server/src/storage/compressed_data_manager.rs @@ -87,7 +87,7 @@ impl CompressedDataManager { let record_batch_size_in_bytes = record_batch.get_array_memory_size(); self.local_data_folder - .write_record_batches_to_normal_table(table_name, vec![record_batch]) + .write_record_batches(table_name, vec![record_batch]) .await?; // Inform the data transfer component about the new data if a remote data folder was @@ -246,7 +246,7 @@ impl CompressedDataManager { let compressed_data_buffer_size_in_bytes = compressed_data_buffer.size_in_bytes; let compressed_segments = compressed_data_buffer.record_batches(); self.local_data_folder - .write_compressed_segments_to_time_series_table(table_name, compressed_segments) + .write_record_batches(table_name, compressed_segments) .await?; // Inform the data transfer component about the new data if a remote data folder was diff --git a/crates/modelardb_server/src/storage/data_transfer.rs b/crates/modelardb_server/src/storage/data_transfer.rs index 31d8471f..cad3ff90 100644 --- a/crates/modelardb_server/src/storage/data_transfer.rs +++ b/crates/modelardb_server/src/storage/data_transfer.rs @@ -244,19 +244,9 @@ impl DataTransfer { debug!("Transferring {current_size_in_bytes} bytes for the table '{table_name}'.",); // Write the data to the remote Delta Lake. - if self - .local_data_folder - .is_time_series_table(table_name) - .await? - { - self.remote_data_folder - .write_compressed_segments_to_time_series_table(table_name, record_batches) - .await?; - } else { - self.remote_data_folder - .write_record_batches_to_normal_table(table_name, record_batches) - .await?; - } + self.remote_data_folder + .write_record_batches(table_name, record_batches) + .await?; // Delete the data that has been transferred to the remote Delta Lake. self.local_data_folder.truncate_table(table_name).await?; @@ -493,16 +483,13 @@ mod tests { for _ in 0..batch_write_count { // Write to the normal table. local_data_folder - .write_record_batches_to_normal_table( - NORMAL_TABLE_NAME, - vec![table::normal_table_record_batch()], - ) + .write_record_batches(NORMAL_TABLE_NAME, vec![table::normal_table_record_batch()]) .await .unwrap(); // Write to the time series table. local_data_folder - .write_compressed_segments_to_time_series_table( + .write_record_batches( TIME_SERIES_TABLE_NAME, vec![table::compressed_segments_record_batch()], ) diff --git a/crates/modelardb_storage/src/data_folder/delta_table_writer.rs b/crates/modelardb_storage/src/data_folder/delta_table_writer.rs new file mode 100644 index 00000000..8501c4f7 --- /dev/null +++ b/crates/modelardb_storage/src/data_folder/delta_table_writer.rs @@ -0,0 +1,431 @@ +/* Copyright 2026 The ModelarDB Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//! Implementation of [`DeltaTableWriter`] for transactionally writing +//! [`RecordBatches`](RecordBatch) to a Delta table stored in an object store. Writing can be +//! committed or rolled back to ensure that the Delta table is always in a consistent state. + +use std::sync::Arc; + +use arrow::array::RecordBatch; +use arrow::datatypes::Schema; +use datafusion::catalog::TableProvider; +use datafusion::parquet::file::metadata::SortingColumn; +use datafusion::parquet::file::properties::WriterProperties; +use delta_kernel::table_properties::DataSkippingNumIndexedCols; +use deltalake::DeltaTable; +use deltalake::delta_datafusion::DeltaDataChecker; +use deltalake::kernel::transaction::{CommitBuilder, CommitProperties}; +use deltalake::kernel::{Action, Add}; +use deltalake::operations::write::writer::{DeltaWriter, WriterConfig}; +use deltalake::protocol::{DeltaOperation, SaveMode}; +use modelardb_types::schemas::{COMPRESSED_SCHEMA, FIELD_COLUMN}; +use object_store::ObjectStore; +use object_store::path::Path; +use uuid::Uuid; + +use crate::apache_parquet_writer_properties; +use crate::error::{ModelarDbStorageError, Result}; + +/// Functionality for transactionally writing [`RecordBatches`](RecordBatch) to a Delta table stored +/// in an object store. +pub struct DeltaTableWriter { + /// Delta table that all of the record batches will be written to. + delta_table: DeltaTable, + /// Checker that ensures all of the record batches match the table. + delta_data_checker: DeltaDataChecker, + /// Write operation that will be committed to the Delta table. + delta_operation: DeltaOperation, + /// Unique identifier for this write operation to the Delta table. + operation_id: Uuid, + /// Writes record batches to the Delta table as Apache Parquet files. + delta_writer: DeltaWriter, +} + +impl DeltaTableWriter { + /// Create a [`DeltaTableWriter`] configured for writing to a normal table. + pub(crate) fn try_new_for_normal_table(delta_table: DeltaTable) -> Result { + let writer_properties = apache_parquet_writer_properties(None); + Self::try_new(delta_table, vec![], writer_properties) + } + + /// Create a [`DeltaTableWriter`] configured for writing to a time series table. + pub(crate) fn try_new_for_time_series_table(delta_table: DeltaTable) -> Result { + let partition_columns = vec![FIELD_COLUMN.to_owned()]; + + // Specify that the file must be sorted by the tag columns and then by start_time. + let base_compressed_schema_len = COMPRESSED_SCHEMA.0.fields().len(); + let compressed_schema_len = TableProvider::schema(&delta_table).fields().len(); + let sorting_columns_len = (compressed_schema_len - base_compressed_schema_len) + 1; + let mut sorting_columns = Vec::with_capacity(sorting_columns_len); + + // Compressed segments have the tag columns at the end of the schema. + for tag_column_index in base_compressed_schema_len..compressed_schema_len { + sorting_columns.push(SortingColumn { + column_idx: tag_column_index as i32, + descending: false, + nulls_first: false, + }); + } + + // Compressed segments store the first timestamp in the second column. + sorting_columns.push(SortingColumn { + column_idx: 1, + descending: false, + nulls_first: false, + }); + + let writer_properties = apache_parquet_writer_properties(Some(sorting_columns)); + Self::try_new(delta_table, partition_columns, writer_properties) + } + + /// Create a new [`DeltaTableWriter`]. Returns a [`ModelarDbStorageError`] if the state of the + /// Delta table cannot be loaded from `delta_table`. + pub fn try_new( + delta_table: DeltaTable, + partition_columns: Vec, + writer_properties: WriterProperties, + ) -> Result { + // Checks whether record batches match the table’s invariants, constraints, and nullability. + let delta_table_state = delta_table.snapshot()?; + let snapshot = delta_table_state.snapshot(); + let delta_data_checker = DeltaDataChecker::new(snapshot); + + // Operation that will be committed. + let delta_operation = DeltaOperation::Write { + mode: SaveMode::Append, + partition_by: if partition_columns.is_empty() { + None + } else { + Some(partition_columns.clone()) + }, + predicate: None, + }; + + // A UUID version 4 is used as the operation id to match the existing Operation trait in the + // deltalake crate as it is pub(crate) and thus cannot be used directly in DeltaTableWriter. + let operation_id = Uuid::new_v4(); + + // Writer that will write the record batches. + let object_store = delta_table.log_store().object_store(Some(operation_id)); + let table_schema: Arc = TableProvider::schema(&delta_table); + let num_indexed_cols = + DataSkippingNumIndexedCols::NumColumns(table_schema.fields.len() as u64); + let writer_config = WriterConfig::new( + table_schema, + partition_columns, + Some(writer_properties), + None, + None, + num_indexed_cols, + None, + ); + let delta_writer = DeltaWriter::new(object_store, writer_config); + + Ok(Self { + delta_table, + delta_data_checker, + delta_operation, + operation_id, + delta_writer, + }) + } + + /// Write `record_batch` to the Delta table. Returns a [`ModelarDbStorageError`] if the + /// [`RecordBatches`](RecordBatch) does not match the schema of the Delta table or if the + /// writing fails. + pub async fn write(&mut self, record_batch: &RecordBatch) -> Result<()> { + self.delta_data_checker.check_batch(record_batch).await?; + self.delta_writer.write(record_batch).await?; + Ok(()) + } + + /// Write all `record_batches` to the Delta table. Returns a [`ModelarDbStorageError`] if one of + /// the [`RecordBatches`](RecordBatch) does not match the schema of the Delta table or if the + /// writing fails. + pub async fn write_all(&mut self, record_batches: &[RecordBatch]) -> Result<()> { + for record_batch in record_batches { + self.write(record_batch).await?; + } + Ok(()) + } + + /// Write all `record_batches` and commit. If writing fails, roll back all writes and return + /// [`ModelarDbStorageError`]. Returns the updated [`DeltaTable`] if all `record_batches` are + /// written and committed successfully. + pub async fn write_all_and_commit( + mut self, + record_batches: &[RecordBatch], + ) -> Result { + match self.write_all(record_batches).await { + Ok(_) => self.commit().await, + Err(error) => { + self.rollback().await?; + Err(error) + } + } + } + + /// Consume the [`DeltaTableWriter`], finish the writing, and commit the files that have been + /// written to the log. If an error occurs before the commit is finished, the already written + /// files are deleted if possible. Returns a [`ModelarDbStorageError`] if an error occurs when + /// finishing the writing, committing the files that have been written, deleting the written + /// files, or updating the [`DeltaTable`]. + pub async fn commit(mut self) -> Result { + // Write the remaining buffered files. + let added_files = self.delta_writer.close().await?; + + // Clone added_files in case of rollback. + let actions = added_files + .clone() + .into_iter() + .map(Action::Add) + .collect::>(); + + // Prepare all inputs to the commit. + let object_store = self.delta_table.object_store(); + let commit_properties = CommitProperties::default(); + let table_data = match self.delta_table.snapshot() { + Ok(table_data) => table_data, + Err(delta_table_error) => { + delete_added_files(&object_store, added_files).await?; + return Err(ModelarDbStorageError::DeltaLake(delta_table_error)); + } + }; + let log_store = self.delta_table.log_store(); + + // Construct the commit to be written. + let commit_builder = CommitBuilder::from(commit_properties) + .with_actions(actions) + .with_operation_id(self.operation_id) + .build(Some(table_data), log_store, self.delta_operation); + + // Write the commit to the Delta table. + let _finalized_commit = match commit_builder.await { + Ok(finalized_commit) => finalized_commit, + Err(delta_table_error) => { + delete_added_files(&object_store, added_files).await?; + return Err(ModelarDbStorageError::DeltaLake(delta_table_error)); + } + }; + + // Return Delta table with the commit. + self.delta_table.load().await?; + Ok(self.delta_table) + } + + /// Consume the [`DeltaTableWriter`], abort the writing, and delete all of the files that have + /// already been written. Returns a [`ModelarDbStorageError`] if an error occurs when aborting + /// the writing or deleting the files that have already been written. Rollback is not called + /// automatically as drop() is not async and async_drop() is not yet a stable API. + pub async fn rollback(self) -> Result { + let object_store = self.delta_table.object_store(); + let added_files = self.delta_writer.close().await?; + delete_added_files(&object_store, added_files).await?; + Ok(self.delta_table) + } +} + +/// Delete the `added_files` from `object_store`. Returns a [`ModelarDbStorageError`] if a file +/// could not be deleted. It is a function instead of a method on [`DeltaTableWriter`] so it can be +/// called by [`DeltaTableWriter`] after the [`DeltaWriter`] is closed without lifetime issues. +async fn delete_added_files(object_store: &dyn ObjectStore, added_files: Vec) -> Result<()> { + for add_file in added_files { + let path: Path = Path::from(add_file.path); + object_store.delete(&path).await?; + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + use modelardb_test::table as test; + use modelardb_test::table::{NORMAL_TABLE_NAME, TIME_SERIES_TABLE_NAME}; + use tempfile::TempDir; + + use crate::data_folder::DataFolder; + use crate::sql_and_concat; + + // Tests for DeltaTableWriter. + #[tokio::test] + async fn test_try_new() { + let (_temp_dir, data_folder) = create_data_folder_with_normal_table().await; + let delta_table = data_folder.delta_table(NORMAL_TABLE_NAME).await.unwrap(); + let writer = + DeltaTableWriter::try_new(delta_table, vec![], WriterProperties::default()).unwrap(); + + let delta_table = writer + .write_all_and_commit(&[test::normal_table_record_batch()]) + .await + .unwrap(); + + assert_eq!(delta_table.get_file_uris().unwrap().count(), 1); + } + + #[tokio::test] + async fn test_write_and_commit() { + let (_temp_dir, _data_folder, mut writer) = create_time_series_table_writer().await; + + writer + .write(&test::compressed_segments_record_batch()) + .await + .unwrap(); + + let delta_table = writer.commit().await.unwrap(); + assert_eq!(delta_table.get_file_uris().unwrap().count(), 1); + } + + #[tokio::test] + async fn test_write_empty_record_batch() { + let (_temp_dir, _data_folder, mut writer) = create_time_series_table_writer().await; + + let empty_batch = + RecordBatch::new_empty(test::time_series_table_metadata().compressed_schema); + writer.write(&empty_batch).await.unwrap(); + + let delta_table = writer.commit().await.unwrap(); + assert_eq!(delta_table.get_file_uris().unwrap().count(), 0); + } + + #[tokio::test] + async fn test_write_with_schema_mismatch() { + let (_temp_dir, _data_folder, mut writer) = create_time_series_table_writer().await; + + let result = writer.write(&test::normal_table_record_batch()).await; + + assert!( + result + .unwrap_err() + .to_string() + .starts_with("Delta Lake Error: Attempted to write invalid data to the table:") + ); + } + + #[tokio::test] + async fn test_write_all_and_commit() { + let (_temp_dir, data_folder, writer) = create_time_series_table_writer().await; + + let batch = test::compressed_segments_record_batch(); + let delta_table = writer + .write_all_and_commit(&[batch.clone(), batch]) + .await + .unwrap(); + + assert_eq!(delta_table.get_file_uris().unwrap().count(), 1); + + // Verify both batches were written. + data_folder + .session_context() + .register_table(TIME_SERIES_TABLE_NAME, Arc::new(delta_table)) + .unwrap(); + + let result = sql_and_concat( + data_folder.session_context(), + &format!("SELECT * FROM {TIME_SERIES_TABLE_NAME}"), + ) + .await + .unwrap(); + + assert_eq!(result.num_rows(), 6); + } + + #[tokio::test] + async fn test_write_all_and_commit_rolls_back_on_error() { + let (temp_dir, data_folder, writer) = create_time_series_table_writer().await; + + let valid_batch = test::compressed_segments_record_batch(); + let invalid_batch = test::normal_table_record_batch(); + let result = writer + .write_all_and_commit(&[valid_batch, invalid_batch]) + .await; + + assert!( + result + .unwrap_err() + .to_string() + .starts_with("Delta Lake Error: Attempted to write invalid data to the table:") + ); + + // Verify no commit was made. + let delta_table = data_folder + .delta_table(TIME_SERIES_TABLE_NAME) + .await + .unwrap(); + assert_eq!(delta_table.get_file_uris().unwrap().count(), 0); + + // Verify the physical files were cleaned up from the partition folder. + let column_path = format!( + "{}/tables/{TIME_SERIES_TABLE_NAME}/field_column=0", + temp_dir.path().to_str().unwrap() + ); + assert_eq!(std::fs::read_dir(&column_path).unwrap().count(), 0); + } + + #[tokio::test] + async fn test_commit_without_writes() { + let (_temp_dir, _data_folder, writer) = create_time_series_table_writer().await; + + let delta_table = writer.commit().await.unwrap(); + assert_eq!(delta_table.get_file_uris().unwrap().count(), 0); + } + + #[tokio::test] + async fn test_rollback() { + let (temp_dir, _data_folder, mut writer) = create_time_series_table_writer().await; + + writer + .write(&test::compressed_segments_record_batch()) + .await + .unwrap(); + + let delta_table = writer.rollback().await.unwrap(); + assert_eq!(delta_table.get_file_uris().unwrap().count(), 0); + + // Verify the physical files were cleaned up from the partition folder. + let column_path = format!( + "{}/tables/{TIME_SERIES_TABLE_NAME}/field_column=0", + temp_dir.path().to_str().unwrap() + ); + assert_eq!(std::fs::read_dir(&column_path).unwrap().count(), 0); + } + + async fn create_data_folder_with_normal_table() -> (TempDir, DataFolder) { + let temp_dir = tempfile::tempdir().unwrap(); + let data_folder = DataFolder::open_local(temp_dir.path()).await.unwrap(); + + data_folder + .create_normal_table(NORMAL_TABLE_NAME, &test::normal_table_schema()) + .await + .unwrap(); + + (temp_dir, data_folder) + } + + async fn create_time_series_table_writer() -> (TempDir, DataFolder, DeltaTableWriter) { + let temp_dir = tempfile::tempdir().unwrap(); + let data_folder = DataFolder::open_local(temp_dir.path()).await.unwrap(); + + let delta_table = data_folder + .create_time_series_table(&test::time_series_table_metadata()) + .await + .unwrap(); + + let writer = DeltaTableWriter::try_new_for_time_series_table(delta_table).unwrap(); + + (temp_dir, data_folder, writer) + } +} diff --git a/crates/modelardb_storage/src/data_folder/mod.rs b/crates/modelardb_storage/src/data_folder/mod.rs index ef3be87f..9dc814e8 100644 --- a/crates/modelardb_storage/src/data_folder/mod.rs +++ b/crates/modelardb_storage/src/data_folder/mod.rs @@ -16,11 +16,12 @@ //! Implementation of the type used to interact with local and remote storage through a Delta Lake. pub mod cluster; +pub mod delta_table_writer; use std::collections::HashMap; +use std::env; use std::path::Path as StdPath; use std::sync::Arc; -use std::{env, fs}; use arrow::array::{ ArrayRef, ArrowPrimitiveType, BinaryArray, BooleanArray, Float32Array, Int16Array, RecordBatch, @@ -33,22 +34,16 @@ use datafusion::catalog::TableProvider; use datafusion::common::{DFSchema, TableReference, ToDFSchema}; use datafusion::datasource::sink::DataSink; use datafusion::logical_expr::{Expr, lit}; -use datafusion::parquet::file::properties::WriterProperties; use datafusion::prelude::{SessionContext, col}; use datafusion_proto::bytes::Serializeable; use delta_kernel::engine::arrow_conversion::TryIntoKernel; -use delta_kernel::table_properties::DataSkippingNumIndexedCols; -use deltalake::delta_datafusion::DeltaDataChecker; -use deltalake::kernel::transaction::{CommitBuilder, CommitProperties}; -use deltalake::kernel::{Action, Add, StructField}; +use deltalake::kernel::StructField; use deltalake::operations::create::CreateBuilder; -use deltalake::operations::write::writer::{DeltaWriter, WriterConfig}; -use deltalake::parquet::file::metadata::SortingColumn; -use deltalake::protocol::{DeltaOperation, SaveMode}; +use deltalake::protocol::SaveMode; use deltalake::{DeltaTable, DeltaTableError}; use futures::{StreamExt, TryStreamExt}; use modelardb_types::functions::{try_convert_bytes_to_schema, try_convert_schema_to_bytes}; -use modelardb_types::schemas::{COMPRESSED_SCHEMA, FIELD_COLUMN}; +use modelardb_types::schemas::FIELD_COLUMN; use modelardb_types::types::{ ArrowValue, ErrorBound, GeneratedColumn, MAX_RETENTION_PERIOD_IN_SECONDS, TimeSeriesTableMetadata, @@ -59,11 +54,11 @@ use object_store::local::LocalFileSystem; use object_store::memory::InMemory; use object_store::path::Path; use url::Url; -use uuid::Uuid; +use crate::data_folder::delta_table_writer::DeltaTableWriter; use crate::error::{ModelarDbStorageError, Result}; -use crate::query::metadata_table::MetadataTable; -use crate::{METADATA_FOLDER, TABLE_FOLDER, apache_parquet_writer_properties, sql_and_concat}; +use crate::query::normal_table::NormalTable; +use crate::{METADATA_FOLDER, TABLE_FOLDER, sql_and_concat}; /// Types of tables supported by ModelarDB. enum TableType { @@ -105,17 +100,12 @@ impl DataFolder { /// Create a new [`DataFolder`] that manages the Delta tables in memory. pub async fn open_memory() -> Result { - let data_folder = Self { - location: "memory:///modelardb".to_owned(), - storage_options: HashMap::new(), - object_store: Arc::new(InMemory::new()), - delta_table_cache: DashMap::new(), - session_context: Arc::new(crate::create_session_context()), - }; - - data_folder.create_and_register_metadata_tables().await?; - - Ok(data_folder) + Self::try_new( + "memory:///modelardb".to_owned(), + HashMap::new(), + Arc::new(InMemory::new()), + ) + .await } /// Create a new [`DataFolder`] that manages the Delta tables in `data_folder_path`. Returns a @@ -123,7 +113,7 @@ impl DataFolder { /// the metadata tables cannot be created. pub async fn open_local(data_folder_path: &StdPath) -> Result { // Ensure the directories in the path exists as LocalFileSystem otherwise returns an error. - fs::create_dir_all(data_folder_path) + std::fs::create_dir_all(data_folder_path) .map_err(|error| DeltaTableError::generic(error.to_string()))?; // Use with_automatic_cleanup to ensure empty directories are deleted automatically. @@ -136,17 +126,7 @@ impl DataFolder { .ok_or_else(|| DeltaTableError::generic("Local data folder path is not UTF-8."))? .to_owned(); - let data_folder = Self { - location, - storage_options: HashMap::new(), - object_store: Arc::new(object_store), - delta_table_cache: DashMap::new(), - session_context: Arc::new(crate::create_session_context()), - }; - - data_folder.create_and_register_metadata_tables().await?; - - Ok(data_folder) + Self::try_new(location, HashMap::new(), Arc::new(object_store)).await } /// Create a new [`DataFolder`] that manages Delta tables in the remote object store given by @@ -225,17 +205,7 @@ impl DataFolder { ) .build()?; - let data_folder = DataFolder { - location, - storage_options, - object_store: Arc::new(object_store), - delta_table_cache: DashMap::new(), - session_context: Arc::new(crate::create_session_context()), - }; - - data_folder.create_and_register_metadata_tables().await?; - - Ok(data_folder) + Self::try_new(location, storage_options, Arc::new(object_store)).await } /// Create a new [`DataFolder`] that manages the Delta tables in an object store with an @@ -258,10 +228,21 @@ impl DataFolder { .map_err(|error| ModelarDbStorageError::InvalidArgument(error.to_string()))?; let (object_store, _path) = object_store::parse_url_opts(&url, &storage_options)?; - let data_folder = DataFolder { + Self::try_new(location, storage_options, Arc::new(object_store)).await + } + + /// Create a new [`DataFolder`] with the given `location`, `storage_options`, and `object_store`, + /// create the metadata tables, and return the [`DataFolder`]. Returns [`ModelarDbStorageError`] + /// if the metadata tables cannot be created. + async fn try_new( + location: String, + storage_options: HashMap, + object_store: Arc, + ) -> Result { + let data_folder = Self { location, storage_options, - object_store: Arc::new(object_store), + object_store, delta_table_cache: DashMap::new(), session_context: Arc::new(crate::create_session_context()), }; @@ -318,6 +299,43 @@ impl DataFolder { Ok(()) } + /// Create a Delta Lake table for a metadata table with `table_name` and `schema` and register + /// it in the [`SessionContext`] in the `metadata` schema. If the table already exists, it is + /// reused. Return [`ModelarDbStorageError`] if the table cannot be created or cannot be + /// registered. + async fn create_and_register_metadata_table( + &self, + table_name: &str, + schema: &Schema, + ) -> Result<()> { + let delta_table = self + .create_delta_lake_table( + table_name, + schema, + &[], + self.location_of_metadata_table(table_name), + SaveMode::Ignore, + ) + .await?; + + let table_reference = TableReference::partial("metadata", table_name); + let metadata_table = Arc::new(NormalTable::new(delta_table, None)); + self.session_context + .register_table(table_reference, metadata_table)?; + + Ok(()) + } + + /// Return the session context used to query the tables using Apache DataFusion. + pub fn session_context(&self) -> &SessionContext { + &self.session_context + } + + /// Return an [`ObjectStore`] to access the root of the Delta Lake. + pub fn object_store(&self) -> Arc { + self.object_store.clone() + } + /// Register all normal tables and time series tables in `self` with its [`SessionContext`]. /// `data_sink` is set as the [`DataSink`] for all of the tables. If the tables could not be /// registered, [`ModelarDbStorageError`] is returned. @@ -349,222 +367,6 @@ impl DataFolder { Ok(()) } - /// Return the session context used to query the tables using Apache DataFusion. - pub fn session_context(&self) -> &SessionContext { - &self.session_context - } - - /// Return an [`ObjectStore`] to access the root of the Delta Lake. - pub fn object_store(&self) -> Arc { - self.object_store.clone() - } - - /// Return a [`DeltaTable`] for manipulating the metadata table with `table_name` in the - /// Delta Lake, or a [`ModelarDbStorageError`] if a connection to the Delta Lake cannot be - /// established or the table does not exist. - pub async fn metadata_delta_table(&self, table_name: &str) -> Result { - let table_path = self.location_of_metadata_table(table_name); - self.delta_table_from_path(&table_path).await - } - - /// Return a [`DeltaTable`] for manipulating the table with `table_name` in the Delta Lake, or a - /// [`ModelarDbStorageError`] if a connection to the Delta Lake cannot be established or the - /// table does not exist. - pub async fn delta_table(&self, table_name: &str) -> Result { - let table_path = self.location_of_table(table_name); - self.delta_table_from_path(&table_path).await - } - - /// Return a [`DeltaTable`] for manipulating the table at `table_path` in the Delta Lake, or a - /// [`ModelarDbStorageError`] if a connection to the Delta Lake cannot be established or the - /// table does not exist. - async fn delta_table_from_path(&self, table_path: &str) -> Result { - // Use the cache if possible and load to get the latest table data. - if let Some(mut delta_table) = self.delta_table_cache.get_mut(table_path) { - delta_table.load().await?; - Ok(delta_table.clone()) - } else { - // If the table is not in the cache, open it and add it to the cache before returning. - let table_url = deltalake::ensure_table_uri(table_path)?; - let delta_table = - deltalake::open_table_with_storage_options(table_url, self.storage_options.clone()) - .await?; - - self.delta_table_cache - .insert(table_path.to_owned(), delta_table.clone()); - - Ok(delta_table) - } - } - - /// Return `true` if the table with `table_name` is a normal table, otherwise return `false`. - pub async fn is_normal_table(&self, table_name: &str) -> Result { - Ok(self - .normal_table_names() - .await? - .contains(&table_name.to_owned())) - } - - /// Return `true` if the table with `table_name` is a time series table, otherwise return `false`. - pub async fn is_time_series_table(&self, table_name: &str) -> Result { - Ok(self - .time_series_table_names() - .await? - .contains(&table_name.to_owned())) - } - - /// Return the name of each table currently in the Delta Lake. If the table names cannot be - /// retrieved, [`ModelarDbStorageError`] is returned. - pub async fn table_names(&self) -> Result> { - let normal_table_names = self.normal_table_names().await?; - let time_series_table_names = self.time_series_table_names().await?; - - let mut table_names = normal_table_names; - table_names.extend(time_series_table_names); - - Ok(table_names) - } - - /// Return the name of each normal table currently in the Delta Lake. Note that this does not - /// include time series tables. If the normal table names cannot be retrieved, - /// [`ModelarDbStorageError`] is returned. - pub async fn normal_table_names(&self) -> Result> { - self.table_names_of_type(TableType::NormalTable).await - } - - /// Return the schema of the table with the name in `table_name` if it is a normal table. If the - /// table does not exist or the table is not a normal table, return [`None`]. - pub async fn normal_table_schema(&self, table_name: &str) -> Option> { - if self - .is_normal_table(table_name) - .await - .is_ok_and(|is_normal_table| is_normal_table) - { - let schema = self - .delta_table(table_name) - .await - .expect("Delta Lake table should exist if the metadata is in the Delta Lake.") - .schema(); - - Some(schema) - } else { - None - } - } - - /// Return the name of each time series table currently in the Delta Lake. Note that this does - /// not include normal tables. If the time series table names cannot be retrieved, - /// [`ModelarDbStorageError`] is returned. - pub async fn time_series_table_names(&self) -> Result> { - self.table_names_of_type(TableType::TimeSeriesTable).await - } - - /// Return the name of tables of `table_type`. Returns [`ModelarDbStorageError`] if the table - /// names cannot be retrieved. - async fn table_names_of_type(&self, table_type: TableType) -> Result> { - let table_type = match table_type { - TableType::NormalTable => "normal_table", - TableType::TimeSeriesTable => "time_series_table", - }; - - let sql = format!("SELECT table_name FROM metadata.{table_type}_metadata"); - let batch = sql_and_concat(&self.session_context, &sql).await?; - - let table_names = modelardb_types::array!(batch, 0, StringArray); - Ok(table_names.iter().flatten().map(str::to_owned).collect()) - } - - /// Return a [`DeltaTableWriter`] for writing to the table with `table_name` in the Delta Lake, - /// or a [`ModelarDbStorageError`] if a connection to the Delta Lake cannot be established or - /// the table does not exist. - pub async fn table_writer(&self, table_name: &str) -> Result { - let delta_table = self.delta_table(table_name).await?; - if self - .time_series_table_metadata_for_registered_time_series_table(table_name) - .await - .is_some() - { - self.time_series_table_writer(delta_table).await - } else { - self.normal_or_metadata_table_writer(delta_table).await - } - } - - /// Return a [`DeltaTableWriter`] for writing to the time series table corresponding to - /// `delta_table` in the Delta Lake, or a [`ModelarDbStorageError`] if a connection to the Delta - /// Lake cannot be established or the table does not exist. - async fn time_series_table_writer(&self, delta_table: DeltaTable) -> Result { - let partition_columns = vec![FIELD_COLUMN.to_owned()]; - - // Specify that the file must be sorted by the tag columns and then by start_time. - let base_compressed_schema_len = COMPRESSED_SCHEMA.0.fields().len(); - let compressed_schema_len = TableProvider::schema(&delta_table).fields().len(); - let sorting_columns_len = (compressed_schema_len - base_compressed_schema_len) + 1; - let mut sorting_columns = Vec::with_capacity(sorting_columns_len); - - // Compressed segments have the tag columns at the end of the schema. - for tag_column_index in base_compressed_schema_len..compressed_schema_len { - sorting_columns.push(SortingColumn { - column_idx: tag_column_index as i32, - descending: false, - nulls_first: false, - }); - } - - // Compressed segments store the first timestamp in the second column. - sorting_columns.push(SortingColumn { - column_idx: 1, - descending: false, - nulls_first: false, - }); - - let writer_properties = apache_parquet_writer_properties(Some(sorting_columns)); - DeltaTableWriter::try_new(delta_table, partition_columns, writer_properties) - } - - /// Return a [`DeltaTableWriter`] for writing to the table corresponding to `delta_table` in the - /// Delta Lake, or a [`ModelarDbStorageError`] if a connection to the Delta Lake cannot be - /// established or the table does not exist. - async fn normal_or_metadata_table_writer( - &self, - delta_table: DeltaTable, - ) -> Result { - let writer_properties = apache_parquet_writer_properties(None); - DeltaTableWriter::try_new(delta_table, vec![], writer_properties) - } - - /// Create a Delta Lake table for a metadata table with `table_name` and `schema` and register - /// it in the [`SessionContext`] in the `metadata` schema. If the table already exists, it is - /// reused. Return [`ModelarDbStorageError`] if the table cannot be created or cannot be - /// registered. - pub async fn create_and_register_metadata_table( - &self, - table_name: &str, - schema: &Schema, - ) -> Result<()> { - let delta_table = self - .create_delta_lake_table( - table_name, - schema, - &[], - self.location_of_metadata_table(table_name), - SaveMode::Ignore, - ) - .await?; - - let table_reference = TableReference::partial("metadata", table_name); - let metadata_table = Arc::new(MetadataTable::new(delta_table)); - self.session_context - .register_table(table_reference, metadata_table)?; - - Ok(()) - } - - /// Return the location of the metadata table with `table_name`. - fn location_of_metadata_table(&self, table_name: &str) -> String { - format!("{}/{METADATA_FOLDER}/{table_name}", self.location) - } - /// Create a Delta Lake table for a normal table with `table_name` and `schema` and save the /// table metadata to the `normal_table_metadata` table. If the table already exists or /// the metadata could not be saved, return [`ModelarDbStorageError`], otherwise return @@ -626,11 +428,6 @@ impl DataFolder { Ok(delta_table) } - /// Return the location of the table with `table_name`. - fn location_of_table(&self, table_name: &str) -> String { - format!("{}/{TABLE_FOLDER}/{table_name}", self.location) - } - /// Save the created time series table to the Delta Lake. This includes adding a row to the /// `time_series_table_metadata` table and adding a row to the `time_series_table_field_columns` /// table for each field column. @@ -862,65 +659,178 @@ impl DataFolder { Ok(()) } - /// Write `columns` to a Delta Lake table with `table_name`. Returns an updated [`DeltaTable`] - /// version if the file was written successfully, otherwise returns [`ModelarDbStorageError`]. - pub async fn write_columns_to_metadata_table( - &self, - table_name: &str, - columns: Vec, - ) -> Result { - let delta_table = self.metadata_delta_table(table_name).await?; - let record_batch = RecordBatch::try_new(TableProvider::schema(&delta_table), columns)?; - let delta_table_writer = self.normal_or_metadata_table_writer(delta_table).await?; - self.write_record_batches_to_table(delta_table_writer, vec![record_batch]) - .await + /// Return a [`DeltaTableWriter`] for writing to the table with `table_name` in the Delta Lake, + /// or a [`ModelarDbStorageError`] if a connection to the Delta Lake cannot be established or + /// the table does not exist. + pub async fn table_writer(&self, table_name: &str) -> Result { + let delta_table = self.delta_table(table_name).await?; + let partition_columns = delta_table.snapshot()?.metadata().partition_columns(); + + // If the table is a time series table, the partition column is the field column. + // self.is_time_series_table() is not used to avoid a redundant query to the metadata table. + if partition_columns.contains(&FIELD_COLUMN.to_owned()) { + DeltaTableWriter::try_new_for_time_series_table(delta_table) + } else { + DeltaTableWriter::try_new_for_normal_table(delta_table) + } } - /// Write `record_batches` to a Delta Lake table for a normal table with `table_name`. Returns - /// an updated [`DeltaTable`] version if the file was written successfully, otherwise returns - /// [`ModelarDbStorageError`]. - pub async fn write_record_batches_to_normal_table( + /// Write `record_batches` to the table with `table_name` in the Delta Lake. The correct + /// writer is selected automatically based on the table type. Returns an updated [`DeltaTable`] + /// if the file was written successfully, otherwise returns [`ModelarDbStorageError`]. + pub async fn write_record_batches( &self, table_name: &str, record_batches: Vec, ) -> Result { - let delta_table = self.delta_table(table_name).await?; - let delta_table_writer = self.normal_or_metadata_table_writer(delta_table).await?; - self.write_record_batches_to_table(delta_table_writer, record_batches) + let delta_table_writer = self.table_writer(table_name).await?; + + delta_table_writer + .write_all_and_commit(&record_batches) .await } - /// Write `compressed_segments` to a Delta Lake table for a time series table with `table_name`. - /// Returns an updated [`DeltaTable`] if the file was written successfully, otherwise returns - /// [`ModelarDbStorageError`]. - pub async fn write_compressed_segments_to_time_series_table( + /// Write `columns` to a Delta Lake table with `table_name`. Returns an updated [`DeltaTable`] + /// version if the file was written successfully, otherwise returns [`ModelarDbStorageError`]. + async fn write_columns_to_metadata_table( &self, table_name: &str, - compressed_segments: Vec, + columns: Vec, ) -> Result { - let delta_table = self.delta_table(table_name).await?; - let delta_table_writer = self.time_series_table_writer(delta_table).await?; - self.write_record_batches_to_table(delta_table_writer, compressed_segments) + let delta_table = self.metadata_delta_table(table_name).await?; + let record_batch = RecordBatch::try_new(TableProvider::schema(&delta_table), columns)?; + let delta_table_writer = DeltaTableWriter::try_new_for_normal_table(delta_table)?; + + delta_table_writer + .write_all_and_commit(&[record_batch]) .await } - /// Write `record_batches` to the `delta_table_writer` and commit. Returns an updated - /// [`DeltaTable`] if all `record_batches` are written and committed successfully, otherwise it - /// rolls back all writes done using `delta_table_writer` and returns [`ModelarDbStorageError`]. - async fn write_record_batches_to_table( - &self, - mut delta_table_writer: DeltaTableWriter, - record_batches: Vec, - ) -> Result { - match delta_table_writer.write_all(&record_batches).await { - Ok(_) => delta_table_writer.commit().await, - Err(error) => { - delta_table_writer.rollback().await?; - Err(error) - } + /// Return a [`DeltaTable`] for manipulating the metadata table with `table_name` in the + /// Delta Lake, or a [`ModelarDbStorageError`] if a connection to the Delta Lake cannot be + /// established or the table does not exist. + async fn metadata_delta_table(&self, table_name: &str) -> Result { + let table_path = self.location_of_metadata_table(table_name); + self.delta_table_from_path(&table_path).await + } + + /// Return a [`DeltaTable`] for manipulating the table with `table_name` in the Delta Lake, or a + /// [`ModelarDbStorageError`] if a connection to the Delta Lake cannot be established or the + /// table does not exist. + pub async fn delta_table(&self, table_name: &str) -> Result { + let table_path = self.location_of_table(table_name); + self.delta_table_from_path(&table_path).await + } + + /// Return a [`DeltaTable`] for manipulating the table at `table_path` in the Delta Lake, or a + /// [`ModelarDbStorageError`] if a connection to the Delta Lake cannot be established or the + /// table does not exist. + async fn delta_table_from_path(&self, table_path: &str) -> Result { + // Use the cache if possible and load to get the latest table data. + if let Some(mut delta_table) = self.delta_table_cache.get_mut(table_path) { + delta_table.load().await?; + Ok(delta_table.clone()) + } else { + // If the table is not in the cache, open it and add it to the cache before returning. + let table_url = deltalake::ensure_table_uri(table_path)?; + let delta_table = + deltalake::open_table_with_storage_options(table_url, self.storage_options.clone()) + .await?; + + self.delta_table_cache + .insert(table_path.to_owned(), delta_table.clone()); + + Ok(delta_table) + } + } + + /// Return `true` if the table with `table_name` is a normal table, otherwise return `false`. + pub async fn is_normal_table(&self, table_name: &str) -> Result { + Ok(self + .normal_table_names() + .await? + .contains(&table_name.to_owned())) + } + + /// Return `true` if the table with `table_name` is a time series table, otherwise return `false`. + pub async fn is_time_series_table(&self, table_name: &str) -> Result { + Ok(self + .time_series_table_names() + .await? + .contains(&table_name.to_owned())) + } + + /// Return the name of each table currently in the Delta Lake. If the table names cannot be + /// retrieved, [`ModelarDbStorageError`] is returned. + pub async fn table_names(&self) -> Result> { + let normal_table_names = self.normal_table_names().await?; + let time_series_table_names = self.time_series_table_names().await?; + + let mut table_names = normal_table_names; + table_names.extend(time_series_table_names); + + Ok(table_names) + } + + /// Return the name of each normal table currently in the Delta Lake. Note that this does not + /// include time series tables. If the normal table names cannot be retrieved, + /// [`ModelarDbStorageError`] is returned. + pub async fn normal_table_names(&self) -> Result> { + self.table_names_of_type(TableType::NormalTable).await + } + + /// Return the name of each time series table currently in the Delta Lake. Note that this does + /// not include normal tables. If the time series table names cannot be retrieved, + /// [`ModelarDbStorageError`] is returned. + pub async fn time_series_table_names(&self) -> Result> { + self.table_names_of_type(TableType::TimeSeriesTable).await + } + + /// Return the name of tables of `table_type`. Returns [`ModelarDbStorageError`] if the table + /// names cannot be retrieved. + async fn table_names_of_type(&self, table_type: TableType) -> Result> { + let table_type = match table_type { + TableType::NormalTable => "normal_table", + TableType::TimeSeriesTable => "time_series_table", + }; + + let sql = format!("SELECT table_name FROM metadata.{table_type}_metadata"); + let batch = sql_and_concat(&self.session_context, &sql).await?; + + let table_names = modelardb_types::array!(batch, 0, StringArray); + Ok(table_names.iter().flatten().map(str::to_owned).collect()) + } + + /// Return the schema of the table with the name in `table_name` if it is a normal table. If the + /// table does not exist or the table is not a normal table, return [`None`]. + pub async fn normal_table_schema(&self, table_name: &str) -> Option> { + if self + .is_normal_table(table_name) + .await + .is_ok_and(|is_normal_table| is_normal_table) + { + let schema = self + .delta_table(table_name) + .await + .expect("Delta Lake table should exist if the metadata is in the Delta Lake.") + .schema(); + + Some(schema) + } else { + None } } + /// Return the location of the metadata table with `table_name`. + fn location_of_metadata_table(&self, table_name: &str) -> String { + format!("{}/{METADATA_FOLDER}/{table_name}", self.location) + } + + /// Return the location of the table with `table_name`. + fn location_of_table(&self, table_name: &str) -> String { + format!("{}/{TABLE_FOLDER}/{table_name}", self.location) + } + /// Return the [`TimeSeriesTableMetadata`] of each time series table currently in the metadata /// Delta Lake. If the [`TimeSeriesTableMetadata`] cannot be retrieved, /// [`ModelarDbStorageError`] is returned. @@ -1093,164 +1003,6 @@ impl DataFolder { } } -/// Functionality for transactionally writing [`RecordBatches`](RecordBatch) to a Delta table stored -/// in an object store. -pub struct DeltaTableWriter { - /// Delta table that all of the record batches will be written to. - delta_table: DeltaTable, - /// Checker that ensures all of the record batches match the table. - delta_data_checker: DeltaDataChecker, - /// Write operation that will be committed to the Delta table. - delta_operation: DeltaOperation, - /// Unique identifier for this write operation to the Delta table. - operation_id: Uuid, - /// Writes record batches to the Delta table as Apache Parquet files. - delta_writer: DeltaWriter, -} - -impl DeltaTableWriter { - /// Create a new [`DeltaTableWriter`]. Returns a [`ModelarDbStorageError`] if the state of the - /// Delta table cannot be loaded from `delta_table`. - pub fn try_new( - delta_table: DeltaTable, - partition_columns: Vec, - writer_properties: WriterProperties, - ) -> Result { - // Checker for if record batches match the table’s invariants, constraints, and nullability. - let delta_table_state = delta_table.snapshot()?; - let snapshot = delta_table_state.snapshot(); - let delta_data_checker = DeltaDataChecker::new(snapshot); - - // Operation that will be committed. - let delta_operation = DeltaOperation::Write { - mode: SaveMode::Append, - partition_by: if partition_columns.is_empty() { - None - } else { - Some(partition_columns.clone()) - }, - predicate: None, - }; - - // A UUID version 4 is used as the operation id to match the existing Operation trait in the - // deltalake crate as it is pub(trait) and thus cannot be used directly in DeltaTableWriter. - let operation_id = Uuid::new_v4(); - - // Writer that will write the record batches. - let object_store = delta_table.log_store().object_store(Some(operation_id)); - let table_schema: Arc = TableProvider::schema(&delta_table); - let num_indexed_cols = - DataSkippingNumIndexedCols::NumColumns(table_schema.fields.len() as u64); - let writer_config = WriterConfig::new( - table_schema, - partition_columns, - Some(writer_properties), - None, - None, - num_indexed_cols, - None, - ); - let delta_writer = DeltaWriter::new(object_store, writer_config); - - Ok(Self { - delta_table, - delta_data_checker, - delta_operation, - operation_id, - delta_writer, - }) - } - - /// Write `record_batch` to the Delta table. Returns a [`ModelarDbStorageError`] if the - /// [`RecordBatches`](RecordBatch) does not match the schema of the Delta table or if the - /// writing fails. - pub async fn write(&mut self, record_batch: &RecordBatch) -> Result<()> { - self.delta_data_checker.check_batch(record_batch).await?; - self.delta_writer.write(record_batch).await?; - Ok(()) - } - - /// Write all `record_batches` to the Delta table. Returns a [`ModelarDbStorageError`] if one of - /// the [`RecordBatches`](RecordBatch) does not match the schema of the Delta table or if the - /// writing fails. - pub async fn write_all(&mut self, record_batches: &[RecordBatch]) -> Result<()> { - for record_batch in record_batches { - self.write(record_batch).await?; - } - Ok(()) - } - - /// Consume the [`DeltaTableWriter`], finish the writing, and commit the files that have been - /// written to the log. If an error occurs before the commit is finished, the already written - /// files are deleted if possible. Returns a [`ModelarDbStorageError`] if an error occurs when - /// finishing the writing, committing the files that have been written, deleting the written - /// files, or updating the [`DeltaTable`]. - pub async fn commit(mut self) -> Result { - // Write the remaining buffered files. - let added_files = self.delta_writer.close().await?; - - // Clone added_files in case of rollback. - let actions = added_files - .clone() - .into_iter() - .map(Action::Add) - .collect::>(); - - // Prepare all inputs to the commit. - let object_store = self.delta_table.object_store(); - let commit_properties = CommitProperties::default(); - let table_data = match self.delta_table.snapshot() { - Ok(table_data) => table_data, - Err(delta_table_error) => { - delete_added_files(&object_store, added_files).await?; - return Err(ModelarDbStorageError::DeltaLake(delta_table_error)); - } - }; - let log_store = self.delta_table.log_store(); - - // Construct the commit to be written. - let commit_builder = CommitBuilder::from(commit_properties) - .with_actions(actions) - .with_operation_id(self.operation_id) - .build(Some(table_data), log_store, self.delta_operation); - - // Write the commit to the Delta table. - let _finalized_commit = match commit_builder.await { - Ok(finalized_commit) => finalized_commit, - Err(delta_table_error) => { - delete_added_files(&object_store, added_files).await?; - return Err(ModelarDbStorageError::DeltaLake(delta_table_error)); - } - }; - - // Return Delta table with the commit. - self.delta_table.load().await?; - Ok(self.delta_table) - } - - /// Consume the [`DeltaTableWriter`], abort the writing, and delete all of the files that have - /// already been written. Returns a [`ModelarDbStorageError`] if an error occurs when aborting - /// the writing or deleting the files that have already been written. Rollback is not called - /// automatically as drop() is not async and async_drop() is not yet a stable API. - pub async fn rollback(self) -> Result { - let object_store = self.delta_table.object_store(); - let added_files = self.delta_writer.close().await?; - delete_added_files(&object_store, added_files).await?; - Ok(self.delta_table) - } -} - -/// Delete the `added_files` from `object_store`. Returns a [`ModelarDbStorageError`] if a file -/// could not be deleted. It is a function instead of a method on [`DeltaTableWriter`] so it can be -/// called by [`DeltaTableWriter`] after the [`DeltaWriter`] is closed without lifetime issues. -async fn delete_added_files(object_store: &dyn ObjectStore, added_files: Vec) -> Result<()> { - for add_file in added_files { - let path: Path = Path::from(add_file.path); - object_store.delete(&path).await?; - } - Ok(()) -} - #[cfg(test)] mod tests { use super::*; @@ -1260,10 +1012,53 @@ mod tests { use datafusion::common::ScalarValue::Int64; use datafusion::logical_expr::Expr::Literal; use modelardb_test::table as test; + use modelardb_test::table::{NoOpDataSink, TIME_SERIES_TABLE_NAME}; use modelardb_types::types::ArrowTimestamp; use tempfile::TempDir; // Tests for DataFolder. + #[tokio::test] + async fn test_open_local_url_without_scheme() { + let temp_dir = tempfile::tempdir().unwrap(); + let path = temp_dir.path().to_str().unwrap(); + + let data_folder = DataFolder::open_local_url(path).await; + assert!(data_folder.is_ok()); + } + + #[tokio::test] + async fn test_open_local_url_with_file_scheme() { + let temp_dir = tempfile::tempdir().unwrap(); + let path = temp_dir.path().to_str().unwrap(); + let url = format!("file://{path}"); + + let data_folder = DataFolder::open_local_url(&url).await; + assert!(data_folder.is_ok()); + } + + #[tokio::test] + async fn test_open_local_url_with_memory_scheme() { + let data_folder = DataFolder::open_local_url("memory://").await; + assert!(data_folder.is_ok()); + } + + #[tokio::test] + async fn test_open_local_url_with_invalid_scheme() { + let url = "invalid://path"; + let result = DataFolder::open_local_url(url).await; + + assert_eq!( + result.err().unwrap().to_string(), + format!("Invalid Argument Error: {url} is not a valid local URL.") + ); + } + + #[tokio::test] + async fn test_open_memory() { + let data_folder = DataFolder::open_memory().await; + assert!(data_folder.is_ok()); + } + #[tokio::test] async fn test_create_metadata_data_folder_tables() { let temp_dir = tempfile::tempdir().unwrap(); @@ -1294,106 +1089,75 @@ mod tests { } #[tokio::test] - async fn test_normal_table_is_normal_table() { + async fn test_register_tables() { let (_temp_dir, data_folder) = create_data_folder_and_create_normal_tables().await; - assert!(data_folder.is_normal_table("normal_table_1").await.unwrap()); - } - #[tokio::test] - async fn test_time_series_table_is_not_normal_table() { - let (_temp_dir, data_folder) = create_data_folder_and_create_time_series_table().await; - assert!( - !data_folder - .is_normal_table(test::TIME_SERIES_TABLE_NAME) - .await - .unwrap() - ); - } + data_folder + .create_time_series_table(&test::time_series_table_metadata()) + .await + .unwrap(); - #[tokio::test] - async fn test_time_series_table_is_time_series_table() { - let (_temp_dir, data_folder) = create_data_folder_and_create_time_series_table().await; + data_folder + .register_tables(Arc::new(NoOpDataSink {})) + .await + .unwrap(); + + // Verify the tables are queryable through the session context. assert!( data_folder - .is_time_series_table(test::TIME_SERIES_TABLE_NAME) + .session_context + .table_provider("normal_table_1") .await - .unwrap() + .is_ok() ); - } - - #[tokio::test] - async fn test_normal_table_is_not_time_series_table() { - let (_temp_dir, data_folder) = create_data_folder_and_create_normal_tables().await; assert!( - !data_folder - .is_time_series_table("normal_table_1") + data_folder + .session_context + .table_provider(TIME_SERIES_TABLE_NAME) .await - .unwrap() + .is_ok() ); } #[tokio::test] - async fn test_table_names() { + async fn test_create_normal_table() { let (_temp_dir, data_folder) = create_data_folder_and_create_normal_tables().await; - let time_series_table_metadata = test::time_series_table_metadata(); - data_folder - .create_time_series_table(&time_series_table_metadata) + assert!(data_folder.delta_table("normal_table_1").await.is_ok()); + + // Retrieve the normal table metadata from the Delta Lake. + let sql = "SELECT table_name FROM metadata.normal_table_metadata ORDER BY table_name"; + let batch = sql_and_concat(&data_folder.session_context, sql) .await .unwrap(); - let table_names = data_folder.table_names().await.unwrap(); assert_eq!( - table_names, - vec![ - "normal_table_2", - "normal_table_1", - test::TIME_SERIES_TABLE_NAME - ] + **batch.column(0), + StringArray::from(vec!["normal_table_1", "normal_table_2"]) ); } #[tokio::test] - async fn test_normal_table_names() { + async fn test_create_existing_normal_table() { let (_temp_dir, data_folder) = create_data_folder_and_create_normal_tables().await; - let normal_table_names = data_folder.normal_table_names().await.unwrap(); - assert_eq!(normal_table_names, vec!["normal_table_2", "normal_table_1"]); + let result = data_folder + .create_normal_table("normal_table_1", &test::normal_table_schema()) + .await; + + assert_eq!( + result.unwrap_err().to_string(), + "Delta Lake Error: Generic error: A Delta Lake table already exists at that location." + ); } #[tokio::test] - async fn test_time_series_table_names() { - let (_temp_dir, data_folder) = create_data_folder_and_create_time_series_table().await; - - let time_series_table_names = data_folder.time_series_table_names().await.unwrap(); - assert_eq!(time_series_table_names, vec![test::TIME_SERIES_TABLE_NAME]); - } - - #[tokio::test] - async fn test_create_normal_table() { - let (_temp_dir, data_folder) = create_data_folder_and_create_normal_tables().await; - - assert!(data_folder.delta_table("normal_table_1").await.is_ok()); - - // Retrieve the normal table metadata from the Delta Lake. - let sql = "SELECT table_name FROM metadata.normal_table_metadata ORDER BY table_name"; - let batch = sql_and_concat(&data_folder.session_context, sql) - .await - .unwrap(); - - assert_eq!( - **batch.column(0), - StringArray::from(vec!["normal_table_1", "normal_table_2"]) - ); - } - - #[tokio::test] - async fn test_create_time_series_table() { + async fn test_create_time_series_table() { let (_temp_dir, data_folder) = create_data_folder_and_create_time_series_table().await; assert!( data_folder - .delta_table(test::TIME_SERIES_TABLE_NAME) + .delta_table(TIME_SERIES_TABLE_NAME) .await .is_ok() ); @@ -1406,7 +1170,7 @@ mod tests { assert_eq!( **batch.column(0), - StringArray::from(vec![test::TIME_SERIES_TABLE_NAME]) + StringArray::from(vec![TIME_SERIES_TABLE_NAME]) ); assert_eq!( **batch.column(1), @@ -1425,10 +1189,7 @@ mod tests { assert_eq!( **batch.column(0), - StringArray::from(vec![ - test::TIME_SERIES_TABLE_NAME, - test::TIME_SERIES_TABLE_NAME - ]) + StringArray::from(vec![TIME_SERIES_TABLE_NAME, TIME_SERIES_TABLE_NAME]) ); assert_eq!( **batch.column(1), @@ -1443,6 +1204,39 @@ mod tests { ); } + #[tokio::test] + async fn test_create_existing_time_series_table() { + let (_temp_dir, data_folder) = create_data_folder_and_create_time_series_table().await; + + let result = data_folder + .create_time_series_table(&test::time_series_table_metadata()) + .await; + + assert_eq!( + result.unwrap_err().to_string(), + "Delta Lake Error: Generic error: A Delta Lake table already exists at that location." + ); + } + + #[tokio::test] + async fn test_create_table_with_unsigned_integers() { + let temp_dir = tempfile::tempdir().unwrap(); + let data_folder = DataFolder::open_local(temp_dir.path()).await.unwrap(); + + let schema = Schema::new(vec![ + Field::new("id", DataType::UInt32, false), + Field::new("name", DataType::Utf8, false), + ]); + + let result = data_folder.create_normal_table("uint_table", &schema).await; + + assert_eq!( + result.unwrap_err().to_string(), + "Delta Lake Error: Data does not match the schema or partitions of the table: \ + Unsigned integers are not supported." + ); + } + #[tokio::test] async fn test_drop_normal_table() { let (_temp_dir, data_folder) = create_data_folder_and_create_normal_tables().await; @@ -1465,13 +1259,13 @@ mod tests { let (_temp_dir, data_folder) = create_data_folder_and_create_time_series_table().await; data_folder - .drop_table(test::TIME_SERIES_TABLE_NAME) + .drop_table(TIME_SERIES_TABLE_NAME) .await .unwrap(); assert!( data_folder - .delta_table(test::TIME_SERIES_TABLE_NAME) + .delta_table(TIME_SERIES_TABLE_NAME) .await .is_err() ); @@ -1497,25 +1291,376 @@ mod tests { async fn test_drop_missing_table() { let (_temp_dir, data_folder) = create_data_folder_and_create_normal_tables().await; - assert!(data_folder.drop_table("missing_table").await.is_err()); + let result = data_folder.drop_table("missing_table").await; + + assert_eq!( + result.unwrap_err().to_string(), + "Invalid Argument Error: Table with name 'missing_table' does not exist." + ); } - async fn create_data_folder_and_create_normal_tables() -> (TempDir, DataFolder) { + #[tokio::test] + async fn test_truncate_normal_table() { + let (_temp_dir, data_folder) = create_data_folder_and_create_normal_tables().await; + + let mut delta_table = data_folder + .write_record_batches("normal_table_1", vec![test::normal_table_record_batch()]) + .await + .unwrap(); + + assert_eq!(delta_table.get_file_uris().unwrap().count(), 1); + + data_folder.truncate_table("normal_table_1").await.unwrap(); + + delta_table.load().await.unwrap(); + assert_eq!(delta_table.get_file_uris().unwrap().count(), 0); + } + + #[tokio::test] + async fn test_truncate_time_series_table() { + let (_temp_dir, data_folder) = create_data_folder_and_create_time_series_table().await; + + let mut delta_table = data_folder + .write_record_batches( + TIME_SERIES_TABLE_NAME, + vec![test::compressed_segments_record_batch()], + ) + .await + .unwrap(); + + assert_eq!(delta_table.get_file_uris().unwrap().count(), 1); + + data_folder + .truncate_table(TIME_SERIES_TABLE_NAME) + .await + .unwrap(); + + delta_table.load().await.unwrap(); + assert_eq!(delta_table.get_file_uris().unwrap().count(), 0); + } + + #[tokio::test] + async fn test_truncate_missing_table() { + let (_temp_dir, data_folder) = create_data_folder_and_create_normal_tables().await; + + let result = data_folder.truncate_table("missing_table").await; + + assert_eq!( + result.unwrap_err().to_string(), + "Delta Lake Error: Not a Delta table: Generic delta kernel error: No files in log segment" + ); + } + + #[tokio::test] + async fn test_vacuum_normal_table() { + let (temp_dir, data_folder) = create_data_folder_and_create_normal_tables().await; + + data_folder + .write_record_batches("normal_table_1", vec![test::normal_table_record_batch()]) + .await + .unwrap(); + + data_folder.truncate_table("normal_table_1").await.unwrap(); + + // The stale Parquet file should still exist on disk alongside the _delta_log folder. + let table_path = format!( + "{}/tables/normal_table_1", + temp_dir.path().to_str().unwrap() + ); + assert_eq!(std::fs::read_dir(&table_path).unwrap().count(), 2); + + data_folder + .vacuum_table("normal_table_1", Some(0)) + .await + .unwrap(); + + // Only the _delta_log folder should remain. + assert_eq!(std::fs::read_dir(&table_path).unwrap().count(), 1); + } + + #[tokio::test] + async fn test_vacuum_time_series_table() { + let (temp_dir, data_folder) = create_data_folder_and_create_time_series_table().await; + + data_folder + .write_record_batches( + TIME_SERIES_TABLE_NAME, + vec![test::compressed_segments_record_batch()], + ) + .await + .unwrap(); + + data_folder + .truncate_table(TIME_SERIES_TABLE_NAME) + .await + .unwrap(); + + // The stale Parquet file should still exist in the partition folder. + let column_path = format!( + "{}/tables/{}/field_column=0", + temp_dir.path().to_str().unwrap(), + TIME_SERIES_TABLE_NAME + ); + assert_eq!(std::fs::read_dir(&column_path).unwrap().count(), 1); + + data_folder + .vacuum_table(TIME_SERIES_TABLE_NAME, Some(0)) + .await + .unwrap(); + + // The stale Parquet file should have been removed. + assert_eq!(std::fs::read_dir(&column_path).unwrap().count(), 0); + } + + #[tokio::test] + async fn test_vacuum_table_with_default_retention_period() { + let (temp_dir, data_folder) = create_data_folder_and_create_normal_tables().await; + + data_folder + .write_record_batches("normal_table_1", vec![test::normal_table_record_batch()]) + .await + .unwrap(); + + data_folder.truncate_table("normal_table_1").await.unwrap(); + + // Vacuum with None uses the default 7-day retention period, so the recently + // truncated file is not yet stale and should still exist on disk. + data_folder + .vacuum_table("normal_table_1", None) + .await + .unwrap(); + + let table_path = format!( + "{}/tables/normal_table_1", + temp_dir.path().to_str().unwrap() + ); + assert_eq!(std::fs::read_dir(&table_path).unwrap().count(), 2); + } + + #[tokio::test] + async fn test_vacuum_table_with_out_of_bounds_retention_period() { + let (_temp_dir, data_folder) = create_data_folder_and_create_normal_tables().await; + + let result = data_folder + .vacuum_table("normal_table_1", Some(MAX_RETENTION_PERIOD_IN_SECONDS + 1)) + .await; + + assert_eq!( + result.unwrap_err().to_string(), + format!( + "Invalid Argument Error: \ + Retention period cannot be more than {MAX_RETENTION_PERIOD_IN_SECONDS} seconds." + ) + ); + } + + #[tokio::test] + async fn test_vacuum_missing_table() { + let (_temp_dir, data_folder) = create_data_folder_and_create_normal_tables().await; + + let result = data_folder.vacuum_table("missing_table", None).await; + + assert_eq!( + result.unwrap_err().to_string(), + "Delta Lake Error: Not a Delta table: Generic delta kernel error: No files in log segment" + ); + } + + #[tokio::test] + async fn test_write_record_batches_to_normal_table() { + let (_temp_dir, data_folder) = create_data_folder_and_create_normal_tables().await; + + let batch_to_write = test::normal_table_record_batch(); + let delta_table = data_folder + .write_record_batches("normal_table_1", vec![batch_to_write.clone()]) + .await + .unwrap(); + + // Verify the write produced a Parquet file. + assert_eq!(delta_table.get_file_uris().unwrap().count(), 1); + + // Read the data back and verify the content. + data_folder + .session_context + .register_table("normal_table_1", Arc::new(delta_table)) + .unwrap(); + + let read_batch = + sql_and_concat(&data_folder.session_context, "SELECT * FROM normal_table_1") + .await + .unwrap(); + + assert_eq!(read_batch, batch_to_write); + } + + #[tokio::test] + async fn test_write_record_batches_to_time_series_table() { + let (_temp_dir, data_folder) = create_data_folder_and_create_time_series_table().await; + + let batch_to_write = test::compressed_segments_record_batch(); + let delta_table = data_folder + .write_record_batches(TIME_SERIES_TABLE_NAME, vec![batch_to_write.clone()]) + .await + .unwrap(); + + // Verify the write produced a Parquet file. + assert_eq!(delta_table.get_file_uris().unwrap().count(), 1); + + // Read the data back and verify the content. The partition column (field_column) is + // moved to the end by Delta Lake, so SELECT the columns in the original schema order. + let schema = test::time_series_table_metadata().compressed_schema.clone(); + let column_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect(); + + data_folder + .session_context + .register_table(TIME_SERIES_TABLE_NAME, Arc::new(delta_table)) + .unwrap(); + + let read_batch = sql_and_concat( + &data_folder.session_context, + &format!( + "SELECT {} FROM {}", + column_names.join(", "), + TIME_SERIES_TABLE_NAME + ), + ) + .await + .unwrap(); + + assert_eq!(read_batch, batch_to_write); + } + + #[tokio::test] + async fn test_write_empty_vec_to_table() { + let (_temp_dir, data_folder) = create_data_folder_and_create_normal_tables().await; + + let delta_table = data_folder + .write_record_batches("normal_table_1", vec![]) + .await + .unwrap(); + + assert_eq!(delta_table.get_file_uris().unwrap().count(), 0); + } + + #[tokio::test] + async fn test_write_empty_record_batch_to_table() { + let (_temp_dir, data_folder) = create_data_folder_and_create_normal_tables().await; + + let empty_batch = RecordBatch::new_empty(Arc::new(test::normal_table_schema())); + let delta_table = data_folder + .write_record_batches("normal_table_1", vec![empty_batch]) + .await + .unwrap(); + + assert_eq!(delta_table.get_file_uris().unwrap().count(), 0); + } + + #[tokio::test] + async fn test_write_record_batches_to_missing_table() { let temp_dir = tempfile::tempdir().unwrap(); let data_folder = DataFolder::open_local(temp_dir.path()).await.unwrap(); - let normal_table_schema = test::normal_table_schema(); + let result = data_folder + .write_record_batches("missing_table", vec![test::normal_table_record_batch()]) + .await; + + assert_eq!( + result.unwrap_err().to_string(), + "Delta Lake Error: Not a Delta table: Generic delta kernel error: No files in log segment" + ); + } + + #[tokio::test] + async fn test_normal_table_is_normal_table() { + let (_temp_dir, data_folder) = create_data_folder_and_create_normal_tables().await; + assert!(data_folder.is_normal_table("normal_table_1").await.unwrap()); + } + + #[tokio::test] + async fn test_time_series_table_is_not_normal_table() { + let (_temp_dir, data_folder) = create_data_folder_and_create_time_series_table().await; + assert!( + !data_folder + .is_normal_table(TIME_SERIES_TABLE_NAME) + .await + .unwrap() + ); + } + + #[tokio::test] + async fn test_time_series_table_is_time_series_table() { + let (_temp_dir, data_folder) = create_data_folder_and_create_time_series_table().await; + assert!( + data_folder + .is_time_series_table(TIME_SERIES_TABLE_NAME) + .await + .unwrap() + ); + } + + #[tokio::test] + async fn test_normal_table_is_not_time_series_table() { + let (_temp_dir, data_folder) = create_data_folder_and_create_normal_tables().await; + assert!( + !data_folder + .is_time_series_table("normal_table_1") + .await + .unwrap() + ); + } + + #[tokio::test] + async fn test_table_names() { + let (_temp_dir, data_folder) = create_data_folder_and_create_normal_tables().await; + + let time_series_table_metadata = test::time_series_table_metadata(); data_folder - .create_normal_table("normal_table_1", &normal_table_schema) + .create_time_series_table(&time_series_table_metadata) .await .unwrap(); - data_folder - .create_normal_table("normal_table_2", &normal_table_schema) + let table_names = data_folder.table_names().await.unwrap(); + assert_eq!( + table_names, + vec!["normal_table_2", "normal_table_1", TIME_SERIES_TABLE_NAME] + ); + } + + #[tokio::test] + async fn test_normal_table_names() { + let (_temp_dir, data_folder) = create_data_folder_and_create_normal_tables().await; + + let normal_table_names = data_folder.normal_table_names().await.unwrap(); + assert_eq!(normal_table_names, vec!["normal_table_2", "normal_table_1"]); + } + + #[tokio::test] + async fn test_time_series_table_names() { + let (_temp_dir, data_folder) = create_data_folder_and_create_time_series_table().await; + + let time_series_table_names = data_folder.time_series_table_names().await.unwrap(); + assert_eq!(time_series_table_names, vec![TIME_SERIES_TABLE_NAME]); + } + + #[tokio::test] + async fn test_normal_table_schema_for_existing_normal_table() { + let (_temp_dir, data_folder) = create_data_folder_and_create_normal_tables().await; + + let schema = data_folder + .normal_table_schema("normal_table_1") .await .unwrap(); - (temp_dir, data_folder) + assert_eq!(schema.as_ref(), &test::normal_table_schema()); + } + + #[tokio::test] + async fn test_normal_table_schema_for_missing_table() { + let (_temp_dir, data_folder) = create_data_folder_and_create_normal_tables().await; + + let maybe_schema = data_folder.normal_table_schema("missing_table").await; + + assert!(maybe_schema.is_none()); } #[tokio::test] @@ -1535,7 +1680,7 @@ mod tests { let (_temp_dir, data_folder) = create_data_folder_and_create_time_series_table().await; let time_series_table_metadata = data_folder - .time_series_table_metadata_for_time_series_table(test::TIME_SERIES_TABLE_NAME) + .time_series_table_metadata_for_time_series_table(TIME_SERIES_TABLE_NAME) .await .unwrap(); @@ -1549,11 +1694,44 @@ mod tests { async fn test_time_series_table_metadata_for_missing_time_series_table() { let (_temp_dir, data_folder) = create_data_folder_and_create_time_series_table().await; - let time_series_table_metadata = data_folder + let result = data_folder .time_series_table_metadata_for_time_series_table("missing_table") .await; - assert!(time_series_table_metadata.is_err()); + assert_eq!( + result.unwrap_err().to_string(), + "Invalid Argument Error: No metadata for time series table named 'missing_table'." + ); + } + + #[tokio::test] + async fn test_time_series_table_metadata_for_registered_time_series_table() { + let (_temp_dir, data_folder) = create_data_folder_and_create_time_series_table().await; + + data_folder + .register_tables(Arc::new(NoOpDataSink {})) + .await + .unwrap(); + + let metadata = data_folder + .time_series_table_metadata_for_registered_time_series_table(TIME_SERIES_TABLE_NAME) + .await; + + assert_eq!( + metadata.unwrap().as_ref(), + &test::time_series_table_metadata(), + ); + } + + #[tokio::test] + async fn test_time_series_table_metadata_for_unregistered_time_series_table() { + let (_temp_dir, data_folder) = create_data_folder_and_create_time_series_table().await; + + let metadata = data_folder + .time_series_table_metadata_for_registered_time_series_table(TIME_SERIES_TABLE_NAME) + .await; + + assert!(metadata.is_none()); } #[tokio::test] @@ -1561,7 +1739,7 @@ mod tests { let (_temp_dir, data_folder) = create_data_folder_and_create_time_series_table().await; let error_bounds = data_folder - .error_bounds(test::TIME_SERIES_TABLE_NAME, 4) + .error_bounds(TIME_SERIES_TABLE_NAME, 4) .await .unwrap(); @@ -1643,6 +1821,24 @@ mod tests { ); } + async fn create_data_folder_and_create_normal_tables() -> (TempDir, DataFolder) { + let temp_dir = tempfile::tempdir().unwrap(); + let data_folder = DataFolder::open_local(temp_dir.path()).await.unwrap(); + + let normal_table_schema = test::normal_table_schema(); + data_folder + .create_normal_table("normal_table_1", &normal_table_schema) + .await + .unwrap(); + + data_folder + .create_normal_table("normal_table_2", &normal_table_schema) + .await + .unwrap(); + + (temp_dir, data_folder) + } + async fn create_data_folder_and_create_time_series_table() -> (TempDir, DataFolder) { let temp_dir = tempfile::tempdir().unwrap(); let data_folder = DataFolder::open_local(temp_dir.path()).await.unwrap(); diff --git a/crates/modelardb_storage/src/lib.rs b/crates/modelardb_storage/src/lib.rs index 6ff9f993..2c5bebe9 100644 --- a/crates/modelardb_storage/src/lib.rs +++ b/crates/modelardb_storage/src/lib.rs @@ -95,7 +95,7 @@ pub fn register_normal_table( delta_table: DeltaTable, data_sink: Arc, ) -> Result<()> { - let normal_table = Arc::new(NormalTable::new(delta_table, data_sink)); + let normal_table = Arc::new(NormalTable::new(delta_table, Some(data_sink))); session_context.register_table(table_name, normal_table)?; Ok(()) diff --git a/crates/modelardb_storage/src/optimizer/model_simple_aggregates.rs b/crates/modelardb_storage/src/optimizer/model_simple_aggregates.rs index 7b408227..8fd4bba6 100644 --- a/crates/modelardb_storage/src/optimizer/model_simple_aggregates.rs +++ b/crates/modelardb_storage/src/optimizer/model_simple_aggregates.rs @@ -621,64 +621,19 @@ impl Accumulator for ModelAvgAccumulator { mod tests { use super::*; - use std::any::{Any, TypeId}; - use std::fmt::{Debug, Formatter, Result as FmtResult}; + use std::any::TypeId; - use datafusion::arrow::datatypes::Schema; - use datafusion::datasource::sink::DataSink; - use datafusion::execution::{SendableRecordBatchStream, TaskContext}; use datafusion::physical_plan::aggregates::AggregateExec; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::filter::FilterExec; - use datafusion::physical_plan::metrics::MetricsSet; - use datafusion::physical_plan::{DisplayAs, DisplayFormatType}; - use modelardb_test::table::{self, TIME_SERIES_TABLE_NAME}; + use modelardb_test::table::{self, NoOpDataSink, TIME_SERIES_TABLE_NAME}; use tempfile::TempDir; - use tonic::async_trait; use crate::data_folder::DataFolder; use crate::query::grid_exec::GridExec; use crate::query::time_series_table::TimeSeriesTable; - // DataSink for testing. - struct NoOpDataSink {} - - #[async_trait] - impl DataSink for NoOpDataSink { - fn as_any(&self) -> &dyn Any { - unimplemented!(); - } - - fn schema(&self) -> &Arc { - unimplemented!(); - } - - fn metrics(&self) -> Option { - unimplemented!(); - } - - async fn write_all( - &self, - _data: SendableRecordBatchStream, - _context: &Arc, - ) -> DataFusionResult { - unimplemented!(); - } - } - - impl Debug for NoOpDataSink { - fn fmt(&self, _f: &mut Formatter<'_>) -> FmtResult { - unimplemented!(); - } - } - - impl DisplayAs for NoOpDataSink { - fn fmt_as(&self, _t: DisplayFormatType, _f: &mut Formatter<'_>) -> FmtResult { - unimplemented!(); - } - } - // Tests for ModelSimpleAggregates. #[tokio::test] async fn test_rewrite_aggregate_on_one_column_without_predicates() { diff --git a/crates/modelardb_storage/src/query/metadata_table.rs b/crates/modelardb_storage/src/query/metadata_table.rs deleted file mode 100644 index d93bce26..00000000 --- a/crates/modelardb_storage/src/query/metadata_table.rs +++ /dev/null @@ -1,85 +0,0 @@ -/* Copyright 2024 The ModelarDB Contributors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -//! Implementation of [`MetadataTable`] which allows metadata tables to be queried through Apache -//! DataFusion. It wraps a [`DeltaTable`] and forwards most method calls to it. However, for -//! [`TableProvider::scan()`] it updates the [`DeltaTable`] to the latest version. - -use std::{any::Any, sync::Arc}; - -use arrow::datatypes::Schema; -use datafusion::catalog::Session; -use datafusion::datasource::{TableProvider, TableType}; -use datafusion::error::{DataFusionError, Result as DataFusionResult}; -use datafusion::logical_expr::Expr; -use datafusion::physical_plan::ExecutionPlan; -use deltalake::DeltaTable; -use tonic::async_trait; - -/// A queryable representation of a metadata table. [`MetadataTable`] wraps the [`TableProvider`] of -/// [`DeltaTable`] and passes most methods calls directly to it. Thus, it can be registered with -/// Apache DataFusion. The only difference from [`DeltaTable`] is that `delta_table` is updated to -/// the latest snapshot when accessed. -#[derive(Debug)] -pub(crate) struct MetadataTable { - /// Access to the Delta Lake table. - delta_table: DeltaTable, -} - -impl MetadataTable { - pub(crate) fn new(delta_table: DeltaTable) -> Self { - Self { delta_table } - } -} - -#[async_trait] -impl TableProvider for MetadataTable { - /// Return `self` as [`Any`] so it can be downcast. - fn as_any(&self) -> &dyn Any { - self.delta_table.as_any() - } - - /// Return the query schema of the metadata table registered with Apache DataFusion. - fn schema(&self) -> Arc { - TableProvider::schema(&self.delta_table) - } - - /// Specify that metadata tables are base tables and not views or temporary tables. - fn table_type(&self) -> TableType { - self.delta_table.table_type() - } - - /// Create an [`ExecutionPlan`] that will scan the metadata table. Returns a - /// [`DataFusionError::Plan`] if the necessary metadata cannot be retrieved. - async fn scan( - &self, - state: &dyn Session, - projection: Option<&Vec>, - filters: &[Expr], - limit: Option, - ) -> DataFusionResult> { - // Clone the Delta Lake table and update it to the latest version. self.data_folder.load( - // &mut self) is not an option due to TypeProvider::scan(&self, ...). Storing the DeltaTable - // in a Mutex and RwLock is also not an option since most of the methods in TypeProvider - // return a reference and the locks will be dropped at the end of the method. - let mut delta_table = self.delta_table.clone(); - delta_table - .load() - .await - .map_err(|error| DataFusionError::Plan(error.to_string()))?; - - delta_table.scan(state, projection, filters, limit).await - } -} diff --git a/crates/modelardb_storage/src/query/mod.rs b/crates/modelardb_storage/src/query/mod.rs index d3f2d3fd..05fd544a 100644 --- a/crates/modelardb_storage/src/query/mod.rs +++ b/crates/modelardb_storage/src/query/mod.rs @@ -13,15 +13,13 @@ * limitations under the License. */ -//! Implementation of types which allow normal tables, metadata tables, and time series tables to be -//! added to Apache DataFusion. This allows them to be queried and small amounts of data to be added -//! with INSERT. +//! Implementation of types which allow normal tables and time series tables to be added to Apache +//! DataFusion. This allows them to be queried and small amounts of data to be added with INSERT. // grid_exec and sorted_join_exec are pub(crate) so the rules added to Apache DataFusion's physical // optimizer can access them. mod generated_as_exec; pub(crate) mod grid_exec; -pub(crate) mod metadata_table; pub(crate) mod normal_table; pub(crate) mod sorted_join_exec; pub(crate) mod time_series_table; diff --git a/crates/modelardb_storage/src/query/normal_table.rs b/crates/modelardb_storage/src/query/normal_table.rs index a30f74d7..73d446d3 100644 --- a/crates/modelardb_storage/src/query/normal_table.rs +++ b/crates/modelardb_storage/src/query/normal_table.rs @@ -13,10 +13,10 @@ * limitations under the License. */ -//! Implementation of [`NormalTable`] which allows normal tables to be queried through Apache DataFusion. -//! It wraps a [`DeltaTable`] and forwards most method calls to it. However, for -//! [`TableProvider::scan()`] it updates the [`DeltaTable`] to the latest version and it implements -//! [`TableProvider::insert_into()`] so rows can be inserted with INSERT. +//! Implementation of [`NormalTable`] which allows normal tables and metadata tables to be queried +//! through Apache DataFusion. It wraps a [`DeltaTable`] and forwards most method calls to it. +//! However, for [`TableProvider::scan()`] it updates the [`DeltaTable`] to the latest version. If a +//! [`DataSink`] is provided, [`TableProvider::insert_into()`] is also supported. use std::borrow::Cow; use std::{any::Any, sync::Arc}; @@ -33,23 +33,26 @@ use datafusion::physical_plan::{ExecutionPlan, Statistics}; use deltalake::DeltaTable; use tonic::async_trait; -/// A queryable representation of a normal table. [`NormalTable`] wraps the [`TableProvider`] -/// [`DeltaTable`] and passes most methods calls directly to it. Thus, it can be registered with -/// Apache DataFusion. [`DeltaTable`] is extended in two ways, `delta_table` is updated to the -/// latest snapshot when accessed and support for inserting has been added. +/// A queryable representation of a normal table or a metadata table. [`NormalTable`] wraps the +/// [`TableProvider`] of [`DeltaTable`] and passes most method calls directly to it. Thus, it can be +/// registered with Apache DataFusion. [`DeltaTable`] is extended in two ways, `delta_table` is +/// updated to the latest snapshot when accessed and, if a [`DataSink`] is provided, support for +/// inserting data with INSERT has been added. Metadata tables are registered without a [`DataSink`] +/// since they are managed internally and should not be modified by users. #[derive(Debug)] pub(crate) struct NormalTable { /// Access to the Delta Lake table. delta_table: DeltaTable, - /// Where data should be written to. - data_sink: Arc, + /// Where data should be written to. [`None`] for metadata tables since they should not support + /// INSERT as they are managed internally. + maybe_data_sink: Option>, } impl NormalTable { - pub(crate) fn new(delta_table: DeltaTable, data_sink: Arc) -> Self { + pub(crate) fn new(delta_table: DeltaTable, maybe_data_sink: Option>) -> Self { Self { delta_table, - data_sink, + maybe_data_sink, } } } @@ -128,15 +131,21 @@ impl TableProvider for NormalTable { /// Create an [`ExecutionPlan`] that will insert the result of `input` into the normal table. /// Generally, [`arrow_flight::flight_service_server::FlightService::do_put()`] should be used - /// instead of this method as it is more efficient. Returns a [`DataFusionError::Plan`] if the - /// necessary metadata cannot be retrieved from the Delta Lake. + /// instead of this method as it is more efficient. Returns a [`DataFusionError::Plan`] if no + /// [`DataSink`] was provided (the table is a metadata table) or if the necessary metadata + /// cannot be retrieved from the Delta Lake. async fn insert_into( &self, _state: &dyn Session, input: Arc, _insert_op: InsertOp, ) -> DataFusionResult> { - let file_sink = Arc::new(DataSinkExec::new(input, self.data_sink.clone(), None)); + let data_sink = self.maybe_data_sink.clone().ok_or_else(|| { + DataFusionError::Plan("INSERT is not supported for metadata tables.".to_owned()) + })?; + + let file_sink = Arc::new(DataSinkExec::new(input, data_sink, None)); + Ok(file_sink) } } diff --git a/crates/modelardb_test/Cargo.toml b/crates/modelardb_test/Cargo.toml index 368cb4ef..2ddc5065 100644 --- a/crates/modelardb_test/Cargo.toml +++ b/crates/modelardb_test/Cargo.toml @@ -21,5 +21,7 @@ authors = ["Soeren Kejser Jensen "] [dependencies] arrow.workspace = true +async-trait.workspace = true +datafusion.workspace = true modelardb_types = { path = "../modelardb_types" } rand.workspace = true diff --git a/crates/modelardb_test/src/table.rs b/crates/modelardb_test/src/table.rs index 56fccb6e..3c2a0f68 100644 --- a/crates/modelardb_test/src/table.rs +++ b/crates/modelardb_test/src/table.rs @@ -15,10 +15,18 @@ //! Implementation of table related functions and constants used throughout ModelarDB for testing purposes. +use std::any::Any; +use std::fmt::{Debug, Formatter, Result as FmtResult}; use std::sync::Arc; use arrow::array::{BinaryArray, Float32Array, Int8Array, Int16Array, RecordBatch, StringArray}; use arrow::datatypes::{ArrowPrimitiveType, DataType, Field, Schema}; +use async_trait::async_trait; +use datafusion::datasource::sink::DataSink; +use datafusion::error::Result as DataFusionResult; +use datafusion::execution::{SendableRecordBatchStream, TaskContext}; +use datafusion::physical_plan::metrics::MetricsSet; +use datafusion::physical_plan::{DisplayAs, DisplayFormatType}; use modelardb_types::types::{ ArrowTimestamp, ArrowValue, ErrorBound, TimeSeriesTableMetadata, Timestamp, TimestampArray, Value, ValueArray, @@ -39,6 +47,44 @@ pub const TIME_SERIES_TABLE_SQL: &str = "CREATE TIME SERIES TABLE time_series_ta /// Name of the time series table used in tests. pub const TIME_SERIES_TABLE_NAME: &str = "time_series_table"; +/// [`DataSink`] implementation that does nothing. +pub struct NoOpDataSink {} + +#[async_trait] +impl DataSink for NoOpDataSink { + fn as_any(&self) -> &dyn Any { + unimplemented!(); + } + + fn metrics(&self) -> Option { + unimplemented!(); + } + + fn schema(&self) -> &Arc { + unimplemented!(); + } + + async fn write_all( + &self, + _data: SendableRecordBatchStream, + _context: &Arc, + ) -> DataFusionResult { + unimplemented!(); + } +} + +impl Debug for NoOpDataSink { + fn fmt(&self, _f: &mut Formatter<'_>) -> FmtResult { + unimplemented!(); + } +} + +impl DisplayAs for NoOpDataSink { + fn fmt_as(&self, _t: DisplayFormatType, _f: &mut Formatter<'_>) -> FmtResult { + unimplemented!(); + } +} + /// Return protobuf message bytes containing metadata for a normal table. pub fn normal_table_metadata_protobuf_bytes() -> Vec { modelardb_types::flight::encode_and_serialize_normal_table_metadata(