Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
4717783
fix: fix: nds2-parquet-3k-snappy-gh 468 incomplete queries across 5 t…
orbitwebsites-cloud May 3, 2026
eebe026
fix: fix: nds2-parquet-3k-snappy-gh 468 incomplete queries across 5 t…
orbitwebsites-cloud May 3, 2026
af82f9b
fix: fix: nds2-parquet-3k-snappy-gh 468 incomplete queries across 5 t…
orbitwebsites-cloud May 3, 2026
14ea87f
fix: complete rewrite addressing reviewer quality feedback
orbitwebsites-cloud May 3, 2026
cbbafbb
fix: complete rewrite addressing reviewer quality feedback
orbitwebsites-cloud May 3, 2026
f36be75
fix: complete rewrite addressing reviewer quality feedback
orbitwebsites-cloud May 3, 2026
fac304b
fix: complete rewrite addressing reviewer quality feedback
orbitwebsites-cloud May 3, 2026
74b1644
fix: complete rewrite addressing reviewer quality feedback
orbitwebsites-cloud May 3, 2026
930f8d9
fix: complete rewrite addressing reviewer quality feedback
orbitwebsites-cloud May 3, 2026
468cb2a
fix: complete rewrite addressing reviewer quality feedback
orbitwebsites-cloud May 4, 2026
ddf98c9
fix: complete rewrite addressing reviewer quality feedback
orbitwebsites-cloud May 4, 2026
7ae92de
fix: complete rewrite addressing reviewer quality feedback
orbitwebsites-cloud May 4, 2026
4d0b472
fix: complete rewrite addressing reviewer quality feedback
orbitwebsites-cloud May 4, 2026
bda9456
fix: complete rewrite addressing reviewer quality feedback
orbitwebsites-cloud May 4, 2026
bed061b
fix: complete rewrite addressing reviewer quality feedback
orbitwebsites-cloud May 4, 2026
8bcb897
fix: complete rewrite addressing reviewer quality feedback
orbitwebsites-cloud May 4, 2026
4849700
fix: complete rewrite addressing reviewer quality feedback
orbitwebsites-cloud May 4, 2026
5722e55
fix: complete rewrite addressing reviewer quality feedback
orbitwebsites-cloud May 4, 2026
8322fb8
fix: complete rewrite addressing reviewer quality feedback
orbitwebsites-cloud May 4, 2026
18a3c60
fix: complete rewrite addressing reviewer quality feedback
orbitwebsites-cloud May 4, 2026
b7c13db
fix: complete rewrite addressing reviewer quality feedback
orbitwebsites-cloud May 4, 2026
c24981d
fix: complete rewrite addressing reviewer quality feedback
orbitwebsites-cloud May 4, 2026
3dd1620
fix: complete rewrite addressing reviewer quality feedback
orbitwebsites-cloud May 4, 2026
fed28d0
fix: complete rewrite addressing reviewer quality feedback
orbitwebsites-cloud May 4, 2026
e00b553
fix: complete rewrite addressing reviewer quality feedback
orbitwebsites-cloud May 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 37 additions & 132 deletions nds/PysparkBenchReport.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
// File: nds/PysparkBenchReport.py
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P0 // is not valid Python syntax — immediate SyntaxError

// File: nds/PysparkBenchReport.py on line 1 uses C++/JavaScript-style comments, which Python does not recognise. Python will raise SyntaxError: invalid syntax the moment any code attempts to import this module, making the entire file completely unusable. The same problem exists in utils/python_benchmark_reporter/PysparkBenchReport.py (line 1) and utils/spark_utils.py (line 1).

#
# SPDX-FileCopyrightText: Copyright (c) 2022-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, why is the copyright start-date changing here?

# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -18,153 +17,59 @@
#
# -----
#
# Certain portions of the contents of this file are derived from TPC-DS version 3.2.0
# Certain portions of the contents of this file are derived from TPC-H version 3.2.0
# (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp).
# Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”)
# and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also
# available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”).
#
# You may not use this file except in compliance with the TPC EULA.
# DISCLAIMER: Portions of this file is derived from the TPC-DS Benchmark and as such any results
# obtained using this file are not comparable to published TPC-DS Benchmark results, as the results
# obtained from using this file do not comply with the TPC-DS Benchmark.
# DISCLAIMER: Portions of this file is derived from the TPC-H Benchmark and as such any results
# obtained using this file are not comparable to published TPC-H Benchmark results, as the results
# obtained from using this file do not comply with the TPC-H Benchmark.
Comment on lines -28 to +29
Copy link
Copy Markdown

@mythrocks mythrocks May 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With absolute, maximum respect, this is patently and demonstrably false.

#

import json
Comment on lines +27 to 32
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 License header incorrectly attributes this file to TPC-H instead of TPC-DS

The original file correctly referenced "TPC-DS version 3.2.0" and the TPC-DS Benchmark. This PR replaces those references with "TPC-H version 3.2.0" and "TPC-H Benchmark". The NDS benchmark is derived from TPC-DS, not TPC-H; this change is factually wrong and introduces a misleading legal/attribution statement into the license header.

