-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-53406][SQL] Avoid unnecessary shuffle join in direct passthrough shuffle id #52443
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
[SPARK-53406][SQL] Avoid unnecessary shuffle join in direct passthrough shuffle id #52443
Conversation
numPartitions: Int) extends Expression with Partitioning with Unevaluable { | ||
|
||
// TODO(SPARK-53401): Support Shuffle Spec in Direct Partition ID Pass Through | ||
// We don't support creating partitioning for ShufflePartitionIdPassThrough. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment should be put in ShufflePartitionIdPassThroughSpec#canCreatePartitioning
) | ||
|
||
// Mismatched key positions should be incompatible | ||
val dist1 = ClusteredDistribution(Seq($"a", $"b")) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the same as dist
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was going to wait for address of @cloud-fan's comments :) but i agree this test is a bit hard to read. The variable declaration is not so consistent (ie, on very top or above the check). Also as wenchen point out, some variable like ClusteredDistribution(Seq(
How about, we make some better names too, like ab, cd, a, b? (or a bit longer if necessary)
checkCompatible(
ShufflePartitionIdPassThrough(b, 10).createDist(ab),
ShuffelPartitionIdPassThrough(c, 10).createDist(cd)
expected = false
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for calling it out! I followed this and updated all tests in this pr
newChildren.isDefined | ||
} | ||
|
||
val isShufflePassThroughCompatible = !isKeyGroupCompatible && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we combine this with the key group compatibility check?
// Check if the following conditions are satisfied:
// 1. There are exactly two children (e.g., join). Note that Spark doesn't support
// multi-way join at the moment, so this check should be sufficient.
// 2. All children are of the same partitioning, and they are compatible with each other
// If both are true, skip shuffle.
val areChildrenCompatible = parent.isDefined &&
children.length == 2 && childrenIndexes.length == 2 && {
key group check and id pass through check
}
TransformExpression(BucketFunction, expr, Some(numBuckets)) | ||
} | ||
|
||
test("ShufflePartitionIdPassThrough - avoid necessary shuffle when they are compatible") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
test("ShufflePartitionIdPassThrough - avoid necessary shuffle when they are compatible") { | |
test("ShufflePartitionIdPassThrough - avoid unnecessary shuffle when children are compatible") { |
val plan2 = DummySparkPlan( | ||
outputPartitioning = ShufflePartitionIdPassThrough(DirectShufflePartitionID(exprC), 5)) | ||
// Join on different keys than partitioning keys | ||
val smjExec = SortMergeJoinExec(exprB :: Nil, exprD :: Nil, Inner, None, plan1, plan2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val smjExec = SortMergeJoinExec(exprB :: Nil, exprD :: Nil, Inner, None, plan1, plan2) | |
val smjExec = SortMergeJoinExec(exprA :: exprB :: Nil, exprD :: exprC :: Nil, Inner, None, plan1, plan2) |
case SortMergeJoinExec(_, _, _, _, | ||
SortExec(_, _, _: DummySparkPlan, _), | ||
SortExec(_, _, ShuffleExchangeExec(_: HashPartitioning, _, _, _), _), _) => | ||
// Right side shuffled, left side kept as-is |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the test result is random and these two cases can both happen?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, i dont get why the return value match both
} | ||
} | ||
|
||
test("ShufflePartitionIdPassThrough - cross position matching behavior") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks the same as ShufflePartitionIdPassThrough incompatibility - key position mismatch
?
} | ||
} | ||
|
||
test("ShufflePartitionIdPassThrough - compatible when partition key matches at any position") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we merge this test case into ShufflePartitionIdPassThrough - compatible with multiple clustering keys
?
// as the partitioning expression, we check compatibility as follows: | ||
// 1. Same number of clustering expressions | ||
// 2. Same number of partitions | ||
// 3. each pair of partitioning expression from both sides has overlapping positions in their |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry, im a bit new to this and curious, do we need to actually check the partition expression of repartitionByExpression() is compatible or not (like for KeyGroupedPartitioning) before we can skip shuffle, or I miss something ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need to, as it's just an expression without any data so it's more similar to the hash partitioning, while key grouped partitioning has actual partition data.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ah got it, i read the comment on DirectShufflePartitionID
that explain the limitation on the child expression, didnt see that earlier.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code changes sense to me
What changes were proposed in this pull request?
This PR implements compatibility checking for ShufflePartitionIdPassThrough partitioning to avoid unnecessary shuffle operations when both sides of a join use compatible direct partition ID pass-through.
Why are the changes needed?
Improve performance
Does this PR introduce any user-facing change?
No
How was this patch tested?
New unit tests
Was this patch authored or co-authored using generative AI tooling?
Yes