diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala
index 054307537..9458d33c0 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/Platform.scala
@@ -780,6 +780,14 @@ abstract class Platform(var gpuDevice: Option[GpuDevice],
     targetCluster.exists(_.getSparkProperties.excludePropertiesSet.contains(propertyKey))
   }
 
+  /**
+   * Check if the property has a user override from the target cluster — either an
+   * `enforced` value or a `preserve` directive.
+   */
+  final def isPropertyUserOverridden(propertyKey: String): Boolean = {
+    getUserEnforcedSparkProperty(propertyKey).isDefined || isPropertyPreserved(propertyKey)
+  }
+
   /**
    * Set the default driver node type for the recommended cluster if it is missing.
    */
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala
index 9cf961620..d6dbdb92f 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/tuning/AutoTuner.scala
@@ -625,6 +625,32 @@ abstract class AutoTuner(
     configProvider.getEntry("GPU_MEM_PER_TASK").getDefaultAsMemory(ByteUnit.MiB)))
   }
 
+  /**
+   * Extracts the unique RAPIDS plugin jar version from the application's classpath
+   * entries. Returns None if no version (or more than one distinct version) is found.
+   */
+  private def getRapidsPluginJarVersion: Option[String] = {
+    appInfoProvider.getRapidsJars
+      .flatMap(autoTunerHelper.pluginJarRegEx.findAllMatchIn(_).map(_.group(1)))
+      .distinct match {
+        case Seq(ver) => Some(ver)
+        case _ => None
+      }
+  }
+
+  /**
+   * Returns true when the application uses a RAPIDS plugin version that already
+   * auto-tunes `spark.rapids.sql.concurrentGpuTasks` at runtime, in which case
+   * the AutoTuner should drop its recommendation for that property.
+   * Reference: https://github.com/NVIDIA/spark-rapids/pull/12374
+   */
+  private def isConcurrentGpuTasksAutoTunedByPlugin: Boolean = {
+    getRapidsPluginJarVersion.exists { jarVer =>
+      ToolUtils.compareVersions(jarVer, autoTunerHelper.pluginVersionAutoConcurrentGpuTasks)
+        .exists(_ >= 0)
+    }
+  }
+
   /**
    * Recommendation for initial heap size based on certain amount of memory per core.
    * Note that we will later reduce this if needed for off heap memory.
@@ -1186,8 +1212,18 @@ abstract class AutoTuner(
     recommendExecutorResourceGpuProps()
     appendRecommendation("spark.task.resource.gpu.amount",
       configProvider.getEntry("TASK_GPU_RESOURCE_AMT").getDefault.toDouble)
-    appendRecommendation("spark.rapids.sql.concurrentGpuTasks",
-      calcGpuConcTasks().toInt)
+    val concGpuTasksKey = "spark.rapids.sql.concurrentGpuTasks"
+    // Target cluster `enforced` and `preserve` overrides take precedence; only drop the
+    // recommendation when neither is set and the plugin already auto-tunes it.
+    if (!platform.isPropertyUserOverridden(concGpuTasksKey) &&
+        isConcurrentGpuTasksAutoTunedByPlugin) {
+      // Plugin version auto-tunes concurrent GPU tasks based on memory usage,
+      // so suppress the AutoTuner recommendation and the corresponding missing comment.
+      // Reference: https://github.com/NVIDIA/spark-rapids/pull/12374
+      skippedRecommendations += concGpuTasksKey
+    } else {
+      appendRecommendation(concGpuTasksKey, calcGpuConcTasks())
+    }
     val execCores = platform.recommendedClusterInfo.map(_.coresPerExecutor).getOrElse(1)
     val availableMemPerExec =
       platform.recommendedWorkerNode.map(_.getMemoryPerExec).getOrElse(0.0)
@@ -1524,7 +1560,7 @@ abstract class AutoTuner(
     val latestPluginVersion = WebCrawlerUtil.getLatestPluginRelease
     latestPluginVersion match {
       case Some(ver) =>
-        if (ToolUtils.compareVersions(jarVer, ver) < 0) {
+        if (ToolUtils.compareVersions(jarVer, ver).exists(_ < 0)) {
           val jarURL = WebCrawlerUtil.getPluginMvnDownloadLink(ver)
           appendComment(
             "A newer RAPIDS Accelerator for Apache Spark plugin is available:\n" +
@@ -2236,6 +2272,10 @@ trait AutoTunerHelper extends Logging {
   def recommendedClusterSizingStrategy(platform: Platform): ClusterSizingStrategy
   // the plugin jar is in the form of rapids-4-spark_scala_binary-(version)-*.jar
   lazy val pluginJarRegEx: Regex = "rapids-4-spark_\\d\\.\\d+-(\\d{2}\\.\\d{2}\\.\\d+).*\\.jar".r
+  // Starting with this plugin version, the RAPIDS plugin auto-tunes the number of
+  // concurrent GPU tasks based on memory usage (see spark-rapids#12374), so the
+  // AutoTuner should no longer recommend `spark.rapids.sql.concurrentGpuTasks`.
+  lazy val pluginVersionAutoConcurrentGpuTasks: String = "25.06.0"
   lazy val gpuKryoRegistratorClassName = "com.nvidia.spark.rapids.GpuKryoRegistrator"
   lazy val rapidsPluginClassName = "com.nvidia.spark.SQLPlugin"
   lazy val kubernetesGpuVendor = "nvidia.com"
diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala
index 87eb55b0f..d4bfa2808 100644
--- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala
+++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/ToolUtils.scala
@@ -68,21 +68,28 @@ object ToolUtils extends Logging {
     org.apache.spark.SPARK_VERSION
   }
 
-  def compareVersions(verA: String, verB: String): Int = {
+  /**
+   * Compare two version strings using
+   * [[org.apache.maven.artifact.versioning.ComparableVersion#compareTo]]. The wrapped
+   * `Int` follows the standard `compareTo` sign convention (negative / zero / positive).
+   * Returns `None` if either string fails to parse, so callers cannot confuse a parse
+   * failure with version equality.
+   */
+  def compareVersions(verA: String, verB: String): Option[Int] = {
     Try {
       val verObjA = new ComparableVersion(verA)
       val verObjB = new ComparableVersion(verB)
       verObjA.compareTo(verObjB)
     } match {
-      case Success(compRes) => compRes
+      case Success(compRes) => Some(compRes)
       case Failure(t) =>
         logError(s"exception comparing two versions [$verA, $verB]", t)
-        0
+        None
     }
   }
 
   def runtimeIsSparkVersion(refVersion: String): Boolean = {
-    compareVersions(refVersion, sparkRuntimeVersion) == 0
+    compareVersions(refVersion, sparkRuntimeVersion).contains(0)
   }
 
   private def compareToSparkVersion(currVersion: String, lookupVersion: String): Int = {
diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala
index 24583ea60..809a52b72 100644
--- a/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala
+++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/tuning/ProfilingAutoTunerSuite.scala
@@ -1600,6 +1600,83 @@ class ProfilingAutoTunerSuite extends ProfilingAutoTunerSuiteBase {
     compareOutput(expectedResults, autoTunerOutput)
   }
 
+  // Helper that runs the AutoTuner without pre-setting `spark.rapids.sql.concurrentGpuTasks`
+  // so the default recommendation path is exercised. Returns the AutoTuner output string.
+  private def runConcurrentGpuTasksScenario(
+      rapidsJars: Seq[String],
+      enforcedProps: Map[String, String] = Map.empty,
+      preserveProps: List[String] = List.empty): String = {
+    val customProps = mutable.LinkedHashMap(
+      "spark.executor.cores" -> "16",
+      "spark.executor.memory" -> "122880MiB",
+      "spark.executor.memoryOverhead" -> "8396m",
+      "spark.rapids.memory.pinnedPool.size" -> "4096m",
+      "spark.rapids.shuffle.multiThreaded.reader.threads" -> "16",
+      "spark.rapids.shuffle.multiThreaded.writer.threads" -> "16",
+      "spark.rapids.sql.multiThreadedRead.numThreads" -> "20",
+      "spark.shuffle.manager" ->
+        s"com.nvidia.spark.rapids.spark$testSmVersion.RapidsShuffleManager",
+      "spark.sql.files.maxPartitionBytes" -> "512m",
+      "spark.task.resource.gpu.amount" -> "0.001")
+    val sparkProps = defaultDataprocProps.++(customProps)
+    val platform = if (enforcedProps.nonEmpty || preserveProps.nonEmpty) {
+      val targetClusterInfo = ToolTestUtils.buildTargetClusterInfo(
+        enforcedSparkProperties = enforcedProps,
+        preserveSparkProperties = preserveProps
+      )
+      PlatformFactory.createInstance(PlatformNames.DATAPROC, Some(targetClusterInfo))
+    } else {
+      PlatformFactory.createInstance(PlatformNames.DATAPROC)
+    }
+    configureEventLogClusterInfoForTest(
+      platform,
+      numCores = 32,
+      numWorkers = 4,
+      gpuCount = 2,
+      sparkProperties = sparkProps.toMap
+    )
+    val autoTuner =
+      buildAutoTunerForTests(getGpuAppMockInfoProvider(
+        propsFromLog = sparkProps,
+        rapidsJars = rapidsJars), platform)
+    val (properties, comments) = autoTuner.getRecommendedProperties()
+    Profiler.getAutoTunerResultsAsString(properties, comments)
+  }
+
+  test("AutoTuner drops concurrentGpuTasks recommendation for plugin >= 25.06") {
+    val output = runConcurrentGpuTasksScenario(Seq("rapids-4-spark_2.12-25.06.0.jar"))
+    assert(!output.contains("spark.rapids.sql.concurrentGpuTasks"),
+      s"Expected no concurrentGpuTasks recommendation/comment, got:\n$output")
+  }
+
+  test("AutoTuner keeps concurrentGpuTasks recommendation for plugin < 25.06") {
+    val output = runConcurrentGpuTasksScenario(Seq("rapids-4-spark_2.12-25.04.0.jar"))
+    assert(output.contains("spark.rapids.sql.concurrentGpuTasks"),
+      s"Expected concurrentGpuTasks to be present, got:\n$output")
+  }
+
+  test("AutoTuner keeps concurrentGpuTasks recommendation when no plugin jar version found") {
+    val output = runConcurrentGpuTasksScenario(Seq.empty)
+    assert(output.contains("spark.rapids.sql.concurrentGpuTasks"),
+      s"Expected concurrentGpuTasks to be present, got:\n$output")
+  }
+
+  test("Target cluster enforced concurrentGpuTasks overrides plugin >= 25.06 drop") {
+    val output = runConcurrentGpuTasksScenario(
+      Seq("rapids-4-spark_2.12-25.08.0.jar"),
+      enforcedProps = Map("spark.rapids.sql.concurrentGpuTasks" -> "4"))
+    assert(output.contains("spark.rapids.sql.concurrentGpuTasks=4"),
+      s"Expected enforced concurrentGpuTasks=4 to be present, got:\n$output")
+  }
+
+  test("Target cluster preserve concurrentGpuTasks overrides plugin >= 25.06 drop") {
+    val output = runConcurrentGpuTasksScenario(
+      Seq("rapids-4-spark_2.12-25.08.0.jar"),
+      preserveProps = List("spark.rapids.sql.concurrentGpuTasks"))
+    assert(output.contains("spark.rapids.sql.concurrentGpuTasks"),
+      s"Expected preserved concurrentGpuTasks to be present, got:\n$output")
+  }
+
   test("No recommendation when the jar pluginJar is up-to-date") {
     // 1. Pull the latest release from mvn.
     // 2. The Autotuner finds tha the jar version is latest. No comments should be added