Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ analysis/
stashed/
serwo/examples/graphAwsFourRps
serwo/examples/graphAwsMedium
build
build/
*.jsonl
log_files
serwo/config
Expand All @@ -39,9 +39,9 @@ jmeter.log
*.csv
deployments.txt
*.pdf
qutils
qsserializers
qutils/
qsserializers/
serwo/templates/azure/predefined-functions/Orchestrate/__init__.py
serwo/templates/azure/predefined-functions/QueueTrigger/__init__.py
serwo/benchmark_resources/aws-az-mmc.jmx
serwo/benchmark_resources/az-aws-mmc.jmx
serwo/benchmark_resources/az-aws-mmc.jmx
66 changes: 60 additions & 6 deletions serwo/aws_create_statemachine.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,6 @@ def __create_lambda_fns(
with open(temp_runner_path, "w") as file:
file.write(contents)

# TODO - Fix the stickytape issue for AWS
# GitHub Issue link - https://github.com/dream-lab/XFaaS/issues/4
"""
Sticytape the runner
"""
logger.info(f"Stickytape the runner template for dependency resolution")
runner_file_path = fn_dir / f"{runner_filename}.py"
print("Temprory runner path",temp_runner_path)
Expand Down Expand Up @@ -334,6 +329,8 @@ def __generate_asl_template(self):


dag_json=self.__user_dag.get_user_dag_nodes()
map_list=self.__user_dag.get_map_list()

list_async_fns= get_set_of_async_funtions(dag_json)
# print("List of Async Fns:",list_async_fns)
changing_fns=set()
Expand All @@ -347,6 +344,8 @@ def __generate_asl_template(self):
# print("List of changing funtions:",changing_fns_list)
# Statemachine.asl.josn should be changed here
sfn_json_copy = copy.deepcopy(json.loads(sfn_json))
sfn_json_copy = self.add_map_flow(sfn_json_copy,map_list)
print(sfn_json_copy)
data=add_async_afn_builder(sfn_json_copy,changing_fns_list)
# print("Updated data of sfn builder after adding poll",data)
with open(f"{self.__aws_build_dir}/{self.__json_file}", "w") as statemachinejson:
Expand Down Expand Up @@ -443,6 +442,24 @@ def deploy_workflow(self):
json.dump(data, f, indent=4)

return self.__outputs_filepath

def add_map_flow(self,asl_json,subgraphs):
if subgraphs==None:
return asl_json
for i in range(0,len(subgraphs)):
subgraph=subgraphs[i]
if asl_json["StartAt"]==subgraph["Nodes"][0]:
asl_json["StartAt"]="Map"+str(i)
extracted_states={}
for node in subgraph["Nodes"]:
extracted_states[node]=asl_json["States"][node]
del asl_json["States"][node]
parent_node=self.__user_dag.get_parent_nodename(subgraph["Nodes"][0])
if parent_node != None:
asl_json["States"][parent_node]["Next"] = "Map"+str(i)

asl_json["States"]["Map"+str(i)]=map_template(subgraph,extracted_states)
return asl_json


def add_async_afn_builder(data,list):
Expand Down Expand Up @@ -669,4 +686,41 @@ def copy_if_not_exists(source_directory, destination_directory):
if os.path.isdir(src_path):
shutil.copytree(src_path, dst_path) # Copy entire folder
else:
shutil.copy(src_path, dst_path) # Copy individual file
shutil.copy(src_path, dst_path) # Copy individual file

def map_template(subgraph,extracted_states):
lastnode=subgraph["Nodes"][-1]
next_node=extracted_states[lastnode]["Next"]
del extracted_states[lastnode]["Next"]
extracted_states[lastnode]["End"]=True
template={
"Type": "Map",
"ItemProcessor": {
"ProcessorConfig": {
"Mode": "INLINE"
},
"StartAt": subgraph["Nodes"][0],
"States": extracted_states
},
"Next": next_node,
"ResultPath": "$.body."+subgraph["Listname"]+"Result",
"ItemsPath": "$.body."+subgraph["Listname"]
}
return template
# def add_map_flow(self,asl_json,subgraphs):
# if subgraphs==None:
# return asl_json
# for i in range(0,len(subgraphs)):
# subgraph=subgraphs[i]
# if asl_json["StartAt"]==subgraph["Nodes"][0]:
# asl_json["StartAt"]="Map"+str(i)
# extracted_states={}
# for node in subgraph["Nodes"]:
# extracted_states[node]=asl_json["States"][node]
# del asl_json["States"][node]
# parent_node=self.__user_dag.get_parent_nodename()
# if parent_node != None:
# asl_json["States"][parent_node]["Next"] = "Map"+str(i)

# asl_json["States"]["Map"+str(i)]=map_template(subgraph,extracted_states)
# return asl_json
4 changes: 3 additions & 1 deletion serwo/azure_create_statemachine.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ def run(user_directory,user_dag_file_name):
output_dir = f"{user_directory}"
dag_definition_path = f"{user_directory}/{user_dag_file_name}"
orchestrator_filepath = f"{user_directory}/orchestrator.py"

if os.path.exists(orchestrator_filepath):
os.remove(orchestrator_filepath)
print(f"File '{orchestrator_filepath}' already exists so removed successfully.")
# load the dag
user_dag = AzureUserDAG(dag_definition_path)
orchestrator_code = user_dag.get_orchestrator_code()
Expand Down
1 change: 1 addition & 0 deletions serwo/config/xfaas_user_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@
}
},
"user_pinned_nodes" : {}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ Resources:
{% for arn in arns -%}
{{ arn.name }}: {{ arn.ref }}
{% endfor %}

