55
66
77class BatchDataLoader (object ):
8- def __init__ (self , data_source , source_table_configuration , target_schema , target_table , columns , data_load_tracker , batch_configuration , target_engine , logger = None ):
8+ def __init__ (self , data_source , source_table_configuration , target_schema , target_table , columns , data_load_tracker ,
9+ batch_configuration , target_engine , logger = None ):
910 self .logger = logger or logging .getLogger (__name__ )
1011 self .source_table_configuration = source_table_configuration
1112 self .columns = columns
@@ -22,7 +23,8 @@ def load_batch(self, previous_batch_key):
2223
2324 self .logger .debug ("ImportBatch Starting from previous_batch_key: {0}" .format (previous_batch_key ))
2425
25- data_frame = self .data_source .get_next_data_frame (self .source_table_configuration , self .columns , self .batch_configuration , batch_tracker , previous_batch_key )
26+ data_frame = self .data_source .get_next_data_frame (self .source_table_configuration , self .columns ,
27+ self .batch_configuration , batch_tracker , previous_batch_key )
2628
2729 if data_frame is None or len (data_frame ) == 0 :
2830 self .logger .debug ("There are no rows to import, returning -1" )
@@ -43,15 +45,15 @@ def write_data_frame_to_table(self, data_frame):
4345 self .logger .debug ("Starting write to table {0}" .format (qualified_target_table ))
4446 data = StringIO ()
4547
46-
47- data_frame .to_csv (data , header = False , index = False , na_rep = '' , float_format = '%.16g' )
48+ data_frame .to_csv (data , header = False , index = False , na_rep = '' , float_format = '%.16g' )
4849 # Float_format is used to truncate any insignificant digits. Unfortunately it gives us an artificial limitation
4950
5051 data .seek (0 )
5152 raw = self .target_engine .raw_connection ()
5253 curs = raw .cursor ()
5354
54- column_array = list (map (lambda source_colum_name : self .get_destination_column_name (source_colum_name ), data_frame .columns ))
55+ column_array = list (
56+ map (lambda source_colum_name : self .get_destination_column_name (source_colum_name ), data_frame .columns ))
5557 column_list = ',' .join (map (str , column_array ))
5658
5759 sql = "COPY {0}({1}) FROM STDIN with csv" .format (qualified_target_table , column_list )
0 commit comments