diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala index 2d2193a53871..60cd76a916a5 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala @@ -106,7 +106,7 @@ object VeloxValidatorApi { private def isPrimitiveType(dataType: DataType): Boolean = { dataType match { case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType | - StringType | BinaryType | _: DecimalType | DateType | TimestampType | + StringType | BinaryType | _: DecimalType | DateType | TimestampType | TimestampNTZType | YearMonthIntervalType.DEFAULT | NullType => true case _ => false diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala index 097c7d489785..8581646c8b5d 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala @@ -50,6 +50,7 @@ case class VeloxColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExecBas case _: DoubleType => case _: StringType => case _: TimestampType => + case _: TimestampNTZType => case _: DateType => case _: BinaryType => case _: DecimalType => diff --git a/backends-velox/src/test/scala/org/apache/gluten/functions/DateFunctionsValidateSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/functions/DateFunctionsValidateSuite.scala index d120842fd292..359f33bcdaaa 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/functions/DateFunctionsValidateSuite.scala +++ b/backends-velox/src/test/scala/org/apache/gluten/functions/DateFunctionsValidateSuite.scala @@ -16,10 +16,11 @@ */ package org.apache.gluten.functions -import org.apache.gluten.execution.ProjectExecTransformer +import org.apache.gluten.execution.{BatchScanExecTransformer, ProjectExecTransformer} import org.apache.spark.sql.execution.ProjectExec -import org.apache.spark.sql.types.Decimal +import org.apache.spark.sql.internal.SQLConf.TimestampTypes +import org.apache.spark.sql.types.{Decimal, StructType, TimestampNTZType} import java.sql.Timestamp @@ -489,4 +490,73 @@ class DateFunctionsValidateSuite extends FunctionsValidateSuite { } } } + + test("cast string to timestamp_ntz") { + val inputs: Seq[String] = Seq( + "1970-01-01", + "1970-01-01 00:00:00-02:00", + "1970-01-01 00:00:00 +02:00", + "2000-01-01", + "1970-01-01 00:00:00", + "2000-01-01 12:21:56", + "2015-03-18T12:03:17Z", + "2015-03-18 12:03:17", + "2015-03-18T12:03:17", + "2015-03-18 12:03:17.123", + "2015-03-18T12:03:17.123", + "2015-03-18T12:03:17.456", + "2015-03-18 12:03:17.456" + ) + + inputs.foreach { + s => + val query = s"select cast('$s' as timestamp_ntz)" + runQueryAndCompare(query) { + checkGlutenPlan[ProjectExecTransformer] + } + } + } + + test("read as timestamp_ntz") { + val inputs: Seq[String] = Seq( + "1970-01-01", + "1970-01-01 00:00:00-02:00", + "1970-01-01 00:00:00 +02:00", + "2000-01-01", + "1970-01-01 00:00:00", + "2000-01-01 12:21:56", + "2015-03-18T12:03:17Z", + "2015-03-18 12:03:17", + "2015-03-18T12:03:17", + "2015-03-18 12:03:17.123", + "2015-03-18T12:03:17.123", + "2015-03-18T12:03:17.456", + "2015-03-18 12:03:17.456" + ) + + withTempPath { + dir => + withSQLConf("spark.sql.timestampType" -> TimestampTypes.TIMESTAMP_NTZ.toString) { + val path = dir.getAbsolutePath + val inputDF = spark.createDataset(inputs).toDF("input") + val df = inputDF.selectExpr("cast(input as timestamp_ntz) as ts") + // TODO: The Parquet writer creates TIMESTAMP(MICROS,true), but for timestamp_ntz type, + // the 'isAdjustedToUTC' should be false. Spark will fail to read this file as + // timestamp_ntz values. + df.coalesce(1).write.mode("overwrite").parquet(path) + + val schema = new StructType().add("ts", TimestampNTZType) + val readDf = spark.read.schema(schema).parquet(path) + readDf.collect() + assert( + readDf.queryExecution.executedPlan.exists( + f => f.isInstanceOf[BatchScanExecTransformer])) + + // Ensures the fallback of unsupported function works. + readDf.createOrReplaceTempView("view") + val testDf = spark.sql("select hour(ts) from view") + testDf.collect() + } + } + } } diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index ddf417405eb8..abbe6c76806e 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -39,6 +39,7 @@ #include "jni/JniFileSystem.h" #include "memory/GlutenBufferedInputBuilder.h" #include "operators/functions/SparkExprToSubfieldFilterParser.h" +#include "operators/plannodes/RowVectorStream.h" #include "shuffle/ArrowShuffleDictionaryWriter.h" #include "udf/UdfLoader.h" #include "utils/Exception.h" @@ -47,7 +48,6 @@ #include "velox/connectors/hive/BufferedInputBuilder.h" #include "velox/connectors/hive/HiveConnector.h" #include "velox/connectors/hive/HiveDataSource.h" -#include "operators/plannodes/RowVectorStream.h" #include "velox/connectors/hive/storage_adapters/abfs/RegisterAbfsFileSystem.h" // @manual #include "velox/connectors/hive/storage_adapters/gcs/RegisterGcsFileSystem.h" // @manual #include "velox/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h" @@ -56,6 +56,7 @@ #include "velox/dwio/orc/reader/OrcReader.h" #include "velox/dwio/parquet/RegisterParquetReader.h" #include "velox/dwio/parquet/RegisterParquetWriter.h" +#include "velox/functions/sparksql/types/TimestampNTZRegistration.h" #include "velox/serializers/PrestoSerializer.h" DECLARE_bool(velox_exception_user_stacktrace_enabled); @@ -195,6 +196,7 @@ void VeloxBackend::init( velox::orc::registerOrcReaderFactory(); velox::exec::ExprToSubfieldFilterParser::registerParser(std::make_unique()); velox::connector::hive::BufferedInputBuilder::registerBuilder(std::make_shared()); + velox::functions::sparksql::registerTimestampNTZType(); // Register Velox functions registerAllFunctions(); @@ -318,13 +320,13 @@ void VeloxBackend::initConnector(const std::shared_ptr(kHiveConnectorId, hiveConf, ioExecutor_.get())); - + // Register value-stream connector for runtime iterator-based inputs auto valueStreamDynamicFilterEnabled = backendConf_->get(kValueStreamDynamicFilterEnabled, kValueStreamDynamicFilterEnabledDefault); velox::connector::registerConnector( std::make_shared(kIteratorConnectorId, hiveConf, valueStreamDynamicFilterEnabled)); - + #ifdef GLUTEN_ENABLE_GPU if (backendConf_->get(kCudfEnableTableScan, kCudfEnableTableScanDefault) && backendConf_->get(kCudfEnabled, kCudfEnabledDefault)) { diff --git a/cpp/velox/substrait/SubstraitParser.cc b/cpp/velox/substrait/SubstraitParser.cc index c67ad56f0932..8cf941a8bc02 100644 --- a/cpp/velox/substrait/SubstraitParser.cc +++ b/cpp/velox/substrait/SubstraitParser.cc @@ -17,9 +17,9 @@ #include "SubstraitParser.h" #include "TypeUtils.h" -#include "velox/common/base/Exceptions.h" - #include "VeloxSubstraitSignature.h" +#include "velox/common/base/Exceptions.h" +#include "velox/functions/sparksql/types/TimestampNTZType.h" namespace gluten { @@ -78,6 +78,8 @@ TypePtr SubstraitParser::parseType(const ::substrait::Type& substraitType, bool return DATE(); case ::substrait::Type::KindCase::kTimestampTz: return TIMESTAMP(); + case ::substrait::Type::KindCase::kTimestamp: + return facebook::velox::functions::sparksql::TIMESTAMP_NTZ(); case ::substrait::Type::KindCase::kDecimal: { auto precision = substraitType.decimal().precision(); auto scale = substraitType.decimal().scale(); @@ -356,6 +358,9 @@ int64_t SubstraitParser::getLiteralValue(const ::substrait::Expression::Literal& memcpy(&decimalValue, decimal.c_str(), 16); return static_cast(decimalValue); } + if (literal.has_timestamp()) { + return literal.timestamp(); + } return literal.i64(); } diff --git a/cpp/velox/substrait/SubstraitToVeloxExpr.cc b/cpp/velox/substrait/SubstraitToVeloxExpr.cc index 467df25ca881..f19ff0806be1 100755 --- a/cpp/velox/substrait/SubstraitToVeloxExpr.cc +++ b/cpp/velox/substrait/SubstraitToVeloxExpr.cc @@ -17,11 +17,11 @@ #include "SubstraitToVeloxExpr.h" #include "TypeUtils.h" +#include "velox/functions/sparksql/types/TimestampNTZType.h" +#include "velox/type/Timestamp.h" #include "velox/vector/FlatVector.h" #include "velox/vector/VariantToVector.h" -#include "velox/type/Timestamp.h" - using namespace facebook::velox; namespace { @@ -133,6 +133,8 @@ TypePtr getScalarType(const ::substrait::Expression::Literal& literal) { return DATE(); case ::substrait::Expression_Literal::LiteralTypeCase::kTimestampTz: return TIMESTAMP(); + case ::substrait::Expression_Literal::LiteralTypeCase::kTimestamp: + return facebook::velox::functions::sparksql::TIMESTAMP_NTZ(); case ::substrait::Expression_Literal::LiteralTypeCase::kString: return VARCHAR(); case ::substrait::Expression_Literal::LiteralTypeCase::kVarChar: diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index adb7fc5f45b6..1c26ce64081c 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -24,6 +24,7 @@ #include "operators/plannodes/RowVectorStream.h" #include "velox/connectors/hive/HiveDataSink.h" #include "velox/exec/TableWriter.h" +#include "velox/functions/sparksql/types/TimestampNTZType.h" #include "velox/type/Type.h" #include "utils/ConfigExtractor.h" @@ -1496,6 +1497,17 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait:: auto baseSchema = ROW(std::move(names), std::move(types)); // The columns present in the table, if not available default to the baseSchema. auto tableSchema = splitInfo->tableSchema ? splitInfo->tableSchema : baseSchema; + if (tableSchema) { + auto tableNames = tableSchema->names(); + auto tableTypes = tableSchema->children(); + for (size_t i = 0; i < tableSchema->size(); i++) { + if (functions::sparksql::isTimestampNTZType(tableTypes[i])) { + // Spark's TimestampNTZ type is stored as TIMESTAMP in file. + tableTypes[i] = TIMESTAMP(); + } + } + tableSchema = ROW(std::move(tableNames), std::move(tableTypes)); + } connector::ConnectorTableHandlePtr tableHandle; auto remainingFilter = readRel.has_filter() ? exprConverter_->toVeloxExpr(readRel.filter(), baseSchema) : nullptr; diff --git a/ep/build-velox/src/get-velox.sh b/ep/build-velox/src/get-velox.sh index b6c73c9aa146..44dc1b0a17ae 100755 --- a/ep/build-velox/src/get-velox.sh +++ b/ep/build-velox/src/get-velox.sh @@ -17,8 +17,8 @@ set -exu CURRENT_DIR=$(cd "$(dirname "$BASH_SOURCE")"; pwd) -VELOX_REPO=https://github.com/IBM/velox.git -VELOX_BRANCH=dft-2026_03_15-iceberg +VELOX_REPO=https://github.com/rui-mo/velox.git +VELOX_BRANCH=ts_ntz_gluten VELOX_ENHANCED_BRANCH=ibm-2026_03_15 VELOX_HOME="" RUN_SETUP_SCRIPT=ON diff --git a/gluten-substrait/src/main/java/org/apache/gluten/substrait/expression/TimestampNTZLiteralNode.java b/gluten-substrait/src/main/java/org/apache/gluten/substrait/expression/TimestampNTZLiteralNode.java new file mode 100644 index 000000000000..1475378a4a3c --- /dev/null +++ b/gluten-substrait/src/main/java/org/apache/gluten/substrait/expression/TimestampNTZLiteralNode.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.substrait.expression; + +import org.apache.gluten.substrait.type.TimestampTypeNode; +import org.apache.gluten.substrait.type.TypeNode; + +import io.substrait.proto.Expression.Literal.Builder; + +public class TimestampNTZLiteralNode extends LiteralNodeWithValue { + public TimestampNTZLiteralNode(Long value) { + super(value, new TimestampTypeNode(true)); + } + + public TimestampNTZLiteralNode(Long value, TypeNode typeNode) { + super(value, typeNode); + } + + @Override + protected void updateLiteralBuilder(Builder literalBuilder, Long value) { + literalBuilder.setTimestamp(value); + } +} diff --git a/gluten-substrait/src/main/java/org/apache/gluten/substrait/type/TimestampNTZTypeNode.java b/gluten-substrait/src/main/java/org/apache/gluten/substrait/type/TimestampNTZTypeNode.java new file mode 100644 index 000000000000..83d27cfb09f2 --- /dev/null +++ b/gluten-substrait/src/main/java/org/apache/gluten/substrait/type/TimestampNTZTypeNode.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.substrait.type; + +import io.substrait.proto.Type; + +public class TimestampNTZTypeNode extends TypeNode { + + public TimestampNTZTypeNode(Boolean nullable) { + super(nullable); + } + + @Override + public Type toProtobuf() { + Type.Timestamp.Builder timestampBuilder = Type.Timestamp.newBuilder(); + if (nullable) { + timestampBuilder.setNullability(Type.Nullability.NULLABILITY_NULLABLE); + } else { + timestampBuilder.setNullability(Type.Nullability.NULLABILITY_REQUIRED); + } + + Type.Builder builder = Type.newBuilder(); + builder.setTimestamp(timestampBuilder.build()); + return builder.build(); + } +} diff --git a/gluten-substrait/src/main/java/org/apache/gluten/substrait/type/TypeBuilder.java b/gluten-substrait/src/main/java/org/apache/gluten/substrait/type/TypeBuilder.java index 28cb10be27d6..bd7e3566eb5f 100644 --- a/gluten-substrait/src/main/java/org/apache/gluten/substrait/type/TypeBuilder.java +++ b/gluten-substrait/src/main/java/org/apache/gluten/substrait/type/TypeBuilder.java @@ -81,6 +81,10 @@ public static TypeNode makeTimestamp(Boolean nullable) { return new TimestampTypeNode(nullable); } + public static TypeNode makeTimestampNTZ(Boolean nullable) { + return new TimestampNTZTypeNode(nullable); + } + public static TypeNode makeStruct(Boolean nullable, List types, List names) { return new StructNode(nullable, types, names); } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ConverterUtils.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ConverterUtils.scala index 6db1f188d8bc..6e9ef064d8be 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ConverterUtils.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ConverterUtils.scala @@ -160,6 +160,8 @@ object ConverterUtils extends Logging { (StringType, isNullable(substraitType.getString.getNullability)) case Type.KindCase.BINARY => (BinaryType, isNullable(substraitType.getBinary.getNullability)) + case Type.KindCase.TIMESTAMP => + (TimestampNTZType, isNullable(substraitType.getTimestamp.getNullability)) case Type.KindCase.TIMESTAMP_TZ => (TimestampType, isNullable(substraitType.getTimestampTz.getNullability)) case Type.KindCase.DATE => @@ -226,6 +228,8 @@ object ConverterUtils extends Logging { TypeBuilder.makeDecimal(nullable, precision, scale) case TimestampType => TypeBuilder.makeTimestamp(nullable) + case TimestampNTZType => + TypeBuilder.makeTimestampNTZ(nullable) case m: MapType => TypeBuilder.makeMap( nullable, @@ -399,6 +403,7 @@ object ConverterUtils extends Logging { case DoubleType => "fp64" case DateType => "date" case TimestampType => "ts" + case TimestampNTZType => "ts_ntz" case StringType => "str" case BinaryType => "vbin" case DecimalType() => diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala index 714821436584..d403926e485b 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala @@ -313,7 +313,7 @@ object Validators { .fallbackComplexExpressions() .fallbackByBackendSettings() .fallbackByUserOptions() - .fallbackByTimestampNTZ() +// .fallbackByTimestampNTZ() .fallbackByTestInjects() .build() }