Policies: # Find out more about SAM policy templates: https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-policy-templates.html
{% for policy in policies -%}
- LambdaInvokePolicy:
FunctionName: !Ref {{policy.name}}
{% endfor %}

##########################################################################
##########################################################################
# REST API #
##########################################################################
ExecuteAPIForWF:
Expand All @@ -51,7 +51,7 @@ Resources:
OpenApiVersion: 3.0.3
EndpointConfiguration:
Type: REGIONAL

##########################################################################
# Roles #
##########################################################################
Expand All @@ -75,7 +75,35 @@ Resources:
- Effect: Allow
Action: "states:StartExecution"
Resource: !GetAtt {{stepfunctionsarnatrribute}}


LambdaExecutionRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- 'sts:AssumeRole'
Policies:
- PolicyName: LambdaExecutionPolicy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- 'logs:CreateLogGroup'
- 'logs:CreateLogStream'
- 'logs:PutLogEvents'
Resource: '*'
- Effect: Allow
Action:
- 'sqs:SendMessage'
Resource: 'arn:aws:sqs:*:*:*'

{% for function in functions -%}
{{function.name}}:
{% if function.iscontainer %}
Expand All @@ -86,6 +114,9 @@ Resources:
- x86_64
Timeout: 600
MemorySize: {{function.memory}}
{%if function.name == "CollectLogs" %}
Role: !GetAtt LambdaExecutionRole.Arn
{% endif %}
Metadata:
Dockerfile: Dockerfile
DockerContext: ./functions/{{function.name}}
Expand All @@ -98,6 +129,9 @@ Resources:
Runtime: python3.9
MemorySize: {{ function.memory }}
Timeout: 300
{%if function.name == "CollectLogs" %}
Role: !GetAtt LambdaExecutionRole.Arn
{% endif %}
Architectures:
- x86_64
{% endif%}
Expand Down
4 changes: 3 additions & 1 deletion serwo/python/src/runner-templates/aws/runner_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from copy import deepcopy
from python.src.utils.classes.commons.serwo_objects import build_serwo_object
from python.src.utils.classes.commons.serwo_objects import build_serwo_list_object
from python.src.utils.classes.commons.serwo_objects import SerWOObject
from python.src.utils.classes.commons.serwo_objects import SerWOObject, SerWOObjectsList

