Skip to content
This repository was archived by the owner on Mar 13, 2020. It is now read-only.

Commit 4f63e7d

Browse files
authored
[OSC-1294] give an audit column prefix as defined by an argument (#31)
* give an audit column prefix as defined by an argument * update the audit column prefix source of truth * update default prefix from rdl -> rdl_
1 parent ab50132 commit 4f63e7d

File tree

7 files changed

+73
-31
lines changed

7 files changed

+73
-31
lines changed

README.md

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@ A utility for taking data from MS-SQL and loading it into PostgreSQL
99
`py -m rdl --help`
1010

1111
```text
12-
usage: py -m rdl process [-h] [-f [FORCE_FULL_REFRESH_MODELS]] [-l [LOG_LEVEL]]
13-
source-connection-string destination-connection-string
14-
configuration-folder
12+
usage: py -m rdl process [-h] [-f [FORCE_FULL_REFRESH_MODELS]]
13+
[-l [LOG_LEVEL]] [-p [AUDIT_COLUMN_PREFIX]]
14+
source-connection-string
15+
destination-connection-string configuration-folder
1516
1617
positional arguments:
1718
source-connection-string
@@ -38,9 +39,13 @@ optional arguments:
3839
-l [LOG_LEVEL], --log-level [LOG_LEVEL]
3940
Set the logging output level. ['CRITICAL', 'ERROR',
4041
'WARNING', 'INFO', 'DEBUG']
42+
-p [AUDIT_COLUMN_PREFIX], --audit-column-prefix [AUDIT_COLUMN_PREFIX]
43+
Set the audit column prefix, used in the destination
44+
schema. Default is 'rdl_'.
4145
42-
usage: py -m rdl audit [-h] [-l [LOG_LEVEL]]
43-
destination-connection-string model-type timestamp
46+
47+
usage: py -m rdl audit [-h] [-l [LOG_LEVEL]] [-p [AUDIT_COLUMN_PREFIX]]
48+
destination-connection-string model-type timestamp
4449
4550
positional arguments:
4651
destination-connection-string
@@ -50,15 +55,19 @@ positional arguments:
5055
model-type Use the command FULL to return full refresh models or
5156
the command INCR to return only the incremental models
5257
since the timestamp
53-
timestamp ISO 8601 datetime with timezone (`yyyy-mm-ddThh:mm:ss.nnnnnn+|-hh:mm`) used to provide information on all
54-
actions since the specified date. Eg
55-
'2019-02-14T01:55:54.123456+00:00'.
58+
timestamp ISO 8601 datetime with timezone (yyyy-mm-
59+
ddThh:mm:ss.nnnnnn+|-hh:mm) used to provide
60+
information on all actions since the specified date.
61+
Eg '2019-02-14T01:55:54.123456+00:00'.
5662
5763
optional arguments:
5864
-h, --help show this help message and exit
5965
-l [LOG_LEVEL], --log-level [LOG_LEVEL]
6066
Set the logging output level. ['CRITICAL', 'ERROR',
6167
'WARNING', 'INFO', 'DEBUG']
68+
-p [AUDIT_COLUMN_PREFIX], --audit-column-prefix [AUDIT_COLUMN_PREFIX]
69+
Set the audit column prefix, used in the destination
70+
schema. Default is 'rdl_'.
6271
```
6372

6473
_Notes:_

rdl/BatchDataLoader.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
from io import StringIO
55
from rdl.column_transformers.StringTransformers import ToUpper
6-
from rdl.shared import Constants
6+
from rdl.shared import Providers
77
from rdl.shared.Utils import prevent_senstive_data_logging
88

99

@@ -97,7 +97,7 @@ def get_destination_column_name(self, source_column_name):
9797
return column['destination']['name']
9898

9999
# Audit columns - map them straight through
100-
if source_column_name.startswith(Constants.AUDIT_COLUMN_PREFIX):
100+
if source_column_name.startswith(Providers.AuditColumnsNames.audit_column_prefix):
101101
return source_column_name
102102

103103
message = f"A source column with name '{source_column_name}' was not found in the column configuration"

rdl/DestinationTableManager.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from sqlalchemy import MetaData, DateTime, Boolean, BigInteger
77
from sqlalchemy.schema import Column, Table
88
from sqlalchemy.sql import func
9-
from rdl.shared import Constants
9+
from rdl.shared import Providers
1010

1111

1212
class DestinationTableManager(object):
@@ -39,13 +39,13 @@ def create_table(self, schema_name, table_name, columns_configuration, drop_firs
3939
table.append_column(self.create_column(column_configuration['destination']))
4040

4141
table.append_column(
42-
Column(Constants.AuditColumnNames.TIMESTAMP, DateTime(timezone=True), server_default=func.now()))
42+
Column(Providers.AuditColumnsNames.TIMESTAMP, DateTime(timezone=True), server_default=func.now()))
4343

4444
table.append_column(
45-
Column(Constants.AuditColumnNames.IS_DELETED, Boolean, server_default='f', default=False))
45+
Column(Providers.AuditColumnsNames.IS_DELETED, Boolean, server_default='f', default=False))
4646

