This repository was archived by the owner on Mar 13, 2020. It is now read-only.

Commit 7fa066d

dames committed
Initial stab at refactoring for sources that can change.
1 parent 7ae0619 commit 7fa066d

File tree

7 files changed: +123 additions, -75 deletions


relational_data_loader_project/BatchDataLoader.py

Lines changed: 15 additions & 31 deletions
@@ -1,46 +1,28 @@
 import logging
-import pandas
 from io import StringIO
 import importlib
 
 from column_transformers.StringTransformers import ToUpper
 
 
-
 class BatchDataLoader(object):
-    def __init__(self, source_table_configuration, columns, batch_configuration, logger=None):
+    def __init__(self, data_source, source_table_configuration, target_table_configuration, columns, data_load_tracker, batch_configuration, target_engine, logger=None):
         self.logger = logger or logging.getLogger(__name__)
         self.source_table_configuration = source_table_configuration
         self.columns = columns
+        self.data_source = data_source
+        self.target_table_configuration = target_table_configuration
+        self.data_load_tracker = data_load_tracker
         self.batch_configuration = batch_configuration
-
-    def build_select_statement(self, previous_key=0):
-        column_array = list(map(lambda cfg: cfg['source_name'], self.columns))
-        column_names = ", ".join(column_array)
-
-        return "SELECT TOP ({0}) {1} FROM {2}.{3} WHERE {4} > {5} ORDER BY {4}".format(self.batch_configuration['size'],
-                                                                                       column_names,
-                                                                                       self.source_table_configuration['schema'],
-                                                                                       self.source_table_configuration['name'],
-                                                                                       self.batch_configuration['source_unique_column'],
-                                                                                       previous_key)
+        self.target_engine = target_engine
 
     # Imports rows, returns True if >0 rows were found
-    def import_batch(self, source_engine, target_engine, target_table_configuration, batch_tracker, previous_key):
-        self.logger.debug("ImportBatch Starting for source {0} target {1} previous_key {2}".format(self.source_table_configuration['name'],
-                                                                                                   target_table_configuration['name'],
-                                                                                                   previous_key))
-
-        sql = self.build_select_statement(previous_key)
+    def import_batch(self, previous_batch_key):
+        batch_tracker = self.data_load_tracker.start_batch()
 
-        self.logger.debug("Starting read of SQL Statement: {0}".format(sql))
-        data_frame = pandas.read_sql_query(sql, source_engine)
-        self.logger.debug("Completed read")
+        self.logger.debug("ImportBatch Starting from previous_batch_key: {0}".format(previous_batch_key))
 
-        batch_tracker.extract_completed_successfully(len(data_frame))
+        data_frame = self.data_source.get_next_data_frame(self.source_table_configuration, self.columns, self.batch_configuration, batch_tracker, previous_batch_key)
 
         if len(data_frame) == 0:
             self.logger.debug("There are no rows to import, returning -1")
@@ -49,7 +31,7 @@ def import_batch(self, source_engine, target_engine, target_table_configuration,
 
         data_frame = self.attach_column_transformers(data_frame)
 
-        self.write_data_frame_to_table(data_frame, target_table_configuration, target_engine)
+        self.write_data_frame_to_table(data_frame, self.target_table_configuration, self.target_engine)
         batch_tracker.load_completed_successfully()
 
         last_key_returned = data_frame.iloc[-1][self.batch_configuration['source_unique_column']]
@@ -66,6 +48,9 @@ def write_data_frame_to_table(self, data_frame, table_configuration, target_engi
         raw = target_engine.raw_connection()
         curs = raw.cursor()
 
+        # TODO: This is assuming that our destination schema column order matches the columns in the dataframe. This
+        # isn't always correct (especially in csv sources) - therefore, we should derive the column_array from the
+        # data frame's columns.
         column_array = list(map(lambda cfg: cfg['destination']['name'], self.columns))
 
         curs.copy_from(data, destination_table, sep=',', columns=column_array, null='')
@@ -78,13 +63,12 @@ def attach_column_transformers(self, data_frame):
         self.logger.debug("Attaching column transformers")
         for column in self.columns:
             if 'column_transformer' in column:
-                #transformer = self.create_column_transformer_type(column['column_transformer'])
+                # transformer = self.create_column_transformer_type(column['column_transformer'])
                 transformer = ToUpper.execute;
                 data_frame[column['source_name']] = data_frame[column['source_name']].map(transformer)
-                #print (data_frame)
+                # print (data_frame)
         return data_frame
 
-
     def create_column_transformer_type(self, type_name):
         module = importlib.import_module(type_name)
         class_ = getattr(module, type_name)
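
The TODO added in write_data_frame_to_table could be resolved along these lines. This is a sketch of one possible follow-up, not part of this commit; build_copy_column_list is a hypothetical helper name, and the column-configuration shape (cfg['source_name'], cfg['destination']['name']) is taken from the code above:

    # Hypothetical follow-up to the TODO above (not in this commit): derive the
    # copy_from column list from the data frame itself, so the write order always
    # matches the frame rather than the configuration order.
    def build_copy_column_list(data_frame, configured_columns):
        destination_by_source = {cfg['source_name']: cfg['destination']['name']
                                 for cfg in configured_columns}
        return [destination_by_source[name] for name in data_frame.columns]
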
relational_data_loader_project/CsvDataSource.py

Lines changed: 21 additions & 0 deletions
@@ -0,0 +1,21 @@
+import logging
+import pandas
+
+
+class CsvDataSource(object):
+    def __init__(self, source_path, source_table_configuration, columns, logger=None):
+        self.logger = logger or logging.getLogger(__name__)
+        self.source_path = source_path
+        self.columns = columns
+
+    def get_data_frame(self, batch_tracker, previous_key=0):
+        path_to_csv_file = "{0}{1}.csv".format(self.source_path, self.source_table_configuration['source_table']['name'])
+
+        self.logger.debug("Starting read of file: {0}".format(path_to_csv_file))
+        data_frame = pandas.read_csv(path_to_csv_file)
+        self.logger.debug("Completed read")
+
+        batch_tracker.extract_completed_successfully(len(data_frame))
+        return data_frame
+
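
Note that as committed, CsvDataSource does not yet match what BatchDataLoader calls: __init__ accepts source_table_configuration but never assigns it (while get_data_frame reads self.source_table_configuration, so the missing self.source_table_configuration = source_table_configuration assignment would be needed), and the loader invokes get_next_data_frame with five arguments while only get_data_frame(batch_tracker, previous_key) exists here. A hypothetical adapter method (not in this commit) that would bridge the gap, under the assumption that a CSV is read in a single batch:

    # Hypothetical adapter giving CsvDataSource the get_next_data_frame signature
    # that BatchDataLoader expects. A CSV file is read whole, so everything is
    # returned on the first call and an empty frame afterwards, which makes
    # import_batch stop. table_configuration, columns and batch_configuration are
    # accepted for interface compatibility but unused.
    def get_next_data_frame(self, table_configuration, columns, batch_configuration,
                            batch_tracker, previous_batch_key):
        if previous_batch_key > 0:
            batch_tracker.extract_completed_successfully(0)
            return pandas.DataFrame()
        return self.get_data_frame(batch_tracker, previous_batch_key)
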

relational_data_loader_project/DataLoadManager.py

Lines changed: 15 additions & 21 deletions
@@ -2,46 +2,50 @@
 from relational_data_loader_project.BatchDataLoader import BatchDataLoader
 from relational_data_loader_project.DestinationTableManager import DestinationTableManager
 from relational_data_loader_project.DataLoadTracker import DataLoadTracker
-from relational_data_loader_project.SourceTableManager import SourceTableManager
 import os
 import json
 
 
 class DataLoadManager(object):
-    def __init__(self, configuration_path, logger=None):
+    def __init__(self, configuration_path, data_source, logger=None):
         self.logger = logger or logging.getLogger(__name__)
         self.configuration_path = configuration_path
+        self.data_source = data_source
 
-    def start_import(self, source_engine, target_engine, full_load):
+    def start_imports(self, target_engine, full_load):
         for file in os.listdir(self.configuration_path):
-            self.start_single_import(source_engine, target_engine, file, full_load)
+            self.start_single_import(target_engine, file, full_load)
 
-    def start_single_import(self, source_engine, target_engine, configuration_name, full_load):
+    def start_single_import(self, target_engine, configuration_name, full_load):
 
         with open("{0}{1}".format(self.configuration_path, configuration_name)) as json_data:
            pipeline_configuration = json.load(json_data)
 
         data_load_tracker = DataLoadTracker(configuration_name, json_data, full_load)
 
         self.logger.debug("Execute Starting")
+
         destination_table_manager = DestinationTableManager()
 
-        columns = self.remove_invalid_columns(pipeline_configuration['source_table'], pipeline_configuration['columns'],
-                                              source_engine)
+        columns = self.data_source.get_valid_columns(pipeline_configuration['source_table'], pipeline_configuration['columns'])
 
         if full_load:
             self.logger.info("Full-load is set. Recreating the staging table.")
             destination_table_manager.create_table(pipeline_configuration['stage_table'],
                                                    columns, target_engine, drop_first=True)
 
         # Import the data.
-        batch_importer = BatchDataLoader(pipeline_configuration['source_table'], columns,
-                                         pipeline_configuration['batch'])
+        batch_importer = BatchDataLoader(self.data_source,
+                                         pipeline_configuration['source_table'],
+                                         pipeline_configuration['stage_table'],
+                                         columns,
+                                         data_load_tracker,
+                                         pipeline_configuration['batch'],
+                                         target_engine)
 
         previous_unique_column_value = 0
         while previous_unique_column_value > -1:
-            previous_unique_column_value = batch_importer.import_batch(source_engine, target_engine, pipeline_configuration['stage_table'], data_load_tracker.start_batch(), previous_unique_column_value)
-
+            previous_unique_column_value = batch_importer.import_batch(previous_unique_column_value)
 
         self.logger.info("ImportBatch Completed")
 
@@ -57,13 +61,3 @@ def start_single_import(self, source_engine, target_engine, configuration_name,
         data_load_tracker.completed_successfully()
         self.logger.info(data_load_tracker.get_statistics())
 
-    def remove_invalid_columns(self, source_table_configuration, column_configration, source_engine):
-        source_table_manager = SourceTableManager()
-        existing_columns = source_table_manager.get_columns(source_table_configuration, source_engine)
-        return list(filter(lambda column: self.column_exists(column['source_name'], existing_columns), column_configration))
-
-    def column_exists(self, column_name, existing_columns):
-        if column_name in existing_columns:
-            return True
-        self.logger.warning("Column {0} does not exist in source. It will be ignored for now, however may cause downstream issues.".format(column_name))
-        return False
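
Taken together, DataLoadManager and BatchDataLoader now depend only on a duck-typed data-source contract: get_valid_columns for column validation and get_next_data_frame for batched extraction. A sketch of that implicit interface, inferred from the calls in this diff (the typing.Protocol formulation is illustrative only; the code itself relies on plain duck typing):

    from typing import Any, Protocol

    import pandas


    class DataSource(Protocol):
        # Filter the configured columns down to those this source can actually supply.
        def get_valid_columns(self, table_configuration: dict, configured_columns: list) -> list:
            ...

        # Return the next batch of rows, keyed strictly past previous_batch_key.
        def get_next_data_frame(self, table_configuration: dict, columns: list,
                                batch_configuration: dict, batch_tracker: Any,
                                previous_batch_key: int) -> pandas.DataFrame:
            ...

Any new source (for example the CsvDataSource above, once finished) only has to satisfy these two methods.
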
relational_data_loader_project/MsSqlDataSource.py

Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
+import logging
+import pandas
+from sqlalchemy import create_engine
+from sqlalchemy import MetaData
+from sqlalchemy.schema import Table
+
+
+class MsSqlDataSource(object):
+
+    def __init__(self, connection_string, logger=None):
+        self.logger = logger or logging.getLogger(__name__)
+        self.connection_string = connection_string
+        self.database_engine = create_engine(connection_string)
+
+    def build_select_statement(self, table_configuration, columns, batch_configuration, previous_batch_key):
+        column_array = list(map(lambda cfg: cfg['source_name'], columns))
+        column_names = ", ".join(column_array)
+
+        return "SELECT TOP ({0}) {1} FROM {2}.{3} WHERE {4} > {5} ORDER BY {4}".format(batch_configuration['size'],
+                                                                                       column_names,
+                                                                                       table_configuration['schema'],
+                                                                                       table_configuration['name'],
+                                                                                       batch_configuration['source_unique_column'],
+                                                                                       previous_batch_key)
+
+    # Returns an array of configured_columns containing only columns that this data source supports. Logs invalid ones.
+    def get_valid_columns(self, table_configuration, configured_columns):
+        columns_in_database = self.get_table_columns(table_configuration)
+
+        return list(
+            filter(lambda column: self.column_exists(column['source_name'], columns_in_database), configured_columns))
+
+    def column_exists(self, column_name, columns_in_database):
+        if column_name in columns_in_database:
+            return True
+        self.logger.warning(
+            "Column {0} does not exist in source. It will be ignored for now, however may cause downstream issues.".format(
+                column_name))
+        return False
+
+    def get_table_columns(self, table_configuration):
+        metadata = MetaData()
+        self.logger.debug("Reading definition for source table {0}.{1}".format(table_configuration['schema'],
+                                                                               table_configuration['name']))
+        table = Table(table_configuration['name'], metadata, schema=table_configuration['schema'], autoload=True,
+                      autoload_with=self.database_engine)
+        return list(map(lambda column: column.name, table.columns))
+
+    def get_next_data_frame(self, table_configuration, columns, batch_configuration, batch_tracker, previous_batch_key):
+        sql = self.build_select_statement(table_configuration, columns, batch_configuration, previous_batch_key)
+
+        self.logger.debug("Starting read of SQL Statement: {0}".format(sql))
+        data_frame = pandas.read_sql_query(sql, self.database_engine)
+        self.logger.debug("Completed read")
+
+        batch_tracker.extract_completed_successfully(len(data_frame))
+
+        return data_frame
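
build_select_statement implements keyset pagination: each batch selects the TOP N rows whose key is strictly greater than the last key seen, ordered by that key, so successive batches never overlap and never skip rows. One caveat is that every value is interpolated with str.format, so configuration values end up in raw SQL. A hedged sketch of a parameterized alternative (not what this commit does); identifiers still come from trusted configuration, but the batch key is bound as a parameter:

    # Hypothetical parameterized variant (not part of this commit). sqlalchemy.text
    # binds :previous_key as a real parameter; pandas.read_sql_query accepts the
    # resulting TextClause in place of a plain string.
    from sqlalchemy import text

    def build_select_statement(self, table_configuration, columns, batch_configuration, previous_batch_key):
        column_names = ", ".join(cfg['source_name'] for cfg in columns)
        key = batch_configuration['source_unique_column']
        statement = "SELECT TOP ({0}) {1} FROM {2}.{3} WHERE {4} > :previous_key ORDER BY {4}".format(
            int(batch_configuration['size']), column_names,
            table_configuration['schema'], table_configuration['name'], key)
        return text(statement).bindparams(previous_key=previous_batch_key)
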

relational_data_loader_project/SourceTableManager.py

Lines changed: 0 additions & 18 deletions
This file was deleted.

relational_data_loader_project/__main__.py

Lines changed: 7 additions & 5 deletions
@@ -1,5 +1,6 @@
 import logging
 from relational_data_loader_project.DataLoadManager import DataLoadManager
+from relational_data_loader_project.MsSqlDataSource import MsSqlDataSource
 from sqlalchemy import create_engine
 import argparse
 
@@ -9,11 +10,12 @@
 def main(args):
 
     configure_logging(args['log_level'])
-    source_engine = create_engine(args['source-engine'])
+    data_source = MsSqlDataSource(args['source-connection-string'])
+
     destination_engine = create_engine(args['destination-engine'])
 
-    data_load_manager = DataLoadManager(args['configuration-folder'])
-    data_load_manager.start_import(source_engine, destination_engine, True)
+    data_load_manager = DataLoadManager(args['configuration-folder'], data_source)
+    data_load_manager.start_imports(destination_engine, True)
 
 
 def configure_logging(log_level):
@@ -41,8 +43,8 @@ def _log_level_string_to_int(log_level_string):
 def get_arguments():
     parser = argparse.ArgumentParser(description='Relational Data Loader')
 
-    parser.add_argument('source-engine', metavar='source-engine',
-                        help='The source engine. Eg: mssql+pyodbc://dwsource')
+    parser.add_argument('source-connection-string', metavar='source-connection-string',
+                        help='The source connection string. Eg: mssql+pyodbc://dwsource or csv://c://some//Path//To//Csv//Files//')
 
     parser.add_argument('destination-engine', metavar='destination-engine',
                         help='The destination engine. Eg: postgresql+psycopg2://postgres:xxxx@localhost/dest_dw')
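
The new help text advertises both mssql+pyodbc:// and csv:// connection strings, but main still constructs MsSqlDataSource unconditionally. A hypothetical factory the help text seems to point toward (not in this commit; create_data_source is an assumed name, and since CsvDataSource's constructor still takes per-table arguments at this stage, placeholders are passed):

    # Hypothetical scheme-based dispatch (not part of this commit). CsvDataSource
    # would also need to be imported alongside MsSqlDataSource.
    def create_data_source(connection_string):
        if connection_string.startswith('csv://'):
            source_path = connection_string[len('csv://'):]
            # Per-table configuration is not known yet at this point in the refactor.
            return CsvDataSource(source_path, source_table_configuration=None, columns=None)
        return MsSqlDataSource(connection_string)

main could then call data_source = create_data_source(args['source-connection-string']).
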
Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+id,StringColumn1,IntColumn1,StringColumn2,DateColumn1,DateColumn2,DateTimeColumn1,LongString,UnicodeString
+1,"String Column 1",555,"String Column 2",01-Dec-1976,,01-dec-1976 1:00 am,"This is a really long string",""
+
+
