def store_parquet_file(
    table_name: str,
    parquet_path: pathlib.Path,
    replace_existsing_file: bool = False,
    store: "Store | None" = None,
):
    """Load a Parquet file into a table of the store.

    Every column name is passed through ``Store.sanitize_column`` before the
    table is handed to ``store.add_table``.

    Args:
        table_name: Name of the destination table in the store.
        parquet_path: Path of the Parquet file to read.
        replace_existsing_file: Forwarded as ``replace`` to
            ``store.add_table``. The misspelled name is kept so existing
            keyword callers keep working.
        store: Store receiving the table. When omitted, a default ``Store()``
            is created at call time.

    Raises:
        FileNotFoundError: If ``parquet_path`` does not exist.
    """
    if not parquet_path.exists():
        raise FileNotFoundError(f"File {parquet_path} does not exist")

    # Resolve the default lazily: a ``Store()`` default in the signature is
    # evaluated once when the module is imported and then shared by every
    # call that does not pass ``store``.
    if store is None:
        store = Store()

    data = pq.read_table(parquet_path)
    sanitized_names = [Store.sanitize_column(name) for name in data.column_names]
    data = data.rename_columns(sanitized_names)

    store.add_table(table_name=table_name, data=data, replace=replace_existsing_file)
@click.command()
@web_help("csm-data/store/load-parquet-folder")
@translate_help("csm_data.commands.store.load_parquet_folder.description")
@click.option(
    "--store-folder",
    envvar="CSM_PARAMETERS_ABSOLUTE_PATH",
    help=T("csm_data.commands.store.load_parquet_folder.parameters.store_folder"),
    metavar="PATH",
    type=str,
    show_envvar=True,
    required=True,
)
@click.option(
    "--parquet-folder",
    envvar="CSM_OUTPUT_ABSOLUTE_PATH",
    help=T("csm_data.commands.store.load_parquet_folder.parameters.parquet_folder"),
    metavar="PATH",
    type=str,
    show_envvar=True,
    required=True,
)
def load_parquet_folder(store_folder, parquet_folder):
    # Store every ``*.parquet`` file found directly inside ``parquet_folder``
    # as a table named after the file stem.
    #
    # Dependencies are imported inside the command body rather than at
    # module level.
    import pathlib

    from cosmotech.coal.store.parquet import store_parquet_file
    from cosmotech.coal.store.store import Store
    from cosmotech.coal.utils.configuration import Configuration
    from cosmotech.coal.utils.logger import LOGGER

    # Point the configuration at the requested store folder.
    configuration = Configuration()
    configuration.coal.store = store_folder

    source_dir = pathlib.Path(parquet_folder)
    for parquet_file in source_dir.glob("*.parquet"):
        found_message = T("coal.services.azure_storage.found_file").format(file=parquet_file.name)
        LOGGER.info(found_message)

        # A store is built for each file from the shared configuration.
        target_store = Store(False, configuration)
        store_parquet_file(parquet_file.stem, parquet_file, store=target_store)
load_parquet_folder from cosmotech.csm_data.commands.store.load_from_singlestore import ( load_from_singlestore_command, ) @@ -30,6 +31,7 @@ def store(): store.add_command(reset, "reset") store.add_command(list_tables, "list-tables") store.add_command(load_csv_folder, "load-csv-folder") +store.add_command(load_parquet_folder, "load-parquet-folder") store.add_command(load_from_singlestore_command, "load-from-singlestore") store.add_command(dump_to_postgresql, "dump-to-postgresql") store.add_command(dump_to_s3, "dump-to-s3") diff --git a/cosmotech/translation/csm_data/en-US/csm_data/commands/store/load_parquet_folder.yml b/cosmotech/translation/csm_data/en-US/csm_data/commands/store/load_parquet_folder.yml new file mode 100644 index 00000000..4e869074 --- /dev/null +++ b/cosmotech/translation/csm_data/en-US/csm_data/commands/store/load_parquet_folder.yml @@ -0,0 +1,5 @@ +description: | + Running this command will find all parquet files in the given folder and put them in the store +parameters: + store_folder: The folder containing the store files + parquet_folder: The folder containing the parquet files to store