130 changes: 130 additions & 0 deletions PyRDF.uml
@@ -0,0 +1,130 @@
<?xml version="1.0" encoding="UTF-8"?>
<Diagram>
<ID>Python</ID>
<OriginalElement>#C:/home/vetch/Engineerka/PyRDF/PyRDF</OriginalElement>
<nodes>
<node x="708.0" y="584.0">PyRDF.Proxy.Proxy</node>
<node x="714.0" y="428.0">ABC</node>
<node x="627.0" y="784.0">PyRDF.Proxy.ActionProxy</node>
<node x="848.0" y="708.0">PyRDF.Proxy.TransformationProxy</node>
<node x="974.0" y="193.0">PyRDF.backend.Local.Local</node>
<node x="596.0" y="78.0">PyRDF.CallableGenerator.CallableGenerator</node>
<node x="341.0" y="0.0">PyRDF.RDataFrame.HeadNode</node>
<node x="1091.0" y="451.0">PyRDF.backend.Dist.Dist</node>
<node x="1284.0" y="342.0">PyRDF.backend.AWS.AWS</node>
<node x="536.0" y="514.0">PyRDF.backend.Utils.Utils</node>
<node x="413.0" y="335.0">PyRDF.backend.Dist.Range</node>
<node x="455.0" y="599.0">PyRDF.Operation.Operation</node>
<node x="0.0" y="700.0">PyRDF.RDataFrame.RDataFrameException</node>
<node x="280.0" y="560.0">Exception</node>
<node x="1225.0" y="632.0">PyRDF.backend.Spark.Spark</node>
<node x="494.0" y="173.0">PyRDF.Node.Node</node>
<node x="414.0" y="444.0">BaseException</node>
<node x="809.0" y="415.0">PyRDF.backend.Backend.Backend</node>
<node x="659.0" y="331.0">PyRDF.RDataFrame.RDataFrame</node>
<node x="690.0" y="261.0">typing.Hashable</node>
<node x="277.0" y="265.0">PyRDF.backend.Dist.FriendInfo</node>
<node x="597.0" y="405.0">object</node>
</nodes>
<notes />
<edges>
<edge source="PyRDF.Proxy.ActionProxy" target="PyRDF.Proxy.Proxy">
<point x="0.0" y="0.0" />
<point x="0.0" y="0.0" />
</edge>
<edge source="PyRDF.backend.Utils.Utils" target="object">
<point x="0.0" y="0.0" />
<point x="0.0" y="0.0" />
</edge>
<edge source="ABC" target="object">
<point x="0.0" y="0.0" />
<point x="0.0" y="0.0" />
</edge>
<edge source="Exception" target="BaseException">
<point x="0.0" y="0.0" />
<point x="0.0" y="0.0" />
</edge>
<edge source="PyRDF.backend.Spark.Spark" target="PyRDF.backend.Dist.Dist">
<point x="0.0" y="0.0" />
<point x="0.0" y="0.0" />
</edge>
<edge source="PyRDF.CallableGenerator.CallableGenerator" target="object">
<point x="0.0" y="0.0" />
<point x="0.0" y="0.0" />
</edge>
<edge source="PyRDF.RDataFrame.RDataFrameException" target="Exception">
<point x="0.0" y="0.0" />
<point x="0.0" y="0.0" />
</edge>
<edge source="PyRDF.backend.Dist.Range" target="object">
<point x="0.0" y="0.0" />
<point x="0.0" y="0.0" />
</edge>
<edge source="PyRDF.backend.Dist.FriendInfo" target="object">
<point x="0.0" y="0.0" />
<point x="0.0" y="0.0" />
</edge>
<edge source="PyRDF.backend.Dist.Dist" target="PyRDF.backend.Backend.Backend">
<point x="0.0" y="0.0" />
<point x="0.0" y="0.0" />
</edge>
<edge source="PyRDF.RDataFrame.RDataFrame" target="object">
<point x="0.0" y="0.0" />
<point x="0.0" y="0.0" />
</edge>
<edge source="PyRDF.Node.Node" target="object">
<point x="0.0" y="0.0" />
<point x="0.0" y="0.0" />
</edge>
<edge source="PyRDF.Proxy.Proxy" target="object">
<point x="0.0" y="0.0" />
<point x="0.0" y="0.0" />
</edge>
<edge source="PyRDF.RDataFrame.HeadNode" target="PyRDF.Node.Node">
<point x="0.0" y="0.0" />
<point x="0.0" y="0.0" />
</edge>
<edge source="PyRDF.Proxy.Proxy" target="ABC">
<point x="0.0" y="0.0" />
<point x="0.0" y="0.0" />
</edge>
<edge source="PyRDF.backend.Backend.Backend" target="ABC">
<point x="0.0" y="0.0" />
<point x="0.0" y="0.0" />
</edge>
<edge source="PyRDF.backend.Local.Local" target="PyRDF.backend.Backend.Backend">
<point x="0.0" y="0.0" />
<point x="0.0" y="0.0" />
</edge>
<edge source="PyRDF.backend.Backend.Backend" target="object">
<point x="0.0" y="0.0" />
<point x="0.0" y="0.0" />
</edge>
<edge source="object" target="typing.Hashable">
<point x="0.0" y="0.0" />
<point x="0.0" y="0.0" />
</edge>
<edge source="BaseException" target="object">
<point x="0.0" y="0.0" />
<point x="0.0" y="0.0" />
</edge>
<edge source="PyRDF.Operation.Operation" target="object">
<point x="0.0" y="0.0" />
<point x="0.0" y="0.0" />
</edge>
<edge source="PyRDF.Proxy.TransformationProxy" target="PyRDF.Proxy.Proxy">
<point x="0.0" y="0.0" />
<point x="0.0" y="0.0" />
</edge>
<edge source="PyRDF.backend.AWS.AWS" target="PyRDF.backend.Dist.Dist">
<point x="0.0" y="0.0" />
<point x="0.0" y="0.0" />
</edge>
</edges>
<settings layout="Orthogonal" zoom="0.8499399759903962" x="734.0" y="406.5" />
<SelectedNodes>
<node>PyRDF.Proxy.Proxy</node>
</SelectedNodes>
<Categories />
</Diagram>

3 changes: 3 additions & 0 deletions PyRDF/__init__.py
@@ -64,6 +64,9 @@ def use(backend_name, conf={}):
     elif backend_name == "spark":
         from PyRDF.backend.Spark import Spark
         current_backend = Spark(conf)
+    elif backend_name == "AWS":
+        from PyRDF.backend.AWS import AWS
+        current_backend = AWS(conf)
     else:
         msg = "Incorrect backend environment \"{}\"".format(backend_name)
         raise Exception(msg)
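For reference, the new branch makes the backend selectable like the existing ones. A minimal usage sketch (the `npartitions` and `region` config keys are inferred from `AWS.__init__` in the new file below):

    import PyRDF

    PyRDF.use("AWS", {"npartitions": 4, "region": "us-east-1"})
    # From here on, RDataFrame graphs are built and triggered as usual.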
162 changes: 162 additions & 0 deletions PyRDF/backend/AWS.py
@@ -0,0 +1,162 @@
from __future__ import print_function

import base64
import json
import logging
import time

import boto3
import cloudpickle as pickle

from PyRDF.backend.Dist import Dist


class AWS(Dist):
"""
Backend that executes the computational graph using using AWS Lambda
for distributed execution.
"""

MIN_NPARTITIONS = 2

def __init__(self, config={}):
"""
Config for AWS is same as in Dist backend,
more support will be added in future.
"""
super(AWS, self).__init__(config)
self.logger = logging.getLogger()
self.npartitions = self._get_partitions()
self.region = config.get('region') or 'us-east-1'

def _get_partitions(self):
return int(self.npartitions or AWS.MIN_NPARTITIONS)

    def ProcessAndMerge(self, mapper, reducer):
        """
        Performs map-reduce using AWS Lambda.

        Args:
            mapper (function): A function that runs the computational graph
                and returns a list of values.

            reducer (function): A function that merges two lists that were
                returned by the mapper.

        Returns:
            list: A list representing the values of action nodes returned
                after computation (Map-Reduce).
        """
        ranges = self.build_ranges()

        def encode_object(object_to_encode) -> str:
            # Decode to str so that json.dumps embeds clean base64 text
            # instead of the "b'...'" repr that str(bytes) would produce.
            return base64.b64encode(pickle.dumps(object_to_encode)).decode('utf-8')

        # Make mapper and reducer transferable
        pickled_mapper = encode_object(mapper)
        pickled_reducer = encode_object(reducer)

        # Set up AWS clients
        s3_resource = boto3.resource('s3', region_name=self.region)
        s3_client = boto3.client('s3', region_name=self.region)
        lambda_client = boto3.client('lambda', region_name=self.region)
        ssm_client = boto3.client('ssm', region_name=self.region)

        # Check that the backend infrastructure exists; get_parameter raises
        # ParameterNotFound (rather than returning an empty value) when the
        # parameter is missing.
        try:
            ssm_client.get_parameter(Name='output_bucket')
        except ssm_client.exceptions.ParameterNotFound:
            self.logger.error('AWS backend not initialized!')
            return False

        ssm_client.put_parameter(
            Name='ranges_num',
            Type='String',
            Value=str(len(ranges)),
            Overwrite=True
        )

        ssm_client.put_parameter(
            Name='reducer',
            Type='String',
            Value=pickled_reducer,
            Overwrite=True
        )

        def invoke_root_lambda(client, root_range, script):
            payload = json.dumps({
                'range': encode_object(root_range),
                'script': script,
                'start': str(root_range.start),
                'end': str(root_range.end),
                'filelist': str(root_range.filelist),
                'friend_info': encode_object(root_range.friend_info)
            })
            return client.invoke(
                FunctionName='root_lambda',
                InvocationType='Event',
                Payload=bytes(payload, encoding='utf8')
            )

        invoke_begin = time.time()
        # Invoke workers with ranges and mapper. InvocationType='Event'
        # (set in invoke_root_lambda above) fires each Lambda asynchronously,
        # so the collected responses only acknowledge submission; completion
        # is detected by polling the processing bucket below.
        call_results = []
        for root_range in ranges:
            call_result = invoke_root_lambda(lambda_client, root_range,
                                             pickled_mapper)
            call_results.append(call_result)

        wait_begin = time.time()

        processing_bucket = ssm_client.get_parameter(
            Name='processing_bucket')['Parameter']['Value']

        # Wait until all lambdas have finished execution: each worker is
        # expected to write exactly one object to the processing bucket.
        # Note: list_objects_v2 returns at most 1000 keys per call, so this
        # count (and the listing below) assumes at most 1000 partitions,
        # and the loop has no timeout if a worker fails.
        while True:
            results = s3_client.list_objects_v2(Bucket=processing_bucket)
            if results['KeyCount'] == len(ranges):
                break
            self.logger.info(f'Lambdas finished: {results["KeyCount"]}')
            time.sleep(1)

        reduce_begin = time.time()
        # Get the names of the output files, then download and reduce them
        filenames = s3_client.list_objects_v2(
            Bucket=processing_bucket)['Contents']

        # TODO: find a better way to do this than downloading every output;
        # for now, seed the accumulator with the first partial result.
        accumulator = pickle.loads(s3_client.get_object(
            Bucket=processing_bucket,
            Key=filenames[0]['Key']
        )['Body'].read())

        # Fold the remaining partial results into the accumulator
        for filename in filenames[1:]:
            partial_result = pickle.loads(s3_client.get_object(
                Bucket=processing_bucket,
                Key=filename['Key']
            )['Body'].read())
            accumulator = reducer(accumulator, partial_result)

        # Clean up intermediate objects after we're done
        s3_resource.Bucket(processing_bucket).objects.all().delete()

        # Timing breakdown: (number of ranges, invocation time,
        # wait-for-workers time, reduce + cleanup time)
        bench = (
            len(ranges),
            wait_begin - invoke_begin,
            reduce_begin - wait_begin,
            time.time() - reduce_begin
        )
        print(bench)

        return accumulator

    def distribute_files(self, includes_list):
        # No-op: distributing auxiliary files to the workers is not
        # implemented for this backend.
        pass
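The Lambda function itself (`root_lambda`) is not part of this diff. For orientation, a rough sketch of what the handler is assumed to do, inferred from the payload fields and the bucket protocol above; the handler name, the object key scheme, and the call `mapper(root_range)` (matching how the other Dist backends apply the mapper to a range) are all assumptions:

    import base64
    import pickle  # standard pickle can deserialize cloudpickle output
    import uuid

    import boto3
    import cloudpickle


    def lambda_handler(event, context):
        # Recover the mapper and the assigned range from the payload
        mapper = pickle.loads(base64.b64decode(event['script']))
        root_range = pickle.loads(base64.b64decode(event['range']))

        # Run the computational graph over this partition
        values = mapper(root_range)

        # Write exactly one object to the processing bucket, which
        # ProcessAndMerge polls until KeyCount equals the number of ranges
        ssm = boto3.client('ssm')
        bucket = ssm.get_parameter(
            Name='processing_bucket')['Parameter']['Value']
        boto3.client('s3').put_object(
            Bucket=bucket,
            Key='partial-{}.pickle'.format(uuid.uuid4()),
            Body=cloudpickle.dumps(values)
        )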
3 changes: 1 addition & 2 deletions PyRDF/backend/Dist.py
@@ -647,8 +647,7 @@ def reducer(values_list1, values_list2):
             # Sum() always returns a float in python
             values_list1[i] += values_list2[i]

-        elif (isinstance(values_list1[i], int) or
-              isinstance(values_list1[i], long)):  # noqa: Python 2
+        elif isinstance(values_list1[i], int):
             # Adding values resulting from a Count() operation
             values_list1[i] += values_list2[i]

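To make the merge step concrete, a small worked example of the reducer on hypothetical partial results, each list holding one Count() and one Sum() value:

    values_list1 = [10, 2.5]    # partial results from one range
    values_list2 = [32, 4.0]    # partial results from another range
    # After reducer(values_list1, values_list2), values_list1 is [42, 6.5]:
    # ints from Count() and floats from Sum() are combined by in-place
    # addition, so the Python 2 `long` branch is no longer needed.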
17 changes: 17 additions & 0 deletions repair.sh
@@ -0,0 +1,17 @@
#!/bin/sh

git filter-branch --env-filter '
OLD_EMAIL="jaku@grapeup.com"
CORRECT_NAME="Jacek Kuśnierz"
CORRECT_EMAIL="kusnierz@protonmail.com"
if [ "$GIT_COMMITTER_EMAIL" = "$OLD_EMAIL" ]
then
export GIT_COMMITTER_NAME="$CORRECT_NAME"
export GIT_COMMITTER_EMAIL="$CORRECT_EMAIL"
fi
if [ "$GIT_AUTHOR_EMAIL" = "$OLD_EMAIL" ]
then
export GIT_AUTHOR_NAME="$CORRECT_NAME"
export GIT_AUTHOR_EMAIL="$CORRECT_EMAIL"
fi
' --tag-name-filter cat -- --branches --tags
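Since git filter-branch rewrites history across all branches and tags, the script is meant to be run once from the repository root, after which the rewritten refs have to be force-pushed, e.g.:

    sh repair.sh
    git push --force --tags origin 'refs/heads/*'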
2 changes: 2 additions & 0 deletions requirements.aws.txt
@@ -0,0 +1,2 @@
boto3
cloudpickle
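These two packages are only needed for the new AWS backend, which keeps them out of the core requirements; they can be installed with:

    pip install -r requirements.aws.txt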
3 changes: 2 additions & 1 deletion requirements.txt
@@ -1,2 +1,3 @@
 enum34 ; python_version < '3.4' # Requires pip 6.0 and later
-nose
+nose
+numpy