diff --git a/Cargo.lock b/Cargo.lock index 3e9bbcb2..ece9bc31 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3338,10 +3338,12 @@ dependencies = [ "object_store", "proptest", "prost", + "serde", "snmalloc-rs", "tempfile", "tokio", "tokio-stream", + "toml", "tonic", "tracing", "tracing-subscriber", @@ -4561,6 +4563,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_spanned" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e24345aa0fe688594e73770a5f6d1b216508b4f93484c0026d521acd30134392" +dependencies = [ + "serde_core", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -5031,6 +5042,45 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0dc8b1fb61449e27716ec0e1bdf0f6b8f3e8f6b05391e8497b8b6d7804ea6d8" +dependencies = [ + "indexmap", + "serde_core", + "serde_spanned", + "toml_datetime", + "toml_parser", + "toml_writer", + "winnow", +] + +[[package]] +name = "toml_datetime" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2cdb639ebbc97961c51720f858597f7f24c4fc295327923af55b74c3c724533" +dependencies = [ + "serde_core", +] + +[[package]] +name = "toml_parser" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0cbe268d35bdb4bb5a56a2de88d0ad0eb70af5384a99d648cd4b3d04039800e" +dependencies = [ + "winnow", +] + +[[package]] +name = "toml_writer" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df8b2b54733674ad286d16267dcfc7a71ed5c776e4ac7aa3c3e2561f7c637bf2" + [[package]] name = "tonic" version = "0.13.1" @@ -5791,6 +5841,12 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "271414315aff87387382ec3d271b52d7ae78726f5d44ac98b4f4030c91880486" +[[package]] +name = "winnow" +version = "0.7.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a5364e9d77fcdeeaa6062ced926ee3381faa2ee02d3eb83a5c27a8825540829" + [[package]] name = "wit-bindgen-rt" version = "0.33.0" diff --git a/Cargo.toml b/Cargo.toml index 8f6910cd..ef932a13 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,12 +37,14 @@ prost = "0.13.1" prost-build = "0.13.1" rand = "0.9.2" rustyline = "17.0.2" +serde = { version = "1.0.228", features = ["derive"] } snmalloc-rs = "0.3.8" sqlparser = "0.58.0" sysinfo = "0.37.2" tempfile = "3.23.0" tokio = "1.48.0" tokio-stream = "0.1.17" +toml = "0.9.8" tonic = "0.13.1" tracing = "0.1.41" tracing-subscriber = "0.3.20" diff --git a/crates/modelardb_server/Cargo.toml b/crates/modelardb_server/Cargo.toml index 4e8e8de7..68ed1a0a 100644 --- a/crates/modelardb_server/Cargo.toml +++ b/crates/modelardb_server/Cargo.toml @@ -38,9 +38,11 @@ modelardb_storage = { path = "../modelardb_storage" } modelardb_types = { path = "../modelardb_types" } object_store = { workspace = true, features = ["aws", "azure"] } prost.workspace = true +serde.workspace = true snmalloc-rs = { workspace = true, features = ["build_cc"] } tokio = { workspace = true, features = ["rt-multi-thread", "signal"] } tokio-stream.workspace = true +toml.workspace = true tonic.workspace = true uuid.workspace = true diff --git a/crates/modelardb_server/src/configuration.rs b/crates/modelardb_server/src/configuration.rs index 90ff04a9..812ec174 100644 --- a/crates/modelardb_server/src/configuration.rs +++ b/crates/modelardb_server/src/configuration.rs @@ -19,20 +19,25 @@ use std::env; use std::sync::Arc; +use modelardb_storage::data_folder::DataFolder; use modelardb_types::flight::protocol; +use object_store::path::Path; +use object_store::{Error, ObjectStore, PutPayload}; use prost::Message; +use serde::{Deserialize, Serialize}; use tokio::sync::RwLock; use crate::ClusterMode; -use crate::error::Result; +use crate::error::{ModelarDbServerError, Result}; use crate::storage::StorageEngine; -/// Manages the system's configuration and provides functionality for updating the configuration. -#[derive(Clone)] -pub struct ConfigurationManager { - /// The mode of the cluster used to determine the behaviour when starting the server, - /// creating tables, updating the remote object store, and querying. - pub(crate) cluster_mode: ClusterMode, +const CONFIGURATION_FILE_NAME: &str = "modelardbd.toml"; + +/// The system's configuration. The configuration can be serialized into a [`CONFIGURATION_FILE_NAME`] +/// configuration file and deserialized from it. Accessing and modifying the configuration should +/// only be done through the [`ConfigurationManager`]. +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +struct Configuration { /// Amount of memory to reserve for storing multivariate time series. multivariate_reserved_memory_in_bytes: u64, /// Amount of memory to reserve for storing uncompressed data buffers. @@ -54,57 +59,158 @@ pub struct ConfigurationManager { pub(crate) writer_threads: u8, } -impl ConfigurationManager { - pub fn new(cluster_mode: ClusterMode) -> Self { - let multivariate_reserved_memory_in_bytes = - env::var("MODELARDBD_MULTIVARIATE_RESERVED_MEMORY_IN_BYTES") - .map_or(512 * 1024 * 1024, |value| value.parse().unwrap()); +impl Configuration { + /// If the corresponding environment variable is set, update the configuration with the value + /// from the environment variable. If the value could not be parsed, return + /// [`ModelarDbServerError`]. + fn update_from_env(&mut self) -> Result<()> { + if let Ok(value) = env::var("MODELARDBD_MULTIVARIATE_RESERVED_MEMORY_IN_BYTES") { + self.multivariate_reserved_memory_in_bytes = value.parse()?; + }; + + if let Ok(value) = env::var("MODELARDBD_UNCOMPRESSED_RESERVED_MEMORY_IN_BYTES") { + self.uncompressed_reserved_memory_in_bytes = value.parse()?; + }; + + if let Ok(value) = env::var("MODELARDBD_COMPRESSED_RESERVED_MEMORY_IN_BYTES") { + self.compressed_reserved_memory_in_bytes = value.parse()?; + }; + + if let Ok(value) = env::var("MODELARDBD_TRANSFER_BATCH_SIZE_IN_BYTES") { + self.transfer_batch_size_in_bytes = Some(value.parse()?); + } + + if let Ok(value) = env::var("MODELARDBD_TRANSFER_TIME_IN_SECONDS") { + self.transfer_time_in_seconds = Some(value.parse()?); + } + + Ok(()) + } - let uncompressed_reserved_memory_in_bytes = - env::var("MODELARDBD_UNCOMPRESSED_RESERVED_MEMORY_IN_BYTES") - .map_or(512 * 1024 * 1024, |value| value.parse().unwrap()); + /// Validate the fields in the configuration and return [`Ok`] if they are valid. If the + /// configuration is invalid, return [`ModelarDbServerError`]. + fn validate(&self) -> Result<()> { + // TODO: Add support for running multiple threads per component. The individual + // components in the storage engine have not been validated with multiple threads, e.g., + // UncompressedDataManager may have race conditions finishing buffers if multiple + // different data points are added by multiple different clients in parallel. + if self.ingestion_threads != 1 || self.compression_threads != 1 || self.writer_threads != 1 + { + return Err(ModelarDbServerError::InvalidState( + "Only one thread per component is currently supported.".to_string(), + )); + }; - let compressed_reserved_memory_in_bytes = - env::var("MODELARDBD_COMPRESSED_RESERVED_MEMORY_IN_BYTES") - .map_or(512 * 1024 * 1024, |value| value.parse().unwrap()); + Ok(()) + } - let transfer_batch_size_in_bytes = env::var("MODELARDBD_TRANSFER_BATCH_SIZE_IN_BYTES") - .map_or(Some(64 * 1024 * 1024), |value| Some(value.parse().unwrap())); + /// Save the configuration to the configuration file at the root of `local_data_folder`. If the + /// file does not exist, it is created. If the configuration file could not be updated or + /// created, return [`ModelarDbServerError`]. + async fn save_to_toml(&self, local_data_folder: &DataFolder) -> Result<()> { + let toml = toml::to_string(self)?; + let object_store = local_data_folder.object_store(); + + object_store + .put( + &Path::from(CONFIGURATION_FILE_NAME), + PutPayload::from(toml.into_bytes()), + ) + .await?; - let transfer_time_in_seconds = env::var("MODELARDBD_TRANSFER_TIME_IN_SECONDS") - .map_or(None, |value| Some(value.parse().unwrap())); + Ok(()) + } +} +impl Default for Configuration { + fn default() -> Self { Self { - cluster_mode, - multivariate_reserved_memory_in_bytes, - uncompressed_reserved_memory_in_bytes, - compressed_reserved_memory_in_bytes, - transfer_batch_size_in_bytes, - transfer_time_in_seconds, - // TODO: Add support for running multiple threads per component. The individual - // components in the storage engine have not been validated with multiple threads, e.g., - // UncompressedDataManager may have race conditions finishing buffers if multiple - // different data points are added by multiple different clients in parallel. + multivariate_reserved_memory_in_bytes: 512 * 1024 * 1024, + uncompressed_reserved_memory_in_bytes: 512 * 1024 * 1024, + compressed_reserved_memory_in_bytes: 512 * 1024 * 1024, + transfer_batch_size_in_bytes: Some(64 * 1024 * 1024), + transfer_time_in_seconds: None, ingestion_threads: 1, compression_threads: 1, writer_threads: 1, } } +} + +/// Manages the system's configuration and provides functionality for updating the configuration. +#[derive(Clone)] +pub struct ConfigurationManager { + /// The mode of the cluster used to determine the behaviour when starting the server, + /// creating tables, updating the remote object store, and querying. + cluster_mode: ClusterMode, + /// The local data folder that stores the configuration file at the root. + local_data_folder: DataFolder, + /// The configuration of the system. This is stored in a separate type to allow for easier + /// serialization and deserialization. + configuration: Configuration, +} + +impl ConfigurationManager { + /// Create a new [`ConfigurationManager`] using the [`CONFIGURATION_FILE_NAME`] configuration + /// file in the local data folder. If the file does not exist, a configuration file is created + /// with the default values. Note that the configuration file and default values are overwritten + /// if the corresponding environment variables are set. If the configuration file could not be + /// read or created, [`ModelarDbServerError`] is returned. + pub async fn try_new(local_data_folder: DataFolder, cluster_mode: ClusterMode) -> Result { + // Check if there is a configuration file in the local data folder. + let object_store = local_data_folder.object_store(); + let maybe_conf_file = object_store.get(&Path::from(CONFIGURATION_FILE_NAME)).await; + + let mut configuration = match maybe_conf_file { + Ok(conf_file) => { + // If the configuration file exists, load the configuration from the file. + let bytes = conf_file.bytes().await?; + toml::from_slice::(&bytes)? + } + Err(error) => match error { + Error::NotFound { .. } => { + // If the configuration file does not exist, create one with the default values. + Configuration::default() + } + error => { + return Err(ModelarDbServerError::InvalidState(format!( + "Configuration file '{CONFIGURATION_FILE_NAME}' could not be read: {error}" + ))); + } + }, + }; + + configuration.update_from_env()?; + configuration.validate()?; + configuration.save_to_toml(&local_data_folder).await?; + + Ok(Self { + cluster_mode, + local_data_folder, + configuration, + }) + } + + pub(crate) fn cluster_mode(&self) -> &ClusterMode { + &self.cluster_mode + } pub(crate) fn multivariate_reserved_memory_in_bytes(&self) -> u64 { - self.multivariate_reserved_memory_in_bytes + self.configuration.multivariate_reserved_memory_in_bytes } - /// Set the new value and update the amount of memory for multivariate data in the storage engine. + /// Set the new value and update the amount of memory for multivariate data in the storage + /// engine. If the new configuration could not be saved to the configuration file, return + /// [`ModelarDbServerError`]. pub(crate) async fn set_multivariate_reserved_memory_in_bytes( &mut self, new_multivariate_reserved_memory_in_bytes: u64, storage_engine: Arc>, - ) { + ) -> Result<()> { // Since the storage engine only keeps track of the remaining reserved memory, calculate // how much the value should change. let value_change = new_multivariate_reserved_memory_in_bytes as i64 - - self.multivariate_reserved_memory_in_bytes as i64; + - self.configuration.multivariate_reserved_memory_in_bytes as i64; storage_engine .write() @@ -112,16 +218,22 @@ impl ConfigurationManager { .adjust_multivariate_remaining_memory_in_bytes(value_change) .await; - self.multivariate_reserved_memory_in_bytes = new_multivariate_reserved_memory_in_bytes; + self.configuration.multivariate_reserved_memory_in_bytes = + new_multivariate_reserved_memory_in_bytes; + + self.configuration + .save_to_toml(&self.local_data_folder) + .await } pub(crate) fn uncompressed_reserved_memory_in_bytes(&self) -> u64 { - self.uncompressed_reserved_memory_in_bytes + self.configuration.uncompressed_reserved_memory_in_bytes } /// Set the new value and update the amount of memory for uncompressed data in the storage - /// engine. Returns [`ModelarDbServerError`](crate::error::ModelarDbServerError) if the memory - /// cannot be updated because a buffer cannot be spilled. + /// engine. If the memory could not be updated because a buffer could not be spilled or if the + /// new configuration could not be saved to the configuration file, return + /// [`ModelarDbServerError`]. pub(crate) async fn set_uncompressed_reserved_memory_in_bytes( &mut self, new_uncompressed_reserved_memory_in_bytes: u64, @@ -130,7 +242,7 @@ impl ConfigurationManager { // Since the storage engine only keeps track of the remaining reserved memory, calculate // how much the value should change. let value_change = new_uncompressed_reserved_memory_in_bytes as i64 - - self.uncompressed_reserved_memory_in_bytes as i64; + - self.configuration.uncompressed_reserved_memory_in_bytes as i64; storage_engine .write() @@ -138,18 +250,21 @@ impl ConfigurationManager { .adjust_uncompressed_remaining_memory_in_bytes(value_change) .await?; - self.uncompressed_reserved_memory_in_bytes = new_uncompressed_reserved_memory_in_bytes; + self.configuration.uncompressed_reserved_memory_in_bytes = + new_uncompressed_reserved_memory_in_bytes; - Ok(()) + self.configuration + .save_to_toml(&self.local_data_folder) + .await } pub(crate) fn compressed_reserved_memory_in_bytes(&self) -> u64 { - self.compressed_reserved_memory_in_bytes + self.configuration.compressed_reserved_memory_in_bytes } /// Set the new value and update the amount of memory for compressed data in the storage engine. - /// If the value was updated, return [`Ok`], otherwise return - /// [`ModelarDbServerError`](crate::error::ModelarDbServerError). + /// If the memory could not be updated because a buffer could not be saved to disk or if the new + /// configuration could not be saved to the configuration file, return [`ModelarDbServerError`]. pub(crate) async fn set_compressed_reserved_memory_in_bytes( &mut self, new_compressed_reserved_memory_in_bytes: u64, @@ -158,7 +273,7 @@ impl ConfigurationManager { // Since the storage engine only keeps track of the remaining reserved memory, calculate // how much the value should change. let value_change = new_compressed_reserved_memory_in_bytes as i64 - - self.compressed_reserved_memory_in_bytes as i64; + - self.configuration.compressed_reserved_memory_in_bytes as i64; storage_engine .write() @@ -166,18 +281,21 @@ impl ConfigurationManager { .adjust_compressed_remaining_memory_in_bytes(value_change) .await?; - self.compressed_reserved_memory_in_bytes = new_compressed_reserved_memory_in_bytes; + self.configuration.compressed_reserved_memory_in_bytes = + new_compressed_reserved_memory_in_bytes; - Ok(()) + self.configuration + .save_to_toml(&self.local_data_folder) + .await } pub(crate) fn transfer_batch_size_in_bytes(&self) -> Option { - self.transfer_batch_size_in_bytes + self.configuration.transfer_batch_size_in_bytes } - /// Set the new value and update the transfer batch size in the storage engine. If the value was - /// updated, return [`Ok`], otherwise return - /// [`ModelarDbServerError`](crate::error::ModelarDbServerError). + /// Set the new value and update the transfer batch size in the storage engine. If the batch + /// size could not be updated or if the new configuration could not be saved to the + /// configuration file, return [`ModelarDbServerError`]. pub(crate) async fn set_transfer_batch_size_in_bytes( &mut self, new_transfer_batch_size_in_bytes: Option, @@ -189,18 +307,20 @@ impl ConfigurationManager { .set_transfer_batch_size_in_bytes(new_transfer_batch_size_in_bytes) .await?; - self.transfer_batch_size_in_bytes = new_transfer_batch_size_in_bytes; + self.configuration.transfer_batch_size_in_bytes = new_transfer_batch_size_in_bytes; - Ok(()) + self.configuration + .save_to_toml(&self.local_data_folder) + .await } pub(crate) fn transfer_time_in_seconds(&self) -> Option { - self.transfer_time_in_seconds + self.configuration.transfer_time_in_seconds } - /// Set the new value and update the transfer time in the storage engine. If the value was - /// updated, return [`Ok`], otherwise return - /// [`ModelarDbServerError`](crate::error::ModelarDbServerError). + /// Set the new value and update the transfer time in the storage engine. If the transfer time + /// could not be updated or if the new configuration could not be saved to the configuration + /// file, return [`ModelarDbServerError`]. pub(crate) async fn set_transfer_time_in_seconds( &mut self, new_transfer_time_in_seconds: Option, @@ -212,23 +332,43 @@ impl ConfigurationManager { .set_transfer_time_in_seconds(new_transfer_time_in_seconds) .await?; - self.transfer_time_in_seconds = new_transfer_time_in_seconds; + self.configuration.transfer_time_in_seconds = new_transfer_time_in_seconds; - Ok(()) + self.configuration + .save_to_toml(&self.local_data_folder) + .await + } + + pub(crate) fn ingestion_threads(&self) -> u8 { + self.configuration.ingestion_threads + } + + pub(crate) fn compression_threads(&self) -> u8 { + self.configuration.compression_threads + } + + pub(crate) fn writer_threads(&self) -> u8 { + self.configuration.writer_threads } /// Encode the current configuration into a [`Configuration`](protocol::Configuration) /// protobuf message and serialize it. pub(crate) fn encode_and_serialize(&self) -> Vec { let configuration = protocol::Configuration { - multivariate_reserved_memory_in_bytes: self.multivariate_reserved_memory_in_bytes, - uncompressed_reserved_memory_in_bytes: self.uncompressed_reserved_memory_in_bytes, - compressed_reserved_memory_in_bytes: self.compressed_reserved_memory_in_bytes, - transfer_batch_size_in_bytes: self.transfer_batch_size_in_bytes, - transfer_time_in_seconds: self.transfer_time_in_seconds, - ingestion_threads: self.ingestion_threads as u32, - compression_threads: self.compression_threads as u32, - writer_threads: self.writer_threads as u32, + multivariate_reserved_memory_in_bytes: self + .configuration + .multivariate_reserved_memory_in_bytes, + uncompressed_reserved_memory_in_bytes: self + .configuration + .uncompressed_reserved_memory_in_bytes, + compressed_reserved_memory_in_bytes: self + .configuration + .compressed_reserved_memory_in_bytes, + transfer_batch_size_in_bytes: self.configuration.transfer_batch_size_in_bytes, + transfer_time_in_seconds: self.configuration.transfer_time_in_seconds, + ingestion_threads: self.configuration.ingestion_threads as u32, + compression_threads: self.configuration.compression_threads as u32, + writer_threads: self.configuration.writer_threads as u32, }; configuration.encode_to_vec() @@ -251,6 +391,94 @@ mod tests { use crate::storage::StorageEngine; // Tests for ConfigurationManager. + #[tokio::test] + async fn test_configuration_file_is_created_if_it_does_not_exist() { + let temp_dir = tempfile::tempdir().unwrap(); + let (_storage_engine, configuration_manager) = create_components(&temp_dir).await; + + let configuration_from_manager = configuration_manager.read().await.configuration.clone(); + let configuration_from_file = configuration_from_file(&temp_dir).await; + + assert_eq!(configuration_from_manager, configuration_from_file); + } + + #[tokio::test] + async fn test_configuration_file_is_read_if_it_exists() { + let temp_dir = tempfile::tempdir().unwrap(); + let local_url = temp_dir.path().to_str().unwrap(); + let local_data_folder = DataFolder::open_local_url(local_url).await.unwrap(); + + let existing_configuration = Configuration { + multivariate_reserved_memory_in_bytes: 1, + uncompressed_reserved_memory_in_bytes: 1, + compressed_reserved_memory_in_bytes: 1, + transfer_batch_size_in_bytes: Some(1), + transfer_time_in_seconds: Some(1), + ..Configuration::default() + }; + + existing_configuration + .save_to_toml(&local_data_folder) + .await + .unwrap(); + + let (_storage_engine, configuration_manager) = create_components(&temp_dir).await; + + let configuration_from_manager = configuration_manager.read().await.configuration.clone(); + let configuration_from_file = configuration_from_file(&temp_dir).await; + + assert_eq!(existing_configuration, configuration_from_manager); + assert_eq!(existing_configuration, configuration_from_file); + } + + #[tokio::test] + async fn test_invalid_configuration_in_configuration_file() { + let temp_dir = tempfile::tempdir().unwrap(); + let local_url = temp_dir.path().to_str().unwrap(); + let local_data_folder = DataFolder::open_local_url(local_url).await.unwrap(); + + // Multiple threads per component are not supported. + let invalid_configuration = Configuration { + ingestion_threads: 2, + ..Configuration::default() + }; + + invalid_configuration + .save_to_toml(&local_data_folder) + .await + .unwrap(); + + let result = + ConfigurationManager::try_new(local_data_folder, ClusterMode::SingleNode).await; + + assert_eq!( + result.err().unwrap().to_string(), + "Invalid State Error: Only one thread per component is currently supported.".to_owned() + ); + } + + #[tokio::test] + async fn test_invalid_toml_in_configuration_file() { + let temp_dir = tempfile::tempdir().unwrap(); + let local_url = temp_dir.path().to_str().unwrap(); + let local_data_folder = DataFolder::open_local_url(local_url).await.unwrap(); + + // Write invalid TOML to the configuration file. + let path = temp_dir.path().join(CONFIGURATION_FILE_NAME); + std::fs::write(path, "invalid_toml").unwrap(); + + let result = + ConfigurationManager::try_new(local_data_folder, ClusterMode::SingleNode).await; + + assert!( + result + .err() + .unwrap() + .to_string() + .contains("TOML Deserialize Error: TOML parse error at line 1") + ); + } + #[tokio::test] async fn test_set_multivariate_reserved_memory_in_bytes() { let temp_dir = tempfile::tempdir().unwrap(); @@ -269,7 +497,8 @@ mod tests { .write() .await .set_multivariate_reserved_memory_in_bytes(new_value, storage_engine) - .await; + .await + .unwrap(); assert_eq!( configuration_manager @@ -278,6 +507,12 @@ mod tests { .multivariate_reserved_memory_in_bytes(), new_value ); + + let configuration_from_file = configuration_from_file(&temp_dir).await; + assert_eq!( + configuration_from_file.multivariate_reserved_memory_in_bytes, + new_value + ); } #[tokio::test] @@ -308,6 +543,12 @@ mod tests { .uncompressed_reserved_memory_in_bytes(), new_value ); + + let configuration_from_file = configuration_from_file(&temp_dir).await; + assert_eq!( + configuration_from_file.uncompressed_reserved_memory_in_bytes, + new_value + ); } #[tokio::test] @@ -338,6 +579,12 @@ mod tests { .compressed_reserved_memory_in_bytes(), new_value ); + + let configuration_from_file = configuration_from_file(&temp_dir).await; + assert_eq!( + configuration_from_file.compressed_reserved_memory_in_bytes, + new_value + ); } #[tokio::test] @@ -368,6 +615,12 @@ mod tests { .transfer_batch_size_in_bytes(), new_value ); + + let configuration_from_file = configuration_from_file(&temp_dir).await; + assert_eq!( + configuration_from_file.transfer_batch_size_in_bytes, + new_value + ); } #[tokio::test] @@ -398,6 +651,17 @@ mod tests { .transfer_time_in_seconds(), new_value ); + + let configuration_from_file = configuration_from_file(&temp_dir).await; + assert_eq!(configuration_from_file.transfer_time_in_seconds, new_value) + } + + /// Return the configuration from the configuration file at the root of `temp_dir`. + async fn configuration_from_file(temp_dir: &TempDir) -> Configuration { + let configuration_file_path = temp_dir.path().join(CONFIGURATION_FILE_NAME); + let file_content = std::fs::read_to_string(&configuration_file_path).unwrap(); + + toml::from_str::(&file_content).unwrap() } /// Create a [`StorageEngine`] and a [`ConfigurationManager`]. @@ -417,14 +681,16 @@ mod tests { let data_folders = DataFolders::new( local_data_folder.clone(), Some(remote_data_folder), - local_data_folder, + local_data_folder.clone(), ); let manager = Manager::new(Uuid::new_v4().to_string()); - let configuration_manager = Arc::new(RwLock::new(ConfigurationManager::new( - ClusterMode::MultiNode(manager), - ))); + let configuration_manager = Arc::new(RwLock::new( + ConfigurationManager::try_new(local_data_folder, ClusterMode::MultiNode(manager)) + .await + .unwrap(), + )); let storage_engine = Arc::new(RwLock::new( StorageEngine::try_new(data_folders, &configuration_manager) diff --git a/crates/modelardb_server/src/context.rs b/crates/modelardb_server/src/context.rs index 3c7bc54a..982c63b2 100644 --- a/crates/modelardb_server/src/context.rs +++ b/crates/modelardb_server/src/context.rs @@ -45,7 +45,10 @@ impl Context { /// metadata manager or storage engine could not be created, [`ModelarDbServerError`] is /// returned. pub async fn try_new(data_folders: DataFolders, cluster_mode: ClusterMode) -> Result { - let configuration_manager = Arc::new(RwLock::new(ConfigurationManager::new(cluster_mode))); + let configuration_manager = Arc::new(RwLock::new( + ConfigurationManager::try_new(data_folders.local_data_folder.clone(), cluster_mode) + .await?, + )); let storage_engine = Arc::new(RwLock::new( StorageEngine::try_new(data_folders.clone(), &configuration_manager).await?, diff --git a/crates/modelardb_server/src/error.rs b/crates/modelardb_server/src/error.rs index 545e8b9f..24ac9366 100644 --- a/crates/modelardb_server/src/error.rs +++ b/crates/modelardb_server/src/error.rs @@ -18,6 +18,7 @@ use std::error::Error; use std::fmt::{Display, Formatter}; use std::io::Error as IoError; +use std::num::ParseIntError; use std::result::Result as StdResult; use crossbeam_channel::RecvError as CrossbeamRecvError; @@ -30,6 +31,8 @@ use modelardb_types::error::ModelarDbTypesError; use object_store::Error as ObjectStoreError; use object_store::path::Error as ObjectStorePathError; use prost::DecodeError; +use toml::de::Error as TomlDeserializeError; +use toml::ser::Error as TomlSerializeError; use tonic::Status as TonicStatusError; use tonic::transport::Error as TonicTransportError; @@ -63,8 +66,14 @@ pub enum ModelarDbServerError { ObjectStore(ObjectStoreError), /// Error returned by ObjectStorePath. ObjectStorePath(ObjectStorePathError), + /// Error returned when failing to parse a string to an integer. + ParseInt(ParseIntError), /// Error returned by Prost when decoding a message that is not valid. ProstDecode(DecodeError), + /// Error returned by TOML when deserializing a configuration file. + TomlDeserialize(TomlDeserializeError), + /// Error returned by TOML when serializing a configuration file. + TomlSerialize(TomlSerializeError), /// Status returned by Tonic. TonicStatus(Box), /// Error returned by Tonic. @@ -86,7 +95,10 @@ impl Display for ModelarDbServerError { Self::ModelarDbTypes(reason) => write!(f, "ModelarDB Types Error: {reason}"), Self::ObjectStore(reason) => write!(f, "Object Store Error: {reason}"), Self::ObjectStorePath(reason) => write!(f, "Object Store Path Error: {reason}"), + Self::ParseInt(reason) => write!(f, "Parse Int Error: {reason}"), Self::ProstDecode(reason) => write!(f, "Prost Decode Error: {reason}"), + Self::TomlDeserialize(reason) => write!(f, "TOML Deserialize Error: {reason}"), + Self::TomlSerialize(reason) => write!(f, "TOML Serialize Error: {reason}"), Self::TonicStatus(reason) => write!(f, "Tonic Status Error: {reason}"), Self::TonicTransport(reason) => write!(f, "Tonic Transport Error: {reason}"), } @@ -109,7 +121,10 @@ impl Error for ModelarDbServerError { Self::ModelarDbTypes(reason) => Some(reason), Self::ObjectStore(reason) => Some(reason), Self::ObjectStorePath(reason) => Some(reason), + Self::ParseInt(reason) => Some(reason), Self::ProstDecode(reason) => Some(reason), + Self::TomlDeserialize(reason) => Some(reason), + Self::TomlSerialize(reason) => Some(reason), Self::TonicStatus(reason) => Some(reason), Self::TonicTransport(reason) => Some(reason), } @@ -176,12 +191,30 @@ impl From for ModelarDbServerError { } } +impl From for ModelarDbServerError { + fn from(error: ParseIntError) -> Self { + Self::ParseInt(error) + } +} + impl From for ModelarDbServerError { fn from(error: DecodeError) -> Self { Self::ProstDecode(error) } } +impl From for ModelarDbServerError { + fn from(error: TomlDeserializeError) -> Self { + Self::TomlDeserialize(error) + } +} + +impl From for ModelarDbServerError { + fn from(error: TomlSerializeError) -> Self { + Self::TomlSerialize(error) + } +} + impl From for ModelarDbServerError { fn from(error: TonicStatusError) -> Self { Self::TonicStatus(Box::new(error)) diff --git a/crates/modelardb_server/src/remote.rs b/crates/modelardb_server/src/remote.rs index 8df16abe..5f344c71 100644 --- a/crates/modelardb_server/src/remote.rs +++ b/crates/modelardb_server/src/remote.rs @@ -349,7 +349,7 @@ impl FlightServiceHandler { async fn validate_request(&self, request_metadata: &MetadataMap) -> StdResult<(), Status> { let configuration_manager = self.context.configuration_manager.read().await; - if let ClusterMode::MultiNode(manager) = &configuration_manager.cluster_mode { + if let ClusterMode::MultiNode(manager) = configuration_manager.cluster_mode() { manager .validate_request(request_metadata) .map_err(|error| Status::unauthenticated(error.to_string())) @@ -640,7 +640,8 @@ impl FlightService for FlightServiceHandler { /// configuration is returned in a [`Configuration`](protocol::Configuration) protobuf message. /// * `UpdateConfiguration`: Update a single setting in the configuration. The setting to update /// and the new value are provided in the [`UpdateConfiguration`](protocol::UpdateConfiguration) - /// protobuf message in the action body. + /// protobuf message in the action body. The setting is updated in the live server configuration + /// and the change is persisted in the configuration file. /// * `NodeType`: Get the type of the node. The type is always `server`. The type of the node /// is returned as a string. async fn do_action( @@ -744,9 +745,8 @@ impl FlightService for FlightServiceHandler { configuration_manager .set_multivariate_reserved_memory_in_bytes(new_value, storage_engine) - .await; - - Ok(()) + .await + .map_err(error_to_status_internal) } Ok(protocol::update_configuration::Setting::UncompressedReservedMemoryInBytes) => { let new_value = maybe_new_value.ok_or(invalid_null_error)?; diff --git a/crates/modelardb_server/src/storage/mod.rs b/crates/modelardb_server/src/storage/mod.rs index 65c8e6f0..f7488f10 100644 --- a/crates/modelardb_server/src/storage/mod.rs +++ b/crates/modelardb_server/src/storage/mod.rs @@ -120,7 +120,7 @@ impl StorageEngine { let uncompressed_data_manager = uncompressed_data_manager.clone(); Self::start_threads( - configuration_manager.ingestion_threads, + configuration_manager.ingestion_threads(), "Ingestion", move || { if let Err(error) = @@ -138,7 +138,7 @@ impl StorageEngine { let uncompressed_data_manager = uncompressed_data_manager.clone(); Self::start_threads( - configuration_manager.compression_threads, + configuration_manager.compression_threads(), "Compression", move || { if let Err(error) = @@ -179,7 +179,7 @@ impl StorageEngine { let compressed_data_manager = compressed_data_manager.clone(); Self::start_threads( - configuration_manager.writer_threads, + configuration_manager.writer_threads(), "Writer", move || { if let Err(error) = diff --git a/docs/user/README.md b/docs/user/README.md index b6f3c3ab..7abf0676 100644 --- a/docs/user/README.md +++ b/docs/user/README.md @@ -387,8 +387,13 @@ modelardb_node = modelardb.connect(node) ``` ## ModelarDB configuration +When the server is started for the first time, a configuration file is created in the root of the data folder named +`modelardbd.toml`. This file is used to persist updates to the configuration made using the `UpdateConfiguration` +action. If the file is changed manually, the changes are only applied when the server is restarted. + `ModelarDB` can be configured before the server is started using environment variables. A full list of the environment -variables is provided here. If an environment variable is not set, the specified default value will be used. +variables is provided here. If an environment variable is not set, the configuration file will be used. If neither the +environment variable nor the configuration file contains the variable, the specified default value will be used. | **Variable** | **Default** | **Description** | |--------------------------------------------------|-------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|