From 9cb0ea80f20fc634873dcb309b59d39496bb9b1e Mon Sep 17 00:00:00 2001 From: Tarun Pal Date: Fri, 14 Jun 2024 17:17:41 +0530 Subject: [PATCH 01/10] Added independent SQS support for AWS and separated deployment as per CSP --- .../apigw-awsstepfunctions.yaml | 44 ++++++- .../predefined-functions/CollectLogs/func.py | 28 +++-- .../CollectLogs/requirements.txt | 3 +- serwo/xfaas_main.py | 111 ++++++++++-------- 4 files changed, 120 insertions(+), 66 deletions(-) diff --git a/serwo/python/src/faas-templates/aws/yaml-templates/apigw-awsstepfunctions.yaml b/serwo/python/src/faas-templates/aws/yaml-templates/apigw-awsstepfunctions.yaml index fc80e21..f80995d 100644 --- a/serwo/python/src/faas-templates/aws/yaml-templates/apigw-awsstepfunctions.yaml +++ b/serwo/python/src/faas-templates/aws/yaml-templates/apigw-awsstepfunctions.yaml @@ -29,14 +29,14 @@ Resources: {% for arn in arns -%} {{ arn.name }}: {{ arn.ref }} {% endfor %} - + Policies: # Find out more about SAM policy templates: https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-policy-templates.html {% for policy in policies -%} - LambdaInvokePolicy: FunctionName: !Ref {{policy.name}} {% endfor %} - - ########################################################################## + +########################################################################## # REST API # ########################################################################## ExecuteAPIForWF: @@ -51,7 +51,7 @@ Resources: OpenApiVersion: 3.0.3 EndpointConfiguration: Type: REGIONAL - + ########################################################################## # Roles # ########################################################################## @@ -75,7 +75,35 @@ Resources: - Effect: Allow Action: "states:StartExecution" Resource: !GetAtt {{stepfunctionsarnatrribute}} - + + LambdaExecutionRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: + - lambda.amazonaws.com + Action: + - 'sts:AssumeRole' + Policies: + - PolicyName: LambdaExecutionPolicy + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - 'logs:CreateLogGroup' + - 'logs:CreateLogStream' + - 'logs:PutLogEvents' + Resource: '*' + - Effect: Allow + Action: + - 'sqs:SendMessage' + Resource: 'arn:aws:sqs:*:*:*' + {% for function in functions -%} {{function.name}}: {% if function.iscontainer %} @@ -86,6 +114,9 @@ Resources: - x86_64 Timeout: 600 MemorySize: {{function.memory}} + {% if function.name == "CollectLogs" %} + Role: !GetAtt LambdaExecutionRole.Arn + {% endif %} Metadata: Dockerfile: Dockerfile DockerContext: ./functions/{{function.name}} @@ -98,6 +129,9 @@ Resources: Runtime: python3.10 MemorySize: {{ function.memory }} Timeout: 300 + {% if function.name == "CollectLogs" %} + Role: !GetAtt LambdaExecutionRole.Arn + {% endif %} Architectures: - x86_64 {% endif%} diff --git a/serwo/templates/azure/predefined-functions/CollectLogs/func.py b/serwo/templates/azure/predefined-functions/CollectLogs/func.py index 4d8e60f..ccd5cd0 100644 --- a/serwo/templates/azure/predefined-functions/CollectLogs/func.py +++ b/serwo/templates/azure/predefined-functions/CollectLogs/func.py @@ -3,14 +3,20 @@ from python.src.utils.classes.commons.serwo_objects import SerWOObject import os, uuid import logging - -connect_str = 'CONNECTION_STRING' -queue_name = 'QUEUE_NAME' - -queue = QueueClient.from_connection_string(conn_str=connect_str, queue_name=queue_name) - - - +import boto3 +
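+# NOTE: the CONNECTION_STRING / QUEUE_NAME / COLL_CSP values below are
+# placeholder tokens, not live settings; add_collect_logs() in
+# serwo/xfaas_main.py rewrites them with the provisioned queue details
+# before this function is packaged for deployment.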
+connect_str = "CONNECTION_STRING" +queue_name = "QUEUE_NAME" +csp = "COLL_CSP" + +if csp == "aws": + queue = boto3.client("sqs") +elif csp == "azure": + queue = QueueClient.from_connection_string( + conn_str=connect_str, queue_name=queue_name + ) + + def user_function(serwoObject) -> SerWOObject: try: fin_dict = dict() @@ -20,8 +26,12 @@ def user_function(serwoObject) -> SerWOObject: fin_dict["data"] = data fin_dict["metadata"] = metadata logging.info("Fin dict - "+str(fin_dict)) - queue.send_message(json.dumps(fin_dict)) + if csp == "azure": + queue.send_message(json.dumps(fin_dict)) + elif csp == "aws": + queue.send_message(MessageBody=json.dumps(fin_dict), QueueUrl=connect_str) # data = {"body": "success: OK"} return SerWOObject(body=serwoObject.get_body()) except Exception as e: return SerWOObject(error=True) + \ No newline at end of file diff --git a/serwo/templates/azure/predefined-functions/CollectLogs/requirements.txt b/serwo/templates/azure/predefined-functions/CollectLogs/requirements.txt index 4d753b2..c4db13a 100644 --- a/serwo/templates/azure/predefined-functions/CollectLogs/requirements.txt +++ b/serwo/templates/azure/predefined-functions/CollectLogs/requirements.txt @@ -1,4 +1,5 @@ azure-storage-queue psutil requests -py-cpuinfo \ No newline at end of file +py-cpuinfo +boto3 \ No newline at end of file diff --git a/serwo/xfaas_main.py b/serwo/xfaas_main.py index 203ec2c..aa28e25 100644 --- a/serwo/xfaas_main.py +++ b/serwo/xfaas_main.py @@ -62,82 +62,91 @@ def randomString(stringLength): def add_collect_logs(dag_definition_path,user_wf_dir, xfaas_user_dag,region): - if region == 'us-east-1': - region = 'eastus' - elif region == 'ap-southeast-1': - region = 'southeastasia' - else: - region = 'centralindia' + # if region == 'us-east-1': + # region = 'eastus' + # elif region == 'ap-southeast-1': + # region = 'southeastasia' + # else: + # region = 'centralindia' # region = 'centralindia' collect_logs_dir = f'{project_dir}/templates/azure/predefined-functions/CollectLogs' new_collect_logs_dir = f'{user_wf_dir}/CollectLogs' - - print('creating xfaas logging queue') - resource_group_name = f"xfaasLog{region}" - storage_account_name = f"xfaaslog{region}" - queue_name = randomString(5) - credential = DefaultAzureCredential() - subscription_id = os.environ["AZURE_SUBSCRIPTION_ID"] - resource_client = ResourceManagementClient(credential, subscription_id) - try: - rg_result = resource_client.resource_groups.create_or_update( - f"{resource_group_name}", {"location": f"{region}"} - ) - except Exception as e: - print(e) + if csp=="azure": + print('creating xfaas logging queue') + resource_group_name = f"xfaasLog{region}" + storage_account_name = f"xfaaslog{region}" - try: - storage_client = StorageManagementClient(credential, subscription_id) - poller = storage_client.storage_accounts.begin_create(resource_group_name, storage_account_name, - { - "location" : region, - "kind": "StorageV2", - "sku": {"name": "Standard_LRS"} - } - ) - account_result = poller.result() - # print(f"Provisioned storage account {account_result.name}") - except Exception as e: - print(e) + + credential = DefaultAzureCredential() + subscription_id = os.environ["AZURE_SUBSCRIPTION_ID"] + resource_client = ResourceManagementClient(credential, subscription_id) + try: + rg_result = resource_client.resource_groups.create_or_update( + f"{resource_group_name}", {"location": f"{region}"} + ) + except Exception as e: + print(e) + + try: + storage_client = StorageManagementClient(credential, subscription_id) + poller = 
storage_client.storage_accounts.begin_create(resource_group_name, storage_account_name, + { + "location" : region, + "kind": "StorageV2", + "sku": {"name": "Standard_LRS"} + } + ) + account_result = poller.result() + # print(f"Provisioned storage account {account_result.name}") + except Exception as e: + print(e) - try: - # queue_service_client = QueueServiceClient(account_url=f"https://{storage_account_name}.queue.core.windows.net", credential=credential) - # queue_service_client.create_queue(queue_name) - queue_creation_command=f"az storage queue create -n {queue_name} --account-name {storage_account_name}" - os.system(queue_creation_command) - except Exception as e: - print(e) + try: + # queue_service_client = QueueServiceClient(account_url=f"https://{storage_account_name}.queue.core.windows.net", credential=credential) + # queue_service_client.create_queue(queue_name) + queue_creation_command=f"az storage queue create -n {queue_name} --account-name {storage_account_name}" + os.system(queue_creation_command) + except Exception as e: + print(e) - try: - #TODO Need a native call to get connection string - stream = os.popen(f'az storage account show-connection-string --name {storage_account_name} --resource-group {resource_group_name}') - json_str = stream.read() - stream.close() - except Exception as e: - print(e) + try: + #TODO Need a native call to get connection string + stream = os.popen(f'az storage account show-connection-string --name {storage_account_name} --resource-group {resource_group_name}') + json_str = stream.read() + stream.close() + except Exception as e: + print(e) - jsson = json.loads(json_str) - fin_dict = {'queue_name' : queue_name, 'connection_string' : jsson['connectionString'] , 'storage_account' : storage_account_name, 'group':resource_group_name} - + jsson = json.loads(json_str) + fin_dict = {'queue_name' : queue_name, 'connection_string' : jsson['connectionString'] , 'storage_account' : storage_account_name, 'group':resource_group_name} + elif csp=="aws": + queue_creation_command = f"aws sqs create-queue --queue-name {queue_name}" + stream = os.popen(queue_creation_command) + output = stream.read() + stream.close() + queue_url = json.loads(output)["QueueUrl"] + print("queue created") + fin_dict = {"queue_name": queue_name, "connection_string": queue_url} ## copy collect_logs_dir to user_wf_dir using shutil copytree os.system(f'cp -r {collect_logs_dir} {user_wf_dir}') connection_string_template = 'CONNECTION_STRING' queue_name_template = 'QUEUE_NAME' - + coll_csp_template="COLL_CSP" ##open the file and replace the connection string and queue name with open(f'{new_collect_logs_dir}/func.py', 'r') as file : filedata = file.read() filedata = filedata.replace(connection_string_template, fin_dict['connection_string']) filedata = filedata.replace(queue_name_template, fin_dict['queue_name']) + filedata = filedata.replace('COLL_CSP', csp.lower()) with open(f'{new_collect_logs_dir}/func.py', 'w') as file: file.write(filedata) From a17bfa6657343adba9a79f67c1ac0c929d752847 Mon Sep 17 00:00:00 2001 From: Tarun Pal Date: Fri, 14 Jun 2024 17:21:39 +0530 Subject: [PATCH 02/10] Removing quantum dependencies --- qutils/__init__.py | 41 ---------- qutils/marshaller.py | 66 ---------------- qutils/program_serializers.py | 72 ----------------- qutils/s3utils.py | 46 ----------- qutils/serializers.py | 91 ---------------------- serwo/qsserializers/__init__.py | 41 ---------- serwo/qsserializers/program_serializers.py | 72 ----------------- serwo/qsserializers/serializers.py | 91
---------------------- 8 files changed, 520 deletions(-) delete mode 100644 qutils/__init__.py delete mode 100644 qutils/marshaller.py delete mode 100644 qutils/program_serializers.py delete mode 100644 qutils/s3utils.py delete mode 100644 qutils/serializers.py delete mode 100644 serwo/qsserializers/__init__.py delete mode 100644 serwo/qsserializers/program_serializers.py delete mode 100644 serwo/qsserializers/serializers.py diff --git a/qutils/__init__.py b/qutils/__init__.py deleted file mode 100644 index 4a384fb..0000000 --- a/qutils/__init__.py +++ /dev/null @@ -1,41 +0,0 @@ -# This code is a Qiskit project. -# -# (C) Copyright IBM 2022. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -""" -=================================================== -Serializers (:mod:`quantum_serverless.serializers`) -=================================================== - -.. currentmodule:: quantum_serverless.serializers - -Quantum serverless serializers -============================== - -.. autosummary:: - :toctree: ../stubs/ - - register_all_serializers - circuit_serializer - circuit_deserializer - service_serializer - service_deserializer - get_arguments -""" - -from .serializers import ( - circuit_serializer, - circuit_deserializer, - service_serializer, - service_deserializer, -) - -from .program_serializers import * diff --git a/qutils/marshaller.py b/qutils/marshaller.py deleted file mode 100644 index 1825260..0000000 --- a/qutils/marshaller.py +++ /dev/null @@ -1,66 +0,0 @@ -# OM NAMO GANAPATHAYEN NAMAHA -from . 
import serializers, program_serializers -import json -from qiskit.quantum_info import PauliList - - -def jsonifyCuts(subexperiments): - jsonDict = {} - for key, items in subexperiments.items(): - print(key, len(items)) - l = [] - for item in items: - l.append(serializers.circuit_serializer(item)) - jsonDict[key] = l - return json.dumps(jsonDict) - -def objectifyCuts(data): - jsonData = json.loads(data) - objData = {} - for key, items in jsonData.items(): - objData[key] = [serializers.circuit_deserializer(item) for item in items] - return objData - -def objectify_specific(data, index): - data_as_dict = json.loads(data) - serialized_sub_experiments = data_as_dict['sub-experiments'] - sub_experiment_serialized = serialized_sub_experiments[index] - sub_experiment = serializers.circuit_deserializer(sub_experiment_serialized) - return sub_experiment - -def coefficients_to_list(coeffcients): - l = [] - for c in coeffcients: - l.append((c[0], int(c[1].value))) - return json.dumps(l) - -# def json2coefficients(jsonData): -# return json.loads(jsonData) - -def sub_observables_to_dict(subobservables): - d = {} - for k, pauilist in subobservables.items(): - d[k] = pauilist.to_labels() - return d - -def dict_to_sub_observables(data): - output_d = {} - for k, pauilist_str in data.items(): - output_d[int(k)] = PauliList(pauilist_str) - return output_d - -def decode_results(results): - decoded_results = {} - for key, result in results.items(): - decoded_result = json.loads(result, cls=program_serializers.QiskitObjectsDecoder) - for i in range(len(decoded_result.quasi_dists)): - decoded_result.quasi_dists[i] = fix_quasidist(decoded_result.quasi_dists[i]) - decoded_results[int(key)] = decoded_result - return decoded_results - -def fix_quasidist(data): - di = {} - for k, v in data.items(): - ki = int(k) - di[ki] = v - return di \ No newline at end of file diff --git a/qutils/program_serializers.py b/qutils/program_serializers.py deleted file mode 100644 index a16f7ed..0000000 --- a/qutils/program_serializers.py +++ /dev/null @@ -1,72 +0,0 @@ -# This code is a Qiskit project. -# -# (C) Copyright IBM 2022. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -""" -======================================================================= -Serializers (:mod:`quantum_serverless.serializers.program_serializers`) -======================================================================= - -.. currentmodule:: quantum_serverless.serializers.program_serializers - -Quantum serverless program serializers -====================================== - -.. 
autosummary:: - :toctree: ../stubs/ - - QiskitObjectsDecoder - QiskitObjectsEncoder -""" -import json -import os -from typing import Any, Dict -from qiskit.primitives import SamplerResult, EstimatorResult -from qiskit_ibm_runtime import QiskitRuntimeService -from qiskit_ibm_runtime.utils.json import RuntimeDecoder, RuntimeEncoder - -class QiskitObjectsEncoder(RuntimeEncoder): - """Json encoder for Qiskit objects.""" - - def default(self, obj: Any) -> Any: - if isinstance(obj, QiskitRuntimeService): - return { - "__type__": "QiskitRuntimeService", - "__value__": obj.active_account(), - } - if isinstance(obj, SamplerResult): - return { - "__type__": "SamplerResult", - "__value__": {"quasi_dists": obj.quasi_dists, "metadata": obj.metadata}, - } - if isinstance(obj, EstimatorResult): - return { - "__type__": "EstimatorResult", - "__value__": {"values": obj.values, "metadata": obj.metadata}, - } - return super().default(obj) - - -class QiskitObjectsDecoder(RuntimeDecoder): - """Json decoder for Qiskit objects.""" - - def object_hook(self, obj: Any) -> Any: - if "__type__" in obj: - obj_type = obj["__type__"] - - if obj_type == "QiskitRuntimeService": - return QiskitRuntimeService(**obj["__value__"]) - if obj_type == "SamplerResult": - return SamplerResult(**obj["__value__"]) - if obj_type == "EstimatorResult": - return EstimatorResult(**obj["__value__"]) - return super().object_hook(obj) - return obj diff --git a/qutils/s3utils.py b/qutils/s3utils.py deleted file mode 100644 index 88b076d..0000000 --- a/qutils/s3utils.py +++ /dev/null @@ -1,46 +0,0 @@ -# OM NAMO GANAPATHAYEN NAMAHA -import boto3 -import json -from dataclasses import dataclass -import re - -@dataclass -class S3Arguments: - filename: str - bucket_name: str - region: str - key_id: str - secret_access_key: str - -def fetch_relevent_file_data(args: S3Arguments, filter_regex): - s3 = boto3.resource('s3', - region_name=args.region, - aws_access_key_id=args.key_id, - aws_secret_access_key=args.secret_access_key) - bucket = s3.Bucket(args.bucket_name) - - all_data = [] - for bucket_item in bucket.objects.all(): - if re.search(filter_regex, bucket_item) is not None: - data_of_item = s3.get_object(Bucket=args.bucket_name, Key=args.filename) - all_data.append(data_of_item) - return all_data - -def read_from_bucket(args: S3Arguments): - s3 = boto3.resource('s3', - region_name=args.region, - aws_access_key_id=args.key_id, - aws_secret_access_key=args.secret_access_key) - obj = s3.get_object(Bucket=args.bucket_name, Key=args.filename) - if obj is None: - return None - data = json.loads(obj['Body'].read()) - return data - -def write_to_bucket(args: S3Arguments, \ - data: dict): - s3 = boto3.resource('s3', - region_name=args.region, - aws_access_key_id=args.key_id, - aws_secret_access_key=args.secret_access_key) - s3.Object(args.bucket_name, args.filename).put(Body=json.dumps(data)) \ No newline at end of file diff --git a/qutils/serializers.py b/qutils/serializers.py deleted file mode 100644 index b037fae..0000000 --- a/qutils/serializers.py +++ /dev/null @@ -1,91 +0,0 @@ -# This code is a Qiskit project. -# -# (C) Copyright IBM 2022. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. 
-# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -""" -=============================================================== -Serializers (:mod:`quantum_serverless.serializers.serializers`) -=============================================================== - -.. currentmodule:: quantum_serverless.serializers.serializers - -Quantum serverless serializers -============================== - -.. autosummary:: - :toctree: ../stubs/ - - register_all_serializers - circuit_serializer - circuit_deserializer - service_serializer - service_deserializer -""" -import base64 -import io -import zlib - -from qiskit import QuantumCircuit, qpy -from qiskit_ibm_runtime import QiskitRuntimeService - - -def circuit_serializer(circuit: QuantumCircuit) -> str: - """Serializes QuantumCircuit into string. - - Args: - circuit: Qiskit QuantumCircuit object to serialize - - Returns: - circuit encoded in string - """ - buff = io.BytesIO() - qpy.dump(circuit, buff) - buff.seek(0) - serialized_data = buff.read() - buff.close() - serialized_data = zlib.compress(serialized_data) - return base64.standard_b64encode(serialized_data).decode("utf-8") - - -def circuit_deserializer(encoded_circuit: str) -> QuantumCircuit: - """Deserialize encoded QuantumCircuit object. - - Args: - encoded_circuit: encoded circuit - - Returns: - QuantumCircuit decoded object - """ - buff = io.BytesIO() - decoded = base64.standard_b64decode(encoded_circuit) - decoded = zlib.decompress(decoded) - buff.write(decoded) - buff.seek(0) - orig = qpy.load(buff) - buff.close() - return orig[0] - - -def service_serializer(service: QiskitRuntimeService): - """Serializes QiskitRuntimeService.""" - return service.active_account() - - -def service_deserializer(account: dict): - """Deserializes QiskitRuntimeService. - - Args: - account: Dict from calling `QiskitRuntimeService.active_account` - - Returns: - QiskitRuntimeService instance - """ - return QiskitRuntimeService(**account) diff --git a/serwo/qsserializers/__init__.py b/serwo/qsserializers/__init__.py deleted file mode 100644 index 4a384fb..0000000 --- a/serwo/qsserializers/__init__.py +++ /dev/null @@ -1,41 +0,0 @@ -# This code is a Qiskit project. -# -# (C) Copyright IBM 2022. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -""" -=================================================== -Serializers (:mod:`quantum_serverless.serializers`) -=================================================== - -.. currentmodule:: quantum_serverless.serializers - -Quantum serverless serializers -============================== - -.. 
autosummary:: - :toctree: ../stubs/ - - register_all_serializers - circuit_serializer - circuit_deserializer - service_serializer - service_deserializer - get_arguments -""" - -from .serializers import ( - circuit_serializer, - circuit_deserializer, - service_serializer, - service_deserializer, -) - -from .program_serializers import * diff --git a/serwo/qsserializers/program_serializers.py b/serwo/qsserializers/program_serializers.py deleted file mode 100644 index a16f7ed..0000000 --- a/serwo/qsserializers/program_serializers.py +++ /dev/null @@ -1,72 +0,0 @@ -# This code is a Qiskit project. -# -# (C) Copyright IBM 2022. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -""" -======================================================================= -Serializers (:mod:`quantum_serverless.serializers.program_serializers`) -======================================================================= - -.. currentmodule:: quantum_serverless.serializers.program_serializers - -Quantum serverless program serializers -====================================== - -.. autosummary:: - :toctree: ../stubs/ - - QiskitObjectsDecoder - QiskitObjectsEncoder -""" -import json -import os -from typing import Any, Dict -from qiskit.primitives import SamplerResult, EstimatorResult -from qiskit_ibm_runtime import QiskitRuntimeService -from qiskit_ibm_runtime.utils.json import RuntimeDecoder, RuntimeEncoder - -class QiskitObjectsEncoder(RuntimeEncoder): - """Json encoder for Qiskit objects.""" - - def default(self, obj: Any) -> Any: - if isinstance(obj, QiskitRuntimeService): - return { - "__type__": "QiskitRuntimeService", - "__value__": obj.active_account(), - } - if isinstance(obj, SamplerResult): - return { - "__type__": "SamplerResult", - "__value__": {"quasi_dists": obj.quasi_dists, "metadata": obj.metadata}, - } - if isinstance(obj, EstimatorResult): - return { - "__type__": "EstimatorResult", - "__value__": {"values": obj.values, "metadata": obj.metadata}, - } - return super().default(obj) - - -class QiskitObjectsDecoder(RuntimeDecoder): - """Json decoder for Qiskit objects.""" - - def object_hook(self, obj: Any) -> Any: - if "__type__" in obj: - obj_type = obj["__type__"] - - if obj_type == "QiskitRuntimeService": - return QiskitRuntimeService(**obj["__value__"]) - if obj_type == "SamplerResult": - return SamplerResult(**obj["__value__"]) - if obj_type == "EstimatorResult": - return EstimatorResult(**obj["__value__"]) - return super().object_hook(obj) - return obj diff --git a/serwo/qsserializers/serializers.py b/serwo/qsserializers/serializers.py deleted file mode 100644 index b037fae..0000000 --- a/serwo/qsserializers/serializers.py +++ /dev/null @@ -1,91 +0,0 @@ -# This code is a Qiskit project. -# -# (C) Copyright IBM 2022. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. 
-# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -""" -=============================================================== -Serializers (:mod:`quantum_serverless.serializers.serializers`) -=============================================================== - -.. currentmodule:: quantum_serverless.serializers.serializers - -Quantum serverless serializers -============================== - -.. autosummary:: - :toctree: ../stubs/ - - register_all_serializers - circuit_serializer - circuit_deserializer - service_serializer - service_deserializer -""" -import base64 -import io -import zlib - -from qiskit import QuantumCircuit, qpy -from qiskit_ibm_runtime import QiskitRuntimeService - - -def circuit_serializer(circuit: QuantumCircuit) -> str: - """Serializes QuantumCircuit into string. - - Args: - circuit: Qiskit QuantumCircuit object to serialize - - Returns: - circuit encoded in string - """ - buff = io.BytesIO() - qpy.dump(circuit, buff) - buff.seek(0) - serialized_data = buff.read() - buff.close() - serialized_data = zlib.compress(serialized_data) - return base64.standard_b64encode(serialized_data).decode("utf-8") - - -def circuit_deserializer(encoded_circuit: str) -> QuantumCircuit: - """Deserialize encoded QuantumCircuit object. - - Args: - encoded_circuit: encoded circuit - - Returns: - QuantumCircuit decoded object - """ - buff = io.BytesIO() - decoded = base64.standard_b64decode(encoded_circuit) - decoded = zlib.decompress(decoded) - buff.write(decoded) - buff.seek(0) - orig = qpy.load(buff) - buff.close() - return orig[0] - - -def service_serializer(service: QiskitRuntimeService): - """Serializes QiskitRuntimeService.""" - return service.active_account() - - -def service_deserializer(account: dict): - """Deserializes QiskitRuntimeService. 
- - Args: - account: Dict from calling `QiskitRuntimeService.active_account` - - Returns: - QiskitRuntimeService instance - """ - return QiskitRuntimeService(**account) From 08e64fd7c3bb9ff8284583650abcaab3c36b9dff Mon Sep 17 00:00:00 2001 From: Tarun Pal Date: Fri, 14 Jun 2024 17:22:20 +0530 Subject: [PATCH 03/10] Updating gitignore for quantum dependencies --- .gitignore | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index b6698cc..ba8b825 100644 --- a/.gitignore +++ b/.gitignore @@ -38,5 +38,5 @@ jmeter.log *.csv deployments.txt *.pdf -qutils -qsserializers \ No newline at end of file +qutils/ +qsserializers/ \ No newline at end of file From ad5ce210af8c1f87df11ea90f351aa7f30b0ea16 Mon Sep 17 00:00:00 2001 From: Tarun Pal Date: Fri, 21 Jun 2024 14:47:29 +0530 Subject: [PATCH 04/10] Updated xfaas user config file --- serwo/config/xfaas_user_config.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/serwo/config/xfaas_user_config.json b/serwo/config/xfaas_user_config.json index 597a936..8f37c50 100644 --- a/serwo/config/xfaas_user_config.json +++ b/serwo/config/xfaas_user_config.json @@ -10,6 +10,7 @@ } }, "user_pinned_nodes" : { - + "1":"0", + "2":"0" } } \ No newline at end of file From 1fbcfc1cd84f6ea1c4487c47e10b51390f4f3b25 Mon Sep 17 00:00:00 2001 From: Tarun Pal Date: Fri, 21 Jun 2024 14:51:38 +0530 Subject: [PATCH 05/10] Ignoring build directory --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index ba8b825..2b2a386 100644 --- a/.gitignore +++ b/.gitignore @@ -26,7 +26,7 @@ analysis/ stashed/ serwo/examples/graphAwsFourRps serwo/examples/graphAwsMedium -build +build/ *.jsonl log_files serwo/config From 86028110605e59e7ffa571ace80383706e20c146 Mon Sep 17 00:00:00 2001 From: Tarun Pal Date: Mon, 24 Jun 2024 15:52:12 +0530 Subject: [PATCH 06/10] Separating AWS and Azure CollectLogs --- .../CollectLogs/Dockerfile | 16 ++++++++++ .../predefined-functions/CollectLogs/func.py | 29 +++++++++++++++++++ .../CollectLogs/requirements.txt | 5 ++++ .../predefined-functions/CollectLogs/func.py | 12 ++------ .../CollectLogs/requirements.txt | 3 +- serwo/xfaas_main.py | 7 +++-- 6 files changed, 58 insertions(+), 14 deletions(-) create mode 100644 serwo/templates/aws/predefined-functions/CollectLogs/Dockerfile create mode 100644 serwo/templates/aws/predefined-functions/CollectLogs/func.py create mode 100644 serwo/templates/aws/predefined-functions/CollectLogs/requirements.txt diff --git a/serwo/templates/aws/predefined-functions/CollectLogs/Dockerfile b/serwo/templates/aws/predefined-functions/CollectLogs/Dockerfile new file mode 100644 index 0000000..dc76c24 --- /dev/null +++ b/serwo/templates/aws/predefined-functions/CollectLogs/Dockerfile @@ -0,0 +1,16 @@ +# FROM ubuntu:22.04 +# RUN apt-get update && apt-get install -y python3 +# FROM seshapad/aws-lambda-qiskit:base +FROM public.ecr.aws/lambda/python:3.10 +# COPY standalone_app_runner.py requirements.txt ./ +# # ADD python ./python +# RUN pip install --upgrade pip +# RUN pip install -r requirements.txt +# # Command can be overwritten by providing a different command in the template directly. +# CMD ["standalone_app_runner.lambda_handler"] +# FROM my:latest +# RUN pip install --upgrade pip +COPY . . +RUN pip install -r requirements.txt +# Command can be overwritten by providing a different command in the template directly.
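+# NOTE: built via the container branch of the SAM template (patch 01), which
+# sets DockerContext to ./functions/CollectLogs when function.iscontainer is true.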
+CMD ["standalone_app_runner.lambda_handler"] \ No newline at end of file diff --git a/serwo/templates/aws/predefined-functions/CollectLogs/func.py b/serwo/templates/aws/predefined-functions/CollectLogs/func.py new file mode 100644 index 0000000..46b019f --- /dev/null +++ b/serwo/templates/aws/predefined-functions/CollectLogs/func.py @@ -0,0 +1,29 @@ +from azure.storage.queue import QueueClient +import json +from python.src.utils.classes.commons.serwo_objects import SerWOObject +import os, uuid +import logging +import boto3 + +connect_str = "CONNECTION_STRING" +queue_name = "QUEUE_NAME" +# csp = "COLL_CSP" + +queue = boto3.client("sqs") + + +def user_function(serwoObject) -> SerWOObject: + try: + fin_dict = dict() + data = serwoObject.get_body() + logging.info("Data to push - "+str(data)) + metadata = serwoObject.get_metadata() + fin_dict["data"] = data + fin_dict["metadata"] = metadata + logging.info("Fin dict - "+str(fin_dict)) + queue.send_message(MessageBody=json.dumps(fin_dict), QueueUrl=connect_str) + # data = {"body": "success: OK"} + return SerWOObject(body=serwoObject.get_body()) + except Exception as e: + return SerWOObject(error=True) + \ No newline at end of file diff --git a/serwo/templates/aws/predefined-functions/CollectLogs/requirements.txt b/serwo/templates/aws/predefined-functions/CollectLogs/requirements.txt new file mode 100644 index 0000000..c4db13a --- /dev/null +++ b/serwo/templates/aws/predefined-functions/CollectLogs/requirements.txt @@ -0,0 +1,5 @@ +azure-storage-queue +psutil +requests +py-cpuinfo +boto3 \ No newline at end of file diff --git a/serwo/templates/azure/predefined-functions/CollectLogs/func.py b/serwo/templates/azure/predefined-functions/CollectLogs/func.py index ccd5cd0..b204577 100644 --- a/serwo/templates/azure/predefined-functions/CollectLogs/func.py +++ b/serwo/templates/azure/predefined-functions/CollectLogs/func.py @@ -7,12 +7,9 @@ connect_str = "CONNECTION_STRING" queue_name = "QUEUE_NAME" -csp = "COLL_CSP" +# csp = "COLL_CSP" -if csp == "aws": - queue = boto3.client("sqs") -elif csp == "azure": - queue = QueueClient.from_connection_string( +queue = QueueClient.from_connection_string( conn_str=connect_str, queue_name=queue_name ) @@ -26,10 +23,7 @@ def user_function(serwoObject) -> SerWOObject: fin_dict["data"] = data fin_dict["metadata"] = metadata logging.info("Fin dict - "+str(fin_dict)) - if csp == "azure": - queue.send_message(json.dumps(fin_dict)) - elif csp == "aws": - queue.send_message(MessageBody=json.dumps(fin_dict), QueueUrl=connect_str) + queue.send_message(json.dumps(fin_dict)) # data = {"body": "success: OK"} return SerWOObject(body=serwoObject.get_body()) except Exception as e: diff --git a/serwo/templates/azure/predefined-functions/CollectLogs/requirements.txt b/serwo/templates/azure/predefined-functions/CollectLogs/requirements.txt index c4db13a..4d753b2 100644 --- a/serwo/templates/azure/predefined-functions/CollectLogs/requirements.txt +++ b/serwo/templates/azure/predefined-functions/CollectLogs/requirements.txt @@ -1,5 +1,4 @@ azure-storage-queue psutil requests -py-cpuinfo -boto3 \ No newline at end of file +py-cpuinfo \ No newline at end of file diff --git a/serwo/xfaas_main.py b/serwo/xfaas_main.py index aa28e25..ae42d06 100644 --- a/serwo/xfaas_main.py +++ b/serwo/xfaas_main.py @@ -71,12 +71,12 @@ def add_collect_logs(dag_definition_path,user_wf_dir, xfaas_user_dag,region): # region = 'centralindia' - collect_logs_dir = f'{project_dir}/templates/azure/predefined-functions/CollectLogs' new_collect_logs_dir = 
f'{user_wf_dir}/CollectLogs' - + collect_logs_dir='' queue_name = randomString(5) if csp=="azure": + collect_logs_dir = f'{project_dir}/templates/azure/predefined-functions/CollectLogs' print('creating xfaas logging queue') resource_group_name = f"xfaasLog{region}" storage_account_name = f"xfaaslog{region}" @@ -126,6 +126,7 @@ def add_collect_logs(dag_definition_path,user_wf_dir, xfaas_user_dag,region): fin_dict = {'queue_name' : queue_name, 'connection_string' : jsson['connectionString'] , 'storage_account' : storage_account_name, 'group':resource_group_name} elif csp=="aws": + collect_logs_dir = f'{project_dir}/templates/aws/predefined-functions/CollectLogs' queue_creation_command = f"aws sqs create-queue --queue-name {queue_name}" stream = os.popen(queue_creation_command) output = stream.read() @@ -139,7 +140,7 @@ def add_collect_logs(dag_definition_path,user_wf_dir, xfaas_user_dag,region): connection_string_template = 'CONNECTION_STRING' queue_name_template = 'QUEUE_NAME' - coll_csp_template="COLL_CSP" + # coll_csp_template="COLL_CSP" ##open the file and replace the connection string and queue name with open(f'{new_collect_logs_dir}/func.py', 'r') as file : filedata = file.read() From 20a226f465a864b65153de7cd95c1c059863b0de Mon Sep 17 00:00:00 2001 From: Tarun Pal Date: Mon, 24 Jun 2024 16:08:01 +0530 Subject: [PATCH 07/10] Code cleaning: removing unnecessary dependencies --- serwo/templates/aws/predefined-functions/CollectLogs/func.py | 4 ++-- .../templates/azure/predefined-functions/CollectLogs/func.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/serwo/templates/aws/predefined-functions/CollectLogs/func.py b/serwo/templates/aws/predefined-functions/CollectLogs/func.py index 46b019f..d5fd8fa 100644 --- a/serwo/templates/aws/predefined-functions/CollectLogs/func.py +++ b/serwo/templates/aws/predefined-functions/CollectLogs/func.py @@ -1,7 +1,7 @@ -from azure.storage.queue import QueueClient +# from azure.storage.queue import QueueClient import json from python.src.utils.classes.commons.serwo_objects import SerWOObject -import os, uuid +# import os, uuid import logging import boto3 diff --git a/serwo/templates/azure/predefined-functions/CollectLogs/func.py b/serwo/templates/azure/predefined-functions/CollectLogs/func.py index b204577..d307c1b 100644 --- a/serwo/templates/azure/predefined-functions/CollectLogs/func.py +++ b/serwo/templates/azure/predefined-functions/CollectLogs/func.py @@ -1,9 +1,9 @@ from azure.storage.queue import QueueClient import json from python.src.utils.classes.commons.serwo_objects import SerWOObject -import os, uuid +# import os, uuid import logging -import boto3 +# import boto3 From 8336581871f8bc1e496750dcd1b63113a323b530 Mon Sep 17 00:00:00 2001 From: Tarun Pal Date: Mon, 24 Jun 2024 16:17:25 +0530 Subject: [PATCH 08/10] Code cleaning: removing unnecessary dependencies --- .../aws/predefined-functions/CollectLogs/requirements.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/serwo/templates/aws/predefined-functions/CollectLogs/requirements.txt b/serwo/templates/aws/predefined-functions/CollectLogs/requirements.txt index c4db13a..10c3a83 100644 --- a/serwo/templates/aws/predefined-functions/CollectLogs/requirements.txt +++ b/serwo/templates/aws/predefined-functions/CollectLogs/requirements.txt @@ -1,4 +1,3 @@ -azure-storage-queue psutil requests py-cpuinfo From 58ed566d9ea52941fa2fd3a7707ff927df559715 Mon Sep 17 00:00:00 2001 From: Tarun Pal Date: Thu, 11 Jul 2024 16:34:24
+0530 Subject: [PATCH 09/10] Adding Map state support --- serwo/aws_create_statemachine.py | 66 +++++++++- serwo/azure_create_statemachine.py | 4 +- .../runner-templates/aws/runner_template.py | 4 +- .../python/src/utils/classes/aws/user_dag.py | 12 ++ serwo/scripts/azure/azure_builder.py | 11 +- serwo/scripts/azure/map_update.py | 120 ++++++++++++++++++ .../azure/orchestrator_async_update.py | 4 +- .../templates/azure/orchestrator_template.py | 12 +- 8 files changed, 218 insertions(+), 15 deletions(-) create mode 100644 serwo/scripts/azure/map_update.py diff --git a/serwo/aws_create_statemachine.py b/serwo/aws_create_statemachine.py index 90723a0..a053d61 100644 --- a/serwo/aws_create_statemachine.py +++ b/serwo/aws_create_statemachine.py @@ -182,11 +182,6 @@ def __create_lambda_fns( with open(temp_runner_path, "w") as file: file.write(contents) - # TODO - Fix the stickytape issue for AWS - # GitHub Issue link - https://github.com/dream-lab/XFaaS/issues/4 - """ - Sticytape the runner - """ logger.info(f"Stickytape the runner template for dependency resolution") runner_file_path = fn_dir / f"{runner_filename}.py" print("Temprory runner path",temp_runner_path) @@ -346,6 +341,8 @@ def __generate_asl_template(self): dag_json=self.__user_dag.get_user_dag_nodes() + map_list=self.__user_dag.get_map_list() + list_async_fns= get_set_of_async_funtions(dag_json) # print("List of Async Fns:",list_async_fns) changing_fns=set() @@ -359,6 +356,8 @@ def __generate_asl_template(self): # print("List of changing funtions:",changing_fns_list) # Statemachine.asl.josn should be changed here sfn_json_copy = copy.deepcopy(json.loads(sfn_json)) + sfn_json_copy = self.add_map_flow(sfn_json_copy,map_list) + print(sfn_json_copy) data=add_async_afn_builder(sfn_json_copy,changing_fns_list) # print("Updated data of sfn builder after adding poll",data) with open(f"{self.__aws_build_dir}/{self.__json_file}", "w") as statemachinejson: @@ -455,6 +454,24 @@ def deploy_workflow(self): json.dump(data, f, indent=4) return self.__outputs_filepath + + def add_map_flow(self,asl_json,subgraphs): + if subgraphs==None: + return asl_json + for i in range(0,len(subgraphs)): + subgraph=subgraphs[i] + if asl_json["StartAt"]==subgraph["Nodes"][0]: + asl_json["StartAt"]="Map"+str(i) + extracted_states={} + for node in subgraph["Nodes"]: + extracted_states[node]=asl_json["States"][node] + del asl_json["States"][node] + parent_node=self.__user_dag.get_parent_nodename(subgraph["Nodes"][0]) + if parent_node != None: + asl_json["States"][parent_node]["Next"] = "Map"+str(i) + + asl_json["States"]["Map"+str(i)]=map_template(subgraph,extracted_states) + return asl_json def add_async_afn_builder(data,list): @@ -681,4 +698,41 @@ def copy_if_not_exists(source_directory, destination_directory): if os.path.isdir(src_path): shutil.copytree(src_path, dst_path) # Copy entire folder else: - shutil.copy(src_path, dst_path) # Copy individual file \ No newline at end of file + shutil.copy(src_path, dst_path) # Copy individual file + +def map_template(subgraph,extracted_states): + lastnode=subgraph["Nodes"][-1] + next_node=extracted_states[lastnode]["Next"] + del extracted_states[lastnode]["Next"] + extracted_states[lastnode]["End"]=True + template={ + "Type": "Map", + "ItemProcessor": { + "ProcessorConfig": { + "Mode": "INLINE" + }, + "StartAt": subgraph["Nodes"][0], + "States": extracted_states + }, + "Next": next_node, + "ResultPath": "$.body."+subgraph["Listname"]+"Result", + "ItemsPath": "$.body."+subgraph["Listname"] + } + return template +# def
add_map_flow(self,asl_json,subgraphs): +# if subgraphs==None: +# return asl_json +# for i in range(0,len(subgraphs)): +# subgraph=subgraphs[i] +# if asl_json["StartAt"]==subgraph["Nodes"][0]: +# asl_json["StartAt"]="Map"+str(i) +# extracted_states={} +# for node in subgraph["Nodes"]: +# extracted_states[node]=asl_json["States"][node] +# del asl_json["States"][node] +# parent_node=self.__user_dag.get_parent_nodename() +# if parent_node != None: +# asl_json["States"][parent_node]["Next"] = "Map"+str(i) + +# asl_json["States"]["Map"+str(i)]=map_template(subgraph,extracted_states) +# return asl_json \ No newline at end of file diff --git a/serwo/azure_create_statemachine.py b/serwo/azure_create_statemachine.py index b82ef19..a9139f0 100644 --- a/serwo/azure_create_statemachine.py +++ b/serwo/azure_create_statemachine.py @@ -17,7 +17,9 @@ def run(user_directory,user_dag_file_name): output_dir = f"{user_directory}" dag_definition_path = f"{user_directory}/{user_dag_file_name}" orchestrator_filepath = f"{user_directory}/orchestrator.py" - + if os.path.exists(orchestrator_filepath): + os.remove(orchestrator_filepath) + print(f"File '{orchestrator_filepath}' already existed, so it was removed.") # load the dag user_dag = AzureUserDAG(dag_definition_path) orchestrator_code = user_dag.get_orchestrator_code() diff --git a/serwo/python/src/runner-templates/aws/runner_template.py b/serwo/python/src/runner-templates/aws/runner_template.py index 3461074..173aede 100644 --- a/serwo/python/src/runner-templates/aws/runner_template.py +++ b/serwo/python/src/runner-templates/aws/runner_template.py @@ -16,7 +16,7 @@ from copy import deepcopy from python.src.utils.classes.commons.serwo_objects import build_serwo_object from python.src.utils.classes.commons.serwo_objects import build_serwo_list_object -from python.src.utils.classes.commons.serwo_objects import SerWOObject +from python.src.utils.classes.commons.serwo_objects import SerWOObject, SerWOObjectsList downstream = 0 """ @@ -61,6 +61,8 @@ def lambda_handler(event, context): request_timestamp = event["body"].get("request_timestamp") session_id = event["body"].get("session_id") # NOTE - new variable to keep track of requests deployment_id = event["body"].get("deployment_id") + if request_timestamp==None: + request_timestamp=0 overheads = start_time - request_timestamp event["metadata"] = dict( workflow_instance_id=wf_instance_id, diff --git a/serwo/python/src/utils/classes/aws/user_dag.py b/serwo/python/src/utils/classes/aws/user_dag.py index 3d35944..9ea7278 100644 --- a/serwo/python/src/utils/classes/aws/user_dag.py +++ b/serwo/python/src/utils/classes/aws/user_dag.py @@ -223,6 +223,10 @@ def get_user_dag_name(self): # get a map: function_name -> function_object def get_node_object_map(self): return self.__functions + def get_map_list(self): + if "SubGraphs" in self.__dag_config_data: + return self.__dag_config_data["SubGraphs"] + return None # get a list of node dictionaries def get_node_param_list(self): @@ -248,6 +252,14 @@ def get_statemachine_structure(self): tasklist = output_dag.nodes[node]["machine_list"] return tasklist + def get_parent_nodename(self, node_name): + for node in self.__dag_config_data["Edges"]: + for key, value in node.items(): + for name in value: + if name == node_name: + return key + return None + def get_successor_node_names(self, node_name): # Get the node ID corresponding to the given node name node_id = self.__nodeIDMap.get(node_name) diff --git a/serwo/scripts/azure/azure_builder.py
b/serwo/scripts/azure/azure_builder.py index c86ae0b..7a3abe1 100644 --- a/serwo/scripts/azure/azure_builder.py +++ b/serwo/scripts/azure/azure_builder.py @@ -5,6 +5,7 @@ from jinja2 import Environment, FileSystemLoader from signal import signal, SIGPIPE, SIG_DFL from .orchestrator_async_update import async_update +from .map_update import map_update signal(SIGPIPE,SIG_DFL) from random import randint @@ -57,7 +58,7 @@ def init_paths(): def build_working_dir(region,part_id,is_netherite): global az_functions_path,build_dir - dummy, user_workflow_name = get_user_workflow_details() + dummy, user_workflow_name,dummy2 = get_user_workflow_details() if is_netherite: build_dir += f"azure_v2-{region}-{part_id}" @@ -72,7 +73,10 @@ def get_user_workflow_details(): json_path = user_workflow_directory + '/' + user_dag_file_name data = json.load(open(json_path)) fns_data = data['Nodes'] - return fns_data,data['WorkflowName'] + subgraphs= None + if "SubGraphs" in data: + subgraphs = data["SubGraphs"] + return fns_data, data['WorkflowName'], subgraphs def get_set_of_async_funtions(fns_data): # json_path = user_workflow_directory + '/' + user_dag_file_name @@ -353,7 +357,7 @@ def build(user_dir, dag_definition_file, region, part_id,is_netherite): DAG_DEFINITION_FILE = dag_definition_file init_paths() build_working_dir(region,part_id,is_netherite) - user_fns_data, user_app_name = get_user_workflow_details() + user_fns_data, user_app_name, subgraphs = get_user_workflow_details() ingress_queue_name, app_name = generate_app_name_and_populate_and_get_ingress_queue_name(user_app_name,region,part_id,is_netherite) build_user_fn_dirs(user_fns_data) copy_meta_files(user_fns_data,ingress_queue_name,app_name,is_netherite) @@ -365,6 +369,7 @@ def build(user_dir, dag_definition_file, region, part_id,is_netherite): print("Async_Fn_detction",async_func_set) print("Orchestrator initial path:",orchestrator_generated_path) print("Orchestrator dest path:",orch_dest_path) + map_update.add_graph_node(orchestrator_generated_path,orchestrator_generated_path,subgraphs) async_update.orchestrator_async_update(orchestrator_generated_path,orch_dest_path,async_func_set) if __name__ == '__main__': diff --git a/serwo/scripts/azure/map_update.py b/serwo/scripts/azure/map_update.py new file mode 100644 index 0000000..4cd2d8f --- /dev/null +++ b/serwo/scripts/azure/map_update.py @@ -0,0 +1,120 @@ +import re +import os + + +# def serwolist_to_obj(var): +# list = var.get_objects() +# # matadata=var.get_metadata() +# List = [] +# for obj in list: +# body = obj.get_body() +# List.append(body) +# return list + +def map_cod_gen(lists, name): + first_obj = lists[0] + new_code = 'serwo_to_dict = unmarshall(json.loads(' + \ + first_obj[2] + '))\n' + new_code += 'body = serwo_to_dict.get_body()\n' + new_code += 'obj = body["' + name + '"]\n' + new_code += 'std_meta = serwo_to_dict.get_metadata()\n' + + for i in range(0, len(lists)): + new_code += lists[i][0] + ' = []\n' + new_code += '\nfor i in range(0, len(obj)):\n' + new_code += 'var = {' + '"body": obj[i], "metadata": std_meta }\n' + new_code += 'var = build_serwo_object(var).to_json()\n' + new_code += \ + first_obj[0] + \ + '.append(context.call_activity(' + first_obj[1]+' , var))\n' + + for i in range(1, len(lists)): + l = lists[i] + new_code += \ + l[0]+'.append(context.call_activity(' + \ + l[1]+' , ' + l[2] + '[i]))\n' + + # this is out of for loop + lines = new_code.split('\n') + indented_lines = ['\n\t' + lines[i] for i in range(0,len(lines)-1)] + # print(indented_lines) + + last_obj = 
lists[-1] + last_code = [] + last_code.append("\n") + last_code.append("\n "+last_obj[0]+' = yield context.task_all('+last_obj[0]+')') + last_code.append("\n "+'body["'+name + \ + '"+"results"] = serwolist_to_obj('+last_obj[0] + ')') + last_code.append('\n var2 = {' + '"body": body, "metadata": std_meta }') + last_code.append("\n "+last_obj[0] + ' = build_serwo_object(var2).to_json()') + last_code.append('\n') + + # print(lines) + indented_lines+=['\n']+last_code + + # print(indented_lines) + return indented_lines + +def extract_var(input_str): + # Define the regular expressions + variable_regex = r'^\s*([a-zA-Z_]\w*)\s*=' + function_call_regex = r'\bcontext\.call_activity\(\s*"([^"]*)"\s*,\s*([^)]*)\)' + + # Extract the variable name and function call arguments + variable_match = re.match(variable_regex, input_str) + variable_name = variable_match.group(1) if variable_match else None + + # Extract function call arguments + function_call_match = re.search(function_call_regex, input_str) + if function_call_match: + first_argument = function_call_match.group(1) + second_argument = function_call_match.group(2) + else: + first_argument = None + second_argument = None + first_argument = '"'+str(first_argument)+'"' + # print("Variable Name:", variable_name) + # print("First Argument:", first_argument) + # print("Second Argument:", second_argument) + return [str(variable_name), first_argument, str(second_argument)] + +class map_update: + def add_graph_node(inpath, outpath, subgraphs): + if subgraphs == None: + return + with open(inpath, 'r') as file_ptr: + lines = file_ptr.readlines() + str = "context.call_activity" + new_lines = [] + for subgraph in subgraphs: + i = 0 + list_subgraphs = [] + # print(len(lines)) + while i Date: Thu, 25 Jul 2024 00:29:43 +0530 Subject: [PATCH 10/10] Adding support for async updates for dynamic poller in orchestrator --- serwo/scripts/azure/orchestrator_async_update.py | 5 ++++- .../templates/azure/predefined-functions/CollectLogs/func.py | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/serwo/scripts/azure/orchestrator_async_update.py b/serwo/scripts/azure/orchestrator_async_update.py index f24f006..2a73a67 100644 --- a/serwo/scripts/azure/orchestrator_async_update.py +++ b/serwo/scripts/azure/orchestrator_async_update.py @@ -42,7 +42,10 @@ def async_codegen(in_var,out_var,func_name): new_code +='\n\tif not body["Poll"] or body["Poll"]==False: ' new_code +='\n\t\tbreak ' new_code +='\n\telse:' - new_code +='\n\t\tdeadline = context.current_utc_datetime + timedelta(seconds=900) ' + new_code +='\n\t\tdelta = 900 ' + new_code +='\n\t\tif "waittime" in body : ' + new_code +='\n\t\t\tdelta = body["waittime"] ' + new_code +='\n\t\tdeadline = context.current_utc_datetime + timedelta(seconds=delta) ' new_code +='\n\t\tyield context.create_timer(deadline)\n\n' return new_code diff --git a/serwo/templates/azure/predefined-functions/CollectLogs/func.py b/serwo/templates/azure/predefined-functions/CollectLogs/func.py index d307c1b..8509070 100644 --- a/serwo/templates/azure/predefined-functions/CollectLogs/func.py +++ b/serwo/templates/azure/predefined-functions/CollectLogs/func.py @@ -1,6 +1,6 @@ from azure.storage.queue import QueueClient import json -from python.src.utils.classes.commons.serwo_objects import SerWOObject +from .python.src.utils.classes.commons.serwo_objects import SerWOObject # import os, uuid import logging # import boto3