diff --git a/core/src/main/resources/configs/reports/coreRawMetricsReport.yaml b/core/src/main/resources/configs/reports/coreRawMetricsReport.yaml index 404bc7ede..06f710254 100644 --- a/core/src/main/resources/configs/reports/coreRawMetricsReport.yaml +++ b/core/src/main/resources/configs/reports/coreRawMetricsReport.yaml @@ -867,6 +867,111 @@ reportDefinitions: dataType: Long description: >- TBD + # GPU task metric aggregations (stage / sql / app) + - label: coreRawGpuStageLevelAggregatedTaskMetricsCSV + description: >- + GPU task metric aggregations at stage level. Long-format: one row per + (stageId, metricName). Auto-discovered from accumulator names starting + with gpu, perfio.s3., or equal to multithreadReaderMaxParallelism. + fileName: gpu_stage_level_aggregated_task_metrics.csv + scope: per-app + columns: + - name: stageId + dataType: Int + description: >- + Stage identifier. + - name: numTasks + dataType: Int + description: >- + Number of tasks in the stage (includes task-attempt duplicates). + - name: metricName + dataType: String + description: >- + GPU accumulator name (e.g. gpuTime, gpuMaxDeviceMemoryBytes). + - name: unit + dataType: String + description: >- + ms for time/wait metrics, bytes for byte metrics, count otherwise. + - name: sum + dataType: Long + description: >- + Total across tasks in the stage. Empty for max-aggregated metrics. + - name: max + dataType: Long + description: >- + Peak per-task value in the stage. + - name: avg + dataType: Long + description: >- + Per-task central value, sourced from the task-level median of the + accumulator statistics. Empty for max-aggregated metrics. + - label: coreRawGpuSqlLevelAggregatedTaskMetricsCSV + description: >- + GPU task metric aggregations at SQL level. Rolls up stage-level GPU + rows across stages in the SQL (via sqlIdToStages). numTasks is + deliberately omitted: it would be a constant per SQL across every + metric row (already in sql_level_aggregated_task_metrics.csv).
+ fileName: gpu_sql_level_aggregated_task_metrics.csv + scope: per-app + columns: + - name: sqlId + dataType: Long + description: >- + SQL identifier. + - name: metricName + dataType: String + description: >- + GPU accumulator name. + - name: unit + dataType: String + description: >- + ms / bytes / count. + - name: sum + dataType: Long + description: >- + Sum of stage sums. Empty for max-aggregated metrics. + - name: max + dataType: Long + description: >- + Max of stage maxes. + - name: avg + dataType: Long + description: >- + Task-weighted average across stages. Empty for max-aggregated + metrics. + - label: coreRawGpuAppLevelAggregatedTaskMetricsCSV + description: >- + GPU task metric aggregations at application level. One row per + (appId, metricName) for the whole app (rolls up all stage-level + GPU rows). + fileName: gpu_app_level_aggregated_task_metrics.csv + scope: per-app + columns: + - name: appId + dataType: String + description: >- + Application identifier. + - name: metricName + dataType: String + description: >- + GPU accumulator name. + - name: unit + dataType: String + description: >- + ms / bytes / count. + - name: sum + dataType: Long + description: >- + Sum across all stages in the app. Empty for max-aggregated + metrics. + - name: max + dataType: Long + description: >- + Overall peak reading any task produced during the run. + - name: avg + dataType: Long + description: >- + Task-weighted average across all stages. Empty for + max-aggregated metrics. 
# AccumProfileResults - label: coreRawStageLevelAllMetricsCSV description: >- diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AggRawMetricsResult.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AggRawMetricsResult.scala index c460f3d5c..f5f33924e 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AggRawMetricsResult.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AggRawMetricsResult.scala @@ -16,7 +16,7 @@ package com.nvidia.spark.rapids.tool.analysis -import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticResult} +import com.nvidia.spark.rapids.tool.profiling.{AppAggGpuMetricsProfileResult, IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLAggGpuMetricsProfileResult, SQLDurationExecutorTimeProfileResult, SQLTaskAggMetricsProfileResult, StageAggGpuMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticResult} /** * The result of the aggregation of the raw metrics. 
It contains the aggregated metrics for an @@ -32,6 +32,9 @@ import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTa * @param ioAggs lists the SQLs along their IO metrics * @param sqlDurAggs the aggregated duration and CPU time for SQLs * @param stageDiagnostics the stage level Spark metrics for diagnostic purposes + * @param gpuStageAggs GPU task metric aggregations at stage level + * @param gpuSqlAggs GPU task metric aggregations at SQL level + * @param gpuAppAggs GPU task metric aggregations at app level */ case class AggRawMetricsResult( jobAggs: Seq[JobAggTaskMetricsProfileResult], @@ -40,4 +43,7 @@ case class AggRawMetricsResult( sqlAggs: Seq[SQLTaskAggMetricsProfileResult], ioAggs: Seq[IOAnalysisProfileResult], sqlDurAggs: Seq[SQLDurationExecutorTimeProfileResult], - stageDiagnostics: Seq[StageDiagnosticResult]) + stageDiagnostics: Seq[StageDiagnosticResult], + gpuStageAggs: Seq[StageAggGpuMetricsProfileResult] = Seq.empty, + gpuSqlAggs: Seq[SQLAggGpuMetricsProfileResult] = Seq.empty, + gpuAppAggs: Seq[AppAggGpuMetricsProfileResult] = Seq.empty) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAggTrait.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAggTrait.scala index 67b899962..287e0de48 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAggTrait.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAggTrait.scala @@ -38,6 +38,7 @@ trait AppSparkMetricsAggTrait extends AppIndexMapperTrait { sqlAnalyzer: Option[AppSQLPlanAnalyzer] = None): AggRawMetricsResult = { val analysisObj = AppSparkMetricsAnalyzer(app) val sqlMetricsAgg = analysisObj.aggregateSparkMetricsBySql(index) + val gpuStageRows = analysisObj.aggregateGpuMetricsByStage(index) AggRawMetricsResult( analysisObj.aggregateSparkMetricsByJob(index), analysisObj.aggregateSparkMetricsByStage(index), @@ -45,7 +46,10 @@ trait 
AppSparkMetricsAggTrait extends AppIndexMapperTrait { sqlMetricsAgg, analysisObj.aggregateIOMetricsBySql(sqlMetricsAgg), analysisObj.aggregateDurationAndCPUTimeBySql(index), - analysisObj.aggregateDiagnosticMetricsByStage(index, sqlAnalyzer)) + analysisObj.aggregateDiagnosticMetricsByStage(index, sqlAnalyzer), + gpuStageRows, + analysisObj.aggregateGpuMetricsBySql(index, gpuStageRows), + analysisObj.aggregateGpuMetricsByApp(index, gpuStageRows)) } /** @@ -66,7 +70,10 @@ trait AppSparkMetricsAggTrait extends AppIndexMapperTrait { agg1.sqlAggs ++ agg2.sqlAggs, agg1.ioAggs ++ agg2.ioAggs, agg1.sqlDurAggs ++ agg2.sqlDurAggs, - agg1.stageDiagnostics ++ agg2.stageDiagnostics) + agg1.stageDiagnostics ++ agg2.stageDiagnostics, + agg1.gpuStageAggs ++ agg2.gpuStageAggs, + agg1.gpuSqlAggs ++ agg2.gpuSqlAggs, + agg1.gpuAppAggs ++ agg2.gpuAppAggs) } } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala index c1b13463a..f0ed6bf60 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/analysis/AppSparkMetricsAnalyzer.scala @@ -23,6 +23,7 @@ import com.nvidia.spark.rapids.tool.analysis.util.AggAccumHelper import com.nvidia.spark.rapids.tool.analysis.util.StageAccumDiagnosticMetrics._ import com.nvidia.spark.rapids.tool.profiling._ +import org.apache.spark.internal.Logging import org.apache.spark.sql.rapids.tool.{AppBase, ToolUtils} import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo import org.apache.spark.sql.rapids.tool.store.AccumMetaRef @@ -47,7 +48,7 @@ import org.apache.spark.sql.rapids.tool.store.AccumMetaRef * * @param app the AppBase object to analyze */ -class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { +class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) with Logging { // 
Hashmap to cache the stage level metrics. It is initialized to None just in case the caller // does not call methods in order starting with stage level metrics. private var stageLevelCache: @@ -404,6 +405,180 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) { stageLevelSparkMetrics(index).put(sm.stageInfo.stageId, rowToStore) } } + + // --------------------------------------------------------------------------- + // GPU task metric aggregations (Stage / SQL / App) + // --------------------------------------------------------------------------- + // + // Discovery convention: an accumulator is a GPU task metric if its name + // - starts with "gpu", OR + // - starts with "perfio.s3.", OR + // - equals "multithreadReaderMaxParallelism". + // Unit convention (from name): contains Time|Wait → ms (raw ns / 1e6); + // contains Bytes → bytes; otherwise → count. + // Max-aggregated metrics are discriminated via AccumMetaRef.isAggregateByMax; + // for those, sum and avg are empty and only max is meaningful. + + private def isGpuMetric(name: String): Boolean = { + name.startsWith("gpu") || + name.startsWith("perfio.s3.") || + name == "multithreadReaderMaxParallelism" + } + + private def unitForMetric(name: String): String = { + if (name.contains("Time") || name.contains("Wait")) "ms" + else if (name.contains("Bytes")) "bytes" + else "count" + } + + private def convertValue(name: String, raw: Long): Long = { + if (name.contains("Time") || name.contains("Wait")) raw / 1000000L else raw + } + + /** + * Aggregate GPU task accumulators by stage. Emits one row per (stageId, + * metricName). Returns Seq.empty when the app has no GPU metrics, which + * upstream uses to suppress CSV generation. 
+ */ + def aggregateGpuMetricsByStage(index: Int): Seq[StageAggGpuMetricsProfileResult] = { + val gpuAccums = app.accumManager.accumInfoMap.values.filter { ai => + isGpuMetric(ai.infoRef.getName()) + }.toSeq + if (gpuAccums.isEmpty) { + return Seq.empty + } + val stageCache = stageLevelSparkMetrics(index) + val rows = scala.collection.mutable.ArrayBuffer[StageAggGpuMetricsProfileResult]() + gpuAccums.foreach { ai => + val name = ai.infoRef.getName() + val unit = unitForMetric(name) + val isMax = ai.infoRef.isAggregateByMax + ai.getStageIds.foreach { stageId => + ai.calculateAccStatsForStage(stageId).foreach { stats => + // Invariant: stages with GPU accumulators are tracked by stageManager + // and therefore cached. The fallback to 0 is defensive for edge cases + // (e.g. driver-side accumulators) where the stage is absent from the + // task-metrics cache. A 0 here would also exclude the stage from the + // task-weighted avg in rollupGpuRows while sum/max still accumulate, + // so log a warning so the inconsistency is visible. + val numTasks = stageCache.get(stageId).map(_.numTasks).getOrElse { + logWarning(s"GPU accumulator '$name' references stage $stageId which " + + s"is not in the stage-task metrics cache; using numTasks = 0. " + + s"This will be excluded from task-weighted averages at SQL/app level.") + 0 + } + val (sum, max, avg) = if (isMax) { + (None: Option[Long], + Some(convertValue(name, stats.max)), + None: Option[Long]) + } else { + (Some(convertValue(name, stats.total)), + Some(convertValue(name, stats.max)), + Some(convertValue(name, stats.med))) + } + // Skip rows carrying no signal (both sum and max zero/absent). 
+ val zeroSum = sum.forall(_ == 0L) + val zeroMax = max.forall(_ == 0L) + if (!(zeroSum && zeroMax)) { + rows += StageAggGpuMetricsProfileResult( + stageId = stageId, + numTasks = numTasks, + metricName = name, + unit = unit, + sum = sum, + max = max, + avg = avg) + } + } + } + } + rows.toSeq.sortBy(r => (r.stageId, r.metricName)) + } + + /** + * Rollup helper: groups stage-level GPU rows by metric name and reduces to + * (unit, sum, max, avg). sum is Σ stage.sum (None for max metrics); max is + * max stage.max; avg is task-weighted Σ(stage.avg * stage.numTasks) + * / Σ stage.numTasks over the stages that recorded the metric. numTasks is + * intentionally not propagated — see SQLAggGpuMetricsProfileResult / + * AppAggGpuMetricsProfileResult docstrings. + */ + private def rollupGpuRows( + rows: Seq[StageAggGpuMetricsProfileResult] + ): Seq[(String, String, Option[Long], Option[Long], Option[Long])] = { + rows.groupBy(_.metricName).map { case (metricName, group) => + val unit = group.head.unit + val sumOpt: Option[Long] = { + val xs = group.flatMap(_.sum) + if (xs.isEmpty) None else Some(xs.sum) + } + val maxOpt: Option[Long] = { + val xs = group.flatMap(_.max) + if (xs.isEmpty) None else Some(xs.max) + } + val avgOpt: Option[Long] = { + val weighted = group.flatMap { r => r.avg.map(a => (a, r.numTasks)) } + val weightTasks = weighted.map(_._2).sum + if (weighted.isEmpty || weightTasks == 0) { + None + } else { + Some(weighted.map { case (a, n) => a * n }.sum / weightTasks) + } + } + (metricName, unit, sumOpt, maxOpt, avgOpt) + }.toSeq + } + + /** + * Aggregate GPU task metrics by SQL. Rolls up stage-level rows using + * app.sqlIdToStages. One row per (sqlId, metricName). 
+ */ + def aggregateGpuMetricsBySql( + index: Int, + stageRows: Seq[StageAggGpuMetricsProfileResult] + ): Seq[SQLAggGpuMetricsProfileResult] = { + if (stageRows.isEmpty) { + return Seq.empty + } + val stageMap: Map[Int, Seq[StageAggGpuMetricsProfileResult]] = + stageRows.groupBy(_.stageId) + app.sqlIdToStages.toSeq.flatMap { case (sqlId, stageIds) => + val rowsForSql: Seq[StageAggGpuMetricsProfileResult] = + stageIds.toSeq.flatMap(s => stageMap.getOrElse(s, Seq.empty)) + rollupGpuRows(rowsForSql).map { case (metric, unit, sum, max, avg) => + SQLAggGpuMetricsProfileResult( + sqlId = sqlId, + metricName = metric, + unit = unit, + sum = sum, + max = max, + avg = avg) + } + }.sortBy(r => (r.sqlId, r.metricName)) + } + + /** + * Aggregate GPU task metrics across the whole application. One row per + * metricName. For max-aggregated metrics this gives the peak reading any + * task produced during the run. + */ + def aggregateGpuMetricsByApp( + index: Int, + stageRows: Seq[StageAggGpuMetricsProfileResult] + ): Seq[AppAggGpuMetricsProfileResult] = { + if (stageRows.isEmpty) { + return Seq.empty + } + rollupGpuRows(stageRows).map { case (metric, unit, sum, max, avg) => + AppAggGpuMetricsProfileResult( + appId = app.appId, + metricName = metric, + unit = unit, + sum = sum, + max = max, + avg = avg) + }.sortBy(_.metricName) + } } /** diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala index 9e1d2ebc8..e5b063118 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ApplicationSummaryInfo.scala @@ -54,7 +54,10 @@ case class ApplicationSummaryInfo( sparkRapidsBuildInfo: Seq[SparkRapidsBuildInfoEvent], writeOpsInfo: Seq[WriteOpProfileResult], sqlPlanInfo: Seq[SQLPlanInfoProfileResult], - appLevelRecommendationSignals: 
Seq[AppLevelRecommendationSignalsProfileResult] = Seq.empty) + appLevelRecommendationSignals: Seq[AppLevelRecommendationSignalsProfileResult] = Seq.empty, + gpuStageAggMetrics: Seq[StageAggGpuMetricsProfileResult] = Seq.empty, + gpuSqlAggMetrics: Seq[SQLAggGpuMetricsProfileResult] = Seq.empty, + gpuAppAggMetrics: Seq[AppAggGpuMetricsProfileResult] = Seq.empty) trait AppInfoPropertyGetter { // returns all the properties (i.e., spark) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala index b467db8c6..73ef3e357 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileClassWarehouse.scala @@ -1568,3 +1568,103 @@ object SparkRapidsOomExceptions { object UnixExitCode { val FORCE_KILLED = 137 } + +/** + * GPU task metric aggregation at stage level — one row per (stageId, metricName). + * Long/transposed schema: unit and sum/max/avg vary by metric. Empty `sum` / `avg` + * denote max-aggregated metrics (e.g. `gpuMaxDeviceMemoryBytes`). + * + * Note on stage attempts: unlike StageAggTaskMetricsProfileResult, this class has + * no aggregateStageProfileMetric helper because attempt merging happens upstream + * at the AccumInfo layer — `AccumInfo.stagesStatMap` is keyed by stageId only + * (not stageId + attemptNumber), so calculateAccStatsForStage already returns + * the merged result across attempts. 
+ */ +case class StageAggGpuMetricsProfileResult( + stageId: Int, + numTasks: Int, + metricName: String, + unit: String, + sum: Option[Long], + max: Option[Long], + avg: Option[Long]) extends ProfileResult { + + override def outputHeaders: Array[String] = { + OutHeaderRegistry.outputHeaders("StageAggGpuMetricsProfileResult") + } + + override def convertToSeq(): Array[String] = { + Array( + stageId.toString, + numTasks.toString, + metricName, + unit, + sum.map(_.toString).getOrElse(""), + max.map(_.toString).getOrElse(""), + avg.map(_.toString).getOrElse("")) + } + + override def convertToCSVSeq(): Array[String] = convertToSeq() +} + +/** + * GPU task metric aggregation at SQL level — one row per (sqlId, metricName). + * Rolled up from stage-level rows: sum = Σ stage.sum, max = max stage.max, + * avg = task-weighted average over stage.avg. numTasks is intentionally not + * carried — it would be a constant per SQL across every metric row (the non-GPU + * sql_level_aggregated_task_metrics.csv already has it once per SQL). + */ +case class SQLAggGpuMetricsProfileResult( + sqlId: Long, + metricName: String, + unit: String, + sum: Option[Long], + max: Option[Long], + avg: Option[Long]) extends ProfileResult { + + override def outputHeaders: Array[String] = { + OutHeaderRegistry.outputHeaders("SQLAggGpuMetricsProfileResult") + } + + override def convertToSeq(): Array[String] = { + Array( + sqlId.toString, + metricName, + unit, + sum.map(_.toString).getOrElse(""), + max.map(_.toString).getOrElse(""), + avg.map(_.toString).getOrElse("")) + } + + override def convertToCSVSeq(): Array[String] = convertToSeq() +} + +/** + * GPU task metric aggregation at app level — one row per (appId, metricName). + * Same rollup rules as SQL-level. numTasks intentionally omitted (would be a + * constant per app and is available in existing per-app CSVs). 
+ */ +case class AppAggGpuMetricsProfileResult( + appId: String, + metricName: String, + unit: String, + sum: Option[Long], + max: Option[Long], + avg: Option[Long]) extends ProfileResult { + + override def outputHeaders: Array[String] = { + OutHeaderRegistry.outputHeaders("AppAggGpuMetricsProfileResult") + } + + override def convertToSeq(): Array[String] = { + Array( + appId, + metricName, + unit, + sum.map(_.toString).getOrElse(""), + max.map(_.toString).getOrElse(""), + avg.map(_.toString).getOrElse("")) + } + + override def convertToCSVSeq(): Array[String] = convertToSeq() +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala index 2b689397b..72b3ba618 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala @@ -350,7 +350,10 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea sparkRapidsBuildInfo = collect.getSparkRapidsInfo, writeOpsInfo = collect.getWriteOperationInfo, sqlPlanInfo = collect.getSQLPlanInfoTruncated, - appLevelRecommendationSignals = appLevelRecommendationSignals) + appLevelRecommendationSignals = appLevelRecommendationSignals, + gpuStageAggMetrics = analysis.gpuStageAggs, + gpuSqlAggMetrics = analysis.gpuSqlAggs, + gpuAppAggMetrics = analysis.gpuAppAggs) (appInfoSummary, DiagnosticSummaryInfo(analysis.stageDiagnostics, collect.getIODiagnosticMetrics)) } @@ -418,6 +421,15 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea profileOutputWriter.writeCSVTable(JOB_AGG_LABEL, app.jobAggMetrics) profileOutputWriter.writeCSVTable(STAGE_AGG_LABEL, app.stageAggMetrics) profileOutputWriter.writeCSVTable(SQL_AGG_LABEL, app.sqlTaskAggMetrics) + if (app.gpuStageAggMetrics.nonEmpty) { + profileOutputWriter.writeCSVTable(GPU_STAGE_AGG_LABEL, app.gpuStageAggMetrics) + } + if 
(app.gpuSqlAggMetrics.nonEmpty) { + profileOutputWriter.writeCSVTable(GPU_SQL_AGG_LABEL, app.gpuSqlAggMetrics) + } + if (app.gpuAppAggMetrics.nonEmpty) { + profileOutputWriter.writeCSVTable(GPU_APP_AGG_LABEL, app.gpuAppAggMetrics) + } profileOutputWriter.writeCSVTable(IO_LABEL, app.ioMetrics) profileOutputWriter.writeCSVTable(SQL_DUR_LABEL, app.durAndCpuMet) // writeOps are generated in only CSV format diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/OutHeaderRegistry.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/OutHeaderRegistry.scala index ac61a5d3b..09fad297a 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/OutHeaderRegistry.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/OutHeaderRegistry.scala @@ -314,6 +314,12 @@ object OutHeaderRegistry { "WriteOpProfileResult" -> Array("sqlID", "sqlPlanVersion", "nodeId", "fromFinalPlan", "execName", "format", "location", "tableName", "dataBase", "outputColumns", "writeMode", - "partitionColumns", "compressionOption", "fullDescription") + "partitionColumns", "compressionOption", "fullDescription"), + "StageAggGpuMetricsProfileResult" -> + Array("stageId", "numTasks", "metricName", "unit", "sum", "max", "avg"), + "SQLAggGpuMetricsProfileResult" -> + Array("sqlId", "metricName", "unit", "sum", "max", "avg"), + "AppAggGpuMetricsProfileResult" -> + Array("appId", "metricName", "unit", "sum", "max", "avg") ) // End of outputHeaders map initialization } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/QualRawReportGenerator.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/QualRawReportGenerator.scala index c26a9b789..f7d87b41b 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/QualRawReportGenerator.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/QualRawReportGenerator.scala @@ -37,7 +37,10 @@ object QualRawReportGenerator extends Logging { 
AggMetricsResultSorter.sortSqlAgg(aggRawResult.sqlAggs), AggMetricsResultSorter.sortIO(aggRawResult.ioAggs), AggMetricsResultSorter.sortSqlDurationAgg(aggRawResult.sqlDurAggs), - AggMetricsResultSorter.sortStageDiagnostics(aggRawResult.stageDiagnostics)) + AggMetricsResultSorter.sortStageDiagnostics(aggRawResult.stageDiagnostics), + aggRawResult.gpuStageAggs.sortBy(r => (r.stageId, r.metricName)), + aggRawResult.gpuSqlAggs.sortBy(r => (r.sqlId, r.metricName)), + aggRawResult.gpuAppAggs.sortBy(_.metricName)) Map( STAGE_AGG_LABEL -> sortedRes.stageAggs, JOB_AGG_LABEL -> sortedRes.jobAggs, @@ -45,7 +48,10 @@ object QualRawReportGenerator extends Logging { SQL_AGG_LABEL -> sortedRes.sqlAggs, IO_LABEL -> sortedRes.ioAggs, SQL_DUR_LABEL -> sortedRes.sqlDurAggs, - STAGE_DIAGNOSTICS_LABEL -> sortedRes.stageDiagnostics) + STAGE_DIAGNOSTICS_LABEL -> sortedRes.stageDiagnostics, + GPU_STAGE_AGG_LABEL -> sortedRes.gpuStageAggs, + GPU_SQL_AGG_LABEL -> sortedRes.gpuSqlAggs, + GPU_APP_AGG_LABEL -> sortedRes.gpuAppAggs) } private def generateSQLProcessingView( diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/RawMetricProfView.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/RawMetricProfView.scala index 3ba76db7b..b5220c9a8 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/RawMetricProfView.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/RawMetricProfView.scala @@ -17,7 +17,7 @@ package com.nvidia.spark.rapids.tool.views import com.nvidia.spark.rapids.tool.analysis.ProfSparkMetricsAggregator -import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticResult} +import com.nvidia.spark.rapids.tool.profiling.{AppAggGpuMetricsProfileResult, IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, 
SQLAggGpuMetricsProfileResult, SQLDurationExecutorTimeProfileResult, SQLTaskAggMetricsProfileResult, StageAggGpuMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticResult} import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo @@ -30,7 +30,10 @@ case class ProfilerAggregatedView( sqlAggs: Seq[SQLTaskAggMetricsProfileResult], ioAggs: Seq[IOAnalysisProfileResult], sqlDurAggs: Seq[SQLDurationExecutorTimeProfileResult], - stageDiagnostics: Seq[StageDiagnosticResult]) + stageDiagnostics: Seq[StageDiagnosticResult], + gpuStageAggs: Seq[StageAggGpuMetricsProfileResult] = Seq.empty, + gpuSqlAggs: Seq[SQLAggGpuMetricsProfileResult] = Seq.empty, + gpuAppAggs: Seq[AppAggGpuMetricsProfileResult] = Seq.empty) object RawMetricProfilerView { def getAggMetrics(apps: Seq[ApplicationInfo]): ProfilerAggregatedView = { @@ -42,6 +45,9 @@ object RawMetricProfilerView { AggMetricsResultSorter.sortSqlAgg(aggMetricsResults.sqlAggs), AggMetricsResultSorter.sortIO(aggMetricsResults.ioAggs), AggMetricsResultSorter.sortSqlDurationAgg(aggMetricsResults.sqlDurAggs), - AggMetricsResultSorter.sortStageDiagnostics(aggMetricsResults.stageDiagnostics)) + AggMetricsResultSorter.sortStageDiagnostics(aggMetricsResults.stageDiagnostics), + aggMetricsResults.gpuStageAggs.sortBy(r => (r.stageId, r.metricName)), + aggMetricsResults.gpuSqlAggs.sortBy(r => (r.sqlId, r.metricName)), + aggMetricsResults.gpuAppAggs.sortBy(_.metricName)) } } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/package.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/package.scala index 7251a4090..e17d98efc 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/package.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/package.scala @@ -31,6 +31,9 @@ package object views { val STAGE_DIAGNOSTICS_LABEL = "Stage Level Diagnostic Metrics" val APP_LEVEL_RECOMMENDATION_SIGNALS = "App Level Recommendation Signals" val CLUSTER_INFORMATION_LABEL = 
"Cluster Information" + val GPU_STAGE_AGG_LABEL = "GPU stage level aggregated task metrics" + val GPU_SQL_AGG_LABEL = "GPU SQL level aggregated task metrics" + val GPU_APP_AGG_LABEL = "GPU app level aggregated task metrics" val AGG_DESCRIPTION = Map( STAGE_AGG_LABEL -> "Stage metrics", @@ -39,6 +42,9 @@ package object views { IO_LABEL -> "IO Metrics per SQL", SQL_DUR_LABEL -> "Total duration and CPUTime per SQL", TASK_SHUFFLE_SKEW -> - "(When task's Shuffle Read Size > 3 * Avg Stage-level size)" + "(When task's Shuffle Read Size > 3 * Avg Stage-level size)", + GPU_STAGE_AGG_LABEL -> "GPU task metrics per stage", + GPU_SQL_AGG_LABEL -> "GPU task metrics per SQL", + GPU_APP_AGG_LABEL -> "GPU task metrics per application" ) } diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumMetaRef.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumMetaRef.scala index 4df1ea64b..12eaad03d 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumMetaRef.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/store/AccumMetaRef.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024-2025, NVIDIA CORPORATION. + * Copyright (c) 2024-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -45,7 +45,12 @@ object AccumMetaRef { "gpuMaxPageableMemoryBytes", "gpuMaxDeviceMemoryBytes", "gpuMaxHostMemoryBytes", - "gpuMaxPinnedMemoryBytes" + "gpuMaxPinnedMemoryBytes", + "gpuMaxDiskMemoryBytes", + "gpuMaxTaskFootprint", + "gpuOnGpuTasksWaitingGPUMaxCount", + "gpuMaxConcurrentGpuTasks", + "multithreadReaderMaxParallelism" ) val EMPTY_ACCUM_META_REF: AccumMetaRef = new AccumMetaRef(0L, AccumNameRef.EMPTY_ACC_NAME_REF) diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala index b7115c0ec..9b89d8a17 100644 --- a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/AnalysisSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2025, NVIDIA CORPORATION. + * Copyright (c) 2021-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -401,4 +401,105 @@ class AnalysisSuite extends AnyFunSuite { val dataSourceResults = ProfDataSourceView.getRawView(apps.toSeq) assert(dataSourceResults.exists(_.scan_time > 0)) } + + // --------------------------------------------------------------------------- + // GPU task metric aggregations (stage / SQL / app) + // --------------------------------------------------------------------------- + + test("GPU metric aggregation: stage / sql / app rows produced for GPU log") { + val logs = Array(s"$logDir/gpu_oom_eventlog.zstd") + val apps = ToolTestUtils.processProfileApps(logs, sparkSession) + val agg = RawMetricProfilerView.getAggMetrics(apps.toSeq) + assert(agg.gpuStageAggs.nonEmpty, "expected stage-level GPU rows") + assert(agg.gpuSqlAggs.nonEmpty, "expected SQL-level GPU rows") + assert(agg.gpuAppAggs.nonEmpty, "expected app-level GPU rows") + + // Discovery: only gpu* / perfio.s3.* / multithreadReaderMaxParallelism names. + agg.gpuStageAggs.foreach { row => + val n = row.metricName + assert(n.startsWith("gpu") || n.startsWith("perfio.s3.") || + n == "multithreadReaderMaxParallelism", s"unexpected metric name: $n") + } + + // Unit convention is internally consistent. + agg.gpuStageAggs.foreach { row => + val expected = + if (row.metricName.contains("Time") || row.metricName.contains("Wait")) "ms" + else if (row.metricName.contains("Bytes")) "bytes" + else "count" + assert(row.unit == expected, + s"unit mismatch for ${row.metricName}: got ${row.unit}, expected $expected") + } + + // Rollup math: SQL.sum == Σ stage.sum across the SQL's stages, per metric. 
+ val stageRowsByMetric = agg.gpuStageAggs.groupBy(_.metricName) + val sqlRowsByMetric = agg.gpuSqlAggs.groupBy(_.metricName) + sqlRowsByMetric.foreach { case (metric, sqlRows) => + val sqlSum = sqlRows.flatMap(_.sum).sum + val stageSum = stageRowsByMetric.getOrElse(metric, Seq.empty).flatMap(_.sum).sum + assert(sqlSum == stageSum, s"SQL sum mismatch for $metric: $sqlSum vs $stageSum") + val sqlMax = sqlRows.flatMap(_.max) + val stageMax = stageRowsByMetric.getOrElse(metric, Seq.empty).flatMap(_.max) + if (stageMax.nonEmpty) { + assert(sqlMax.nonEmpty && sqlMax.max == stageMax.max, + s"SQL max mismatch for $metric") + } + } + + // App row exactly matches the rollup of all stage rows per metric. + val appByMetric = agg.gpuAppAggs.groupBy(_.metricName).mapValues(_.head) + stageRowsByMetric.foreach { case (metric, stageRows) => + val appRow = appByMetric(metric) + val expectedSum = stageRows.flatMap(_.sum) + assert(appRow.sum.map(s => s == expectedSum.sum).getOrElse(expectedSum.isEmpty), + s"App sum mismatch for $metric") + val expectedMax = stageRows.flatMap(_.max) + assert(appRow.max.map(m => m == expectedMax.max).getOrElse(expectedMax.isEmpty), + s"App max mismatch for $metric") + } + } + + test("GPU metric aggregation: max-aggregated metrics carry only max") { + val logs = Array(s"$logDir/gpu_oom_eventlog.zstd") + val apps = ToolTestUtils.processProfileApps(logs, sparkSession) + val agg = RawMetricProfilerView.getAggMetrics(apps.toSeq) + val maxOnlyNames = Set( + "gpuMaxDeviceMemoryBytes", "gpuMaxHostMemoryBytes", "gpuMaxPageableMemoryBytes", + "gpuMaxPinnedMemoryBytes", "gpuMaxDiskMemoryBytes", "gpuMaxTaskFootprint", + "gpuOnGpuTasksWaitingGPUMaxCount", "gpuMaxConcurrentGpuTasks", + "multithreadReaderMaxParallelism") + val maxRows = agg.gpuStageAggs.filter(r => maxOnlyNames.contains(r.metricName)) ++ + agg.gpuSqlAggs.filter(r => maxOnlyNames.contains(r.metricName)) ++ + agg.gpuAppAggs.filter(r => maxOnlyNames.contains(r.metricName)) + assert(maxRows.nonEmpty, 
"expected at least one max-aggregated metric row") + maxRows.foreach { r => + val sum = r match { + case s: StageAggGpuMetricsProfileResult => s.sum + case s: SQLAggGpuMetricsProfileResult => s.sum + case s: AppAggGpuMetricsProfileResult => s.sum + } + val avg = r match { + case s: StageAggGpuMetricsProfileResult => s.avg + case s: SQLAggGpuMetricsProfileResult => s.avg + case s: AppAggGpuMetricsProfileResult => s.avg + } + val max = r match { + case s: StageAggGpuMetricsProfileResult => s.max + case s: SQLAggGpuMetricsProfileResult => s.max + case s: AppAggGpuMetricsProfileResult => s.max + } + assert(sum.isEmpty, s"max-only metric should have empty sum: $r") + assert(avg.isEmpty, s"max-only metric should have empty avg: $r") + assert(max.isDefined, s"max-only metric should have a max value: $r") + } + } + + test("GPU metric aggregation: empty for CPU-only event log") { + val logs = Array(s"$qualLogDir/nds_q86_test") + val apps = ToolTestUtils.processProfileApps(logs, sparkSession) + val agg = RawMetricProfilerView.getAggMetrics(apps.toSeq) + assert(agg.gpuStageAggs.isEmpty, "CPU-only log should produce no GPU stage rows") + assert(agg.gpuSqlAggs.isEmpty, "CPU-only log should produce no GPU SQL rows") + assert(agg.gpuAppAggs.isEmpty, "CPU-only log should produce no GPU app rows") + } }