|
1 | 1 | import logging |
| 2 | +import csv |
| 3 | + |
2 | 4 | from io import StringIO |
3 | 5 | from modules.column_transformers.StringTransformers import ToUpper |
4 | 6 | from modules.shared import Constants |
@@ -51,18 +53,33 @@ def write_data_frame_to_table(self, data_frame): |
51 | 53 | qualified_target_table = f'{self.target_schema}.{self.target_table}' |
52 | 54 | self.logger.debug(f"Starting write to table '{qualified_target_table}'") |
53 | 55 | data = StringIO() |
54 | | - data_frame.to_csv(data, header=False, index=False, na_rep='', float_format='%.16g') |
| 56 | + # quoting: Due to \r existing in strings in MSSQL we must quote anything that's non numeric just to be safe |
| 57 | + # line_terminator: ensure \n is used even on windows machines as prod runs on *nix with \n |
| 58 | + # na_rep: Because we quote everything non-numeric, our nulls must be represented by something special, as the |
| 59 | + # default null representation (nothing), once quoted, is equivalent to an empty string |
| 60 | + data_frame.to_csv(data, header=False, index=False, na_rep='\\N', float_format='%.16g', |
| 61 | + quotechar='"', quoting=csv.QUOTE_NONNUMERIC, line_terminator='\n') |
55 | 62 | # Float_format is used to truncate any insignificant digits. Unfortunately it gives us an artificial limitation |
56 | 63 |
|
57 | 64 | data.seek(0) |
58 | 65 | raw = self.target_db.raw_connection() |
59 | 66 | curs = raw.cursor() |
60 | 67 |
|
| 68 | + # log CSV on debug |
| 69 | + if self.logger.getEffectiveLevel() == logging.DEBUG: |
| 70 | + with open(f'{qualified_target_table}.csv', 'w', encoding='utf-8') as f: |
| 71 | + f.write(data.getvalue()) |
| 72 | + |
61 | 73 | column_array = list( |
62 | 74 | map(lambda source_colum_name: self.get_destination_column_name(source_colum_name), data_frame.columns)) |
63 | 75 | column_list = ','.join(map(str, column_array)) |
64 | 76 |
|
65 | | - sql = f"COPY {qualified_target_table}({column_list}) FROM STDIN with csv" |
| 77 | + # FORCE_NULL: ensure quoted fields are checked for NULLs as by default they are assumed to be non-null |
| 78 | + # specify null as \N so that PostgreSQL's COPY doesn't treat empty strings as nulls |
| 79 | + sql = f"COPY {qualified_target_table}({column_list}) FROM STDIN "\ |
| 80 | + f"with (format csv, "\ |
| 81 | + f"null '\\N', "\ |
| 82 | + f"FORCE_NULL ({column_list}))" |
66 | 83 | self.logger.debug(f"Writing to table using command '{sql}'") |
67 | 84 |
|
68 | 85 | curs.copy_expert(sql=sql, file=data) |
|
0 commit comments