From d65209d145607fbe170ef070ba93495a1069d6e4 Mon Sep 17 00:00:00 2001 From: Jove Zhong Date: Tue, 24 Sep 2024 16:49:47 -0700 Subject: [PATCH 1/4] use dbt-core 1.8.7, latest proton lib, and 1.8.0 as version --- README.md | 10 ++++----- dbt/adapters/proton/__version__.py | 2 +- dbt/include/proton/dbt_project.yml | 2 +- dev_requirements.txt | 4 ++-- pyproject.toml | 35 +++++++++++------------------- setup.py | 17 +++++++-------- 6 files changed, 29 insertions(+), 41 deletions(-) diff --git a/README.md b/README.md index a21c62e..eb797f8 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,7 @@

- dbt logo + dbt logo

-[![build](https://github.com/silentsokolov/dbt-clickhouse/actions/workflows/build.yml/badge.svg?branch=master)](https://github.com/silentsokolov/dbt-clickhouse/actions/workflows/build.yml) - # dbt-proton This plugin ports [dbt](https://getdbt.com) functionality to [Timeplus Proton](https://github.com/timeplus-io/proton). @@ -45,14 +43,14 @@ Run `pytest tests/integration/proton.dbtspec` to run integration tests. ### Database -The dbt model `database.schema.table` is not compatible with ClickHouse because ClickHouse does not support a `schema`. -So we use a simple model `schema.table`, where `schema` is the ClickHouse's database. Please, don't use `default` database! +The dbt model `database.schema.table` is not compatible with Timeplus because Timeplus does not support a `schema`. +So we use a simple model `schema.table`, where `schema` is the Timeplus' database. ### Model Configuration | Option | Description | Required? | |----------------|------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------| -| engine | The table engine (type of table) to use when creating tables | Optional (default: `MergeTree()`) | +| engine | The table engine (type of stream) to use when creating tables | Optional (default: `Stream()`) | | order_by | A tuple of column names or arbitrary expressions. This allows you to create a small sparse index that helps find data faster. | Optional (default: `tuple()`) | | partition_by | A partition is a logical combination of records in a table by a specified criterion. The partition key can be any expression from the table columns. | Optional | diff --git a/dbt/adapters/proton/__version__.py b/dbt/adapters/proton/__version__.py index ce10155..a3428ff 100644 --- a/dbt/adapters/proton/__version__.py +++ b/dbt/adapters/proton/__version__.py @@ -1 +1 @@ -version = '1.6.5' +version = '1.8.0' diff --git a/dbt/include/proton/dbt_project.yml b/dbt/include/proton/dbt_project.yml index 437b217..9682be9 100644 --- a/dbt/include/proton/dbt_project.yml +++ b/dbt/include/proton/dbt_project.yml @@ -1,5 +1,5 @@ name: dbt_proton -version: 1.6.5 +version: 1.8.7 config-version: 2 macro-paths: ["macros"] diff --git a/dev_requirements.txt b/dev_requirements.txt index d739144..63a3310 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -1,5 +1,5 @@ -dbt-core>=1.6.5 -proton-driver>=0.2.8 +dbt-core>=1.8.7 +proton-driver>=0.2.10 pytest pytest-dotenv dbt-tests-adapter diff --git a/pyproject.toml b/pyproject.toml index bc5b772..842526f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,24 +1,15 @@ -[build-system] -requires = ["setuptools>=61.0"] -build-backend = "setuptools.build_meta" +[tool.black] +line-length = 100 +skip-string-normalization = true +target-version = ['py310', 'py311', 'py312'] +exclude = '(\.eggs|\.git|\.mypy_cache|\.venv|venv|env|_build|build|build|dist|)' -[project] -name = "dbt-proton" -version = "1.6.5" -description = "Proton dbt adapter" -authors = [{name="Jove Zhong", email="jove@timeplus.io"}] -readme = "README.md" -requires-python = ">=3.10" -classifiers = [ - "Programming Language :: Python :: 3", - "License :: OSI Approved :: Apache License", - "Operating System :: OS Independent", -] -dependencies = [ - "dbt-core>=1.6.5", - "proton-driver>=0.2.8", -] +[tool.isort] +line_length = 100 +profile = "black" +use_parentheses = true +skip = '.eggs/,.mypy_cache/,.venv/,venv/,env/' -[project.urls] -"Homepage" = "https://github.com/timeplus-io/proton" -"Bug Tracker" = "https://github.com/timeplus-io/proton" \ No newline at end of file +[tool.pytest.ini_options] +log_cli = true +log_cli_level = "WARNING" diff --git a/setup.py b/setup.py index 8f5b23f..3988f46 100644 --- a/setup.py +++ b/setup.py @@ -26,14 +26,15 @@ def _dbt_proton_version(): package_name = 'dbt-proton' package_version = _dbt_proton_version() -description = '''The Proton plugin for dbt (data build tool)''' +description = '''The Timeplus Proton plugin for dbt (data build tool)''' -dbt_version = '1.6.5' +dbt_version = '1.8.0' +dbt_minor = '.'.join(dbt_version.split('.')[0:2]) if not package_version.startswith(dbt_version): raise ValueError( f'Invalid setup.py: package_version={package_version} must start with ' - f'dbt_version={dbt_version}' + f'dbt_version={dbt_minor}' ) @@ -48,7 +49,7 @@ def _dbt_proton_version(): author='Jove Zhong', author_email='jove@timeplus.com', url='https://github.com/timeplus-io/dbt-proton', - license='MIT', + license='Apache 2.0 License', packages=find_namespace_packages(include=['dbt', 'dbt.*']), package_data={ @@ -60,9 +61,9 @@ def _dbt_proton_version(): }, install_requires=[ f'dbt-core=={dbt_version}', - 'proton-driver>=0.2.8', + 'proton-driver>=0.2.10', ], - python_requires=">=3.7", + python_requires=">=3.10", platforms='any', classifiers=[ 'Development Status :: 5 - Production/Stable', @@ -70,10 +71,8 @@ def _dbt_proton_version(): 'Operating System :: Microsoft :: Windows', 'Operating System :: MacOS :: MacOS X', 'Operating System :: POSIX :: Linux', - 'Programming Language :: Python :: 3.7', - 'Programming Language :: Python :: 3.8', - 'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: 3.10', 'Programming Language :: Python :: 3.11', + 'Programming Language :: Python :: 3.12', ], ) From 0033e90a8a2b03c3f61dc3665f83a99fd6046572 Mon Sep 17 00:00:00 2001 From: Jove Zhong Date: Tue, 24 Sep 2024 17:22:28 -0700 Subject: [PATCH 2/4] pass the basic unit test --- README.md | 2 +- dbt/adapters/proton/connections.py | 5 +- dbt/adapters/proton/impl.py | 91 +++++++++++++++--------------- dbt/adapters/proton/relation.py | 13 ++++- 4 files changed, 59 insertions(+), 52 deletions(-) diff --git a/README.md b/README.md index eb797f8..82ff0b2 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ pip install dbt-proton ``` ### Development -Follow the [dbt Documentation])(https://docs.getdbt.com/docs/core/pip-install) to install dbt with pip. +Follow the [dbt Documentation](https://docs.getdbt.com/docs/core/pip-install) to install dbt with pip. ```shell python3.10 -m venv proton-dbt-env source proton-dbt-env/bin/activate diff --git a/dbt/adapters/proton/connections.py b/dbt/adapters/proton/connections.py index 21a1719..42b3ebe 100644 --- a/dbt/adapters/proton/connections.py +++ b/dbt/adapters/proton/connections.py @@ -10,9 +10,8 @@ from proton_driver import Client, errors -from dbt.adapters.base import Credentials +from dbt.adapters.contracts.connection import Connection, Credentials from dbt.adapters.sql import SQLConnectionManager -from dbt.contracts.connection import Connection from dbt.logger import GLOBAL_LOGGER as logger from dbt.version import __version__ as dbt_version @@ -154,7 +153,7 @@ def execute( client = conn.handle with self.exception_handler(sql): - #sys.stdout.write("Jove TEMP LOG "+sql+"\n") + #sys.stdout.write("Jove TEMP LOG "+sql+"\n") logger.debug( 'On {connection_name}: {sql}', connection_name=conn.name, diff --git a/dbt/adapters/proton/impl.py b/dbt/adapters/proton/impl.py index 2ca2112..7098759 100644 --- a/dbt/adapters/proton/impl.py +++ b/dbt/adapters/proton/impl.py @@ -6,10 +6,21 @@ import dbt.exceptions from dataclasses import dataclass from concurrent.futures import Future +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Dict, + FrozenSet, + Iterable, + List, + Optional, + Set, + Tuple, + Union, +) -from dbt.contracts.relation import RelationType from dbt.contracts.graph.manifest import Manifest -from dbt.clients.agate_helper import table_from_rows from dbt.adapters.base.relation import InformationSchema from dbt.adapters.base.impl import catch_as_completed from dbt.adapters.base import AdapterConfig, available @@ -19,7 +30,8 @@ ProtonRelation, ProtonColumn, ) -from dbt.utils import executor +from dbt.adapters.proton.relation import ProtonRelation, ProtonRelationType +from dbt.adapters.contracts.relation import Path, RelationConfig GET_CATALOG_MACRO_NAME = 'get_catalog' @@ -93,18 +105,26 @@ def list_relations_without_caching( relations = [] for row in results: - if len(row) != 4: - raise dbt.exceptions.DbtRuntimeError( - f'Invalid value from \'show table extended ...\', ' - f'got {len(row)} values, expected 4' - ) - _database, name, schema, type_info = row - rel_type = RelationType.View if 'view' in type_info else RelationType.Table + name, schema, type_info, db_engine, on_cluster = row + if 'view' in type_info: + rel_type = ProtonRelationType.View + elif type_info == 'dictionary': + rel_type = ProtonRelationType.Dictionary + else: + rel_type = ProtonRelationType.Table + can_exchange = ( + conn_supports_exchange + and rel_type == ProtonRelationType.Table + and db_engine in ('Atomic', 'Replicated') + ) + relation = self.Relation.create( - database=None, + database='', schema=schema, identifier=name, type=rel_type, + can_exchange=can_exchange, + can_on_cluster=(on_cluster >= 1), ) relations.append(relation) @@ -131,30 +151,20 @@ def get_columns_in_relation(self, relation: Relation) -> List[ProtonColumn]: return self.parse_proton_columns(relation, rows) - def get_catalog(self, manifest): - schema_map = self._get_catalog_schemas(manifest) - if len(schema_map) > 1: - dbt.exceptions.raise_compiler_error( - f'Expected only one database in get_catalog, found ' - f'{list(schema_map)}' - ) - - with executor(self.config) as tpe: - futures: List[Future[agate.Table]] = [] - for info, schemas in schema_map.items(): - for schema in schemas: - futures.append( - tpe.submit_connected( - self, - schema, - self._get_one_catalog, - info, - [schema], - manifest, - ) - ) - catalogs, exceptions = catch_as_completed(futures) - return catalogs, exceptions + def get_catalog( + self, + relation_configs: Iterable[RelationConfig], + used_schemas: FrozenSet[Tuple[str, str]], + ) -> Tuple["agate.Table", List[Exception]]: + from dbt_common.clients.agate_helper import empty_table + + relations = self._get_catalog_relations(relation_configs) + schemas = set(relation.schema for relation in relations) + if schemas: + catalog = self._get_one_catalog(InformationSchema(Path()), schemas, used_schemas) + else: + catalog = empty_table() + return catalog, [] def _get_one_catalog( self, @@ -170,17 +180,6 @@ def _get_one_catalog( return super()._get_one_catalog(information_schema, schemas, manifest) - @classmethod - def _catalog_filter_table( - cls, table: agate.Table, manifest: Manifest - ) -> agate.Table: - table = table_from_rows( - table.rows, - table.column_names, - text_only_columns=['table_schema', 'table_name'], - ) - return table.where(_catalog_filter_schemas(manifest)) - def get_rows_different_sql( self, relation_a: ProtonRelation, diff --git a/dbt/adapters/proton/relation.py b/dbt/adapters/proton/relation.py index ee4bbc7..c49c036 100644 --- a/dbt/adapters/proton/relation.py +++ b/dbt/adapters/proton/relation.py @@ -3,8 +3,9 @@ from dbt.adapters.base.relation import BaseRelation, Policy, Self from dbt.contracts.graph.nodes import SourceDefinition +from dbt_common.dataclass_schema import StrEnum from dbt.exceptions import DbtRuntimeError -from dbt.utils import deep_merge +from dbt_common.utils import deep_merge @dataclass @@ -20,6 +21,14 @@ class ProtonIncludePolicy(Policy): schema: bool = True identifier: bool = True +class ProtonRelationType(StrEnum): + Table = "table" + View = "view" + CTE = "cte" + MaterializedView = "materialized_view" + External = "external" + Ephemeral = "ephemeral" + Dictionary = "dictionary" @dataclass(frozen=True, eq=False, repr=False) class ProtonRelation(BaseRelation): @@ -72,4 +81,4 @@ def create_from_source(cls: Type[Self], source: SourceDefinition, **kwargs: Any) identifier=source.identifier, quote_policy=quote_policy, **kwargs, - ) \ No newline at end of file + ) From 168e61a5293a4dfd1fbdcaeffb60a0700518ffbb Mon Sep 17 00:00:00 2001 From: Jove Zhong Date: Tue, 24 Sep 2024 20:33:27 -0700 Subject: [PATCH 3/4] migrate code to new dbt core --- README.md | 8 +++++++- dbt/adapters/proton/connections.py | 24 +++++++++++------------- dbt/adapters/proton/relation.py | 4 ---- dbt/include/proton/macros/adapters.sql | 4 ++-- pytest.ini | 3 ++- setup.py | 4 ++-- 6 files changed, 24 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index 82ff0b2..b5ba25c 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,13 @@ pip install dbt-core pip install -r dev_requirements.txt ``` Then run `pip install -e .` to install the current dev code. -Run `pytest tests/unit/test_adapter.py` to run basic tests. + +Run `pytest tests/unit` to run basic tests without a running Timeplus instance. + +To run functional test, please start a Timeplus Proton or Timeplus Enterprise instance via Docker or binary. [Check the docs](https://docs.timeplus.com/proton-howto) for details. + +Run `pytest tests/functional` to run functional tests. + Run `pytest tests/integration/proton.dbtspec` to run integration tests. ### Supported features diff --git a/dbt/adapters/proton/connections.py b/dbt/adapters/proton/connections.py index 42b3ebe..458919a 100644 --- a/dbt/adapters/proton/connections.py +++ b/dbt/adapters/proton/connections.py @@ -21,7 +21,7 @@ class ProtonCredentials(Credentials): host: str = 'localhost' port: Optional[int] = None user: Optional[str] = 'default' - database: Optional[str] = None + database: Optional[str] = 'default' schema: Optional[str] = 'default' password: str = '' cluster: Optional[str] = None @@ -42,15 +42,9 @@ def unique_field(self): return self.host def __post_init__(self): - if self.database is not None and self.database != self.schema: - raise dbt.exceptions.DbtRuntimeError( - f' schema: {self.schema} \n' - f' database: {self.database} \n' - f' cluster: {self.cluster} \n' - f'On Proton, database must be omitted or have the same value as' - f' schema.' - ) - self.database = None + # so far only use default database/schema + self.database = 'default' + self.schema = 'default' def _connection_keys(self): return ('host', 'port', 'user', 'schema', 'secure', 'verify') @@ -98,7 +92,7 @@ def open(cls, connection): handle = Client( host=credentials.host, port=credentials.port, - database='default', + database=credentials.database, user=credentials.user, password=credentials.password, client_name=f'dbt-{dbt_version}', @@ -137,13 +131,15 @@ def cancel(self, connection): @classmethod def get_table_from_response(cls, response, columns) -> agate.Table: + from dbt_common.clients.agate_helper import table_from_data_flat + column_names = [x[0] for x in columns] data = [] for row in response: data.append(dict(zip(column_names, row))) - return dbt.clients.agate_helper.table_from_data_flat(data, column_names) + return table_from_data_flat(data, column_names) def execute( self, sql: str, auto_begin: bool = False, fetch: bool = False, limit: Optional[int] = None @@ -175,7 +171,9 @@ def execute( if fetch: table = self.get_table_from_response(response, columns) else: - table = dbt.clients.agate_helper.empty_table() + from dbt_common.clients.agate_helper import empty_table + + table = empty_table() return status, table def add_query( diff --git a/dbt/adapters/proton/relation.py b/dbt/adapters/proton/relation.py index c49c036..b0c7b7a 100644 --- a/dbt/adapters/proton/relation.py +++ b/dbt/adapters/proton/relation.py @@ -37,10 +37,6 @@ class ProtonRelation(BaseRelation): quote_character: str = '' can_exchange: bool = False - def __post_init__(self): - if self.database != self.schema and self.database: - raise DbtRuntimeError(f'Cannot set database {self.database} in Proton!') - def render(self): if self.include_policy.database and self.include_policy.schema: raise DbtRuntimeError( diff --git a/dbt/include/proton/macros/adapters.sql b/dbt/include/proton/macros/adapters.sql index ecb8ef4..6032a72 100644 --- a/dbt/include/proton/macros/adapters.sql +++ b/dbt/include/proton/macros/adapters.sql @@ -88,13 +88,13 @@ {% macro proton__create_schema(relation) -%} {%- call statement('create_schema') -%} - create database if not exists {{ relation.without_identifier().include(database=False) }} {{ on_cluster_clause(label="on cluster") }} + create view {{ relation.without_identifier().include(database=False) }} as select 1 --create database if not exists {{ relation.without_identifier().include(database=False) }} {{ on_cluster_clause(label="on cluster") }} {% endcall %} {% endmacro %} {% macro proton__drop_schema(relation) -%} {%- call statement('drop_schema') -%} - drop database if exists {{ relation.without_identifier().include(database=False) }} {{ on_cluster_clause(label="on cluster") }} + drop view {{ relation.without_identifier().include(database=False) }} --drop database if exists {{ relation.without_identifier().include(database=False) }} {{ on_cluster_clause(label="on cluster") }} {%- endcall -%} {% endmacro %} diff --git a/pytest.ini b/pytest.ini index 1e5a349..9f3c84e 100644 --- a/pytest.ini +++ b/pytest.ini @@ -6,4 +6,5 @@ env_files = test.env pythonpath = . testpaths = - tests/functional # name per convention \ No newline at end of file + tests/functional # name per convention + tests/integration # name per convention diff --git a/setup.py b/setup.py index 3988f46..85f06ad 100644 --- a/setup.py +++ b/setup.py @@ -28,10 +28,10 @@ def _dbt_proton_version(): package_version = _dbt_proton_version() description = '''The Timeplus Proton plugin for dbt (data build tool)''' -dbt_version = '1.8.0' +dbt_version = '1.8.7' dbt_minor = '.'.join(dbt_version.split('.')[0:2]) -if not package_version.startswith(dbt_version): +if not package_version.startswith(dbt_minor): raise ValueError( f'Invalid setup.py: package_version={package_version} must start with ' f'dbt_version={dbt_minor}' From 35bc8c2abe2ae29459a552788297db38edf2c1b7 Mon Sep 17 00:00:00 2001 From: Jove Zhong Date: Fri, 20 Dec 2024 17:38:53 -0800 Subject: [PATCH 4/4] proton driver 0.2.13, port code --- dbt/adapters/proton/connections.py | 22 +++------------------- dbt/adapters/proton/impl.py | 1 + dev_requirements.txt | 2 +- setup.py | 2 +- 4 files changed, 6 insertions(+), 21 deletions(-) diff --git a/dbt/adapters/proton/connections.py b/dbt/adapters/proton/connections.py index 458919a..dbdfb11 100644 --- a/dbt/adapters/proton/connections.py +++ b/dbt/adapters/proton/connections.py @@ -57,27 +57,11 @@ class ProtonConnectionManager(SQLConnectionManager): def exception_handler(self, sql): try: yield - - except errors.ServerException as e: - logger.debug('Proton error: {}', str(e)) - - try: - # attempt to release the connection - self.release() - except errors.Error: - logger.debug('Failed to release connection!') - pass - - raise dbt.exceptions.DbtDatabaseError(str(e).strip()) from e - - except Exception as e: + except Exception as exp: logger.debug('Error running SQL: {}', sql) - logger.debug('Rolling back transaction.') - self.release() - if isinstance(e, dbt.exceptions.DbtRuntimeError): + if isinstance(exp, dbt.exceptions.DbtRuntimeError): raise - - raise dbt.exceptions.DbtRuntimeError(e) from e + raise dbt.exceptions.DbtRuntimeError('Timeplus exception: ' + str(exp)) from exp @classmethod def open(cls, connection): diff --git a/dbt/adapters/proton/impl.py b/dbt/adapters/proton/impl.py index 7098759..f6d0333 100644 --- a/dbt/adapters/proton/impl.py +++ b/dbt/adapters/proton/impl.py @@ -102,6 +102,7 @@ def list_relations_without_caching( ) -> List[ProtonRelation]: kwargs = {'schema_relation': schema_relation} results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs) + conn_supports_exchange = False # TODO relations = [] for row in results: diff --git a/dev_requirements.txt b/dev_requirements.txt index 63a3310..33fbccc 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -1,5 +1,5 @@ dbt-core>=1.8.7 -proton-driver>=0.2.10 +proton-driver>=0.2.13 pytest pytest-dotenv dbt-tests-adapter diff --git a/setup.py b/setup.py index 85f06ad..51220ed 100644 --- a/setup.py +++ b/setup.py @@ -61,7 +61,7 @@ def _dbt_proton_version(): }, install_requires=[ f'dbt-core=={dbt_version}', - 'proton-driver>=0.2.10', + 'proton-driver>=0.2.13', ], python_requires=">=3.10", platforms='any',