Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions core/src/main/resources/configs/reports/coreRawMetricsReport.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,111 @@ reportDefinitions:
dataType: Long
description: >-
TBD
# GPU task metric aggregations (stage / sql / app)
- label: coreRawGpuStageLevelAggregatedTaskMetricsCSV
description: >-
GPU task metric aggregations at stage level. Long-format: one row per
(stageId, metricName). Auto-discovered from accumulator names starting
with gpu, perfio.s3., or equal to multithreadReaderMaxParallelism.
fileName: gpu_stage_level_aggregated_task_metrics.csv
scope: per-app
columns:
- name: stageId
dataType: Int
description: >-
Stage identifier.
- name: numTasks
dataType: Int
description: >-
Number of tasks in the stage (includes task-attempt duplicates).
- name: metricName
dataType: String
description: >-
GPU accumulator name (e.g. gpuTime, gpuMaxDeviceMemoryBytes).
- name: unit
dataType: String
description: >-
ms for time/wait metrics, bytes for byte metrics, count otherwise.
- name: sum
dataType: Long
description: >-
Total across tasks in the stage. Empty for max-aggregated metrics.
- name: max
dataType: Long
description: >-
Peak per-task value in the stage.
- name: avg
dataType: Long
description: >-
Rolling per-task average. Empty for max-aggregated metrics.
- label: coreRawGpuSqlLevelAggregatedTaskMetricsCSV
description: >-
GPU task metric aggregations at SQL level. Rolls up stage-level GPU
rows across stages in the SQL (via sqlIdToStages). numTasks is
deliberately omitted: it would be a constant per SQL across every
metric row (already in sql_level_aggregated_task_metrics.csv).
fileName: gpu_sql_level_aggregated_task_metrics.csv
scope: per-app
columns:
- name: sqlId
dataType: Long
description: >-
SQL identifier.
- name: metricName
dataType: String
description: >-
GPU accumulator name.
- name: unit
dataType: String
description: >-
ms / bytes / count.
- name: sum
dataType: Long
description: >-
Sum of stage sums. Empty for max-aggregated metrics.
- name: max
dataType: Long
description: >-
Max of stage maxes.
- name: avg
dataType: Long
description: >-
Task-weighted average across stages. Empty for max-aggregated
metrics.
- label: coreRawGpuAppLevelAggregatedTaskMetricsCSV
description: >-
GPU task metric aggregations at application level. One row per
(appId, metricName) for the whole app (rolls up all stage-level
GPU rows).
fileName: gpu_app_level_aggregated_task_metrics.csv
scope: per-app
columns:
- name: appId
dataType: String
description: >-
Application identifier.
- name: metricName
dataType: String
description: >-
GPU accumulator name.
- name: unit
dataType: String
description: >-
ms / bytes / count.
- name: sum
dataType: Long
description: >-
Sum across all stages in the app. Empty for max-aggregated
metrics.
- name: max
dataType: Long
description: >-
Overall peak reading any task produced during the run.
- name: avg
dataType: Long
description: >-
Task-weighted average across all stages. Empty for
max-aggregated metrics.
# AccumProfileResults
- label: coreRawStageLevelAllMetricsCSV
description: >-
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.nvidia.spark.rapids.tool.analysis

import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLDurationExecutorTimeProfileResult, SQLTaskAggMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticResult}
import com.nvidia.spark.rapids.tool.profiling.{AppAggGpuMetricsProfileResult, IOAnalysisProfileResult, JobAggTaskMetricsProfileResult, ShuffleSkewProfileResult, SQLAggGpuMetricsProfileResult, SQLDurationExecutorTimeProfileResult, SQLTaskAggMetricsProfileResult, StageAggGpuMetricsProfileResult, StageAggTaskMetricsProfileResult, StageDiagnosticResult}

