Skip to content
This repository was archived by the owner on Mar 13, 2020. It is now read-only.

Commit 27e6d34

Browse files
committed
progress commit
1 parent b8b4f7d commit 27e6d34

File tree

3 files changed

+104
-2
lines changed

3 files changed

+104
-2
lines changed
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
import logging
2+
import pandas
3+
import json
4+
import boto3
5+
6+
from rdl.data_sources.ChangeTrackingInfo import ChangeTrackingInfo
7+
from rdl.data_sources.SourceTableInfo import SourceTableInfo
8+
from rdl.shared import Providers
9+
from rdl.shared.Utils import prevent_senstive_data_logging
10+
11+
12+
class AWSLambdaDataSource(object):
    """Data source that obtains table metadata and rows by invoking an AWS Lambda function.

    The Lambda is called synchronously with a JSON command payload
    ("GetTableInfo" or "GetTableData") and is expected to answer with a JSON
    document. The function to call and the tenant are read from the connection
    string, which is a list of ``key=value`` pairs separated by ``;`` after the
    prefix, e.g.::

        aws-lambda-arn://tenant=543_dc2;arn=123456789012:function:my-function;

    NOTE(review): the ``tenant`` / ``arn`` keys and the "<id>_<suffix>" tenant
    layout are taken from the example and TODO comments left by the original
    author; confirm them against the real connection strings in use.
    """

    CONNECTION_STRING_PREFIX = 'aws-lambda-arn://'

    def __init__(self, connection_string, logger=None):
        """Store the connection string and create the boto3 Lambda client.

        :param connection_string: connection string starting with CONNECTION_STRING_PREFIX.
        :param logger: optional logger; defaults to this module's logger.
        """
        self.logger = logger or logging.getLogger(__name__)
        self.connection_string = connection_string
        self.aws_lambda_client = boto3.client('lambda')

    @staticmethod
    def can_handle_connection_string(connection_string):
        """Return True when the connection string targets this data source."""
        return connection_string.startswith(AWSLambdaDataSource.CONNECTION_STRING_PREFIX)

    @staticmethod
    def connection_string_prefix():
        """Return the prefix that identifies connection strings for this data source."""
        return AWSLambdaDataSource.CONNECTION_STRING_PREFIX

    def get_table_info(self, table_config, last_known_sync_version):
        """Ask the Lambda for column names and change-tracking state of a table.

        :param table_config: mapping with at least 'schema' and 'name'.
        :param last_known_sync_version: sync version recorded by the previous run.
        :return: SourceTableInfo built from the Lambda response.
        """
        column_names, last_sync_version, sync_version, full_refresh_required, data_changed_since_last_sync = \
            self.__get_table_info(table_config, last_known_sync_version)
        columns_in_database = column_names
        change_tracking_info = ChangeTrackingInfo(
            last_sync_version=last_sync_version,
            # NOTE(review): the increment is kept from the original code; confirm
            # whether the Lambda already reports the next version to sync to.
            sync_version=sync_version + 1,
            force_full_load=full_refresh_required,
            data_changed_since_last_sync=data_changed_since_last_sync)
        source_table_info = SourceTableInfo(columns_in_database, change_tracking_info)
        return source_table_info

    @prevent_senstive_data_logging
    def get_table_data_frame(self, table_config, columns, batch_config, batch_tracker, batch_key_tracker,
                             full_refresh, change_tracking_info):
        """Fetch one batch of rows from the Lambda and return it as a DataFrame.

        Reports the number of extracted rows to ``batch_tracker`` before returning.
        """
        # The "{None}" placeholders are kept verbatim from the original log messages.
        self.logger.debug(f"Starting read data from lambda.. : \n{None}")
        column_names, data = self.__get_table_data(
            table_config, batch_config, change_tracking_info, full_refresh, columns, batch_key_tracker)
        self.logger.debug(f"Finished read data from lambda.. : \n{None}")
        # should we log size of data extracted?
        data_frame = pandas.DataFrame(data=data, columns=column_names)
        batch_tracker.extract_completed_successfully(len(data_frame))
        return data_frame

    def __connection_settings(self):
        """Parse the ``key=value;`` pairs that follow the connection string prefix into a dict."""
        body = self.connection_string
        prefix = AWSLambdaDataSource.CONNECTION_STRING_PREFIX
        if body.startswith(prefix):
            body = body[len(prefix):]
        settings = {}
        for pair in body.split(';'):
            # partition splits on the first '=' only, so values may themselves contain '='.
            key, separator, value = pair.partition('=')
            if separator:
                settings[key.strip()] = value.strip()
        return settings

    def __tenant_id(self):
        """Return the numeric tenant id, i.e. the part of ``tenant`` before the first '_'."""
        tenant = self.__connection_settings().get('tenant')
        if not tenant:
            raise ValueError("AWS Lambda connection string does not specify 'tenant'")
        return int(tenant.split('_')[0])

    def __function_name(self):
        """Return the ``arn`` connection setting, used as the Lambda FunctionName."""
        function_name = self.__connection_settings().get('arn')
        if not function_name:
            raise ValueError("AWS Lambda connection string does not specify 'arn'")
        return function_name

    def __get_table_info(self, table_config, last_known_sync_version):
        """Invoke the "GetTableInfo" command and unpack the five values the caller expects."""
        pay_load = {
            "command": "GetTableInfo",
            "tenantId": self.__tenant_id(),
            "table": {
                "schema": table_config['schema'],
                "name": table_config['name']
            },
            "commandPayload": {
                "lastSyncVersion": last_known_sync_version,
            }
        }

        result = self.__invoke_lambda(pay_load)

        # The original returned only two values although get_table_info unpacks five.
        # NOTE(review): only 'ColumnNames' is evidenced by the original code; the other
        # four key names are assumptions and must be confirmed against the Lambda contract.
        return (
            result['ColumnNames'],
            result['LastSyncVersion'],
            result['CurrentSyncVersion'],
            result['FullRefreshRequired'],
            result['DataChangedSinceLastSync'],
        )

    def __get_table_data(self, table_config, batch_config, change_tracking_info, full_refresh, columns,
                         batch_key_tracker):
        """Invoke the "GetTableData" command and return ``(column_names, rows)``."""
        pay_load = {
            "command": "GetTableData",
            "tenantId": self.__tenant_id(),
            "table": {
                "schema": table_config['schema'],
                "name": table_config['name']
            },
            "commandPayload": {
                "auditColumnNameForChangeVersion": Providers.AuditColumnsNames.CHANGE_VERSION,
                "auditColumnNameForDeletionFlag": Providers.AuditColumnsNames.IS_DELETED,
                "batchSize": batch_config['size'],
                "lastSyncVersion": change_tracking_info.last_sync_version,
                "fullRefresh": full_refresh,
                "columnNames": columns,
                "primaryKeyColumnNames": table_config['primary_keys'],
                # One single-entry dict per bookmarked primary key column.
                "lastBatchPrimaryKeys": [{k: v} for k, v in batch_key_tracker.bookmarks.items()]
            }
        }

        result = self.__invoke_lambda(pay_load)

        return result['ColumnNames'], result['Data']

    def __invoke_lambda(self, pay_load):
        """Synchronously invoke the configured Lambda and return its decoded JSON response.

        :param pay_load: JSON-serialisable command document.
        :raises ValueError: when the connection string does not name the function.
        :raises RuntimeError: when the Lambda reports a function error.
        """
        lambda_response = self.aws_lambda_client.invoke(
            FunctionName=self.__function_name(),
            InvocationType='RequestResponse',
            LogType='None',  # |'Tail', Set to Tail to include the execution log in the response
            # json.dumps (not json.dump): the payload has to be serialised to a string.
            Payload=json.dumps(pay_load).encode()
        )
        # boto3 returns a plain dict; 'Payload' holds a readable stream of the response body.
        response_body = lambda_response['Payload'].read()
        function_error = lambda_response.get('FunctionError')
        if function_error:
            # The body then describes the failure rather than table data; it is deliberately
            # left out of the message in case it carries sensitive details.
            raise RuntimeError(f"AWS Lambda invocation failed with function error: {function_error}")
        return json.loads(response_body)
101+

rdl/data_sources/DataSourceFactory.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
import logging
22
from rdl.data_sources.MsSqlDataSource import MsSqlDataSource
3-
3+
from rdl.data_sources.AWSLambdaDataSource import AWSLambdaDataSource
44

55
class DataSourceFactory(object):
66

77
def __init__(self, logger=None):
88
self.logger = logger or logging.getLogger(__name__)
9-
self.sources = [MsSqlDataSource]
9+
self.sources = [MsSqlDataSource, AWSLambdaDataSource]
1010

1111
def create_source(self, connection_string):
1212
for source in self.sources:

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
'SQLAlchemy==1.3.3',
1515
'sqlalchemy-citext==1.3.post0',
1616
'alembic==1.0.9',
17+
'boto3==1.9.187',
1718
],
1819
package_data={
1920
'': ['alembic.ini', 'alembic/*.py', 'alembic/**/*.py'],

0 commit comments

Comments
 (0)