 
 
 class BatchDataLoader(object):
-    def __init__(self, source_db, source_table_config, target_schema, target_table, columns, data_load_tracker,
-                 batch_config, target_db, full_refresh, change_tracking_info, logger=None):
+    def __init__(
+        self,
+        source_db,
+        source_table_config,
+        target_schema,
+        target_table,
+        columns,
+        data_load_tracker,
+        batch_config,
+        target_db,
+        full_refresh,
+        change_tracking_info,
+        logger=None,
+    ):
         self.logger = logger or logging.getLogger(__name__)
         self.source_table_config = source_table_config
         self.columns = columns
@@ -26,14 +38,22 @@ def __init__(self, source_db, source_table_config, target_schema, target_table,
     def load_batch(self, batch_key_tracker):
         batch_tracker = self.data_load_tracker.start_batch()
 
-        self.logger.debug(f"ImportBatch Starting from previous_batch_key: '{batch_key_tracker.bookmarks}'. "
-                          f"Full Refresh: '{self.full_refresh}', "
-                          f"sync_version: '{self.change_tracking_info.sync_version}', "
-                          f"last_sync_version: '{self.change_tracking_info.last_sync_version}'.")
-
-        data_frame = self.source_db.get_table_data_frame(self.source_table_config, self.columns,
-                                                         self.batch_config, batch_tracker, batch_key_tracker,
-                                                         self.full_refresh, self.change_tracking_info)
+        self.logger.debug(
+            f"ImportBatch Starting from previous_batch_key: '{batch_key_tracker.bookmarks}'. "
+            f"Full Refresh: '{self.full_refresh}', "
+            f"sync_version: '{self.change_tracking_info.sync_version}', "
+            f"last_sync_version: '{self.change_tracking_info.last_sync_version}'."
+        )
+
+        data_frame = self.source_db.get_table_data_frame(
+            self.source_table_config,
+            self.columns,
+            self.batch_config,
+            batch_tracker,
+            batch_key_tracker,
+            self.full_refresh,
+            self.change_tracking_info,
+        )
 
         if data_frame is None or len(data_frame) == 0:
             self.logger.debug("There are no more rows to import.")
@@ -46,21 +66,33 @@ def load_batch(self, batch_key_tracker):
             batch_tracker.load_completed_successfully()
 
             for primary_key in batch_key_tracker.primary_keys:
-                batch_key_tracker.set_bookmark(primary_key, int(data_frame.iloc[-1][primary_key]))
+                batch_key_tracker.set_bookmark(
+                    primary_key, int(data_frame.iloc[-1][primary_key])
+                )
 
-            self.logger.info(f"Batch keys '{batch_key_tracker.bookmarks}' completed. {batch_tracker.get_statistics()}")
+            self.logger.info(
+                f"Batch keys '{batch_key_tracker.bookmarks}' completed. {batch_tracker.get_statistics()}"
+            )
 
     @prevent_senstive_data_logging
     def write_data_frame_to_table(self, data_frame):
-        qualified_target_table = f'{self.target_schema}.{self.target_table}'
+        qualified_target_table = f"{self.target_schema}.{self.target_table}"
         self.logger.debug(f"Starting write to table '{qualified_target_table}'")
         data = StringIO()
         # quoting: Due to \r existing in strings in MSSQL we must quote anything that's non numeric just to be safe
         # line_terminator: ensure \n is used even on windows machines as prod runs on *nix with \n
         # na_rep: Due to us quoting everything non-numeric, our null's must be represented by something special, as the
         # default null representation (nothing), once quoted, is equivalent to an empty string
-        data_frame.to_csv(data, header=False, index=False, na_rep='\\N', float_format='%.16g',
-                          quotechar='"', quoting=csv.QUOTE_NONNUMERIC, line_terminator='\n')
+        data_frame.to_csv(
+            data,
+            header=False,
+            index=False,
+            na_rep="\\N",
+            float_format="%.16g",
+            quotechar='"',
+            quoting=csv.QUOTE_NONNUMERIC,
+            line_terminator="\n",
+        )
         # Float_format is used to truncate any insignificant digits. Unfortunately it gives us an artificial limitation
 
         data.seek(0)
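As a side note on those `to_csv` settings (not part of this change): a minimal standalone sketch, with made-up column values, of why `na_rep="\\N"` is needed once everything non-numeric is quoted. Per the comments in the code above, the default null representation (nothing), once quoted, would be indistinguishable from an empty string.

import csv
from io import StringIO

import pandas as pd

# One NULL and one empty string in an object column.
df = pd.DataFrame({"id": [1, 2], "comment": [None, ""]})
buf = StringIO()
# Mirrors the settings in the diff; note pandas >= 2.0 renames
# line_terminator to lineterminator.
df.to_csv(buf, header=False, index=False, na_rep="\\N",
          quotechar='"', quoting=csv.QUOTE_NONNUMERIC, line_terminator="\n")
# The NULL comes out as the quoted token "\N" and the empty string as "",
# so the COPY on the read side can still tell them apart.
print(buf.getvalue())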
@@ -69,19 +101,27 @@ def write_data_frame_to_table(self, data_frame):
 
         # log CSV on debug
         if self.logger.getEffectiveLevel() == logging.DEBUG:
-            with open(f'{qualified_target_table}.csv', 'w', encoding='utf-8') as f:
+            with open(f"{qualified_target_table}.csv", "w", encoding="utf-8") as f:
                 f.write(data.getvalue())
 
         column_array = list(
-            map(lambda source_colum_name: self.get_destination_column_name(source_colum_name), data_frame.columns))
-        column_list = ','.join(map(str, column_array))
+            map(
+                lambda source_colum_name: self.get_destination_column_name(
+                    source_colum_name
+                ),
+                data_frame.columns,
+            )
+        )
+        column_list = ",".join(map(str, column_array))
 
         # FORCE_NULL: ensure quoted fields are checked for NULLs as by default they are assumed to be non-null
         # specify null as \N so that psql doesn't assume empty strings are nulls
-        sql = f"COPY {qualified_target_table} ({column_list}) FROM STDIN " \
-              f"with (format csv, " \
-              f"null '\\N', " \
+        sql = (
+            f"COPY {qualified_target_table} ({column_list}) FROM STDIN "
+            f"with (format csv, "
+            f"null '\\N', "
             f"FORCE_NULL ({column_list}))"
+        )
         self.logger.debug(f"Writing to table using command '{sql}'")
 
         curs.copy_expert(sql=sql, file=data)
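For readers outside the codebase, a compact sketch of the same COPY round trip in isolation; the table, DSN, and data are hypothetical, and psycopg2 is assumed since `copy_expert` is its API:

from io import StringIO

import psycopg2

conn = psycopg2.connect("dbname=example")  # hypothetical connection
with conn, conn.cursor() as curs:
    curs.execute("CREATE TEMP TABLE demo (id int, comment text)")
    payload = StringIO('1,"\\N"\n2,""\n')  # one NULL, one empty string
    # FORCE_NULL re-checks quoted fields against the null string, so the
    # quoted "\N" loads as SQL NULL while "" stays an empty string.
    curs.copy_expert(
        sql="COPY demo (id, comment) FROM STDIN "
            "with (format csv, null '\\N', FORCE_NULL (id, comment))",
        file=payload,
    )
    curs.execute("SELECT id, comment IS NULL FROM demo ORDER BY id")
    print(curs.fetchall())  # -> [(1, True), (2, False)]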
@@ -93,11 +133,13 @@ def write_data_frame_to_table(self, data_frame):
 
     def get_destination_column_name(self, source_column_name):
         for column in self.columns:
-            if column['source_name'] == source_column_name:
-                return column['destination']['name']
+            if column["source_name"] == source_column_name:
+                return column["destination"]["name"]
 
         # Audit columns - map them straight through
-        if source_column_name.startswith(Providers.AuditColumnsNames.audit_column_prefix):
+        if source_column_name.startswith(
+            Providers.AuditColumnsNames.audit_column_prefix
+        ):
             return source_column_name
 
         message = f"A source column with name '{source_column_name}' was not found in the column configuration"
@@ -106,9 +148,11 @@ def get_destination_column_name(self, source_column_name):
     def attach_column_transformers(self, data_frame):
         self.logger.debug("Attaching column transformers")
         for column in self.columns:
-            if 'column_transformer' in column:
+            if "column_transformer" in column:
                 # transformer = Utils.create_type_instance(column['column_transformer'])
                 transformer = ToUpper.execute
-                data_frame[column['source_name']] = data_frame[column['source_name']].map(transformer)
+                data_frame[column["source_name"]] = data_frame[
+                    column["source_name"]
+                ].map(transformer)
         # print(data_frame)
         return data_frame
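Stepping back from the formatting change: `load_batch` is one pass of keyset pagination. Each batch bookmarks the last primary key it saw via `set_bookmark`, and the next fetch resumes strictly after it. A toy, pure-Python illustration of that bookmark pattern (nothing below is from the codebase):

rows = [{"id": i} for i in range(1, 11)]  # stand-in for the source table

def fetch_batch(after_key, size=4):
    # analogous to get_table_data_frame resuming from batch_key_tracker
    return [r for r in rows if r["id"] > after_key][:size]

bookmark = 0
while True:
    batch = fetch_batch(bookmark)
    if not batch:
        print("There are no more rows to import.")
        break
    bookmark = batch[-1]["id"]  # mirrors set_bookmark(pk, data_frame.iloc[-1][pk])
    print(f"batch complete up to key {bookmark}")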