Conversation

rluvaton (Contributor)

Which issue does this PR close?

N/A

Rationale for this change

In ExternalSorter there are cases where the batch_size config is not respected

What changes are included in this PR?

Wrap the stream with BatchSplitStream when we only have a single batch to sort and when we can sort in place (the paths that did not respect the batch size)

Are these changes tested?

Yes

Are there any user-facing changes?

Not an API change
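As a rough illustration of what the wrapper does, here is a simplified, std-only Rust sketch. All names and types here are stand-ins: the real BatchSplitStream works on Arrow RecordBatches and implements Stream rather than Iterator, and its slicing is zero-copy via RecordBatch::slice.

```rust
/// Simplified stand-in for BatchSplitStream: wraps an iterator of "batches"
/// (plain Vec<i32> here instead of Arrow RecordBatch) and re-emits them so
/// that no output batch exceeds `batch_size` rows.
struct BatchSplit<I> {
    inner: I,
    batch_size: usize,
    /// Remainder of an input batch that was larger than `batch_size`
    pending: Vec<i32>,
}

impl<I: Iterator<Item = Vec<i32>>> BatchSplit<I> {
    fn new(inner: I, batch_size: usize) -> Self {
        Self { inner, batch_size, pending: Vec::new() }
    }
}

impl<I: Iterator<Item = Vec<i32>>> Iterator for BatchSplit<I> {
    type Item = Vec<i32>;

    fn next(&mut self) -> Option<Vec<i32>> {
        if self.pending.is_empty() {
            // Pull the next input batch; stop when the input is exhausted
            self.pending = self.inner.next()?;
        }
        if self.pending.len() <= self.batch_size {
            // Small enough already: pass it through unchanged
            return Some(std::mem::take(&mut self.pending));
        }
        // Split off the first `batch_size` rows and keep the rest pending;
        // this pending remainder is data the stream holds between polls
        let rest = self.pending.split_off(self.batch_size);
        Some(std::mem::replace(&mut self.pending, rest))
    }
}

fn main() {
    // One large sorted batch of 10 rows with batch_size = 4
    // comes out as batches of 4, 4, and 2 rows.
    let input = vec![(0..10).collect::<Vec<i32>>()];
    let sizes: Vec<usize> = BatchSplit::new(input.into_iter(), 4)
        .map(|b| b.len())
        .collect();
    assert_eq!(sizes, vec![4, 4, 2]);
    println!("{sizes:?}"); // [4, 4, 2]
}
```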

@github-actions github-actions bot added the physical-plan Changes to the physical-plan crate label Aug 19, 2025
Comment on lines +745 to +749
output = Box::pin(BatchSplitStream::new(
output,
self.batch_size,
self.metrics.split_metrics.clone(),
));
rluvaton (Contributor, Author)

The problem I have with BatchSplitStream is that it does not account for the memory of the batches it holds.

Contributor

Here the caller is already accounting for the memory, right?

rluvaton (Contributor, Author)

In our case, yes.

@rluvaton rluvaton changed the title fix: sort should output batch_size batches fix: sort should always output batches with batch_size rows Aug 19, 2025
@2010YOUY01 (Contributor) left a comment

Thank you. It's well written and well tested.

We found an issue in #17163, and this PR is the direct fix.

output = Box::pin(BatchSplitStream::new(
output,
self.batch_size,
self.metrics.split_metrics.clone(),
Contributor

Can we construct a dummy metric here? If we add it to the operator's metrics set, it will show up in EXPLAIN ANALYZE, and I think this metric is not useful in SortExec.

rluvaton (Contributor, Author)

Why not? Splitting can also happen before a spill, so it can be called more than once.


@ding-young (Contributor)

Thank you for the fix! I actually encountered the same issue, and really glad to see this PR.

BTW, have you considered directly creating a stream that sorts a batch and slices it into multiple batch_size batches, instead of wrapping it with BatchSplitStream?
That way, we could avoid generating dummy metrics and have more direct control over memory reservation.

Something like this: https://gist.github.com/ding-young/25ce38cd6449a3c35bc0df430e6a2eab

I'm just sharing the idea since you mentioned the issue with BatchSplitStream, and I think this PR's approach is also nice. 😄

Plus, after this PR gets merged, I'll be glad to hear your review on #17163
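The suggestion above could be sketched roughly like this std-only model (hypothetical names; the real version in the linked gist sorts and slices Arrow RecordBatches, and would be the natural place to hold a memory reservation for the sorted data):

```rust
/// Hypothetical sketch of the alternative: sort once up front, then lazily
/// hand out `batch_size`-row slices. Because this stream owns the sorted
/// data itself, it could reserve memory for it directly.
struct SortedSliceIter {
    sorted: Vec<i32>, // stand-in for the single sorted RecordBatch
    offset: usize,
    batch_size: usize,
}

impl SortedSliceIter {
    fn new(mut rows: Vec<i32>, batch_size: usize) -> Self {
        // The real code would sort by the operator's sort expressions
        rows.sort_unstable();
        Self { sorted: rows, offset: 0, batch_size }
    }
}

impl Iterator for SortedSliceIter {
    type Item = Vec<i32>;

    fn next(&mut self) -> Option<Vec<i32>> {
        if self.offset >= self.sorted.len() {
            return None; // all slices emitted; a reservation could be freed here
        }
        let end = (self.offset + self.batch_size).min(self.sorted.len());
        let slice = self.sorted[self.offset..end].to_vec();
        self.offset = end;
        Some(slice)
    }
}

fn main() {
    // 5 unsorted rows with batch_size = 2 come out sorted
    // as slices of 2, 2, and 1 rows.
    let batches: Vec<Vec<i32>> = SortedSliceIter::new(vec![5, 3, 9, 1, 7], 2).collect();
    assert_eq!(batches, vec![vec![1, 3], vec![5, 7], vec![9]]);
    println!("{batches:?}"); // [[1, 3], [5, 7], [9]]
}
```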

@rluvaton (Contributor, Author)

> Thank you for the fix! I actually encountered the same issue, and really glad to see this PR.
>
> BTW, have you considered directly creating a stream that sorts a batch and slices it into multiple batch_size batches, instead of wrapping it with BatchSplitStream? That way, we could avoid generating dummy metrics and have more direct control over memory reservation.

Yes, but I decided not to, for two reasons:

  1. Simplicity
  2. It should be BatchSplitStream's responsibility, since DataFusion's take on memory is that whoever holds the data should account for it:

    /// between operators. Furthermore, operators should not reserve memory for the
    /// batches they produce. Instead, if a consumer operator needs to hold batches
    /// from its producers in memory for an extended period, it is the consumer
    /// operator's responsibility to reserve the necessary memory for those batches.

@2010YOUY01 2010YOUY01 merged commit 3b7eb26 into apache:main Aug 21, 2025
28 checks passed