Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
16e07a9
Combine create_normal_table and save_normal_table_metadata
CGodiksen Mar 18, 2026
6346568
Remove previous calls to save_normal_table_metadata
CGodiksen Mar 18, 2026
25ac67b
Combine create_time_series_table and save_time_series_table_metadata
CGodiksen Mar 18, 2026
27c3e43
Fix tests after combining time series table methods
CGodiksen Mar 18, 2026
8e84d74
Remove previous calls to save_time_series_table_metadata
CGodiksen Mar 18, 2026
f41728a
Combine drop_table and drop_table_metadata
CGodiksen Mar 18, 2026
a38f3b4
Fix tests after combining drop table methods
CGodiksen Mar 18, 2026
fed3e77
Remove previous calls to drop_table_metadata
CGodiksen Mar 18, 2026
265d2c6
Fix minor doc issue
CGodiksen Mar 18, 2026
c1e5c80
Remove unused public method and make public methods private
CGodiksen Mar 18, 2026
6728a45
Remove public register_metadata_table function
CGodiksen Mar 18, 2026
267eb51
Combine create_metadata_table and register_metadata_table
CGodiksen Mar 18, 2026
8cec09e
Update use of create_metadata_table to use new method
CGodiksen Mar 18, 2026
54bae4b
Update mod.rs
CGodiksen Mar 18, 2026
b4d1f0e
Fix inconsistent naming for drop_table
CGodiksen Mar 18, 2026
67e2aba
Add optional data sink to NormalTable struct
CGodiksen Mar 19, 2026
2665d28
Remove no longer used MetadataTable struct
CGodiksen Mar 19, 2026
4acbe67
Make metadata methods private
CGodiksen Mar 19, 2026
c3d278e
Move DeltaTableWriter to a separate file in the same module
CGodiksen Mar 19, 2026
725b8b0
Use a simpler check for if the table is a time series table in table_…
CGodiksen Mar 19, 2026
ac7a2a8
Use a single method for writing to both normal tables and time series…
CGodiksen Mar 19, 2026
4232217
Rename normal_or_metadata_table_writer to normal_table_writer
CGodiksen Mar 19, 2026
5588a67
Remove unnecessary if statement
CGodiksen Mar 19, 2026
731e387
Add try_new method to simplify constructor methods
CGodiksen Mar 19, 2026
ada9426
Move DeltaTableWriter constructors into DeltaTableWriter struct
CGodiksen Mar 19, 2026
02572c3
Move write_record_batches_to_table to DeltaTableWriter
CGodiksen Mar 19, 2026
9e3be19
Reordering methods in DataFolder to group related methods
CGodiksen Mar 19, 2026
d1d2945
Reorder tests to match new method ordering
CGodiksen Mar 19, 2026
34e14e3
Merge branch 'main' into dev/clean-data-folder
CGodiksen Mar 19, 2026
f5e7e9d
Add simple tests for open_local_url and open_memory
CGodiksen Mar 20, 2026
02aca5e
Add tests for duplicates and uint error checks
CGodiksen Mar 20, 2026
b19af79
Check for specific error message in test_open_local_url_with_invalid_…
CGodiksen Mar 20, 2026
f748fc4
Check for specific error message in other DataFolder tests
CGodiksen Mar 20, 2026
5e2a290
Add tests for truncate_table
CGodiksen Mar 20, 2026
03dd063
Add tests for vacuum_table
CGodiksen Mar 20, 2026
855fece
Add tests for write_record_batches
CGodiksen Mar 20, 2026
f970886
Add tests for normal_table_schema()
CGodiksen Mar 20, 2026
bcf1c23
Move NoOpDataSink to modelardb_test crate
CGodiksen Mar 20, 2026
2939ceb
Add test for register_tables()
CGodiksen Mar 20, 2026
d48939c
Add tests for time_series_table_metadata_for_registered_time_series_t…
CGodiksen Mar 20, 2026
9b05d22
Add tests to DeltaTableWriter
CGodiksen Mar 20, 2026
5503c21
Add test for empty write and better assertions
CGodiksen Mar 20, 2026
3445eca
Add test utility function to create_normal_table_writer
CGodiksen Mar 20, 2026
b98447d
Use time series tables in the DeltaTableWriter tests
CGodiksen Mar 20, 2026
81d5f99
Import TIME_SERIES_TABLE_NAME directly
CGodiksen Mar 20, 2026
0f14a95
Use time_series_table_metadata_for_registered_time_series_table in ta…
CGodiksen Mar 20, 2026
5162a66
Use partition column to identify time series tables
CGodiksen Mar 20, 2026
ac33ab2
Update based on comment from @chrthomsen
CGodiksen Mar 24, 2026
dbec391
Update based on comment from @skejserjensen
CGodiksen Mar 24, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/modelardb_bulkloader/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ use deltalake::{ObjectStore, Path};
use futures::stream::StreamExt;
use modelardb_embedded::error::{ModelarDbEmbeddedError, Result};
use modelardb_embedded::operations::Operations;
use modelardb_storage::data_folder::{DataFolder, DeltaTableWriter};
use modelardb_storage::data_folder::DataFolder;
use modelardb_storage::data_folder::delta_table_writer::DeltaTableWriter;
use modelardb_types::types::TimeSeriesTableMetadata;
use sysinfo::System;