4747
table.append_column(
48-
Column(Constants.AuditColumnNames.CHANGE_VERSION, BigInteger))
48+
Column(Providers.AuditColumnsNames.CHANGE_VERSION, BigInteger))
4949

5050
if drop_first:
5151
self.logger.debug(f"Dropping table {schema_name}.{table_name}")
@@ -103,9 +103,9 @@ def rename_table(self, schema_name, source_table_name, target_table_name):
103103
def upsert_table(self, schema_name, source_table_name, target_table_name, columns_config):
104104
column_array = list(map(lambda column: column['destination']['name'], columns_config))
105105
column_list = ','.join(map(str, column_array))
106-
column_list = column_list + f",{Constants.AuditColumnNames.TIMESTAMP}"
107-
column_list = column_list + f",{Constants.AuditColumnNames.IS_DELETED}"
108-
column_list = column_list + f",{Constants.AuditColumnNames.CHANGE_VERSION}"
106+
column_list = column_list + f",{Providers.AuditColumnsNames.TIMESTAMP}"
107+
column_list = column_list + f",{Providers.AuditColumnsNames.IS_DELETED}"
108+
column_list = column_list + f",{Providers.AuditColumnsNames.CHANGE_VERSION}"
109109

110110
primary_key_column_array = [column_config['destination']['name'] for column_config in
111111
columns_config if 'primary_key' in column_config['destination'] and
@@ -121,9 +121,9 @@ def upsert_table(self, schema_name, source_table_name, target_table_name, column
121121
for column_config in columns_config:
122122
sql_builder.write("{0} = EXCLUDED.{0},\n".format(column_config['destination']['name']))
123123

124-
sql_builder.write("{0} = EXCLUDED.{0},\n".format(Constants.AuditColumnNames.TIMESTAMP))
125-
sql_builder.write("{0} = EXCLUDED.{0},\n".format(Constants.AuditColumnNames.IS_DELETED))
126-
sql_builder.write("{0} = EXCLUDED.{0};\n".format(Constants.AuditColumnNames.CHANGE_VERSION))
124+
sql_builder.write("{0} = EXCLUDED.{0},\n".format(Providers.AuditColumnsNames.TIMESTAMP))
125+
sql_builder.write("{0} = EXCLUDED.{0},\n".format(Providers.AuditColumnsNames.IS_DELETED))
126+
sql_builder.write("{0} = EXCLUDED.{0};\n".format(Providers.AuditColumnsNames.CHANGE_VERSION))
127127

128128
upsert_sql = sql_builder.getvalue()
129129

rdl/RelationalDataLoader.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from datetime import datetime
44
from sqlalchemy import create_engine
55
from rdl.DataLoadManager import DataLoadManager
6-
from rdl.shared import Constants
6+
from rdl.shared import Constants, Providers
77
from rdl.data_load_tracking.DataLoadTrackerRepository import DataLoadTrackerRepository
88
from rdl.data_sources.DataSourceFactory import DataSourceFactory
99
from sqlalchemy.orm import sessionmaker
@@ -24,6 +24,7 @@ def main(self):
2424
self.args = self.get_arguments()
2525

2626
self.configure_root_logger(self.args.log_level)
27+
Providers.AuditColumnsNames.update_audit_column_prefix(self.args.audit_column_prefix)
2728

2829
self.args.func()
2930

@@ -130,6 +131,13 @@ def get_arguments(self):
130131
nargs='?',
131132
help=f'Set the logging output level. {_LOG_LEVEL_STRINGS}')
132133

134+
process_command_parser.add_argument(
135+
'-p', '--audit-column-prefix',
136+
default='rdl_',
137+
type=str,
138+
nargs='?',
139+
help=f'Set the audit column prefix, used in the destination schema. Default is \'rdl_\'. ')
140+
133141
audit_command_parser = subparsers.add_parser('audit',
134142
help='provides list of processed models since a given timestamp')
135143
audit_command_parser.set_defaults(func=self.execute_audit_command)
@@ -162,6 +170,13 @@ def get_arguments(self):
162170
nargs='?',
163171
help=f'Set the logging output level. {_LOG_LEVEL_STRINGS}')
164172

