diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..55563e8 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,47 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python Debugger: XFaaS Run Benchmark", + "type": "debugpy", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal", + "args": [ + "--csp", "aws", + "--region", "ap-south-1", + "--max-rps", "2", + "--duration", "60", + "--payload-size", "small", + "--dynamism", "static", + "--wf-name", "test?", + "--wf-user-directory", "", + "--path-to-client-config", "./crack/client_config.json", + "--client-key", "debug", + "--dag-file-name", "", + "--teardown-flag", "", + "--is_singleton_wf", "0", + "--function-class", "graph", + "--function-name", "graph_bft", + "--function-code", "test_code", + "--node_name", "TestNode", + "--XFBENCH_DIR", "crack" + ] + }, + { + "name": "Python Debugger: XFaaS Main", + "type": "debugpy", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal", + "args": ["./crack/XFBench/workflows/custom_workflows/math_processing_wf", + "dag.json", + "benchmark.json", + "openwhisk", "ap-south-1" + ] + } + ] +} \ No newline at end of file diff --git a/install_nodejs.sh b/install_nodejs.sh new file mode 100644 index 0000000..ada288f --- /dev/null +++ b/install_nodejs.sh @@ -0,0 +1,4 @@ +NODEJS_DIR=$1 + +export N_PREFIX=$NODEJS_DIR +curl -fsSL https://raw.githubusercontent.com/tj/n/master/bin/n | bash -s 14.20.0 diff --git a/serwo/benchmark_resources/openwhisk_jmx_template.jmx b/serwo/benchmark_resources/openwhisk_jmx_template.jmx new file mode 100644 index 0000000..49b02a1 --- /dev/null +++ b/serwo/benchmark_resources/openwhisk_jmx_template.jmx @@ -0,0 +1,167 @@ + + + + + + false + true + false + + + + + + + + continue + + false + -1 + + 2 + 1 + true + DURATION + + true + + + + 1 + + throughput + RPS + 0.0 + + + + + true + + + + false + + { + "workflow_instance_id": ${__counter(FALSE)}, + "request_timestamp": ${__time(,)}, + "session_id": "SESSION", + "deployment_id": "DEPLOYMENT_ID", + OPENWHISK_PAYLOAD_TO_REPLACE + } + + = + + + + + + https + + URL + POST + true + false + true + false + + + + + + + + + content-type + application/json + + + accept + application/json + + + Authorization + Basic BASE64_AUTH + + + + + + + false + + saveConfig + + + true + true + true + + true + true + true + true + false + true + true + false + false + false + true + false + false + false + true + 0 + true + true + true + true + true + true + + + JMETER_RESPONSES_PATH + + + + + false + + saveConfig + + + true + true + true + + true + true + true + true + false + true + true + false + false + false + true + false + false + false + true + 0 + true + true + true + true + true + true + + + + + + + + \ No newline at end of file diff --git a/serwo/openwhisk_create_statemachine.py b/serwo/openwhisk_create_statemachine.py new file mode 100644 index 0000000..b468744 --- /dev/null +++ b/serwo/openwhisk_create_statemachine.py @@ -0,0 +1,523 @@ +import os +import glob +import json +import shutil +import zipfile +import subprocess +from pathlib import Path +from jinja2 import Environment, FileSystemLoader + +from python.src.utils.classes.openwhisk.user_dag import UserDag +from python.src.utils.classes.commons.logger import LoggerFactory + +logger = 
LoggerFactory.get_logger(__file__, log_level="INFO") + + +class OpenWhisk: + def __init__(self, user_dir, dag_file_name, part_id): + self.__user_dir = Path(user_dir) + self.__dag_file_name = dag_file_name + self.__part_id = part_id + + self.__parent_directory_path = Path(__file__).parent + + # xfaas specfic directories + self.__serwo_build_dir = self.__user_dir / "build" / "workflow" + self.__serwo_resources_dir = self.__serwo_build_dir / "resources" + + # This holds utility scripts and templates + self.__serwo_utils_dir = self.__parent_directory_path / "python" + self.__runner_template_dir = self.__serwo_utils_dir / "src" / "runner-templates" / "openwhisk" + + self.__dag_definition_path = self.__user_dir / self.__dag_file_name + + # openwhisk specific directories + self.__openwhisk_build_dir = self.__serwo_build_dir / "openwhisk" + self.__openwhisk_functions_dir = self.__openwhisk_build_dir / "functions" + # This holds the zip files to be deployed + self.__openwhisk_artifacts_dir = self.__openwhisk_build_dir / "artifacts" + # This holds the bash script to setup local nodejs for openwhisk composer + self.__openwhisk_helpers_dir = self.__openwhisk_build_dir / "helpers" + self.__openwhisk_helpers_nodejs_dir = self.__openwhisk_helpers_dir / "local_nodejs" + + # OpenWhisk configuration parameters with sensible defaults suggested in OW documentation + self.__action_namespace = os.environ.get("XFAAS_OW_ACTION_NS", "guest") + self.__action_concurrency = int(os.environ.get("XFAAS_OW_ACTION_CONCURRENCY", 1)) + self.__action_timeout = int(os.environ.get("XFAAS_OW_ACTION_TIMEOUT", 300000)) + self.__action_memory = int(os.environ.get("XFAAS_OW_ACTION_MEMORY", 256)) # This is only used for orchestrator action for now + self.__redis_url = os.environ.get("XFAAS_OW_REDIS_URL", "redis://owdev-redis.openwhisk.svc.cluster.local:6379") + self.__ignore_certs = bool(os.environ.get("XFAAS_OW_REDIS_IGNORE_CERTS", 1)) # 1/0 for True or False + + # DAG related parameters + self.__user_dag = UserDag(self.__dag_definition_path) + self.__openwhisk_workflow_orchestrator_action_name = f"/{self.__action_namespace}/{self.__user_dag.get_user_dag_name()}/orchestrator" + self.__openwhisk_composer_input_path = self.__openwhisk_build_dir / f"{self.__user_dag.get_user_dag_name()}_workflow_composition.js" + self.__openwhisk_composer_output_path = self.__openwhisk_build_dir / f"{self.__user_dag.get_user_dag_name()}_workflow_composition.json" + + + def __create_environment(self): + """ + Create the directories to put functions into. 
+ * /functions/ directory holds the converted functions + * /artifacts/ directory contains the zip files to be deployed to OpenWhisk + """ + # TODO: Check me removing and shizz + if os.path.exists(self.__openwhisk_functions_dir): + shutil.rmtree(self.__openwhisk_functions_dir) + + if os.path.exists(self.__openwhisk_artifacts_dir): + shutil.rmtree(self.__openwhisk_artifacts_dir) + + # Not deleting this to save the time spent on downloading nodejs + # if os.path.exists(self.__openwhisk_helpers_dir): + # shutil.rmtree(self.__openwhisk_helpers_dir) + + self.__openwhisk_functions_dir.mkdir(parents=True, exist_ok=True) + self.__openwhisk_artifacts_dir.mkdir(parents=True, exist_ok=True) + self.__openwhisk_helpers_dir.mkdir(parents=True, exist_ok=True) + self.__openwhisk_helpers_nodejs_dir.mkdir(parents=True, exist_ok=True) + self.__serwo_resources_dir.mkdir(parents=True, exist_ok=True) + + # Copying the bash script to install a local nodejs copy (used by openwhisk-composer) + shutil.copyfile(src=self.__runner_template_dir/"local_nodejs_installer.sh", dst=self.__openwhisk_helpers_dir/"local_nodejs_installer.sh") + + + def __render_runner_template(self, runner_template_dir, function_id, function_name, function_runner_filename): + """ + """ + runner_template_filename = function_runner_filename + + try: + file_loader = FileSystemLoader(runner_template_dir) + env = Environment(loader=file_loader) + template = env.get_template("runner_template.py") + logger.info( + f"Created jinja2 environement for templating Openwhisk function ids for function::{function_name}" + ) + except: + raise Exception( + f"Unable to load environment for Openwhisk function id templating for function::{function_name}" + ) + + # render the template + try: + output = template.render(function_id_placeholder=function_id) + with open(f"{runner_template_dir}/{runner_template_filename}", "w") as out: + out.write(output) + logger.info( + f"Rendered and flushed the runner template for Openwhisk function for function::{function_name}" + ) + except Exception as e: + logger.error(e) + raise Exception( + f"Unable to render the function runner template for Openwhisk function for function::{function_name}" + ) + + return runner_template_filename + + + def __append_xfaas_default_requirements(self, filepath): + """ + Appends XFaaS dependencies to the function requirements + """ + with open(filepath, "r") as file: + lines = file.readlines() + lines.append("psutil\n") + lines.append("objsize\n") + unqiue_dependencies = set(lines) + file.flush() + + with open(filepath, "w") as file: + for line in [x.strip("\n") for x in sorted(unqiue_dependencies)]: + file.write(f"{line}\n") + + + def __create_openwhisk_actions( + self, + user_fn_path, + fn_name, + fn_module_name, + runner_template_filename, + runner_template_dir, + ): + """ + user_fn_path: Workflow code is taken from hardcoded XFaaS samples from this path + """ + fn_requirements_filename = "requirements.txt" + src_fn_dir = user_fn_path / fn_requirements_filename + dst_fn_dir = self.__openwhisk_functions_dir / fn_name + + dst_requirements_path = dst_fn_dir / fn_requirements_filename + + logger.info(f"Creating function directory for {fn_name}") + if not os.path.exists(dst_fn_dir): + os.makedirs(dst_fn_dir) + + logger.info(f"Moving requirements file for {fn_name} for user at to {dst_fn_dir}") + shutil.copyfile(src=src_fn_dir, dst=dst_requirements_path) + + logger.info(f"Adding default requirements {fn_name}") + self.__append_xfaas_default_requirements(dst_requirements_path) + + # place the 
dependencies folder from the user function path if it exists + if os.path.exists(user_fn_path / "dependencies"): + shutil.copytree( + user_fn_path / "dependencies", + dst_fn_dir / "dependencies", + dirs_exist_ok=True + ) + + logger.info(f"Moving xfaas boilerplate for {fn_name}") + shutil.copytree(src=self.__serwo_utils_dir, dst=dst_fn_dir / "python", dirs_exist_ok=True) + + logger.info(f"Generating Runners for function {fn_name}") + + fnr_string = f"USER_FUNCTION_PLACEHOLDER" + temp_runner_path = user_fn_path / f"{fn_name}_temp_runner.py" + runner_template_path = runner_template_dir / runner_template_filename + + print("Here", runner_template_path) + with open(runner_template_path, "r") as file: + contents = file.read() + contents = contents.replace(fnr_string, fn_module_name) + + with open(temp_runner_path, "w") as file: + file.write(contents) + + # XFaaS code provided by the user comes as the module name due to + # import USER_FUNCTION_PLACEHOLDER being replaced by the module name + final_import_file_src_path = user_fn_path / f"{fn_module_name}.py" + final_import_file_dst_path = dst_fn_dir / f"{fn_module_name}.py" + final_runner_file_path = dst_fn_dir / "__main__.py" + + os.system(f"cp {final_import_file_src_path} {final_import_file_dst_path}") + + # Standalone runner from other clouds is replaced by __main__.py file for OpenWhisk + os.system(f"cp {temp_runner_path} {final_runner_file_path}") + + logger.info(f"Deleting temporary runner") + os.remove(temp_runner_path) + + logger.info(f"Successfully created build directory for function {fn_name}") + + + def __create_standalone_runners(self): + """ + The function entrypoint is __main__.py due to OpenWhisk deployment + requirements for large library imports + """ + function_metadata_list = self.__user_dag.get_node_param_list() + function_object_map = self.__user_dag.get_node_object_map() + + for function_metadata in function_metadata_list: + function_name = function_metadata["name"] + function_runner_filename = "__main__" # OpenWhisk requires the main file name to be __main__.py + + function_runner_filename = function_object_map[ + function_metadata["name"] + ].get_runner_filename() + + function_path = function_object_map[function_name].get_path() + function_module_name = function_object_map[function_name].get_module_name() + function_id = function_object_map[function_name].get_id() + + runner_template_filename = self.__render_runner_template( + runner_template_dir=self.__runner_template_dir, + function_id=function_id, + function_name=function_name, + function_runner_filename=function_runner_filename + ) + + logger.info(f"Starting Standalone Runner Creation for function {function_name}") + + self.__create_openwhisk_actions( + self.__parent_directory_path / function_path, + function_name, + function_module_name, + function_runner_filename, + self.__runner_template_dir, + ) + + runner_template_filepath = self.__runner_template_dir / runner_template_filename + logger.info(f"Deleting Temporary Runner Template at {runner_template_filepath}") + os.remove(f"{runner_template_filepath}") + + + def __create_workflow_orchestrator(self, file_name_prefix): + """ + Generates a js file to send as an input to openwhisk composer. 
+ See: https://github.com/apache/openwhisk-composer + """ + composer_file_path = self.__openwhisk_build_dir / f"{file_name_prefix}_workflow_composition.js" + + generated_code = self.__user_dag.get_orchestrator_code() + with open(composer_file_path, "w") as f: + f.write(generated_code) + + + def build_resources(self): + """ + First of the 3 main functions for the workflow. + + 1. Creates OpenWhisk's action compatible source code function files + 2. TODO: Creates openwhisk composer js files + """ + logger.info("*" * 30) + logger.info("Build Resources: Started") + logger.info("*" * 30) + + logger.info(f"Creating environment for {self.__user_dag.get_user_dag_name()}") + self.__create_environment() + + logger.info(f"Initating standalone runner creation for {self.__user_dag.get_user_dag_name()}") + self.__create_standalone_runners() + + logger.info(f"Initating openwhisk composer js orchestrator for {self.__user_dag.get_user_dag_name()}") + self.__create_workflow_orchestrator(self.__user_dag.get_user_dag_name()) + + logger.info("*" * 30) + logger.info("Build Resources: Success") + logger.info("*" * 30) + + + def build_workflow(self): + """ + Second of the 3 main functions for the workflow. + - Creates zip artifacts for all the functions + - Downloads a local nodejs copy using "n" + - Converts the openwhisk-composer js file into json + + ------------ + Assumptions: + ------------ + - wsk tool is setup -> User's responsibility + """ + # Create a zip artifact from each function + for func_name in self.__user_dag.get_node_object_map(): + output_artifact_path = self.__openwhisk_build_dir/"artifacts"/f"{func_name}.zip" + + files_to_zip = [] + for file in os.listdir(self.__openwhisk_functions_dir/func_name): + if file.endswith(".py"): + curr_file_path = self.__openwhisk_functions_dir/func_name/file + # basename thing is required to unzip correctly + files_to_zip.append([curr_file_path, os.path.basename(curr_file_path)]) + + with zipfile.ZipFile(output_artifact_path, "w") as archive: + for fz in files_to_zip: + archive.write(filename=fz[0], arcname=os.path.basename(fz[1])) + + req_file_path = self.__openwhisk_functions_dir/func_name/"requirements.txt" + archive.write(filename=req_file_path, arcname=os.path.basename(req_file_path)) + + xfaas_folder_path = str(self.__openwhisk_functions_dir/func_name/"python"/"**"/"*") + for file in glob.glob(xfaas_folder_path, recursive=True): + path = file.split("python/")[1] + archive.write(filename=file, arcname=os.path.join("python", path)) + + dependencies_folder_path = str(self.__openwhisk_functions_dir/func_name/"dependencies"/"**"/"*") + for file in glob.glob(dependencies_folder_path, recursive=True): + path = file.split("dependencies/")[1] + archive.write(filename=file, arcname=os.path.join("dependencies", path)) + + logger.info(":" * 30) + logger.info("Installing nodejs and openwhisk-composer") + logger.info(":" * 30) + + builder_dir = str(self.__openwhisk_helpers_dir.resolve()) + builder_path = os.path.join(builder_dir, "local_nodejs_installer.sh") + nodejs_local_dir = str(self.__openwhisk_helpers_nodejs_dir.resolve()) + + # Only downloading a local NodeJS copy if one doesn't exist already with required libraries installed + ow_composer_binary_path = os.path.join(nodejs_local_dir, "node_modules", "openwhisk-composer", "bin", "compose.js") + if not os.path.exists(ow_composer_binary_path): + logger.info("Installing NodeJS using 'https://raw.githubusercontent.com/tj/n/master/bin/n'") + nodejs_installation_status = subprocess.run(["sh", builder_path, 
nodejs_local_dir]) + + if nodejs_installation_status.returncode == 0: + logger.info("NodeJS installation success") + else: + # TODO: Handle me gracefully + logger.info("NodeJS installation failure") + raise Exception("NodeJS installation failed") + + # ------------------------------------------------------------------------------------------ + # Doing a "$ npm install openwhisk-composer" from the local npm binary + logger.info("Starting to install openwhisk-composer using npm: See 'https://github.com/apache/openwhisk-composer'") + + local_nodejs_binary_path = self.__openwhisk_helpers_nodejs_dir/"bin"/"node" + local_npm_binary_path = self.__openwhisk_helpers_nodejs_dir/"bin"/"npm" + ow_composer_installation_status = subprocess.run([str(local_nodejs_binary_path.resolve()), str(local_npm_binary_path.resolve()), "install", "--prefix", nodejs_local_dir, "openwhisk-composer"]) + if ow_composer_installation_status.returncode == 0: + logger.info("Successfully installed openwhisk-composer") + else: + # TODO: Handle me gracefully + logger.info("openwhisk-composer installation failure") + raise Exception("openwhisk-composer installation failed") + # ------------------------------------------------------------------------------------------ + else: + logger.info(":" * 30) + logger.info("Required local NodeJS already exists in the build directory") + logger.info(":" * 30) + + logger.info(":" * 30) + logger.info("Installing nodejs and openwhisk-composer: SUCCESS") + logger.info(":" * 30) + + logger.info(":" * 30) + logger.info("Creating workflow composition files using openwhisk-composer") + logger.info(":" * 30) + + local_nodejs_binary_path = self.__openwhisk_helpers_nodejs_dir/"bin"/"node" + ow_composer_binary_path = os.path.join(nodejs_local_dir, "node_modules", "openwhisk-composer", "bin", "compose.js") + orchestrator_creation_status = subprocess.run([ + local_nodejs_binary_path, ow_composer_binary_path, + str(self.__openwhisk_composer_input_path.resolve()), + "-o", str(self.__openwhisk_composer_output_path.resolve()), + ]) + + if orchestrator_creation_status.returncode != 0: + logger.info("Orchestrator file creation failed") + raise Exception("Orchestrator file creation failed") + + logger.info(":" * 30) + logger.info("Creating openwhisk-composer files: SUCCESS") + logger.info(":" * 30) + + + def deploy_workflow(self): + """ + Third of the 3 main functions for the workflow. + 1. Removes existing actions with the current workflow's name + 2. Deploys all the actions + 3. Creates openwhisk-compatible orchestrator json file using third-party tool + 4. Deploys the orchestrator action + + ----- + TODO: + 1. Figure out web api mode, web hooks and related stuff + 2. Handle the input.json somehow to allow parallel workflow actions - "wsk -i action invoke /guest/graph-workflow -P input.json" + 3. 
Throw Exception if wsk command is not working -> Or find a better way to connect to the cluster + ----- + """ + logger.info(":" * 30) + logger.info("Deleting any existing OpenWhisk components") + logger.info(":" * 30) + + # ----------------- Existing OpenWhisk components deletion ----------------- + for node_name in self.__user_dag.get_node_object_map(): + curr_action_name = f"/{self.__action_namespace}/{self.__user_dag.get_user_dag_name()}/{node_name}" + try: + os.system(f"wsk -i action delete {curr_action_name}") + except Exception as e: + # TODO: Handle me gracefully + print("Either the openwhisk action does not exist or some error happened") + print(e) + + try: + os.system(f"wsk -i action delete {self.__openwhisk_workflow_orchestrator_action_name}") + except Exception as e: + # TODO: Handle me gracefully + print("Either the openwhisk workflow orchestrator does not exist or some error happened") + print(e) + + try: + # TODO: This creates the package in the "/guest/" namespace, need to figure out how to change that + # Hints: Helm changes are required + os.system(f"wsk -i package delete {self.__user_dag.get_user_dag_name()}") + os.system(f"wsk -i package create {self.__user_dag.get_user_dag_name()}") + except Exception as e: + # TODO: Handle me gracefully + print("Either the openwhisk package does not exist or some error happened") + print(e) + # ------------------------------------------------------------------------- + + # ----------------- New OpenWhisk components creation ----------------- + # Creating the actions manually using the wsk tool + logger.info(":" * 30) + logger.info("Deploying OpenWhisk action for each function") + logger.info(":" * 30) + + for func_name in self.__user_dag.get_node_object_map(): + action_name = f"/{self.__action_namespace}/{self.__user_dag.get_user_dag_name()}/{func_name}" + action_zip_path = self.__openwhisk_artifacts_dir / f"{func_name}.zip" + + # This overrides the default configuration set from the environment at an action level from the provided DAG. 
+            memory_in_mb = self.__user_dag.get_node_object_map()[func_name].get_memory()
+
+            try:
+                os.system(f"wsk -i action create {action_name} --kind python:3 {action_zip_path} --timeout {self.__action_timeout} --concurrency {self.__action_concurrency} --memory {memory_in_mb}")
+            except Exception as e:
+                print("X" * 20)
+                print(f"Action creation command failed for {action_name}")
+                print(e)
+                print("X" * 20)
+
+        logger.info(":" * 30)
+        logger.info("Deploying OpenWhisk action for each function: SUCCESS")
+        logger.info(":" * 30)
+
+        logger.info(":" * 30)
+        logger.info("Deploying OpenWhisk orchestrator action")
+        logger.info(":" * 30)
+
+        nodejs_local_dir = str(self.__openwhisk_helpers_nodejs_dir.resolve())
+        local_nodejs_binary_path = self.__openwhisk_helpers_nodejs_dir/"bin"/"node"
+        ow_deployer_binary_path = os.path.join(nodejs_local_dir, "node_modules", "openwhisk-composer", "bin", "deploy.js")
+        os.system(f"{local_nodejs_binary_path} {ow_deployer_binary_path} {self.__openwhisk_workflow_orchestrator_action_name} {self.__openwhisk_composer_output_path} -w -i")
+
+        ignore_certs = "false"
+        if self.__ignore_certs:
+            ignore_certs = "true"
+
+        # Hackish way to create this complex string input for the orchestrator function
+        workflow_update_cmd = f"wsk -i action update {self.__openwhisk_workflow_orchestrator_action_name} --timeout {self.__action_timeout} --concurrency {self.__action_concurrency} --memory {self.__action_memory}"
+        workflow_update_cmd += " --param '$composer' '{"
+        workflow_update_cmd += '"redis":{"uri":"'
+        workflow_update_cmd += self.__redis_url
+        workflow_update_cmd += '"},"openwhisk":{"ignore_certs":'
+        workflow_update_cmd += ignore_certs
+        workflow_update_cmd += '}'
+        workflow_update_cmd += "}'"
+
+        try:
+            os.system(workflow_update_cmd)
+        except Exception as e:
+            print("Workflow action update command failed")
+            print(e)
+
+        logger.info(":" * 30)
+        logger.info("Deploying OpenWhisk orchestrator action: SUCCESS")
+        logger.info(":" * 30)
+
+        logger.info(":" * 30)
+        logger.info("Writing output json resource file")
+        logger.info(":" * 30)
+
+        wsk_host_output = subprocess.run(["wsk", "property", "get", "--apihost"], capture_output=True)
+        wsk_auth_output = subprocess.run(["wsk", "property", "get", "--auth"], capture_output=True)
+
+        if wsk_host_output.returncode != 0 or wsk_auth_output.returncode != 0:
+            print(" X " * 30)
+            print("Failed to get host and auth from 'wsk' client for the resources")
+            print(" X " * 30)
+
+        wsk_host = wsk_host_output.stdout.decode("utf-8").strip().split()[-1].strip()
+        execution_auth = wsk_auth_output.stdout.decode("utf-8").strip().split()[-1].strip()
+        execution_url = f"https://{wsk_host}/api/v1/namespaces/{self.__action_namespace}/actions/{self.__user_dag.get_user_dag_name()}/orchestrator?blocking=false"
+
+        # Region is not part of this deployer's configuration, so the resource file is
+        # keyed by part id only (matching the openwhisk branch in xfaas_provenance.py).
+        output_resource_file_path = self.__serwo_resources_dir / f"openwhisk-{self.__part_id}.json"
+        with open(output_resource_file_path, "w") as f:
+            output = {
+                "WorkflowEntrypoint": self.__openwhisk_workflow_orchestrator_action_name,
+                "Description": f"Run 'wsk -i action invoke {self.__openwhisk_workflow_orchestrator_action_name}' to start the workflow",
+                "ExecutionUrl": execution_url,
+                "ExecutionAuth": execution_auth,
+            }
+
+            f.write(json.dumps(output, indent=4))
+
+        logger.info(":" * 30)
+        logger.info("Writing output json resource file: SUCCESS")
+        logger.info(":" * 30)
+        # -------------------------------------------------------------------------
diff --git a/serwo/python/src/runner-templates/openwhisk/local_nodejs_installer.sh
b/serwo/python/src/runner-templates/openwhisk/local_nodejs_installer.sh new file mode 100644 index 0000000..ada288f --- /dev/null +++ b/serwo/python/src/runner-templates/openwhisk/local_nodejs_installer.sh @@ -0,0 +1,4 @@ +NODEJS_DIR=$1 + +export N_PREFIX=$NODEJS_DIR +curl -fsSL https://raw.githubusercontent.com/tj/n/master/bin/n | bash -s 14.20.0 diff --git a/serwo/python/src/runner-templates/openwhisk/runner_template.py b/serwo/python/src/runner-templates/openwhisk/runner_template.py new file mode 100644 index 0000000..e0a8530 --- /dev/null +++ b/serwo/python/src/runner-templates/openwhisk/runner_template.py @@ -0,0 +1,139 @@ +""" +SerwoObject - single one +SerwoListObject - [SerwoObject] +""" +import os +import uuid +import time +import psutil +import objsize +from copy import deepcopy + +from USER_FUNCTION_PLACEHOLDER import user_function as USER_FUNCTION_PLACEHOLDER_function + +from python.src.utils.classes.commons.serwo_objects import SerWOObject +from python.src.utils.classes.commons.serwo_objects import build_serwo_object +from python.src.utils.classes.commons.serwo_objects import build_serwo_list_object + +downstream = 0 + +def get_delta(timestamp): + return round(time.time() * 1000) - timestamp + +# Openwhisk entry point +def main(event): + # For the first function of the workflow iff it is empty + if event is None or len(event) == 0: + event = { + "workflow_instance_id": str(uuid.uuid4()), + "request_timestamp": round(time.time() * 1000), + "session_id": str(uuid.uuid4()), + "deployment_id": str(uuid.uuid4()), + } + + start_time = round(time.time() * 1000) + # capturing input payload size + input_payload_size_bytes = None + + # Case of a fan-in from multiple actions. + # Note: Sending 'value' at the first level can cause an edge case bug + if "value" in event and isinstance(event["value"], list): + # TODO: exception handling + serwo_request_object = build_serwo_list_object(event["value"]) + + # Calculate input payload size + input_payload_size_bytes = sum([objsize.get_deep_size(x.get_body()) for x in serwo_request_object.get_objects()]) + + elif isinstance(event, dict): + if "metadata" not in event: + new_event = deepcopy(event) + event = dict(body=new_event) + wf_instance_id = event["body"].get("workflow_instance_id") + request_timestamp = event["body"].get("request_timestamp") + session_id = event["body"].get("session_id") # NOTE - new variable to keep track of requests + deployment_id = event["body"].get("deployment_id") + overheads = start_time - request_timestamp + event["metadata"] = dict( + workflow_instance_id=wf_instance_id, + workflow_start_time=start_time, + request_timestamp=request_timestamp, + session_id=session_id, + overheads=overheads, + deployment_id=deployment_id, + functions=[], + ) + serwo_request_object = build_serwo_object(event) + input_payload_size_bytes = objsize.get_deep_size(serwo_request_object.get_body()) + else: + # TODO: Report error and return + pass + + serwo_request_object.set_basepath("") + # user function exec + status_code = 200 + try: + start_epoch_time = serwo_request_object.get_metadata().get( + "workflow_start_time" + ) + start_time_delta = get_delta(start_epoch_time) + wf_instance_id = serwo_request_object.get_metadata().get("workflow_instance_id") + function_id = "{{function_id_placeholder}}" + process = psutil.Process(os.getpid()) + + memory_before = process.memory_info().rss + print(f"SerWOMemUsage::Before::{wf_instance_id},{function_id},{memory_before}") + response_object = USER_FUNCTION_PLACEHOLDER_function(serwo_request_object) + 
process = psutil.Process(os.getpid()) + memory_after = process.memory_info().rss + print(f"SerWOMemUsage::After::{wf_instance_id},{function_id},{memory_after}") + + # Sanity check for user function response + if not isinstance(response_object, SerWOObject): + status_code = 500 + return dict( + statusCode=status_code, + body="Response Object Must be of type SerWOObject", + metadata="None", + ) + end_time_delta = get_delta(start_epoch_time) + # st_time = int(time.time()*1000) + # cpu_brand = cpuinfo.get_cpu_info()["brand_raw"] + # en_time = int(time.time()*1000) + # time_taken = en_time - st_time + # cpu_brand = f"{cpu_brand}_{time_taken}" + # Get current metadata here + metadata = serwo_request_object.get_metadata() + function_metadata_list = metadata.get("functions") + # NOTE - the template for generated function id + function_metadata_list.append( + { + function_id: dict( + start_delta=start_time_delta, + end_delta=end_time_delta, + mem_before=memory_before, + mem_after=memory_after, + in_payload_bytes=input_payload_size_bytes, + out_payload_bytes=objsize.get_deep_size(response_object.get_body()), + # cpu=cpu_brand + + ) + } + ) + metadata.update(dict(functions=function_metadata_list)) + except Exception as e: + # if user compute fails then default to status code as 500 and no response body + print(e) + status_code = 500 + return dict( + statusCode=status_code, body="Error in user compute", metadata="None" + ) + # post function handler + # NOTE - leaving empty for now and returning a response is. + # Send service bus/ storage queue + body = response_object.get_body() + response = dict(statusCode=status_code, body=body, metadata=metadata) + if downstream == 0: + # TODO - change while doing egress + return response + return response + diff --git a/serwo/python/src/utils/classes/commons/csp.py b/serwo/python/src/utils/classes/commons/csp.py index 1e10507..7a4ddcd 100644 --- a/serwo/python/src/utils/classes/commons/csp.py +++ b/serwo/python/src/utils/classes/commons/csp.py @@ -2,8 +2,9 @@ import scripts.azure.azure_builder as azure_builder import scripts.azure.azure_deploy as azure_deployer - from serwo.aws_create_statemachine import AWS +from serwo.openwhisk_create_statemachine import OpenWhisk + class CSP: __name = None @@ -15,20 +16,30 @@ def get_name(self): #TODO: Factory pattern for csp - def build_resources(self,user_dir, dag_definition_path, region, part_id, dag_definition_file, is_netherite): + def build_resources(self, user_dir, dag_definition_path, region, part_id, dag_definition_file, is_netherite): if self.__name == 'azure': self.build_az(dag_definition_file, dag_definition_path, part_id, region, user_dir, is_netherite) - if self.__name == 'aws': + elif self.__name == 'aws': if part_id == "0000": aws_deployer = AWS(user_dir, dag_definition_file, "REST", part_id, region) - aws_deployer.build_resources() - aws_deployer.build_workflow() - aws_deployer.deploy_workflow() else: aws_deployer = AWS(user_dir, dag_definition_file, "SQS", part_id, region) - aws_deployer.build_resources() - aws_deployer.build_workflow() - aws_deployer.deploy_workflow() + + aws_deployer.build_resources() + aws_deployer.build_workflow() + aws_deployer.deploy_workflow() + + elif self.__name.lower() == 'openwhisk': + private_cloud_deployer = OpenWhisk(user_dir=user_dir, dag_file_name=dag_definition_file, part_id=part_id) + + print(":" * 80, "Generating resources for OpenWhisk Actions") + private_cloud_deployer.build_resources() + + print(":" * 80, "Generating workflow files for OpenWhisk") + 
private_cloud_deployer.build_workflow() + + print(":" * 80, "Deploying OpenWhisk") + private_cloud_deployer.deploy_workflow() def build_az(self, dag_definition_file, dag_definition_path, part_id, region, user_dir,is_netherite): diff --git a/serwo/python/src/utils/classes/openwhisk/function.py b/serwo/python/src/utils/classes/openwhisk/function.py new file mode 100644 index 0000000..877a476 --- /dev/null +++ b/serwo/python/src/utils/classes/openwhisk/function.py @@ -0,0 +1,38 @@ +""" +A Function object refers to an Action in Openwhisk +""" + +class Function: + def __init__(self, id, name, path, entry_point, memory): + self._id = id + self._name = name + self._path = path + self._memory = memory + self._uri = "functions/" + name + self._module_name = entry_point.split(".")[0] + + self._runner_filename = "standalone_" + entry_point.split(".")[0] + "_runner" + self._handler = self._runner_filename + ".main" + + def get_runner_filename(self): + return self._runner_filename + + def get_path(self): + return self._path + + def get_module_name(self): + return self._module_name + + def get_memory(self): + return self._memory + + def get_id(self): + return self._id + + def get_as_dict(self): + return { + "name": self._name, + "uri": self._uri, + "handler": self._handler, + "memory": self._memory, + } diff --git a/serwo/python/src/utils/classes/openwhisk/user_dag.py b/serwo/python/src/utils/classes/openwhisk/user_dag.py new file mode 100644 index 0000000..fd4322b --- /dev/null +++ b/serwo/python/src/utils/classes/openwhisk/user_dag.py @@ -0,0 +1,310 @@ +import json +import copy +import random +import string +import itertools +import networkx as nx +from collections import defaultdict + +from .function import Function + +class UserDag: + # dag configuration (picked up from user file) + __dag_config_data = dict() + + # map: nodeName -> nodeId (used internally) [NOTE [TK] - This map is changed from nodeName -> NodeId to UserGivenNodeId -> our internal nodeID] + __nodeIDMap = ({}) + + __dag = nx.DiGraph() # networkx directed graph + __functions = {} # map: functionName -> functionObject + + def __init__(self, user_config_path) -> None: + try: + self.__dag_config_data = self.__load_user_spec(user_config_path) + except Exception as e: + raise e + + for index, node in enumerate(self.__dag_config_data["Nodes"]): + nodeID = "n" + str(index+1) + # nodeID = "n" + node["NodeName"] # Uncomment to make debugging easier + nodeVar = self._generate_random_variable_name() + + self.__nodeIDMap[node["NodeName"]] = nodeID + self.__nodeIDMap[node["NodeId"]] = nodeID + self.__functions[node["NodeName"]] = Function( + id=node["NodeId"], + name=node["NodeName"], + path=node["Path"], + entry_point=node["EntryPoint"], + memory=node["MemoryInMB"], + ) + + workflowName = self.__dag_config_data["WorkflowName"] + nodeName = node["NodeName"] + + self.__dag.add_node( + nodeID, + NodeName=node["NodeName"], + Path=node["Path"], + EntryPoint=node["EntryPoint"], + CSP=node.get("CSP"), + MemoryInMB=node["MemoryInMB"], + machine_list=[nodeID], + var_machine_list=[nodeVar], + ret=[f'composer.action("/guest/{workflowName}/{nodeName}")'], # only applicable for leaf nodes (not for sequence/parallel) + var=nodeVar, + code_generation_done=False, + node_type="action", # Can be one of [action/parallel/sequence] + ) + + for edge in self.__dag_config_data["Edges"]: + for key in edge: + for val in edge[key]: + self.__dag.add_edge(self.__nodeIDMap[key], self.__nodeIDMap[val]) + + def _generate_random_variable_name(self, n=4): + res = 
''.join(random.choices(string.ascii_letters, k=n)) + return str(res).lower() + + def __load_user_spec(self, user_config_path): + with open(user_config_path, "r") as user_dag_spec: + dag_data = json.load(user_dag_spec) + + return dag_data + + def get_user_dag_name(self): + return self.__dag_config_data["WorkflowName"] + + def get_node_object_map(self): + return self.__functions + + def get_node_param_list(self): + functions_list = [] + for f in self.__functions.values(): + functions_list.append(f.get_as_dict()) + + return functions_list + + def _merge_linear_nodes(self, graph: nx.DiGraph, node_list: list): + """ + Doesn't return anything since operating on deep copy of graphs might get heavy. + Works on side effect on nx.Digraph. + """ + if len(node_list) == 0: + return + + new_node_machine_list = [] + new_node_var_machine_list = [] + for node in node_list: + new_node_machine_list.extend(graph.nodes[node]["machine_list"]) + + if graph.nodes[node]["node_type"] == "action": + new_node_var_machine_list.extend(graph.nodes[node]["var_machine_list"]) + else: + # new_node_var_machine_list.extend(graph.nodes[node]["var"]) + if type(graph.nodes[node]["var"]) == str: + new_node_var_machine_list.extend([graph.nodes[node]["var"]]) + else: + new_node_var_machine_list.extend(graph.nodes[node]["var"]) + + new_node_id = "n" + str(node_list) + graph.add_node( + new_node_id, var=self._generate_random_variable_name(), + machine_list=new_node_machine_list, + var_machine_list=new_node_var_machine_list, + node_type="sequence", code_generation_done=False, + ) + + for u, v in list(graph.edges()): + if v == node_list[0]: + # replace the first node of the sequence with sequence node's id + graph.add_edge(u, new_node_id) + + if u == node_list[-1]: + # make the last node of the sequence point to sequence's next node (where sequence ends) + graph.add_edge(new_node_id, v) + + # remove the individual nodes + for n in node_list: + graph.remove_node(n) + + return + + + def _collapse_linear_chains(self, graph: nx.DiGraph): + """ + Doesn't return anything since operating on deep copy of graphs might get heavy. + Works on side effect on nx.Digraph. + """ + start_node = [node for node in graph.nodes if graph.in_degree(node) == 0][0] + dfs_edges = list(nx.dfs_edges(graph, source=start_node)) + + linear_chain = [] + set_of_linear_chains = set() + for u, v in dfs_edges: + if graph.out_degree(u) == 1 and graph.in_degree(v) == 1: + if u not in linear_chain: + linear_chain.append(u) + + if v not in linear_chain: + linear_chain.append(v) + else: + if linear_chain: + set_of_linear_chains.add(tuple(linear_chain)) + + linear_chain = [] + + if linear_chain != []: + set_of_linear_chains.add(tuple(linear_chain)) + linear_chain = [] + + for chain in set_of_linear_chains: + node_list = list(chain) + self._merge_linear_nodes(graph, node_list) + + return + + def _merge_parallel_nodes(self, graph: nx.DiGraph, node_list): + """ + Doesn't return anything since operating on deep copy of graphs might get heavy. + Works on side effect on nx.Digraph. 
+ """ + if len(node_list) == 0: + return + + new_node_machine_list = [] + new_node_var_machine_list = [] + for node in node_list: + new_node_machine_list.append(graph.nodes[node]["machine_list"]) + # new_node_var_machine_list.append(graph.nodes[node]["var_machine_list"]) + + if graph.nodes[node]["node_type"] == "action": + new_node_var_machine_list.extend(graph.nodes[node]["var_machine_list"]) + else: + # new_node_var_machine_list.append(graph.nodes[node]["var"]) + if type(graph.nodes[node]["var"]) == str: + new_node_var_machine_list.extend([graph.nodes[node]["var"]]) + else: + new_node_var_machine_list.extend(graph.nodes[node]["var"]) + + # since we are only merging diamonds (same predecessor, same successor) + predecessor = list(graph.predecessors(node_list[0]))[0] + successor = list(graph.successors(node_list[0]))[0] + + new_node_id = "n" + str(new_node_machine_list) + graph.add_node( + new_node_id, var=self._generate_random_variable_name(), + machine_list=[new_node_machine_list], + var_machine_list=new_node_var_machine_list, + node_type="parallel", code_generation_done=False, + ) + + for node in node_list: + graph.remove_node(node) + + graph.add_edge(predecessor, new_node_id) + graph.add_edge(new_node_id, successor) + + return + + def _collapse_parallel_chains(self, graph: nx.DiGraph): + """ + Doesn't return anything since operating on deep copy of graphs might get heavy. + Works on side effect on nx.Digraph. + """ + start_node = [node for node in graph.nodes if graph.in_degree(node) == 0][0] + dfs_nodes = list(nx.dfs_preorder_nodes(graph, source=start_node)) + + set_of_parallel_chains = set() + for curr_node in dfs_nodes: + curr_node_succ = list(graph.successors(curr_node)) + diamond_forming_node = [] + + for succ in curr_node_succ: + if graph.out_degree(succ) == 1: + diamond_forming_node.append(succ) + + group_by_succ_dict = defaultdict(list) + for node in diamond_forming_node: + succ = list(graph.successors(node))[0] + group_by_succ_dict[succ].append(node) + + for val in group_by_succ_dict.values(): + if len(val) > 1: + set_of_parallel_chains.add(tuple(val)) + + for chain in set_of_parallel_chains: + chain_list = list(chain) + self._merge_parallel_nodes(graph, chain_list) + + return + + def get_updated_nodes(self, dag): + """ + TODO: Change me to a better approach, I won't scale well for bigger graphs. + """ + updated_node_ids = [] + + start_node = [node for node in dag.nodes if dag.in_degree(node) == 0][0] + bfs_nodes = list(nx.bfs_layers(dag, sources=start_node)) + bfs_nodes = list(itertools.chain.from_iterable(bfs_nodes)) + for node_id in bfs_nodes: + if not dag.nodes[node_id]["code_generation_done"]: + updated_node_ids.append(node_id) + + return updated_node_ids + + def get_orchestrator_code(self): + """ + Breaker of linear and parallel chains. 
+ TODO: Test me thoroughly + """ + original_dag = self.__dag + output_dag = copy.deepcopy(original_dag) # preserving the original dag + generated_code = 'const composer = require("openwhisk-composer");\n\n' + + # Creating an action definition for each node of the graph + start_node = [node for node in output_dag.nodes if output_dag.in_degree(node) == 0][0] + bfs_nodes = list(nx.bfs_layers(output_dag, sources=start_node)) + bfs_nodes = list(itertools.chain.from_iterable(bfs_nodes)) + for curr_node in bfs_nodes: + generated_code += f"{output_dag.nodes[curr_node]['var']} = {output_dag.nodes[curr_node]['ret'][0]};\n"; + output_dag.nodes[curr_node]["code_generation_done"] = True + + iteration = 1 + while len(output_dag.nodes()) != 1: + self._collapse_linear_chains(output_dag) + new_linear_nodes = self.get_updated_nodes(output_dag) + if len(new_linear_nodes) > 0: + generated_code += f"\n// Iteration{iteration}: Sequence\n" + + for new_node in new_linear_nodes: + node = output_dag.nodes[new_node] + node["code_generation_done"] = True + + sub_node_machines = [] + for machine in node["var_machine_list"]: + sub_node_machines.append(machine) + + generated_code += f"{node['var']} = composer.sequence({', '.join(sub_node_machines)});\n"; + + self._collapse_parallel_chains(output_dag) + new_parallel_nodes = self.get_updated_nodes(output_dag) + if len(new_parallel_nodes) > 0: + generated_code += f"\n// Iteration{iteration}: Parallel\n" + + for new_node in new_parallel_nodes: + node = output_dag.nodes[new_node] + node["code_generation_done"] = True + + sub_node_machines = [] + for machine in node["var_machine_list"]: + sub_node_machines.append(machine) + + generated_code += f"{node['var']} = composer.parallel({', '.join(sub_node_machines)});\n"; + + iteration += 1 + + generated_code += f"\nmodule.exports = {output_dag.nodes[list(output_dag.nodes)[0]]['var']};\n" + return generated_code + \ No newline at end of file diff --git a/serwo/xfaas_provenance.py b/serwo/xfaas_provenance.py index 0cba306..ec5e093 100644 --- a/serwo/xfaas_provenance.py +++ b/serwo/xfaas_provenance.py @@ -109,9 +109,14 @@ def generate_provenance_artifacts(user_dir, wf_id, refactored_wf_id, wf_deployme resources_dir = pathlib.Path.joinpath( pathlib.Path(user_dir), "build/workflow/resources" ) - resouces_file = f'{resources_dir}/{csp}-{region}-{part_id}.json' - ##load json from file - with open(resouces_file) as f: + + if csp == 'openwhisk': + resources_file = f'{resources_dir}/{csp}-{part_id}.json' + else: + resources_file = f'{resources_dir}/{csp}-{region}-{part_id}.json' + + # load json from file + with open(resources_file) as f: resources = json.load(f) if csp == 'aws': @@ -120,6 +125,8 @@ def generate_provenance_artifacts(user_dir, wf_id, refactored_wf_id, wf_deployme app_name = r['OutputValue'] elif csp == 'azure' or csp == 'azure_v2': app_name = resources['group'] + elif csp == 'openwhisk': + app_name = resources['WorkflowEntrypoint'] if queue_details is None: raise Exception("Queue details not found, Try a Fresh Deployment by clearing CollectLogs") diff --git a/serwo/xfaas_resource_generator.py b/serwo/xfaas_resource_generator.py index 239db8a..dabbfa3 100644 --- a/serwo/xfaas_resource_generator.py +++ b/serwo/xfaas_resource_generator.py @@ -4,6 +4,7 @@ from jinja2 import Environment, FileSystemLoader from botocore.exceptions import ClientError import os + def generate(user_dir, partition_config, dag_definition_file): partition_config = list(reversed(partition_config)) @@ -36,8 +37,10 @@ def generate(user_dir, partition_config, 
dag_definition_file): part_id = partition_config[i].get_part_id() updated_user_dir = f"{user_dir}/partitions/{csp}-{region}-{part_id}" dag_path = f"{updated_user_dir}/{dag_definition_file}" + with open(dag_path, "r") as dag_file: dag_from_file = json.load(dag_file) + if downstream_csp == "aws": function_id = "252" function_name = "PushToSQS" @@ -60,7 +63,6 @@ def generate(user_dir, partition_config, dag_definition_file): "secret_access_key": aws_secret_access_key, } - template_dir = f"{root_dir}/python/src/faas-templates/aws/push-to-sqs-template/{function_name}" output_path = f"{updated_user_dir}/" os.system(f"cp -r {template_dir} {output_path}") @@ -85,8 +87,6 @@ def generate(user_dir, partition_config, dag_definition_file): output_path = f"{updated_user_dir}/" os.system(f"cp -r {template_dir} {output_path}") template_push_to_queue(updated_user_dir, function_name, entry_point, resources, "azure") - - egress_node = { "NodeId": function_id, @@ -107,16 +107,12 @@ def generate(user_dir, partition_config, dag_definition_file): CSP(csp).build_resources(updated_user_dir, dag_definition_path,region,part_id,dag_definition_file,is_netherite) - - - def template_push_to_queue( user_source_dir: str, egress_fn_name: str, egress_fn_entrypoint: str, resources: dict, csp:str - ): template_dir = f"{user_source_dir}/{egress_fn_name}" try: @@ -167,4 +163,4 @@ def template_push_to_queue( out.write(output) print(f"Updaing PushToQueue funciton for {egress_fn_name}") except: - raise Exception(f"Error in flushing {egress_fn_name} template") \ No newline at end of file + raise Exception(f"Error in flushing {egress_fn_name} template") diff --git a/serwo/xfaas_run_benchmark.py b/serwo/xfaas_run_benchmark.py index 818f27e..bde1a93 100644 --- a/serwo/xfaas_run_benchmark.py +++ b/serwo/xfaas_run_benchmark.py @@ -6,10 +6,12 @@ import shutil import sys import time +import base64 from datetime import datetime from xfaas_main import run as xfaas_deployer import time from xfbench_plotter import XFBenchPlotter + parser = argparse.ArgumentParser( prog="ProgramName", description="What the program does", @@ -41,6 +43,11 @@ server_ip = None server_user_id = None server_pem_file_path = None + +shell_script_commands = [] +aws_shell_script_commands = [] + + def get_client_login_details(config_path): global server_ip, server_user_id, server_pem_file_path with open(config_path) as f: @@ -51,9 +58,6 @@ def get_client_login_details(config_path): if 'server_pem_file_path' in data: server_pem_file_path = data['server_pem_file_path'] -shell_script_commands = [] -aws_shell_script_commands = [] - def get_aws_payload(payload): payload = json.dumps(payload) @@ -61,12 +65,19 @@ def get_aws_payload(payload): payload = payload[1:-1] return payload + def get_azure_payload(payload): payload = json.dumps(payload) payload = payload.replace('"', '"') return payload +def get_openwhisk_payload(payload): + payload = json.dumps(payload) + payload = payload.replace('"', '"') + payload = payload[1:-1] + return payload + def read_dynamism_file(dynamism,duration, max_rps): file_path = os.getenv("XFBENCH_DIR") + f"/workloads/{dynamism}-{max_rps}-{duration}.csv" @@ -107,6 +118,13 @@ def get_aws_resources(csp,region,part_id,wf_user_directory): return execute_url, state_machine_arn +def get_openwhisk_resources(csp, region, part_id, wf_user_directory): + resources = read_resources(csp, region, part_id, wf_user_directory) + execution_url = resources["ExecutionUrl"] + execution_auth_creds = resources["ExecutionAuth"] + return execution_url, execution_auth_creds + + 
def template_azure_jmx_file(rps, duration, execute_url, payload_size, input_jmx, output_path, session_id,payload): rps_keyword = "RPS" execute_url_keyword = "URL" @@ -167,17 +185,61 @@ def template_aws_jmx_file(rps, duration, execute_url, state_machine_arn, payload f.write(data) -def make_jmx_file(csp, rps, duration, payload_size, wf_name, execute_url,state_machine_arn, dynamism, session_id, wf_user_directory, part_id, region, wf_deployment_id,run_id, payload,is_localhost): +def template_openwhisk_jmx_file(rps, duration, execute_url, execute_auth_base64, input_jmx, output_path, session_id, payload): + global deployment_id + + with open(input_jmx) as f: + data = f.read() + + data = data.replace("DURATION", str(int(duration))) + data = data.replace("RPS", str(rps)) + data = data.replace("OPENWHISK_PAYLOAD_TO_REPLACE", get_openwhisk_payload(payload)) + data = data.replace("URL", execute_url) + data = data.replace("SESSION", str(session_id)) + data = data.replace("DEPLOYMENT_ID", deployment_id) + data = data.replace("BASE64_AUTH", execute_auth_base64) + + to_replace = '"ThreadGroup.num_threads">2' + if rps == 1020.0 or rps == 840.0: + data = data.replace(to_replace, f'"ThreadGroup.num_threads">17') + elif rps == 1920.0 or rps == 3840.0 or rps == 7680.0: + data = data.replace(to_replace, f'"ThreadGroup.num_threads">64') + + with open(output_path, "w") as f: + f.write(data) + + +def add_jmeter_openwhisk_response_path(jmx_file_path, wf_user_directory, wf_deployment_id, run_id): + with open(jmx_file_path) as f: + data = f.read() + + jmeter_responses_file_path = f"{wf_user_directory}/{wf_deployment_id}/{run_id}/jmeter_responses.csv" + data = data.replace("JMETER_RESPONSES_PATH", jmeter_responses_file_path) + + with open(jmx_file_path, "w") as f: + f.write(data) + + +def make_jmx_file(csp, rps, duration, payload_size, wf_name, execute_url, state_machine_arn, dynamism, session_id, wf_user_directory, part_id, region, wf_deployment_id, run_id, payload, is_localhost, execute_auth_creds): + """ + Changelog: + Added encoded_auth: It will be not None only for openwhisk + """ jmx_template_path, jmx_output_path,jmx_output_filename = get_jmx_paths(csp, rps, duration, payload_size, wf_name, dynamism,session_id,region) + if 'azure' in csp: template_azure_jmx_file(rps, duration, execute_url, payload_size, jmx_template_path, jmx_output_path, session_id,payload) + elif 'openwhisk' in csp: + encoded_auth = base64.b64encode(execute_auth_creds.encode()).decode('utf-8') + template_openwhisk_jmx_file(rps, duration, execute_url, encoded_auth, jmx_template_path, jmx_output_path, session_id, payload) + add_jmeter_openwhisk_response_path(jmx_output_path, wf_user_directory, wf_deployment_id, run_id) else: template_aws_jmx_file(rps, duration, execute_url, state_machine_arn, payload_size, jmx_template_path, jmx_output_path, session_id,payload) + send_jmx_file_to_server(jmx_output_path,jmx_output_filename,rps,duration,is_localhost) dump_experiment_conf(jmx_output_filename, csp, rps, duration, payload_size, wf_name, dynamism, session_id, wf_user_directory, part_id, region, wf_deployment_id,run_id) - def dump_experiment_conf(jmx_output_filename, csp, rps, duration, payload_size, wf_name, dynamism, session_id, wf_user_directory, part_id, region,wf_deployment_id,run_id): provenance_artefacts_updated_path = f"{wf_user_directory}/{wf_deployment_id}/{run_id}/{artifact_suffix}" @@ -280,9 +342,8 @@ def load_payload(wf_user_directory,payload_size): return payload -def 
run_workload(csp,region,part_id,max_rps,duration,payload_size,dynamism,wf_name, wf_user_directory, wf_deployment_id,run_id, is_localhost):
-
-    copy_provenance_artifacts(csp, region, part_id, wf_user_directory, wf_deployment_id,max_rps,run_id)
+def run_workload(csp, region, part_id, max_rps, duration, payload_size, dynamism, wf_name, wf_user_directory, wf_deployment_id, run_id, is_localhost):
+    copy_provenance_artifacts(csp, region, part_id, wf_user_directory, wf_deployment_id, max_rps,run_id)
+    # Only the openwhisk branch below yields auth credentials; default the name so the
+    # make_jmx_file(...) call further down stays valid for aws/azure as well.
+    execute_auth_creds = None
     dynamism_data = read_dynamism_file(dynamism, duration, max_rps)
 
     if 'azure' in csp:
@@ -290,7 +351,9 @@ def run_workload(csp,region,part_id,max_rps,duration,payload_size,dynamism,wf_na
         state_machine_arn = ''
     elif csp == 'aws':
         execute_url, state_machine_arn = get_aws_resources(csp,region,part_id,wf_user_directory)
-
+    elif csp == 'openwhisk':
+        execute_url, execute_auth_creds = get_openwhisk_resources(csp, region, part_id, wf_user_directory)
+        state_machine_arn = ''
 
     dynamism_updated = dynamism
@@ -314,24 +377,23 @@ def run_workload(csp,region,part_id,max_rps,duration,payload_size,dynamism,wf_na
         # payload = load_payload(wf_user_directory,payload_size)
         ne_session_id = session_id + str(i)
-        make_jmx_file(csp, rps * 60.0, duration, payload_size, wf_name, execute_url,state_machine_arn, dynamism, ne_session_id, wf_user_directory, part_id, region , wf_deployment_id, run_id,payload,is_localhost)
+        make_jmx_file(csp, rps * 60.0, duration, payload_size, wf_name, execute_url, state_machine_arn, dynamism, ne_session_id, wf_user_directory, part_id, region , wf_deployment_id, run_id, payload, is_localhost, execute_auth_creds)
         i += 1
-    generate_shell_script_and_scp(csp,payload_size, wf_name, max_rps, duration,dynamism,region,is_localhost)
-
+
+    generate_shell_script_and_scp(csp, payload_size, wf_name, max_rps, duration, dynamism, region, is_localhost)
 
 
 def copy_provenance_artifacts(csp, region, part_id, wf_user_directory,wf_deployment_id,rps,run_id):
     # Keep the global declaration: template_openwhisk_jmx_file() reads the module-level
     # deployment_id that this function sets.
     global deployment_id
     os.makedirs(f"{wf_user_directory}/{wf_deployment_id}/{run_id}", exist_ok=True)
     provenance_artefacts_path = f"{wf_user_directory}/build/workflow/resources/provenance-artifacts-{csp}-{region}-{part_id}.json"
+
     with open(provenance_artefacts_path) as f:
         provenance_artifact = json.load(f)
-        deployment_id = provenance_artifact['deployment_id']
+    deployment_id = provenance_artifact['deployment_id']
     provenance_artefacts_updated_path = f"{wf_user_directory}/{wf_deployment_id}/{run_id}/{artifact_suffix}"
-    shutil.copyfile(provenance_artefacts_path, provenance_artefacts_updated_path)
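
Three short usage sketches for the OpenWhisk path added above follow; every path, id and node name in them is illustrative rather than taken from the patch.

UserDag only reads a handful of keys from the DAG specification: WorkflowName, a Nodes list carrying NodeId, NodeName, Path, EntryPoint, MemoryInMB and an optional CSP, and an Edges list that maps a source node to its downstream nodes. A minimal, hypothetical dag.json for a two-node chain (which get_orchestrator_code() would collapse into a single composer.sequence(...)) could be written like this:

import json

# Hypothetical two-node workflow; the keys mirror what UserDag.__init__ reads.
dag_spec = {
    "WorkflowName": "math_processing_wf",
    "Nodes": [
        {"NodeId": "1", "NodeName": "Generate", "Path": "math_processing_wf/generate",
         "EntryPoint": "generate.py", "MemoryInMB": 256, "CSP": "openwhisk"},
        {"NodeId": "2", "NodeName": "Aggregate", "Path": "math_processing_wf/aggregate",
         "EntryPoint": "aggregate.py", "MemoryInMB": 256, "CSP": "openwhisk"},
    ],
    # Each edge maps a source NodeName (or NodeId) to a list of successors.
    "Edges": [{"Generate": ["Aggregate"]}],
}

with open("dag.json", "w") as f:
    json.dump(dag_spec, f, indent=4)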
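
The three public methods on the OpenWhisk class are meant to run in order, which is how the openwhisk branch of csp.py drives them. A minimal standalone driver, assuming the partition directory already contains the function sources and dag.json and that the wsk CLI is configured against the target cluster (the directory and part id are hypothetical):

import os

from serwo.openwhisk_create_statemachine import OpenWhisk

# Optional overrides for the defaults that __init__ reads from the environment.
os.environ["XFAAS_OW_ACTION_TIMEOUT"] = "300000"  # milliseconds
os.environ["XFAAS_OW_ACTION_MEMORY"] = "512"      # MB, used for the orchestrator action

deployer = OpenWhisk(
    user_dir="/path/to/partitions/openwhisk-ap-south-1-0000",  # hypothetical partition dir
    dag_file_name="dag.json",
    part_id="0000",
)
deployer.build_resources()  # render runners, copy requirements, emit the composer .js
deployer.build_workflow()   # zip the actions, install local nodejs + openwhisk-composer, compile the .json
deployer.deploy_workflow()  # (re)create the actions with wsk and write the resource JSON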
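
deploy_workflow() records ExecutionUrl and ExecutionAuth in the resource JSON, and the JMX template authenticates with "Authorization: Basic BASE64_AUTH", which xfaas_run_benchmark.py fills with the base64-encoded ExecutionAuth. The same request can be reproduced outside JMeter; a sketch assuming the requests package (not a dependency introduced by this patch), a cluster with self-signed certificates (hence verify=False, the equivalent of wsk -i), and a hypothetical resource file name:

import base64
import json
import time

import requests  # assumed to be available; not part of this patch

with open("build/workflow/resources/openwhisk-0000.json") as f:  # hypothetical part id
    resources = json.load(f)

# ExecutionAuth is the "user:password" pair reported by `wsk property get --auth`.
auth_header = "Basic " + base64.b64encode(resources["ExecutionAuth"].encode()).decode("utf-8")

payload = {
    "workflow_instance_id": 1,
    "request_timestamp": round(time.time() * 1000),
    "session_id": "SESSION",
    "deployment_id": "DEPLOYMENT_ID",
    # the benchmark also merges the workload payload here (OPENWHISK_PAYLOAD_TO_REPLACE)
}

response = requests.post(
    resources["ExecutionUrl"],  # .../actions/<workflow>/orchestrator?blocking=false
    headers={"Authorization": auth_header, "Content-Type": "application/json"},
    json=payload,
    verify=False,
)
print(response.status_code, response.text)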