Skip to content
This repository was archived by the owner on Mar 13, 2020. It is now read-only.

Commit 3a1ac54

Browse files
committed
basic incremental loads now working
1 parent 7b5c9b5 commit 3a1ac54

File tree

7 files changed

+165
-38
lines changed

7 files changed

+165
-38
lines changed

modules/BatchDataLoader.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
class BatchDataLoader(object):
88
def __init__(self, data_source, source_table_configuration, target_schema, target_table, columns, data_load_tracker,
9-
batch_configuration, target_engine, logger=None):
9+
batch_configuration, target_engine, full_refresh, change_tracking_info, logger=None):
1010
self.logger = logger or logging.getLogger(__name__)
1111
self.source_table_configuration = source_table_configuration
1212
self.columns = columns
@@ -16,15 +16,18 @@ def __init__(self, data_source, source_table_configuration, target_schema, targe
1616
self.data_load_tracker = data_load_tracker
1717
self.batch_configuration = batch_configuration
1818
self.target_engine = target_engine
19+
self.full_refresh = full_refresh
20+
self.change_tracking_info = change_tracking_info
1921

2022
# Imports rows, returns True if >0 rows were found
2123
def load_batch(self, previous_batch_key):
2224
batch_tracker = self.data_load_tracker.start_batch()
2325

24-
self.logger.debug("ImportBatch Starting from previous_batch_key: {0}".format(previous_batch_key))
26+
self.logger.debug("ImportBatch Starting from previous_batch_key: {0}. Full Refresh: {1} this_sync_version: {2}".format(previous_batch_key, self.full_refresh, self.change_tracking_info.this_sync_version))
2527

2628
data_frame = self.data_source.get_next_data_frame(self.source_table_configuration, self.columns,
27-
self.batch_configuration, batch_tracker, previous_batch_key)
29+
self.batch_configuration, batch_tracker, previous_batch_key,
30+
self.full_refresh, self.change_tracking_info)
2831

2932
if data_frame is None or len(data_frame) == 0:
3033
self.logger.debug("There are no rows to import, returning -1")
@@ -44,7 +47,7 @@ def write_data_frame_to_table(self, data_frame):
4447
qualified_target_table = "{0}.{1}".format(self.target_schema, self.target_table)
4548
self.logger.debug("Starting write to table {0}".format(qualified_target_table))
4649
data = StringIO()
47-
50+
print(data_frame)
4851
data_frame.to_csv(data, header=False, index=False, na_rep='', float_format='%.16g')
4952
# Float_format is used to truncate any insignificant digits. Unfortunately it gives us an artificial limitation
5053

@@ -58,6 +61,7 @@ def write_data_frame_to_table(self, data_frame):
5861

5962
sql = "COPY {0}({1}) FROM STDIN with csv".format(qualified_target_table, column_list)
6063
self.logger.debug("Writing to table using command {0}".format(sql))
64+
6165
curs.copy_expert(sql=sql, file=data)
6266

6367
self.logger.debug("Completed write to table {0}".format(qualified_target_table))
@@ -70,6 +74,10 @@ def get_destination_column_name(self, source_column_name):
7074
if column['source_name'] == source_column_name:
7175
return column['destination']['name']
7276

77+
# Internal columns - map them straight through
78+
if source_column_name.startswith("data_pipeline_"):
79+
return source_column_name;
80+
7381
message = 'A source column with name {0} was not found in the column configuration'.format(source_column_name)
7482
raise ValueError(message)
7583

modules/DataLoadManager.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,23 @@ def start_single_import(self, target_engine, configuration_name, requested_full_
3838
pipeline_configuration['load_table']))
3939
full_refresh = True
4040

41-
#Ask the source to check that ChangeTrackingIsEnabled and valid
42-
#Maybe this returns a flag?
41+
self.data_source.assert_data_source_is_valid(pipeline_configuration['source_table'],
42+
pipeline_configuration['columns'])
4343

44+
last_sync_version = destination_table_manager.get_last_sync_version(pipeline_configuration['target_schema'],
45+
pipeline_configuration['load_table'])
4446

45-
data_load_tracker = DataLoadTracker(configuration_name, json_data, full_refresh)
47+
change_tracking_info = self.data_source.init_change_tracking(pipeline_configuration['source_table'],
48+
last_sync_version)
4649

