From 6007dfd8296e1da24d5ddcd184af5700fcc4d02b Mon Sep 17 00:00:00 2001 From: Vaibhav Jha Date: Sun, 21 Dec 2025 18:56:56 +0530 Subject: [PATCH 1/3] additions to xfaas --- .gitignore | 4 +- Untitled-1 | 99 ++++++++++ dag-benchmark-generator-with-datatransfer.py | 128 +++++++++++++ dag-benchmark-v2.json | 179 ++++++++++++++++++ dag-benchmark_old.json | 179 ++++++++++++++++++ dag-intercloud-latency-benchmark-generator.py | 42 ++++ serwo/aws_create_statemachine.py | 100 +++++++++- serwo/config/cloud_dictionary.json | 2 +- serwo/config/xfaas_user_config.json | 6 +- serwo/deploy/template.yaml | 2 +- serwo/dp_xfaas_partitioner.py | 13 +- .../PushToSQS/push_to_aws_q.py | 63 +++++- .../apigw-awsstepfunctions.yaml | 16 +- .../aws/yaml-templates/awsstepfunctions.yaml | 2 +- .../yaml-templates/sqs-awsstepfunctions.yaml | 54 +++++- .../runner-templates/aws/runner_template.py | 92 +++++++-- .../aws/runner_template_bkp.py | 151 +++++++++++++++ .../python/src/utils/classes/aws/user_dag.py | 10 + .../src/utils/classes/azure/user_dag.py | 89 ++++++++- .../utils/classes/commons/serwo_objects.py | 21 +- .../utils/classes/commons/serwo_user_dag.py | 7 + serwo/scripts/azure/azure_builder.py | 6 +- serwo/scripts/azure/azure_deploy.py | 2 +- .../scripts/azure/azure_resource_generator.py | 2 +- .../azure/orchestrator_async_update.py | 5 +- .../templates/azure/azure_runner_template.py | 7 +- .../azure/json-templates/host-v2.json | 3 +- .../predefined-functions/CollectLogs/func.py | 2 +- .../Orchestrate/__init__.py | 48 +++-- .../QueueTrigger/__init__.py | 21 +- .../queue_trigger_runner_template.py | 19 +- .../PushToStorageQueue/push_to_azure_q.py | 68 ++++++- .../PushToStorageQueue/requirements.txt | 3 +- serwo/xfaas_benchmark.py | 17 +- serwo/xfaas_debugger_donot_commit.py | 23 +++ serwo/xfaas_main.py | 80 ++++---- serwo/xfaas_optimizer.py | 10 + serwo/xfaas_resource_generator.py | 15 +- serwo/xfaas_run_benchmark.py | 25 ++- serwo/xfbench_plotter.py | 4 +- setup-xfaas.sh | 6 +- 41 files changed, 1498 insertions(+), 127 deletions(-) create mode 100644 Untitled-1 create mode 100644 dag-benchmark-generator-with-datatransfer.py create mode 100644 dag-benchmark-v2.json create mode 100644 dag-benchmark_old.json create mode 100644 dag-intercloud-latency-benchmark-generator.py create mode 100644 serwo/python/src/runner-templates/aws/runner_template_bkp.py create mode 100644 serwo/xfaas_debugger_donot_commit.py diff --git a/.gitignore b/.gitignore index a94c5af..e3cb449 100644 --- a/.gitignore +++ b/.gitignore @@ -45,4 +45,6 @@ serwo/templates/azure/predefined-functions/Orchestrate/__init__.py serwo/templates/azure/predefined-functions/QueueTrigger/__init__.py serwo/benchmark_resources/aws-az-mmc.jmx serwo/benchmark_resources/az-aws-mmc.jmx -*.tgz \ No newline at end of file +apache-jmeter-5.5/ +*.tgz +*.sh \ No newline at end of file diff --git a/Untitled-1 b/Untitled-1 new file mode 100644 index 0000000..069bde4 --- /dev/null +++ b/Untitled-1 @@ -0,0 +1,99 @@ +Export variables for XFaas conda environment: + export AZURE_SUBSCRIPTION_ID=9dc7b503-5f49-4742-8f12-07fad514c633 + export XFAAS_DIR=/Users/vaibhavjha/Documents/IISc/xfbench_multicloud/XFaaS + export XFAAS_WF_DIR=/Users/vaibhavjha/Documents/IISc/xfbench_multicloud/XFBench + export XFBENCH_DIR=/Users/vaibhavjha/Documents/IISc/xfbench_multicloud/XFBench + +Running XFaaS main: + + python xfaas_main.py --wf-user-directory /Users/vaibhavjha/Documents/IISc/xfbench_multicloud/XFaaS/serwo/examples/static-qfanout-simulator-aws-optimized --dag-file-name dag.json 
--dag-benchmark dag-benchmark.json --csp aws  --region ap-south-1 + + python xfaas_main.py --wf-user-directory /Users/vaibhavjha/Documents/IISc/xfbench_multicloud/XFaaS/serwo/examples/static-qfanout-hardware-aws  --dag-file-name dag.json --dag-benchmark dag-benchmark.json --csp aws  --region ap-south-1 + + python3 xfaas_main.py --wf-user-directory /Users/vaibhavjha/Documents/IISc/xfbench_multicloud/qxfaas/serwo/examples/qmap-az --dag-file-name dag.json --dag-benchmark dag-benchmark.json --csp azure --region centralindia + + /Users/vaibhavjha/Documents/IISc/xfbench_multicloud/XFaaS/serwo/examples/static-qfanout-hardware-aws  + /Users/vaibhavjha/Documents/IISc/xfbench_multicloud/XFaaS/serwo/examples/static-qfanout-hardware-aws + +Running XFaaS benchmark: + python3 xfaas_run_benchmark.py --csp azure --region eastus --max-rps 1 --duration 300 --payload-size medium --dynamism static --wf-name grid --wf-user-directory /Users/vaibhavjha/Documents/IISc/xfbench_multicloud/XFBench/workflows/custom_workflows/smart_grid_wf --path-to-client-config /Users/vaibhavjha/Documents/IISc/xfbench_multicloud/XFaaS/serwo/config/client_config.json --dag-file-name dag.json --teardown-flag 0 --client-key localhost + + python3 bin/serwo/xfbench_run.py --csp azure --region eastus --max-rps 1 --duration 300 --payload-size random --dynamism static --wf-name quantum --wf-user-directory /Users/vaibhavjha/Documents/IISc/xfbench_multicloud/XFBench/workflows/custom_workflows/static-qfanout-hardware-aws --path-to-client-config /Users/vaibhavjha/Documents/IISc/xfbench_multicloud/XFaaS/serwo/config/client_config.json --dag-file-name dag.json --teardown-flag 0 --client-key localhost + + python3 bin/serwo/xfbench_run.py --csp aws --region ap-south-1 --max-rps 1 --duration 300 --payload-size random --dynamism static --wf-name quantum --wf-user-directory /Users/vaibhavjha/Documents/IISc/xfbench_multicloud/XFBench/workflows/custom_workflows/static-qfanout-hardware-aws --path-to-client-config /Users/vaibhavjha/Documents/IISc/xfbench_multicloud/XFaaS/serwo/config/client_config.json --dag-file-name dag.json --teardown-flag 0 --client-key localhost + disable quantum calls + + +Running QXFaas workflows: + python xfaas_main.py --wf-user-directory /Users/vaibhavjha/Documents/IISc/xfbench_multicloud/qxfaas/serwo/examples/qmap-aws --dag-file-name dag.json --dag-benchmark dag-benchmark.json --csp aws  --region ap-south-1 + + python3 xfaas_main.py --wf-user-directory /Users/vaibhavjha/Documents/IISc/xfbench_multicloud/qxfaas/serwo/examples/qmap-aws --dag-file-name dag.json --dag-benchmark dag-benchmark.json --csp aws  --region ap-south-1 + +IP details for Lab: + DNS: 10.16.25.15 + Netmask: 255.255.255.0 + Gateway: 10.24.24.1 + +Original IP Details of 10.24.24.30: + IP: 10.24.24.30 + Netmask: 24 + Gateway: 10.24.24.1 + DNS: 10.16.25.15 + +Original IP Details of 10.24.24.29 (3060): + IP: 10.24.24.29 + Netmask: 255.255.255.0 + Gateway: 10.24.24.1 + DNS: 10.16.25.13 + +Modified IP Details of 10.24.24.29: + IP: 172.23.244.2 + Netmask: 255.255.255.248 + Gateway: 172.23.244.1 + DNS: 172.23.244.1 + Should be connected to port on my desk which is 224.D22 (right side one) + +Command Triggering AWS State-Machine + aws stepfunctions start-execution \ + --state-machine-arn arn:aws:states:ap-south-1:235319806087:stateMachine:UserWf0000-9nhHSEV7fGK7 \ + --input '{\n \"workflow_instance_id\": 1,\n \"request_timestamp\": 12345,\n \"session_id\": \"123\",\n \"deployment_id\": \"dynamic-q10-d37-trotter\",\n \"data\": {\n \t\t\"circuit\": 
\"eJztW92K00AUziT9d/0H8WahXjWBKpsmW13wh7WuUnQFt3shikLdza6B2traXRARxIu+hO8h+Are9A30KbwQL3RbZzquNJuTDiYd58xFKaffN+c7Z84kZ0LzoN64W99cILpGB/G62jnPeHrwtaDxMfqdUMTa6HP95r3a86bfrvm9rT2/f9VeKhfd6xRNmeT96va2ub7XMuvtvrfr9cyKVS42Xr941mmZpeHg8dKTknVgOQJhhyIqoQgnFOGGIpZDEdVQxOVQxJVQxMpvxO1Wp9k3S3bVvbTiOFXbdWzHrbp2pVQuvux5W/4rv9O+tuxY1v7+oTUcr86nb1b5y+fV+x8K7xYXv36crPNwAEAfY4UCQi+w30Ho49RugNAnqD0FQp+k9jQIfYraMyD0aWrPgtBnqD0HQp+l9vxw8OZtd7QHCxohXe3wIH/NYrDcsKhZPEwp08Bmn8yUobMRbfrYeHSn2ffG/nfYJOcv3tr8ceH7jQhkMp2sHUFuPJyQxRyx4PUA8lptYxIjdweTFlsCA8ioMv56kqPqZSDrImRDJLmGmKMIlxSdu4NJ0+NKYAAZVc6yzIS7g0lLenujSqz6f6QyhhAFr99y3GVkIKdEyGmR5KbFHEW4tqe4O5i0VFwJDCCjylmW2eDuYNKS3t6oEhty9VTCHClR9TKEqMRdRoYQBfslObo6GcgZEXJWJLlZMUcRbrIZ7g4mLRNXAgPIqHKWZU5zdzBpSW9vVIkHYLVU4gFYDpVKFCO2unOkMoYQleiXZAhRsPOX43wiAzknQs6LJDcv5ihCH5Hj7mDScnElMICMKmdZ5ix3B5OW9PZGlfgoRy2V+ChHDpVKFCO2unOkMoYQleiXZAhRsPOX43ySHDnCTTbfHVvgSVHmzY1kyPhqkDIqYY6U+b97MmR8oUIOlcr8mTAGlUn3NagSn2GrpRKfYcuhUolixDP+HKnEM/4U8v+kMukCm3+Vmvbzj8EovwClFkX1\",\n\t\t\"observables\": [\n\t\t\t\"ZIIIIIIIII\",\n\t\t\t\"IZIIIIIIII\",\n\t\t\t\"IIZIIIIIII\",\n\t\t\t\"IIIZIIIIII\",\n\t\t\t\"IIIIZIIIII\",\n\t\t\t\"IIIIIZIIII\",\n\t\t\t\"IIIIIIZIII\",\n\t\t\t\"IIIIIIIZII\",\n\t\t\t\"IIIIIIIIZI\",\n\t\t\t\"IIIIIIIIIZ\"\n\t\t],\n\t\t\"metadata\": {\n\t\t\t\"depth\": 37,\n\t\t\t\"num_qubits\": 10\n\t\t}\n },\n \"credentials\": {\n \"ibmq\": {\n \"qtoken\": \"b7c27dd1a31636290b170b8330f98759c3927ba482ef07cff47ebd19814bbde4650c16eb625d377844ee5d5ff5310df14191f3e3768878ea2cec6616b346a5f7\"\n }\n },\n \"devices\": [\n {\n \"backend\": \"ibm_brisbane\",\n \"device\": \"aer\"\n }\n ]\n}' + +User Pinned Nodes (Merger, Poller, Reconstructor) + { + "6":"0", + "7":"0", + "8":"0" + } + + + +from fastapi import FastAPI +from starlette.middleware.proxy_headers import ProxyHeadersMiddleware +from mangum import Mangum +from fastmcp import FastMCP + +mcp = FastMCP("analyst") +app = mcp.http_app() + +# Trust all X-Forwarded headers so scheme and host are correct +app.add_middleware(ProxyHeadersMiddleware, trusted_hosts="*") + +handler = Mangum(app, lifespan="off") + + +from fastapi import FastAPI, Request +from mangum import Mangum + +app = FastAPI() + +@app.middleware("http") +async def forward_scheme(request: Request, call_next): + proto = request.headers.get("x-forwarded-proto") + if proto: + request.scope["scheme"] = proto + return await call_next(request) + +# mount or include your mcp.http_app here… + + + diff --git a/dag-benchmark-generator-with-datatransfer.py b/dag-benchmark-generator-with-datatransfer.py new file mode 100644 index 0000000..78ecc30 --- /dev/null +++ b/dag-benchmark-generator-with-datatransfer.py @@ -0,0 +1,128 @@ +import json +import statistics +import argparse + + +def process(aws, azure, dag): + nodes = {} + edges = {} + + for node in dag['Nodes']: + nid = node['NodeId'] + nodes[nid] = { + "0": calc_nodes(aws, nid), # AWS + "1": calc_nodes(azure, nid) # Azure + } + + for edge in dag['Edges']: + for k, v in edge.items(): + source_id = get_id(dag, k) + for target_node in v: + target_id = get_id(dag, target_node) + if source_id is None or target_id is None: + continue + if source_id == target_id: + continue + + if source_id not in edges: + edges[source_id] = {} + + # Latencies: keep existing per-cloud diagonals (placeholders for cross-cloud as in original) + aws_lat = calc_edges(aws, source_id, target_id) + az_lat = calc_edges(azure, source_id, target_id) + + # Data transfer sizes (bytes): combine AWS+Azure runs and take median of per-invocation max(out,in) + aws_pairs = 
transfer_pairs(aws, source_id, target_id) # list of per-invocation sizes + az_pairs = transfer_pairs(azure, source_id, target_id) + combined_pairs = aws_pairs + az_pairs + data_transfer_size = statistics.median(combined_pairs) if combined_pairs else 0 + + edges[source_id][target_id] = { + "DataTransferSize": data_transfer_size, # bytes (median across AWS+Azure runs) + "Latencies": [ + [aws_lat, 0], # [AWS->AWS, AWS->Azure] (cross left as in original) + [0, az_lat] # [Azure->AWS, Azure->Azure] + ] + } + + return {"NodeBenchmarks": nodes, "EdgeBenchmarks": edges} + + +def calc_nodes(data, nid): + latencies = [] + for entry in data: + funcs = entry.get('functions', {}) + if nid in funcs: + meta = funcs[nid] + lat = meta.get('end_delta', 0) - meta.get('start_delta', 0) + latencies.append(lat) + median = statistics.median(latencies) if latencies else 0 + return {"Latency": median, "Cost": 0} + + +def calc_edges(data, source_id, target_id): + latencies = [] + for entry in data: + funcs = entry.get('functions', {}) + if source_id in funcs and target_id in funcs: + s = funcs[source_id] + t = funcs[target_id] + latency = t.get('start_delta', 0) - s.get('end_delta', 0) + latencies.append(latency) + return statistics.median(latencies) if latencies else 0 + + +def transfer_pairs(data, source_id, target_id): + """ + Returns a list of per-invocation transfer sizes (bytes) for source->target, + using max(out_payload_bytes, in_payload_bytes) to avoid undercounting. + """ + sizes = [] + for entry in data: + funcs = entry.get('functions', {}) + if source_id in funcs and target_id in funcs: + s = funcs[source_id] + t = funcs[target_id] + out_b = s.get('out_payload_bytes', 0) or 0 + in_b = t.get('in_payload_bytes', 0) or 0 + sizes.append(max(out_b, in_b)) + return sizes + + +def get_id(dag, node_name): + for node in dag['Nodes']: + if node['NodeName'] == node_name or node['NodeId'] == node_name: + return node['NodeId'] + return None + + +def process_jsonl(file_path): + with open(file_path, 'r') as file: + return [json.loads(line) for line in file] + + +def main(): + parser = argparse.ArgumentParser(description='dag benchmark') + parser.add_argument('--aws', type=str, help='aws absolute path') + parser.add_argument('--azure', type=str, help='azure absolute path') + parser.add_argument('--dag', type=str, help='dag absolute path') + parser.add_argument('--output', type=str, help='output file name', default="dag-benchmark.json") + + args = parser.parse_args() + + aws = process_jsonl(args.aws) if args.aws else [] + azure = process_jsonl(args.azure) if args.azure else [] + with open(args.dag, 'r') as file: + dag = json.load(file) + + benchmarks = process(aws, azure, dag) + + with open(args.output, 'w') as file: + json.dump(benchmarks, file, indent=2) + + print(f"data saved - {args.output}") + + +if __name__ == "__main__": + main() + diff --git a/dag-benchmark-v2.json b/dag-benchmark-v2.json new file mode 100644 index 0000000..2a69245 --- /dev/null +++ b/dag-benchmark-v2.json @@ -0,0 +1,179 @@ +{ + "NodeBenchmarks": { + "1": { + "0": { + "Latency": 512, + "Cost": 0 + }, + "1": { + "Latency": 29990.0, + "Cost": 0 + } + }, + "2": { + "0": { + "Latency": 20028, + "Cost": 0 + }, + "1": { + "Latency": 237520.5, + "Cost": 0 + } + }, + "3": { + "0": { + "Latency": 20232, + "Cost": 0 + }, + "1": { + "Latency": 244109.5, + "Cost": 0 + } + }, + "4": { + "0": { + "Latency": 32596, + "Cost": 0 + }, + "1": { + "Latency": 284497.5, + "Cost": 0 + } + }, + "5": { + "0": { + "Latency": 31863, + "Cost": 0 + }, + "1": { + "Latency": 
283017.0, + "Cost": 0 + } + }, + "6": { + "0": { + "Latency": 1, + "Cost": 0 + }, + "1": { + "Latency": 123.0, + "Cost": 0 + } + }, + "7": { + "0": { + "Latency": 127, + "Cost": 0 + }, + "1": { + "Latency": 31308.0, + "Cost": 0 + } + } + }, + "EdgeBenchmarks": { + "1": { + "2": { + "DataTransferSize": 16579, + "Latencies": [ + [ + 85, + 0 + ], + [ + 0, + 978488.5 + ] + ] + }, + "3": { + "DataTransferSize": 16579, + "Latencies": [ + [ + 85, + 0 + ], + [ + 0, + 1387289.0 + ] + ] + } + }, + "2": { + "4": { + "DataTransferSize": 28281, + "Latencies": [ + [ + 89, + 0 + ], + [ + 0, + 529383.5 + ] + ] + } + }, + "3": { + "5": { + "DataTransferSize": 28377, + "Latencies": [ + [ + 89, + 0 + ], + [ + 0, + 93317.0 + ] + ] + } + }, + "4": { + "6": { + "DataTransferSize": 43382, + "Latencies": [ + [ + 125, + 0 + ], + [ + 0, + 47225.5 + ] + ] + } + }, + "5": { + "6": { + "DataTransferSize": 43382, + "Latencies": [ + [ + 554, + 0 + ], + [ + 0, + 33583.0 + ] + ] + } + }, + "6": { + "7": { + "DataTransferSize": 43470, + "Latencies": [ + [ + 91, + 0 + ], + [ + 0, + 156913.5 + ] + ] + } + } + } +} \ No newline at end of file diff --git a/dag-benchmark_old.json b/dag-benchmark_old.json new file mode 100644 index 0000000..be62a1a --- /dev/null +++ b/dag-benchmark_old.json @@ -0,0 +1,179 @@ +{ + "NodeBenchmarks": { + "1": { + "0": { + "Latency": 512, + "Cost": 0 + }, + "1": { + "Latency": 802, + "Cost": 0 + } + }, + "2": { + "0": { + "Latency": 20028, + "Cost": 0 + }, + "1": { + "Latency": 14475, + "Cost": 0 + } + }, + "3": { + "0": { + "Latency": 20232, + "Cost": 0 + }, + "1": { + "Latency": 14054, + "Cost": 0 + } + }, + "4": { + "0": { + "Latency": 32596, + "Cost": 0 + }, + "1": { + "Latency": 47324, + "Cost": 0 + } + }, + "5": { + "0": { + "Latency": 31863, + "Cost": 0 + }, + "1": { + "Latency": 46279, + "Cost": 0 + } + }, + "6": { + "0": { + "Latency": 1, + "Cost": 0 + }, + "1": { + "Latency": 10, + "Cost": 0 + } + }, + "7": { + "0": { + "Latency": 127, + "Cost": 0 + }, + "1": { + "Latency": 361, + "Cost": 0 + } + } + }, + "EdgeBenchmarks": { + "1": { + "2": { + "DataTransferSize": 16599.0, + "Latencies": [ + [ + 85, + 0 + ], + [ + 0, + 1175448 + ] + ] + }, + "3": { + "DataTransferSize": 16599.0, + "Latencies": [ + [ + 85, + 0 + ], + [ + 0, + 490988 + ] + ] + } + }, + "2": { + "4": { + "DataTransferSize": 28479.0, + "Latencies": [ + [ + 89, + 0 + ], + [ + 0, + 821818 + ] + ] + } + }, + "3": { + "5": { + "DataTransferSize": 28589.0, + "Latencies": [ + [ + 89, + 0 + ], + [ + 0, + 1605943 + ] + ] + } + }, + "4": { + "6": { + "DataTransferSize": 43462.0, + "Latencies": [ + [ + 125, + 0 + ], + [ + 0, + 663975 + ] + ] + } + }, + "5": { + "6": { + "DataTransferSize": 43462.0, + "Latencies": [ + [ + 554, + 0 + ], + [ + 0, + 652349 + ] + ] + } + }, + "6": { + "7": { + "DataTransferSize": 43550.0, + "Latencies": [ + [ + 91, + 0 + ], + [ + 0, + 81610 + ] + ] + } + } + } +} \ No newline at end of file diff --git a/dag-intercloud-latency-benchmark-generator.py b/dag-intercloud-latency-benchmark-generator.py new file mode 100644 index 0000000..7c22444 --- /dev/null +++ b/dag-intercloud-latency-benchmark-generator.py @@ -0,0 +1,42 @@ +import json +from statistics import median + +#Az -> AWS +"""/Users/vaibhavjha/Documents/IISc/xfbench_multicloud/XFaaS/serwo/examples/static-fanout-comms/workflow-gen/70fc9eff-0c32-4456-8f0b-8d8f016daffb/exp1/logs/staticfanout_comms_azure_static_aws_dyndb_items.jsonl""" + +#Aws -> Az 
+"""/Users/vaibhavjha/Documents/IISc/xfbench_multicloud/XFaaS/serwo/examples/static-fanout-comms/workflow-gen/baac0abc-f3d2-42db-9508-e79bd1c515b9/exp1/logs/staticfanout_comms_aws_static_aws_dyndb_items.jsonl""" + +path_to_intercloud_benchmark = "/Users/vaibhavjha/Documents/IISc/xfbench_multicloud/XFaaS/serwo/examples/static-fanout-comms/workflow-gen/70fc9eff-0c32-4456-8f0b-8d8f016daffb/exp1/logs/staticfanout_comms_azure_static_aws_dyndb_items.jsonl" +# Load the data from the attached file +with open(path_to_intercloud_benchmark, 'r') as f: + lines = f.readlines() + +# Parse each JSON line and extract timing differences +differences = [] + +for line in lines: + # Parse the JSON record + record = json.loads(line) + functions = record.get('functions', {}) + + # Check if both node 1 and node 2 exist in the record + if '1' in functions and '2' in functions: + # Extract end_delta of node 1 and start_delta of node 2 + node1_end = functions['1']['end_delta'] + node2_start = functions['2']['start_delta'] + + # Calculate the difference: start time of node 2 - end time of node 1 + diff = node2_start - node1_end + differences.append(diff) + +# Calculate the median of all differences +median_difference = median(differences) + +print(f"Total data points analyzed: {len(differences)}") +print(f"Median difference (node 2 start - node 1 end): {median_difference} milliseconds") + +# Optional: Show some additional statistics +print(f"Minimum difference: {min(differences)} ms") +print(f"Maximum difference: {max(differences)} ms") +print(f"Average difference: {sum(differences)/len(differences):.2f} ms") diff --git a/serwo/aws_create_statemachine.py b/serwo/aws_create_statemachine.py index 36d2660..b8f80b6 100644 --- a/serwo/aws_create_statemachine.py +++ b/serwo/aws_create_statemachine.py @@ -109,12 +109,14 @@ def __create_lambda_fns( runner_template_dir, runner_filename ): + # TODO - should this be taken in from the dag-description? 
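+        # Each generated function gets its own build directory (serwo_fn_build_dir/<fn_name>);
+        # the fixed filename below is the per-function dependency file expected inside it.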
fn_requirements_filename = "requirements.txt" fn_dir = serwo_fn_build_dir / f"{fn_name}" logger.info(f"Creating function directory for {fn_name}") if not os.path.exists(fn_dir): + print("Directory made") os.makedirs(fn_dir) pathlib.Path(fn_dir / "__init__.py").touch() @@ -348,6 +350,40 @@ def __generate_asl_template(self): # Statemachine.asl.josn should be changed here sfn_json_copy = copy.deepcopy(json.loads(sfn_json)) data=add_async_afn_builder(sfn_json_copy,changing_fns_list) + + # apply conditional branching logic AFTER async modifications + conditional_branches = self.__user_dag.get_conditional_branches() + if conditional_branches and len(conditional_branches) > 0: + for branch in conditional_branches: + source_node = branch['SourceNode'] + + # Find the source state and modify its Next to point to Check state + if source_node in data['States']: + check_state_name = f"Check{source_node}" + data['States'][source_node]['Next'] = check_state_name + + # Remove 'End' if it exists + data['States'][source_node].pop('End', None) + + # Add the Choice state + data['States'][check_state_name] = { + 'Type': 'Choice', + 'Choices': [ + { + 'Variable': branch['ConditionVariable'], + branch['ConditionType']: branch['ConditionValue'], + 'Next': branch['TargetNode'] + } + ], + 'Default': branch['DefaultTarget'] + } + + # Add Success state if specified as default + if branch['DefaultTarget'] == 'Success': + data['States']['Success'] = { + 'Type': 'Succeed' + } + # print("Updated data of sfn builder after adding poll",data) with open(f"{self.__aws_build_dir}/{self.__json_file}", "w") as statemachinejson: statemachinejson.write(json.dumps(data)) @@ -404,15 +440,70 @@ def build_resources(self): def build_workflow(self): logger.info(f"Starting SAM Build for {self.__user_dag.get_user_dag_name()}") os.system( - f"sam build --build-dir {self.__sam_build_dir} --template-file {self.__aws_build_dir / self.__yaml_file} " + f"DOCKER_DEFAULT_PLATFORM=linux/amd64 " + f"sam build --use-container " + f"--build-dir {self.__sam_build_dir} " + f"--template-file {self.__aws_build_dir / self.__yaml_file}" ) + """ NOTE - deploy workflow """ + def __check_and_cleanup_failed_stack(self): + """Check if stack exists in failed state and delete it""" + import subprocess + import time + + try: + # Check stack status + result = subprocess.run( + f'aws cloudformation describe-stacks --stack-name {self.__sam_stackname} --region {self.__region} --query "Stacks[0].StackStatus" --output text', + shell=True, + capture_output=True, + text=True + ) + + stack_status = result.stdout.strip() + + # If stack is in a failed state, delete it + if stack_status in ['UPDATE_FAILED', 'CREATE_FAILED', 'ROLLBACK_FAILED', 'DELETE_FAILED', 'UPDATE_ROLLBACK_FAILED']: + logger.warning(f"Stack {self.__sam_stackname} is in {stack_status} state. 
Deleting...") + + # Delete the stack + os.system(f"aws cloudformation delete-stack --stack-name {self.__sam_stackname} --region {self.__region}") + + # Wait for deletion + logger.info("Waiting for stack deletion to complete...") + time.sleep(10) # Initial wait + + # Poll until deleted + for _ in range(30): # Max 5 minutes + check_result = subprocess.run( + f'aws cloudformation describe-stacks --stack-name {self.__sam_stackname} --region {self.__region}', + shell=True, + capture_output=True, + text=True + ) + if "does not exist" in check_result.stderr: + logger.info("Stack successfully deleted") + break + time.sleep(10) + else: + logger.warning("Stack deletion took longer than expected, continuing anyway...") + + except Exception as e: + # Stack doesn't exist, which is fine + logger.info(f"No existing stack found or error checking: {e}") + + def deploy_workflow(self): logger.info(f"Starting SAM Deploy for {self.__user_dag.get_user_dag_name()}") + logger.info(f"Template file {self.__sam_build_dir / self.__yaml_file}") + + self.__check_and_cleanup_failed_stack() + os.system( f"sam deploy \ --template-file {self.__sam_build_dir / self.__yaml_file} \ @@ -435,7 +526,7 @@ def deploy_workflow(self): # print("Sam stack name",self.__sam_stackname) # print("Output File path",self.__outputs_filepath) - ## add sam stack name to ouput filepat + ## add sam stack name to ouput filepath with open(self.__outputs_filepath, "r") as f: data = json.load(f) data.append({"OutputKey": "SAMStackName", "OutputValue": self.__sam_stackname, "Description": "SAM Stack Name"}) @@ -446,9 +537,10 @@ def deploy_workflow(self): def add_async_afn_builder(data,list): - + for i in range(0,len(list)): (fn_name,sub) =list[i] + poll_next=data["States"][fn_name]['Next'] checkPollcondition='CheckPollCondition'+str(i) waitstate='WaitState'+str(i) @@ -467,7 +559,7 @@ def add_async_afn_builder(data,list): } data["States"][waitstate]={ "Type": "Wait", - "Seconds": 100, + "SecondsPath": "$.body.waittime", "Next": fn_name } return data diff --git a/serwo/config/cloud_dictionary.json b/serwo/config/cloud_dictionary.json index 37d0024..0838daf 100644 --- a/serwo/config/cloud_dictionary.json +++ b/serwo/config/cloud_dictionary.json @@ -4,7 +4,7 @@ "region": "ap-south-1" }, "1": { - "csp": "azure", + "csp": "azure_v2", "region": "centralindia" } } \ No newline at end of file diff --git a/serwo/config/xfaas_user_config.json b/serwo/config/xfaas_user_config.json index 73cafe2..5afb926 100644 --- a/serwo/config/xfaas_user_config.json +++ b/serwo/config/xfaas_user_config.json @@ -9,5 +9,9 @@ "password": "check" } }, - "user_pinned_nodes" : {} + "user_pinned_nodes" : { + "1":"1", + "2":"1", + "3":"1" + } } \ No newline at end of file diff --git a/serwo/deploy/template.yaml b/serwo/deploy/template.yaml index 9a335d0..eb8a78e 100644 --- a/serwo/deploy/template.yaml +++ b/serwo/deploy/template.yaml @@ -82,7 +82,7 @@ Resources: Properties: CodeUri: {{ function.uri }} Handler: x - Runtime: python3.9 + Runtime: python3.13 MemorySize: 1024 Timeout: 300 Architectures: diff --git a/serwo/dp_xfaas_partitioner.py b/serwo/dp_xfaas_partitioner.py index 2eaba11..13324a9 100644 --- a/serwo/dp_xfaas_partitioner.py +++ b/serwo/dp_xfaas_partitioner.py @@ -67,12 +67,13 @@ def get_data_transfer_value(data_transfers_benchmark, i, j, v, data_tranfers,is_ def evaluate_inter_cloud_data_transfer_constraints(data_tranfers, i, j, v): flag = False - if v == 1 and j == 0 and data_tranfers[i - 1] > 64: - flag = True - if v == 0 and j == 1 and data_tranfers[i - 1] > 256: - flag 
= True - if v==0 and j==0 and data_tranfers[i-1] > 256: - flag = True + #Added Large payload support hence commenting this out + # if v == 1 and j == 0 and data_tranfers[i - 1] > 64: + # flag = True + # if v == 0 and j == 1 and data_tranfers[i - 1] > 256: + # flag = True + # if v==0 and j==0 and data_tranfers[i-1] > 256: + # flag = True return flag diff --git a/serwo/python/src/faas-templates/aws/push-to-sqs-template/PushToSQS/push_to_aws_q.py b/serwo/python/src/faas-templates/aws/push-to-sqs-template/PushToSQS/push_to_aws_q.py index 51ad42b..b4c0fd0 100644 --- a/serwo/python/src/faas-templates/aws/push-to-sqs-template/PushToSQS/push_to_aws_q.py +++ b/serwo/python/src/faas-templates/aws/push-to-sqs-template/PushToSQS/push_to_aws_q.py @@ -4,6 +4,7 @@ import botocore.session from python.src.utils.classes.commons.serwo_objects import SerWOObject import logging +import uuid # Create SQS client """ @@ -11,17 +12,43 @@ NOTE - Currently the access_key_id = , secret_access_key = are HARDCODED. WE NEED TO TEMPLATE THIS !!!!!!!!!!!!!!!!!!!!!!!!!!!!!! """ - +AWS_ACCESS_KEY_ID = "{{access_key_id}}" +AWS_SECRET_ACCESS_KEY = "{{secret_access_key}}" sqs = boto3.client( "sqs", region_name="ap-south-1", - aws_access_key_id="{{access_key_id}}", - aws_secret_access_key="{{secret_access_key}}", + aws_access_key_id=AWS_ACCESS_KEY_ID, + aws_secret_access_key=AWS_SECRET_ACCESS_KEY, ) +AWS_MESSAGE_THRESHOLD = 256 * 1024 queue_url = "{{queue_url}}" +def generate_s3_key(prefix="xfaas", extension="json"): + """ + Generate a unique S3 object key with the given prefix and extension. + """ + if prefix == None: + prefix = "xfaas" + return f"{prefix}/{uuid.uuid4()}.{extension}" + +def upload_json_payload(bucket, payload, prefix="xfaas"): + """ + Serializes and uploads the payload (dict/list) to the given S3 bucket. + Returns the key used. + """ + key = generate_s3_key(prefix=prefix, extension="json") + s3 = boto3.client( + "s3", + region_name="ap-south-1", + aws_access_key_id=AWS_ACCESS_KEY_ID, + aws_secret_access_key=AWS_SECRET_ACCESS_KEY, + ) + data_bytes = json.dumps(payload).encode("utf-8") + s3.put_object(Bucket=bucket, Key=key, Body=data_bytes) + return key + # Send message to SQS queue def user_function(serwoObject) -> SerWOObject: @@ -31,7 +58,35 @@ def user_function(serwoObject) -> SerWOObject: metadata = serwoObject.get_metadata() message["body"] = data message["metadata"] = metadata - logging.info(f"Input {message}") + #logging.info(f"Input {message}") + if "aws" not in message["body"]: + logging.info("AWS details for long message is not present in function body") + + output_bytes = json.dumps(message).encode("utf-8") + if len(output_bytes) > AWS_MESSAGE_THRESHOLD: + user_bucket = None + # If output is large (autotrigger) + event_body = message["body"] + if isinstance(event_body, dict): + if "aws" in event_body: + aws_details = event_body.get("aws", {}) + user_bucket = aws_details.get("bucket") + if not user_bucket: + raise ValueError("This workflow needs long payload size handling. 
Please provide valid storage details for each CSP in the event") + # --- Proceed to upload --- + deployment_id = metadata.get("deployment_id") + key = upload_json_payload(user_bucket, message["body"], prefix= deployment_id) + result = { + + "large_payload": 1, + "aws": { + "bucket": user_bucket, + "key": key + } + } + + message["body"] = result + response = sqs.send_message( QueueUrl=queue_url, MessageBody=json.dumps(message), diff --git a/serwo/python/src/faas-templates/aws/yaml-templates/apigw-awsstepfunctions.yaml b/serwo/python/src/faas-templates/aws/yaml-templates/apigw-awsstepfunctions.yaml index d55b94d..aa86f27 100644 --- a/serwo/python/src/faas-templates/aws/yaml-templates/apigw-awsstepfunctions.yaml +++ b/serwo/python/src/faas-templates/aws/yaml-templates/apigw-awsstepfunctions.yaml @@ -81,6 +81,13 @@ Resources: {% if function.iscontainer %} Type: AWS::Serverless::Function Properties: + Policies: + - Statement: + - Effect: Allow + Action: + - s3:GetObject + - s3:PutObject + Resource: arn:aws:s3:::*/* PackageType: Image Architectures: - x86_64 @@ -93,9 +100,16 @@ Resources: {% else %} Type: AWS::Serverless::Function Properties: + Policies: + - Statement: + - Effect: Allow + Action: + - s3:GetObject + - s3:PutObject + Resource: arn:aws:s3:::*/* CodeUri: {{ function.uri }} Handler: {{ function.handler }} - Runtime: python3.9 + Runtime: python3.13 MemorySize: {{ function.memory }} Timeout: 300 Architectures: diff --git a/serwo/python/src/faas-templates/aws/yaml-templates/awsstepfunctions.yaml b/serwo/python/src/faas-templates/aws/yaml-templates/awsstepfunctions.yaml index 7dd3062..adc0c2d 100644 --- a/serwo/python/src/faas-templates/aws/yaml-templates/awsstepfunctions.yaml +++ b/serwo/python/src/faas-templates/aws/yaml-templates/awsstepfunctions.yaml @@ -38,7 +38,7 @@ Resources: Properties: CodeUri: {{ function.uri }} Handler: {{ function.handler }} - Runtime: python3.9 + Runtime: python3.13 Architectures: - x86_64 {% endfor %} \ No newline at end of file diff --git a/serwo/python/src/faas-templates/aws/yaml-templates/sqs-awsstepfunctions.yaml b/serwo/python/src/faas-templates/aws/yaml-templates/sqs-awsstepfunctions.yaml index 43691dd..3a6f96c 100644 --- a/serwo/python/src/faas-templates/aws/yaml-templates/sqs-awsstepfunctions.yaml +++ b/serwo/python/src/faas-templates/aws/yaml-templates/sqs-awsstepfunctions.yaml @@ -63,7 +63,7 @@ Resources: input=message_body ) - {StateMachineArn: !Ref {{stepfunctionname}}} # templated here - Runtime: "python3.9" + Runtime: "python3.13" Timeout: "30" # mostly static @@ -107,13 +107,63 @@ Resources: {% for function in functions -%} {{function.name}}: + {% if function.iscontainer %} Type: AWS::Serverless::Function Properties: + Policies: + - Statement: + - Effect: Allow + Action: + - s3:GetObject + - s3:PutObject + - dynamodb:GetItem + - dynamodb:PutItem + - dynamodb:UpdateItem + - dynamodb:DeleteItem + - dynamodb:Query + - dynamodb:Scan + - dynamodb:BatchGetItem + - dynamodb:BatchWriteItem + - logs:CreateLogGroup + - logs:CreateLogStream + - logs:PutLogEvents + Resource: "*" + PackageType: Image + Architectures: + - x86_64 + Timeout: 600 + MemorySize: {{function.memory}} + Metadata: + Dockerfile: Dockerfile + DockerContext: ./functions/{{function.name}} + DockerTag: python3.13 + {% else %} + Type: AWS::Serverless::Function + Properties: + Policies: + - Statement: + - Effect: Allow + Action: + - s3:GetObject + - s3:PutObject + - dynamodb:GetItem + - dynamodb:PutItem + - dynamodb:UpdateItem + - dynamodb:DeleteItem + - dynamodb:Query + - dynamodb:Scan + 
- dynamodb:BatchGetItem + - dynamodb:BatchWriteItem + - logs:CreateLogGroup + - logs:CreateLogStream + - logs:PutLogEvents + Resource: "*" CodeUri: {{ function.uri }} Handler: {{ function.handler }} - Runtime: python3.9 + Runtime: python3.13 MemorySize: {{ function.memory }} Timeout: 300 Architectures: - x86_64 + {% endif%} {% endfor %} \ No newline at end of file diff --git a/serwo/python/src/runner-templates/aws/runner_template.py b/serwo/python/src/runner-templates/aws/runner_template.py index 3461074..1120467 100644 --- a/serwo/python/src/runner-templates/aws/runner_template.py +++ b/serwo/python/src/runner-templates/aws/runner_template.py @@ -1,6 +1,3 @@ -# TODO - !! MOVE THIS OUT TO A DIFFERENT MODULE # -# SerwoObject - single one -# SerwoListObject - [SerwoObject] import importlib import json import sys @@ -11,12 +8,13 @@ import os import psutil import objsize -# from USER_FUNCTION_PLACEHOLDER import function as USER_FUNCTION_PLACEHOLDER_function - NOTE - !!! TK - STANDARDISE THIS!!! IMPORTANT -from USER_FUNCTION_PLACEHOLDER import user_function as USER_FUNCTION_PLACEHOLDER_function +import uuid +import boto3 from copy import deepcopy +from USER_FUNCTION_PLACEHOLDER import user_function as USER_FUNCTION_PLACEHOLDER_function from python.src.utils.classes.commons.serwo_objects import build_serwo_object from python.src.utils.classes.commons.serwo_objects import build_serwo_list_object -from python.src.utils.classes.commons.serwo_objects import SerWOObject +from python.src.utils.classes.commons.serwo_objects import SerWOObject, SerWOObjectsList downstream = 0 """ @@ -27,27 +25,48 @@ - objective is to create a list of common keys, access patterns which will be used to create a common object to pass around - """ +# Long Message Support +# Helper for S3 key generation and upload +def generate_s3_key(prefix="xfaas", extension="json"): + """ + Generate a unique S3 object key with the given prefix and extension. + """ + if prefix == None: + prefix = "xfaas" + return f"{prefix}/{uuid.uuid4()}.{extension}" +def upload_json_payload(bucket, payload, prefix="xfaas"): + """ + Serializes and uploads the payload (dict/list) to the given S3 bucket. + Returns the key used. 
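+    Illustrative usage (bucket name and prefix here are hypothetical):
+        key = upload_json_payload("my-xfaas-bucket", {"result": 42}, prefix="run-001")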
+ """ + key = generate_s3_key(prefix=prefix, extension="json") + s3 = boto3.client('s3') + data_bytes = json.dumps(payload).encode("utf-8") + s3.put_object(Bucket=bucket, Key=key, Body=data_bytes) + return key # Get time delta function def get_delta(timestamp): return round(time.time() * 1000) - timestamp + # AWS Handler def lambda_handler(event, context): start_time = round(time.time() * 1000) - # Unmarshal from lambda handler - # capturing input payload size + # Handle input payload size computation for monitoring input_payload_size_bytes = None + # ----------- INPUT PARSING --------------- if isinstance(event, list): # TODO: exception handling - serwo_request_object = build_serwo_list_object(event) - # Calculate input payload size - input_payload_size_bytes = sum([objsize.get_deep_size(x.get_body()) for x in serwo_request_object.get_objects()]) - + + input_payload_size_bytes = sum([objsize.get_deep_size(x.get("body")) for x in event]) + #sum([objsize.get_deep_size(x.get_body()) for x in serwo_request_object.get_objects()]) + + serwo_request_object = build_serwo_list_object(event) elif isinstance(event, dict): # # NOTE - this is a sample if condition for the pilot jobs # if "body" in event: @@ -71,12 +90,16 @@ def lambda_handler(event, context): deployment_id=deployment_id, functions=[], ) + input_payload_size_bytes = objsize.get_deep_size(event.get("body")) + serwo_request_object = build_serwo_object(event) - input_payload_size_bytes = objsize.get_deep_size(serwo_request_object.get_body()) + + else: - # TODO: Report error and return - pass + return dict(statusCode=500, body="Unrecognized input type", metadata="None") serwo_request_object.set_basepath("") + + # user function exec status_code = 200 try: @@ -95,7 +118,7 @@ def lambda_handler(event, context): memory_after = process.memory_info().rss print(f"SerWOMemUsage::After::{wf_instance_id},{function_id},{memory_after}") - # Sanity check for user function response + # Validation check for user function response if not isinstance(response_object, SerWOObject): status_code = 500 return dict( @@ -138,8 +161,45 @@ def lambda_handler(event, context): # post function handler # NOTE - leaving empty for now and returning a response is. # Send service bus/ storage queue + deployment_id = metadata.get("deployment_id") body = response_object.get_body() response = dict(statusCode=status_code, body=body, metadata=metadata) + output_bytes = json.dumps(response).encode("utf-8") + LONG_MESSAGE_THRESHOLD = 256 * 1024 # 256 KB in bytes + + if len(output_bytes) > LONG_MESSAGE_THRESHOLD: + user_bucket = None + # If output is large (autotrigger) + + if isinstance(event, dict): + event_body = event.get("body") + if "aws" in event_body: + aws_details = event_body.get("aws", {}) + user_bucket = aws_details.get("bucket") + elif isinstance(event, list): + #We get a list in input event from a fan-in which are all assumed to be in the same csp + event_body = event[0].get("body") + if "aws" in event_body: + aws_details = event_body.get("aws", {}) + user_bucket = aws_details.get("bucket") + if not user_bucket: + raise ValueError("This workflow needs long payload size handling. 
Please provide a CSP AND valid storage details for each CSP in the event") + # --- Proceed to upload --- + key = upload_json_payload(user_bucket, body, prefix= deployment_id) + result = { + + "large_payload": 1, + "aws": { + "bucket": user_bucket, + "key": key + } + } + function_metadata_list[-1][function_id]['out_payload_bytes'] = objsize.get_deep_size(result) + metadata.update(dict(functions=function_metadata_list)) + + response = dict(statusCode=200, body=result, metadata=metadata) + + if downstream == 0: # TODO - change while doing egress return response diff --git a/serwo/python/src/runner-templates/aws/runner_template_bkp.py b/serwo/python/src/runner-templates/aws/runner_template_bkp.py new file mode 100644 index 0000000..3461074 --- /dev/null +++ b/serwo/python/src/runner-templates/aws/runner_template_bkp.py @@ -0,0 +1,151 @@ +# TODO - !! MOVE THIS OUT TO A DIFFERENT MODULE # +# SerwoObject - single one +# SerwoListObject - [SerwoObject] +import importlib +import json +import sys +import time +import random +import string +import logging +import os +import psutil +import objsize +# from USER_FUNCTION_PLACEHOLDER import function as USER_FUNCTION_PLACEHOLDER_function - NOTE - !!! TK - STANDARDISE THIS!!! IMPORTANT +from USER_FUNCTION_PLACEHOLDER import user_function as USER_FUNCTION_PLACEHOLDER_function +from copy import deepcopy +from python.src.utils.classes.commons.serwo_objects import build_serwo_object +from python.src.utils.classes.commons.serwo_objects import build_serwo_list_object +from python.src.utils.classes.commons.serwo_objects import SerWOObject + +downstream = 0 +""" +NOTE - creating a serwo wrapper object from cloud events +* a different wrapper object will be constructed for different event types +* for one particular event type one object will be constructed + - we will need to find common keys in the event object for one event type across different FaaS service providers + - objective is to create a list of common keys, access patterns which will be used to create a common object to pass around + - +""" + + +# Get time delta function +def get_delta(timestamp): + return round(time.time() * 1000) - timestamp + + +# AWS Handler +def lambda_handler(event, context): + start_time = round(time.time() * 1000) + # Unmarshal from lambda handler + # capturing input payload size + input_payload_size_bytes = None + + if isinstance(event, list): + # TODO: exception handling + serwo_request_object = build_serwo_list_object(event) + + # Calculate input payload size + input_payload_size_bytes = sum([objsize.get_deep_size(x.get_body()) for x in serwo_request_object.get_objects()]) + + elif isinstance(event, dict): + # # NOTE - this is a sample if condition for the pilot jobs + # if "body" in event: + # if "is_warmup" in event["body"]: + # print("Warmup code") + # return dict(status_code=200, body="Warmed up") + if "metadata" not in event: + new_event = deepcopy(event) + event = dict(body=new_event) + wf_instance_id = event["body"].get("workflow_instance_id") + request_timestamp = event["body"].get("request_timestamp") + session_id = event["body"].get("session_id") # NOTE - new variable to keep track of requests + deployment_id = event["body"].get("deployment_id") + overheads = start_time - request_timestamp + event["metadata"] = dict( + workflow_instance_id=wf_instance_id, + workflow_start_time=start_time, + request_timestamp=request_timestamp, + session_id=session_id, + overheads=overheads, + deployment_id=deployment_id, + functions=[], + ) + serwo_request_object = 
build_serwo_object(event) + input_payload_size_bytes = objsize.get_deep_size(serwo_request_object.get_body()) + else: + # TODO: Report error and return + pass + serwo_request_object.set_basepath("") + # user function exec + status_code = 200 + try: + start_epoch_time = serwo_request_object.get_metadata().get( + "workflow_start_time" + ) + start_time_delta = get_delta(start_epoch_time) + wf_instance_id = serwo_request_object.get_metadata().get("workflow_instance_id") + function_id = "{{function_id_placeholder}}" + process = psutil.Process(os.getpid()) + + memory_before = process.memory_info().rss + print(f"SerWOMemUsage::Before::{wf_instance_id},{function_id},{memory_before}") + response_object = USER_FUNCTION_PLACEHOLDER_function(serwo_request_object) + process = psutil.Process(os.getpid()) + memory_after = process.memory_info().rss + print(f"SerWOMemUsage::After::{wf_instance_id},{function_id},{memory_after}") + + # Sanity check for user function response + if not isinstance(response_object, SerWOObject): + status_code = 500 + return dict( + statusCode=status_code, + body="Response Object Must be of type SerWOObject", + metadata="None", + ) + end_time_delta = get_delta(start_epoch_time) + # st_time = int(time.time()*1000) + # cpu_brand = cpuinfo.get_cpu_info()["brand_raw"] + # en_time = int(time.time()*1000) + # time_taken = en_time - st_time + # cpu_brand = f"{cpu_brand}_{time_taken}" + # Get current metadata here + metadata = serwo_request_object.get_metadata() + function_metadata_list = metadata.get("functions") + # NOTE - the template for generated function id + function_metadata_list.append( + { + function_id: dict( + start_delta=start_time_delta, + end_delta=end_time_delta, + mem_before=memory_before, + mem_after=memory_after, + in_payload_bytes=input_payload_size_bytes, + out_payload_bytes=objsize.get_deep_size(response_object.get_body()), + # cpu=cpu_brand + + ) + } + ) + metadata.update(dict(functions=function_metadata_list)) + except Exception as e: + # if user compute fails then default to status code as 500 and no response body + print(e) + status_code = 500 + return dict( + statusCode=status_code, body="Error in user compute", metadata="None" + ) + # post function handler + # NOTE - leaving empty for now and returning a response is. 
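+    # (Unlike runner_template.py, this backup copy has no large-payload branch:
+    #  oversized bodies are returned inline instead of being offloaded to S3 as a pointer.)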
+ # Send service bus/ storage queue + body = response_object.get_body() + response = dict(statusCode=status_code, body=body, metadata=metadata) + if downstream == 0: + # TODO - change while doing egress + return response + return response + + +if __name__ == "__main__": + print("Main Method") + # lambda_handler(event, context) diff --git a/serwo/python/src/utils/classes/aws/user_dag.py b/serwo/python/src/utils/classes/aws/user_dag.py index 0517199..99aa7d9 100644 --- a/serwo/python/src/utils/classes/aws/user_dag.py +++ b/serwo/python/src/utils/classes/aws/user_dag.py @@ -24,6 +24,7 @@ def __init__(self, user_config_path): self.__nodeIDMap = {} self.__dag = nx.DiGraph() self.__functions = {} + self.__conditional_branches = [] except Exception as e: raise e @@ -65,6 +66,10 @@ def __init__(self, user_config_path): for val in edge[key]: self.__dag.add_edge(self.__nodeIDMap[key], self.__nodeIDMap[val]) + #Parse conditional branches after edges + if "ConditionalBranches" in self.__dag_config_data: + self.__conditional_branches = self.__dag_config_data["ConditionalBranches"] + def _get_state(self, nodename): state = AWSSfnBuilder.State.parse( { @@ -280,6 +285,11 @@ def get_successor_node_names(self, node_name): def get_user_dag_nodes(self): return self.__dag_config_data["Nodes"] + + def get_conditional_branches(self): + """Returns conditional branches from DAG config""" + return self.__conditional_branches + def get_user_dag_edges(self): return self.__dag_config_data["Edges"] def get_dag(self): diff --git a/serwo/python/src/utils/classes/azure/user_dag.py b/serwo/python/src/utils/classes/azure/user_dag.py index 1856678..96915f0 100644 --- a/serwo/python/src/utils/classes/azure/user_dag.py +++ b/serwo/python/src/utils/classes/azure/user_dag.py @@ -18,6 +18,8 @@ def __init__(self, user_config_path): print("Dag config data - ", self.__dag_config_data) self.__nodeIDMap = {} self.__dag = nx.DiGraph() + self.__conditional_branches = [] + except Exception as e: raise e @@ -43,6 +45,9 @@ def __init__(self, user_config_path): self.__dag.add_edge( self.__nodeIDMap[key], self.__nodeIDMap[val]) + # Conditional Branches after edges: + if "ConditionalBranches" in self.__dag_config_data: + self.__conditional_branches = self.__dag_config_data["ConditionalBranches"] start_node = [node for node in self.__dag.nodes if self.__dag.in_degree(node) == 0][0] self.__dag.nodes[start_node]['ret'] = ["yield ", "context.call_activity(\"" + self.__dag.nodes[start_node]["NodeName"] + "\", serwoObject)"] @@ -53,12 +58,84 @@ def __init__(self, user_config_path): start_node = [node for node in self.__dag.nodes if self.__dag.in_degree(node) == 0][0] self.__dag.nodes[start_node]['ret'] = ["yield ", "context.call_activity(\"" + self.__dag.nodes[start_node]["NodeName"] + "\", serwoObject)"] + def get_conditional_branches(self): + return self.__conditional_branches + + def has_conditional_branches(self): + return len(self.__conditional_branches) > 0 def __load_user_spec(self, user_config_path): with open(user_config_path, "r") as user_dag_spec: dag_data = json.load(user_dag_spec) return dag_data + def _wrap_with_conditional_logic(self, statements, result_var): + """ + Generic wrapper for conditional branching using user input. 
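+        statements: generated orchestrator code lines; the last entry is the return statement.
+        result_var: name of the variable holding the final activity result whose JSON body
+        is checked against the first entry in ConditionalBranches.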
+ """ + if not self.__conditional_branches: + return statements + + branch = self.__conditional_branches[0] + condition_var = branch['ConditionVariable'] + condition_type = branch['ConditionType'] + condition_value = branch['ConditionValue'] + + json_path = condition_var.replace('$.', '') + + wrapped_statements = [] + wrapped_statements.append("# Conditional branching loop") + wrapped_statements.append("should_continue = True") + wrapped_statements.append("") + wrapped_statements.append("while should_continue:") + + # Indent workflow statements (exclude return) - add 4 spaces for while body + for stmt in statements[:-1]: + wrapped_statements.append(" " + stmt) + + wrapped_statements.append("") + wrapped_statements.append(" # Check conditional branching condition") + wrapped_statements.append(" import json") + wrapped_statements.append(" try:") + wrapped_statements.append(" result_dict = json.loads({})".format(result_var)) + wrapped_statements.append(" if '_body' in result_dict:") + wrapped_statements.append(" result_body = result_dict['_body']") + wrapped_statements.append(" else:") + wrapped_statements.append(" result_body = result_dict.get('body', {})") + wrapped_statements.append("") + + # Generate condition check + if condition_type == "BooleanEquals": + # FIX: Use capital True/False + condition_check = "result_body.get('{}', False) == {}".format( + json_path, str(condition_value).capitalize() + ) + elif condition_type == "StringEquals": + condition_check = "result_body.get('{}', '') == '{}'".format(json_path, condition_value) + elif condition_type == "NumericEquals": + condition_check = "result_body.get('{}', 0) == {}".format(json_path, condition_value) + else: + condition_check = "result_body.get('{}', False)".format(json_path) + + wrapped_statements.append(" should_continue = {}".format(condition_check)) + wrapped_statements.append("") + wrapped_statements.append(" # Check iteration limit from user input") + wrapped_statements.append(" current_iter = result_body.get('iteration_count', 0)") + wrapped_statements.append(" max_iter = result_body.get('max_iterations', 1)") + wrapped_statements.append(" if current_iter >= max_iter:") + wrapped_statements.append(" should_continue = False") + wrapped_statements.append("") + wrapped_statements.append(" except Exception as e:") + wrapped_statements.append(" should_continue = False") + wrapped_statements.append("") + wrapped_statements.append(" if not should_continue:") + wrapped_statements.append(" break") + + wrapped_statements.append("") + wrapped_statements.append(statements[-1]) # return statement + + return wrapped_statements + def _generate_random_variable_name(self, n=4): res = ''.join(random.choices(string.ascii_letters, k=n)) return str(res).lower() @@ -276,6 +353,12 @@ def get_orchestrator_code(self): pre_statements.append(post_code) pre_statements.append(f"return {final_var}") - # TODO - for every taskall add the converstion from [serwo_objects] -> serwo_list_object - orchestrator_code = "\n".join([pre_statements[0]] + ["\t" + statement for statement in pre_statements[1:]]) - return orchestrator_code \ No newline at end of file + # Apply conditional branching + + if self.has_conditional_branches(): + pre_statements = self._wrap_with_conditional_logic(pre_statements, final_var) + + # Always add base indentation to be inside orchestrator_function + orchestrator_code = "\n".join([" " + statement for statement in pre_statements]) + + return orchestrator_code diff --git a/serwo/python/src/utils/classes/commons/serwo_objects.py 
b/serwo/python/src/utils/classes/commons/serwo_objects.py index 77e1de7..449afab 100644 --- a/serwo/python/src/utils/classes/commons/serwo_objects.py +++ b/serwo/python/src/utils/classes/commons/serwo_objects.py @@ -1,4 +1,5 @@ import json +import boto3 class SerWOObject: @@ -66,6 +67,22 @@ def get_basepath(self): def set_basepath(self, basepath): self._basepath = basepath +def fetch_event_payload(body): + """Checks whether incoming input to lambda has large payload enabled and extracts exact payload from the pointer""" + if isinstance(body, dict) and body.get("large_payload") == 1: + csp_info = body.get("aws", {}) + bucket = csp_info.get("bucket") + key = csp_info.get("key") + if bucket and key: + try: + s3 = boto3.client('s3') + obj = s3.get_object(Bucket=bucket, Key=key) + data = obj['Body'].read().decode('utf-8') + return json.loads(data) + except Exception as e: + print(f"[Large payload download error]: {e}") + return body + return body def build_serwo_list_object(event): """ @@ -95,7 +112,7 @@ def build_serwo_list_object(event): for data in functions_metadata_list: for fid, fdata in data.items(): functions_metadata_dict[fid] = fdata - list_obj.add_object(body=record.get("body")) + list_obj.add_object(body=fetch_event_payload(record.get("body"))) # convert the function metadata dict to a list to be added to collated metadata for fid, fdata in functions_metadata_dict.items(): collated_functions_metadata_list.append({fid: fdata}) @@ -106,4 +123,4 @@ def build_serwo_list_object(event): def build_serwo_object(event): - return SerWOObject(body=event["body"], metadata=event["metadata"]) + return SerWOObject(body=fetch_event_payload(event["body"]), metadata=event["metadata"]) diff --git a/serwo/python/src/utils/classes/commons/serwo_user_dag.py b/serwo/python/src/utils/classes/commons/serwo_user_dag.py index 48cece2..5bf9127 100644 --- a/serwo/python/src/utils/classes/commons/serwo_user_dag.py +++ b/serwo/python/src/utils/classes/commons/serwo_user_dag.py @@ -39,6 +39,13 @@ def __init__(self, user_config_path): Path=node["Path"], EntryPoint=node["EntryPoint"], MemoryInMB=node["MemoryInMB"]) + #Adding additional parameters for QXFaas workflows + if "IsAsync" in node: + self.__dag.add_node(nodeID, + IsAsync=node["IsAsync"]) + if "IsContainerised" in node: + self.__dag.add_node(nodeID, + IsContainerised=node["IsContainerised"]) index += 1 # add edges in the dag diff --git a/serwo/scripts/azure/azure_builder.py b/serwo/scripts/azure/azure_builder.py index c86ae0b..8114cd6 100644 --- a/serwo/scripts/azure/azure_builder.py +++ b/serwo/scripts/azure/azure_builder.py @@ -64,6 +64,7 @@ def build_working_dir(region,part_id,is_netherite): else: build_dir += f"azure-{region}-{part_id}" az_functions_path=f"{build_dir}/{user_workflow_name}" + if not os.path.exists(az_functions_path): os.makedirs(az_functions_path) @@ -240,7 +241,7 @@ def generate_function_id(f_id): def copy_all_dirs(fn_dir_path,fin_func_dir): - + dirs = os.listdir(fn_dir_path) for dir in dirs: @@ -335,6 +336,7 @@ def generate_app_name_and_populate_and_get_ingress_queue_name(user_app_name,regi resources_json += f"azure-{region}-{part_id}.json" xd = randint(100000, 999999) app_name = f'xfaas{user_app_name}{xd}' + f = open(resources_json,'r') data = json.loads(f.read()) if 'app_name' not in data: @@ -352,8 +354,10 @@ def build(user_dir, dag_definition_file, region, part_id,is_netherite): USER_DIR = user_dir DAG_DEFINITION_FILE = dag_definition_file init_paths() + build_working_dir(region,part_id,is_netherite) user_fns_data, user_app_name = 
get_user_workflow_details() + ingress_queue_name, app_name = generate_app_name_and_populate_and_get_ingress_queue_name(user_app_name,region,part_id,is_netherite) build_user_fn_dirs(user_fns_data) copy_meta_files(user_fns_data,ingress_queue_name,app_name,is_netherite) diff --git a/serwo/scripts/azure/azure_deploy.py b/serwo/scripts/azure/azure_deploy.py index cf7ba43..236494a 100644 --- a/serwo/scripts/azure/azure_deploy.py +++ b/serwo/scripts/azure/azure_deploy.py @@ -20,7 +20,7 @@ def init_paths(user_workflow_dir, region , part_id,is_netherite): resources_path = f'{user_workflow_dir}/build/workflow/resources/azure-{region}-{part_id}.json' runtime = 'python' functions_version = 4 - runtime_version = 3.9 + runtime_version = 3.12 os_type = 'linux' diff --git a/serwo/scripts/azure/azure_resource_generator.py b/serwo/scripts/azure/azure_resource_generator.py index b801e5d..451a6d7 100644 --- a/serwo/scripts/azure/azure_resource_generator.py +++ b/serwo/scripts/azure/azure_resource_generator.py @@ -64,7 +64,7 @@ def create_resources(resource_dir, out_file_path, region,is_netherite): jsson = json.loads(json_str) if is_netherite: - netherite_namespace = randomString(6) + netherite_namespace = randomString(8) ## create eventhubs namespace try: diff --git a/serwo/scripts/azure/orchestrator_async_update.py b/serwo/scripts/azure/orchestrator_async_update.py index 5820e42..d8104b4 100644 --- a/serwo/scripts/azure/orchestrator_async_update.py +++ b/serwo/scripts/azure/orchestrator_async_update.py @@ -42,7 +42,10 @@ def codegen(in_var,out_var,func_name): new_code +='\n\tif not body["Poll"] or body["Poll"]==False: ' new_code +='\n\t\tbreak ' new_code +='\n\telse:' - new_code +='\n\t\tdeadline = context.current_utc_datetime + timedelta(seconds=900) ' + new_code +='\n\t\tdelta = 900 ' + new_code +='\n\t\tif "waittime" in body : ' + new_code +='\n\t\t\tdelta = body["waittime"] ' + new_code +='\n\t\tdeadline = context.current_utc_datetime + timedelta(seconds=delta) ' new_code +='\n\t\tyield context.create_timer(deadline)\n\n' return new_code diff --git a/serwo/templates/azure/azure_runner_template.py b/serwo/templates/azure/azure_runner_template.py index b952b06..8f40825 100644 --- a/serwo/templates/azure/azure_runner_template.py +++ b/serwo/templates/azure/azure_runner_template.py @@ -154,8 +154,13 @@ def main(serwoObject, context: az_func.Context) -> str: serwoObject.set_basepath(basepath=basepath) body_before = serwoObject.get_body() input_body_size = objsize.get_deep_size(body_before) + #TODO: Long Message: AWS details preserved in case of long message AND hybrid cloud partioner result + aws_config = body_before.get("aws") if isinstance(body_before, dict) else None serwoObjectResponse = USER_FUNCTION_PLACEHOLDER_function(serwoObject) body_after = serwoObjectResponse.get_body() + if isinstance(body_after, dict) and aws_config is not None: + body_after["aws"] = aws_config + output_body_size = objsize.get_deep_size(body_after) process = psutil.Process(os.getpid()) memory = process.memory_info().rss @@ -170,7 +175,7 @@ def main(serwoObject, context: az_func.Context) -> str: func_json = {func_id: {"start_delta": start_delta, "end_delta": end_delta, "mem_before" : memory_before, "mem_after" : memory_after , "in_payload_bytes" : input_body_size, "out_payload_bytes" : output_body_size}} metadata["functions"].append(func_json) metadata = metadata - body = serwoObjectResponse.get_body() + body = body_after return SerWOObject(body=body, metadata=metadata).to_json() except Exception as e: logging.info("excep= " + 
str(e)) diff --git a/serwo/templates/azure/json-templates/host-v2.json b/serwo/templates/azure/json-templates/host-v2.json index ad59628..e0731af 100644 --- a/serwo/templates/azure/json-templates/host-v2.json +++ b/serwo/templates/azure/json-templates/host-v2.json @@ -23,10 +23,9 @@ "maxConcurrentOrchestratorFunctions": 8, "storageProvider": { "type" : "Netherite", - "PartitionCount": "32", "StorageConnectionName": "AzureWebJobsStorage", "EventHubsConnectionName": "EventHubsConnection", - "partitionCount": 1 + "partitionCount": 32 } } diff --git a/serwo/templates/azure/predefined-functions/CollectLogs/func.py b/serwo/templates/azure/predefined-functions/CollectLogs/func.py index b0cfd2b..73c38bd 100644 --- a/serwo/templates/azure/predefined-functions/CollectLogs/func.py +++ b/serwo/templates/azure/predefined-functions/CollectLogs/func.py @@ -17,7 +17,7 @@ def user_function(serwoObject) -> SerWOObject: data = serwoObject.get_body() logging.info("Data to push - "+str(data)) metadata = serwoObject.get_metadata() - data = {"body": "success: OK"} + data = {"body": data} fin_dict["data"] = data fin_dict["metadata"] = metadata logging.info("Fin dict - "+str(fin_dict)) diff --git a/serwo/templates/azure/predefined-functions/Orchestrate/__init__.py b/serwo/templates/azure/predefined-functions/Orchestrate/__init__.py index f4e2858..eeb868f 100644 --- a/serwo/templates/azure/predefined-functions/Orchestrate/__init__.py +++ b/serwo/templates/azure/predefined-functions/Orchestrate/__init__.py @@ -159,19 +159,41 @@ def orchestrator_function(context: df.DurableOrchestrationContext): serwoObject = build_serwo_object(inp_dict).to_json() # user dag execution - oenu = yield context.call_activity("graphGen", serwoObject) - uxwf = [] - rufp = context.call_activity("graphBft", oenu) - zzge = context.call_activity("graphMst", oenu) - rtyj = context.call_activity("pagerank", oenu) - uxwf.append(rufp) - uxwf.append(zzge) - uxwf.append(rtyj) - qmud = yield context.task_all(uxwf) - zbxu = yield context.call_activity("aggregate", qmud) - zbxu = insert_end_stats_in_metadata(zbxu) - frrb = yield context.call_activity("CollectLogs", zbxu) - return frrb + # Conditional branching loop + should_continue = True + + while should_continue: + xvyi = yield context.call_activity("Planner", serwoObject) + urxb = yield context.call_activity("Actor", xvyi) + zupq = yield context.call_activity("Evaluator", urxb) + zupq = insert_end_stats_in_metadata(zupq) + pmbi = yield context.call_activity("CollectLogs", zupq) + + # Check conditional branching condition + import json + try: + result_dict = json.loads(pmbi) + if '_body' in result_dict: + result_body = result_dict['_body'] + else: + result_body = result_dict.get('body', {}) + + should_continue = result_body.get( + 'body.needs_retry', False) == True + + # Check iteration limit from user input + current_iter = result_body.get('iteration_count', 0) + max_iter = result_body.get('max_iterations', 1) + if current_iter >= max_iter: + should_continue = False + + except Exception as e: + should_continue = False + + if not should_continue: + break + + return pmbi main = df.Orchestrator.create(orchestrator_function) diff --git a/serwo/templates/azure/predefined-functions/QueueTrigger/__init__.py b/serwo/templates/azure/predefined-functions/QueueTrigger/__init__.py index 35c1a06..3c0685a 100644 --- a/serwo/templates/azure/predefined-functions/QueueTrigger/__init__.py +++ b/serwo/templates/azure/predefined-functions/QueueTrigger/__init__.py @@ -5,6 +5,7 @@ from time import time import 
azure.durable_functions as df + def get_delta(start_time): curr_time = int(time() * 1000) return (curr_time-start_time) @@ -12,14 +13,28 @@ def get_delta(start_time): func_id = 253 -app_name = 'xfaasUserWf0000434144' +app_name = 'xfaasUserWf0000164304' async def main(msg: func.QueueMessage,starter: str) -> None: logging.info('Python queue trigger function processed a queue item: %s', msg.get_body().decode('utf-8')) URL = f'https://{app_name}.azurewebsites.net/api/orchestrators/Orchestrate' - metadata = json.loads(msg.get_body().decode('utf-8'))['metadata'] - body = json.loads(msg.get_body().decode('utf-8'))['body'] + msg_dict = json.loads(msg.get_body().decode('utf-8')) + + # If the message is a pointer to a large payload, fetch it from Blob + if msg_dict.get('large_payload') == 1 and "azure" in msg_dict: + blob_url = msg_dict["azure"]["blob_url"] + resp = requests.get(blob_url) + if resp.status_code == 200: + payload = json.loads(resp.content) + body = payload.get("body") + metadata = payload.get("metadata") + else: + raise Exception(f"Could not retrieve blob from {blob_url} - status code: {resp.status_code}") + + else: + metadata = msg_dict['metadata'] + body = msg_dict['body'] start_delta = get_delta(metadata['workflow_start_time']) end_delta = get_delta(metadata['workflow_start_time']) diff --git a/serwo/templates/azure/predefined-functions/QueueTrigger/queue_trigger_runner_template.py b/serwo/templates/azure/predefined-functions/QueueTrigger/queue_trigger_runner_template.py index 7cdc03f..e65c52e 100644 --- a/serwo/templates/azure/predefined-functions/QueueTrigger/queue_trigger_runner_template.py +++ b/serwo/templates/azure/predefined-functions/QueueTrigger/queue_trigger_runner_template.py @@ -5,6 +5,7 @@ from time import time import azure.durable_functions as df + def get_delta(start_time): curr_time = int(time() * 1000) return (curr_time-start_time) @@ -18,8 +19,22 @@ def get_delta(start_time): async def main(msg: func.QueueMessage,starter: str) -> None: logging.info('Python queue trigger function processed a queue item: %s', msg.get_body().decode('utf-8')) URL = f'https://{app_name}.azurewebsites.net/api/orchestrators/Orchestrate' - metadata = json.loads(msg.get_body().decode('utf-8'))['metadata'] - body = json.loads(msg.get_body().decode('utf-8'))['body'] + msg_dict = json.loads(msg.get_body().decode('utf-8')) + + # If the message is a pointer to a large payload, fetch it from Blob + if msg_dict.get('large_payload') == 1 and "azure" in msg_dict: + blob_url = msg_dict["azure"]["blob_url"] + resp = requests.get(blob_url) + if resp.status_code == 200: + payload = json.loads(resp.content) + body = payload.get("body") + metadata = payload.get("metadata") + else: + raise Exception(f"Could not retrieve blob from {blob_url} - status code: {resp.status_code}") + + else: + metadata = msg_dict['metadata'] + body = msg_dict['body'] start_delta = get_delta(metadata['workflow_start_time']) end_delta = get_delta(metadata['workflow_start_time']) diff --git a/serwo/templates/azure/push-to-storage-queue-template/PushToStorageQueue/push_to_azure_q.py b/serwo/templates/azure/push-to-storage-queue-template/PushToStorageQueue/push_to_azure_q.py index f833e03..e507499 100644 --- a/serwo/templates/azure/push-to-storage-queue-template/PushToStorageQueue/push_to_azure_q.py +++ b/serwo/templates/azure/push-to-storage-queue-template/PushToStorageQueue/push_to_azure_q.py @@ -1,7 +1,11 @@ from azure.storage.queue import QueueService, QueueMessageFormat +from azure.storage.blob import BlobServiceClient import 
json from python.src.utils.classes.commons.serwo_objects import SerWOObject import os, uuid +from azure.storage.blob import generate_blob_sas, BlobSasPermissions +from datetime import datetime, timedelta, timezone +from azure.core.exceptions import ResourceExistsError connect_str = "{{connection_string}}" queue_service = QueueService(connection_string=connect_str) @@ -10,15 +14,75 @@ queue_service.encode_function = QueueMessageFormat.binary_base64encode queue_service.decode_function = QueueMessageFormat.binary_base64decode +MAX_QUEUE_MSG_SIZE = 64 * 1024 + +def parse_connection_string(conn_str): + tokens = dict(x.split('=', 1) for x in conn_str.strip().split(';') if '=' in x) + #tokens is a dict with all the values in the connections string like DefaultEndpointsProtocol, AccountName, AccountKey, BlobEndpoint, .. etc + account_name = tokens.get('AccountName') + account_key = tokens.get('AccountKey') + return account_name, account_key + +def upload_to_blob(payload, container): + blob_service = BlobServiceClient.from_connection_string(connect_str) + container_client = blob_service.get_container_client(container) + if not container_client.exists(): + try: + container_client.create_container() + except ResourceExistsError: + print("Container already exists, probably due to a parallel workflow") + pass + blob_name = f"{uuid.uuid4()}.json" + blob_client = blob_service.get_blob_client(container=container, blob=blob_name) + data_bytes = json.dumps(payload).encode("utf-8") + blob_client.upload_blob(data_bytes, overwrite=True) + return blob_name + def user_function(serwoObject) -> SerWOObject: try: fin_dict = dict() data = serwoObject.get_body() metadata = serwoObject.get_metadata() - fin_dict["body"] = data + fin_dict["body"] = data fin_dict["metadata"] = metadata - queue_service.put_message(queue_name, json.dumps(fin_dict).encode("utf-8")) + #TODO: + if len(json.dumps(fin_dict).encode("utf-8")) > MAX_QUEUE_MSG_SIZE: + blob_details = data.get(data.get("csp")) + container_name = blob_details.get("container") if isinstance(blob_details, dict) else None + if not container_name: + container_name = "xfaas-large-payload-container" + + blob_name = upload_to_blob(fin_dict, container_name) + + account_name, account_key = parse_connection_string(connect_str) + sas_token = generate_blob_sas( + account_name=account_name, + container_name=container_name, + blob_name=blob_name, + account_key=account_key, + permission=BlobSasPermissions(read=True), + expiry=datetime.now(timezone.utc) + timedelta(hours=1) + ) + blob_url_with_sas = f"https://{account_name}.blob.core.windows.net/{container_name}/{blob_name}?{sas_token}" + + + + payload_data = { + + "large_payload": 1, + "azure": { + "container": container_name, + "blob_name": blob_name, + "blob_url": blob_url_with_sas + + } + } + queue_service.put_message(queue_name, json.dumps(payload_data).encode("utf-8")) + + + else: + queue_service.put_message(queue_name, json.dumps(fin_dict).encode("utf-8")) return SerWOObject(body=data) except Exception as e: print(e) diff --git a/serwo/templates/azure/push-to-storage-queue-template/PushToStorageQueue/requirements.txt b/serwo/templates/azure/push-to-storage-queue-template/PushToStorageQueue/requirements.txt index 74c8949..b808198 100644 --- a/serwo/templates/azure/push-to-storage-queue-template/PushToStorageQueue/requirements.txt +++ b/serwo/templates/azure/push-to-storage-queue-template/PushToStorageQueue/requirements.txt @@ -1,2 +1,3 @@ azure-storage-queue==2.1.0 -psutil \ No newline at end of file +psutil 
+azure-storage-blob==12.19.1 \ No newline at end of file diff --git a/serwo/xfaas_benchmark.py b/serwo/xfaas_benchmark.py index 0778579..4f97dfa 100644 --- a/serwo/xfaas_benchmark.py +++ b/serwo/xfaas_benchmark.py @@ -25,7 +25,7 @@ def calculate_sub_dag_latency(csp_id, nodes, sub_dag): for node in nodes: successors = sub_dag.successors(node) for succ in successors: - edge_latency = sub_dag.edges[(node, succ)]['EdgeBenchmark']['Latencies'][int(csp_id)][int(csp_id)] + edge_latency = sub_dag.edges[(node, succ)]['EdgeBenchmarks']['Latencies'][int(csp_id)][int(csp_id)] node_latency = sub_dag.nodes[node]['NodeBenchmark'][csp_id]['Latency'] val = edge_latency + node_latency sub_dag.edges[(node, succ)]['edge_latency'] = -1 * val @@ -58,10 +58,11 @@ def evaluate_node_and_edge_constraints(csp_id, sub_dag, user_pinned_nodes): if pinned_csp != csp_id: flag = True break - for ed in sub_dag.edges: - if sub_dag.edges[ed]['EdgeBenchmark']['DataTransferSize'] > 256 and csp_id == '0': - flag = True - break + #Added Large Payload Support + # for ed in sub_dag.edges: + # if sub_dag.edges[ed]['EdgeBenchmark']['DataTransferSize'] > 256 and csp_id == '0': + # flag = True + # break return flag @@ -107,7 +108,7 @@ def populate_inter_cloud_data_transfers(user_dag_copy, valid_partition_points): successors = list(user_dag_copy.successors(nd['node_id'])) for sc in successors: inter_cloud_data_transfer.append( - user_dag_copy.edges[(nd['node_id'], sc)]['EdgeBenchmark']['DataTransferSize']) + user_dag_copy.edges[(nd['node_id'], sc)]['EdgeBenchmarks']['DataTransferSize']) break return inter_cloud_data_transfer @@ -121,7 +122,7 @@ def populate_latanecy_benchmarks(bm_data, cloud_ids, edges, latency_map, user_da # for dest in neighbors: edges_data[(src, neighbors)] = bm_data['EdgeBenchmarks'][src][neighbors] for ed in edges: - user_dag_copy.edges[ed]['EdgeBenchmark'] = edges_data[ed] + user_dag_copy.edges[ed]['EdgeBenchmarks'] = edges_data[ed] top_sort = list(nx.topological_sort(user_dag_copy)) for i in range(0, len(valid_partition_points)): sub_nodes = [] @@ -152,7 +153,7 @@ def populate_latanecy_benchmarks(bm_data, cloud_ids, edges, latency_map, user_da def populate_data_transfer_benchmarks(cloud_ids, edges, user_dag_copy): data_transfer_benchmark = [] for ed in edges: - edge_bm = user_dag_copy.edges[ed]['EdgeBenchmark']['Latencies'] + edge_bm = user_dag_copy.edges[ed]['EdgeBenchmarks']['Latencies'] for csp_id_outer in cloud_ids: temp = [] for csp_id_inner in cloud_ids: diff --git a/serwo/xfaas_debugger_donot_commit.py b/serwo/xfaas_debugger_donot_commit.py new file mode 100644 index 0000000..c4bccdc --- /dev/null +++ b/serwo/xfaas_debugger_donot_commit.py @@ -0,0 +1,23 @@ +from xfbench_plotter import XFBenchPlotter + + +def plot_metrics(user_wf_dir, wf_deployment_id, run_id, wf_name,region): + breakpoint() + format = 'pdf' + plotter = XFBenchPlotter(user_wf_dir, wf_deployment_id, run_id,format) + plotter.plot_e2e_timeline(xticks=[], yticks=[],is_overlay=True,region=region) + figwidth = 7 + if wf_name == 'fileProcessing' or wf_name == 'math': + figwidth = 20 + plotter.plot_stagewise( yticks=[],figwidth=figwidth) + plotter.plot_cumm_e2e(yticks=[]) + # plotter.plot_e2e_invocations_wnwo_containers(csp="azure", yticks=[]) + + +if __name__ == "__main__": + user_wf_dir = "/Users/vaibhavjha/Documents/IISc/xfbench_multicloud/XFaaS/serwo/examples/static-qfanout-simulator-aws/workflow-gen" + wf_deployment_id = "random-d51-static" + run_id = "exp_1" + wf_name = "staticfanout" + region = "ap-south-1" + plot_metrics(user_wf_dir, 
wf_deployment_id, run_id, wf_name, region) \ No newline at end of file diff --git a/serwo/xfaas_main.py b/serwo/xfaas_main.py index 0481768..99b0156 100644 --- a/serwo/xfaas_main.py +++ b/serwo/xfaas_main.py @@ -21,37 +21,40 @@ import shutil import networkx as nx -# parser = argparse.ArgumentParser( -# prog="ProgramName", -# description="What the program does", -# epilog="Text at the bottom of help", -# ) -# parser.add_argument("--csp",dest='csp',type=str,help="CSP name") -# parser.add_argument("--region",dest='region',type=str,help="Region name") -# parser.add_argument("--wf-user-directory",dest='wf_user_directory',type=str,help="Workflow user directory") -# parser.add_argument("--dag-benchmark",dest='dag_benchmark',type=str,help="Path DAG Benchmark") -# parser.add_argument("--dag-file-name",dest='dag_filename',type=str,help="DAG FILE NAME") -# parser.add_argument("--is-async",dest='is_async',type=str,help="Is Async Fn",default=0) -# # parser.add_argument("--is-containerbased-aws",dest='is_containerbasedaws',type=str,help="Is Async Fn",default=0) +parser = argparse.ArgumentParser( + prog="ProgramName", + description="What the program does", + epilog="Text at the bottom of help", +) +parser.add_argument("--csp",dest='csp',type=str,help="CSP name") +parser.add_argument("--region",dest='region',type=str,help="Region name") +parser.add_argument("--wf-user-directory",dest='wf_user_directory',type=str,help="Workflow user directory") +parser.add_argument("--dag-benchmark",dest='dag_benchmark',type=str,help="Path DAG Benchmark") +parser.add_argument("--dag-file-name",dest='dag_filename',type=str,help="DAG FILE NAME") +parser.add_argument("--is-async",dest='is_async',type=str,help="Is Async Fn",default=0) +parser.add_argument("--is-containerbased-aws",dest='is_containerbasedaws',type=str,help="Is Async Fn",default=0) project_dir = pathlib.Path(__file__).parent.resolve() -# args = parser.parse_args() - -# is_containerbasedaws = bool(int(args.is_containerbasedaws)) -# USER_DIR = args.wf_user_directory -# DAG_DEFINITION_FILE = args.dag_filename - -# DAG_DEFINITION_PATH = f"{USER_DIR}/{DAG_DEFINITION_FILE}" -# BENCHMARK_FILE = args.dag_benchmark -# benchmark_path = f'{USER_DIR}/{BENCHMARK_FILE}' -# csp = args.csp -# region = args.region +args, unkown = parser.parse_known_args() +print("Passed arguments to xfaas_main:",args) + +is_containerbasedaws = bool(int(args.is_containerbasedaws)) +USER_DIR = args.wf_user_directory +DAG_DEFINITION_FILE = args.dag_filename + +DAG_DEFINITION_PATH = f"{USER_DIR}/{DAG_DEFINITION_FILE}" +BENCHMARK_FILE = args.dag_benchmark +benchmark_path = f'{USER_DIR}/{BENCHMARK_FILE}' +csp = args.csp +region = args.region part_id = "test" def get_user_pinned_nodes(): config = json.loads(open(f'{project_dir}/config/xfaas_user_config.json', 'r').read()) if "user_pinned_nodes" in config: + print("User Pinned Nodes Detected") + print(config['user_pinned_nodes']) return config['user_pinned_nodes'] else: return None @@ -248,6 +251,10 @@ def add_collect_logs(dag_definition_path,user_wf_dir, xfaas_user_dag,region,part def swap(a,b): return b,a def generate_new_dags(partition_config, xfaas_user_dag, user_wf_dir, dag_definition_path): + + # Load original DAG config to preserve ConditionalBranches + with open(dag_definition_path, 'r') as file: + original_dag_config = json.load(file) src_node = None sink_node = None @@ -280,7 +287,7 @@ def generate_new_dags(partition_config, xfaas_user_dag, user_wf_dir, dag_definit nodes_in_between = [] for i in range(st_ind, en_ind+1): 
nodes_in_between.append(top_sort_nodes[i]) - + subdag = nx_dag.subgraph(nodes_in_between) dagg = {} part_id = partition_config[0].get_part_id() @@ -293,11 +300,11 @@ def generate_new_dags(partition_config, xfaas_user_dag, user_wf_dir, dag_definit csp = partition_config[0].get_left_csp().get_name() region = partition_config[0].get_region() - write_dag_for_partition( user_wf_dir, dagg,part_id,csp,region) + write_dag_for_partition( user_wf_dir, dagg,part_id,csp,region, original_dag_config) for i in range(1, len(partition_config)): - + start_node = partition_config[i-1].get_function_name() end_node = partition_config[i].get_function_name() ## subdag with start node and end node @@ -365,9 +372,9 @@ def generate_new_dags(partition_config, xfaas_user_dag, user_wf_dir, dag_definit csp = partition_config[i].get_left_csp().get_name() region = partition_config[i].get_region() - write_dag_for_partition( user_wf_dir, dagg,part_id,csp,region) + write_dag_for_partition( user_wf_dir, dagg,part_id,csp,region, original_dag_config) -def write_dag_for_partition(user_wf_dir, dagg, part_id, csp, region): +def write_dag_for_partition(user_wf_dir, dagg, part_id, csp, region, original_dag_config=None): out_dag_name = "dag.json" directory = f'{user_wf_dir}/partitions/{csp}-{region}-{part_id}' if not os.path.exists(directory): @@ -375,6 +382,11 @@ def write_dag_for_partition(user_wf_dir, dagg, part_id, csp, region): else: shutil.rmtree(directory) os.makedirs(directory) + + # Add ConditionalBranches if present in original DAG + if original_dag_config and "ConditionalBranches" in original_dag_config: + dagg["ConditionalBranches"] = original_dag_config["ConditionalBranches"] + with open(f'{directory}/{out_dag_name}', 'w') as file: file.write(json.dumps(dagg, indent=4)) @@ -397,7 +409,7 @@ def run(user_wf_dir, dag_definition_file, benchmark_file, csp,region): generate_new_dags(partition_config, xfaas_user_dag, user_wf_dir, dag_definition_path) print(user_wf_dir) - + #breaakpoint() # Writes part details to a Json file in user_wf_dir/partitions part_details = {} part_ids = [] @@ -414,6 +426,7 @@ def run(user_wf_dir, dag_definition_file, benchmark_file, csp,region): ) part_ids.append(part_id) + print(p.get_function_name(), p.get_part_id(), p.get_left_csp().get_name(), p.get_region()) # partition_config = [PartitionPoint("function_name", 2, csp, None, part_id, region)] @@ -425,8 +438,7 @@ def run(user_wf_dir, dag_definition_file, benchmark_file, csp,region): with open(f'{user_wf_dir}/partitions/part-details.json', 'w') as json_file: json.dump(part_details, json_file, indent=4) - - + wf_id = xfaas_provenance.push_user_dag(dag_definition_path) last_partition = partition_config[-1] part_id = last_partition.get_part_id() @@ -442,11 +454,11 @@ def run(user_wf_dir, dag_definition_file, benchmark_file, csp,region): xfaas_resource_generator.generate(user_wf_dir, partition_config,"dag.json") xfaas_provenance.generate_provenance_artifacts(user_wf_dir,wf_id,refactored_wf_id,wf_deployment_id,csp,region,part_id,queue_details) - return '', '', '' + #return '', '', '' return wf_id, refactored_wf_id, wf_deployment_id -# if __name__ == '__main__': +if __name__ == '__main__': -# wf_id, refactored_wf_id, wf_deployment_id = run(f'{USER_DIR}', DAG_DEFINITION_FILE, BENCHMARK_FILE, csp,region) + wf_id, refactored_wf_id, wf_deployment_id = run(f'{USER_DIR}', DAG_DEFINITION_FILE, BENCHMARK_FILE, csp,region) diff --git a/serwo/xfaas_optimizer.py b/serwo/xfaas_optimizer.py index 67e7664..72de5c4 100644 --- a/serwo/xfaas_optimizer.py +++ 
b/serwo/xfaas_optimizer.py @@ -33,6 +33,16 @@ def translate(clouds,cloud_dictionary,valid_partition_points,user_dag): i = 0 part_id = "0000" kk = 0 + #Adding a condition for pinned Single Node workflows: + breakpoint + if (len(valid_partition_points) == 1 and len(set(clouds)) == 1 and len(user_dag.get_dag().edges) == 0): + function_name = (user_dag.get_dag()).nodes[valid_partition_points[0]['node_id']]['NodeName'] + out_degree = valid_partition_points[0]['out_degree'] + csp = CSP(cloud_dictionary[str(clouds[0])]['csp']) + region = cloud_dictionary[str(clouds[0])]['region'] + partition_point = PartitionPoint(function_name, out_degree, csp, None, part_id, region) + return [partition_point] + while(i < len(clouds)): j = i while j < len(clouds) and clouds[j] == clouds[i]: diff --git a/serwo/xfaas_resource_generator.py b/serwo/xfaas_resource_generator.py index 239db8a..7477a89 100644 --- a/serwo/xfaas_resource_generator.py +++ b/serwo/xfaas_resource_generator.py @@ -6,7 +6,7 @@ import os def generate(user_dir, partition_config, dag_definition_file): partition_config = list(reversed(partition_config)) - + for i in range(len(partition_config)): if i==0: csp = partition_config[i].get_left_csp().get_name() @@ -14,15 +14,17 @@ def generate(user_dir, partition_config, dag_definition_file): part_id = partition_config[i].get_part_id() print(f"Building resources for {csp} in {region} with part_id {part_id}") csp_temp = csp.split('_') + main_csp = csp csp = csp_temp[0] if len(csp_temp) > 1: is_netherite = True else: is_netherite = False - updated_user_dir = f"{user_dir}/partitions/{csp}-{region}-{part_id}" + updated_user_dir = f"{user_dir}/partitions/{main_csp}-{region}-{part_id}" dag_definition_path = f"{updated_user_dir}/{dag_definition_file}" else: + downstream_csp = partition_config[i-1].get_left_csp().get_name() downstream_region = partition_config[i-1].get_region() downstream_part_id = partition_config[i-1].get_part_id() @@ -35,6 +37,13 @@ def generate(user_dir, partition_config, dag_definition_file): region = partition_config[i].get_region() part_id = partition_config[i].get_part_id() updated_user_dir = f"{user_dir}/partitions/{csp}-{region}-{part_id}" + csp_temp = csp.split('_') + main_csp = csp + csp = csp_temp[0] + if len(csp_temp) > 1: + is_netherite = True + else: + is_netherite = False dag_path = f"{updated_user_dir}/{dag_definition_file}" with open(dag_path, "r") as dag_file: dag_from_file = json.load(dag_file) @@ -80,7 +89,7 @@ def generate(user_dir, partition_config, dag_definition_file): "queue_name": queue_name, "connection_string": connection_string, } - + root_dir = os.path.dirname(os.path.abspath(__file__)) template_dir = f"{root_dir}/templates/azure/push-to-storage-queue-template/{function_name}" output_path = f"{updated_user_dir}/" os.system(f"cp -r {template_dir} {output_path}") diff --git a/serwo/xfaas_run_benchmark.py b/serwo/xfaas_run_benchmark.py index b7c337e..2dbb537 100644 --- a/serwo/xfaas_run_benchmark.py +++ b/serwo/xfaas_run_benchmark.py @@ -47,7 +47,7 @@ def get_client_login_details(config_path): data = json.load(f) data = data[args.client_key] server_ip = data['server_ip'] - server_user_id = data['server_user_id'] + server_user_id = data['server_user_id'] if 'server_pem_file_path' in data: server_pem_file_path = data['server_pem_file_path'] @@ -70,12 +70,13 @@ def get_azure_payload(payload): def read_dynamism_file(dynamism,duration, max_rps): file_path = os.getenv("XFBENCH_DIR") + f"/workloads/{dynamism}-{max_rps}-{duration}.csv" + breakpoint() with open(file_path) as 
f: data = f.readlines() data = [x.strip() for x in data] final_data = [] for d in data: - vals = [float(x) for x in d.split(",") if x != "" ] # '''and 'KB' not in x''' + vals = [float(x) for x in d.split(",") if x != "" and 'KB' not in x ] # '''and 'KB' not in x''' # size = d.split(",")[-1] final_data.append((vals[0],vals[1])) @@ -281,7 +282,8 @@ def generate_shell_script_and_scp(csp,payload_size, wf_name, rps, duration,dynam os.system(f"ssh {server_user_id}@{server_ip} ./shell_scripts/{shell_file_name}") else: os.system(f"chmod +x {output_path}") - os.system(f"./{output_path}") + os.system(f"{output_path}") + #os.system(f"./{output_path}") use when you don't have full path def load_payload(wf_user_directory,payload_size): payload_path = f"{wf_user_directory}/samples/{payload_size}/input/input.json" @@ -322,10 +324,20 @@ def run_workload(csp,region,part_id,max_rps,duration,payload_size,dynamism,wf_na duration = d[0] rps = d[1] # payload_size = d[2] - # payload = load_payload(wf_user_directory,payload_size) ne_session_id = session_id + str(i) + # payload = load_payload(wf_user_directory,payload_size) + breakpoint() + if dynamism == 'slow': # Or match your exact dynamism string + duration_segment = 600.0 # Force ThreadGroup.duration=600s (10 min alive) + target_throughput = 1.0 # Force ConstantThroughputTimer throughput=1.0 (1 sample/minute) + # Pass target_throughput instead of rps * 60.0 to make_jmx_file + make_jmx_file(csp, target_throughput, duration_segment, payload_size, wf_name, execute_url, state_machine_arn, dynamism, ne_session_id, wf_user_directory, part_id, region, wf_deployment_id, run_id, payload, is_localhost) + else: + # Original logic for other experiments + make_jmx_file(csp, rps * 60.0, duration, payload_size, wf_name, execute_url, state_machine_arn, dynamism, ne_session_id, wf_user_directory, part_id, region, wf_deployment_id, run_id, payload, is_localhost) - make_jmx_file(csp, rps * 60.0, duration, payload_size, wf_name, execute_url,state_machine_arn, dynamism, ne_session_id, wf_user_directory, part_id, region , wf_deployment_id, run_id,payload,is_localhost) + + #make_jmx_file(csp, rps * 60.0, duration, payload_size, wf_name, execute_url,state_machine_arn, dynamism, ne_session_id, wf_user_directory, part_id, region , wf_deployment_id, run_id,payload, is_localhost) i += 1 generate_shell_script_and_scp(csp,payload_size, wf_name, max_rps, duration,dynamism,region,is_localhost) @@ -457,13 +469,14 @@ def local_teardown(wf_user_directory): wf_user_directory = os.getenv("XFBENCH_DIR") + f"/workflows/singleton_workflows/{function_class}/{function_name}" else: build_workflow(wf_user_directory) + breakpoint() wf_user_directory += "/workflow-gen" print('==================DEPLOYING WF===========================') wf_id, refactored_wf_id, wf_deployment_id = deploy_workflow(wf_user_directory,dag_filename, region,csp) - + breakpoint() ## write deployment id to a file create if not exists else append deps = [] xfaas_dir = os.getenv('XFAAS_DIR') diff --git a/serwo/xfbench_plotter.py b/serwo/xfbench_plotter.py index 502e6cf..3fcd281 100644 --- a/serwo/xfbench_plotter.py +++ b/serwo/xfbench_plotter.py @@ -35,6 +35,7 @@ class XFBenchPlotter: plt.rcParams['legend.fontsize'] = 20 def __init__(self, workflow_directory: str, workflow_deployment_id: str, run_id: str, format: str): + breakpoint() self.__workflow_directory = workflow_directory self.__workflow_deployment_id = workflow_deployment_id self.__run_id = run_id @@ -142,11 +143,12 @@ def __get_outfile_prefix(self): def 
__create_dynamo_db_items(self): + breakpoint() print("Creating DynamoDB items") dynamodb_item_list = [] queue = QueueClient.from_connection_string(conn_str=self.__conn_str, queue_name=self.__queue_name) - response = queue.receive_messages(visibility_timeout=3000) + response = queue.receive_messages(visibility_timeout=180) print('Reading Queue') for message in response: queue_item = json.loads(message.content) diff --git a/setup-xfaas.sh b/setup-xfaas.sh index afa1dcc..28c77a3 100755 --- a/setup-xfaas.sh +++ b/setup-xfaas.sh @@ -1,6 +1,6 @@ #!/bin/bash export AZURE_SUBSCRIPTION_ID=9dc7b503-5f49-4742-8f12-07fad514c633 -export XFAAS_DIR=/Users/varad.kulkarni/xfaas/XFaaS -export XFAAS_WF_DIR=/Users/varad.kulkarni/xfaas/xfaas-workloads -export XFBENCH_DIR=/Users/varad.kulkarni/xfaas/xfaas-workloads \ No newline at end of file +export XFAAS_DIR=/Users/vaibhavjha/Documents/IISc/xfbench_multicloud/XFaaS +export XFAAS_WF_DIR=/Users/vaibhavjha/Documents/IISc/xfbench_multicloud/XFaaS +export XFBENCH_DIR=/Users/vaibhavjha/Documents/IISc/xfbench_multicloud/XFBench \ No newline at end of file From caec3b04ea8c6aec65894c7e88fa9a0e30783717 Mon Sep 17 00:00:00 2001 From: Vaibhav Jha Date: Mon, 22 Dec 2025 17:42:08 +0530 Subject: [PATCH 2/3] added support for model injection --- serwo/aws_create_statemachine.py | 36 +++++++ serwo/config/xfaas_user_config.json | 6 +- .../python/src/utils/classes/aws/function.py | 6 +- .../python/src/utils/classes/aws/user_dag.py | 5 +- .../src/utils/classes/azure/user_dag.py | 81 +++++++++++++++- .../utils/classes/commons/serwo_user_dag.py | 5 + .../Orchestrate/__init__.py | 94 +++++++++++++++++-- .../QueueTrigger/__init__.py | 2 +- serwo/xfaas_run_benchmark.py | 2 +- 9 files changed, 222 insertions(+), 15 deletions(-) diff --git a/serwo/aws_create_statemachine.py b/serwo/aws_create_statemachine.py index b8f80b6..069dbbf 100644 --- a/serwo/aws_create_statemachine.py +++ b/serwo/aws_create_statemachine.py @@ -383,6 +383,42 @@ def __generate_asl_template(self): data['States']['Success'] = { 'Type': 'Succeed' } + + # NEW: Inject model_name Parameters for each Task state + function_object_map = self.__user_dag.get_node_object_map() + + # Define which nodes should NOT get Parameters injection + # - CollectLogs: system function + # - Start node: receives raw input without XFaaS wrapper + nodes_not_to_inject = ['CollectLogs'] + start_node = data.get('StartAt') + if start_node: + nodes_not_to_inject.append(start_node) + + for state_name, state_def in data['States'].items(): + if state_def.get('Type') == 'Task' and state_name not in nodes_not_to_inject: + if state_name in function_object_map: + model_name = function_object_map[state_name].get_model_name() + state_def['Parameters'] = { + "statusCode.$": "$.statusCode", + "metadata.$": "$.metadata", + "body": { + "model_name": model_name, + "_xfaas_wrapped_body.$": "$.body" + } + } + print(f"Injected model_name '{model_name}' for state '{state_name}'") + else: + print(f"DEBUG: State '{state_name}' not found in function_object_map, using default") + # Still inject Parameters for consistency (with default model) + state_def['Parameters'] = { + "statusCode.$": "$.statusCode", + "metadata.$": "$.metadata", + "body": { + "model_name": "openai:gpt-4o-mini", # fallback default + "_xfaas_wrapped_body.$": "$.body" + } + } # print("Updated data of sfn builder after adding poll",data) with open(f"{self.__aws_build_dir}/{self.__json_file}", "w") as statemachinejson: diff --git a/serwo/config/xfaas_user_config.json 
b/serwo/config/xfaas_user_config.json index 5afb926..dd8d0ff 100644 --- a/serwo/config/xfaas_user_config.json +++ b/serwo/config/xfaas_user_config.json @@ -10,8 +10,8 @@ } }, "user_pinned_nodes" : { - "1":"1", - "2":"1", - "3":"1" + "1":"0", + "2":"0", + "3":"0" } } \ No newline at end of file diff --git a/serwo/python/src/utils/classes/aws/function.py b/serwo/python/src/utils/classes/aws/function.py index 18d84a1..04b33a1 100644 --- a/serwo/python/src/utils/classes/aws/function.py +++ b/serwo/python/src/utils/classes/aws/function.py @@ -1,5 +1,5 @@ class Function: - def __init__(self, id, name, path, end_point, memory): + def __init__(self, id, name, path, end_point, memory, model_name=None): self._name = name self._id = id self._arn = name + "Arn" @@ -10,6 +10,7 @@ def __init__(self, id, name, path, end_point, memory): self._uri = "functions/" + name self._module_name = end_point.split(".")[0] self._memory = memory + self._model_name = model_name or "openai:gpt-4o-mini" # Default model # TODO - add function id self._isasync = False self._iscontainerised=False @@ -35,6 +36,9 @@ def get_handler(self): def get_module_name(self): return self._module_name + def get_model_name(self): + return self._model_name + def get_as_dict(self): return { "name": self._name, diff --git a/serwo/python/src/utils/classes/aws/user_dag.py b/serwo/python/src/utils/classes/aws/user_dag.py index 99aa7d9..4d98624 100644 --- a/serwo/python/src/utils/classes/aws/user_dag.py +++ b/serwo/python/src/utils/classes/aws/user_dag.py @@ -35,13 +35,16 @@ def __init__(self, user_config_path): nodeID = "n" + str(index) self.__nodeIDMap[node["NodeName"]] = nodeID self.__nodeIDMap[node["NodeId"]] = nodeID + model_name = node.get("ModelName", "openai:gpt-4o-mini") self.__functions[node["NodeName"]] = Function( node["NodeId"], node["NodeName"], node["Path"], node["EntryPoint"], - node["MemoryInMB"] + node["MemoryInMB"], + model_name ) + print("NodeKeys -", list(node.keys())) if "IsAsync" in node and node["IsAsync"]: diff --git a/serwo/python/src/utils/classes/azure/user_dag.py b/serwo/python/src/utils/classes/azure/user_dag.py index 96915f0..f9ae787 100644 --- a/serwo/python/src/utils/classes/azure/user_dag.py +++ b/serwo/python/src/utils/classes/azure/user_dag.py @@ -33,8 +33,9 @@ def __init__(self, user_config_path): # NOTE - this is different in AWS, being picked up from the user dag-description # nodeID = "n" + str(index) nodeID = node["NodeId"] + model_name = node.get("ModelName", "openai:gpt-4o-mini") self.__nodeIDMap[node["NodeName"]] = nodeID - self.__dag.add_node(nodeID, NodeName=node["NodeName"], pre="", ret=["yield ", "context.call_activity(\"" + node["NodeName"] + "\",$var$)"], var=self._generate_random_variable_name(), machine_list=[nodeID]) + self.__dag.add_node(nodeID, NodeName=node["NodeName"], ModelName=model_name, pre="", ret=["yield ", "context.call_activity(\"" + node["NodeName"] + "\",$var$)"], var=self._generate_random_variable_name(), machine_list=[nodeID]) index += 1 @@ -147,6 +148,16 @@ def _get_orchestrator_code_parallel_merge(self, dag, nodes): task_list_create = task_list_var_name + " = []\n" ret = ["yield ", "context.task_all(" + task_list_var_name + ")"] pre = "" + + # Use the first node's model_name + if nodes: + model_name = dag.nodes[nodes[0]].get('ModelName', 'openai:gpt-4o-mini') + pre += f"\n# Inject model_name for parallel execution" + pre += f"\nparallel_input = json.loads(serwoObject)" + pre += f"\nif 'body' in parallel_input:" + pre += f"\n parallel_input['body']['model_name'] = 
'{model_name}'" + pre += f"\nserwoObject = json.dumps(parallel_input)" + for node in nodes: # Remember No $var$ will be updated in parallel merge pre += "\n" + dag.nodes[node]['pre'] @@ -169,16 +180,84 @@ def _get_orchestrator_code_linear_merge(self, dag, nodes): previous_var = None for node in nodes[:-1]: # $var$ will be updated in linear merge + # Get model_name for this node + model_name = dag.nodes[node].get('ModelName', 'openai:gpt-4o-mini') + node_name = dag.nodes[node].get('NodeName', 'unknown') if previous_var is not None: + # Inject model_name before calling activity + pre += f"\n# Inject model_name for {dag.nodes[node].get('NodeName')}" + pre += f"\nimport json" + pre += f"\n{previous_var}_dict = json.loads({previous_var})" + pre += f"\nif 'body' in {previous_var}_dict:" + pre += f"\n body = {previous_var}_dict['body']" + pre += f"\n if isinstance(body, str):" + pre += f"\n body = json.loads(body)" + pre += f"\n body['model_name'] = '{model_name}'" + pre += f"\n {previous_var}_dict['body'] = json.dumps(body)" + pre += f"\n else:" + pre += f"\n body['model_name'] = '{model_name}'" + pre += f"\n {previous_var}_dict['body'] = body" + pre += f"\nelif '_body' in {previous_var}_dict:" + pre += f"\n body = {previous_var}_dict['_body']" + pre += f"\n if isinstance(body, str):" + pre += f"\n body = json.loads(body)" + pre += f"\n body['model_name'] = '{model_name}'" + pre += f"\n {previous_var}_dict['_body'] = json.dumps(body)" + pre += f"\n else:" + pre += f"\n body['model_name'] = '{model_name}'" + pre += f"\n {previous_var}_dict['_body'] = body" + pre += f"\nelse:" + pre += f"\n {previous_var}_dict['model_name'] = '{model_name}'" + pre += f"\n{previous_var} = json.dumps({previous_var}_dict)" + pre += "\n" + dag.nodes[node]['pre'].replace("$var$", previous_var) var_substituted = dag.nodes[node]['ret'][1].replace("$var$", previous_var) else: + # Inject model_name for first node + pre += f"\n# Inject model_name for {dag.nodes[node].get('NodeName')}" + pre += f"\nimport json" + pre += f"\nserwoObject_dict = json.loads(serwoObject)" + pre += f"\nif 'body' in serwoObject_dict:" + pre += f"\n serwoObject_dict['body']['model_name'] = '{model_name}'" + pre += f"\nelse:" + pre += f"\n # Raw input - add model_name at top level" + pre += f"\n serwoObject_dict['model_name'] = '{model_name}'" + pre += f"\nserwoObject = json.dumps(serwoObject_dict)" + pre += "\n" + dag.nodes[node]['pre'] var_substituted = dag.nodes[node]['ret'][1] + pre += "\n" + dag.nodes[node]['var'] + " = " + dag.nodes[node]['ret'][0] + " " + var_substituted previous_var = dag.nodes[node]['var'] + # Handle last node + model_name_last = dag.nodes[last].get('ModelName', 'openai:gpt-4o-mini') + + pre += f"\n# Inject model_name for {dag.nodes[last].get('NodeName')}" + pre += f"\n{previous_var}_dict = json.loads({dag.nodes[nodes[-2]]['var']})" + pre += f"\nif 'body' in {previous_var}_dict:" + pre += f"\n body = {previous_var}_dict['body']" + pre += f"\n if isinstance(body, str):" + pre += f"\n body = json.loads(body)" + pre += f"\n body['model_name'] = '{model_name_last}'" + pre += f"\n {previous_var}_dict['body'] = json.dumps(body)" + pre += f"\n else:" + pre += f"\n body['model_name'] = '{model_name_last}'" + pre += f"\n {previous_var}_dict['body'] = body" + pre += f"\nelif '_body' in {previous_var}_dict:" + pre += f"\n body = {previous_var}_dict['_body']" + pre += f"\n if isinstance(body, str):" + pre += f"\n body = json.loads(body)" + pre += f"\n body['model_name'] = '{model_name_last}'" + pre += f"\n {previous_var}_dict['_body'] = 
json.dumps(body)" + pre += f"\n else:" + pre += f"\n body['model_name'] = '{model_name_last}'" + pre += f"\n {previous_var}_dict['_body'] = body" + pre += f"\nelse:" + pre += f"\n {previous_var}_dict['model_name'] = '{model_name_last}'" + pre += f"\n{dag.nodes[nodes[-2]]['var']} = json.dumps({previous_var}_dict)" + pre += "\n" + dag.nodes[last]['pre'].replace("$var$", dag.nodes[nodes[-2]]['var']) var = self._generate_random_variable_name() # $var$ will be updated in linear merge diff --git a/serwo/python/src/utils/classes/commons/serwo_user_dag.py b/serwo/python/src/utils/classes/commons/serwo_user_dag.py index 5bf9127..dd7bab3 100644 --- a/serwo/python/src/utils/classes/commons/serwo_user_dag.py +++ b/serwo/python/src/utils/classes/commons/serwo_user_dag.py @@ -46,6 +46,11 @@ def __init__(self, user_config_path): if "IsContainerised" in node: self.__dag.add_node(nodeID, IsContainerised=node["IsContainerised"]) + + if "ModelName" in node: + self.__dag.add_node(nodeID, + ModelName=node["ModelName"]) + index += 1 # add edges in the dag diff --git a/serwo/templates/azure/predefined-functions/Orchestrate/__init__.py b/serwo/templates/azure/predefined-functions/Orchestrate/__init__.py index eeb868f..dd9f391 100644 --- a/serwo/templates/azure/predefined-functions/Orchestrate/__init__.py +++ b/serwo/templates/azure/predefined-functions/Orchestrate/__init__.py @@ -163,16 +163,96 @@ def orchestrator_function(context: df.DurableOrchestrationContext): should_continue = True while should_continue: - xvyi = yield context.call_activity("Planner", serwoObject) - urxb = yield context.call_activity("Actor", xvyi) - zupq = yield context.call_activity("Evaluator", urxb) - zupq = insert_end_stats_in_metadata(zupq) - pmbi = yield context.call_activity("CollectLogs", zupq) + # Inject model_name for Planner + import json + serwoObject_dict = json.loads(serwoObject) + if 'body' in serwoObject_dict: + serwoObject_dict['body']['model_name'] = 'openai:gpt-4o-mini' + else: + # Raw input - add model_name at top level + serwoObject_dict['model_name'] = 'openai:gpt-4o-mini' + serwoObject = json.dumps(serwoObject_dict) + isfi = yield context.call_activity("Planner", serwoObject) + # Inject model_name for Actor + import json + isfi_dict = json.loads(isfi) + if 'body' in isfi_dict: + body = isfi_dict['body'] + if isinstance(body, str): + body = json.loads(body) + body['model_name'] = 'openai:gpt-4o-mini' + isfi_dict['body'] = json.dumps(body) + else: + body['model_name'] = 'openai:gpt-4o-mini' + isfi_dict['body'] = body + elif '_body' in isfi_dict: + body = isfi_dict['_body'] + if isinstance(body, str): + body = json.loads(body) + body['model_name'] = 'openai:gpt-4o-mini' + isfi_dict['_body'] = json.dumps(body) + else: + body['model_name'] = 'openai:gpt-4o-mini' + isfi_dict['_body'] = body + else: + isfi_dict['model_name'] = 'openai:gpt-4o-mini' + isfi = json.dumps(isfi_dict) + memo = yield context.call_activity("Actor", isfi) + # Inject model_name for Evaluator + import json + memo_dict = json.loads(memo) + if 'body' in memo_dict: + body = memo_dict['body'] + if isinstance(body, str): + body = json.loads(body) + body['model_name'] = 'openai:gpt-4o-mini' + memo_dict['body'] = json.dumps(body) + else: + body['model_name'] = 'openai:gpt-4o-mini' + memo_dict['body'] = body + elif '_body' in memo_dict: + body = memo_dict['_body'] + if isinstance(body, str): + body = json.loads(body) + body['model_name'] = 'openai:gpt-4o-mini' + memo_dict['_body'] = json.dumps(body) + else: + body['model_name'] = 'openai:gpt-4o-mini' + 
memo_dict['_body'] = body + else: + memo_dict['model_name'] = 'openai:gpt-4o-mini' + memo = json.dumps(memo_dict) + kwjb = yield context.call_activity("Evaluator", memo) + # Inject model_name for CollectLogs + kwjb_dict = json.loads(kwjb) + if 'body' in kwjb_dict: + body = kwjb_dict['body'] + if isinstance(body, str): + body = json.loads(body) + body['model_name'] = 'openai:gpt-4o-mini' + kwjb_dict['body'] = json.dumps(body) + else: + body['model_name'] = 'openai:gpt-4o-mini' + kwjb_dict['body'] = body + elif '_body' in kwjb_dict: + body = kwjb_dict['_body'] + if isinstance(body, str): + body = json.loads(body) + body['model_name'] = 'openai:gpt-4o-mini' + kwjb_dict['_body'] = json.dumps(body) + else: + body['model_name'] = 'openai:gpt-4o-mini' + kwjb_dict['_body'] = body + else: + kwjb_dict['model_name'] = 'openai:gpt-4o-mini' + kwjb = json.dumps(kwjb_dict) + kwjb = insert_end_stats_in_metadata(kwjb) + jwjf = yield context.call_activity("CollectLogs", kwjb) # Check conditional branching condition import json try: - result_dict = json.loads(pmbi) + result_dict = json.loads(jwjf) if '_body' in result_dict: result_body = result_dict['_body'] else: @@ -193,7 +273,7 @@ def orchestrator_function(context: df.DurableOrchestrationContext): if not should_continue: break - return pmbi + return jwjf main = df.Orchestrator.create(orchestrator_function) diff --git a/serwo/templates/azure/predefined-functions/QueueTrigger/__init__.py b/serwo/templates/azure/predefined-functions/QueueTrigger/__init__.py index 3c0685a..e080783 100644 --- a/serwo/templates/azure/predefined-functions/QueueTrigger/__init__.py +++ b/serwo/templates/azure/predefined-functions/QueueTrigger/__init__.py @@ -13,7 +13,7 @@ def get_delta(start_time): func_id = 253 -app_name = 'xfaasUserWf0000164304' +app_name = 'xfaasUserWf0000973582' async def main(msg: func.QueueMessage,starter: str) -> None: diff --git a/serwo/xfaas_run_benchmark.py b/serwo/xfaas_run_benchmark.py index 2dbb537..5a1d165 100644 --- a/serwo/xfaas_run_benchmark.py +++ b/serwo/xfaas_run_benchmark.py @@ -326,7 +326,7 @@ def run_workload(csp,region,part_id,max_rps,duration,payload_size,dynamism,wf_na # payload_size = d[2] ne_session_id = session_id + str(i) # payload = load_payload(wf_user_directory,payload_size) - breakpoint() + #breakpoint() if dynamism == 'slow': # Or match your exact dynamism string duration_segment = 600.0 # Force ThreadGroup.duration=600s (10 min alive) target_throughput = 1.0 # Force ConstantThroughputTimer throughput=1.0 (1 sample/minute) From 0e95b157414a2df772a6eda73c06f615e12caa1e Mon Sep 17 00:00:00 2001 From: Vaibhav Jha Date: Wed, 28 Jan 2026 12:44:08 +0530 Subject: [PATCH 3/3] feature: long payload support, orchestrator fixes and XC Agent Reasoning Loop --- serwo/aws_create_statemachine.py | 72 ++++++++--------- serwo/config/xfaas_user_config.json | 5 +- .../Orchestrate/__init__.py | 80 +++++++++---------- .../QueueTrigger/__init__.py | 2 +- serwo/xfaas_run_benchmark.py | 1 + serwo/xfbench_plotter.py | 3 +- 6 files changed, 84 insertions(+), 79 deletions(-) diff --git a/serwo/aws_create_statemachine.py b/serwo/aws_create_statemachine.py index 069dbbf..f637597 100644 --- a/serwo/aws_create_statemachine.py +++ b/serwo/aws_create_statemachine.py @@ -383,42 +383,42 @@ def __generate_asl_template(self): data['States']['Success'] = { 'Type': 'Succeed' } - - # NEW: Inject model_name Parameters for each Task state - function_object_map = self.__user_dag.get_node_object_map() - - # Define which nodes should NOT get Parameters injection - # - 
CollectLogs: system function - # - Start node: receives raw input without XFaaS wrapper - nodes_not_to_inject = ['CollectLogs'] - start_node = data.get('StartAt') - if start_node: - nodes_not_to_inject.append(start_node) - - for state_name, state_def in data['States'].items(): - if state_def.get('Type') == 'Task' and state_name not in nodes_not_to_inject: - if state_name in function_object_map: - model_name = function_object_map[state_name].get_model_name() - state_def['Parameters'] = { - "statusCode.$": "$.statusCode", - "metadata.$": "$.metadata", - "body": { - "model_name": model_name, - "_xfaas_wrapped_body.$": "$.body" - } - } - print(f"Injected model_name '{model_name}' for state '{state_name}'") - else: - print(f"DEBUG: State '{state_name}' not found in function_object_map, using default") - # Still inject Parameters for consistency (with default model) - state_def['Parameters'] = { - "statusCode.$": "$.statusCode", - "metadata.$": "$.metadata", - "body": { - "model_name": "openai:gpt-4o-mini", # fallback default - "_xfaas_wrapped_body.$": "$.body" - } - } + # Comment out when you need to inject Model names for Agentic workflows + # # NEW: Inject model_name Parameters for each Task state + # function_object_map = self.__user_dag.get_node_object_map() + + # # Define which nodes should NOT get Parameters injection + # # - CollectLogs: system function + # # - Start node: receives raw input without XFaaS wrapper + # nodes_not_to_inject = ['CollectLogs'] + # start_node = data.get('StartAt') + # if start_node: + # nodes_not_to_inject.append(start_node) + + # for state_name, state_def in data['States'].items(): + # if state_def.get('Type') == 'Task' and state_name not in nodes_not_to_inject: + # if state_name in function_object_map: + # model_name = function_object_map[state_name].get_model_name() + # state_def['Parameters'] = { + # "statusCode.$": "$.statusCode", + # "metadata.$": "$.metadata", + # "body": { + # "model_name": model_name, + # "_xfaas_wrapped_body.$": "$.body" + # } + # } + # print(f"Injected model_name '{model_name}' for state '{state_name}'") + # else: + # print(f"DEBUG: State '{state_name}' not found in function_object_map, using default") + # # Still inject Parameters for consistency (with default model) + # state_def['Parameters'] = { + # "statusCode.$": "$.statusCode", + # "metadata.$": "$.metadata", + # "body": { + # "model_name": "openai:gpt-4o-mini", # fallback default + # "_xfaas_wrapped_body.$": "$.body" + # } + # } # print("Updated data of sfn builder after adding poll",data) with open(f"{self.__aws_build_dir}/{self.__json_file}", "w") as statemachinejson: diff --git a/serwo/config/xfaas_user_config.json b/serwo/config/xfaas_user_config.json index dd8d0ff..b0a9584 100644 --- a/serwo/config/xfaas_user_config.json +++ b/serwo/config/xfaas_user_config.json @@ -12,6 +12,9 @@ "user_pinned_nodes" : { "1":"0", "2":"0", - "3":"0" + "3":"0", + "4":"0", + "5":"0", + "6":"0" } } \ No newline at end of file diff --git a/serwo/templates/azure/predefined-functions/Orchestrate/__init__.py b/serwo/templates/azure/predefined-functions/Orchestrate/__init__.py index dd9f391..ea5cbca 100644 --- a/serwo/templates/azure/predefined-functions/Orchestrate/__init__.py +++ b/serwo/templates/azure/predefined-functions/Orchestrate/__init__.py @@ -172,87 +172,87 @@ def orchestrator_function(context: df.DurableOrchestrationContext): # Raw input - add model_name at top level serwoObject_dict['model_name'] = 'openai:gpt-4o-mini' serwoObject = json.dumps(serwoObject_dict) - isfi = yield 
context.call_activity("Planner", serwoObject) + oshm = yield context.call_activity("Planner", serwoObject) # Inject model_name for Actor import json - isfi_dict = json.loads(isfi) - if 'body' in isfi_dict: - body = isfi_dict['body'] + oshm_dict = json.loads(oshm) + if 'body' in oshm_dict: + body = oshm_dict['body'] if isinstance(body, str): body = json.loads(body) body['model_name'] = 'openai:gpt-4o-mini' - isfi_dict['body'] = json.dumps(body) + oshm_dict['body'] = json.dumps(body) else: body['model_name'] = 'openai:gpt-4o-mini' - isfi_dict['body'] = body - elif '_body' in isfi_dict: - body = isfi_dict['_body'] + oshm_dict['body'] = body + elif '_body' in oshm_dict: + body = oshm_dict['_body'] if isinstance(body, str): body = json.loads(body) body['model_name'] = 'openai:gpt-4o-mini' - isfi_dict['_body'] = json.dumps(body) + oshm_dict['_body'] = json.dumps(body) else: body['model_name'] = 'openai:gpt-4o-mini' - isfi_dict['_body'] = body + oshm_dict['_body'] = body else: - isfi_dict['model_name'] = 'openai:gpt-4o-mini' - isfi = json.dumps(isfi_dict) - memo = yield context.call_activity("Actor", isfi) + oshm_dict['model_name'] = 'openai:gpt-4o-mini' + oshm = json.dumps(oshm_dict) + jzsy = yield context.call_activity("Actor", oshm) # Inject model_name for Evaluator import json - memo_dict = json.loads(memo) - if 'body' in memo_dict: - body = memo_dict['body'] + jzsy_dict = json.loads(jzsy) + if 'body' in jzsy_dict: + body = jzsy_dict['body'] if isinstance(body, str): body = json.loads(body) body['model_name'] = 'openai:gpt-4o-mini' - memo_dict['body'] = json.dumps(body) + jzsy_dict['body'] = json.dumps(body) else: body['model_name'] = 'openai:gpt-4o-mini' - memo_dict['body'] = body - elif '_body' in memo_dict: - body = memo_dict['_body'] + jzsy_dict['body'] = body + elif '_body' in jzsy_dict: + body = jzsy_dict['_body'] if isinstance(body, str): body = json.loads(body) body['model_name'] = 'openai:gpt-4o-mini' - memo_dict['_body'] = json.dumps(body) + jzsy_dict['_body'] = json.dumps(body) else: body['model_name'] = 'openai:gpt-4o-mini' - memo_dict['_body'] = body + jzsy_dict['_body'] = body else: - memo_dict['model_name'] = 'openai:gpt-4o-mini' - memo = json.dumps(memo_dict) - kwjb = yield context.call_activity("Evaluator", memo) + jzsy_dict['model_name'] = 'openai:gpt-4o-mini' + jzsy = json.dumps(jzsy_dict) + rveh = yield context.call_activity("Evaluator", jzsy) # Inject model_name for CollectLogs - kwjb_dict = json.loads(kwjb) - if 'body' in kwjb_dict: - body = kwjb_dict['body'] + rveh_dict = json.loads(rveh) + if 'body' in rveh_dict: + body = rveh_dict['body'] if isinstance(body, str): body = json.loads(body) body['model_name'] = 'openai:gpt-4o-mini' - kwjb_dict['body'] = json.dumps(body) + rveh_dict['body'] = json.dumps(body) else: body['model_name'] = 'openai:gpt-4o-mini' - kwjb_dict['body'] = body - elif '_body' in kwjb_dict: - body = kwjb_dict['_body'] + rveh_dict['body'] = body + elif '_body' in rveh_dict: + body = rveh_dict['_body'] if isinstance(body, str): body = json.loads(body) body['model_name'] = 'openai:gpt-4o-mini' - kwjb_dict['_body'] = json.dumps(body) + rveh_dict['_body'] = json.dumps(body) else: body['model_name'] = 'openai:gpt-4o-mini' - kwjb_dict['_body'] = body + rveh_dict['_body'] = body else: - kwjb_dict['model_name'] = 'openai:gpt-4o-mini' - kwjb = json.dumps(kwjb_dict) - kwjb = insert_end_stats_in_metadata(kwjb) - jwjf = yield context.call_activity("CollectLogs", kwjb) + rveh_dict['model_name'] = 'openai:gpt-4o-mini' + rveh = json.dumps(rveh_dict) + rveh = 
insert_end_stats_in_metadata(rveh) + ktpc = yield context.call_activity("CollectLogs", rveh) # Check conditional branching condition import json try: - result_dict = json.loads(jwjf) + result_dict = json.loads(ktpc) if '_body' in result_dict: result_body = result_dict['_body'] else: @@ -273,7 +273,7 @@ def orchestrator_function(context: df.DurableOrchestrationContext): if not should_continue: break - return jwjf + return ktpc main = df.Orchestrator.create(orchestrator_function) diff --git a/serwo/templates/azure/predefined-functions/QueueTrigger/__init__.py b/serwo/templates/azure/predefined-functions/QueueTrigger/__init__.py index e080783..cb02105 100644 --- a/serwo/templates/azure/predefined-functions/QueueTrigger/__init__.py +++ b/serwo/templates/azure/predefined-functions/QueueTrigger/__init__.py @@ -13,7 +13,7 @@ def get_delta(start_time): func_id = 253 -app_name = 'xfaasUserWf0000973582' +app_name = 'xfaasUserWf0000882849' async def main(msg: func.QueueMessage,starter: str) -> None: diff --git a/serwo/xfaas_run_benchmark.py b/serwo/xfaas_run_benchmark.py index 5a1d165..cf53d91 100644 --- a/serwo/xfaas_run_benchmark.py +++ b/serwo/xfaas_run_benchmark.py @@ -281,6 +281,7 @@ def generate_shell_script_and_scp(csp,payload_size, wf_name, rps, duration,dynam os.system(f"ssh {server_user_id}@{server_ip} 'chmod +x shell_scripts/{shell_file_name}'") os.system(f"ssh {server_user_id}@{server_ip} ./shell_scripts/{shell_file_name}") else: + breakpoint() os.system(f"chmod +x {output_path}") os.system(f"{output_path}") #os.system(f"./{output_path}") use when you don't have full path diff --git a/serwo/xfbench_plotter.py b/serwo/xfbench_plotter.py index 3fcd281..4656da4 100644 --- a/serwo/xfbench_plotter.py +++ b/serwo/xfbench_plotter.py @@ -153,6 +153,7 @@ def __create_dynamo_db_items(self): for message in response: queue_item = json.loads(message.content) metadata = queue_item["metadata"] + data = queue_item["data"] # Filtering based on workflow deployment id during creation itself if metadata["deployment_id"].strip() == self.__workflow_deployment_id: @@ -166,7 +167,7 @@ def __create_dynamo_db_items(self): dynamo_item["invocation_start_time_ms"] = str( metadata["workflow_start_time"] ) - + dynamo_item["data"] = data # add session id to dynamo db dynamo_item["session_id"] = str(metadata["session_id"])
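
Notes on two mechanisms introduced in this series.

Large-payload handling: when the serialized {body, metadata} envelope exceeds the queue message limit, the push functions upload the full payload to object storage and enqueue only a small pointer ({"large_payload": 1, "aws": {"bucket": ..., "key": ...}} on AWS, or an "azure" block carrying a SAS blob URL as in push_to_azure_q.py), and the consuming side (fetch_event_payload in serwo_objects.py, the QueueTrigger) resolves the pointer back into the original payload. A minimal sketch of the AWS producer/consumer pair, assuming boto3 credentials are available and a bucket name supplied by the caller; offload_if_large and resolve_payload are illustrative names, not the exact helpers used in PushToSQS:

    import json
    import uuid

    import boto3

    # Assumed threshold: SQS and Step Functions both cap a single message/state payload at 256 KB.
    MAX_MSG_BYTES = 256 * 1024

    def offload_if_large(fin_dict, bucket, s3=None):
        """Upload the payload to S3 and return a pointer message when it is too big to enqueue directly."""
        data = json.dumps(fin_dict).encode("utf-8")
        if len(data) <= MAX_MSG_BYTES:
            return fin_dict
        s3 = s3 or boto3.client("s3")
        key = f"{uuid.uuid4()}.json"
        s3.put_object(Bucket=bucket, Key=key, Body=data)
        return {"large_payload": 1, "aws": {"bucket": bucket, "key": key}}

    def resolve_payload(body, s3=None):
        """Inverse of offload_if_large; mirrors fetch_event_payload in serwo_objects.py."""
        if isinstance(body, dict) and body.get("large_payload") == 1:
            ptr = body.get("aws", {})
            if ptr.get("bucket") and ptr.get("key"):
                s3 = s3 or boto3.client("s3")
                obj = s3.get_object(Bucket=ptr["bucket"], Key=ptr["key"])
                return json.loads(obj["Body"].read().decode("utf-8"))
        return body

On Azure the same shape applies, with BlobServiceClient.upload_blob plus generate_blob_sas standing in for put_object and a plain requests.get on the SAS URL standing in for get_object.

Agent reasoning loop: the Azure orchestrator template now emits a Planner -> Actor -> Evaluator -> CollectLogs cycle that repeats until the evaluator clears the retry flag or an iteration limit carried in the payload is reached. Stripped of the per-activity model_name injection and metadata bookkeeping, the generated orchestrator follows this Durable Functions pattern (a condensed sketch, not the generated code itself):

    import json
    import azure.durable_functions as df

    def orchestrator_function(context: df.DurableOrchestrationContext):
        serwo_object = context.get_input()
        result = None
        while True:
            planned = yield context.call_activity("Planner", serwo_object)
            acted = yield context.call_activity("Actor", planned)
            evaluated = yield context.call_activity("Evaluator", acted)
            result = yield context.call_activity("CollectLogs", evaluated)

            # The Evaluator decides whether another pass is needed, bounded by
            # an iteration limit supplied in the user input.
            payload = json.loads(result)
            body = payload.get("_body", payload.get("body", {}))
            if not body.get("needs_retry", False):
                break
            if body.get("iteration_count", 0) >= body.get("max_iterations", 1):
                break
        return result

    main = df.Orchestrator.create(orchestrator_function)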