From d3800acc61a6dddd100e84df5045b06f559b8edb Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Fri, 20 Mar 2026 06:22:02 -0700 Subject: [PATCH 1/2] Implement the executeCollect() method in ColumnarCollectLimitExec --- .../execution/ColumnarCollectLimitExec.scala | 18 ++++++++++++++++++ .../execution/ColumnarToRowExecBase.scala | 9 +++++++++ 2 files changed, 27 insertions(+) diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala index a3ee421e59a5..a5dc67fea7f2 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala @@ -19,6 +19,7 @@ package org.apache.gluten.execution import org.apache.gluten.columnarbatch.ColumnarBatches import org.apache.gluten.columnarbatch.VeloxColumnarBatches +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.vectorized.ColumnarBatch @@ -96,6 +97,23 @@ case class ColumnarCollectLimitExec( } } + override def executeCollect(): Array[InternalRow] = { + if (limit <= 0) { + return Array.empty + } + val adjusted = math.max(0, limit - math.max(0, offset)) + if (adjusted == 0) { + return Array.empty + } + val rowsRdd = child.executeColumnar().mapPartitions { + it => + val rows = VeloxColumnarToRowExec.toRowIterator(it) + rows.map(_.copy()) + } + val taken = rowsRdd.take(offset + adjusted) + if (offset > 0) taken.drop(offset) else taken + } + override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala index 3d3f4445c5bb..35e41a4e5384 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala @@ -60,4 +60,13 @@ abstract class ColumnarToRowExecBase(child: SparkPlan) override def doExecute(): RDD[InternalRow] = { doExecuteInternal() } + + override def executeCollect(): Array[InternalRow] = { + child match { + case l: ColumnarCollectLimitBaseExec => + l.executeCollect() + case _ => + super.executeCollect() + } + } } From 71b9aeebae07c1c84a37c375a5abfac6229530d0 Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Fri, 20 Mar 2026 11:42:32 -0700 Subject: [PATCH 2/2] fix failed unit tests --- .../execution/ColumnarCollectLimitExec.scala | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala index a5dc67fea7f2..55d5a8628db8 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala @@ -98,20 +98,25 @@ case class ColumnarCollectLimitExec( } override def executeCollect(): Array[InternalRow] = { - if (limit <= 0) { - return Array.empty - } - val adjusted = math.max(0, limit - math.max(0, offset)) - if (adjusted == 0) { - return Array.empty - } - val rowsRdd = child.executeColumnar().mapPartitions { + val inputBatches = + if (limit >= 0) { + child.executeColumnar() + } else { + executeColumnar() + } + val rowsRdd = inputBatches.mapPartitions { it => val rows = VeloxColumnarToRowExec.toRowIterator(it) rows.map(_.copy()) } - val taken = rowsRdd.take(offset + adjusted) - if (offset > 0) taken.drop(offset) else taken + if (limit >= 0) { + val toTake = math.max(0, offset) + limit + val taken = rowsRdd.take(toTake) + if (offset > 0) taken.drop(offset) else taken + } else { + val all = rowsRdd.collect() + if (offset > 0) all.drop(offset) else all + } } override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =