Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,14 @@ abstract class Platform(var gpuDevice: Option[GpuDevice],
targetCluster.exists(_.getSparkProperties.excludePropertiesSet.contains(propertyKey))
}

/**
 * Determines whether the target cluster supplies a user-level override for the
 * given Spark property — either an explicitly `enforced` value or a `preserve`
 * directive that keeps the app's original setting.
 *
 * @param propertyKey the Spark property name to look up
 * @return true when the user enforced or preserved the property
 */
final def isPropertyUserOverridden(propertyKey: String): Boolean = {
  // Check the `enforced` map first, then fall back to the `preserve` list.
  val hasEnforcedValue = getUserEnforcedSparkProperty(propertyKey).isDefined
  hasEnforcedValue || isPropertyPreserved(propertyKey)
}

/**
* Set the default driver node type for the recommended cluster if it is missing.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,31 @@ abstract class AutoTuner(
configProvider.getEntry("GPU_MEM_PER_TASK").getDefaultAsMemory(ByteUnit.MiB)))
}

/**
 * Scans the application's RAPIDS jar classpath entries and pulls out the plugin
 * version embedded in each jar file name. Yields Some(version) only when exactly
 * one distinct version was found; any other count (zero, or conflicting
 * versions) yields None.
 */
private def getRapidsPluginJarVersion: Option[String] = {
  val distinctVersions = appInfoProvider.getRapidsJars
    .flatMap { jarName =>
      // Group 1 of the regex captures the version portion of the jar name.
      autoTunerHelper.pluginJarRegEx.findAllMatchIn(jarName).map(_.group(1))
    }
    .distinct
  // Only a single unambiguous version is usable.
  if (distinctVersions.size == 1) distinctVersions.headOption else None
}

/**
 * Returns true when the application uses a RAPIDS plugin version that already
 * auto-tunes `spark.rapids.sql.concurrentGpuTasks` at runtime, in which case
 * the AutoTuner should drop its recommendation for that property.
 * Reference: https://github.com/NVIDIA/spark-rapids/pull/12374
 */
private def isConcurrentGpuTasksAutoTunedByPlugin: Boolean = {
  getRapidsPluginJarVersion.exists { jarVer =>
    // NOTE: ToolUtils.compareVersions swallows parse failures and returns 0, which a
    // plain `>= 0` check would misread as "version is at the threshold", silently
    // dropping the recommendation. Compare the dot-separated numeric components
    // directly instead, and fall back to `false` (i.e. keep recommending the
    // property) if either version string is malformed.
    scala.util.Try {
      val threshold = autoTunerHelper.pluginVersionAutoConcurrentGpuTasks
      val jarParts = jarVer.split("\\.").map(_.trim.toInt)
      val minParts = threshold.split("\\.").map(_.trim.toInt)
      // Pad the shorter version with zeros, then decide on the first differing
      // component; fully-equal versions count as "at or above the threshold".
      jarParts.toSeq.zipAll(minParts.toSeq, 0, 0)
        .find { case (jarComp, minComp) => jarComp != minComp }
        .forall { case (jarComp, minComp) => jarComp > minComp }
    }.getOrElse(false)
  }
}
Comment on lines +647 to +651
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 compareVersions returns 0 on failure, silently skipping recommendation

ToolUtils.compareVersions catches any exception and returns 0 (treating the two versions as equal). Because the check is >= 0, a comparison failure is interpreted as "version is at the threshold", and the recommendation is incorrectly dropped. The pluginVersionAutoConcurrentGpuTasks constant is well-formed so this is very unlikely in practice, but a defensive fallback would make the intent explicit and prevent silent misbehavior on unusual version strings.