47-
self.data_source.assert_data_source_is_valid(pipeline_configuration['source_table'],
48-
pipeline_configuration['columns'])
50+
51+
data_load_tracker = DataLoadTracker(configuration_name, json_data, full_refresh, change_tracking_info)
52+
53+
54+
self.logger.debug(" Change Tracking: this_sync_version: {0} next_sync_version: {1} force_full_load:{2} : ".format(change_tracking_info.this_sync_version, change_tracking_info.next_sync_version, change_tracking_info.force_full_load()))
55+
if not full_refresh and change_tracking_info.force_full_load():
56+
self.logger.info("Change tracking has forced this to be a full load")
57+
full_refresh = True
4958

5059
columns = pipeline_configuration['columns']
5160
destination_table_manager.create_schema(pipeline_configuration['target_schema'])
@@ -64,7 +73,9 @@ def start_single_import(self, target_engine, configuration_name, requested_full_
6473
columns,
6574
data_load_tracker,
6675
pipeline_configuration['batch'],
67-
target_engine)
76+
target_engine,
77+
full_refresh,
78+
change_tracking_info)
6879

6980
previous_unique_column_value = 0
7081
while previous_unique_column_value > -1:
@@ -86,5 +97,5 @@ def start_single_import(self, target_engine, configuration_name, requested_full_
8697
destination_table_manager.drop_table(pipeline_configuration['target_schema'],
8798
pipeline_configuration['stage_table'])
8899
data_load_tracker.completed_successfully()
89-
self.logger.info("Import for configuration: {0} Complete. {1}".format(configuration_name, data_load_tracker.get_statistics()))
100+
self.logger.info("Import Complete for: {0}. {1}".format(configuration_name, data_load_tracker.get_statistics()))
90101

modules/DataLoadTracker.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,13 @@ class DataLoadTracker:
1414
total_row_count = 0
1515
rows_per_second = 0
1616

17-
def __init__(self, configuration_name, configuration, is_full_load, change_tracking_info):
    """Capture the configuration and change-tracking context for one load run."""
    self.configuration_name = configuration_name
    self.configuration = configuration
    self.is_full_load = is_full_load
    self.change_tracking_info = change_tracking_info
    # Timing starts here; completed_successfully() later derives the elapsed time.
    self.started = datetime.now()
    self.status = "Not Started"
2324

2425
def start_batch(self):
2526
batch = self.Batch()
@@ -36,9 +37,11 @@ def completed_successfully(self):
3637
self.rows_per_second = self.total_row_count / self.total_execution_time.total_seconds()
3738

3839
def get_statistics(self):
    """Return a one-line summary of the load: row count, load type, elapsed time, throughput."""
    if self.is_full_load:
        load_type = 'full'
    else:
        load_type = "incremental from version {0} ".format(self.change_tracking_info.this_sync_version)
    template = "Rows: {0} ({1}), Total Execution Time: {2}. ({3:.2f} rows per second) "
    return template.format(self.total_row_count, load_type,
                           self.total_execution_time, self.rows_per_second)
4245

4346
class Batch:
4447
row_count = 0

modules/DestinationTableManager.py

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,16 @@
33
import logging
44
from modules.ColumnTypeResolver import ColumnTypeResolver
55

6-
from sqlalchemy import MetaData, DateTime, Boolean
6+
from sqlalchemy import MetaData, DateTime, Boolean, BigInteger
77
from sqlalchemy.schema import Column, Table
88
from sqlalchemy.sql import func
99

1010

1111
class DestinationTableManager(object):
1212
TIMESTAMP_COLUMN_NAME = "data_pipeline_timestamp"
1313
IS_DELETED_COLUMN_NAME = "data_pipeline_is_deleted"
14+
CHANGE_VERSION_COLUMN_NAME = "data_pipeline_change_version"
15+
NEXT_CHANGE_MINIMUM_VERSION_COLUMN_NAME = "data_pipeline_next_change_minimum_version"
1416

1517
def __init__(self, target_engine, logger=None):
1618
self.logger = logger or logging.getLogger(__name__)
@@ -52,9 +54,11 @@ def create_table(self, schema_name, table_name, columns_configuration, drop_firs
5254
table.append_column(
5355
Column(self.IS_DELETED_COLUMN_NAME, Boolean, server_default='f', default=False))
5456

57+
table.append_column(
58+
Column(self.CHANGE_VERSION_COLUMN_NAME, BigInteger))
5559

56-
57-
60+
table.append_column(
61+
Column(self.NEXT_CHANGE_MINIMUM_VERSION_COLUMN_NAME, BigInteger))
5862

5963
if drop_first:
6064
self.logger.debug(
@@ -63,16 +67,29 @@ def create_table(self, schema_name, table_name, columns_configuration, drop_firs
6367
self.logger.debug(
6468
"Dropped table {0}.{1}".format(schema_name, table_name))
6569

70+
6671
self.logger.debug("Creating table {0}.{1}".format(schema_name, table_name))
6772
table.create(self.target_engine, checkfirst=False)
6873
self.logger.debug("Created table {0}.{1}".format(schema_name, table_name))
74+
6975
return
7076

7177
def create_column(self, configuration):
    """Build a SQLAlchemy Column from a single column-configuration dict."""
    resolved_type = self.column_type_resolver.resolve_postgres_type(configuration)
    return Column(
        configuration['name'],
        resolved_type,
        primary_key=configuration.get("primary_key", False),
        nullable=configuration['nullable'],
    )
7581

82+
# TODO: this should come from a different log table which is only written to at the end of a successful load.
def get_last_sync_version(self, schema_name, table_name):
    """Return the highest change-tracking version already loaded into the target table.

    Reads max(NEXT_CHANGE_MINIMUM_VERSION_COLUMN_NAME) from schema_name.table_name
    and returns 0 when the table is empty, which forces a full load upstream.
    NOTE(review): identifiers are interpolated into the SQL string -- they come
    from pipeline configuration (not user input) and cannot be bound as
    parameters anyway, but verify configuration values are trusted.
    """
    sql = "SELECT max({0}) as version FROM {1}.{2}".format(
        self.NEXT_CHANGE_MINIMUM_VERSION_COLUMN_NAME, schema_name, table_name)

    result = self.target_engine.execute(sql)
    row = result.fetchone()
    # max() over an empty table yields NULL -> start syncing from version 0.
    if row["version"] is None:
        return 0
    return row["version"]
91+
92+
7693
def rename_table(self, schema_name, source_table_name, target_table_name):
7794

7895
# Steps to efficiently rename a table.
@@ -116,6 +133,10 @@ def upsert_table(self, schema_name, source_table_name, target_table_name, column
116133
column_array = list(map(lambda column: column['destination']['name'], columns_configuration))
117134
column_list = ','.join(map(str, column_array))
118135
column_list = column_list + ",{0}".format(self.TIMESTAMP_COLUMN_NAME)
136+
column_list = column_list + ",{0}".format(self.IS_DELETED_COLUMN_NAME)
137+
column_list = column_list + ",{0}".format(self.CHANGE_VERSION_COLUMN_NAME)
138+
column_list = column_list + ",{0}".format(self.NEXT_CHANGE_MINIMUM_VERSION_COLUMN_NAME)
139+
119140

120141
primary_key_column_array = [column_configuration['destination']['name'] for column_configuration in columns_configuration if 'primary_key' in column_configuration['destination'] and column_configuration['destination']['primary_key']]
121142

@@ -132,7 +153,14 @@ def upsert_table(self, schema_name, source_table_name, target_table_name, column
132153
sql_builder.write("{0} = EXCLUDED.{0},".format(column_configuratiomn['destination']['name']))
133154
sql_builder.write(os.linesep)
134155

135-
sql_builder.write("{0} = EXCLUDED.{0}".format(self.TIMESTAMP_COLUMN_NAME))
156+
sql_builder.write("{0} = EXCLUDED.{0},".format(self.TIMESTAMP_COLUMN_NAME))
157+
sql_builder.write(os.linesep)
158+
sql_builder.write("{0} = EXCLUDED.{0},".format(self.IS_DELETED_COLUMN_NAME))
159+
sql_builder.write(os.linesep)
160+
sql_builder.write("{0} = EXCLUDED.{0},".format(self.CHANGE_VERSION_COLUMN_NAME))
161+
sql_builder.write(os.linesep)
162+
sql_builder.write("{0} = EXCLUDED.{0}".format(self.NEXT_CHANGE_MINIMUM_VERSION_COLUMN_NAME))
163+
sql_builder.write(os.linesep)
136164

137165
self.logger.debug("Upsert executing {0}".format(sql_builder.getvalue()))
138166
self.target_engine.execute(sql_builder.getvalue())
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
2+
class ChangeTrackingInfo:
    """Change-tracking version window for one incremental table load.

    this_sync_version -- the version this load reads changes from.
    next_sync_version -- the minimum version the *next* load must start from.
    """

    # Class-level defaults; always overwritten per-instance in __init__.
    this_sync_version = 0
    next_sync_version = 0

    def __init__(self, this_sync_version, next_sync_version):
        self.this_sync_version = this_sync_version
        self.next_sync_version = next_sync_version

    def force_full_load(self):
        """Return True when no valid incremental baseline exists
        (next_sync_version == 0), so the caller must run a full load."""
        # '==' already yields a bool; the bool() wrapper was redundant.
        return self.next_sync_version == 0
12+
13+

0 commit comments

Comments
 (0)