Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions crates/modelardb_server/src/storage/uncompressed_data_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,24 +212,21 @@ impl UncompressedDataManager {

// Prepare the timestamp column for iteration.
let timestamp_index = model_table_metadata.timestamp_column_index;
let timestamp_column_array: &TimestampArray = data_points
.column(timestamp_index)
.as_any()
.downcast_ref()
.unwrap();
let timestamp_column_array =
modelardb_types::array!(data_points, timestamp_index, TimestampArray);

// Prepare the tag columns for iteration.
let tag_column_arrays: Vec<&StringArray> = model_table_metadata
let tag_column_arrays: Vec<_> = model_table_metadata
.tag_column_indices
.iter()
.map(|index| data_points.column(*index).as_any().downcast_ref().unwrap())
.map(|index| modelardb_types::array!(data_points, *index, StringArray))
.collect();

// Prepare the field columns for iteration.
let field_column_arrays: Vec<&ValueArray> = model_table_metadata
let field_column_arrays: Vec<_> = model_table_metadata
.field_column_indices
.iter()
.map(|index| data_points.column(*index).as_any().downcast_ref().unwrap())
.map(|index| modelardb_types::array!(data_points, *index, ValueArray))
.collect();

// For each data point, compute a hash from the tags and pass the fields to the storage
Expand Down
24 changes: 3 additions & 21 deletions crates/modelardb_server/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -804,25 +804,13 @@ fn test_can_collect_metrics() {

// The used_ingested_memory metric should record when data is received and ingested.
let ingested_buffer_size = test::INGESTED_BUFFER_SIZE as u32;
assert_eq!(
values_array
.value(0)
.as_any()
.downcast_ref::<UInt32Array>()
.unwrap()
.values(),
assert_eq!(modelardb_types::cast!(values_array.value(0), UInt32Array).values(),
&[ingested_buffer_size, 0]
);

// The used_uncompressed_memory metric should record the change when ingesting and when flushing.
let uncompressed_buffer_size = UNCOMPRESSED_BUFFER_SIZE as u32;
assert_eq!(
values_array
.value(1)
.as_any()
.downcast_ref::<UInt32Array>()
.unwrap()
.values(),
assert_eq!(modelardb_types::cast!(values_array.value(1), UInt32Array).values(),
&[uncompressed_buffer_size, 0]
);

Expand All @@ -831,13 +819,7 @@ fn test_can_collect_metrics() {
assert_eq!(values_array.value(2).len(), 2);

// The ingested_data_points metric should record the single request to ingest data points.
assert_eq!(
values_array
.value(3)
.as_any()
.downcast_ref::<UInt32Array>()
.unwrap()
.values(),
assert_eq!(modelardb_types::cast!(values_array.value(3), UInt32Array).values(),
&[TIME_SERIES_TEST_LENGTH as u32]
);

Expand Down
24 changes: 4 additions & 20 deletions crates/modelardb_storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -523,12 +523,7 @@ pub fn table_metadata_from_record_batch(
/// Parse the error bound values in `error_bounds_array` into a list of [`ErrorBounds`](ErrorBound).
/// Returns [`ModelarDbServerError`] if an error bound value is invalid.
fn array_to_error_bounds(error_bounds_array: ArrayRef) -> Result<Vec<ErrorBound>> {
// unwrap() is safe since error bound values are always f32.
let value_array = error_bounds_array
.as_any()
.downcast_ref::<Float32Array>()
.unwrap();

let value_array = modelardb_types::cast!(error_bounds_array, Float32Array);
let mut error_bounds = Vec::with_capacity(value_array.len());
for value in value_array.iter().flatten() {
if value < 0.0 {
Expand All @@ -548,12 +543,7 @@ fn array_to_generated_columns(
generated_columns_array: ArrayRef,
df_schema: &DFSchema,
) -> Result<Vec<Option<GeneratedColumn>>> {
// unwrap() is safe since generated column expressions are always strings.
let expr_array = generated_columns_array
.as_any()
.downcast_ref::<StringArray>()
.unwrap();

let expr_array = modelardb_types::cast!(generated_columns_array, StringArray);
let mut generated_columns = Vec::with_capacity(expr_array.len());
for maybe_expr in expr_array.iter() {
if let Some(expr) = maybe_expr {
Expand Down Expand Up @@ -803,18 +793,12 @@ mod tests {
);

let error_bounds_array = modelardb_types::array!(record_batch, 3, ListArray).value(0);
let value_array = error_bounds_array
.as_any()
.downcast_ref::<Float32Array>()
.unwrap();
let value_array = modelardb_types::cast!(error_bounds_array, Float32Array);

assert_eq!(value_array, &Float32Array::from(vec![0.0, 1.0, -5.0, -0.0]));

let generated_columns_array = modelardb_types::array!(record_batch, 4, ListArray).value(0);
let expr_array = generated_columns_array
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let expr_array = modelardb_types::cast!(generated_columns_array, StringArray);

assert_eq!(expr_array, &StringArray::new_null(4));
}
Expand Down
49 changes: 35 additions & 14 deletions crates/modelardb_types/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,33 @@
* limitations under the License.
*/

/// Extract an [`array`](arrow::array::Array) from a slice of
/// `ArrayRef` and cast it to the specified type or panic with `msg`:
/// Convert the result of an expression to [`Any`](std::any::Any) using the `as_any()` method and
/// then cast it to a concrete type using the `downcast_ref()` method. Panics if the cast fails. For
/// example, cast an [`Array`](arrow::array::Array) or [`ArrayRef`](arrow::array::ArrayRef) to its
/// actual type or panic:
///
/// ```
/// # use std::sync::Arc;
/// #
/// # use arrow::array::ArrayRef;
/// # use modelardb_types::types::{Timestamp, TimestampArray};
/// #
/// # let array_ref: ArrayRef = Arc::new(TimestampArray::from(Vec::<Timestamp>::new()));
/// let timestamp_array = modelardb_types::cast!(array_ref, TimestampArray);
/// ```
///
/// # Panics
///
/// Panics if the result of `expr` cannot be cast to `type`.
#[macro_export]
macro_rules! cast {
    // `$type` is a `ty` fragment rather than an `ident` so that qualified paths
    // (e.g., `arrow::array::UInt32Array`) and generic types are also accepted;
    // every existing call site passes a bare identifier, which still matches.
    ($expr:expr, $type:ty) => {
        // unwrap() panics if the concrete type of `$expr` is not `$type`.
        $expr.as_any().downcast_ref::<$type>().unwrap()
    };
}

/// Extract an [`array`](arrow::array::Array) from a slice of `ArrayRef` and cast it to the
/// specified type or panic:
///
/// ```
/// # use std::sync::Arc;
Expand All @@ -32,16 +57,16 @@
///
/// # Panics
///
/// Panics with `message` if `index` is not in `values` or if it cannot be cast to `type`.
/// Panics if `index` is not in `values` or if the array cannot be cast to `type`.
#[macro_export]
macro_rules! value {
($values:ident, $index:expr, $type:ident) => {
$values[$index].as_any().downcast_ref::<$type>().unwrap()
($values:expr, $index:expr, $type:ident) => {
$crate::cast!($values[$index], $type)
};
}

/// Extract an [`array`](arrow::array::Array) from a
/// [`RecordBatch`](arrow::record_batch::RecordBatch) and cast it to the specified type:
/// [`RecordBatch`](arrow::record_batch::RecordBatch) and cast it to the specified type or panic:
///
/// ```
/// # use std::sync::Arc;
Expand All @@ -62,21 +87,17 @@ macro_rules! value {
///
/// # Panics
///
/// Panics if `column` is not in `batch` or if it cannot be cast to `type`.
/// Panics if `index` is not in `batch` or if the array cannot be cast to `type`.
#[macro_export]
macro_rules! array {
($batch:ident, $column:expr, $type:ident) => {
$batch
.column($column)
.as_any()
.downcast_ref::<$type>()
.unwrap()
($batch:expr, $index:expr, $type:ident) => {
$crate::cast!($batch.column($index), $type)
};
}

/// Extract the [`arrays`](arrow::array::Array) required to execute queries against a model table
/// from a [`RecordBatch`](arrow::record_batch::RecordBatch), cast them to the required type, and
/// assign the resulting arrays to the specified variables:
/// assign the resulting arrays to the specified variables. Panics if any of these steps fail:
///
/// ```
/// # use std::sync::Arc;
Expand Down