Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions crates/modelardb_embedded/src/operations/data_folder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,6 @@ impl Operations for DataFolder {
match table_type {
TableType::NormalTable(schema) => {
let delta_table = self.create_normal_table(table_name, &schema).await?;

self.save_normal_table_metadata(table_name).await?;

let data_sink = Arc::new(DataFolderDataSink::new());

modelardb_storage::register_normal_table(
Expand All @@ -149,9 +146,6 @@ impl Operations for DataFolder {
.create_time_series_table(&time_series_table_metadata)
.await?;

self.save_time_series_table_metadata(&time_series_table_metadata)
.await?;

let data_sink = Arc::new(DataFolderDataSink::new());

modelardb_storage::register_time_series_table(
Expand Down Expand Up @@ -517,9 +511,6 @@ impl Operations for DataFolder {
// Drop the table from the Apache Arrow DataFusion session.
self.session_context().deregister_table(table_name)?;

// Delete the table metadata from the Delta Lake.
self.drop_table_metadata(table_name).await?;

// Drop the table from the Delta Lake.
self.drop_table(table_name).await?;

Expand Down
22 changes: 0 additions & 22 deletions crates/modelardb_server/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,6 @@ impl Cluster {
.create_normal_table(table_name, schema)
.await?;

self.remote_data_folder
.save_normal_table_metadata(table_name)
.await?;

// Create the normal table in each peer node.
let protobuf_bytes = modelardb_types::flight::encode_and_serialize_normal_table_metadata(
table_name, schema,
Expand Down Expand Up @@ -178,10 +174,6 @@ impl Cluster {
.create_time_series_table(time_series_table_metadata)
.await?;

self.remote_data_folder
.save_time_series_table_metadata(time_series_table_metadata)
.await?;

// Create the time series table in each peer node.
let protobuf_bytes =
modelardb_types::flight::encode_and_serialize_time_series_table_metadata(
Expand All @@ -203,10 +195,6 @@ impl Cluster {
pub(crate) async fn drop_cluster_tables(&self, table_names: &[String]) -> Result<()> {
// Drop the tables from the remote data folder.
for table_name in table_names {
self.remote_data_folder
.drop_table_metadata(table_name)
.await?;

self.remote_data_folder.drop_table(table_name).await?;
}

Expand Down Expand Up @@ -663,11 +651,6 @@ mod test {
.create_normal_table(table_name, &schema)
.await
.unwrap();

data_folder
.save_normal_table_metadata(table_name)
.await
.unwrap();
}

/// Create a time series table named `table_name` with a field column named `column_name` in
Expand All @@ -694,11 +677,6 @@ mod test {
.create_time_series_table(&time_series_table_metadata)
.await
.unwrap();

data_folder
.save_time_series_table_metadata(&time_series_table_metadata)
.await
.unwrap();
}

/// Call [`Cluster::retrieve_and_create_tables`] if the [`ClusterMode`] of `context` is
Expand Down
22 changes: 2 additions & 20 deletions crates/modelardb_server/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl Context {
table_name: &str,
schema: &Schema,
) -> Result<()> {
// Create an empty Delta Lake table.
// Create an empty Delta Lake table and save the normal table metadata to the Delta Lake.
self.data_folders
.local_data_folder
.create_normal_table(table_name, schema)
Expand All @@ -87,12 +87,6 @@ impl Context {
// Register the normal table with Apache DataFusion.
self.register_normal_table(table_name).await?;

// Persist the new normal table to the Delta Lake.
self.data_folders
.local_data_folder
.save_normal_table_metadata(table_name)
.await?;

info!("Created normal table '{}'.", table_name);

Ok(())
Expand All @@ -119,7 +113,7 @@ impl Context {
&self,
time_series_table_metadata: &TimeSeriesTableMetadata,
) -> Result<()> {
// Create an empty Delta Lake table.
// Create an empty Delta Lake table and save the time series table metadata to the Delta Lake.
self.data_folders
.local_data_folder
.create_time_series_table(time_series_table_metadata)
Expand All @@ -129,12 +123,6 @@ impl Context {
self.register_time_series_table(Arc::new(time_series_table_metadata.clone()))
.await?;

// Persist the new time series table to the Delta Lake.
self.data_folders
.local_data_folder
.save_time_series_table_metadata(time_series_table_metadata)
.await?;

info!(
"Created time series table '{}'.",
time_series_table_metadata.name
Expand Down Expand Up @@ -266,12 +254,6 @@ impl Context {

self.drop_table_from_storage_engine(table_name).await?;

// Drop the table metadata from the Delta Lake.
self.data_folders
.local_data_folder
.drop_table_metadata(table_name)
.await?;

// Drop the table from the Delta Lake.
self.data_folders
.local_data_folder
Expand Down
15 changes: 2 additions & 13 deletions crates/modelardb_server/src/storage/compressed_data_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ mod tests {
let local_data_folder = data_manager.local_data_folder.clone();

let mut delta_table = local_data_folder
.create_time_series_table(&table::time_series_table_metadata())
.delta_table(TIME_SERIES_TABLE_NAME)
.await
.unwrap();

Expand Down Expand Up @@ -443,14 +443,8 @@ mod tests {
#[tokio::test]
async fn test_remaining_memory_incremented_when_saving_compressed_segments() {
let (_temp_dir, data_manager) = create_compressed_data_manager().await;
let local_data_folder = data_manager.local_data_folder.clone();

let segments = compressed_segments_record_batch();
local_data_folder
.create_time_series_table(&segments.time_series_table_metadata)
.await
.unwrap();

data_manager
.insert_compressed_segments(segments.clone())
.await
Expand Down Expand Up @@ -497,14 +491,9 @@ mod tests {
#[tokio::test]
async fn test_decrease_compressed_remaining_memory_in_bytes() {
let (_temp_dir, data_manager) = create_compressed_data_manager().await;
let local_data_folder = data_manager.local_data_folder.clone();

// Insert data that should be saved when the remaining memory is decreased.
let segments = compressed_segments_record_batch();
local_data_folder
.create_time_series_table(&segments.time_series_table_metadata)
.await
.unwrap();
data_manager
.insert_compressed_segments(segments)
.await
Expand Down Expand Up @@ -559,7 +548,7 @@ mod tests {

let time_series_table_metadata = table::time_series_table_metadata();
local_data_folder
.save_time_series_table_metadata(&time_series_table_metadata)
.create_time_series_table(&time_series_table_metadata)
.await
.unwrap();

Expand Down
10 changes: 0 additions & 10 deletions crates/modelardb_server/src/storage/data_transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,23 +474,13 @@ mod tests {
.await
.unwrap();

local_data_folder
.save_normal_table_metadata(NORMAL_TABLE_NAME)
.await
.unwrap();

// Create a time series table.
let time_series_table_metadata = table::time_series_table_metadata();
local_data_folder
.create_time_series_table(&time_series_table_metadata)
.await
.unwrap();

local_data_folder
.save_time_series_table_metadata(&time_series_table_metadata)
.await
.unwrap();

(temp_dir, local_data_folder)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1285,7 +1285,7 @@ mod tests {
let time_series_table_metadata = table::time_series_table_metadata();

local_data_folder
.save_time_series_table_metadata(&time_series_table_metadata)
.create_time_series_table(&time_series_table_metadata)
.await
.unwrap();

Expand Down
34 changes: 14 additions & 20 deletions crates/modelardb_storage/src/data_folder/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use uuid::Uuid;

use crate::data_folder::DataFolder;
use crate::error::Result;
use crate::{register_metadata_table, sql_and_concat};
use crate::sql_and_concat;

/// Trait that extends [`DataFolder`] to provide management of the Delta Lake for the cluster.
#[allow(async_fn_in_trait)]
Expand All @@ -51,27 +51,21 @@ impl ClusterMetadata for DataFolder {
/// [`ModelarDbStorageError`](crate::error::ModelarDbStorageError).
async fn create_and_register_cluster_metadata_tables(&self) -> Result<()> {
// Create and register the cluster_metadata table if it does not exist.
let delta_table = self
.create_metadata_table(
"cluster_metadata",
&Schema::new(vec![Field::new("key", DataType::Utf8, false)]),
)
.await?;

register_metadata_table(self.session_context(), "cluster_metadata", delta_table)?;
self.create_and_register_metadata_table(
"cluster_metadata",
&Schema::new(vec![Field::new("key", DataType::Utf8, false)]),
)
.await?;

// Create and register the nodes table if it does not exist.
let delta_table = self
.create_metadata_table(
"nodes",
&Schema::new(vec![
Field::new("url", DataType::Utf8, false),
Field::new("mode", DataType::Utf8, false),
]),
)
.await?;

register_metadata_table(self.session_context(), "nodes", delta_table)?;
self.create_and_register_metadata_table(
"nodes",
&Schema::new(vec![
Field::new("url", DataType::Utf8, false),
Field::new("mode", DataType::Utf8, false),
]),
)
.await?;

Ok(())
}
Expand Down
Loading
Loading