From 3115845417c23ca6e8baf385639b1193848a16f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacek=20Ku=C5=9Bnierz?= Date: Mon, 18 Jan 2021 10:18:49 +0100 Subject: [PATCH 1/6] AWS backend --- PyRDF/__init__.py | 3 + PyRDF/backend/AWS.py | 147 ++++++++++++++++++++++++++++++++++++++++++ PyRDF/backend/Dist.py | 3 +- requirements.aws.txt | 2 + requirements.txt | 3 +- 5 files changed, 155 insertions(+), 3 deletions(-) create mode 100644 PyRDF/backend/AWS.py create mode 100644 requirements.aws.txt diff --git a/PyRDF/__init__.py b/PyRDF/__init__.py index 64ef292..c61e5b4 100644 --- a/PyRDF/__init__.py +++ b/PyRDF/__init__.py @@ -64,6 +64,9 @@ def use(backend_name, conf={}): elif backend_name == "spark": from PyRDF.backend.Spark import Spark current_backend = Spark(conf) + elif backend_name == "AWS": + from PyRDF.backend.AWS import AWS + current_backend = AWS(conf) else: msg = "Incorrect backend environment \"{}\"".format(backend_name) raise Exception(msg) diff --git a/PyRDF/backend/AWS.py b/PyRDF/backend/AWS.py new file mode 100644 index 0000000..b26129b --- /dev/null +++ b/PyRDF/backend/AWS.py @@ -0,0 +1,147 @@ +from __future__ import print_function + +import base64 +import json +import logging +import time + +import boto3 +import cloudpickle as pickle + +from PyRDF.backend.Dist import Dist + + +class AWS(Dist): + """ + Backend that executes the computational graph using using AWS Lambda + for distributed execution. + """ + + MIN_NPARTITIONS = 2 + + def __init__(self, config={}): + """ + Config for AWS is same as in Dist backend, + more support will be added in future. + """ + super(AWS, self).__init__(config) + self.logger = logging.getLogger() + self.npartitions = self._get_partitions() + self.region = config.get('region') or 'us-east-1' + + def _get_partitions(self): + return int(self.npartitions or AWS.MIN_NPARTITIONS) + + def ProcessAndMerge(self, mapper, reducer): + """ + Performs map-reduce using AWS Lambda. + + Args: + mapper (function): A function that runs the computational graph + and returns a list of values. + + reducer (function): A function that merges two lists that were + returned by the mapper. + + Returns: + list: A list representing the values of action nodes returned + after computation (Map-Reduce). + """ + + ranges = self.build_ranges() + + def encode_object(object_to_encode) -> str: + return str(base64.b64encode(pickle.dumps(object_to_encode))) + + # Make mapper and reducer transferable + pickled_mapper = encode_object(mapper) + pickled_reducer = encode_object(reducer) + + # Setup AWS clients + s3_resource = boto3.resource('s3', region_name=self.region) + s3_client = boto3.client('s3', region_name=self.region) + lambda_client = boto3.client('lambda', region_name=self.region) + ssm_client = boto3.client('ssm', region_name=self.region) + + # Check for existence of infrastructure + s3_output_bucket = ssm_client.get_parameter(Name='output_bucket')['Parameter']['Value'] + if not s3_output_bucket: + self.logger.info('AWS backend not initialized!') + return False + + ssm_client.put_parameter( + Name='ranges_num', + Type='String', + Value=str(len(ranges)), + Overwrite=True + ) + + ssm_client.put_parameter( + Name='reducer', + Type='String', + Value=str(pickled_reducer), + Overwrite=True + ) + + def invoke_root_lambda(client, root_range, script): + payload = json.dumps({ + 'range': encode_object(root_range), + 'script': script, + 'start': str(root_range.start), + 'end': str(root_range.end) + }) + return client.invoke( + FunctionName='root_lambda', + InvocationType='Event', + Payload=bytes(payload, encoding='utf8') + ) + + # Invoke workers with ranges and mapper + call_results = [] + for root_range in ranges: + call_result = invoke_root_lambda(lambda_client, root_range, pickled_mapper) + call_results.append(call_result) + + # while True: + # results = s3.list_objects_v2(Bucket=s3_output_bucket, Prefix='out.pickle') + # if results['KeyCount'] > 0: + # break + # self.logger.debug("still waiting") + # time.sleep(1) + # result = s3.get_object(s3_output_bucket, 'out.pickle') + + processing_bucket = ssm_client.get_parameter(Name='processing_bucket')['Parameter']['Value'] + + # Wait until all lambdas finished execution + while True: + results = s3_client.list_objects_v2(Bucket=processing_bucket) + if results['KeyCount'] == len(ranges): + break + self.logger.debug(f'Lambdas finished: {results["KeyCount"]}') + time.sleep(1) + + # Get names of output files, download and reduce them + filenames = s3_client.list_objects_v2(Bucket=processing_bucket)['Contents'] + + # need better way to do that + accumulator = pickle.loads(s3_client.get_object( + Bucket=processing_bucket, + Key=filenames[0]['Key'] + )['Body'].read()) + + for filename in filenames[1:]: + file = pickle.loads(s3_client.get_object( + Bucket=processing_bucket, + Key=filename['Key'] + )['Body'].read()) + accumulator = reducer(accumulator, file) + + # Clean up intermediate objects after we're done + s3_resource.Bucket(processing_bucket).objects.all().delete() + + return accumulator + # reduced_output = pickle.loads(result) + # return reduced_output + + def distribute_files(self, includes_list): + pass diff --git a/PyRDF/backend/Dist.py b/PyRDF/backend/Dist.py index 28031a1..a2b3774 100644 --- a/PyRDF/backend/Dist.py +++ b/PyRDF/backend/Dist.py @@ -647,8 +647,7 @@ def reducer(values_list1, values_list2): # Sum() always returns a float in python values_list1[i] += values_list2[i] - elif (isinstance(values_list1[i], int) or - isinstance(values_list1[i], long)): # noqa: Python 2 + elif (isinstance(values_list1[i], int)): # noqa: Python 2 # Adding values resulting from a Count() operation values_list1[i] += values_list2[i] diff --git a/requirements.aws.txt b/requirements.aws.txt new file mode 100644 index 0000000..6773bb3 --- /dev/null +++ b/requirements.aws.txt @@ -0,0 +1,2 @@ +boto3 +cloudpickle diff --git a/requirements.txt b/requirements.txt index e51dcbe..67d2e3f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ enum34 ; python_version < '3.4' # Requires pip 6.0 and later -nose \ No newline at end of file +nose +numpy \ No newline at end of file From df70d6b073bbfd10d9c24ad5ed66a31b6973c17f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacek=20Ku=C5=9Bnierz?= Date: Thu, 25 Feb 2021 11:08:16 +0100 Subject: [PATCH 2/6] make it log --- PyRDF/backend/AWS.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/PyRDF/backend/AWS.py b/PyRDF/backend/AWS.py index b26129b..0521d28 100644 --- a/PyRDF/backend/AWS.py +++ b/PyRDF/backend/AWS.py @@ -117,7 +117,7 @@ def invoke_root_lambda(client, root_range, script): results = s3_client.list_objects_v2(Bucket=processing_bucket) if results['KeyCount'] == len(ranges): break - self.logger.debug(f'Lambdas finished: {results["KeyCount"]}') + self.logger.info(f'Lambdas finished: {results["KeyCount"]}') time.sleep(1) # Get names of output files, download and reduce them From 1c526592d6ee5f3f55aba9945fb3b0cf3b2d49b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacek=20Ku=C5=9Bnierz?= Date: Thu, 25 Feb 2021 15:09:18 +0100 Subject: [PATCH 3/6] small change --- PyRDF.uml | 130 +++++++++++++++++++++++++++++++++++++++++++ PyRDF/backend/AWS.py | 3 +- repair.sh | 17 ++++++ 3 files changed, 149 insertions(+), 1 deletion(-) create mode 100644 PyRDF.uml create mode 100644 repair.sh diff --git a/PyRDF.uml b/PyRDF.uml new file mode 100644 index 0000000..fc50d94 --- /dev/null +++ b/PyRDF.uml @@ -0,0 +1,130 @@ + + + Python + #C:/home/vetch/Engineerka/PyRDF/PyRDF + + PyRDF.Proxy.Proxy + ABC + PyRDF.Proxy.ActionProxy + PyRDF.Proxy.TransformationProxy + PyRDF.backend.Local.Local + PyRDF.CallableGenerator.CallableGenerator + PyRDF.RDataFrame.HeadNode + PyRDF.backend.Dist.Dist + PyRDF.backend.AWS.AWS + PyRDF.backend.Utils.Utils + PyRDF.backend.Dist.Range + PyRDF.Operation.Operation + PyRDF.RDataFrame.RDataFrameException + Exception + PyRDF.backend.Spark.Spark + PyRDF.Node.Node + BaseException + PyRDF.backend.Backend.Backend + PyRDF.RDataFrame.RDataFrame + typing.Hashable + PyRDF.backend.Dist.FriendInfo + object + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + PyRDF.Proxy.Proxy + + + + diff --git a/PyRDF/backend/AWS.py b/PyRDF/backend/AWS.py index 0521d28..451e340 100644 --- a/PyRDF/backend/AWS.py +++ b/PyRDF/backend/AWS.py @@ -88,7 +88,8 @@ def invoke_root_lambda(client, root_range, script): 'range': encode_object(root_range), 'script': script, 'start': str(root_range.start), - 'end': str(root_range.end) + 'end': str(root_range.end), + 'filelist': str(root_range.filelist) }) return client.invoke( FunctionName='root_lambda', diff --git a/repair.sh b/repair.sh new file mode 100644 index 0000000..2be395c --- /dev/null +++ b/repair.sh @@ -0,0 +1,17 @@ +#!/bin/sh + +git filter-branch --env-filter ' +OLD_EMAIL="jaku@grapeup.com" +CORRECT_NAME="Jacek KuĊ›nierz" +CORRECT_EMAIL="kusnierz@protonmail.com" +if [ "$GIT_COMMITTER_EMAIL" = "$OLD_EMAIL" ] +then + export GIT_COMMITTER_NAME="$CORRECT_NAME" + export GIT_COMMITTER_EMAIL="$CORRECT_EMAIL" +fi +if [ "$GIT_AUTHOR_EMAIL" = "$OLD_EMAIL" ] +then + export GIT_AUTHOR_NAME="$CORRECT_NAME" + export GIT_AUTHOR_EMAIL="$CORRECT_EMAIL" +fi +' --tag-name-filter cat -- --branches --tags From fc9fbf8513490c2d42b92926ffc5da092177514c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacek=20Ku=C5=9Bnierz?= Date: Thu, 25 Feb 2021 15:13:38 +0100 Subject: [PATCH 4/6] another --- PyRDF/backend/AWS.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/PyRDF/backend/AWS.py b/PyRDF/backend/AWS.py index 451e340..97dc603 100644 --- a/PyRDF/backend/AWS.py +++ b/PyRDF/backend/AWS.py @@ -89,7 +89,8 @@ def invoke_root_lambda(client, root_range, script): 'script': script, 'start': str(root_range.start), 'end': str(root_range.end), - 'filelist': str(root_range.filelist) + 'filelist': str(root_range.filelist), + 'friend_info': str(root_range.friend_info) }) return client.invoke( FunctionName='root_lambda', From b047f1b033812ee31d5fde7033300b9b448f7dd9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacek=20Ku=C5=9Bnierz?= Date: Thu, 25 Feb 2021 15:18:12 +0100 Subject: [PATCH 5/6] another one --- PyRDF/backend/AWS.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/PyRDF/backend/AWS.py b/PyRDF/backend/AWS.py index 97dc603..84e7759 100644 --- a/PyRDF/backend/AWS.py +++ b/PyRDF/backend/AWS.py @@ -90,7 +90,7 @@ def invoke_root_lambda(client, root_range, script): 'start': str(root_range.start), 'end': str(root_range.end), 'filelist': str(root_range.filelist), - 'friend_info': str(root_range.friend_info) + 'friend_info': encode_object(root_range.friend_info) }) return client.invoke( FunctionName='root_lambda', From 447631c725ee77af580daab5a1e9f0d958a57464 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jacek=20Ku=C5=9Bnierz?= Date: Tue, 6 Apr 2021 21:33:10 +0200 Subject: [PATCH 6/6] add benchmarking --- PyRDF/backend/AWS.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/PyRDF/backend/AWS.py b/PyRDF/backend/AWS.py index 84e7759..7445671 100644 --- a/PyRDF/backend/AWS.py +++ b/PyRDF/backend/AWS.py @@ -98,12 +98,15 @@ def invoke_root_lambda(client, root_range, script): Payload=bytes(payload, encoding='utf8') ) + invoke_begin = time.time() # Invoke workers with ranges and mapper call_results = [] for root_range in ranges: call_result = invoke_root_lambda(lambda_client, root_range, pickled_mapper) call_results.append(call_result) + wait_begin = time.time() + # while True: # results = s3.list_objects_v2(Bucket=s3_output_bucket, Prefix='out.pickle') # if results['KeyCount'] > 0: @@ -122,6 +125,7 @@ def invoke_root_lambda(client, root_range, script): self.logger.info(f'Lambdas finished: {results["KeyCount"]}') time.sleep(1) + reduce_begin = time.time() # Get names of output files, download and reduce them filenames = s3_client.list_objects_v2(Bucket=processing_bucket)['Contents'] @@ -141,6 +145,15 @@ def invoke_root_lambda(client, root_range, script): # Clean up intermediate objects after we're done s3_resource.Bucket(processing_bucket).objects.all().delete() + bench = ( + len(ranges), + wait_begin-invoke_begin, + reduce_begin-wait_begin, + time.time()-reduce_begin + ) + + print(bench) + return accumulator # reduced_output = pickle.loads(result) # return reduced_output