173+
audit_command_parser.add_argument(
174+
'-p', '--audit-column-prefix',
175+
default='rdl_',
176+
type=str,
177+
nargs='?',
178+
help=f'Set the audit column prefix, used in the destination schema. Default is \'rdl_\'. ')
179+
165180
return parser.parse_args()
166181

167182
def str2bool(self, v):

rdl/data_sources/MsSqlDataSource.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
from rdl.ColumnTypeResolver import ColumnTypeResolver
1414
from rdl.data_sources.ChangeTrackingInfo import ChangeTrackingInfo
15-
from rdl.shared import Constants
15+
from rdl.shared import Providers
1616
from rdl.shared.Utils import prevent_senstive_data_logging
1717

1818

@@ -97,9 +97,9 @@ def build_select_statement(self, table_config, columns, batch_config, batch_key_
9797
else:
9898
select_sql = f"SELECT TOP ({batch_config['size']}) {column_names}, " \
9999
f"{MsSqlDataSource.CHANGE_TABLE_ALIAS}.SYS_CHANGE_VERSION" \
100-
f" AS {Constants.AuditColumnNames.CHANGE_VERSION}, " \
100+
f" AS {Providers.AuditColumnsNames.CHANGE_VERSION}, " \
101101
f"CASE {MsSqlDataSource.CHANGE_TABLE_ALIAS}.SYS_CHANGE_OPERATION WHEN 'D' THEN 1 ELSE 0 " \
102-
f"END AS {Constants.AuditColumnNames.IS_DELETED}"
102+
f"END AS {Providers.AuditColumnsNames.IS_DELETED}"
103103
from_sql = f"FROM CHANGETABLE(CHANGES" \
104104
f" {table_config['schema']}.{table_config['name']}," \
105105
f" {change_tracking_info.last_sync_version})" \

rdl/shared/Constants.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
APP_NAME = 'Relational Data Loader'
22
DATA_PIPELINE_EXECUTION_SCHEMA_NAME = 'data_pipeline'
3-
AUDIT_COLUMN_PREFIX = f'{DATA_PIPELINE_EXECUTION_SCHEMA_NAME}_'
43

54

65
class FullRefreshReason:
@@ -19,9 +18,3 @@ class ExecutionStatus:
1918
LOAD_COMPLETED_SUCCESSFULLY = 'Load Completed Successfully'
2019
SKIPPED_AS_ZERO_ROWS = 'Skipped - Zero Rows'
2120
COMPLETED_SUCCESSFULLY = 'Completed Successfully'
22-
23-
24-
class AuditColumnNames:
25-
TIMESTAMP = f'{AUDIT_COLUMN_PREFIX}timestamp'
26-
IS_DELETED = f'{AUDIT_COLUMN_PREFIX}is_deleted'
27-
CHANGE_VERSION = f'{AUDIT_COLUMN_PREFIX}change_version'

rdl/shared/Providers.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
class __AuditColumnsNames:
    """Provider of the audit column names used in the destination schema.

    Every audit column name is built from a shared prefix (``'rdl_'`` by
    default) followed by a fixed suffix. The prefix may be overridden at most
    once, via ``update_audit_column_prefix``; the column-name properties always
    reflect the current prefix.
    """

    def __init__(self):
        self.audit_column_prefix = 'rdl_'
        # Guards against the prefix being reconfigured after it was set once.
        self.__changed = False

    def update_audit_column_prefix(self, audit_column_prefix):
        """Set the audit column prefix; allowed only once per instance.

        :param audit_column_prefix: the new prefix. ``None`` is treated as
            "not supplied" and leaves the current (default) prefix in place.
            argparse yields ``None`` when an option declared with
            ``nargs='?'`` and no ``const`` is passed without a value, and
            storing it would produce column names such as ``'Nonetimestamp'``.
        :raises RuntimeError: if the prefix has already been set.
        """
        if self.__changed:
            raise RuntimeError("Audit Column Prefix has already been set")
        if audit_column_prefix is not None:
            self.audit_column_prefix = audit_column_prefix
        self.__changed = True

    @property
    def TIMESTAMP(self):
        """Name of the timestamp audit column."""
        return f'{self.audit_column_prefix}timestamp'

    @property
    def IS_DELETED(self):
        """Name of the is-deleted audit column."""
        return f'{self.audit_column_prefix}is_deleted'

    @property
    def CHANGE_VERSION(self):
        """Name of the change-version audit column."""
        return f'{self.audit_column_prefix}change_version'


# Module-level shared instance; callers use Providers.AuditColumnsNames.<NAME>.
AuditColumnsNames = __AuditColumnsNames()

0 commit comments

Comments
 (0)