From 74c25d176d054f101dfc1d9f4d8e76829522ccad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B8ren=20Kejser=20Jensen?= Date: Sun, 2 Feb 2025 15:47:51 +0100 Subject: [PATCH 1/5] Add cast macro and define other macros using it --- crates/modelardb_types/src/macros.rs | 45 ++++++++++++++++++++-------- 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/crates/modelardb_types/src/macros.rs b/crates/modelardb_types/src/macros.rs index c1d21862c..728f068a8 100644 --- a/crates/modelardb_types/src/macros.rs +++ b/crates/modelardb_types/src/macros.rs @@ -13,8 +13,31 @@ * limitations under the License. */ -/// Extract an [`array`](arrow::array::Array) from a slice of -/// `ArrayRef` and cast it to the specified type or panic with `msg`: +/// Cast an [`Array`](arrow::array::Array) or [`ArrayRef`](arrow::array::ArrayRef) to its actual or +/// panic: +/// +/// ``` +/// # use std::sync::Arc; +/// # +/// # use arrow::array::ArrayRef; +/// # use modelardb_types::types::{Timestamp, TimestampArray}; +/// # +/// # let array_ref: ArrayRef = Arc::new(TimestampArray::from(Vec::::new())); +/// let timestamp_array = modelardb_types::cast!(array_ref, TimestampArray); +/// ``` +/// +/// # Panics +/// +/// Panics if `array` cannot be cast to `type`. +#[macro_export] +macro_rules! cast { + ($array:expr, $type:ident) => { + $array.as_any().downcast_ref::<$type>().unwrap() + }; +} + +/// Extract an [`array`](arrow::array::Array) from a slice of `ArrayRef` and cast it to the +/// specified type or panic: /// /// ``` /// # use std::sync::Arc; @@ -32,16 +55,16 @@ /// /// # Panics /// -/// Panics with `message` if `index` is not in `values` or if it cannot be cast to `type`. +/// Panics if `index` is not in `values` or if the array cannot be cast to `type`. #[macro_export] macro_rules! value { - ($values:ident, $index:expr, $type:ident) => { - $values[$index].as_any().downcast_ref::<$type>().unwrap() + ($values:expr, $index:expr, $type:ident) => { + $crate::cast!($values[$index], $type) }; } /// Extract an [`array`](arrow::array::Array) from a -/// [`RecordBatch`](arrow::record_batch::RecordBatch) and cast it to the specified type: +/// [`RecordBatch`](arrow::record_batch::RecordBatch) and cast it to the specified type or panic: /// /// ``` /// # use std::sync::Arc; @@ -62,21 +85,17 @@ macro_rules! value { /// /// # Panics /// -/// Panics if `column` is not in `batch` or if it cannot be cast to `type`. +/// Panics if `index` is not in `batch` or if the array cannot be cast to `type`. #[macro_export] macro_rules! array { ($batch:ident, $column:expr, $type:ident) => { - $batch - .column($column) - .as_any() - .downcast_ref::<$type>() - .unwrap() + $crate::cast!($batch.column($column), $type) }; } /// Extract the [`arrays`](arrow::array::Array) required to execute queries against a model table /// from a [`RecordBatch`](arrow::record_batch::RecordBatch), cast them to the required type, and -/// assign the resulting arrays to the specified variables: +/// assign the resulting arrays to the specified variables. Panics if any of these steps fail: /// /// ``` /// # use std::sync::Arc; From 1e62ff9feb0cfd6a7c812234f9c20550b655c66a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B8ren=20Kejser=20Jensen?= Date: Sun, 2 Feb 2025 16:07:36 +0100 Subject: [PATCH 2/5] Use the cast!() macro for arrays --- .../src/storage/uncompressed_data_manager.rs | 11 ++++----- .../tests/integration_test.rs | 24 +++---------------- crates/modelardb_storage/src/lib.rs | 24 ++++--------------- 3 files changed, 11 insertions(+), 48 deletions(-) diff --git a/crates/modelardb_server/src/storage/uncompressed_data_manager.rs b/crates/modelardb_server/src/storage/uncompressed_data_manager.rs index f25e90399..43a5c9656 100644 --- a/crates/modelardb_server/src/storage/uncompressed_data_manager.rs +++ b/crates/modelardb_server/src/storage/uncompressed_data_manager.rs @@ -212,24 +212,21 @@ impl UncompressedDataManager { // Prepare the timestamp column for iteration. let timestamp_index = model_table_metadata.timestamp_column_index; - let timestamp_column_array: &TimestampArray = data_points - .column(timestamp_index) - .as_any() - .downcast_ref() - .unwrap(); + let timestamp_column_array: &TimestampArray = + modelardb_types::array!(data_points, timestamp_index, TimestampArray); // Prepare the tag columns for iteration. let tag_column_arrays: Vec<&StringArray> = model_table_metadata .tag_column_indices .iter() - .map(|index| data_points.column(*index).as_any().downcast_ref().unwrap()) + .map(|index| modelardb_types::array!(data_points, *index, StringArray)) .collect(); // Prepare the field columns for iteration. let field_column_arrays: Vec<&ValueArray> = model_table_metadata .field_column_indices .iter() - .map(|index| data_points.column(*index).as_any().downcast_ref().unwrap()) + .map(|index| modelardb_types::array!(data_points, *index, ValueArray)) .collect(); // For each data point, compute a hash from the tags and pass the fields to the storage diff --git a/crates/modelardb_server/tests/integration_test.rs b/crates/modelardb_server/tests/integration_test.rs index 1d0aad845..33a20f8f8 100644 --- a/crates/modelardb_server/tests/integration_test.rs +++ b/crates/modelardb_server/tests/integration_test.rs @@ -804,25 +804,13 @@ fn test_can_collect_metrics() { // The used_ingested_memory metric should record when data is received and ingested. let ingested_buffer_size = test::INGESTED_BUFFER_SIZE as u32; - assert_eq!( - values_array - .value(0) - .as_any() - .downcast_ref::() - .unwrap() - .values(), + assert_eq!(modelardb_types::cast!(values_array.value(0), UInt32Array).values(), &[ingested_buffer_size, 0] ); // The used_uncompressed_memory metric should record the change when ingesting and when flushing. let uncompressed_buffer_size = UNCOMPRESSED_BUFFER_SIZE as u32; - assert_eq!( - values_array - .value(1) - .as_any() - .downcast_ref::() - .unwrap() - .values(), + assert_eq!(modelardb_types::cast!(values_array.value(1), UInt32Array).values(), &[uncompressed_buffer_size, 0] ); @@ -831,13 +819,7 @@ fn test_can_collect_metrics() { assert_eq!(values_array.value(2).len(), 2); // The ingested_data_points metric should record the single request to ingest data points. - assert_eq!( - values_array - .value(3) - .as_any() - .downcast_ref::() - .unwrap() - .values(), + assert_eq!(modelardb_types::cast!(values_array.value(3), UInt32Array).values(), &[TIME_SERIES_TEST_LENGTH as u32] ); diff --git a/crates/modelardb_storage/src/lib.rs b/crates/modelardb_storage/src/lib.rs index 644d7b5d1..c2388b214 100644 --- a/crates/modelardb_storage/src/lib.rs +++ b/crates/modelardb_storage/src/lib.rs @@ -523,12 +523,7 @@ pub fn table_metadata_from_record_batch( /// Parse the error bound values in `error_bounds_array` into a list of [`ErrorBounds`](ErrorBound). /// Returns [`ModelarDbServerError`] if an error bound value is invalid. fn array_to_error_bounds(error_bounds_array: ArrayRef) -> Result> { - // unwrap() is safe since error bound values are always f32. - let value_array = error_bounds_array - .as_any() - .downcast_ref::() - .unwrap(); - + let value_array = modelardb_types::cast!(error_bounds_array, Float32Array); let mut error_bounds = Vec::with_capacity(value_array.len()); for value in value_array.iter().flatten() { if value < 0.0 { @@ -548,12 +543,7 @@ fn array_to_generated_columns( generated_columns_array: ArrayRef, df_schema: &DFSchema, ) -> Result>> { - // unwrap() is safe since generated column expressions are always strings. - let expr_array = generated_columns_array - .as_any() - .downcast_ref::() - .unwrap(); - + let expr_array = modelardb_types::cast!(generated_columns_array, StringArray); let mut generated_columns = Vec::with_capacity(expr_array.len()); for maybe_expr in expr_array.iter() { if let Some(expr) = maybe_expr { @@ -803,18 +793,12 @@ mod tests { ); let error_bounds_array = modelardb_types::array!(record_batch, 3, ListArray).value(0); - let value_array = error_bounds_array - .as_any() - .downcast_ref::() - .unwrap(); + let value_array = modelardb_types::cast!(error_bounds_array, Float32Array); assert_eq!(value_array, &Float32Array::from(vec![0.0, 1.0, -5.0, -0.0])); let generated_columns_array = modelardb_types::array!(record_batch, 4, ListArray).value(0); - let expr_array = generated_columns_array - .as_any() - .downcast_ref::() - .unwrap(); + let expr_array = modelardb_types::cast!(generated_columns_array, StringArray); assert_eq!(expr_array, &StringArray::new_null(4)); } From 783c64df7e408cfd8cf9f4c2ba68b166624c5dcc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B8ren=20Kejser=20Jensen?= Date: Sun, 2 Feb 2025 16:21:02 +0100 Subject: [PATCH 3/5] Generalize cast! as it work for more than arrays --- crates/modelardb_types/src/macros.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/crates/modelardb_types/src/macros.rs b/crates/modelardb_types/src/macros.rs index 728f068a8..021f0b299 100644 --- a/crates/modelardb_types/src/macros.rs +++ b/crates/modelardb_types/src/macros.rs @@ -13,8 +13,10 @@ * limitations under the License. */ -/// Cast an [`Array`](arrow::array::Array) or [`ArrayRef`](arrow::array::ArrayRef) to its actual or -/// panic: +/// Convert the result of an expression to [`Any`](std::any::Any) using the `as_any()` method and +/// then cast it to a concrete type using the `downcast_ref()` method. Panics if the cast fails. For +/// example, cast an [`Array`](arrow::array::Array) or [`ArrayRef`](arrow::array::ArrayRef) to its +/// actual type or panic: /// /// ``` /// # use std::sync::Arc; @@ -28,11 +30,11 @@ /// /// # Panics /// -/// Panics if `array` cannot be cast to `type`. +/// Panics if the result of `expr` cannot be cast to `type`. #[macro_export] macro_rules! cast { - ($array:expr, $type:ident) => { - $array.as_any().downcast_ref::<$type>().unwrap() + ($expr:expr, $type:ident) => { + $expr.as_any().downcast_ref::<$type>().unwrap() }; } From 239ff01a66352817de4d20d0452f83f82d7a5dc4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B8ren=20Kejser=20Jensen?= Date: Sun, 2 Feb 2025 16:48:52 +0100 Subject: [PATCH 4/5] Fix ast-grep error and incorrectly named argument --- crates/modelardb_types/src/macros.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/modelardb_types/src/macros.rs b/crates/modelardb_types/src/macros.rs index 021f0b299..7fc5efcfe 100644 --- a/crates/modelardb_types/src/macros.rs +++ b/crates/modelardb_types/src/macros.rs @@ -25,7 +25,7 @@ /// # use modelardb_types::types::{Timestamp, TimestampArray}; /// # /// # let array_ref: ArrayRef = Arc::new(TimestampArray::from(Vec::::new())); -/// let timestamp_array = modelardb_types::cast!(array_ref, TimestampArray); +/// let timestamp_array = modelardb_types::cast!(array_ref, TimestampArray); /// ``` /// /// # Panics @@ -90,8 +90,8 @@ macro_rules! value { /// Panics if `index` is not in `batch` or if the array cannot be cast to `type`. #[macro_export] macro_rules! array { - ($batch:ident, $column:expr, $type:ident) => { - $crate::cast!($batch.column($column), $type) + ($batch:expr, $index:expr, $type:ident) => { + $crate::cast!($batch.column($index), $type) }; } From 3511be2952db63b3ed50358a577390ed4003f159 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B8ren=20Kejser=20Jensen?= Date: Tue, 4 Feb 2025 11:57:17 +0100 Subject: [PATCH 5/5] Update based on comments from @CGodiksen --- .../src/storage/uncompressed_data_manager.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/modelardb_server/src/storage/uncompressed_data_manager.rs b/crates/modelardb_server/src/storage/uncompressed_data_manager.rs index 43a5c9656..26d7a26e9 100644 --- a/crates/modelardb_server/src/storage/uncompressed_data_manager.rs +++ b/crates/modelardb_server/src/storage/uncompressed_data_manager.rs @@ -212,18 +212,18 @@ impl UncompressedDataManager { // Prepare the timestamp column for iteration. let timestamp_index = model_table_metadata.timestamp_column_index; - let timestamp_column_array: &TimestampArray = + let timestamp_column_array = modelardb_types::array!(data_points, timestamp_index, TimestampArray); // Prepare the tag columns for iteration. - let tag_column_arrays: Vec<&StringArray> = model_table_metadata + let tag_column_arrays: Vec<_> = model_table_metadata .tag_column_indices .iter() .map(|index| modelardb_types::array!(data_points, *index, StringArray)) .collect(); // Prepare the field columns for iteration. - let field_column_arrays: Vec<&ValueArray> = model_table_metadata + let field_column_arrays: Vec<_> = model_table_metadata .field_column_indices .iter() .map(|index| modelardb_types::array!(data_points, *index, ValueArray))