Skip to content
This repository was archived by the owner on Mar 13, 2020. It is now read-only.

Commit 1c4554c

Browse files
committed
Fixed CSV regressions
1 parent 0388fca commit 1c4554c

File tree

3 files changed

+12
-14
lines changed

3 files changed

+12
-14
lines changed

modules/data_sources/CsvDataSource.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@
33
import os.path
44
from modules.ColumnTypeResolver import ColumnTypeResolver
55
from pathlib import Path
6+
from modules.data_sources.ChangeTrackingInfo import ChangeTrackingInfo
67

78

89
class CsvDataSource(object):
910
def __init__(self, connection_string, logger=None):
1011
self.logger = logger or logging.getLogger(__name__)
1112
self.source_path = Path(connection_string[len(self.connection_string_prefix()):])
1213
self.column_type_resolver = ColumnTypeResolver()
14+
1315
@staticmethod
1416
def can_handle_connection_string(connection_string):
1517
return connection_string.startswith(CsvDataSource.connection_string_prefix())
@@ -41,7 +43,7 @@ def assert_column_exists(self, column_name, data_frame, csv_file):
4143

4244

4345
# For now, the CSV data source reads all rows in the CSV regardless of batch size, i.e. it does not currently support paging.
44-
def get_next_data_frame(self, table_configuration, columns, batch_configuration, batch_tracker, previous_batch_key):
46+
def get_next_data_frame(self, table_configuration, columns, batch_configuration, batch_tracker, previous_batch_key, full_refresh, change_tracking_info):
4547

4648
if previous_batch_key > 0:
4749
return None
@@ -55,9 +57,11 @@ def get_next_data_frame(self, table_configuration, columns, batch_configuration,
5557

5658
self.logger.debug("Starting read of file: {0}".format(csv_file))
5759

58-
5960
data_frame = pandas.read_csv(csv_file)
6061
self.logger.debug("Completed read")
6162

6263
batch_tracker.extract_completed_successfully(len(data_frame))
6364
return data_frame
65+
66+
def init_change_tracking(self, table_configuration, last_sync_version):
67+
return ChangeTrackingInfo(0,0)

modules/data_sources/MsSqlDataSource.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@ def prefix_column(column_name, full_refresh, primary_key_column_name):
3434
def build_select_statement(self, table_configuration, columns, batch_configuration, previous_batch_key, full_refresh, change_tracking_info):
3535
column_array = list(map(lambda cfg: self.prefix_column(cfg['source_name'], full_refresh, table_configuration['primary_key']), columns))
3636
column_names = ", ".join(column_array)
37-
38-
#This line below is temp until we have a proper storage log of what ran - then data_pipeline_next_change_minimum_version will be stored there.
3937
column_names = "{0}, {1} as data_pipeline_next_change_minimum_version".format(column_names, change_tracking_info.next_sync_version)
4038
if full_refresh:
4139
return "SELECT TOP ({0}) {1} FROM {2}.{3} t WHERE t.{4} > {5} ORDER BY t.{4}".format(batch_configuration['size'],
@@ -86,7 +84,7 @@ def get_table_columns(self, table_configuration):
8684
return list(map(lambda column: column.name, table.columns))
8785

8886

89-
def get_next_data_frame(self, table_configuration, columns, batch_configuration, batch_tracker, previous_batch_key, full_refresh, change_tracking_info,):
87+
def get_next_data_frame(self, table_configuration, columns, batch_configuration, batch_tracker, previous_batch_key, full_refresh, change_tracking_info):
9088
sql = self.build_select_statement(table_configuration, columns, batch_configuration, previous_batch_key, full_refresh, change_tracking_info,)
9189
self.logger.debug("Starting read of SQL Statement: {0}".format(sql))
9290
data_frame = pandas.read_sql_query(sql, self.database_engine)
@@ -110,7 +108,6 @@ def init_change_tracking(self, table_configuration, last_sync_version):
110108

111109
self.database_engine.execute(text(sql_builder.getvalue()).execution_options(autocommit=True))
112110

113-
114111
sql_builder = io.StringIO()
115112
sql_builder.write("DECLARE @last_sync_version bigint = {0}; \n".format(last_sync_version))
116113
sql_builder.write("DECLARE @this_sync_version bigint = 0; \n")
@@ -125,9 +122,6 @@ def init_change_tracking(self, table_configuration, last_sync_version):
125122

126123
result = self.database_engine.execute(sql_builder.getvalue())
127124
row = result.fetchone()
128-
return_value = ChangeTrackingInfo(row["this_sync_version"], row["next_sync_version"])
129-
130125
sql_builder.close()
131126

132-
return return_value
133-
127+
return ChangeTrackingInfo(row["this_sync_version"], row["next_sync_version"])

test_full_refresh_from_mssql.cmd

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
@echo off
2-
REM IF /I "%APPVEYOR%"=="TRUE" py rdl.py mssql+pyodbc://(local)\SQL2016/RelationalDataLoaderIntegrationTestSource?driver=SQL+Server+Native+Client+11.0 postgresql+psycopg2://postgres:there_is_no_password_due_to_pg_trust@localhost/relational_data_loader_integration_tests ./integration_tests/mssql_source/config/ --log-level DEBUG --full-refresh yes
2+
IF /I "%APPVEYOR%"=="TRUE" py rdl.py mssql+pyodbc://(local)\SQL2016/RelationalDataLoaderIntegrationTestSource?driver=SQL+Server+Native+Client+11.0 postgresql+psycopg2://postgres:there_is_no_password_due_to_pg_trust@localhost/relational_data_loader_integration_tests ./integration_tests/mssql_source/config/ --log-level DEBUG --full-refresh yes
33
IF /I NOT "%APPVEYOR%"=="TRUE" py rdl.py mssql+pyodbc://(local)/RelationalDataLoaderIntegrationTestSource?driver=SQL+Server+Native+Client+11.0 postgresql+psycopg2://postgres:there_is_no_password_due_to_pg_trust@localhost/relational_data_loader_integration_tests ./integration_tests/mssql_source/config/ --log-level DEBUG --full-refresh no
4-
REM if %errorlevel% neq 0 exit /b %errorlevel%
5-
REM psql -U postgres -d relational_data_loader_integration_tests -a -v ON_ERROR_STOP=1 -f ./integration_tests/mssql_source/assertions/large_table_test_full_refresh_assertions.sql
6-
REM if %errorlevel% neq 0 exit /b %errorlevel%
4+
if %errorlevel% neq 0 exit /b %errorlevel%
5+
psql -U postgres -d relational_data_loader_integration_tests -a -v ON_ERROR_STOP=1 -f ./integration_tests/mssql_source/assertions/large_table_test_full_refresh_assertions.sql
6+
if %errorlevel% neq 0 exit /b %errorlevel%
77

88

99

0 commit comments

Comments
 (0)