This repository was archived by the owner on Mar 13, 2020. It is now read-only.

Commit 0a1b17c

restructure MsSqlDataSource code so that public API methods sit together and private / other-class methods sit together
1 parent b69b7a8 commit 0a1b17c
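
For orientation, here is a rough sketch of the method layout this commit moves MsSqlDataSource towards. Signatures are taken from the diff below; bodies and decorators such as prevent_senstive_data_logging are elided, so treat it as an outline rather than the actual file contents.

```python
class MsSqlDataSource:
    # Public API methods, grouped at the top of the class.
    @staticmethod
    def can_handle_connection_string(connection_string): ...

    def get_table_info(self, table_config, last_known_sync_version): ...

    def get_table_data_frame(self, table_config, columns, batch_config, batch_tracker,
                             batch_key_tracker, full_refresh, change_tracking_info): ...

    # Private helpers and internal plumbing, grouped below the public surface.
    def __create_connection_with_failover(self): ...

    def __get_table_columns(self, table_config): ...

    def __get_change_tracking_info(self, table_config, last_known_sync_version): ...

    def __build_select_statement(self, table_config, columns, batch_config, batch_key_tracker,
                                 full_refresh, change_tracking_info): ...

    @staticmethod
    def prefix_column(column_name, full_refresh, primary_key_column_names): ...
```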

File tree

2 files changed: +58 −67 lines

rdl/BatchDataLoader.py

Lines changed: 3 additions & 3 deletions
@@ -31,9 +31,9 @@ def load_batch(self, batch_key_tracker):
                           f"sync_version: '{self.change_tracking_info.sync_version}', "
                           f"last_sync_version: '{self.change_tracking_info.last_sync_version}'.")

-        data_frame = self.source_db.get_next_data_frame(self.source_table_config, self.columns,
-                                                        self.batch_config, batch_tracker, batch_key_tracker,
-                                                        self.full_refresh, self.change_tracking_info)
+        data_frame = self.source_db.get_table_data_frame(self.source_table_config, self.columns,
+                                                         self.batch_config, batch_tracker, batch_key_tracker,
+                                                         self.full_refresh, self.change_tracking_info)

         if data_frame is None or len(data_frame) == 0:
             self.logger.debug("There are no more rows to import.")
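
The only change on the caller side is the rename from get_next_data_frame to get_table_data_frame; the surrounding batching logic is untouched. As a simplified, self-contained sketch of the calling pattern, using a hypothetical stand-in for the data source (the real caller is load_batch above and the real class is MsSqlDataSource):

```python
# Hypothetical stand-in, only to illustrate the renamed call site and the end-of-data check.
class FakeDataSource:
    def get_table_data_frame(self, table_config, columns, batch_config, batch_tracker,
                             batch_key_tracker, full_refresh, change_tracking_info):
        return []  # pretend there is nothing left to read

source_db = FakeDataSource()
data_frame = source_db.get_table_data_frame({}, [], {}, None, None, False, None)

# An empty (or missing) frame is how load_batch detects that the import is finished.
if data_frame is None or len(data_frame) == 0:
    print("There are no more rows to import.")
```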

rdl/data_sources/MsSqlDataSource.py

Lines changed: 55 additions & 64 deletions
@@ -34,24 +34,27 @@ def __init__(self, connection_string, logger=None):

     @staticmethod
     def can_handle_connection_string(connection_string):
-        return MsSqlDataSource.__connection_string_regex_match(connection_string) is not None
+        return re.match(MsSqlDataSource.MSSQL_STRING_REGEX, connection_string) is not None

-    @staticmethod
-    def __connection_string_regex_match(connection_string):
-        return re.match(MsSqlDataSource.MSSQL_STRING_REGEX, connection_string)
+    def get_table_info(self, table_config, last_known_sync_version):
+        columns_in_database = self.__get_table_columns(table_config)
+        change_tracking_info = self.__get_change_tracking_info(table_config, last_known_sync_version)
+        source_table_info = SourceTableInfo(columns_in_database, change_tracking_info)
+        return source_table_info

-    @staticmethod
-    def connection_string_prefix():
-        return 'mssql+pyodbc://'
+    @prevent_senstive_data_logging
+    def get_table_data_frame(self, table_config, columns, batch_config, batch_tracker, batch_key_tracker,
+                             full_refresh, change_tracking_info):
+        sql = self.__build_select_statement(table_config, columns, batch_config, batch_key_tracker,
+                                            full_refresh, change_tracking_info)

-    @staticmethod
-    def prefix_column(column_name, full_refresh, primary_key_column_names):
-        if not isinstance(primary_key_column_names, (list, tuple)):
-            raise TypeError(f"Argument 'primary_key_column_names' must be a list or tuple")
-        if column_name in primary_key_column_names and not full_refresh:
-            return f"{MsSqlDataSource.CHANGE_TABLE_ALIAS}.{column_name}"
-        else:
-            return f"{MsSqlDataSource.SOURCE_TABLE_ALIAS}.{column_name}"
+        self.logger.debug(f"Starting read of SQL Statement: \n{sql}")
+        data_frame = pandas.read_sql_query(sql, self.database_engine)
+        self.logger.debug("Completed read")
+
+        batch_tracker.extract_completed_successfully(len(data_frame))
+
+        return data_frame

     def __create_connection_with_failover(self):
         conn_string_data = MsSqlDataSource.__connection_string_regex_match(self.connection_string)
@@ -83,41 +86,6 @@ def __create_connection_with_failover(self):
                 return pyodbc.connect(dsn, server=failover)
             raise e

-    def __build_select_statement(self, table_config, columns, batch_config, batch_key_tracker, full_refresh,
-                                 change_tracking_info):
-        column_array = list(
-            map(lambda cfg: self.prefix_column(cfg['source_name'], full_refresh, table_config['primary_keys']),
-                columns))
-        column_names = ", ".join(column_array)
-
-        if full_refresh:
-            select_sql = f"SELECT TOP ({batch_config['size']}) {column_names}"
-            from_sql = f"FROM {table_config['schema']}.{table_config['name']} AS {MsSqlDataSource.SOURCE_TABLE_ALIAS}"
-            where_sql = f"WHERE {self.__build_where_clause(batch_key_tracker, MsSqlDataSource.SOURCE_TABLE_ALIAS)}"
-            order_by_sql = "ORDER BY " + f", {MsSqlDataSource.SOURCE_TABLE_ALIAS}.".join(table_config['primary_keys'])
-        else:
-            select_sql = f"SELECT TOP ({batch_config['size']}) {column_names}, " \
-                         f"{MsSqlDataSource.CHANGE_TABLE_ALIAS}.SYS_CHANGE_VERSION" \
-                         f" AS {Providers.AuditColumnsNames.CHANGE_VERSION}, " \
-                         f"CASE {MsSqlDataSource.CHANGE_TABLE_ALIAS}.SYS_CHANGE_OPERATION WHEN 'D' THEN 1 ELSE 0 " \
-                         f"END AS {Providers.AuditColumnsNames.IS_DELETED}"
-            from_sql = f"FROM CHANGETABLE(CHANGES" \
-                       f" {table_config['schema']}.{table_config['name']}," \
-                       f" {change_tracking_info.last_sync_version})" \
-                       f" AS {MsSqlDataSource.CHANGE_TABLE_ALIAS}" \
-                       f" LEFT JOIN {table_config['schema']}.{table_config['name']} AS {MsSqlDataSource.SOURCE_TABLE_ALIAS}" \
-                       f" ON {self.__build_change_table_on_clause(batch_key_tracker)}"
-            where_sql = f"WHERE {self.__build_where_clause(batch_key_tracker, MsSqlDataSource.CHANGE_TABLE_ALIAS)}"
-            order_by_sql = "ORDER BY " + f", {MsSqlDataSource.CHANGE_TABLE_ALIAS}.".join(table_config['primary_keys'])
-
-        return f"{select_sql} \n {from_sql} \n {where_sql} \n {order_by_sql};"
-
-    def get_table_info(self, table_config, last_known_sync_version):
-        columns_in_database = self.__get_table_columns(table_config)
-        change_tracking_info = self.__get_change_tracking_info(table_config, last_known_sync_version)
-        source_table_info = SourceTableInfo(columns_in_database, change_tracking_info)
-        return source_table_info
-
     def __get_table_columns(self, table_config):
         metadata = MetaData()
         self.logger.debug(f"Reading definition for source table "
@@ -126,20 +94,6 @@ def __get_table_columns(self, table_config):
                               autoload_with=self.database_engine)
         return list(map(lambda column: column.name, table.columns))

-    @prevent_senstive_data_logging
-    def get_next_data_frame(self, table_config, columns, batch_config, batch_tracker, batch_key_tracker,
-                            full_refresh, change_tracking_info):
-        sql = self.__build_select_statement(table_config, columns, batch_config, batch_key_tracker,
-                                            full_refresh, change_tracking_info)
-
-        self.logger.debug(f"Starting read of SQL Statement: \n{sql}")
-        data_frame = pandas.read_sql_query(sql, self.database_engine)
-        self.logger.debug("Completed read")
-
-        batch_tracker.extract_completed_successfully(len(data_frame))
-
-        return data_frame
-
     def __get_change_tracking_info(self, table_config, last_known_sync_version):

         if last_known_sync_version is None:
@@ -216,6 +170,43 @@ def __get_change_tracking_info(self, table_config, last_known_sync_version):
         return ChangeTrackingInfo(row["last_sync_version"], row["sync_version"],
                                   row["force_full_load"], row["data_changed_since_last_sync"])

+    def __build_select_statement(self, table_config, columns, batch_config, batch_key_tracker, full_refresh,
+                                 change_tracking_info):
+        column_array = list(map(lambda cfg: MsSqlDataSource.prefix_column(
+            cfg['source_name'], full_refresh, table_config['primary_keys']), columns))
+        column_names = ", ".join(column_array)
+
+        if full_refresh:
+            select_sql = f"SELECT TOP ({batch_config['size']}) {column_names}"
+            from_sql = f"FROM {table_config['schema']}.{table_config['name']} AS {MsSqlDataSource.SOURCE_TABLE_ALIAS}"
+            where_sql = f"WHERE {self.__build_where_clause(batch_key_tracker, MsSqlDataSource.SOURCE_TABLE_ALIAS)}"
+            order_by_sql = "ORDER BY " + f", {MsSqlDataSource.SOURCE_TABLE_ALIAS}.".join(table_config['primary_keys'])
+        else:
+            select_sql = f"SELECT TOP ({batch_config['size']}) {column_names}, " \
+                         f"{MsSqlDataSource.CHANGE_TABLE_ALIAS}.SYS_CHANGE_VERSION" \
+                         f" AS {Providers.AuditColumnsNames.CHANGE_VERSION}, " \
+                         f"CASE {MsSqlDataSource.CHANGE_TABLE_ALIAS}.SYS_CHANGE_OPERATION WHEN 'D' THEN 1 ELSE 0 " \
+                         f"END AS {Providers.AuditColumnsNames.IS_DELETED}"
+            from_sql = f"FROM CHANGETABLE(CHANGES" \
+                       f" {table_config['schema']}.{table_config['name']}," \
+                       f" {change_tracking_info.last_sync_version})" \
+                       f" AS {MsSqlDataSource.CHANGE_TABLE_ALIAS}" \
+                       f" LEFT JOIN {table_config['schema']}.{table_config['name']} AS {MsSqlDataSource.SOURCE_TABLE_ALIAS}" \
+                       f" ON {self.__build_change_table_on_clause(batch_key_tracker)}"
+            where_sql = f"WHERE {self.__build_where_clause(batch_key_tracker, MsSqlDataSource.CHANGE_TABLE_ALIAS)}"
+            order_by_sql = "ORDER BY " + f", {MsSqlDataSource.CHANGE_TABLE_ALIAS}.".join(table_config['primary_keys'])
+
+        return f"{select_sql} \n {from_sql} \n {where_sql} \n {order_by_sql};"
+
+    @staticmethod
+    def prefix_column(column_name, full_refresh, primary_key_column_names):
+        if not isinstance(primary_key_column_names, (list, tuple)):
+            raise TypeError(f"Argument 'primary_key_column_names' must be a list or tuple")
+        if column_name in primary_key_column_names and not full_refresh:
+            return f"{MsSqlDataSource.CHANGE_TABLE_ALIAS}.{column_name}"
+        else:
+            return f"{MsSqlDataSource.SOURCE_TABLE_ALIAS}.{column_name}"
+
     @staticmethod
     def __build_where_clause(batch_key_tracker, table_alias):
         has_value = False
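
To make the alias logic in the moved prefix_column concrete, here is a small, self-contained restatement of it with illustrative alias values. The real implementation lives on MsSqlDataSource and uses the class constants CHANGE_TABLE_ALIAS and SOURCE_TABLE_ALIAS, whose actual values are not shown in this commit; the literals "chg" and "src" below are assumptions for the example only.

```python
# Illustrative copy of the prefix_column logic from the hunk above.
# "chg"/"src" stand in for MsSqlDataSource.CHANGE_TABLE_ALIAS / SOURCE_TABLE_ALIAS.
CHANGE_TABLE_ALIAS = "chg"
SOURCE_TABLE_ALIAS = "src"

def prefix_column(column_name, full_refresh, primary_key_column_names):
    if not isinstance(primary_key_column_names, (list, tuple)):
        raise TypeError("Argument 'primary_key_column_names' must be a list or tuple")
    # During an incremental load, primary key values are read from the CHANGETABLE alias;
    # every other column (and every column during a full refresh) comes from the source table alias.
    if column_name in primary_key_column_names and not full_refresh:
        return f"{CHANGE_TABLE_ALIAS}.{column_name}"
    return f"{SOURCE_TABLE_ALIAS}.{column_name}"

print(prefix_column("Id", full_refresh=False, primary_key_column_names=["Id"]))    # chg.Id
print(prefix_column("Id", full_refresh=True, primary_key_column_names=["Id"]))     # src.Id
print(prefix_column("Name", full_refresh=False, primary_key_column_names=["Id"]))  # src.Name
```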

0 commit comments