
Commit bf2484c

Cloud-DB extractor: Fix Windows compatibility (#71)

- Updated path refs to make them Windows compatible
- Support for table names with [ ] (Transact-SQL)
- Improved logging during cursor read
- Improved documentation
1 parent 0f9062b commit bf2484c

File tree

3 files changed: +73 −19 lines

Community-Supported/clouddb-extractor/README.md

Lines changed: 55 additions & 6 deletions
@@ -7,6 +7,8 @@ __Current Version__: 1.0
 
 Cloud Database Extractor Utility - This sample shows how to extract data from a cloud database to a published hyper extract and append/update/delete rows to keep up to date.
 
+A detailed article about this utility is available at: https://www.tableau.com/developer/learning/how-synchronize-your-cloud-data-tableau-extracts-scale
+
 # Overview
 This package defines a standard Extractor Interface which is extended by specific implementations
 to support specific cloud databases. For most use cases you will probably only ever call the
@@ -42,6 +44,7 @@ $ python3 extractor_cli.py --help
                     {load_sample,export_load,append,update,delete}
                     [--extractor {bigquery}]
                     [--source_table_id SOURCE_TABLE_ID]
+                    [--overwrite]
                     [--tableau_project TABLEAU_PROJECT]
                     --tableau_datasource TABLEAU_DATASOURCE
                     [--tableau_hostname TABLEAU_HOSTNAME]
@@ -63,16 +66,62 @@ $ python3 extractor_cli.py --help
 ```
 
 ### Sample Usage
+Before use you should modify the file config.yml with your Tableau and database settings.
 
+__Load Sample:__ Load a sample (default=1000 lines) from test_table to sample_extract in test_project:
 ```console
-# Load a sample (default=1000 lines) from test_table to sample_extract in test_project
-python3 extractor_cli.py load_sample --tableau_token_name hyperapitest --tableau_token_secretfile hyperapitest.token --source_table_id test_table --tableau_project test_project --tableau_datasource sample_extract
+python3 extractor_cli.py load_sample --tableau_token_name hyperapitest --tableau_token_secretfile hyperapitest.token \
+  --source_table_id test_table --tableau_project test_project --tableau_datasource sample_extract
+```
 
-# Load a full extract from test_table to full_extract in test_project
-python3 extractor_cli.py export_load --tableau_token_name hyperapitest --tableau_token_secretfile hyperapitest.token --source_table_id test_table --tableau_project test_project --tableau_datasource full_extract
+__Full Export:__ Load a full extract from test_table to test_datasource in test_project:
+```console
+python extractor_cli.py export_load --tableau_token_name hyperapitest --tableau_token_secretfile hyperapitest.token \
+  --source_table_id "test_table" --tableau_project "test_project" --tableau_datasource "test_datasource"
+```
 
-# Execute updated_rows.sql to retrieve a changeset and update full_extract where ROW_ID in changeset matches
-python3 extractor_cli.py update --tableau_token_name hyperapitest --tableau_token_secretfile hyperapitest.token --sqlfile updated_rows.sql --tableau_project test_project --tableau_datasource full_extract --match_columns ROW_ID ROW_ID
+
+__Append:__ Execute new_rows.sql to retrieve a changeset and append to test_datasource:
+```console
+# new_rows.sql:
+SELECT * FROM staging_table
+
+python extractor_cli.py append --tableau_token_name hyperapitest --tableau_token_secretfile hyperapitest.token \
+  --sqlfile new_rows.sql --tableau_project "test_project" --tableau_datasource "test_datasource"
+```
+
+__Update:__ Execute updated_rows.sql to retrieve a changeset and update test_datasource where primary key columns in the changeset (METRIC_ID and METRIC_DATE) match corresponding columns in the target datasource:
+```console
+# updated_rows.sql:
+SELECT * FROM source_table WHERE LOAD_TIMESTAMP<UPDATE_TIMESTAMP
+
+python extractor_cli.py update --tableau_token_name hyperapitest --tableau_token_secretfile hyperapitest.token \
+  --sqlfile updated_rows.sql --tableau_project "test_project" --tableau_datasource "test_datasource" \
+  --match_columns METRIC_ID METRIC_ID --match_columns METRIC_DATE METRIC_DATE
+```
+
+__Delete:__ Execute deleted_rows.sql to retrieve a changeset containing the primary key columns that identify which rows have been deleted. Delete from full_extract where METRIC_ID and METRIC_DATE in the changeset match corresponding columns in the target datasource:
+```console
+# deleted_rows.sql:
+SELECT METRIC_ID, METRIC_DATE FROM source_table_deleted_rows
+
+python extractor_cli.py delete --tableau_token_name hyperapitest --tableau_token_secretfile hyperapitest.token \
+  --sqlfile deleted_rows.sql --tableau_project "test_project" --tableau_datasource "full_extract" \
+  --match_columns METRIC_ID METRIC_ID --match_columns METRIC_DATE METRIC_DATE
+```
+
+__Conditional Delete:__ In this example no changeset is provided - records to be deleted are identified using the conditions specified in delete_conditions.json:
+```console
+# delete_conditions.json
+{
+  "op": "lt",
+  "target-col": "ORDER_DATE",
+  "const": {"type": "datetime", "v": "2018-02-01T00:00:00Z"}
+}
+
+python extractor_cli.py delete --tableau_token_name hyperapitest --tableau_token_secretfile hyperapitest.token \
+  --tableau_project "test_project" --tableau_datasource "full_extract" \
+  --match_conditions_json=delete_conditions.json
 ```
 
 # Installation
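As a side note on the Sample Usage section above: config.yml supplies the Tableau and database defaults. A minimal sketch of reading it with PyYAML (already pinned in requirements.txt); the key names below are illustrative assumptions, not the file's actual schema:

```python
# Sketch: loading defaults from config.yml with PyYAML.
# NOTE: the keys shown here are hypothetical placeholders -- consult the sample
# config.yml shipped with the extractor for the real schema.
import yaml

with open("config.yml", encoding="utf-8") as f:
    config = yaml.safe_load(f)

# Hypothetical lookups; actual key names may differ.
tableau_hostname = config["tableau_env"]["server_address"]
default_project = config["tableau_env"]["project"]
print(f"Publishing to {tableau_hostname}, project {default_project}")
```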

Community-Supported/clouddb-extractor/base_extractor.py

Lines changed: 18 additions & 12 deletions
@@ -65,11 +65,6 @@
 Set to Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU to disable
 """
 
-TEMP_DIR: str = "/tmp"
-"""
-TEMP_DIR (str): Local staging directory for hyper files, database exports etc.
-"""
-
 SAMPLE_ROWS: int = 1000
 """
 SAMPLE_ROWS (int): Default number of rows for LIMIT when using load_sample
@@ -88,11 +83,17 @@
 multiple hosts
 """
 
-# DATASOURCE_LOCKFILE_PREFIX: str = "/var/lock/tableau_extractor"
-DATASOURCE_LOCKFILE_PREFIX: str = "/tmp/lock.tableau_extractor"
+DATASOURCE_LOCKFILE_PREFIX: str = "tableau_extractor"
 """
-DATASOURCE_LOCKFILE_PREFIX (str): Defines the location of lockfiles
+DATASOURCE_LOCKFILE_PREFIX (str): Defines the naming convention for lockfiles
+"""
+
+TEMP_DIR: str = "/tmp"
+"""
+TEMP_DIR (str): Local staging directory for hyper files, database exports etc.
 """
+if os.name == 'nt':
+    TEMP_DIR = os.environ.get('TEMP')
 
 DEFAULT_SITE_ID: str = ""
 """
@@ -190,7 +191,7 @@ def wrapper_debug(*args, **kwargs):
 
 def tempfile_name(prefix: str = "", suffix: str = "") -> str:
     """Return a unique temporary file name."""
-    return "{}/tableau_extractor_{}{}{}".format(TEMP_DIR, prefix, uuid.uuid4().hex, suffix)
+    return os.path.join(TEMP_DIR, "{}_tableau_extractor_{}{}".format(prefix, uuid.uuid4().hex, suffix))
 
 
 class BaseExtractor(ABC):
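Why the os.path.join change matters: a hard-coded "/"-separated format string produces broken paths on Windows, while os.path.join emits the native separator. A standalone sketch of the combined TEMP_DIR/tempfile_name behavior (example outputs are illustrative):

```python
# Sketch: platform-aware temp file paths, mirroring the TEMP_DIR and
# tempfile_name changes above.
import os
import uuid

TEMP_DIR = "/tmp"
if os.name == 'nt':
    # Windows fallback, as in the diff above; assumes %TEMP% is set,
    # which it is on standard Windows installs.
    TEMP_DIR = os.environ.get('TEMP')

def tempfile_name(prefix: str = "", suffix: str = "") -> str:
    """Return a unique temporary file name under TEMP_DIR."""
    return os.path.join(TEMP_DIR, "{}_tableau_extractor_{}{}".format(prefix, uuid.uuid4().hex, suffix))

# POSIX:   /tmp/stage_tableau_extractor_<hex>.hyper
# Windows: C:\Users\<user>\AppData\Local\Temp\stage_tableau_extractor_<hex>.hyper
print(tempfile_name(prefix="stage", suffix=".hyper"))
```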
@@ -271,8 +272,7 @@ def quoted_sql_identifier(self, sql_identifier: str) -> str:
         if len(sql_identifier) > maxlength:
             raise Exception("Invalid SQL identifier: {} - exceeded max allowed length: {}".format(sql_identifier, maxlength))
 
-        # char_whitelist = re.compile("^[A-Za-z0-9_-.]*$")
-        char_whitelist = re.compile(r"\A[\w\.\-]*\Z")
+        char_whitelist = re.compile(r"\A[\[\w\.\-\]]*\Z")
         if char_whitelist.match(sql_identifier) is None:
             raise Exception("Invalid SQL identifier: {} - found invalid characters".format(sql_identifier))
 
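As a quick check of what the widened whitelist admits, a standalone demo (the sample identifiers are mine, not from the repo's tests):

```python
# Sketch: the updated whitelist now admits Transact-SQL bracket delimiters
# in addition to word characters, dots, and hyphens.
import re

char_whitelist = re.compile(r"\A[\[\w\.\-\]]*\Z")

for ident in ["my_table", "dbo.my_table", "[dbo].[MyTable]", "bad;name"]:
    ok = char_whitelist.match(ident) is not None
    print(f"{ident!r}: {'accepted' if ok else 'rejected'}")
# 'my_table': accepted
# 'dbo.my_table': accepted
# '[dbo].[MyTable]': accepted
# 'bad;name': rejected
```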
@@ -317,7 +317,7 @@ def _datasource_lock(self, tab_ds_name: str) -> FileLock:
         #exclusive lock active for datasource here
         #exclusive lock released for datasource here
         """
-        lock_path = "{}.{}.{}.lock".format(DATASOURCE_LOCKFILE_PREFIX, self.tableau_project_id, tab_ds_name)
+        lock_path = os.path.join(TEMP_DIR, "{}.{}.{}.lock".format(DATASOURCE_LOCKFILE_PREFIX, self.tableau_project_id, tab_ds_name))
         return FileLock(lock_path, timeout=DATASOURCE_LOCK_TIMEOUT)
 
     def _get_project_id(self, tab_project: str) -> str:
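For context, the lock path is consumed by the filelock package (filelock==3.0.12 per requirements.txt). A minimal sketch of the locking pattern with the new TEMP_DIR-relative path; the project id, datasource name, and timeout value are placeholders:

```python
# Sketch: serializing datasource updates with filelock, using the new
# TEMP_DIR-relative lock path.
import os
from filelock import FileLock, Timeout

TEMP_DIR = os.environ.get('TEMP') if os.name == 'nt' else "/tmp"
DATASOURCE_LOCKFILE_PREFIX = "tableau_extractor"
DATASOURCE_LOCK_TIMEOUT = 60  # seconds; placeholder, the real default lives in base_extractor.py

lock_path = os.path.join(TEMP_DIR, "{}.{}.{}.lock".format(DATASOURCE_LOCKFILE_PREFIX, "project-id", "my_datasource"))
try:
    with FileLock(lock_path, timeout=DATASOURCE_LOCK_TIMEOUT):
        pass  # exclusive lock active for datasource here
    # exclusive lock released for datasource here
except Timeout:
    print("Another extractor process holds the lock for this datasource")
```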
@@ -400,13 +400,19 @@ def query_result_to_hyper_file(
                 inserter.execute()
             else:
                 assert cursor is not None
+                logger.info(f"Spooling cursor to hyper file, DBAPI_BATCHSIZE={self.dbapi_batchsize}")
+                batches = 0
                 if rows:
                     # We have rows in the buffer from where we determined the cursor.description for server side cursor
                     inserter.add_rows(rows)
+                    batches += 1
                 while True:
                     rows = cursor.fetchmany(self.dbapi_batchsize)
                     if rows:
                         inserter.add_rows(rows)
+                        batches += 1
+                        if batches % 10 == 0:
+                            logger.info(f"Completed Batch {batches}")
                     else:
                         break
                 inserter.execute()
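The loop above follows the standard DB-API 2.0 fetchmany pattern. Stripped of the Hyper-specific details, it looks like this (`cursor` is any DB-API 2.0 cursor; `add_rows` stands in for tableauhyperapi's Inserter.add_rows):

```python
# Sketch: the DB-API fetchmany spooling pattern used in the diff above,
# with progress logging every 10 batches.
import logging

logger = logging.getLogger("clouddb_extractor")

def spool_cursor(cursor, add_rows, batchsize: int = 10_000) -> int:
    """Drain cursor in batches of `batchsize` rows; return the batch count."""
    batches = 0
    while True:
        rows = cursor.fetchmany(batchsize)
        if not rows:
            break
        add_rows(rows)
        batches += 1
        if batches % 10 == 0:
            logger.info(f"Completed Batch {batches}")
    return batches
```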

Community-Supported/clouddb-extractor/requirements.txt

Lines changed: 0 additions & 1 deletion
@@ -1,7 +1,6 @@
 filelock==3.0.12
 PyYAML==5.4.1
 toml==0.10.2
-typed-ast==1.4.3
 types-filelock==0.1.3
 types-futures==0.1.3
 types-protobuf==0.1.11
