11import logging
2+ import csv
3+
24from io import StringIO
35from modules .column_transformers .StringTransformers import ToUpper
46from modules .shared import Constants
@@ -51,7 +53,12 @@ def write_data_frame_to_table(self, data_frame):
5153 qualified_target_table = f'{ self .target_schema } .{ self .target_table } '
5254 self .logger .debug (f"Starting write to table '{ qualified_target_table } '" )
5355 data = StringIO ()
54- data_frame .to_csv (data , header = False , index = False , na_rep = '' , float_format = '%.16g' )
56+ # quoting: Because strings coming from MSSQL may contain \r, we quote everything that is non-numeric to be safe
57+ # line_terminator: ensure \n is used even on Windows machines, as prod runs on *nix with \n
58+ # na_rep: Because we quote everything non-numeric, NULLs must be represented by a special marker, as the
59+ # default null representation (nothing), once quoted, is indistinguishable from an empty string
60+ data_frame .to_csv (data , header = False , index = False , na_rep = '\\ N' , float_format = '%.16g' ,
61+ quotechar = '"' , quoting = csv .QUOTE_NONNUMERIC , line_terminator = '\n ' )
5562 # Float_format is used to truncate any insignificant digits. Unfortunately it gives us an artificial limitation
5663
5764 data .seek (0 )
@@ -67,7 +74,12 @@ def write_data_frame_to_table(self, data_frame):
6774 map (lambda source_colum_name : self .get_destination_column_name (source_colum_name ), data_frame .columns ))
6875 column_list = ',' .join (map (str , column_array ))
6976
70- sql = f"COPY { qualified_target_table } ({ column_list } ) FROM STDIN with csv"
77+ # FORCE_NULL: ensure quoted fields are checked for NULLs, as by default quoted values are assumed to be non-null
78+ # null: specify \N as the null marker so that PostgreSQL doesn't treat empty strings as NULLs
79+ sql = f"COPY { qualified_target_table } ({ column_list } ) FROM STDIN " \
80+ f"with (format csv, " \
81+ f"null '\\ N', " \
82+ f"FORCE_NULL ({ column_list } ))"
7183 self .logger .debug (f"Writing to table using command '{ sql } '" )
7284
7385 curs .copy_expert (sql = sql , file = data )
0 commit comments