feat(connect): add phase 3 metadata reporting #2081
Open
sayedbilalbari wants to merge 20 commits into NVIDIA:dev from sayedbilalbari:sbari-issue-2065
Changes from all commits (20 commits, all by sayedbilalbari):
aece5d8  feat(connect): add sqlID/jobID reverse indexes on AppBase
20e7046  feat(connect): populate operationIdToSqlIds from SQLExecutionStart.jo…
8c855dc  feat(connect): populate operationIdToJobIds from JobStart spark.job.tags
1ef9ae0  feat(connect): add ConnectSession/ConnectOperation profile results
4a79192  feat(connect): emit connect_sessions.csv and connect_operations.csv f…
227c206  feat(connect): write statementText sidecar files per Connect operation
a68c9ce  feat(connect): emit connect CSVs and statement sidecars from qualific…
6457e39  feat(connect): register connectReport in qual and prof YAML catalogs
4e96bdc  feat(connect): Python API helpers for listing and reading statement f…
244252e  test(connect): add profiler-to-python roundtrip golden fixture
afea0dd  test(connect): assert golden connect CSV header order
225ba74  chore(connect): fix Scala style issues from verification
f0fc5b4  chore: refresh expired license headers
b2746b5  fix(connect): sanitize sidecar paths and keep session-only logs
1ad999a  fix(connect): tighten sidecar handling and trim schema
ff0ffda  test(connect): clarify truncation marker semantics
ed5a4cf  fix(connect): address greptile review feedback
623b464  fix(connect): scalastyle, scaladoc, and audit cleanups
9ed68bc  Merge nv/dev into sbari-issue-2065
04de3cc  fix: add Spark Connect runtime dependencies
core/src/main/resources/configs/reports/connectReport.yaml (121 additions, 0 deletions)
```yaml
# Copyright (c) 2026, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Spark Connect session / operation report written per application by both the
# profiling and qualification tools.

reportDefinitions:
  - reportId: connectReport
    description: >-
      Per-application Spark Connect session and operation metadata plus a
      directory of per-operation protobuf-text statement payloads.
    scope: per-app
    tableDefinitions:
      - label: connectSessions
        description: >-
          One row per Spark Connect session observed in the event log.
        fileName: connect_sessions.csv
        scope: per-app
        columns:
          - name: appID
            dataType: String
            description: Application ID.
          - name: sessionId
            dataType: String
            description: UUID of the Spark Connect session.
          - name: userId
            dataType: String
            description: User who owns the session.
          - name: startTime
            dataType: Long
            description: Epoch millis when the session started.
          - name: endTime
            dataType: Long
            description: Epoch millis when the session closed, if observed.
          - name: durationMs
            dataType: Long
            description: Session duration in milliseconds, or -1 if still open.
          - name: operationCount
            dataType: Long
            description: Number of operations observed in the session.
      - label: connectOperations
        description: >-
          One row per Spark Connect operation with core lifecycle timestamps,
          linked SQL/job identifiers, and statement sidecar metadata.
        fileName: connect_operations.csv
        scope: per-app
        columns:
          - name: appID
            dataType: String
            description: Application ID.
          - name: operationId
            dataType: String
            description: UUID of the Spark Connect operation.
          - name: sessionId
            dataType: String
            description: Session UUID that owns the operation.
          - name: userId
            dataType: String
            description: User who submitted the operation.
          - name: jobTag
            dataType: String
            description: Correlation tag shared with SQLExecutionStart and JobStart events.
          - name: startTime
            dataType: Long
            description: Epoch millis when the operation started.
          - name: finishTime
            dataType: Long
            description: Epoch millis when execution completed, if observed.
          - name: closeTime
            dataType: Long
            description: Epoch millis when the operation closed, if observed.
          - name: failTime
            dataType: Long
            description: Epoch millis when the operation failed, if observed.
          - name: cancelTime
            dataType: Long
            description: Epoch millis when the operation was canceled, if observed.
          - name: durationMs
            dataType: Long
            description: Derived end-to-end operation duration in milliseconds.
          - name: status
            dataType: String
            description: Derived operation status (RUNNING, SUCCEEDED, FAILED, CANCELED).
          - name: errorMessage
            dataType: String
            description: Error message reported by a failure event, if present.
          - name: sqlIds
            dataType: String
            description: Semicolon-separated SQL execution IDs correlated to the operation.
          - name: jobIds
            dataType: String
            description: Semicolon-separated Spark job IDs correlated to the operation.
          - name: statementFile
            dataType: String
            description: Sidecar filename under connect_statements/ for this operation, if written.
          - name: statementTruncated
            dataType: Boolean
            description: >-
              True when statementText includes Spark's textual truncation
              marker. This does not detect Spark 4.x depth-based subtree
              elision, which collapses nested structures to {} without a
              marker.
      - label: connectStatements
        description: >-
          Directory of per-operation statementText sidecars. Each
          <operationId>.txt file contains the protobuf debug-format text of the
          Connect operation statement when Connect statement sidecars are enabled.
        fileName: connect_statements
        fileFormat: DIRECTORY
        scope: per-app
```
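As a sanity check on the schema above, here is a minimal consumer sketch (not part of this PR) that reads connect_operations.csv and splits the semicolon-joined sqlIds/jobIds columns back into numeric IDs. The output path and the naive quote handling are assumptions for illustration; a real consumer should use a proper CSV library.

```scala
import scala.io.Source

object ConnectOperationsReaderSketch {
  // Strips the surrounding quotes the profiler's CSV writer adds to string
  // columns. Naive: assumes no embedded commas inside quoted fields.
  private def unquote(s: String): String =
    s.stripPrefix("\"").stripSuffix("\"")

  def main(args: Array[String]): Unit = {
    // Hypothetical output location; the per-app directory name will differ.
    val src = Source.fromFile("profile/app-001/connect_operations.csv")
    try {
      val lines = src.getLines().toList
      val header = lines.head.split(",", -1).map(unquote)
      val opIdx = header.indexOf("operationId")
      val sqlIdx = header.indexOf("sqlIds")
      val jobIdx = header.indexOf("jobIds")
      for (row <- lines.tail) {
        val cols = row.split(",", -1).map(unquote)
        // An empty string means no correlated IDs were observed.
        val sqlIds = cols(sqlIdx).split(";").filter(_.nonEmpty).map(_.toLong)
        val jobIds = cols(jobIdx).split(";").filter(_.nonEmpty).map(_.toInt)
        println(s"op=${cols(opIdx)} sqlIds=[${sqlIds.mkString(",")}] jobIds=[${jobIds.mkString(",")}]")
      }
    } finally {
      src.close()
    }
  }
}
```

The semicolon delimiter keeps each ID list inside a single CSV column, so splitting on ";" after normal CSV field parsing is sufficient to recover the collections.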
core/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ConnectProfileResults.scala (199 additions, 0 deletions)
```scala
/*
 * Copyright (c) 2026, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.nvidia.spark.rapids.tool.profiling

import com.nvidia.spark.rapids.tool.views.OutHeaderRegistry

import org.apache.spark.sql.rapids.tool.util.StringUtils

/**
 * CSV row for a Spark Connect session. Serializes the lifecycle metadata for a
 * single session into the columns registered under
 * `ConnectSessionProfileResult` in [[com.nvidia.spark.rapids.tool.views.OutHeaderRegistry]].
 *
 * `durationMs` is `endTime - startTime` when `endTime` is defined, else `-1`
 * (matches the convention used for open/unfinished sessions in other result
 * classes).
 */
case class ConnectSessionProfileResult(
    appId: String,
    sessionId: String,
    userId: String,
    startTime: Long,
    endTime: Option[Long],
    operationCount: Long) extends ProfileResult {

  override def outputHeaders: Array[String] = {
    OutHeaderRegistry.outputHeaders("ConnectSessionProfileResult")
  }

  override def convertToSeq(): Array[String] = {
    Array(
      appId,
      sessionId,
      userId,
      startTime.toString,
      endTime.map(_.toString).orNull,
      endTime.map(e => (e - startTime).toString).getOrElse("-1"),
      operationCount.toString)
  }

  override def convertToCSVSeq(): Array[String] = {
    Array(
      StringUtils.reformatCSVString(appId),
      StringUtils.reformatCSVString(sessionId),
      StringUtils.reformatCSVString(userId),
      startTime.toString,
      endTime.map(_.toString).orNull,
      endTime.map(e => (e - startTime).toString).getOrElse("-1"),
      operationCount.toString)
  }
}

/**
 * CSV row for a single Spark Connect operation. Captures the core lifecycle
 * (start/finish/close/fail/cancel timestamps), derived status, error message,
 * and the joined sqlIDs/jobIDs. Also records the sidecar basename for the
 * `connect_statements/<operationId>.txt` artifact.
 *
 * sqlIds and jobIds are serialized semicolon-separated to keep the CSV
 * single-column and avoid quoting issues.
 */
case class ConnectOperationProfileResult(
    appId: String,
    operationId: String,
    sessionId: String,
    userId: String,
    jobTag: String,
    startTime: Long,
    finishTime: Option[Long],
    closeTime: Option[Long],
    failTime: Option[Long],
    cancelTime: Option[Long],
    durationMs: Long,
    status: String,
    errorMessage: Option[String],
    sqlIds: Seq[Long],
    jobIds: Seq[Int],
    statementFile: Option[String],
    statementTruncated: Boolean) extends ProfileResult {

  override def outputHeaders: Array[String] = {
    OutHeaderRegistry.outputHeaders("ConnectOperationProfileResult")
  }

  override def convertToSeq(): Array[String] = {
    Array(
      appId,
      operationId,
      sessionId,
      userId,
      jobTag,
      startTime.toString,
      finishTime.map(_.toString).orNull,
      closeTime.map(_.toString).orNull,
      failTime.map(_.toString).orNull,
      cancelTime.map(_.toString).orNull,
      durationMs.toString,
      status,
      errorMessage.getOrElse(""),
      sqlIds.mkString(";"),
      jobIds.mkString(";"),
      statementFile.getOrElse(""),
      statementTruncated.toString)
  }

  override def convertToCSVSeq(): Array[String] = {
    Array(
      StringUtils.reformatCSVString(appId),
      StringUtils.reformatCSVString(operationId),
      StringUtils.reformatCSVString(sessionId),
      StringUtils.reformatCSVString(userId),
      StringUtils.reformatCSVString(jobTag),
      startTime.toString,
      finishTime.map(_.toString).orNull,
      closeTime.map(_.toString).orNull,
      failTime.map(_.toString).orNull,
      cancelTime.map(_.toString).orNull,
      durationMs.toString,
      StringUtils.reformatCSVString(status),
      StringUtils.reformatCSVString(errorMessage.getOrElse("")),
      StringUtils.reformatCSVString(sqlIds.mkString(";")),
      StringUtils.reformatCSVString(jobIds.mkString(";")),
      StringUtils.reformatCSVString(statementFile.getOrElse("")),
      statementTruncated.toString)
  }
}

object ConnectOperationProfileResult {

  /**
   * Marker substring embedded by the Spark Connect server-side abbreviator when
   * a statement/plan text exceeds its configured limit. Presence of this marker
   * in `statementText` indicates the artifact we persist is a truncated
   * representation of the original plan.
   *
   * Note: Spark 4.x also performs depth-based structural elision by collapsing
   * subtrees to `{}` once the protobuf text formatter exceeds its nesting cap.
   * Those cases do not emit a textual marker, so `statementTruncated` is
   * intentionally limited to marker-based truncation only.
   */
  private[profiling] val TruncationMarker: String = "[truncated(size="

  /**
   * Derives operation status from the observed lifecycle timestamps.
   * Priority: CANCELED -> FAILED -> SUCCEEDED -> RUNNING.
   * CANCELED precedes FAILED because server-side cancellation sometimes
   * surfaces a trailing failure event we should not misattribute.
   */
  private def deriveStatus(op: ConnectOperationInfo): String = {
    if (op.cancelTime.isDefined) "CANCELED"
    else if (op.failTime.isDefined) "FAILED"
    else if (op.finishTime.isDefined || op.closeTime.isDefined) "SUCCEEDED"
    else "RUNNING"
  }

  def from(
      appId: String,
      op: ConnectOperationInfo,
      sqlIds: Seq[Long],
      jobIds: Seq[Int],
      statementFile: Option[String]): ConnectOperationProfileResult = {
    val endForDuration =
      op.closeTime.orElse(op.finishTime).orElse(op.failTime).orElse(op.cancelTime)
    val durationMs = endForDuration.map(_ - op.startTime).getOrElse(-1L)
    val statementTruncated = op.statementText.contains(TruncationMarker)
    ConnectOperationProfileResult(
      appId = appId,
      operationId = op.operationId,
      sessionId = op.sessionId,
      userId = op.userId,
      jobTag = op.jobTag,
      startTime = op.startTime,
      finishTime = op.finishTime,
      closeTime = op.closeTime,
      failTime = op.failTime,
      cancelTime = op.cancelTime,
      durationMs = durationMs,
      status = deriveStatus(op),
      errorMessage = op.errorMessage,
      sqlIds = sqlIds,
      jobIds = jobIds,
      statementFile = statementFile,
      statementTruncated = statementTruncated)
  }
}
```
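To make the derivation rules above concrete, here is a self-contained restatement (not the PR's code) of the status precedence and duration fallback, with a worked example of the CANCELED-over-FAILED case called out in the scaladoc:

```scala
object ConnectDerivationSketch {
  // Status precedence: CANCELED > FAILED > SUCCEEDED > RUNNING.
  def deriveStatus(cancel: Option[Long], fail: Option[Long],
      finish: Option[Long], close: Option[Long]): String = {
    if (cancel.isDefined) "CANCELED"
    else if (fail.isDefined) "FAILED"
    else if (finish.isDefined || close.isDefined) "SUCCEEDED"
    else "RUNNING"
  }

  // Duration uses the first defined end event in close -> finish -> fail ->
  // cancel order, else -1 for an operation with no observed end.
  def deriveDurationMs(start: Long, close: Option[Long], finish: Option[Long],
      fail: Option[Long], cancel: Option[Long]): Long =
    close.orElse(finish).orElse(fail).orElse(cancel).map(_ - start).getOrElse(-1L)

  def main(args: Array[String]): Unit = {
    // A cancellation that also surfaces a trailing failure event is still
    // reported as CANCELED, per the precedence note.
    assert(deriveStatus(cancel = Some(300L), fail = Some(250L),
      finish = None, close = None) == "CANCELED")
    // No end events at all: status RUNNING, duration -1.
    assert(deriveStatus(None, None, None, None) == "RUNNING")
    assert(deriveDurationMs(100L, None, None, None, None) == -1L)
    // closeTime wins over finishTime as the duration endpoint.
    assert(deriveDurationMs(100L, close = Some(500L), finish = Some(400L),
      fail = None, cancel = None) == 400L)
    println("derivation rules hold")
  }
}
```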
Review comment:
Is there an industry-standard file extension for storing TextProto files?
Reply:
Very valid point. The canonical protobuf TextFormat extension is .txtpb, which would signal a valid proto payload. In our case, though, statementText is a diagnostic dump that can be truncated, so .txt is the better fit: it does not imply a complete protobuf TextFormat file.
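A small illustration of that point (the file layout follows connectReport.yaml above; the operation ID is hypothetical): because a sidecar may hold truncated plan text rather than a complete TextFormat payload, a consumer can check for the documented truncation marker, or the statementTruncated CSV column, before attempting to parse it as protobuf text.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

object SidecarCheckSketch {
  // Marker documented in ConnectOperationProfileResult.TruncationMarker.
  // Note: this does not catch Spark 4.x depth-based {} elision (see above).
  private val TruncationMarker = "[truncated(size="

  def main(args: Array[String]): Unit = {
    val path = Paths.get("connect_statements", "0a1b2c3d.txt") // hypothetical ID
    val text = new String(Files.readAllBytes(path), StandardCharsets.UTF_8)
    if (text.contains(TruncationMarker)) {
      println(s"$path is truncated plan text; do not parse as TextFormat")
    } else {
      println(s"$path may be a complete protobuf debug dump")
    }
  }
}
```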