Improve memory accounting for cursor in external sort #17163
Conversation
Force-pushed from 6e889a2 to 8b4a19e
// TODO: since we do the col->row conversion first, the sort succeeds without a limited sort_spill_reservation_bytes
test.clone()
    .with_expected_errors(vec![
        "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:",
        "B for ExternalSorterMerge",
    ])
    .with_expected_success()
I'm wondering whether the `sort_spill_reservation_bytes` configuration will still be necessary after this change. Since we're now estimating the memory required for the merge phase based on the actual ratio measured from the first batch, the manual override may become less important in practice.
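For reference, here is a minimal sketch (not from this PR) of how that knob is set today through DataFusion's `SessionConfig`; the 16 MiB value is only an example:

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn context_with_merge_reservation() -> SessionContext {
    let mut config = SessionConfig::new();
    // Pre-reserve memory for the merge phase of an external sort
    // (the option this thread is discussing).
    config.options_mut().execution.sort_spill_reservation_bytes = 16 * 1024 * 1024;
    SessionContext::new_with_config(config)
}
```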
Thanks for the work. The idea looks good to me; I'm trying to figure out why the last batch can be too large.
BTW, what's the reason for the remaining `sort_tpch` query failures?
@@ -88,6 +88,10 @@ pub struct StreamingMergeBuilder<'a> {
    fetch: Option<usize>,
    reservation: Option<MemoryReservation>,
    enable_round_robin_tie_breaker: bool,
    /// Ratio of memory used by the cursor batch to the original input RecordBatch.
    /// Used in `get_reserved_byte_for_record_batch_size` to estimate the memory required for the merge phase.
    /// Only passed when constructing MultiLevelMergeBuilder
Suggestion:
    /// Only passed when constructing MultiLevelMergeBuilder
    ///
    /// A cursor is an interface for comparing entries between two batches.
    /// It includes the original batch and the sort keys converted to Arrow Row format
    /// for faster comparison. See `cursor.rs` for more details.
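For readers unfamiliar with the row format, here is a small self-contained sketch of the conversion the suggested comment refers to; it is not DataFusion's `cursor.rs`, and the arrays and types are made up for illustration:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, StringArray};
use arrow::datatypes::DataType;
use arrow::error::ArrowError;
use arrow::row::{RowConverter, SortField};

fn main() -> Result<(), ArrowError> {
    // Pretend these are the sort-key columns of one RecordBatch.
    let keys: Vec<ArrayRef> = vec![
        Arc::new(Int64Array::from(vec![3, 1, 2])),
        Arc::new(StringArray::from(vec!["c", "a", "b"])),
    ];
    let converter = RowConverter::new(vec![
        SortField::new(DataType::Int64),
        SortField::new(DataType::Utf8),
    ])?;
    // Encode the sort keys into the Arrow Row format.
    let rows = converter.convert_columns(&keys)?;
    // Encoded rows compare with a plain byte comparison across all sort keys.
    assert!(rows.row(1) < rows.row(2));
    // The encoded rows occupy their own memory alongside the original batch,
    // which is what the cursor/batch ratio in this PR tries to account for.
    println!("encoded sort keys: {} bytes", rows.size());
    Ok(())
}
```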
// Only compute the ratio for the first batch
if self.cursor_batch_ratio.is_none() {
    let ratio = self.calculate_ratio(&input)?;
    self.cursor_batch_ratio = Some(ratio);
}
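As a rough guess at what a ratio computation like `calculate_ratio` might look like (the signature and details here are assumptions, not this PR's code), the row-encoded size of the sort keys can be compared against the batch's array memory:

```rust
use arrow::array::ArrayRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use arrow::row::{RowConverter, SortField};

/// Estimate cursor bytes / batch bytes from one sample batch.
/// `sort_keys` are the evaluated sort-key columns of `batch`, and `fields`
/// their corresponding row-format sort fields (both assumed for illustration).
fn estimate_cursor_batch_ratio(
    batch: &RecordBatch,
    sort_keys: &[ArrayRef],
    fields: Vec<SortField>,
) -> Result<f64, ArrowError> {
    let converter = RowConverter::new(fields)?;
    let rows = converter.convert_columns(sort_keys)?;
    // Memory held by the row-encoded sort keys relative to the whole input batch.
    Ok(rows.size() as f64 / batch.get_array_memory_size() as f64)
}
```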
Can we defer this until we actually need it? If each sort only gets a single batch, or we do in-place sorting, we will convert it to rows without need.
(1) During …
(2) During sort, when there have already been previous spills in …
(3) In multi-level merge, when we can no longer reduce the number of spill files or buffer size to fit the memory limit.

For (1) and (2), the core issue is that in order to spill in-memory batches, we need to sort them via streaming merge using a loser tree. But since all in-memory batches are already buffered at that point, attempting to merge them all as loser-tree inputs further increases memory pressure, leading to failure. I'm currently experimenting with limiting the fanout (merge degree) of the loser tree to see if this can help mitigate the problem.
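To illustrate the fanout-limiting idea mentioned above, here is a toy sketch; it is not this PR's code, and plain integer vectors stand in for sorted spill streams:

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge at most `max_fanout` sorted runs at a time, cascading until one run
/// remains, so no single merge step has to hold input from every run at once.
fn cascaded_merge(mut runs: Vec<Vec<i64>>, max_fanout: usize) -> Vec<i64> {
    assert!(max_fanout >= 2);
    while runs.len() > 1 {
        let next: Vec<Vec<i64>> = runs.chunks(max_fanout).map(k_way_merge).collect();
        runs = next;
    }
    runs.pop().unwrap_or_default()
}

/// Simple heap-based k-way merge of sorted runs.
fn k_way_merge(runs: &[Vec<i64>]) -> Vec<i64> {
    // Min-heap of (value, run index, position within run).
    let mut heap = BinaryHeap::new();
    for (i, run) in runs.iter().enumerate() {
        if let Some(&v) = run.first() {
            heap.push(Reverse((v, i, 0usize)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, run_idx, pos))) = heap.pop() {
        out.push(v);
        if let Some(&next) = runs[run_idx].get(pos + 1) {
            heap.push(Reverse((next, run_idx, pos + 1)));
        }
    }
    out
}

fn main() {
    let runs = vec![vec![1, 4, 7], vec![2, 5, 8], vec![3, 6, 9], vec![0, 10]];
    assert_eq!(cascaded_merge(runs, 2), vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
}
```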
Force-pushed from 8b4a19e to e44ca83
let sort_res = self.reservation.try_grow(batch_size);

// If the cursor is smaller than half of the original batch, we may say that 2x the batch size is enough for both the sort and merge phases
let cursor_small = self
Here it's doing: if `rows / original_batch < 1.0`, round the row-size estimation up to `1.0` instead of using the actual ratio. Why do this round-up?
BTW, is this PR ready to review now?
The round-up here is still a heuristic. It's hard to provide a strict guarantee that `StreamingMerge` won't fail, even if we use the estimated ratio to increase `merge_reservation`. So for now, I kept the conservative approach of using the original 2x ratio when the estimated ratio is < 1.0. It's a temporary workaround and might be revised later.

I still need to clean up some of the comments and test code, so it'd be great if you could take a look at the core logic for now.

Side note: this PR does fix the sort-tpch Q5 issue mentioned above, but based on my recent investigation into the `StringViewArray` memory usage problem, this PR doesn't seem to resolve the failures in the other queries (Q3, Q7-11, which include a `VARCHAR` column) if I make the correct memory estimation for `StringViewArray`.
Which issue does this PR close?
Rationale for this change
DataFusion's sort implementation converts batches into a row-based format when there are two or more sort columns, in order to implement efficient comparisons using cursors. Currently, the memory reservation for each batch is estimated as roughly 2× the size of the original batch, to account for both the original and the cursor batch. However, the actual memory usage of the cursor can vary significantly, especially when the relative sizes of the payload columns and sort columns differ, leading to potential over- or under-estimation.
What changes are included in this PR?
This PR addresses two memory accounting issues:
1. We now convert the first batch to a row-based cursor, compute the memory ratio between the original batch and the cursor batch, and record it. This ratio is then used in the `get_reserved_byte_for_record_batch_size` function.
2. TODO
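A hedged sketch of the ratio-based reservation described above; the real `get_reserved_byte_for_record_batch_size` may differ in names and details, and the fallback to 2x when the ratio is below 1.0 mirrors the conservative round-up discussed in the review thread:

```rust
/// Illustrative only: bytes to reserve for one input batch, covering both the
/// original batch (sort phase) and its row-format cursor (merge phase).
fn reserved_bytes_for_batch(batch_bytes: usize, cursor_batch_ratio: Option<f64>) -> usize {
    match cursor_batch_ratio {
        // Measured cursor at least as large as the batch: reserve batch + cursor.
        Some(ratio) if ratio >= 1.0 => batch_bytes + (batch_bytes as f64 * ratio).ceil() as usize,
        // No measurement yet, or a ratio below 1.0: keep the conservative 2x estimate.
        _ => 2 * batch_bytes,
    }
}

fn main() {
    // With a measured ratio of 1.5, a 1 MiB batch reserves 2.5 MiB instead of the flat 2 MiB.
    assert_eq!(reserved_bytes_for_batch(1 << 20, Some(1.5)), (1 << 20) + (3 << 19));
    assert_eq!(reserved_bytes_for_batch(1 << 20, None), 2 << 20);
}
```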
Are these changes tested?
I ran sort-tpch with a memory limit, and more queries are now able to complete successfully.
While the performance of external sorts may have improved due to more accurate memory reservation, there’s a possibility of regression for small or fast queries since the first batch is converted and then discarded solely for estimation purposes.
(This hasn’t been tested yet, but it's something to be aware of.)
Running sort-tpch with 4 partitions and a 100M memory limit, Q8 and Q11 now succeed. (Update: No. After using the correct memory size for `StringViewArray`, this PR still suffers from these query failures.)

cargo run --profile release-nonlto --bin dfbench -- sort-tpch --path benchmarks/data/tpch_sf1 --partitions 4 --memory-limit 100M
Are there any user-facing changes?