Skip to content
This repository was archived by the owner on Mar 13, 2020. It is now read-only.

Commit 9e4ba3d

Browse files
authored
[OSC-1257] add failover server compatibility (#25)
* add failover server compatibility * better logging * make username/password check stricter * only log things that exist * use trusted connection * quote string in integration tests * stricter regex * simplify logic, complify regex * tidy regex and sort imports * avoid divide by zero error * move function down
1 parent b7b9755 commit 9e4ba3d

File tree

4 files changed

+58
-4
lines changed

4 files changed

+58
-4
lines changed

appveyor.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ test_script:
6666

6767
- test_full_refresh_from_mssql.cmd
6868
- test_audit.cmd
69+
- test_mssql_failover_server.cmd
6970

7071
on_finish:
7172
#Enable this line to make the build pause after completion for RDP troubleshooting.

rdl/data_load_tracking/DataLoadTracker.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ def completed_successfully(self):
4747
for batch in self.batches:
4848
self.total_row_count += batch.row_count
4949

50-
self.rows_per_second = self.total_row_count / self.total_execution_time.total_seconds()
50+
self.rows_per_second = self.total_row_count / (self.total_execution_time.total_seconds() + 1e-10)
5151

5252
def get_statistics(self):
5353
load_type = 'FULL' if self.is_full_refresh else f"INCREMENTAL from " \

rdl/data_sources/MsSqlDataSource.py

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,42 @@
11
import io
22
import logging
33
import pandas
4+
import pyodbc
5+
import re
6+
7+
import sqlalchemy.exc
48
from sqlalchemy import create_engine
59
from sqlalchemy import MetaData
610
from sqlalchemy.schema import Table
11+
from sqlalchemy.sql import text
12+
713
from rdl.ColumnTypeResolver import ColumnTypeResolver
814
from rdl.data_sources.ChangeTrackingInfo import ChangeTrackingInfo
9-
from sqlalchemy.sql import text
1015
from rdl.shared import Constants
1116

1217

1318
class MsSqlDataSource(object):
1419
SOURCE_TABLE_ALIAS = 'src'
1520
CHANGE_TABLE_ALIAS = 'chg'
21+
MSSQL_STRING_REGEX = r"mssql\+pyodbc://" \
22+
r"(?:(?P<username>[^@/?&:]+)?:(?P<password>[^@/?&:]+)?@)?" \
23+
r"(?P<server>[^@/?&:]*)/(?P<database>[^@/?&:]*)" \
24+
r"\?driver=(?P<driver>[^@/?&:]*)" \
25+
r"(?:&failover=(?P<failover>[^@/?&:]*))?"
1626

1727
def __init__(self, connection_string, logger=None):
1828
self.logger = logger or logging.getLogger(__name__)
1929
self.connection_string = connection_string
20-
self.database_engine = create_engine(connection_string)
30+
self.database_engine = create_engine(connection_string, creator=self.create_connection_with_failover)
2131
self.column_type_resolver = ColumnTypeResolver()
2232

2333
@staticmethod
2434
def can_handle_connection_string(connection_string):
25-
return connection_string.startswith(MsSqlDataSource.connection_string_prefix())
35+
return MsSqlDataSource.connection_string_regex_match(connection_string) is not None
36+
37+
@staticmethod
38+
def connection_string_regex_match(connection_string):
39+
return re.match(MsSqlDataSource.MSSQL_STRING_REGEX, connection_string)
2640

2741
@staticmethod
2842
def connection_string_prefix():
@@ -37,6 +51,36 @@ def prefix_column(column_name, full_refresh, primary_key_column_names):
3751
else:
3852
return f"{MsSqlDataSource.SOURCE_TABLE_ALIAS}.{column_name}"
3953

54+
def create_connection_with_failover(self):
55+
conn_string_data = MsSqlDataSource.connection_string_regex_match(self.connection_string)
56+
server = conn_string_data.group('server')
57+
failover = conn_string_data.group('failover')
58+
database = conn_string_data.group('database')
59+
driver = "{"+conn_string_data.group('driver').replace('+', ' ')+"}"
60+
dsn = f'DRIVER={driver};DATABASE={database};'
61+
62+
username = conn_string_data.group('username')
63+
password = conn_string_data.group('password')
64+
65+
login_cred = "Trusted_Connection=yes;"
66+
if username is not None and password is not None:
67+
login_cred = f'UID={username};PWD={password};'
68+
69+
dsn += login_cred
70+
self.logger.info(
71+
'Parsed Connection Details: ' +
72+
f'''FAILOVER={failover}
73+
SERVER={server}
74+
DRIVER={driver}
75+
DATABASE={database}''')
76+
try:
77+
return pyodbc.connect(dsn, server=server)
78+
except (sqlalchemy.exc.OperationalError, pyodbc.OperationalError) as e:
79+
if e.args[0] == "08001" and failover is not None:
80+
self.logger.warning(f'Using Failover Server: {failover}')
81+
return pyodbc.connect(dsn, server=failover)
82+
raise e
83+
4084
def build_select_statement(self, table_config, columns, batch_config, batch_key_tracker, full_refresh,
4185
change_tracking_info):
4286
column_array = list(

test_mssql_failover_server.cmd

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
@echo off
2+
py -m rdl process "mssql+pyodbc://(local)\SQL2016/RelationalDataLoaderIntegrationTestSource?driver=SQL+Server+Native+Client+11.0" postgresql+psycopg2://postgres:there_is_no_password_due_to_pg_trust@localhost/relational_data_loader_integration_tests ./integration_tests/mssql_source/config/ --log-level DEBUG
3+
if %errorlevel% neq 0 exit /b %errorlevel%
4+
5+
py -m rdl process "mssql+pyodbc://fake_server\SQL2016/RelationalDataLoaderIntegrationTestSource?driver=SQL+Server+Native+Client+11.0&failover=(local)" postgresql+psycopg2://postgres:there_is_no_password_due_to_pg_trust@localhost/relational_data_loader_integration_tests ./integration_tests/mssql_source/config/ --log-level DEBUG
6+
if %errorlevel% neq 0 exit /b %errorlevel%
7+
8+
py -m rdl process "mssql+pyodbc://(local)\SQL2016/RelationalDataLoaderIntegrationTestSource?driver=SQL+Server+Native+Client+11.0&failover=fake_server" postgresql+psycopg2://postgres:there_is_no_password_due_to_pg_trust@localhost/relational_data_loader_integration_tests ./integration_tests/mssql_source/config/ --log-level DEBUG
9+
if %errorlevel% neq 0 exit /b %errorlevel%

0 commit comments

Comments
 (0)