Skip to content
This repository was archived by the owner on Mar 13, 2020. It is now read-only.

Commit cc88ac6

Browse files
authored
Merge pull request #2 from PageUpPeopleOrg/OSC-922-FixNullInts
Osc 922 fix null ints
2 parents cd2046a + 2a8b686 commit cc88ac6

File tree

15 files changed

+198
-91
lines changed

15 files changed

+198
-91
lines changed

README.md

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,34 @@ In the above example, dwsource is a 64bit ODBC system dsn
3232
Run with `--log-level DEBUG` on the command line.
3333

3434

35-
##Other Notes
36-
###Testing
35+
## Other Notes
36+
### Testing
3737
The test batch files assume there is a user by the name of `postgres` on the system.
3838
It also sends through a nonsense password - it is assumed that the target system is running in 'trust' mode.
39-
See https://www.postgresql.org/docs/9.1/static/auth-pg-hba-conf.html for details on trust mode
39+
See https://www.postgresql.org/docs/9.1/static/auth-pg-hba-conf.html for details on trust mode
40+
41+
42+
43+
### Destination.Type Values
44+
The destination.type value controls both the data reader type and the destination column type. They are mapped as follows
45+
46+
| destination.type | pandas type | sqlalchemy type | dw column type | notes |
47+
|-----------------------------|-------------|---------------------------------------|----------------|--------------------------------------------------|
48+
| string | str | citext.CIText | citext | A case-insensitive string that supports unicode |
49+
| int (when nullable = false) | int | sqlalchemy.Integer | int | An (optionally) signed INT value |
50+
| int (when nullable = true) | object | sqlalchemy.Integer | int | An (optionally) signed INT value |
51+
| datetime | str | sqlalchemy.DateTime | datetime (tz?) | |
52+
| json | str | sqlalchemy.dialects.postgresql.JSONB | jsonb | Stored as binary-encoded json on the database |
53+
| numeric | float | sqlalchemy.Numeric | numeric | Stores whole and decimal numbers |
54+
| guid | str | sqlalchemy.dialects.postgresql.UUID | uuid | |
55+
| bigint                      | int         | sqlalchemy.BigInteger                 | BigInt         | Relies on 64-bit Python. Limited to a largest value of ~9223372036854775807 |
56+
57+
58+
These are implemented in ColumnTypeResolver.py
59+
60+
61+
62+
63+
64+
65+

integration_tests/csv_source/assertions/column_test_full_refresh_assertions.sql

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,20 @@ SET client_encoding TO 'UTF8';
33
DROP TABLE IF EXISTS results;
44