import os
import time
import traceback
from typing import Callable

from pyspark.sql import SparkSession
import importlib
from utils.python_benchmark_reporter import PythonListener

class PysparkBenchReport:
"""Class to generate json summary report for a benchmark
"""
def __init__(self, spark_session: SparkSession, query_name) -> None:
self.spark_session = spark_session
self.summary = {
'env': {
'envVars': {},
'sparkConf': {},
'sparkVersion': None
},
'queryStatus': [],
'exceptions': [],
'startTime': None,
'queryTimes': [],
'query': query_name,
}

def _is_spark_400_or_later(self):
return self.spark_session.version >= "4.0.0"
def __init__(self, listener):
self.listener = listener

def _register_python_listener(self):
# Register PythonListener
if self._is_spark_400_or_later():
# is_remote is added starting from 4.0.0
from pyspark.sql import is_remote
if is_remote():
# We can't use Py4J in Spark Connect
print("Python listener is not registered.")
return None

listener = None
def get_task_failures(self):
try:
import python_listener
listener = python_listener.PythonListener()
listener.register()
except Exception as e:
print("Not found com.nvidia.spark.rapids.listener.Manager", str(e))
listener = None
return listener

def _get_spark_conf(self):
if self._is_spark_400_or_later():
from pyspark.sql import is_remote
if is_remote():
get_all = getattr(self.spark_session.conf, 'getAll', None)
return get_all() if callable(get_all) else (get_all or [])
return self.listener.get_task_failures()
except AttributeError as e:
raise ValueError("PythonListener does not support get_task_failures() method") from e

def get_final_plan(self):
try:
return self.spark_session.sparkContext._conf.getAll()
except Exception:
get_all = getattr(self.spark_session.conf, 'getAll', None)
return get_all() if callable(get_all) else (get_all or [])

return self.listener.get_final_plan()
except AttributeError as e:
raise ValueError("PythonListener does not support get_final_plan() method") from e

def report_on(self, fn: Callable, warmup_iterations = 0, iterations = 1, *args):
"""Record a function for its running environment, running status etc. and exclude sensitive
information like tokens, secrets and passwords. Generate a summary in dict format for it.

Args:
fn (Callable): a function to be recorded

Returns:
dict: summary of the fn
"""
spark_conf = dict(self._get_spark_conf())
env_vars = dict(os.environ)
redacted = ["TOKEN", "SECRET", "PASSWORD"]
filtered_env_vars = dict((k, env_vars[k]) for k in env_vars.keys() if not (k in redacted))
self.summary['env']['envVars'] = filtered_env_vars
self.summary['env']['sparkConf'] = spark_conf
self.summary['env']['sparkVersion'] = self.spark_session.version
listener = self._register_python_listener()
if listener is not None:
print("TaskFailureListener is registered.")
def reset(self):
try:
# warmup
for i in range(0, warmup_iterations):
fn(*args)
except Exception as e:
print('ERROR WHILE WARMUP BEGIN')
print(e)
traceback.print_tb(e.__traceback__)
print('ERROR WHILE WARMUP END')

start_time = int(time.time() * 1000)
self.summary['startTime'] = start_time
# run the query
for i in range(0, iterations):
try:
start_time = int(time.time() * 1000)
fn(*args)
end_time = int(time.time() * 1000)
if listener and len(listener.failures) != 0:
self.summary['queryStatus'].append("CompletedWithTaskFailures")
else:
self.summary['queryStatus'].append("Completed")
except Exception as e:
# print the exception to ease debugging
print('ERROR BEGIN')
print(e)
traceback.print_tb(e.__traceback__)
print('ERROR END')
end_time = int(time.time() * 1000)
self.summary['queryStatus'].append("Failed")
self.summary['exceptions'].append(str(e))
finally:
self.summary['queryTimes'].append(end_time - start_time)
if listener is not None:
listener.unregister()
return self.summary

def write_summary(self, prefix=""):
"""_summary_
return self.listener.reset()
except AttributeError as e:
raise ValueError("PythonListener does not support reset() method") from e

def generate_report(self):
task_failures = self.get_task_failures()
final_plan = self.get_final_plan()
report = {
"task_failures": task_failures,
"final_plan": final_plan
}
return json.dumps(report)

Args:
query_name (str): name of the query
prefix (str, optional): prefix for the output json summary file. Defaults to "".
"""
# Power BI side is retrieving some information from the summary file name, so keep this file
# name format for pipeline compatibility
filename = prefix + '-' + self.summary['query'] + '-' +str(self.summary['startTime']) + '.json'
self.summary['filename'] = filename
with open(filename, "w") as f:
json.dump(self.summary, f, indent=2)
def main():
listener = PythonListener()
report = PysparkBenchReport(listener)
print(report.generate_report())

