diff --git a/.gitignore b/.gitignore
index a0905f7..57511b3 100644
--- a/.gitignore
+++ b/.gitignore
@@ -178,3 +178,6 @@ carto_credentials.json
 .idea/codeStyles/codeStyleConfig.xml
 .idea/codeStyles/Project.xml
 .idea/.gitignore
+
+# Vim
+*.swp
diff --git a/Makefile b/Makefile
index fa7ff8c..ce2b9bc 100644
--- a/Makefile
+++ b/Makefile
@@ -10,7 +10,7 @@ init:
 	[ -d $(VENV) ] || python3 -m venv $(VENV)
 	$(BIN)/pip install -r requirements-dev.txt
 	$(BIN)/pre-commit install
-	$(BIN)/pip install -e .[snowflake,bigquery]
+	$(BIN)/pip install -e .[all]
 
 lint:
 	$(BIN)/black raster_loader setup.py
diff --git a/README.md b/README.md
index a0b3c8b..15dfc3d 100644
--- a/README.md
+++ b/README.md
@@ -21,6 +21,7 @@ pip install -U raster-loader
 
 pip install -U raster-loader"[bigquery]"
 pip install -U raster-loader"[snowflake]"
+pip install -U raster-loader"[databricks]"
 ```
 
 ### Installing from source
@@ -31,6 +32,7 @@ cd raster-loader
 pip install .
 ```
 
+
 ## Usage
 
 There are two ways you can use Raster Loader:
@@ -150,6 +152,19 @@ project.
 [ROADMAP.md](ROADMAP.md) contains a list of features and improvements planned
 for future versions of Raster Loader.
 
+### Installing for Development
+
+```
+make init
+source env/bin/activate
+```
+
+Running `which carto` should now point at the executable inside the local
+virtual environment instead of a system-wide installation, something like:
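+
+```
+$ which carto
+/my/local/filesystem/raster-loader/env/bin/carto
+```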
+
+The `-e` flag passed to `pip install` puts the project and its dependencies in
+editable (development) mode. Changes to the project files are reflected in the
+`carto` command immediately, without re-running any setup steps.
+
+
 ## Releasing
 
 ### 1. Create and merge a release PR updating the CHANGELOG
diff --git a/docs/source/user_guide/cli.rst b/docs/source/user_guide/cli.rst
index c1e6bb5..4c048e7 100644
--- a/docs/source/user_guide/cli.rst
+++ b/docs/source/user_guide/cli.rst
@@ -42,10 +42,25 @@ Snowflake:
 
 To use the snowflake utilities, use the ``carto snowflake`` command. This command has
 several subcommands, which are described below.
 
+Using the Raster Loader with Databricks
+---------------------------------------
+
+Before you can upload a raster file, you need to have set up the following in
+Databricks:
+
+#. A Databricks instance host, e.g. `https://dbc-abcde12345-678f.cloud.databricks.com`
+#. A cluster ID (the cluster MUST be turned on)
+#. A Personal Access Token (PAT). See `Databricks PAT Docs`_.
+#. A catalog
+#. A schema (in the same catalog)
+
+To use the Databricks utilities, use the ``carto databricks`` command. This command has
+several subcommands, which are described below.
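+
+For example, listing the available subcommands is a quick way to check that the
+``databricks`` extra is installed correctly:
+
+.. code-block:: bash
+
+    carto databricks --help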
+
 Uploading a raster layer
 ------------------------
 
-To upload a raster file, use the ``carto [bigquery|snowflake] upload`` command.
+To upload a raster file, use the ``carto [bigquery|snowflake|databricks] upload`` command.
 The input raster must be a ``GoogleMapsCompatible`` raster. You can make your
 raster compatible by converting it with the following GDAL command:
@@ -98,6 +113,20 @@ The same operation, performed with Snowflake, would be:
 Authentication parameters are explicitly required in this case for Snowflake,
 since they are not set up in the environment.
 
+The same operation, performed with Databricks, would be:
+
+.. code-block:: bash
+
+    carto databricks upload \
+        --host 'https://dbc-12345abc-123f.cloud.databricks.com' \
+        --token <token> \
+        --cluster-id '0123-456789-abc12345xyz' \
+        --catalog 'main' \
+        --schema default \
+        --file_path \
+            /path/to/my/raster/file.tif \
+        --table mydatabrickstable
+
 If no band is specified, the first band of the raster will be uploaded. If the
 ``--band`` flag is set, the specified band will be uploaded. For example, the
 following command uploads the second band of the raster:
diff --git a/docs/source/user_guide/installation.rst b/docs/source/user_guide/installation.rst
index 03c05a2..b18b234 100644
--- a/docs/source/user_guide/installation.rst
+++ b/docs/source/user_guide/installation.rst
@@ -22,13 +22,14 @@ To install from source:
 In most cases, it is recommended to install Raster Loader in a virtual
 environment. Use venv_ to create and manage your virtual environment.
 
-The above will install the dependencies required to work with both BigQuery and Snowflake and. In case you only want to work with one of them, you can install the
+The above will install the dependencies required to work with BigQuery, Snowflake, and Databricks. In case you only want to work with one of them, you can install the
 dependencies for each of them separately:
 
 .. code-block:: bash
 
     pip install -U raster-loader"[bigquery]"
     pip install -U raster-loader"[snowflake]"
+    pip install -U raster-loader"[databricks]"
 
 After installing the Raster Loader package, you will have access to the
 :ref:`carto CLI <cli>`. To make sure the installation was successful, run the
diff --git a/docs/source/user_guide/use_with_python.rst b/docs/source/user_guide/use_with_python.rst
index 1902973..758790b 100644
--- a/docs/source/user_guide/use_with_python.rst
+++ b/docs/source/user_guide/use_with_python.rst
@@ -18,6 +18,12 @@ For BigQuery, use ``BigQueryConnection``:
 
     from raster_loader import BigQueryConnection
 
+For Databricks, use ``DatabricksConnection``:
+
+.. code-block:: python
+
+    from raster_loader import DatabricksConnection
+
 Then, create a connection object with the appropriate parameters.
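+
+For Databricks (the values below are placeholders, mirroring the parameters of
+``DatabricksConnection``; use your own host, token, and cluster ID):
+
+.. code-block:: python
+
+    connection = DatabricksConnection(
+        host='https://dbc-12345abc-123f.cloud.databricks.com',
+        token='<token>',
+        cluster_id='0123-456789-abc12345xyz',
+        catalog='main',
+        schema='default',
+    )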
 
 For Snowflake:
@@ -48,7 +54,7 @@ For example:
 
 .. code-block:: python
 
-    connector.upload_raster(
+    connection.upload_raster(
         file_path = 'path/to/raster.tif',
         fqn = 'database.schema.tablename',
     )
diff --git a/raster_loader/__init__.py b/raster_loader/__init__.py
index 36e7278..c85af99 100644
--- a/raster_loader/__init__.py
+++ b/raster_loader/__init__.py
@@ -6,9 +6,13 @@ from raster_loader.io.snowflake import (
     SnowflakeConnection,
 )
 
+from raster_loader.io.databricks import (
+    DatabricksConnection,
+)
+
 __all__ = [
     "__version__",
     "BigQueryConnection",
     "SnowflakeConnection",
+    "DatabricksConnection",
 ]
diff --git a/raster_loader/cli/databricks.py b/raster_loader/cli/databricks.py
new file mode 100644
index 0000000..8492a46
--- /dev/null
+++ b/raster_loader/cli/databricks.py
@@ -0,0 +1,172 @@
+import click
+from functools import wraps, partial
+
+from raster_loader.utils import get_default_table_name
+from raster_loader.io.databricks import DatabricksConnection
+
+
+def catch_exception(func=None, *, handle=Exception):
+    if not func:
+        return partial(catch_exception, handle=handle)
+
+    @wraps(func)
+    def wrapper(*args, **kwargs):
+        try:
+            return func(*args, **kwargs)
+        except handle as e:
+            raise click.ClickException(str(e))
+
+    return wrapper
+
+
+@click.group(context_settings=dict(help_option_names=["-h", "--help"]))
+def databricks(args=None):
+    """
+    Manage Databricks resources.
+    """
+    pass
+
+
+@databricks.command(help="Upload a raster file to Databricks.")
+@click.option("--host", help="The Databricks host URL.", required=True)
+@click.option("--token", help="The Databricks access token.", required=True)
+@click.option(
+    "--cluster-id", help="The Databricks cluster ID.", required=True
+)
+@click.option(
+    "--file_path", help="The path to the raster file.", required=False, default=None
+)
+@click.option(
+    "--file_url", help="The URL to the raster file.", required=False, default=None
+)
+@click.option("--catalog", help="The name of the catalog.", required=True)
+@click.option("--schema", help="The name of the schema.", required=True)
+@click.option("--table", help="The name of the table.", default=None)
+@click.option(
+    "--band",
+    help="Band(s) within raster to upload. "
+    "Repeat --band to specify multiple bands.",
+    default=[1],
+    multiple=True,
+)
+@click.option(
+    "--band_name",
+    help="Column name(s) used to store band (Default: band_<band>). "
+    "Repeat --band_name to specify multiple band column names. "
+    "The list of column names must pair with the --band list in the same order.",
+    default=[None],
+    multiple=True,
+)
+@click.option(
+    "--chunk_size", help="The number of blocks to upload in each chunk.", default=400
+)
+@click.option(
+    "--overwrite",
+    help="Overwrite existing data in the table if it already exists.",
+    default=False,
+    is_flag=True,
+)
+@click.option(
+    "--append",
+    help="Append records into a table if it already exists.",
+    default=False,
+    is_flag=True,
+)
+@click.option(
+    "--cleanup-on-failure",
+    help="Clean up resources if the upload fails. Useful for non-interactive scripts.",
+    default=False,
+    is_flag=True,
+)
+@catch_exception()
+def upload(
+    host,
+    token,
+    cluster_id,
+    file_path,
+    file_url,
+    catalog,
+    schema,
+    table,
+    band,
+    band_name,
+    chunk_size,
+    overwrite=False,
+    append=False,
+    cleanup_on_failure=False,
+):
+    from raster_loader.io.common import (
+        get_number_of_blocks,
+        print_band_information,
+        get_block_dims,
+    )
+    import os
+    from urllib.parse import urlparse
+
+    if file_path is None and file_url is None:
+        raise ValueError("Need either a --file_path or --file_url")
+
+    if file_path and file_url:
+        raise ValueError("Only one of --file_path or --file_url must be provided.")
+
+    is_local_file = file_path is not None
+
+    # Check that band and band_name are the same length if band_name provided
+    if band_name != (None,):
+        if len(band) != len(band_name):
+            raise ValueError("Must supply the same number of band_names as bands")
+    else:
+        band_name = [None] * len(band)
+
+    # Pair band and band_name in a list of tuples
+    bands_info = list(zip(band, band_name))
+
+    # Create default table name if not provided
+    if table is None:
+        table = get_default_table_name(
+            file_path if is_local_file else urlparse(file_url).path, band
+        )
+
+    connector = DatabricksConnection(
+        host=host,
+        token=token,
+        cluster_id=cluster_id,
+        catalog=catalog,
+        schema=schema,
+    )
+
+    source = file_path if is_local_file else file_url
+
+    # Introspect raster file
+    num_blocks = get_number_of_blocks(source)
+    file_size_mb = 0
+    if is_local_file:
+        file_size_mb = os.path.getsize(file_path) / 1024 / 1024
+
+    click.echo("Preparing to upload raster file to Databricks...")
+    click.echo(f"File Path: {source}")
+    click.echo(f"File Size: {file_size_mb} MB")
+    print_band_information(source)
+    click.echo(f"Source Band(s): {band}")
+    click.echo(f"Band Name(s): {band_name}")
+    click.echo(f"Number of Blocks: {num_blocks}")
+    click.echo(f"Block Dimensions: {get_block_dims(source)}")
+    click.echo(f"Catalog: {catalog}")
+    click.echo(f"Schema: {schema}")
+    click.echo(f"Table: {table}")
+    click.echo(f"Number of Records Per Batch: {chunk_size}")
+
+    click.echo("Uploading Raster to Databricks")
+
+    connector.upload_raster(
+        source,
+        table,
+        bands_info,
+        chunk_size,
+        overwrite=overwrite,
+        append=append,
+        cleanup_on_failure=cleanup_on_failure,
+    )
+
+    click.echo("Raster file uploaded to Databricks")
+    exit(0)
diff --git a/raster_loader/errors.py b/raster_loader/errors.py
index 5d3bcec..138c391 100644
--- a/raster_loader/errors.py
+++ b/raster_loader/errors.py
@@ -16,6 +16,15 @@ def import_error_snowflake():  # pragma: no cover
     raise ImportError(msg)
 
 
+def import_error_databricks():  # pragma: no cover
+    msg = (
+        "Databricks client is not installed.\n"
+        "Please install Databricks dependencies to use this function.\n"
+        'run `pip install -U raster-loader"[databricks]"` to install from pypi.'
+    )
+    raise ImportError(msg)
+
+
 class IncompatibleRasterException(Exception):
     def __init__(self):
         self.message = (
diff --git a/raster_loader/io/common.py b/raster_loader/io/common.py
index 6875061..5a08ad5 100644
--- a/raster_loader/io/common.py
+++ b/raster_loader/io/common.py
@@ -242,6 +242,8 @@ def rasterio_metadata(
     metadata["num_blocks"] = int(width * height / block_width / block_height)
     metadata["num_pixels"] = width * height
     metadata["pixel_resolution"] = pixel_resolution
+    metadata["crs"] = raster_crs
+    metadata["transform"] = raster_dataset.transform
 
     return metadata
 
diff --git a/raster_loader/io/databricks.py b/raster_loader/io/databricks.py
new file mode 100644
index 0000000..e3a620c
--- /dev/null
+++ b/raster_loader/io/databricks.py
@@ -0,0 +1,296 @@
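+"""Databricks connection for Raster Loader, built on Databricks Connect.
+
+Minimal usage sketch (assumes valid credentials and a running cluster)::
+
+    from raster_loader import DatabricksConnection
+
+    conn = DatabricksConnection(host, token, cluster_id, catalog, schema)
+    conn.upload_raster("/path/to/raster.tif", "my_table")
+"""
+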
+import json
+import pandas as pd
+
+from typing import Iterable, List, Tuple
+
+from raster_loader.errors import (
+    IncompatibleRasterException,
+    import_error_databricks,
+)
+
+from raster_loader.utils import ask_yes_no_question, batched
+
+from raster_loader.io.common import (
+    rasterio_metadata,
+    rasterio_windows_to_records,
+    get_number_of_blocks,
+    check_metadata_is_compatible,
+    update_metadata,
+)
+from raster_loader.io.datawarehouse import DataWarehouseConnection
+
+try:
+    from databricks.connect import DatabricksSession
+    from pyspark.sql.types import (
+        StructType,
+        StructField,
+        StringType,
+        LongType,
+    )
+except ImportError:  # pragma: no cover
+    _has_databricks = False
+else:
+    _has_databricks = True
+
+
+class DatabricksConnection(DataWarehouseConnection):
+    def __init__(self, host, token, cluster_id, catalog, schema):
+        if not _has_databricks:
+            import_error_databricks()
+
+        self.host = host
+        self.token = token
+        self.cluster_id = cluster_id
+        self.catalog = catalog
+        self.schema = schema
+
+        self.client = self.get_connection()
+
+    def get_connection(self):
+        # Initialize DatabricksSession
+        session = DatabricksSession.builder.remote(
+            host=self.host, token=self.token, cluster_id=self.cluster_id
+        ).getOrCreate()
+        session.conf.set("spark.databricks.session.timeout", "6h")
+        return session
+
+    def get_table_fqn(self, table):
+        return f"`{self.catalog}`.{self.schema}.{table}"
+
+    def execute(self, sql):
+        return self.client.sql(sql)
+
+    def execute_to_dataframe(self, sql):
+        df = self.execute(sql)
+        return df.toPandas()
+
+    def create_schema_if_not_exists(self):
+        self.execute(f"CREATE SCHEMA IF NOT EXISTS `{self.catalog}`.{self.schema}")
+
+    def create_table_if_not_exists(self, table):
+        self.execute(
+            f"""
+            CREATE TABLE IF NOT EXISTS `{self.catalog}`.{self.schema}.{table} (
+                BLOCK BIGINT,
+                METADATA STRING,
+                {self.band_columns}
+            ) USING DELTA
+            """
+        )
+
+    def band_rename_function(self, band_name: str):
+        return band_name.upper()
+
+    def write_metadata(
+        self,
+        metadata,
+        append_records,
+        table,
+    ):
+        # Create a DataFrame with the metadata
+        schema = StructType(
+            [
+                StructField("BLOCK", LongType(), True),
+                StructField("METADATA", StringType(), True),
+            ]
+        )
+
+        data = [(0, json.dumps(metadata))]
+
+        metadata_df = self.client.createDataFrame(data, schema)
+
+        # Write to table
+        fqn = self.get_table_fqn(table)
+        metadata_df.write.format("delta").mode("append").saveAsTable(fqn)
+
+        return True
+
+    def get_metadata(self, table):
+        fqn = self.get_table_fqn(table)
+        query = f"""
+            SELECT METADATA
+            FROM {fqn}
+            WHERE BLOCK = 0
+        """
+        result = self.execute_to_dataframe(query)
+        if result.empty:
+            return None
+        return json.loads(result.iloc[0]["METADATA"])
+
+    def check_if_table_exists(self, table):
+        sql = f"""
+            SELECT *
+            FROM `{self.catalog}`.INFORMATION_SCHEMA.TABLES
+            WHERE TABLE_SCHEMA = '{self.schema}'
+            AND TABLE_NAME = '{table}'
+        """
+        df = self.execute(sql)
+        # If the count is greater than 0, the table exists
+        return df.count() > 0
+
+    def check_if_table_is_empty(self, table):
+        fqn = self.get_table_fqn(table)
+        df = self.client.table(fqn)
+        return df.count() == 0
+
+    def upload_records(
+        self,
+        records: Iterable,
+        table: str,
+        overwrite: bool,
+    ):
+        fqn = self.get_table_fqn(table)
+        records_list = []
+        for record in records:
+            # Remove 'METADATA' from records, as it's handled separately
+            if "METADATA" in record:
+                del record["METADATA"]
+            records_list.append(record)
+
+        data_df = pd.DataFrame(records_list)
+        spark_df = self.client.createDataFrame(data_df)
+
+        if overwrite:
+            mode = "overwrite"
+        else:
+            mode = "append"
+
+        spark_df.write.format("delta").mode(mode).saveAsTable(fqn)
+
+        return True
+
+    def upload_raster(
+        self,
+        file_path: str,
+        table: str,
+        bands_info: List[Tuple[int, str]] = None,
+        chunk_size: int = None,
+        overwrite: bool = False,
+        append: bool = False,
+        cleanup_on_failure: bool = False,
+    ) -> bool:
+        print("Loading raster file to Databricks...")
+
+        bands_info = bands_info or [(1, None)]
+
+        append_records = False
+
+        try:
+            if (
+                self.check_if_table_exists(table)
+                and not self.check_if_table_is_empty(table)
+                and not overwrite
+            ):
+                append_records = append or ask_yes_no_question(
+                    f"Table `{self.catalog}`.{self.schema}.{table} already exists "
+                    "and is not empty. Append records? [yes/no] "
+                )
+
+                if not append_records:
+                    exit()
+
+            # Prepare band columns
+            self.band_columns = ", ".join(
+                [
+                    f"{self.band_rename_function(band_name or f'band_{band}')} BINARY"
+                    for band, band_name in bands_info
+                ]
+            )
+
+            # Create schema and table if not exists
+            self.create_schema_if_not_exists()
+            self.create_table_if_not_exists(table)
+
+            metadata = rasterio_metadata(
+                file_path, bands_info, self.band_rename_function
+            )
+
+            records_gen = rasterio_windows_to_records(
+                file_path,
+                self.band_rename_function,
+                bands_info,
+            )
+
+            total_blocks = get_number_of_blocks(file_path)
+
+            if chunk_size is None:
+                ret = self.upload_records(records_gen, table, overwrite)
+                if not ret:
+                    raise IOError("Error uploading to Databricks.")
+            else:
+                from tqdm.auto import tqdm
+
+                print(f"Writing {total_blocks} blocks to Databricks...")
+                with tqdm(total=total_blocks) as pbar:
+                    if total_blocks < chunk_size:
+                        chunk_size = total_blocks
+                    is_first_batch = True
+                    for records in batched(records_gen, chunk_size):
+                        ret = self.upload_records(
+                            records, table, overwrite and is_first_batch
+                        )
+                        pbar.update(len(records))
+                        if not ret:
+                            raise IOError("Error uploading to Databricks.")
+                        is_first_batch = False
+
+            print("Writing metadata to Databricks...")
+            if append_records:
+                old_metadata = self.get_metadata(table)
+                check_metadata_is_compatible(metadata, old_metadata)
+                update_metadata(metadata, old_metadata)
+
+            self.write_metadata(metadata, append_records, table)
+
+        except IncompatibleRasterException as e:
+            raise IOError(f"Error uploading to Databricks: {e.message}")
+
+        except KeyboardInterrupt:
+            delete = cleanup_on_failure or ask_yes_no_question(
+                "Would you like to delete the partially uploaded table? [yes/no] "
+            )
+
+            if delete:
+                self.delete_table(table)
+
+            raise KeyboardInterrupt
+
+        except Exception as e:
+            delete = cleanup_on_failure or ask_yes_no_question(
+                (
+                    "Error uploading to Databricks. "
+                    "Would you like to delete the partially uploaded table? [yes/no] "
+                )
+            )
+
+            if delete:
+                self.delete_table(table)
+
+            raise IOError(f"Error uploading to Databricks: {e}")
+
+        print("Done.")
+        return True
+
+    def delete_table(self, table):
+        fqn = self.get_table_fqn(table)
+        self.execute(f"DROP TABLE IF EXISTS {fqn}")
+
+    def get_records(self, table: str, limit=10) -> pd.DataFrame:
+        fqn = self.get_table_fqn(table)
+        query = f"SELECT * FROM {fqn} LIMIT {limit}"
+        df = self.execute_to_dataframe(query)
+        return df
+
+    def insert_in_table(
+        self,
+        rows: List[dict],
+        table: str,
+    ) -> bool:
+        fqn = self.get_table_fqn(table)
+        data_df = pd.DataFrame(rows)
+        spark_df = self.client.createDataFrame(data_df)
+        spark_df.write.format("delta").mode("append").saveAsTable(fqn)
+        return True
+
+    def quote_name(self, name):
+        return f"`{name}`"
diff --git a/raster_loader/tests/databricks/__init__.py b/raster_loader/tests/databricks/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/raster_loader/tests/databricks/test_cli.py b/raster_loader/tests/databricks/test_cli.py
new file mode 100644
index 0000000..25b79d5
--- /dev/null
+++ b/raster_loader/tests/databricks/test_cli.py
@@ -0,0 +1,241 @@
+import os
+from unittest.mock import patch
+
+from click.testing import CliRunner
+
+from raster_loader.cli import main
+
+
+here = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+fixtures = os.path.join(here, "fixtures")
+tiff = os.path.join(fixtures, "mosaic_cog.tif")
+
+
+@patch(
+    "raster_loader.io.databricks.DatabricksConnection.upload_raster", return_value=None
+)
+@patch("raster_loader.io.databricks.DatabricksConnection.__init__", return_value=None)
+def test_databricks_upload(*args, **kwargs):
+    runner = CliRunner()
+    result = runner.invoke(
+        main,
+        [
+            "databricks",
+            "upload",
+            "--file_path",
+            f"{tiff}",
+            "--catalog",
+            "catalog",
+            "--schema",
+            "schema",
+            "--table",
+            "table",
+            "--host",
+            "https://databricks-host",
+            "--token",
+            "token",
+            "--cluster-id",
+            "cluster-1234",
+            "--chunk_size",
+            1,
+            "--band",
+            1,
+        ],
+    )
+    print(result.output)
+    assert result.exit_code == 0
+
+
+@patch(
+    "raster_loader.io.databricks.DatabricksConnection.upload_raster", return_value=None
+)
+@patch("raster_loader.io.databricks.DatabricksConnection.__init__", return_value=None)
+def test_databricks_file_path_or_url_check(*args, **kwargs):
+    runner = CliRunner()
+    result = runner.invoke(
+        main,
+        [
+            "databricks",
+            "upload",
+            "--catalog",
+            "catalog",
+            "--schema",
+            "schema",
+            "--host",
+            "https://databricks-host",
+            "--token",
+            "token",
+            "--cluster-id",
+            "cluster-1234",
+            "--chunk_size",
+            1,
+            "--band",
+            1,
+        ],
+    )
+    assert result.exit_code == 1
+    assert "Error: Need either a --file_path or --file_url" in result.output
+
+    result = runner.invoke(
+        main,
+        [
+            "databricks",
+            "upload",
+            "--file_path",
+            f"{tiff}",
+            "--file_url",
+            "http://example.com/raster.tif",
+            "--catalog",
+            "catalog",
+            "--schema",
+            "schema",
+            "--host",
+            "https://databricks-host",
+            "--token",
+            "token",
+            "--cluster-id",
+            "cluster-1234",
+            "--chunk_size",
+            1,
+            "--band",
+            1,
+        ],
+    )
+    assert result.exit_code == 1
+    assert "Only one of --file_path or --file_url must be provided" in result.output
+
+
+@patch(
+    "raster_loader.io.databricks.DatabricksConnection.upload_raster", return_value=None
+)
+@patch("raster_loader.io.databricks.DatabricksConnection.__init__", return_value=None)
+def test_databricks_upload_multiple_bands(*args, **kwargs):
+    runner = CliRunner()
+    result = runner.invoke(
+        main,
+        [
+            "databricks",
+            "upload",
+            "--file_path",
+            f"{tiff}",
+            "--catalog",
+            "catalog",
+            "--schema",
+            "schema",
+            "--host",
+            "https://databricks-host",
+            "--token",
+            "token",
+            "--cluster-id",
+            "cluster-1234",
+            "--chunk_size",
+            1,
+            "--band",
+            1,
+            "--band",
+            2,
+        ],
+    )
+    assert result.exit_code == 0
+
+
+def test_databricks_fail_upload_multiple_bands_misaligned_with_band_names(
+    *args, **kwargs
+):
+    runner = CliRunner()
+    result = runner.invoke(
+        main,
+        [
+            "databricks",
+            "upload",
+            "--file_path",
+            f"{tiff}",
+            "--catalog",
+            "catalog",
+            "--schema",
+            "schema",
+            "--host",
+            "https://databricks-host",
+            "--token",
+            "token",
+            "--cluster-id",
+            "cluster-1234",
+            "--chunk_size",
+            1,
+            "--band",
+            1,
+            "--band_name",
+            "band_1",
+            "--band",
+            2,
+        ],
+    )
+    assert result.exit_code == 1
+    assert "Error: Must supply the same number of band_names as bands" in result.output
+
+
+@patch(
+    "raster_loader.io.databricks.DatabricksConnection.upload_raster", return_value=None
+)
+@patch("raster_loader.io.databricks.DatabricksConnection.__init__", return_value=None)
+def test_databricks_upload_multiple_bands_aligned_with_band_names(*args, **kwargs):
+    runner = CliRunner()
+    result = runner.invoke(
+        main,
+        [
+            "databricks",
+            "upload",
+            "--file_path",
+            f"{tiff}",
+            "--catalog",
+            "catalog",
+            "--schema",
+            "schema",
+            "--host",
+            "https://databricks-host",
+            "--token",
+            "token",
+            "--cluster-id",
+            "cluster-1234",
+            "--chunk_size",
+            1,
+            "--band",
+            1,
+            "--band_name",
+            "band_1",
+            "--band_name",
+            "band_2",
+            "--band",
+            2,
+        ],
+    )
+    assert result.exit_code == 0
+
+
+@patch(
+    "raster_loader.io.databricks.DatabricksConnection.upload_raster", return_value=None
+)
+@patch("raster_loader.io.databricks.DatabricksConnection.__init__", return_value=None)
+def test_databricks_upload_no_table_name(*args, **kwargs):
+    runner = CliRunner()
+    result = runner.invoke(
+        main,
+        [
+            "databricks",
+            "upload",
+            "--file_path",
+            f"{tiff}",
+            "--catalog",
+            "catalog",
+            "--schema",
+            "schema",
+            "--host",
+            "https://databricks-host",
+            "--token",
+            "token",
+            "--cluster-id",
+            "cluster-1234",
+        ],
+    )
+    assert result.exit_code == 0
+    assert "Table: mosaic_cog_band__1___" in result.output
diff --git a/setup.cfg b/setup.cfg
index 26a05f1..0cee9ce 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -10,6 +10,7 @@ keywords =
     data warehouse
     bigquery
     snowflake
+    databricks
 author = CARTO
 url = https://github.com/cartodb/raster-loader
 license = BSD 3-Clause
@@ -48,6 +49,7 @@ console_scripts =
 raster_loader.cli =
     bigquery = raster_loader.cli.bigquery:bigquery
     snowflake = raster_loader.cli.snowflake:snowflake
+    databricks = raster_loader.cli.databricks:databricks
     info = raster_loader.cli.info:info
 
 [options.extras_require]
@@ -60,9 +62,13 @@ bigquery =
     google-auth>=2.28.0
 snowflake =
     snowflake-connector-python>=2.6.0
+databricks =
+    databricks-connect==13.0.1
+
 all =
     %(bigquery)s
     %(snowflake)s
+    %(databricks)s
 
 [flake8]
 max-line-length = 88