**Is your feature request related to a problem? Please describe.**
When multiple array higher-order functions (`transform`, `filter`, `exists`, `aggregate`, …) sit in the same `Project` and all read the same array column, each one independently goes through the same explode step. `GpuArrayTransformBase.makeElementProjectBatch` calls `Table.explode` per HOF instance, so a `Project` with N HOFs over the same column allocates N exploded batches and runs N copies of the lambda projection plumbing — even though the first two steps of every HOF (explode the array, build the element-projection batch) are identical.
CPU Spark doesn't suffer from this because codegen folds the whole `Project` into a single per-row loop where every lambda is evaluated sequentially against the same row scan. On the GPU the explode is an explicit kernel and there's no codegen-level fusion, so the redundant work is paid in full.
Concrete observation (perf testing PR #14652): with 200 `ArrayAggregate` calls over the same `array<int>` column on a 50M-element dataset, GPU time was ~75 ms per HOF — clearly dominated by the per-call explode + JNI + memory-pool overhead, not the segmented reduce itself. The analogous CPU run was ~36 s vs GPU ~16 s, a 2.5× speedup that would be much higher if the explode were shared. Real customer workloads have only 1–3 HOFs per `Project` so the absolute impact is smaller, but the ratio of wasted work scales linearly with the number of HOFs over the same column.
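The benchmark shape described above might be constructed roughly like this (a hypothetical sketch, not the actual code from PR #14652; the column name `arr`, the lambda, and the alias scheme are all illustrative):

```scala
// Sketch of a Project with N aggregate() HOFs over the same array column.
// Each expression re-resolves its own lambda, so on the GPU each HOF
// instance re-explodes `arr` independently.
val numHofs = 200

val aggExprs: Seq[String] =
  (0 until numHofs).map(i => s"aggregate(arr, 0, (acc, x) -> acc + x) AS agg_$i")

// In a real Spark job this would be applied as:
//   df.selectExpr(aggExprs: _*)
```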
**Describe the solution you'd like**
A plan-level rule that identifies a `Project` with multiple array-HOF expressions reading the same argument and rewrites them into a single fused node — call it `GpuMultiArrayHOFExec` (or similar) — that:
1. Computes the union of `boundIntermediate` references and explodes the input array once.
2. Evaluates each HOF's lambda body against the shared exploded batch, producing one result column per HOF.
3. Runs each HOF's post-process (rewrap-as-list for `transform`, `applyBooleanMask` for `filter`, segmented reduce + identity-substitute + `combineWithZero` for `aggregate`, etc.) against the shared list offsets/validity.
The fused node would emit one output column per original HOF, with the schemas and types preserved.
This converts the per-HOF cost from `N × explode + N × lambda eval + N × post-process` to `1 × explode + N × lambda eval + N × post-process`, with the explode + JNI overhead being the biggest savings.
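The fused shape can be modeled on the CPU with plain Scala collections standing in for cuDF batches (a minimal sketch of the idea only; `Exploded`, `explodeOnce`, and the HOF helpers are illustrative names, not the plugin's API):

```scala
// CPU-side model of the proposed fusion: explode the shared array column
// once, then run each HOF's lambda and post-process against that single
// exploded batch, reusing the shared offsets.
object FusedHofModel {
  // One "explode": flattened element values plus per-row offsets.
  final case class Exploded(elements: Array[Int], offsets: Array[Int])

  def explodeOnce(column: Seq[Array[Int]]): Exploded = {
    val offsets = column.scanLeft(0)(_ + _.length).toArray
    Exploded(column.flatten.toArray, offsets)
  }

  // transform-like HOF: map each element, rewrap using the shared offsets.
  def transformHof(e: Exploded, f: Int => Int): Seq[Array[Int]] =
    e.offsets.sliding(2).map { case Array(s, t) =>
      e.elements.slice(s, t).map(f)
    }.toSeq

  // aggregate-like HOF: segmented fold over the shared offsets.
  def aggregateHof(e: Exploded, zero: Int, f: (Int, Int) => Int): Seq[Int] =
    e.offsets.sliding(2).map { case Array(s, t) =>
      e.elements.slice(s, t).foldLeft(zero)(f)
    }.toSeq
}
```

Both HOFs consume the same `Exploded` value, which is the whole point: N HOFs cost one explode rather than N.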
**Describe alternatives you've considered**
Rely on `GpuEquivalentExpressions` sub-expression elimination (already on by default via `spark.rapids.sql.combined.expressions.enabled`). In principle this should dedup identical HOF expressions, but in practice every fresh `expr("aggregate(…)")` (or any plan that re-resolves lambdas) gets new `NamedLambdaVariable` exprIds, so `semanticEquals` returns false and the dedup misses. We could try to canonicalize lambda variable exprIds before equivalence checks, but that's brittle and only helps when the user happens to write structurally identical HOFs — the fusion approach above benefits all HOFs over the same column regardless of whether they're identical.
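The exprId mismatch can be shown with a stripped-down model (no Catalyst classes; `NamedLambdaVar`, `resolveAggregate`, and both equality checks are stand-ins for Spark's `NamedLambdaVariable`/`semanticEquals`, not the real implementations):

```scala
// Model of why the existing dedup misses: every re-resolution of an
// aggregate(...) expression mints a fresh lambda-variable id, so an
// id-sensitive equality check sees two different expressions.
object LambdaDedupModel {
  private var nextId = 0
  private def freshId(): Int = { nextId += 1; nextId }

  final case class NamedLambdaVar(name: String, exprId: Int)
  final case class LambdaBody(op: String, arg: NamedLambdaVar)

  // Models expr("aggregate(...)"): each call re-resolves the lambda
  // and allocates a new exprId.
  def resolveAggregate(): LambdaBody =
    LambdaBody("acc + x", NamedLambdaVar("x", freshId()))

  // Id-sensitive comparison, as the check effectively behaves here.
  def semanticEquals(a: LambdaBody, b: LambdaBody): Boolean = a == b

  // Id-insensitive comparison: what canonicalizing exprIds would buy.
  def canonicalEquals(a: LambdaBody, b: LambdaBody): Boolean =
    a.op == b.op && a.arg.name == b.arg.name
}
```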
Skip the explode and use cuDF `lists::*` segmented APIs directly. Works well when the lambda body is simple (e.g. `transform(arr, x -> x + 1)` could become a kernel over the list child). Doesn't generalize to lambda bodies with `If`, `CaseWhen`, nested expressions, or outer-column references — explode is still the most general way to evaluate arbitrary lambda bodies element-wise.
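For the simple-lambda case, the segmented alternative amounts to mapping the list child buffer in place while leaving the offsets untouched. A plain-Scala stand-in for such a cuDF-style kernel (all names illustrative, not the cuDF API):

```scala
// List column stored columnar-style: child values + row offsets, the same
// layout cuDF uses for list columns.
object SegmentedTransform {
  final case class ListColumn(child: Array[Int], offsets: Array[Int])

  // Element-wise lambda applied directly to the child buffer; offsets are
  // unchanged, which is exactly why no explode (or re-wrap) is needed.
  def transformSimple(col: ListColumn, f: Int => Int): ListColumn =
    col.copy(child = col.child.map(f))
}
```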
Subexpression elimination at the HOF granularity in spark-rapids' own plan rules, ignoring lambda exprId mismatches. Closer to what we want, but it still doesn't help when the HOFs differ in their lambda bodies; fusing just the explode step applies even then.
**Additional context**
Relevant code: `sql-plugin/src/main/scala/com/nvidia/spark/rapids/higherOrderFunctions.scala` — `GpuArrayTransformBase.makeElementProjectBatch` is the per-HOF explode call site. `GpuArrayElementWiseTransform.columnarEval` and `GpuArrayAggregate.columnarEval` are the consumers.
A reasonable first slice would be to fuse only homogeneous HOFs (e.g. multiple `transform` calls over the same column, or multiple `aggregate` calls) before tackling the heterogeneous case.
Worth checking whether Databricks Photon or other SQL engines do this kind of fusion — they all face the same explode-per-HOF problem when leaving codegen-land.