From 5e422ea3a11da49d8752691cb2d55694311f8289 Mon Sep 17 00:00:00 2001
From: Chong Gao
Date: Fri, 28 Apr 2023 20:57:25 +0800
Subject: [PATCH 1/2] Support Power Run on a notebook

Signed-off-by: Chong Gao

Copyright

Signed-off-by: Chong Gao

Refactor; Support spark.properties; Add more info into readme.md

Update doc
---
 .gitignore                          |   4 ++
 nds/PysparkBenchReport.py           |   2 +-
 nds/README.md                       |   4 ++
 nds/__init__.py                     |  31 +++++++++
 nds/configs_in_zip/__init__.py      |  31 +++++++++
 nds/configs_in_zip/parameter        |   1 +
 nds/configs_in_zip/query_0.sql      |  16 +++++
 nds/configs_in_zip/readme.md        |  57 ++++++++++++++++
 nds/configs_in_zip/spark.properties |   0
 nds/nds_gen_query_stream.py         |   2 +-
 nds/nds_power.py                    | 101 +++++++++++++++++++++-------
 nds/nds_transcode.py                |   2 +-
 12 files changed, 222 insertions(+), 29 deletions(-)
 create mode 100644 .gitignore
 create mode 100644 nds/__init__.py
 create mode 100644 nds/configs_in_zip/__init__.py
 create mode 100644 nds/configs_in_zip/parameter
 create mode 100644 nds/configs_in_zip/query_0.sql
 create mode 100644 nds/configs_in_zip/readme.md
 create mode 100644 nds/configs_in_zip/spark.properties

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..a0c7d55
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,4 @@
+__pycache__
+.vscode
+target
+*.zip
diff --git a/nds/PysparkBenchReport.py b/nds/PysparkBenchReport.py
index f2632e8..0fce991 100644
--- a/nds/PysparkBenchReport.py
+++ b/nds/PysparkBenchReport.py
@@ -37,7 +37,7 @@ from typing import Callable
 
 from pyspark.sql import SparkSession
 
-import python_listener
+from . import python_listener
 
 class PysparkBenchReport:
     """Class to generate json summary report for a benchmark
diff --git a/nds/README.md b/nds/README.md
index fd8c84a..6e975f5 100644
--- a/nds/README.md
+++ b/nds/README.md
@@ -363,6 +363,10 @@ time.csv \
 --output_format parquet
 ```
 
+### Power Run on Notebook
+Users can also [run the Power Run on a notebook](configs_in_zip/readme.md) when working on EKS, Databricks or
+another cloud environment.
+
 ### Throughput Run
 
 Throughput Run simulates the scenario that multiple query sessions are running simultaneously in
diff --git a/nds/__init__.py b/nds/__init__.py
new file mode 100644
index 0000000..d3eb6b7
--- /dev/null
+++ b/nds/__init__.py
@@ -0,0 +1,31 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -----
+#
+# Certain portions of the contents of this file are derived from TPC-DS version 3.2.0
+# (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp).
+# Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”)
+# and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also
+# available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”).
+#
+# You may not use this file except in compliance with the TPC EULA.
+# DISCLAIMER: Portions of this file is derived from the TPC-DS Benchmark and as such any results
+# obtained using this file are not comparable to published TPC-DS Benchmark results, as the results
+# obtained from using this file do not comply with the TPC-DS Benchmark.
+#
diff --git a/nds/configs_in_zip/__init__.py b/nds/configs_in_zip/__init__.py
new file mode 100644
index 0000000..d3eb6b7
--- /dev/null
+++ b/nds/configs_in_zip/__init__.py
@@ -0,0 +1,31 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# -----
+#
+# Certain portions of the contents of this file are derived from TPC-DS version 3.2.0
+# (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp).
+# Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”)
+# and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also
+# available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”).
+#
+# You may not use this file except in compliance with the TPC EULA.
+# DISCLAIMER: Portions of this file is derived from the TPC-DS Benchmark and as such any results
+# obtained using this file are not comparable to published TPC-DS Benchmark results, as the results
+# obtained from using this file do not comply with the TPC-DS Benchmark.
+#
diff --git a/nds/configs_in_zip/parameter b/nds/configs_in_zip/parameter
new file mode 100644
index 0000000..bf3f0b9
--- /dev/null
+++ b/nds/configs_in_zip/parameter
@@ -0,0 +1 @@
+s3://ndsv2-data/parquet_sf1000 time-123.csv --output_prefix s3://chongg/test/tmp --json_summary_folder 500 --keep_sc
\ No newline at end of file
diff --git a/nds/configs_in_zip/query_0.sql b/nds/configs_in_zip/query_0.sql
new file mode 100644
index 0000000..b8ab96f
--- /dev/null
+++ b/nds/configs_in_zip/query_0.sql
@@ -0,0 +1,16 @@
+-- start query 1 in stream 0 using template query96.tpl
+select count(*)
+from store_sales
+    ,household_demographics
+    ,time_dim, store
+where ss_sold_time_sk = time_dim.t_time_sk
+    and ss_hdemo_sk = household_demographics.hd_demo_sk
+    and ss_store_sk = s_store_sk
+    and time_dim.t_hour = 8
+    and time_dim.t_minute >= 30
+    and household_demographics.hd_dep_count = 5
+    and store.s_store_name = 'ese'
+order by count(*)
+ LIMIT 100;
+
+-- end query 1 in stream 0 using template query96.tpl
diff --git a/nds/configs_in_zip/readme.md b/nds/configs_in_zip/readme.md
new file mode 100644
index 0000000..6d5f1da
--- /dev/null
+++ b/nds/configs_in_zip/readme.md
@@ -0,0 +1,57 @@
+This document describes how to do a [Power Run](../README.md#power-run) on a notebook.
+The approach is to bundle all the needed parameters into a zip file and then execute the zip file on the notebook.
+First, configure the files in [configs_in_zip](../configs_in_zip),
+then compress the [nds folder](../../nds) into a zip file,
+and finally run `nds_power.py` from the zip on the notebook.
+# Configure the parameter files
+## `parameter` file
+e.g.:
+```
+s3://ndsv2-data/parquet_sf1000 time-123.csv --output_prefix s3://chongg/test/tmp --json_summary_folder 500 --keep_sc
+```
+`s3://ndsv2-data/parquet_sf1000` is the value of the `input_prefix` parameter.
+`time-123.csv` is the value of the `time_log` parameter.
+`s3://chongg/test/tmp` is the value of the `output_prefix` parameter.
+`500` is the value of the `json_summary_folder` parameter.
+Note: please do not specify `property_file`; refer to the next section to configure the Spark property file.
+For more details, refer to the parameters in the [Power Run](../README.md#power-run) section.
+## `spark.properties` file
+Specify the Spark properties here.
+e.g.:
+```
+spark.executor.memoryOverhead=512M
+```
+## `query_0.sql`
+This file is the stream file.
+Put all the queries into this file.
+For how to generate a stream file, refer to the [README](../README.md).
+# Create the NDS zip file and put it into S3
+Run the following commands to compress the nds folder into a zip file and upload it to S3:
+```
+cd spark-rapids-benchmarks
+zip -r nds.zip nds
+aws s3 cp nds.zip s3://path/to/this/zip
+```
+# Run nds_power on the notebook
+```
+spark.sparkContext.addPyFile("s3://path/to/this/zip")
+from nds import nds_power
+nds_power.run_query_stream_for_zip()
+```
+# How to run another Power Run with different parameters
+Updating the zip file in S3 does not take effect because Spark caches the zip file.
+Instead, put another zip file with a different name into S3.
+Do the following:
+```
+cd spark-rapids-benchmarks
+cp -r nds <new_name>
+# update the parameter files
+zip -r <new_name>.zip <new_name>
+aws s3 cp <new_name>.zip s3://<s3_path>
+```
+On the notebook, run:
+```
+spark.sparkContext.addPyFile("s3://<s3_path>")
+from <new_name> import nds_power
+nds_power.run_query_stream_for_zip()
+```
\ No newline at end of file
diff --git a/nds/configs_in_zip/spark.properties b/nds/configs_in_zip/spark.properties
new file mode 100644
index 0000000..e69de29
diff --git a/nds/nds_gen_query_stream.py b/nds/nds_gen_query_stream.py
index 1c97658..f699322 100644
--- a/nds/nds_gen_query_stream.py
+++ b/nds/nds_gen_query_stream.py
@@ -35,7 +35,7 @@
 import subprocess
 import sys
 
-from check import check_build, check_version, get_abs_path
+from .check import check_build, check_version, get_abs_path
 
 check_version()
 
diff --git a/nds/nds_power.py b/nds/nds_power.py
index 886ce43..be2c0c0 100644
--- a/nds/nds_power.py
+++ b/nds/nds_power.py
@@ -34,14 +34,15 @@
 import csv
 import os
 import time
+import uuid
 from collections import OrderedDict
 
 from pyspark.sql import SparkSession
-from PysparkBenchReport import PysparkBenchReport
+from .PysparkBenchReport import PysparkBenchReport
 from pyspark.sql import DataFrame
-from check import check_json_summary_folder, check_query_subset_exists, check_version
-from nds_gen_query_stream import split_special_query
-from nds_schema import get_schemas
+from .check import check_json_summary_folder, check_query_subset_exists, check_version
+from .nds_gen_query_stream import split_special_query
+from .nds_schema import get_schemas
 
 check_version()
 
@@ -57,6 +58,9 @@ def gen_sql_from_stream(query_stream_file_path):
     """
     with open(query_stream_file_path, 'r') as f:
         stream = f.read()
+    return gen_sql_from_stream_text(stream)
+
+def gen_sql_from_stream_text(stream):
     all_queries = stream.split('-- start')[1:]
     # split query in query14, query23, query24, query39
     extended_queries = OrderedDict()
@@ -141,9 +145,9 @@ def is_column_part(char):
         return char.isalpha() or char.isdigit() or char == '_'
 
     def is_valid(column_name):
-        return len(column_name) > 0 and is_column_start(column_name[0]) and all(
+        return len(column_name) > 0 and is_column_start(column_name[0]) and all(
             [is_column_part(char) for char in column_name[1:]])
-    
+
     def make_valid(column_name):
         # To simplify: replace all invalid char with '_'
         valid_name = ''
@@ -157,7 +161,7 @@ def make_valid(column_name):
         else:
             valid_name += char
         return valid_name
-    
+
     def deduplicate(column_names):
         # In some queries like q35, it's possible to get columns with the same name. Append a number
         # suffix to resolve this problem.
@@ -182,6 +186,7 @@ def get_query_subset(query_dict, subset):
 
 def run_query_stream(input_prefix,
                      property_file,
+                     spark_properties,
                      query_dict,
                      time_log_output_path,
                      extra_time_log_output_path,
@@ -198,7 +203,9 @@
     for easy accesibility. TempView Creation time is also recorded.
 
     Args:
-        input_prefix (str): path of input data or warehouse if input_format is "iceberg" or hive_external=True.
+        input_prefix (str): path of input data or warehouse if input_format is "iceberg".
+        property_file (str): the path of the Spark property file
+        spark_properties (Dict): the property dict loaded from the above property_file
         query_dict (OrderedDict): ordered dict {query_name: query_content} of all TPC-DS queries runnable in Spark
         time_log_output_path (str): path of the log that contains query execution time, both local
            and HDFS path are supported.
@@ -216,26 +223,18 @@
         app_name = "NDS - " + list(query_dict.keys())[0]
     else:
         app_name = "NDS - Power Run"
-    # Execute Power Run or Specific query in Spark 
+    # Execute Power Run or Specific query in Spark
     # build Spark Session
     session_builder = SparkSession.builder
-    if property_file:
-        spark_properties = load_properties(property_file)
-        for k,v in spark_properties.items():
-            session_builder = session_builder.config(k,v)
+    for k,v in spark_properties.items():
+        session_builder = session_builder.config(k,v)
     if input_format == 'iceberg':
         session_builder.config("spark.sql.catalog.spark_catalog.warehouse", input_prefix)
     if input_format == 'delta' and not delta_unmanaged:
         session_builder.config("spark.sql.warehouse.dir", input_prefix)
-        session_builder.enableHiveSupport()
-    if hive_external:
-        session_builder.enableHiveSupport()
-
+        session_builder.config("spark.sql.catalogImplementation", "hive")
     spark_session = session_builder.appName(
         app_name).getOrCreate()
-    if hive_external:
-        spark_session.catalog.setCurrentDatabase(input_prefix)
-
     if input_format == 'delta' and delta_unmanaged:
         # Register tables for Delta Lake. This is only needed for unmanaged tables.
         execution_time_list = register_delta_tables(spark_session, input_prefix, execution_time_list)
@@ -304,14 +303,54 @@
             time_df.coalesce(1).write.csv(extra_time_log_output_path)
 
 def load_properties(filename):
-    myvars = {}
     with open(filename) as myfile:
-        for line in myfile:
-            name, var = line.partition("=")[::2]
-            myvars[name.strip()] = var.strip()
+        lines = myfile.readlines()
+    return load_properties_from_lines(lines)
+
+def load_properties_from_lines(lines):
+    myvars = {}
+    for line in lines:
+        name, var = line.partition("=")[::2]
+        myvars[name.strip()] = var.strip()
     return myvars
 
-if __name__ == "__main__":
+def run_query_stream_in_zip():
+    try:
+        import importlib.resources as pkg_resources
+    except ImportError:
+        # Try backported to PY<37 `importlib_resources`.
+        import importlib_resources as pkg_resources
+    from . import configs_in_zip  # relative-import the *package* containing the templates
+    stream = pkg_resources.read_text(configs_in_zip, 'query_0.sql')
+    parameters = pkg_resources.read_text(configs_in_zip, 'parameter')
+    parser = get_parser(use_local_stream_file = False)
+    args = parser.parse_args(parameters.split())
+    query_dict = gen_sql_from_stream_text(stream)
+
+    if args.property_file is not None:
+        raise Exception("Please do not specify --property_file, " +
+                        "instead update the spark.properties file in the NDS zip file, " +
+                        "the path is nds/configs_in_zip/spark.properties")
+    spark_properties_text = pkg_resources.read_text(configs_in_zip, 'spark.properties')
+    spark_properties = load_properties_from_lines(spark_properties_text.splitlines())
+
+    run_query_stream(args.input_prefix,
+                     'spark.properties',
+                     spark_properties,
+                     query_dict,
+                     args.time_log,
+                     args.extra_time_log,
+                     args.sub_queries,
+                     args.input_format,
+                     not args.floats,
+                     args.output_prefix,
+                     args.output_format,
+                     args.json_summary_folder + str(uuid.uuid1()),  # append uuid to avoid folder conflict
+                     args.delta_unmanaged,
+                     args.keep_sc,
+                     args.hive)
+
+def get_parser(use_local_stream_file = True):
     parser = parser = argparse.ArgumentParser()
     parser.add_argument('input_prefix',
                         help='text to prepend to every input file path (e.g., "hdfs:///ds-generated-data"). ' +
@@ -320,7 +359,8 @@ def load_properties(filename):
                         'session name "spark_catalog" is supported now, customized catalog is not ' +
                         'yet supported. Note if this points to a Delta Lake table, the path must be ' +
                         'absolute. Issue: https://github.com/delta-io/delta/issues/555')
-    parser.add_argument('query_stream_file',
+    if use_local_stream_file:
+        parser.add_argument('query_stream_file',
                         help='query stream file that contains NDS queries in specific order')
     parser.add_argument('time_log',
                         help='path to execution time log, only support local path.',
@@ -371,10 +411,19 @@ def load_properties(filename):
                         'in the stream file will be run. e.g. "query1,query2,query3". Note, use ' +
                         '"_part1" and "_part2" suffix for the following query names: ' +
                         'query14, query23, query24, query39. e.g. query14_part1, query39_part2')
+    return parser
+
+if __name__ == "__main__":
+    parser = get_parser()
     args = parser.parse_args()
     query_dict = gen_sql_from_stream(args.query_stream_file)
+    if args.property_file:
+        spark_properties = load_properties(args.property_file)
+    else:
+        spark_properties = {}
     run_query_stream(args.input_prefix,
                      args.property_file,
+                     spark_properties,
                      query_dict,
                      args.time_log,
                      args.extra_time_log,
diff --git a/nds/nds_transcode.py b/nds/nds_transcode.py
index 7c8a69b..50a5638 100644
--- a/nds/nds_transcode.py
+++ b/nds/nds_transcode.py
@@ -39,7 +39,7 @@
 from pyspark.sql.types import *
 from pyspark.sql.functions import col
 
-from nds_schema import *
+from .nds_schema import *
 
 # Note the specific partitioning is applied when save the parquet data files.
 TABLE_PARTITIONING = {

From 5dbfbc201b4af2f43aded8be2d5772651879fb5c Mon Sep 17 00:00:00 2001
From: Chong Gao
Date: Thu, 1 Jun 2023 12:00:54 +0800
Subject: [PATCH 2/2] Update readme.md; modify parameter to not save results,
 as A/B tests do

Signed-off-by: Chong Gao
---
 nds/configs_in_zip/parameter | 2 +-
 nds/configs_in_zip/readme.md | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/nds/configs_in_zip/parameter b/nds/configs_in_zip/parameter
index bf3f0b9..d833553 100644
--- a/nds/configs_in_zip/parameter
+++ b/nds/configs_in_zip/parameter
@@ -1 +1 @@
-s3://ndsv2-data/parquet_sf1000 time-123.csv --output_prefix s3://chongg/test/tmp --json_summary_folder 500 --keep_sc
\ No newline at end of file
+s3://ndsv2-data/parquet_sf3000 time-123.csv --json_summary_folder 500 --keep_sc
diff --git a/nds/configs_in_zip/readme.md b/nds/configs_in_zip/readme.md
index 6d5f1da..4a4862d 100644
--- a/nds/configs_in_zip/readme.md
+++ b/nds/configs_in_zip/readme.md
@@ -36,7 +36,7 @@ aws s3 cp nds.zip s3://path/to/this/zip
 ```
 spark.sparkContext.addPyFile("s3://path/to/this/zip")
 from nds import nds_power
-nds_power.run_query_stream_for_zip()
+nds_power.run_query_stream_in_zip()
 ```
 # How to run another Power Run with different parameters
 Updating the zip file in S3 does not take effect because Spark caches the zip file.
@@ -53,5 +53,5 @@ On the notebook, run:
 ```
 spark.sparkContext.addPyFile("s3://<s3_path>")
 from <new_name> import nds_power
-nds_power.run_query_stream_for_zip()
-```
\ No newline at end of file
+nds_power.run_query_stream_in_zip()
+```
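
Two standard mechanisms make the zip flow in this series work: `SparkContext.addPyFile`, which ships the zip to the driver and executors and puts it on `sys.path`, and `importlib.resources`, which can read text resources out of any importable package (hence the `__init__.py` in `configs_in_zip`). Below is a minimal notebook-side sketch of that flow; the S3 path is a placeholder, and an already-active `SparkSession` named `spark` is assumed:

```python
# Notebook-side sketch (not part of the patch). Assumes an active SparkSession
# named `spark` and an nds.zip built as described in nds/configs_in_zip/readme.md.
spark.sparkContext.addPyFile("s3://<s3_path>/nds.zip")  # ships the zip and adds it to sys.path

import importlib.resources as pkg_resources
from nds import configs_in_zip

# read_text works even though the package lives inside a zip: importlib.resources
# delegates to the zip importer's resource reader.
print(pkg_resources.read_text(configs_in_zip, 'parameter'))

from nds import nds_power
nds_power.run_query_stream_in_zip()
```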
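For the `spark.properties` parsing itself, the helper below is copied from `nds_power.py` in this patch; the sample lines and asserts are illustrative only. `str.partition("=")` returns `(before, separator, after)`, and the `[::2]` slice keeps `(before, after)`, so a value that itself contains `=` survives intact:

```python
def load_properties_from_lines(lines):
    # Copied from nds_power.py in this patch: split on the first '=' only.
    myvars = {}
    for line in lines:
        name, var = line.partition("=")[::2]
        myvars[name.strip()] = var.strip()
    return myvars

props = load_properties_from_lines(["spark.executor.memoryOverhead=512M",
                                    "spark.hadoop.fs.s3a.endpoint=s3.us-west-2.amazonaws.com"])
assert props["spark.executor.memoryOverhead"] == "512M"
assert props["spark.hadoop.fs.s3a.endpoint"] == "s3.us-west-2.amazonaws.com"
```

Note that a blank line yields an empty-key entry (`{'': ''}`), matching the patch's behavior; the bundled `spark.properties` starts out empty, so this is harmless in practice.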