Skip to content
This repository was archived by the owner on Mar 13, 2020. It is now read-only.

Commit 2efde69

Browse files
committed
(a) Fix a bug in BatchDataLoader so that bookmarks are saved as Python-native types; (b) fix the payload sent to the Lambda function
1 parent 48832e7 commit 2efde69

File tree

2 files changed

+34
-27
lines changed

2 files changed

+34
-27
lines changed

rdl/BatchDataLoader.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ def load_batch(self, batch_key_tracker):
4646
batch_tracker.load_completed_successfully()
4747

4848
for primary_key in batch_key_tracker.primary_keys:
49-
batch_key_tracker.set_bookmark(primary_key, data_frame.iloc[-1][primary_key])
49+
batch_key_tracker.set_bookmark(primary_key, int(data_frame.iloc[-1][primary_key]))
5050

5151
self.logger.info(f"Batch keys '{batch_key_tracker.bookmarks}' completed. {batch_tracker.get_statistics()}")
5252

rdl/data_sources/AWSLambdaDataSource.py

Lines changed: 33 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,8 @@ def get_connection_string_prefix():
4343
return AWSLambdaDataSource.CONNECTION_STRING_PREFIX
4444

4545
def get_table_info(self, table_config, last_known_sync_version):
46-
column_names, last_sync_version, sync_version, full_refresh_required, data_changed_since_last_sync = self.__get_table_info(
47-
table_config, last_known_sync_version
48-
)
46+
column_names, last_sync_version, sync_version, full_refresh_required, data_changed_since_last_sync \
47+
= self.__get_table_info(table_config, last_known_sync_version)
4948
columns_in_database = column_names
5049
change_tracking_info = ChangeTrackingInfo(
5150
last_sync_version=last_sync_version,
@@ -60,7 +59,7 @@ def get_table_info(self, table_config, last_known_sync_version):
6059
def get_table_data_frame(
6160
self,
6261
table_config,
63-
columns,
62+
columns_config,
6463
batch_config,
6564
batch_tracker,
6665
batch_key_tracker,
@@ -73,7 +72,7 @@ def get_table_data_frame(
7372
batch_config,
7473
change_tracking_info,
7574
full_refresh,
76-
columns,
75+
columns_config,
7776
batch_key_tracker,
7877
)
7978
self.logger.debug(f"Finished read data from lambda.. : \n{None}")
@@ -84,39 +83,43 @@ def get_table_data_frame(
8483

8584
def __get_table_info(self, table_config, last_known_sync_version):
8685
pay_load = {
87-
"command": "GetTableInfo",
88-
"tenantId": self.connection_data["tenant"],
89-
"table": {"schema": table_config["schema"], "name": table_config["name"]},
90-
"commandPayload": {"lastSyncVersion": last_known_sync_version},
86+
"Command": "GetTableInfo",
87+
"TenantId": int(self.connection_data["tenant"]),
88+
"Table": {"Schema": table_config["schema"], "Name": table_config["name"]},
89+
"CommandPayload": {"lastSyncVersion": last_known_sync_version},
9190
}
9291

9392
result = self.__invoke_lambda(pay_load)
9493

95-
return result["ColumnNames"], result["Data"]
94+
return result["ColumnNames"], \
95+
result["LastSyncVersion"], \
96+
result["CurrentSyncVersion"], \
97+
result["FullRefreshRequired"], \
98+
result["DataChangedSinceLastSync"]
9699

97100
def __get_table_data(
98101
self,
99102
table_config,
100103
batch_config,
101104
change_tracking_info,
102105
full_refresh,
103-
columns,
106+
columns_config,
104107
batch_key_tracker,
105108
):
106109
pay_load = {
107-
"command": "GetTableData",
108-
"tenantId": self.connection_data["tenant"],
109-
"table": {"schema": table_config["schema"], "name": table_config["name"]},
110-
"commandPayload": {
111-
"auditColumnNameForChangeVersion": Providers.AuditColumnsNames.CHANGE_VERSION,
112-
"auditColumnNameForDeletionFlag": Providers.AuditColumnsNames.IS_DELETED,
113-
"batchSize": batch_config["size"],
114-
"lastSyncVersion": change_tracking_info.last_sync_version,
115-
"fullRefresh": full_refresh,
116-
"columnNames": columns,
117-
"primaryKeyColumnNames": table_config["primary_keys"],
118-
"lastBatchPrimaryKeys": [
119-
{k: v} for k, v in batch_key_tracker.bookmarks.items()
110+
"Command": "GetTableData",
111+
"TenantId": int(self.connection_data["tenant"]),
112+
"Table": {"Schema": table_config["schema"], "Name": table_config["name"]},
113+
"CommandPayload": {
114+
"AuditColumnNameForChangeVersion": Providers.AuditColumnsNames.CHANGE_VERSION,
115+
"AuditColumnNameForDeletionFlag": Providers.AuditColumnsNames.IS_DELETED,
116+
"BatchSize": batch_config["size"],
117+
"LastSyncVersion": change_tracking_info.last_sync_version,
118+
"FullRefresh": full_refresh,
119+
"ColumnNames": list(map(lambda cfg: cfg['source_name'], columns_config)),
120+
"PrimaryKeyColumnNames": table_config["primary_keys"],
121+
"LastBatchPrimaryKeys": [
122+
{"Key": k, "Value": v} for k, v in batch_key_tracker.bookmarks.items()
120123
],
121124
},
122125
}
@@ -129,11 +132,15 @@ def __get_data_frame(self, data: [[]], column_names: []):
129132
return pandas.DataFrame(data=data, columns=column_names)
130133

131134
def __invoke_lambda(self, pay_load):
135+
self.logger.debug('\nRequest being sent to Lambda:')
136+
self.logger.debug(pay_load)
132137
lambda_response = self.aws_lambda_client.invoke(
133138
FunctionName=self.connection_data["function"],
134139
InvocationType="RequestResponse",
135140
LogType="None", # |'Tail', Set to Tail to include the execution log in the response
136-
Payload=json.dump(pay_load).encode(),
141+
Payload=json.dumps(pay_load).encode(),
137142
)
138-
result = json.loads(lambda_response.Payload.read()) # .decode()
143+
result = json.loads(lambda_response['Payload'].read()) # .decode()
144+
self.logger.debug('\nResponse received from Lambda:\n')
145+
self.logger.debug(result)
139146
return result

0 commit comments

Comments
 (0)