Expand Down
12 changes: 6 additions & 6 deletions crates/modelardb_embedded/src/operations/data_folder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,15 +215,15 @@ impl Operations for DataFolder {
&uncompressed_data,
)?;

self.write_compressed_segments_to_time_series_table(table_name, compressed_data)
self.write_record_batches(table_name, compressed_data)
.await?;
} else if let Some(normal_table_schema) = self.normal_table_schema(table_name).await {
// Normal table.
if !schemas_are_compatible(&uncompressed_data.schema(), &normal_table_schema) {
return Err(schema_mismatch_error);
}

self.write_record_batches_to_normal_table(table_name, vec![uncompressed_data])
self.write_record_batches(table_name, vec![uncompressed_data])
.await?;
} else {
return Err(ModelarDbEmbeddedError::InvalidArgument(format!(
Expand Down Expand Up @@ -282,7 +282,7 @@ impl Operations for DataFolder {
let record_batches = common::collect(record_batch_stream).await?;

target_data_folder
.write_record_batches_to_normal_table(target_table_name, record_batches)
.write_record_batches(target_table_name, record_batches)
.await?;

Ok(())
Expand Down Expand Up @@ -407,7 +407,7 @@ impl Operations for DataFolder {

// Write read data to target_table_name in target.
target_data_folder
.write_compressed_segments_to_time_series_table(target_table_name, record_batches)
.write_record_batches(target_table_name, record_batches)
.await?;

Ok(())
Expand Down Expand Up @@ -455,7 +455,7 @@ impl Operations for DataFolder {
let record_batches: Vec<RecordBatch> = stream.try_collect().await?;

target_data_folder
.write_compressed_segments_to_time_series_table(target_table_name, record_batches)
.write_record_batches(target_table_name, record_batches)
.await?;
} else if let (Some(source_normal_table_schema), Some(target_normal_table_schema)) = (
self.normal_table_schema(source_table_name).await,
Expand All @@ -474,7 +474,7 @@ impl Operations for DataFolder {
let record_batches: Vec<RecordBatch> = stream.try_collect().await?;

target_data_folder
.write_record_batches_to_normal_table(target_table_name, record_batches)
.write_record_batches(target_table_name, record_batches)
.await?;
} else {
return Err(ModelarDbEmbeddedError::InvalidArgument(format!(
Expand Down
7 changes: 2 additions & 5 deletions crates/modelardb_server/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -747,10 +747,7 @@ mod tests {
// Write data to the normal table.
let local_data_folder = &context.data_folders.local_data_folder;
local_data_folder
.write_record_batches_to_normal_table(
NORMAL_TABLE_NAME,
vec![table::normal_table_record_batch()],
)
.write_record_batches(NORMAL_TABLE_NAME, vec![table::normal_table_record_batch()])
.await
.unwrap();

Expand Down Expand Up @@ -818,7 +815,7 @@ mod tests {
// Write data to the time series table.
let local_data_folder = &context.data_folders.local_data_folder;
local_data_folder
.write_compressed_segments_to_time_series_table(
.write_record_batches(
TIME_SERIES_TABLE_NAME,
vec![table::compressed_segments_record_batch()],
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl CompressedDataManager {
let record_batch_size_in_bytes = record_batch.get_array_memory_size();

self.local_data_folder
.write_record_batches_to_normal_table(table_name, vec![record_batch])
.write_record_batches(table_name, vec![record_batch])
.await?;

// Inform the data transfer component about the new data if a remote data folder was
Expand Down Expand Up @@ -246,7 +246,7 @@ impl CompressedDataManager {
let compressed_data_buffer_size_in_bytes = compressed_data_buffer.size_in_bytes;
let compressed_segments = compressed_data_buffer.record_batches();
self.local_data_folder
.write_compressed_segments_to_time_series_table(table_name, compressed_segments)
.write_record_batches(table_name, compressed_segments)
.await?;

// Inform the data transfer component about the new data if a remote data folder was
Expand Down
23 changes: 5 additions & 18 deletions crates/modelardb_server/src/storage/data_transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,19 +244,9 @@ impl DataTransfer {
debug!("Transferring {current_size_in_bytes} bytes for the table '{table_name}'.",);

// Write the data to the remote Delta Lake.
if self
.local_data_folder
.is_time_series_table(table_name)
.await?
{
self.remote_data_folder
.write_compressed_segments_to_time_series_table(table_name, record_batches)
.await?;
} else {
self.remote_data_folder
.write_record_batches_to_normal_table(table_name, record_batches)
.await?;
}
self.remote_data_folder
.write_record_batches(table_name, record_batches)
.await?;

// Delete the data that has been transferred to the remote Delta Lake.
self.local_data_folder.truncate_table(table_name).await?;
Expand Down Expand Up @@ -493,16 +483,13 @@ mod tests {
for _ in 0..batch_write_count {
// Write to the normal table.
local_data_folder
.write_record_batches_to_normal_table(
NORMAL_TABLE_NAME,
vec![table::normal_table_record_batch()],
)
.write_record_batches(NORMAL_TABLE_NAME, vec![table::normal_table_record_batch()])
.await
.unwrap();

// Write to the time series table.
local_data_folder
.write_compressed_segments_to_time_series_table(
.write_record_batches(
TIME_SERIES_TABLE_NAME,
vec![table::compressed_segments_record_batch()],
)
Expand Down
Loading
Loading