diff --git a/core/src/main/resources/configs/reports/connectReport.yaml b/core/src/main/resources/configs/reports/connectReport.yaml new file mode 100644 index 000000000..dc7314de8 --- /dev/null +++ b/core/src/main/resources/configs/reports/connectReport.yaml @@ -0,0 +1,121 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Spark Connect session / operation report written per application by both the +# profiling and qualification tools. + +reportDefinitions: + - reportId: connectReport + description: >- + Per-application Spark Connect session and operation metadata plus a + directory of per-operation protobuf-text statement payloads. + scope: per-app + tableDefinitions: + - label: connectSessions + description: >- + One row per Spark Connect session observed in the event log. + fileName: connect_sessions.csv + scope: per-app + columns: + - name: appID + dataType: String + description: Application ID. + - name: sessionId + dataType: String + description: UUID of the Spark Connect session. + - name: userId + dataType: String + description: User who owns the session. + - name: startTime + dataType: Long + description: Epoch millis when the session started. + - name: endTime + dataType: Long + description: Epoch millis when the session closed, if observed. + - name: durationMs + dataType: Long + description: Session duration in milliseconds, or -1 if still open. + - name: operationCount + dataType: Long + description: Number of operations observed in the session. + - label: connectOperations + description: >- + One row per Spark Connect operation with core lifecycle timestamps, + linked SQL/job identifiers, and statement sidecar metadata. + fileName: connect_operations.csv + scope: per-app + columns: + - name: appID + dataType: String + description: Application ID. + - name: operationId + dataType: String + description: UUID of the Spark Connect operation. + - name: sessionId + dataType: String + description: Session UUID that owns the operation. + - name: userId + dataType: String + description: User who submitted the operation. + - name: jobTag + dataType: String + description: Correlation tag shared with SQLExecutionStart and JobStart events. + - name: startTime + dataType: Long + description: Epoch millis when the operation started. + - name: finishTime + dataType: Long + description: Epoch millis when execution completed, if observed. + - name: closeTime + dataType: Long + description: Epoch millis when the operation closed, if observed. + - name: failTime + dataType: Long + description: Epoch millis when the operation failed, if observed. + - name: cancelTime + dataType: Long + description: Epoch millis when the operation was canceled, if observed. + - name: durationMs + dataType: Long + description: Derived end-to-end operation duration in milliseconds. + - name: status + dataType: String + description: Derived operation status (RUNNING, SUCCEEDED, FAILED, CANCELED). 
+ - name: errorMessage + dataType: String + description: Error message reported by a failure event, if present. + - name: sqlIds + dataType: String + description: Semicolon-separated SQL execution IDs correlated to the operation. + - name: jobIds + dataType: String + description: Semicolon-separated Spark job IDs correlated to the operation. + - name: statementFile + dataType: String + description: Sidecar filename under connect_statements/ for this operation, if written. + - name: statementTruncated + dataType: Boolean + description: >- + True when statementText includes Spark's textual truncation + marker. This does not detect Spark 4.x depth-based subtree + elision, which collapses nested structures to {} without a + marker. + - label: connectStatements + description: >- + Directory of per-operation statementText sidecars. Each + .txt file contains the protobuf debug-format text of the + Connect operation statement when Connect statement sidecars are enabled. + fileName: connect_statements + fileFormat: DIRECTORY + scope: per-app diff --git a/core/src/main/resources/configs/reports/profCoreReport.yaml b/core/src/main/resources/configs/reports/profCoreReport.yaml index c551036ea..9d9820b8b 100644 --- a/core/src/main/resources/configs/reports/profCoreReport.yaml +++ b/core/src/main/resources/configs/reports/profCoreReport.yaml @@ -1,4 +1,4 @@ -# Copyright (c) 2025, NVIDIA CORPORATION. +# Copyright (c) 2025-2026, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -32,6 +32,7 @@ reportDefinitions: scope: global nestedReports: - reportId: coreRawMetrics + - reportId: connectReport tableDefinitions: # AppStatusResult - label: coreCSVStatus diff --git a/core/src/main/resources/configs/reports/qualCoreReport.yaml b/core/src/main/resources/configs/reports/qualCoreReport.yaml index e9c383b6d..b82d39170 100644 --- a/core/src/main/resources/configs/reports/qualCoreReport.yaml +++ b/core/src/main/resources/configs/reports/qualCoreReport.yaml @@ -64,6 +64,8 @@ reportDefinitions: relativePath: qual_metrics - reportId: coreRawMetrics relativePath: raw_metrics + - reportId: connectReport + relativePath: raw_metrics - reportId: qualTuningApps relativePath: tuning_apps tableDefinitions: diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ConnectProfileResults.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ConnectProfileResults.scala new file mode 100644 index 000000000..1ef0e3e4f --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ConnectProfileResults.scala @@ -0,0 +1,199 @@ +/* + * Copyright (c) 2026, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.profiling + +import com.nvidia.spark.rapids.tool.views.OutHeaderRegistry + +import org.apache.spark.sql.rapids.tool.util.StringUtils + +/** + * CSV row for a Spark Connect session. 
Serializes the lifecycle metadata for a + * single session into the columns registered under + * `ConnectSessionProfileResult` in [[com.nvidia.spark.rapids.tool.views.OutHeaderRegistry]]. + * + * `durationMs` is `endTime - startTime` when `endTime` is defined, else `-1` + * (matches the convention used for open/unfinished sessions in other result + * classes). + */ +case class ConnectSessionProfileResult( + appId: String, + sessionId: String, + userId: String, + startTime: Long, + endTime: Option[Long], + operationCount: Long) extends ProfileResult { + + override def outputHeaders: Array[String] = { + OutHeaderRegistry.outputHeaders("ConnectSessionProfileResult") + } + + override def convertToSeq(): Array[String] = { + Array( + appId, + sessionId, + userId, + startTime.toString, + endTime.map(_.toString).orNull, + endTime.map(e => (e - startTime).toString).getOrElse("-1"), + operationCount.toString) + } + + override def convertToCSVSeq(): Array[String] = { + Array( + StringUtils.reformatCSVString(appId), + StringUtils.reformatCSVString(sessionId), + StringUtils.reformatCSVString(userId), + startTime.toString, + endTime.map(_.toString).orNull, + endTime.map(e => (e - startTime).toString).getOrElse("-1"), + operationCount.toString) + } +} + +/** + * CSV row for a single Spark Connect operation. Captures the core lifecycle + * (start/finish/close/fail/cancel timestamps), derived status, error message, + * and the joined sqlIDs/jobIDs. Also records the sidecar basename for the + * `connect_statements/.txt` artifact. + * + * sqlIds and jobIds are serialized semicolon-separated to keep the CSV + * single-column and avoid quoting issues. + */ +case class ConnectOperationProfileResult( + appId: String, + operationId: String, + sessionId: String, + userId: String, + jobTag: String, + startTime: Long, + finishTime: Option[Long], + closeTime: Option[Long], + failTime: Option[Long], + cancelTime: Option[Long], + durationMs: Long, + status: String, + errorMessage: Option[String], + sqlIds: Seq[Long], + jobIds: Seq[Int], + statementFile: Option[String], + statementTruncated: Boolean) extends ProfileResult { + + override def outputHeaders: Array[String] = { + OutHeaderRegistry.outputHeaders("ConnectOperationProfileResult") + } + + override def convertToSeq(): Array[String] = { + Array( + appId, + operationId, + sessionId, + userId, + jobTag, + startTime.toString, + finishTime.map(_.toString).orNull, + closeTime.map(_.toString).orNull, + failTime.map(_.toString).orNull, + cancelTime.map(_.toString).orNull, + durationMs.toString, + status, + errorMessage.getOrElse(""), + sqlIds.mkString(";"), + jobIds.mkString(";"), + statementFile.getOrElse(""), + statementTruncated.toString) + } + + override def convertToCSVSeq(): Array[String] = { + Array( + StringUtils.reformatCSVString(appId), + StringUtils.reformatCSVString(operationId), + StringUtils.reformatCSVString(sessionId), + StringUtils.reformatCSVString(userId), + StringUtils.reformatCSVString(jobTag), + startTime.toString, + finishTime.map(_.toString).orNull, + closeTime.map(_.toString).orNull, + failTime.map(_.toString).orNull, + cancelTime.map(_.toString).orNull, + durationMs.toString, + StringUtils.reformatCSVString(status), + StringUtils.reformatCSVString(errorMessage.getOrElse("")), + StringUtils.reformatCSVString(sqlIds.mkString(";")), + StringUtils.reformatCSVString(jobIds.mkString(";")), + StringUtils.reformatCSVString(statementFile.getOrElse("")), + statementTruncated.toString) + } +} + +object ConnectOperationProfileResult { + + /** + * Marker 
substring embedded by the Spark Connect server-side abbreviator when + * a statement/plan text exceeds its configured limit. Presence of this marker + * in `statementText` indicates the artifact we persist is a truncated + * representation of the original plan. + * + * Note: Spark 4.x also performs depth-based structural elision by collapsing + * subtrees to `{}` once the protobuf text formatter exceeds its nesting cap. + * Those cases do not emit a textual marker, so `statementTruncated` is + * intentionally limited to marker-based truncation only. + */ + private[profiling] val TruncationMarker: String = "[truncated(size=" + + /** + * Derives operation status from the observed lifecycle timestamps. + * Priority: CANCELED -> FAILED -> SUCCEEDED -> RUNNING. + * CANCELED precedes FAILED because server-side cancellation sometimes + * surfaces a trailing failure event we should not misattribute. + */ + private def deriveStatus(op: ConnectOperationInfo): String = { + if (op.cancelTime.isDefined) "CANCELED" + else if (op.failTime.isDefined) "FAILED" + else if (op.finishTime.isDefined || op.closeTime.isDefined) "SUCCEEDED" + else "RUNNING" + } + + def from( + appId: String, + op: ConnectOperationInfo, + sqlIds: Seq[Long], + jobIds: Seq[Int], + statementFile: Option[String]): ConnectOperationProfileResult = { + val endForDuration = + op.closeTime.orElse(op.finishTime).orElse(op.failTime).orElse(op.cancelTime) + val durationMs = endForDuration.map(_ - op.startTime).getOrElse(-1L) + val statementTruncated = op.statementText.contains(TruncationMarker) + ConnectOperationProfileResult( + appId = appId, + operationId = op.operationId, + sessionId = op.sessionId, + userId = op.userId, + jobTag = op.jobTag, + startTime = op.startTime, + finishTime = op.finishTime, + closeTime = op.closeTime, + failTime = op.failTime, + cancelTime = op.cancelTime, + durationMs = durationMs, + status = deriveStatus(op), + errorMessage = op.errorMessage, + sqlIds = sqlIds, + jobIds = jobIds, + statementFile = statementFile, + statementTruncated = statementTruncated) + } +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ConnectStatementWriter.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ConnectStatementWriter.scala new file mode 100644 index 000000000..1560eeb4a --- /dev/null +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ConnectStatementWriter.scala @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2026, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.profiling + +import com.nvidia.spark.rapids.tool.ToolTextFileWriter +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.internal.Logging + +/** + * Writes per-operation Spark Connect `statementText` payloads to sidecar files + * so large protobuf-text plans do not inflate the `connect_operations.csv` + * table. 
Files are written under `<rootDir>/connect_statements/<operationId>.txt` and + * the returned map records the basenames for inclusion in the operation CSV + * `statementFile` column. + * + * Operations with empty `statementText` are skipped entirely (no file, no map + * entry). The `connect_statements` subdirectory is created lazily on the first + * non-empty statement, so apps with no statements at all do not produce an + * empty directory. Per-file IO errors are logged and skipped; they do not + * abort the batch. + * + * Writes go through [[ToolTextFileWriter]] so the same UTF-8, permissions, and + * local/raw-filesystem behavior used by the rest of the tools' output applies + * here as well. + */ +object ConnectStatementWriter extends Logging { + + val SUB_DIR: String = "connect_statements" + val FILE_EXTENSION: String = ".txt" + private val UnsafePathChars = "[^A-Za-z0-9._-]".r + + private def sanitizeOperationId(operationId: String): String = { + UnsafePathChars.replaceAllIn(operationId, "_") + } + + /** + * Writes each operation's `statementText` to + * `<rootDir>/connect_statements/<sanitizedOperationId>.txt` when non-empty. + * + * @param rootDir per-app output directory (already exists) + * @param ops operations to persist + * @param hadoopConf Hadoop configuration used to resolve the target + * filesystem. When `None`, a fresh `Configuration` is + * used (which resolves the default filesystem, typically + * local). + * @return map of `operationId -> "<sanitizedOperationId>.txt"` basenames for + * the operations whose sidecar file was written successfully. + */ + def writeStatementFiles( + rootDir: String, + ops: Iterable[ConnectOperationInfo], + hadoopConf: Option[Configuration] = None): Map[String, String] = { + val subDirPath = new Path(rootDir, SUB_DIR) + val builder = Map.newBuilder[String, String] + ops.foreach { op => + val text = op.statementText + if (text.nonEmpty) { + try { + val safeId = sanitizeOperationId(op.operationId) + val basename = s"$safeId$FILE_EXTENSION" + val target = new Path(subDirPath, basename) + // Defense-in-depth containment check. Sanitization already removes + // `/` and `..`, but verify the resolved parent matches. + require(target.getParent == subDirPath, + s"Refusing to write Connect statement sidecar outside $subDirPath: $target") + val writer = new ToolTextFileWriter( + subDirPath.toString, + basename, + s"Connect statement sidecar for operation ${op.operationId}", + hadoopConf) + try { + writer.write(text) + } finally { + writer.close() + } + builder += (op.operationId -> basename) + } catch { + case e: Exception => + logWarning(s"Failed to write Connect statement sidecar for operation " + + s"${op.operationId} under $subDirPath", e) + } + } + } + builder.result() + } +} diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileArgs.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileArgs.scala index 81a253ebf..c1f49fb55 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileArgs.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileArgs.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2025, NVIDIA CORPORATION. + * Copyright (c) 2021-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.
@@ -91,6 +91,12 @@ Usage: java -cp rapids-4-spark-tools_2.12-.jar:$SPARK_HOME/jars/* val csv: ScallopOption[Boolean] = opt[Boolean](required = false, descr = "Output each table to a CSV file as well creating the summary text file.") + val connectStatements: ScallopOption[Boolean] = + toggle("connect-statements", + default = Some(false), + prefix = "no-", + descrYes = "Write Spark Connect statementText sidecar files. Disabled by default.", + descrNo = "Do not write Spark Connect statementText sidecar files.") val timeout: ScallopOption[Long] = opt[Long](required = false, descr = "Maximum time in seconds to wait for the event logs to be processed. " + diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileOutputWriter.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileOutputWriter.scala index 1bb6f6dc3..46c0fc54c 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileOutputWriter.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileOutputWriter.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2025, NVIDIA CORPORATION. + * Copyright (c) 2021-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,7 +26,7 @@ import org.json4s.jackson.Serialization * In case the outputCSV is set to true, it will write each table to a * separate CSV file. */ -class ProfileOutputWriter(outputDir: String, filePrefix: String, numOutputRows: Int, +class ProfileOutputWriter(val outputDir: String, filePrefix: String, numOutputRows: Int, outputCSV: Boolean = false) { implicit val formats: DefaultFormats.type = DefaultFormats diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala index 2b689397b..dd9d7162c 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/Profiler.scala @@ -55,6 +55,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea s"/${Profiler.SUBDIR}" private val numOutputRows = appArgs.numOutputRows.getOrElse(1000) private val outputCSV: Boolean = appArgs.csv() + private val writeConnectStatements: Boolean = appArgs.connectStatements() private val useAutoTuner: Boolean = appArgs.autoTuner() private val outputAlignedSQLIds: Boolean = appArgs.outputSqlIdsAligned() private val enableDiagnosticViews: Boolean = appArgs.enableDiagnosticViews() @@ -432,6 +433,8 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs, enablePB: Boolea profileOutputWriter.writeTable(ProfRemovedBLKMgrView.getLabel, app.removedBMs) profileOutputWriter.writeCSVTable(ProfRemovedExecutorView.getLabel, app.removedExecutors) profileOutputWriter.writeCSVTable("Unsupported SQL Plan", app.unsupportedOps) + Profiler.writeConnectTables(profileOutputWriter, profilerResult.app, + writeConnectStatements, Some(hadoopConf)) if (outputAlignedSQLIds) { profileOutputWriter.writeTable( ProfSQLPlanAlignedView.getLabel, app.sqlCleanedAlignedIds, @@ -496,6 +499,59 @@ object Profiler { val PROFILE_LOG_NAME = "profile" val SUBDIR = "rapids_4_spark_profile" + /** + * Writes `Connect Sessions` and `Connect Operations` per-app CSV tables when + * the application is in Spark Connect mode. 
No-op otherwise: the underlying + * `writeCSVTable` returns early on empty input, so non-Connect apps produce + * no file at all (matches the behavior of every other per-app table). + * + * When enabled, each operation's `statementText` is written to a sidecar file + * under `<outputDir>/connect_statements/<operationId>.txt` and the basename + * is recorded in the `statementFile` column of `connect_operations.csv`. + */ + def writeConnectTables( + writer: ProfileOutputWriter, + app: AppBase, + writeStatementSidecars: Boolean = false, + hadoopConf: Option[Configuration] = None): Unit = { + if (!app.isConnectMode) return + val appId = app.appId + // Group once so the per-session operation count is O(operations) overall + // instead of O(sessions * operations). + val opCountBySession: Map[String, Long] = + app.connectOperations.values.groupBy(_.sessionId).map { case (sid, ops) => + sid -> ops.size.toLong + } + val sessionRows = app.connectSessions.values.toSeq.sortBy(_.sessionId).map { s => + ConnectSessionProfileResult( + appId = appId, + sessionId = s.sessionId, + userId = s.userId, + startTime = s.startTime, + endTime = s.endTime, + operationCount = opCountBySession.getOrElse(s.sessionId, 0L)) + } + writer.writeCSVTable("Connect Sessions", sessionRows) + val statementFiles: Map[String, String] = + if (writeStatementSidecars) { + ConnectStatementWriter.writeStatementFiles( + writer.outputDir, app.connectOperations.values, hadoopConf) + } else { + Map.empty + } + val opRows = app.connectOperations.values.toSeq.sortBy(_.operationId).map { op => + ConnectOperationProfileResult.from( + appId = appId, + op = op, + sqlIds = app.operationIdToSqlIds.get(op.operationId) + .map(_.toSeq.sorted).getOrElse(Seq.empty), + jobIds = app.operationIdToJobIds.get(op.operationId) + .map(_.toSeq.sorted).getOrElse(Seq.empty), + statementFile = statementFiles.get(op.operationId)) + } + writer.writeCSVTable("Connect Operations", opRows) + } + def getAutoTunerResultsAsString(props: Seq[TuningEntryTrait], comments: Seq[RecommendedCommentResult]): String = { val propStr = if (props.nonEmpty) { diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala index f84ed895e..249f4e470 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/Qualification.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2025, NVIDIA CORPORATION. + * Copyright (c) 2021-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.
@@ -40,6 +40,7 @@ class Qualification( enablePB: Boolean, reportSqlLevel: Boolean, maxSQLDescLength: Int, + writeConnectStatements: Boolean, mlOpsEnabled: Boolean, penalizeTransitions: Boolean, tunerContext: Option[TunerContext], @@ -155,7 +156,7 @@ class Qualification( val dsInfo = AppSubscriber.withSafeValidAttempt(app.appId, app.attemptId) { () => QualRawReportGenerator.generateRawMetricQualViewAndGetDataSourceInfo( - outputDir, app) + outputDir, app, writeConnectStatements, Some(hadoopConf)) }.getOrElse(Seq.empty) val qualSumInfo = app.aggregateStats() AppSubscriber.withSafeValidAttempt(app.appId, app.attemptId) { () => diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala index 31086f995..44465f01d 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2025, NVIDIA CORPORATION. + * Copyright (c) 2021-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -157,6 +157,13 @@ Usage: java -cp rapids-4-spark-tools_2.12-.jar:$SPARK_HOME/jars/* val perSql : ScallopOption[Boolean] = opt[Boolean](required = false, descr = "Report at the individual SQL query level.") + val connectStatements: ScallopOption[Boolean] = + toggle("connect-statements", + default = Some(false), + prefix = "no-", + descrYes = "Write Spark Connect statementText sidecar files in raw_metrics. " + + "Disabled by default.", + descrNo = "Do not write Spark Connect statementText sidecar files.") val maxSqlDescLength: ScallopOption[Int] = opt[Int](required = false, descr = "Maximum length of the SQL description string output with the " + diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala index 95773212c..cf4d422c7 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2025, NVIDIA CORPORATION. + * Copyright (c) 2021-2026, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -119,7 +119,8 @@ object QualificationMain extends Logging { } val qual = new Qualification(outputDirectory, hadoopConf, timeout, nThreads, pluginTypeChecker, - enablePB, reportSqlLevel, maxSQLDescLength, mlOpsEnabled, penalizeTransitions, + enablePB, reportSqlLevel, maxSQLDescLength, appArgs.connectStatements(), + mlOpsEnabled, penalizeTransitions, tunerContext, appArgs.clusterReport(), appArgs.platform(), appArgs.targetClusterInfo.toOption) val res = qual.qualifyApps(filteredLogs) diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/OutHeaderRegistry.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/OutHeaderRegistry.scala index ac61a5d3b..8d242332c 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/OutHeaderRegistry.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/OutHeaderRegistry.scala @@ -314,6 +314,14 @@ object OutHeaderRegistry { "WriteOpProfileResult" -> Array("sqlID", "sqlPlanVersion", "nodeId", "fromFinalPlan", "execName", "format", "location", "tableName", "dataBase", "outputColumns", "writeMode", - "partitionColumns", "compressionOption", "fullDescription") + "partitionColumns", "compressionOption", "fullDescription"), + "ConnectSessionProfileResult" -> + Array("appID", "sessionId", "userId", "startTime", "endTime", "durationMs", + "operationCount"), + "ConnectOperationProfileResult" -> + Array("appID", "operationId", "sessionId", "userId", "jobTag", + "startTime", "finishTime", "closeTime", "failTime", "cancelTime", "durationMs", + "status", "errorMessage", + "sqlIds", "jobIds", "statementFile", "statementTruncated") ) // End of outputHeaders map initialization } diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/QualRawReportGenerator.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/QualRawReportGenerator.scala index c26a9b789..539d4ca50 100644 --- a/core/src/main/scala/com/nvidia/spark/rapids/tool/views/QualRawReportGenerator.scala +++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/views/QualRawReportGenerator.scala @@ -17,7 +17,10 @@ package com.nvidia.spark.rapids.tool.views import com.nvidia.spark.rapids.tool.analysis.{AggRawMetricsResult, AppSQLPlanAnalyzer, QualSparkMetricsAggregator} -import com.nvidia.spark.rapids.tool.profiling.{AppLevelRecommendationSignalsProfileResult, DataSourceProfileResult, ProfileOutputWriter, ProfileResult, SQLAccumProfileResults} +import com.nvidia.spark.rapids.tool.profiling.{ + AppLevelRecommendationSignalsProfileResult, DataSourceProfileResult, ProfileOutputWriter, + Profiler, ProfileResult, SQLAccumProfileResults} +import org.apache.hadoop.conf.Configuration import org.apache.spark.internal.Logging import org.apache.spark.sql.rapids.tool.qualification.QualificationAppInfo @@ -63,7 +66,9 @@ object QualRawReportGenerator extends Logging { def generateRawMetricQualViewAndGetDataSourceInfo( rootDir: String, - app: QualificationAppInfo): Seq[DataSourceProfileResult] = { + app: QualificationAppInfo, + writeConnectStatements: Boolean = false, + hadoopConf: Option[Configuration] = None): Seq[DataSourceProfileResult] = { val metricsDirectory = s"$rootDir/raw_metrics/${app.appId}" val sqlPlanAnalyzer = AppSQLPlanAnalyzer(app) var dataSourceInfo: Seq[DataSourceProfileResult] = Seq.empty @@ -117,6 +122,7 @@ object QualRawReportGenerator extends Logging { QualRemovedExecutorView.getLabel, QualRemovedExecutorView.getRawView(Seq(app))) // we only need to write the CSV report of the WriteOps pWriter.writeCSVTable(QualWriteOpsView.getLabel, 
QualWriteOpsView.getRawView(Seq(app))) + Profiler.writeConnectTables(pWriter, app, writeConnectStatements, hadoopConf) } catch { case e: Exception => logError(s"Error generating raw metrics for ${app.appId}: ${e.getMessage}") diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala index 203e3bb12..7634b2fac 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala @@ -143,7 +143,11 @@ abstract class AppBase( val connectOperations: HashMap[String, ConnectOperationInfo] = HashMap.empty // jobTag -> operationId index for correlation with SQL executions and jobs. val jobTagToConnectOpId: HashMap[String, String] = HashMap.empty - def isConnectMode: Boolean = connectOperations.nonEmpty + // operationId -> sqlIDs discovered via SparkListenerSQLExecutionStart.jobTags. + val operationIdToSqlIds: HashMap[String, HashSet[Long]] = HashMap.empty + // operationId -> jobIDs discovered via SparkListenerJobStart.properties["spark.job.tags"]. + val operationIdToJobIds: HashMap[String, HashSet[Int]] = HashMap.empty + def isConnectMode: Boolean = connectSessions.nonEmpty || connectOperations.nonEmpty def sqlPlans: immutable.Map[Long, SparkPlanInfo] = sqlManager.getPlanInfos diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/EventProcessorBase.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/EventProcessorBase.scala index def5ecfd0..14fdd4d54 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/EventProcessorBase.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/EventProcessorBase.scala @@ -16,6 +16,7 @@ package org.apache.spark.sql.rapids.tool +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal @@ -168,6 +169,17 @@ abstract class EventProcessorBase[T <: AppBase](app: T) extends SparkListener wi // redaction and predicate updates (gpuMode, etc.). // Last-write-wins if multiple SQL executions have different modifiedConfigs. app.mergeModifiedConfigs(modifiedConfigs) + + // Correlate Connect operations to this sqlID via jobTags (Spark 3.5+). + if (app.isConnectMode) { + EventUtils.readJobTagsFromSQLStartEvent(event).foreach { tag => + app.jobTagToConnectOpId.get(tag).foreach { opId => + app.operationIdToSqlIds + .getOrElseUpdate(opId, mutable.HashSet.empty[Long]) + .add(event.executionId) + } + } + } } def doSparkListenerSQLExecutionEnd( @@ -418,6 +430,20 @@ abstract class EventProcessorBase[T <: AppBase](app: T) extends SparkListener wi app.sqlIdToStages.getOrElseUpdate(sqlID.get, ArrayBuffer.empty) ++= event.stageIds } sqlID.foreach(app.jobIdToSqlID(event.jobId) = _) + + // Correlate Connect operations to this jobID via spark.job.tags. 
+ if (app.isConnectMode) { + val tagStr = event.properties.getProperty("spark.job.tags") + if (tagStr != null && tagStr.nonEmpty) { + tagStr.split(",").iterator.map(_.trim).filter(_.nonEmpty).foreach { tag => + app.jobTagToConnectOpId.get(tag).foreach { opId => + app.operationIdToJobIds + .getOrElseUpdate(opId, mutable.HashSet.empty[Int]) + .add(event.jobId) + } + } + } + } } override def onJobStart(jobStart: SparkListenerJobStart): Unit = { diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/ConnectEventHandler.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/ConnectEventHandler.scala index ead26780c..8f23f7a14 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/ConnectEventHandler.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/ConnectEventHandler.scala @@ -32,7 +32,8 @@ import org.apache.spark.sql.rapids.tool.AppBase * directly imported to preserve compatibility with older Spark profiles. * Events are identified by class-name prefix, fields are extracted via * cached reflective accessors in [[EventUtils]], and results are stored - * in [[ConnectSessionInfo]] and [[ConnectOperationInfo]]. + * in [[com.nvidia.spark.rapids.tool.profiling.ConnectSessionInfo]] and + * [[com.nvidia.spark.rapids.tool.profiling.ConnectOperationInfo]]. */ object ConnectEventHandler extends Logging { diff --git a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/EventUtils.scala b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/EventUtils.scala index 6f7eeb84f..d770e9e62 100644 --- a/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/EventUtils.scala +++ b/core/src/main/scala/org/apache/spark/sql/rapids/tool/util/EventUtils.scala @@ -210,6 +210,16 @@ object EventUtils extends Logging { }.toOption.flatten.getOrElse(Map.empty) } + // Reads jobTags via reflection (Spark 3.5+, introduced for Connect support). + // Returns empty set on older versions. + def readJobTagsFromSQLStartEvent( + event: SparkListenerSQLExecutionStart): Set[String] = { + Try { + Option(invokeMethodOnEvent(event, "jobTags")) + .map(_.asInstanceOf[Set[String]]) + }.toOption.flatten.getOrElse(Set.empty) + } + @throws[com.fasterxml.jackson.core.JsonParseException] private def handleEventJsonParseEx( ex: com.fasterxml.jackson.core.JsonParseException): Unit = { @@ -355,7 +365,7 @@ object EventUtils extends Logging { /** * Invoke a no-arg method on an object, caching the Method reference. - * @throws NoSuchMethodException if the method does not exist on the object's class. + * Throws NoSuchMethodException if the method does not exist on the object's class. */ @throws[NoSuchMethodException] def invokeMethodOnEvent(event: AnyRef, methodName: String): Any = { diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ConnectCorrelationSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ConnectCorrelationSuite.scala new file mode 100644 index 000000000..b6ac574c3 --- /dev/null +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ConnectCorrelationSuite.scala @@ -0,0 +1,193 @@ +/* + * Copyright (c) 2026, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.profiling + +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Paths} + +import scala.collection.mutable + +import com.nvidia.spark.rapids.BaseNoSparkSuite +import com.nvidia.spark.rapids.tool.EventLogPathProcessor + +import org.apache.spark.scheduler.SparkListenerJobStart +import org.apache.spark.sql.TrampolineUtil +import org.apache.spark.sql.execution.SparkPlanInfo +import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart +import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo +import org.apache.spark.sql.rapids.tool.util.RapidsToolsConfUtil + +/** + * Tests for Spark Connect sqlID/jobID correlation indexes on `AppBase`. + * Covers: + * - the reverse-index `HashMap`s exist and are initialized empty on a fresh app, + * - `operationIdToSqlIds` is populated from `SparkListenerSQLExecutionStart.jobTags`, + * - `operationIdToJobIds` is populated from + * `SparkListenerJobStart.properties["spark.job.tags"]`. + */ +class ConnectCorrelationSuite extends BaseNoSparkSuite { + + private val hadoopConf = RapidsToolsConfUtil.newHadoopConf() + + private val logStartEvent = + """{"Event":"SparkListenerLogStart","Spark Version":"3.5.0"}""" + private val appStartEvent = + """{"Event":"SparkListenerApplicationStart","App Name":"CorrelationTest",""" + + """"App ID":"local-correlation","Timestamp":100000,"User":"testUser"}""" + private val envUpdateEvent = + """{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{},""" + + """"Spark Properties":{"spark.master":"local[*]"},""" + + """"Hadoop Properties":{},"System Properties":{"file.encoding":"UTF-8"},""" + + """"Classpath Entries":{}}""" + private val appEndEvent = + """{"Event":"SparkListenerApplicationEnd","Timestamp":200000}""" + + private def withEventLog(events: String*)(verify: ApplicationInfo => Unit): Unit = { + val content = events.mkString("\n") + TrampolineUtil.withTempDir { tempDir => + val path = Paths.get(tempDir.getAbsolutePath, "test_eventlog") + Files.write(path, content.getBytes(StandardCharsets.UTF_8)) + val app = new ApplicationInfo(hadoopConf, + EventLogPathProcessor.getEventLogInfo(path.toString, hadoopConf).head._1) + verify(app) + } + } + + /** + * True when the running Spark profile's SparkListenerSQLExecutionStart has a + * jobTags accessor (Spark 3.5+). Used to skip tests on older profiles. + */ + private def checkJobTagsAvailable(): (Boolean, String) = { + val available = try { + classOf[SparkListenerSQLExecutionStart].getMethod("jobTags") + true + } catch { + case _: NoSuchMethodException => false + } + (available, "SparkListenerSQLExecutionStart.jobTags requires Spark 3.5+") + } + + /** + * Builds a SparkListenerSQLExecutionStart with the given jobTags via reflection, + * matching the 9-arg constructor introduced in Spark 3.5. 
+ */ + private def buildSQLStartEvent(executionId: Long, jobTags: Set[String]) + : SparkListenerSQLExecutionStart = { + val planInfo = new SparkPlanInfo( + "TestNode", "test", Nil, Map.empty[String, String], Nil) + val ctors = classOf[SparkListenerSQLExecutionStart].getConstructors + val ctor = ctors.find(_.getParameterCount == 9).getOrElse( + throw new AssertionError("Expected 9-arg SparkListenerSQLExecutionStart constructor")) + ctor.newInstance( + java.lang.Long.valueOf(executionId), + None, + "desc", + "details", + "physicalPlan", + planInfo, + java.lang.Long.valueOf(123000L), + Map.empty[String, String], + jobTags).asInstanceOf[SparkListenerSQLExecutionStart] + } + + test("operationIdToSqlIds / operationIdToJobIds are initialized empty on AppBase") { + withEventLog(logStartEvent, appStartEvent, envUpdateEvent, appEndEvent) { app => + assert(app.operationIdToSqlIds.isEmpty) + assert(app.operationIdToJobIds.isEmpty) + } + } + + runConditionalTest( + "operationIdToSqlIds populated from SparkListenerSQLExecutionStart.jobTags", + checkJobTagsAvailable) { + withEventLog(logStartEvent, appStartEvent, envUpdateEvent, appEndEvent) { app => + // Manually seed Connect state as if a ConnectOperationStarted event had fired. + val jobTag = + "SparkConnect_OperationTag_User_alice_Session_sess-1_Operation_op-1" + app.connectOperations.put("op-1", new ConnectOperationInfo( + operationId = "op-1", + sessionId = "sess-1", + userId = "alice", + jobTag = jobTag, + statementText = "range(0, 10)", + startTime = 110000L)) + app.jobTagToConnectOpId.put(jobTag, "op-1") + assert(app.isConnectMode, "Should detect Connect mode after seeding") + + // Drive a SparkListenerSQLExecutionStart tagged with the Connect operation. + val evt = buildSQLStartEvent(executionId = 42L, jobTags = Set(jobTag)) + app.processEvent(evt) + + assert(app.operationIdToSqlIds.contains("op-1"), + "operationIdToSqlIds should contain op-1 after SQL start") + assert(app.operationIdToSqlIds("op-1").contains(42L), + "op-1 should map to executionId 42") + + // An untagged SQL execution should not map to any Connect op. + val untagged = buildSQLStartEvent(executionId = 43L, jobTags = Set.empty) + app.processEvent(untagged) + assert(app.operationIdToSqlIds("op-1") == mutable.HashSet(42L), + "Untagged execution should not be attributed to op-1") + } + } + + test("operationIdToJobIds populated from SparkListenerJobStart spark.job.tags") { + withEventLog(logStartEvent, appStartEvent, envUpdateEvent, appEndEvent) { app => + // Seed Connect state as if a ConnectOperationStarted event had fired. + val jobTag = + "SparkConnect_OperationTag_User_u_Session_s_Operation_op-2" + app.connectOperations.put("op-2", new ConnectOperationInfo( + operationId = "op-2", + sessionId = "s", + userId = "u", + jobTag = jobTag, + statementText = "range(0, 10)", + startTime = 110000L)) + app.jobTagToConnectOpId.put(jobTag, "op-2") + assert(app.isConnectMode, "Should detect Connect mode after seeding") + + // Simulate a JobStart whose spark.job.tags mixes the Connect tag with a + // user-supplied tag (e.g., from spark.addTag). 
+ val props = new java.util.Properties() + props.setProperty("spark.job.tags", s"$jobTag,custom-user-tag") + val evt = SparkListenerJobStart( + jobId = 7, time = 2000L, stageInfos = Nil, properties = props) + app.processEvent(evt) + + assert(app.operationIdToJobIds("op-2") == mutable.HashSet(7), + "op-2 should map to exactly jobId 7") + assert(app.operationIdToJobIds.size == 1, + "No spurious opIds should be created from user tags") + } + } + + test("operationIdToJobIds stays empty when app is not in Connect mode") { + withEventLog(logStartEvent, appStartEvent, envUpdateEvent, appEndEvent) { app => + assert(!app.isConnectMode, "Fresh app should not be in Connect mode") + + val props = new java.util.Properties() + props.setProperty("spark.job.tags", + "SparkConnect_OperationTag_User_u_Session_s_Operation_op-x") + val evt = SparkListenerJobStart( + jobId = 8, time = 3000L, stageInfos = Nil, properties = props) + app.processEvent(evt) + + assert(app.operationIdToJobIds.isEmpty, + "Non-Connect app should not populate operationIdToJobIds") + } + } +} diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ConnectProfileResultsSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ConnectProfileResultsSuite.scala new file mode 100644 index 000000000..a93f4a8b8 --- /dev/null +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ConnectProfileResultsSuite.scala @@ -0,0 +1,195 @@ +/* + * Copyright (c) 2026, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.profiling + +import com.nvidia.spark.rapids.tool.views.OutHeaderRegistry +import org.scalatest.funsuite.AnyFunSuite + +class ConnectProfileResultsSuite extends AnyFunSuite { + + private def operationHeaders: Array[String] = + OutHeaderRegistry.outputHeaders("ConnectOperationProfileResult") + + private def operationCol(row: ConnectOperationProfileResult, name: String): String = + row.convertToSeq()(operationHeaders.indexOf(name)) + + test("ConnectSessionProfileResult emits correct raw columns") { + val row = ConnectSessionProfileResult( + appId = "app-1", sessionId = "s1", userId = "u1", + startTime = 1000L, endTime = Some(5000L), operationCount = 3) + assert(row.convertToSeq().toSeq === + Seq("app-1", "s1", "u1", "1000", "5000", "4000", "3")) + + val open = ConnectSessionProfileResult( + appId = "app-2", sessionId = "s2", userId = "u2", + startTime = 1000L, endTime = None, operationCount = 0) + val openSeq = open.convertToSeq() + // endTime is rendered as null (orNull) when absent, matching the convention + // used by neighboring ProfileResult classes in ProfileClassWarehouse.scala. 
+ assert(openSeq(0) == "app-2") + assert(openSeq(1) == "s2") + assert(openSeq(2) == "u2") + assert(openSeq(3) == "1000") + assert(openSeq(4) == null) + assert(openSeq(5) == "-1") + assert(openSeq(6) == "0") + } + + test("ConnectSessionProfileResult.convertToCSVSeq wraps string fields in quotes") { + val row = ConnectSessionProfileResult( + appId = "app,comma", sessionId = "s\"quote", userId = "u1", + startTime = 1000L, endTime = Some(5000L), operationCount = 3) + val csv = row.convertToCSVSeq() + // reformatCSVString escapes inner " by doubling and wraps the result in "..." + assert(csv(0) == "\"app,comma\"") + assert(csv(1) == "\"s\"\"quote\"") + assert(csv(2) == "\"u1\"") + // numeric fields stay raw + assert(csv(3) == "1000") + assert(csv(4) == "5000") + assert(csv(5) == "4000") + assert(csv(6) == "3") + } + + test("OutHeaderRegistry includes ConnectSession and ConnectOperation headers") { + assert(OutHeaderRegistry.outputHeaders.contains("ConnectSessionProfileResult")) + assert(OutHeaderRegistry.outputHeaders.contains("ConnectOperationProfileResult")) + assert(OutHeaderRegistry.outputHeaders("ConnectSessionProfileResult").toSeq === + Seq("appID", "sessionId", "userId", "startTime", "endTime", "durationMs", + "operationCount")) + } + + test("row array length matches header count") { + val sess = ConnectSessionProfileResult("a", "s", "u", 0L, None, 0L) + assert(sess.convertToCSVSeq().length == sess.outputHeaders.length) + assert(sess.convertToSeq().length == sess.outputHeaders.length) + + val op = new ConnectOperationInfo("o", "s", "u", "t", "", 0L) + val opRow = ConnectOperationProfileResult.from("a", op, Seq.empty, Seq.empty, None) + assert(opRow.convertToCSVSeq().length == opRow.outputHeaders.length) + assert(opRow.convertToSeq().length == opRow.outputHeaders.length) + } + + test("ConnectOperationProfileResult derives status and core columns correctly") { + val op = new ConnectOperationInfo( + operationId = "op", sessionId = "s", userId = "u", + jobTag = "tag", statementText = "SELECT 1", startTime = 100L) + op.analyzeTime = Some(200L) + op.readyForExecTime = Some(300L) + op.finishTime = Some(500L) + op.closeTime = Some(600L) + op.producedRowCount = Some(1L) + val row = ConnectOperationProfileResult.from( + appId = "app-1", op = op, sqlIds = Seq(42L), jobIds = Seq(7), + statementFile = Some("op.txt")) + assert(operationCol(row, "operationId") == "op") + assert(operationCol(row, "status") == "SUCCEEDED") + assert(operationCol(row, "durationMs") == "500") + assert(operationCol(row, "finishTime") == "500") + assert(operationCol(row, "closeTime") == "600") + assert(operationCol(row, "sqlIds") == "42") + assert(operationCol(row, "jobIds") == "7") + assert(operationCol(row, "statementFile") == "op.txt") + assert(operationCol(row, "statementTruncated") == "false") + } + + test("ConnectOperationProfileResult derives FAILED status with errorMessage") { + val op = new ConnectOperationInfo( + operationId = "op-f", sessionId = "s", userId = "u", + jobTag = "tag", statementText = "bad", startTime = 100L) + op.failTime = Some(150L) + op.errorMessage = Some("boom") + val row = ConnectOperationProfileResult.from( + appId = "app", op = op, sqlIds = Seq.empty, jobIds = Seq.empty, + statementFile = None) + assert(operationCol(row, "status") == "FAILED") + assert(operationCol(row, "errorMessage") == "boom") + assert(operationCol(row, "statementFile") == "") + assert(operationCol(row, "durationMs") == "50") + assert(operationCol(row, "failTime") == "150") + assert(operationCol(row, "sqlIds") == "") + } + 
+ test("ConnectOperationProfileResult derives CANCELED status takes priority over FAILED") { + val op = new ConnectOperationInfo( + operationId = "op-c", sessionId = "s", userId = "u", + jobTag = "tag", statementText = "", startTime = 100L) + op.cancelTime = Some(150L) + op.failTime = Some(155L) + val row = ConnectOperationProfileResult.from("app", op, Seq.empty, Seq.empty, None) + assert(operationCol(row, "status") == "CANCELED") + } + + test("ConnectOperationProfileResult derives RUNNING status when no terminal timestamp") { + val op = new ConnectOperationInfo( + operationId = "op-r", sessionId = "s", userId = "u", + jobTag = "tag", statementText = "", startTime = 100L) + val row = ConnectOperationProfileResult.from("app", op, Seq.empty, Seq.empty, None) + assert(operationCol(row, "status") == "RUNNING") + assert(operationCol(row, "durationMs") == "-1") + } + + test("ConnectOperationProfileResult detects truncated statementText") { + val op = new ConnectOperationInfo( + operationId = "op-t", sessionId = "s", userId = "u", + jobTag = "tag", + statementText = "plan body ... " + ConnectOperationProfileResult.TruncationMarker + + "1234)] more", + startTime = 100L) + val row = ConnectOperationProfileResult.from("app", op, Seq.empty, Seq.empty, None) + assert(operationCol(row, "statementTruncated") == "true") + } + + test("ConnectOperationProfileResult does not infer truncation from structural elision") { + val op = new ConnectOperationInfo( + operationId = "op-struct", sessionId = "s", userId = "u", + jobTag = "tag", + statementText = + """filter { + | input { + | common {} + | join {} + | } + |}""".stripMargin, + startTime = 100L) + val row = ConnectOperationProfileResult.from("app", op, Seq.empty, Seq.empty, None) + assert(operationCol(row, "statementTruncated") == "false") + } + + test("ConnectOperationProfileResult.convertToCSVSeq wraps string fields in quotes") { + val op = new ConnectOperationInfo( + operationId = "op,x", sessionId = "s\"q", userId = "u1", + jobTag = "tag", statementText = "SELECT 1", startTime = 100L) + op.errorMessage = Some("bad, msg") + val row = ConnectOperationProfileResult.from( + appId = "app-1", op = op, sqlIds = Seq(1L, 2L), jobIds = Seq(3, 4), + statementFile = Some("f,x.txt")) + val csv = row.convertToCSVSeq() + def csvCol(name: String): String = csv(operationHeaders.indexOf(name)) + assert(csvCol("appID") == "\"app-1\"") + assert(csvCol("operationId") == "\"op,x\"") + assert(csvCol("sessionId") == "\"s\"\"q\"") + assert(csvCol("userId") == "\"u1\"") + assert(csvCol("errorMessage") == "\"bad, msg\"") + assert(csvCol("sqlIds") == "\"1;2\"") + assert(csvCol("jobIds") == "\"3;4\"") + assert(csvCol("statementFile") == "\"f,x.txt\"") + // numeric/boolean fields remain raw + assert(csvCol("startTime") == "100") + assert(csvCol("statementTruncated") == "false") + } +} diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ConnectProfilerOutputSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ConnectProfilerOutputSuite.scala new file mode 100644 index 000000000..d3ea099eb --- /dev/null +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ConnectProfilerOutputSuite.scala @@ -0,0 +1,329 @@ +/* + * Copyright (c) 2026, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.profiling + +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Path, Paths} +import java.util.Comparator + +import scala.collection.mutable + +import com.nvidia.spark.rapids.BaseNoSparkSuite +import com.nvidia.spark.rapids.tool.EventLogPathProcessor +import com.nvidia.spark.rapids.tool.views.OutHeaderRegistry + +import org.apache.spark.sql.TrampolineUtil +import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo +import org.apache.spark.sql.rapids.tool.util.{RapidsToolsConfUtil, UTF8Source} + +/** + * Tests that [[ConnectSessionProfileResult]] and + * [[ConnectOperationProfileResult]] are wired into the Profiler's per-app CSV + * output — `connect_sessions.csv` and `connect_operations.csv` are produced in + * Connect mode and absent otherwise. + */ +class ConnectProfilerOutputSuite extends BaseNoSparkSuite { + + private val hadoopConf = RapidsToolsConfUtil.newHadoopConf() + + private val logStartEvent = + """{"Event":"SparkListenerLogStart","Spark Version":"3.5.0"}""" + private val appStartEvent = + """{"Event":"SparkListenerApplicationStart","App Name":"ConnectOutputTest",""" + + """"App ID":"local-connect-output","Timestamp":100000,"User":"testUser"}""" + private val envUpdateEvent = + """{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{},""" + + """"Spark Properties":{"spark.master":"local[*]"},""" + + """"Hadoop Properties":{},"System Properties":{"file.encoding":"UTF-8"},""" + + """"Classpath Entries":{}}""" + private val appEndEvent = + """{"Event":"SparkListenerApplicationEnd","Timestamp":200000}""" + + private def withEventLog(events: String*)(verify: ApplicationInfo => Unit): Unit = { + val content = events.mkString("\n") + TrampolineUtil.withTempDir { tempDir => + val path = Paths.get(tempDir.getAbsolutePath, "test_eventlog") + Files.write(path, content.getBytes(StandardCharsets.UTF_8)) + val app = new ApplicationInfo(hadoopConf, + EventLogPathProcessor.getEventLogInfo(path.toString, hadoopConf).head._1) + verify(app) + } + } + + private def deleteRecursively(root: Path): Unit = { + if (Files.exists(root)) { + val stream = Files.walk(root) + try { + stream.sorted(Comparator.reverseOrder[Path]()) + .forEach(p => Files.deleteIfExists(p)) + } finally { + stream.close() + } + } + } + + private def readAllLines(path: Path): Seq[String] = { + val src = UTF8Source.fromFile(path.toFile) + try { + src.getLines().toList + } finally { + src.close() + } + } + + test("writeConnectTables emits connect_sessions.csv and connect_operations.csv " + + "when isConnectMode") { + withEventLog(logStartEvent, appStartEvent, envUpdateEvent, appEndEvent) { app => + // Seed two sessions and two operations (one SUCCEEDED, one FAILED) plus + // sqlID / jobID correlations, so both result types exercise their + // convertToCSVSeq paths. 
+ app.connectSessions.put("sess-1", new ConnectSessionInfo( + sessionId = "sess-1", + userId = "alice", + startTime = 100L, + endTime = Some(500L))) + + val op1StatementText = "SELECT 1 plan body" + val op1 = new ConnectOperationInfo( + operationId = "op-1", + sessionId = "sess-1", + userId = "alice", + jobTag = "SparkConnect_OperationTag_User_alice_Session_sess-1_Operation_op-1", + statementText = op1StatementText, + startTime = 110L, + analyzeTime = Some(120L), + readyForExecTime = Some(130L), + finishTime = Some(150L), + closeTime = Some(160L), + producedRowCount = Some(10L)) + app.connectOperations.put("op-1", op1) + app.operationIdToSqlIds.put("op-1", mutable.HashSet(42L)) + app.operationIdToJobIds.put("op-1", mutable.HashSet(7)) + + val op2 = new ConnectOperationInfo( + operationId = "op-2", + sessionId = "sess-1", + userId = "alice", + jobTag = "SparkConnect_OperationTag_User_alice_Session_sess-1_Operation_op-2", + statementText = "", + startTime = 200L, + failTime = Some(260L), + errorMessage = Some("boom")) + app.connectOperations.put("op-2", op2) + + assert(app.isConnectMode, "Seeded app should report Connect mode") + + val tmpDir = Files.createTempDirectory("prof-connect-out-").toFile + try { + val writer = new ProfileOutputWriter(tmpDir.getAbsolutePath, "profile", + numOutputRows = 1000, outputCSV = true) + try { + Profiler.writeConnectTables(writer, app) + } finally { + writer.close() + } + + val sessionsCsv = Paths.get(tmpDir.getAbsolutePath, "connect_sessions.csv") + val operationsCsv = Paths.get(tmpDir.getAbsolutePath, "connect_operations.csv") + assert(Files.exists(sessionsCsv), s"expected $sessionsCsv to exist") + assert(Files.exists(operationsCsv), s"expected $operationsCsv to exist") + + val sessionLines = readAllLines(sessionsCsv) + // Header + 1 session row + assert(sessionLines.size == 2, s"unexpected session rows: $sessionLines") + assert(sessionLines.head == + "appID,sessionId,userId,startTime,endTime,durationMs,operationCount", + s"unexpected session header: ${sessionLines.head}") + + val opLines = readAllLines(operationsCsv) + // Header + 2 op rows + assert(opLines.size == 3, s"unexpected operation rows: $opLines") + assert(opLines.head == + "appID,operationId,sessionId,userId,jobTag,startTime,finishTime,closeTime," + + "failTime,cancelTime,durationMs,status,errorMessage,sqlIds,jobIds," + + "statementFile,statementTruncated", + s"unexpected operation header: ${opLines.head}") + // Per-column parse: find the `status` column index from the registry and + // assert exactly one SUCCEEDED and one FAILED row. Rows in this test case + // contain no embedded commas, so simple string split is sufficient. + // CSV string columns are wrapped in double quotes by reformatCSVString; + // strip surrounding quotes before comparison. + val opHeaders = OutHeaderRegistry.outputHeaders("ConnectOperationProfileResult") + val statusIdx = opHeaders.indexOf("status") + assert(statusIdx >= 0, s"status column missing from registry headers: ${ + opHeaders.mkString(",")}") + val statusValues = opLines.tail.map(_.split(",", -1)(statusIdx).stripPrefix("\"") + .stripSuffix("\"")) + assert(statusValues.count(_ == "SUCCEEDED") == 1, + s"expected exactly one SUCCEEDED row: $statusValues") + assert(statusValues.count(_ == "FAILED") == 1, + s"expected exactly one FAILED row: $statusValues") + + // Sidecars are disabled by default. The statementFile column remains + // empty until the caller opts into writing sidecars. 
+ val statementsDir = Paths.get(tmpDir.getAbsolutePath, + ConnectStatementWriter.SUB_DIR) + assert(!Files.exists(statementsDir), + s"expected no $statementsDir when sidecars are disabled") + val opIdIdx = opHeaders.indexOf("operationId") + val statementFileIdx = opHeaders.indexOf("statementFile") + assert(statementFileIdx >= 0, + s"statementFile column missing from registry headers: ${ + opHeaders.mkString(",")}") + val stmtFileByOp = opLines.tail.map { line => + val cols = line.split(",", -1) + val opId = cols(opIdIdx).stripPrefix("\"").stripSuffix("\"") + val stmtFile = cols(statementFileIdx).stripPrefix("\"").stripSuffix("\"") + opId -> stmtFile + }.toMap + assert(stmtFileByOp("op-1") == "", + s"expected op-1 statementFile empty by default, got ${stmtFileByOp("op-1")}") + assert(stmtFileByOp("op-2") == "", + s"expected op-2 statementFile empty, got ${stmtFileByOp("op-2")}") + } finally { + deleteRecursively(tmpDir.toPath) + } + } + } + + test("writeConnectTables writes no files when app is not in Connect mode") { + withEventLog(logStartEvent, appStartEvent, envUpdateEvent, appEndEvent) { app => + assert(!app.isConnectMode, "Fresh app should not be in Connect mode") + + val tmpDir = Files.createTempDirectory("prof-connect-out-").toFile + try { + val writer = new ProfileOutputWriter(tmpDir.getAbsolutePath, "profile", + numOutputRows = 1000, outputCSV = true) + try { + Profiler.writeConnectTables(writer, app) + } finally { + writer.close() + } + + val sessionsCsv = Paths.get(tmpDir.getAbsolutePath, "connect_sessions.csv") + val operationsCsv = Paths.get(tmpDir.getAbsolutePath, "connect_operations.csv") + assert(!Files.exists(sessionsCsv), + s"expected no $sessionsCsv for non-Connect app") + assert(!Files.exists(operationsCsv), + s"expected no $operationsCsv for non-Connect app") + } finally { + deleteRecursively(tmpDir.toPath) + } + } + } + + test("writeConnectTables emits connect_sessions.csv for session-only Connect logs") { + withEventLog(logStartEvent, appStartEvent, envUpdateEvent, appEndEvent) { app => + app.connectSessions.put("sess-1", new ConnectSessionInfo( + sessionId = "sess-1", + userId = "alice", + startTime = 100L, + endTime = Some(500L))) + + assert(app.isConnectMode, "Session-only app should report Connect mode") + + val tmpDir = Files.createTempDirectory("prof-connect-out-").toFile + try { + val writer = new ProfileOutputWriter(tmpDir.getAbsolutePath, "profile", + numOutputRows = 1000, outputCSV = true) + try { + Profiler.writeConnectTables(writer, app) + } finally { + writer.close() + } + + val sessionsCsv = Paths.get(tmpDir.getAbsolutePath, "connect_sessions.csv") + val operationsCsv = Paths.get(tmpDir.getAbsolutePath, "connect_operations.csv") + assert(Files.exists(sessionsCsv), s"expected $sessionsCsv to exist") + assert(!Files.exists(operationsCsv), + s"expected no $operationsCsv for session-only Connect app") + + val sessionLines = readAllLines(sessionsCsv) + assert(sessionLines.size == 2, s"unexpected session rows: $sessionLines") + assert(sessionLines(1).contains("sess-1"), + s"expected sess-1 row in session output: ${sessionLines(1)}") + } finally { + deleteRecursively(tmpDir.toPath) + } + } + } + + test("writeConnectTables writes statement sidecars only when enabled") { + withEventLog(logStartEvent, appStartEvent, envUpdateEvent, appEndEvent) { app => + val op1StatementText = "SELECT 1 plan body" + app.connectSessions.put("sess-1", new ConnectSessionInfo( + sessionId = "sess-1", + userId = "alice", + startTime = 100L, + endTime = Some(500L))) + 
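+      // op-1 carries a non-empty statementText and should produce a sidecar;
+      // op-2 uses an empty statementText and should be skipped by the writer.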
app.connectOperations.put("op-1", new ConnectOperationInfo( + operationId = "op-1", + sessionId = "sess-1", + userId = "alice", + jobTag = "SparkConnect_OperationTag_User_alice_Session_sess-1_Operation_op-1", + statementText = op1StatementText, + startTime = 110L)) + app.connectOperations.put("op-2", new ConnectOperationInfo( + operationId = "op-2", + sessionId = "sess-1", + userId = "alice", + jobTag = "SparkConnect_OperationTag_User_alice_Session_sess-1_Operation_op-2", + statementText = "", + startTime = 120L)) + + val tmpDir = Files.createTempDirectory("prof-connect-out-").toFile + try { + val writer = new ProfileOutputWriter(tmpDir.getAbsolutePath, "profile", + numOutputRows = 1000, outputCSV = true) + try { + Profiler.writeConnectTables(writer, app, writeStatementSidecars = true) + } finally { + writer.close() + } + + val opLines = readAllLines(Paths.get(tmpDir.getAbsolutePath, "connect_operations.csv")) + val opHeaders = OutHeaderRegistry.outputHeaders("ConnectOperationProfileResult") + val opIdIdx = opHeaders.indexOf("operationId") + val statementFileIdx = opHeaders.indexOf("statementFile") + val stmtFileByOp = opLines.tail.map { line => + val cols = line.split(",", -1) + val opId = cols(opIdIdx).stripPrefix("\"").stripSuffix("\"") + val stmtFile = cols(statementFileIdx).stripPrefix("\"").stripSuffix("\"") + opId -> stmtFile + }.toMap + + val statementsDir = Paths.get(tmpDir.getAbsolutePath, ConnectStatementWriter.SUB_DIR) + val op1Sidecar = statementsDir.resolve("op-1.txt") + val op2Sidecar = statementsDir.resolve("op-2.txt") + assert(Files.isDirectory(statementsDir), + s"expected $statementsDir directory when sidecars are enabled") + assert(Files.exists(op1Sidecar), s"expected $op1Sidecar to exist") + assert(!Files.exists(op2Sidecar), + s"expected $op2Sidecar not to exist for empty statementText") + val op1Contents = new String(Files.readAllBytes(op1Sidecar), StandardCharsets.UTF_8) + assert(op1Contents == op1StatementText, + s"sidecar contents mismatch: $op1Contents vs $op1StatementText") + assert(stmtFileByOp("op-1") == "op-1.txt", + s"expected op-1 statementFile=op-1.txt, got ${stmtFileByOp("op-1")}") + assert(stmtFileByOp("op-2") == "", + s"expected op-2 statementFile empty, got ${stmtFileByOp("op-2")}") + } finally { + deleteRecursively(tmpDir.toPath) + } + } + } +} diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ConnectStatementWriterSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ConnectStatementWriterSuite.scala new file mode 100644 index 000000000..b16612eb6 --- /dev/null +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/profiling/ConnectStatementWriterSuite.scala @@ -0,0 +1,140 @@ +/* + * Copyright (c) 2026, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nvidia.spark.rapids.tool.profiling + +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Path, Paths} +import java.util.Comparator + +import org.scalatest.funsuite.AnyFunSuite + +/** + * Tests for [[ConnectStatementWriter]]: writes each operation's `statementText` + * to a sidecar at `/connect_statements/.txt` + * and returns the basename map used to populate the `statementFile` column in + * `connect_operations.csv`. + */ +class ConnectStatementWriterSuite extends AnyFunSuite { + + private def deleteRecursively(root: Path): Unit = { + if (Files.exists(root)) { + val stream = Files.walk(root) + try { + stream.sorted(Comparator.reverseOrder[Path]()) + .forEach(p => Files.deleteIfExists(p)) + } finally { + stream.close() + } + } + } + + private def makeOp(opId: String, stmt: String): ConnectOperationInfo = { + new ConnectOperationInfo( + operationId = opId, + sessionId = "sess-1", + userId = "alice", + jobTag = s"SparkConnect_OperationTag_User_alice_Session_sess-1_Operation_$opId", + statementText = stmt, + startTime = 100L) + } + + test("writes non-empty statementText to sidecar file and returns basename") { + val tmpDir = Files.createTempDirectory("connect-stmt-writer-") + try { + val op = makeOp("op-1", "range(0, 10)") + val result = ConnectStatementWriter.writeStatementFiles( + tmpDir.toString, Seq(op)) + assert(result == Map("op-1" -> "op-1.txt"), + s"expected map op-1 -> op-1.txt, got $result") + val expectedPath = tmpDir.resolve(ConnectStatementWriter.SUB_DIR).resolve("op-1.txt") + assert(Files.exists(expectedPath), s"expected sidecar at $expectedPath") + val written = new String(Files.readAllBytes(expectedPath), StandardCharsets.UTF_8) + assert(written == "range(0, 10)", s"unexpected content: $written") + } finally { + deleteRecursively(tmpDir) + } + } + + test("empty statementText is skipped and not in returned map") { + val tmpDir = Files.createTempDirectory("connect-stmt-writer-") + try { + val op1 = makeOp("op-1", "plan body") + val op2 = makeOp("op-2", "") + val result = ConnectStatementWriter.writeStatementFiles( + tmpDir.toString, Seq(op1, op2)) + assert(result.keySet == Set("op-1"), + s"expected only op-1 in map, got ${result.keySet}") + assert(result("op-1") == "op-1.txt") + val op2Path = tmpDir.resolve(ConnectStatementWriter.SUB_DIR).resolve("op-2.txt") + assert(!Files.exists(op2Path), s"op-2 sidecar should not exist: $op2Path") + } finally { + deleteRecursively(tmpDir) + } + } + + test("does not create connect_statements dir when all statementText empty") { + val tmpDir = Files.createTempDirectory("connect-stmt-writer-") + try { + val op1 = makeOp("op-1", "") + val op2 = makeOp("op-2", "") + val result = ConnectStatementWriter.writeStatementFiles( + tmpDir.toString, Seq(op1, op2)) + assert(result.isEmpty, s"expected empty map, got $result") + val subDir = tmpDir.resolve(ConnectStatementWriter.SUB_DIR) + assert(!Files.exists(subDir), + s"sidecar directory should not have been created: $subDir") + } finally { + deleteRecursively(tmpDir) + } + } + + test("Unicode / multi-byte statementText roundtrips through UTF-8") { + val tmpDir = Files.createTempDirectory("connect-stmt-writer-") + try { + val unicode = "SELECT 'λ', '漢字', '🚀' FROM t" + val op = makeOp("op-1", unicode) + val result = ConnectStatementWriter.writeStatementFiles( + tmpDir.toString, Seq(op)) + assert(result == Map("op-1" -> "op-1.txt")) + val path = Paths.get(tmpDir.toString, ConnectStatementWriter.SUB_DIR, "op-1.txt") + val bytes = Files.readAllBytes(path) + 
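+      // Check the raw on-disk bytes first, then the decoded string, so a lossy
+      // re-encoding is caught at the byte level rather than masked by decoding.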
assert(bytes.sameElements(unicode.getBytes(StandardCharsets.UTF_8)), + "bytes on disk should match UTF-8 encoded original") + val roundtrip = new String(bytes, StandardCharsets.UTF_8) + assert(roundtrip == unicode, s"roundtrip mismatch: $roundtrip vs $unicode") + } finally { + deleteRecursively(tmpDir) + } + } + + test("sanitizes operationId before resolving sidecar path") { + val tmpDir = Files.createTempDirectory("connect-stmt-writer-") + try { + val op = makeOp("../../etc/foo", "range(0, 10)") + val result = ConnectStatementWriter.writeStatementFiles(tmpDir.toString, Seq(op)) + val expectedBasename = ".._.._etc_foo.txt" + assert(result == Map("../../etc/foo" -> expectedBasename), + s"expected sanitized basename map, got $result") + val expectedPath = tmpDir.resolve(ConnectStatementWriter.SUB_DIR).resolve(expectedBasename) + assert(Files.exists(expectedPath), s"expected sidecar at $expectedPath") + assert(expectedPath.normalize().startsWith(tmpDir.resolve(ConnectStatementWriter.SUB_DIR)), + s"sidecar should remain under ${ConnectStatementWriter.SUB_DIR}: $expectedPath") + } finally { + deleteRecursively(tmpDir) + } + } +} diff --git a/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationConnectOutputSuite.scala b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationConnectOutputSuite.scala new file mode 100644 index 000000000..605ef93fc --- /dev/null +++ b/core/src/test/scala/com/nvidia/spark/rapids/tool/qualification/QualificationConnectOutputSuite.scala @@ -0,0 +1,217 @@ +/* + * Copyright (c) 2026, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.qualification + +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Path, Paths} +import java.util.Comparator + +import scala.collection.mutable + +import com.nvidia.spark.rapids.BaseNoSparkSuite +import com.nvidia.spark.rapids.tool.profiling.{ConnectOperationInfo, ConnectSessionInfo} +import com.nvidia.spark.rapids.tool.views.QualRawReportGenerator + +import org.apache.spark.sql.TrampolineUtil +import org.apache.spark.sql.rapids.tool.qualification.QualificationAppInfo +import org.apache.spark.sql.rapids.tool.util.UTF8Source + +/** + * Verifies that the qualification raw-metrics writer emits the same Spark + * Connect tables and statement sidecars as profiling, but under + * `raw_metrics//`. 
+ */ +class QualificationConnectOutputSuite extends BaseNoSparkSuite { + + private val logStartEvent = + """{"Event":"SparkListenerLogStart","Spark Version":"3.5.0"}""" + private val appStartEvent = + """{"Event":"SparkListenerApplicationStart","App Name":"QualConnectOutputTest",""" + + """"App ID":"local-qual-connect-output","Timestamp":100000,"User":"testUser"}""" + private val envUpdateEvent = + """{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{},""" + + """"Spark Properties":{"spark.master":"local[*]"},""" + + """"Hadoop Properties":{},"System Properties":{"file.encoding":"UTF-8"},""" + + """"Classpath Entries":{}}""" + private val appEndEvent = + """{"Event":"SparkListenerApplicationEnd","Timestamp":200000}""" + + private def withQualificationApp(events: String*)(verify: QualificationAppInfo => Unit): Unit = { + val content = events.mkString("\n") + TrampolineUtil.withTempDir { tempDir => + val path = Paths.get(tempDir.getAbsolutePath, "test_eventlog") + Files.write(path, content.getBytes(StandardCharsets.UTF_8)) + val app = createAppFromEventlog(path.toString) + verify(app) + } + } + + private def deleteRecursively(root: Path): Unit = { + if (Files.exists(root)) { + val stream = Files.walk(root) + try { + stream.sorted(Comparator.reverseOrder[Path]()) + .forEach(p => Files.deleteIfExists(p)) + } finally { + stream.close() + } + } + } + + private def readAllLines(path: Path): Seq[String] = { + val src = UTF8Source.fromFile(path.toFile) + try { + src.getLines().toList + } finally { + src.close() + } + } + + test("qualification raw metrics emit connect CSVs and statement sidecars when enabled") { + withQualificationApp(logStartEvent, appStartEvent, envUpdateEvent, appEndEvent) { app => + app.connectSessions.put("sess-1", new ConnectSessionInfo( + sessionId = "sess-1", + userId = "alice", + startTime = 100L, + endTime = Some(500L))) + + val op1StatementText = "SELECT 1 plan body" + app.connectOperations.put("op-1", new ConnectOperationInfo( + operationId = "op-1", + sessionId = "sess-1", + userId = "alice", + jobTag = "SparkConnect_OperationTag_User_alice_Session_sess-1_Operation_op-1", + statementText = op1StatementText, + startTime = 110L, + analyzeTime = Some(120L), + readyForExecTime = Some(130L), + finishTime = Some(150L), + closeTime = Some(160L), + producedRowCount = Some(10L))) + app.operationIdToSqlIds.put("op-1", mutable.HashSet(42L)) + app.operationIdToJobIds.put("op-1", mutable.HashSet(7)) + + app.connectOperations.put("op-2", new ConnectOperationInfo( + operationId = "op-2", + sessionId = "sess-1", + userId = "alice", + jobTag = "SparkConnect_OperationTag_User_alice_Session_sess-1_Operation_op-2", + statementText = "", + startTime = 200L, + failTime = Some(260L), + errorMessage = Some("boom"))) + + val tmpDir = Files.createTempDirectory("qual-connect-out-") + try { + QualRawReportGenerator.generateRawMetricQualViewAndGetDataSourceInfo( + tmpDir.toString, app, writeConnectStatements = true) + + val appDir = tmpDir.resolve("raw_metrics").resolve(app.appId) + val sessionsCsv = appDir.resolve("connect_sessions.csv") + val operationsCsv = appDir.resolve("connect_operations.csv") + assert(Files.exists(sessionsCsv), s"expected $sessionsCsv to exist") + assert(Files.exists(operationsCsv), s"expected $operationsCsv to exist") + + val sessionLines = readAllLines(sessionsCsv) + assert(sessionLines.size == 2, s"unexpected session rows: $sessionLines") + assert(sessionLines.head == + "appID,sessionId,userId,startTime,endTime,durationMs,operationCount", + s"unexpected 
session header: ${sessionLines.head}") + + val opLines = readAllLines(operationsCsv) + assert(opLines.size == 3, s"unexpected operation rows: $opLines") + assert(opLines.head.contains("statementFile"), + s"connect_operations header should include statementFile: ${opLines.head}") + + val statementsDir = appDir.resolve("connect_statements") + assert(Files.isDirectory(statementsDir), + s"expected $statementsDir directory for op-1 sidecar") + val op1Sidecar = statementsDir.resolve("op-1.txt") + val op2Sidecar = statementsDir.resolve("op-2.txt") + assert(Files.exists(op1Sidecar), s"expected $op1Sidecar to exist") + assert(!Files.exists(op2Sidecar), + s"expected $op2Sidecar not to exist for empty statementText") + val op1Contents = new String(Files.readAllBytes(op1Sidecar), StandardCharsets.UTF_8) + assert(op1Contents == op1StatementText, + s"sidecar contents mismatch: $op1Contents vs $op1StatementText") + } finally { + deleteRecursively(tmpDir) + } + } + } + + test("qualification raw metrics emit connect_sessions.csv for session-only Connect logs") { + withQualificationApp(logStartEvent, appStartEvent, envUpdateEvent, appEndEvent) { app => + app.connectSessions.put("sess-1", new ConnectSessionInfo( + sessionId = "sess-1", + userId = "alice", + startTime = 100L, + endTime = Some(500L))) + + assert(app.isConnectMode, "Session-only qualification app should report Connect mode") + + val tmpDir = Files.createTempDirectory("qual-connect-out-") + try { + QualRawReportGenerator.generateRawMetricQualViewAndGetDataSourceInfo(tmpDir.toString, app) + + val appDir = tmpDir.resolve("raw_metrics").resolve(app.appId) + val sessionsCsv = appDir.resolve("connect_sessions.csv") + val operationsCsv = appDir.resolve("connect_operations.csv") + assert(Files.exists(sessionsCsv), s"expected $sessionsCsv to exist") + assert(!Files.exists(operationsCsv), + s"expected no $operationsCsv for session-only Connect app") + + val sessionLines = readAllLines(sessionsCsv) + assert(sessionLines.size == 2, s"unexpected session rows: $sessionLines") + assert(sessionLines(1).contains("sess-1"), + s"expected sess-1 row in session output: ${sessionLines(1)}") + } finally { + deleteRecursively(tmpDir) + } + } + } + + test("qualification raw metrics do not emit statement sidecars by default") { + withQualificationApp(logStartEvent, appStartEvent, envUpdateEvent, appEndEvent) { app => + app.connectSessions.put("sess-1", new ConnectSessionInfo( + sessionId = "sess-1", + userId = "alice", + startTime = 100L, + endTime = Some(500L))) + app.connectOperations.put("op-1", new ConnectOperationInfo( + operationId = "op-1", + sessionId = "sess-1", + userId = "alice", + jobTag = "SparkConnect_OperationTag_User_alice_Session_sess-1_Operation_op-1", + statementText = "SELECT 1 plan body", + startTime = 110L)) + + val tmpDir = Files.createTempDirectory("qual-connect-out-") + try { + QualRawReportGenerator.generateRawMetricQualViewAndGetDataSourceInfo(tmpDir.toString, app) + + val appDir = tmpDir.resolve("raw_metrics").resolve(app.appId) + assert(Files.exists(appDir.resolve("connect_operations.csv")), + s"expected connect_operations.csv under $appDir") + assert(!Files.exists(appDir.resolve("connect_statements")), + s"expected no connect_statements directory under $appDir by default") + } finally { + deleteRecursively(tmpDir) + } + } + } +} diff --git a/user_tools/src/spark_rapids_pytools/resources/databricks_aws-configs.json b/user_tools/src/spark_rapids_pytools/resources/databricks_aws-configs.json index a8573ccaa..e45ff012f 100644 --- 
a/user_tools/src/spark_rapids_pytools/resources/databricks_aws-configs.json +++ b/user_tools/src/spark_rapids_pytools/resources/databricks_aws-configs.json @@ -19,6 +19,17 @@ "relativePath": "jars/*" } }, + { + "name": "Apache Spark Connect", + "uri": "https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.7/spark-connect_2.12-3.5.7.jar", + "verification": { + "fileHash": { + "algorithm": "sha1", + "value": "11469f1eeb53c250b9bada24fff2f76b25c407eb" + }, + "size": 14147901 + } + }, { "name": "Hadoop AWS", "uri": "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar", @@ -58,6 +69,17 @@ "relativePath": "jars/*" } }, + { + "name": "Apache Spark Connect", + "uri": "https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.6/spark-connect_2.12-3.5.6.jar", + "verification": { + "fileHash": { + "algorithm": "sha1", + "value": "91c16f5383ba28f96f01b92a8553433bc1df0f67" + }, + "size": 14147901 + } + }, { "name": "Hadoop AWS", "uri": "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar", @@ -97,6 +119,17 @@ "relativePath": "jars/*" } }, + { + "name": "Apache Spark Connect", + "uri": "https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.0/spark-connect_2.12-3.5.0.jar", + "verification": { + "fileHash": { + "algorithm": "sha1", + "value": "6a514b94478fbb86217162211991a17612d32a15" + }, + "size": 17177871 + } + }, { "name": "Hadoop AWS", "uri": "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar", diff --git a/user_tools/src/spark_rapids_pytools/resources/databricks_azure-configs.json b/user_tools/src/spark_rapids_pytools/resources/databricks_azure-configs.json index de51a509b..a9a058e63 100644 --- a/user_tools/src/spark_rapids_pytools/resources/databricks_azure-configs.json +++ b/user_tools/src/spark_rapids_pytools/resources/databricks_azure-configs.json @@ -19,6 +19,17 @@ "relativePath": "jars/*" } }, + { + "name": "Apache Spark Connect", + "uri": "https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.7/spark-connect_2.12-3.5.7.jar", + "verification": { + "fileHash": { + "algorithm": "sha1", + "value": "11469f1eeb53c250b9bada24fff2f76b25c407eb" + }, + "size": 14147901 + } + }, { "name": "Hadoop Azure", "uri": "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-azure/3.3.4/hadoop-azure-3.3.4.jar", @@ -47,6 +58,17 @@ "relativePath": "jars/*" } }, + { + "name": "Apache Spark Connect", + "uri": "https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.6/spark-connect_2.12-3.5.6.jar", + "verification": { + "fileHash": { + "algorithm": "sha1", + "value": "91c16f5383ba28f96f01b92a8553433bc1df0f67" + }, + "size": 14147901 + } + }, { "name": "Hadoop Azure", "uri": "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-azure/3.3.4/hadoop-azure-3.3.4.jar", @@ -75,6 +97,17 @@ "relativePath": "jars/*" } }, + { + "name": "Apache Spark Connect", + "uri": "https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.0/spark-connect_2.12-3.5.0.jar", + "verification": { + "fileHash": { + "algorithm": "sha1", + "value": "6a514b94478fbb86217162211991a17612d32a15" + }, + "size": 17177871 + } + }, { "name": "Hadoop Azure", "uri": "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-azure/3.3.4/hadoop-azure-3.3.4.jar", diff --git a/user_tools/src/spark_rapids_pytools/resources/dataproc-configs.json b/user_tools/src/spark_rapids_pytools/resources/dataproc-configs.json index 3a4e39689..a9ffe2829 100644 --- 
a/user_tools/src/spark_rapids_pytools/resources/dataproc-configs.json +++ b/user_tools/src/spark_rapids_pytools/resources/dataproc-configs.json @@ -19,6 +19,17 @@ "relativePath": "jars/*" } }, + { + "name": "Apache Spark Connect", + "uri": "https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.7/spark-connect_2.12-3.5.7.jar", + "verification": { + "fileHash": { + "algorithm": "sha1", + "value": "11469f1eeb53c250b9bada24fff2f76b25c407eb" + }, + "size": 14147901 + } + }, { "name": "GCS Connector Hadoop3", "uri": "https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/gcs-connector/hadoop3-2.2.14/gcs-connector-hadoop3-2.2.14-shaded.jar", @@ -47,6 +58,17 @@ "relativePath": "jars/*" } }, + { + "name": "Apache Spark Connect", + "uri": "https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.6/spark-connect_2.12-3.5.6.jar", + "verification": { + "fileHash": { + "algorithm": "sha1", + "value": "91c16f5383ba28f96f01b92a8553433bc1df0f67" + }, + "size": 14147901 + } + }, { "name": "GCS Connector Hadoop3", "uri": "https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/gcs-connector/hadoop3-2.2.14/gcs-connector-hadoop3-2.2.14-shaded.jar", @@ -75,6 +97,17 @@ "relativePath": "jars/*" } }, + { + "name": "Apache Spark Connect", + "uri": "https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.0/spark-connect_2.12-3.5.0.jar", + "verification": { + "fileHash": { + "algorithm": "sha1", + "value": "6a514b94478fbb86217162211991a17612d32a15" + }, + "size": 17177871 + } + }, { "name": "GCS Connector Hadoop3", "uri": "https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/gcs-connector/hadoop3-2.2.19/gcs-connector-hadoop3-2.2.19-shaded.jar", diff --git a/user_tools/src/spark_rapids_pytools/resources/dataproc_gke-configs.json b/user_tools/src/spark_rapids_pytools/resources/dataproc_gke-configs.json index ee3779027..1f396d119 100644 --- a/user_tools/src/spark_rapids_pytools/resources/dataproc_gke-configs.json +++ b/user_tools/src/spark_rapids_pytools/resources/dataproc_gke-configs.json @@ -19,6 +19,17 @@ "relativePath": "jars/*" } }, + { + "name": "Apache Spark Connect", + "uri": "https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.7/spark-connect_2.12-3.5.7.jar", + "verification": { + "fileHash": { + "algorithm": "sha1", + "value": "11469f1eeb53c250b9bada24fff2f76b25c407eb" + }, + "size": 14147901 + } + }, { "name": "GCS Connector Hadoop3", "uri": "https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/gcs-connector/hadoop3-2.2.14/gcs-connector-hadoop3-2.2.14-shaded.jar", @@ -47,6 +58,17 @@ "relativePath": "jars/*" } }, + { + "name": "Apache Spark Connect", + "uri": "https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.0/spark-connect_2.12-3.5.0.jar", + "verification": { + "fileHash": { + "algorithm": "sha1", + "value": "6a514b94478fbb86217162211991a17612d32a15" + }, + "size": 17177871 + } + }, { "name": "GCS Connector Hadoop3", "uri": "https://repo1.maven.org/maven2/com/google/cloud/bigdataoss/gcs-connector/hadoop3-2.2.19/gcs-connector-hadoop3-2.2.19-shaded.jar", diff --git a/user_tools/src/spark_rapids_pytools/resources/emr-configs.json b/user_tools/src/spark_rapids_pytools/resources/emr-configs.json index cf651c4b3..7a69e98a3 100644 --- a/user_tools/src/spark_rapids_pytools/resources/emr-configs.json +++ b/user_tools/src/spark_rapids_pytools/resources/emr-configs.json @@ -19,6 +19,17 @@ "relativePath": "jars/*" } }, + { + "name": "Apache Spark Connect", + "uri": 
"https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.7/spark-connect_2.12-3.5.7.jar", + "verification": { + "fileHash": { + "algorithm": "sha1", + "value": "11469f1eeb53c250b9bada24fff2f76b25c407eb" + }, + "size": 14147901 + } + }, { "name": "Hadoop AWS", "uri": "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar", @@ -58,6 +69,17 @@ "relativePath": "jars/*" } }, + { + "name": "Apache Spark Connect", + "uri": "https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.6/spark-connect_2.12-3.5.6.jar", + "verification": { + "fileHash": { + "algorithm": "sha1", + "value": "91c16f5383ba28f96f01b92a8553433bc1df0f67" + }, + "size": 14147901 + } + }, { "name": "Hadoop AWS", "uri": "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar", @@ -97,6 +119,17 @@ "relativePath": "jars/*" } }, + { + "name": "Apache Spark Connect", + "uri": "https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.0/spark-connect_2.12-3.5.0.jar", + "verification": { + "fileHash": { + "algorithm": "sha1", + "value": "6a514b94478fbb86217162211991a17612d32a15" + }, + "size": 17177871 + } + }, { "name": "Hadoop AWS", "uri": "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar", diff --git a/user_tools/src/spark_rapids_pytools/resources/onprem-configs.json b/user_tools/src/spark_rapids_pytools/resources/onprem-configs.json index eef37fabc..0894aad29 100644 --- a/user_tools/src/spark_rapids_pytools/resources/onprem-configs.json +++ b/user_tools/src/spark_rapids_pytools/resources/onprem-configs.json @@ -18,6 +18,17 @@ "depType": "archive", "relativePath": "jars/*" } + }, + { + "name": "Apache Spark Connect", + "uri": "https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.7/spark-connect_2.12-3.5.7.jar", + "verification": { + "fileHash": { + "algorithm": "sha1", + "value": "11469f1eeb53c250b9bada24fff2f76b25c407eb" + }, + "size": 14147901 + } } ], "356": [ @@ -35,6 +46,17 @@ "depType": "archive", "relativePath": "jars/*" } + }, + { + "name": "Apache Spark Connect", + "uri": "https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.6/spark-connect_2.12-3.5.6.jar", + "verification": { + "fileHash": { + "algorithm": "sha1", + "value": "91c16f5383ba28f96f01b92a8553433bc1df0f67" + }, + "size": 14147901 + } } ], "350": [ @@ -52,6 +74,17 @@ "depType": "archive", "relativePath": "jars/*" } + }, + { + "name": "Apache Spark Connect", + "uri": "https://repo1.maven.org/maven2/org/apache/spark/spark-connect_2.12/3.5.0/spark-connect_2.12-3.5.0.jar", + "verification": { + "fileHash": { + "algorithm": "sha1", + "value": "6a514b94478fbb86217162211991a17612d32a15" + }, + "size": 17177871 + } } ], "342": [ diff --git a/user_tools/src/spark_rapids_tools/api_v1/builder.py b/user_tools/src/spark_rapids_tools/api_v1/builder.py index 0e706d313..9dcf79211 100644 --- a/user_tools/src/spark_rapids_tools/api_v1/builder.py +++ b/user_tools/src/spark_rapids_tools/api_v1/builder.py @@ -1,4 +1,4 @@ -# Copyright (c) 2025, NVIDIA CORPORATION. +# Copyright (c) 2025-2026, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
@@ -1169,9 +1169,24 @@ def out_path(self) -> Optional[BoundedCspPath]: def is_empty(self) -> bool: return self._res_h.is_empty() + def get_table_path(self, table_label: str) -> Optional[BoundedCspPath]: + return self._res_h.get_table_path(table_label) + + def get_per_app_table_path(self, table_label: str, app_id: str) -> Optional[BoundedCspPath]: + return self._res_h.get_per_app_table_path(table_label, app_id) + def get_raw_metrics_path(self) -> Optional[BoundedCspPath]: return self._res_h.get_raw_metrics_path() + def get_connect_statements_dir(self, app_id: str) -> Optional[BoundedCspPath]: + return self._res_h.get_connect_statements_dir(app_id) + + def list_connect_statement_ops(self, app_id: str) -> List[str]: + return self._res_h.list_connect_statement_ops(app_id) + + def load_connect_statement(self, app_id: str, operation_id: str) -> Optional[str]: + return self._res_h.load_connect_statement(app_id, operation_id) + @dataclass class ProfWrapper(APIResHandler[ProfWrapperResultHandler]): diff --git a/user_tools/src/spark_rapids_tools/api_v1/report_loader.py b/user_tools/src/spark_rapids_tools/api_v1/report_loader.py index ed0176ffd..eb7b82c6f 100644 --- a/user_tools/src/spark_rapids_tools/api_v1/report_loader.py +++ b/user_tools/src/spark_rapids_tools/api_v1/report_loader.py @@ -1,4 +1,4 @@ -# Copyright (c) 2025, NVIDIA CORPORATION. +# Copyright (c) 2025-2026, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -134,7 +134,8 @@ def core_report_definitions(self) -> List[str]: return [ f'{self.core_report_dir}/qualCoreReport.yaml', f'{self.core_report_dir}/profCoreReport.yaml', - f'{self.core_report_dir}/coreRawMetricsReport.yaml' + f'{self.core_report_dir}/coreRawMetricsReport.yaml', + f'{self.core_report_dir}/connectReport.yaml' ] @property diff --git a/user_tools/src/spark_rapids_tools/api_v1/result_handler.py b/user_tools/src/spark_rapids_tools/api_v1/result_handler.py index d8c3f3574..2e86b7d06 100644 --- a/user_tools/src/spark_rapids_tools/api_v1/result_handler.py +++ b/user_tools/src/spark_rapids_tools/api_v1/result_handler.py @@ -1,4 +1,4 @@ -# Copyright (c) 2025, NVIDIA CORPORATION. +# Copyright (c) 2025-2026, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -28,6 +28,7 @@ from spark_rapids_tools.api_v1 import AppHandler from spark_rapids_tools.api_v1.report_reader import ToolReportReader from spark_rapids_tools.storagelib.cspfs import BoundedCspPath, CspFs +from spark_rapids_tools.utils.data_utils import DataUtils class ResultHandlerBaseMeta: # pylint: disable=too-few-public-methods @@ -87,6 +88,7 @@ class Meta(ResultHandlerBaseMeta): # pylint: disable=too-few-public-methods readers: Dict[str, ToolReportReader] logger: Optional[Logger] = field(default=None) app_handlers: Dict[str, AppHandler] = field(default_factory=dict, init=False) + _connect_statement_unsafe_chars = re.compile(r'[^A-Za-z0-9._-]') def __post_init__(self): # init the logger if it is not defined @@ -225,6 +227,82 @@ def is_empty(self) -> bool: def get_raw_metrics_path(self) -> Optional[BoundedCspPath]: return self.get_reader_path('coreRawMetrics') + def get_per_app_table_path(self, table_label: str, app_id: str) -> Optional[BoundedCspPath]: + """ + Resolve the per-application path for a table definition. + :param table_label: Label of the table definition. 
+ :param app_id: Application ID under the per-app report root. + :return: The resolved path or None when the table/app is not available. + """ + reader = self.get_reader_by_tbl(table_label) + if reader is None or not reader.is_per_app(): + return None + if app_id not in self.app_handlers: + return None + table_def = reader.get_table(table_label) + if table_def is None: + return None + return reader.out_path.create_sub_path(f'{app_id}/{table_def.file_name}') + + def get_connect_statements_dir(self, app_id: str) -> Optional[BoundedCspPath]: + """ + Return the connect_statements directory for a given application, if present. + """ + stmt_dir = self.get_per_app_table_path('connectStatements', app_id) + if stmt_dir is None or not stmt_dir.exists(): + return None + return stmt_dir + + @classmethod + def _sanitize_connect_operation_id(cls, operation_id: str) -> str: + """ + Sanitize operation IDs to the on-disk basename convention used by the Scala writer. + """ + return cls._connect_statement_unsafe_chars.sub('_', operation_id) + + def list_connect_statement_ops(self, app_id: str) -> List[str]: + """ + List the sanitized operation IDs for all statement sidecars of an app. + + Each file under ``//connect_statements/*.txt`` contributes + one entry (``.txt`` stripped). Operation IDs are sanitized to match the on-disk + basename: characters outside ``[A-Za-z0-9._-]`` are replaced with ``_``. Use + ``connect_operations.csv`` and its ``statementFile`` column to recover the + original operation IDs. + + :param app_id: Spark application ID whose sidecar directory should be listed. + :return: Sorted list of sanitized operation IDs, or an empty list when no + ``connect_statements/`` directory exists for the app. + """ + stmt_dir = self.get_connect_statements_dir(app_id) + if stmt_dir is None: + return [] + op_files = CspFs.glob_path( + path=stmt_dir, + pattern=re.compile(r'.*\.txt$'), + item_type=FileType.File, + recursive=False + ) + return sorted([p.base_name().rsplit('.txt', 1)[0] for p in op_files]) + + def load_connect_statement(self, app_id: str, operation_id: str) -> Optional[str]: + """ + Load the statementText sidecar for a single Connect operation. + """ + stmt_dir = self.get_connect_statements_dir(app_id) + if stmt_dir is None: + return None + safe_operation_id = self._sanitize_connect_operation_id(operation_id) + sub_path = stmt_dir.create_sub_path(f'{safe_operation_id}.txt') + if not sub_path.exists(): + return None + txt_res = DataUtils.load_txt(sub_path) + if not txt_res.success or txt_res.data is None: + return None + if isinstance(txt_res.data, bytes): + return txt_res.decode_txt() + return txt_res.data + ######################### # Type Definitions ######################### diff --git a/user_tools/src/spark_rapids_tools/enums.py b/user_tools/src/spark_rapids_tools/enums.py index 5e0ddf05c..e00cf96d0 100644 --- a/user_tools/src/spark_rapids_tools/enums.py +++ b/user_tools/src/spark_rapids_tools/enums.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023-2025, NVIDIA CORPORATION. +# Copyright (c) 2023-2026, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
@@ -257,6 +257,7 @@ class ReportTableFormat(EnumeratedType): """Values used to define the format of the report tables""" CSV = 'csv' CONF = 'conf' + DIRECTORY = 'directory' JSON = 'json' LOG = 'log' PROPERTIES = 'properties' @@ -286,6 +287,7 @@ def compatible(self, candidate: Union[str, 'ReportTableFormat']) -> bool: self.CSV: [self.TXT], self.JSON: [self.TXT, self.CSV], # It is possible to convert JSON to CSV (pandas normalizes JSON) self.CONF: [self.PROPERTIES], + self.DIRECTORY: [], self.PROPERTIES: [self.TXT], self.TXT: [] } diff --git a/user_tools/tests/spark_rapids_tools_ut/api/test_connect_e2e.py b/user_tools/tests/spark_rapids_tools_ut/api/test_connect_e2e.py new file mode 100644 index 000000000..1ec89f338 --- /dev/null +++ b/user_tools/tests/spark_rapids_tools_ut/api/test_connect_e2e.py @@ -0,0 +1,78 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Golden roundtrip checks for Spark Connect profiler output.""" + +import shutil +import tempfile +import unittest +from pathlib import Path + +import pandas as pd + +from spark_rapids_tools.api_v1 import ProfCore + + +class TestConnectE2E(unittest.TestCase): + """Verifies a committed Connect profiler output tree is readable end to end.""" + + sample_app_id = 'local-connect-e2e' + expected_statement = 'common { plan_id: 0 } range { start: 0 end: 100 step: 1 }\n' + expected_operation_columns = [ + 'appID', 'operationId', 'sessionId', 'userId', 'jobTag', 'startTime', + 'finishTime', 'closeTime', 'failTime', 'cancelTime', 'durationMs', 'status', + 'errorMessage', 'sqlIds', 'jobIds', 'statementFile', 'statementTruncated' + ] + + def setUp(self): + self.temp_dir = tempfile.mkdtemp() + fixture_root = Path(__file__).resolve().parents[1] / 'resources' / 'connect_e2e' + self.prof_output = Path(self.temp_dir) / 'rapids_4_spark_profile' + shutil.copytree(fixture_root / 'rapids_4_spark_profile', self.prof_output) + + def tearDown(self): + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def test_connect_operations_roundtrip_via_csv_and_api(self): + handler = ProfCore(str(self.prof_output)) + api_res = handler.csv('connectOperations').app(self.sample_app_id).load() + self.assertTrue(api_res.success) + + csv_path = self.prof_output / self.sample_app_id / 'connect_operations.csv' + raw_df = pd.read_csv(csv_path) + api_df = api_res.data + + self.assertEqual(list(raw_df.columns), self.expected_operation_columns) + self.assertEqual(list(api_df.columns), self.expected_operation_columns) + self.assertEqual(list(raw_df['operationId']), ['op-bbb-222', 'op-ccc-333']) + self.assertEqual(list(api_df['operationId'].astype(str)), list(raw_df['operationId'])) + self.assertEqual(api_df.loc[api_df['operationId'] == 'op-bbb-222', 'sqlIds'].iat[0], '42') + self.assertEqual(api_df.loc[api_df['operationId'] == 'op-bbb-222', 'jobIds'].iat[0], '7') + self.assertEqual(api_df.loc[api_df['operationId'] == 'op-ccc-333', 'status'].iat[0], 'FAILED') + + sessions_res = handler.csv('connectSessions').app(self.sample_app_id).load() 
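+        # The committed session row reports operationCount=2, matching the two
+        # operation rows loaded above.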
+ self.assertTrue(sessions_res.success) + self.assertEqual(int(sessions_res.data['operationCount'].iat[0]), 2) + + def test_connect_statement_sidecar_roundtrip(self): + handler = ProfCore(str(self.prof_output)) + + stmt_dir = handler.get_connect_statements_dir(self.sample_app_id) + self.assertIsNotNone(stmt_dir) + self.assertEqual(stmt_dir.base_name(), 'connect_statements') + self.assertEqual(handler.list_connect_statement_ops(self.sample_app_id), ['op-bbb-222']) + self.assertEqual( + handler.load_connect_statement(self.sample_app_id, 'op-bbb-222'), + self.expected_statement) + self.assertIsNone(handler.load_connect_statement(self.sample_app_id, 'op-ccc-333')) diff --git a/user_tools/tests/spark_rapids_tools_ut/api/test_connect_helpers.py b/user_tools/tests/spark_rapids_tools_ut/api/test_connect_helpers.py new file mode 100644 index 000000000..c704d5e5a --- /dev/null +++ b/user_tools/tests/spark_rapids_tools_ut/api/test_connect_helpers.py @@ -0,0 +1,100 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for generic artifact-path lookups and Connect-specific helpers.""" + +import os +import shutil +import tempfile +import unittest + +from spark_rapids_tools.api_v1 import ProfCore, ProfWrapper + + +class TestConnectHelpers(unittest.TestCase): + """Verifies listing and reading Connect statement sidecars.""" + + sample_app_id = 'application_1234567890_0001' + + def setUp(self): + self.temp_dir = tempfile.mkdtemp() + self.prof_output = os.path.join(self.temp_dir, 'rapids_4_spark_profile') + self.prof_wrapper_output = os.path.join(self.temp_dir, 'prof_20260422010101_deadbeef') + self.prof_wrapper_core_output = os.path.join(self.prof_wrapper_output, 'rapids_4_spark_profile') + self.app_dir = os.path.join(self.prof_output, self.sample_app_id) + self.statements_dir = os.path.join(self.app_dir, 'connect_statements') + os.makedirs(self.statements_dir, exist_ok=True) + os.makedirs(os.path.join( + self.prof_wrapper_core_output, + self.sample_app_id, + 'connect_statements' + ), exist_ok=True) + + for base_dir in (self.prof_output, self.prof_wrapper_core_output): + app_dir = os.path.join(base_dir, self.sample_app_id) + statements_dir = os.path.join(app_dir, 'connect_statements') + with open(os.path.join(base_dir, 'profiling_status.csv'), 'w', encoding='utf-8') as fh: + fh.write('Event Log,Status,App ID,Attempt ID,App Name,Description\n') + fh.write(f'/path/to/eventlog,SUCCESS,{self.sample_app_id},0,ProfTest,ok\n') + with open(os.path.join(statements_dir, 'op-1.txt'), 'w', encoding='utf-8') as fh: + fh.write('SELECT 1') + with open(os.path.join(statements_dir, 'op-2.txt'), 'w', encoding='utf-8') as fh: + fh.write('SELECT 2') + with open(os.path.join(self.app_dir, 'secret.txt'), 'w', encoding='utf-8') as fh: + fh.write('SECRET') + + def tearDown(self): + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def test_get_connect_statements_dir_returns_per_app_path(self): + handler = ProfCore(self.prof_output) + path = 
handler.get_connect_statements_dir(self.sample_app_id) + self.assertIsNotNone(path) + self.assertEqual(path.base_name(), 'connect_statements') + + def test_get_per_app_table_path_returns_connect_directory_path(self): + handler = ProfCore(self.prof_output) + path = handler.get_per_app_table_path('connectStatements', self.sample_app_id) + self.assertIsNotNone(path) + self.assertEqual(path.base_name(), 'connect_statements') + self.assertTrue(str(path).endswith(f'/{self.sample_app_id}/connect_statements')) + + def test_get_table_path_resolves_nested_core_artifacts_from_wrapper(self): + handler = ProfWrapper(self.prof_wrapper_output) + status_path = handler.get_table_path('coreCSVStatus') + stmt_dir = handler.get_per_app_table_path('connectStatements', self.sample_app_id) + self.assertIsNotNone(status_path) + self.assertIsNotNone(stmt_dir) + self.assertTrue(str(status_path).endswith('/rapids_4_spark_profile/profiling_status.csv')) + self.assertTrue(str(stmt_dir).endswith( + f'/rapids_4_spark_profile/{self.sample_app_id}/connect_statements')) + + def test_list_connect_statement_ops_returns_sorted_operation_ids(self): + handler = ProfCore(self.prof_output) + ops = handler.list_connect_statement_ops(self.sample_app_id) + self.assertEqual(ops, ['op-1', 'op-2']) + + def test_load_connect_statement_reads_file(self): + handler = ProfCore(self.prof_output) + text = handler.load_connect_statement(self.sample_app_id, 'op-2') + self.assertEqual(text, 'SELECT 2') + + def test_load_connect_statement_missing_returns_none(self): + handler = ProfCore(self.prof_output) + self.assertIsNone(handler.load_connect_statement(self.sample_app_id, 'missing-op')) + self.assertIsNone(handler.load_connect_statement('missing-app', 'op-1')) + + def test_load_connect_statement_sanitizes_operation_id_before_reading(self): + handler = ProfCore(self.prof_output) + self.assertIsNone(handler.load_connect_statement(self.sample_app_id, '../secret')) diff --git a/user_tools/tests/spark_rapids_tools_ut/api/test_connect_report_loader.py b/user_tools/tests/spark_rapids_tools_ut/api/test_connect_report_loader.py new file mode 100644 index 000000000..78ae64281 --- /dev/null +++ b/user_tools/tests/spark_rapids_tools_ut/api/test_connect_report_loader.py @@ -0,0 +1,75 @@ +# Copyright (c) 2026, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Smoke tests for Connect report discovery through YAML catalogs.""" + +import os +import shutil +import tempfile +import unittest + +from spark_rapids_tools.api_v1 import ProfCore, QualCore + + +class TestConnectReportLoader(unittest.TestCase): + """Verifies connectReport is discoverable from prof/qual result handlers.""" + + sample_app_id = 'application_1234567890_0001' + + def setUp(self): + self.temp_dir = tempfile.mkdtemp() + self.prof_output = os.path.join(self.temp_dir, 'rapids_4_spark_profile') + self.qual_output = os.path.join(self.temp_dir, 'qual_core_output') + os.makedirs(self.prof_output, exist_ok=True) + os.makedirs(self.qual_output, exist_ok=True) + self._write_prof_status() + self._write_qual_status() + + def tearDown(self): + shutil.rmtree(self.temp_dir, ignore_errors=True) + + def _write_prof_status(self): + status_csv = os.path.join(self.prof_output, 'profiling_status.csv') + with open(status_csv, 'w', encoding='utf-8') as fh: + fh.write('Event Log,Status,App ID,Attempt ID,App Name,Description\n') + fh.write(f'/path/to/eventlog,SUCCESS,{self.sample_app_id},0,ProfTest,ok\n') + + def _write_qual_status(self): + status_csv = os.path.join(self.qual_output, 'status.csv') + with open(status_csv, 'w', encoding='utf-8') as fh: + fh.write('Event Log,Status,App ID,Attempt ID,App Name,Description\n') + fh.write(f'/path/to/eventlog,SUCCESS,{self.sample_app_id},0,QualTest,ok\n') + + def test_prof_core_registers_connect_tables(self): + handler = ProfCore(self.prof_output).handler + + for label in ('connectSessions', 'connectOperations', 'connectStatements'): + self.assertIn(label, handler.tbl_reader_map) + self.assertTrue(handler.is_per_app_tbl(label)) + + reader = handler.get_reader_by_tbl('connectStatements') + self.assertIsNotNone(reader) + self.assertEqual(reader.report_id, 'connectReport') + + def test_qual_core_registers_connect_tables_under_raw_metrics(self): + handler = QualCore(self.qual_output).handler + + for label in ('connectSessions', 'connectOperations', 'connectStatements'): + self.assertIn(label, handler.tbl_reader_map) + self.assertTrue(handler.is_per_app_tbl(label)) + + reader = handler.get_reader_by_tbl('connectStatements') + self.assertIsNotNone(reader) + self.assertEqual(reader.report_id, 'connectReport') + self.assertTrue(str(reader.out_path).endswith('/raw_metrics')) diff --git a/user_tools/tests/spark_rapids_tools_ut/resources/connect_e2e/rapids_4_spark_profile/local-connect-e2e/connect_operations.csv b/user_tools/tests/spark_rapids_tools_ut/resources/connect_e2e/rapids_4_spark_profile/local-connect-e2e/connect_operations.csv new file mode 100644 index 000000000..478916abf --- /dev/null +++ b/user_tools/tests/spark_rapids_tools_ut/resources/connect_e2e/rapids_4_spark_profile/local-connect-e2e/connect_operations.csv @@ -0,0 +1,3 @@ +appID,operationId,sessionId,userId,jobTag,startTime,finishTime,closeTime,failTime,cancelTime,durationMs,status,errorMessage,sqlIds,jobIds,statementFile,statementTruncated +local-connect-e2e,op-bbb-222,sess-aaa-111,userA,SparkConnect_OperationTag_User_userA_Session_sess-aaa-111_Operation_op-bbb-222,120000,125000,125500,,,5500,SUCCEEDED,,42,7,op-bbb-222.txt,false +local-connect-e2e,op-ccc-333,sess-aaa-111,userA,SparkConnect_OperationTag_User_userA_Session_sess-aaa-111_Operation_op-ccc-333,130000,,,131000,,1000,FAILED,boom,,,,false diff --git a/user_tools/tests/spark_rapids_tools_ut/resources/connect_e2e/rapids_4_spark_profile/local-connect-e2e/connect_sessions.csv 
b/user_tools/tests/spark_rapids_tools_ut/resources/connect_e2e/rapids_4_spark_profile/local-connect-e2e/connect_sessions.csv new file mode 100644 index 000000000..b274c772c --- /dev/null +++ b/user_tools/tests/spark_rapids_tools_ut/resources/connect_e2e/rapids_4_spark_profile/local-connect-e2e/connect_sessions.csv @@ -0,0 +1,2 @@ +appID,sessionId,userId,startTime,endTime,durationMs,operationCount +local-connect-e2e,sess-aaa-111,userA,110000,190000,80000,2 diff --git a/user_tools/tests/spark_rapids_tools_ut/resources/connect_e2e/rapids_4_spark_profile/local-connect-e2e/connect_statements/op-bbb-222.txt b/user_tools/tests/spark_rapids_tools_ut/resources/connect_e2e/rapids_4_spark_profile/local-connect-e2e/connect_statements/op-bbb-222.txt new file mode 100644 index 000000000..b1b897177 --- /dev/null +++ b/user_tools/tests/spark_rapids_tools_ut/resources/connect_e2e/rapids_4_spark_profile/local-connect-e2e/connect_statements/op-bbb-222.txt @@ -0,0 +1 @@ +common { plan_id: 0 } range { start: 0 end: 100 step: 1 } diff --git a/user_tools/tests/spark_rapids_tools_ut/resources/connect_e2e/rapids_4_spark_profile/profiling_status.csv b/user_tools/tests/spark_rapids_tools_ut/resources/connect_e2e/rapids_4_spark_profile/profiling_status.csv new file mode 100644 index 000000000..8513b4bf2 --- /dev/null +++ b/user_tools/tests/spark_rapids_tools_ut/resources/connect_e2e/rapids_4_spark_profile/profiling_status.csv @@ -0,0 +1,2 @@ +Event Log,Status,App ID,Attempt ID,App Name,Description +/path/to/connect_with_sql_job,SUCCESS,local-connect-e2e,0,ConnectE2E,Processing time: 1ms
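Taken together, the pieces above give downstream consumers a single entry point for the new Connect artifacts: the per-app CSV tables load through the existing csv(...).app(...) accessor, and the statement sidecars load through the new helpers on the result handler. A minimal consumption sketch, assuming an already-generated profiler output directory (the path and app ID below are illustrative, mirroring the committed fixture):

from spark_rapids_tools.api_v1 import ProfCore

# Illustrative path and app ID; any rapids_4_spark_profile tree written by the profiler works.
handler = ProfCore('/tmp/rapids_4_spark_profile')
app_id = 'local-connect-e2e'

ops_res = handler.csv('connectOperations').app(app_id).load()
if ops_res.success:
    # statementFile holds the sidecar basename under connect_statements/, or is
    # empty when no sidecar was written for that operation.
    print(ops_res.data[['operationId', 'status', 'statementFile']])

for op_id in handler.list_connect_statement_ops(app_id):
    # list_connect_statement_ops returns sanitized basenames; load_connect_statement
    # applies the same sanitization, so the round trip resolves the same file.
    statement = handler.load_connect_statement(app_id, op_id)
    print(op_id, (statement or '').splitlines()[:1])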