/**
* The result of the aggregation of the raw metrics. It contains the aggregated metrics for an
Expand All @@ -32,6 +32,9 @@ import com.nvidia.spark.rapids.tool.profiling.{IOAnalysisProfileResult, JobAggTa
* @param ioAggs lists the SQLs along their IO metrics
* @param sqlDurAggs the aggregated duration and CPU time for SQLs
* @param stageDiagnostics the stage level Spark metrics for diagnostic purposes
* @param gpuStageAggs GPU task metric aggregations at stage level
* @param gpuSqlAggs GPU task metric aggregations at SQL level
* @param gpuAppAggs GPU task metric aggregations at app level
*/
case class AggRawMetricsResult(
jobAggs: Seq[JobAggTaskMetricsProfileResult],
Expand All @@ -40,4 +43,7 @@ case class AggRawMetricsResult(
sqlAggs: Seq[SQLTaskAggMetricsProfileResult],
ioAggs: Seq[IOAnalysisProfileResult],
sqlDurAggs: Seq[SQLDurationExecutorTimeProfileResult],
stageDiagnostics: Seq[StageDiagnosticResult])
stageDiagnostics: Seq[StageDiagnosticResult],
gpuStageAggs: Seq[StageAggGpuMetricsProfileResult] = Seq.empty,
gpuSqlAggs: Seq[SQLAggGpuMetricsProfileResult] = Seq.empty,
gpuAppAggs: Seq[AppAggGpuMetricsProfileResult] = Seq.empty)
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,18 @@ trait AppSparkMetricsAggTrait extends AppIndexMapperTrait {
sqlAnalyzer: Option[AppSQLPlanAnalyzer] = None): AggRawMetricsResult = {
val analysisObj = AppSparkMetricsAnalyzer(app)
val sqlMetricsAgg = analysisObj.aggregateSparkMetricsBySql(index)
val gpuStageRows = analysisObj.aggregateGpuMetricsByStage(index)
AggRawMetricsResult(
analysisObj.aggregateSparkMetricsByJob(index),
analysisObj.aggregateSparkMetricsByStage(index),
analysisObj.shuffleSkewCheck(index),
sqlMetricsAgg,
analysisObj.aggregateIOMetricsBySql(sqlMetricsAgg),
analysisObj.aggregateDurationAndCPUTimeBySql(index),
analysisObj.aggregateDiagnosticMetricsByStage(index, sqlAnalyzer))
analysisObj.aggregateDiagnosticMetricsByStage(index, sqlAnalyzer),
gpuStageRows,
analysisObj.aggregateGpuMetricsBySql(index, gpuStageRows),
analysisObj.aggregateGpuMetricsByApp(index, gpuStageRows))
}

/**
Expand All @@ -66,7 +70,10 @@ trait AppSparkMetricsAggTrait extends AppIndexMapperTrait {
agg1.sqlAggs ++ agg2.sqlAggs,
agg1.ioAggs ++ agg2.ioAggs,
agg1.sqlDurAggs ++ agg2.sqlDurAggs,
agg1.stageDiagnostics ++ agg2.stageDiagnostics)
agg1.stageDiagnostics ++ agg2.stageDiagnostics,
agg1.gpuStageAggs ++ agg2.gpuStageAggs,
agg1.gpuSqlAggs ++ agg2.gpuSqlAggs,
agg1.gpuAppAggs ++ agg2.gpuAppAggs)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.nvidia.spark.rapids.tool.analysis.util.AggAccumHelper
import com.nvidia.spark.rapids.tool.analysis.util.StageAccumDiagnosticMetrics._
import com.nvidia.spark.rapids.tool.profiling._

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.{AppBase, ToolUtils}
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo
import org.apache.spark.sql.rapids.tool.store.AccumMetaRef
Expand All @@ -47,7 +48,7 @@ import org.apache.spark.sql.rapids.tool.store.AccumMetaRef
*
* @param app the AppBase object to analyze
*/
class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) with Logging {
// Hashmap to cache the stage level metrics. It is initialized to None just in case the caller
// does not call methods in order starting with stage level metrics.
private var stageLevelCache:
Expand Down Expand Up @@ -404,6 +405,180 @@ class AppSparkMetricsAnalyzer(app: AppBase) extends AppAnalysisBase(app) {
stageLevelSparkMetrics(index).put(sm.stageInfo.stageId, rowToStore)
}
}

// ---------------------------------------------------------------------------
// GPU task metric aggregations (Stage / SQL / App)
// ---------------------------------------------------------------------------
//
// Discovery convention: an accumulator is a GPU task metric if its name
// - starts with "gpu", OR
// - starts with "perfio.s3.", OR
// - equals "multithreadReaderMaxParallelism".
// Unit convention (from name): contains Time|Wait → ms (raw ns / 1e6);
// contains Bytes → bytes; otherwise → count.
// Max-aggregated metrics are discriminated via AccumMetaRef.isAggregateByMax;
// for those, sum and avg are empty and only max is meaningful.

