
Commit 4a3d633

[OSC-1224] Use timestamps to determine what to return (#20)
* added timestamp cli arg and return results since then
* improve output results
* ran autopep8
* updated readme.md
* fix typo in readme
* move data retrieval code to repository layer
* bug fix - remove leftover method param
* use string constants module more
* begin design of unit tests
* update unit test cmd connection string
* change unit tests to match RDL usage
* split up results based on FRT/incremental
* make an API for incremental excluding FRT
* update function name
* autopep8
* relint
* split rdl into separate commands, audit and process
* update integration tests to fit new API
* Apply suggestions from code review
  Update timezone arg to use nanoseconds as per ISO8601
  Co-Authored-By: seanbudd <seanbudd123@gmail.com>
* use a dictionary of constants for audit options
1 parent 4803c7a commit 4a3d633

14 files changed: +399 −55 lines

README.md

Lines changed: 24 additions & 5 deletions
````diff
@@ -9,11 +9,9 @@ A utility for taking data from MS-SQL and loading it into PostgreSQL
 `py rdl.py --help`
 
 ```text
-usage: rdl.py [-h] [-f [FORCE_FULL_REFRESH_MODELS]] [-l [LOG_LEVEL]]
-              source-connection-string destination-connection-string
-              configuration-folder
-
-Relational Data Loader
+usage: rdl.py process [-h] [-f [FORCE_FULL_REFRESH_MODELS]] [-l [LOG_LEVEL]]
+                      source-connection-string destination-connection-string
+                      configuration-folder
 
 positional arguments:
   source-connection-string
@@ -40,6 +38,27 @@ optional arguments:
   -l [LOG_LEVEL], --log-level [LOG_LEVEL]
                         Set the logging output level. ['CRITICAL', 'ERROR',
                         'WARNING', 'INFO', 'DEBUG']
+
+usage: rdl.py audit [-h] [-l [LOG_LEVEL]]
+                    destination-connection-string model-type timestamp
+
+positional arguments:
+  destination-connection-string
+                        The destination database connection string. Provide in
+                        PostgreSQL + Psycopg format. Eg: 'postgresql+psycopg2:
+                        //username:password@host:port/dbname'
+  model-type            Use the command FULL to return full refresh models or
+                        the command INCR to return only the incremental models
+                        since the timestamp
+  timestamp             ISO 8601 datetime with timezone (yyyy-mm-ddThh:mm:ss.nnnnnn+|-hh:mm) used to provide information on all
+                        actions since the specified date. Eg
+                        '2019-02-14T01:55:54.123456+00:00'.
+
+optional arguments:
+  -h, --help            show this help message and exit
+  -l [LOG_LEVEL], --log-level [LOG_LEVEL]
+                        Set the logging output level. ['CRITICAL', 'ERROR',
+                        'WARNING', 'INFO', 'DEBUG']
 ```
 
 _Notes:_
````
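Read together, the two usage blocks correspond to two separate invocations. A minimal sketch of each (the connection strings and paths below are placeholders, not values from this repo):

```text
# load models, forcing a full refresh of all of them
py rdl.py process "mssql+pyodbc://dwsource" "postgresql+psycopg2://user:pass@host:5432/dw" ./models -f

# list models that had a full refresh since the given instant
py rdl.py audit "postgresql+psycopg2://user:pass@host:5432/dw" FULL "2019-02-14T01:55:54.123456+00:00"
```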

modules/DataLoadManager.py

Lines changed: 1 addition & 0 deletions
```diff
@@ -4,6 +4,7 @@
 import hashlib
 from pathlib import Path
 from json import JSONDecodeError
+
 from modules.BatchDataLoader import BatchDataLoader
 from modules.DestinationTableManager import DestinationTableManager
 from modules.data_load_tracking.DataLoadTracker import DataLoadTracker
```

modules/RelationalDataLoader.py

Lines changed: 94 additions & 33 deletions
```diff
@@ -1,5 +1,6 @@
 import logging
 import argparse
+from datetime import datetime
 from sqlalchemy import create_engine
 from modules.DataLoadManager import DataLoadManager
 from modules.shared import Constants
@@ -8,6 +9,10 @@
 from sqlalchemy.orm import sessionmaker
 
 _LOG_LEVEL_STRINGS = ['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG']
+_AUDIT_FUNCTION_OPTIONS = {
+    'FULL': DataLoadTrackerRepository.get_full_refresh_since,
+    'INCR': DataLoadTrackerRepository.get_only_incremental_since,
+}
 
 
 class RelationalDataLoader:
@@ -16,19 +21,33 @@ def __init__(self, logger=None):
         self.data_source_factory = DataSourceFactory()
 
     def main(self):
-        args = self.get_arguments()
+        self.args = self.get_arguments()
 
-        self.configure_root_logger(args.log_level)
+        self.configure_root_logger(self.args.log_level)
 
-        source_db = self.data_source_factory.create_source(args.source_connection_string)
+        self.args.func()
 
-        destination_db = create_engine(args.destination_connection_string)
+    def execute_process_command(self):
+        source_db = self.data_source_factory.create_source(self.args.source_connection_string)
+
+        destination_db = create_engine(self.args.destination_connection_string)
         session_maker = sessionmaker(bind=destination_db)
         repository = DataLoadTrackerRepository(session_maker)
         repository.ensure_schema_exists(destination_db)
 
-        data_load_manager = DataLoadManager(args.configuration_folder, source_db, destination_db, repository)
-        data_load_manager.start_imports(args.force_full_refresh_models)
+        data_load_manager = DataLoadManager(self.args.configuration_folder, source_db, destination_db, repository)
+        data_load_manager.start_imports(self.args.force_full_refresh_models)
+
+    def execute_audit_command(self):
+        destination_db = create_engine(self.args.destination_connection_string)
+        session_maker = sessionmaker(bind=destination_db)
+        data_load_tracker_repository = DataLoadTrackerRepository(session_maker)
+
+        last_successful_timestamp = datetime.fromisoformat(self.args.timestamp)
+
+        results = _AUDIT_FUNCTION_OPTIONS[self.args.model_type](data_load_tracker_repository, last_successful_timestamp)
+
+        print(" ".join(results))
 
     def configure_root_logger(self, log_level):
         # get the root logger
@@ -67,39 +86,81 @@ def raw_connection_string_to_valid_source_connection_string(self, connection_str
     def get_arguments(self):
         parser = argparse.ArgumentParser(description=Constants.APP_NAME)
 
-        parser.add_argument(
+        subparsers = parser.add_subparsers(title='commands', metavar='', dest='command')
+
+        process_command_parser = subparsers.add_parser('process', help='processes load models')
+        process_command_parser.set_defaults(func=self.execute_process_command)
+
+        process_command_parser.add_argument(
             'source_connection_string',
             metavar='source-connection-string',
             type=self.raw_connection_string_to_valid_source_connection_string,
             help='The source connection string as a 64bit ODBC system dsn. Eg: mssql+pyodbc://dwsource or '
                  'csv://c://some//Path//To//Csv//Files//')
 
-        parser.add_argument('destination_connection_string',
-                            metavar='destination-connection-string',
-                            help='The destination database connection string. Provide in PostgreSQL + Psycopg format. '
-                                 'Eg: \'postgresql+psycopg2://username:password@host:port/dbname\'')
-
-        parser.add_argument('configuration_folder',
-                            metavar='configuration-folder',
-                            help='Absolute or relative path to the models. '
-                                 'Eg \'./models\', \'C:/path/to/models\'')
-
-        parser.add_argument('-f',
-                            '--force-full-refresh-models',
-                            nargs='?',
-                            const='*',
-                            help='Comma separated model names in the configuration folder. These models would be '
-                                 'forcefully refreshed dropping and recreating the destination tables. All others '
-                                 'models would only be refreshed if required as per the state of the source and '
-                                 'destination tables. '
-                                 'Eg \'CompoundPkTest,LargeTableTest\'. '
-                                 'Leave blank or use glob (*) to force full refresh of all models.')
-
-        parser.add_argument('-l', '--log-level',
-                            default='INFO',
-                            type=self.log_level_string_to_int,
-                            nargs='?',
-                            help=f'Set the logging output level. {_LOG_LEVEL_STRINGS}')
+        process_command_parser.add_argument(
+            'destination_connection_string',
+            metavar='destination-connection-string',
+            help='The destination database connection string. Provide in PostgreSQL'
+                 ' + Psycopg format. '
+                 'Eg: \'postgresql+psycopg2://username:password@host:port/dbname\'')
+
+        process_command_parser.add_argument(
+            'configuration_folder',
+            metavar='configuration-folder',
+            help='Absolute or relative path to the models. '
+                 'Eg \'./models\', \'C:/path/to/models\'')
+
+        process_command_parser.add_argument(
+            '-f',
+            '--force-full-refresh-models',
+            nargs='?',
+            const='*',
+            help='Comma separated model names in the configuration folder. '
+                 'These models would be forcefully refreshed dropping and recreating the '
+                 'destination tables. All other models would only be refreshed if required '
+                 'as per the state of the source and destination tables. '
+                 'Eg \'CompoundPkTest,LargeTableTest\'. '
+                 'Leave blank or use glob (*) to force full refresh of all models.')
+
+        process_command_parser.add_argument(
+            '-l', '--log-level',
+            default='INFO',
+            type=self.log_level_string_to_int,
+            nargs='?',
+            help=f'Set the logging output level. {_LOG_LEVEL_STRINGS}')
+
+        audit_command_parser = subparsers.add_parser('audit',
+                                                     help='provides list of processed models since a given timestamp')
+        audit_command_parser.set_defaults(func=self.execute_audit_command)
+
+        audit_command_parser.add_argument(
+            'destination_connection_string',
+            metavar='destination-connection-string',
+            help='The destination database connection string. Provide in PostgreSQL'
+                 ' + Psycopg format. '
+                 'Eg: \'postgresql+psycopg2://username:password@host:port/dbname\'')
+
+        audit_command_parser.add_argument(
+            'model_type',
+            metavar='model-type',
+            choices=_AUDIT_FUNCTION_OPTIONS.keys(),
+            help='Use the command FULL to return full refresh models or the '
+                 'command INCR to return only the incremental models since the timestamp')
+
+        audit_command_parser.add_argument(
+            'timestamp',
+            metavar='timestamp',
+            help='ISO 8601 datetime with timezone (yyyy-mm-ddThh:mm:ss.nnnnnn+|-hh:mm) used to provide information '
+                 'on all actions since the specified date. '
+                 'Eg \'2019-02-14T01:55:54.123456+00:00\'. ')
+
+        audit_command_parser.add_argument(
+            '-l', '--log-level',
+            default='INFO',
+            type=self.log_level_string_to_int,
+            nargs='?',
+            help=f'Set the logging output level. {_LOG_LEVEL_STRINGS}')
 
         return parser.parse_args()
 
```
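The dispatch relies on a standard argparse idiom: each subparser registers its handler with `set_defaults(func=...)`, and `main()` then simply calls `self.args.func()`. A self-contained sketch of the same pattern (the names here are illustrative, not taken from the repo):

```python
import argparse


def process(args):
    # handler registered on the 'process' subparser
    print(f"processing models from {args.configuration_folder}")


def audit(args):
    # handler registered on the 'audit' subparser
    print(f"auditing changes since {args.timestamp}")


parser = argparse.ArgumentParser(description='subcommand dispatch demo')
subparsers = parser.add_subparsers(title='commands', dest='command')

process_parser = subparsers.add_parser('process')
process_parser.add_argument('configuration_folder')
process_parser.set_defaults(func=process)

audit_parser = subparsers.add_parser('audit')
audit_parser.add_argument('timestamp')
audit_parser.set_defaults(func=audit)

args = parser.parse_args()
# note: if no subcommand is given, 'func' is never set and this raises
# AttributeError; the commit's code has the same property
args.func(args)
```

Worth noting for `execute_audit_command`: `datetime.fromisoformat` (Python 3.7+) accepts exactly the `yyyy-mm-ddThh:mm:ss.nnnnnn+hh:mm` shape the help text documents, e.g. `datetime.fromisoformat('2019-02-14T01:55:54.123456+00:00')`.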

modules/data_load_tracking/DataLoadTrackerRepository.py

Lines changed: 43 additions & 2 deletions
```diff
@@ -1,8 +1,11 @@
 import logging
-from sqlalchemy import desc
+
 from modules.data_load_tracking.DataLoadExecution import DataLoadExecution, Base
 from modules.shared import Constants
 
+from sqlalchemy import desc
+from sqlalchemy import func
+
 
 class DataLoadTrackerRepository(object):
     def __init__(self, session_maker, logger=None):
@@ -15,10 +18,12 @@ def ensure_schema_exists(self, engine):
 
     def get_last_successful_data_load_execution(self, model_name):
         session = self.session_maker()
-        return session.query(DataLoadExecution)\
+        result = session.query(DataLoadExecution)\
             .filter_by(model_name=model_name, status=Constants.ExecutionStatus.COMPLETED_SUCCESSFULLY)\
             .order_by(desc(DataLoadExecution.completed_on))\
             .first()
+        session.close()
+        return result
 
     def save(self, data_load_tracker):
         data_load_execution = DataLoadExecution(
@@ -36,3 +41,39 @@ def save(self, data_load_tracker):
         session = self.session_maker()
         session.add(data_load_execution)
         session.commit()
+        session.close()
+
+    def get_full_refresh_since(self, timestamp):
+        session = self.session_maker()
+        results = session.query(DataLoadExecution.model_name)\
+            .filter(DataLoadExecution.completed_on > timestamp,
+                    DataLoadExecution.is_full_refresh)\
+            .distinct(DataLoadExecution.model_name)\
+            .group_by(DataLoadExecution.model_name)\
+            .all()
+        session.close()
+        return [r for (r, ) in results]
+
+    def get_incremental_since(self, timestamp):
+        session = self.session_maker()
+        results = session.query(DataLoadExecution.model_name)\
+            .filter(DataLoadExecution.completed_on > timestamp,
+                    DataLoadExecution.is_full_refresh == False,
+                    DataLoadExecution.rows_processed > 0)\
+            .distinct(DataLoadExecution.model_name)\
+            .group_by(DataLoadExecution.model_name)\
+            .all()
+        session.close()
+        return [r for (r, ) in results]
+
+    def get_only_incremental_since(self, timestamp):
+        session = self.session_maker()
+        results = session.query(DataLoadExecution.model_name)\
+            .filter(DataLoadExecution.completed_on > timestamp,
+                    DataLoadExecution.rows_processed > 0)\
+            .distinct(DataLoadExecution.model_name)\
+            .group_by(DataLoadExecution.model_name)\
+            .having(func.bool_and(DataLoadExecution.is_full_refresh == False))\
+            .all()
+        session.close()
+        return [r for (r, ) in results]
```
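`get_only_incremental_since` leans on PostgreSQL's `bool_and` aggregate: rows are grouped per `model_name`, and the `HAVING` clause keeps only groups in which every execution since the timestamp was incremental, so a model that also had a full refresh in that window drops out. Assuming the `DataLoadExecution` model maps to a table and columns named like its attributes (an assumption; the mapping is defined elsewhere), the emitted SQL is roughly the following (the query's `.distinct()` is redundant alongside the `GROUP BY` and is omitted here):

```sql
SELECT model_name
FROM data_load_execution
WHERE completed_on > :timestamp
  AND rows_processed > 0
GROUP BY model_name
HAVING bool_and(is_full_refresh = false);
```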

modules/data_sources/MsSqlDataSource.py

Lines changed: 2 additions & 1 deletion
```diff
@@ -51,7 +51,8 @@ def build_select_statement(self, table_config, columns, batch_config, batch_key_
             order_by_sql = "ORDER BY " + f", {MsSqlDataSource.SOURCE_TABLE_ALIAS}.".join(table_config['primary_keys'])
         else:
             select_sql = f"SELECT TOP ({batch_config['size']}) {column_names}, " \
-                         f"{MsSqlDataSource.CHANGE_TABLE_ALIAS}.SYS_CHANGE_VERSION AS {Constants.AuditColumnNames.CHANGE_VERSION}, " \
+                         f"{MsSqlDataSource.CHANGE_TABLE_ALIAS}.SYS_CHANGE_VERSION" \
+                         f" AS {Constants.AuditColumnNames.CHANGE_VERSION}, " \
                          f"CASE {MsSqlDataSource.CHANGE_TABLE_ALIAS}.SYS_CHANGE_OPERATION WHEN 'D' THEN 1 ELSE 0 " \
                          f"END AS {Constants.AuditColumnNames.IS_DELETED}"
             from_sql = f"FROM CHANGETABLE(CHANGES" \
```
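For context, the f-string being re-wrapped builds an MS-SQL change-tracking select. With placeholder aliases, table, and audit column names (the real values come from `MsSqlDataSource` and `Constants`, which this diff does not show), the rendered statement looks roughly like:

```sql
SELECT TOP (1000) t.col_a, t.col_b,
       chg.SYS_CHANGE_VERSION AS change_version,
       CASE chg.SYS_CHANGE_OPERATION WHEN 'D' THEN 1 ELSE 0 END AS is_deleted
FROM CHANGETABLE(CHANGES dbo.source_table, @last_sync_version) AS chg
-- the join back to the source table and the ORDER BY are assembled further down in the method
```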
Lines changed: 10 additions & 3 deletions
```diff
@@ -1,5 +1,12 @@
 {
-    "username": "rdl_test_user",
-    "password": "hunter2",
-    "server_string": "(local)\\SQLEXPRESS"
+    "mssql": {
+        "username": "rdl_test_user",
+        "password": "hunter2",
+        "server_string": "(local)\\SQLEXPRESS"
+    },
+    "psql": {
+        "username": "postgres",
+        "password": "postgres",
+        "server_string": "postgres"
+    }
 }
```
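The test credentials are now namespaced per engine rather than MS-SQL-only. A sketch of how a test helper might turn this file into the two connection strings the CLI expects (the file name, helper, and URL shapes are assumptions, not part of this commit):

```python
import json


def load_test_connection_strings(path='test_config.json'):
    """Build source and destination connection strings from the nested test config."""
    with open(path) as f:
        config = json.load(f)

    mssql, psql = config['mssql'], config['psql']
    return {
        'source': f"mssql+pyodbc://{mssql['username']}:{mssql['password']}@{mssql['server_string']}",
        # the 'rdl_test' database name is illustrative
        'destination': f"postgresql+psycopg2://{psql['username']}:{psql['password']}@{psql['server_string']}/rdl_test",
    }
```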
