@@ -22,7 +22,7 @@ use std::sync::Arc;
22
22
23
23
use crate :: fuzz_cases:: aggregate_fuzz:: assert_spill_count_metric;
24
24
use crate :: fuzz_cases:: once_exec:: OnceExec ;
25
- use arrow:: array:: UInt64Array ;
25
+ use arrow:: array:: { UInt32Array , UInt64Array } ;
26
26
use arrow:: { array:: StringArray , compute:: SortOptions , record_batch:: RecordBatch } ;
27
27
use arrow_schema:: { DataType , Field , Schema } ;
28
28
use datafusion:: common:: Result ;
@@ -325,6 +325,138 @@ fn grow_memory_as_much_as_possible(
325
325
Ok ( was_able_to_grow)
326
326
}
327
327
328
+ #[ tokio:: test]
329
+ async fn test_sort_with_limited_memory_larger_cursor ( ) -> Result < ( ) > {
330
+ let record_batch_size = 8192 ;
331
+ let pool_size = 2 * MB as usize ;
332
+ let task_ctx = {
333
+ let memory_pool = Arc :: new ( FairSpillPool :: new ( pool_size) ) ;
334
+ TaskContext :: default ( )
335
+ . with_session_config (
336
+ SessionConfig :: new ( )
337
+ . with_batch_size ( record_batch_size)
338
+ . with_sort_spill_reservation_bytes ( 1 ) ,
339
+ )
340
+ . with_runtime ( Arc :: new (
341
+ RuntimeEnvBuilder :: new ( )
342
+ . with_memory_pool ( memory_pool)
343
+ . build ( ) ?,
344
+ ) )
345
+ } ;
346
+
347
+ // Test that the merge degree of multi level merge sort cannot be fixed size when there is not enough memory
348
+ run_sort_test_q5_like_no_payload ( RunTestWithLimitedMemoryArgs {
349
+ pool_size,
350
+ task_ctx : Arc :: new ( task_ctx) ,
351
+ number_of_record_batches : 100 ,
352
+ get_size_of_record_batch_to_generate : Box :: pin ( move |_| pool_size / 6 ) ,
353
+ memory_behavior : Default :: default ( ) ,
354
+ } )
355
+ . await ?;
356
+
357
+ Ok ( ( ) )
358
+ }
359
+ /// Q5: 3 sort keys + no payload
360
+ async fn run_sort_test_q5_like_no_payload (
361
+ mut args : RunTestWithLimitedMemoryArgs ,
362
+ ) -> Result < usize > {
363
+ let _ = std:: mem:: replace (
364
+ & mut args. get_size_of_record_batch_to_generate ,
365
+ Box :: pin ( move |_| unreachable ! ( "should not be called after take" ) ) ,
366
+ ) ;
367
+
368
+ // l_linenumber: Int32, l_suppkey: Int64, l_orderkey: Int64
369
+ let scan_schema = Arc :: new ( Schema :: new ( vec ! [
370
+ Field :: new( "l_linenumber" , DataType :: UInt32 , false ) ,
371
+ Field :: new( "l_suppkey" , DataType :: UInt64 , false ) ,
372
+ Field :: new( "l_orderkey" , DataType :: UInt64 , false ) ,
373
+ ] ) ) ;
374
+
375
+ let record_batch_size = args. task_ctx . session_config ( ) . batch_size ( ) as i64 ;
376
+
377
+ let lnum_step: i64 = 5 ;
378
+ let supp_step: i64 = 9_973 ;
379
+ let order_step: i64 = 104_729 ;
380
+
381
+ const L_LINE_NUMBER_CARD : i64 = 7 ;
382
+ const L_SUPPKEY_CARD : i64 = 10_000 ;
383
+ const L_ORDERKEY_CARD : i64 = 1_500_000 ;
384
+ let schema = Arc :: clone ( & scan_schema) ;
385
+ let plan: Arc < dyn ExecutionPlan > =
386
+ Arc :: new ( OnceExec :: new ( Box :: pin ( RecordBatchStreamAdapter :: new (
387
+ Arc :: clone ( & schema) ,
388
+ futures:: stream:: iter ( ( 0 ..args. number_of_record_batches as i64 ) . map (
389
+ move |batch_idx| {
390
+ let start = batch_idx * record_batch_size;
391
+
392
+ // l_linenumber ∈ [1,7], l_suppkey ∈ [1,10_000], l_orderkey ∈ [1,1_500_000]
393
+ let linenumbers =
394
+ UInt32Array :: from_iter_values ( ( 0 ..record_batch_size) . map ( |i| {
395
+ let n = start + i;
396
+ // 1..=7
397
+ ( ( n * lnum_step) . rem_euclid ( L_LINE_NUMBER_CARD ) + 1 ) as u32
398
+ } ) ) ;
399
+
400
+ let suppkeys =
401
+ UInt64Array :: from_iter_values ( ( 0 ..record_batch_size) . map ( |i| {
402
+ let n = start + i;
403
+ // 1..=10_000
404
+ ( ( n * supp_step) . rem_euclid ( L_SUPPKEY_CARD ) + 1 ) as u64
405
+ } ) ) ;
406
+
407
+ let orderkeys =
408
+ UInt64Array :: from_iter_values ( ( 0 ..record_batch_size) . map ( |i| {
409
+ let n = start + i;
410
+ // 1..=1_500_000
411
+ ( ( n * order_step) . rem_euclid ( L_ORDERKEY_CARD ) + 1 ) as u64
412
+ } ) ) ;
413
+
414
+ RecordBatch :: try_new (
415
+ Arc :: clone ( & schema) ,
416
+ vec ! [
417
+ Arc :: new( linenumbers) as _,
418
+ Arc :: new( suppkeys) as _,
419
+ Arc :: new( orderkeys) as _,
420
+ ] ,
421
+ )
422
+ . map_err ( |e| e. into ( ) )
423
+ } ,
424
+ ) ) ,
425
+ ) ) ) ) ;
426
+
427
+ // ORDER BY l_linenumber, l_suppkey, l_orderkey ASC
428
+ let sort_exec = Arc :: new ( SortExec :: new (
429
+ LexOrdering :: new ( vec ! [
430
+ PhysicalSortExpr {
431
+ expr: col( "l_linenumber" , & scan_schema) ?,
432
+ options: SortOptions {
433
+ descending: false ,
434
+ nulls_first: true ,
435
+ } ,
436
+ } ,
437
+ PhysicalSortExpr {
438
+ expr: col( "l_suppkey" , & scan_schema) ?,
439
+ options: SortOptions {
440
+ descending: false ,
441
+ nulls_first: true ,
442
+ } ,
443
+ } ,
444
+ PhysicalSortExpr {
445
+ expr: col( "l_orderkey" , & scan_schema) ?,
446
+ options: SortOptions {
447
+ descending: false ,
448
+ nulls_first: true ,
449
+ } ,
450
+ } ,
451
+ ] )
452
+ . unwrap ( ) ,
453
+ plan,
454
+ ) ) ;
455
+
456
+ let result = sort_exec. execute ( 0 , Arc :: clone ( & args. task_ctx ) ) ?;
457
+ run_test ( args, sort_exec, result) . await
458
+ }
459
+
328
460
#[ tokio:: test]
329
461
async fn test_aggregate_with_high_cardinality_with_limited_memory ( ) -> Result < ( ) > {
330
462
let record_batch_size = 8192 ;
0 commit comments