Add split-and-retry path to GpuProjectExec #14724
thirtiseven wants to merge 4 commits into NVIDIA:main from
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
Greptile Summary
This PR adds a split-and-retry OOM recovery path to GpuProjectExec.
Confidence Score: 3/5
Not safe to merge as-is: the GpuProjectExec split-retry path double-closes the input SpillableColumnarBatch, corrupting its reference count. A P1 resource-lifecycle bug (double-close / ref-count corruption) is present in the GpuProjectExec dispatch path. The analogous GpuTieredProject path already applies the correct incRefCount fix, but the GpuProjectExec path did not get the same treatment. The P1 pulls the score below the 4/5 ceiling.
Files that need attention: sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala, specifically the early return in the split-retry branch of GpuProjectExec.projectWithRetrySingleBatch and the projectAndCloseWithRetrySingleBatch wrapper around it.
Sequence Diagram
sequenceDiagram
participant Caller
participant PAndC as projectAndCloseWithRetrySingleBatch
participant PWR as projectWithRetrySingleBatch
participant RSR as runWithSplitRetry
participant WR as withRetry(sb)
participant BNE as buildNonEmptyBatchFromTypes
Caller->>PAndC: sb (refCount=1)
PAndC->>PAndC: withResource(sb)
PAndC->>PWR: sb
alt All deterministic & splitRetry enabled
PWR->>RSR: sb (takes ownership)
RSR->>WR: sb
loop For each sub-batch (split on OOM)
WR->>RSR: spillable
RSR->>RSR: runProject(cb) → pieces += result
end
Note over WR: closes sb (refCount→0) ⚠️
RSR->>BNE: pieces.toArray
BNE-->>RSR: concatenated ColumnarBatch
RSR-->>PWR: ColumnarBatch
PWR-->>PAndC: ColumnarBatch
Note over PAndC: withResource closes sb again (refCount→-1) ⚠️ BUG
else Mixed / non-deterministic
PWR->>PWR: withRetryNoSplit
PWR-->>PAndC: ColumnarBatch
Note over PAndC: withResource closes sb correctly (refCount→0)
end
PAndC-->>Caller: ColumnarBatch
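The dispatch rule and the split loop in the diagram can be modeled in plain Scala. This is a minimal sketch under stated assumptions, not the spark-rapids implementation: a "batch" is just a Vector[Int] of rows, SplitOomSketch is a hypothetical stand-in for GpuSplitAndRetryOOM, ExprSketch is a hypothetical stand-in for a bound expression, and the projection is any row-wise function. On OOM the failing sub-batch is halved by rows, each half is retried, and the pieces are concatenated in row order so the caller still receives one batch.

```scala
// Hypothetical stand-in for a split-and-retry OOM; not the real GpuSplitAndRetryOOM.
final class SplitOomSketch(msg: String) extends RuntimeException(msg)

object SplitRetrySketch {
  // A "batch" is modeled as a vector of row values.
  type Batch = Vector[Int]

  // Hypothetical model of a bound expression: only its determinism matters here.
  final case class ExprSketch(name: String, deterministic: Boolean)

  sealed trait RetryPath
  case object SplitRetry extends RetryPath
  case object NoSplitRetry extends RetryPath

  // Dispatch rule from the diagram: split only when the config is on and every
  // expression is deterministic; otherwise keep the no-split retry path.
  def chooseRetryPath(splitRetryEnabled: Boolean, exprs: Seq[ExprSketch]): RetryPath =
    if (splitRetryEnabled && exprs.forall(_.deterministic)) SplitRetry else NoSplitRetry

  // Run `project` on `input`; when it throws SplitOomSketch, halve the failing
  // sub-batch by rows and retry each half. A single-row batch cannot be split,
  // so its OOM propagates to the caller.
  def runWithSplitRetry(input: Batch, project: Batch => Batch): Batch = {
    val pieces = scala.collection.mutable.ArrayBuffer.empty[Batch]
    var pending: List[Batch] = List(input) // sub-batches still to project, in row order
    while (pending.nonEmpty) {
      val current = pending.head
      try {
        pieces += project(current)
        pending = pending.tail
      } catch {
        case _: SplitOomSketch if current.size > 1 =>
          val (left, right) = current.splitAt(current.size / 2)
          pending = left :: right :: pending.tail
      }
    }
    // Concatenate the pieces so the caller still receives a single batch.
    pieces.flatten.toVector
  }
}
```

For example, with a projection that "OOMs" on more than 4 rows, a 10-row input ends up projected as pieces of 2, 3, 2, and 3 rows, and the concatenated output matches projecting the whole batch at once. In the PR the concatenation step is ConcatAndConsumeAll.buildNonEmptyBatchFromTypes; the sketch just flattens vectors.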
Reviews (1): Last reviewed commit: "simplify"
if (new RapidsConf(SQLConf.get).isProjectSplitRetryEnabled &&
    boundExprs.forall(_.deterministic)) {
  val retryables = GpuExpressionsUtils.collectRetryables(boundExprs)
  return runWithSplitRetry(sb, retryables, project(_, boundExprs))
Double-close of sb in the split-retry path
runWithSplitRetry passes sb to withRetry, which takes ownership and closes sb once the iterator is drained (confirmed by the docstring: "This function will close the elements of input as fn is successfully invoked"). However, the caller projectAndCloseWithRetrySingleBatch wraps the whole call in withResource(sb) { _ => … }, which calls sb.close() a second time after projectWithRetrySingleBatch returns. This decrements SpillableColumnarBatchImpl.refCount from 0 to -1. The double-free guard is currently commented out (see issue #10161), so no exception is thrown, but the ref-count state is corrupted.
Compare with the analogous fix already applied in GpuTieredProject.projectWithRetrySingleBatchInternal: when closeInputBatch=false, sb.incRefCount() is called before passing to runWithSplitRetry to compensate for the extra close. The same pattern is needed here — call runWithSplitRetry(sb.incRefCount(), retryables, ...) so that withRetry holds one reference and the outer withResource(sb) holds the other.
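The ownership problem and the suggested fix can be reproduced with a small counter model. This is a minimal sketch, and every name in it is hypothetical: RefCountedSketch stands in for SpillableColumnarBatchImpl, consumeAndProject stands in for runWithSplitRetry/withRetry (which close their input), and withResource mimics the wrapper in projectAndCloseWithRetrySingleBatch. It assumes incRefCount returns the same batch, as the suggested call runWithSplitRetry(sb.incRefCount(), ...) implies.

```scala
// Hypothetical model of a ref-counted spillable batch; not SpillableColumnarBatchImpl.
final class RefCountedSketch {
  private var refCount = 1
  def count: Int = refCount
  // Returns `this`, matching how the review passes sb.incRefCount() as an argument.
  def incRefCount(): RefCountedSketch = { refCount += 1; this }
  // No double-free check, like the commented-out guard the review mentions.
  def close(): Unit = refCount -= 1
}

object OwnershipSketch {
  // Loan pattern in the style of withResource: always closes `r` afterwards.
  def withResource[T](r: RefCountedSketch)(body: RefCountedSketch => T): T =
    try body(r) finally r.close()

  // Stand-in for runWithSplitRetry/withRetry: takes ownership of `sb` and
  // closes it after the work succeeds.
  def consumeAndProject(sb: RefCountedSketch): String = {
    val result = "projected"
    sb.close()
    result
  }

  // Buggy shape: both the consumer and the outer withResource close `sb`.
  def buggy(sb: RefCountedSketch): String =
    withResource(sb)(b => consumeAndProject(b))

  // Fixed shape: the consumer gets its own reference via incRefCount().
  def fixed(sb: RefCountedSketch): String =
    withResource(sb)(b => consumeAndProject(b.incRefCount()))
}
```

Running buggy on a fresh batch leaves the counter at -1, the state shown as "refCount→-1" in the diagram; fixed leaves it at 0 because the consumer and the outer wrapper each release exactly one reference.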
Fixes #14191.
Description
GpuProjectExec currently retries OOM via withRetryNoSplit only. If a projection runs cuDF kernels whose internal scratch allocations the pre-split estimator cannot see (regex, string replace, etc.), an OOM during the projection cannot be recovered and the task fails.
This PR adds a split-and-retry path: for purely deterministic projections, GpuProjectExec now drives the projection through RmmRapidsRetryIterator.withRetry(splitSpillableInHalfByRows). On GPU OOM, the input batch is halved by rows and the projection is re-run on each half; the resulting sub-batches are concatenated back via ConcatAndConsumeAll.buildNonEmptyBatchFromTypes to preserve the single-batch contract of projectAndCloseWithRetrySingleBatch.
Mixed deterministic + non-deterministic projections fall through to the existing withRetryNoSplit path. The non-deterministic side is computed once on the full input batch and stitched row by row to the deterministic side, so row-splitting either side would break that alignment. Both GpuProjectExec.projectWithRetrySingleBatch and GpuTieredProject.projectWithRetrySingleBatchInternal dispatch on the same condition (forall(_.deterministic) / areAllDeterministic).
A new internal config, spark.rapids.sql.projectExec.splitRetry.enabled (default true), gates the new path so it can be disabled to restore the prior behavior if regressions surface.
Checklists
Documentation
The new config is internal(), and the public behavior is unchanged in non-OOM cases.
Testing
tests/src/test/scala/com/nvidia/spark/rapids/ProjectSplitRetrySuite.scala adds five cases covering both dispatch sites:
- GpuProjectExec.projectAndCloseWithRetrySingleBatch with forceSplitAndRetryOOM produces output equal to a single-batch projection
- … GpuSplitAndRetryOOM through the legacy path
- GpuTieredProject.projectAndCloseWithRetrySingleBatch
- GpuRand projection routes through the legacy retry path (verified by comparing the rand column to a non-injected reference run)
- forceRetryOOM on the new path returns a single piece without splitting
Performance
TBD — perf testing to follow.
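The Testing cases above depend on the difference between the two injected OOM kinds: with forceRetryOOM the new path should retry the same batch and return a single piece, while forceSplitAndRetryOOM should split the batch yet still produce the same rows. A minimal sketch of that distinction follows; the exception classes and the injection counters are hypothetical stand-ins for the real test hooks, and this is not how RmmRapidsRetryIterator or ProjectSplitRetrySuite is implemented.

```scala
// Hypothetical stand-ins for the two injected OOM kinds the tests use.
final class RetryOomSketch extends RuntimeException("retry")
final class SplitAndRetryOomSketch extends RuntimeException("split and retry")

object InjectionSketch {
  type Batch = Vector[Int]

  // Injects `retryOoms` plain-retry OOMs, then `splitOoms` split-and-retry OOMs,
  // before letting `project` run. Returns the output pieces so a test can count them.
  def run(input: Batch, retryOoms: Int, splitOoms: Int, project: Batch => Batch): Seq[Batch] = {
    var retriesLeft = retryOoms
    var splitsLeft = splitOoms
    val pieces = scala.collection.mutable.ArrayBuffer.empty[Batch]
    var pending: List[Batch] = List(input)
    while (pending.nonEmpty) {
      val current = pending.head
      try {
        if (retriesLeft > 0) { retriesLeft -= 1; throw new RetryOomSketch }
        if (splitsLeft > 0) { splitsLeft -= 1; throw new SplitAndRetryOomSketch }
        pieces += project(current)
        pending = pending.tail
      } catch {
        // Plain retry: keep the same batch at the head and try again, no split.
        case _: RetryOomSketch => ()
        // Split and retry: replace the batch with its two row halves, in order.
        case _: SplitAndRetryOomSketch if current.size > 1 =>
          val (left, right) = current.splitAt(current.size / 2)
          pending = left :: right :: pending.tail
      }
    }
    pieces.toList
  }
}
```

With a 4-row input, one injected retry OOM yields a single output piece, and one injected split-and-retry OOM yields two 2-row pieces whose concatenation equals that single piece.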