From 3f33d3e7c42c886ab829cd78eda58fdd92196049 Mon Sep 17 00:00:00 2001
From: SeoYoung Lee
Date: Mon, 25 Aug 2025 12:58:43 +0000
Subject: [PATCH] fix incorrect memory accounting for StringViewArray

---
 .../physical-plan/src/spill/spill_manager.rs | 79 +++++++++++++++++++
 1 file changed, 79 insertions(+)

diff --git a/datafusion/physical-plan/src/spill/spill_manager.rs b/datafusion/physical-plan/src/spill/spill_manager.rs
index 467a7aa72b9c..ad23bd66a021 100644
--- a/datafusion/physical-plan/src/spill/spill_manager.rs
+++ b/datafusion/physical-plan/src/spill/spill_manager.rs
@@ -17,6 +17,7 @@
 
 //! Define the `SpillManager` struct, which is responsible for reading and writing `RecordBatch`es to raw files based on the provided configurations.
 
+use arrow::array::StringViewArray;
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use datafusion_execution::runtime_env::RuntimeEnv;
@@ -185,6 +186,8 @@ impl SpillManager {
 
 pub(crate) trait GetSlicedSize {
     /// Returns the size of the `RecordBatch` when sliced.
+    /// Note: if multiple arrays or even a single array share the same data buffers, we may double count each buffer.
+    /// Therefore, make sure we call gc() or organize_stringview_arrays() before using this method.
     fn get_sliced_size(&self) -> Result<usize>;
 }
 
@@ -194,7 +197,83 @@ impl GetSlicedSize for RecordBatch {
         for array in self.columns() {
             let data = array.to_data();
             total += data.get_slice_memory_size()?;
+
+            // A StringViewArray keeps non-inlined strings in separate data buffers, but the Arrow
+            // layout (BufferSpec) does not include them. ArrayData::get_slice_memory_size()
+            // therefore only accounts for the views buffer, although slice() clones the data buffers.
+            //
+            // Therefore, we add the full capacity of each data buffer on top of the sliced size of
+            // the views buffer. This is an upper bound: every buffer is counted whole, regardless
+            // of how many of its bytes the sliced views actually point at.
+            // This is a workaround until https://github.com/apache/arrow-rs/issues/8230
+            if let Some(sv) = array.as_any().downcast_ref::<StringViewArray>() {
+                for buffer in sv.data_buffers() {
+                    total += buffer.capacity();
+                }
+            }
         }
         Ok(total)
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use crate::spill::{get_record_batch_memory_size, spill_manager::GetSlicedSize};
+    use arrow::datatypes::{DataType, Field, Schema};
+    use arrow::{
+        array::{ArrayRef, StringViewArray},
+        record_batch::RecordBatch,
+    };
+    use datafusion_common::Result;
+    use std::sync::Arc;
+
+    #[test]
+    fn check_sliced_size_for_string_view_array() -> Result<()> {
+        let array_length = 50;
+        let short_len = 8;
+        let long_len = 25;
+
+        // Build a StringViewArray that includes both inlined and non-inlined strings
+        let strings: Vec<String> = (0..array_length)
+            .map(|i| {
+                if i % 2 == 0 {
+                    "a".repeat(short_len)
+                } else {
+                    "b".repeat(long_len)
+                }
+            })
+            .collect();
+
+        let string_array = StringViewArray::from(strings);
+        let array_ref: ArrayRef = Arc::new(string_array);
+        let batch = RecordBatch::try_new(
+            Arc::new(Schema::new(vec![Field::new(
+                "strings",
+                DataType::Utf8View,
+                false,
+            )])),
+            vec![array_ref],
+        )
+        .unwrap();
+
+        // We did not slice the batch, so these two memory sizes should be equal
+        assert_eq!(
+            batch.get_sliced_size().unwrap(),
+            get_record_batch_memory_size(&batch)
+        );
+
+        // Slice the batch in half
+        let half_batch = batch.slice(0, array_length / 2);
+        // Now sliced_size is smaller because the views buffer is sliced
+        assert!(
+            half_batch.get_sliced_size().unwrap()
+                < get_record_batch_memory_size(&half_batch)
+        );
+        let data = arrow::array::Array::to_data(&half_batch.column(0));
+        let views_sliced_size = data.get_slice_memory_size()?;
+        // The sliced size should be larger than the sliced views buffer size
+        assert!(views_sliced_size < half_batch.get_sliced_size().unwrap());
+
+        Ok(())
+    }
+}