@@ -100,16 +100,16 @@ def rename_table(self, schema_name, source_table_name, target_table_name):
100100 self .logger .debug (f"Table Rename, executing '{ sql } '" )
101101 self .target_db .execute (sql )
102102
103- def upsert_table (self , schema_name , source_table_name , target_table_name , columns_configuration ):
104- column_array = list (map (lambda column : column ['destination' ]['name' ], columns_configuration ))
103+ def upsert_table (self , schema_name , source_table_name , target_table_name , columns_config ):
104+ column_array = list (map (lambda column : column ['destination' ]['name' ], columns_config ))
105105 column_list = ',' .join (map (str , column_array ))
106106 column_list = column_list + f",{ Constants .AuditColumnNames .TIMESTAMP } "
107107 column_list = column_list + f",{ Constants .AuditColumnNames .IS_DELETED } "
108108 column_list = column_list + f",{ Constants .AuditColumnNames .CHANGE_VERSION } "
109109
110- primary_key_column_array = [column_configuration ['destination' ]['name' ] for column_configuration in
111- columns_configuration if 'primary_key' in column_configuration ['destination' ] and
112- column_configuration ['destination' ]['primary_key' ]]
110+ primary_key_column_array = [column_config ['destination' ]['name' ] for column_config in
111+ columns_config if 'primary_key' in column_config ['destination' ] and
112+ column_config ['destination' ]['primary_key' ]]
113113
114114 primary_key_column_list = ',' .join (map (str , primary_key_column_array ))
115115
@@ -118,15 +118,17 @@ def upsert_table(self, schema_name, source_table_name, target_table_name, column
118118 sql_builder .write (f" SELECT { column_list } FROM { schema_name } .{ source_table_name } \n " )
119119 sql_builder .write (f" ON CONFLICT({ primary_key_column_list } ) DO UPDATE SET " )
120120
121- for column_configuration in columns_configuration :
122- sql_builder .write ("{0} = EXCLUDED.{0},\n " .format (column_configuration ['destination' ]['name' ]))
121+ for column_config in columns_config :
122+ sql_builder .write ("{0} = EXCLUDED.{0},\n " .format (column_config ['destination' ]['name' ]))
123123
124124 sql_builder .write ("{0} = EXCLUDED.{0},\n " .format (Constants .AuditColumnNames .TIMESTAMP ))
125125 sql_builder .write ("{0} = EXCLUDED.{0},\n " .format (Constants .AuditColumnNames .IS_DELETED ))
126- sql_builder .write ("{0} = EXCLUDED.{0}\n " .format (Constants .AuditColumnNames .CHANGE_VERSION ))
126+ sql_builder .write ("{0} = EXCLUDED.{0}; \n " .format (Constants .AuditColumnNames .CHANGE_VERSION ))
127127
128- self .logger .debug (f"UPSERT executing '{ sql_builder .getvalue ()} '" )
129- self .target_db .execute (sql_builder .getvalue ())
128+ upsert_sql = sql_builder .getvalue ()
129+
130+ self .logger .debug (f"UPSERT executing '{ upsert_sql } '" )
131+ self .target_db .execute (upsert_sql )
130132 self .logger .debug ("UPSERT completed" )
131133
132134 sql_builder .close ()
0 commit comments