|
11 | 11 |
|
12 | 12 | class AWSLambdaDataSource(object): |
13 | 13 | # 'aws-lambda://tenant=543_dc2;function=123456789012:function:my-function;' |
14 | | - CONNECTION_STRING_PREFIX = 'aws-lambda://' |
15 | | - CONNECTION_STRING_GROUP_SEPARATOR = ';' |
16 | | - CONNECTION_STRING_KEY_VALUE_SEPARATOR = '=' |
| 14 | + CONNECTION_STRING_PREFIX = "aws-lambda://" |
| 15 | + CONNECTION_STRING_GROUP_SEPARATOR = ";" |
| 16 | + CONNECTION_STRING_KEY_VALUE_SEPARATOR = "=" |
17 | 17 |
|
18 | 18 | def __init__(self, connection_string, logger=None): |
19 | 19 | self.logger = logger or logging.getLogger(__name__) |
20 | 20 | if not AWSLambdaDataSource.can_handle_connection_string(connection_string): |
21 | 21 | raise ValueError(connection_string) |
22 | 22 | self.connection_string = connection_string |
23 | | - self.connection_data = dict(kv.split(AWSLambdaDataSource.CONNECTION_STRING_KEY_VALUE_SEPARATOR) for kv in |
24 | | - self.connection_string |
25 | | - .lstrip(AWSLambdaDataSource.CONNECTION_STRING_PREFIX) |
26 | | - .rstrip(AWSLambdaDataSource.CONNECTION_STRING_GROUP_SEPARATOR) |
27 | | - .split(AWSLambdaDataSource.CONNECTION_STRING_GROUP_SEPARATOR)) |
28 | | - self.aws_lambda_client = boto3.client('lambda') |
| 23 | + self.connection_data = dict( |
| 24 | + kv.split(AWSLambdaDataSource.CONNECTION_STRING_KEY_VALUE_SEPARATOR) |
| 25 | + for kv in self.connection_string.lstrip( |
| 26 | + AWSLambdaDataSource.CONNECTION_STRING_PREFIX |
| 27 | + ) |
| 28 | + .rstrip(AWSLambdaDataSource.CONNECTION_STRING_GROUP_SEPARATOR) |
| 29 | + .split(AWSLambdaDataSource.CONNECTION_STRING_GROUP_SEPARATOR) |
| 30 | + ) |
| 31 | + self.aws_lambda_client = boto3.client("lambda") |
29 | 32 |
|
30 | 33 | @staticmethod |
31 | 34 | def can_handle_connection_string(connection_string): |
32 | | - return connection_string.startswith(AWSLambdaDataSource.CONNECTION_STRING_PREFIX) |
| 35 | + return connection_string.startswith( |
| 36 | + AWSLambdaDataSource.CONNECTION_STRING_PREFIX |
| 37 | + ) and len(connection_string) != len( |
| 38 | + AWSLambdaDataSource.CONNECTION_STRING_PREFIX |
| 39 | + ) |
33 | 40 |
|
34 | 41 | @staticmethod |
35 | | - def connection_string_prefix(): |
| 42 | + def get_connection_string_prefix(): |
36 | 43 | return AWSLambdaDataSource.CONNECTION_STRING_PREFIX |
37 | 44 |
|
38 | 45 | def get_table_info(self, table_config, last_known_sync_version): |
39 | | - column_names, last_sync_version, sync_version, full_refresh_required, data_changed_since_last_sync = \ |
40 | | - self.__get_table_info(table_config, last_known_sync_version) |
| 46 | + column_names, last_sync_version, sync_version, full_refresh_required, data_changed_since_last_sync = self.__get_table_info( |
| 47 | + table_config, last_known_sync_version |
| 48 | + ) |
41 | 49 | columns_in_database = column_names |
42 | 50 | change_tracking_info = ChangeTrackingInfo( |
43 | 51 | last_sync_version=last_sync_version, |
44 | 52 | sync_version=sync_version, |
45 | 53 | force_full_load=full_refresh_required, |
46 | | - data_changed_since_last_sync=data_changed_since_last_sync) |
| 54 | + data_changed_since_last_sync=data_changed_since_last_sync, |
| 55 | + ) |
47 | 56 | source_table_info = SourceTableInfo(columns_in_database, change_tracking_info) |
48 | 57 | return source_table_info |
49 | 58 |
|
50 | 59 | @prevent_senstive_data_logging |
51 | | - def get_table_data_frame(self, table_config, columns, batch_config, batch_tracker, batch_key_tracker, |
52 | | - full_refresh, change_tracking_info): |
| 60 | + def get_table_data_frame( |
| 61 | + self, |
| 62 | + table_config, |
| 63 | + columns, |
| 64 | + batch_config, |
| 65 | + batch_tracker, |
| 66 | + batch_key_tracker, |
| 67 | + full_refresh, |
| 68 | + change_tracking_info, |
| 69 | + ): |
53 | 70 | self.logger.debug("Starting to read data from lambda")
54 | | - column_names, data = self.__get_table_data(table_config, batch_config, change_tracking_info, full_refresh, columns, batch_key_tracker) |
| 71 | + column_names, data = self.__get_table_data( |
| 72 | + table_config, |
| 73 | + batch_config, |
| 74 | + change_tracking_info, |
| 75 | + full_refresh, |
| 76 | + columns, |
| 77 | + batch_key_tracker, |
| 78 | + ) |
55 | 79 | self.logger.debug("Finished reading data from lambda")
56 | 80 | # should we log size of data extracted? |
57 | | - data_frame = pandas.DataFrame(data=data, columns=column_names) |
| 81 | + data_frame = self.__get_data_frame(data, column_names) |
58 | 82 | batch_tracker.extract_completed_successfully(len(data_frame)) |
59 | 83 | return data_frame |
60 | 84 |
|
61 | 85 | def __get_table_info(self, table_config, last_known_sync_version): |
62 | 86 | pay_load = { |
63 | 87 | "command": "GetTableInfo", |
64 | | - "tenantId": self.connection_data['tenant'], |
65 | | - "table": { |
66 | | - "schema": table_config['schema'], |
67 | | - "name": table_config['name'] |
68 | | - }, |
69 | | - "commandPayload": { |
70 | | - "lastSyncVersion": last_known_sync_version, |
71 | | - } |
| 88 | + "tenantId": self.connection_data["tenant"], |
| 89 | + "table": {"schema": table_config["schema"], "name": table_config["name"]}, |
| 90 | + "commandPayload": {"lastSyncVersion": last_known_sync_version}, |
72 | 91 | } |
73 | 92 |
|
74 | 93 | result = self.__invoke_lambda(pay_load) |
75 | 94 |
|
76 | | - return result['ColumnNames'], result['Data'] |
77 | | - |
78 | | - def __get_table_data(self, table_config, batch_config, change_tracking_info, full_refresh, columns, batch_key_tracker): |
| 95 | + return result["ColumnNames"], result["Data"] |
| 96 | + |
| 97 | + def __get_table_data( |
| 98 | + self, |
| 99 | + table_config, |
| 100 | + batch_config, |
| 101 | + change_tracking_info, |
| 102 | + full_refresh, |
| 103 | + columns, |
| 104 | + batch_key_tracker, |
| 105 | + ): |
79 | 106 | pay_load = { |
80 | 107 | "command": "GetTableData", |
81 | 108 | "tenantId": 543, # self.connection_string.tenant.split('_')[0] as int |
82 | | - "table": { |
83 | | - "schema": table_config['schema'], |
84 | | - "name": table_config['name'] |
85 | | - }, |
| 109 | + "table": {"schema": table_config["schema"], "name": table_config["name"]}, |
86 | 110 | "commandPayload": { |
87 | 111 | "auditColumnNameForChangeVersion": Providers.AuditColumnsNames.CHANGE_VERSION, |
88 | 112 | "auditColumnNameForDeletionFlag": Providers.AuditColumnsNames.IS_DELETED, |
89 | | - "batchSize": batch_config['size'], |
| 113 | + "batchSize": batch_config["size"], |
90 | 114 | "lastSyncVersion": change_tracking_info.last_sync_version, |
91 | 115 | "fullRefresh": full_refresh, |
92 | 116 | "columnNames": columns, |
93 | | - "primaryKeyColumnNames": table_config['primary_keys'], |
94 | | - "lastBatchPrimaryKeys": [{k: v} for k, v in batch_key_tracker.bookmarks.items()] |
95 | | - } |
| 117 | + "primaryKeyColumnNames": table_config["primary_keys"], |
| 118 | + "lastBatchPrimaryKeys": [ |
| 119 | + {k: v} for k, v in batch_key_tracker.bookmarks.items() |
| 120 | + ], |
| 121 | + }, |
96 | 122 | } |
97 | 123 |
|
98 | 124 | result = self.__invoke_lambda(pay_load) |
99 | 125 |
|
100 | | - return result['ColumnNames'], result['Data'] |
| 126 | + return result["ColumnNames"], result["Data"] |
| 127 | + |
| 128 | + def __get_data_frame(self, data: list, column_names: list):
| 129 | + return pandas.DataFrame(data=data, columns=column_names) |
101 | 130 |
|
102 | 131 | def __invoke_lambda(self, pay_load): |
103 | 132 | lambda_response = self.aws_lambda_client.invoke( |
104 | | - FunctionName=self.connection_data['function'], |
105 | | - InvocationType='RequestResponse', |
106 | | - LogType='None', # |'Tail', Set to Tail to include the execution log in the response |
107 | | - Payload=json.dump(pay_load).encode() |
| 133 | + FunctionName=self.connection_data["function"], |
| 134 | + InvocationType="RequestResponse", |
| 135 | + LogType="None", # or "Tail" to include the execution log in the response
| 136 | + Payload=json.dumps(pay_load).encode(),
108 | 137 | ) |
109 | 138 | result = json.loads(lambda_response["Payload"].read()) # Payload is a StreamingBody of JSON bytes
110 | 139 | return result |
111 | | - |
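
The diff never shows the class in use, so here is a minimal, self-contained sketch of the round trip it performs: parsing the example connection string from the class comment and invoking the Lambda function the same way __invoke_lambda does. The schema, table name, sync version, tenant, and function values below are placeholders, and the response shape (a JSON object with "ColumnNames" and "Data" keys) is simply the one the private helpers read back; nothing else is taken from the real deployment.

    import json

    import boto3

    PREFIX = "aws-lambda://"
    GROUP_SEPARATOR = ";"
    KEY_VALUE_SEPARATOR = "="

    # Example from the class comment; the tenant and partial function ARN are placeholders.
    connection_string = "aws-lambda://tenant=543_dc2;function=123456789012:function:my-function;"

    # Same parse as __init__: drop the scheme, drop the trailing ';', split into key=value pairs.
    # Slicing off the prefix is used here instead of str.lstrip, which strips a character set.
    stripped = connection_string[len(PREFIX):].rstrip(GROUP_SEPARATOR)
    connection_data = dict(kv.split(KEY_VALUE_SEPARATOR) for kv in stripped.split(GROUP_SEPARATOR))
    assert connection_data == {
        "tenant": "543_dc2",
        "function": "123456789012:function:my-function",
    }

    # The request/response contract implied by __get_table_info and __invoke_lambda.
    payload = {
        "command": "GetTableInfo",
        "tenantId": connection_data["tenant"],
        "table": {"schema": "dbo", "name": "my_table"},  # hypothetical table_config values
        "commandPayload": {"lastSyncVersion": 0},
    }
    client = boto3.client("lambda")
    response = client.invoke(
        FunctionName=connection_data["function"],  # boto3 accepts the partial ARN form
        InvocationType="RequestResponse",
        LogType="None",
        Payload=json.dumps(payload).encode(),
    )
    result = json.loads(response["Payload"].read())  # invoke() returns a dict; Payload is a StreamingBody
    column_names, data = result["ColumnNames"], result["Data"]

On the Lambda side, the function named in the connection string only needs to dispatch on the "command" field and reply with that envelope; a hypothetical handler might look like:

    def lambda_handler(event, context):
        # Placeholder bodies; only the envelope ("command" in, "ColumnNames"/"Data" out)
        # comes from the data source code above.
        if event["command"] == "GetTableInfo":
            return {"ColumnNames": ["id", "name"], "Data": []}
        if event["command"] == "GetTableData":
            return {"ColumnNames": ["id", "name"], "Data": [[1, "example"]]}
        raise ValueError("unknown command: " + event["command"])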
|