This repository was archived by the owner on Mar 13, 2020. It is now read-only.

Commit 9b8b78b

Merge pull request #19 from PageUpPeopleOrg/task/refactor_state_schema
Rename sync versions with correct context
2 parents ad401b4 + 42d4df6 commit 9b8b78b

14 files changed: 145 additions, 148 deletions

modules/BatchDataLoader.py

Lines changed: 6 additions & 5 deletions
@@ -1,6 +1,7 @@
 import logging
 from io import StringIO
 from modules.column_transformers.StringTransformers import ToUpper
+from modules.shared import Constants


 class BatchDataLoader(object):
@@ -23,9 +24,9 @@ def load_batch(self, batch_key_tracker):
         batch_tracker = self.data_load_tracker.start_batch()

         self.logger.debug(f"ImportBatch Starting from previous_batch_key: '{batch_key_tracker.bookmarks}'. "
-                          f"Full Refresh: '{self.full_refresh}' "
-                          f"this_sync_version: '{self.change_tracking_info.this_sync_version}'")
-        # TODO ^ this is actually the last_sync_version, log appropriately?
+                          f"Full Refresh: '{self.full_refresh}', "
+                          f"sync_version: '{self.change_tracking_info.sync_version}', "
+                          f"last_sync_version: '{self.change_tracking_info.last_sync_version}'.")

         data_frame = self.source_db.get_next_data_frame(self.source_table_config, self.columns,
                                                         self.batch_config, batch_tracker, batch_key_tracker,
@@ -76,8 +77,8 @@ def get_destination_column_name(self, source_column_name):
             if column['source_name'] == source_column_name:
                 return column['destination']['name']

-        # Internal columns - map them straight through
-        if source_column_name.startswith("data_pipeline_"):
+        # Audit columns - map them straight through
+        if source_column_name.startswith(Constants.AUDIT_COLUMN_PREFIX):
             return source_column_name

         message = f"A source column with name '{source_column_name}' was not found in the column configuration"

modules/DataLoadManager.py

Lines changed: 2 additions & 2 deletions
@@ -8,7 +8,7 @@
 from modules.DestinationTableManager import DestinationTableManager
 from modules.data_load_tracking.DataLoadTracker import DataLoadTracker
 from modules.BatchKeyTracker import BatchKeyTracker
-from modules.Shared import Constants
+from modules.shared import Constants


 class DataLoadManager(object):
@@ -79,7 +79,7 @@ def start_single_import(self, model_file, requested_full_refresh):
             self.data_load_tracker_repository.get_last_successful_data_load_execution(model_name)

         if last_successful_data_load_execution is not None:
-            last_sync_version = last_successful_data_load_execution.next_sync_version
+            last_sync_version = last_successful_data_load_execution.sync_version

         destination_table_manager = DestinationTableManager(self.target_db)
         change_tracking_info = self.source_db.init_change_tracking(model_config['source_table'], last_sync_version)
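
For context, a minimal sketch of the hand-off this hunk clarifies, using made-up values rather than code from this repository: the sync_version recorded by the previous successful execution is what gets passed to init_change_tracking as the new load's last_sync_version.

    # Minimal sketch, hypothetical values only.
    class PreviousExecution:
        sync_version = 112  # change-tracking version captured when that load completed

    last_successful_data_load_execution = PreviousExecution()

    last_sync_version = 0
    if last_successful_data_load_execution is not None:
        # Under the old names this read `.next_sync_version`, which hid that it is
        # simply the point the next incremental load should resume from.
        last_sync_version = last_successful_data_load_execution.sync_version

    print(last_sync_version)  # 112 -> handed to source_db.init_change_tracking(...)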

modules/DestinationTableManager.py

Lines changed: 17 additions & 21 deletions
@@ -6,7 +6,7 @@
 from sqlalchemy import MetaData, DateTime, Boolean, BigInteger
 from sqlalchemy.schema import Column, Table
 from sqlalchemy.sql import func
-from modules.Shared import Constants
+from modules.shared import Constants


 class DestinationTableManager(object):
@@ -100,39 +100,35 @@ def rename_table(self, schema_name, source_table_name, target_table_name):
         self.logger.debug(f"Table Rename, executing '{sql}'")
         self.target_db.execute(sql)

-    def upsert_table(self, schema_name, source_table_name, target_table_name, columns_configuration):
-        column_array = list(map(lambda column: column['destination']['name'], columns_configuration))
+    def upsert_table(self, schema_name, source_table_name, target_table_name, columns_config):
+        column_array = list(map(lambda column: column['destination']['name'], columns_config))
         column_list = ','.join(map(str, column_array))
         column_list = column_list + f",{Constants.AuditColumnNames.TIMESTAMP}"
         column_list = column_list + f",{Constants.AuditColumnNames.IS_DELETED}"
         column_list = column_list + f",{Constants.AuditColumnNames.CHANGE_VERSION}"

-        primary_key_column_array = [column_configuration['destination']['name'] for column_configuration in
-                                    columns_configuration if 'primary_key' in column_configuration['destination'] and
-                                    column_configuration['destination']['primary_key']]
+        primary_key_column_array = [column_config['destination']['name'] for column_config in
+                                    columns_config if 'primary_key' in column_config['destination'] and
+                                    column_config['destination']['primary_key']]

         primary_key_column_list = ','.join(map(str, primary_key_column_array))

         sql_builder = io.StringIO()
-        sql_builder.write(f"INSERT INTO {schema_name}.{target_table_name} ({column_list})")
-        sql_builder.write(os.linesep)
-        sql_builder.write(f" SELECT {column_list} FROM {schema_name}.{source_table_name}")
-        sql_builder.write(os.linesep)
+        sql_builder.write(f"INSERT INTO {schema_name}.{target_table_name} ({column_list}) \n")
+        sql_builder.write(f" SELECT {column_list} FROM {schema_name}.{source_table_name} \n")
         sql_builder.write(f" ON CONFLICT({primary_key_column_list}) DO UPDATE SET ")

-        for column_configuration in columns_configuration:
-            sql_builder.write("{0} = EXCLUDED.{0},".format(column_configuration['destination']['name']))
-            sql_builder.write(os.linesep)
+        for column_config in columns_config:
+            sql_builder.write("{0} = EXCLUDED.{0},\n".format(column_config['destination']['name']))

-        sql_builder.write("{0} = EXCLUDED.{0},".format(Constants.AuditColumnNames.TIMESTAMP))
-        sql_builder.write(os.linesep)
-        sql_builder.write("{0} = EXCLUDED.{0},".format(Constants.AuditColumnNames.IS_DELETED))
-        sql_builder.write(os.linesep)
-        sql_builder.write("{0} = EXCLUDED.{0}".format(Constants.AuditColumnNames.CHANGE_VERSION))
-        sql_builder.write(os.linesep)
+        sql_builder.write("{0} = EXCLUDED.{0},\n".format(Constants.AuditColumnNames.TIMESTAMP))
+        sql_builder.write("{0} = EXCLUDED.{0},\n".format(Constants.AuditColumnNames.IS_DELETED))
+        sql_builder.write("{0} = EXCLUDED.{0};\n".format(Constants.AuditColumnNames.CHANGE_VERSION))

-        self.logger.debug(f"UPSERT executing '{sql_builder.getvalue()}'")
-        self.target_db.execute(sql_builder.getvalue())
+        upsert_sql = sql_builder.getvalue()
+
+        self.logger.debug(f"UPSERT executing '{upsert_sql}'")
+        self.target_db.execute(upsert_sql)
         self.logger.debug("UPSERT completed")

         sql_builder.close()
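
For reference, a sketch of the kind of statement upsert_table now assembles. All names are made up for illustration (schema load, staging table customer_load, target table customer, key column id); the audit column names below follow the data_pipeline_ prefix used elsewhere in this commit but are not spelled out in this diff. Note that the loop writes a SET clause for every configured column, key included, and the statement now ends with an explicit semicolon.

    # Hypothetical output of upsert_table for a two-column model configuration.
    columns = "id,name,data_pipeline_timestamp,data_pipeline_is_deleted,data_pipeline_change_version"
    upsert_sql = (
        f"INSERT INTO load.customer ({columns}) \n"
        f" SELECT {columns} FROM load.customer_load \n"
        " ON CONFLICT(id) DO UPDATE SET "
        "id = EXCLUDED.id,\n"
        "name = EXCLUDED.name,\n"
        "data_pipeline_timestamp = EXCLUDED.data_pipeline_timestamp,\n"
        "data_pipeline_is_deleted = EXCLUDED.data_pipeline_is_deleted,\n"
        "data_pipeline_change_version = EXCLUDED.data_pipeline_change_version;\n"
    )
    print(upsert_sql)

Swapping os.linesep for a literal \n keeps the generated SQL identical across platforms, and collecting it into upsert_sql avoids calling getvalue() twice.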

modules/RelationalDataLoader.py

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@
 import argparse
 from sqlalchemy import create_engine
 from modules.DataLoadManager import DataLoadManager
-from modules.Shared import Constants
+from modules.shared import Constants
 from modules.data_load_tracking.DataLoadTrackerRepository import DataLoadTrackerRepository
 from modules.data_sources.DataSourceFactory import DataSourceFactory
 from sqlalchemy.orm import sessionmaker

modules/Shared.py

Lines changed: 0 additions & 35 deletions
This file was deleted.

modules/data_load_tracking/DataLoadExecution.py

Lines changed: 8 additions & 6 deletions
@@ -1,22 +1,24 @@
 from sqlalchemy import Column, DateTime, Integer, String, Boolean, BigInteger
 from sqlalchemy.sql import func
 from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.dialects.postgresql import UUID
+from modules.shared import Constants

 Base = declarative_base()


 class DataLoadExecution(Base):
     __tablename__ = 'data_load_execution'
-    __table_args__ = {'schema': 'data_pipeline'}
+    __table_args__ = {'schema': f'{Constants.DATA_PIPELINE_EXECUTION_SCHEMA_NAME}'}
     id = Column(Integer, primary_key=True)
+    correlation_id = Column(UUID(as_uuid=True), nullable=True)
     model_name = Column(String(250), nullable=False)
+    status = Column(String(25), nullable=False)
+    last_sync_version = Column(BigInteger, nullable=False)
+    sync_version = Column(BigInteger, nullable=False)
     is_full_refresh = Column(Boolean, nullable=False)
-    this_sync_version = Column(BigInteger, nullable=False)
-    next_sync_version = Column(BigInteger, nullable=False)
+    full_refresh_reason = Column(String(100), nullable=False)
     completed_on = Column(DateTime(timezone=True), server_default=func.now())
     execution_time_ms = Column(Integer, nullable=False)
     rows_processed = Column(Integer, nullable=False)
-    correlation_id = Column(String(250), nullable=True)
-    status = Column(String(25), nullable=False)
     model_checksum = Column(String(100), nullable=False)
-    full_refresh_reason = Column(String(100), nullable=False)

modules/data_load_tracking/DataLoadTracker.py

Lines changed: 3 additions & 3 deletions
@@ -1,5 +1,5 @@
 from datetime import datetime
-from modules.Shared import Constants
+from modules.shared import Constants


 class DataLoadTracker:
@@ -51,8 +51,8 @@ def completed_successfully(self):

     def get_statistics(self):
         load_type = 'FULL' if self.is_full_refresh else f"INCREMENTAL from " \
-            f"version '{self.change_tracking_info.this_sync_version}' " \
-            f"to '{self.change_tracking_info.next_sync_version}'"
+            f"version '{self.change_tracking_info.last_sync_version}' " \
+            f"to '{self.change_tracking_info.sync_version}'"
         return f"Rows: {self.total_row_count}," \
                f"Load type: {load_type}, " \
                f"Total Execution Time: {self.total_execution_time} @ {self.rows_per_second:.2f} rows per second "

modules/data_load_tracking/DataLoadTrackerRepository.py

Lines changed: 12 additions & 15 deletions
@@ -1,7 +1,7 @@
 import logging
 from sqlalchemy import desc
 from modules.data_load_tracking.DataLoadExecution import DataLoadExecution, Base
-from modules.Shared import Constants
+from modules.shared import Constants


 class DataLoadTrackerRepository(object):
@@ -15,26 +15,23 @@ def ensure_schema_exists(self, engine):

     def get_last_successful_data_load_execution(self, model_name):
         session = self.session_maker()
-        return session.query(DataLoadExecution).filter_by(
-            model_name=model_name,
-            status=Constants.ExecutionStatus.COMPLETED_SUCCESSFULLY).order_by(
-            desc(
-                DataLoadExecution.completed_on)).first()
+        return session.query(DataLoadExecution)\
+            .filter_by(model_name=model_name, status=Constants.ExecutionStatus.COMPLETED_SUCCESSFULLY)\
+            .order_by(desc(DataLoadExecution.completed_on))\
+            .first()

     def save(self, data_load_tracker):
-
         data_load_execution = DataLoadExecution(
-            model_name=data_load_tracker.model_name,
             correlation_id=data_load_tracker.correlation_id,
+            model_name=data_load_tracker.model_name,
+            status=data_load_tracker.status,
+            last_sync_version=data_load_tracker.change_tracking_info.last_sync_version,
+            sync_version=data_load_tracker.change_tracking_info.sync_version,
             is_full_refresh=data_load_tracker.is_full_refresh,
-            this_sync_version=data_load_tracker.change_tracking_info.this_sync_version,
-            next_sync_version=data_load_tracker.change_tracking_info.next_sync_version,
-            execution_time_ms=int(
-                data_load_tracker.total_execution_time.total_seconds() * 1000),
+            full_refresh_reason=data_load_tracker.full_refresh_reason,
+            execution_time_ms=int(data_load_tracker.total_execution_time.total_seconds() * 1000),
             rows_processed=data_load_tracker.total_row_count,
-            status=data_load_tracker.status,
-            model_checksum=data_load_tracker.model_checksum,
-            full_refresh_reason=data_load_tracker.full_refresh_reason)
+            model_checksum=data_load_tracker.model_checksum)

         session = self.session_maker()
         session.add(data_load_execution)

Lines changed: 3 additions & 8 deletions
@@ -1,10 +1,5 @@
-
 class ChangeTrackingInfo:
-    this_sync_version = 0  # TODO rename `this_sync_version` to `last_sync_version`
-    next_sync_version = 0  # TODO rename `next_sync_version` to `this?current?new?_sync_version`
-    force_full_load = 0
-
-    def __init__(self, this_sync_version, next_sync_version, force_full_load):
-        self.this_sync_version = this_sync_version
-        self.next_sync_version = next_sync_version
+    def __init__(self, last_sync_version, sync_version, force_full_load):
+        self.last_sync_version = last_sync_version
+        self.sync_version = sync_version
         self.force_full_load = force_full_load
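
A short usage sketch of the renamed class; the values are hypothetical, and how a concrete source computes them is outside this diff.

    # Stand-alone mirror of the class above so the snippet runs on its own.
    class ChangeTrackingInfo:
        def __init__(self, last_sync_version, sync_version, force_full_load):
            self.last_sync_version = last_sync_version
            self.sync_version = sync_version
            self.force_full_load = force_full_load

    # last_sync_version: where the previous load left off; sync_version: the
    # version captured for this load and stored for the next run to resume from.
    info = ChangeTrackingInfo(last_sync_version=112, sync_version=130, force_full_load=False)

    # Matches the wording DataLoadTracker.get_statistics() now produces.
    print(f"INCREMENTAL from version '{info.last_sync_version}' to '{info.sync_version}'")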