/** Returns true when an accumulator name follows the GPU task-metric naming convention. */
private def isGpuMetric(name: String): Boolean = {
  val gpuPrefixes = Seq("gpu", "perfio.s3.")
  val gpuExactNames = Set("multithreadReaderMaxParallelism")
  gpuPrefixes.exists(name.startsWith) || gpuExactNames.contains(name)
}

/**
 * Maps a GPU metric name to its reporting unit:
 * "ms" for names containing Time/Wait, "bytes" for names containing Bytes,
 * "count" for everything else.
 */
private def unitForMetric(name: String): String = {
  val isDuration = name.contains("Time") || name.contains("Wait")
  if (isDuration) {
    "ms"
  } else if (name.contains("Bytes")) {
    "bytes"
  } else {
    "count"
  }
}

/**
 * Converts a raw accumulator reading for reporting. Duration metrics
 * (names containing Time/Wait) are assumed to be raw nanoseconds and are
 * scaled to milliseconds; all other values pass through unchanged.
 */
private def convertValue(name: String, raw: Long): Long = {
  val isDuration = name.contains("Time") || name.contains("Wait")
  if (isDuration) raw / 1000000L else raw
}

/**
 * Aggregate GPU task accumulators by stage. Emits one row per (stageId,
 * metricName), skipping rows that carry no signal (both sum and max are
 * zero or absent). Returns Seq.empty when the app has no GPU accumulators,
 * which upstream uses to suppress CSV generation.
 *
 * @param index the app index used to look up the cached stage-level task metrics
 * @return stage-level GPU metric rows sorted by (stageId, metricName)
 */
def aggregateGpuMetricsByStage(index: Int): Seq[StageAggGpuMetricsProfileResult] = {
  val gpuAccums = app.accumManager.accumInfoMap.values.filter { ai =>
    isGpuMetric(ai.infoRef.getName())
  }.toSeq
  if (gpuAccums.isEmpty) {
    Seq.empty
  } else {
    val stageCache = stageLevelSparkMetrics(index)
    val rows = gpuAccums.flatMap { ai =>
      val name = ai.infoRef.getName()
      val unit = unitForMetric(name)
      val isMax = ai.infoRef.isAggregateByMax
      ai.getStageIds.toSeq.flatMap { stageId =>
        ai.calculateAccStatsForStage(stageId).flatMap { stats =>
          // Invariant: stages with GPU accumulators are tracked by stageManager
          // and therefore cached. The fallback to 0 is defensive for edge cases
          // (e.g. driver-side accumulators) where the stage is absent from the
          // task-metrics cache. A 0 here would also exclude the stage from the
          // task-weighted avg in rollupGpuRows while sum/max still accumulate,
          // so log a warning so the inconsistency is visible.
          val numTasks = stageCache.get(stageId).map(_.numTasks).getOrElse {
            logWarning(s"GPU accumulator '$name' references stage $stageId which " +
              s"is not in the stage-task metrics cache; using numTasks = 0. " +
              s"This will be excluded from task-weighted averages at SQL/app level.")
            0
          }
          // Max-aggregated metrics only carry a meaningful max; sum/avg stay empty.
          val (sum, max, avg) = if (isMax) {
            (Option.empty[Long],
              Some(convertValue(name, stats.max)),
              Option.empty[Long])
          } else {
            // NOTE(review): avg is populated from stats.med — looks like the median
            // is standing in for the per-task average reported downstream; confirm
            // this is intentional.
            (Some(convertValue(name, stats.total)),
              Some(convertValue(name, stats.max)),
              Some(convertValue(name, stats.med)))
          }
          // Skip rows carrying no signal (both sum and max zero/absent).
          val zeroSum = sum.forall(_ == 0L)
          val zeroMax = max.forall(_ == 0L)
          if (zeroSum && zeroMax) {
            None
          } else {
            Some(StageAggGpuMetricsProfileResult(
              stageId = stageId,
              numTasks = numTasks,
              metricName = name,
              unit = unit,
              sum = sum,
              max = max,
              avg = avg))
          }
        }
      }
    }
    rows.sortBy(r => (r.stageId, r.metricName))
  }
}

