Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
118 commits
Select commit Hold shift + click to select a range
7b25cc6
Added basic structure for write-ahead-log
CGodiksen Dec 14, 2025
2401f86
Add try_new method to WriteAheadLogFile
CGodiksen Dec 14, 2025
f17390e
Add try_new method to WriteAheadLog
CGodiksen Dec 14, 2025
b7e688a
Add method to get location of DataFolder
CGodiksen Dec 14, 2025
ab01fb6
Add write-ahead log to context
CGodiksen Dec 14, 2025
fa57835
Fix bug causing an error if the log folder already exists and ensurin…
CGodiksen Dec 14, 2025
cd15223
Add InvalidState to ModelarDbStoreError
CGodiksen Dec 14, 2025
ecd00ad
Add method for creating a table log in the WAL
CGodiksen Dec 14, 2025
a3d14b6
Add table logs when creating the write ahead log
CGodiksen Dec 14, 2025
6c1582b
Add file path to log file struct
CGodiksen Dec 14, 2025
a4ed7c3
Add method to remove table log
CGodiksen Dec 14, 2025
4bcb746
Add StreamWriter to WriteAheadLogFile
CGodiksen Dec 15, 2025
16aa8a4
Add method to append and sync to WriteAheadLogFile
CGodiksen Dec 15, 2025
c7f3771
Add simple method to read all data from a log file
CGodiksen Dec 15, 2025
0219a57
Lock the file when we append and sync
CGodiksen Dec 15, 2025
1c68123
No longer allow WriteAheadLog to be created with non-local data folder
CGodiksen Jan 31, 2026
4183734
Use path saved in field when reading log file
CGodiksen Jan 31, 2026
484cc56
Use Mutex instead of locking the file itself
CGodiksen Feb 1, 2026
c4448ba
Acquire mutex during reading for safety
CGodiksen Feb 1, 2026
cc581ce
Handle unexpected EOF error directly instead of allowing all errors
CGodiksen Feb 1, 2026
5b9ecbd
Create table log file when time series table is created
CGodiksen Feb 1, 2026
99269e9
Delete the table log file when dropping time series tables
CGodiksen Feb 1, 2026
b2dd2b0
Pass write-ahead log to storage engine as well
CGodiksen Feb 1, 2026
748662a
Append to write-ahead log in storage engine when inserting data points
CGodiksen Feb 1, 2026
cf77ef7
Use sync_data() instead of sync_all()
CGodiksen Feb 3, 2026
41f3151
Prevent duplicate WAL schema header on open
CGodiksen Mar 10, 2026
7dc9a47
Fix windows issue with append mode and set_len
CGodiksen Mar 10, 2026
842e87e
Add unit tests for WriteAheadLogFile
CGodiksen Mar 10, 2026
80965a4
Import module for test function imports
CGodiksen Mar 10, 2026
2152dd2
Install serde_json for commit metadata
CGodiksen Mar 10, 2026
5bfaf0c
Add batch ids to DeltaTableWriter
CGodiksen Mar 10, 2026
ed74f3b
Add WAL batch id to IngestedDataBuffer so it is passed to uncompresse…
CGodiksen Mar 10, 2026
92e576f
No longer read spilled buffers back into memory on startup
CGodiksen Mar 10, 2026
3a6f06e
Pass batch ids through uncompressed data manager
CGodiksen Mar 10, 2026
3ab29e6
Pass batch ids through compressed data manager
CGodiksen Mar 10, 2026
01f672b
Pass batch ids when writing compressed segments
CGodiksen Mar 10, 2026
766cf53
Add batch id to WriteAheadLogFile
CGodiksen Mar 10, 2026
f3f114a
Use folder path instead of file path when initializing WriteAheadLogFile
CGodiksen Mar 10, 2026
eb74385
Handle existing wal files and ensure batch id is initialized correctly
CGodiksen Mar 10, 2026
676288c
Add checks for the next batch id in unit tests
CGodiksen Mar 10, 2026
bb9cbc5
Add batch offset to WriteAheadLogFile struct fields
CGodiksen Mar 11, 2026
7d403c6
Add method to mark batches as persisted
CGodiksen Mar 11, 2026
8332e4a
Add write ahead log to compressed data manager
CGodiksen Mar 11, 2026
e39d1bf
Add method to WriteAheadLog to mark batches as persisted in table log
CGodiksen Mar 11, 2026
3c50c93
Remove duplicated code to find table log
CGodiksen Mar 11, 2026
030fddb
Mark batches as persisted right after they are saved
CGodiksen Mar 11, 2026
1be33dd
Use AtomicU64 for next batch id instead of a Mutex
CGodiksen Mar 11, 2026
7238e41
Add threshold for when to rotate segments
CGodiksen Mar 11, 2026
e3ab7bc
Remove the entire folder when removing table log
CGodiksen Mar 11, 2026
c2e109f
Add struct for completed write ahead log segments
CGodiksen Mar 11, 2026
ef0e761
Add struct for active segments
CGodiksen Mar 11, 2026
7cb50d6
Update WriteAheadLogFile fields to use new closed and active segments
CGodiksen Mar 11, 2026
72f30ec
Add util function for finding closed segments
CGodiksen Mar 11, 2026
39a017f
Add util method to close leftover active segment
CGodiksen Mar 11, 2026
2bb6a39
Add util method to read batches from path with tolerance for missing EOF
CGodiksen Mar 11, 2026
2e21bf4
Add new try_new method that finds closed segments and uses new Active…
CGodiksen Mar 11, 2026
222a747
Refactor append_and_sync so it rotates segments if the threshold is hit
CGodiksen Mar 11, 2026
affbb91
Add new method to rotate active segment
CGodiksen Mar 11, 2026
5d91b70
Use ActiveSegment initialization function to remove duplicated code
CGodiksen Mar 11, 2026
cd39c88
Now deleting fully persisted segments when marking batches as persisted
CGodiksen Mar 11, 2026
1f8d98d
Read both from closed and active segment in read_all
CGodiksen Mar 11, 2026
8b329b3
Update tests to match new segment implementation
CGodiksen Mar 11, 2026
623e78c
Add tests for closing leftover active segments
CGodiksen Mar 11, 2026
9102239
Add more unit testing for mark_batches_as_persisted
CGodiksen Mar 11, 2026
58099db
Add method to storage engine to insert data that already has a WAL ba…
CGodiksen Mar 12, 2026
39681d5
Update read_all so it now returns both record batches and batch id
CGodiksen Mar 12, 2026
4b72b48
Fix tests and a small underflow bug caused by reading empty file
CGodiksen Mar 12, 2026
d7de30f
Add method to find all unpersisted batches
CGodiksen Mar 12, 2026
f456aa5
Add separate method to load persisted batches from delta table
CGodiksen Mar 12, 2026
4c578aa
Add separate method to get unpersisted batches
CGodiksen Mar 12, 2026
9e323cb
Now initializing log files with persisted batches from commit history
CGodiksen Mar 12, 2026
cff29c6
No longer create temp dir in util method
CGodiksen Mar 12, 2026
5623427
Add test for checking that commit history with no batch ids changes n…
CGodiksen Mar 12, 2026
53ceffd
Add tests for checking that commit history updates persisted batch ids
CGodiksen Mar 12, 2026
926c951
Add test for checking that fully persisted closed segments are deleted
CGodiksen Mar 12, 2026
ae9afa3
Add test for checking that partially persisted closed segments are re…
CGodiksen Mar 12, 2026
d451700
Add tests for unpersisted_batches()
CGodiksen Mar 12, 2026
3a97ef0
Add test for ensuring that unpersisted_batches includes closed and ac…
CGodiksen Mar 12, 2026
f6d2420
Add method to get unpersisted batches in table log
CGodiksen Mar 12, 2026
253b594
Add method to context to replay write ahead log
CGodiksen Mar 12, 2026
ecae75b
Calling replay write ahead log in main
CGodiksen Mar 12, 2026
c574c8b
Create time series table in compressed data manager tests
CGodiksen Mar 12, 2026
08afca4
Fix doc issues
CGodiksen Mar 12, 2026
25d12cc
Fix dependency issue after rebase
CGodiksen Mar 12, 2026
a722bb8
Add logging to WriteAheadLog struct
CGodiksen Mar 13, 2026
709a036
Add logging for creating WAL file and appending
CGodiksen Mar 13, 2026
6fdd510
Add logging for rotating segments, deleting closed segments, and load…
CGodiksen Mar 13, 2026
8006bac
Add logging for reading WAL files and closing leftover active files
CGodiksen Mar 13, 2026
c1c93fe
Add warn message when replaying unpersisted batches
CGodiksen Mar 13, 2026
e092e65
Add debug message for marking batches as persisted
CGodiksen Mar 13, 2026
417578a
Write-ahead log instead of write-ahead-log
CGodiksen Mar 13, 2026
a4fe666
Add unit tests for WriteAheadLog try_new
CGodiksen Mar 13, 2026
f14b88a
Add unit tests for creating table log
CGodiksen Mar 13, 2026
b6c0d7e
Add unit tests for removing table log
CGodiksen Mar 13, 2026
1b0d688
Add unit tests for append to table log
CGodiksen Mar 13, 2026
1ac6b35
Add unit tests for mark batches as persisted
CGodiksen Mar 13, 2026
ac7eff9
Add unit tests for unpersisted batches
CGodiksen Mar 13, 2026
7feda8f
Add unit test for removing table log and appending again
CGodiksen Mar 13, 2026
6c67bb4
Add unit test for loading persisted batch ids on try new
CGodiksen Mar 13, 2026
aaee538
Add more assertions to cover scenario with one closed segment and one…
CGodiksen Mar 13, 2026
c38839b
Only log closed segments if there are any
CGodiksen Mar 13, 2026
c4f240a
Minor type and documentation changes
CGodiksen Mar 13, 2026
d1d508d
Locking closed segments immediately when rotating active segment
CGodiksen Mar 13, 2026
13a1e6f
Minor refactoring to WAL code
CGodiksen Mar 13, 2026
91fbb66
Minor test refactoring to WAL code
CGodiksen Mar 13, 2026
b74bb0a
Fix small issue with order of retain and delete when marking as persi…
CGodiksen Mar 13, 2026
55f97c7
Merge branch 'main' into dev/write-ahead-log
CGodiksen Mar 16, 2026
73f9357
Expanded documentation for batch_ids field
CGodiksen Mar 24, 2026
081a943
Append CompressedSegmentBatch to CompressedDataBuffer directly
CGodiksen Mar 24, 2026
55c6b11
Add more WAL specific documentation and change WAL folder
CGodiksen Mar 24, 2026
369affc
Remove all mentions of operations from WAL
CGodiksen Mar 24, 2026
e6602a7
Remove DeltaTable from create_table_log arguments
CGodiksen Mar 24, 2026
aa7b209
Rename WriteAheadLogFile to SegmentedLog
CGodiksen Mar 24, 2026
a65df7a
No longer use "rotation" terminology in WAL
CGodiksen Mar 24, 2026
9825bd9
Update based on comments from @skejserjensen
CGodiksen Mar 24, 2026
d5da572
Merge branch 'main' into dev/write-ahead-log
CGodiksen Mar 24, 2026
9addf20
Fix writing of record batches after merge
CGodiksen Mar 24, 2026
023b277
Update based on comments from @chrthomsen
CGodiksen Mar 27, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
674 changes: 371 additions & 303 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ prost-build = "0.14.1"
rand = "0.9.2"
rustyline = "17.0.2"
serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.149"
snmalloc-rs = "0.3.8"
sqlparser = "0.59.0"
sysinfo = "0.37.2"
Expand Down
7 changes: 6 additions & 1 deletion crates/modelardb_server/src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ mod tests {
use std::sync::Arc;

use modelardb_storage::data_folder::DataFolder;
use modelardb_storage::write_ahead_log::WriteAheadLog;
use modelardb_types::types::{Node, ServerMode};
use tempfile::TempDir;
use tokio::sync::RwLock;
Expand Down Expand Up @@ -689,6 +690,10 @@ mod tests {
.await
.unwrap();

let write_ahead_log = Arc::new(RwLock::new(
WriteAheadLog::try_new(&local_data_folder).await.unwrap(),
));

let configuration_manager = Arc::new(RwLock::new(
ConfigurationManager::try_new(
local_data_folder,
Expand All @@ -699,7 +704,7 @@ mod tests {
));

let storage_engine = Arc::new(RwLock::new(
StorageEngine::try_new(data_folders, &configuration_manager)
StorageEngine::try_new(data_folders, write_ahead_log, &configuration_manager)
.await
.unwrap(),
));
Expand Down
70 changes: 64 additions & 6 deletions crates/modelardb_server/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ use std::sync::Arc;

use datafusion::arrow::datatypes::Schema;
use datafusion::catalog::{SchemaProvider, TableProvider};
use modelardb_storage::write_ahead_log::WriteAheadLog;
use modelardb_types::types::TimeSeriesTableMetadata;
use tokio::sync::RwLock;
use tracing::info;
use tracing::{info, warn};

use crate::configuration::ConfigurationManager;
use crate::error::{ModelarDbServerError, Result};
Expand All @@ -38,6 +39,8 @@ pub struct Context {
pub configuration_manager: Arc<RwLock<ConfigurationManager>>,
/// Manages all uncompressed and compressed data in the system.
pub storage_engine: Arc<RwLock<StorageEngine>>,
/// Write-ahead log for persisting data and operations.
write_ahead_log: Arc<RwLock<WriteAheadLog>>,
}

impl Context {
Expand All @@ -50,14 +53,24 @@ impl Context {
.await?,
));

let write_ahead_log = Arc::new(RwLock::new(
WriteAheadLog::try_new(&data_folders.local_data_folder).await?,
));

let storage_engine = Arc::new(RwLock::new(
StorageEngine::try_new(data_folders.clone(), &configuration_manager).await?,
StorageEngine::try_new(
data_folders.clone(),
write_ahead_log.clone(),
&configuration_manager,
)
.await?,
));

Ok(Context {
data_folders,
configuration_manager,
storage_engine,
write_ahead_log,
})
}

Expand Down Expand Up @@ -103,6 +116,12 @@ impl Context {
self.register_and_save_time_series_table(time_series_table_metadata)
.await?;

// Create a file in the write-ahead log to log uncompressed data for the table.
let mut write_ahead_log = self.write_ahead_log.write().await;
write_ahead_log
.create_table_log(time_series_table_metadata)
.await?;

Ok(())
}

Expand Down Expand Up @@ -237,6 +256,40 @@ impl Context {
Ok(())
}

/// For each time series table in the local data folder, use the write-ahead log to replay any
/// data that was written to the storage engine but not compressed and saved to disk. Note that
/// this method should only be called before the storage engine starts ingesting data to avoid
/// replaying data that is currently in memory.
pub(super) async fn replay_write_ahead_log(&self) -> Result<()> {
    // Hold write locks on both the write-ahead log and the storage engine for the whole
    // replay so no ingestion can interleave with the replayed batches.
    let write_ahead_log = self.write_ahead_log.write().await;
    let mut storage_engine = self.storage_engine.write().await;

    let local_data_folder = &self.data_folders.local_data_folder;
    let all_table_metadata = local_data_folder.time_series_table_metadata().await?;

    for table_metadata in all_table_metadata {
        let unpersisted_batches =
            write_ahead_log.unpersisted_batches_in_table_log(&table_metadata.name)?;

        // Only emit a warning when there is actually something to replay.
        if !unpersisted_batches.is_empty() {
            warn!(
                table = %table_metadata.name,
                batch_count = unpersisted_batches.len(),
                "Replaying unpersisted batches for time series table."
            );
        }

        // Re-insert each unpersisted batch under its original WAL batch id so it is not
        // logged (and thus duplicated) a second time.
        for (batch_id, record_batch) in unpersisted_batches {
            storage_engine.insert_data_points_with_batch_id(
                table_metadata.clone(),
                record_batch,
                batch_id,
            )?;
        }
    }

    Ok(())
}

/// Drop the table with `table_name` if it exists. The table is deregistered from the Apache
/// Arrow Datafusion session context and deleted from the storage engine and Delta Lake. If the
/// table does not exist or if it could not be dropped, [`ModelarDbServerError`] is returned.
Expand All @@ -254,11 +307,16 @@ impl Context {

self.drop_table_from_storage_engine(table_name).await?;

let local_data_folder = &self.data_folders.local_data_folder;

// If the table is a time series table, delete the table log file from the write-ahead log.
if local_data_folder.is_time_series_table(table_name).await? {
let mut write_ahead_log = self.write_ahead_log.write().await;
write_ahead_log.remove_table_log(table_name)?;
}

// Drop the table from the Delta Lake.
self.data_folders
.local_data_folder
.drop_table(table_name)
.await?;
local_data_folder.drop_table(table_name).await?;

Ok(())
}
Expand Down
9 changes: 2 additions & 7 deletions crates/modelardb_server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,8 @@ async fn main() -> Result<()> {
// Setup CTRL+C handler.
setup_ctrl_c_handler(&context);

// Initialize storage engine with spilled buffers.
context
.storage_engine
.read()
.await
.initialize(&context)
.await?;
// Replay any data that was written to the storage engine but not compressed and saved to disk.
context.replay_write_ahead_log().await?;

// Start the Apache Arrow Flight interface.
remote::start_apache_arrow_flight_server(context, *PORT).await?;
Expand Down
58 changes: 39 additions & 19 deletions crates/modelardb_server/src/storage/compressed_data_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

//! Buffer for compressed segments from the same time series table.

use std::collections::HashSet;
use std::sync::Arc;

use datafusion::arrow::record_batch::RecordBatch;
Expand All @@ -30,16 +31,21 @@ pub(super) struct CompressedSegmentBatch {
pub(super) time_series_table_metadata: Arc<TimeSeriesTableMetadata>,
/// Compressed segments representing the data points to insert.
pub(super) compressed_segments: Vec<RecordBatch>,
/// The ids of the uncompressed batches that correspond to the compressed segments. The ids are
/// assigned by the WAL and are used to delete uncompressed data when compressed data is saved.
pub(super) batch_ids: HashSet<u64>,
}

impl CompressedSegmentBatch {
/// Create a [`CompressedSegmentBatch`] that takes ownership of the time series table
/// metadata, the compressed segments, and the ids of the uncompressed batches that the
/// compressed segments were created from.
pub(super) fn new(
    time_series_table_metadata: Arc<TimeSeriesTableMetadata>,
    compressed_segments: Vec<RecordBatch>,
    batch_ids: HashSet<u64>,
) -> Self {
    // Field init shorthand; the arguments are simply moved into the struct.
    Self {
        batch_ids,
        time_series_table_metadata,
        compressed_segments,
    }
}

Expand All @@ -59,6 +65,9 @@ pub(super) struct CompressedDataBuffer {
compressed_segments: Vec<RecordBatch>,
/// Continuously updated total sum of the size of the compressed segments.
pub(super) size_in_bytes: u64,
/// The ids of the uncompressed batches that correspond to the compressed segments. The ids are
/// assigned by the WAL and are used to delete uncompressed data when compressed data is saved.
batch_ids: HashSet<u64>,
}

impl CompressedDataBuffer {
Expand All @@ -67,16 +76,19 @@ impl CompressedDataBuffer {
time_series_table_metadata,
compressed_segments: vec![],
size_in_bytes: 0,
batch_ids: HashSet::new(),
}
}

/// Append `compressed_segments` to the [`CompressedDataBuffer`] and return the size of
/// `compressed_segments` in bytes if their schema matches the time series table, otherwise
/// [`ModelarDbServerError`] is returned.
pub(super) fn append_compressed_segments(
/// Append the compressed segments in `compressed_segment_batch` to the [`CompressedDataBuffer`]
/// and return the size of the compressed segments in bytes if their schema matches the time
/// series table, otherwise [`ModelarDbServerError`] is returned.
pub(super) fn append_compressed_segment_batch(
&mut self,
mut compressed_segments: Vec<RecordBatch>,
compressed_segment_batch: CompressedSegmentBatch,
) -> Result<u64> {
let mut compressed_segments = compressed_segment_batch.compressed_segments;

if compressed_segments.iter().any(|compressed_segments| {
compressed_segments.schema() != self.time_series_table_metadata.compressed_schema
}) {
Expand All @@ -94,6 +106,8 @@ impl CompressedDataBuffer {
self.size_in_bytes += compressed_segments_size;
}

self.batch_ids.extend(compressed_segment_batch.batch_ids);

Ok(compressed_segments_size)
}

Expand All @@ -102,6 +116,11 @@ impl CompressedDataBuffer {
self.compressed_segments
}

/// Return an owned copy of the ids given to the uncompressed batches by the WAL.
pub(super) fn batch_ids(&self) -> HashSet<u64> {
    // Hand out a copy so the buffer's internal set cannot be mutated by callers.
    self.batch_ids.to_owned()
}

/// Return the size in bytes of `compressed_segments`.
fn size_of_compressed_segments(compressed_segments: &RecordBatch) -> u64 {
let mut total_size: u64 = 0;
Expand All @@ -128,15 +147,12 @@ mod tests {
use modelardb_test::table;

#[test]
fn test_can_append_valid_compressed_segments() {
fn test_can_append_valid_compressed_segment_batch() {
let mut compressed_data_buffer =
CompressedDataBuffer::new(table::time_series_table_metadata_arc());

compressed_data_buffer
.append_compressed_segments(vec![
table::compressed_segments_record_batch(),
table::compressed_segments_record_batch(),
])
.append_compressed_segment_batch(compressed_segment_batch())
.unwrap();

assert_eq!(compressed_data_buffer.compressed_segments.len(), 2);
Expand All @@ -150,10 +166,7 @@ mod tests {
CompressedDataBuffer::new(table::time_series_table_metadata_arc());

compressed_data_buffer
.append_compressed_segments(vec![
table::compressed_segments_record_batch(),
table::compressed_segments_record_batch(),
])
.append_compressed_segment_batch(compressed_segment_batch())
.unwrap();

assert!(compressed_data_buffer.size_in_bytes > 0);
Expand All @@ -164,12 +177,8 @@ mod tests {
let mut compressed_data_buffer =
CompressedDataBuffer::new(table::time_series_table_metadata_arc());

let compressed_segments = vec![
table::compressed_segments_record_batch(),
table::compressed_segments_record_batch(),
];
compressed_data_buffer
.append_compressed_segments(compressed_segments)
.append_compressed_segment_batch(compressed_segment_batch())
.unwrap();

let record_batches = compressed_data_buffer.record_batches();
Expand All @@ -179,6 +188,17 @@ mod tests {
assert_eq!(record_batch.num_rows(), 6);
}

/// Return a [`CompressedSegmentBatch`] for unit tests containing two compressed segment
/// record batches and the uncompressed batch ids 0, 1, and 2.
fn compressed_segment_batch() -> CompressedSegmentBatch {
    let metadata = table::time_series_table_metadata_arc();

    let segments = vec![
        table::compressed_segments_record_batch(),
        table::compressed_segments_record_batch(),
    ];

    let batch_ids: HashSet<u64> = [0, 1, 2].into_iter().collect();

    CompressedSegmentBatch::new(metadata, segments, batch_ids)
}

#[test]
fn test_get_size_of_compressed_data_buffer() {
let compressed_data_buffer = table::compressed_segments_record_batch();
Expand Down
Loading
Loading