55
CREATE TEMPORARY TABLE results AS
6-
WITH expected(id, int_column_1, date_column_1, decimal_column_1, date_time_column_1, string_column_1) AS (
7-
SELECT 1, 111.0, '1976-12-01'::DATE, 12.1212, '1976-12-01 01:00:00.000000'::TIMESTAMP, 'A Basic String'
6+
WITH expected(id, int_column_1, date_column_1, decimal_column_1, date_time_column_1, string_column_1, guid_column_1,big_int_column_1) AS (
7+
SELECT 1, 111.0, '1976-12-01'::DATE, 12.1212, '1976-12-01 01:00:00.000000'::TIMESTAMP, 'A Basic String', '57bc8093-fe4c-477a-bbd7-fb5c02055a7e'::UUID,2147483647121212
88
UNION ALL
9-
SELECT 2, NULL, NULL, NULL, NULL, NULL
9+
SELECT 2, NULL, NULL, NULL, NULL, NULL, NULL, NULL
1010
UNION ALL
11-
SELECT 3, 333.0, '2001-01-01', 33.333, NULL, 'This Text Has a Quote Before "Dave'
11+
SELECT 3, 333.0, '2001-01-01', 33.333, NULL, 'This Text Has a Quote Before "Dave', NULL, NULL
1212
UNION ALL
13-
SELECT 4, NULL, NULL, NULL, NULL, 'ം ഃ അ ആ ഇ ഈ ഉ ഊ ഋ ഌ എ ഏ'
13+
SELECT 4, NULL, NULL, NULL, NULL, 'ം ഃ അ ആ ഇ ഈ ഉ ഊ ഋ ഌ എ ഏ','aabc8093-fe4c-477a-bbd7-fb5c02055a7e', NULL
1414
UNION ALL
15-
SELECT 5, NULL, NULL, NULL, NULL, 'This row will be updated in the incremental review test'
15+
SELECT 5, NULL, NULL, NULL, NULL, 'This row will be updated in the incremental review test', NULL, NULL
1616
),
1717

1818
actual AS (
19-
SELECT id, int_column_1, date_column_1, decimal_column_1, date_time_column_1, string_column_1
19+
SELECT id, int_column_1, date_column_1, decimal_column_1, date_time_column_1, string_column_1,guid_column_1,big_int_column_1
2020
FROM rdl_integration_tests.load_source_data
2121
)
2222

integration_tests/csv_source/assertions/column_test_incremental_refresh_assertions.sql

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,24 @@ SET client_encoding TO 'UTF8';
22
DROP TABLE IF EXISTS results;
33

44
CREATE TEMPORARY TABLE results AS
5-
WITH expected(id, int_column_1, date_column_1, decimal_column_1, date_time_column_1, string_column_1) AS (
6-
SELECT 1, 111.0, '1976-12-01'::DATE, 12.1212, '1976-12-01 01:00:00.000000'::TIMESTAMP, 'A Basic String'
5+
WITH expected(id, int_column_1, date_column_1, decimal_column_1, date_time_column_1, string_column_1, guid_column_1,big_int_column_1) AS (
6+
SELECT 1, 111.0, '1976-12-01'::DATE, 12.1212, '1976-12-01 01:00:00.000000'::TIMESTAMP, 'A Basic String', '57bc8093-fe4c-477a-bbd7-fb5c02055a7e'::UUID,2147483647121212
77
UNION ALL
8-
SELECT 2, NULL, NULL, NULL, NULL, NULL
8+
SELECT 2, NULL, NULL, NULL, NULL, NULL, NULL, NULL
99
UNION ALL
10-
SELECT 3, 333.0, '2001-01-01', 33.333, NULL, 'This Text Has a Quote Before "Dave'
10+
SELECT 3, 333.0, '2001-01-01', 33.333, NULL, 'This Text Has a Quote Before "Dave', NULL, NULL
1111
UNION ALL
12-
SELECT 4, NULL, NULL, NULL, NULL, 'ം ഃ അ ആ ഇ ഈ ഉ ഊ ഋ ഌ എ ഏ'
12+
SELECT 4, NULL, NULL, NULL, NULL, 'ം ഃ അ ആ ഇ ഈ ഉ ഊ ഋ ഌ എ ഏ', 'aabc8093-fe4c-477a-bbd7-fb5c02055a7e', NULL
1313
UNION ALL
14-
SELECT 5, NULL, NULL, NULL, NULL, 'This row WAS updated in the incremental review test'
14+
SELECT 5, NULL, NULL, NULL, NULL, 'This row WAS updated in the incremental review test', NULL, NULL
1515
UNION ALL
16-
SELECT 6, 111.0, '1976-12-01'::DATE, 12.1212, '1976-12-01 01:00:00.000000'::TIMESTAMP, 'A Basic String'
16+
SELECT 6, 111.0, '1976-12-01'::DATE, 12.1212, '1976-12-01 01:00:00.000000'::TIMESTAMP, 'A Basic String', '57bc8093-fe4c-477a-bbd7-fb5c02055a7e', NULL
1717
UNION ALL
18-
SELECT 7, 111.0, '1976-12-01'::DATE, 12.1212, '1976-12-01 01:00:00.000000'::TIMESTAMP, 'Another Basic String'
18+
SELECT 7, 111.0, '1976-12-01'::DATE, 12.1212, '1976-12-01 01:00:00.000000'::TIMESTAMP, 'Another Basic String', NULL, NULL
1919
),
2020

2121
actual AS (
22-
SELECT id, int_column_1, date_column_1, decimal_column_1, date_time_column_1, string_column_1
22+
SELECT id, int_column_1, date_column_1, decimal_column_1, date_time_column_1, string_column_1, guid_column_1,big_int_column_1
2323
FROM rdl_integration_tests.load_source_data
2424
)
2525

integration_tests/csv_source/config/ColumnTest.json

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
"source_name": "id",
1818
"destination": {
1919
"name": "id",
20-
"type": "sqlalchemy.Integer",
20+
"type": "int",
2121
"nullable": false,
2222
"primary_key": true
2323
}
@@ -26,41 +26,61 @@
2626
"source_name": "IntColumn1",
2727
"destination": {
2828
"name": "int_column_1",
29-
"type": "sqlalchemy.Numeric",
29+
"type": "int",
3030
"nullable": true
3131
}
3232
},
3333
{
3434
"source_name": "DateColumn1",
3535
"destination": {
3636
"name": "date_column_1",
37-
"type": "sqlalchemy.DateTime",
37+
"type": "datetime",
3838
"nullable": true
3939
}
4040
},
4141
{
4242
"source_name": "DecimalColumn1",
4343
"destination": {
4444
"name": "decimal_column_1",
45-
"type": "sqlalchemy.Numeric",
45+
"type": "numeric",
4646
"nullable": true
4747
}
4848
},
4949
{
5050
"source_name": "DateTimeColumn1",
5151
"destination": {
5252
"name": "date_time_column_1",
53-
"type": "sqlalchemy.DateTime",
53+
"type": "datetime",
5454
"nullable": true
5555
}
5656
},
5757
{
5858
"source_name": "StringColumn1",
5959
"destination": {
6060
"name": "string_column_1",
61-
"type": "citext.CIText",
61+
"type": "string",
62+
"nullable": true
63+
}
64+
},
65+
{
66+
"source_name": "GuidColumn1",
67+
"destination": {
68+
"name": "guid_column_1",
69+
"type": "guid",
70+
"nullable": true
71+
}
72+
},
73+
{
74+
"source_name": "BigIntColumn1",
75+
"destination": {
76+
"name": "big_int_column_1",
77+
"type": "bigint",
6278
"nullable": true
6379
}
6480
}
81+
82+
83+
84+
6585
]
6686
}
Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
id,StringColumn1,IntColumn1,DecimalColumn1,DateColumn1,DateTimeColumn1
2-
1,"A Basic String",111,12.1212,01-Dec-1976,01-dec-1976 1:00 am
3-
2,,,,,
4-
3,"This Text Has a Quote Before ""Dave", 333,33.333, 01-01-01,
5-
4,"ം ഃ അ ആ ഇ ഈ ഉ ഊ ഋ ഌ എ ഏ",,,,
6-
5,"This row will be updated in the incremental review test"
1+
id,StringColumn1,IntColumn1,DecimalColumn1,DateColumn1,DateTimeColumn1,GuidColumn1,BigIntColumn1
2+
1,"A Basic String",111,12.1212,01-Dec-1976,01-dec-1976 1:00 am,57BC8093-FE4C-477A-BBD7-FB5C02055A7E,2147483647121212
3+
2,,,,,,,
4+
3,"This Text Has a Quote Before ""Dave", 333,33.333, 01-01-01,,,
5+
4,"ം ഃ അ ആ ഇ ഈ ഉ ഊ ഋ ഌ എ ഏ",,,,,AABC8093-FE4C-477A-BBD7-FB5C02055A7E,
6+
5,"This row will be updated in the incremental review test",,
Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
id,StringColumn1,IntColumn1,DecimalColumn1,DateColumn1,DateTimeColumn1
2-
5,"This row WAS updated in the incremental review test",,,,
3-
6,"A Basic String",111,12.1212,01-Dec-1976,01-dec-1976 1:00 am
4-
7,"Another Basic String",111,12.1212,01-Dec-1976,01-dec-1976 1:00 am
1+
id,StringColumn1,IntColumn1,DecimalColumn1,DateColumn1,DateTimeColumn1,GuidColumn1,BigIntColumn1
2+
5,"This row WAS updated in the incremental review test",,,,,,
3+
6,"A Basic String",111,12.1212,01-Dec-1976,01-dec-1976 1:00 am,57BC8093-FE4C-477A-BBD7-FB5C02055A7E,
4+
7,"Another Basic String",111,12.1212,01-Dec-1976,01-dec-1976 1:00 am,,