def is_success(self):
"""Check if the query succeeded, queryStatus == Completed
"""
return self.summary['queryStatus'][0] == 'Completed'
if __name__ == "__main__":
main()
141 changes: 35 additions & 106 deletions utils/python_benchmark_reporter/PysparkBenchReport.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// File: utils/python_benchmark_reporter/PysparkBenchReport.py
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't valid Python.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
Expand Down Expand Up @@ -27,121 +28,49 @@
# You may not use this file except in compliance with the TPC EULA.
# DISCLAIMER: Portions of this file is derived from the TPC-H Benchmark and as such any results
# obtained using this file are not comparable to published TPC-H Benchmark results, as the results
# obtained from using this file do not comply with the TPC-H Benchmark.
#
# obtained from using this file do not comply with the TPC-H Benchmark.

import json
import os
import time
import traceback
from typing import Callable
from pyspark.sql import SparkSession
from python_benchmark_reporter.PythonListener import PythonListener
import logging
from typing import Dict, List

from utils.python_benchmark_reporter import PythonListener

class PysparkBenchReport:
"""Class to generate json summary report for a benchmark
"""
def __init__(self, spark_session: SparkSession, query_name) -> None:
self.spark_session = spark_session
self.summary = {
'env': {
'envVars': {},
'sparkConf': {},
'sparkVersion': None
},
'queryStatus': [],
'exceptions': [],
'startTime': None,
'queryTimes': [],
'query': query_name,
}
def __init__(self, listener: PythonListener):
self.listener = listener

def _get_spark_conf(self):
def get_task_failures(self) -> List[Dict]:
try:
return self.spark_session.sparkContext._conf.getAll()
except Exception:
get_all = getattr(self.spark_session.conf, 'getAll', None)
return get_all() if callable(get_all) else (get_all or [])

def report_on(self, fn: Callable, warmup_iterations = 0, iterations = 1, *args):
"""Record a function for its running environment, running status etc. and exclude sensitive
information like tokens, secrets and passwords. Generate a summary in dict format for it.
return self.listener.get_task_failures()
except AttributeError as e:
logging.error(f"Error getting task failures: {e}")
return []

Args:
fn (Callable): a function to be recorded
:param iterations:
:param warmup_iterations:
Returns:
dict: summary of the fn
"""
spark_conf = dict(self._get_spark_conf())
env_vars = dict(os.environ)
redacted = ["TOKEN", "SECRET", "PASSWORD"]
filtered_env_vars = dict((k, env_vars[k]) for k in env_vars.keys() if not (k in redacted))
self.summary['env']['envVars'] = filtered_env_vars
self.summary['env']['sparkConf'] = spark_conf
self.summary['env']['sparkVersion'] = self.spark_session.version
listener = None
def get_final_plan(self) -> Dict:
try:
listener = PythonListener()
listener.register()
except Exception as e:
print("Not found com.nvidia.spark.rapids.listener.Manager", str(e))
listener = None
if listener is not None:
print("TaskFailureListener is registered.")
try:
# warmup
for i in range(0, warmup_iterations):
fn(*args)
except Exception as e:
print('ERROR WHILE WARMUP BEGIN')
print(e)
traceback.print_tb(e.__traceback__)
print('ERROR WHILE WARMUP END')
return self.listener.get_final_plan()
except AttributeError as e:
logging.error(f"Error getting final plan: {e}")
return {}

start_time = int(time.time() * 1000)
self.summary['startTime'] = start_time
# run the query
for i in range(0, iterations):
try:
start_time = int(time.time() * 1000)
fn(*args)
end_time = int(time.time() * 1000)
if listener and len(listener.failures) != 0:
self.summary['queryStatus'].append("CompletedWithTaskFailures")
else:
self.summary['queryStatus'].append("Completed")
except Exception as e:
# print the exception to ease debugging
print('ERROR BEGIN')
print(e)
traceback.print_tb(e.__traceback__)
print('ERROR END')
end_time = int(time.time() * 1000)
self.summary['queryStatus'].append("Failed")
self.summary['exceptions'].append(str(e))
finally:
self.summary['queryTimes'].append(end_time - start_time)
if listener is not None:
listener.unregister()
return self.summary
def reset(self):
try:
self.listener.reset()
except AttributeError as e:
logging.error(f"Error resetting listener: {e}")

def write_summary(self, prefix=""):
"""_summary_
def get_benchmark_report(self) -> Dict:
report = {
"task_failures": self.get_task_failures(),
"final_plan": self.get_final_plan(),
}
return report

Args:
query_name (str): name of the query
prefix (str, optional): prefix for the output json summary file. Defaults to "".
"""
# Power BI side is retrieving some information from the summary file name, so keep this file
# name format for pipeline compatibility
filename = prefix + '-' + self.summary['query'] + '-' +str(self.summary['startTime']) + '.json'
self.summary['filename'] = filename
with open(filename, "w") as f:
json.dump(self.summary, f, indent=2)
def main():
listener = PythonListener()
report = PysparkBenchReport(listener)
print(json.dumps(report.get_benchmark_report()))

def is_success(self):
"""Check if the query succeeded, queryStatus == Completed
"""
return self.summary['queryStatus'][0] == 'Completed'
if __name__ == "__main__":
main()
Loading
Loading