This repository was archived by the owner on Mar 13, 2020. It is now read-only.

Commit 4952795
Author: dames
Inital commit from Oscars-Misc
1 parent 99e452c

File tree: 8 files changed (+389, −0 lines changed)

.gitignore

Lines changed: 2 additions & 0 deletions

@@ -1,3 +1,5 @@
+.idea/
+
 # Byte-compiled / optimized / DLL files
 __pycache__/
 *.py[cod]
BatchDataLoader.py

Lines changed: 100 additions & 0 deletions

@@ -0,0 +1,100 @@
import importlib
import logging
from io import StringIO

import pandas


class BatchDataLoader(object):
    def __init__(self, source_table_configuration, columns, batch_configuration, logger=None):
        self.logger = logger or logging.getLogger(__name__)
        self.source_table_configuration = source_table_configuration
        self.columns = columns
        self.batch_configuration = batch_configuration

    def build_select_statement(self, previous_key=0):
        column_array = list(map(lambda cfg: cfg['source_name'], self.columns))
        column_names = ", ".join(column_array)

        return "SELECT TOP ({0}) {1} FROM {2}.{3} WHERE {4} > {5} ORDER BY {4}".format(
            self.batch_configuration['size'],
            column_names,
            self.source_table_configuration['schema'],
            self.source_table_configuration['name'],
            self.batch_configuration['source_unique_column'],
            previous_key)

    # Imports one batch of rows. Returns the last unique key loaded,
    # or -1 if there were no rows to import.
    def import_batch(self, source_engine, target_engine, target_table_configuration, batch_tracker, previous_key):
        self.logger.debug("ImportBatch starting for source {0} target {1} previous_key {2}".format(
            self.source_table_configuration['name'],
            target_table_configuration['name'],
            previous_key))

        sql = self.build_select_statement(previous_key)
        self.logger.debug("SQL statement: {0}".format(sql))

        self.logger.info("Starting read")
        data_frame = pandas.read_sql_query(sql, source_engine)
        self.logger.info("Completed read")

        batch_tracker.extract_completed_successfully(len(data_frame))

        self.attach_column_transformers(data_frame)

        if len(data_frame) == 0:
            self.logger.info("There are no rows to import, returning -1")
            batch_tracker.load_skipped_due_to_zero_rows()
            return -1

        self.write_data_frame_to_table(data_frame, target_table_configuration, target_engine)
        batch_tracker.load_completed_successfully()

        last_key_returned = data_frame.iloc[-1][self.batch_configuration['source_unique_column']]

        self.logger.debug("Returning {0} to signify we loaded data.".format(last_key_returned))

        return last_key_returned

    # Streams the data frame into the target table via an in-memory CSV
    # buffer and the cursor's COPY FROM support.
    def write_data_frame_to_table(self, data_frame, table_configuration, target_engine):
        destination_table = "{0}.{1}".format(table_configuration['schema'], table_configuration['name'])
        self.logger.info("Starting write to table {0}".format(destination_table))

        data = StringIO()
        data_frame.to_csv(data, header=False, index=False)
        data.seek(0)

        raw = target_engine.raw_connection()
        curs = raw.cursor()

        column_array = list(map(lambda cfg: cfg['destination']['name'], self.columns))

        curs.copy_from(data, destination_table, sep=',', columns=column_array)
        self.logger.info("Completed write to table {0}".format(destination_table))

        curs.connection.commit()

    def attach_column_transformers(self, data_frame):
        # TODO: this is horribly broken - transformer wiring is not implemented yet.
        # The intent is roughly:
        #     for column in self.columns:
        #         if 'column_transformer' in column:
        #             transformer = self.create_column_transformer_type(column['column_transformer'])
        #             data_frame[column['source_name']] = data_frame[column['source_name']].map(transformer.execute)
        return

    def create_column_transformer_type(self, type_name):
        module = importlib.import_module(type_name)
        class_ = getattr(module, type_name)
        instance = class_()
        return instance

    def remove_non_existent_columns(self, columns):
        pass
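
For reference, a minimal sketch of the SQL that build_select_statement produces. The table, column, and batch values below are hypothetical, not taken from the commit:

from BatchDataLoader import BatchDataLoader

loader = BatchDataLoader(
    source_table_configuration={'schema': 'dbo', 'name': 'orders'},   # hypothetical source table
    columns=[{'source_name': 'id', 'destination': {'name': 'id'}},
             {'source_name': 'amount', 'destination': {'name': 'amount'}}],
    batch_configuration={'size': 1000, 'source_unique_column': 'id'})

print(loader.build_select_statement(previous_key=0))
# SELECT TOP (1000) id, amount FROM dbo.orders WHERE id > 0 ORDER BY id

Note the T-SQL TOP syntax: the extract side appears to assume a SQL Server source, while the copy_from() load path is a psycopg2 cursor method, implying a PostgreSQL target.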

DataLoadManager.py

Lines changed: 64 additions & 0 deletions

@@ -0,0 +1,64 @@
import json
import logging

from BatchDataLoader import BatchDataLoader
from DestinationTableManager import DestinationTableManager
from DataLoadTracker import DataLoadTracker
from SourceTableManager import SourceTableManager


class DataLoadManager(object):
    def __init__(self, configuration_path, logger=None):
        self.logger = logger or logging.getLogger(__name__)
        self.configuration_path = configuration_path

    def start_import(self, source_engine, target_engine, configuration_name, full_load):
        with open("{0}{1}.json".format(self.configuration_path, configuration_name)) as json_data:
            pipeline_configuration = json.load(json_data)

        data_load_tracker = DataLoadTracker(configuration_name, pipeline_configuration, full_load)

        self.logger.debug("Execute starting")
        destination_table_manager = DestinationTableManager()

        columns = self.remove_invalid_columns(pipeline_configuration['source_table'],
                                              pipeline_configuration['columns'], source_engine)

        if full_load:
            self.logger.info("Full-load is set. Recreating the staging table.")
            destination_table_manager.create_table(pipeline_configuration['stage_table'],
                                                   columns, target_engine, drop_first=True)

        # Import the data.
        self.logger.info("Creating Batch Importer")
        batch_importer = BatchDataLoader(pipeline_configuration['source_table'], columns,
                                         pipeline_configuration['batch'])

        previous_unique_column_value = 0
        while previous_unique_column_value > -1:
            previous_unique_column_value = batch_importer.import_batch(
                source_engine, target_engine, pipeline_configuration['stage_table'],
                data_load_tracker.start_batch(), previous_unique_column_value)

        self.logger.info("ImportBatch completed")

        # TODO: promote the staged data once batching completes.
        # if full_load:
        #     # Rename the stage table to the load table.
        #     rename_table(pipeline_configuration['stage_source_data'], pipeline_configuration['load_source_data'])
        # else:
        #     upsert_data_from_stage_to_load_tables(pipeline_configuration['stage_source_data'], pipeline_configuration['load_source_data'])

        data_load_tracker.completed_successfully()
        self.logger.info(data_load_tracker.get_statistics())

    def remove_invalid_columns(self, source_table_configuration, column_configuration, source_engine):
        source_table_manager = SourceTableManager()
        existing_columns = source_table_manager.get_columns(source_table_configuration, source_engine)
        return list(filter(lambda column: self.column_exists(column['source_name'], existing_columns),
                           column_configuration))

    def column_exists(self, column_name, existing_columns):
        if column_name in existing_columns:
            return True
        self.logger.warning("Column {0} does not exist in the source table. It will be ignored for now, "
                            "however it may cause downstream issues.".format(column_name))
        return False
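
A hypothetical pipeline configuration illustrating the keys start_import reads, saved as {configuration_path}{configuration_name}.json. Every concrete name and value below is invented for illustration:

{
  "source_table": {"schema": "dbo", "name": "orders"},
  "stage_table":  {"schema": "stage", "name": "orders"},
  "batch": {"size": 1000, "source_unique_column": "id"},
  "columns": [
    {"source_name": "id",
     "destination": {"name": "id", "type": "sqlalchemy.BigInteger",
                     "nullable": false, "primary_key": true}},
    {"source_name": "amount",
     "destination": {"name": "amount", "type": "sqlalchemy.Numeric",
                     "nullable": true}}
  ]
}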

DataLoadTracker.py

Lines changed: 62 additions & 0 deletions

@@ -0,0 +1,62 @@
from datetime import datetime


class DataLoadTracker:
    def __init__(self, configuration_name, configuration, is_full_load):
        self.configuration_name = configuration_name
        self.configuration = configuration
        self.is_full_load = is_full_load
        self.started = datetime.now()
        self.completed = None
        self.status = "Not Started"
        self.batches = []
        self.total_execution_time = None
        self.total_row_count = 0
        self.rows_per_second = 0

    def start_batch(self):
        batch = self.Batch()
        self.batches.append(batch)
        return batch

    def completed_successfully(self):
        self.completed = datetime.now()
        self.total_execution_time = self.completed - self.started

        for batch in self.batches:
            self.total_row_count = self.total_row_count + batch.row_count

        self.rows_per_second = self.total_row_count / self.total_execution_time.total_seconds()

    def get_statistics(self):
        return "Rows: {0}, Total Execution Time: {1}. ({2} rows per second)".format(
            self.total_row_count,
            self.total_execution_time,
            self.rows_per_second)

    class Batch:
        def __init__(self):
            self.row_count = 0
            self.extract_started = datetime.now()
            self.extract_completed_on = None
            self.load_completed_on = None
            self.status = "Not Started"

        def extract_completed_successfully(self, row_count):
            self.status = "Extract Completed Successfully"
            self.row_count = row_count
            self.extract_completed_on = datetime.now()

        def load_completed_successfully(self):
            self.status = "Load Completed Successfully"
            self.load_completed_on = datetime.now()

        def load_skipped_due_to_zero_rows(self):
            self.status = "Skipped - Zero Rows"
            self.load_completed_on = datetime.now()
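
A minimal sketch of the tracker lifecycle as DataLoadManager drives it; the configuration name, dict, and row count are illustrative:

from DataLoadTracker import DataLoadTracker

tracker = DataLoadTracker("orders", configuration={}, is_full_load=True)  # illustrative arguments

batch = tracker.start_batch()               # one Batch per import_batch call
batch.extract_completed_successfully(1000)
batch.load_completed_successfully()

tracker.completed_successfully()            # totals rows and computes rows/second
print(tracker.get_statistics())
# e.g. Rows: 1000, Total Execution Time: 0:00:01.500000. (666.6666666666666 rows per second)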

DestinationTableManager.py

Lines changed: 52 additions & 0 deletions

@@ -0,0 +1,52 @@
import importlib
import logging

from sqlalchemy import MetaData, DateTime
from sqlalchemy.schema import Column, Table
from sqlalchemy.sql import func


class DestinationTableManager(object):
    def __init__(self, logger=None):
        self.logger = logger or logging.getLogger(__name__)

    def create_table(self, table_configuration, columns_configuration, target_engine, drop_first):
        metadata = MetaData()

        table = Table(table_configuration['name'], metadata, schema=table_configuration['schema'])

        for column_configuration in columns_configuration:
            table.append_column(self.create_column(column_configuration['destination']))

        table.append_column(
            Column("data_pipeline_timestamp", DateTime(timezone=True), server_default=func.now()))

        if drop_first:
            self.logger.info(
                "Dropping table {0}.{1}".format(table_configuration['schema'], table_configuration['name']))
            table.drop(target_engine, checkfirst=True)
            self.logger.debug(
                "Dropped table {0}.{1}".format(table_configuration['schema'], table_configuration['name']))

        self.logger.info("Creating table {0}.{1}".format(table_configuration['schema'], table_configuration['name']))
        table.create(target_engine, checkfirst=False)

    # Resolves a dotted type name such as "sqlalchemy.BigInteger" to an instance.
    def create_column_type(self, type_name):
        parts = type_name.split(".")
        module = importlib.import_module(parts[0])
        class_ = getattr(module, parts[1])
        instance = class_()
        return instance

    def create_column(self, configuration):
        return Column(configuration['name'], self.create_column_type(configuration['type']),
                      primary_key=configuration.get("primary_key", False),
                      nullable=configuration['nullable'])

    def rename_table(self, source_table_configuration, target_table_configuration):
        print('TODO - create a rename-table method. Eg: ALTER TABLE table_name RENAME TO new_table_name;')

    def upsert_data_from_stage_to_load_tables(self, source_table_configuration, target_table_configuration):
        print('TODO - create a method to upsert the data;')
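
A minimal sketch of create_table against a SQLAlchemy engine. The DSN and column configuration are hypothetical (the column entry mirrors the illustrative pipeline JSON above):

from sqlalchemy import create_engine
from DestinationTableManager import DestinationTableManager

engine = create_engine("postgresql://user:password@localhost/warehouse")  # hypothetical target
manager = DestinationTableManager()
manager.create_table(
    {'schema': 'stage', 'name': 'orders'},                 # hypothetical stage table
    [{'destination': {'name': 'id', 'type': 'sqlalchemy.BigInteger',
                      'nullable': False, 'primary_key': True}}],
    engine, drop_first=True)

Because create_column_type instantiates the resolved class with no arguments, any zero-argument SQLAlchemy type (BigInteger, Numeric, DateTime, ...) can be named in the configuration, but parameterized types such as a String with an explicit length cannot be expressed through this path.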

SourceTableManager.py

Lines changed: 18 additions & 0 deletions

@@ -0,0 +1,18 @@
import logging

from sqlalchemy import MetaData
from sqlalchemy.schema import Table


class SourceTableManager(object):
    def __init__(self, logger=None):
        self.logger = logger or logging.getLogger(__name__)

    # Reflects the source table and returns its column names.
    def get_columns(self, table_configuration, source_engine):
        metadata = MetaData()
        self.logger.debug("Reading definition for source table {0}.{1}".format(
            table_configuration['schema'], table_configuration['name']))
        table = Table(table_configuration['name'], metadata, schema=table_configuration['schema'],
                      autoload=True, autoload_with=source_engine)
        return list(map(lambda column: column.name, table.columns))

column_transformers/ToUpper.py

Lines changed: 4 additions & 0 deletions

@@ -0,0 +1,4 @@
class ToUpper:
    def execute(self, text_in):
        return text_in.upper()
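
A minimal sketch of the transformer applied to a DataFrame column, along the lines of the commented-out map() call in BatchDataLoader.attach_column_transformers. It assumes column_transformers is importable as a package (e.g. has an __init__.py), and the column name and data are invented:

import pandas
from column_transformers.ToUpper import ToUpper

transformer = ToUpper()
df = pandas.DataFrame({'name': ['alice', 'bob']})    # illustrative data
df['name'] = df['name'].map(transformer.execute)     # applies ToUpper to each value
print(df['name'].tolist())                           # ['ALICE', 'BOB']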