integration_tests/mssql_source/config/LargeTableTest.json

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
"source_name": "Id",
1818
"destination": {
1919
"name": "id",
20-
"type": "sqlalchemy.Integer",
20+
"type": "int",
2121
"nullable": false,
2222
"primary_key": true
2323
}
@@ -26,47 +26,47 @@
2626
"source_name": "DateColumn1",
2727
"destination": {
2828
"name": "date_column_1",
29-
"type": "sqlalchemy.DateTime",
29+
"type": "datetime",
3030
"nullable": true
3131
}
3232
},
3333
{
3434
"source_name": "IntColumn1",
3535
"destination": {
3636
"name": "int_column_1",
37-
"type": "sqlalchemy.Numeric",
37+
"type": "int",
3838
"nullable": true
3939
}
4040
},
4141
{
4242
"source_name": "DateColumn2",
4343
"destination": {
4444
"name": "date_column_2",
45-
"type": "sqlalchemy.DateTime",
45+
"type": "datetime",
4646
"nullable": true
4747
}
4848
},
4949
{
5050
"source_name": "StringColumn1",
5151
"destination": {
5252
"name": "string_column_1",
53-
"type": "citext.CIText",
53+
"type": "string",
5454
"nullable": true
5555
}
5656
},
5757
{
5858
"source_name": "StringColumn2",
5959
"destination": {
6060
"name": "string_column_2",
61-
"type": "citext.CIText",
61+
"type": "string",
6262
"nullable": true
6363
}
6464
},
6565
{
6666
"source_name": "GuidColumn",
6767
"destination": {
6868
"name": "guid_column",
69-
"type": "citext.CIText",
69+
"type": "guid",
7070
"nullable": true
7171
}
7272
}

modules/BatchDataLoader.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55

66

77
class BatchDataLoader(object):
8-
def __init__(self, data_source, source_table_configuration, target_schema, target_table, columns, data_load_tracker, batch_configuration, target_engine, logger=None):
8+
def __init__(self, data_source, source_table_configuration, target_schema, target_table, columns, data_load_tracker,
9+
batch_configuration, target_engine, logger=None):
910
self.logger = logger or logging.getLogger(__name__)
1011
self.source_table_configuration = source_table_configuration
1112
self.columns = columns
@@ -22,7 +23,8 @@ def load_batch(self, previous_batch_key):
2223

2324
self.logger.debug("ImportBatch Starting from previous_batch_key: {0}".format(previous_batch_key))
2425

25-
data_frame = self.data_source.get_next_data_frame(self.source_table_configuration, self.columns, self.batch_configuration, batch_tracker, previous_batch_key)
26+
data_frame = self.data_source.get_next_data_frame(self.source_table_configuration, self.columns,
27+
self.batch_configuration, batch_tracker, previous_batch_key)
2628

2729
if data_frame is None or len(data_frame) == 0:
2830
self.logger.debug("There are no rows to import, returning -1")
@@ -42,12 +44,16 @@ def write_data_frame_to_table(self, data_frame):
4244
qualified_target_table = "{0}.{1}".format(self.target_schema, self.target_table)
4345
self.logger.debug("Starting write to table {0}".format(qualified_target_table))
4446
data = StringIO()
45-
data_frame.to_csv(data, header=False, index=False, na_rep='')
47+
48+
data_frame.to_csv(data, header=False, index=False, na_rep='', float_format='%.16g')
49+
# Float_format is used to truncate any insignificant digits. Unfortunately it gives us an artificial limitation
50+
4651
data.seek(0)
4752
raw = self.target_engine.raw_connection()
4853
curs = raw.cursor()
4954

50-
column_array = list(map(lambda source_colum_name: self.get_destination_column_name(source_colum_name), data_frame.columns))
55+
column_array = list(
56+
map(lambda source_colum_name: self.get_destination_column_name(source_colum_name), data_frame.columns))
5157
column_list = ','.join(map(str, column_array))
5258

5359
sql = "COPY {0}({1}) FROM STDIN with csv".format(qualified_target_table, column_list)

modules/ColumnTypeResolver.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import citext
2+
from sqlalchemy import DateTime, Numeric, Integer, BigInteger
3+
from sqlalchemy.dialects.postgresql import JSONB
4+
from sqlalchemy.dialects.postgresql import UUID
5+
6+
7+
class ColumnTypeResolver(object):
8+
PANDAS_TYPE_MAP = {'string': str,
9+
'datetime': str,
10+
'json': str,
11+
'numeric': float,
12+
'guid': str,
13+
'bigint': int}
14+
15+
POSTGRES_TYPE_MAP = {'string': citext.CIText,
16+
'datetime': DateTime,
17+
'json': JSONB,
18+
'numeric': Numeric,
19+
'guid': UUID,
20+
'int': Integer,
21+
'bigint': BigInteger}
22+
23+
def resolve_postgres_type(self, column):
24+
return self.POSTGRES_TYPE_MAP[column['type']]
25+
26+
def resolve_pandas_type(self, column):
27+
if column['type'] == 'int':
28+
if column['nullable']:
29+
return object
30+
else:
31+
return int
32+
else:
33+
return self.PANDAS_TYPE_MAP[column['type']]
34+
35+
def create_column_type_dictionary(self, columns):
36+
types = {}
37+
for column in columns:
38+
types[column['source_name']] = self.resolve_pandas_type(column['destination'])
39+
return types

modules/DataLoadManager.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,10 @@ def start_single_import(self, target_engine, configuration_name, requested_full_
4040

4141
data_load_tracker = DataLoadTracker(configuration_name, json_data, full_refresh)
4242

43-
columns = self.data_source.get_valid_columns(pipeline_configuration['source_table'],
43+
self.data_source.assert_data_source_is_valid(pipeline_configuration['source_table'],
4444
pipeline_configuration['columns'])
4545

46-
if columns is None:
47-
self.logger.debug("There are no columns, returning.")
48-
return
49-
46+
columns = pipeline_configuration['columns']
5047
destination_table_manager.create_schema(pipeline_configuration['target_schema'])
5148

5249
self.logger.debug("Recreating the staging table {0}.{1}".format(pipeline_configuration['target_schema'], pipeline_configuration['stage_table']))

0 commit comments

Comments
 (0)