Skip to content
This repository was archived by the owner on Mar 13, 2020. It is now read-only.

Commit 9e8e5e5

Browse files
committed
Implemented logging to the DataLoadExecution table
1 parent 1c4554c commit 9e8e5e5

File tree

10 files changed

+89
-65
lines changed

10 files changed

+89
-65
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
FROM microsoft/mssql-server-linux:2017-latest
2+
3+
ENV ACCEPT_EULA=Y

modules/DataLoadManager.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,34 @@
11
import os
22
import json
3+
import uuid
34
import logging
45
from modules.BatchDataLoader import BatchDataLoader
56
from modules.DestinationTableManager import DestinationTableManager
6-
from modules.DataLoadTracker import DataLoadTracker
7-
7+
from modules.data_load_tracking.DataLoadTracker import DataLoadTracker
88

99

1010
class DataLoadManager(object):
11-
def __init__(self, configuration_path, data_source, logger=None):
11+
def __init__(self, configuration_path, data_source, data_load_tracker_repository, logger=None):
1212
self.logger = logger or logging.getLogger(__name__)
1313
self.configuration_path = configuration_path
1414
self.data_source = data_source
15-
15+
self.data_load_tracker_repository = data_load_tracker_repository
16+
self.correlation_id = uuid.uuid4()
1617
def start_imports(self, target_engine, full_refresh):
1718
for file in os.listdir(self.configuration_path):
1819
self.start_single_import(target_engine, file, full_refresh)
1920

2021
self.logger.info("Execution completed.")
2122

22-
def start_single_import(self, target_engine, configuration_name, requested_full_refresh):
23-
self.logger.debug("Using configuration file : {0}".format(configuration_name))
23+
def start_single_import(self, target_engine, model_name, requested_full_refresh):
24+
self.logger.debug("Using configuration file : {0}".format(model_name))
2425

25-
config_file = os.path.abspath(self.configuration_path + configuration_name)
26+
config_file = os.path.abspath(self.configuration_path + model_name)
2627
self.logger.debug("Using configuration file : {0}".format(config_file))
2728
with open(config_file) as json_data:
2829
pipeline_configuration = json.load(json_data)
2930

30-
self.logger.info("Execute Starting for: {0} requested_full_refresh: {1}".format(configuration_name, requested_full_refresh))
31+
self.logger.info("Execute Starting for: {0} requested_full_refresh: {1}".format(model_name, requested_full_refresh))
3132

3233
destination_table_manager = DestinationTableManager(target_engine)
3334