/**
* Recommendation for initial heap size based on certain amount of memory per core.
* Note that we will later reduce this if needed for off heap memory.
Expand Down Expand Up @@ -1186,8 +1211,18 @@ abstract class AutoTuner(
recommendExecutorResourceGpuProps()
appendRecommendation("spark.task.resource.gpu.amount",
configProvider.getEntry("TASK_GPU_RESOURCE_AMT").getDefault.toDouble)
appendRecommendation("spark.rapids.sql.concurrentGpuTasks",
calcGpuConcTasks().toInt)
val concGpuTasksKey = "spark.rapids.sql.concurrentGpuTasks"
// Target cluster `enforced` and `preserve` overrides take precedence; only drop the
// recommendation when neither is set and the plugin already auto-tunes it.
if (!platform.isPropertyUserOverridden(concGpuTasksKey) &&
isConcurrentGpuTasksAutoTunedByPlugin) {
// Plugin version auto-tunes concurrent GPU tasks based on memory usage,
// so suppress the AutoTuner recommendation and the corresponding missing comment.
// Reference: https://github.com/NVIDIA/spark-rapids/pull/12374
skippedRecommendations += concGpuTasksKey
} else {
appendRecommendation(concGpuTasksKey, calcGpuConcTasks())
}
val execCores = platform.recommendedClusterInfo.map(_.coresPerExecutor).getOrElse(1)
val availableMemPerExec =
platform.recommendedWorkerNode.map(_.getMemoryPerExec).getOrElse(0.0)
Expand Down Expand Up @@ -2236,6 +2271,10 @@ trait AutoTunerHelper extends Logging {
def recommendedClusterSizingStrategy(platform: Platform): ClusterSizingStrategy
// the plugin jar is in the form of rapids-4-spark_scala_binary-(version)-*.jar
lazy val pluginJarRegEx: Regex = "rapids-4-spark_\\d\\.\\d+-(\\d{2}\\.\\d{2}\\.\\d+).*\\.jar".r
// Starting with this plugin version, the RAPIDS plugin auto-tunes the number of
// concurrent GPU tasks based on memory usage (see spark-rapids#12374), so the
// AutoTuner should no longer recommend `spark.rapids.sql.concurrentGpuTasks`.
lazy val pluginVersionAutoConcurrentGpuTasks: String = "25.06.0"
lazy val gpuKryoRegistratorClassName = "com.nvidia.spark.rapids.GpuKryoRegistrator"
lazy val rapidsPluginClassName = "com.nvidia.spark.SQLPlugin"
lazy val kubernetesGpuVendor = "nvidia.com"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1600,6 +1600,75 @@ class ProfilingAutoTunerSuite extends ProfilingAutoTunerSuiteBase {
compareOutput(expectedResults, autoTunerOutput)
}

// Helper that runs the AutoTuner without pre-setting `spark.rapids.sql.concurrentGpuTasks`
// so the default recommendation path is exercised. Returns the AutoTuner output string.
// Test helper: drives the AutoTuner through the default recommendation path for
// `spark.rapids.sql.concurrentGpuTasks` — the property is deliberately absent from
// the event-log properties — optionally layering target-cluster `enforced` and/or
// `preserve` overrides on top. Returns the AutoTuner output rendered as a string.
private def runConcurrentGpuTasksScenario(
    rapidsJars: Seq[String],
    enforcedProps: Map[String, String] = Map.empty,
    preserveProps: List[String] = List.empty): String = {
  val customProps = mutable.LinkedHashMap(
    "spark.executor.cores" -> "16",
    "spark.executor.memory" -> "122880MiB",
    "spark.executor.memoryOverhead" -> "8396m",
    "spark.rapids.memory.pinnedPool.size" -> "4096m",
    "spark.rapids.shuffle.multiThreaded.reader.threads" -> "16",
    "spark.rapids.shuffle.multiThreaded.writer.threads" -> "16",
    "spark.rapids.sql.multiThreadedRead.numThreads" -> "20",
    "spark.shuffle.manager" ->
      s"com.nvidia.spark.rapids.spark$testSmVersion.RapidsShuffleManager",
    "spark.sql.files.maxPartitionBytes" -> "512m",
    "spark.task.resource.gpu.amount" -> "0.001")
  val sparkProps = defaultDataprocProps ++ customProps
  // Only build an explicit target-cluster spec when an override was requested.
  val needsTargetCluster = enforcedProps.nonEmpty || preserveProps.nonEmpty
  val platform =
    if (needsTargetCluster) {
      val clusterInfo = ToolTestUtils.buildTargetClusterInfo(
        enforcedSparkProperties = enforcedProps,
        preserveSparkProperties = preserveProps)
      PlatformFactory.createInstance(PlatformNames.DATAPROC, Some(clusterInfo))
    } else {
      PlatformFactory.createInstance(PlatformNames.DATAPROC)
    }
  configureEventLogClusterInfoForTest(
    platform,
    numCores = 32,
    numWorkers = 4,
    gpuCount = 2,
    sparkProperties = sparkProps.toMap)
  val mockAppInfo = getGpuAppMockInfoProvider(
    propsFromLog = sparkProps,
    rapidsJars = rapidsJars)
  val tuner = buildAutoTunerForTests(mockAppInfo, platform)
  val (recommendedProps, tunerComments) = tuner.getRecommendedProperties()
  Profiler.getAutoTunerResultsAsString(recommendedProps, tunerComments)
}

test("AutoTuner drops concurrentGpuTasks recommendation for plugin >= 25.06") {
  // A jar at exactly the threshold version: the plugin auto-tunes the property,
  // so neither a recommendation nor a missing-property comment should appear.
  val tunerOutput = runConcurrentGpuTasksScenario(Seq("rapids-4-spark_2.12-25.06.0.jar"))
  val mentionsProperty = tunerOutput.contains("spark.rapids.sql.concurrentGpuTasks")
  assert(!mentionsProperty,
    s"Expected no concurrentGpuTasks recommendation/comment, got:\n$tunerOutput")
}

test("AutoTuner keeps concurrentGpuTasks recommendation for plugin < 25.06") {
  // A pre-threshold plugin does not auto-tune the property, so the AutoTuner
  // must still recommend it.
  val tunerOutput = runConcurrentGpuTasksScenario(Seq("rapids-4-spark_2.12-25.04.0.jar"))
  val mentionsProperty = tunerOutput.contains("spark.rapids.sql.concurrentGpuTasks")
  assert(mentionsProperty,
    s"Expected concurrentGpuTasks to be present, got:\n$tunerOutput")
}

test("AutoTuner keeps concurrentGpuTasks recommendation when no plugin jar version found") {
  // With no jar on the classpath the plugin version is unknown; the safe default
  // is to keep recommending the property.
  val tunerOutput = runConcurrentGpuTasksScenario(Seq.empty)
  val mentionsProperty = tunerOutput.contains("spark.rapids.sql.concurrentGpuTasks")
  assert(mentionsProperty,
    s"Expected concurrentGpuTasks to be present, got:\n$tunerOutput")
}

test("Target cluster enforced concurrentGpuTasks overrides plugin >= 25.06 drop") {
  // An `enforced` target-cluster value takes precedence over the plugin's
  // auto-tuning, so the enforced value must survive into the output.
  val tunerOutput = runConcurrentGpuTasksScenario(
    Seq("rapids-4-spark_2.12-25.08.0.jar"),
    enforcedProps = Map("spark.rapids.sql.concurrentGpuTasks" -> "4"))
  val hasEnforcedValue = tunerOutput.contains("spark.rapids.sql.concurrentGpuTasks=4")
  assert(hasEnforcedValue,
    s"Expected enforced concurrentGpuTasks=4 to be present, got:\n$tunerOutput")
}

test("No recommendation when the jar pluginJar is up-to-date") {
Comment on lines +1667 to 1672
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Missing test for preserve override path

isPropertyUserOverridden has two branches — getUserEnforcedSparkProperty (enforced) and isPropertyPreserved (preserve) — but only the enforced branch is exercised by the new tests. The runConcurrentGpuTasksScenario helper already accepts preserveProps, so a fifth test can close this gap:

test("Target cluster preserve concurrentGpuTasks overrides plugin >= 25.06 drop") {
  val output = runConcurrentGpuTasksScenario(
    Seq("rapids-4-spark_2.12-25.08.0.jar"),
    preserveProps = List("spark.rapids.sql.concurrentGpuTasks"))
  assert(output.contains("spark.rapids.sql.concurrentGpuTasks"),
    s"Expected preserved concurrentGpuTasks to be present, got:\n$output")
}

// 1. Pull the latest release from mvn.
// 2. The Autotuner finds that the jar version is latest. No comments should be added
Expand Down