Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import org.apache.spark.util.Utils
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapred.{InputFormat, TextInputFormat}

import java.net.URI

Expand Down Expand Up @@ -76,17 +76,46 @@ case class HiveTableScanExecTransformer(
// Returns the pruned partitions paired with the ReadFileFormat resolved for each.
// Backed by the lazily computed `partitionWithReadFileFormats`, so the first call
// performs the classification work; later calls reuse the cached result.
override def getPartitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
partitionWithReadFileFormats

// Only used for file format validation on the driver side. Must not trigger subquery execution.
//
// Single pass over the pruned partitions: each partition is classified by its (cheap)
// getInputFormatClass. `formatCache` deduplicates by (InputFormatClass, Option[serde]) so
// the expensive HiveClientImpl.fromHivePartition conversion runs at most once per distinct
// format combination. The serde participates in the key only for TextInputFormat, whose
// ReadFileFormat depends on both the InputFormat and the serde
// (e.g. JsonSerDe -> JsonReadFormat).
override def getDistinctPartitionReadFileFormats: Set[ReadFileFormat] = {
  if (!relation.isPartitioned) {
    Set(fileFormat)
  } else {
    val tableInputFormatClass = tableDesc.getInputFileFormatClass
    val formatCache =
      collection.mutable.Map[(Class[_ <: InputFormat[_, _]], Option[String]), ReadFileFormat]()
    // Tracks whether at least one partition uses the table-level format, so that
    // `fileFormat` is included in the result only when it is actually read.
    var hasTableFormatPartitions = false
    basePrunedPartitions.foreach {
      partition =>
        val cls = partition.getInputFormatClass
        if (cls == tableInputFormatClass) {
          hasTableFormatPartitions = true
        } else {
          val serdeKey =
            if (TEXT_INPUT_FORMAT_CLASS.isAssignableFrom(cls)) {
              Option(partition.getTPartition.getSd.getSerdeInfo.getSerializationLib)
            } else {
              None
            }
          formatCache.getOrElseUpdate(
            (cls, serdeKey),
            getReadFileFormat(HiveClientImpl.fromHivePartition(partition).storage))
        }
    }
    val otherFormats = formatCache.values.toSet
    if (otherFormats.isEmpty) {
      Set(fileFormat)
    } else if (hasTableFormatPartitions) {
      otherFormats + fileFormat
    } else {
      otherFormats
    }
  }
}

override def getPartitionSchema: StructType = relation.tableMeta.partitionSchema

Expand All @@ -101,30 +130,42 @@ case class HiveTableScanExecTransformer(
// Converter that turns Hive partitions / table locations into Spark file partitions.
// Built lazily with a fresh Hadoop conf; @transient so it is not serialized to executors.
@transient private lazy val hivePartitionConverter =
new HivePartitionConverter(session.sessionState.newHadoopConf(), session)

// True when at least one pruned partition declares an InputFormat different from the
// table-level one (a mixed-format partitioned table).
// NOTE(review): this looks like a leftover from the pre-optimization code path that
// branched on mixed formats — verify it still has callers before keeping it.
@transient private lazy val existsMixedInputFormat: Boolean =
prunedPartitions.exists(_.getInputFormatClass != tableDesc.getInputFileFormatClass)

// Pruned (partition, read file format) pairs consumed by the scan.
// - Non-partitioned table: the single table location is read with the table-level format;
//   a missing location is a hard error.
// - Partitioned table: each partition is classified by its InputFormat class. Partitions
//   matching the table-level format reuse `fileFormat` directly; others are resolved via
//   HiveClientImpl.fromHivePartition, memoized per (InputFormatClass, serde) so the
//   expensive conversion runs at most once per distinct format combination — the same
//   caching strategy as getDistinctPartitionReadFileFormats.
@transient private lazy val partitionWithReadFileFormats: Seq[(Partition, ReadFileFormat)] =
  if (!relation.isPartitioned) {
    val tableLocation: URI = relation.tableMeta.storage.locationUri.getOrElse {
      throw new UnsupportedOperationException("Table path not set.")
    }

    hivePartitionConverter.createFilePartition(tableLocation).map((_, fileFormat))
  } else {
    val tableInputFormatClass = tableDesc.getInputFileFormatClass
    val formatCache =
      collection.mutable.Map[(Class[_ <: InputFormat[_, _]], Option[String]), ReadFileFormat]()

    val readFileFormats = prunedPartitions.map {
      partition =>
        val cls = partition.getInputFormatClass
        if (cls == tableInputFormatClass) {
          fileFormat
        } else {
          // The serde is part of the cache key only for TextInputFormat, whose
          // ReadFileFormat also depends on the serde (e.g. JsonSerDe -> JsonReadFormat).
          val serdeKey =
            if (TEXT_INPUT_FORMAT_CLASS.isAssignableFrom(cls)) {
              Option(partition.getTPartition.getSd.getSerdeInfo.getSerializationLib)
            } else {
              None
            }
          formatCache.getOrElseUpdate(
            (cls, serdeKey),
            getReadFileFormat(HiveClientImpl.fromHivePartition(partition).storage))
        }
    }

    hivePartitionConverter.createFilePartition(
      prunedPartitions,
      relation.partitionCols.map(_.dataType),
      readFileFormats)
  }

@transient private lazy val partitions: Seq[Partition] = partitionWithReadFileFormats.unzip._1
Expand Down
Loading