From ad5f78145b8611a82b4dcbdd3db765db8e799252 Mon Sep 17 00:00:00 2001 From: beliefer Date: Fri, 20 Mar 2026 11:18:58 +0800 Subject: [PATCH] [GLUTEN-11797][CORE] Improve the performance of getDistinctPartitionReadFileFormats for HiveTableScanExecTransformer --- .../hive/HiveTableScanExecTransformer.scala | 81 ++++++++++++++----- 1 file changed, 61 insertions(+), 20 deletions(-) diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala index 0abb1a91f4d7..65345f9c0fd6 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala @@ -37,7 +37,7 @@ import org.apache.spark.util.Utils import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat import org.apache.hadoop.hive.ql.plan.TableDesc -import org.apache.hadoop.mapred.TextInputFormat +import org.apache.hadoop.mapred.{InputFormat, TextInputFormat} import java.net.URI @@ -76,17 +76,46 @@ case class HiveTableScanExecTransformer( override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] = partitionWithReadFileFormats - override def getDistinctPartitionReadFileFormats: Set[ReadFileFormat] = - if ( - relation.isPartitioned && - basePrunedPartitions.exists(_.getInputFormatClass != tableDesc.getInputFileFormatClass) - ) { - basePrunedPartitions.map { - partition => getReadFileFormat(HiveClientImpl.fromHivePartition(partition).storage) - }.toSet - } else { + // Only used for file format validation on the driver side. Must not trigger subquery execution. + override def getDistinctPartitionReadFileFormats: Set[ReadFileFormat] = { + if (!relation.isPartitioned) { + return Set(fileFormat) + } + // Single pass: use getInputFormatClass (cheap) to classify each partition. + // formatCache deduplicates by (InputFormatClass, Option[serdeClass]) so that + // HiveClientImpl.fromHivePartition is called at most once per distinct format combination. + // The serde is included in the key only for TextInputFormat, since its ReadFileFormat + // depends on both the InputFormat and the serde (e.g. JsonSerDe -> JsonReadFormat). + val tableInputFormatClass = tableDesc.getInputFileFormatClass + var hasTableFormatPartitions = false + val formatCache = + collection.mutable.Map[(Class[_ <: InputFormat[_, _]], Option[String]), ReadFileFormat]() + basePrunedPartitions.foreach { + partition => + val cls = partition.getInputFormatClass + if (cls == tableInputFormatClass) { + hasTableFormatPartitions = true + } else { + val serdeKey = + if (TEXT_INPUT_FORMAT_CLASS.isAssignableFrom(cls)) { + Option(partition.getTPartition.getSd.getSerdeInfo.getSerializationLib) + } else { + None + } + formatCache.getOrElseUpdate( + (cls, serdeKey), + getReadFileFormat(HiveClientImpl.fromHivePartition(partition).storage)) + } + } + val otherFormats = formatCache.values.toSet + if (otherFormats.isEmpty) { Set(fileFormat) + } else if (hasTableFormatPartitions) { + otherFormats + fileFormat + } else { + otherFormats } + } override def getPartitionSchema: StructType = relation.tableMeta.partitionSchema @@ -101,9 +130,6 @@ case class HiveTableScanExecTransformer( @transient private lazy val hivePartitionConverter = new HivePartitionConverter(session.sessionState.newHadoopConf(), session) - @transient private lazy val existsMixedInputFormat: Boolean = - prunedPartitions.exists(_.getInputFormatClass != tableDesc.getInputFileFormatClass) - @transient private lazy val partitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] = if (!relation.isPartitioned) { val tableLocation: URI = relation.tableMeta.storage.locationUri.getOrElse { @@ -111,20 +137,35 @@ case class HiveTableScanExecTransformer( } hivePartitionConverter.createFilePartition(tableLocation).map((_, fileFormat)) - } else if (existsMixedInputFormat) { + } else { + // Optimized: use the same caching strategy as getDistinctPartitionReadFileFormats + // to avoid redundant HiveClientImpl.fromHivePartition calls + val tableInputFormatClass = tableDesc.getInputFileFormatClass + val formatCache = + collection.mutable.Map[(Class[_ <: InputFormat[_, _]], Option[String]), ReadFileFormat]() + val readFileFormats = prunedPartitions.map { - partition => getReadFileFormat(HiveClientImpl.fromHivePartition(partition).storage) + partition => + val cls = partition.getInputFormatClass + if (cls == tableInputFormatClass) { + fileFormat + } else { + val serdeKey = if (TEXT_INPUT_FORMAT_CLASS.isAssignableFrom(cls)) { + Option(partition.getTPartition.getSd.getSerdeInfo.getSerializationLib) + } else { + None + } + formatCache.getOrElseUpdate( + (cls, serdeKey), + getReadFileFormat(HiveClientImpl.fromHivePartition(partition).storage) + ) + } } hivePartitionConverter.createFilePartition( prunedPartitions, relation.partitionCols.map(_.dataType), readFileFormats) - } else { - val filePartitions = hivePartitionConverter - .createFilePartition(prunedPartitions, relation.partitionCols.map(_.dataType)) - - filePartitions.map((_, fileFormat)) } @transient private lazy val partitions: Seq[Partition] = partitionWithReadFileFormats.unzip._1