Add GPU ArrayAggregate for SUM/PRODUCT/MAX/MIN/ALL/ANY #14652
thirtiseven wants to merge 12 commits into NVIDIA:main from
Conversation
Implements `ArrayAggregate` on the GPU for lambdas decomposable as `(acc, x) -> acc + g(x)` with an identity finish. Other shapes fall back to the CPU.

- `ArrayAggregateDecomposer`: match the merge body against `Add(acc, g)`, unwrap `Cast` on the acc side, validate that finish is identity.
- `GpuArrayAggregate`: evaluate `g(x)` via the existing `GpuArrayTransformBase` explode path, then `listReduce` + combine with zero. Uses `NullPolicy.INCLUDE` so null elements poison the sum, matching Spark's iterative `acc + null = null` semantics. Empty (non-null) lists are substituted with the op's identity before the add-zero step; null lists stay null and propagate.
- Decimal identity scalar is bound to the column's `DType` (via `Scalar.fromDecimal(BigInteger, DType)`) so `ifElse` / `add` don't trip on DECIMAL32-vs-DECIMAL128 width mismatches.
- Unit tests for the decomposer and integration tests covering the client pattern, null / empty arrays, non-zero init, outer-column refs, struct-field access, long overflow, decimal sum, and fallback cases.

Addresses part of NVIDIA#8532. A follow-up refactor will introduce a normalize pass and `AggOp` trait to support PRODUCT / MIN / MAX / AND / OR and `Cast` stripping.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
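As a sketch of the shape the decomposer targets, here is a toy model in plain Scala. The ADT and the `decompose` signature are illustrative stand-ins, not the plugin's actual classes:

```scala
// Toy expression ADT modeling the merge-body shapes discussed above.
sealed trait Expr
case class LambdaVar(name: String) extends Expr
case class Add(left: Expr, right: Expr) extends Expr
case class Cast(child: Expr) extends Expr
case class Lit(v: Int) extends Expr

// Match (acc, x) -> acc + g(x): unwrap Cast on the acc side, require that
// g does not reference acc, and return g on success.
def decompose(body: Expr, acc: LambdaVar): Either[String, Expr] = {
  def stripCast(e: Expr): Expr = e match {
    case Cast(c) => stripCast(c)
    case other   => other
  }
  def refersTo(e: Expr, v: LambdaVar): Boolean = e match {
    case `v`       => true
    case Add(l, r) => refersTo(l, v) || refersTo(r, v)
    case Cast(c)   => refersTo(c, v)
    case _         => false
  }
  body match {
    case Add(l, r) if stripCast(l) == acc && !refersTo(r, acc) => Right(r)
    case Add(l, r) if stripCast(r) == acc && !refersTo(l, acc) => Right(l)
    case _ => Left("merge body is not acc + g(x)")
  }
}
```

Because `Add` is matched commutatively and `Cast` is stripped only on the accumulator side, `Cast(acc) + g` decomposes while `acc + acc` is rejected.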
Greptile Summary

Confidence Score: 5/5 — safe to merge; all previously identified P0/P1 issues have been addressed and only P2 defensive-coding concerns remain. All items from the prior review thread (NaN semantics for MAX/MIN, float SUM ordering, nullable correctness, ALL/ANY 3VL) are resolved. Two new P2 findings: `convertToGpuImpl` throws `IllegalStateException` instead of `willNotWorkOnGpu` if `g` fails GPU tagging (dead code in practice given the decomposer invariant), and a minor resource-leak edge case when `GpuColumnVector.from` throws after `mergeNulls`. Neither represents a present defect on any tested code path.

Important files changed: `sql-plugin/src/main/scala/com/nvidia/spark/rapids/higherOrderFunctions.scala` — `GpuArrayAggregateMeta.convertToGpuImpl` and the `mergeNulls` call site in `GpuArrayAggregate.columnarEval`.
Flowchart

```mermaid
%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[ArrayAggregate SQL node] --> B[GpuArrayAggregateMeta.tagExprForGpu]
    B --> C{ArrayAggregateDecomposer.decompose}
    C -->|Left: reason| D[willNotWorkOnGpu → CPU fallback]
    C -->|Right: Decomposition| E{SumOp/ProductOp on Float/Double?}
    E -->|variableFloatAgg=false| D
    E -->|OK| F[decomposition = Some d]
    F --> G[convertToGpuImpl]
    G --> H[Wrap d.g as gLambda x→g x]
    H --> I[GpuArrayAggregate columnarEval]
    I --> J[Step 1: makeElementProjectBatch + g eval + listReduce]
    J --> K[Step 2: substituteMask → ifElse identity]
    K --> L[Step 3: combineWithZero via op]
    L --> M{arg has nulls?}
    M -->|Yes| N[Step 4: mergeNulls restores null list rows]
    M -->|No| O[Return result]
    N --> O
```
Reviews (7): Last reviewed commit: "Gate float/double SUM/PRODUCT in ArrayAg..."
cuDF's segmented max / min and the combineWithZero compare + ifElse both follow
IEEE 754, where fmax(NaN, x) = x (NaN is absorbed). Spark's Greatest / Least use
Double.compare, which treats NaN as larger than every other value and propagates
it. For an array column containing NaN, GPU would return a non-NaN result while
CPU would return NaN — a data-correctness divergence flagged on the PR.
Since customer workloads for ArrayAggregate MAX / MIN are integral-typed, take
the conservative route: narrow ExtremumOp.supportsType to
{Byte, Short, Int, Long} and fall back to CPU on Float / Double. Precise
NaN propagation would require two extra segmented reduces per batch and
explicit NaN handling in combineWithZero; leaving that for a follow-up if a
real workload needs it.
Added an integration test that verifies the Float / Double fallback.
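The divergence can be reproduced on the CPU with two small models of the competing semantics; `ieeeMax` and `sparkMax` below are our stand-ins for cuDF's `fmax` and Spark's `Double.compare` ordering, not code from either library:

```scala
// cuDF / IEEE 754 fmax semantics: NaN is absorbed, the other operand wins.
def ieeeMax(a: Double, b: Double): Double =
  if (a.isNaN) b else if (b.isNaN) a else math.max(a, b)

// Spark Greatest semantics: Double.compare orders NaN above everything,
// so NaN propagates through the reduction.
def sparkMax(a: Double, b: Double): Double =
  if (java.lang.Double.compare(a, b) >= 0) a else b

val xs = Seq(1.0, Double.NaN, 3.0)
val gpuStyle = xs.reduce(ieeeMax)   // NaN absorbed -> 3.0
val cpuStyle = xs.reduce(sparkMax)  // NaN propagates -> NaN
```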
Thanks @greptile-apps for catching the NaN divergence. Pushed a conservative fix: precise NaN propagation on the GPU would need two extra segmented reduces per batch.
The fix looks correct. One minor nit about the reasoning in the scaladoc:
ExtremumOp.supportsType already rejects Float/Double, so the per-type cascade in MaxOp.identityScalar and MinOp.identityScalar will never see them today. Remove the dead arms to keep the code honest; they can be added back when a follow-up adds real NaN propagation.
Previously `nullable = argument.nullable`, which is incorrect when the outer list is non-nullable but its elements can be null. For INCLUDE-policy ops (SUM / PRODUCT), a null element anywhere in a non-null list poisons the accumulator and yields a null output row. Reporting nullable=false in that case can let the Spark optimizer elide null checks and cause silent wrong results downstream. Spark's own ArrayAggregate.nullable returns `argument.nullable || finish.nullable`, and the finish lambda's acc variable is always bound with nullable=true (see ArrayAggregate.bind's `zero.dataType -> true`), so the CPU side is effectively always true. Match that.
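A small Option-based model of why the poisoning forces `nullable = true` (our sketch, not plugin code):

```scala
// INCLUDE-policy SUM over a non-null list: Option models element-level
// nullability, and a single None poisons the whole fold (acc + null = null).
def sumInclude(xs: List[Option[Int]], zero: Int): Option[Int] =
  xs.foldLeft(Option(zero)) { (acc, x) =>
    for { a <- acc; v <- x } yield a + v
  }

val noNulls  = sumInclude(List(Some(1), Some(2), Some(3)), 0) // Some(6)
val withNull = sumInclude(List(Some(1), None, Some(3)), 0)    // None
```

Even though the outer list is present (non-null), the output row is null whenever any element is, so reporting `nullable = false` from `argument.nullable` alone would be wrong.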
Good catch, pushed a fix.
… int tests
- AllOp / AnyOp combineWithZero now pass outDType to cuDF's and / or (ProductOp
and SumOp were already doing this via add / mul). MaxOp / MinOp use ifElse,
which has no outType argument; the output type there is determined by the
inputs (both reduced and zero carry outDType already).
- ArrayAggregateDecomposition now stores the g sub-expression directly instead
of a gChildIndex. convertToGpuImpl locates the GPU g via fastEquals under the
merge body's meta children rather than positional indexing, so we don't rely
on the Add / Multiply / And / Or / Greatest / Least meta-children happening
to be laid out as [left, right]. Decomposer unit tests assert on g identity.
- Each val-chain boundary in columnarEval is now wrapped in closeOnExcept(x) {
_ => withResource(x) { ... } } so the transitional window between a step's
result being assigned and the next withResource taking ownership is covered.
cuDF's ColumnVector.close is refcount-based, so the rare double-close on
exception paths is benign.
- Added a parametric native-integer integration test hitting int / long SUM,
int MAX, and long MIN without the Cast-to-BIGINT that the existing numeric
test uses, exercising identityScalar / combineWithZero on the primitive
types directly.
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
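For readers unfamiliar with the Arm helpers named in the commit above, here are minimal stand-ins with the assumed semantics; the real implementations live in `com.nvidia.spark.rapids.Arm` and may differ in detail:

```scala
// Always closes the resource, whether the body succeeds or throws.
def withResource[R <: AutoCloseable, T](r: R)(body: R => T): T =
  try body(r) finally r.close()

// Closes the resource only on the exception path; on success the caller
// (typically the next withResource) takes ownership of the result.
def closeOnExcept[R <: AutoCloseable, T](r: R)(body: R => T): T =
  try body(r) catch { case t: Throwable => r.close(); throw t }
```

Wrapping each val-chain boundary in `closeOnExcept` covers the window where a step's result exists but the next `withResource` has not yet taken ownership.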
Filed follow-up performance issue #14711
…gg.enabled

cuDF's parallel tree-reduction sums in a different order than Spark's sequential left-fold, so GPU vs CPU can differ in the low bits on Float/Double. Reuse the same conf gate as scalar GpuSum/GpuAverage (spark.rapids.sql.variableFloatAgg.enabled) via GpuOverrides.checkAndTagFloatAgg in GpuArrayAggregateMeta. Default true matches the global policy. Added integration test for the conf=false fallback path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
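The ordering sensitivity being gated here is ordinary float non-associativity; a sequential left fold and a pairwise (tree-shaped) reduction over the same values can disagree:

```scala
// 1e16f swallows 1.0f (float has ~7 significant digits), so the order in
// which the small terms meet the large terms changes the result.
val xs = Seq(1e16f, 1.0f, -1e16f, 1.0f)
val sequential = xs.foldLeft(0.0f)(_ + _)            // left fold, CPU-style -> 1.0f
val treeShaped = (xs(0) + xs(1)) + (xs(2) + xs(3))   // pairwise, reduction-style -> 0.0f
```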
Contributes to #8532.
What ArrayAggregate does
`ArrayAggregate` (SQL `aggregate`/`reduce`) is a 4-arg HOF. Semantically it is a fold: starting from `zero`, each element `x` is folded into the accumulator via `merge`, and `finish` transforms the final accumulator (defaults to identity). Any lambda is allowed, so the CPU implementation is a sequential per-element loop; there is no general GPU mapping.

Motivation. A customer workload using `aggregate(filter(arr, …), 0, (acc, z) -> acc + CASE WHEN <predicate> THEN 1 ELSE 0 END)` currently falls back entirely because `ArrayAggregate` has no GPU implementation. #8532 lists the candidate strategies (cuDF AST, PTX UDF, pattern rewrite); this PR takes the pattern-rewrite path with a small extensible `AggOp` trait.
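The fold semantics can be modeled directly in plain Scala (the names here are ours, not Spark's):

```scala
// aggregate(arr, zero, merge, finish) = finish(arr.foldLeft(zero)(merge))
def arrayAggregate[A, B](arr: Seq[A], zero: B, merge: (B, A) => B,
    finish: B => B = (b: B) => b): B =
  finish(arr.foldLeft(zero)(merge))

// The motivating count-if pattern: (acc, z) -> acc + IF(pred(z), 1, 0)
val countEven = arrayAggregate[Int, Int](Seq(1, 2, 3, 4), 0,
  (acc, z) => acc + (if (z % 2 == 0) 1 else 0))
```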
Approach

A general fold can't run on the GPU, but in practice almost every real `aggregate` lambda has the shape `(acc, x) -> op(acc, g(x))`, where `op` is associative + commutative (`Add`/`Multiply`/`Greatest`/`Least`/`And`/`Or`) and `g(x)` only depends on `x`. With `finish` as identity, this is equivalent to combining `zero` with an `op`-reduction over `g` applied element-wise: `g(x)` parallelises element-wise; `op` maps to cuDF's `listReduce` (segmented reduction). At plan time we pattern-match the lambda; matched shapes run on the GPU, everything else falls back to CPU.
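Under the stated associativity + commutativity assumption, the equivalence the rewrite relies on can be checked on the CPU; `foldForm` and `reduceForm` are illustrative names for the two sides (non-empty input assumed here, since the plugin handles empty lists separately via identity substitution):

```scala
// Sequential fold of op(acc, g(x)) ...
def foldForm(arr: Seq[Int], zero: Int, g: Int => Int, op: (Int, Int) => Int): Int =
  arr.foldLeft(zero)((acc, x) => op(acc, g(x)))

// ... equals g element-wise, a reduce with op, then combine with zero.
def reduceForm(arr: Seq[Int], zero: Int, g: Int => Int, op: (Int, Int) => Int): Int =
  op(zero, arr.map(g).reduce(op))
```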
Components

- `AggOp` (sealed trait) + 6 case objects (`SumOp`, `ProductOp`, `MaxOp`, `MinOp`, `AllOp`, `AnyOp`), each mapping a merge shape to an op (`Add` → `SumOp`, `Greatest(a,b)` → `MaxOp`, …). Adding a new op is one case object + appending to `allOps`.
- `ArrayAggregateDecomposer.decompose(merge, finish, argType, zeroType): Either[String, Decomposition]` validates the shape (`g.dataType == zeroType`, ALL/ANY null-element guard) and returns `Left(reason)` for any mismatch.
- `GpuArrayAggregateMeta` turns `decompose`'s `Left` into `willNotWorkOnGpu(reason)` and picks the GPU sub-meta for `g(x)` by index (`gIsLeftOfMergeBody`) instead of walking the meta tree.
- `GpuArrayAggregate`: `GpuArrayTransformBase` was split: it now holds only the explode + lambda-projection plumbing shared with `GpuArrayTransform` / `GpuArrayExists` / `GpuArrayFilter` (which moved to a new `GpuArrayElementWiseTransform` sub-trait that owns `transformListColumnView` + the standard `columnarEval`). `GpuArrayAggregate` extends the smaller base directly and writes its own `columnarEval`.
Runtime pipeline

For each input batch:

1. `g(x)` + segmented reduce. Reuse `GpuArrayTransformBase`'s explode path to project `g(x)` over the array children, rewrap as `list<g_type>` with the original offsets/validity, and call `listReduce(op.cudfAgg, op.nullPolicy, outDType)`.
2. Empty-list substitution. `substituteMask` builds a mask of the empty (non-null) list rows and `ifElse`s in `op.identityScalar(zeroType)` so the next step sees a sane value.
3. `result = op.combineWithZero(adjusted, zero, outDType)`. When `zero` is a `GpuLiteral` we pass a `cudf.Scalar` directly (skips the per-batch column broadcast); otherwise we evaluate `zero` as a column.
4. Null restoration. `NULL_MAX` / `NULL_MIN` / `LOGICAL_AND` / `LOGICAL_OR` don't propagate null the Spark 3VL way, so the final `NullUtilities.mergeNulls` re-applies `arg`'s null mask. Skipped outright when `arg` has no nulls.

Each step releases the previous step's intermediate GPU column via `withResource` chaining, so the exploded batch (typically the largest allocation) doesn't pin memory through the whole pipeline.
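The per-row semantics of the four steps (SUM flavor) can be modeled with `Option` standing in for nullability; this is our sketch of the behavior, not the plugin code:

```scala
// Null list row -> null result; null element poisons the INCLUDE-policy
// reduction; empty list is substituted with SUM's identity before the
// combine-with-zero step.
def aggRowSum(row: Option[List[Option[Int]]], zero: Int): Option[Int] =
  row.flatMap { elems =>
    // step 1: reduce with NullPolicy.INCLUDE semantics
    val reduced = elems.foldLeft(Option(0)) { (acc, x) =>
      for { a <- acc; v <- x } yield a + v
    }
    // step 2: empty list -> identity (0 for SUM)
    val adjusted = if (elems.isEmpty) Some(0) else reduced
    // step 3: combine with zero
    adjusted.map(_ + zero)
  }
// step 4 (mergeNulls) corresponds to the outer flatMap: None rows stay None
```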
Null semantics

- SUM / PRODUCT use `NullPolicy.INCLUDE` — one null element poisons the row, matching Spark's iterative `acc op null = null`.
- MAX / MIN use `NullPolicy.EXCLUDE` — null elements are skipped, matching Spark's `Greatest`/`Least`. Their `combineWithZero` uses `BinaryOp.NULL_MAX` / `NULL_MIN` (the same primitive `GpuGreatest`/`GpuLeast` use), so step 3 is one kernel rather than `greaterThan + ifElse`.
- Null list rows are restored by `mergeNulls`.
What falls back to CPU (and why)

All of these are decided in the decomposer and surface as a `willNotWorkOnGpu` reason:

- `g(x)` references `acc`.
- `g.dataType != zeroType`: Spark's `checkInputDataTypes` requires merge.dataType == zero.dataType; we honour that to avoid silent recasts.
- MAX / MIN on Float / Double: cuDF's `fmax(NaN, x) = x` (NaN absorbed) vs Spark's `Double.compare` (NaN propagates).
- ALL / ANY on arrays whose elements may be null (Spark 3VL: `false AND null = false`, `true OR null = true`).
- PRODUCT on DECIMAL: Spark uses `DecimalUtils.multiplyDecimals` for overflow handling; SUM on DECIMAL is fine and supported.
- Non-identity `finish`.
- `Subtract` / `Divide` / 3-ary `Greatest` / etc.: `op` must be one of the registered associative + commutative shapes.
- `argument` element type outside `commonCudfTypes + DECIMAL_128 + NULL + BINARY + STRUCT`, the set handled by `GpuArrayTransformBase`'s explode path. BINARY support added here matches PR #14618.
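The ALL/ANY rejection exists because Spark's three-valued logic differs from simple null poisoning; a null element does not always make the result null. A model of the 3VL connectives:

```scala
// Kleene three-valued AND/OR with Option[Boolean] as the nullable boolean.
def and3(a: Option[Boolean], b: Option[Boolean]): Option[Boolean] = (a, b) match {
  case (Some(false), _) | (_, Some(false)) => Some(false) // false AND null = false
  case (Some(true), Some(true))            => Some(true)
  case _                                   => None
}

def or3(a: Option[Boolean], b: Option[Boolean]): Option[Boolean] = (a, b) match {
  case (Some(true), _) | (_, Some(true)) => Some(true)    // true OR null = true
  case (Some(false), Some(false))        => Some(false)
  case _                                 => None
}
```

Neither `NullPolicy.INCLUDE` (null always poisons) nor `NullPolicy.EXCLUDE` (null always ignored) reproduces this, hence the CPU fallback for null-bearing inputs.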
Tests

- Unit tests (`ArrayAggregateDecomposerSuite`) — 24 cases covering each op's positive shape, commutation, `Cast(acc, …)` unwrap, complex `g`, and every rejection path the decomposer owns (wrong shape, non-identity finish, `g` references acc, type mismatch, NaN-affected MAX, ALL on null-bearing arrays).
- Integration tests (`higher_order_functions_test.py`) — happy paths for each op (numeric / native int / boolean), structural variations (count-if pattern, filter+aggregate composed pattern, non-zero init, null/empty array, lambda referencing outer column, zero from outer column, struct field, BINARY element via `length(x)`, decimal SUM, long-overflow wrap), plus explicit fallback tests for every rejection reason listed above.

ArrayAggregate perf
500000 rows × array(len≈100), 200 replicas/query, avg of 3 runs (warmup × 1)