/**
 * Rollup helper: groups stage-level GPU rows by metric name and reduces each
 * group to (metricName, unit, sum, max, avg).
 *   - sum: Σ of the stage sums (None when no stage carried a sum, i.e.
 *     max-aggregated metrics).
 *   - max: maximum of the stage maxes (None when absent everywhere).
 *   - avg: task-weighted average Σ(stage.avg * stage.numTasks) / Σ numTasks
 *     over the stages that recorded the metric; None when no stage carried
 *     an avg or the total task weight is zero.
 * numTasks is intentionally not propagated — see
 * SQLAggGpuMetricsProfileResult / AppAggGpuMetricsProfileResult docstrings.
 */
private def rollupGpuRows(
    rows: Seq[StageAggGpuMetricsProfileResult]
): Seq[(String, String, Option[Long], Option[Long], Option[Long])] = {
  rows.groupBy(_.metricName).toSeq.map { case (metricName, group) =>
    val unit = group.head.unit
    val sumOpt = group.flatMap(_.sum).reduceOption(_ + _)
    val maxOpt = group.flatMap(_.max).reduceOption(_ max _)
    // Pair each stage's avg with its task count for the weighted mean.
    val weighted = for {
      row <- group
      avg <- row.avg
    } yield (avg, row.numTasks)
    val totalWeight = weighted.map(_._2).sum
    val avgOpt =
      if (weighted.isEmpty || totalWeight == 0) {
        None
      } else {
        Some(weighted.map { case (avg, tasks) => avg * tasks }.sum / totalWeight)
      }
    (metricName, unit, sumOpt, maxOpt, avgOpt)
  }
}

/**
 * Aggregate GPU task metrics by SQL. Rolls up the given stage-level rows
 * across the stages belonging to each SQL (via app.sqlIdToStages). Emits one
 * row per (sqlId, metricName), sorted by that pair. Returns Seq.empty when
 * there are no stage-level GPU rows.
 */
def aggregateGpuMetricsBySql(
    index: Int,
    stageRows: Seq[StageAggGpuMetricsProfileResult]
): Seq[SQLAggGpuMetricsProfileResult] = {
  if (stageRows.isEmpty) {
    Seq.empty
  } else {
    val rowsByStage = stageRows.groupBy(_.stageId)
    val sqlRows = for {
      (sqlId, stageIds) <- app.sqlIdToStages.toSeq
      rowsForSql = stageIds.toSeq.flatMap(sid => rowsByStage.getOrElse(sid, Seq.empty))
      (metric, unit, sum, max, avg) <- rollupGpuRows(rowsForSql)
    } yield {
      SQLAggGpuMetricsProfileResult(
        sqlId = sqlId,
        metricName = metric,
        unit = unit,
        sum = sum,
        max = max,
        avg = avg)
    }
    sqlRows.sortBy(r => (r.sqlId, r.metricName))
  }
}

/**
 * Aggregate GPU task metrics across the whole application. Emits one row per
 * metricName, sorted by name. For max-aggregated metrics this yields the peak
 * reading any task produced during the run. Returns Seq.empty when there are
 * no stage-level GPU rows.
 */
def aggregateGpuMetricsByApp(
    index: Int,
    stageRows: Seq[StageAggGpuMetricsProfileResult]
): Seq[AppAggGpuMetricsProfileResult] = {
  if (stageRows.isEmpty) {
    Seq.empty
  } else {
    val appRows = rollupGpuRows(stageRows).map { case (metric, unit, sum, max, avg) =>
      AppAggGpuMetricsProfileResult(
        appId = app.appId,
        metricName = metric,
        unit = unit,
        sum = sum,
        max = max,
        avg = avg)
    }
    appRows.sortBy(_.metricName)
  }
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ case class ApplicationSummaryInfo(
sparkRapidsBuildInfo: Seq[SparkRapidsBuildInfoEvent],
writeOpsInfo: Seq[WriteOpProfileResult],
sqlPlanInfo: Seq[SQLPlanInfoProfileResult],
appLevelRecommendationSignals: Seq[AppLevelRecommendationSignalsProfileResult] = Seq.empty)
appLevelRecommendationSignals: Seq[AppLevelRecommendationSignalsProfileResult] = Seq.empty,
gpuStageAggMetrics: Seq[StageAggGpuMetricsProfileResult] = Seq.empty,
gpuSqlAggMetrics: Seq[SQLAggGpuMetricsProfileResult] = Seq.empty,
gpuAppAggMetrics: Seq[AppAggGpuMetricsProfileResult] = Seq.empty)

trait AppInfoPropertyGetter {
// returns all the properties (i.e., spark)
Expand Down
Loading