79 changes: 79 additions & 0 deletions datafusion/physical-plan/src/spill/spill_manager.rs
@@ -17,6 +17,7 @@

//! Define the `SpillManager` struct, which is responsible for reading and writing `RecordBatch`es to raw files based on the provided configurations.

use arrow::array::StringViewArray;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion_execution::runtime_env::RuntimeEnv;
@@ -185,6 +186,8 @@ impl SpillManager

pub(crate) trait GetSlicedSize {
    /// Returns the size of the `RecordBatch` when sliced.
    ///
    /// Note: if multiple arrays, or even a single array, share the same data buffers, each
    /// shared buffer may be counted more than once. Therefore, make sure to call gc() or
    /// organize_stringview_arrays() before using this method.
    fn get_sliced_size(&self) -> Result<usize>;
}
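The precondition in the doc comment matters because a sliced StringViewArray still points into the full, shared data buffers, so capacity-based accounting can double count them. Below is a minimal sketch (not part of the diff) of the intended call pattern, assuming code inside this crate where GetSlicedSize is visible; compact_string_views and estimated_spill_size are hypothetical helpers, StringViewArray::gc() and data_buffers() are arrow-rs APIs, and organize_stringview_arrays() refers to the DataFusion helper named in the doc comment.

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, StringViewArray};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;

use crate::spill::spill_manager::GetSlicedSize;

/// Hypothetical helper: compact StringView columns of a (possibly sliced) batch so that
/// `get_sliced_size` does not double count shared data buffers.
fn compact_string_views(batch: &RecordBatch) -> Result<RecordBatch> {
    let columns: Vec<ArrayRef> = batch
        .columns()
        .iter()
        .map(|col| match col.as_any().downcast_ref::<StringViewArray>() {
            // `gc()` copies only the bytes referenced by the remaining views into fresh,
            // tightly sized data buffers.
            Some(sv) => Arc::new(sv.gc()) as ArrayRef,
            None => Arc::clone(col),
        })
        .collect();
    Ok(RecordBatch::try_new(batch.schema(), columns)?)
}

fn estimated_spill_size(sliced_batch: &RecordBatch) -> Result<usize> {
    // Compact first, then measure, per the precondition above.
    compact_string_views(sliced_batch)?.get_sliced_size()
}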

@@ -194,7 +197,83 @@ impl GetSlicedSize for RecordBatch
        for array in self.columns() {
            let data = array.to_data();
            total += data.get_slice_memory_size()?;

            // Although StringViewArray holds large data buffers for its non-inlined strings,
            // the Arrow layout (BufferSpec) does not include those data buffers. As a result,
            // ArrayData::get_slice_memory_size() currently under-counts the memory size: it
            // accounts only for the views buffer, even though the data buffers are also cloned
            // during slice().
            //
            // Therefore, we manually add the capacity of every data buffer on top of the sliced
            // size of the views buffer. Given the gc() / organize_stringview_arrays() precondition
            // noted above, this matches the intended semantics of "bytes needed if we materialized
            // exactly this slice into fresh buffers".
            // This is a workaround until https://github.com/apache/arrow-rs/issues/8230 is fixed.
            if let Some(sv) = array.as_any().downcast_ref::<StringViewArray>() {
                for buffer in sv.data_buffers() {
                    total += buffer.capacity();
                }
            }
        }
        Ok(total)
    }
}
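For a concrete sense of the gap: every element of a StringViewArray occupies a fixed 16-byte view, and only strings longer than 12 bytes store their payload out of line in the data buffers the views point into. The standalone illustration below (not part of the diff) shows the under-count using only public arrow-rs APIs; exact byte counts depend on builder allocation, hence the hedged comments. The test module that follows exercises the same behavior through get_sliced_size().

use arrow::array::{Array, StringViewArray};
use arrow::error::ArrowError;

fn main() -> Result<(), ArrowError> {
    // One inlined string (<= 12 bytes) and one non-inlined string (> 12 bytes).
    let array = StringViewArray::from(vec![
        "short".to_string(),
        "a string that is certainly longer than twelve bytes".to_string(),
    ]);

    // The Arrow layout for Utf8View registers only the fixed-width views buffer, so this
    // reports roughly 16 bytes per element (32 bytes here) and ignores the payload.
    let views_only = array.to_data().get_slice_memory_size()?;

    // The non-inlined payload lives in data buffers that the layout does not describe.
    let data_bytes: usize = array.data_buffers().iter().map(|b| b.capacity()).sum();

    assert!(data_bytes > 0);
    println!("views-only estimate: {views_only}, data buffer capacity: {data_bytes}");
    Ok(())
}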

#[cfg(test)]
mod tests {
    use crate::spill::{get_record_batch_memory_size, spill_manager::GetSlicedSize};
    use arrow::datatypes::{DataType, Field, Schema};
    use arrow::{
        array::{ArrayRef, StringViewArray},
        record_batch::RecordBatch,
    };
    use datafusion_common::Result;
    use std::sync::Arc;

    #[test]
    fn check_sliced_size_for_string_view_array() -> Result<()> {
        let array_length = 50;
        let short_len = 8;
        let long_len = 25;

        // Build a StringViewArray that includes both inlined and non-inlined strings
        let strings: Vec<String> = (0..array_length)
            .map(|i| {
                if i % 2 == 0 {
                    "a".repeat(short_len)
                } else {
                    "b".repeat(long_len)
                }
            })
            .collect();

        let string_array = StringViewArray::from(strings);
        let array_ref: ArrayRef = Arc::new(string_array);
        let batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new(
                "strings",
                DataType::Utf8View,
                false,
            )])),
            vec![array_ref],
        )
        .unwrap();
        // We did not slice the batch, so these two memory sizes should be equal
        assert_eq!(
            batch.get_sliced_size().unwrap(),
            get_record_batch_memory_size(&batch)
        );

        // Slice the batch in half
        let half_batch = batch.slice(0, array_length / 2);
        // Now the sliced size is smaller, because the views buffer is sliced
        assert!(
            half_batch.get_sliced_size().unwrap()
                < get_record_batch_memory_size(&half_batch)
        );
        let data = arrow::array::Array::to_data(&half_batch.column(0));
        let views_sliced_size = data.get_slice_memory_size()?;
        // The sliced size should still be larger than the sliced views buffer size alone
        assert!(views_sliced_size < half_batch.get_sliced_size().unwrap());

        Ok(())
    }
}
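For context on why the sliced size matters to the SpillManager: a caller typically wants to know how many bytes a slice will occupy once materialized, for example to keep individual spill chunks under a budget. The helper below is purely hypothetical (split_by_sliced_size is not a DataFusion API) and only sketches one way get_sliced_size() could drive such a decision inside this crate.

use arrow::record_batch::RecordBatch;
use datafusion_common::Result;

use crate::spill::spill_manager::GetSlicedSize;

/// Hypothetical helper: split a batch into slices whose estimated materialized size,
/// as reported by `get_sliced_size`, stays under a per-chunk budget.
fn split_by_sliced_size(batch: &RecordBatch, budget: usize) -> Result<Vec<RecordBatch>> {
    let mut chunks = Vec::new();
    let mut start = 0;
    while start < batch.num_rows() {
        let mut len = batch.num_rows() - start;
        // Halve the slice until it fits the budget (or is down to a single row).
        while len > 1 && batch.slice(start, len).get_sliced_size()? > budget {
            len /= 2;
        }
        chunks.push(batch.slice(start, len));
        start += len;
    }
    Ok(chunks)
}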