Skip to content

[VL] Result mismatch in CollectList when 'sort by' clause is involved #8227

@NEUpanning

Description

@NEUpanning

Backend

VL (Velox)

Bug description

Reproducing SQL:

CREATE OR REPLACE TEMP VIEW temp_table AS
SELECT * FROM VALUES
  (1, 'a'), (1, 'b'), (1, 'c'),
  (2, 'd'), (2, 'e'), (2, 'f'),
  (3, 'g'), (3, 'h'), (3, 'i')
AS t(id, value);

SELECT 1-id, collect_list(value) AS values_list
FROM (
  select * from
  (SELECT id, value
  FROM temp_table
  DISTRIBUTE BY rand()) 
  DISTRIBUTE BY id sort by id,value
) t
GROUP BY 1;

Results:

The vanilla result is deterministic and values_list is sorted by value column:

id values_list
1 ["a", "b", "c"]
2 ["d", "e", "f"]
3 ["g", "h", "i"]
The gluten result is non-deterministic and values_list is not sorted, e.g. :

id values_list
1 ["a", "c", "b"]
3 ["g", "i", "h"]
2 ["f", "e", "d"]

gluten physical plan:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   VeloxColumnarToRowExec
   +- ^(4) HashAggregateTransformer(keys=[(1 - id#0)#15], functions=[velox_collect_list(value#1)], isStreamingAgg=false, output=[(1 - id)#4, values_list#2])
      +- ^(4) InputIteratorTransformer[(1 - id#0)#15, buffer#11]
         +- CustomShuffleReader coalesced
            +- ShuffleQueryStage 2
               +- ColumnarExchange hashpartitioning((1 - id#0)#15, 20), ENSURE_REQUIREMENTS, [(1 - id#0)#15, buffer#11], [id=#1137], [id=#1137], [OUTPUT] List((1 - id#0):IntegerType, buffer:ArrayType(StringType,false)), [OUTPUT] List((1 - id#0):IntegerType, buffer:ArrayType(StringType,false))
                  +- VeloxAppendBatches 3276
                     +- ^(3) ProjectExecTransformer [hash((1 - id#0)#15, 42) AS hash_partition_key#16, (1 - id#0)#15, buffer#11]
                        +- ^(3) FlushableHashAggregateTransformer(keys=[(1 - id#0)#15], functions=[partial_velox_collect_list(value#1)], isStreamingAgg=true, output=[(1 - id#0)#15, buffer#11])
                           +- ^(3) ProjectExecTransformer [id#0, value#1, (1 - id#0) AS (1 - id#0)#15]
                              +- ^(3) SortExecTransformer [(1 - id#0)#15 ASC NULLS FIRST], false, 0
                                 +- ^(3) ProjectExecTransformer [id#0, value#1, (1 - id#0) AS (1 - id#0)#15]
                                    +- ^(3) SortExecTransformer [id#0 ASC NULLS FIRST, value#1 ASC NULLS FIRST], false, 0
                                       +- ^(3) InputIteratorTransformer[id#0, value#1]
                                          +- ShuffleQueryStage 1
                                             +- ColumnarExchange hashpartitioning(id#0, 20), REPARTITION_WITH_NUM, [id#0, value#1], [id=#1015], [id=#1015], [OUTPUT] List(id:IntegerType, value:StringType), [OUTPUT] List(id:IntegerType, value:StringType)
                                                +- VeloxAppendBatches 3276
                                                   +- ^(2) ProjectExecTransformer [hash(id#0, 42) AS hash_partition_key#14, id#0, value#1]
                                                      +- ^(2) InputIteratorTransformer[id#0, value#1, _nondeterministic#5]
                                                         +- ShuffleQueryStage 0
                                                            +- ColumnarExchange hashpartitioning(_nondeterministic#5, 20), REPARTITION_WITH_NUM, [id#0, value#1, _nondeterministic#5], [id=#944], [id=#944], [OUTPUT] List(id:IntegerType, value:StringType, _nondeterministic:DoubleType), [OUTPUT] List(id:IntegerType, value:StringType, _nondeterministic:DoubleType)
                                                               +- VeloxAppendBatches 3276
                                                                  +- ^(1) ProjectExecTransformer [hash(_nondeterministic#5, 42) AS hash_partition_key#12, id#0, value#1, _nondeterministic#5]
                                                                     +- ^(1) InputIteratorTransformer[id#0, value#1, _nondeterministic#5]
                                                                        +- RowToVeloxColumnar
                                                                           +- LocalTableScan [id#0, value#1, _nondeterministic#5]

vanilla spark physical plan:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- ObjectHashAggregate(keys=[(1 - id#0)#13], functions=[collect_list(value#1, 0, 0)], output=[(1 - id)#4, values_list#2])
   +- CustomShuffleReader coalesced
      +- ShuffleQueryStage 2
         +- Exchange hashpartitioning((1 - id#0)#13, 20), true, [id=#766]
            +- ObjectHashAggregate(keys=[(1 - id#0) AS (1 - id#0)#13], functions=[partial_collect_list(value#1, 0, 0)], output=[(1 - id#0)#13, buf#10])
               +- *(2) Sort [id#0 ASC NULLS FIRST, value#1 ASC NULLS FIRST], false, 0
                  +- ShuffleQueryStage 1
                     +- Exchange hashpartitioning(id#0, 20), false, [id=#739]
                        +- *(1) Project [id#0, value#1]
                           +- ShuffleQueryStage 0
                              +- Exchange hashpartitioning(_nondeterministic#5, 20), false, [id=#680]
                                 +- LocalTableScan [id#0, value#1, _nondeterministic#5]

Root cause
CollectList that is a TypedImperativeAggregate function is replaced by VeloxCollectList function that is a DeclarativeAggregate in logical optimization phase. Therefore, SortAggregateExec is used in gluten for VeloxCollectList instead of ObjectHashAggregateExec. The SortOrder of SortExec that corresponds to SortExecTransformer [(1 - id#0)#15 ASC NULLS FIRST] in the Gluten physical plan differs from the SortExec added by the 'sort by' clause, which corresponds to SortExecTransformer [id#0 ASC NULLS FIRST, value#1 ASC NULLS FIRST]. As a result, the result is mismatched with vanilla spark.

Spark version

None

Gluten version

1.2.0

Metadata

Metadata

Assignees

No one assigned

    Labels

    bugSomething isn't workingtriage

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions