This repository was archived by the owner on Mar 13, 2020. It is now read-only.

Commit 6abd0d5

dames committed
Incremental/Full refresh from CSV appears to be working.
1 parent 1c84096 commit 6abd0d5

File tree

7 files changed: +100 −31 lines
Lines changed: 4 additions & 4 deletions

@@ -2,15 +2,15 @@

     "source_table": {
         "name": "ColumnTest",
-        "schema": "dbo"
+        "schema": "dbo",
+        "primary_key": "id"
     },
-    "target_schema": "load",
+    "target_schema": "rdl_integration_tests",
     "stage_table": "stage_source_data",
     "load_table": "load_source_data",

     "batch": {
-        "size": 100000,
-        "source_unique_column": "id"
+        "size": 100000
     },
     "columns": [
         {
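To see what this reshaping means for consumers of the configuration, here is a minimal standalone sketch (not repo code) of reading the new layout: the batch key column now lives at source_table.primary_key rather than batch.source_unique_column.

import json

# Sketch only: values taken from the configuration diff above.
pipeline_configuration = json.loads("""
{
    "source_table": {"name": "ColumnTest", "schema": "dbo", "primary_key": "id"},
    "target_schema": "rdl_integration_tests",
    "stage_table": "stage_source_data",
    "load_table": "load_source_data",
    "batch": {"size": 100000}
}
""")

batch_key_column = pipeline_configuration['source_table']['primary_key']  # "id"
batch_size = pipeline_configuration['batch']['size']                      # 100000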

full_refresh_from_csv.cmd

Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
+py relational_data_loader_project\__main__.py csv://./test_data/incremental_refresh postgresql+psycopg2://postgres:xxxx@localhost/dest_dw c:\_dev\relational-data-loader\configuration\ --log-level DEBUG
+
+
+
+
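A full-refresh counterpart is not part of this commit, but given the --full-refresh flag introduced below, a hypothetical variant of the same command would look like:

rem Hypothetical full-refresh variant (not in this commit), same source and destination:
py relational_data_loader_project\__main__.py csv://./test_data/incremental_refresh postgresql+psycopg2://postgres:xxxx@localhost/dest_dw c:\_dev\relational-data-loader\configuration\ --full-refresh --log-level DEBUG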

relational_data_loader_project/BatchDataLoader.py

Lines changed: 1 addition & 1 deletion

@@ -35,7 +35,7 @@ def load_batch(self, previous_batch_key):
         self.write_data_frame_to_table(data_frame)
         batch_tracker.load_completed_successfully()

-        last_key_returned = data_frame.iloc[-1][self.batch_configuration['source_unique_column']]
+        last_key_returned = data_frame.iloc[-1][self.source_table_configuration['primary_key']]

         self.logger.info("Batch key {0} Completed. {1}".format(last_key_returned, batch_tracker.get_statistics()))
         return last_key_returned
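A minimal sketch (made-up frame, not repo code) of what the changed line does: iloc[-1] takes the last row of the batch's DataFrame, and the column named by the source table's primary_key supplies the key from which the next batch resumes.

import pandas as pd

# Assumed stand-ins for the loader's configuration and a fetched batch.
source_table_configuration = {'primary_key': 'id'}
data_frame = pd.DataFrame({'id': [5, 6, 7],
                           'StringColumn1': ['a', 'b', 'c']})

# Mirrors the committed line: the last row's key value keys the next batch.
last_key_returned = data_frame.iloc[-1][source_table_configuration['primary_key']]
print(last_key_returned)  # 7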

relational_data_loader_project/DataLoadManager.py

Lines changed: 26 additions & 15 deletions

@@ -12,30 +12,38 @@ def __init__(self, configuration_path, data_source, logger=None):
         self.configuration_path = configuration_path
         self.data_source = data_source

-    def start_imports(self, target_engine, full_load):
+    def start_imports(self, target_engine, full_refresh):
         for file in os.listdir(self.configuration_path):
-            self.start_single_import(target_engine, file, full_load)
+            self.start_single_import(target_engine, file, full_refresh)

-    def start_single_import(self, target_engine, configuration_name, full_load):
+    def start_single_import(self, target_engine, configuration_name, requested_full_refresh):

         with open("{0}{1}".format(self.configuration_path, configuration_name)) as json_data:
             pipeline_configuration = json.load(json_data)

-        data_load_tracker = DataLoadTracker(configuration_name, json_data, full_load)
-
-        self.logger.debug("Execute Starting")
+        self.logger.info("Execute Starting for: {0} requested_full_refresh: {1}".format(configuration_name, requested_full_refresh))

         destination_table_manager = DestinationTableManager(target_engine)

+        full_refresh = requested_full_refresh
+        if not requested_full_refresh and not destination_table_manager.table_exists(pipeline_configuration['target_schema'],
+                                                                                     pipeline_configuration['stage_table']):
+            self.logger.warning("The load table {0}.{1} does not exist. Swapping to full-refresh mode".format(pipeline_configuration['target_schema'],
+                                                                                                              pipeline_configuration['stage_table']))
+            full_refresh = True
+
+        data_load_tracker = DataLoadTracker(configuration_name, json_data, full_refresh)
+
         columns = self.data_source.get_valid_columns(pipeline_configuration['source_table'],
                                                      pipeline_configuration['columns'])

         destination_table_manager.create_schema(pipeline_configuration['target_schema'])
-        if full_load:
-            self.logger.info("Full-load is set. Recreating the staging table.")
-            destination_table_manager.create_table(pipeline_configuration['target_schema'],
-                                                   pipeline_configuration['stage_table'],
-                                                   columns, drop_first=True)
+
+        self.logger.info("Recreating the staging table {0}.{1}".format(pipeline_configuration['target_schema'], pipeline_configuration['stage_table']))
+        destination_table_manager.create_table(pipeline_configuration['target_schema'],
+                                               pipeline_configuration['stage_table'],
+                                               columns, drop_first=True)
+

         # Import the data.
         batch_data_loader = BatchDataLoader(self.data_source,

@@ -53,15 +61,18 @@ def start_single_import(self, target_engine, configuration_name, full_load):

         self.logger.info("ImportBatch Completed")

-        if full_load:
-            #return
+        if full_refresh:
             # Rename the stage table to the load table.
             self.logger.info("Full-load is set. Renaming the stage table to the load table.")
             destination_table_manager.rename_table(pipeline_configuration['target_schema'],
                                                    pipeline_configuration['stage_table'],
                                                    pipeline_configuration['load_table'])
-        #else:
-        #    upsert_data_from_stage_to_load_tables(pipeline_configuration['stage_source_data'], pipeline_configuration['load_source_data'])
+        else:
+            self.logger.info("Incremental-load is set. Upserting from the stage table to the load table.")
+            destination_table_manager.upsert_table(pipeline_configuration['target_schema'],
+                                                   pipeline_configuration['stage_table'],
+                                                   pipeline_configuration['load_table'],
+                                                   pipeline_configuration['columns'])

         data_load_tracker.completed_successfully()
         self.logger.info(data_load_tracker.get_statistics())
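The mode-resolution rule above can be summarised in a standalone sketch (function name invented; table_exists stands in for DestinationTableManager.table_exists): an incremental request is silently promoted to a full refresh whenever the target table has never been created.

# Sketch of the decision logic in start_single_import (names invented).
def resolve_refresh_mode(requested_full_refresh, table_exists):
    if requested_full_refresh:
        return True   # caller asked for a full refresh
    if not table_exists:
        return True   # nothing to upsert into yet: promote to full refresh
    return False      # table present: perform an incremental upsert

assert resolve_refresh_mode(True, table_exists=True) is True
assert resolve_refresh_mode(False, table_exists=False) is True   # promoted
assert resolve_refresh_mode(False, table_exists=True) is False   # incremental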

relational_data_loader_project/DestinationTableManager.py

Lines changed: 42 additions & 8 deletions

@@ -4,16 +4,21 @@
 import importlib
 from sqlalchemy.sql import func
 import io
+import os
+

 class DestinationTableManager(object):
+    TIMESTAMP_COLUMN_NAME = "data_pipeline_timestamp"
+
     def __init__(self, target_engine, logger=None):
         self.logger = logger or logging.getLogger(__name__)
         self.target_engine = target_engine

     def create_schema(self, schema_name):
-
         self.target_engine.execute("CREATE SCHEMA IF NOT EXISTS {0}".format(schema_name))
-        #self.target_engine.execute(CreateSchema(schema_name))
+
+    def table_exists(self, schema_name, table_name):
+        return self.target_engine.dialect.has_table(self.target_engine, "{0}.{1}".format(schema_name, table_name))

     def create_table(self, schema_name, table_name, columns_configuration, drop_first):
         metadata = MetaData()

@@ -24,7 +29,7 @@ def create_table(self, schema_name, table_name, columns_configuration, drop_first):
             table.append_column(self.create_column(column_configuration['destination']))

         table.append_column(
-            Column("data_pipeline_timestamp", DateTime(timezone=True), server_default=func.now()))
+            Column(self.TIMESTAMP_COLUMN_NAME, DateTime(timezone=True), server_default=func.now()))

         if drop_first:
             self.logger.info(

@@ -72,10 +77,12 @@ def rename_table(self, schema_name, source_table_name, target_table_name):
         sql_builder.write("BEGIN TRANSACTION; ")

         # Step 3
-        sql_builder.write("ALTER TABLE {0}.{1} RENAME TO {2}; ".format(schema_name, target_table_name, old_load_table_name))
+        sql_builder.write(
+            "ALTER TABLE IF EXISTS {0}.{1} RENAME TO {2}; ".format(schema_name, target_table_name, old_load_table_name))

         # Step 4
-        sql_builder.write("ALTER TABLE {0}.{1} RENAME TO {2}; ".format(schema_name, source_table_name, target_table_name))
+        sql_builder.write(
+            "ALTER TABLE {0}.{1} RENAME TO {2}; ".format(schema_name, source_table_name, target_table_name))

         sql_builder.write("COMMIT TRANSACTION; ")
         self.logger.debug("Table Rename, executing {0}".format(sql_builder.getvalue()))

@@ -85,7 +92,34 @@ def rename_table(self, schema_name, source_table_name, target_table_name):

         sql = "DROP TABLE IF EXISTS {0}.{1} ".format(schema_name, old_load_table_name)
         self.logger.debug("Table Rename, executing {0}".format(sql))
+        self.target_engine.execute(sql)
+
+    def upsert_table(self, schema_name, source_table_name, target_table_name, columns_configuration):
+        column_array = list(map(lambda column: column['destination']['name'], columns_configuration))
+        column_list = ','.join(map(str, column_array))
+        column_list = column_list + ",{0}".format(self.TIMESTAMP_COLUMN_NAME)
+
+        primary_key_column_array = [column_configuration['destination']['name'] for column_configuration in columns_configuration if 'primary_key' in column_configuration['destination'] and column_configuration['destination']['primary_key']]
+
+        primary_key_column_list = ','.join(map(str, primary_key_column_array))
+
+        sql_builder = io.StringIO()
+        sql_builder.write("INSERT INTO {0}.{1} ({2})".format(schema_name, target_table_name, column_list))
+        sql_builder.write(os.linesep)
+        sql_builder.write(" SELECT {0} FROM {1}.{2}".format(column_list, schema_name, source_table_name))
+        sql_builder.write(os.linesep)
+        sql_builder.write(" ON CONFLICT({0}) DO UPDATE SET ".format(primary_key_column_list))
+
+        for column_configuration in columns_configuration:
+            sql_builder.write("{0} = EXCLUDED.{0},".format(column_configuration['destination']['name']))
+            sql_builder.write(os.linesep)
+
+        sql_builder.write("{0} = EXCLUDED.{0}".format(self.TIMESTAMP_COLUMN_NAME))
+
+        self.logger.debug("Upsert executing {0}".format(sql_builder.getvalue()))
+        self.target_engine.execute(sql_builder.getvalue())
+
+        sql_builder.close()

-    def upsert_data_from_stage_to_load_tables(self, source_table_configuration, target_table_configuration):
-        print('TODO - create a method to upsert the data;')
-        return;
+    def bob(self, x):
+        print(x)
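For a concrete picture of what upsert_table emits, assume a configuration with the primary-key column id and one data column StringColumn1 (both names appear in this commit's config and test CSV; the two-column list is a simplification). The generated statement would be, roughly:

INSERT INTO rdl_integration_tests.load_source_data (id,StringColumn1,data_pipeline_timestamp)
 SELECT id,StringColumn1,data_pipeline_timestamp FROM rdl_integration_tests.stage_source_data
 ON CONFLICT(id) DO UPDATE SET id = EXCLUDED.id,
StringColumn1 = EXCLUDED.StringColumn1,
data_pipeline_timestamp = EXCLUDED.data_pipeline_timestamp

PostgreSQL's ON CONFLICT requires a unique index on the conflict target, so this relies on the load table carrying a primary key on id; re-assigning id from EXCLUDED in the SET list is redundant but harmless.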

relational_data_loader_project/RelationalDataLoader.py

Lines changed: 18 additions & 3 deletions

@@ -22,7 +22,7 @@ def main(self):
         destination_engine = create_engine(args['destination-engine'])

         data_load_manager = DataLoadManager(args['configuration-folder'], data_source)
-        data_load_manager.start_imports(destination_engine, True)
+        data_load_manager.start_imports(destination_engine, args['full_refresh'])

     def configure_logging(self, log_level):
         formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

@@ -55,18 +55,33 @@ def get_arguments(self):

         parser.add_argument('source-connection-string', metavar='source-connection-string',
                             type=self.raw_connection_string_to_valid_source_connection_string,
-                            help='The source connections string. Eg: mssql+pyodbc://dwsource or csv://c://some//Path//To//Csv//Files//')
+                            help='The source connection string. Eg: mssql+pyodbc://dwsource or '
+                                 'csv://c://some//Path//To//Csv//Files//')

         parser.add_argument('destination-engine', metavar='destination-engine',
                             help='The destination engine. Eg: postgresql+psycopg2://postgres:xxxx@localhost/dest_dw')

         parser.add_argument('configuration-folder', metavar='configuration-folder',
-                            help='The configuration folder. Eg C:\\_dev\\oscars-misc\\el-pipeline-spike\\configuraton\\')
+                            help='The configuration folder. Eg C:\\_dev\\oscars-misc\\el-pipeline-spike\\configuration\\')

         parser.add_argument('--log-level',
                             default='INFO',
                             type=self.log_level_string_to_int,
                             nargs='?',
                             help='Set the logging output level. {0}'.format(_LOG_LEVEL_STRINGS))

+        parser.add_argument("--full-refresh", type=self.str2bool, nargs='?',
+                            const=True, default=False,
+                            help='If true, a full refresh of the destination will be performed. This drops/re-creates '
+                                 'the destination table(s).')
+
+
         return vars(parser.parse_args())
+
+    def str2bool(self, v):
+        if v.lower() in ('yes', 'true', 't', 'y', '1'):
+            return True
+        elif v.lower() in ('no', 'false', 'f', 'n', '0'):
+            return False
+        else:
+            raise argparse.ArgumentTypeError('Boolean value expected.')
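A quick standalone check (module-level copy of str2bool for brevity) of how the new flag parses: with nargs='?' and const=True, a bare --full-refresh enables the refresh, an explicit value is routed through str2bool, and omitting the flag leaves the default of False.

import argparse

def str2bool(v):
    # Same logic as the method above, standalone for the demo.
    if v.lower() in ('yes', 'true', 't', 'y', '1'):
        return True
    elif v.lower() in ('no', 'false', 'f', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError('Boolean value expected.')

parser = argparse.ArgumentParser()
parser.add_argument('--full-refresh', type=str2bool, nargs='?',
                    const=True, default=False)

print(parser.parse_args([]).full_refresh)                        # False
print(parser.parse_args(['--full-refresh']).full_refresh)        # True
print(parser.parse_args(['--full-refresh', 'no']).full_refresh)  # False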
Lines changed: 4 additions & 0 deletions

@@ -0,0 +1,4 @@
+id,StringColumn1,IntColumn1,DecimalColumn1,DateColumn1,DateTimeColumn1
+5,"This row WAS updated in the incremental review test",,,,
+6,"A Basic String",111,12.1212,01-Dec-1976,01-dec-1976 1:00 am
+7,"Another Basic String",111,12.1212,01-Dec-1976,01-dec-1976 1:00 am
