From 57351f3c41f7a851ea048de011f46e37c8c05a6f Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 20 May 2025 23:46:37 +0200 Subject: [PATCH] Cluster API: Add dataset loader --- cratedb_toolkit/cluster/core.py | 13 +++++++++ cratedb_toolkit/datasets/model.py | 5 +++- cratedb_toolkit/io/cli.py | 47 +++++++++++++++++++++++-------- 3 files changed, 52 insertions(+), 13 deletions(-) diff --git a/cratedb_toolkit/cluster/core.py b/cratedb_toolkit/cluster/core.py index d579bcd0..d95e36bd 100644 --- a/cratedb_toolkit/cluster/core.py +++ b/cratedb_toolkit/cluster/core.py @@ -15,6 +15,7 @@ from cratedb_toolkit.cluster.guide import DataImportGuide from cratedb_toolkit.cluster.model import ClientBundle, ClusterBase, ClusterInformation from cratedb_toolkit.config import CONFIG +from cratedb_toolkit.datasets import load_dataset from cratedb_toolkit.exception import ( CroudException, DatabaseAddressMissingError, @@ -338,6 +339,14 @@ def stop(self) -> "ManagedCluster": self.probe() return self + @flexfun(domain="runtime") + def load_dataset(self, name: str, table: str = None): + if table is None: + table = name.replace("/", ".") + with jwt_token_patch(self.info.jwt.token): + ds = load_dataset(name) + return ds.dbtable(dburi=self.address.dburi, table=table).load() + @flexfun(domain="runtime") def load_table( self, @@ -619,6 +628,10 @@ def from_options(cls, options: ClusterAddressOptions) -> t.Union[ManagedCluster, """ return DatabaseCluster.create(**options.asdict()) + @classmethod + def from_ctx(cls, ctx: "click.Context") -> t.Union[ManagedCluster, StandaloneCluster]: + return DatabaseCluster.from_options(ctx.meta["address"]) + @classmethod def create( cls, cluster_id: str = None, cluster_name: str = None, cluster_url: str = None diff --git a/cratedb_toolkit/datasets/model.py b/cratedb_toolkit/datasets/model.py index 0ff5f936..7657e6c2 100644 --- a/cratedb_toolkit/datasets/model.py +++ b/cratedb_toolkit/datasets/model.py @@ -70,6 +70,7 @@ class DatasetToDatabaseTableAdapter: def __post_init__(self): self.init_sql = None self.db = DatabaseAdapter(dburi=self.dburi) + self.table = self.db.quote_relation_name(self.table) def create( self, @@ -109,12 +110,14 @@ def load( has_data = cardinality > 0 if if_exists == "noop" and has_data: - return + return self if self.init_sql is None: raise ValueError("SQL for loading data is missing") self.run_sql(self.init_sql) + return self + def run_sql(self, sql: str): for statement in sqlparse.parse(sql): self.db.run_sql(str(statement)) diff --git a/cratedb_toolkit/io/cli.py b/cratedb_toolkit/io/cli.py index 6bd72ee4..edc80e27 100644 --- a/cratedb_toolkit/io/cli.py +++ b/cratedb_toolkit/io/cli.py @@ -3,26 +3,18 @@ from pathlib import Path import click -from click_aliases import ClickAliasedGroup from cratedb_toolkit.cluster.core import DatabaseCluster from cratedb_toolkit.model import InputOutputResource, TableAddress from cratedb_toolkit.option import option_cluster_id, option_cluster_name, option_cluster_url -from cratedb_toolkit.util.cli import boot_click, make_command +from cratedb_toolkit.util.app import make_cli +from cratedb_toolkit.util.cli import make_command logger = logging.getLogger(__name__) -@click.group(cls=ClickAliasedGroup) # type: ignore[arg-type] -@click.option("--verbose", is_flag=True, required=False, help="Turn on logging") -@click.option("--debug", is_flag=True, required=False, help="Turn on logging with debug level") -@click.version_option() -@click.pass_context -def cli(ctx: click.Context, verbose: bool, debug: bool): - """ - Load data into CrateDB. - """ - return boot_click(ctx, verbose, debug) +cli = make_cli() +cli.help = "Load data into CrateDB." @make_command(cli, name="table") @@ -67,3 +59,34 @@ def load_table( cluster_url=cluster_url, ) cluster.load_table(source=source, target=target, transformation=transformation) + + logger.info(f"Importing table succeeded. source={source}, target={target}") + + +@make_command(cli, name="dataset") +@click.argument("name", envvar="DATASET_NAME", type=str) +@click.option("--schema", envvar="CRATEDB_SCHEMA", type=str, required=False, help="Schema where to import the data") +@click.option("--table", envvar="CRATEDB_TABLE", type=str, required=False, help="Table where to import the data") +@click.pass_context +def load_dataset( + ctx: click.Context, + name: str, + schema: str, + table: str, +): + """ + Import named dataset into CrateDB and CrateDB Cloud clusters. + """ + + # Adjust/convert target table parameter. + effective_table = None + if table is not None: + table_address = TableAddress(schema=schema, table=table) + effective_table = table_address.fullname + + # Dispatch "load dataset" operation. + cluster = DatabaseCluster.from_ctx(ctx) + ds = cluster.load_dataset(name=name, table=effective_table) + + logger.info(f"Importing dataset succeeded. Name: {name}, Table: {ds.table}") + logger.info(f"Peek SQL: SELECT * FROM {ds.table} LIMIT 42;")