@@ -16,7 +16,10 @@ class AWSLambdaDataSource(object):
1616 CONNECTION_STRING_PREFIX = "aws-lambda://"
1717 CONNECTION_STRING_GROUP_SEPARATOR = ";"
1818 CONNECTION_STRING_KEY_VALUE_SEPARATOR = "="
19- CONNECTION_STRING_ROLE_KEY = "role"
19+
20+ CONNECTION_DATA_ROLE_KEY = "role"
21+ CONNECTION_DATA_FUNCTION_KEY = "function"
22+ CONNECTION_DATA_TENANT_KEY = "tenant"
2023
2124 AWS_SERVICE_LAMBDA = "lambda"
2225 AWS_SERVICE_S3 = "s3"
@@ -39,8 +42,8 @@ def __init__(self, connection_string, logger=None):
3942
4043 self .aws_sts_client = boto3 .client ("sts" )
4144 role_credentials = self .__assume_role (
42- self .connection_data [self .CONNECTION_STRING_ROLE_KEY ],
43- f' dwp_{ self .connection_data ["tenant" ] } ' ,
45+ self .connection_data [self .CONNECTION_DATA_ROLE_KEY ],
46+ f" dwp_{ self .connection_data [self . CONNECTION_DATA_TENANT_KEY ] } " ,
4447 )
4548
4649 self .aws_lambda_client = self .__get_aws_client (
@@ -105,7 +108,7 @@ def get_table_data_frame(
105108 def __get_table_info (self , table_config , last_known_sync_version ):
106109 pay_load = {
107110 "Command" : "GetTableInfo" ,
108- "TenantId" : int (self .connection_data ["tenant" ]),
111+ "TenantId" : int (self .connection_data [self . CONNECTION_DATA_TENANT_KEY ]),
109112 "Table" : {"Schema" : table_config ["schema" ], "Name" : table_config ["name" ]},
110113 "CommandPayload" : {"LastSyncVersion" : last_known_sync_version },
111114 }
@@ -131,7 +134,7 @@ def __get_table_data(
131134 ):
132135 pay_load = {
133136 "Command" : "GetTableData" ,
134- "TenantId" : int (self .connection_data ["tenant" ]),
137+ "TenantId" : int (self .connection_data [self . CONNECTION_DATA_TENANT_KEY ]),
135138 "Table" : {"Schema" : table_config ["schema" ], "Name" : table_config ["name" ]},
136139 "CommandPayload" : {
137140 "AuditColumnNameForChangeVersion" : Providers .AuditColumnsNames .CHANGE_VERSION ,
@@ -195,8 +198,8 @@ def __refresh_aws_clients_if_expired(self):
195198 and current_datetime < self .role_session_expiry
196199 ):
197200 role_credentials = self .__assume_role (
198- self .connection_data [self .CONNECTION_STRING_ROLE_KEY ],
199- f' dwp_{ self .connection_data ["tenant" ] } ' ,
201+ self .connection_data [self .CONNECTION_DATA_ROLE_KEY ],
202+ f" dwp_{ self .connection_data [self . CONNECTION_DATA_TENANT_KEY ] } " ,
200203 )
201204
202205 self .aws_lambda_client = self .__get_aws_client (
@@ -227,7 +230,7 @@ def __invoke_lambda(self, pay_load):
227230 self .logger .debug (pay_load )
228231
229232 lambda_response = self .aws_lambda_client .invoke (
230- FunctionName = self .connection_data ["function" ],
233+ FunctionName = self .connection_data [self . CONNECTION_DATA_FUNCTION_KEY ],
231234 InvocationType = "RequestResponse" ,
232235 LogType = "None" , # |'Tail', Set to Tail to include the execution log in the response
233236 Payload = json .dumps (pay_load ).encode (),
@@ -245,7 +248,7 @@ def __invoke_lambda(self, pay_load):
245248
246249 if response_status_code != 200 or response_function_error :
247250 self .logger .error (
248- f' Error in response from aws lambda \ '{ self .connection_data ["function" ] } \ ' , '
251+ f" Error in response from aws lambda '{ self .connection_data [self . CONNECTION_DATA_FUNCTION_KEY ] } ', "
249252 f"attempt { current_attempt } of { max_attempts } "
250253 )
251254 self .logger .error (f"Response - Status Code = { response_status_code } " )
0 commit comments