datafusion/core/tests/fuzz_cases/sort_fuzz.rs (2 changes: 1 addition & 1 deletion)
@@ -119,7 +119,7 @@ async fn test_sort_strings_100k_mem() {
 #[cfg_attr(tarpaulin, ignore)]
 async fn test_sort_multi_columns_100k_mem() {
     for (batch_size, should_spill) in
-        [(5, false), (1000, false), (10000, true), (20000, true)]
+        [(5, false), (750, false), (10000, true), (20000, true)]
     {
         let (input, collected) = SortTest::new()
             .with_int32_utf8_batches(batch_size)
@@ -22,7 +22,7 @@ use std::sync::Arc;

 use crate::fuzz_cases::aggregate_fuzz::assert_spill_count_metric;
 use crate::fuzz_cases::once_exec::OnceExec;
-use arrow::array::UInt64Array;
+use arrow::array::{UInt32Array, UInt64Array};
 use arrow::{array::StringArray, compute::SortOptions, record_batch::RecordBatch};
 use arrow_schema::{DataType, Field, Schema};
 use datafusion::common::Result;

@@ -80,9 +80,9 @@ async fn test_sort_with_limited_memory() -> Result<()> {

     let total_spill_files_size = spill_count * record_batch_size;
     assert!(
-            total_spill_files_size > pool_size,
-            "Total spill files size {total_spill_files_size} should be greater than pool size {pool_size}",
-        );
+        total_spill_files_size > pool_size,
+        "Total spill files size {total_spill_files_size} should be greater than pool size {pool_size}",
+    );

     Ok(())
 }
@@ -325,6 +325,138 @@ fn grow_memory_as_much_as_possible(
     Ok(was_able_to_grow)
 }

+#[tokio::test]
+async fn test_sort_with_limited_memory_larger_cursor() -> Result<()> {
+    let record_batch_size = 8192;
+    let pool_size = 2 * MB as usize;
+    let task_ctx = {
+        let memory_pool = Arc::new(FairSpillPool::new(pool_size));
+        TaskContext::default()
+            .with_session_config(
+                SessionConfig::new()
+                    .with_batch_size(record_batch_size)
+                    .with_sort_spill_reservation_bytes(1),
+            )
+            .with_runtime(Arc::new(
+                RuntimeEnvBuilder::new()
+                    .with_memory_pool(memory_pool)
+                    .build()?,
+            ))
+    };
+
+    // Test that the merge degree of multi-level merge sort cannot be a fixed size when there is not enough memory
+    run_sort_test_q5_like_no_payload(RunTestWithLimitedMemoryArgs {
+        pool_size,
+        task_ctx: Arc::new(task_ctx),
+        number_of_record_batches: 100,
+        get_size_of_record_batch_to_generate: Box::pin(move |_| pool_size / 6),
+        memory_behavior: Default::default(),
+    })
+    .await?;
+
+    Ok(())
+}
+/// Q5: 3 sort keys + no payload
+async fn run_sort_test_q5_like_no_payload(
+    mut args: RunTestWithLimitedMemoryArgs,
+) -> Result<usize> {
+    let _ = std::mem::replace(
+        &mut args.get_size_of_record_batch_to_generate,
+        Box::pin(move |_| unreachable!("should not be called after take")),
+    );
+
+    // l_linenumber, l_suppkey, l_orderkey: TPC-H Int32/Int64 keys, modeled here as UInt32/UInt64
+    let scan_schema = Arc::new(Schema::new(vec![
+        Field::new("l_linenumber", DataType::UInt32, false),
+        Field::new("l_suppkey", DataType::UInt64, false),
+        Field::new("l_orderkey", DataType::UInt64, false),
+    ]));
+
+    let record_batch_size = args.task_ctx.session_config().batch_size() as i64;
+
+    let lnum_step: i64 = 5;
+    let supp_step: i64 = 9_973;
+    let order_step: i64 = 104_729;
+
+    const L_LINE_NUMBER_CARD: i64 = 7;
+    const L_SUPPKEY_CARD: i64 = 10_000;
+    const L_ORDERKEY_CARD: i64 = 1_500_000;
+    let schema = Arc::clone(&scan_schema);
+    let plan: Arc<dyn ExecutionPlan> =
+        Arc::new(OnceExec::new(Box::pin(RecordBatchStreamAdapter::new(
+            Arc::clone(&schema),
+            futures::stream::iter((0..args.number_of_record_batches as i64).map(
+                move |batch_idx| {
+                    let start = batch_idx * record_batch_size;
+
+                    // l_linenumber ∈ [1,7], l_suppkey ∈ [1,10_000], l_orderkey ∈ [1,1_500_000]
+                    let linenumbers =
+                        UInt32Array::from_iter_values((0..record_batch_size).map(|i| {
+                            let n = start + i;
+                            // 1..=7
+                            ((n * lnum_step).rem_euclid(L_LINE_NUMBER_CARD) + 1) as u32
+                        }));
+
+                    let suppkeys =
+                        UInt64Array::from_iter_values((0..record_batch_size).map(|i| {
+                            let n = start + i;
+                            // 1..=10_000
+                            ((n * supp_step).rem_euclid(L_SUPPKEY_CARD) + 1) as u64
+                        }));
+
+                    let orderkeys =
+                        UInt64Array::from_iter_values((0..record_batch_size).map(|i| {
+                            let n = start + i;
+                            // 1..=1_500_000
+                            ((n * order_step).rem_euclid(L_ORDERKEY_CARD) + 1) as u64
+                        }));
+
+                    RecordBatch::try_new(
+                        Arc::clone(&schema),
+                        vec![
+                            Arc::new(linenumbers) as _,
+                            Arc::new(suppkeys) as _,
+                            Arc::new(orderkeys) as _,
+                        ],
+                    )
+                    .map_err(|e| e.into())
+                },
+            )),
+        ))));
+
+    // ORDER BY l_linenumber, l_suppkey, l_orderkey ASC
+    let sort_exec = Arc::new(SortExec::new(
+        LexOrdering::new(vec![
+            PhysicalSortExpr {
+                expr: col("l_linenumber", &scan_schema)?,
+                options: SortOptions {
+                    descending: false,
+                    nulls_first: true,
+                },
+            },
+            PhysicalSortExpr {
+                expr: col("l_suppkey", &scan_schema)?,
+                options: SortOptions {
+                    descending: false,
+                    nulls_first: true,
+                },
+            },
+            PhysicalSortExpr {
+                expr: col("l_orderkey", &scan_schema)?,
+                options: SortOptions {
+                    descending: false,
+                    nulls_first: true,
+                },
+            },
+        ])
+        .unwrap(),
+        plan,
+    ));
+
+    let result = sort_exec.execute(0, Arc::clone(&args.task_ctx))?;
+    run_test(args, sort_exec, result).await
+}
+
 #[tokio::test]
 async fn test_aggregate_with_high_cardinality_with_limited_memory() -> Result<()> {
     let record_batch_size = 8192;
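
A note on the generators in `run_sort_test_q5_like_no_payload` above: each stride (5, 9_973, 104_729) is coprime to its column's cardinality (7, 10_000, 1_500_000), so `(n * step).rem_euclid(card) + 1` walks through every value in the range before repeating, giving each key column its full cardinality in a shuffled-looking order. A minimal standalone check of that property (not part of the PR, just a sketch):

```rust
/// Recursive Euclidean algorithm; both inputs are positive here.
fn gcd(a: i64, b: i64) -> i64 {
    if b == 0 { a } else { gcd(b, a % b) }
}

fn main() {
    // The three (stride, cardinality) pairs used by the test's generators.
    for (step, card) in [(5i64, 7i64), (9_973, 10_000), (104_729, 1_500_000)] {
        assert_eq!(gcd(step, card), 1, "stride must be coprime to cardinality");
    }

    // For the smallest pair, verify the full-cycle property directly:
    // every residue is hit exactly once per `card` consecutive values of `n`.
    let (step, card) = (5i64, 7i64);
    let mut seen = vec![false; card as usize];
    for n in 0..card {
        seen[(n * step).rem_euclid(card) as usize] = true;
    }
    assert!(seen.iter().all(|&s| s));
}
```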
datafusion/core/tests/memory_limit/mod.rs (6 changes: 2 additions & 4 deletions)

@@ -313,11 +313,9 @@ async fn sort_spill_reservation() {
         // the sort will fail while trying to merge
         .with_sort_spill_reservation_bytes(1024);

+    // TODO: since we do the col->row conversion first now, the sort succeeds even with a limited sort_spill_reservation_bytes
     test.clone()
-        .with_expected_errors(vec![
-            "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:",
-            "B for ExternalSorterMerge",
-        ])
+        .with_expected_success()
> **Comment on lines +316 to +318** (Contributor Author):
>
> I'm wondering whether the `sort_spill_reservation_bytes` configuration will still be necessary after this change. Since we're now estimating the memory required for the merge phase based on the actual ratio measured from the first batch, the manual override may become less important in practice.

         .with_config(config)
         .run()
         .await;
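
For context on the TODO above: DataFusion's external sorter compares sort keys in the Arrow row format, so each buffered batch carries a row-encoded copy of its keys in addition to the columnar data. A rough sketch of that conversion using the `arrow::row` API (the data and assertions are illustrative, not taken from this test):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, StringArray};
use arrow::datatypes::DataType;
use arrow::error::ArrowError;
use arrow::row::{RowConverter, SortField};

fn main() -> Result<(), ArrowError> {
    // Two sort-key columns in their natural (unsorted) order.
    let keys: Vec<ArrayRef> = vec![
        Arc::new(Int32Array::from(vec![3, 1, 2])),
        Arc::new(StringArray::from(vec!["c", "a", "b"])),
    ];

    let converter = RowConverter::new(vec![
        SortField::new(DataType::Int32),
        SortField::new(DataType::Utf8),
    ])?;

    // The row encoding is memcmp-comparable, but it is an extra allocation on
    // top of the original batch; its size relative to the batch is the kind of
    // overhead the new `cursor_batch_ratio` is meant to account for.
    let rows = converter.convert_columns(&keys)?;
    assert!(rows.row(1) < rows.row(2) && rows.row(2) < rows.row(0));
    Ok(())
}
```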
datafusion/physical-plan/src/sorts/multi_level_merge.rs (5 changes: 5 additions & 0 deletions)

@@ -137,6 +137,7 @@ pub(crate) struct MultiLevelMergeBuilder {
     reservation: MemoryReservation,
     fetch: Option<usize>,
     enable_round_robin_tie_breaker: bool,
+    cursor_batch_ratio: Option<f64>,
 }

 impl Debug for MultiLevelMergeBuilder {
@@ -158,6 +159,7 @@ impl MultiLevelMergeBuilder {
         reservation: MemoryReservation,
         fetch: Option<usize>,
         enable_round_robin_tie_breaker: bool,
+        cursor_batch_ratio: Option<f64>,
     ) -> Self {
         Self {
             spill_manager,
@@ -170,6 +172,7 @@ impl MultiLevelMergeBuilder {
             reservation,
             enable_round_robin_tie_breaker,
             fetch,
+            cursor_batch_ratio,
         }
     }

@@ -356,6 +359,8 @@ impl MultiLevelMergeBuilder {
         // and there should be some upper limit to memory reservation so we won't starve the system
         match reservation.try_grow(get_reserved_byte_for_record_batch_size(
             spill.max_record_batch_memory * buffer_len,
+            // ratio for both the original batch (+1.0) and its associated cursor
+            self.cursor_batch_ratio.map(|ratio| ratio + 1.0),
         )) {
             Ok(_) => {
                 number_of_spills_to_read_for_current_phase += 1;
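
A sketch of the arithmetic behind this hunk, which also relates to the reviewer comment above about estimating merge-phase memory from a measured ratio. It assumes `get_reserved_byte_for_record_batch_size` scales its byte argument by the ratio when one is supplied; that function's body is not shown in this diff, so the exact scaling is an assumption:

```rust
/// Hypothetical stand-in for the reservation estimate: the buffered spill
/// batches cost `max_record_batch_memory * buffer_len` bytes, and when a
/// cursor/batch ratio was measured, each batch also carries a row-format
/// cursor of `ratio` times its size, hence the `ratio + 1.0` factor.
fn estimated_reservation(
    max_record_batch_memory: usize,
    buffer_len: usize,
    cursor_batch_ratio: Option<f64>,
) -> usize {
    let batch_bytes = max_record_batch_memory * buffer_len;
    match cursor_batch_ratio {
        Some(ratio) => (batch_bytes as f64 * (ratio + 1.0)) as usize,
        None => batch_bytes,
    }
}

fn main() {
    // E.g. a measured cursor/batch ratio of 0.25 inflates a 1 MiB estimate
    // to 1.25 MiB before `reservation.try_grow` is attempted.
    assert_eq!(estimated_reservation(1 << 20, 1, Some(0.25)), 1_310_720);
}
```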