1010
1111
1212class AWSLambdaDataSource (object ):
13- CONNECTION_STRING_PREFIX = 'aws-lambda-arn://' # 'aws-lambda://tenant=543_dc2;arn=123456789012:function:my-function;'
13+ # 'aws-lambda://tenant=543_dc2;function=123456789012:function:my-function;'
14+ CONNECTION_STRING_PREFIX = 'aws-lambda://'
15+ CONNECTION_STRING_GROUP_SEPARATOR = ';'
16+ CONNECTION_STRING_KEY_VALUE_SEPARATOR = '='
1417
1518 def __init__ (self , connection_string , logger = None ):
1619 self .logger = logger or logging .getLogger (__name__ )
20+ if not AWSLambdaDataSource .can_handle_connection_string (connection_string ):
21+ raise ValueError (connection_string )
1722 self .connection_string = connection_string
23+ self .connection_data = dict (kv .split (AWSLambdaDataSource .CONNECTION_STRING_KEY_VALUE_SEPARATOR ) for kv in
24+ self .connection_string
25+ .lstrip (AWSLambdaDataSource .CONNECTION_STRING_PREFIX )
26+ .rstrip (AWSLambdaDataSource .CONNECTION_STRING_GROUP_SEPARATOR )
27+ .split (AWSLambdaDataSource .CONNECTION_STRING_GROUP_SEPARATOR ))
1828 self .aws_lambda_client = boto3 .client ('lambda' )
1929
2030 @staticmethod
@@ -31,7 +41,7 @@ def get_table_info(self, table_config, last_known_sync_version):
3141 columns_in_database = column_names
3242 change_tracking_info = ChangeTrackingInfo (
3343 last_sync_version = last_sync_version ,
34- sync_version = sync_version + 1 ,
44+ sync_version = sync_version ,
3545 force_full_load = full_refresh_required ,
3646 data_changed_since_last_sync = data_changed_since_last_sync )
3747 source_table_info = SourceTableInfo (columns_in_database , change_tracking_info )
@@ -51,7 +61,7 @@ def get_table_data_frame(self, table_config, columns, batch_config, batch_tracke
5161 def __get_table_info (self , table_config , last_known_sync_version ):
5262 pay_load = {
5363 "command" : "GetTableInfo" ,
54- "tenantId" : 543 , # self.connection_string. tenant.split('_')[0] as int
64+ "tenantId" : self .connection_data [ ' tenant' ],
5565 "table" : {
5666 "schema" : table_config ['schema' ],
5767 "name" : table_config ['name' ]
@@ -91,7 +101,7 @@ def __get_table_data(self, table_config, batch_config, change_tracking_info, ful
91101
92102 def __invoke_lambda (self , pay_load ):
93103 lambda_response = self .aws_lambda_client .invoke (
94- FunctionName = 'string' , # self.connection_string.arn
104+ FunctionName = self . connection_data [ 'function' ],
95105 InvocationType = 'RequestResponse' ,
96106 LogType = 'None' , # |'Tail', Set to Tail to include the execution log in the response
97107 Payload = json .dump (pay_load ).encode ()
0 commit comments