Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
<p align="center">
<img src="https://raw.githubusercontent.com/silentsokolov/dbt-clickhouse/master/etc/dbt-logo-full.svg" alt="dbt logo" width="300"/>
<img src="https://raw.githubusercontent.com/timeplus-io/dbt-proton/master/etc/dbt-logo-full.svg" alt="dbt logo" width="300"/>
</p>

[![build](https://github.com/silentsokolov/dbt-clickhouse/actions/workflows/build.yml/badge.svg?branch=master)](https://github.com/silentsokolov/dbt-clickhouse/actions/workflows/build.yml)

# dbt-proton

This plugin ports [dbt](https://getdbt.com) functionality to [Timeplus Proton](https://github.com/timeplus-io/proton).
Expand All @@ -18,15 +16,21 @@ pip install dbt-proton
```

### Development
Follow the [dbt Documentation])(https://docs.getdbt.com/docs/core/pip-install) to install dbt with pip.
Follow the [dbt Documentation](https://docs.getdbt.com/docs/core/pip-install) to install dbt with pip.
```shell
python3.10 -m venv proton-dbt-env
source proton-dbt-env/bin/activate
pip install dbt-core
pip install -r dev_requirements.txt
```
Then run `pip install -e .` to install the current dev code.
Run `pytest tests/unit/test_adapter.py` to run basic tests.

Run `pytest tests/unit` to run basic tests without a running Timeplus instance.

To run functional tests, first start a Timeplus Proton or Timeplus Enterprise instance via Docker or the binary. [Check the docs](https://docs.timeplus.com/proton-howto) for details.

Run `pytest tests/functional` to run functional tests.

Run `pytest tests/integration/proton.dbtspec` to run integration tests.

### Supported features
Expand All @@ -45,14 +49,14 @@ Run `pytest tests/integration/proton.dbtspec` to run integration tests.

### Database

The dbt model `database.schema.table` is not compatible with ClickHouse because ClickHouse does not support a `schema`.
So we use a simple model `schema.table`, where `schema` is the ClickHouse's database. Please, don't use `default` database!
The dbt model `database.schema.table` is not compatible with Timeplus because Timeplus does not support a `schema`.
So we use a simple model `schema.table`, where `schema` is the Timeplus database.

### Model Configuration

| Option | Description | Required? |
|----------------|------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------|
| engine | The table engine (type of table) to use when creating tables | Optional (default: `MergeTree()`) |
| engine | The table engine (type of stream) to use when creating tables | Optional (default: `Stream()`) |
| order_by | A tuple of column names or arbitrary expressions. This allows you to create a small sparse index that helps find data faster. | Optional (default: `tuple()`) |
| partition_by | A partition is a logical combination of records in a table by a specified criterion. The partition key can be any expression from the table columns. | Optional |

Expand Down
2 changes: 1 addition & 1 deletion dbt/adapters/proton/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = '1.6.5'
version = '1.8.0'
51 changes: 16 additions & 35 deletions dbt/adapters/proton/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@

from proton_driver import Client, errors

from dbt.adapters.base import Credentials
from dbt.adapters.contracts.connection import Connection, Credentials
from dbt.adapters.sql import SQLConnectionManager
from dbt.contracts.connection import Connection
from dbt.logger import GLOBAL_LOGGER as logger
from dbt.version import __version__ as dbt_version

Expand All @@ -22,7 +21,7 @@ class ProtonCredentials(Credentials):
host: str = 'localhost'
port: Optional[int] = None
user: Optional[str] = 'default'
database: Optional[str] = None
database: Optional[str] = 'default'
schema: Optional[str] = 'default'
password: str = ''
cluster: Optional[str] = None
Expand All @@ -43,15 +42,9 @@ def unique_field(self):
return self.host

def __post_init__(self):
if self.database is not None and self.database != self.schema:
raise dbt.exceptions.DbtRuntimeError(
f' schema: {self.schema} \n'
f' database: {self.database} \n'
f' cluster: {self.cluster} \n'
f'On Proton, database must be omitted or have the same value as'
f' schema.'
)
self.database = None
# so far only use default database/schema
self.database = 'default'
self.schema = 'default'

def _connection_keys(self):
return ('host', 'port', 'user', 'schema', 'secure', 'verify')
Expand All @@ -64,27 +57,11 @@ class ProtonConnectionManager(SQLConnectionManager):
def exception_handler(self, sql):
try:
yield

except errors.ServerException as e:
logger.debug('Proton error: {}', str(e))

try:
# attempt to release the connection
self.release()
except errors.Error:
logger.debug('Failed to release connection!')
pass

raise dbt.exceptions.DbtDatabaseError(str(e).strip()) from e

except Exception as e:
except Exception as exp:
logger.debug('Error running SQL: {}', sql)
logger.debug('Rolling back transaction.')
self.release()
if isinstance(e, dbt.exceptions.DbtRuntimeError):
if isinstance(exp, dbt.exceptions.DbtRuntimeError):
raise

raise dbt.exceptions.DbtRuntimeError(e) from e
raise dbt.exceptions.DbtRuntimeError('Timeplus exception: ' + str(exp)) from exp

@classmethod
def open(cls, connection):
Expand All @@ -99,7 +76,7 @@ def open(cls, connection):
handle = Client(
host=credentials.host,
port=credentials.port,
database='default',
database=credentials.database,
user=credentials.user,
password=credentials.password,
client_name=f'dbt-{dbt_version}',
Expand Down Expand Up @@ -138,13 +115,15 @@ def cancel(self, connection):

@classmethod
def get_table_from_response(cls, response, columns) -> agate.Table:
from dbt_common.clients.agate_helper import table_from_data_flat

column_names = [x[0] for x in columns]

data = []
for row in response:
data.append(dict(zip(column_names, row)))

return dbt.clients.agate_helper.table_from_data_flat(data, column_names)
return table_from_data_flat(data, column_names)

def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False, limit: Optional[int] = None
Expand All @@ -154,7 +133,7 @@ def execute(
client = conn.handle

with self.exception_handler(sql):
#sys.stdout.write("Jove TEMP LOG "+sql+"\n")
#sys.stdout.write("Jove TEMP LOG "+sql+"\n")
logger.debug(
'On {connection_name}: {sql}',
connection_name=conn.name,
Expand All @@ -176,7 +155,9 @@ def execute(
if fetch:
table = self.get_table_from_response(response, columns)
else:
table = dbt.clients.agate_helper.empty_table()
from dbt_common.clients.agate_helper import empty_table

table = empty_table()
return status, table

def add_query(
Expand Down
92 changes: 46 additions & 46 deletions dbt/adapters/proton/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,21 @@
import dbt.exceptions
from dataclasses import dataclass
from concurrent.futures import Future
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
FrozenSet,
Iterable,
List,
Optional,
Set,
Tuple,
Union,
)

from dbt.contracts.relation import RelationType
from dbt.contracts.graph.manifest import Manifest
from dbt.clients.agate_helper import table_from_rows
from dbt.adapters.base.relation import InformationSchema
from dbt.adapters.base.impl import catch_as_completed
from dbt.adapters.base import AdapterConfig, available
Expand All @@ -19,7 +30,8 @@
ProtonRelation,
ProtonColumn,
)
from dbt.utils import executor
from dbt.adapters.proton.relation import ProtonRelation, ProtonRelationType
from dbt.adapters.contracts.relation import Path, RelationConfig


GET_CATALOG_MACRO_NAME = 'get_catalog'
Expand Down Expand Up @@ -90,21 +102,30 @@ def list_relations_without_caching(
) -> List[ProtonRelation]:
kwargs = {'schema_relation': schema_relation}
results = self.execute_macro(LIST_RELATIONS_MACRO_NAME, kwargs=kwargs)
conn_supports_exchange = False # TODO

relations = []
for row in results:
if len(row) != 4:
raise dbt.exceptions.DbtRuntimeError(
f'Invalid value from \'show table extended ...\', '
f'got {len(row)} values, expected 4'
)
_database, name, schema, type_info = row
rel_type = RelationType.View if 'view' in type_info else RelationType.Table
name, schema, type_info, db_engine, on_cluster = row
if 'view' in type_info:
rel_type = ProtonRelationType.View
elif type_info == 'dictionary':
rel_type = ProtonRelationType.Dictionary
else:
rel_type = ProtonRelationType.Table
can_exchange = (
conn_supports_exchange
and rel_type == ProtonRelationType.Table
and db_engine in ('Atomic', 'Replicated')
)

relation = self.Relation.create(
database=None,
database='',
schema=schema,
identifier=name,
type=rel_type,
can_exchange=can_exchange,
can_on_cluster=(on_cluster >= 1),
)
relations.append(relation)

Expand All @@ -131,30 +152,20 @@ def get_columns_in_relation(self, relation: Relation) -> List[ProtonColumn]:

return self.parse_proton_columns(relation, rows)

def get_catalog(self, manifest):
schema_map = self._get_catalog_schemas(manifest)
if len(schema_map) > 1:
dbt.exceptions.raise_compiler_error(
f'Expected only one database in get_catalog, found '
f'{list(schema_map)}'
)

with executor(self.config) as tpe:
futures: List[Future[agate.Table]] = []
for info, schemas in schema_map.items():
for schema in schemas:
futures.append(
tpe.submit_connected(
self,
schema,
self._get_one_catalog,
info,
[schema],
manifest,
)
)
catalogs, exceptions = catch_as_completed(futures)
return catalogs, exceptions
def get_catalog(
    self,
    relation_configs: Iterable[RelationConfig],
    used_schemas: FrozenSet[Tuple[str, str]],
) -> Tuple["agate.Table", List[Exception]]:
    """Build the catalog table for the given relation configs.

    The relations are obtained from ``self._get_catalog_relations`` and
    reduced to their distinct schema names; a single ``_get_one_catalog``
    call is then made for all of them.

    Returns:
        A ``(catalog, exceptions)`` pair. The exceptions list is always
        empty in this implementation. If no schema is found, the catalog
        is an empty table and ``_get_one_catalog`` is not called.
    """
    from dbt_common.clients.agate_helper import empty_table

    schema_names = {
        rel.schema for rel in self._get_catalog_relations(relation_configs)
    }

    # Nothing to look up: short-circuit with an empty catalog.
    if not schema_names:
        return empty_table(), []

    info_schema = InformationSchema(Path())
    return self._get_one_catalog(info_schema, schema_names, used_schemas), []

def _get_one_catalog(
self,
Expand All @@ -170,17 +181,6 @@ def _get_one_catalog(

return super()._get_one_catalog(information_schema, schemas, manifest)

@classmethod
def _catalog_filter_table(
cls, table: agate.Table, manifest: Manifest
) -> agate.Table:
table = table_from_rows(
table.rows,
table.column_names,
text_only_columns=['table_schema', 'table_name'],
)
return table.where(_catalog_filter_schemas(manifest))

def get_rows_different_sql(
self,
relation_a: ProtonRelation,
Expand Down
17 changes: 11 additions & 6 deletions dbt/adapters/proton/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@

from dbt.adapters.base.relation import BaseRelation, Policy, Self
from dbt.contracts.graph.nodes import SourceDefinition
from dbt_common.dataclass_schema import StrEnum
from dbt.exceptions import DbtRuntimeError
from dbt.utils import deep_merge
from dbt_common.utils import deep_merge


@dataclass
Expand All @@ -20,6 +21,14 @@ class ProtonIncludePolicy(Policy):
schema: bool = True
identifier: bool = True

class ProtonRelationType(StrEnum):
    """Relation kinds used by the Proton adapter.

    Each member's value is the lowercase string name of the kind.
    """

    # Kinds assigned by `list_relations_without_caching` in impl.py:
    # `View` when the reported type contains 'view', `Dictionary` when it
    # equals 'dictionary', and `Table` for everything else.
    Table = "table"
    View = "view"
    CTE = "cte"
    MaterializedView = "materialized_view"
    External = "external"
    Ephemeral = "ephemeral"
    Dictionary = "dictionary"

@dataclass(frozen=True, eq=False, repr=False)
class ProtonRelation(BaseRelation):
Expand All @@ -28,10 +37,6 @@ class ProtonRelation(BaseRelation):
quote_character: str = ''
can_exchange: bool = False

def __post_init__(self):
if self.database != self.schema and self.database:
raise DbtRuntimeError(f'Cannot set database {self.database} in Proton!')

def render(self):
if self.include_policy.database and self.include_policy.schema:
raise DbtRuntimeError(
Expand Down Expand Up @@ -72,4 +77,4 @@ def create_from_source(cls: Type[Self], source: SourceDefinition, **kwargs: Any)
identifier=source.identifier,
quote_policy=quote_policy,
**kwargs,
)
)
2 changes: 1 addition & 1 deletion dbt/include/proton/dbt_project.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: dbt_proton
version: 1.6.5
version: 1.8.7
config-version: 2

macro-paths: ["macros"]
4 changes: 2 additions & 2 deletions dbt/include/proton/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,13 @@

{% macro proton__create_schema(relation) -%}
{%- call statement('create_schema') -%}
create database if not exists {{ relation.without_identifier().include(database=False) }} {{ on_cluster_clause(label="on cluster") }}
create view {{ relation.without_identifier().include(database=False) }} as select 1 --create database if not exists {{ relation.without_identifier().include(database=False) }} {{ on_cluster_clause(label="on cluster") }}
{% endcall %}
{% endmacro %}

{% macro proton__drop_schema(relation) -%}
{%- call statement('drop_schema') -%}
drop database if exists {{ relation.without_identifier().include(database=False) }} {{ on_cluster_clause(label="on cluster") }}
drop view {{ relation.without_identifier().include(database=False) }} --drop database if exists {{ relation.without_identifier().include(database=False) }} {{ on_cluster_clause(label="on cluster") }}
{%- endcall -%}
{% endmacro %}

Expand Down
4 changes: 2 additions & 2 deletions dev_requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
dbt-core>=1.6.5
proton-driver>=0.2.8
dbt-core>=1.8.7
proton-driver>=0.2.13
pytest
pytest-dotenv
dbt-tests-adapter
Loading