Skip to content
This repository was archived by the owner on Mar 13, 2020. It is now read-only.

Commit e0175b0

Browse files
author
dames
committed
1. Cleaned up a whole lot of logging messages.
2. Start of ability to attach formatters. 3. Better logging of execution time.
1 parent d7acd93 commit e0175b0

File tree

7 files changed

+67
-45
lines changed

7 files changed

+67
-45
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
class ToUpper:
2+
def execute(text_in):
3+
x = text_in.upper()
4+
return x
5+
6+
class TrimWhiteSpace:
7+
def execute(text_in):
8+
return text_in.strip()
9+

column_transformers/ToUpper.py

Lines changed: 0 additions & 4 deletions
This file was deleted.

configuraton/provider.json

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
{
2+
23
"source_table": {
3-
"name": "SmallTable",
4+
"name": "SourceData",
45
"schema": "dbo"
56
},
67
"stage_table": {
@@ -12,7 +13,7 @@
1213
"schema": "load"
1314
},
1415
"batch": {
15-
"size": 1000000,
16+
"size": 100000,
1617
"source_unique_column": "id"
1718
},
1819
"columns": [
@@ -41,7 +42,7 @@
4142
"type": "citext.CIText",
4243
"nullable": true
4344
},
44-
"column_transformer": "ToUpper.ToUpper"
45+
"column_transformer": "StringTransformers.ToUpper"
4546
},
4647
{
4748
"source_name": "IntColumn1",

relational_data_loader_project/BatchDataLoader.py

Lines changed: 17 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from io import StringIO
44
import importlib
55

6-
6+
from column_transformers.StringTransformers import ToUpper
77

88

99

@@ -33,33 +33,33 @@ def import_batch(self, source_engine, target_engine, target_table_configuration,
3333
self.logger.debug("ImportBatch Starting for source {0} target {1} previous_key {2}".format(self.source_table_configuration['name'],
3434
target_table_configuration['name'],
3535
previous_key))
36+
3637
sql = self.build_select_statement(previous_key)
37-
self.logger.debug("SQL Statement: {0}".format(sql))
38-
self.logger.info("Starting read")
38+
39+
self.logger.debug("Starting read of SQL Statement: {0}".format(sql))
3940
data_frame = pandas.read_sql_query(sql, source_engine)
40-
self.logger.info("Completed read")
41+
self.logger.debug("Completed read")
4142

4243
batch_tracker.extract_completed_successfully(len(data_frame))
4344

44-
self.attach_column_transformers(data_frame)
45-
4645
if len(data_frame) == 0:
4746
self.logger.info("There are no rows to import, returning False")
4847
batch_tracker.load_skipped_due_to_zero_rows()
4948
return -1
5049

50+
data_frame = self.attach_column_transformers(data_frame)
51+
5152
self.write_data_frame_to_table(data_frame, target_table_configuration, target_engine)
5253
batch_tracker.load_completed_successfully()
5354

5455
last_key_returned = data_frame.iloc[-1][self.batch_configuration['source_unique_column']]
5556

56-
self.logger.debug("Returning {0} to signify we loaded data.".format(last_key_returned))
57-
57+
self.logger.info("Batch key {0} Completed. {1}".format(last_key_returned, batch_tracker.get_statistics()))
5858
return last_key_returned
5959

6060
def write_data_frame_to_table(self, data_frame, table_configuration, target_engine):
6161
destination_table = "{0}.{1}".format(table_configuration['schema'], table_configuration['name'])
62-
self.logger.info("Starting write to table {0}".format(destination_table))
62+
self.logger.debug("Starting write to table {0}".format(destination_table))
6363
data = StringIO()
6464
data_frame.to_csv(data, header=False, index=False, na_rep='')
6565
data.seek(0)
@@ -69,32 +69,24 @@ def write_data_frame_to_table(self, data_frame, table_configuration, target_engi
6969
column_array = list(map(lambda cfg: cfg['destination']['name'], self.columns))
7070

7171
curs.copy_from(data, destination_table, sep=',', columns=column_array, null='')
72-
self.logger.info("Completed write to table {0}".format(destination_table))
72+
self.logger.debug("Completed write to table {0}".format(destination_table))
7373

7474
curs.connection.commit()
7575
return
7676

7777
def attach_column_transformers(self, data_frame):
78-
return
79-
#for column in self.columns:
80-
#if 'column_transformer' in column:
81-
82-
#TODO: this is horribly broken
83-
#data_frame = data_frame[column['source_name']].map(lambda x: x.upper())
84-
#print (data_frame)
78+
self.logger.debug("Attaching column transformers")
79+
for column in self.columns:
80+
if 'column_transformer' in column:
8581
#transformer = self.create_column_transformer_type(column['column_transformer'])
86-
#// df['a'] = df['a'].map(lambda a: a / 2.)
87-
88-
#data_frame.
89-
90-
82+
transformer = ToUpper.execute;
83+
data_frame[column['source_name']] = data_frame[column['source_name']].map(transformer)
84+
#print (data_frame)
85+
return data_frame
9186

9287

9388
def create_column_transformer_type(self, type_name):
9489
module = importlib.import_module(type_name)
9590
class_ = getattr(module, type_name)
9691
instance = class_()
9792
return instance
98-
99-
def remove_non_existent_columns(self, columns):
100-
pass

relational_data_loader_project/DataLoadManager.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ def start_single_import(self, source_engine, target_engine, configuration_name,
3535
columns, target_engine, drop_first=True)
3636

3737
# Import the data.
38-
self.logger.info("Creating Batch Importer")
3938
batch_importer = BatchDataLoader(pipeline_configuration['source_table'], columns,
4039
pipeline_configuration['batch'])
4140

relational_data_loader_project/DataLoadTracker.py

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33

44
class DataLoadTracker:
5-
started = datetime.now()
5+
started = None
66
completed = None
77
status = "Not Started"
88
total_row_count = 0
@@ -18,6 +18,8 @@ def __init__(self, configuration_name, configuration, is_full_load):
1818
self.configuration_name = configuration_name
1919
self.configuration = configuration
2020
self.is_full_load = is_full_load
21+
self.started = datetime.now()
22+
self.status = "Not Started"
2123

2224
def start_batch(self):
2325
batch = self.Batch()
@@ -34,29 +36,52 @@ def completed_successfully(self):
3436
self.rows_per_second = self.total_row_count / self.total_execution_time.total_seconds()
3537

3638
def get_statistics(self):
37-
return "Rows: {0}, Total Execution Time: {1}. ({2} rows per second)".format(self.total_row_count,
38-
self.total_execution_time,
39-
self.rows_per_second)
39+
return "Rows: {0}, Total Execution Time: {1}. ({2:.2f} rows per second)".format(self.total_row_count,
40+
self.total_execution_time,
41+
self.rows_per_second)
4042

4143
class Batch:
4244
row_count = 0
43-
extract_started = datetime.now()
44-
extract_completed_on = None
45-
load_completed_on = None
45+
extract_started = None
46+
extract_completed = None
47+
load_completed = None
4648
status = "Not Started"
4749

50+
extract_execution_time = None
51+
extract_rows_per_second = 0
52+
load_execution_time = None
53+
load_rows_per_second = 0
54+
total_rows_per_second = 0
55+
total_execution_time = None
56+
4857
def __init__(self):
49-
pass
58+
self.extract_started = datetime.now()
59+
self.status = "Not Started"
5060

5161
def extract_completed_successfully(self, row_count):
5262
self.status = "Extract Completed Successfully"
5363
self.row_count = row_count
54-
self.extract_completed_on = datetime.now()
64+
self.extract_completed = datetime.now()
65+
self.extract_execution_time = self.extract_completed - self.extract_started
66+
self.extract_rows_per_second = self.row_count / self.extract_execution_time.total_seconds()
5567

5668
def load_completed_successfully(self):
5769
self.status = "Load Completed Successfully"
58-
self.load_completed_on = datetime.now()
70+
self.load_completed = datetime.now()
71+
self.load_execution_time = self.load_completed - self.extract_completed
72+
self.load_rows_per_second = self.row_count / self.load_execution_time.total_seconds()
73+
self.total_execution_time = self.load_completed - self.extract_started
74+
self.total_rows_per_second = self.row_count / self.total_execution_time.total_seconds()
5975

6076
def load_skipped_due_to_zero_rows(self):
6177
self.status = "Skipped - Zero Rows"
62-
self.load_completed_on = datetime.now()
78+
self.load_completed = datetime.now()
79+
80+
def get_statistics(self):
81+
return "Rows: {0}, Extract Execution Time: {1} ({2:.2f} rows per second). Load Execution Time {3} ({4:.2f} rows per second) Total Execution Time {5} ({6:.2f} rows per second)".format(self.row_count,
82+
self.extract_execution_time,
83+
self.extract_rows_per_second,
84+
self.load_execution_time,
85+
self.load_rows_per_second,
86+
self.total_execution_time,
87+
self.total_rows_per_second)

relational_data_loader_project/__main__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ def configure_logging():
1919
console_stream_handler = logging.StreamHandler()
2020
console_stream_handler.setFormatter(formatter)
2121
log.addHandler(console_stream_handler)
22-
log.setLevel(logging.DEBUG)
22+
log.setLevel(logging.INFO)
2323
return
2424

2525
def get_arguments():

0 commit comments

Comments
 (0)