From 1a4a1d37dbd93337362c256f5ee07aa64f0f47dc Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Mon, 27 Apr 2026 09:32:03 -0700 Subject: [PATCH 1/4] Add GPU task metric aggregations at stage / SQL / app levels MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Emits three new long-format CSVs covering the 26 GPU task accumulators from GpuTaskMetrics.scala (gpu_stage_/sql_/app_level_aggregated_task_metrics.csv). Auto-discovery by name (gpu*, perfio.s3.*, multithreadReaderMaxParallelism); units derived from the name (Time/Wait→ms, Bytes→bytes, else count); SQL/app levels re-sum stage rows. Skips emission when no GPU metrics are present. Job level intentionally skipped (each Spark action is a job — would either duplicate the SQL row or be meaningless). Fixes #2020 Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: Partho Sarthi --- .../configs/reports/coreRawMetricsReport.yaml | 100 +++++++++++ .../tool/analysis/AggRawMetricsResult.scala | 7 +- .../analysis/AppSparkMetricsAggTrait.scala | 11 +- .../analysis/AppSparkMetricsAnalyzer.scala | 161 ++++++++++++++++++ .../profiling/ApplicationSummaryInfo.scala | 5 +- .../profiling/ProfileClassWarehouse.scala | 127 ++++++++++++++ .../rapids/tool/profiling/Profiler.scala | 14 +- .../rapids/tool/views/OutHeaderRegistry.scala | 8 +- .../tool/views/QualRawReportGenerator.scala | 10 +- .../rapids/tool/views/RawMetricProfView.scala | 12 +- .../spark/rapids/tool/views/package.scala | 8 +- .../sql/rapids/tool/store/AccumMetaRef.scala | 7 +- .../rapids/tool/profiling/AnalysisSuite.scala | 101 +++++++++++ 13 files changed, 557 insertions(+), 14 deletions(-) diff --git a/core/src/main/resources/configs/reports/coreRawMetricsReport.yaml b/core/src/main/resources/configs/reports/coreRawMetricsReport.yaml index d9e4b1b6a..2ec512ed0 100644 --- a/core/src/main/resources/configs/reports/coreRawMetricsReport.yaml +++ b/core/src/main/resources/configs/reports/coreRawMetricsReport.yaml @@ -854,6 
+854,106 @@ reportDefinitions: dataType: Long description: >- TBD + # GPU task metric aggregations (stage / sql / app) + - label: coreRawGpuStageLevelAggregatedTaskMetricsCSV + description: >- + GPU task metric aggregations at stage level. Long-format: one row per + (stageId, metricName). Auto-discovered from accumulator names starting + with gpu, perfio.s3., or equal to multithreadReaderMaxParallelism. + fileName: gpu_stage_level_aggregated_task_metrics.csv + scope: per-app + columns: + - name: stageId + dataType: Int + description: >- + Stage identifier. + - name: numTasks + dataType: Int + description: >- + Number of tasks in the stage (includes task-attempt duplicates). + - name: metricName + dataType: String + description: >- + GPU accumulator name (e.g. gpuTime, gpuMaxDeviceMemoryBytes). + - name: unit + dataType: String + description: >- + ms for time/wait metrics, bytes for byte metrics, count otherwise. + - name: sum + dataType: Long + description: >- + Total across tasks in the stage. Empty for max-aggregated metrics. + - name: max + dataType: Long + description: >- + Peak per-task value in the stage. + - name: avg + dataType: Long + description: >- + Rolling per-task average. Empty for max-aggregated metrics. + - label: coreRawGpuSqlLevelAggregatedTaskMetricsCSV + description: >- + GPU task metric aggregations at SQL level. Rolls up stage-level GPU + rows across stages in the SQL (via sqlIdToStages). numTasks is + deliberately omitted: it would be a constant per SQL across every + metric row (already in sql_level_aggregated_task_metrics.csv). + fileName: gpu_sql_level_aggregated_task_metrics.csv + scope: per-app + columns: + - name: sqlId + dataType: Long + description: >- + SQL identifier. + - name: metricName + dataType: String + description: >- + GPU accumulator name. + - name: unit + dataType: String + description: >- + ms / bytes / count. + - name: sum + dataType: Long + description: >- + Sum of stage sums. Empty for max-aggregated metrics. 
+ - name: max + dataType: Long + description: >- + Max of stage maxes. + - name: avg + dataType: Long + description: >- + Task-weighted average across stages. Empty for max-aggregated + metrics. + - label: coreRawGpuAppLevelAggregatedTaskMetricsCSV + description: >- + GPU task metric aggregations at application level. One row per + metricName for the whole app (rolls up all stage-level GPU rows). + fileName: gpu_app_level_aggregated_task_metrics.csv + scope: per-app + columns: + - name: metricName + dataType: String + description: >- + GPU accumulator name. + - name: unit + dataType: String + description: >- + ms / bytes / count. + - name: sum + dataType: Long + description: >- + Sum across all stages in the app. Empty for max-aggregated + metrics. + - name: max + dataType: Long + description: >- + Overall peak reading any task produced during the run. + - name: avg + dataType: Long + description: >- + Task-weighted average across all stages. Empty for + max-aggregated metrics. # AccumProfileResults - label: coreRawStageLevelAllMetricsCSV description: >- diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AggRawMetricsResult.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AggRawMetricsResult.scala index 2584d702f..11bddfeb6 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AggRawMetricsResult.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AggRawMetricsResult.scala @@ -16,7 +16,7 @@ package com.nvidia.spark.rapids.tool.analysis -import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticResult} +import com.nvidia.spark.rapids.tool.profiling.{AppAggGpuMetricsProfileResult, IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, 
SQLAggGpuMetricsProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggGpuMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticResult} /** * The result of the aggregation of the raw metrics. It contains the aggregated metrics for an @@ -42,4 +42,7 @@ case class AggRawMetricsResult( ioAggs: Seq[IOAnalysisProfileResult], sqlDurAggs: Seq[SQLDurationExecutorTimeProfileResult], maxTaskInputSizes: Seq[SQLMaxTaskInputSizes], - stageDiagnostics: Seq[StageDiagnosticResult]) + stageDiagnostics: Seq[StageDiagnosticResult], + gpuStageAggs: Seq[StageAggGpuMetricsProfileResult] = Seq.empty, + gpuSqlAggs: Seq[SQLAggGpuMetricsProfileResult] = Seq.empty, + gpuAppAggs: Seq[AppAggGpuMetricsProfileResult] = Seq.empty) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAggTrait.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAggTrait.scala index cec8cc9b8..23645e4f9 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAggTrait.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAggTrait.scala @@ -38,6 +38,7 @@ trait AppSparkMetricsAggTrait extends AppIndexMapperTrait { sqlAnalyzer: Option[AppSQLPlanAnalyzer] = None): AggRawMetricsResult = { val analysisObj = AppSparkMetricsAnalyzer(app) val sqlMetricsAgg = analysisObj.aggregateSparkMetricsBySql(index) + val gpuStageRows = analysisObj.aggregateGpuMetricsByStage(index) AggRawMetricsResult( analysisObj.aggregateSparkMetricsByJob(index), analysisObj.aggregateSparkMetricsByStage(index), @@ -46,7 +47,10 @@ trait AppSparkMetricsAggTrait extends AppIndexMapperTrait { analysisObj.aggregateIOMetricsBySql(sqlMetricsAgg), analysisObj.aggregateDurationAndCPUTimeBySql(index), Seq(analysisObj.maxTaskInputSizeBytesPerSQL(index)), - analysisObj.aggregateDiagnosticMetricsByStage(index, sqlAnalyzer)) + 
analysisObj.aggregateDiagnosticMetricsByStage(index, sqlAnalyzer), + gpuStageRows, + analysisObj.aggregateGpuMetricsBySql(index, gpuStageRows), + analysisObj.aggregateGpuMetricsByApp(index, gpuStageRows)) } /** @@ -68,7 +72,10 @@ trait AppSparkMetricsAggTrait extends AppIndexMapperTrait { agg1.ioAggs ++ agg2.ioAggs, agg1.sqlDurAggs ++ agg2.sqlDurAggs, agg1.maxTaskInputSizes ++ agg2.maxTaskInputSizes, - agg1.stageDiagnostics ++ agg2.stageDiagnostics) + agg1.stageDiagnostics ++ agg2.stageDiagnostics, + agg1.gpuStageAggs ++ agg2.gpuStageAggs, + agg1.gpuSqlAggs ++ agg2.gpuSqlAggs, + agg1.gpuAppAggs ++ agg2.gpuAppAggs) } } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala index ee7332cb6..31d3bdca0 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala @@ -427,6 +427,167 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { stageLevelSparkMetrics(index).put(sm.stageInfo.stageId, rowToStore) } } + + // --------------------------------------------------------------------------- + // GPU task metric aggregations (Stage / SQL / App) + // --------------------------------------------------------------------------- + // + // Discovery convention: an accumulator is a GPU task metric if its name + // - starts with "gpu", OR + // - starts with "perfio.s3.", OR + // - equals "multithreadReaderMaxParallelism". + // Unit convention (from name): contains Time|Wait → ms (raw ns / 1e6); + // contains Bytes → bytes; otherwise → count. + // Max-aggregated metrics are discriminated via AccumMetaRef.isAggregateByMax; + // for those, sum and avg are empty and only max is meaningful. 
+
+  /** True when the accumulator name follows the GPU task-metric naming convention. */
+  private def isGpuMetric(name: String): Boolean = {
+    name.startsWith("gpu") ||
+      name.startsWith("perfio.s3.") ||
+      name == "multithreadReaderMaxParallelism"
+  }
+
+  /**
+   * True when the metric is a duration that gets reported in milliseconds.
+   * A name ending in "Count" is always treated as a count, even if it contains
+   * "Wait": gpuOnGpuTasksWaitingGPUMaxCount contains "Waiting", and dividing it
+   * by 1,000,000 would truncate it to 0 so the row would be dropped as empty.
+   */
+  private def isTimeMetric(name: String): Boolean = {
+    !name.endsWith("Count") && (name.contains("Time") || name.contains("Wait"))
+  }
+
+  /** Unit label derived from the metric name: "ms", "bytes" or "count". */
+  private def unitForMetric(name: String): String = {
+    if (isTimeMetric(name)) "ms"
+    else if (name.contains("Bytes")) "bytes"
+    else "count"
+  }
+
+  /**
+   * Converts a raw accumulator value into the unit reported by unitForMetric.
+   * Time metrics are divided by 1,000,000 (integer division); every other
+   * metric is passed through unchanged.
+   */
+  private def convertValue(name: String, raw: Long): Long = {
+    if (isTimeMetric(name)) raw / 1000000L else raw
+  }
+
+  /**
+   * Aggregate GPU task accumulators by stage. Emits one row per (stageId,
+   * metricName), sorted by (stageId, metricName). Returns Seq.empty when the
+   * app has no GPU metrics, which upstream uses to suppress CSV generation.
+   *
+   * @param index application index used to look up the stage-level metric cache
+   * @return stage-level GPU rows; rows whose sum and max are both zero or
+   *         absent are omitted
+   */
+  def aggregateGpuMetricsByStage(index: Int): Seq[StageAggGpuMetricsProfileResult] = {
+    val gpuAccums = app.accumManager.accumInfoMap.values.filter { ai =>
+      isGpuMetric(ai.infoRef.getName())
+    }.toSeq
+    if (gpuAccums.isEmpty) {
+      Seq.empty
+    } else {
+      val stageCache = stageLevelSparkMetrics(index)
+      val rows = scala.collection.mutable.ArrayBuffer[StageAggGpuMetricsProfileResult]()
+      gpuAccums.foreach { ai =>
+        val name = ai.infoRef.getName()
+        val unit = unitForMetric(name)
+        val isMax = ai.infoRef.isAggregateByMax
+        ai.getStageIds.foreach { stageId =>
+          ai.calculateAccStatsForStage(stageId).foreach { stats =>
+            // numTasks falls back to 0 when the stage is missing from the cache.
+            val numTasks = stageCache.get(stageId).map(_.numTasks).getOrElse(0)
+            val (sum, max, avg) = if (isMax) {
+              // Max-aggregated metrics only carry a meaningful max.
+              (None: Option[Long],
+                Some(convertValue(name, stats.max)),
+                None: Option[Long])
+            } else {
+              // NOTE(review): avg is populated from stats.med; confirm that this
+              // statistic is the intended source for a column labelled "avg".
+              (Some(convertValue(name, stats.total)),
+                Some(convertValue(name, stats.max)),
+                Some(convertValue(name, stats.med)))
+            }
+            // Skip rows carrying no signal (both sum and max zero/absent).
+            val zeroSum = sum.forall(_ == 0L)
+            val zeroMax = max.forall(_ == 0L)
+            if (!(zeroSum && zeroMax)) {
+              rows += StageAggGpuMetricsProfileResult(
+                stageId = stageId,
+                numTasks = numTasks,
+                metricName = name,
+                unit = unit,
+                sum = sum,
+                max = max,
+                avg = avg)
+            }
+          }
+        }
+      }
+      rows.toSeq.sortBy(r => (r.stageId, r.metricName))
+    }
+  }
+
+  /**
+   * Rollup helper: groups stage-level GPU rows by metric name and reduces to
+   * (unit, sum, max, avg). sum is Σ stage.sum (None for max metrics); max is
+   * max stage.max; avg is task-weighted Σ(stage.avg * stage.numTasks)
+   * / Σ stage.numTasks over the stages that recorded the metric. numTasks is
+   * intentionally not propagated — see SQLAggGpuMetricsProfileResult /
+   * AppAggGpuMetricsProfileResult docstrings.
+   */
+  private def rollupGpuRows(
+      rows: Seq[StageAggGpuMetricsProfileResult]
+  ): Seq[(String, String, Option[Long], Option[Long], Option[Long])] = {
+    rows.groupBy(_.metricName).map { case (metricName, group) =>
+      // groupBy never produces an empty group, so head is safe here.
+      val unit = group.head.unit
+      val sumOpt: Option[Long] = {
+        val xs = group.flatMap(_.sum)
+        if (xs.isEmpty) None else Some(xs.sum)
+      }
+      val maxOpt: Option[Long] = {
+        val xs = group.flatMap(_.max)
+        if (xs.isEmpty) None else Some(xs.max)
+      }
+      val avgOpt: Option[Long] = {
+        val weighted = group.flatMap { r => r.avg.map(a => (a, r.numTasks)) }
+        val weightTasks = weighted.map(_._2).sum
+        // A zero total weight would divide by zero, so report no average.
+        if (weighted.isEmpty || weightTasks == 0) {
+          None
+        } else {
+          Some(weighted.map { case (a, n) => a * n }.sum / weightTasks)
+        }
+      }
+      (metricName, unit, sumOpt, maxOpt, avgOpt)
+    }.toSeq
+  }
+
+  /**
+   * Aggregate GPU task metrics by SQL. Rolls up stage-level rows using
+   * app.sqlIdToStages. One row per (sqlId, metricName).
+ */ + def aggregateGpuMetricsBySql( + index: Int, + stageRows: Seq[StageAggGpuMetricsProfileResult] + ): Seq[SQLAggGpuMetricsProfileResult] = { + if (stageRows.isEmpty) { + return Seq.empty + } + val stageMap: Map[Int, Seq[StageAggGpuMetricsProfileResult]] = + stageRows.groupBy(_.stageId) + app.sqlIdToStages.flatMap { case (sqlId, stageIds) => + val rowsForSql = stageIds.flatMap(stageMap.getOrElse(_, Seq.empty)) + rollupGpuRows(rowsForSql).map { case (metric, unit, sum, max, avg) => + SQLAggGpuMetricsProfileResult( + sqlId = sqlId, + metricName = metric, + unit = unit, + sum = sum, + max = max, + avg = avg) + } + }.toSeq.sortBy(r => (r.sqlId, r.metricName)) + } + + /** + * Aggregate GPU task metrics across the whole application. One row per + * metricName. For max-aggregated metrics this gives the peak reading any + * task produced during the run. + */ + def aggregateGpuMetricsByApp( + index: Int, + stageRows: Seq[StageAggGpuMetricsProfileResult] + ): Seq[AppAggGpuMetricsProfileResult] = { + if (stageRows.isEmpty) { + return Seq.empty + } + rollupGpuRows(stageRows).map { case (metric, unit, sum, max, avg) => + AppAggGpuMetricsProfileResult( + metricName = metric, + unit = unit, + sum = sum, + max = max, + avg = avg) + }.sortBy(_.metricName) + } } /** diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala index 783e2fc50..a80257f3d 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala @@ -54,7 +54,10 @@ case class ApplicationSummaryInfo( sqlCleanedAlignedIds: Seq[SQLCleanAndAlignIdsProfileResult], sparkRapidsBuildInfo: Seq[SparkRapidsBuildInfoEvent], writeOpsInfo: Seq[WriteOpProfileResult], - sqlPlanInfo: Seq[SQLPlanInfoProfileResult]) + sqlPlanInfo: Seq[SQLPlanInfoProfileResult], + 
gpuStageAggMetrics: Seq[StageAggGpuMetricsProfileResult] = Seq.empty, + gpuSqlAggMetrics: Seq[SQLAggGpuMetricsProfileResult] = Seq.empty, + gpuAppAggMetrics: Seq[AppAggGpuMetricsProfileResult] = Seq.empty) trait AppInfoPropertyGetter { // returns all the properties (i.e., spark) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala index 7bde04197..7beddd443 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala @@ -1516,3 +1516,130 @@ object SparkRapidsOomExceptions { object UnixExitCode { val FORCE_KILLED = 137 } + +/** + * GPU task metric aggregation at stage level — one row per (stageId, metricName). + * Long/transposed schema: unit and sum/max/avg vary by metric. Empty `sum` / `avg` + * denote max-aggregated metrics (e.g. `gpuMaxDeviceMemoryBytes`). + */ +case class StageAggGpuMetricsProfileResult( + stageId: Int, + numTasks: Int, + metricName: String, + unit: String, + sum: Option[Long], + max: Option[Long], + avg: Option[Long]) extends ProfileResult { + + override def outputHeaders: Array[String] = { + OutHeaderRegistry.outputHeaders("StageAggGpuMetricsProfileResult") + } + + override def convertToSeq(): Array[String] = { + Array( + stageId.toString, + numTasks.toString, + metricName, + unit, + sum.map(_.toString).getOrElse(""), + max.map(_.toString).getOrElse(""), + avg.map(_.toString).getOrElse("")) + } + + override def convertToCSVSeq(): Array[String] = convertToSeq() + + /** + * Combines two rows for the same (stageId, metricName) across stage attempts. + * Mirrors the policy used by StageAggTaskMetricsProfileResult: sum += sum, + * max = max(max), avg = (a+b)/2, numTasks += numTasks. 
+   */
+  def aggregateStageProfileMetric(
+      other: StageAggGpuMetricsProfileResult): StageAggGpuMetricsProfileResult = {
+    // Merge two optional readings: apply `op` when both sides are present,
+    // otherwise keep whichever side is defined (None only when both are absent).
+    def merge(lhs: Option[Long], rhs: Option[Long])(op: (Long, Long) => Long): Option[Long] =
+      (lhs, rhs) match {
+        case (Some(x), Some(y)) => Some(op(x, y))
+        case _ => lhs.orElse(rhs)
+      }
+    // stageId, metricName and unit are carried over from this row unchanged.
+    copy(
+      numTasks = this.numTasks + other.numTasks,
+      sum = merge(this.sum, other.sum)((x, y) => x + y),
+      max = merge(this.max, other.max)((x, y) => Math.max(x, y)),
+      avg = merge(this.avg, other.avg)((x, y) => (x + y) / 2))
+  }
+}
+
+/**
+ * GPU task metric aggregation at SQL level — one row per (sqlId, metricName).
+ * Rolled up from stage-level rows: sum = Σ stage.sum, max = max stage.max,
+ * avg = task-weighted average over stage.avg. numTasks is intentionally not
+ * carried — it would be a constant per SQL across every metric row (the non-GPU
+ * sql_level_aggregated_task_metrics.csv already has it once per SQL).
+ */ +case class SQLAggGpuMetricsProfileResult( + sqlId: Long, + metricName: String, + unit: String, + sum: Option[Long], + max: Option[Long], + avg: Option[Long]) extends ProfileResult { + + override def outputHeaders: Array[String] = { + OutHeaderRegistry.outputHeaders("SQLAggGpuMetricsProfileResult") + } + + override def convertToSeq(): Array[String] = { + Array( + sqlId.toString, + metricName, + unit, + sum.map(_.toString).getOrElse(""), + max.map(_.toString).getOrElse(""), + avg.map(_.toString).getOrElse("")) + } + + override def convertToCSVSeq(): Array[String] = convertToSeq() +} + +/** + * GPU task metric aggregation at app level — one row per metricName for the + * whole application. Same rollup rules as SQL-level. numTasks intentionally + * omitted (would be constant per app and is available elsewhere). + */ +case class AppAggGpuMetricsProfileResult( + metricName: String, + unit: String, + sum: Option[Long], + max: Option[Long], + avg: Option[Long]) extends ProfileResult { + + override def outputHeaders: Array[String] = { + OutHeaderRegistry.outputHeaders("AppAggGpuMetricsProfileResult") + } + + override def convertToSeq(): Array[String] = { + Array( + metricName, + unit, + sum.map(_.toString).getOrElse(""), + max.map(_.toString).getOrElse(""), + avg.map(_.toString).getOrElse("")) + } + + override def convertToCSVSeq(): Array[String] = convertToSeq() +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala index 1534fbf2b..ee4a83f79 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala @@ -337,7 +337,10 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea sqlCleanedAlignedIds = sqlIdAlign, sparkRapidsBuildInfo = collect.getSparkRapidsInfo, writeOpsInfo = collect.getWriteOperationInfo, - sqlPlanInfo = 
collect.getSQLPlanInfoTruncated) + sqlPlanInfo = collect.getSQLPlanInfoTruncated, + gpuStageAggMetrics = analysis.gpuStageAggs, + gpuSqlAggMetrics = analysis.gpuSqlAggs, + gpuAppAggMetrics = analysis.gpuAppAggs) (appInfoSummary, DiagnosticSummaryInfo(analysis.stageDiagnostics, collect.getIODiagnosticMetrics)) } @@ -405,6 +408,15 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea profileOutputWriter.writeCSVTable(JOB_AGG_LABEL, app.jobAggMetrics) profileOutputWriter.writeCSVTable(STAGE_AGG_LABEL, app.stageAggMetrics) profileOutputWriter.writeCSVTable(SQL_AGG_LABEL, app.sqlTaskAggMetrics) + if (app.gpuStageAggMetrics.nonEmpty) { + profileOutputWriter.writeCSVTable(GPU_STAGE_AGG_LABEL, app.gpuStageAggMetrics) + } + if (app.gpuSqlAggMetrics.nonEmpty) { + profileOutputWriter.writeCSVTable(GPU_SQL_AGG_LABEL, app.gpuSqlAggMetrics) + } + if (app.gpuAppAggMetrics.nonEmpty) { + profileOutputWriter.writeCSVTable(GPU_APP_AGG_LABEL, app.gpuAppAggMetrics) + } profileOutputWriter.writeCSVTable(IO_LABEL, app.ioMetrics) profileOutputWriter.writeCSVTable(SQL_DUR_LABEL, app.durAndCpuMet) // writeOps are generated in only CSV format diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/OutHeaderRegistry.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/OutHeaderRegistry.scala index 795de4c6a..eeee9cf8e 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/OutHeaderRegistry.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/OutHeaderRegistry.scala @@ -309,6 +309,12 @@ object OutHeaderRegistry { "WriteOpProfileResult" -> Array("sqlID", "sqlPlanVersion", "nodeId", "fromFinalPlan", "execName", "format", "location", "tableName", "dataBase", "outputColumns", "writeMode", - "partitionColumns", "compressionOption", "fullDescription") + "partitionColumns", "compressionOption", "fullDescription"), + "StageAggGpuMetricsProfileResult" -> + Array("stageId", "numTasks", "metricName", "unit", "sum", "max", 
"avg"), + "SQLAggGpuMetricsProfileResult" -> + Array("sqlId", "metricName", "unit", "sum", "max", "avg"), + "AppAggGpuMetricsProfileResult" -> + Array("metricName", "unit", "sum", "max", "avg") ) // End of outputHeaders map initialization } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/QualRawReportGenerator.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/QualRawReportGenerator.scala index 604fbfa7b..97142636f 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/QualRawReportGenerator.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/QualRawReportGenerator.scala @@ -38,7 +38,10 @@ object QualRawReportGenerator extends Logging { AggMetricsResultSorter.sortIO(aggRawResult.ioAggs), AggMetricsResultSorter.sortSqlDurationAgg(aggRawResult.sqlDurAggs), aggRawResult.maxTaskInputSizes, - AggMetricsResultSorter.sortStageDiagnostics(aggRawResult.stageDiagnostics)) + AggMetricsResultSorter.sortStageDiagnostics(aggRawResult.stageDiagnostics), + aggRawResult.gpuStageAggs.sortBy(r => (r.stageId, r.metricName)), + aggRawResult.gpuSqlAggs.sortBy(r => (r.sqlId, r.metricName)), + aggRawResult.gpuAppAggs.sortBy(_.metricName)) Map( STAGE_AGG_LABEL -> sortedRes.stageAggs, JOB_AGG_LABEL -> sortedRes.jobAggs, @@ -46,7 +49,10 @@ object QualRawReportGenerator extends Logging { SQL_AGG_LABEL -> sortedRes.sqlAggs, IO_LABEL -> sortedRes.ioAggs, SQL_DUR_LABEL -> sortedRes.sqlDurAggs, - STAGE_DIAGNOSTICS_LABEL -> sortedRes.stageDiagnostics) + STAGE_DIAGNOSTICS_LABEL -> sortedRes.stageDiagnostics, + GPU_STAGE_AGG_LABEL -> sortedRes.gpuStageAggs, + GPU_SQL_AGG_LABEL -> sortedRes.gpuSqlAggs, + GPU_APP_AGG_LABEL -> sortedRes.gpuAppAggs) } private def generateSQLProcessingView( diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/RawMetricProfView.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/RawMetricProfView.scala index 2d10c8f36..d9de41225 100644 --- 
a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/RawMetricProfView.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/RawMetricProfView.scala @@ -17,7 +17,7 @@ package com.nvidia.spark.rapids.tool.views import com.nvidia.spark.rapids.tool.analysis.ProfSparkMetricsAggregator -import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticResult} +import com.nvidia.spark.rapids.tool.profiling.{AppAggGpuMetricsProfileResult, IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLAggGpuMetricsProfileResult, SQLDurationExecutorTimeProfileResult, SQLMaxTaskInputSizes, SQLTaskAggMetricsProfileResult, StageAggGpuMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticResult} import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo @@ -31,7 +31,10 @@ case class ProfilerAggregatedView( ioAggs: Seq[IOAnalysisProfileResult], sqlDurAggs: Seq[SQLDurationExecutorTimeProfileResult], maxTaskInputSizes: Seq[SQLMaxTaskInputSizes], - stageDiagnostics: Seq[StageDiagnosticResult]) + stageDiagnostics: Seq[StageDiagnosticResult], + gpuStageAggs: Seq[StageAggGpuMetricsProfileResult] = Seq.empty, + gpuSqlAggs: Seq[SQLAggGpuMetricsProfileResult] = Seq.empty, + gpuAppAggs: Seq[AppAggGpuMetricsProfileResult] = Seq.empty) object RawMetricProfilerView { def getAggMetrics(apps: Seq[ApplicationInfo]): ProfilerAggregatedView = { @@ -44,6 +47,9 @@ object RawMetricProfilerView { AggMetricsResultSorter.sortIO(aggMetricsResults.ioAggs), AggMetricsResultSorter.sortSqlDurationAgg(aggMetricsResults.sqlDurAggs), aggMetricsResults.maxTaskInputSizes, - AggMetricsResultSorter.sortStageDiagnostics(aggMetricsResults.stageDiagnostics)) + AggMetricsResultSorter.sortStageDiagnostics(aggMetricsResults.stageDiagnostics), + 
aggMetricsResults.gpuStageAggs.sortBy(r => (r.stageId, r.metricName)), + aggMetricsResults.gpuSqlAggs.sortBy(r => (r.sqlId, r.metricName)), + aggMetricsResults.gpuAppAggs.sortBy(_.metricName)) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/package.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/package.scala index bab4b8a2b..bc5f1461b 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/package.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/package.scala @@ -30,6 +30,9 @@ package object views { val SQL_MAX_INPUT_SIZE = "SQL Max Task Input Size" val STAGE_DIAGNOSTICS_LABEL = "Stage Level Diagnostic Metrics" val CLUSTER_INFORMATION_LABEL = "Cluster Information" + val GPU_STAGE_AGG_LABEL = "GPU stage level aggregated task metrics" + val GPU_SQL_AGG_LABEL = "GPU SQL level aggregated task metrics" + val GPU_APP_AGG_LABEL = "GPU app level aggregated task metrics" val AGG_DESCRIPTION = Map( STAGE_AGG_LABEL -> "Stage metrics", @@ -38,6 +41,9 @@ package object views { IO_LABEL -> "IO Metrics per SQL", SQL_DUR_LABEL -> "Total duration and CPUTime per SQL", TASK_SHUFFLE_SKEW -> - "(When task's Shuffle Read Size > 3 * Avg Stage-level size)" + "(When task's Shuffle Read Size > 3 * Avg Stage-level size)", + GPU_STAGE_AGG_LABEL -> "GPU task metrics per stage", + GPU_SQL_AGG_LABEL -> "GPU task metrics per SQL", + GPU_APP_AGG_LABEL -> "GPU task metrics per application" ) } diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumMetaRef.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumMetaRef.scala index 4df1ea64b..710884b28 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumMetaRef.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumMetaRef.scala @@ -45,7 +45,12 @@ object AccumMetaRef { "gpuMaxPageableMemoryBytes", "gpuMaxDeviceMemoryBytes", "gpuMaxHostMemoryBytes", - "gpuMaxPinnedMemoryBytes" + 
"gpuMaxPinnedMemoryBytes", + "gpuMaxDiskMemoryBytes", + "gpuMaxTaskFootprint", + "gpuOnGpuTasksWaitingGPUMaxCount", + "gpuMaxConcurrentGpuTasks", + "multithreadReaderMaxParallelism" ) val EMPTY_ACCUM_META_REF: AccumMetaRef = new AccumMetaRef(0L, AccumNameRef.EMPTY_ACC_NAME_REF) diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala index b7115c0ec..8d9d3df25 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala @@ -401,4 +401,105 @@ class AnalysisSuite extends AnyFunSuite { val dataSourceResults = ProfDataSourceView.getRawView(apps.toSeq) assert(dataSourceResults.exists(_.scan_time > 0)) } + + // --------------------------------------------------------------------------- + // GPU task metric aggregations (stage / SQL / app) + // --------------------------------------------------------------------------- + + test("GPU metric aggregation: stage / sql / app rows produced for GPU log") { + val logs = Array(s"$logDir/gpu_oom_eventlog.zstd") + val apps = ToolTestUtils.processProfileApps(logs, sparkSession) + val agg = RawMetricProfilerView.getAggMetrics(apps.toSeq) + assert(agg.gpuStageAggs.nonEmpty, "expected stage-level GPU rows") + assert(agg.gpuSqlAggs.nonEmpty, "expected SQL-level GPU rows") + assert(agg.gpuAppAggs.nonEmpty, "expected app-level GPU rows") + + // Discovery: only gpu* / perfio.s3.* / multithreadReaderMaxParallelism names. + agg.gpuStageAggs.foreach { row => + val n = row.metricName + assert(n.startsWith("gpu") || n.startsWith("perfio.s3.") || + n == "multithreadReaderMaxParallelism", s"unexpected metric name: $n") + } + + // Unit convention is internally consistent. 
+    agg.gpuStageAggs.foreach { row =>
+      val name = row.metricName
+      // A name ending in "Count" is a count even when it contains "Wait"
+      // (e.g. gpuOnGpuTasksWaitingGPUMaxCount), so it must not be reported as ms.
+      val isTime = !name.endsWith("Count") &&
+        (name.contains("Time") || name.contains("Wait"))
+      val expected =
+        if (isTime) "ms"
+        else if (name.contains("Bytes")) "bytes"
+        else "count"
+      assert(row.unit == expected,
+        s"unit mismatch for ${row.metricName}: got ${row.unit}, expected $expected")
+    }
+
+    // Rollup math: SQL.sum == Σ stage.sum across the SQL's stages, per metric.
+    val stageRowsByMetric = agg.gpuStageAggs.groupBy(_.metricName)
+    val sqlRowsByMetric = agg.gpuSqlAggs.groupBy(_.metricName)
+    sqlRowsByMetric.foreach { case (metric, sqlRows) =>
+      val sqlSum = sqlRows.flatMap(_.sum).sum
+      val stageSum = stageRowsByMetric.getOrElse(metric, Seq.empty).flatMap(_.sum).sum
+      assert(sqlSum == stageSum, s"SQL sum mismatch for $metric: $sqlSum vs $stageSum")
+      val sqlMax = sqlRows.flatMap(_.max)
+      val stageMax = stageRowsByMetric.getOrElse(metric, Seq.empty).flatMap(_.max)
+      if (stageMax.nonEmpty) {
+        assert(sqlMax.nonEmpty && sqlMax.max == stageMax.max,
+          s"SQL max mismatch for $metric")
+      }
+    }
+
+    // App row exactly matches the rollup of all stage rows per metric.
+ val appByMetric = agg.gpuAppAggs.groupBy(_.metricName).mapValues(_.head) + stageRowsByMetric.foreach { case (metric, stageRows) => + val appRow = appByMetric(metric) + val expectedSum = stageRows.flatMap(_.sum) + assert(appRow.sum.map(s => s == expectedSum.sum).getOrElse(expectedSum.isEmpty), + s"App sum mismatch for $metric") + val expectedMax = stageRows.flatMap(_.max) + assert(appRow.max.map(m => m == expectedMax.max).getOrElse(expectedMax.isEmpty), + s"App max mismatch for $metric") + } + } + + test("GPU metric aggregation: max-aggregated metrics carry only max") { + val logs = Array(s"$logDir/gpu_oom_eventlog.zstd") + val apps = ToolTestUtils.processProfileApps(logs, sparkSession) + val agg = RawMetricProfilerView.getAggMetrics(apps.toSeq) + val maxOnlyNames = Set( + "gpuMaxDeviceMemoryBytes", "gpuMaxHostMemoryBytes", "gpuMaxPageableMemoryBytes", + "gpuMaxPinnedMemoryBytes", "gpuMaxDiskMemoryBytes", "gpuMaxTaskFootprint", + "gpuOnGpuTasksWaitingGPUMaxCount", "gpuMaxConcurrentGpuTasks", + "multithreadReaderMaxParallelism") + val maxRows = agg.gpuStageAggs.filter(r => maxOnlyNames.contains(r.metricName)) ++ + agg.gpuSqlAggs.filter(r => maxOnlyNames.contains(r.metricName)) ++ + agg.gpuAppAggs.filter(r => maxOnlyNames.contains(r.metricName)) + assert(maxRows.nonEmpty, "expected at least one max-aggregated metric row") + maxRows.foreach { r => + val sum = r match { + case s: StageAggGpuMetricsProfileResult => s.sum + case s: SQLAggGpuMetricsProfileResult => s.sum + case s: AppAggGpuMetricsProfileResult => s.sum + } + val avg = r match { + case s: StageAggGpuMetricsProfileResult => s.avg + case s: SQLAggGpuMetricsProfileResult => s.avg + case s: AppAggGpuMetricsProfileResult => s.avg + } + val max = r match { + case s: StageAggGpuMetricsProfileResult => s.max + case s: SQLAggGpuMetricsProfileResult => s.max + case s: AppAggGpuMetricsProfileResult => s.max + } + assert(sum.isEmpty, s"max-only metric should have empty sum: $r") + assert(avg.isEmpty, s"max-only 
metric should have empty avg: $r") + assert(max.isDefined, s"max-only metric should have a max value: $r") + } + } + + test("GPU metric aggregation: empty for CPU-only event log") { + val logs = Array(s"$qualLogDir/nds_q86_test") + val apps = ToolTestUtils.processProfileApps(logs, sparkSession) + val agg = RawMetricProfilerView.getAggMetrics(apps.toSeq) + assert(agg.gpuStageAggs.isEmpty, "CPU-only log should produce no GPU stage rows") + assert(agg.gpuSqlAggs.isEmpty, "CPU-only log should produce no GPU SQL rows") + assert(agg.gpuAppAggs.isEmpty, "CPU-only log should produce no GPU app rows") + } } From a3222bc92ac06c7f6f53a2e5c799514b99520a88 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Mon, 27 Apr 2026 09:48:20 -0700 Subject: [PATCH 2/4] Add appId column to gpu_app_level CSV and bump copyrights Adds appId as the leading column on gpu_app_level_aggregated_task_metrics.csv so downstream consumers can join by application without relying on the output directory path. Also bumps the copyright year on touched files to 2026 (the pre-commit hook's sed is BSD-incompatible on macOS and silently no-ops). 
Fixes #2020 Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: Partho Sarthi --- .../resources/configs/reports/coreRawMetricsReport.yaml | 9 +++++++-- .../spark/rapids/tool/analysis/AggRawMetricsResult.scala | 2 +- .../rapids/tool/analysis/AppSparkMetricsAggTrait.scala | 2 +- .../rapids/tool/analysis/AppSparkMetricsAnalyzer.scala | 3 ++- .../rapids/tool/profiling/ApplicationSummaryInfo.scala | 2 +- .../rapids/tool/profiling/ProfileClassWarehouse.scala | 8 +++++--- .../spark/rapids/tool/views/OutHeaderRegistry.scala | 4 ++-- .../spark/rapids/tool/views/QualRawReportGenerator.scala | 2 +- .../spark/rapids/tool/views/RawMetricProfView.scala | 2 +- .../com/nvidia/spark/rapids/tool/views/package.scala | 2 +- .../spark/sql/rapids/tool/store/AccumMetaRef.scala | 2 +- .../spark/rapids/tool/profiling/AnalysisSuite.scala | 2 +- 12 files changed, 24 insertions(+), 16 deletions(-) diff --git a/core/src/main/resources/configs/reports/coreRawMetricsReport.yaml b/core/src/main/resources/configs/reports/coreRawMetricsReport.yaml index 2ec512ed0..1ab9fd76f 100644 --- a/core/src/main/resources/configs/reports/coreRawMetricsReport.yaml +++ b/core/src/main/resources/configs/reports/coreRawMetricsReport.yaml @@ -1,4 +1,4 @@ -# Copyright (c) 2025, NVIDIA CORPORATION. +# Copyright (c) 2025-2026, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -928,10 +928,15 @@ reportDefinitions: - label: coreRawGpuAppLevelAggregatedTaskMetricsCSV description: >- GPU task metric aggregations at application level. One row per - metricName for the whole app (rolls up all stage-level GPU rows). + (appId, metricName) for the whole app (rolls up all stage-level + GPU rows). fileName: gpu_app_level_aggregated_task_metrics.csv scope: per-app columns: + - name: appId + dataType: String + description: >- + Application identifier. 
- name: metricName dataType: String description: >- diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AggRawMetricsResult.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AggRawMetricsResult.scala index 11bddfeb6..4b48df425 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AggRawMetricsResult.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AggRawMetricsResult.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024, NVIDIA CORPORATION. + * Copyright (c) 2024-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAggTrait.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAggTrait.scala index 23645e4f9..91ba00ddc 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAggTrait.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAggTrait.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024-2025, NVIDIA CORPORATION. + * Copyright (c) 2024-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala index 31d3bdca0..85e1ef8f3 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024-2025, NVIDIA CORPORATION. + * Copyright (c) 2024-2026, NVIDIA CORPORATION. 
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -581,6 +581,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { } rollupGpuRows(stageRows).map { case (metric, unit, sum, max, avg) => AppAggGpuMetricsProfileResult( + appId = app.appId, metricName = metric, unit = unit, sum = sum, diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala index a80257f3d..ace530656 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2025, NVIDIA CORPORATION. + * Copyright (c) 2021-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala index 7beddd443..ab5395618 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala @@ -1617,11 +1617,12 @@ case class SQLAggGpuMetricsProfileResult( } /** - * GPU task metric aggregation at app level — one row per metricName for the - * whole application. Same rollup rules as SQL-level. numTasks intentionally - * omitted (would be constant per app and is available elsewhere). + * GPU task metric aggregation at app level — one row per (appId, metricName). + * Same rollup rules as SQL-level. numTasks intentionally omitted (would be a + * constant per app and is available in existing per-app CSVs). 
*/ case class AppAggGpuMetricsProfileResult( + appId: String, metricName: String, unit: String, sum: Option[Long], @@ -1634,6 +1635,7 @@ case class AppAggGpuMetricsProfileResult( override def convertToSeq(): Array[String] = { Array( + appId, metricName, unit, sum.map(_.toString).getOrElse(""), diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/OutHeaderRegistry.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/OutHeaderRegistry.scala index eeee9cf8e..7e073212c 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/OutHeaderRegistry.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/OutHeaderRegistry.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2025, NVIDIA CORPORATION. + * Copyright (c) 2025-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -315,6 +315,6 @@ object OutHeaderRegistry { "SQLAggGpuMetricsProfileResult" -> Array("sqlId", "metricName", "unit", "sum", "max", "avg"), "AppAggGpuMetricsProfileResult" -> - Array("metricName", "unit", "sum", "max", "avg") + Array("appId", "metricName", "unit", "sum", "max", "avg") ) // End of outputHeaders map initialization } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/QualRawReportGenerator.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/QualRawReportGenerator.scala index 97142636f..669fb6ba9 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/QualRawReportGenerator.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/QualRawReportGenerator.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024-2025, NVIDIA CORPORATION. + * Copyright (c) 2024-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/RawMetricProfView.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/RawMetricProfView.scala index d9de41225..19c7fd008 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/RawMetricProfView.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/RawMetricProfView.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024-2025, NVIDIA CORPORATION. + * Copyright (c) 2024-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/package.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/package.scala index bc5f1461b..6c2d47c10 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/package.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/package.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024-2025, NVIDIA CORPORATION. + * Copyright (c) 2024-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumMetaRef.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumMetaRef.scala index 710884b28..12eaad03d 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumMetaRef.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumMetaRef.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024-2025, NVIDIA CORPORATION. + * Copyright (c) 2024-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala index 8d9d3df25..9b89d8a17 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2025, NVIDIA CORPORATION. + * Copyright (c) 2021-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From 1dae48acd70cadb0e7493509e7ad17f017b7e964 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Mon, 27 Apr 2026 12:35:23 -0700 Subject: [PATCH 3/4] Fix Scala 2.13 build for SQL-level GPU rollup ArrayBuffer.flatMap returns ArrayBuffer (mutable), which no longer auto-coerces to immutable.Seq under Scala 2.13. Materialize the per-SQL row collection as Seq before passing to rollupGpuRows, and use an explicit lambda for the inner flatMap. 
Fixes #2020 Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: Partho Sarthi --- .../rapids/tool/analysis/AppSparkMetricsAnalyzer.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala index 85e1ef8f3..f1ec5e2d9 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala @@ -553,8 +553,9 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { } val stageMap: Map[Int, Seq[StageAggGpuMetricsProfileResult]] = stageRows.groupBy(_.stageId) - app.sqlIdToStages.flatMap { case (sqlId, stageIds) => - val rowsForSql = stageIds.flatMap(stageMap.getOrElse(_, Seq.empty)) + app.sqlIdToStages.toSeq.flatMap { case (sqlId, stageIds) => + val rowsForSql: Seq[StageAggGpuMetricsProfileResult] = + stageIds.toSeq.flatMap(s => stageMap.getOrElse(s, Seq.empty)) rollupGpuRows(rowsForSql).map { case (metric, unit, sum, max, avg) => SQLAggGpuMetricsProfileResult( sqlId = sqlId, @@ -564,7 +565,7 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { max = max, avg = avg) } - }.toSeq.sortBy(r => (r.sqlId, r.metricName)) + }.sortBy(r => (r.sqlId, r.metricName)) } /** From f668a1d47b12661c6e3a69502b540b96f8a4f7e6 Mon Sep 17 00:00:00 2001 From: Partho Sarthi Date: Thu, 30 Apr 2026 16:30:08 -0700 Subject: [PATCH 4/4] Address greptile review on PR #2088 - Drop dead StageAggGpuMetricsProfileResult.aggregateStageProfileMetric. Stage attempts already merge upstream at the AccumInfo layer (stagesStatMap is keyed by stageId only, not stageId+attemptNumber), so a separate merge step on the case class is never invoked. Replaced the method with a comment explaining the upstream merging. 
- Document the numTasks=0 invariant in aggregateGpuMetricsByStage and log a warning if the stage-task metrics cache lookup misses (which would silently distort the task-weighted avg at SQL/app level). Fixes #2020 Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: Partho Sarthi --- .../analysis/AppSparkMetricsAnalyzer.scala | 16 +++++++- .../profiling/ProfileClassWarehouse.scala | 41 +++---------------- 2 files changed, 20 insertions(+), 37 deletions(-) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala index 1cf61d419..f0ed6bf60 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala @@ -23,6 +23,7 @@ import com.nvidia.spark.rapids.tool.analysis.util.AggAccumHelper import com.nvidia.spark.rapids.tool.analysis.util.StageAccumDiagnosticMetrics._ import com.nvidia.spark.rapids.tool.profiling._ +import org.apache.spark.internal.Logging import org.apache.spark.sql.rapids.tool.{AppBase, ToolUtils} import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo import org.apache.spark.sql.rapids.tool.store.AccumMetaRef @@ -47,7 +48,7 @@ import org.apache.spark.sql.rapids.tool.store.AccumMetaRef * * @param app the AppBase object to analyze */ -class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { +class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) with Logging { // Hashmap to cache the stage level metrics. It is initialized to None just in case the caller // does not call methods in order starting with stage level metrics. 
private var stageLevelCache: @@ -454,7 +455,18 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { val isMax = ai.infoRef.isAggregateByMax ai.getStageIds.foreach { stageId => ai.calculateAccStatsForStage(stageId).foreach { stats => - val numTasks = stageCache.get(stageId).map(_.numTasks).getOrElse(0) + // Invariant: stages with GPU accumulators are tracked by stageManager + // and therefore cached. The fallback to 0 is defensive for edge cases + // (e.g. driver-side accumulators) where the stage is absent from the + // task-metrics cache. A 0 here would also exclude the stage from the + // task-weighted avg in rollupGpuRows while sum/max still accumulate, + // so log a warning so the inconsistency is visible. + val numTasks = stageCache.get(stageId).map(_.numTasks).getOrElse { + logWarning(s"GPU accumulator '$name' references stage $stageId which " + + s"is not in the stage-task metrics cache; using numTasks = 0. " + + s"This will be excluded from task-weighted averages at SQL/app level.") + 0 + } val (sum, max, avg) = if (isMax) { (None: Option[Long], Some(convertValue(name, stats.max)), diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala index 0996a24fa..73ef3e357 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala @@ -1573,6 +1573,12 @@ object UnixExitCode { * GPU task metric aggregation at stage level — one row per (stageId, metricName). * Long/transposed schema: unit and sum/max/avg vary by metric. Empty `sum` / `avg` * denote max-aggregated metrics (e.g. `gpuMaxDeviceMemoryBytes`). 
+ * + * Note on stage attempts: unlike StageAggTaskMetricsProfileResult, this class has + * no aggregateStageProfileMetric helper because attempt merging happens upstream + * at the AccumInfo layer — `AccumInfo.stagesStatMap` is keyed by stageId only + * (not stageId + attemptNumber), so calculateAccStatsForStage already returns + * the merged result across attempts. */ case class StageAggGpuMetricsProfileResult( stageId: Int, @@ -1599,41 +1605,6 @@ case class StageAggGpuMetricsProfileResult( } override def convertToCSVSeq(): Array[String] = convertToSeq() - - /** - * Combines two rows for the same (stageId, metricName) across stage attempts. - * Mirrors the policy used by StageAggTaskMetricsProfileResult: sum += sum, - * max = max(max), avg = (a+b)/2, numTasks += numTasks. - */ - def aggregateStageProfileMetric( - other: StageAggGpuMetricsProfileResult): StageAggGpuMetricsProfileResult = { - def addOpt(a: Option[Long], b: Option[Long]): Option[Long] = (a, b) match { - case (Some(x), Some(y)) => Some(x + y) - case (Some(x), None) => Some(x) - case (None, Some(y)) => Some(y) - case _ => None - } - def maxOpt(a: Option[Long], b: Option[Long]): Option[Long] = (a, b) match { - case (Some(x), Some(y)) => Some(Math.max(x, y)) - case (Some(x), None) => Some(x) - case (None, Some(y)) => Some(y) - case _ => None - } - def avgOpt(a: Option[Long], b: Option[Long]): Option[Long] = (a, b) match { - case (Some(x), Some(y)) => Some((x + y) / 2) - case (Some(x), None) => Some(x) - case (None, Some(y)) => Some(y) - case _ => None - } - StageAggGpuMetricsProfileResult( - stageId = this.stageId, - numTasks = this.numTasks + other.numTasks, - metricName = this.metricName, - unit = this.unit, - sum = addOpt(this.sum, other.sum), - max = maxOpt(this.max, other.max), - avg = avgOpt(this.avg, other.avg)) - } } /**