diff --git a/.gitignore b/.gitignore
index 759511f..6427d70 100644
--- a/.gitignore
+++ b/.gitignore
@@ -27,7 +27,7 @@ analysis/
 stashed/
 serwo/examples/graphAwsFourRps
 serwo/examples/graphAwsMedium
-build
+build/
 *.jsonl
 log_files
 serwo/config
@@ -39,9 +39,9 @@ jmeter.log
 *.csv
 deployments.txt
 *.pdf
-qutils
-qsserializers
+qutils/
+qsserializers/
 serwo/templates/azure/predefined-functions/Orchestrate/__init__.py
 serwo/templates/azure/predefined-functions/QueueTrigger/__init__.py
 serwo/benchmark_resources/aws-az-mmc.jmx
-serwo/benchmark_resources/az-aws-mmc.jmx
\ No newline at end of file
+serwo/benchmark_resources/az-aws-mmc.jmx
diff --git a/serwo/aws_create_statemachine.py b/serwo/aws_create_statemachine.py
index 36d2660..96d9ae2 100644
--- a/serwo/aws_create_statemachine.py
+++ b/serwo/aws_create_statemachine.py
@@ -170,11 +170,6 @@ def __create_lambda_fns(
         with open(temp_runner_path, "w") as file:
             file.write(contents)
 
-        # TODO - Fix the stickytape issue for AWS
-        # GitHub Issue link - https://github.com/dream-lab/XFaaS/issues/4
-        """
-        Sticytape the runner
-        """
         logger.info(f"Stickytape the runner template for dependency resolution")
         runner_file_path = fn_dir / f"{runner_filename}.py"
         print("Temporary runner path",temp_runner_path)
@@ -334,6 +329,8 @@ def __generate_asl_template(self):
 
         dag_json=self.__user_dag.get_user_dag_nodes()
 
+        map_list = self.__user_dag.get_map_list()
+
         list_async_fns= get_set_of_async_funtions(dag_json)
         # print("List of Async Fns:",list_async_fns)
         changing_fns=set()
@@ -347,6 +344,8 @@ def __generate_asl_template(self):
         # print("List of changing functions:",changing_fns_list)
         # Statemachine.asl.json should be changed here
         sfn_json_copy = copy.deepcopy(json.loads(sfn_json))
+        sfn_json_copy = self.add_map_flow(sfn_json_copy, map_list)
+        print(sfn_json_copy)
         data=add_async_afn_builder(sfn_json_copy,changing_fns_list)
         # print("Updated data of sfn builder after adding poll",data)
         with open(f"{self.__aws_build_dir}/{self.__json_file}", "w") as statemachinejson:
@@ -443,6 +442,24 @@ def deploy_workflow(self):
             json.dump(data, f, indent=4)
 
         return self.__outputs_filepath
+
+    def add_map_flow(self, asl_json, subgraphs):
+        # Rewrite the generated ASL so each user-declared SubGraph becomes a Map state.
+        if subgraphs is None:
+            return asl_json
+        for i, subgraph in enumerate(subgraphs):
+            if asl_json["StartAt"] == subgraph["Nodes"][0]:
+                asl_json["StartAt"] = "Map" + str(i)
+            # Pull the subgraph's states out of the top-level state machine.
+            extracted_states = {}
+            for node in subgraph["Nodes"]:
+                extracted_states[node] = asl_json["States"][node]
+                del asl_json["States"][node]
+            # Re-point the subgraph's parent (if any) at the new Map state.
+            parent_node = self.__user_dag.get_parent_nodename(subgraph["Nodes"][0])
+            if parent_node is not None:
+                asl_json["States"][parent_node]["Next"] = "Map" + str(i)
+
+            asl_json["States"]["Map" + str(i)] = map_template(subgraph, extracted_states)
+        return asl_json
 
 
 def add_async_afn_builder(data,list):
@@ -669,4 +686,41 @@ def copy_if_not_exists(source_directory, destination_directory):
         if os.path.isdir(src_path):
             shutil.copytree(src_path, dst_path)  # Copy entire folder
         else:
-            shutil.copy(src_path, dst_path)  # Copy individual file
\ No newline at end of file
+            shutil.copy(src_path, dst_path)  # Copy individual file
+
+def map_template(subgraph, extracted_states):
+    # Build an ASL Map state that runs the extracted subgraph states inline,
+    # iterating over the payload list named by the subgraph's "Listname".
+    lastnode = subgraph["Nodes"][-1]
+    next_node = extracted_states[lastnode]["Next"]
+    # The final state inside the iterator must terminate the iteration.
+    del extracted_states[lastnode]["Next"]
+    extracted_states[lastnode]["End"] = True
+    template = {
+        "Type": "Map",
+        "ItemProcessor": {
+            "ProcessorConfig": {
+                "Mode": "INLINE"
+            },
+            "StartAt": subgraph["Nodes"][0],
+            "States": extracted_states
+        },
+        "Next": next_node,
+        "ResultPath": "$.body." + subgraph["Listname"] + "Result",
+        "ItemsPath": "$.body." + subgraph["Listname"]
+    }
+    return template
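+
+# Illustrative sketch (hypothetical workflow): for a SubGraph
+# {"Nodes": ["Resize", "Tag"], "Listname": "images"} whose last state had
+# "Next": "Collect", map_template yields roughly:
+#
+#   {"Type": "Map",
+#    "ItemProcessor": {"ProcessorConfig": {"Mode": "INLINE"},
+#                      "StartAt": "Resize",
+#                      "States": {...Resize/Tag states, Tag now has "End": true...}},
+#    "Next": "Collect",
+#    "ResultPath": "$.body.imagesResult",
+#    "ItemsPath": "$.body.images"}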
"ItemsPath": "$.body."+subgraph["Listname"] + } + return template +# def add_map_flow(self,asl_json,subgraphs): +# if subgraphs==None: +# return asl_json +# for i in range(0,len(subgraphs)): +# subgraph=subgraphs[i] +# if asl_json["StartAt"]==subgraph["Nodes"][0]: +# asl_json["StartAt"]="Map"+str(i) +# extracted_states={} +# for node in subgraph["Nodes"]: +# extracted_states[node]=asl_json["States"][node] +# del asl_json["States"][node] +# parent_node=self.__user_dag.get_parent_nodename() +# if parent_node != None: +# asl_json["States"][parent_node]["Next"] = "Map"+str(i) + +# asl_json["States"]["Map"+str(i)]=map_template(subgraph,extracted_states) +# return asl_json \ No newline at end of file diff --git a/serwo/azure_create_statemachine.py b/serwo/azure_create_statemachine.py index b82ef19..a9139f0 100644 --- a/serwo/azure_create_statemachine.py +++ b/serwo/azure_create_statemachine.py @@ -17,7 +17,9 @@ def run(user_directory,user_dag_file_name): output_dir = f"{user_directory}" dag_definition_path = f"{user_directory}/{user_dag_file_name}" orchestrator_filepath = f"{user_directory}/orchestrator.py" - + if os.path.exists(orchestrator_filepath): + os.remove(orchestrator_filepath) + print(f"File '{orchestrator_filepath}' already exists so removed successfully.") # load the dag user_dag = AzureUserDAG(dag_definition_path) orchestrator_code = user_dag.get_orchestrator_code() diff --git a/serwo/config/xfaas_user_config.json b/serwo/config/xfaas_user_config.json index 73cafe2..bb351ed 100644 --- a/serwo/config/xfaas_user_config.json +++ b/serwo/config/xfaas_user_config.json @@ -10,4 +10,5 @@ } }, "user_pinned_nodes" : {} + } \ No newline at end of file diff --git a/serwo/python/src/faas-templates/aws/yaml-templates/apigw-awsstepfunctions.yaml b/serwo/python/src/faas-templates/aws/yaml-templates/apigw-awsstepfunctions.yaml index d55b94d..48fc328 100644 --- a/serwo/python/src/faas-templates/aws/yaml-templates/apigw-awsstepfunctions.yaml +++ b/serwo/python/src/faas-templates/aws/yaml-templates/apigw-awsstepfunctions.yaml @@ -29,14 +29,14 @@ Resources: {% for arn in arns -%} {{ arn.name }}: {{ arn.ref }} {% endfor %} - + Policies: # Find out more about SAM policy templates: https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-policy-templates.html {% for policy in policies -%} - LambdaInvokePolicy: FunctionName: !Ref {{policy.name}} {% endfor %} - - ########################################################################## + +########################################################################## # REST API # ########################################################################## ExecuteAPIForWF: @@ -51,7 +51,7 @@ Resources: OpenApiVersion: 3.0.3 EndpointConfiguration: Type: REGIONAL - + ########################################################################## # Roles # ########################################################################## @@ -75,7 +75,35 @@ Resources: - Effect: Allow Action: "states:StartExecution" Resource: !GetAtt {{stepfunctionsarnatrribute}} - + + LambdaExecutionRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: + - lambda.amazonaws.com + Action: + - 'sts:AssumeRole' + Policies: + - PolicyName: LambdaExecutionPolicy + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - 'logs:CreateLogGroup' + - 'logs:CreateLogStream' + - 'logs:PutLogEvents' + Resource: '*' + - Effect: Allow + Action: + - 
+
   {% for function in functions -%}
   {{function.name}}:
   {% if function.iscontainer %}
@@ -86,6 +114,9 @@
        - x86_64
       Timeout: 600
       MemorySize: {{function.memory}}
+      {%if function.name == "CollectLogs" %}
+      Role: !GetAtt LambdaExecutionRole.Arn
+      {% endif %}
       Metadata:
         Dockerfile: Dockerfile
         DockerContext: ./functions/{{function.name}}
@@ -98,6 +129,9 @@
       Runtime: python3.9
       MemorySize: {{ function.memory }}
       Timeout: 300
+      {%if function.name == "CollectLogs" %}
+      Role: !GetAtt LambdaExecutionRole.Arn
+      {% endif %}
       Architectures:
        - x86_64
   {% endif%}
diff --git a/serwo/python/src/runner-templates/aws/runner_template.py b/serwo/python/src/runner-templates/aws/runner_template.py
index 3461074..173aede 100644
--- a/serwo/python/src/runner-templates/aws/runner_template.py
+++ b/serwo/python/src/runner-templates/aws/runner_template.py
@@ -16,7 +16,7 @@
 from copy import deepcopy
 from python.src.utils.classes.commons.serwo_objects import build_serwo_object
 from python.src.utils.classes.commons.serwo_objects import build_serwo_list_object
-from python.src.utils.classes.commons.serwo_objects import SerWOObject
+from python.src.utils.classes.commons.serwo_objects import SerWOObject, SerWOObjectsList
 
 downstream = 0
 """
@@ -61,6 +61,8 @@ def lambda_handler(event, context):
     request_timestamp = event["body"].get("request_timestamp")
     session_id = event["body"].get("session_id")  # NOTE - new variable to keep track of requests
     deployment_id = event["body"].get("deployment_id")
+    if request_timestamp is None:  # default so the overhead math below never sees None
+        request_timestamp = 0
     overheads = start_time - request_timestamp
     event["metadata"] = dict(
         workflow_instance_id=wf_instance_id,
diff --git a/serwo/python/src/utils/classes/aws/user_dag.py b/serwo/python/src/utils/classes/aws/user_dag.py
index 0517199..d9eee4c 100644
--- a/serwo/python/src/utils/classes/aws/user_dag.py
+++ b/serwo/python/src/utils/classes/aws/user_dag.py
@@ -228,6 +228,10 @@ def get_user_dag_name(self):
     # get a map: function_name -> function_object
     def get_node_object_map(self):
         return self.__functions
+
+    # get the user-declared "SubGraphs" (fan-out sections) from the DAG config, if any
+    def get_map_list(self):
+        if "SubGraphs" in self.__dag_config_data:
+            return self.__dag_config_data["SubGraphs"]
+        return None
 
     # get a list of node dictionaries
     def get_node_param_list(self):
@@ -254,6 +258,14 @@ def get_statemachine_structure(self):
             tasklist = output_dag.nodes[node]["machine_list"]
         return tasklist
 
+    # get the name of the node that lists `node_name` as a successor
+    def get_parent_nodename(self, node_name):
+        for node in self.__dag_config_data["Edges"]:
+            for key, value in node.items():
+                for name in value:
+                    if name == node_name:
+                        return key
+        return None
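+    # Illustrative sketch (hypothetical DAG config): with
+    #   "Edges": [{"A": ["B", "C"]}, {"B": ["D"]}]
+    # get_parent_nodename("B") returns "A", and an unknown node returns None.
+    # Only the first matching parent is reported, so a fan-in node with
+    # several parents gets just one of them.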
+
     def get_successor_node_names(self, node_name):
         # Get the node ID corresponding to the given node name
         node_id = self.__nodeIDMap.get(node_name)
diff --git a/serwo/scripts/azure/azure_builder.py b/serwo/scripts/azure/azure_builder.py
index c86ae0b..7a3abe1 100644
--- a/serwo/scripts/azure/azure_builder.py
+++ b/serwo/scripts/azure/azure_builder.py
@@ -5,6 +5,7 @@
 from jinja2 import Environment, FileSystemLoader
 from signal import signal, SIGPIPE, SIG_DFL
 from .orchestrator_async_update import async_update
+from .map_update import map_update
 signal(SIGPIPE,SIG_DFL)
 
 from random import randint
@@ -57,7 +58,7 @@ def init_paths():
 
 def build_working_dir(region,part_id,is_netherite):
     global az_functions_path,build_dir
-    dummy, user_workflow_name = get_user_workflow_details()
+    _, user_workflow_name, _ = get_user_workflow_details()
 
     if is_netherite:
         build_dir += f"azure_v2-{region}-{part_id}"
@@ -72,7 +73,10 @@ def get_user_workflow_details():
     json_path = user_workflow_directory + '/' + user_dag_file_name
     data = json.load(open(json_path))
     fns_data = data['Nodes']
-    return fns_data,data['WorkflowName']
+    subgraphs = None
+    if "SubGraphs" in data:
+        subgraphs = data["SubGraphs"]
+    return fns_data, data['WorkflowName'], subgraphs
 
 def get_set_of_async_funtions(fns_data):
     # json_path = user_workflow_directory + '/' + user_dag_file_name
@@ -353,7 +357,7 @@ def build(user_dir, dag_definition_file, region, part_id,is_netherite):
     DAG_DEFINITION_FILE = dag_definition_file
     init_paths()
     build_working_dir(region,part_id,is_netherite)
-    user_fns_data, user_app_name = get_user_workflow_details()
+    user_fns_data, user_app_name, subgraphs = get_user_workflow_details()
     ingress_queue_name, app_name = generate_app_name_and_populate_and_get_ingress_queue_name(user_app_name,region,part_id,is_netherite)
     build_user_fn_dirs(user_fns_data)
     copy_meta_files(user_fns_data,ingress_queue_name,app_name,is_netherite)
@@ -365,6 +369,7 @@ def build(user_dir, dag_definition_file, region, part_id,is_netherite):
     print("Async_Fn_detection",async_func_set)
     print("Orchestrator initial path:",orchestrator_generated_path)
     print("Orchestrator dest path:",orch_dest_path)
+    map_update.add_graph_node(orchestrator_generated_path,orchestrator_generated_path,subgraphs)
     async_update.orchestrator_async_update(orchestrator_generated_path,orch_dest_path,async_func_set)
 
 if __name__ == '__main__':
diff --git a/serwo/scripts/azure/map_update.py b/serwo/scripts/azure/map_update.py
new file mode 100644
index 0000000..4cd2d8f
--- /dev/null
+++ b/serwo/scripts/azure/map_update.py
@@ -0,0 +1,120 @@
+import re
+import os
+
+
+def map_cod_gen(lists, name):
+    # Generate orchestrator code that fans out one activity call per element
+    # of the payload list stored under `name`, then gathers the results with
+    # task_all. `lists` holds [result_var, activity_name, arg] triples
+    # extracted from the original sequential call sites.
+    first_obj = lists[0]
+    new_code = 'serwo_to_dict = unmarshall(json.loads(' + \
+        first_obj[2] + '))\n'
+    new_code += 'body = serwo_to_dict.get_body()\n'
+    new_code += 'obj = body["' + name + '"]\n'
+    new_code += 'std_meta = serwo_to_dict.get_metadata()\n'
+
+    for i in range(0, len(lists)):
+        new_code += lists[i][0] + ' = []\n'
+    new_code += '\nfor i in range(0, len(obj)):\n'
+    new_code += 'var = {' + '"body": obj[i], "metadata": std_meta }\n'
+    new_code += 'var = build_serwo_object(var).to_json()\n'
+    new_code += \
+        first_obj[0] + \
+        '.append(context.call_activity(' + first_obj[1] + ' , var))\n'
+
+    for i in range(1, len(lists)):
+        entry = lists[i]
+        new_code += \
+            entry[0] + '.append(context.call_activity(' + \
+            entry[1] + ' , ' + entry[2] + '[i]))\n'
+
+    # Out of the generation loop: indent the emitted lines one level.
+    lines = new_code.split('\n')
+    indented_lines = ['\n\t' + lines[i] for i in range(0, len(lines) - 1)]
+    # print(indented_lines)
+
+    # After the fan-out loop: await all tasks and fold the results back into
+    # the workflow body under "<name>results".
+    last_obj = lists[-1]
+    last_code = []
+    last_code.append("\n")
+    last_code.append("\n " + last_obj[0] + ' = yield context.task_all(' + last_obj[0] + ')')
+    last_code.append("\n " + 'body["' + name + '"+"results"] = serwolist_to_obj(' + last_obj[0] + ')')
+    last_code.append('\n var2 = {' + '"body": body, "metadata": std_meta }')
+    last_code.append("\n " + last_obj[0] + ' = build_serwo_object(var2).to_json()')
+    last_code.append('\n')
+
+    indented_lines += ['\n'] + last_code
+
+    return indented_lines
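+# Illustrative sketch (hypothetical inputs): for name = "images" and
+# lists = [['r1', '"FnA"', 'payload']], map_cod_gen emits code along the
+# lines of (modulo the exact indentation handling in add_graph_node):
+#
+#   serwo_to_dict = unmarshall(json.loads(payload))
+#   body = serwo_to_dict.get_body()
+#   obj = body["images"]
+#   std_meta = serwo_to_dict.get_metadata()
+#   r1 = []
+#   for i in range(0, len(obj)):
+#       var = {"body": obj[i], "metadata": std_meta }
+#       var = build_serwo_object(var).to_json()
+#       r1.append(context.call_activity("FnA" , var))
+#   r1 = yield context.task_all(r1)
+#   body["images"+"results"] = serwolist_to_obj(r1)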
+
+def extract_var(input_str):
+    # Pull apart one generated `x = context.call_activity("Fn", arg)` line.
+    # Define the regular expressions
+    variable_regex = r'^\s*([a-zA-Z_]\w*)\s*='
+    function_call_regex = r'\bcontext\.call_activity\(\s*"([^"]*)"\s*,\s*([^)]*)\)'
+
+    # Extract the variable name and function call arguments
+    variable_match = re.match(variable_regex, input_str)
+    variable_name = variable_match.group(1) if variable_match else None
+
+    # Extract function call arguments
+    function_call_match = re.search(function_call_regex, input_str)
+    if function_call_match:
+        first_argument = function_call_match.group(1)
+        second_argument = function_call_match.group(2)
+    else:
+        first_argument = None
+        second_argument = None
+    first_argument = '"' + str(first_argument) + '"'
+    # print("Variable Name:", variable_name)
+    # print("First Argument:", first_argument)
+    # print("Second Argument:", second_argument)
+    return [str(variable_name), first_argument, str(second_argument)]
+
+class map_update:
+    @staticmethod
+    def add_graph_node(inpath, outpath, subgraphs):
+        # Rewrite the generated orchestrator: for every SubGraph, replace its
+        # sequential `context.call_activity` calls with the fan-out/fan-in
+        # code produced by map_cod_gen above.
+        if subgraphs is None:
+            return
+        with open(inpath, 'r') as file_ptr:
+            lines = file_ptr.readlines()
+        str = "context.call_activity"  # NOTE: shadows the built-in str() within this function
+        new_lines = []
+        for subgraph in subgraphs:
+            i = 0
+            list_subgraphs = []
+            # print(len(lines))
+            while i < len(lines):
diff --git a/serwo/templates/aws/predefined-functions/CollectLogs/func.py b/serwo/templates/aws/predefined-functions/CollectLogs/func.py
new file mode 100644
--- /dev/null
+++ b/serwo/templates/aws/predefined-functions/CollectLogs/func.py
+def user_function(serwoObject) -> SerWOObject:
+    try:
+        fin_dict = dict()
+        data = serwoObject.get_body()
+        logging.info("Data to push - " + str(data))
+        metadata = serwoObject.get_metadata()
+        fin_dict["data"] = data
+        fin_dict["metadata"] = metadata
+        logging.info("Fin dict - " + str(fin_dict))
+        # `queue` is presumably a boto3 SQS client created in the (truncated)
+        # header of this file; `connect_str` holds the queue URL substituted
+        # in by add_collect_logs at build time.
+        queue.send_message(MessageBody=json.dumps(fin_dict), QueueUrl=connect_str)
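+        # For reference, the message pushed above serializes to roughly:
+        #   {"data": {...user payload...},
+        #    "metadata": {"workflow_instance_id": "...", ...}}
+        # (metadata contents are set upstream by the runner and may vary)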
+        # data = {"body": "success: OK"}
+        return SerWOObject(body=serwoObject.get_body())
+    except Exception:
+        return SerWOObject(error=True)
\ No newline at end of file
diff --git a/serwo/templates/aws/predefined-functions/CollectLogs/requirements.txt b/serwo/templates/aws/predefined-functions/CollectLogs/requirements.txt
new file mode 100644
index 0000000..10c3a83
--- /dev/null
+++ b/serwo/templates/aws/predefined-functions/CollectLogs/requirements.txt
@@ -0,0 +1,4 @@
+psutil
+requests
+py-cpuinfo
+boto3
\ No newline at end of file
diff --git a/serwo/templates/azure/orchestrator_template.py b/serwo/templates/azure/orchestrator_template.py
index 9892067..28f56cd 100644
--- a/serwo/templates/azure/orchestrator_template.py
+++ b/serwo/templates/azure/orchestrator_template.py
@@ -3,7 +3,7 @@
 import azure.functions as func
 import azure.durable_functions as df
 from .python.src.utils.classes.commons.serwo_objects import build_serwo_object
-from .python.src.utils.classes.commons.serwo_objects import SerWOObject
+from .python.src.utils.classes.commons.serwo_objects import SerWOObject, SerWOObjectsList
 import time
 import os, psutil
 import uuid
@@ -126,7 +126,15 @@ def insert_end_stats_in_metadata(input):
     input = build_serwo_object(out_dict).to_json()
     return input
-
+def serwolist_to_obj(serwo_list):
+    # Unwrap a list of serialized SerWO objects into their plain bodies.
+    # TODO: carry each object's metadata through as well
+    bodies = []
+    for obj in serwo_list:
+        body = unmarshall(json.loads(obj)).get_body()
+        bodies.append(body)
+    return bodies
 
 def orchestrator_function(context: df.DurableOrchestrationContext):
     # convert input body to serwoObject for first function
diff --git a/serwo/templates/azure/predefined-functions/CollectLogs/func.py b/serwo/templates/azure/predefined-functions/CollectLogs/func.py
index b0cfd2b..4a0825d 100644
--- a/serwo/templates/azure/predefined-functions/CollectLogs/func.py
+++ b/serwo/templates/azure/predefined-functions/CollectLogs/func.py
@@ -1,16 +1,19 @@
 from azure.storage.queue import QueueClient
 import json
-from python.src.utils.classes.commons.serwo_objects import SerWOObject
-import os, uuid
+from .python.src.utils.classes.commons.serwo_objects import SerWOObject
+# import os, uuid
 import logging
-
-connect_str = 'CONNECTION_STRING'
-queue_name = 'QUEUE_NAME'
-
-queue = QueueClient.from_connection_string(conn_str=connect_str, queue_name=queue_name)
-
-
-
+# import boto3
+
+connect_str = "CONNECTION_STRING"
+queue_name = "QUEUE_NAME"
+# csp = "COLL_CSP"
+
+queue = QueueClient.from_connection_string(
+    conn_str=connect_str, queue_name=queue_name
+)
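+# Note (suggestion, not wired up here): if the queue consumer expects
+# Base64-encoded messages (the Azure Functions queue-trigger default),
+# the client could be created with an encode policy instead, e.g.:
+#   from azure.storage.queue import TextBase64EncodePolicy
+#   queue = QueueClient.from_connection_string(
+#       conn_str=connect_str, queue_name=queue_name,
+#       message_encode_policy=TextBase64EncodePolicy())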
= f"xfaaslog{region}" + + credential = DefaultAzureCredential() + subscription_id = os.environ["AZURE_SUBSCRIPTION_ID"] + resource_client = ResourceManagementClient(credential, subscription_id) + try: + rg_result = resource_client.resource_groups.create_or_update( + f"{resource_group_name}", {"location": f"{region}"} + ) + except Exception as e: + print(e) + + try: + storage_client = StorageManagementClient(credential, subscription_id) + poller = storage_client.storage_accounts.begin_create(resource_group_name, storage_account_name, + { + "location" : region, + "kind": "StorageV2", + "sku": {"name": "Standard_LRS"} + } + ) + account_result = poller.result() + # print(f"Provisioned storage account {account_result.name}") + except Exception as e: + print(e) + + try: + # queue_service_client = QueueServiceClient(account_url=f"https://{storage_account_name}.queue.core.windows.net", credential=credential) + # queue_service_client.create_queue(queue_name) + queue_creation_command=f"az storage queue create -n {queue_name} --account-name {storage_account_name}" + os.system(queue_creation_command) + except Exception as e: + print(e) + + try: + #TODO Need a native call to get connection string + stream = os.popen(f'az storage account show-connection-string --name {storage_account_name} --resource-group {resource_group_name}') + json_str = stream.read() + stream.close() + except Exception as e: + print(e) + + jsson = json.loads(json_str) + fin_dict = {'queue_name' : queue_name, 'connection_string' : jsson['connectionString'] , 'storage_account' : storage_account_name, 'group':resource_group_name} + elif csp=="aws": + collect_logs_dir = f'{project_dir}/templates/aws/predefined-functions/CollectLogs' + queue_creation_command = f"aws sqs create-queue --queue-name {queue_name}" + stream = os.popen(queue_creation_command) + output = stream.read() + stream.close() + queue_url = json.loads(output)["QueueUrl"] + print("queue created") + fin_dict = {"queue_name": queue_name, "connection_string": queue_url} ## copy collect_logs_dir to user_wf_dir using shutil copytree os.system(f'cp -r {collect_logs_dir} {user_wf_dir}') connection_string_template = 'CONNECTION_STRING' queue_name_template = 'QUEUE_NAME' - + # coll_csp_template="COLL_CSP" ##open the file and replace the connection string and queue name with open(f'{new_collect_logs_dir}/func.py', 'r') as file : filedata = file.read() filedata = filedata.replace(connection_string_template, fin_dict['connection_string']) filedata = filedata.replace(queue_name_template, fin_dict['queue_name']) + filedata = filedata.replace('COLL_CSP', csp.lower()) with open(f'{new_collect_logs_dir}/func.py', 'w') as file: file.write(filedata)