@@ -41,14 +42,13 @@ def start_single_import(self, target_engine, configuration_name, requested_full_
4142
self.data_source.assert_data_source_is_valid(pipeline_configuration['source_table'],
4243
pipeline_configuration['columns'])
4344

44-
last_sync_version = destination_table_manager.get_last_sync_version(pipeline_configuration['target_schema'],
45-
pipeline_configuration['load_table'])
45+
last_sync_version = self.data_load_tracker_repository.get_last_sync_version(model_name)
4646

4747
change_tracking_info = self.data_source.init_change_tracking(pipeline_configuration['source_table'],
4848
last_sync_version)
4949

5050

51-
data_load_tracker = DataLoadTracker(configuration_name, json_data, full_refresh, change_tracking_info)
51+
data_load_tracker = DataLoadTracker(model_name, json_data, full_refresh, change_tracking_info, self.correlation_id)
5252

5353

5454
self.logger.debug(" Change Tracking: this_sync_version: {0} next_sync_version: {1} force_full_load:{2} : ".format(change_tracking_info.this_sync_version, change_tracking_info.next_sync_version, change_tracking_info.force_full_load()))
@@ -97,5 +97,6 @@ def start_single_import(self, target_engine, configuration_name, requested_full_
9797
destination_table_manager.drop_table(pipeline_configuration['target_schema'],
9898
pipeline_configuration['stage_table'])
9999
data_load_tracker.completed_successfully()
100-
self.logger.info("Import Complete for: {0}. {1}".format(configuration_name, data_load_tracker.get_statistics()))
100+
self.data_load_tracker_repository.save(data_load_tracker)
101+
self.logger.info("Import Complete for: {0}. {1}".format(model_name, data_load_tracker.get_statistics()))
101102

modules/DestinationTableManager.py

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ class DestinationTableManager(object):
1212
TIMESTAMP_COLUMN_NAME = "data_pipeline_timestamp"
1313
IS_DELETED_COLUMN_NAME = "data_pipeline_is_deleted"
1414
CHANGE_VERSION_COLUMN_NAME = "data_pipeline_change_version"
15-
NEXT_CHANGE_MINIMUM_VERSION_COLUMN_NAME = "data_pipeline_next_change_minimum_version"
1615

1716
def __init__(self, target_engine, logger=None):
1817
self.logger = logger or logging.getLogger(__name__)
@@ -57,9 +56,6 @@ def create_table(self, schema_name, table_name, columns_configuration, drop_firs
5756
table.append_column(
5857
Column(self.CHANGE_VERSION_COLUMN_NAME, BigInteger))
5958

60-
table.append_column(
61-
Column(self.NEXT_CHANGE_MINIMUM_VERSION_COLUMN_NAME, BigInteger))
62-
6359
if drop_first:
6460
self.logger.debug(
6561
"Dropping table {0}.{1}".format(schema_name, table_name))
@@ -79,16 +75,6 @@ def create_column(self, configuration):
7975
primary_key=configuration.get("primary_key", False),
8076
nullable=configuration['nullable'])
8177

82-
# TODO: this should come from a different log table which is only written to at the end of a successful load.
83-
def get_last_sync_version(self, schema_name, table_name):
84-
sql = "SELECT max({0}) as version FROM {1}.{2}".format(self.NEXT_CHANGE_MINIMUM_VERSION_COLUMN_NAME, schema_name, table_name)
85-
86-
result = self.target_engine.execute(sql)
87-
row = result.fetchone()
88-
if row["version"] is None:
89-
return 0
90-
return row["version"]
91-
9278

9379
def rename_table(self, schema_name, source_table_name, target_table_name):
9480

@@ -135,7 +121,6 @@ def upsert_table(self, schema_name, source_table_name, target_table_name, column
135121
column_list = column_list + ",{0}".format(self.TIMESTAMP_COLUMN_NAME)
136122
column_list = column_list + ",{0}".format(self.IS_DELETED_COLUMN_NAME)
137123
column_list = column_list + ",{0}".format(self.CHANGE_VERSION_COLUMN_NAME)
138-
column_list = column_list + ",{0}".format(self.NEXT_CHANGE_MINIMUM_VERSION_COLUMN_NAME)
139124

140125

141126
primary_key_column_array = [column_configuration['destination']['name'] for column_configuration in columns_configuration if 'primary_key' in column_configuration['destination'] and column_configuration['destination']['primary_key']]
@@ -159,8 +144,6 @@ def upsert_table(self, schema_name, source_table_name, target_table_name, column
159144
sql_builder.write(os.linesep)
160145
sql_builder.write("{0} = EXCLUDED.{0},".format(self.CHANGE_VERSION_COLUMN_NAME))
161146
sql_builder.write(os.linesep)
162-
sql_builder.write("{0} = EXCLUDED.{0}".format(self.NEXT_CHANGE_MINIMUM_VERSION_COLUMN_NAME))
163-
sql_builder.write(os.linesep)
164147

165148
self.logger.debug("Upsert executing {0}".format(sql_builder.getvalue()))
166149
self.target_engine.execute(sql_builder.getvalue())

modules/RelationalDataLoader.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
import argparse
33
from sqlalchemy import create_engine
44
from modules.DataLoadManager import DataLoadManager
5+
from modules.data_load_tracking.DataLoadTrackerRepository import DataLoadTrackerRepository
56
from modules.data_sources.DataSourceFactory import DataSourceFactory
7+
from sqlalchemy.orm import sessionmaker
68

79
_LOG_LEVEL_STRINGS = ['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG']
810

@@ -20,7 +22,10 @@ def main(self):
2022

2123
destination_engine = create_engine(args['destination-engine'])
2224

23-
data_load_manager = DataLoadManager(args['configuration-folder'], data_source)
25+
session_maker = sessionmaker(bind=destination_engine)
26+
repository = DataLoadTrackerRepository(session_maker)
27+
repository.create_tables(destination_engine)
28+
data_load_manager = DataLoadManager(args['configuration-folder'], data_source, repository)
2429
data_load_manager.start_imports(destination_engine, args['full_refresh'])
2530

2631
def configure_logging(self, log_level):

modules/batch_executions/BatchExecution.py

Lines changed: 0 additions & 14 deletions
This file was deleted.

modules/batch_executions/BatchExecutionManager.py

Lines changed: 0 additions & 14 deletions
This file was deleted.
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from sqlalchemy import Column, DateTime, Integer, String, Boolean, BigInteger
from sqlalchemy.sql import func
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class DataLoadExecution(Base):
    """ORM model recording one finished data-load run for a single model."""

    __tablename__ = 'data_load_execution'
    __table_args__ = {'schema': 'data_pipeline'}

    # Surrogate primary key.
    id = Column(Integer, primary_key=True)
    # Name of the pipeline configuration file that was loaded.
    model_name = Column(String(250), nullable=False)
    # True when the run was a full refresh rather than an incremental load.
    is_full_refresh = Column(Boolean, nullable=False)
    # Change-tracking version window covered by this run.
    this_sync_version = Column(BigInteger, nullable=False)
    next_sync_version = Column(BigInteger, nullable=False)
    # Populated by the database clock at insert time.
    completed_on = Column(DateTime(timezone=True), server_default=func.now())
    execution_time_ms = Column(Integer, nullable=False)
    rows_processed = Column(Integer, nullable=False)
    # Shared across all runs started by one DataLoadManager invocation.
    correlation_id = Column(String(250), nullable=True)
    status = Column(String(25), nullable=False)
20+

modules/DataLoadTracker.py renamed to modules/data_load_tracking/DataLoadTracker.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,22 @@ class DataLoadTracker:
77
status = "Not Started"
88
total_row_count = 0
99
batches = []
10-
configuration_name = None
10+
model_name = None
1111
configuration = None
12-
is_full_load = False
12+
is_full_refresh = False
1313
total_execution_time = None
1414
total_row_count = 0
1515
rows_per_second = 0
16+
correlation_id = None,
1617

17-
def __init__(self, configuration_name, configuration, is_full_load, change_tracking_info):
18-
self.configuration_name = configuration_name
18+
def __init__(self, model_name, configuration, is_full_refresh, change_tracking_info, correlation_id):
19+
self.model_name = model_name
1920
self.configuration = configuration
20-
self.is_full_load = is_full_load
21+
self.is_full_refresh = is_full_refresh
2122
self.started = datetime.now()
2223
self.status = "Not Started"
2324
self.change_tracking_info = change_tracking_info
25+
self.correlation_id = correlation_id
2426

2527
def start_batch(self):
2628
batch = self.Batch()
@@ -37,7 +39,7 @@ def completed_successfully(self):
3739
self.rows_per_second = self.total_row_count / self.total_execution_time.total_seconds()
3840

3941
def get_statistics(self):
40-
load_type = 'full' if self.is_full_load else "incremental from version {0} ".format(self.change_tracking_info.this_sync_version)
42+
load_type = 'full' if self.is_full_refresh else "incremental from version {0} ".format(self.change_tracking_info.this_sync_version)
4143
return "Rows: {0} ({1}), Total Execution Time: {2}. ({3:.2f} rows per second) ".format(self.total_row_count,
4244
load_type,
4345
self.total_execution_time,
@@ -89,6 +91,7 @@ def load_completed_successfully(self):
8991
self.load_rows_per_second = self.row_count / self.load_execution_time.total_seconds()
9092

9193

94+
# TODO: remove
9295
def load_skipped_due_to_zero_rows(self):
9396
self.status = "Skipped - Zero Rows"
9497
self.load_completed = datetime.now()
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import logging
from modules.data_load_tracking.DataLoadExecution import DataLoadExecution, Base


class DataLoadTrackerRepository(object):
    """Persists and queries DataLoadExecution rows via SQLAlchemy sessions."""

    def __init__(self, session_maker, logger=None):
        """
        :param session_maker: SQLAlchemy sessionmaker bound to the target engine.
        :param logger: optional logger; defaults to a module-level logger.
        """
        self.logger = logger or logging.getLogger(__name__)
        self.session_maker = session_maker

    def create_tables(self, engine):
        """Create the data_pipeline schema (if missing) and all mapped tables."""
        engine.execute("CREATE SCHEMA IF NOT EXISTS {0}".format("data_pipeline"))
        Base.metadata.create_all(engine)

    def get_last_sync_version(self, model_name):
        """Return next_sync_version of the most recent successful load of
        model_name, or 0 when the model has never completed a load."""
        session = self.session_maker()
        try:
            # Bug fix: order DESCENDING so .first() yields the *latest*
            # completed execution. The previous ascending order returned the
            # oldest row, so every incremental load restarted from the first
            # recorded sync version.
            # NOTE(review): confirm DataLoadTracker sets exactly this status
            # string on success.
            result = (session.query(DataLoadExecution)
                      .filter_by(model_name=model_name,
                                 status="Load Completed Successfully")
                      .order_by(DataLoadExecution.completed_on.desc())
                      .first())
            if result is None:
                return 0
            return result.next_sync_version
        finally:
            # Release the connection back to the pool.
            session.close()

    def save(self, data_load_tracker):
        """Insert one DataLoadExecution row summarising a finished load."""
        data_load_execution = DataLoadExecution(
            model_name=data_load_tracker.model_name,
            # correlation_id is a uuid.UUID; the column is String(250), so
            # coerce explicitly instead of relying on driver stringification.
            correlation_id=str(data_load_tracker.correlation_id),
            is_full_refresh=data_load_tracker.is_full_refresh,
            this_sync_version=data_load_tracker.change_tracking_info.this_sync_version,
            next_sync_version=data_load_tracker.change_tracking_info.next_sync_version,
            execution_time_ms=int(data_load_tracker.total_execution_time.total_seconds() * 1000),
            rows_processed=data_load_tracker.total_row_count,
            status=data_load_tracker.status)

        session = self.session_maker()
        try:
            session.add(data_load_execution)
            session.commit()
        finally:
            session.close()

modules/data_sources/MsSqlDataSource.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ def prefix_column(column_name, full_refresh, primary_key_column_name):
3434
def build_select_statement(self, table_configuration, columns, batch_configuration, previous_batch_key, full_refresh, change_tracking_info):
3535
column_array = list(map(lambda cfg: self.prefix_column(cfg['source_name'], full_refresh, table_configuration['primary_key']), columns))
3636
column_names = ", ".join(column_array)
37-
column_names = "{0}, {1} as data_pipeline_next_change_minimum_version".format(column_names, change_tracking_info.next_sync_version)
3837
if full_refresh:
3938
return "SELECT TOP ({0}) {1} FROM {2}.{3} t WHERE t.{4} > {5} ORDER BY t.{4}".format(batch_configuration['size'],
4039
column_names,

0 commit comments

Comments
 (0)