From 159cad5ae6bf74ec8c3cf0e97d0b60d237292d92 Mon Sep 17 00:00:00 2001 From: rajdeep1008 Date: Tue, 7 May 2024 16:27:20 +0530 Subject: [PATCH 01/18] Changelog: Basic code generation working for OpenWhisk Too fragile to be usable --- .vscode/launch.json | 47 +++ serwo/openwhisk_create_statemachine.py | 325 ++++++++++++++++++ .../openwhisk/runner_template.py | 131 +++++++ serwo/python/src/utils/classes/commons/csp.py | 14 +- .../src/utils/classes/openwhisk/function.py | 35 ++ .../src/utils/classes/openwhisk/user_dag.py | 73 ++++ serwo/xfaas_resource_generator.py | 5 +- 7 files changed, 623 insertions(+), 7 deletions(-) create mode 100644 .vscode/launch.json create mode 100644 serwo/openwhisk_create_statemachine.py create mode 100644 serwo/python/src/runner-templates/openwhisk/runner_template.py create mode 100644 serwo/python/src/utils/classes/openwhisk/function.py create mode 100644 serwo/python/src/utils/classes/openwhisk/user_dag.py diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..55563e8 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,47 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python Debugger: XFaaS Run Benchmark", + "type": "debugpy", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal", + "args": [ + "--csp", "aws", + "--region", "ap-south-1", + "--max-rps", "2", + "--duration", "60", + "--payload-size", "small", + "--dynamism", "static", + "--wf-name", "test?", + "--wf-user-directory", "", + "--path-to-client-config", "./crack/client_config.json", + "--client-key", "debug", + "--dag-file-name", "", + "--teardown-flag", "", + "--is_singleton_wf", "0", + "--function-class", "graph", + "--function-name", "graph_bft", + "--function-code", "test_code", + "--node_name", "TestNode", + "--XFBENCH_DIR", "crack" + ] + }, + { + "name": "Python Debugger: XFaaS Main", + "type": "debugpy", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal", + "args": ["./crack/XFBench/workflows/custom_workflows/math_processing_wf", + "dag.json", + "benchmark.json", + "openwhisk", "ap-south-1" + ] + } + ] +} \ No newline at end of file diff --git a/serwo/openwhisk_create_statemachine.py b/serwo/openwhisk_create_statemachine.py new file mode 100644 index 0000000..b755a9f --- /dev/null +++ b/serwo/openwhisk_create_statemachine.py @@ -0,0 +1,325 @@ +""" +Remarks: +* Each cloud could use its own folder with an API contract +""" +import os +import shutil +import zipfile +from pathlib import Path +from jinja2 import Environment, FileSystemLoader + +from python.src.utils.classes.openwhisk.user_dag import UserDag +from python.src.utils.classes.commons.logger import LoggerFactory + +logger = LoggerFactory.get_logger(__file__, log_level="INFO") + + +class OpenWhisk: + def __init__(self, user_dir, dag_file_name, part_id) -> None: + self.__user_dir = Path(user_dir) + self.__dag_file_name = dag_file_name + self.__part_id = part_id + + self.__parent_directory_path = Path(__file__).parent + + # xfaas specfic directories + self.__serwo_build_dir = self.__user_dir / "build" / "workflow" + self.__serwo_resources_dir = self.__serwo_build_dir / "resources" + self.__serwo_utils_dir = self.__parent_directory_path / "python" + + # TODO: Change me to generic private cloud implementation + self.__runner_template_dir = 
self.__serwo_utils_dir / "src" / "runner-templates" / "openwhisk" + + self.__dag_definition_path = self.__user_dir / self.__dag_file_name + + # openwhisk specific directories + # TODO: Decide between wsk cli and REST api based implementation + self.__openwhisk_build_dir = self.__serwo_build_dir / "openwhisk" + self.__openwhisk_functions_dir = self.__openwhisk_build_dir / "functions" + self.__openwhisk_artifacts_dir = self.__openwhisk_build_dir / "artifacts" + + # DAG related parameters + self.__user_dag = UserDag(self.__dag_definition_path) + + + def __create_environment(self): + """ + TODO: Check if the generated folders need to have __init__.py (be a module) + """ + self.__openwhisk_functions_dir.mkdir(parents=True, exist_ok=True) + self.__openwhisk_artifacts_dir.mkdir(parents=True, exist_ok=True) + + + def __render_runner_template(self, runner_template_dir, function_id, function_name, function_runner_filename): + """ + """ + runner_template_filename = function_runner_filename + + try: + file_loader = FileSystemLoader(runner_template_dir) + env = Environment(loader=file_loader) + template = env.get_template("runner_template.py") + logger.info( + f"Created jinja2 environement for templating Openwhisk function ids for function::{function_name}" + ) + except: + raise Exception( + f"Unable to load environment for Openwhisk function id templating for function::{function_name}" + ) + + # render the template + try: + output = template.render(function_id_placeholder=function_id) + with open(f"{runner_template_dir}/{runner_template_filename}", "w") as out: + out.write(output) + logger.info( + f"Rendered and flushed the runner template for Openwhisk function for function::{function_name}" + ) + except Exception as e: + logger.error(e) + raise Exception( + f"Unable to render the function runner template for Openwhisk function for function::{function_name}" + ) + + return runner_template_filename + + + def __append_xfaas_default_requirements(self, filepath): + """ + Appends XFaaS dependencies to the function requirements + """ + with open(filepath, "r") as file: + lines = file.readlines() + lines.append("psutil\n") + lines.append("objsize\n") + unqiue_dependencies = set(lines) + file.flush() + + with open(filepath, "w") as file: + for line in [x.strip("\n") for x in sorted(unqiue_dependencies)]: + file.write(f"{line}\n") + + + def __create_openwhisk_actions( + self, + user_fn_path, + fn_name, + fn_module_name, + runner_template_filename, + runner_template_dir, + runner_filename + ): + """ + TODO: Check WTF am I doing + user_fn_path: Workflow code is taken from hardcoded XFaaS samples from this path + """ + fn_requirements_filename = "requirements.txt" + src_fn_dir = user_fn_path / fn_requirements_filename + dst_fn_dir = self.__openwhisk_functions_dir / fn_name + + dst_requirements_path = dst_fn_dir / fn_requirements_filename + + logger.info(f"Creating function directory for {fn_name}") + if not os.path.exists(dst_fn_dir): + os.makedirs(dst_fn_dir) + + logger.info(f"Moving requirements file for {fn_name} for user at to {dst_fn_dir}") + shutil.copyfile(src=src_fn_dir, dst=dst_requirements_path) + + logger.info(f"Adding default requirements {fn_name}") + self.__append_xfaas_default_requirements(dst_requirements_path) + + # place the dependencies folder from the user function path if it exists + if os.path.exists(user_fn_path / "dependencies"): + shutil.copytree( + user_fn_path / "dependencies", + dst_fn_dir / "dependencies", + dirs_exist_ok=True + ) + + logger.info(f"Moving xfaas boilerplate for 
{fn_name}") + shutil.copytree(src=self.__serwo_utils_dir, dst=dst_fn_dir / "python", dirs_exist_ok=True) + + logger.info(f"Generating Runners for function {fn_name}") + + fnr_string = f"USER_FUNCTION_PLACEHOLDER" + temp_runner_path = user_fn_path / f"{fn_name}_temp_runner.py" + runner_template_path = runner_template_dir / runner_template_filename + + print("Here", runner_template_path) + with open(runner_template_path, "r") as file: + contents = file.read() + contents = contents.replace(fnr_string, fn_module_name) + + with open(temp_runner_path, "w") as file: + file.write(contents) + + # TODO - Fix the stickytape issue: GitHub Issue link - https://github.com/dream-lab/XFaaS/issues/4 + logger.info(f"Stickytape the runner template for dependency resolution") + runner_file_path = dst_fn_dir / f"{runner_filename}.py" + os.system(f"stickytape {temp_runner_path} > {runner_file_path}") + + logger.info(f"Deleting temporary runner") + os.remove(temp_runner_path) + + logger.info(f"Successfully created build directory for function {fn_name}") + + + def __create_standalone_runners(self): + """ + TODO: Create functions with __main__.py name + """ + function_metadata_list = self.__user_dag.get_node_param_list() + function_object_map = self.__user_dag.get_node_object_map() + + for function_metadata in function_metadata_list: + function_name = function_metadata["name"] + function_runner_filename = function_object_map[function_name].get_runner_filename() + + # generalize later + function_runner_filename = "__main__" + + function_path = function_object_map[function_name].get_path() + function_module_name = function_object_map[function_name].get_module_name() + function_id = function_object_map[function_name].get_id() + + # template the function runner template in the runner template directory + runner_template_filename = self.__render_runner_template( + runner_template_dir=self.__runner_template_dir, + function_id=function_id, + function_name=function_name, + function_runner_filename=function_runner_filename + ) + + logger.info(f"Starting Standalone Runner Creation for function {function_name}") + + self.__create_openwhisk_actions( + self.__parent_directory_path / function_path, + function_name, + function_module_name, + function_runner_filename, + self.__runner_template_dir, + runner_template_filename + ) + + runner_template_filepath = self.__runner_template_dir / runner_template_filename + logger.info(f"Deleting Temporary Runner Template at {runner_template_filepath}") + os.remove(f"{runner_template_filepath}") + + + def __create_workflow_orchestrator(self, file_name_prefix): + """ + Create composer.js file for openwhisk composer + TODO: Fix me -> Hardcoded AF js file + TODO: Setup a local npm + node combo with openwhisk-composer library + """ + composer_file_path = self.__openwhisk_build_dir / f"{file_name_prefix}_workflow_composition.js" + + # TODO: Everything here is dummy stuff + # Another assumption here is that action creation will be taken care of at some other place + action_sequence = [] + for func_name in self.__user_dag.get_node_object_map(): + action_name = f"/guest/{self.__user_dag.get_user_dag_name()}/{func_name}" + action_sequence.append(f'composer.action("{action_name}")') + + with open(composer_file_path, "w") as f: + f.write('const composer = require("openwhisk-composer");\n\n') + + temp = ", ".join(action_sequence) + f.write(f'module.exports = composer.sequence({temp});\n') + + # TODO: This is also temporary + # To allow parallel workflows and other stuff, a redis instance is needed + # 
with an input.json file with corresponding redis info + redis_input_file_path = self.__openwhisk_build_dir / f"{file_name_prefix}_input.json" + with open(redis_input_file_path, "w") as f: + f.write(""" +{ + "$composer": { + "redis": { + "uri": "redis://owdev-redis.openwhisk.svc.cluster.local:6379" + }, + "openwhisk": { + "ignore_certs": true + } + } +} +""") + + + def build_resources(self): + """ + TODO: Implement me + 1. Create action compatible function files + 2. Create openwhisk composer files -> Check AWS logic for the graph creation + """ + logger.info(f"Creating environment for {self.__user_dag.get_user_dag_name()}") + self.__create_environment() + + logger.info(f"Initating standalone runner creation for {self.__user_dag.get_user_dag_name()}") + self.__create_standalone_runners() + + logger.info(f"Initating openwhisk composer js orchestrator for {self.__user_dag.get_user_dag_name()}") + self.__create_workflow_orchestrator(self.__user_dag.get_user_dag_name()) + + # Some API gateway?? + + + def build_workflow(self): + """ + TODO: Implement me + TODO: Differentiate between docker actions vs pure python actions + 1. Create zip artifacts for all the functions + 2. Create json file out of the compose.js file + + Need wsk, node, npm + """ + + # Creates a zip artifact from each function + for func_name in self.__user_dag.get_node_object_map(): + # func = self.__user_dag.get_node_object_map()[func_name] + + py_file = self.__openwhisk_functions_dir / func_name / "__main__.py" + req_file = self.__openwhisk_functions_dir / func_name / "requirements.txt" + + output_artifact_path = self.__openwhisk_build_dir / "artifacts" / f"{func_name}.zip" + + # Zip the requirements and __main__.py from input_dir -> Write to output_dir + with zipfile.ZipFile(output_artifact_path, "w") as f: + f.write(py_file, os.path.basename(py_file)) + f.write(req_file, os.path.basename(req_file)) + + composer_input_path = self.__openwhisk_build_dir / f"{self.__user_dag.get_user_dag_name()}_workflow_composition.js" + composer_output_path = self.__openwhisk_build_dir / f"{self.__user_dag.get_user_dag_name()}_workflow_composition.json" + + # TODO: Change me... Be a better human than this + os.system(f"source ~/.bashrc && compose.js {composer_input_path} -o {composer_output_path}") + + + def deploy_workflow(self): + """ + TODO: Setup wsk if not present somehow + 1. Deploy all the actions + 2. Deploy composition + 3. Web hooks and stuff + # Create or update the action + + TODO: wsk -i action invoke /guest/graph-workflow -P input.json --result -> Handle this input.json thingy somehow + TODO: Throw Exception if wsk command is not working -> Or find a better way to connect to the cluster + """ + logger.info(f"Deleting any existing package with same name") + + # TODO: Check if deleting a package deletes all the actions automatically? 
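+        # Note: OpenWhisk appears to refuse deleting a package that still contains
+        # actions, so a delete-and-recreate likely needs the actions removed first.
+        # A sketch of that cleanup (assumes the wsk CLI is configured for the cluster):
+        #
+        #   for func_name in self.__user_dag.get_node_object_map():
+        #       action = f"/guest/{self.__user_dag.get_user_dag_name()}/{func_name}"
+        #       os.system(f"wsk -i action delete {action}")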
+        os.system(f"wsk -i package delete {self.__user_dag.get_user_dag_name()}")
+        os.system(f"wsk -i package create {self.__user_dag.get_user_dag_name()}")
+
+        # Create actions manually
+        for func_name in self.__user_dag.get_node_object_map():
+            action_name = f"/guest/{self.__user_dag.get_user_dag_name()}/{func_name}"
+            action_zip_path = self.__openwhisk_artifacts_dir / func_name / ".zip"
+            os.system(f"wsk -i action create {action_name} --kind python:3 {action_zip_path} --timeout 300000 --concurrency 10")
+
+        # Create composition
+        composition_name = f"{self.__user_dag.get_user_dag_name()}-composition"
+        composer_config_path = self.__openwhisk_build_dir / f"{self.__user_dag.get_user_dag_name()}_workflow_composition.json"
+        os.system(f"source ~/.bashrc && deploy.js {composition_name} {composer_config_path} -w -i")
diff --git a/serwo/python/src/runner-templates/openwhisk/runner_template.py b/serwo/python/src/runner-templates/openwhisk/runner_template.py
new file mode 100644
index 0000000..7a4c2db
--- /dev/null
+++ b/serwo/python/src/runner-templates/openwhisk/runner_template.py
@@ -0,0 +1,132 @@
+"""
+SerwoObject - single one
+SerwoListObject - [SerwoObject]
+"""
+import os
+import time
+import psutil
+import objsize
+from copy import deepcopy
+
+from USER_FUNCTION_PLACEHOLDER import user_function as USER_FUNCTION_PLACEHOLDER_function
+
+from python.src.utils.classes.commons.serwo_objects import SerWOObject
+from python.src.utils.classes.commons.serwo_objects import build_serwo_object
+from python.src.utils.classes.commons.serwo_objects import build_serwo_list_object
+
+downstream = 0
+
+
+def get_delta(timestamp):
+    return round(time.time() * 1000) - timestamp
+
+
+# Openwhisk entry point
+def main(args):
+    start_time = round(time.time() * 1000)
+    # capturing input payload size
+    input_payload_size_bytes = None
+    event = args  # OpenWhisk passes the invocation payload in as `args`
+
+    if isinstance(event, list):
+        # TODO: exception handling
+        serwo_request_object = build_serwo_list_object(event)
+
+        # Calculate input payload size
+        input_payload_size_bytes = sum([objsize.get_deep_size(x.get_body()) for x in serwo_request_object.get_objects()])
+
+    elif isinstance(event, dict):
+        if "metadata" not in event:
+            new_event = deepcopy(event)
+            event = dict(body=new_event)
+            wf_instance_id = event["body"].get("workflow_instance_id")
+            request_timestamp = event["body"].get("request_timestamp")
+            session_id = event["body"].get("session_id")  # NOTE - new variable to keep track of requests
+            deployment_id = event["body"].get("deployment_id")
+            overheads = start_time - request_timestamp
+            event["metadata"] = dict(
+                workflow_instance_id=wf_instance_id,
+                workflow_start_time=start_time,
+                request_timestamp=request_timestamp,
+                session_id=session_id,
+                overheads=overheads,
+                deployment_id=deployment_id,
+                functions=[],
+            )
+        serwo_request_object = build_serwo_object(event)
+        input_payload_size_bytes = objsize.get_deep_size(serwo_request_object.get_body())
+    else:
+        # TODO: Report error and return
+        pass
+    serwo_request_object.set_basepath("")
+    # user function exec
+    status_code = 200
+    try:
+        start_epoch_time = serwo_request_object.get_metadata().get(
+            "workflow_start_time"
+        )
+        start_time_delta = get_delta(start_epoch_time)
+        wf_instance_id = serwo_request_object.get_metadata().get("workflow_instance_id")
+        function_id = "{{function_id_placeholder}}"
+        process = psutil.Process(os.getpid())
+
+        memory_before = process.memory_info().rss
+        print(f"SerWOMemUsage::Before::{wf_instance_id},{function_id},{memory_before}")
+        response_object = 
USER_FUNCTION_PLACEHOLDER_function(serwo_request_object) + process = psutil.Process(os.getpid()) + memory_after = process.memory_info().rss + print(f"SerWOMemUsage::After::{wf_instance_id},{function_id},{memory_after}") + + # Sanity check for user function response + if not isinstance(response_object, SerWOObject): + status_code = 500 + return dict( + statusCode=status_code, + body="Response Object Must be of type SerWOObject", + metadata="None", + ) + end_time_delta = get_delta(start_epoch_time) + # st_time = int(time.time()*1000) + # cpu_brand = cpuinfo.get_cpu_info()["brand_raw"] + # en_time = int(time.time()*1000) + # time_taken = en_time - st_time + # cpu_brand = f"{cpu_brand}_{time_taken}" + # Get current metadata here + metadata = serwo_request_object.get_metadata() + function_metadata_list = metadata.get("functions") + # NOTE - the template for generated function id + function_metadata_list.append( + { + function_id: dict( + start_delta=start_time_delta, + end_delta=end_time_delta, + mem_before=memory_before, + mem_after=memory_after, + in_payload_bytes=input_payload_size_bytes, + out_payload_bytes=objsize.get_deep_size(response_object.get_body()), + # cpu=cpu_brand + + ) + } + ) + metadata.update(dict(functions=function_metadata_list)) + except Exception as e: + # if user compute fails then default to status code as 500 and no response body + print(e) + status_code = 500 + return dict( + statusCode=status_code, body="Error in user compute", metadata="None" + ) + # post function handler + # NOTE - leaving empty for now and returning a response is. + # Send service bus/ storage queue + body = response_object.get_body() + response = dict(statusCode=status_code, body=body, metadata=metadata) + if downstream == 0: + # TODO - change while doing egress + return response + return response + + +if __name__ == "__main__": + print("Main Method") diff --git a/serwo/python/src/utils/classes/commons/csp.py b/serwo/python/src/utils/classes/commons/csp.py index 5339e39..35b3c5a 100644 --- a/serwo/python/src/utils/classes/commons/csp.py +++ b/serwo/python/src/utils/classes/commons/csp.py @@ -2,8 +2,9 @@ import scripts.azure.azure_builder as azure_builder import scripts.azure.azure_deploy as azure_deployer - from serwo.aws_create_statemachine import AWS +from serwo.openwhisk_create_statemachine import OpenWhisk + class CSP: __name = None @@ -15,15 +16,20 @@ def get_name(self): #TODO: Factory pattern for csp - def build_resources(self,user_dir, dag_definition_path, region, part_id, dag_definition_file, is_netherite): + def build_resources(self, user_dir, dag_definition_path, region, part_id, dag_definition_file, is_netherite): if self.__name == 'azure': self.build_az(dag_definition_file, dag_definition_path, part_id, region, user_dir, is_netherite) - if self.__name == 'aws': + elif self.__name == 'aws': aws_deployer = AWS(user_dir, dag_definition_file, "REST", part_id, region) aws_deployer.build_resources() aws_deployer.build_workflow() aws_deployer.deploy_workflow() - pass + elif self.__name == 'openwhisk': + # TODO: Convert me to a generic implementation for private clouds + private_cloud_deployer = OpenWhisk(user_dir=user_dir, dag_file_name=dag_definition_file, part_id=part_id) + private_cloud_deployer.build_resources() + private_cloud_deployer.build_workflow() + private_cloud_deployer.deploy_workflow() def build_az(self, dag_definition_file, dag_definition_path, part_id, region, user_dir,is_netherite): diff --git a/serwo/python/src/utils/classes/openwhisk/function.py 
b/serwo/python/src/utils/classes/openwhisk/function.py new file mode 100644 index 0000000..fafb8bf --- /dev/null +++ b/serwo/python/src/utils/classes/openwhisk/function.py @@ -0,0 +1,35 @@ +""" +A Function object refers to an Action in Openwhisk +""" + +class Function: + def __init__(self, id, name, path, entry_point, memory): + self._id = id + self._name = name + self._path = path + self._memory = memory + self._uri = "functions/" + name + self._module_name = entry_point.split(".")[0] + + self._runner_filename = "standalone_" + entry_point.split(".")[0] + "_runner" + self._handler = self._runner_filename + ".main" + + def get_runner_filename(self): + return self._runner_filename + + def get_path(self): + return self._path + + def get_module_name(self): + return self._module_name + + def get_id(self): + return self._id + + def get_as_dict(self): + return { + "name": self._name, + "uri": self._uri, + "handler": self._handler, + "memory": self._memory, + } diff --git a/serwo/python/src/utils/classes/openwhisk/user_dag.py b/serwo/python/src/utils/classes/openwhisk/user_dag.py new file mode 100644 index 0000000..55d15ec --- /dev/null +++ b/serwo/python/src/utils/classes/openwhisk/user_dag.py @@ -0,0 +1,73 @@ +import json +import networkx as nx + +from .function import Function + +class UserDag: + # dag configuration (picked up from user file) + __dag_config_data = dict() + + # map: nodeName -> nodeId (used internally) [NOTE [TK] - This map is changed from nodeName -> NodeId to UserGivenNodeId -> our internal nodeID] + __nodeIDMap = ({}) + + __dag = nx.DiGraph() # networkx directed graph + __functions = {} # map: functionName -> functionObject + + def __init__(self, user_config_path) -> None: + try: + self.__dag_config_data = self.__load_user_spec(user_config_path) + except Exception as e: + raise e + + for index, node in enumerate(self.__dag_config_data["Nodes"]): + nodeID = "n" + str(index+1) + self.__nodeIDMap[node["NodeName"]] = nodeID + self.__nodeIDMap[node["NodeId"]] = nodeID + self.__functions[node["NodeName"]] = Function( + id=node["NodeId"], + name=node["NodeName"], + path=node["Path"], + entry_point=node["EntryPoint"], + memory=node["MemoryInMB"], + ) + + # TODO: Add support for private cloud related params here in _get_state() + # TODO: AWS reference also stores ARN and retry, backoff, max attempts etc in __dag + # generate hardcoded Action's package, etc here? 
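+        # For reference, the dag.json shape these lookups assume - only the keys
+        # below are read; the values are illustrative:
+        #
+        #   {
+        #     "WorkflowName": "wf",
+        #     "Nodes": [{"NodeId": "1", "NodeName": "fn_a", "Path": "fn_a_dir",
+        #                "EntryPoint": "handler.py", "MemoryInMB": 256, "CSP": "openwhisk"}],
+        #     "Edges": [{"fn_a": ["fn_b"]}]
+        #   }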
+ self.__dag.add_node( + nodeID, + NodeName=node["NodeName"], + Path=node["Path"], + EntryPoint=node["EntryPoint"], + CSP=node.get("CSP"), + MemoryInMB=node["MemoryInMB"], + ) + + for edge in self.__dag_config_data["Edges"]: + for key in edge: + for val in edge[key]: + self.__dag.add_edge(self.__nodeIDMap[key], self.__nodeIDMap[val]) + + + def __load_user_spec(self, user_config_path): + with open(user_config_path, "r") as user_dag_spec: + dag_data = json.load(user_dag_spec) + + return dag_data + + + def get_user_dag_name(self): + return self.__dag_config_data["WorkflowName"] + + + def get_node_object_map(self): + return self.__functions + + + def get_node_param_list(self): + functions_list = [] + for f in self.__functions.values(): + functions_list.append(f.get_as_dict()) + + return functions_list + diff --git a/serwo/xfaas_resource_generator.py b/serwo/xfaas_resource_generator.py index 3db969e..c5cce60 100644 --- a/serwo/xfaas_resource_generator.py +++ b/serwo/xfaas_resource_generator.py @@ -12,6 +12,5 @@ def generate(user_dir, dag_definition_path, partition_config, dag_definition_fil is_netherite = True else: is_netherite = False - CSP(csp).build_resources(user_dir, dag_definition_path,region,part_id,dag_definition_file,is_netherite) - - + CSP(csp).build_resources(user_dir, dag_definition_path, region, part_id, dag_definition_file, is_netherite) + \ No newline at end of file From 418ef7b488e8e3d41d700c1b91fff83daf0d6de5 Mon Sep 17 00:00:00 2001 From: rajdeep1008 Date: Wed, 12 Jun 2024 12:31:31 +0530 Subject: [PATCH 02/18] Changelog: Added a more reliable openwhisk code generation --- serwo/openwhisk_create_statemachine.py | 295 ++++++++++++------ .../openwhisk/runner_template.py | 4 +- serwo/python/src/utils/classes/commons/csp.py | 9 +- .../src/utils/classes/openwhisk/user_dag.py | 185 ++++++++++- 4 files changed, 388 insertions(+), 105 deletions(-) diff --git a/serwo/openwhisk_create_statemachine.py b/serwo/openwhisk_create_statemachine.py index b755a9f..272f01c 100644 --- a/serwo/openwhisk_create_statemachine.py +++ b/serwo/openwhisk_create_statemachine.py @@ -1,10 +1,8 @@ -""" -Remarks: -* Each cloud could use its own folder with an API contract -""" import os +import glob import shutil import zipfile +import subprocess from pathlib import Path from jinja2 import Environment, FileSystemLoader @@ -15,7 +13,7 @@ class OpenWhisk: - def __init__(self, user_dir, dag_file_name, part_id) -> None: + def __init__(self, user_dir, dag_file_name, part_id): self.__user_dir = Path(user_dir) self.__dag_file_name = dag_file_name self.__part_id = part_id @@ -25,29 +23,53 @@ def __init__(self, user_dir, dag_file_name, part_id) -> None: # xfaas specfic directories self.__serwo_build_dir = self.__user_dir / "build" / "workflow" self.__serwo_resources_dir = self.__serwo_build_dir / "resources" + + # This holds utility scripts and templates self.__serwo_utils_dir = self.__parent_directory_path / "python" - - # TODO: Change me to generic private cloud implementation self.__runner_template_dir = self.__serwo_utils_dir / "src" / "runner-templates" / "openwhisk" self.__dag_definition_path = self.__user_dir / self.__dag_file_name # openwhisk specific directories - # TODO: Decide between wsk cli and REST api based implementation self.__openwhisk_build_dir = self.__serwo_build_dir / "openwhisk" self.__openwhisk_functions_dir = self.__openwhisk_build_dir / "functions" + # This holds the zip files to be deployed self.__openwhisk_artifacts_dir = self.__openwhisk_build_dir / "artifacts" + # This holds the bash 
script to setup local nodejs for openwhisk composer + self.__openwhisk_helpers_dir = self.__openwhisk_build_dir / "helpers" + self.__openwhisk_helpers_nodejs_dir = self.__openwhisk_helpers_dir / "local_nodejs" # DAG related parameters self.__user_dag = UserDag(self.__dag_definition_path) + self.__openwhisk_workflow_orchestrator_action_name = f"/guest/{self.__user_dag.get_user_dag_name()}/orchestrator" + self.__openwhisk_composer_input_path = self.__openwhisk_build_dir / f"{self.__user_dag.get_user_dag_name()}_workflow_composition.js" + self.__openwhisk_composer_output_path = self.__openwhisk_build_dir / f"{self.__user_dag.get_user_dag_name()}_workflow_composition.json" + self.__openwhisk_workflow_redis_input = self.__openwhisk_build_dir / f"{self.__user_dag.get_user_dag_name()}_workflow_input.json" # required to allow parallel action in OpenWhisk def __create_environment(self): """ - TODO: Check if the generated folders need to have __init__.py (be a module) + Create the directories to put functions into. + * /functions/ directory holds the converted functions + * /artifacts/ directory contains the zip files to be deployed to OpenWhisk """ + # TODO: Check me removing and shizz + if os.path.exists(self.__openwhisk_functions_dir): + shutil.rmtree(self.__openwhisk_functions_dir) + + if os.path.exists(self.__openwhisk_artifacts_dir): + shutil.rmtree(self.__openwhisk_artifacts_dir) + + # Not deleting this to save the time spent on downloading nodejs + # if os.path.exists(self.__openwhisk_helpers_dir): + # shutil.rmtree(self.__openwhisk_helpers_dir) + self.__openwhisk_functions_dir.mkdir(parents=True, exist_ok=True) self.__openwhisk_artifacts_dir.mkdir(parents=True, exist_ok=True) + self.__openwhisk_helpers_dir.mkdir(parents=True, exist_ok=True) + self.__openwhisk_helpers_nodejs_dir.mkdir(parents=True, exist_ok=True) + + print("bp") def __render_runner_template(self, runner_template_dir, function_id, function_name, function_runner_filename): @@ -107,10 +129,8 @@ def __create_openwhisk_actions( fn_module_name, runner_template_filename, runner_template_dir, - runner_filename ): """ - TODO: Check WTF am I doing user_fn_path: Workflow code is taken from hardcoded XFaaS samples from this path """ fn_requirements_filename = "requirements.txt" @@ -130,12 +150,12 @@ def __create_openwhisk_actions( self.__append_xfaas_default_requirements(dst_requirements_path) # place the dependencies folder from the user function path if it exists - if os.path.exists(user_fn_path / "dependencies"): - shutil.copytree( - user_fn_path / "dependencies", - dst_fn_dir / "dependencies", - dirs_exist_ok=True - ) + # if os.path.exists(user_fn_path / "dependencies"): + # shutil.copytree( + # user_fn_path / "dependencies", + # dst_fn_dir / "dependencies", + # dirs_exist_ok=True + # ) logger.info(f"Moving xfaas boilerplate for {fn_name}") shutil.copytree(src=self.__serwo_utils_dir, dst=dst_fn_dir / "python", dirs_exist_ok=True) @@ -154,11 +174,18 @@ def __create_openwhisk_actions( with open(temp_runner_path, "w") as file: file.write(contents) - # TODO - Fix the stickytape issue: GitHub Issue link - https://github.com/dream-lab/XFaaS/issues/4 - logger.info(f"Stickytape the runner template for dependency resolution") - runner_file_path = dst_fn_dir / f"{runner_filename}.py" - os.system(f"stickytape {temp_runner_path} > {runner_file_path}") + # XFaaS code provided by the user comes as the module name due to + # import USER_FUNCTION_PLACEHOLDER being replaced by the module name + final_import_file_src_path = user_fn_path / 
f"{fn_module_name}.py" + final_import_file_dst_path = dst_fn_dir / f"{fn_module_name}.py" + final_runner_file_path = dst_fn_dir / "__main__.py" + + os.system(f"cp {final_import_file_src_path} {final_import_file_dst_path}") + + # Standalone runner from other clouds is replaced by __main__.py file for OpenWhisk + os.system(f"cp {temp_runner_path} {final_runner_file_path}") + logger.info(f"Deleting temporary runner") os.remove(temp_runner_path) @@ -167,23 +194,25 @@ def __create_openwhisk_actions( def __create_standalone_runners(self): """ - TODO: Create functions with __main__.py name + 1. The function entrypoint is __main__.py due to OpenWhisk deployment + requirements for large library imports + """ function_metadata_list = self.__user_dag.get_node_param_list() function_object_map = self.__user_dag.get_node_object_map() for function_metadata in function_metadata_list: - function_name = function_metadata["name"] - function_runner_filename = function_object_map[function_name].get_runner_filename() - - # generalize later - function_runner_filename = "__main__" + function_name = function_metadata["name"] + function_runner_filename = "__main__" # OpenWhisk requires the main file name to be __main__.py + + function_runner_filename = function_object_map[ + function_metadata["name"] + ].get_runner_filename() function_path = function_object_map[function_name].get_path() function_module_name = function_object_map[function_name].get_module_name() function_id = function_object_map[function_name].get_id() - # template the function runner template in the runner template directory runner_template_filename = self.__render_runner_template( runner_template_dir=self.__runner_template_dir, function_id=function_id, @@ -199,7 +228,6 @@ def __create_standalone_runners(self): function_module_name, function_runner_filename, self.__runner_template_dir, - runner_template_filename ) runner_template_filepath = self.__runner_template_dir / runner_template_filename @@ -209,31 +237,20 @@ def __create_standalone_runners(self): def __create_workflow_orchestrator(self, file_name_prefix): """ - Create composer.js file for openwhisk composer - TODO: Fix me -> Hardcoded AF js file + Generates a js file to send as an input to openwhisk composer. 
+ See: https://github.com/apache/openwhisk-composer + TODO: Setup a local npm + node combo with openwhisk-composer library """ composer_file_path = self.__openwhisk_build_dir / f"{file_name_prefix}_workflow_composition.js" + redis_input_file_path = self.__openwhisk_build_dir / f"{file_name_prefix}_workflow_input.json" # required to allow parallel action in OpenWhisk - # TODO: Everything here is dummy stuff - # Another assumption here is that action creation will be taken care of at some other place - action_sequence = [] - for func_name in self.__user_dag.get_node_object_map(): - action_name = f"/guest/{self.__user_dag.get_user_dag_name()}/{func_name}" - action_sequence.append(f'composer.action("{action_name}")') + # ------------------------------------------------------------------------------------------------ + generated_code = self.__user_dag.get_orchestrator_code() - with open(composer_file_path, "w") as f: - f.write('const composer = require("openwhisk-composer");\n\n') - - temp = ", ".join(action_sequence) - f.write(f'module.exports = composer.sequence({temp});\n') - - # TODO: This is also temporary # To allow parallel workflows and other stuff, a redis instance is needed # with an input.json file with corresponding redis info - redis_input_file_path = self.__openwhisk_build_dir / f"{file_name_prefix}_input.json" - with open(redis_input_file_path, "w") as f: - f.write(""" + redis_input_file = """ { "$composer": { "redis": { @@ -244,15 +261,26 @@ def __create_workflow_orchestrator(self, file_name_prefix): } } } -""") +""" + # ------------------------------------------------------------------------------------------------ + with open(composer_file_path, "w") as f: + f.write(generated_code) + + with open(redis_input_file_path, "w") as f: + f.write(redis_input_file) def build_resources(self): """ - TODO: Implement me - 1. Create action compatible function files - 2. Create openwhisk composer files -> Check AWS logic for the graph creation + First of the 3 main functions for the workflow. + + 1. Creates OpenWhisk's action compatible source code function files + 2. TODO: Creates openwhisk composer js files """ + logger.info("*" * 10) + logger.info("Build Resources: Started") + logger.info("*" * 10) + logger.info(f"Creating environment for {self.__user_dag.get_user_dag_name()}") self.__create_environment() @@ -261,65 +289,146 @@ def build_resources(self): logger.info(f"Initating openwhisk composer js orchestrator for {self.__user_dag.get_user_dag_name()}") self.__create_workflow_orchestrator(self.__user_dag.get_user_dag_name()) - - # Some API gateway?? + + logger.info("*" * 10) + logger.info("Build Resources: Success") + logger.info("*" * 10) def build_workflow(self): """ - TODO: Implement me - TODO: Differentiate between docker actions vs pure python actions - 1. Create zip artifacts for all the functions - 2. Create json file out of the compose.js file - - Need wsk, node, npm + Second of the 3 main functions for the workflow. 
+ - Creates zip artifacts for all the functions + - Downloads a local nodejs copy using "n" + - Converts the openwhisk-composer js file into json + + ------------ + Assumptions: + ------------ + - NodeJS is installed -> Right now using 'n' to install nodejs (https://www.npmjs.com/package/n) + - wsk tool is setup -> User's responsibility """ - - # Creates a zip artifact from each function + # Create a zip artifact from each function for func_name in self.__user_dag.get_node_object_map(): - # func = self.__user_dag.get_node_object_map()[func_name] + output_artifact_path = self.__openwhisk_build_dir/"artifacts"/f"{func_name}.zip" - py_file = self.__openwhisk_functions_dir / func_name / "__main__.py" - req_file = self.__openwhisk_functions_dir / func_name / "requirements.txt" + files_to_zip = [] + for file in os.listdir(self.__openwhisk_functions_dir/func_name): + if file.endswith(".py"): + curr_file_path = self.__openwhisk_functions_dir/func_name/file + # basename thing is required to unzip correctly + files_to_zip.append([curr_file_path, os.path.basename(curr_file_path)]) - output_artifact_path = self.__openwhisk_build_dir / "artifacts" / f"{func_name}.zip" - - # Zip the requirements and __main__.py from input_dir -> Write to output_dir - with zipfile.ZipFile(output_artifact_path, "w") as f: - f.write(py_file, os.path.basename(py_file)) - f.write(req_file, os.path.basename(req_file)) - - composer_input_path = self.__openwhisk_build_dir / f"{self.__user_dag.get_user_dag_name()}_workflow_composition.js" - composer_output_path = self.__openwhisk_build_dir / f"{self.__user_dag.get_user_dag_name()}_workflow_composition.json" - - # TODO: Change me... Be a better human than this - os.system(f"source ~/.bashrc && compose.js {composer_input_path} -o {composer_output_path}") + with zipfile.ZipFile(output_artifact_path, "w") as archive: + for fz in files_to_zip: + archive.write(filename=fz[0], arcname=os.path.basename(fz[1])) + + req_file_path = self.__openwhisk_functions_dir/func_name/"requirements.txt" + archive.write(filename=req_file_path, arcname=os.path.basename(req_file_path)) + + xfaas_folder_path = str(self.__openwhisk_functions_dir/func_name/"python"/"**"/"*") + for file in glob.glob(xfaas_folder_path, recursive=True): + path = file.split("python/")[1] + archive.write(filename=file, arcname=os.path.join("python", path)) + + logger.info(":" * 10) + logger.info("Installing nodejs and openwhisk-composer") + logger.info(":" * 10) + + # TODO: if nodejs doesn't exist here + builder_dir = str(self.__openwhisk_helpers_dir.resolve()) + builder_path = os.path.join(builder_dir, "builder.sh") + nodejs_local_dir = str(self.__openwhisk_helpers_nodejs_dir.resolve()) + subprocess.call(["sh", builder_path, nodejs_local_dir]) + + logger.info(":" * 10) + logger.info("Installing nodejs and openwhisk-composer: SUCCESS") + logger.info(":" * 10) + + logger.info(":" * 10) + logger.info("Creating openwhisk-composer files") + logger.info(":" * 10) + + ow_composer_binary_path = os.path.join(nodejs_local_dir, "node_modules", "openwhisk-composer", "bin", "compose.js") + os.system(f"{ow_composer_binary_path} {self.__openwhisk_composer_input_path} -o {self.__openwhisk_composer_output_path}") + + logger.info(":" * 10) + logger.info("Creating openwhisk-composer files: SUCCESS") + logger.info(":" * 10) def deploy_workflow(self): """ - TODO: Setup wsk if not present somehow - 1. Deploy all the actions - 2. Deploy composition - 3. 
Web hooks and stuff - # Create or update the action - - TODO: wsk -i action invoke /guest/graph-workflow -P input.json --result -> Handle this input.json thingy somehow - TODO: Throw Exception if wsk command is not working -> Or find a better way to connect to the cluster + Third of the 3 main functions for the workflow. + 1. Removes existing actions with the current workflow's name + 2. Deploys all the actions + 3. Creates openwhisk-compatible orchestrator json file using third-party tool + 4. Deploys the orchestrator action + + ----- + TODO: + 1. Figure out web api mode, web hooks and related stuff + 2. Handle the input.json somehow to allow parallel workflow actions - "wsk -i action invoke /guest/graph-workflow -P input.json" + 3. Throw Exception if wsk command is not working -> Or find a better way to connect to the cluster + ----- """ - logger.info(f"Deleting any existing package with same name") + logger.info(":" * 10) + logger.info("Deleting any existing OpenWhisk components") + logger.info(":" * 10) + + # ----------------- Existing OpenWhisk components deletion ----------------- + # TODO: Limit this step to current workflow name or some package name + for node_name in self.__user_dag.get_node_object_map(): + curr_action_name = f"/guest/{self.__user_dag.get_user_dag_name()}/{node_name}" + try: + os.system(f"wsk -i action delete {curr_action_name}") + except Exception as e: + # TODO: Handle me gracefully + print("Either the openwhisk action does not exist or some error happened") + print(e) + + try: + os.system(f"wsk -i action delete {self.__openwhisk_workflow_orchestrator_action_name}") + except Exception as e: + # TODO: Handle me gracefully + print("Either the openwhisk workflow orchestrator does not exist or some error happened") + print(e) - # TODO: Check if deleting a package deletes all the actions automatically? 
- os.system(f"wsk -i package delete {self.__user_dag.get_user_dag_name()}") - os.system(f"wsk -i package create {self.__user_dag.get_user_dag_name()}") + try: + # TODO: This creates the package in the "/guest/" namespace, need to figure out how to change that + # Hints: Helm changes are required + os.system(f"wsk -i package delete {self.__user_dag.get_user_dag_name()}") + os.system(f"wsk -i package create {self.__user_dag.get_user_dag_name()}") + except Exception as e: + # TODO: Handle me gracefully + print("Either the openwhisk package does not exist or some error happened") + print(e) + # ------------------------------------------------------------------------- + + # ----------------- New OpenWhisk components creation ----------------- + # Creating the actions manually using the wsk tool + logger.info(":" * 10) + logger.info("Deploying OpenWhisk action for each function") + logger.info(":" * 10) - # Create actions manually for func_name in self.__user_dag.get_node_object_map(): action_name = f"/guest/{self.__user_dag.get_user_dag_name()}/{func_name}" action_zip_path = self.__openwhisk_artifacts_dir / func_name / ".zip" os.system(f"wsk -i action create {action_name} --kind python:3 {action_zip_path} --timeout 300000 --concurrency 10") - # Create composition - composition_name = f"{self.__user_dag.get_user_dag_name()}-composition" - composer_config_path = self.__openwhisk_build_dir / f"{self.__user_dag.get_user_dag_name()}_workflow_composition.json" - os.system(f"source ~/.bashrc && deploy.js {composition_name} {composer_config_path} -w -i") + logger.info(":" * 10) + logger.info("Deploying OpenWhisk action for each function: SUCCESS") + logger.info(":" * 10) + + logger.info(":" * 10) + logger.info("Deploying OpenWhisk orchestrator action") + logger.info(":" * 10) + + nodejs_local_dir = str(self.__openwhisk_helpers_nodejs_dir.resolve()) + ow_deployer_binary_path = os.path.join(nodejs_local_dir, "node_modules", "openwhisk-composer", "bin", "deploy.js") + os.system(f"{ow_deployer_binary_path} {self.__openwhisk_workflow_orchestrator_action_name} {self.__openwhisk_composer_output_path} -w -i") + + logger.info(":" * 10) + logger.info("Deploying OpenWhisk orchestrator action: SUCCESS") + logger.info(":" * 10) + # ------------------------------------------------------------------------- diff --git a/serwo/python/src/runner-templates/openwhisk/runner_template.py b/serwo/python/src/runner-templates/openwhisk/runner_template.py index 7a4c2db..532ecb1 100644 --- a/serwo/python/src/runner-templates/openwhisk/runner_template.py +++ b/serwo/python/src/runner-templates/openwhisk/runner_template.py @@ -16,11 +16,9 @@ downstream = 0 - def get_delta(timestamp): return round(time.time() * 1000) - timestamp - # Openwhisk entry point def main(args): start_time = round(time.time() * 1000) @@ -128,4 +126,4 @@ def main(args): if __name__ == "__main__": - print("Main Method") + print("Main Method: Nothing is executed") diff --git a/serwo/python/src/utils/classes/commons/csp.py b/serwo/python/src/utils/classes/commons/csp.py index 35b3c5a..46e0837 100644 --- a/serwo/python/src/utils/classes/commons/csp.py +++ b/serwo/python/src/utils/classes/commons/csp.py @@ -24,11 +24,16 @@ def build_resources(self, user_dir, dag_definition_path, region, part_id, dag_de aws_deployer.build_resources() aws_deployer.build_workflow() aws_deployer.deploy_workflow() - elif self.__name == 'openwhisk': - # TODO: Convert me to a generic implementation for private clouds + elif self.__name.lower() == 'openwhisk': private_cloud_deployer = 
OpenWhisk(user_dir=user_dir, dag_file_name=dag_definition_file, part_id=part_id) + + print(":" * 80, "Generating resources for OpenWhisk Actions") private_cloud_deployer.build_resources() + + print(":" * 80, "Generatinng workflow files for OpenWhisk") private_cloud_deployer.build_workflow() + + print(":" * 80, "Deploying OpenWhisk") private_cloud_deployer.deploy_workflow() diff --git a/serwo/python/src/utils/classes/openwhisk/user_dag.py b/serwo/python/src/utils/classes/openwhisk/user_dag.py index 55d15ec..a006ffb 100644 --- a/serwo/python/src/utils/classes/openwhisk/user_dag.py +++ b/serwo/python/src/utils/classes/openwhisk/user_dag.py @@ -1,5 +1,9 @@ import json +import copy +import random +import string import networkx as nx +from collections import defaultdict from .function import Function @@ -21,6 +25,8 @@ def __init__(self, user_config_path) -> None: for index, node in enumerate(self.__dag_config_data["Nodes"]): nodeID = "n" + str(index+1) + # nodeID = "n" + node["NodeName"] # Uncomment to make debugging easier + self.__nodeIDMap[node["NodeName"]] = nodeID self.__nodeIDMap[node["NodeId"]] = nodeID self.__functions[node["NodeName"]] = Function( @@ -31,9 +37,9 @@ def __init__(self, user_config_path) -> None: memory=node["MemoryInMB"], ) - # TODO: Add support for private cloud related params here in _get_state() - # TODO: AWS reference also stores ARN and retry, backoff, max attempts etc in __dag - # generate hardcoded Action's package, etc here? + workflowName = self.__dag_config_data["WorkflowName"] + nodeName = node["NodeName"] + self.__dag.add_node( nodeID, NodeName=node["NodeName"], @@ -41,6 +47,10 @@ def __init__(self, user_config_path) -> None: EntryPoint=node["EntryPoint"], CSP=node.get("CSP"), MemoryInMB=node["MemoryInMB"], + machine_list=[nodeID], + pre="", + ret=[f'composer.action("/guest/{workflowName}/{nodeName}")'], + var=self._generate_random_variable_name(), ) for edge in self.__dag_config_data["Edges"]: @@ -48,22 +58,22 @@ def __init__(self, user_config_path) -> None: for val in edge[key]: self.__dag.add_edge(self.__nodeIDMap[key], self.__nodeIDMap[val]) - + def _generate_random_variable_name(self, n=4): + res = ''.join(random.choices(string.ascii_letters, k=n)) + return str(res).lower() + def __load_user_spec(self, user_config_path): with open(user_config_path, "r") as user_dag_spec: dag_data = json.load(user_dag_spec) return dag_data - def get_user_dag_name(self): return self.__dag_config_data["WorkflowName"] - def get_node_object_map(self): return self.__functions - def get_node_param_list(self): functions_list = [] for f in self.__functions.values(): @@ -71,3 +81,164 @@ def get_node_param_list(self): return functions_list + def _get_orchestrator_code_linear_merge(self, graph: nx.DiGraph, nodes): + """ + TODO: Complete me + """ + pre = "" + last = nodes[-1] + previous_var = None + + return "", "", "" + + def _get_orchestrator_code_parallel_merge(self, graph: nx.DiGraph, nodes): + """ + TODO: Complete me + """ + pre = "" + last = nodes[-1] + previous_var = None + + return "", "", "" + + def _merge_linear_nodes(self, graph: nx.DiGraph, node_list: list): + """ + Doesn't return anything since operating on deep copy of graphs might get heavy. + Works on side effect on nx.Digraph. 
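+
+        Collapses every node in node_list into a single composite node whose
+        machine_list concatenates the members' machine lists; edges into the
+        first node of the chain are redirected into the composite node and
+        edges out of the last node now leave from it.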
+ """ + if len(node_list) == 0: + return + + new_node_machine_list = [] + for node in node_list: + new_node_machine_list.extend(graph.nodes[node]["machine_list"]) + + new_node_id = "n" + str(node_list) + pre, ret, var = self._get_orchestrator_code_linear_merge(graph, node_list) + graph.add_node(new_node_id, pre=pre, ret=ret, var=var, machine_list=new_node_machine_list) + + for u, v in list(graph.edges()): + if v == node_list[0]: + # replace the first node of the sequence with sequence node's id + graph.add_edge(u, new_node_id) + + if u == node_list[-1]: + # make the last node of the sequence point to sequence's next node (where sequence ends) + graph.add_edge(new_node_id, v) + + # remove the individual nodes + for n in node_list: + graph.remove_node(n) + + return + + + def _collapse_linear_chains(self, graph: nx.DiGraph): + """ + Doesn't return anything since operating on deep copy of graphs might get heavy. + Works on side effect on nx.Digraph. + """ + start_node = [node for node in graph.nodes if graph.in_degree(node) == 0][0] + dfs_edges = list(nx.dfs_edges(graph, source=start_node)) + + linear_chain = [] + set_of_linear_chains = set() + for u, v in dfs_edges: + if graph.out_degree(u) == 1 and graph.in_degree(v) == 1: + if u not in linear_chain: + linear_chain.append(u) + + if v not in linear_chain: + linear_chain.append(v) + else: + if linear_chain: + set_of_linear_chains.add(tuple(linear_chain)) + + linear_chain = [] + + if linear_chain != []: + set_of_linear_chains.add(tuple(linear_chain)) + linear_chain = [] + + for chain in set_of_linear_chains: + node_list = list(chain) + self._merge_linear_nodes(graph, node_list) + + return + + def _merge_parallel_nodes(self, graph: nx.DiGraph, node_list): + """ + Doesn't return anything since operating on deep copy of graphs might get heavy. + Works on side effect on nx.Digraph. + """ + if len(node_list) == 0: + return + + new_node_machine_list = [] + for node in node_list: + new_node_machine_list.append(graph.nodes[node]["machine_list"]) + + # since we are only merging diamonds (same predecessor, same successor) + predecessor = list(graph.predecessors(node_list[0]))[0] + successor = list(graph.successors(node_list[0]))[0] + + new_node_id = "n" + str(new_node_machine_list) + pre, ret, var = self._get_orchestrator_code_parallel_merge(graph, node_list) + graph.add_node(new_node_id, pre=pre, ret=ret, var=var, machine_list=[new_node_machine_list]) + + for node in node_list: + graph.remove_node(node) + + graph.add_edge(predecessor, new_node_id) + graph.add_edge(new_node_id, successor) + + return + + def _collapse_parallel_chains(self, graph: nx.DiGraph): + """ + Doesn't return anything since operating on deep copy of graphs might get heavy. + Works on side effect on nx.Digraph. 
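+
+        Only "diamond" groups are collapsed: sibling successors of a node that
+        each have exactly one outgoing edge and share the same single successor
+        are grouped by that successor and merged via _merge_parallel_nodes.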
+ """ + start_node = [node for node in graph.nodes if graph.in_degree(node) == 0][0] + dfs_nodes = list(nx.dfs_preorder_nodes(graph, source=start_node)) + + set_of_parallel_chains = set() + for curr_node in dfs_nodes: + curr_node_succ = list(graph.successors(curr_node)) + diamond_forming_node = [] + + for succ in curr_node_succ: + if graph.out_degree(succ) == 1: + diamond_forming_node.append(succ) + + group_by_succ_dict = defaultdict(list) + for node in diamond_forming_node: + succ = list(graph.successors(node))[0] + group_by_succ_dict[succ].append(node) + + for val in group_by_succ_dict.values(): + if len(val) > 1: + set_of_parallel_chains.add(tuple(val)) + + for chain in set_of_parallel_chains: + chain_list = list(chain) + self._merge_parallel_nodes(graph, chain_list) + + return + + def get_orchestrator_code(self): + """ + Breaker of linear and parallel chains. + TODO: Actually generate the generated_code from the updated graph + """ + output_dag = copy.deepcopy(self.__dag) + while len(output_dag.nodes()) != 1: + self._collapse_linear_chains(output_dag) + self._collapse_parallel_chains(output_dag) + + generated_code = """ +const composer = require("openwhisk-composer"); + +module.exports = composer.sequence("/guest/graphs/graph_gen", composer.parallel("/guest/graphs/graph_bft", "/guest/graphs/pagerank", "/guest/graphs/graph_mst"), "/guest/graphs/aggregate"); +""" + return generated_code \ No newline at end of file From c9bd2bc56a3fcef614de6606dd7837a46d58059f Mon Sep 17 00:00:00 2001 From: rajdeep1008 Date: Fri, 14 Jun 2024 11:20:38 +0530 Subject: [PATCH 03/18] Wrapped up the openwhisk worflow files creation Changelog: 1. Added a local nodejs and npm installation for openwhisk-compose 2. Added code to generate the orchestrator.js file for openwhisk workflow --- serwo/openwhisk_create_statemachine.py | 102 ++++++++++++------ .../openwhisk/local_nodejs_installer.sh | 4 + serwo/python/src/utils/classes/commons/csp.py | 2 +- 3 files changed, 72 insertions(+), 36 deletions(-) create mode 100644 serwo/python/src/runner-templates/openwhisk/local_nodejs_installer.sh diff --git a/serwo/openwhisk_create_statemachine.py b/serwo/openwhisk_create_statemachine.py index 272f01c..5975a3b 100644 --- a/serwo/openwhisk_create_statemachine.py +++ b/serwo/openwhisk_create_statemachine.py @@ -68,8 +68,9 @@ def __create_environment(self): self.__openwhisk_artifacts_dir.mkdir(parents=True, exist_ok=True) self.__openwhisk_helpers_dir.mkdir(parents=True, exist_ok=True) self.__openwhisk_helpers_nodejs_dir.mkdir(parents=True, exist_ok=True) - - print("bp") + + # Copying the bash script to install a local nodejs copy (used by openwhisk-composer) + shutil.copyfile(src=self.__runner_template_dir/"local_nodejs_installer.sh", dst=self.__openwhisk_helpers_dir/"local_nodejs_installer.sh") def __render_runner_template(self, runner_template_dir, function_id, function_name, function_runner_filename): @@ -174,7 +175,6 @@ def __create_openwhisk_actions( with open(temp_runner_path, "w") as file: file.write(contents) - # XFaaS code provided by the user comes as the module name due to # import USER_FUNCTION_PLACEHOLDER being replaced by the module name final_import_file_src_path = user_fn_path / f"{fn_module_name}.py" @@ -194,9 +194,8 @@ def __create_openwhisk_actions( def __create_standalone_runners(self): """ - 1. 
The function entrypoint is __main__.py due to OpenWhisk deployment + The function entrypoint is __main__.py due to OpenWhisk deployment requirements for large library imports - """ function_metadata_list = self.__user_dag.get_node_param_list() function_object_map = self.__user_dag.get_node_object_map() @@ -239,8 +238,6 @@ def __create_workflow_orchestrator(self, file_name_prefix): """ Generates a js file to send as an input to openwhisk composer. See: https://github.com/apache/openwhisk-composer - - TODO: Setup a local npm + node combo with openwhisk-composer library """ composer_file_path = self.__openwhisk_build_dir / f"{file_name_prefix}_workflow_composition.js" redis_input_file_path = self.__openwhisk_build_dir / f"{file_name_prefix}_workflow_input.json" # required to allow parallel action in OpenWhisk @@ -277,9 +274,9 @@ def build_resources(self): 1. Creates OpenWhisk's action compatible source code function files 2. TODO: Creates openwhisk composer js files """ - logger.info("*" * 10) + logger.info("*" * 30) logger.info("Build Resources: Started") - logger.info("*" * 10) + logger.info("*" * 30) logger.info(f"Creating environment for {self.__user_dag.get_user_dag_name()}") self.__create_environment() @@ -290,9 +287,9 @@ def build_resources(self): logger.info(f"Initating openwhisk composer js orchestrator for {self.__user_dag.get_user_dag_name()}") self.__create_workflow_orchestrator(self.__user_dag.get_user_dag_name()) - logger.info("*" * 10) + logger.info("*" * 30) logger.info("Build Resources: Success") - logger.info("*" * 10) + logger.info("*" * 30) def build_workflow(self): @@ -305,7 +302,6 @@ def build_workflow(self): ------------ Assumptions: ------------ - - NodeJS is installed -> Right now using 'n' to install nodejs (https://www.npmjs.com/package/n) - wsk tool is setup -> User's responsibility """ # Create a zip artifact from each function @@ -331,30 +327,66 @@ def build_workflow(self): path = file.split("python/")[1] archive.write(filename=file, arcname=os.path.join("python", path)) - logger.info(":" * 10) + logger.info(":" * 30) logger.info("Installing nodejs and openwhisk-composer") - logger.info(":" * 10) + logger.info(":" * 30) - # TODO: if nodejs doesn't exist here builder_dir = str(self.__openwhisk_helpers_dir.resolve()) - builder_path = os.path.join(builder_dir, "builder.sh") + builder_path = os.path.join(builder_dir, "local_nodejs_installer.sh") nodejs_local_dir = str(self.__openwhisk_helpers_nodejs_dir.resolve()) - subprocess.call(["sh", builder_path, nodejs_local_dir]) - logger.info(":" * 10) + # Only downloading a local NodeJS copy if one doesn't exist already with required libraries installed + ow_composer_binary_path = os.path.join(nodejs_local_dir, "node_modules", "openwhisk-composer", "bin", "compose.js") + if not os.path.exists(ow_composer_binary_path): + logger.info("Installing NodeJS using 'https://raw.githubusercontent.com/tj/n/master/bin/n'") + nodejs_installation_status = subprocess.run(["sh", builder_path, nodejs_local_dir]) + + if nodejs_installation_status.returncode == 0: + logger.info("NodeJS installation success") + else: + # TODO: Handle me gracefully + logger.info("NodeJS installation failure") + raise Exception("NodeJS installation failed") + + # ------------------------------------------------------------------------------------------ + # Doing a "$ npm install openwhisk-composer" from the local npm binary + logger.info("Starting to install openwhisk-composer using npm: See 'https://github.com/apache/openwhisk-composer'") + + 
local_npm_binary_path = self.__openwhisk_helpers_nodejs_dir/"bin"/"npm" + ow_composer_installation_status = subprocess.run([str(local_npm_binary_path.resolve()), "install", "--prefix", nodejs_local_dir, "openwhisk-composer"]) + if ow_composer_installation_status.returncode == 0: + logger.info("Successfully installed openwhisk-composer") + else: + # TODO: Handle me gracefully + logger.info("openwhisk-composer installation failure") + raise Exception("openwhisk-composer installation failed") + # ------------------------------------------------------------------------------------------ + else: + logger.info(":" * 30) + logger.info("Required local NodeJS already exists in the build directory") + logger.info(":" * 30) + + logger.info(":" * 30) logger.info("Installing nodejs and openwhisk-composer: SUCCESS") - logger.info(":" * 10) + logger.info(":" * 30) - logger.info(":" * 10) - logger.info("Creating openwhisk-composer files") - logger.info(":" * 10) + logger.info(":" * 30) + logger.info("Creating workflow composition files using openwhisk-composer") + logger.info(":" * 30) ow_composer_binary_path = os.path.join(nodejs_local_dir, "node_modules", "openwhisk-composer", "bin", "compose.js") - os.system(f"{ow_composer_binary_path} {self.__openwhisk_composer_input_path} -o {self.__openwhisk_composer_output_path}") + orchestrator_creation_status = subprocess.run([ + ow_composer_binary_path, str(self.__openwhisk_composer_input_path.resolve()), + "-o", str(self.__openwhisk_composer_output_path.resolve()), + ]) + + if orchestrator_creation_status.returncode != 0: + logger.info("Orchestrator file creation failed") + raise Exception("Orchestrator file creation failed") - logger.info(":" * 10) + logger.info(":" * 30) logger.info("Creating openwhisk-composer files: SUCCESS") - logger.info(":" * 10) + logger.info(":" * 30) def deploy_workflow(self): @@ -372,9 +404,9 @@ def deploy_workflow(self): 3. 
Throw Exception if wsk command is not working -> Or find a better way to connect to the cluster ----- """ - logger.info(":" * 10) + logger.info(":" * 30) logger.info("Deleting any existing OpenWhisk components") - logger.info(":" * 10) + logger.info(":" * 30) # ----------------- Existing OpenWhisk components deletion ----------------- # TODO: Limit this step to current workflow name or some package name @@ -407,28 +439,28 @@ def deploy_workflow(self): # ----------------- New OpenWhisk components creation ----------------- # Creating the actions manually using the wsk tool - logger.info(":" * 10) + logger.info(":" * 30) logger.info("Deploying OpenWhisk action for each function") - logger.info(":" * 10) + logger.info(":" * 30) for func_name in self.__user_dag.get_node_object_map(): action_name = f"/guest/{self.__user_dag.get_user_dag_name()}/{func_name}" action_zip_path = self.__openwhisk_artifacts_dir / func_name / ".zip" os.system(f"wsk -i action create {action_name} --kind python:3 {action_zip_path} --timeout 300000 --concurrency 10") - logger.info(":" * 10) + logger.info(":" * 30) logger.info("Deploying OpenWhisk action for each function: SUCCESS") - logger.info(":" * 10) + logger.info(":" * 30) - logger.info(":" * 10) + logger.info(":" * 30) logger.info("Deploying OpenWhisk orchestrator action") - logger.info(":" * 10) + logger.info(":" * 30) nodejs_local_dir = str(self.__openwhisk_helpers_nodejs_dir.resolve()) ow_deployer_binary_path = os.path.join(nodejs_local_dir, "node_modules", "openwhisk-composer", "bin", "deploy.js") os.system(f"{ow_deployer_binary_path} {self.__openwhisk_workflow_orchestrator_action_name} {self.__openwhisk_composer_output_path} -w -i") - logger.info(":" * 10) + logger.info(":" * 30) logger.info("Deploying OpenWhisk orchestrator action: SUCCESS") - logger.info(":" * 10) + logger.info(":" * 30) # ------------------------------------------------------------------------- diff --git a/serwo/python/src/runner-templates/openwhisk/local_nodejs_installer.sh b/serwo/python/src/runner-templates/openwhisk/local_nodejs_installer.sh new file mode 100644 index 0000000..0df5f7b --- /dev/null +++ b/serwo/python/src/runner-templates/openwhisk/local_nodejs_installer.sh @@ -0,0 +1,4 @@ +NODEJS_DIR=$1 + +export N_PREFIX=$NODEJS_DIR +curl -fsSL https://raw.githubusercontent.com/tj/n/master/bin/n | bash -s lts diff --git a/serwo/python/src/utils/classes/commons/csp.py b/serwo/python/src/utils/classes/commons/csp.py index 46e0837..f253473 100644 --- a/serwo/python/src/utils/classes/commons/csp.py +++ b/serwo/python/src/utils/classes/commons/csp.py @@ -30,7 +30,7 @@ def build_resources(self, user_dir, dag_definition_path, region, part_id, dag_de print(":" * 80, "Generating resources for OpenWhisk Actions") private_cloud_deployer.build_resources() - print(":" * 80, "Generatinng workflow files for OpenWhisk") + print(":" * 80, "Generating workflow files for OpenWhisk") private_cloud_deployer.build_workflow() print(":" * 80, "Deploying OpenWhisk") From b426c053290aeff0ce48d11a472d9f29d8c3f158 Mon Sep 17 00:00:00 2001 From: rajdeep1008 Date: Mon, 17 Jun 2024 13:15:19 +0530 Subject: [PATCH 04/18] Changelog: 1. Actual user dag is now being used for openwhisk orchestrator code generation 2. Need to test the openwhisk code generation for all the existing workflows 3. 
Isn't tested on multi cloud cases --- .../src/utils/classes/openwhisk/user_dag.py | 134 +++++++++++++----- 1 file changed, 100 insertions(+), 34 deletions(-) diff --git a/serwo/python/src/utils/classes/openwhisk/user_dag.py b/serwo/python/src/utils/classes/openwhisk/user_dag.py index a006ffb..fd4322b 100644 --- a/serwo/python/src/utils/classes/openwhisk/user_dag.py +++ b/serwo/python/src/utils/classes/openwhisk/user_dag.py @@ -2,6 +2,7 @@ import copy import random import string +import itertools import networkx as nx from collections import defaultdict @@ -26,6 +27,7 @@ def __init__(self, user_config_path) -> None: for index, node in enumerate(self.__dag_config_data["Nodes"]): nodeID = "n" + str(index+1) # nodeID = "n" + node["NodeName"] # Uncomment to make debugging easier + nodeVar = self._generate_random_variable_name() self.__nodeIDMap[node["NodeName"]] = nodeID self.__nodeIDMap[node["NodeId"]] = nodeID @@ -48,9 +50,11 @@ def __init__(self, user_config_path) -> None: CSP=node.get("CSP"), MemoryInMB=node["MemoryInMB"], machine_list=[nodeID], - pre="", - ret=[f'composer.action("/guest/{workflowName}/{nodeName}")'], - var=self._generate_random_variable_name(), + var_machine_list=[nodeVar], + ret=[f'composer.action("/guest/{workflowName}/{nodeName}")'], # only applicable for leaf nodes (not for sequence/parallel) + var=nodeVar, + code_generation_done=False, + node_type="action", # Can be one of [action/parallel/sequence] ) for edge in self.__dag_config_data["Edges"]: @@ -81,26 +85,6 @@ def get_node_param_list(self): return functions_list - def _get_orchestrator_code_linear_merge(self, graph: nx.DiGraph, nodes): - """ - TODO: Complete me - """ - pre = "" - last = nodes[-1] - previous_var = None - - return "", "", "" - - def _get_orchestrator_code_parallel_merge(self, graph: nx.DiGraph, nodes): - """ - TODO: Complete me - """ - pre = "" - last = nodes[-1] - previous_var = None - - return "", "", "" - def _merge_linear_nodes(self, graph: nx.DiGraph, node_list: list): """ Doesn't return anything since operating on deep copy of graphs might get heavy. 
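For illustration only (not part of the patch): the merge passes below alternately collapse linear chains into "sequence" nodes and same-predecessor/same-successor diamonds into "parallel" nodes until a single node remains. A minimal, self-contained sketch of the diamond-detection step on a toy DAG, mirroring how _collapse_parallel_chains() groups branches; the node names here are hypothetical:

    import networkx as nx
    from collections import defaultdict

    toy = nx.DiGraph()
    toy.add_edges_from([
        ("gen", "bft"), ("gen", "pagerank"), ("gen", "mst"),  # fan-out
        ("bft", "agg"), ("pagerank", "agg"), ("mst", "agg"),  # fan-in
    ])

    # branches with out-degree 1 that share a successor form one parallel group
    groups = defaultdict(list)
    for branch in toy.successors("gen"):
        if toy.out_degree(branch) == 1:
            groups[next(iter(toy.successors(branch)))].append(branch)

    print([tuple(v) for v in groups.values() if len(v) > 1])
    # -> [('bft', 'pagerank', 'mst')]  => becomes one composer.parallel(...) node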
@@ -110,12 +94,26 @@ def _merge_linear_nodes(self, graph: nx.DiGraph, node_list: list): return new_node_machine_list = [] + new_node_var_machine_list = [] for node in node_list: new_node_machine_list.extend(graph.nodes[node]["machine_list"]) + if graph.nodes[node]["node_type"] == "action": + new_node_var_machine_list.extend(graph.nodes[node]["var_machine_list"]) + else: + # new_node_var_machine_list.extend(graph.nodes[node]["var"]) + if type(graph.nodes[node]["var"]) == str: + new_node_var_machine_list.extend([graph.nodes[node]["var"]]) + else: + new_node_var_machine_list.extend(graph.nodes[node]["var"]) + new_node_id = "n" + str(node_list) - pre, ret, var = self._get_orchestrator_code_linear_merge(graph, node_list) - graph.add_node(new_node_id, pre=pre, ret=ret, var=var, machine_list=new_node_machine_list) + graph.add_node( + new_node_id, var=self._generate_random_variable_name(), + machine_list=new_node_machine_list, + var_machine_list=new_node_var_machine_list, + node_type="sequence", code_generation_done=False, + ) for u, v in list(graph.edges()): if v == node_list[0]: @@ -175,16 +173,31 @@ def _merge_parallel_nodes(self, graph: nx.DiGraph, node_list): return new_node_machine_list = [] + new_node_var_machine_list = [] for node in node_list: new_node_machine_list.append(graph.nodes[node]["machine_list"]) + # new_node_var_machine_list.append(graph.nodes[node]["var_machine_list"]) + + if graph.nodes[node]["node_type"] == "action": + new_node_var_machine_list.extend(graph.nodes[node]["var_machine_list"]) + else: + # new_node_var_machine_list.append(graph.nodes[node]["var"]) + if type(graph.nodes[node]["var"]) == str: + new_node_var_machine_list.extend([graph.nodes[node]["var"]]) + else: + new_node_var_machine_list.extend(graph.nodes[node]["var"]) # since we are only merging diamonds (same predecessor, same successor) predecessor = list(graph.predecessors(node_list[0]))[0] successor = list(graph.successors(node_list[0]))[0] new_node_id = "n" + str(new_node_machine_list) - pre, ret, var = self._get_orchestrator_code_parallel_merge(graph, node_list) - graph.add_node(new_node_id, pre=pre, ret=ret, var=var, machine_list=[new_node_machine_list]) + graph.add_node( + new_node_id, var=self._generate_random_variable_name(), + machine_list=[new_node_machine_list], + var_machine_list=new_node_var_machine_list, + node_type="parallel", code_generation_done=False, + ) for node in node_list: graph.remove_node(node) @@ -225,20 +238,73 @@ def _collapse_parallel_chains(self, graph: nx.DiGraph): self._merge_parallel_nodes(graph, chain_list) return + + def get_updated_nodes(self, dag): + """ + TODO: Change me to a better approach, I won't scale well for bigger graphs. + """ + updated_node_ids = [] + + start_node = [node for node in dag.nodes if dag.in_degree(node) == 0][0] + bfs_nodes = list(nx.bfs_layers(dag, sources=start_node)) + bfs_nodes = list(itertools.chain.from_iterable(bfs_nodes)) + for node_id in bfs_nodes: + if not dag.nodes[node_id]["code_generation_done"]: + updated_node_ids.append(node_id) + + return updated_node_ids def get_orchestrator_code(self): """ Breaker of linear and parallel chains. 
- TODO: Actually generate the generated_code from the updated graph + TODO: Test me thoroughly """ - output_dag = copy.deepcopy(self.__dag) + original_dag = self.__dag + output_dag = copy.deepcopy(original_dag) # preserving the original dag + generated_code = 'const composer = require("openwhisk-composer");\n\n' + + # Creating an action definition for each node of the graph + start_node = [node for node in output_dag.nodes if output_dag.in_degree(node) == 0][0] + bfs_nodes = list(nx.bfs_layers(output_dag, sources=start_node)) + bfs_nodes = list(itertools.chain.from_iterable(bfs_nodes)) + for curr_node in bfs_nodes: + generated_code += f"{output_dag.nodes[curr_node]['var']} = {output_dag.nodes[curr_node]['ret'][0]};\n"; + output_dag.nodes[curr_node]["code_generation_done"] = True + + iteration = 1 while len(output_dag.nodes()) != 1: self._collapse_linear_chains(output_dag) + new_linear_nodes = self.get_updated_nodes(output_dag) + if len(new_linear_nodes) > 0: + generated_code += f"\n// Iteration{iteration}: Sequence\n" + + for new_node in new_linear_nodes: + node = output_dag.nodes[new_node] + node["code_generation_done"] = True + + sub_node_machines = [] + for machine in node["var_machine_list"]: + sub_node_machines.append(machine) + + generated_code += f"{node['var']} = composer.sequence({', '.join(sub_node_machines)});\n"; + self._collapse_parallel_chains(output_dag) + new_parallel_nodes = self.get_updated_nodes(output_dag) + if len(new_parallel_nodes) > 0: + generated_code += f"\n// Iteration{iteration}: Parallel\n" - generated_code = """ -const composer = require("openwhisk-composer"); + for new_node in new_parallel_nodes: + node = output_dag.nodes[new_node] + node["code_generation_done"] = True + + sub_node_machines = [] + for machine in node["var_machine_list"]: + sub_node_machines.append(machine) + + generated_code += f"{node['var']} = composer.parallel({', '.join(sub_node_machines)});\n"; + + iteration += 1 -module.exports = composer.sequence("/guest/graphs/graph_gen", composer.parallel("/guest/graphs/graph_bft", "/guest/graphs/pagerank", "/guest/graphs/graph_mst"), "/guest/graphs/aggregate"); -""" - return generated_code \ No newline at end of file + generated_code += f"\nmodule.exports = {output_dag.nodes[list(output_dag.nodes)[0]]['var']};\n" + return generated_code + \ No newline at end of file From 12c8c322d946c79001c19cd7a14bc833c1e38c9c Mon Sep 17 00:00:00 2001 From: rajdeep1008 Date: Thu, 20 Jun 2024 19:40:47 +0530 Subject: [PATCH 05/18] Bug fixes related to nodejs on older linux --- install_nodejs.sh | 4 ++++ serwo/openwhisk_create_statemachine.py | 12 ++++++++---- 2 files changed, 12 insertions(+), 4 deletions(-) create mode 100644 install_nodejs.sh diff --git a/install_nodejs.sh b/install_nodejs.sh new file mode 100644 index 0000000..ada288f --- /dev/null +++ b/install_nodejs.sh @@ -0,0 +1,4 @@ +NODEJS_DIR=$1 + +export N_PREFIX=$NODEJS_DIR +curl -fsSL https://raw.githubusercontent.com/tj/n/master/bin/n | bash -s 14.20.0 diff --git a/serwo/openwhisk_create_statemachine.py b/serwo/openwhisk_create_statemachine.py index 5975a3b..34b75a3 100644 --- a/serwo/openwhisk_create_statemachine.py +++ b/serwo/openwhisk_create_statemachine.py @@ -352,8 +352,9 @@ def build_workflow(self): # Doing a "$ npm install openwhisk-composer" from the local npm binary logger.info("Starting to install openwhisk-composer using npm: See 'https://github.com/apache/openwhisk-composer'") + local_nodejs_binary_path = self.__openwhisk_helpers_nodejs_dir/"bin"/"node" local_npm_binary_path = 
self.__openwhisk_helpers_nodejs_dir/"bin"/"npm" - ow_composer_installation_status = subprocess.run([str(local_npm_binary_path.resolve()), "install", "--prefix", nodejs_local_dir, "openwhisk-composer"]) + ow_composer_installation_status = subprocess.run([str(local_nodejs_binary_path.resolve()), str(local_npm_binary_path.resolve()), "install", "--prefix", nodejs_local_dir, "openwhisk-composer"]) if ow_composer_installation_status.returncode == 0: logger.info("Successfully installed openwhisk-composer") else: @@ -374,9 +375,11 @@ def build_workflow(self): logger.info("Creating workflow composition files using openwhisk-composer") logger.info(":" * 30) + local_nodejs_binary_path = self.__openwhisk_helpers_nodejs_dir/"bin"/"node" ow_composer_binary_path = os.path.join(nodejs_local_dir, "node_modules", "openwhisk-composer", "bin", "compose.js") orchestrator_creation_status = subprocess.run([ - ow_composer_binary_path, str(self.__openwhisk_composer_input_path.resolve()), + local_nodejs_binary_path, ow_composer_binary_path, + str(self.__openwhisk_composer_input_path.resolve()), "-o", str(self.__openwhisk_composer_output_path.resolve()), ]) @@ -445,7 +448,7 @@ def deploy_workflow(self): for func_name in self.__user_dag.get_node_object_map(): action_name = f"/guest/{self.__user_dag.get_user_dag_name()}/{func_name}" - action_zip_path = self.__openwhisk_artifacts_dir / func_name / ".zip" + action_zip_path = self.__openwhisk_artifacts_dir / f"{func_name}.zip" os.system(f"wsk -i action create {action_name} --kind python:3 {action_zip_path} --timeout 300000 --concurrency 10") logger.info(":" * 30) @@ -457,8 +460,9 @@ def deploy_workflow(self): logger.info(":" * 30) nodejs_local_dir = str(self.__openwhisk_helpers_nodejs_dir.resolve()) + local_nodejs_binary_path = self.__openwhisk_helpers_nodejs_dir/"bin"/"node" ow_deployer_binary_path = os.path.join(nodejs_local_dir, "node_modules", "openwhisk-composer", "bin", "deploy.js") - os.system(f"{ow_deployer_binary_path} {self.__openwhisk_workflow_orchestrator_action_name} {self.__openwhisk_composer_output_path} -w -i") + os.system(f"{local_nodejs_binary_path} {ow_deployer_binary_path} {self.__openwhisk_workflow_orchestrator_action_name} {self.__openwhisk_composer_output_path} -w -i") logger.info(":" * 30) logger.info("Deploying OpenWhisk orchestrator action: SUCCESS") From 6833e8639f1b3a5774ee7e67b767cc0acb3037c5 Mon Sep 17 00:00:00 2001 From: rajdeep1008 Date: Sun, 23 Jun 2024 17:26:33 +0530 Subject: [PATCH 06/18] More bug fixes related to redis --- serwo/openwhisk_create_statemachine.py | 5 +++- .../openwhisk/runner_template.py | 26 +++++++++++++++++-- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/serwo/openwhisk_create_statemachine.py b/serwo/openwhisk_create_statemachine.py index 34b75a3..bba3a5c 100644 --- a/serwo/openwhisk_create_statemachine.py +++ b/serwo/openwhisk_create_statemachine.py @@ -247,11 +247,14 @@ def __create_workflow_orchestrator(self, file_name_prefix): # To allow parallel workflows and other stuff, a redis instance is needed # with an input.json file with corresponding redis info + # TODO: Change me to be picked up from a config file redis_input_file = """ { "$composer": { "redis": { - "uri": "redis://owdev-redis.openwhisk.svc.cluster.local:6379" + "uri": { + "url": "redis://owdev-redis.openwhisk.svc.cluster.local:6379" + } }, "openwhisk": { "ignore_certs": true diff --git a/serwo/python/src/runner-templates/openwhisk/runner_template.py b/serwo/python/src/runner-templates/openwhisk/runner_template.py index 
532ecb1..2429fc4 100644 --- a/serwo/python/src/runner-templates/openwhisk/runner_template.py +++ b/serwo/python/src/runner-templates/openwhisk/runner_template.py @@ -3,6 +3,7 @@ SerwoListObject - [SerwoObject] """ import os +import uuid import time import psutil import objsize @@ -20,7 +21,27 @@ def get_delta(timestamp): return round(time.time() * 1000) - timestamp # Openwhisk entry point -def main(args): +def main(event): + # TODO: + # 1. Check if this is correct approach for the entrypoint function of the workflow + # 2. Check if this caters the list type usecase too + # 3. uuid won't be the desired solution. Check if all this apply in private cloud's case or not + if event == None or len(event) == 0: + event = { + "workflow_instance_id": str(uuid.uuid4()), + "request_timestamp": round(time.time() * 1000), # This is most probably wrong + "session_id": str(uuid.uuid4()), + "deployment_id": str(uuid.uuid4()), + } + + # TODO: Check if this helps + print("*" * 10) + act_id = os.environ.get("__OW_ACTIVATION_ID", "lalala") + trans_id = os.environ.get("__OW_TRANSACTION_ID", "lalala") + print(act_id) + print(act_id) + print("*" * 10) + start_time = round(time.time() * 1000) # capturing input payload size input_payload_size_bytes = None @@ -31,7 +52,7 @@ def main(args): # Calculate input payload size input_payload_size_bytes = sum([objsize.get_deep_size(x.get_body()) for x in serwo_request_object.get_objects()]) - + elif isinstance(event, dict): if "metadata" not in event: new_event = deepcopy(event) @@ -55,6 +76,7 @@ def main(args): else: # TODO: Report error and return pass + serwo_request_object.set_basepath("") # user function exec status_code = 200 From 4b5f0594ce24d01106bf0ceebfef82928cca56a6 Mon Sep 17 00:00:00 2001 From: rajdeep1008 Date: Sun, 23 Jun 2024 17:35:52 +0530 Subject: [PATCH 07/18] Updated timeout for orchestrator code --- serwo/openwhisk_create_statemachine.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/serwo/openwhisk_create_statemachine.py b/serwo/openwhisk_create_statemachine.py index bba3a5c..85cfb4c 100644 --- a/serwo/openwhisk_create_statemachine.py +++ b/serwo/openwhisk_create_statemachine.py @@ -466,6 +466,9 @@ def deploy_workflow(self): local_nodejs_binary_path = self.__openwhisk_helpers_nodejs_dir/"bin"/"node" ow_deployer_binary_path = os.path.join(nodejs_local_dir, "node_modules", "openwhisk-composer", "bin", "deploy.js") os.system(f"{local_nodejs_binary_path} {ow_deployer_binary_path} {self.__openwhisk_workflow_orchestrator_action_name} {self.__openwhisk_composer_output_path} -w -i") + + # TODO: Update me to be picked from a config file along with action create calls above + os.system(f"wsk -i action update {self.__openwhisk_workflow_orchestrator_action_name} --timeout 300000 --concurrency 10") logger.info(":" * 30) logger.info("Deploying OpenWhisk orchestrator action: SUCCESS") From 5036fbc6f6d5f35710c20814f35ee65fa7d9b356 Mon Sep 17 00:00:00 2001 From: rajdeep1008 Date: Mon, 24 Jun 2024 14:48:22 +0530 Subject: [PATCH 08/18] Added redis credentials binding for orchestrator function --- serwo/openwhisk_create_statemachine.py | 57 ++++++++++++-------------- 1 file changed, 27 insertions(+), 30 deletions(-) diff --git a/serwo/openwhisk_create_statemachine.py b/serwo/openwhisk_create_statemachine.py index 85cfb4c..ae39d1c 100644 --- a/serwo/openwhisk_create_statemachine.py +++ b/serwo/openwhisk_create_statemachine.py @@ -39,9 +39,16 @@ def __init__(self, user_dir, dag_file_name, part_id): self.__openwhisk_helpers_dir = self.__openwhisk_build_dir / "helpers" 
self.__openwhisk_helpers_nodejs_dir = self.__openwhisk_helpers_dir / "local_nodejs" + # OpenWhisk configuration parameters with sensible defaults suggested in OW documentation + self.__action_namespace = os.environ.get("OW_ACTION_NS", "guest") + self.__action_concurrency = int(os.environ.get("OW_ACTION_CONCURRENCY", 10)) + self.__action_timeout = int(os.environ.get("OW_ACTION_TIMEOUT", 300000)) + self.__redis_url = os.environ.get("OW_REDIS_URL", "redis://owdev-redis.openwhisk.svc.cluster.local:6379") + self.__ignore_certs = bool(os.environ.get("OW_REDIS_IGNORE_CERTS", 1)) # 1/0 for True or False + # DAG related parameters self.__user_dag = UserDag(self.__dag_definition_path) - self.__openwhisk_workflow_orchestrator_action_name = f"/guest/{self.__user_dag.get_user_dag_name()}/orchestrator" + self.__openwhisk_workflow_orchestrator_action_name = f"/{self.__action_namespace}/{self.__user_dag.get_user_dag_name()}/orchestrator" self.__openwhisk_composer_input_path = self.__openwhisk_build_dir / f"{self.__user_dag.get_user_dag_name()}_workflow_composition.js" self.__openwhisk_composer_output_path = self.__openwhisk_build_dir / f"{self.__user_dag.get_user_dag_name()}_workflow_composition.json" self.__openwhisk_workflow_redis_input = self.__openwhisk_build_dir / f"{self.__user_dag.get_user_dag_name()}_workflow_input.json" # required to allow parallel action in OpenWhisk @@ -240,34 +247,10 @@ def __create_workflow_orchestrator(self, file_name_prefix): See: https://github.com/apache/openwhisk-composer """ composer_file_path = self.__openwhisk_build_dir / f"{file_name_prefix}_workflow_composition.js" - redis_input_file_path = self.__openwhisk_build_dir / f"{file_name_prefix}_workflow_input.json" # required to allow parallel action in OpenWhisk - # ------------------------------------------------------------------------------------------------ generated_code = self.__user_dag.get_orchestrator_code() - - # To allow parallel workflows and other stuff, a redis instance is needed - # with an input.json file with corresponding redis info - # TODO: Change me to be picked up from a config file - redis_input_file = """ -{ - "$composer": { - "redis": { - "uri": { - "url": "redis://owdev-redis.openwhisk.svc.cluster.local:6379" - } - }, - "openwhisk": { - "ignore_certs": true - } - } -} -""" - # ------------------------------------------------------------------------------------------------ with open(composer_file_path, "w") as f: f.write(generated_code) - - with open(redis_input_file_path, "w") as f: - f.write(redis_input_file) def build_resources(self): @@ -417,7 +400,7 @@ def deploy_workflow(self): # ----------------- Existing OpenWhisk components deletion ----------------- # TODO: Limit this step to current workflow name or some package name for node_name in self.__user_dag.get_node_object_map(): - curr_action_name = f"/guest/{self.__user_dag.get_user_dag_name()}/{node_name}" + curr_action_name = f"/{self.__action_namespace}/{self.__user_dag.get_user_dag_name()}/{node_name}" try: os.system(f"wsk -i action delete {curr_action_name}") except Exception as e: @@ -450,9 +433,9 @@ def deploy_workflow(self): logger.info(":" * 30) for func_name in self.__user_dag.get_node_object_map(): - action_name = f"/guest/{self.__user_dag.get_user_dag_name()}/{func_name}" + action_name = f"/{self.__action_namespace}/{self.__user_dag.get_user_dag_name()}/{func_name}" action_zip_path = self.__openwhisk_artifacts_dir / f"{func_name}.zip" - os.system(f"wsk -i action create {action_name} --kind python:3 {action_zip_path} --timeout 
300000 --concurrency 10") + os.system(f"wsk -i action create {action_name} --kind python:3 {action_zip_path} --timeout {self.__action_timeout} --concurrency {self.__action_concurrency}") logger.info(":" * 30) logger.info("Deploying OpenWhisk action for each function: SUCCESS") @@ -467,8 +450,22 @@ def deploy_workflow(self): ow_deployer_binary_path = os.path.join(nodejs_local_dir, "node_modules", "openwhisk-composer", "bin", "deploy.js") os.system(f"{local_nodejs_binary_path} {ow_deployer_binary_path} {self.__openwhisk_workflow_orchestrator_action_name} {self.__openwhisk_composer_output_path} -w -i") - # TODO: Update me to be picked from a config file along with action create calls above - os.system(f"wsk -i action update {self.__openwhisk_workflow_orchestrator_action_name} --timeout 300000 --concurrency 10") + ignore_certs = "false" + if self.__ignore_certs: + ignore_certs = "true" + + # Hackish way to create this complex string input for orchestrator function + workflow_update_cmd = f"wsk -i action update {self.__openwhisk_workflow_orchestrator_action_name} --timeout {self.__action_timeout} --concurrency {self.__action_concurrency}" + workflow_update_cmd += "--param '$composer' '{" + workflow_update_cmd += '"redis":{"uri":{"url":"' + workflow_update_cmd += self.__redis_url + workflow_update_cmd += '"}' + workflow_update_cmd += '},"openwhisk":{"ignore_certs":' + workflow_update_cmd += ignore_certs + workflow_update_cmd += '}' + workflow_update_cmd += "}'" + + os.system(workflow_update_cmd) logger.info(":" * 30) logger.info("Deploying OpenWhisk orchestrator action: SUCCESS") From 38e7d9a6e0bf2bf4c41d93c3b00754bee3ad941e Mon Sep 17 00:00:00 2001 From: rajdeep1008 Date: Mon, 24 Jun 2024 14:53:38 +0530 Subject: [PATCH 09/18] Event hardcoding removed from the entrypoint function --- .../openwhisk/runner_template.py | 20 ------------------- 1 file changed, 20 deletions(-) diff --git a/serwo/python/src/runner-templates/openwhisk/runner_template.py b/serwo/python/src/runner-templates/openwhisk/runner_template.py index 2429fc4..99c96a9 100644 --- a/serwo/python/src/runner-templates/openwhisk/runner_template.py +++ b/serwo/python/src/runner-templates/openwhisk/runner_template.py @@ -22,26 +22,6 @@ def get_delta(timestamp): # Openwhisk entry point def main(event): - # TODO: - # 1. Check if this is correct approach for the entrypoint function of the workflow - # 2. Check if this caters the list type usecase too - # 3. uuid won't be the desired solution. 
Check if all this apply in private cloud's case or not - if event == None or len(event) == 0: - event = { - "workflow_instance_id": str(uuid.uuid4()), - "request_timestamp": round(time.time() * 1000), # This is most probably wrong - "session_id": str(uuid.uuid4()), - "deployment_id": str(uuid.uuid4()), - } - - # TODO: Check if this helps - print("*" * 10) - act_id = os.environ.get("__OW_ACTIVATION_ID", "lalala") - trans_id = os.environ.get("__OW_TRANSACTION_ID", "lalala") - print(act_id) - print(act_id) - print("*" * 10) - start_time = round(time.time() * 1000) # capturing input payload size input_payload_size_bytes = None From 52faeb656ccf8ffca88066ffb3ec3abc57506b47 Mon Sep 17 00:00:00 2001 From: rajdeep1008 Date: Mon, 24 Jun 2024 15:41:50 +0530 Subject: [PATCH 10/18] local nodejs version changes for binary --- .../src/runner-templates/openwhisk/local_nodejs_installer.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/serwo/python/src/runner-templates/openwhisk/local_nodejs_installer.sh b/serwo/python/src/runner-templates/openwhisk/local_nodejs_installer.sh index 0df5f7b..ada288f 100644 --- a/serwo/python/src/runner-templates/openwhisk/local_nodejs_installer.sh +++ b/serwo/python/src/runner-templates/openwhisk/local_nodejs_installer.sh @@ -1,4 +1,4 @@ NODEJS_DIR=$1 export N_PREFIX=$NODEJS_DIR -curl -fsSL https://raw.githubusercontent.com/tj/n/master/bin/n | bash -s lts +curl -fsSL https://raw.githubusercontent.com/tj/n/master/bin/n | bash -s 14.20.0 From a5a4d970a8fb01f13ae92766970f05c17a6d935e Mon Sep 17 00:00:00 2001 From: rajdeep1008 Date: Wed, 3 Jul 2024 11:43:40 +0530 Subject: [PATCH 11/18] Changelog: 1. Update the env variable keys for openwhisk defaults 2. Memory field from DAG is now being respected for action creation 3. Redis binding changes for orchestrator 4. 
Minor runner template changes --- serwo/openwhisk_create_statemachine.py | 41 ++++++++++++------- .../openwhisk/runner_template.py | 3 -- .../src/utils/classes/openwhisk/function.py | 3 ++ 3 files changed, 30 insertions(+), 17 deletions(-) diff --git a/serwo/openwhisk_create_statemachine.py b/serwo/openwhisk_create_statemachine.py index ae39d1c..63f7b3d 100644 --- a/serwo/openwhisk_create_statemachine.py +++ b/serwo/openwhisk_create_statemachine.py @@ -40,11 +40,12 @@ def __init__(self, user_dir, dag_file_name, part_id): self.__openwhisk_helpers_nodejs_dir = self.__openwhisk_helpers_dir / "local_nodejs" # OpenWhisk configuration parameters with sensible defaults suggested in OW documentation - self.__action_namespace = os.environ.get("OW_ACTION_NS", "guest") - self.__action_concurrency = int(os.environ.get("OW_ACTION_CONCURRENCY", 10)) - self.__action_timeout = int(os.environ.get("OW_ACTION_TIMEOUT", 300000)) - self.__redis_url = os.environ.get("OW_REDIS_URL", "redis://owdev-redis.openwhisk.svc.cluster.local:6379") - self.__ignore_certs = bool(os.environ.get("OW_REDIS_IGNORE_CERTS", 1)) # 1/0 for True or False + self.__action_namespace = os.environ.get("XFAAS_OW_ACTION_NS", "guest") + self.__action_concurrency = int(os.environ.get("XFAAS_OW_ACTION_CONCURRENCY", 10)) + self.__action_timeout = int(os.environ.get("XFAAS_OW_ACTION_TIMEOUT", 300000)) + self.__action_memory = int(os.environ.get("XFAAS_OW_ACTION_MEMORY", 256)) # This is only used for orchestrator action for now + self.__redis_url = os.environ.get("XFAAS_OW_REDIS_URL", "redis://owdev-redis.openwhisk.svc.cluster.local:6379") + self.__ignore_certs = bool(os.environ.get("XFAAS_OW_REDIS_IGNORE_CERTS", 1)) # 1/0 for True or False # DAG related parameters self.__user_dag = UserDag(self.__dag_definition_path) @@ -398,7 +399,6 @@ def deploy_workflow(self): logger.info(":" * 30) # ----------------- Existing OpenWhisk components deletion ----------------- - # TODO: Limit this step to current workflow name or some package name for node_name in self.__user_dag.get_node_object_map(): curr_action_name = f"/{self.__action_namespace}/{self.__user_dag.get_user_dag_name()}/{node_name}" try: @@ -435,7 +435,17 @@ def deploy_workflow(self): for func_name in self.__user_dag.get_node_object_map(): action_name = f"/{self.__action_namespace}/{self.__user_dag.get_user_dag_name()}/{func_name}" action_zip_path = self.__openwhisk_artifacts_dir / f"{func_name}.zip" - os.system(f"wsk -i action create {action_name} --kind python:3 {action_zip_path} --timeout {self.__action_timeout} --concurrency {self.__action_concurrency}") + + # This overrides the default configuration set from the environment at an action level from the provided DAG. 
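+            # The value is the per-node memory field from the DAG json, exposed via Function.get_memory().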
+ memory_in_mb = self.__user_dag.get_node_object_map()[func_name].get_memory() + + try: + os.system(f"wsk -i action create {action_name} --kind python:3 {action_zip_path} --timeout {self.__action_timeout} --concurrency {self.__action_concurrency} --memory {memory_in_mb}") + except Exception as e: + print("X" * 20) + print(f"Action creation command failed for {action_name}") + print(e) + print("X" * 20) logger.info(":" * 30) logger.info("Deploying OpenWhisk action for each function: SUCCESS") @@ -455,18 +465,21 @@ def deploy_workflow(self): ignore_certs = "true" # Hackish way to create this complex string input for orchestrator function - workflow_update_cmd = f"wsk -i action update {self.__openwhisk_workflow_orchestrator_action_name} --timeout {self.__action_timeout} --concurrency {self.__action_concurrency}" - workflow_update_cmd += "--param '$composer' '{" - workflow_update_cmd += '"redis":{"uri":{"url":"' + workflow_update_cmd = f"wsk -i action update {self.__openwhisk_workflow_orchestrator_action_name} --timeout {self.__action_timeout} --concurrency {self.__action_concurrency} --memory {self.__action_memory}" + workflow_update_cmd += " --param '$composer' '{" + workflow_update_cmd += '"redis":{"uri":"' workflow_update_cmd += self.__redis_url - workflow_update_cmd += '"}' - workflow_update_cmd += '},"openwhisk":{"ignore_certs":' + workflow_update_cmd += '"},"openwhisk":{"ignore_certs":' workflow_update_cmd += ignore_certs workflow_update_cmd += '}' workflow_update_cmd += "}'" - os.system(workflow_update_cmd) - + try: + os.system(workflow_update_cmd) + except Exception as e: + print("Workflow action updation command failed") + print(e) + logger.info(":" * 30) logger.info("Deploying OpenWhisk orchestrator action: SUCCESS") logger.info(":" * 30) diff --git a/serwo/python/src/runner-templates/openwhisk/runner_template.py b/serwo/python/src/runner-templates/openwhisk/runner_template.py index 99c96a9..452ca95 100644 --- a/serwo/python/src/runner-templates/openwhisk/runner_template.py +++ b/serwo/python/src/runner-templates/openwhisk/runner_template.py @@ -126,6 +126,3 @@ def main(event): return response return response - -if __name__ == "__main__": - print("Main Method: Nothing is executed") diff --git a/serwo/python/src/utils/classes/openwhisk/function.py b/serwo/python/src/utils/classes/openwhisk/function.py index fafb8bf..877a476 100644 --- a/serwo/python/src/utils/classes/openwhisk/function.py +++ b/serwo/python/src/utils/classes/openwhisk/function.py @@ -22,6 +22,9 @@ def get_path(self): def get_module_name(self): return self._module_name + + def get_memory(self): + return self._memory def get_id(self): return self._id From fdc80e5dd45a1c2756ffc354a443caa87ec555b4 Mon Sep 17 00:00:00 2001 From: rajdeep1008 Date: Wed, 3 Jul 2024 14:40:25 +0530 Subject: [PATCH 12/18] Added a basic resources output file for openwhisk to fix provenance file generation error. 
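For reference, the file written to build/workflow/resources/openwhisk-<part_id>.json
looks roughly like this (values illustrative; "graphs" stands in for the user DAG
name, and the "Decription" key is spelled as in the code):

    {
        "WorkflowEntrypoint": "/guest/graphs/orchestrator",
        "Decription": "Run 'wsk -i action invoke /guest/graphs/orchestrator' to start the workflow"
    }

xfaas_provenance.py then reads WorkflowEntrypoint from this file as the app name
when generating the provenance artifacts.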
--- serwo/openwhisk_create_statemachine.py | 20 +++++++++++++++++++- serwo/xfaas_provenance.py | 13 ++++++++++--- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/serwo/openwhisk_create_statemachine.py b/serwo/openwhisk_create_statemachine.py index 63f7b3d..8bf53d6 100644 --- a/serwo/openwhisk_create_statemachine.py +++ b/serwo/openwhisk_create_statemachine.py @@ -1,5 +1,6 @@ import os import glob +import json import shutil import zipfile import subprocess @@ -52,7 +53,6 @@ def __init__(self, user_dir, dag_file_name, part_id): self.__openwhisk_workflow_orchestrator_action_name = f"/{self.__action_namespace}/{self.__user_dag.get_user_dag_name()}/orchestrator" self.__openwhisk_composer_input_path = self.__openwhisk_build_dir / f"{self.__user_dag.get_user_dag_name()}_workflow_composition.js" self.__openwhisk_composer_output_path = self.__openwhisk_build_dir / f"{self.__user_dag.get_user_dag_name()}_workflow_composition.json" - self.__openwhisk_workflow_redis_input = self.__openwhisk_build_dir / f"{self.__user_dag.get_user_dag_name()}_workflow_input.json" # required to allow parallel action in OpenWhisk def __create_environment(self): @@ -76,6 +76,7 @@ def __create_environment(self): self.__openwhisk_artifacts_dir.mkdir(parents=True, exist_ok=True) self.__openwhisk_helpers_dir.mkdir(parents=True, exist_ok=True) self.__openwhisk_helpers_nodejs_dir.mkdir(parents=True, exist_ok=True) + self.__serwo_resources_dir.mkdir(parents=True, exist_ok=True) # Copying the bash script to install a local nodejs copy (used by openwhisk-composer) shutil.copyfile(src=self.__runner_template_dir/"local_nodejs_installer.sh", dst=self.__openwhisk_helpers_dir/"local_nodejs_installer.sh") @@ -483,4 +484,21 @@ def deploy_workflow(self): logger.info(":" * 30) logger.info("Deploying OpenWhisk orchestrator action: SUCCESS") logger.info(":" * 30) + + logger.info(":" * 30) + logger.info("Writing output json resource file") + logger.info(":" * 30) + + output_resource_file_path = self.__serwo_resources_dir / f"openwhisk-{self.__part_id}.json" + with open(output_resource_file_path, "w") as f: + output = { + "WorkflowEntrypoint": self.__openwhisk_workflow_orchestrator_action_name, + "Decription": f"Run 'wsk -i action invoke {self.__openwhisk_workflow_orchestrator_action_name}' to start the workflow" + } + + f.write(json.dumps(output)) + + logger.info(":" * 30) + logger.info("Writing output json resource file: SUCCESS") + logger.info(":" * 30) # ------------------------------------------------------------------------- diff --git a/serwo/xfaas_provenance.py b/serwo/xfaas_provenance.py index 5f2c6ec..b48e39f 100644 --- a/serwo/xfaas_provenance.py +++ b/serwo/xfaas_provenance.py @@ -108,9 +108,14 @@ def generate_provenance_artifacts(user_dir, wf_id, refactored_wf_id, wf_deployme resources_dir = pathlib.Path.joinpath( pathlib.Path(user_dir), "build/workflow/resources" ) - resouces_file = f'{resources_dir}/{csp}-{region}-{part_id}.json' - ##load json from file - with open(resouces_file) as f: + + if csp == 'openwhisk': + resources_file = f'{resources_dir}/{csp}-{part_id}.json' + else: + resources_file = f'{resources_dir}/{csp}-{region}-{part_id}.json' + + # load json from file + with open(resources_file) as f: resources = json.load(f) if csp == 'aws': @@ -119,6 +124,8 @@ def generate_provenance_artifacts(user_dir, wf_id, refactored_wf_id, wf_deployme app_name = r['OutputValue'] elif csp == 'azure' or csp == 'azure_v2': app_name = resources['group'] + elif csp == 'openwhisk': + app_name = resources['WorkflowEntrypoint'] 
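+        # the orchestrator entrypoint doubles as the app name here, since the
+        # OpenWhisk resource file carries no stack/resource-group equivalent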
if queue_details is None: raise Exception("Queue details not found, Try a Fresh Deployment by clearing CollectLogs") From 4336a1c645809d226d9d78122ed7617bda9fb72d Mon Sep 17 00:00:00 2001 From: rajdeep1008 Date: Wed, 3 Jul 2024 14:55:54 +0530 Subject: [PATCH 13/18] Indentation for resources output file --- serwo/openwhisk_create_statemachine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/serwo/openwhisk_create_statemachine.py b/serwo/openwhisk_create_statemachine.py index 8bf53d6..c4f70ff 100644 --- a/serwo/openwhisk_create_statemachine.py +++ b/serwo/openwhisk_create_statemachine.py @@ -496,7 +496,7 @@ def deploy_workflow(self): "Decription": f"Run 'wsk -i action invoke {self.__openwhisk_workflow_orchestrator_action_name}' to start the workflow" } - f.write(json.dumps(output)) + f.write(json.dumps(output, indent=4)) logger.info(":" * 30) logger.info("Writing output json resource file: SUCCESS") From 7c6b9fbd380fad8402e329590b4b8ac6ccf9e1c8 Mon Sep 17 00:00:00 2001 From: rajdeep1008 Date: Wed, 3 Jul 2024 17:38:13 +0530 Subject: [PATCH 14/18] Changelog: 1. Default concurrency for OW to 1. This fixes some issues, but still need to understand the cost to be paid for this. 2. Support for fan-ins in the OW runner template --- serwo/openwhisk_create_statemachine.py | 2 +- .../runner-templates/openwhisk/runner_template.py | 15 +++++++++++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/serwo/openwhisk_create_statemachine.py b/serwo/openwhisk_create_statemachine.py index c4f70ff..fd21bb2 100644 --- a/serwo/openwhisk_create_statemachine.py +++ b/serwo/openwhisk_create_statemachine.py @@ -42,7 +42,7 @@ def __init__(self, user_dir, dag_file_name, part_id): # OpenWhisk configuration parameters with sensible defaults suggested in OW documentation self.__action_namespace = os.environ.get("XFAAS_OW_ACTION_NS", "guest") - self.__action_concurrency = int(os.environ.get("XFAAS_OW_ACTION_CONCURRENCY", 10)) + self.__action_concurrency = int(os.environ.get("XFAAS_OW_ACTION_CONCURRENCY", 1)) self.__action_timeout = int(os.environ.get("XFAAS_OW_ACTION_TIMEOUT", 300000)) self.__action_memory = int(os.environ.get("XFAAS_OW_ACTION_MEMORY", 256)) # This is only used for orchestrator action for now self.__redis_url = os.environ.get("XFAAS_OW_REDIS_URL", "redis://owdev-redis.openwhisk.svc.cluster.local:6379") diff --git a/serwo/python/src/runner-templates/openwhisk/runner_template.py b/serwo/python/src/runner-templates/openwhisk/runner_template.py index 452ca95..e0a8530 100644 --- a/serwo/python/src/runner-templates/openwhisk/runner_template.py +++ b/serwo/python/src/runner-templates/openwhisk/runner_template.py @@ -22,13 +22,24 @@ def get_delta(timestamp): # Openwhisk entry point def main(event): + # For the first function of the workflow iff it is empty + if event is None or len(event) == 0: + event = { + "workflow_instance_id": str(uuid.uuid4()), + "request_timestamp": round(time.time() * 1000), + "session_id": str(uuid.uuid4()), + "deployment_id": str(uuid.uuid4()), + } + start_time = round(time.time() * 1000) # capturing input payload size input_payload_size_bytes = None - if isinstance(event, list): + # Case of a fan-in from multiple actions. 
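+    # e.g. after composer.parallel the next action receives a dict of the form
+    #   {"value": [<branch-1 output>, <branch-2 output>, ...]}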
+ # Note: Sending 'value' at the first level can cause an edge case bug + if "value" in event and isinstance(event["value"], list): # TODO: exception handling - serwo_request_object = build_serwo_list_object(event) + serwo_request_object = build_serwo_list_object(event["value"]) # Calculate input payload size input_payload_size_bytes = sum([objsize.get_deep_size(x.get_body()) for x in serwo_request_object.get_objects()]) From 27a40495d71feede0d39a121e0eae4f88c986883 Mon Sep 17 00:00:00 2001 From: rajdeep1008 Date: Fri, 5 Jul 2024 18:03:43 +0530 Subject: [PATCH 15/18] Support for dependencies folder --- serwo/openwhisk_create_statemachine.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/serwo/openwhisk_create_statemachine.py b/serwo/openwhisk_create_statemachine.py index fd21bb2..d1fa193 100644 --- a/serwo/openwhisk_create_statemachine.py +++ b/serwo/openwhisk_create_statemachine.py @@ -160,12 +160,12 @@ def __create_openwhisk_actions( self.__append_xfaas_default_requirements(dst_requirements_path) # place the dependencies folder from the user function path if it exists - # if os.path.exists(user_fn_path / "dependencies"): - # shutil.copytree( - # user_fn_path / "dependencies", - # dst_fn_dir / "dependencies", - # dirs_exist_ok=True - # ) + if os.path.exists(user_fn_path / "dependencies"): + shutil.copytree( + user_fn_path / "dependencies", + dst_fn_dir / "dependencies", + dirs_exist_ok=True + ) logger.info(f"Moving xfaas boilerplate for {fn_name}") shutil.copytree(src=self.__serwo_utils_dir, dst=dst_fn_dir / "python", dirs_exist_ok=True) @@ -315,6 +315,11 @@ def build_workflow(self): path = file.split("python/")[1] archive.write(filename=file, arcname=os.path.join("python", path)) + dependencies_folder_path = str(self.__openwhisk_functions_dir/func_name/"dependencies"/"**"/"*") + for file in glob.glob(dependencies_folder_path, recursive=True): + path = file.split("dependencies/")[1] + archive.write(filename=file, arcname=os.path.join("dependencies", path)) + logger.info(":" * 30) logger.info("Installing nodejs and openwhisk-composer") logger.info(":" * 30) From b44e879dbd8744edf375b66d2bea705b35823ee3 Mon Sep 17 00:00:00 2001 From: rajdeep1008 Date: Tue, 23 Jul 2024 15:09:54 +0530 Subject: [PATCH 16/18] Updated openwhisk resource file generation --- serwo/openwhisk_create_statemachine.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/serwo/openwhisk_create_statemachine.py b/serwo/openwhisk_create_statemachine.py index d1fa193..b468744 100644 --- a/serwo/openwhisk_create_statemachine.py +++ b/serwo/openwhisk_create_statemachine.py @@ -493,12 +493,26 @@ def deploy_workflow(self): logger.info(":" * 30) logger.info("Writing output json resource file") logger.info(":" * 30) + + wsk_host_output = subprocess.run(["wsk", "property", "get", "--apihost"], capture_output=True) + wsk_auth_output = subprocess.run(["wsk", "property", "get", "--auth"], capture_output=True) + + if wsk_host_output.returncode != 0 or wsk_auth_output.returncode != 0: + print(" X " * 30) + print("Failed to get host and auth from 'wsk' client for the resources") + print(" X " * 30) + + wsk_host = wsk_host_output.stdout.decode("utf-8").strip().split()[-1].strip() + execution_auth = wsk_auth_output.stdout.decode("utf-8").strip().split()[-1].strip() + execution_url = f"https://{wsk_host}/api/v1/namespaces/{self.__action_namespace}/actions/{self.__user_dag.get_user_dag_name()}/orchestrator?blocking=false" - output_resource_file_path = 
self.__serwo_resources_dir / f"openwhisk-{self.__part_id}.json" + output_resource_file_path = self.__serwo_resources_dir / f"openwhisk-{self.__region}-{self.__part_id}.json" with open(output_resource_file_path, "w") as f: output = { "WorkflowEntrypoint": self.__openwhisk_workflow_orchestrator_action_name, - "Decription": f"Run 'wsk -i action invoke {self.__openwhisk_workflow_orchestrator_action_name}' to start the workflow" + "Decription": f"Run 'wsk -i action invoke {self.__openwhisk_workflow_orchestrator_action_name}' to start the workflow", + "ExecutionUrl": execution_url, + "ExecutionAuth": execution_auth, } f.write(json.dumps(output, indent=4)) From 7e9c10d196d26cee3e93055b74754eb72916169a Mon Sep 17 00:00:00 2001 From: rajdeep1008 Date: Tue, 30 Jul 2024 14:56:31 +0530 Subject: [PATCH 17/18] Fixes for openwhisk jmeter code and template --- .../openwhisk_jmx_template.jmx | 167 ++++++++++++++++++ serwo/xfaas_run_benchmark.py | 80 +++++++-- 2 files changed, 232 insertions(+), 15 deletions(-) create mode 100644 serwo/benchmark_resources/openwhisk_jmx_template.jmx diff --git a/serwo/benchmark_resources/openwhisk_jmx_template.jmx b/serwo/benchmark_resources/openwhisk_jmx_template.jmx new file mode 100644 index 0000000..61e4b1f --- /dev/null +++ b/serwo/benchmark_resources/openwhisk_jmx_template.jmx @@ -0,0 +1,167 @@ + + + + + + false + true + false + + + + + + + + continue + + false + -1 + + 2 + 1 + true + DURATION + + true + + + + 1 + + throughput + RPS + 0.0 + + + + + true + + + + false + + { + "workflow_instance_id": ${__counter(FALSE)}, + "request_timestamp": ${__time(,)}, + "session_id": "SESSION", + "deployment_id": "DEPLOYMENT_ID", + OPENWHISK_PAYLOAD_TO_REPLACE + } + + = + + + + + + https + + URL + POST + true + false + true + false + + + + + + + + + content-type + application/json + + + accept + application/json + + + Authorization + Basic BASE64_AUTH + + + + + + + false + + saveConfig + + + true + true + true + + true + true + true + true + false + true + true + false + false + false + true + false + false + false + true + 0 + true + true + true + true + true + true + + + openwhisk_SESSION.csv + + + + + false + + saveConfig + + + true + true + true + + true + true + true + true + false + true + true + false + false + false + true + false + false + false + true + 0 + true + true + true + true + true + true + + + + + + + + \ No newline at end of file diff --git a/serwo/xfaas_run_benchmark.py b/serwo/xfaas_run_benchmark.py index 818f27e..9d6eea2 100644 --- a/serwo/xfaas_run_benchmark.py +++ b/serwo/xfaas_run_benchmark.py @@ -6,10 +6,12 @@ import shutil import sys import time +import base64 from datetime import datetime from xfaas_main import run as xfaas_deployer import time from xfbench_plotter import XFBenchPlotter + parser = argparse.ArgumentParser( prog="ProgramName", description="What the program does", @@ -41,6 +43,11 @@ server_ip = None server_user_id = None server_pem_file_path = None + +shell_script_commands = [] +aws_shell_script_commands = [] + + def get_client_login_details(config_path): global server_ip, server_user_id, server_pem_file_path with open(config_path) as f: @@ -51,9 +58,6 @@ def get_client_login_details(config_path): if 'server_pem_file_path' in data: server_pem_file_path = data['server_pem_file_path'] -shell_script_commands = [] -aws_shell_script_commands = [] - def get_aws_payload(payload): payload = json.dumps(payload) @@ -61,12 +65,19 @@ def get_aws_payload(payload): payload = payload[1:-1] return payload + def get_azure_payload(payload): payload = 
json.dumps(payload) payload = payload.replace('"', '"') return payload +def get_openwhisk_payload(payload): + payload = json.dumps(payload) + payload = payload.replace('"', '"') + payload = payload[1:-1] + return payload + def read_dynamism_file(dynamism,duration, max_rps): file_path = os.getenv("XFBENCH_DIR") + f"/workloads/{dynamism}-{max_rps}-{duration}.csv" @@ -107,6 +118,13 @@ def get_aws_resources(csp,region,part_id,wf_user_directory): return execute_url, state_machine_arn +def get_openwhisk_resources(csp, region, part_id, wf_user_directory): + resources = read_resources(csp, region, part_id, wf_user_directory) + execution_url = resources["ExecutionUrl"] + execution_auth_creds = resources["ExecutionAuth"] + return execution_url, execution_auth_creds + + def template_azure_jmx_file(rps, duration, execute_url, payload_size, input_jmx, output_path, session_id,payload): rps_keyword = "RPS" execute_url_keyword = "URL" @@ -167,17 +185,49 @@ def template_aws_jmx_file(rps, duration, execute_url, state_machine_arn, payload f.write(data) -def make_jmx_file(csp, rps, duration, payload_size, wf_name, execute_url,state_machine_arn, dynamism, session_id, wf_user_directory, part_id, region, wf_deployment_id,run_id, payload,is_localhost): +def template_openwhisk_jmx_file(rps, duration, execute_url, execute_auth_base64, input_jmx, output_path, session_id, payload): + global deployment_id + + with open(input_jmx) as f: + data = f.read() + + data = data.replace("DURATION", str(int(duration))) + data = data.replace("RPS", str(rps)) + data = data.replace("OPENWHISK_PAYLOAD_TO_REPLACE", get_openwhisk_payload(payload)) + data = data.replace("URL", execute_url) + data = data.replace("SESSION", str(session_id)) + data = data.replace("DEPLOYMENT_ID", deployment_id) + data = data.replace("BASE64_AUTH", execute_auth_base64) + + to_replace = '"ThreadGroup.num_threads">2' + if rps == 1020.0 or rps == 840.0: + data = data.replace(to_replace, f'"ThreadGroup.num_threads">17') + elif rps == 1920.0 or rps == 3840.0 or rps == 7680.0: + data = data.replace(to_replace, f'"ThreadGroup.num_threads">64') + + with open(output_path, "w") as f: + f.write(data) + + +def make_jmx_file(csp, rps, duration, payload_size, wf_name, execute_url, state_machine_arn, dynamism, session_id, wf_user_directory, part_id, region, wf_deployment_id, run_id, payload, is_localhost, execute_auth_creds): + """ + Changelog: + Added encoded_auth: It will be not None only for openwhisk + """ jmx_template_path, jmx_output_path,jmx_output_filename = get_jmx_paths(csp, rps, duration, payload_size, wf_name, dynamism,session_id,region) + if 'azure' in csp: template_azure_jmx_file(rps, duration, execute_url, payload_size, jmx_template_path, jmx_output_path, session_id,payload) + elif 'openwhisk' in csp: + encoded_auth = base64.b64encode(execute_auth_creds.encode()).decode('utf-8') + template_openwhisk_jmx_file(rps, duration, execute_url, encoded_auth, jmx_template_path, jmx_output_path, session_id, payload) else: template_aws_jmx_file(rps, duration, execute_url, state_machine_arn, payload_size, jmx_template_path, jmx_output_path, session_id,payload) + send_jmx_file_to_server(jmx_output_path,jmx_output_filename,rps,duration,is_localhost) dump_experiment_conf(jmx_output_filename, csp, rps, duration, payload_size, wf_name, dynamism, session_id, wf_user_directory, part_id, region, wf_deployment_id,run_id) - def dump_experiment_conf(jmx_output_filename, csp, rps, duration, payload_size, wf_name, dynamism, session_id, wf_user_directory, part_id, 
region,wf_deployment_id,run_id): provenance_artefacts_updated_path = f"{wf_user_directory}/{wf_deployment_id}/{run_id}/{artifact_suffix}" @@ -280,9 +330,8 @@ def load_payload(wf_user_directory,payload_size): return payload -def run_workload(csp,region,part_id,max_rps,duration,payload_size,dynamism,wf_name, wf_user_directory, wf_deployment_id,run_id, is_localhost): - - copy_provenance_artifacts(csp, region, part_id, wf_user_directory, wf_deployment_id,max_rps,run_id) +def run_workload(csp, region, part_id, max_rps, duration, payload_size, dynamism, wf_name, wf_user_directory, wf_deployment_id, run_id, is_localhost): + copy_provenance_artifacts(csp, region, part_id, wf_user_directory, wf_deployment_id, max_rps,run_id) dynamism_data = read_dynamism_file(dynamism, duration, max_rps) if 'azure' in csp: @@ -290,7 +339,9 @@ def run_workload(csp,region,part_id,max_rps,duration,payload_size,dynamism,wf_na state_machine_arn = '' elif csp == 'aws': execute_url, state_machine_arn = get_aws_resources(csp,region,part_id,wf_user_directory) - + elif csp == 'openwhisk': + execute_url, execute_auth_creds = get_openwhisk_resources(csp, region, part_id, wf_user_directory) + state_machine_arn = '' dynamism_updated = dynamism @@ -314,24 +365,23 @@ def run_workload(csp,region,part_id,max_rps,duration,payload_size,dynamism,wf_na # payload = load_payload(wf_user_directory,payload_size) ne_session_id = session_id + str(i) - make_jmx_file(csp, rps * 60.0, duration, payload_size, wf_name, execute_url,state_machine_arn, dynamism, ne_session_id, wf_user_directory, part_id, region , wf_deployment_id, run_id,payload,is_localhost) + make_jmx_file(csp, rps * 60.0, duration, payload_size, wf_name, execute_url, state_machine_arn, dynamism, ne_session_id, wf_user_directory, part_id, region , wf_deployment_id, run_id, payload, is_localhost, execute_auth_creds) i += 1 - generate_shell_script_and_scp(csp,payload_size, wf_name, max_rps, duration,dynamism,region,is_localhost) - + + generate_shell_script_and_scp(csp, payload_size, wf_name, max_rps, duration, dynamism, region, is_localhost) def copy_provenance_artifacts(csp, region, part_id, wf_user_directory,wf_deployment_id,rps,run_id): - global deployment_id os.makedirs(f"{wf_user_directory}/{wf_deployment_id}/{run_id}", exist_ok=True) provenance_artefacts_path = f"{wf_user_directory}/build/workflow/resources/provenance-artifacts-{csp}-{region}-{part_id}.json" + with open(provenance_artefacts_path) as f: provenance_artifact = json.load(f) - deployment_id = provenance_artifact['deployment_id'] + deployment_id = provenance_artifact['deployment_id'] provenance_artefacts_updated_path = f"{wf_user_directory}/{wf_deployment_id}/{run_id}/{artifact_suffix}" - shutil.copyfile(provenance_artefacts_path, provenance_artefacts_updated_path) From e07da9a5788093bf421d56ac0da47b05b6bf5f09 Mon Sep 17 00:00:00 2001 From: rajdeep1008 Date: Tue, 30 Jul 2024 18:47:43 +0530 Subject: [PATCH 18/18] Changed jmeter responses path to be configurable for openwhisk. Currently uses experiment output path. 
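For reference, the request that the templated JMeter plan from the previous patch
fires at the orchestrator boils down to the following sketch (hypothetical apihost
and auth key; the namespace defaults to guest, "graphs" stands in for the user DAG
name, and verify=False mirrors the insecure '-i' flag used with wsk):

    import base64, json, requests

    url = ("https://<wsk-apihost>/api/v1/namespaces/guest/actions/"
           "graphs/orchestrator?blocking=false")
    auth = base64.b64encode(b"<uuid>:<key>").decode()  # from 'wsk property get --auth'

    resp = requests.post(
        url,
        headers={
            "content-type": "application/json",
            "accept": "application/json",
            "Authorization": f"Basic {auth}",
        },
        data=json.dumps({
            "workflow_instance_id": 1,
            "request_timestamp": 1722334455000,
            "session_id": "SESSION",
            "deployment_id": "DEPLOYMENT_ID",
            # plus the fields substituted for OPENWHISK_PAYLOAD_TO_REPLACE
        }),
        verify=False,
    )
    print(resp.status_code)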
--- serwo/benchmark_resources/openwhisk_jmx_template.jmx | 2 +- serwo/xfaas_run_benchmark.py | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/serwo/benchmark_resources/openwhisk_jmx_template.jmx b/serwo/benchmark_resources/openwhisk_jmx_template.jmx index 61e4b1f..49b02a1 100644 --- a/serwo/benchmark_resources/openwhisk_jmx_template.jmx +++ b/serwo/benchmark_resources/openwhisk_jmx_template.jmx @@ -121,7 +121,7 @@ true - openwhisk_SESSION.csv + JMETER_RESPONSES_PATH diff --git a/serwo/xfaas_run_benchmark.py b/serwo/xfaas_run_benchmark.py index 9d6eea2..bde1a93 100644 --- a/serwo/xfaas_run_benchmark.py +++ b/serwo/xfaas_run_benchmark.py @@ -209,6 +209,17 @@ def template_openwhisk_jmx_file(rps, duration, execute_url, execute_auth_base64, f.write(data) +def add_jmeter_openwhisk_response_path(jmx_file_path, wf_user_directory, wf_deployment_id, run_id): + with open(jmx_file_path) as f: + data = f.read() + + jmeter_responses_file_path = f"{wf_user_directory}/{wf_deployment_id}/{run_id}/jmeter_responses.csv" + data = data.replace("JMETER_RESPONSES_PATH", jmeter_responses_file_path) + + with open(jmx_file_path, "w") as f: + f.write(data) + + def make_jmx_file(csp, rps, duration, payload_size, wf_name, execute_url, state_machine_arn, dynamism, session_id, wf_user_directory, part_id, region, wf_deployment_id, run_id, payload, is_localhost, execute_auth_creds): """ Changelog: @@ -221,6 +232,7 @@ def make_jmx_file(csp, rps, duration, payload_size, wf_name, execute_url, state_ elif 'openwhisk' in csp: encoded_auth = base64.b64encode(execute_auth_creds.encode()).decode('utf-8') template_openwhisk_jmx_file(rps, duration, execute_url, encoded_auth, jmx_template_path, jmx_output_path, session_id, payload) + add_jmeter_openwhisk_response_path(jmx_output_path, wf_user_directory, wf_deployment_id, run_id) else: template_aws_jmx_file(rps, duration, execute_url, state_machine_arn, payload_size, jmx_template_path, jmx_output_path, session_id,payload)
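For illustration, on the diamond-shaped "graphs" workflow used as the running
example in this series, the generator from PATCH 04 emits orchestrator source of
roughly the following shape (variable names are randomly generated; the ones
below are placeholders):

    const composer = require("openwhisk-composer");

    qjxkd = composer.action("/guest/graphs/graph_gen");
    vbnrt = composer.action("/guest/graphs/graph_bft");
    hwmze = composer.action("/guest/graphs/pagerank");
    kpltc = composer.action("/guest/graphs/graph_mst");
    sdqfy = composer.action("/guest/graphs/aggregate");

    // Iteration1: Parallel
    ozmwh = composer.parallel(vbnrt, hwmze, kpltc);

    // Iteration2: Sequence
    rgtyn = composer.sequence(qjxkd, ozmwh, sdqfy);

    module.exports = rgtyn;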