diff --git a/.gitignore b/.gitignore index a94c5af..67fbd86 100644 --- a/.gitignore +++ b/.gitignore @@ -45,4 +45,8 @@ serwo/templates/azure/predefined-functions/Orchestrate/__init__.py serwo/templates/azure/predefined-functions/QueueTrigger/__init__.py serwo/benchmark_resources/aws-az-mmc.jmx serwo/benchmark_resources/az-aws-mmc.jmx -*.tgz \ No newline at end of file +apache-jmeter-5.5/ +*.tgz +*.sh +*.docx +Untitled-1.md \ No newline at end of file diff --git a/dag-benchmark-generator-with-datatransfer.py b/dag-benchmark-generator-with-datatransfer.py new file mode 100644 index 0000000..78ecc30 --- /dev/null +++ b/dag-benchmark-generator-with-datatransfer.py @@ -0,0 +1,128 @@ +import json +import statistics +import argparse + + +def process(aws, azure, dag): + nodes = {} + edges = {} + + for node in dag['Nodes']: + nid = node['NodeId'] + nodes[nid] = { + "0": calc_nodes(aws, nid), # AWS + "1": calc_nodes(azure, nid) # Azure + } + + for edge in dag['Edges']: + for k, v in edge.items(): + source_id = get_id(dag, k) + for target_node in v: + target_id = get_id(dag, target_node) + if source_id is None or target_id is None: + continue + if source_id == target_id: + continue + + if source_id not in edges: + edges[source_id] = {} + + # Latencies: keep existing per-cloud diagonals (placeholders for cross-cloud as in original) + aws_lat = calc_edges(aws, source_id, target_id) + az_lat = calc_edges(azure, source_id, target_id) + + # Data transfer sizes (bytes): combine AWS+Azure runs and take median of per-invocation max(out,in) + aws_pairs = transfer_pairs(aws, source_id, target_id) # list of per-invocation sizes + az_pairs = transfer_pairs(azure, source_id, target_id) + combined_pairs = aws_pairs + az_pairs + data_transfer_size = statistics.median(combined_pairs) if combined_pairs else 0 + + edges[source_id][target_id] = { + "DataTransferSize": data_transfer_size, # bytes (median across AWS+Azure runs) + "Latencies": [ + [aws_lat, 0], # [AWS->AWS, AWS->Azure] (cross left as in original) + [0, az_lat] # [Azure->AWS, Azure->Azure] + ] + } + + return {"NodeBenchmarks": nodes, "EdgeBenchmarks": edges} + + +def calc_nodes(data, nid): + latencies = [] + for entry in data: + funcs = entry.get('functions', {}) + if nid in funcs: + meta = funcs[nid] + lat = meta.get('end_delta', 0) - meta.get('start_delta', 0) + latencies.append(lat) + median = statistics.median(latencies) if latencies else 0 + return {"Latency": median, "Cost": 0} + + +def calc_edges(data, source_id, target_id): + latencies = [] + for entry in data: + funcs = entry.get('functions', {}) + if source_id in funcs and target_id in funcs: + s = funcs[source_id] + t = funcs[target_id] + latency = t.get('start_delta', 0) - s.get('end_delta', 0) + latencies.append(latency) + return statistics.median(latencies) if latencies else 0 + + +def transfer_pairs(data, source_id, target_id): + """ + Returns a list of per-invocation transfer sizes (bytes) for source->target, + using max(out_payload_bytes, in_payload_bytes) to avoid undercounting. 
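+    Invocations missing either the source or the target function are skipped.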
+ """ + sizes = [] + for entry in data: + funcs = entry.get('functions', {}) + if source_id in funcs and target_id in funcs: + s = funcs[source_id] + t = funcs[target_id] + out_b = s.get('out_payload_bytes', 0) or 0 + in_b = t.get('in_payload_bytes', 0) or 0 + sizes.append(max(out_b, in_b)) + return sizes + + +def get_id(dag, node_name): + for node in dag['Nodes']: + if node['NodeName'] == node_name or node['NodeId'] == node_name: + return node['NodeId'] + return None + + +def process_jsonl(file_path): + with open(file_path, 'r') as file: + return [json.loads(line) for line in file] + + +def main(): + parser = argparse.ArgumentParser(description='dag benchmark') + parser.add_argument('--aws', type=str, help='aws absolute path') + parser.add_argument('--azure', type=str, help='azure absolute path') + parser.add_argument('--dag', type=str, help='dag absolute path') + parser.add_argument('--output', type=str, help='output file name', default="dag-benchmark.json") + + args = parser.parse_args() + + aws = process_jsonl(args.aws) if args.aws else [] + azure = process_jsonl(args.azure) if args.azure else [] + with open(args.dag, 'r') as file: + dag = json.load(file) + + benchmarks = process(aws, azure, dag) + + with open(args.output, 'w') as file: + json.dump(benchmarks, file, indent=2) + + print(f"data saved - {args.output}") + + +if __name__ == "__main__": + main() + diff --git a/dag-benchmark-v2.json b/dag-benchmark-v2.json new file mode 100644 index 0000000..2a69245 --- /dev/null +++ b/dag-benchmark-v2.json @@ -0,0 +1,179 @@ +{ + "NodeBenchmarks": { + "1": { + "0": { + "Latency": 512, + "Cost": 0 + }, + "1": { + "Latency": 29990.0, + "Cost": 0 + } + }, + "2": { + "0": { + "Latency": 20028, + "Cost": 0 + }, + "1": { + "Latency": 237520.5, + "Cost": 0 + } + }, + "3": { + "0": { + "Latency": 20232, + "Cost": 0 + }, + "1": { + "Latency": 244109.5, + "Cost": 0 + } + }, + "4": { + "0": { + "Latency": 32596, + "Cost": 0 + }, + "1": { + "Latency": 284497.5, + "Cost": 0 + } + }, + "5": { + "0": { + "Latency": 31863, + "Cost": 0 + }, + "1": { + "Latency": 283017.0, + "Cost": 0 + } + }, + "6": { + "0": { + "Latency": 1, + "Cost": 0 + }, + "1": { + "Latency": 123.0, + "Cost": 0 + } + }, + "7": { + "0": { + "Latency": 127, + "Cost": 0 + }, + "1": { + "Latency": 31308.0, + "Cost": 0 + } + } + }, + "EdgeBenchmarks": { + "1": { + "2": { + "DataTransferSize": 16579, + "Latencies": [ + [ + 85, + 0 + ], + [ + 0, + 978488.5 + ] + ] + }, + "3": { + "DataTransferSize": 16579, + "Latencies": [ + [ + 85, + 0 + ], + [ + 0, + 1387289.0 + ] + ] + } + }, + "2": { + "4": { + "DataTransferSize": 28281, + "Latencies": [ + [ + 89, + 0 + ], + [ + 0, + 529383.5 + ] + ] + } + }, + "3": { + "5": { + "DataTransferSize": 28377, + "Latencies": [ + [ + 89, + 0 + ], + [ + 0, + 93317.0 + ] + ] + } + }, + "4": { + "6": { + "DataTransferSize": 43382, + "Latencies": [ + [ + 125, + 0 + ], + [ + 0, + 47225.5 + ] + ] + } + }, + "5": { + "6": { + "DataTransferSize": 43382, + "Latencies": [ + [ + 554, + 0 + ], + [ + 0, + 33583.0 + ] + ] + } + }, + "6": { + "7": { + "DataTransferSize": 43470, + "Latencies": [ + [ + 91, + 0 + ], + [ + 0, + 156913.5 + ] + ] + } + } + } +} \ No newline at end of file diff --git a/dag-benchmark_old.json b/dag-benchmark_old.json new file mode 100644 index 0000000..be62a1a --- /dev/null +++ b/dag-benchmark_old.json @@ -0,0 +1,179 @@ +{ + "NodeBenchmarks": { + "1": { + "0": { + "Latency": 512, + "Cost": 0 + }, + "1": { + "Latency": 802, + "Cost": 0 + } + }, + "2": { + "0": { + "Latency": 20028, + "Cost": 0 + }, + 
"1": { + "Latency": 14475, + "Cost": 0 + } + }, + "3": { + "0": { + "Latency": 20232, + "Cost": 0 + }, + "1": { + "Latency": 14054, + "Cost": 0 + } + }, + "4": { + "0": { + "Latency": 32596, + "Cost": 0 + }, + "1": { + "Latency": 47324, + "Cost": 0 + } + }, + "5": { + "0": { + "Latency": 31863, + "Cost": 0 + }, + "1": { + "Latency": 46279, + "Cost": 0 + } + }, + "6": { + "0": { + "Latency": 1, + "Cost": 0 + }, + "1": { + "Latency": 10, + "Cost": 0 + } + }, + "7": { + "0": { + "Latency": 127, + "Cost": 0 + }, + "1": { + "Latency": 361, + "Cost": 0 + } + } + }, + "EdgeBenchmarks": { + "1": { + "2": { + "DataTransferSize": 16599.0, + "Latencies": [ + [ + 85, + 0 + ], + [ + 0, + 1175448 + ] + ] + }, + "3": { + "DataTransferSize": 16599.0, + "Latencies": [ + [ + 85, + 0 + ], + [ + 0, + 490988 + ] + ] + } + }, + "2": { + "4": { + "DataTransferSize": 28479.0, + "Latencies": [ + [ + 89, + 0 + ], + [ + 0, + 821818 + ] + ] + } + }, + "3": { + "5": { + "DataTransferSize": 28589.0, + "Latencies": [ + [ + 89, + 0 + ], + [ + 0, + 1605943 + ] + ] + } + }, + "4": { + "6": { + "DataTransferSize": 43462.0, + "Latencies": [ + [ + 125, + 0 + ], + [ + 0, + 663975 + ] + ] + } + }, + "5": { + "6": { + "DataTransferSize": 43462.0, + "Latencies": [ + [ + 554, + 0 + ], + [ + 0, + 652349 + ] + ] + } + }, + "6": { + "7": { + "DataTransferSize": 43550.0, + "Latencies": [ + [ + 91, + 0 + ], + [ + 0, + 81610 + ] + ] + } + } + } +} \ No newline at end of file diff --git a/dag-intercloud-latency-benchmark-generator.py b/dag-intercloud-latency-benchmark-generator.py new file mode 100644 index 0000000..7c22444 --- /dev/null +++ b/dag-intercloud-latency-benchmark-generator.py @@ -0,0 +1,42 @@ +import json +from statistics import median + +#Az -> AWS +"""/Users/vaibhavjha/Documents/IISc/xfbench_multicloud/XFaaS/serwo/examples/static-fanout-comms/workflow-gen/70fc9eff-0c32-4456-8f0b-8d8f016daffb/exp1/logs/staticfanout_comms_azure_static_aws_dyndb_items.jsonl""" + +#Aws -> Az +"""/Users/vaibhavjha/Documents/IISc/xfbench_multicloud/XFaaS/serwo/examples/static-fanout-comms/workflow-gen/baac0abc-f3d2-42db-9508-e79bd1c515b9/exp1/logs/staticfanout_comms_aws_static_aws_dyndb_items.jsonl""" + +path_to_intercloud_benchmark = "/Users/vaibhavjha/Documents/IISc/xfbench_multicloud/XFaaS/serwo/examples/static-fanout-comms/workflow-gen/70fc9eff-0c32-4456-8f0b-8d8f016daffb/exp1/logs/staticfanout_comms_azure_static_aws_dyndb_items.jsonl" +# Load the data from the attached file +with open(path_to_intercloud_benchmark, 'r') as f: + lines = f.readlines() + +# Parse each JSON line and extract timing differences +differences = [] + +for line in lines: + # Parse the JSON record + record = json.loads(line) + functions = record.get('functions', {}) + + # Check if both node 1 and node 2 exist in the record + if '1' in functions and '2' in functions: + # Extract end_delta of node 1 and start_delta of node 2 + node1_end = functions['1']['end_delta'] + node2_start = functions['2']['start_delta'] + + # Calculate the difference: start time of node 2 - end time of node 1 + diff = node2_start - node1_end + differences.append(diff) + +# Calculate the median of all differences +median_difference = median(differences) + +print(f"Total data points analyzed: {len(differences)}") +print(f"Median difference (node 2 start - node 1 end): {median_difference} milliseconds") + +# Optional: Show some additional statistics +print(f"Minimum difference: {min(differences)} ms") +print(f"Maximum difference: {max(differences)} ms") +print(f"Average difference: 
{sum(differences)/len(differences):.2f} ms") diff --git a/serwo/aws_create_statemachine.py b/serwo/aws_create_statemachine.py index 5398e1d..a371dde 100644 --- a/serwo/aws_create_statemachine.py +++ b/serwo/aws_create_statemachine.py @@ -108,12 +108,14 @@ def __create_lambda_fns( runner_template_dir, runner_filename ): + # TODO - should this be taken in from the dag-description? fn_requirements_filename = "requirements.txt" fn_dir = serwo_fn_build_dir / f"{fn_name}" logger.info(f"Creating function directory for {fn_name}") if not os.path.exists(fn_dir): + print("Directory made") os.makedirs(fn_dir) pathlib.Path(fn_dir / "__init__.py").touch() @@ -354,7 +356,77 @@ def __generate_asl_template(self): # print("List of changing funtions:",changing_fns_list) # Statemachine.asl.josn should be changed here sfn_json_copy = copy.deepcopy(json.loads(sfn_json)) - data = add_async_afn_builder(sfn_json_copy, changing_fns_list) + data=add_async_afn_builder(sfn_json_copy,changing_fns_list) + + # apply conditional branching logic AFTER async modifications + conditional_branches = self.__user_dag.get_conditional_branches() + if conditional_branches and len(conditional_branches) > 0: + for branch in conditional_branches: + source_node = branch['SourceNode'] + + # Find the source state and modify its Next to point to Check state + if source_node in data['States']: + check_state_name = f"Check{source_node}" + data['States'][source_node]['Next'] = check_state_name + + # Remove 'End' if it exists + data['States'][source_node].pop('End', None) + + # Add the Choice state + data['States'][check_state_name] = { + 'Type': 'Choice', + 'Choices': [ + { + 'Variable': branch['ConditionVariable'], + branch['ConditionType']: branch['ConditionValue'], + 'Next': branch['TargetNode'] + } + ], + 'Default': branch['DefaultTarget'] + } + + # Add Success state if specified as default + if branch['DefaultTarget'] == 'Success': + data['States']['Success'] = { + 'Type': 'Succeed' + } + # Comment out when you need to inject Model names for Agentic workflows + # # NEW: Inject model_name Parameters for each Task state + # function_object_map = self.__user_dag.get_node_object_map() + + # # Define which nodes should NOT get Parameters injection + # # - CollectLogs: system function + # # - Start node: receives raw input without XFaaS wrapper + # nodes_not_to_inject = ['CollectLogs'] + # start_node = data.get('StartAt') + # if start_node: + # nodes_not_to_inject.append(start_node) + + # for state_name, state_def in data['States'].items(): + # if state_def.get('Type') == 'Task' and state_name not in nodes_not_to_inject: + # if state_name in function_object_map: + # model_name = function_object_map[state_name].get_model_name() + # state_def['Parameters'] = { + # "statusCode.$": "$.statusCode", + # "metadata.$": "$.metadata", + # "body": { + # "model_name": model_name, + # "_xfaas_wrapped_body.$": "$.body" + # } + # } + # print(f"Injected model_name '{model_name}' for state '{state_name}'") + # else: + # print(f"DEBUG: State '{state_name}' not found in function_object_map, using default") + # # Still inject Parameters for consistency (with default model) + # state_def['Parameters'] = { + # "statusCode.$": "$.statusCode", + # "metadata.$": "$.metadata", + # "body": { + # "model_name": "openai:gpt-4o-mini", # fallback default + # "_xfaas_wrapped_body.$": "$.body" + # } + # } + # print("Updated data of sfn builder after adding poll",data) with open(f"{self.__aws_build_dir}/{self.__json_file}", "w") as statemachinejson: 
statemachinejson.write(json.dumps(data)) @@ -414,16 +486,70 @@ def build_workflow(self): logger.info( f"Starting SAM Build for {self.__user_dag.get_user_dag_name()}") os.system( - f"sam build --build-dir {self.__sam_build_dir} --template-file {self.__aws_build_dir / self.__yaml_file} --use-container" + f"DOCKER_DEFAULT_PLATFORM=linux/amd64 " + f"sam build --use-container " + f"--build-dir {self.__sam_build_dir} " + f"--template-file {self.__aws_build_dir / self.__yaml_file}" ) + """ NOTE - deploy workflow """ + def __check_and_cleanup_failed_stack(self): + """Check if stack exists in failed state and delete it""" + import subprocess + import time + + try: + # Check stack status + result = subprocess.run( + f'aws cloudformation describe-stacks --stack-name {self.__sam_stackname} --region {self.__region} --query "Stacks[0].StackStatus" --output text', + shell=True, + capture_output=True, + text=True + ) + + stack_status = result.stdout.strip() + + # If stack is in a failed state, delete it + if stack_status in ['UPDATE_FAILED', 'CREATE_FAILED', 'ROLLBACK_FAILED', 'DELETE_FAILED', 'UPDATE_ROLLBACK_FAILED']: + logger.warning(f"Stack {self.__sam_stackname} is in {stack_status} state. Deleting...") + + # Delete the stack + os.system(f"aws cloudformation delete-stack --stack-name {self.__sam_stackname} --region {self.__region}") + + # Wait for deletion + logger.info("Waiting for stack deletion to complete...") + time.sleep(10) # Initial wait + + # Poll until deleted + for _ in range(30): # Max 5 minutes + check_result = subprocess.run( + f'aws cloudformation describe-stacks --stack-name {self.__sam_stackname} --region {self.__region}', + shell=True, + capture_output=True, + text=True + ) + if "does not exist" in check_result.stderr: + logger.info("Stack successfully deleted") + break + time.sleep(10) + else: + logger.warning("Stack deletion took longer than expected, continuing anyway...") + + except Exception as e: + # Stack doesn't exist, which is fine + logger.info(f"No existing stack found or error checking: {e}") + + def deploy_workflow(self): - logger.info( - f"Starting SAM Deploy for {self.__user_dag.get_user_dag_name()}") + logger.info(f"Starting SAM Deploy for {self.__user_dag.get_user_dag_name()}") + logger.info(f"Template file {self.__sam_build_dir / self.__yaml_file}") + + self.__check_and_cleanup_failed_stack() + os.system( f"sam deploy \ --template-file {self.__sam_build_dir / self.__yaml_file} \ @@ -446,7 +572,7 @@ def deploy_workflow(self): # print("Sam stack name",self.__sam_stackname) # print("Output File path",self.__outputs_filepath) - # add sam stack name to ouput filepat + ## add sam stack name to ouput filepath with open(self.__outputs_filepath, "r") as f: data = json.load(f) data.append({"OutputKey": "SAMStackName", @@ -457,31 +583,32 @@ def deploy_workflow(self): return self.__outputs_filepath -def add_async_afn_builder(data, list): - - for i in range(0, len(list)): - (fn_name, sub) = list[i] - poll_next = data["States"][fn_name]['Next'] - checkPollcondition = 'CheckPollCondition'+str(i) - waitstate = 'WaitState'+str(i) - data["States"][sub]['Next'] = waitstate - data["States"][fn_name]['Next'] = checkPollcondition - data["States"][checkPollcondition] = { - "Type": "Choice", - "Choices": [ +def add_async_afn_builder(data,list): + + for i in range(0,len(list)): + (fn_name,sub) =list[i] + + poll_next=data["States"][fn_name]['Next'] + checkPollcondition='CheckPollCondition'+str(i) + waitstate='WaitState'+str(i) + data["States"][sub]['Next']=waitstate + 
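        # gate fn_name behind a Choice state on $.body.Poll; the Wait state's duration now comes from $.body.waittime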
data["States"][fn_name]['Next']=checkPollcondition + data["States"][checkPollcondition]={ + "Type": "Choice", + "Choices": [ { "Variable": "$.body.Poll", "BooleanEquals": False, "Next": poll_next } - ], - "Default": waitstate - } - data["States"][waitstate] = { - "Type": "Wait", - "Seconds": 100, - "Next": fn_name - } + ], + "Default": waitstate + } + data["States"][waitstate]={ + "Type": "Wait", + "SecondsPath": "$.body.waittime", + "Next": fn_name + } return data diff --git a/serwo/config/xfaas_user_config.json b/serwo/config/xfaas_user_config.json index 2e7c66f..914e467 100644 --- a/serwo/config/xfaas_user_config.json +++ b/serwo/config/xfaas_user_config.json @@ -9,8 +9,5 @@ "password": "dummy_pass123" } }, - "user_pinned_nodes" : { - "1":"0" - - } + "user_pinned_nodes" : {} } \ No newline at end of file diff --git a/serwo/deploy/template.yaml b/serwo/deploy/template.yaml index 9a335d0..eb8a78e 100644 --- a/serwo/deploy/template.yaml +++ b/serwo/deploy/template.yaml @@ -82,7 +82,7 @@ Resources: Properties: CodeUri: {{ function.uri }} Handler: x - Runtime: python3.9 + Runtime: python3.13 MemorySize: 1024 Timeout: 300 Architectures: diff --git a/serwo/dp_xfaas_partitioner.py b/serwo/dp_xfaas_partitioner.py index 7cdc55e..1344243 100644 --- a/serwo/dp_xfaas_partitioner.py +++ b/serwo/dp_xfaas_partitioner.py @@ -121,12 +121,13 @@ def get_data_transfer_value(data_transfers_benchmark, i, j, v, data_tranfers,is_ def evaluate_inter_cloud_data_transfer_constraints(data_tranfers, i, j, v): flag = False - if v == 1 and j == 0 and data_tranfers[i - 1] > 64: - flag = True - if v == 0 and j == 1 and data_tranfers[i - 1] > 256: - flag = True - if v==0 and j==0 and data_tranfers[i-1] > 256: - flag = True + #Added Large payload support hence commenting this out + # if v == 1 and j == 0 and data_tranfers[i - 1] > 64: + # flag = True + # if v == 0 and j == 1 and data_tranfers[i - 1] > 256: + # flag = True + # if v==0 and j==0 and data_tranfers[i-1] > 256: + # flag = True return flag diff --git a/serwo/python/src/faas-templates/aws/push-to-sqs-template/PushToSQS/push_to_aws_q.py b/serwo/python/src/faas-templates/aws/push-to-sqs-template/PushToSQS/push_to_aws_q.py index 51ad42b..b4c0fd0 100644 --- a/serwo/python/src/faas-templates/aws/push-to-sqs-template/PushToSQS/push_to_aws_q.py +++ b/serwo/python/src/faas-templates/aws/push-to-sqs-template/PushToSQS/push_to_aws_q.py @@ -4,6 +4,7 @@ import botocore.session from python.src.utils.classes.commons.serwo_objects import SerWOObject import logging +import uuid # Create SQS client """ @@ -11,17 +12,43 @@ NOTE - Currently the access_key_id = , secret_access_key = are HARDCODED. WE NEED TO TEMPLATE THIS !!!!!!!!!!!!!!!!!!!!!!!!!!!!!! """ - +AWS_ACCESS_KEY_ID = "{{access_key_id}}" +AWS_SECRET_ACCESS_KEY = "{{secret_access_key}}" sqs = boto3.client( "sqs", region_name="ap-south-1", - aws_access_key_id="{{access_key_id}}", - aws_secret_access_key="{{secret_access_key}}", + aws_access_key_id=AWS_ACCESS_KEY_ID, + aws_secret_access_key=AWS_SECRET_ACCESS_KEY, ) +AWS_MESSAGE_THRESHOLD = 256 * 1024 queue_url = "{{queue_url}}" +def generate_s3_key(prefix="xfaas", extension="json"): + """ + Generate a unique S3 object key with the given prefix and extension. + """ + if prefix == None: + prefix = "xfaas" + return f"{prefix}/{uuid.uuid4()}.{extension}" + +def upload_json_payload(bucket, payload, prefix="xfaas"): + """ + Serializes and uploads the payload (dict/list) to the given S3 bucket. + Returns the key used. 
+ """ + key = generate_s3_key(prefix=prefix, extension="json") + s3 = boto3.client( + "s3", + region_name="ap-south-1", + aws_access_key_id=AWS_ACCESS_KEY_ID, + aws_secret_access_key=AWS_SECRET_ACCESS_KEY, + ) + data_bytes = json.dumps(payload).encode("utf-8") + s3.put_object(Bucket=bucket, Key=key, Body=data_bytes) + return key + # Send message to SQS queue def user_function(serwoObject) -> SerWOObject: @@ -31,7 +58,35 @@ def user_function(serwoObject) -> SerWOObject: metadata = serwoObject.get_metadata() message["body"] = data message["metadata"] = metadata - logging.info(f"Input {message}") + #logging.info(f"Input {message}") + if "aws" not in message["body"]: + logging.info("AWS details for long message is not present in function body") + + output_bytes = json.dumps(message).encode("utf-8") + if len(output_bytes) > AWS_MESSAGE_THRESHOLD: + user_bucket = None + # If output is large (autotrigger) + event_body = message["body"] + if isinstance(event_body, dict): + if "aws" in event_body: + aws_details = event_body.get("aws", {}) + user_bucket = aws_details.get("bucket") + if not user_bucket: + raise ValueError("This workflow needs long payload size handling. Please provide valid storage details for each CSP in the event") + # --- Proceed to upload --- + deployment_id = metadata.get("deployment_id") + key = upload_json_payload(user_bucket, message["body"], prefix= deployment_id) + result = { + + "large_payload": 1, + "aws": { + "bucket": user_bucket, + "key": key + } + } + + message["body"] = result + response = sqs.send_message( QueueUrl=queue_url, MessageBody=json.dumps(message), diff --git a/serwo/python/src/faas-templates/aws/yaml-templates/apigw-awsstepfunctions.yaml b/serwo/python/src/faas-templates/aws/yaml-templates/apigw-awsstepfunctions.yaml index d55b94d..aa86f27 100644 --- a/serwo/python/src/faas-templates/aws/yaml-templates/apigw-awsstepfunctions.yaml +++ b/serwo/python/src/faas-templates/aws/yaml-templates/apigw-awsstepfunctions.yaml @@ -81,6 +81,13 @@ Resources: {% if function.iscontainer %} Type: AWS::Serverless::Function Properties: + Policies: + - Statement: + - Effect: Allow + Action: + - s3:GetObject + - s3:PutObject + Resource: arn:aws:s3:::*/* PackageType: Image Architectures: - x86_64 @@ -93,9 +100,16 @@ Resources: {% else %} Type: AWS::Serverless::Function Properties: + Policies: + - Statement: + - Effect: Allow + Action: + - s3:GetObject + - s3:PutObject + Resource: arn:aws:s3:::*/* CodeUri: {{ function.uri }} Handler: {{ function.handler }} - Runtime: python3.9 + Runtime: python3.13 MemorySize: {{ function.memory }} Timeout: 300 Architectures: diff --git a/serwo/python/src/faas-templates/aws/yaml-templates/awsstepfunctions.yaml b/serwo/python/src/faas-templates/aws/yaml-templates/awsstepfunctions.yaml index 7dd3062..adc0c2d 100644 --- a/serwo/python/src/faas-templates/aws/yaml-templates/awsstepfunctions.yaml +++ b/serwo/python/src/faas-templates/aws/yaml-templates/awsstepfunctions.yaml @@ -38,7 +38,7 @@ Resources: Properties: CodeUri: {{ function.uri }} Handler: {{ function.handler }} - Runtime: python3.9 + Runtime: python3.13 Architectures: - x86_64 {% endfor %} \ No newline at end of file diff --git a/serwo/python/src/faas-templates/aws/yaml-templates/sqs-awsstepfunctions.yaml b/serwo/python/src/faas-templates/aws/yaml-templates/sqs-awsstepfunctions.yaml index 43691dd..3a6f96c 100644 --- a/serwo/python/src/faas-templates/aws/yaml-templates/sqs-awsstepfunctions.yaml +++ b/serwo/python/src/faas-templates/aws/yaml-templates/sqs-awsstepfunctions.yaml @@ -63,7 +63,7 
@@ Resources: input=message_body ) - {StateMachineArn: !Ref {{stepfunctionname}}} # templated here - Runtime: "python3.9" + Runtime: "python3.13" Timeout: "30" # mostly static @@ -107,13 +107,63 @@ Resources: {% for function in functions -%} {{function.name}}: + {% if function.iscontainer %} Type: AWS::Serverless::Function Properties: + Policies: + - Statement: + - Effect: Allow + Action: + - s3:GetObject + - s3:PutObject + - dynamodb:GetItem + - dynamodb:PutItem + - dynamodb:UpdateItem + - dynamodb:DeleteItem + - dynamodb:Query + - dynamodb:Scan + - dynamodb:BatchGetItem + - dynamodb:BatchWriteItem + - logs:CreateLogGroup + - logs:CreateLogStream + - logs:PutLogEvents + Resource: "*" + PackageType: Image + Architectures: + - x86_64 + Timeout: 600 + MemorySize: {{function.memory}} + Metadata: + Dockerfile: Dockerfile + DockerContext: ./functions/{{function.name}} + DockerTag: python3.13 + {% else %} + Type: AWS::Serverless::Function + Properties: + Policies: + - Statement: + - Effect: Allow + Action: + - s3:GetObject + - s3:PutObject + - dynamodb:GetItem + - dynamodb:PutItem + - dynamodb:UpdateItem + - dynamodb:DeleteItem + - dynamodb:Query + - dynamodb:Scan + - dynamodb:BatchGetItem + - dynamodb:BatchWriteItem + - logs:CreateLogGroup + - logs:CreateLogStream + - logs:PutLogEvents + Resource: "*" CodeUri: {{ function.uri }} Handler: {{ function.handler }} - Runtime: python3.9 + Runtime: python3.13 MemorySize: {{ function.memory }} Timeout: 300 Architectures: - x86_64 + {% endif%} {% endfor %} \ No newline at end of file diff --git a/serwo/python/src/runner-templates/aws/runner_template.py b/serwo/python/src/runner-templates/aws/runner_template.py index 3461074..1120467 100644 --- a/serwo/python/src/runner-templates/aws/runner_template.py +++ b/serwo/python/src/runner-templates/aws/runner_template.py @@ -1,6 +1,3 @@ -# TODO - !! MOVE THIS OUT TO A DIFFERENT MODULE # -# SerwoObject - single one -# SerwoListObject - [SerwoObject] import importlib import json import sys @@ -11,12 +8,13 @@ import os import psutil import objsize -# from USER_FUNCTION_PLACEHOLDER import function as USER_FUNCTION_PLACEHOLDER_function - NOTE - !!! TK - STANDARDISE THIS!!! IMPORTANT -from USER_FUNCTION_PLACEHOLDER import user_function as USER_FUNCTION_PLACEHOLDER_function +import uuid +import boto3 from copy import deepcopy +from USER_FUNCTION_PLACEHOLDER import user_function as USER_FUNCTION_PLACEHOLDER_function from python.src.utils.classes.commons.serwo_objects import build_serwo_object from python.src.utils.classes.commons.serwo_objects import build_serwo_list_object -from python.src.utils.classes.commons.serwo_objects import SerWOObject +from python.src.utils.classes.commons.serwo_objects import SerWOObject, SerWOObjectsList downstream = 0 """ @@ -27,27 +25,48 @@ - objective is to create a list of common keys, access patterns which will be used to create a common object to pass around - """ +# Long Message Support +# Helper for S3 key generation and upload +def generate_s3_key(prefix="xfaas", extension="json"): + """ + Generate a unique S3 object key with the given prefix and extension. + """ + if prefix == None: + prefix = "xfaas" + return f"{prefix}/{uuid.uuid4()}.{extension}" +def upload_json_payload(bucket, payload, prefix="xfaas"): + """ + Serializes and uploads the payload (dict/list) to the given S3 bucket. + Returns the key used. 
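+    Relies on the Lambda execution role for S3 access (no explicit credentials).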
+ """ + key = generate_s3_key(prefix=prefix, extension="json") + s3 = boto3.client('s3') + data_bytes = json.dumps(payload).encode("utf-8") + s3.put_object(Bucket=bucket, Key=key, Body=data_bytes) + return key # Get time delta function def get_delta(timestamp): return round(time.time() * 1000) - timestamp + # AWS Handler def lambda_handler(event, context): start_time = round(time.time() * 1000) - # Unmarshal from lambda handler - # capturing input payload size + # Handle input payload size computation for monitoring input_payload_size_bytes = None + # ----------- INPUT PARSING --------------- if isinstance(event, list): # TODO: exception handling - serwo_request_object = build_serwo_list_object(event) - # Calculate input payload size - input_payload_size_bytes = sum([objsize.get_deep_size(x.get_body()) for x in serwo_request_object.get_objects()]) - + + input_payload_size_bytes = sum([objsize.get_deep_size(x.get("body")) for x in event]) + #sum([objsize.get_deep_size(x.get_body()) for x in serwo_request_object.get_objects()]) + + serwo_request_object = build_serwo_list_object(event) elif isinstance(event, dict): # # NOTE - this is a sample if condition for the pilot jobs # if "body" in event: @@ -71,12 +90,16 @@ def lambda_handler(event, context): deployment_id=deployment_id, functions=[], ) + input_payload_size_bytes = objsize.get_deep_size(event.get("body")) + serwo_request_object = build_serwo_object(event) - input_payload_size_bytes = objsize.get_deep_size(serwo_request_object.get_body()) + + else: - # TODO: Report error and return - pass + return dict(statusCode=500, body="Unrecognized input type", metadata="None") serwo_request_object.set_basepath("") + + # user function exec status_code = 200 try: @@ -95,7 +118,7 @@ def lambda_handler(event, context): memory_after = process.memory_info().rss print(f"SerWOMemUsage::After::{wf_instance_id},{function_id},{memory_after}") - # Sanity check for user function response + # Validation check for user function response if not isinstance(response_object, SerWOObject): status_code = 500 return dict( @@ -138,8 +161,45 @@ def lambda_handler(event, context): # post function handler # NOTE - leaving empty for now and returning a response is. # Send service bus/ storage queue + deployment_id = metadata.get("deployment_id") body = response_object.get_body() response = dict(statusCode=status_code, body=body, metadata=metadata) + output_bytes = json.dumps(response).encode("utf-8") + LONG_MESSAGE_THRESHOLD = 256 * 1024 # 256 KB in bytes + + if len(output_bytes) > LONG_MESSAGE_THRESHOLD: + user_bucket = None + # If output is large (autotrigger) + + if isinstance(event, dict): + event_body = event.get("body") + if "aws" in event_body: + aws_details = event_body.get("aws", {}) + user_bucket = aws_details.get("bucket") + elif isinstance(event, list): + #We get a list in input event from a fan-in which are all assumed to be in the same csp + event_body = event[0].get("body") + if "aws" in event_body: + aws_details = event_body.get("aws", {}) + user_bucket = aws_details.get("bucket") + if not user_bucket: + raise ValueError("This workflow needs long payload size handling. 
Please provide a CSP AND valid storage details for each CSP in the event") + # --- Proceed to upload --- + key = upload_json_payload(user_bucket, body, prefix= deployment_id) + result = { + + "large_payload": 1, + "aws": { + "bucket": user_bucket, + "key": key + } + } + function_metadata_list[-1][function_id]['out_payload_bytes'] = objsize.get_deep_size(result) + metadata.update(dict(functions=function_metadata_list)) + + response = dict(statusCode=200, body=result, metadata=metadata) + + if downstream == 0: # TODO - change while doing egress return response diff --git a/serwo/python/src/runner-templates/aws/runner_template_bkp.py b/serwo/python/src/runner-templates/aws/runner_template_bkp.py new file mode 100644 index 0000000..3461074 --- /dev/null +++ b/serwo/python/src/runner-templates/aws/runner_template_bkp.py @@ -0,0 +1,151 @@ +# TODO - !! MOVE THIS OUT TO A DIFFERENT MODULE # +# SerwoObject - single one +# SerwoListObject - [SerwoObject] +import importlib +import json +import sys +import time +import random +import string +import logging +import os +import psutil +import objsize +# from USER_FUNCTION_PLACEHOLDER import function as USER_FUNCTION_PLACEHOLDER_function - NOTE - !!! TK - STANDARDISE THIS!!! IMPORTANT +from USER_FUNCTION_PLACEHOLDER import user_function as USER_FUNCTION_PLACEHOLDER_function +from copy import deepcopy +from python.src.utils.classes.commons.serwo_objects import build_serwo_object +from python.src.utils.classes.commons.serwo_objects import build_serwo_list_object +from python.src.utils.classes.commons.serwo_objects import SerWOObject + +downstream = 0 +""" +NOTE - creating a serwo wrapper object from cloud events +* a different wrapper object will be constructed for different event types +* for one particular event type one object will be constructed + - we will need to find common keys in the event object for one event type across different FaaS service providers + - objective is to create a list of common keys, access patterns which will be used to create a common object to pass around + - +""" + + +# Get time delta function +def get_delta(timestamp): + return round(time.time() * 1000) - timestamp + + +# AWS Handler +def lambda_handler(event, context): + start_time = round(time.time() * 1000) + # Unmarshal from lambda handler + # capturing input payload size + input_payload_size_bytes = None + + if isinstance(event, list): + # TODO: exception handling + serwo_request_object = build_serwo_list_object(event) + + # Calculate input payload size + input_payload_size_bytes = sum([objsize.get_deep_size(x.get_body()) for x in serwo_request_object.get_objects()]) + + elif isinstance(event, dict): + # # NOTE - this is a sample if condition for the pilot jobs + # if "body" in event: + # if "is_warmup" in event["body"]: + # print("Warmup code") + # return dict(status_code=200, body="Warmed up") + if "metadata" not in event: + new_event = deepcopy(event) + event = dict(body=new_event) + wf_instance_id = event["body"].get("workflow_instance_id") + request_timestamp = event["body"].get("request_timestamp") + session_id = event["body"].get("session_id") # NOTE - new variable to keep track of requests + deployment_id = event["body"].get("deployment_id") + overheads = start_time - request_timestamp + event["metadata"] = dict( + workflow_instance_id=wf_instance_id, + workflow_start_time=start_time, + request_timestamp=request_timestamp, + session_id=session_id, + overheads=overheads, + deployment_id=deployment_id, + functions=[], + ) + serwo_request_object = 
build_serwo_object(event) + input_payload_size_bytes = objsize.get_deep_size(serwo_request_object.get_body()) + else: + # TODO: Report error and return + pass + serwo_request_object.set_basepath("") + # user function exec + status_code = 200 + try: + start_epoch_time = serwo_request_object.get_metadata().get( + "workflow_start_time" + ) + start_time_delta = get_delta(start_epoch_time) + wf_instance_id = serwo_request_object.get_metadata().get("workflow_instance_id") + function_id = "{{function_id_placeholder}}" + process = psutil.Process(os.getpid()) + + memory_before = process.memory_info().rss + print(f"SerWOMemUsage::Before::{wf_instance_id},{function_id},{memory_before}") + response_object = USER_FUNCTION_PLACEHOLDER_function(serwo_request_object) + process = psutil.Process(os.getpid()) + memory_after = process.memory_info().rss + print(f"SerWOMemUsage::After::{wf_instance_id},{function_id},{memory_after}") + + # Sanity check for user function response + if not isinstance(response_object, SerWOObject): + status_code = 500 + return dict( + statusCode=status_code, + body="Response Object Must be of type SerWOObject", + metadata="None", + ) + end_time_delta = get_delta(start_epoch_time) + # st_time = int(time.time()*1000) + # cpu_brand = cpuinfo.get_cpu_info()["brand_raw"] + # en_time = int(time.time()*1000) + # time_taken = en_time - st_time + # cpu_brand = f"{cpu_brand}_{time_taken}" + # Get current metadata here + metadata = serwo_request_object.get_metadata() + function_metadata_list = metadata.get("functions") + # NOTE - the template for generated function id + function_metadata_list.append( + { + function_id: dict( + start_delta=start_time_delta, + end_delta=end_time_delta, + mem_before=memory_before, + mem_after=memory_after, + in_payload_bytes=input_payload_size_bytes, + out_payload_bytes=objsize.get_deep_size(response_object.get_body()), + # cpu=cpu_brand + + ) + } + ) + metadata.update(dict(functions=function_metadata_list)) + except Exception as e: + # if user compute fails then default to status code as 500 and no response body + print(e) + status_code = 500 + return dict( + statusCode=status_code, body="Error in user compute", metadata="None" + ) + # post function handler + # NOTE - leaving empty for now and returning a response is. 
+ # Send service bus/ storage queue + body = response_object.get_body() + response = dict(statusCode=status_code, body=body, metadata=metadata) + if downstream == 0: + # TODO - change while doing egress + return response + return response + + +if __name__ == "__main__": + print("Main Method") + # lambda_handler(event, context) diff --git a/serwo/python/src/utils/classes/aws/function.py b/serwo/python/src/utils/classes/aws/function.py index 18d84a1..04b33a1 100644 --- a/serwo/python/src/utils/classes/aws/function.py +++ b/serwo/python/src/utils/classes/aws/function.py @@ -1,5 +1,5 @@ class Function: - def __init__(self, id, name, path, end_point, memory): + def __init__(self, id, name, path, end_point, memory, model_name=None): self._name = name self._id = id self._arn = name + "Arn" @@ -10,6 +10,7 @@ def __init__(self, id, name, path, end_point, memory): self._uri = "functions/" + name self._module_name = end_point.split(".")[0] self._memory = memory + self._model_name = model_name or "openai:gpt-4o-mini" # Default model # TODO - add function id self._isasync = False self._iscontainerised=False @@ -35,6 +36,9 @@ def get_handler(self): def get_module_name(self): return self._module_name + def get_model_name(self): + return self._model_name + def get_as_dict(self): return { "name": self._name, diff --git a/serwo/python/src/utils/classes/aws/user_dag.py b/serwo/python/src/utils/classes/aws/user_dag.py index 257738f..4a9aa84 100644 --- a/serwo/python/src/utils/classes/aws/user_dag.py +++ b/serwo/python/src/utils/classes/aws/user_dag.py @@ -24,6 +24,7 @@ def __init__(self, user_config_path): self.__nodeIDMap = {} self.__dag = nx.DiGraph() self.__functions = {} + self.__conditional_branches = [] except Exception as e: raise e @@ -34,13 +35,16 @@ def __init__(self, user_config_path): nodeID = "n" + str(index) self.__nodeIDMap[node["NodeName"]] = nodeID self.__nodeIDMap[node["NodeId"]] = nodeID + model_name = node.get("ModelName", "openai:gpt-4o-mini") self.__functions[node["NodeName"]] = Function( node["NodeId"], node["NodeName"], node["Path"], node["EntryPoint"], - node["MemoryInMB"] + node["MemoryInMB"], + model_name ) + print("NodeKeys -", list(node.keys())) if "IsAsync" in node and node["IsAsync"]: @@ -65,6 +69,10 @@ def __init__(self, user_config_path): for val in edge[key]: self.__dag.add_edge(self.__nodeIDMap[key], self.__nodeIDMap[val]) + #Parse conditional branches after edges + if "ConditionalBranches" in self.__dag_config_data: + self.__conditional_branches = self.__dag_config_data["ConditionalBranches"] + def _get_state(self, nodename): state = AWSSfnBuilder.State.parse( { @@ -298,6 +306,11 @@ def get_predecessor_node_names(self, node_name): def get_user_dag_nodes(self): return self.__dag_config_data["Nodes"] + + def get_conditional_branches(self): + """Returns conditional branches from DAG config""" + return self.__conditional_branches + def get_user_dag_edges(self): return self.__dag_config_data["Edges"] def get_dag(self): diff --git a/serwo/python/src/utils/classes/azure/user_dag.py b/serwo/python/src/utils/classes/azure/user_dag.py index 1856678..f9ae787 100644 --- a/serwo/python/src/utils/classes/azure/user_dag.py +++ b/serwo/python/src/utils/classes/azure/user_dag.py @@ -18,6 +18,8 @@ def __init__(self, user_config_path): print("Dag config data - ", self.__dag_config_data) self.__nodeIDMap = {} self.__dag = nx.DiGraph() + self.__conditional_branches = [] + except Exception as e: raise e @@ -31,8 +33,9 @@ def __init__(self, user_config_path): # NOTE - this is different in AWS, 
being picked up from the user dag-description # nodeID = "n" + str(index) nodeID = node["NodeId"] + model_name = node.get("ModelName", "openai:gpt-4o-mini") self.__nodeIDMap[node["NodeName"]] = nodeID - self.__dag.add_node(nodeID, NodeName=node["NodeName"], pre="", ret=["yield ", "context.call_activity(\"" + node["NodeName"] + "\",$var$)"], var=self._generate_random_variable_name(), machine_list=[nodeID]) + self.__dag.add_node(nodeID, NodeName=node["NodeName"], ModelName=model_name, pre="", ret=["yield ", "context.call_activity(\"" + node["NodeName"] + "\",$var$)"], var=self._generate_random_variable_name(), machine_list=[nodeID]) index += 1 @@ -43,6 +46,9 @@ def __init__(self, user_config_path): self.__dag.add_edge( self.__nodeIDMap[key], self.__nodeIDMap[val]) + # Conditional Branches after edges: + if "ConditionalBranches" in self.__dag_config_data: + self.__conditional_branches = self.__dag_config_data["ConditionalBranches"] start_node = [node for node in self.__dag.nodes if self.__dag.in_degree(node) == 0][0] self.__dag.nodes[start_node]['ret'] = ["yield ", "context.call_activity(\"" + self.__dag.nodes[start_node]["NodeName"] + "\", serwoObject)"] @@ -53,12 +59,84 @@ def __init__(self, user_config_path): start_node = [node for node in self.__dag.nodes if self.__dag.in_degree(node) == 0][0] self.__dag.nodes[start_node]['ret'] = ["yield ", "context.call_activity(\"" + self.__dag.nodes[start_node]["NodeName"] + "\", serwoObject)"] + def get_conditional_branches(self): + return self.__conditional_branches + + def has_conditional_branches(self): + return len(self.__conditional_branches) > 0 def __load_user_spec(self, user_config_path): with open(user_config_path, "r") as user_dag_spec: dag_data = json.load(user_dag_spec) return dag_data + def _wrap_with_conditional_logic(self, statements, result_var): + """ + Generic wrapper for conditional branching using user input. 
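+        Wraps the generated orchestrator statements in a while loop that re-runs them until the configured condition or the iteration limit says to stop.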
+ """ + if not self.__conditional_branches: + return statements + + branch = self.__conditional_branches[0] + condition_var = branch['ConditionVariable'] + condition_type = branch['ConditionType'] + condition_value = branch['ConditionValue'] + + json_path = condition_var.replace('$.', '') + + wrapped_statements = [] + wrapped_statements.append("# Conditional branching loop") + wrapped_statements.append("should_continue = True") + wrapped_statements.append("") + wrapped_statements.append("while should_continue:") + + # Indent workflow statements (exclude return) - add 4 spaces for while body + for stmt in statements[:-1]: + wrapped_statements.append(" " + stmt) + + wrapped_statements.append("") + wrapped_statements.append(" # Check conditional branching condition") + wrapped_statements.append(" import json") + wrapped_statements.append(" try:") + wrapped_statements.append(" result_dict = json.loads({})".format(result_var)) + wrapped_statements.append(" if '_body' in result_dict:") + wrapped_statements.append(" result_body = result_dict['_body']") + wrapped_statements.append(" else:") + wrapped_statements.append(" result_body = result_dict.get('body', {})") + wrapped_statements.append("") + + # Generate condition check + if condition_type == "BooleanEquals": + # FIX: Use capital True/False + condition_check = "result_body.get('{}', False) == {}".format( + json_path, str(condition_value).capitalize() + ) + elif condition_type == "StringEquals": + condition_check = "result_body.get('{}', '') == '{}'".format(json_path, condition_value) + elif condition_type == "NumericEquals": + condition_check = "result_body.get('{}', 0) == {}".format(json_path, condition_value) + else: + condition_check = "result_body.get('{}', False)".format(json_path) + + wrapped_statements.append(" should_continue = {}".format(condition_check)) + wrapped_statements.append("") + wrapped_statements.append(" # Check iteration limit from user input") + wrapped_statements.append(" current_iter = result_body.get('iteration_count', 0)") + wrapped_statements.append(" max_iter = result_body.get('max_iterations', 1)") + wrapped_statements.append(" if current_iter >= max_iter:") + wrapped_statements.append(" should_continue = False") + wrapped_statements.append("") + wrapped_statements.append(" except Exception as e:") + wrapped_statements.append(" should_continue = False") + wrapped_statements.append("") + wrapped_statements.append(" if not should_continue:") + wrapped_statements.append(" break") + + wrapped_statements.append("") + wrapped_statements.append(statements[-1]) # return statement + + return wrapped_statements + def _generate_random_variable_name(self, n=4): res = ''.join(random.choices(string.ascii_letters, k=n)) return str(res).lower() @@ -70,6 +148,16 @@ def _get_orchestrator_code_parallel_merge(self, dag, nodes): task_list_create = task_list_var_name + " = []\n" ret = ["yield ", "context.task_all(" + task_list_var_name + ")"] pre = "" + + # Use the first node's model_name + if nodes: + model_name = dag.nodes[nodes[0]].get('ModelName', 'openai:gpt-4o-mini') + pre += f"\n# Inject model_name for parallel execution" + pre += f"\nparallel_input = json.loads(serwoObject)" + pre += f"\nif 'body' in parallel_input:" + pre += f"\n parallel_input['body']['model_name'] = '{model_name}'" + pre += f"\nserwoObject = json.dumps(parallel_input)" + for node in nodes: # Remember No $var$ will be updated in parallel merge pre += "\n" + dag.nodes[node]['pre'] @@ -92,16 +180,84 @@ def _get_orchestrator_code_linear_merge(self, dag, nodes): 
previous_var = None for node in nodes[:-1]: # $var$ will be updated in linear merge + # Get model_name for this node + model_name = dag.nodes[node].get('ModelName', 'openai:gpt-4o-mini') + node_name = dag.nodes[node].get('NodeName', 'unknown') if previous_var is not None: + # Inject model_name before calling activity + pre += f"\n# Inject model_name for {dag.nodes[node].get('NodeName')}" + pre += f"\nimport json" + pre += f"\n{previous_var}_dict = json.loads({previous_var})" + pre += f"\nif 'body' in {previous_var}_dict:" + pre += f"\n body = {previous_var}_dict['body']" + pre += f"\n if isinstance(body, str):" + pre += f"\n body = json.loads(body)" + pre += f"\n body['model_name'] = '{model_name}'" + pre += f"\n {previous_var}_dict['body'] = json.dumps(body)" + pre += f"\n else:" + pre += f"\n body['model_name'] = '{model_name}'" + pre += f"\n {previous_var}_dict['body'] = body" + pre += f"\nelif '_body' in {previous_var}_dict:" + pre += f"\n body = {previous_var}_dict['_body']" + pre += f"\n if isinstance(body, str):" + pre += f"\n body = json.loads(body)" + pre += f"\n body['model_name'] = '{model_name}'" + pre += f"\n {previous_var}_dict['_body'] = json.dumps(body)" + pre += f"\n else:" + pre += f"\n body['model_name'] = '{model_name}'" + pre += f"\n {previous_var}_dict['_body'] = body" + pre += f"\nelse:" + pre += f"\n {previous_var}_dict['model_name'] = '{model_name}'" + pre += f"\n{previous_var} = json.dumps({previous_var}_dict)" + pre += "\n" + dag.nodes[node]['pre'].replace("$var$", previous_var) var_substituted = dag.nodes[node]['ret'][1].replace("$var$", previous_var) else: + # Inject model_name for first node + pre += f"\n# Inject model_name for {dag.nodes[node].get('NodeName')}" + pre += f"\nimport json" + pre += f"\nserwoObject_dict = json.loads(serwoObject)" + pre += f"\nif 'body' in serwoObject_dict:" + pre += f"\n serwoObject_dict['body']['model_name'] = '{model_name}'" + pre += f"\nelse:" + pre += f"\n # Raw input - add model_name at top level" + pre += f"\n serwoObject_dict['model_name'] = '{model_name}'" + pre += f"\nserwoObject = json.dumps(serwoObject_dict)" + pre += "\n" + dag.nodes[node]['pre'] var_substituted = dag.nodes[node]['ret'][1] + pre += "\n" + dag.nodes[node]['var'] + " = " + dag.nodes[node]['ret'][0] + " " + var_substituted previous_var = dag.nodes[node]['var'] + # Handle last node + model_name_last = dag.nodes[last].get('ModelName', 'openai:gpt-4o-mini') + + pre += f"\n# Inject model_name for {dag.nodes[last].get('NodeName')}" + pre += f"\n{previous_var}_dict = json.loads({dag.nodes[nodes[-2]]['var']})" + pre += f"\nif 'body' in {previous_var}_dict:" + pre += f"\n body = {previous_var}_dict['body']" + pre += f"\n if isinstance(body, str):" + pre += f"\n body = json.loads(body)" + pre += f"\n body['model_name'] = '{model_name_last}'" + pre += f"\n {previous_var}_dict['body'] = json.dumps(body)" + pre += f"\n else:" + pre += f"\n body['model_name'] = '{model_name_last}'" + pre += f"\n {previous_var}_dict['body'] = body" + pre += f"\nelif '_body' in {previous_var}_dict:" + pre += f"\n body = {previous_var}_dict['_body']" + pre += f"\n if isinstance(body, str):" + pre += f"\n body = json.loads(body)" + pre += f"\n body['model_name'] = '{model_name_last}'" + pre += f"\n {previous_var}_dict['_body'] = json.dumps(body)" + pre += f"\n else:" + pre += f"\n body['model_name'] = '{model_name_last}'" + pre += f"\n {previous_var}_dict['_body'] = body" + pre += f"\nelse:" + pre += f"\n {previous_var}_dict['model_name'] = '{model_name_last}'" + pre += 
f"\n{dag.nodes[nodes[-2]]['var']} = json.dumps({previous_var}_dict)" + pre += "\n" + dag.nodes[last]['pre'].replace("$var$", dag.nodes[nodes[-2]]['var']) var = self._generate_random_variable_name() # $var$ will be updated in linear merge @@ -276,6 +432,12 @@ def get_orchestrator_code(self): pre_statements.append(post_code) pre_statements.append(f"return {final_var}") - # TODO - for every taskall add the converstion from [serwo_objects] -> serwo_list_object - orchestrator_code = "\n".join([pre_statements[0]] + ["\t" + statement for statement in pre_statements[1:]]) - return orchestrator_code \ No newline at end of file + # Apply conditional branching + + if self.has_conditional_branches(): + pre_statements = self._wrap_with_conditional_logic(pre_statements, final_var) + + # Always add base indentation to be inside orchestrator_function + orchestrator_code = "\n".join([" " + statement for statement in pre_statements]) + + return orchestrator_code diff --git a/serwo/python/src/utils/classes/commons/serwo_objects.py b/serwo/python/src/utils/classes/commons/serwo_objects.py index 77e1de7..449afab 100644 --- a/serwo/python/src/utils/classes/commons/serwo_objects.py +++ b/serwo/python/src/utils/classes/commons/serwo_objects.py @@ -1,4 +1,5 @@ import json +import boto3 class SerWOObject: @@ -66,6 +67,22 @@ def get_basepath(self): def set_basepath(self, basepath): self._basepath = basepath +def fetch_event_payload(body): + """Checks whether incoming input to lambda has large payload enabled and extracts exact payload from the pointer""" + if isinstance(body, dict) and body.get("large_payload") == 1: + csp_info = body.get("aws", {}) + bucket = csp_info.get("bucket") + key = csp_info.get("key") + if bucket and key: + try: + s3 = boto3.client('s3') + obj = s3.get_object(Bucket=bucket, Key=key) + data = obj['Body'].read().decode('utf-8') + return json.loads(data) + except Exception as e: + print(f"[Large payload download error]: {e}") + return body + return body def build_serwo_list_object(event): """ @@ -95,7 +112,7 @@ def build_serwo_list_object(event): for data in functions_metadata_list: for fid, fdata in data.items(): functions_metadata_dict[fid] = fdata - list_obj.add_object(body=record.get("body")) + list_obj.add_object(body=fetch_event_payload(record.get("body"))) # convert the function metadata dict to a list to be added to collated metadata for fid, fdata in functions_metadata_dict.items(): collated_functions_metadata_list.append({fid: fdata}) @@ -106,4 +123,4 @@ def build_serwo_list_object(event): def build_serwo_object(event): - return SerWOObject(body=event["body"], metadata=event["metadata"]) + return SerWOObject(body=fetch_event_payload(event["body"]), metadata=event["metadata"]) diff --git a/serwo/python/src/utils/classes/commons/serwo_user_dag.py b/serwo/python/src/utils/classes/commons/serwo_user_dag.py index 1f7405f..2a02ee3 100644 --- a/serwo/python/src/utils/classes/commons/serwo_user_dag.py +++ b/serwo/python/src/utils/classes/commons/serwo_user_dag.py @@ -38,9 +38,19 @@ def __init__(self, user_config_path): NodeName=node["NodeName"], Path=node["Path"], EntryPoint=node["EntryPoint"], - MemoryInMB=node["MemoryInMB"], - IsAsync=node.get("IsAsync", False), - CSP=node.get("CSP", "NA")) + MemoryInMB=node["MemoryInMB"]) + #Adding additional parameters for QXFaas workflows + if "IsAsync" in node: + self.__dag.add_node(nodeID, + IsAsync=node["IsAsync"]) + if "IsContainerised" in node: + self.__dag.add_node(nodeID, + IsContainerised=node["IsContainerised"]) + + # if "ModelName" in node: + 
# self.__dag.add_node(nodeID, + # ModelName=node["ModelName"]) + index += 1 # add edges in the dag diff --git a/serwo/scripts/azure/azure_builder.py b/serwo/scripts/azure/azure_builder.py index 41f05c2..03d1038 100644 --- a/serwo/scripts/azure/azure_builder.py +++ b/serwo/scripts/azure/azure_builder.py @@ -64,6 +64,7 @@ def build_working_dir(region,part_id,is_netherite): else: build_dir += f"azure-{region}-{part_id}" az_functions_path=f"{build_dir}/{user_workflow_name}" + if not os.path.exists(az_functions_path): os.makedirs(az_functions_path) @@ -240,7 +241,7 @@ def generate_function_id(f_id): def copy_all_dirs(fn_dir_path,fin_func_dir): - + dirs = os.listdir(fn_dir_path) for dir in dirs: @@ -354,8 +355,10 @@ def build(user_dir, dag_definition_file, region, part_id,is_netherite): USER_DIR = user_dir DAG_DEFINITION_FILE = dag_definition_file init_paths() + build_working_dir(region,part_id,is_netherite) user_fns_data, user_app_name = get_user_workflow_details() + ingress_queue_name, app_name = generate_app_name_and_populate_and_get_ingress_queue_name(user_app_name,region,part_id,is_netherite) build_user_fn_dirs(user_fns_data) copy_meta_files(user_fns_data,ingress_queue_name,app_name,is_netherite) diff --git a/serwo/scripts/azure/azure_deploy.py b/serwo/scripts/azure/azure_deploy.py index cf7ba43..236494a 100644 --- a/serwo/scripts/azure/azure_deploy.py +++ b/serwo/scripts/azure/azure_deploy.py @@ -20,7 +20,7 @@ def init_paths(user_workflow_dir, region , part_id,is_netherite): resources_path = f'{user_workflow_dir}/build/workflow/resources/azure-{region}-{part_id}.json' runtime = 'python' functions_version = 4 - runtime_version = 3.9 + runtime_version = 3.12 os_type = 'linux' diff --git a/serwo/scripts/azure/orchestrator_async_update.py b/serwo/scripts/azure/orchestrator_async_update.py index 5820e42..d8104b4 100644 --- a/serwo/scripts/azure/orchestrator_async_update.py +++ b/serwo/scripts/azure/orchestrator_async_update.py @@ -42,7 +42,10 @@ def codegen(in_var,out_var,func_name): new_code +='\n\tif not body["Poll"] or body["Poll"]==False: ' new_code +='\n\t\tbreak ' new_code +='\n\telse:' - new_code +='\n\t\tdeadline = context.current_utc_datetime + timedelta(seconds=900) ' + new_code +='\n\t\tdelta = 900 ' + new_code +='\n\t\tif "waittime" in body : ' + new_code +='\n\t\t\tdelta = body["waittime"] ' + new_code +='\n\t\tdeadline = context.current_utc_datetime + timedelta(seconds=delta) ' new_code +='\n\t\tyield context.create_timer(deadline)\n\n' return new_code diff --git a/serwo/templates/azure/azure_runner_template.py b/serwo/templates/azure/azure_runner_template.py index b952b06..8f40825 100644 --- a/serwo/templates/azure/azure_runner_template.py +++ b/serwo/templates/azure/azure_runner_template.py @@ -154,8 +154,13 @@ def main(serwoObject, context: az_func.Context) -> str: serwoObject.set_basepath(basepath=basepath) body_before = serwoObject.get_body() input_body_size = objsize.get_deep_size(body_before) + #TODO: Long Message: AWS details preserved in case of long message AND hybrid cloud partioner result + aws_config = body_before.get("aws") if isinstance(body_before, dict) else None serwoObjectResponse = USER_FUNCTION_PLACEHOLDER_function(serwoObject) body_after = serwoObjectResponse.get_body() + if isinstance(body_after, dict) and aws_config is not None: + body_after["aws"] = aws_config + output_body_size = objsize.get_deep_size(body_after) process = psutil.Process(os.getpid()) memory = process.memory_info().rss @@ -170,7 +175,7 @@ def main(serwoObject, context: az_func.Context) -> 
str: func_json = {func_id: {"start_delta": start_delta, "end_delta": end_delta, "mem_before" : memory_before, "mem_after" : memory_after , "in_payload_bytes" : input_body_size, "out_payload_bytes" : output_body_size}} metadata["functions"].append(func_json) metadata = metadata - body = serwoObjectResponse.get_body() + body = body_after return SerWOObject(body=body, metadata=metadata).to_json() except Exception as e: logging.info("excep= " + str(e)) diff --git a/serwo/templates/azure/json-templates/host-v2.json b/serwo/templates/azure/json-templates/host-v2.json index ad59628..e0731af 100644 --- a/serwo/templates/azure/json-templates/host-v2.json +++ b/serwo/templates/azure/json-templates/host-v2.json @@ -23,10 +23,9 @@ "maxConcurrentOrchestratorFunctions": 8, "storageProvider": { "type" : "Netherite", - "PartitionCount": "32", "StorageConnectionName": "AzureWebJobsStorage", "EventHubsConnectionName": "EventHubsConnection", - "partitionCount": 1 + "partitionCount": 32 } } diff --git a/serwo/templates/azure/predefined-functions/CollectLogs/func.py b/serwo/templates/azure/predefined-functions/CollectLogs/func.py index b0cfd2b..73c38bd 100644 --- a/serwo/templates/azure/predefined-functions/CollectLogs/func.py +++ b/serwo/templates/azure/predefined-functions/CollectLogs/func.py @@ -17,7 +17,7 @@ def user_function(serwoObject) -> SerWOObject: data = serwoObject.get_body() logging.info("Data to push - "+str(data)) metadata = serwoObject.get_metadata() - data = {"body": "success: OK"} + data = {"body": data} fin_dict["data"] = data fin_dict["metadata"] = metadata logging.info("Fin dict - "+str(fin_dict)) diff --git a/serwo/templates/azure/predefined-functions/Orchestrate/__init__.py b/serwo/templates/azure/predefined-functions/Orchestrate/__init__.py index b26c539..ea5cbca 100644 --- a/serwo/templates/azure/predefined-functions/Orchestrate/__init__.py +++ b/serwo/templates/azure/predefined-functions/Orchestrate/__init__.py @@ -159,18 +159,121 @@ def orchestrator_function(context: df.DurableOrchestrationContext): serwoObject = build_serwo_object(inp_dict).to_json() # user dag execution - okok = yield context.call_activity("TaskA", serwoObject) - eywu = [] - ylqi = context.call_activity("TaskB", okok) - eddg = context.call_activity("TaskC", okok) - dgye = context.call_activity("TaskD", okok) - eywu.append(ylqi) - eywu.append(eddg) - eywu.append(dgye) - llbk = yield context.task_all(eywu) - llbk = insert_end_stats_in_metadata(llbk) - iqyk = yield context.call_activity("TaskE", llbk) - return iqyk + # Conditional branching loop + should_continue = True + + while should_continue: + # Inject model_name for Planner + import json + serwoObject_dict = json.loads(serwoObject) + if 'body' in serwoObject_dict: + serwoObject_dict['body']['model_name'] = 'openai:gpt-4o-mini' + else: + # Raw input - add model_name at top level + serwoObject_dict['model_name'] = 'openai:gpt-4o-mini' + serwoObject = json.dumps(serwoObject_dict) + oshm = yield context.call_activity("Planner", serwoObject) + # Inject model_name for Actor + import json + oshm_dict = json.loads(oshm) + if 'body' in oshm_dict: + body = oshm_dict['body'] + if isinstance(body, str): + body = json.loads(body) + body['model_name'] = 'openai:gpt-4o-mini' + oshm_dict['body'] = json.dumps(body) + else: + body['model_name'] = 'openai:gpt-4o-mini' + oshm_dict['body'] = body + elif '_body' in oshm_dict: + body = oshm_dict['_body'] + if isinstance(body, str): + body = json.loads(body) + body['model_name'] = 'openai:gpt-4o-mini' + oshm_dict['_body'] = 
json.dumps(body) + else: + body['model_name'] = 'openai:gpt-4o-mini' + oshm_dict['_body'] = body + else: + oshm_dict['model_name'] = 'openai:gpt-4o-mini' + oshm = json.dumps(oshm_dict) + jzsy = yield context.call_activity("Actor", oshm) + # Inject model_name for Evaluator + import json + jzsy_dict = json.loads(jzsy) + if 'body' in jzsy_dict: + body = jzsy_dict['body'] + if isinstance(body, str): + body = json.loads(body) + body['model_name'] = 'openai:gpt-4o-mini' + jzsy_dict['body'] = json.dumps(body) + else: + body['model_name'] = 'openai:gpt-4o-mini' + jzsy_dict['body'] = body + elif '_body' in jzsy_dict: + body = jzsy_dict['_body'] + if isinstance(body, str): + body = json.loads(body) + body['model_name'] = 'openai:gpt-4o-mini' + jzsy_dict['_body'] = json.dumps(body) + else: + body['model_name'] = 'openai:gpt-4o-mini' + jzsy_dict['_body'] = body + else: + jzsy_dict['model_name'] = 'openai:gpt-4o-mini' + jzsy = json.dumps(jzsy_dict) + rveh = yield context.call_activity("Evaluator", jzsy) + # Inject model_name for CollectLogs + rveh_dict = json.loads(rveh) + if 'body' in rveh_dict: + body = rveh_dict['body'] + if isinstance(body, str): + body = json.loads(body) + body['model_name'] = 'openai:gpt-4o-mini' + rveh_dict['body'] = json.dumps(body) + else: + body['model_name'] = 'openai:gpt-4o-mini' + rveh_dict['body'] = body + elif '_body' in rveh_dict: + body = rveh_dict['_body'] + if isinstance(body, str): + body = json.loads(body) + body['model_name'] = 'openai:gpt-4o-mini' + rveh_dict['_body'] = json.dumps(body) + else: + body['model_name'] = 'openai:gpt-4o-mini' + rveh_dict['_body'] = body + else: + rveh_dict['model_name'] = 'openai:gpt-4o-mini' + rveh = json.dumps(rveh_dict) + rveh = insert_end_stats_in_metadata(rveh) + ktpc = yield context.call_activity("CollectLogs", rveh) + + # Check conditional branching condition + import json + try: + result_dict = json.loads(ktpc) + if '_body' in result_dict: + result_body = result_dict['_body'] + else: + result_body = result_dict.get('body', {}) + + should_continue = result_body.get( + 'body.needs_retry', False) == True + + # Check iteration limit from user input + current_iter = result_body.get('iteration_count', 0) + max_iter = result_body.get('max_iterations', 1) + if current_iter >= max_iter: + should_continue = False + + except Exception as e: + should_continue = False + + if not should_continue: + break + + return ktpc main = df.Orchestrator.create(orchestrator_function) diff --git a/serwo/templates/azure/predefined-functions/QueueTrigger/__init__.py b/serwo/templates/azure/predefined-functions/QueueTrigger/__init__.py index d59a417..cb02105 100644 --- a/serwo/templates/azure/predefined-functions/QueueTrigger/__init__.py +++ b/serwo/templates/azure/predefined-functions/QueueTrigger/__init__.py @@ -5,6 +5,7 @@ from time import time import azure.durable_functions as df + def get_delta(start_time): curr_time = int(time() * 1000) return (curr_time-start_time) @@ -12,14 +13,28 @@ def get_delta(start_time): func_id = 253 -app_name = 'xfaasUserWf0000774398' +app_name = 'xfaasUserWf0000882849' async def main(msg: func.QueueMessage,starter: str) -> None: logging.info('Python queue trigger function processed a queue item: %s', msg.get_body().decode('utf-8')) URL = f'https://{app_name}.azurewebsites.net/api/orchestrators/Orchestrate' - metadata = json.loads(msg.get_body().decode('utf-8'))['metadata'] - body = json.loads(msg.get_body().decode('utf-8'))['body'] + msg_dict = json.loads(msg.get_body().decode('utf-8')) + + # If the message is a pointer to 
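# The four near-identical model_name injection blocks above differ only in the variable they
# rewrite; a possible refactor is a single helper along these lines (inject_model_name is a
# hypothetical name, not part of the generated Orchestrate code):
import json

def inject_model_name(payload_json: str, model_name: str) -> str:
    """Set model_name inside a serialized SerWO payload, handling the 'body' and '_body'
    layouts as well as string-encoded bodies, mirroring the inline logic above."""
    payload = json.loads(payload_json)
    key = "body" if "body" in payload else ("_body" if "_body" in payload else None)
    if key is None:
        payload["model_name"] = model_name           # raw input: set at top level
        return json.dumps(payload)
    body = payload[key]
    if isinstance(body, str):
        body_dict = json.loads(body)
        body_dict["model_name"] = model_name
        payload[key] = json.dumps(body_dict)          # keep the body string-encoded
    else:
        body["model_name"] = model_name
        payload[key] = body
    return json.dumps(payload)

# usage inside the orchestrator loop would then reduce to, e.g.:
#     oshm = inject_model_name(oshm, 'openai:gpt-4o-mini')
#     jzsy = yield context.call_activity("Actor", oshm)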
a large payload, fetch it from Blob + if msg_dict.get('large_payload') == 1 and "azure" in msg_dict: + blob_url = msg_dict["azure"]["blob_url"] + resp = requests.get(blob_url) + if resp.status_code == 200: + payload = json.loads(resp.content) + body = payload.get("body") + metadata = payload.get("metadata") + else: + raise Exception(f"Could not retrieve blob from {blob_url} - status code: {resp.status_code}") + + else: + metadata = msg_dict['metadata'] + body = msg_dict['body'] start_delta = get_delta(metadata['workflow_start_time']) end_delta = get_delta(metadata['workflow_start_time']) diff --git a/serwo/templates/azure/predefined-functions/QueueTrigger/queue_trigger_runner_template.py b/serwo/templates/azure/predefined-functions/QueueTrigger/queue_trigger_runner_template.py index 7cdc03f..e65c52e 100644 --- a/serwo/templates/azure/predefined-functions/QueueTrigger/queue_trigger_runner_template.py +++ b/serwo/templates/azure/predefined-functions/QueueTrigger/queue_trigger_runner_template.py @@ -5,6 +5,7 @@ from time import time import azure.durable_functions as df + def get_delta(start_time): curr_time = int(time() * 1000) return (curr_time-start_time) @@ -18,8 +19,22 @@ def get_delta(start_time): async def main(msg: func.QueueMessage,starter: str) -> None: logging.info('Python queue trigger function processed a queue item: %s', msg.get_body().decode('utf-8')) URL = f'https://{app_name}.azurewebsites.net/api/orchestrators/Orchestrate' - metadata = json.loads(msg.get_body().decode('utf-8'))['metadata'] - body = json.loads(msg.get_body().decode('utf-8'))['body'] + msg_dict = json.loads(msg.get_body().decode('utf-8')) + + # If the message is a pointer to a large payload, fetch it from Blob + if msg_dict.get('large_payload') == 1 and "azure" in msg_dict: + blob_url = msg_dict["azure"]["blob_url"] + resp = requests.get(blob_url) + if resp.status_code == 200: + payload = json.loads(resp.content) + body = payload.get("body") + metadata = payload.get("metadata") + else: + raise Exception(f"Could not retrieve blob from {blob_url} - status code: {resp.status_code}") + + else: + metadata = msg_dict['metadata'] + body = msg_dict['body'] start_delta = get_delta(metadata['workflow_start_time']) end_delta = get_delta(metadata['workflow_start_time']) diff --git a/serwo/templates/azure/push-to-storage-queue-template/PushToStorageQueue/push_to_azure_q.py b/serwo/templates/azure/push-to-storage-queue-template/PushToStorageQueue/push_to_azure_q.py index f833e03..e507499 100644 --- a/serwo/templates/azure/push-to-storage-queue-template/PushToStorageQueue/push_to_azure_q.py +++ b/serwo/templates/azure/push-to-storage-queue-template/PushToStorageQueue/push_to_azure_q.py @@ -1,7 +1,11 @@ from azure.storage.queue import QueueService, QueueMessageFormat +from azure.storage.blob import BlobServiceClient import json from python.src.utils.classes.commons.serwo_objects import SerWOObject import os, uuid +from azure.storage.blob import generate_blob_sas, BlobSasPermissions +from datetime import datetime, timedelta, timezone +from azure.core.exceptions import ResourceExistsError connect_str = "{{connection_string}}" queue_service = QueueService(connection_string=connect_str) @@ -10,15 +14,75 @@ queue_service.encode_function = QueueMessageFormat.binary_base64encode queue_service.decode_function = QueueMessageFormat.binary_base64decode +MAX_QUEUE_MSG_SIZE = 64 * 1024 + +def parse_connection_string(conn_str): + tokens = dict(x.split('=', 1) for x in conn_str.strip().split(';') if '=' in x) + #tokens is a dict with all the 
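# The pointer-resolution branch above is duplicated verbatim in QueueTrigger/__init__.py and
# queue_trigger_runner_template.py; a sketch of a shared helper (resolve_queue_message is a
# hypothetical name, and it assumes `requests` is importable in the function app):
import json
import requests

def resolve_queue_message(msg_dict: dict):
    """Return (body, metadata) for a queue item, following the blob pointer when the
    producer off-loaded a large payload."""
    if msg_dict.get("large_payload") == 1 and "azure" in msg_dict:
        blob_url = msg_dict["azure"]["blob_url"]      # pre-signed (SAS) read-only URL
        resp = requests.get(blob_url)
        if resp.status_code != 200:
            raise Exception(f"Could not retrieve blob from {blob_url} - status code: {resp.status_code}")
        payload = json.loads(resp.content)
        return payload.get("body"), payload.get("metadata")
    return msg_dict["body"], msg_dict["metadata"]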
values in the connections string like DefaultEndpointsProtocol, AccountName, AccountKey, BlobEndpoint, .. etc + account_name = tokens.get('AccountName') + account_key = tokens.get('AccountKey') + return account_name, account_key + +def upload_to_blob(payload, container): + blob_service = BlobServiceClient.from_connection_string(connect_str) + container_client = blob_service.get_container_client(container) + if not container_client.exists(): + try: + container_client.create_container() + except ResourceExistsError: + print("Container already exists, probably due to a parallel workflow") + pass + blob_name = f"{uuid.uuid4()}.json" + blob_client = blob_service.get_blob_client(container=container, blob=blob_name) + data_bytes = json.dumps(payload).encode("utf-8") + blob_client.upload_blob(data_bytes, overwrite=True) + return blob_name + def user_function(serwoObject) -> SerWOObject: try: fin_dict = dict() data = serwoObject.get_body() metadata = serwoObject.get_metadata() - fin_dict["body"] = data + fin_dict["body"] = data fin_dict["metadata"] = metadata - queue_service.put_message(queue_name, json.dumps(fin_dict).encode("utf-8")) + #TODO: + if len(json.dumps(fin_dict).encode("utf-8")) > MAX_QUEUE_MSG_SIZE: + blob_details = data.get(data.get("csp")) + container_name = blob_details.get("container") if isinstance(blob_details, dict) else None + if not container_name: + container_name = "xfaas-large-payload-container" + + blob_name = upload_to_blob(fin_dict, container_name) + + account_name, account_key = parse_connection_string(connect_str) + sas_token = generate_blob_sas( + account_name=account_name, + container_name=container_name, + blob_name=blob_name, + account_key=account_key, + permission=BlobSasPermissions(read=True), + expiry=datetime.now(timezone.utc) + timedelta(hours=1) + ) + blob_url_with_sas = f"https://{account_name}.blob.core.windows.net/{container_name}/{blob_name}?{sas_token}" + + + + payload_data = { + + "large_payload": 1, + "azure": { + "container": container_name, + "blob_name": blob_name, + "blob_url": blob_url_with_sas + + } + } + queue_service.put_message(queue_name, json.dumps(payload_data).encode("utf-8")) + + + else: + queue_service.put_message(queue_name, json.dumps(fin_dict).encode("utf-8")) return SerWOObject(body=data) except Exception as e: print(e) diff --git a/serwo/templates/azure/push-to-storage-queue-template/PushToStorageQueue/requirements.txt b/serwo/templates/azure/push-to-storage-queue-template/PushToStorageQueue/requirements.txt index 74c8949..b808198 100644 --- a/serwo/templates/azure/push-to-storage-queue-template/PushToStorageQueue/requirements.txt +++ b/serwo/templates/azure/push-to-storage-queue-template/PushToStorageQueue/requirements.txt @@ -1,2 +1,3 @@ azure-storage-queue==2.1.0 -psutil \ No newline at end of file +psutil +azure-storage-blob==12.19.1 \ No newline at end of file diff --git a/serwo/xfaas_benchmark.py b/serwo/xfaas_benchmark.py index 00bfc93..f177393 100644 --- a/serwo/xfaas_benchmark.py +++ b/serwo/xfaas_benchmark.py @@ -4,9 +4,70 @@ import sys -def populate_benchmarks_for_user_dag(user_dag, user_pinned_nodes, benchmark_path, valid_partition_points, cloud_ids): - bm_data, edges, latency_map, user_dag_copy = init_benchmark_populator( - benchmark_path, user_dag, cloud_ids) +def evaluate_sub_dag(csp_id, sub_dag, user_pinned_nodes): + nodes = list(sub_dag.nodes) + + constraints_violated = evaluate_node_and_edge_constraints(csp_id, sub_dag, user_pinned_nodes) + + if constraints_violated: + return sys.maxsize + + else: + if len(nodes) 
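# The branch above is a claim-check pattern: Azure Storage Queue messages are limited to
# 64 KiB, so larger payloads are written to Blob Storage and only a small pointer message is
# enqueued (with a 1-hour read-only SAS, which bounds how long the pointer stays usable).
# A condensed, self-contained sketch of that producer path (enqueue_or_offload is a
# hypothetical name; the container is assumed to already exist):
import json
import uuid
from datetime import datetime, timedelta, timezone
from azure.storage.blob import BlobServiceClient, generate_blob_sas, BlobSasPermissions

MAX_QUEUE_MSG_SIZE = 64 * 1024

def enqueue_or_offload(queue_service, queue_name, fin_dict, connect_str, container):
    raw = json.dumps(fin_dict).encode("utf-8")
    if len(raw) <= MAX_QUEUE_MSG_SIZE:
        queue_service.put_message(queue_name, raw)
        return
    # off-load to Blob Storage
    blob_name = f"{uuid.uuid4()}.json"
    service = BlobServiceClient.from_connection_string(connect_str)
    service.get_blob_client(container=container, blob=blob_name).upload_blob(raw, overwrite=True)
    # build a short-lived read-only SAS URL for the consumer
    tokens = dict(p.split("=", 1) for p in connect_str.strip().split(";") if "=" in p)
    account_name, account_key = tokens.get("AccountName"), tokens.get("AccountKey")
    sas = generate_blob_sas(account_name=account_name, container_name=container,
                            blob_name=blob_name, account_key=account_key,
                            permission=BlobSasPermissions(read=True),
                            expiry=datetime.now(timezone.utc) + timedelta(hours=1))
    pointer = {"large_payload": 1,
               "azure": {"container": container, "blob_name": blob_name,
                         "blob_url": f"https://{account_name}.blob.core.windows.net/{container}/{blob_name}?{sas}"}}
    queue_service.put_message(queue_name, json.dumps(pointer).encode("utf-8"))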
== 1: + return sub_dag.nodes[nodes[0]]['NodeBenchmark'][csp_id]['Latency'] + else: + sub_dag_latency = calculate_sub_dag_latency(csp_id, nodes, sub_dag) + + return sub_dag_latency + + +def calculate_sub_dag_latency(csp_id, nodes, sub_dag): + for node in nodes: + successors = sub_dag.successors(node) + for succ in successors: + edge_latency = sub_dag.edges[(node, succ)]['EdgeBenchmarks']['Latencies'][int(csp_id)][int(csp_id)] + node_latency = sub_dag.nodes[node]['NodeBenchmark'][csp_id]['Latency'] + val = edge_latency + node_latency + sub_dag.edges[(node, succ)]['edge_latency'] = -1 * val + sources = [] + sink = '' + for nd in nodes: + if sub_dag.in_degree(nd) == 0: + sources.append(nd) + if sub_dag.out_degree(nd) == 0: + sink = nd + max_latency = -1 + for src in sources: + critical_path = nx.shortest_path(sub_dag, src, sink, weight="edge_latency") + path_latency = 0 + for index in range(0, len(critical_path) - 1): + path_latency += sub_dag[critical_path[index]][critical_path[index + 1]]["edge_latency"] + + path_latency = path_latency * -1 + max_latency = max(path_latency, max_latency) + + max_latency += sub_dag.nodes[sink]['NodeBenchmark'][csp_id]['Latency'] + return max_latency + + +def evaluate_node_and_edge_constraints(csp_id, sub_dag, user_pinned_nodes): + flag = False + for nd in sub_dag.nodes: + if nd in user_pinned_nodes: + pinned_csp = user_pinned_nodes[nd] + if pinned_csp != csp_id: + flag = True + break + #Added Large Payload Support + # for ed in sub_dag.edges: + # if sub_dag.edges[ed]['EdgeBenchmark']['DataTransferSize'] > 256 and csp_id == '0': + # flag = True + # break + return flag + + +def populate_benchmarks_for_user_dag(user_dag,user_pinned_nodes,benchmark_path,valid_partition_points,cloud_ids): + bm_data, edges, latency_map, user_dag_copy = init_benchmark_populator(benchmark_path, user_dag,cloud_ids) latency_benchmark = populate_latanecy_benchmarks(bm_data, cloud_ids, edges, latency_map, user_dag_copy, user_pinned_nodes, valid_partition_points) @@ -32,6 +93,28 @@ def init_benchmark_populator(benchmark_path, user_dag, cloud_ids): return bm_data, edges, latency_map, user_dag_copy +def populate_is_fan_in(user_dag_copy, valid_partition_points): + is_fan_in = [] + for nd in valid_partition_points: + predecessors = list(user_dag_copy.predecessors(nd['node_id'])) + if len(predecessors) > 1: + is_fan_in.append(True) + else: + is_fan_in.append(False) + return is_fan_in + + +def populate_inter_cloud_data_transfers(user_dag_copy, valid_partition_points): + inter_cloud_data_transfer = [] + for nd in valid_partition_points: + successors = list(user_dag_copy.successors(nd['node_id'])) + for sc in successors: + inter_cloud_data_transfer.append( + user_dag_copy.edges[(nd['node_id'], sc)]['EdgeBenchmarks']['DataTransferSize']) + break + return inter_cloud_data_transfer + + def populate_latanecy_benchmarks(bm_data, cloud_ids, edges, latency_map, user_dag_copy, user_pinned_nodes, valid_partition_points): edges_data = dict() @@ -42,7 +125,7 @@ def populate_latanecy_benchmarks(bm_data, cloud_ids, edges, latency_map, user_da edges_data[(src, neighbors) ] = bm_data['EdgeBenchmarks'][src][neighbors] for ed in edges: - user_dag_copy.edges[ed]['EdgeBenchmark'] = edges_data[ed] + user_dag_copy.edges[ed]['EdgeBenchmarks'] = edges_data[ed] top_sort = list(nx.topological_sort(user_dag_copy)) for i in range(0, len(valid_partition_points)): sub_nodes = [] @@ -143,7 +226,7 @@ def calculate_sub_dag_latency(csp_id, nodes, sub_dag): def populate_data_transfer_benchmarks(cloud_ids, edges, user_dag_copy): 
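# calculate_sub_dag_latency above finds the critical (longest) source->sink path by negating
# the per-edge weight and running nx.shortest_path. Note that networkx's default method is
# Dijkstra, which is not guaranteed to be correct with negative weights; on a DAG the same
# idea works reliably with method="bellman-ford". A minimal, self-contained check on a
# hypothetical 4-node sub-DAG (all numbers are made up for illustration):
import networkx as nx

g = nx.DiGraph()
g.add_edge("1", "2", edge_latency=-(85 + 500))     # -(edge latency + source-node latency)
g.add_edge("1", "3", edge_latency=-(85 + 500))
g.add_edge("2", "4", edge_latency=-(90 + 20000))
g.add_edge("3", "4", edge_latency=-(90 + 25000))

path = nx.shortest_path(g, "1", "4", weight="edge_latency", method="bellman-ford")
latency = -sum(g[u][v]["edge_latency"] for u, v in zip(path, path[1:]))
print(path, latency)   # -> ['1', '3', '4'] 25675; the sink node's own latency is added separately above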
data_transfer_benchmark = [] for ed in edges: - edge_bm = user_dag_copy.edges[ed]['EdgeBenchmark']['Latencies'] + edge_bm = user_dag_copy.edges[ed]['EdgeBenchmarks']['Latencies'] for csp_id_outer in cloud_ids: temp = [] for csp_id_inner in cloud_ids: diff --git a/serwo/xfaas_debugger_donot_commit.py b/serwo/xfaas_debugger_donot_commit.py new file mode 100644 index 0000000..c4bccdc --- /dev/null +++ b/serwo/xfaas_debugger_donot_commit.py @@ -0,0 +1,23 @@ +from xfbench_plotter import XFBenchPlotter + + +def plot_metrics(user_wf_dir, wf_deployment_id, run_id, wf_name,region): + breakpoint() + format = 'pdf' + plotter = XFBenchPlotter(user_wf_dir, wf_deployment_id, run_id,format) + plotter.plot_e2e_timeline(xticks=[], yticks=[],is_overlay=True,region=region) + figwidth = 7 + if wf_name == 'fileProcessing' or wf_name == 'math': + figwidth = 20 + plotter.plot_stagewise( yticks=[],figwidth=figwidth) + plotter.plot_cumm_e2e(yticks=[]) + # plotter.plot_e2e_invocations_wnwo_containers(csp="azure", yticks=[]) + + +if __name__ == "__main__": + user_wf_dir = "/Users/vaibhavjha/Documents/IISc/xfbench_multicloud/XFaaS/serwo/examples/static-qfanout-simulator-aws/workflow-gen" + wf_deployment_id = "random-d51-static" + run_id = "exp_1" + wf_name = "staticfanout" + region = "ap-south-1" + plot_metrics(user_wf_dir, wf_deployment_id, run_id, wf_name, region) \ No newline at end of file diff --git a/serwo/xfaas_main.py b/serwo/xfaas_main.py index 7da74af..a1c6450 100644 --- a/serwo/xfaas_main.py +++ b/serwo/xfaas_main.py @@ -45,14 +45,15 @@ project_dir = pathlib.Path(__file__).parent.resolve() -args = parser.parse_args() +args, unkown = parser.parse_known_args() +print("Passed arguments to xfaas_main:",args) is_containerbasedaws = bool(int(args.is_containerbasedaws)) USER_DIR = args.wf_user_directory -DAG_DEFINITION_FILE = args.dag_filename +DAG_DEFINITION_FILE = args.dag_filename DAG_DEFINITION_PATH = f"{USER_DIR}/{DAG_DEFINITION_FILE}" -BENCHMARK_FILE = args.dag_benchmark +BENCHMARK_FILE = args.dag_benchmark benchmark_path = f'{USER_DIR}/{BENCHMARK_FILE}' csp = args.csp region = args.region @@ -64,6 +65,8 @@ def get_user_pinned_nodes(): config = json.loads( open(f'{project_dir}/config/xfaas_user_config.json', 'r').read()) if "user_pinned_nodes" in config: + print("User Pinned Nodes Detected") + print(config['user_pinned_nodes']) return config['user_pinned_nodes'] else: return None @@ -213,6 +216,10 @@ def swap(a, b): def generate_new_dags(partition_config, xfaas_user_dag, user_wf_dir, dag_definition_path): + # Load original DAG config to preserve ConditionalBranches + with open(dag_definition_path, 'r') as file: + original_dag_config = json.load(file) + src_node = None sink_node = None nx_dag = xfaas_user_dag.get_dag() @@ -243,7 +250,7 @@ def generate_new_dags(partition_config, xfaas_user_dag, user_wf_dir, dag_definit nodes_in_between = [] for i in range(st_ind, en_ind+1): nodes_in_between.append(top_sort_nodes[i]) - + subdag = nx_dag.subgraph(nodes_in_between) dagg = {} part_id = partition_config[0].get_part_id() @@ -257,7 +264,8 @@ def generate_new_dags(partition_config, xfaas_user_dag, user_wf_dir, dag_definit csp = partition_config[0].get_left_csp().get_name() region = partition_config[0].get_region() - write_dag_for_partition(user_wf_dir, dagg, part_id, csp, region) + write_dag_for_partition( user_wf_dir, dagg,part_id,csp,region, original_dag_config) + for i in range(1, len(partition_config)): @@ -330,11 +338,10 @@ def generate_new_dags(partition_config, xfaas_user_dag, user_wf_dir, dag_definit csp = 
partition_config[i].get_left_csp().get_name() region = partition_config[i].get_region() + + write_dag_for_partition( user_wf_dir, dagg,part_id,csp,region, original_dag_config) - write_dag_for_partition(user_wf_dir, dagg, part_id, csp, region) - - -def write_dag_for_partition(user_wf_dir, dagg, part_id, csp, region): +def write_dag_for_partition(user_wf_dir, dagg, part_id, csp, region, original_dag_config=None): out_dag_name = "dag.json" directory = f'{user_wf_dir}/partitions/{csp}-{region}-{part_id}' if not os.path.exists(directory): @@ -342,6 +349,11 @@ def write_dag_for_partition(user_wf_dir, dagg, part_id, csp, region): else: shutil.rmtree(directory) os.makedirs(directory) + + # Add ConditionalBranches if present in original DAG + if original_dag_config and "ConditionalBranches" in original_dag_config: + dagg["ConditionalBranches"] = original_dag_config["ConditionalBranches"] + with open(f'{directory}/{out_dag_name}', 'w') as file: file.write(json.dumps(dagg, indent=4)) @@ -407,6 +419,7 @@ def run(user_wf_dir, dag_definition_file, benchmark_file, csp, region): with open(f'{user_wf_dir}/partitions/part-details.json', 'w') as json_file: json.dump(part_details, json_file, indent=4) + wf_id = xfaas_provenance.push_user_dag(dag_definition_path) last_partition = partition_config[-1] @@ -434,7 +447,7 @@ def run(user_wf_dir, dag_definition_file, benchmark_file, csp, region): xfaas_provenance.generate_provenance_artifacts( user_wf_dir, wf_id, refactored_wf_id, wf_deployment_id, csp, region, part_id, queue_details) - return '', '', '' + #return '', '', '' return wf_id, refactored_wf_id, wf_deployment_id diff --git a/serwo/xfaas_optimizer.py b/serwo/xfaas_optimizer.py index c4a7ff6..6d9dcc3 100644 --- a/serwo/xfaas_optimizer.py +++ b/serwo/xfaas_optimizer.py @@ -37,7 +37,16 @@ def translate(clouds, cloud_dictionary, valid_partition_points, user_dag): i = 0 part_id = "0000" kk = 0 - while (i < len(clouds)): + #Adding a condition for pinned Single Node workflows: + if (len(valid_partition_points) == 1 and len(set(clouds)) == 1 and len(user_dag.get_dag().edges) == 0): + function_name = (user_dag.get_dag()).nodes[valid_partition_points[0]['node_id']]['NodeName'] + out_degree = valid_partition_points[0]['out_degree'] + csp = CSP(cloud_dictionary[str(clouds[0])]['csp']) + region = cloud_dictionary[str(clouds[0])]['region'] + partition_point = PartitionPoint(function_name, out_degree, csp, None, part_id, region) + return [partition_point] + + while(i < len(clouds)): j = i while j < len(clouds) and clouds[j] == clouds[i]: j += 1 diff --git a/serwo/xfaas_resource_generator.py b/serwo/xfaas_resource_generator.py index 50f8cc9..1bcbdda 100644 --- a/serwo/xfaas_resource_generator.py +++ b/serwo/xfaas_resource_generator.py @@ -6,7 +6,7 @@ import os def generate(user_dir, partition_config, dag_definition_file): partition_config = list(reversed(partition_config)) - + for i in range(len(partition_config)): if i==0: csp = partition_config[i].get_left_csp().get_name() @@ -24,6 +24,7 @@ def generate(user_dir, partition_config, dag_definition_file): dag_definition_path = f"{updated_user_dir}/{dag_definition_file}" else: + downstream_csp = partition_config[i-1].get_left_csp().get_name() downstream_region = partition_config[i-1].get_region() downstream_part_id = partition_config[i-1].get_part_id() @@ -36,6 +37,13 @@ def generate(user_dir, partition_config, dag_definition_file): region = partition_config[i].get_region() part_id = partition_config[i].get_part_id() updated_user_dir = 
f"{user_dir}/partitions/{csp}-{region}-{part_id}" + csp_temp = csp.split('_') + main_csp = csp + csp = csp_temp[0] + if len(csp_temp) > 1: + is_netherite = True + else: + is_netherite = False dag_path = f"{updated_user_dir}/{dag_definition_file}" with open(dag_path, "r") as dag_file: dag_from_file = json.load(dag_file) @@ -84,7 +92,7 @@ def generate(user_dir, partition_config, dag_definition_file): "queue_name": queue_name, "connection_string": connection_string, } - + root_dir = os.path.dirname(os.path.abspath(__file__)) template_dir = f"{root_dir}/templates/azure/push-to-storage-queue-template/{function_name}" output_path = f"{updated_user_dir}/" os.system(f"cp -r {template_dir} {output_path}") diff --git a/serwo/xfaas_run_benchmark.py b/serwo/xfaas_run_benchmark.py index bee5dbb..0772d72 100644 --- a/serwo/xfaas_run_benchmark.py +++ b/serwo/xfaas_run_benchmark.py @@ -48,7 +48,7 @@ def get_client_login_details(config_path): data = json.load(f) data = data[args.client_key] server_ip = data['server_ip'] - server_user_id = data['server_user_id'] + server_user_id = data['server_user_id'] if 'server_pem_file_path' in data: server_pem_file_path = data['server_pem_file_path'] @@ -71,12 +71,13 @@ def get_azure_payload(payload): def read_dynamism_file(dynamism,duration, max_rps): file_path = os.getenv("XFBENCH_DIR") + f"/workloads/{dynamism}-{max_rps}-{duration}.csv" + breakpoint() with open(file_path) as f: data = f.readlines() data = [x.strip() for x in data] final_data = [] for d in data: - vals = [float(x) for x in d.split(",") if x != "" ] # '''and 'KB' not in x''' + vals = [float(x) for x in d.split(",") if x != "" and 'KB' not in x ] # '''and 'KB' not in x''' # size = d.split(",")[-1] final_data.append((vals[0],vals[1])) @@ -287,6 +288,7 @@ def generate_shell_script_and_scp(csp,payload_size, wf_name, rps, duration,dynam print(f'output_path: {output_path}') os.system(f"chmod +x {output_path}") os.system(f"/{output_path}") + #os.system(f"./{output_path}") use when you don't have full path def load_payload(wf_user_directory,payload_size): payload_path = f"{wf_user_directory}/samples/{payload_size}/input/input.json" @@ -327,10 +329,20 @@ def run_workload(csp,region,part_id,max_rps,duration,payload_size,dynamism,wf_na duration = d[0] rps = d[1] # payload_size = d[2] - # payload = load_payload(wf_user_directory,payload_size) ne_session_id = session_id + str(i) + # payload = load_payload(wf_user_directory,payload_size) + #breakpoint() + if dynamism == 'slow': # Or match your exact dynamism string + duration_segment = 600.0 # Force ThreadGroup.duration=600s (10 min alive) + target_throughput = 1.0 # Force ConstantThroughputTimer throughput=1.0 (1 sample/minute) + # Pass target_throughput instead of rps * 60.0 to make_jmx_file + make_jmx_file(csp, target_throughput, duration_segment, payload_size, wf_name, execute_url, state_machine_arn, dynamism, ne_session_id, wf_user_directory, part_id, region, wf_deployment_id, run_id, payload, is_localhost) + else: + # Original logic for other experiments + make_jmx_file(csp, rps * 60.0, duration, payload_size, wf_name, execute_url, state_machine_arn, dynamism, ne_session_id, wf_user_directory, part_id, region, wf_deployment_id, run_id, payload, is_localhost) + - make_jmx_file(csp, rps * 60.0, duration, payload_size, wf_name, execute_url,state_machine_arn, dynamism, ne_session_id, wf_user_directory, part_id, region , wf_deployment_id, run_id,payload,is_localhost) + #make_jmx_file(csp, rps * 60.0, duration, payload_size, wf_name, execute_url,state_machine_arn, 
dynamism, ne_session_id, wf_user_directory, part_id, region , wf_deployment_id, run_id,payload, is_localhost) i += 1 generate_shell_script_and_scp(csp,payload_size, wf_name, max_rps, duration,dynamism,region,is_localhost) @@ -468,6 +480,7 @@ def local_teardown(wf_user_directory): wf_user_directory = os.getenv("XFBENCH_DIR") + f"/workflows/singleton_workflows/{function_class}/{function_name}" else: build_workflow(wf_user_directory) + breakpoint() wf_user_directory += "/workflow-gen" @@ -475,7 +488,6 @@ def local_teardown(wf_user_directory): print('==================DEPLOYING WF===========================') wf_id, refactored_wf_id, wf_deployment_id = deploy_workflow(wf_user_directory,dag_filename, region,csp) - # wf_deployment_id = args.wf_deployment_id ## write deployment id to a file create if not exists else append deps = [] xfaas_dir = os.getenv('XFAAS_DIR') diff --git a/serwo/xfbench_plotter.py b/serwo/xfbench_plotter.py index 502e6cf..4656da4 100644 --- a/serwo/xfbench_plotter.py +++ b/serwo/xfbench_plotter.py @@ -35,6 +35,7 @@ class XFBenchPlotter: plt.rcParams['legend.fontsize'] = 20 def __init__(self, workflow_directory: str, workflow_deployment_id: str, run_id: str, format: str): + breakpoint() self.__workflow_directory = workflow_directory self.__workflow_deployment_id = workflow_deployment_id self.__run_id = run_id @@ -142,15 +143,17 @@ def __get_outfile_prefix(self): def __create_dynamo_db_items(self): + breakpoint() print("Creating DynamoDB items") dynamodb_item_list = [] queue = QueueClient.from_connection_string(conn_str=self.__conn_str, queue_name=self.__queue_name) - response = queue.receive_messages(visibility_timeout=3000) + response = queue.receive_messages(visibility_timeout=180) print('Reading Queue') for message in response: queue_item = json.loads(message.content) metadata = queue_item["metadata"] + data = queue_item["data"] # Filtering based on workflow deployment id during creation itself if metadata["deployment_id"].strip() == self.__workflow_deployment_id: @@ -164,7 +167,7 @@ def __create_dynamo_db_items(self): dynamo_item["invocation_start_time_ms"] = str( metadata["workflow_start_time"] ) - + dynamo_item["data"] = data # add session id to dynamo db dynamo_item["session_id"] = str(metadata["session_id"])
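# Note on receive_messages(visibility_timeout=180) above: dequeued messages are only hidden
# for 180 s (down from 3000 s); if they are not deleted before that window expires they
# become visible again and can be read twice. A minimal consumption sketch under the
# assumption that each item should be removed once its DynamoDB record is built (whether the
# plotter actually deletes is outside this hunk):
import json

def drain_queue(queue, handle_item):
    for message in queue.receive_messages(visibility_timeout=180):
        handle_item(json.loads(message.content))   # e.g. build the dynamo_item dict
        queue.delete_message(message)              # otherwise the item reappears after 180 s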