downstream = 0
"""
Expand Down Expand Up @@ -61,6 +61,8 @@ def lambda_handler(event, context):
request_timestamp = event["body"].get("request_timestamp")
session_id = event["body"].get("session_id") # NOTE - new variable to keep track of requests
deployment_id = event["body"].get("deployment_id")
if request_timestamp==None:
request_timestamp=0
overheads = start_time - request_timestamp
event["metadata"] = dict(
workflow_instance_id=wf_instance_id,
Expand Down
12 changes: 12 additions & 0 deletions serwo/python/src/utils/classes/aws/user_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,10 @@ def get_user_dag_name(self):
# get a map: function_name -> function_object
def get_node_object_map(self):
return self.__functions
def get_map_list(self):
if "SubGraphs" in self.__dag_config_data:
return self.__dag_config_data["SubGraphs"]
return None

# get a list of node dictionaries
def get_node_param_list(self):
Expand All @@ -254,6 +258,14 @@ def get_statemachine_structure(self):
tasklist = output_dag.nodes[node]["machine_list"]

return tasklist
def get_parent_nodename(self, node_name):
for node in self.__dag_config_data["Edges"]:
for key, value in node.items():
for name in value:
if name == node_name:
return key
return None

def get_successor_node_names(self, node_name):
# Get the node ID corresponding to the given node name
node_id = self.__nodeIDMap.get(node_name)
Expand Down
11 changes: 8 additions & 3 deletions serwo/scripts/azure/azure_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from jinja2 import Environment, FileSystemLoader
from signal import signal, SIGPIPE, SIG_DFL
from .orchestrator_async_update import async_update
from .map_update import map_update

signal(SIGPIPE,SIG_DFL)
from random import randint
Expand Down Expand Up @@ -57,7 +58,7 @@ def init_paths():
def build_working_dir(region,part_id,is_netherite):

global az_functions_path,build_dir
dummy, user_workflow_name = get_user_workflow_details()
dummy, user_workflow_name,dummy2 = get_user_workflow_details()

if is_netherite:
build_dir += f"azure_v2-{region}-{part_id}"
Expand All @@ -72,7 +73,10 @@ def get_user_workflow_details():
json_path = user_workflow_directory + '/' + user_dag_file_name
data = json.load(open(json_path))
fns_data = data['Nodes']
return fns_data,data['WorkflowName']
subgraphs= None
if "SubGraphs" in data:
subgraphs = data["SubGraphs"]
return fns_data, data['WorkflowName'], subgraphs

def get_set_of_async_funtions(fns_data):
# json_path = user_workflow_directory + '/' + user_dag_file_name
Expand Down Expand Up @@ -353,7 +357,7 @@ def build(user_dir, dag_definition_file, region, part_id,is_netherite):
DAG_DEFINITION_FILE = dag_definition_file
init_paths()
build_working_dir(region,part_id,is_netherite)
user_fns_data, user_app_name = get_user_workflow_details()
user_fns_data, user_app_name, subgraphs = get_user_workflow_details()
ingress_queue_name, app_name = generate_app_name_and_populate_and_get_ingress_queue_name(user_app_name,region,part_id,is_netherite)
build_user_fn_dirs(user_fns_data)
copy_meta_files(user_fns_data,ingress_queue_name,app_name,is_netherite)
Expand All @@ -365,6 +369,7 @@ def build(user_dir, dag_definition_file, region, part_id,is_netherite):
print("Async_Fn_detction",async_func_set)
print("Orchestrator initial path:",orchestrator_generated_path)
print("Orchestrator dest path:",orch_dest_path)
map_update.add_graph_node(orchestrator_generated_path,orchestrator_generated_path,subgraphs)
async_update.orchestrator_async_update(orchestrator_generated_path,orch_dest_path,async_func_set)

if __name__ == '__main__':
Expand Down
Loading