diff --git a/.markdownlintrc b/.markdownlintrc new file mode 100644 index 0000000..67d1329 --- /dev/null +++ b/.markdownlintrc @@ -0,0 +1,17 @@ +# Markdownlint configuration for PFS Target Database documentation +default: true + +# MD013: Line length - disable for auto-generated CLI docs and long URLs +MD013: false + +# MD014: Dollar signs in shell commands - disable for CLI documentation +MD014: false + +# MD033: Inline HTML - allow for documentation formatting (e.g., details/summary) +MD033: false + +# MD034: Bare URLs - allow for reference links +MD034: false + +# MD041: First line in file should be top-level heading - not always needed +MD041: false diff --git a/docs/reference/cli.md b/docs/reference/cli.md index 8926bc4..c508cf9 100644 --- a/docs/reference/cli.md +++ b/docs/reference/cli.md @@ -28,9 +28,17 @@ The command-line interface (CLI) tool `pfs-targetdb-cli` is provided to work wit host = "" user = "" data_dir = "" + + # Optional section for Web API access + # The following parameters are used to download data via Web API instead of rsync. + [webapi] + url = "" # e.g., "https://example.com/get-upload/" + api_key = "" # Optional: leave empty ("") for no authentication + verify_ssl = true # Optional: set to false to disable SSL certificate verification ``` The `schemacrawler` section is required only if you want to draw an ER diagram of the database schema with SchemaCrawler. + The `webapi` section is used by the `transfer-targets-api` command to download data via Web API. ## `pfs-targetdb-cli` @@ -60,6 +68,7 @@ $ pfs-targetdb-cli [OPTIONS] COMMAND [ARGS]... - `update`: Update rows in a table in the PFS Target... - `parse-alloc`: Parse an Excel file containing time... - `transfer-targets`: Download target lists from the uploader to... +- `transfer-targets-api`: Download target lists from the uploader to... - `insert-targets`: Insert targets using a list of input... - `insert-pointings`: Insert user-defined pointings using a list... - `update-catalog-active`: Update active flag in the input_catalog... @@ -329,6 +338,29 @@ $ pfs-targetdb-cli transfer-targets [OPTIONS] INPUT_FILE --- +### `transfer-targets-api` + +Download target lists from the uploader to the local machine via Web API. + +**Usage**: + +```console +$ pfs-targetdb-cli transfer-targets-api [OPTIONS] INPUT_FILE +``` + +**Arguments**: + +- `INPUT_FILE`: Input catalog list file (csv). [required] + +**Options**: + +- `-c, --config TEXT`: Database configuration file in the TOML format. [required] +- `--local-dir PATH`: Path to the data directory in the local machine [default: .] +- `--force / --no-force`: Force download. [default: no-force] +- `--help`: Show this message and exit. + +--- + ### `insert-targets` Insert targets using a list of input catalogs and upload IDs. 
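For context, the sketch below illustrates how the documented `[webapi]` settings are consumed by the new download path: the base `url` is joined with an `upload_id`, a Bearer `Authorization` header is sent only when `api_key` is non-empty, and the ZIP payload is streamed to disk. All values are placeholders taken from the examples above; the authoritative implementation is `transfer_data_from_uploader_via_webapi` in `src/targetdb/utils.py` later in this diff.

```python
# Illustrative sketch only -- placeholder values; the real logic lives in
# targetdb.utils.transfer_data_from_uploader_via_webapi (see the diff below).
import requests

webapi_url = "https://example.com/get-upload/"  # [webapi] url (a trailing slash is appended if missing)
api_key = "my-secret-token"                     # [webapi] api_key; "" disables authentication
verify_ssl = True                               # [webapi] verify_ssl
upload_id = "abc123"                            # placeholder upload ID from the input catalog list

# Only send an Authorization header when an API key is configured.
headers = {"Authorization": f"Bearer {api_key}"} if api_key else {}

# The endpoint is simply <url><upload_id>; the response is a ZIP archive that is
# streamed to disk and later extracted into a pfs_target-YYYYMMDD-HHMMSS-<upload_id> directory.
with requests.get(
    f"{webapi_url}{upload_id}",
    headers=headers,
    stream=True,
    timeout=300,
    verify=verify_ssl,
) as response:
    response.raise_for_status()
    with open(f"{upload_id}.zip", "wb") as f:
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)
```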
diff --git a/pyproject.toml b/pyproject.toml index e1451b6..2a84401 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ dependencies = [ "pandas", "psycopg2-binary", "pyarrow", + "requests", "sqlalchemy", # "setuptools", "sqlalchemy-utils", diff --git a/src/targetdb/cli/cli_main.py b/src/targetdb/cli/cli_main.py index f3860ed..3231aa1 100644 --- a/src/targetdb/cli/cli_main.py +++ b/src/targetdb/cli/cli_main.py @@ -25,6 +25,7 @@ parse_allocation_file, prep_fluxstd_data, transfer_data_from_uploader, + transfer_data_from_uploader_via_webapi, update_input_catalog_active, ) @@ -196,7 +197,7 @@ def checkdups( typer.Option("--skip-save-merged", help="Do not save the merged DataFrame."), ] = False, additional_columns: Annotated[ - List[str], + List[str] | None, typer.Option( "--additional-columns", help="Additional columns to output for the merged file. (e.g., 'psf_mag_g' 'psf_mag_r'). " @@ -217,7 +218,7 @@ def checkdups( "--format", help="File format of the merged data file.", ), - ] = "parquet", + ] = PyArrowFileFormat.parquet, ): if additional_columns is None: additional_columns = [] @@ -260,7 +261,7 @@ def prep_fluxstd( ), ], input_catalog_id: Annotated[ - int, + int | None, typer.Option( "--input_catalog_id", show_default=False, @@ -268,7 +269,7 @@ def prep_fluxstd( ), ] = None, input_catalog_name: Annotated[ - str, + str | None, typer.Option( "--input_catalog_name", show_default=False, @@ -276,7 +277,7 @@ def prep_fluxstd( ), ] = None, rename_cols: Annotated[ - str, + str | None, typer.Option( "--rename-cols", help='Dictionary to rename columns (e.g., \'{"fstar_gaia": "is_fstar_gaia"}\').', @@ -288,7 +289,7 @@ def prep_fluxstd( "--format", help="File format of the output data file.", ), - ] = "parquet", + ] = PyArrowFileFormat.parquet, ): if input_catalog_id is None and input_catalog_name is None: @@ -419,7 +420,7 @@ def insert( ), ] = FluxType.total, upload_id: Annotated[ - str, + str | None, typer.Option( "--upload_id", show_default=False, @@ -427,7 +428,7 @@ def insert( ), ] = None, proposal_id: Annotated[ - str, + str | None, typer.Option( "--proposal_id", show_default=False, @@ -502,7 +503,7 @@ def update( ), ] = False, upload_id: Annotated[ - str, + str | None, typer.Option( "--upload_id", show_default=False, @@ -510,7 +511,7 @@ def update( ), ] = None, proposal_id: Annotated[ - str, + str | None, typer.Option( "--proposal_id", show_default=False, @@ -564,9 +565,9 @@ def parse_alloc( writable=True, help="Directory path to save output files.", ), - ] = ".", + ] = Path("."), outfile_prefix: Annotated[ - str, + str | None, typer.Option( show_default=False, help="Prefix to the output files.", @@ -603,12 +604,11 @@ def transfer_targets( local_dir: Annotated[ Path, typer.Option( - exists=True, dir_okay=True, writable=True, help="Path to the data directory in the local machine", ), - ] = ".", + ] = Path("."), force: Annotated[bool, typer.Option(help="Force download.")] = False, ): @@ -627,6 +627,55 @@ def transfer_targets( ) +@app.command( + help="Download target lists from the uploader to the local machine via Web API." 
+) +def transfer_targets_api( + input_file: Annotated[ + Path, + typer.Argument( + exists=True, + file_okay=True, + dir_okay=False, + readable=True, + show_default=False, + help="Input catalog list file (csv).", + ), + ], + config_file: Annotated[ + str, + typer.Option( + "-c", + "--config", + show_default=False, + help=config_help_msg, + ), + ], + local_dir: Annotated[ + Path, + typer.Option( + dir_okay=True, + writable=True, + help="Path to the data directory in the local machine", + ), + ] = Path("."), + force: Annotated[bool, typer.Option(help="Force download.")] = False, +): + + logger.info(f"Loading config file: {config_file}") + config = load_config(config_file) + + logger.info(f"Loading input data from {input_file} into a DataFrame") + df = load_input_data(input_file) + + transfer_data_from_uploader_via_webapi( + df, + config, + local_dir=local_dir, + force=force, + ) + + @app.command(help="Insert targets using a list of input catalogs and upload IDs.") def insert_targets( input_catalogs: Annotated[ @@ -657,7 +706,7 @@ def insert_targets( readable=True, help="Path to the data directory.", ), - ] = ".", + ] = Path("."), flux_type: Annotated[ FluxType, typer.Option( @@ -724,7 +773,7 @@ def insert_pointings( readable=True, help="Path to the data directory.", ), - ] = ".", + ] = Path("."), commit: Annotated[ bool, typer.Option("--commit", help="Commit changes to the database."), diff --git a/src/targetdb/utils.py b/src/targetdb/utils.py index 5e2f5b4..a55972e 100644 --- a/src/targetdb/utils.py +++ b/src/targetdb/utils.py @@ -2,13 +2,18 @@ import glob import os +import re +import shutil import subprocess +import tempfile import time +import zipfile from datetime import datetime from pathlib import Path import numpy as np import pandas as pd +import requests from astropy.table import Table from loguru import logger from sqlalchemy import URL @@ -1235,11 +1240,23 @@ def transfer_data_from_uploader( local_dir=Path("."), force=False, ): + # Create local directory if it doesn't exist + if not local_dir.exists(): + logger.info(f"Creating local directory: {local_dir}") + local_dir.mkdir(parents=True, exist_ok=True) + status = [] n_transfer = [] # is_user_ppc = [] for upload_id in df["upload_id"]: + # Skip empty or invalid upload_id + if pd.isna(upload_id) or str(upload_id).strip() == "": + logger.warning("Skipping empty or invalid upload_id") + status.append("skipped") + n_transfer.append(0) + continue + # datadirs = glob.glob(local_dir / f"????????-??????-{upload_id}") # datadirs = list(local_dir.cwd().glob(f"????????-??????-{upload_id}")) datadirs = list(local_dir.glob(f"????????-??????-{upload_id}")) @@ -1352,7 +1369,7 @@ def transfer_data_from_uploader( n_transfer.append(0) # is_user_ppc.append(False) - custom_status_dict = {"success": 0, "WARNING": 1, "FAILED": 3} + custom_status_dict = {"success": 0, "WARNING": 1, "skipped": 2, "FAILED": 3} df_status = pd.DataFrame( { "upload_id": df["upload_id"], @@ -1375,6 +1392,275 @@ def transfer_data_from_uploader( ) +def _setup_webapi_config(config): + """Setup Web API configuration and headers.""" + webapi_url = config["webapi"]["url"] + api_key = config["webapi"].get("api_key", "") + verify_ssl = config["webapi"].get("verify_ssl", True) + + # Normalize URL (ensure trailing slash) + if not webapi_url.endswith("/"): + webapi_url += "/" + + # Setup request headers (only add Authorization if api_key is provided) + headers = {} + if api_key: + headers["Authorization"] = f"Bearer {api_key}" + + # Log SSL verification status + if not verify_ssl: + 
logger.warning("SSL certificate verification is disabled") + import urllib3 + + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + + return webapi_url, headers, verify_ssl + + +def _check_existing_directory(local_dir, upload_id, force): + """Check for existing directory and determine if transfer should be skipped.""" + datadirs = list(local_dir.glob(f"*-*-{upload_id}")) + + if len(datadirs) == 1: + skip_transfer = not force + if skip_transfer: + logger.info( + f"Data directory, {datadirs[0]}, is found locally. Skip transfer" + ) + else: + logger.info( + f"Data directory, {datadirs[0]}, is found locally, but force transfer" + ) + return skip_transfer + elif len(datadirs) > 1: + logger.error( + f"Multiple data directories are found in the destination directory: {datadirs}." + ) + raise ValueError( + f"Multiple data directories are found in the destination directory for {upload_id}: {datadirs}" + ) + else: + logger.info( + f"Data directory for upload_id: {upload_id} is not found locally. Try transfer" + ) + return False + + +def _download_zip_from_api(url, headers, verify_ssl, local_dir): + """Download ZIP file from Web API.""" + response = requests.get(url, headers=headers, stream=True, timeout=300, verify=verify_ssl) + response.raise_for_status() + + with tempfile.NamedTemporaryFile( + mode="wb", suffix=".zip", delete=False, dir=local_dir + ) as tmp_zip: + tmp_zip_path = Path(tmp_zip.name) + for chunk in response.iter_content(chunk_size=8192): + tmp_zip.write(chunk) + + logger.info(f"Downloaded ZIP file to {tmp_zip_path}") + return tmp_zip_path + + +def _extract_and_validate_zip(zip_path, upload_id, local_dir, force): + """Extract ZIP and validate directory structure.""" + with tempfile.TemporaryDirectory(dir=local_dir) as tmp_extract_dir: + tmp_extract_path = Path(tmp_extract_dir) + + with zipfile.ZipFile(zip_path, "r") as zip_ref: + zip_ref.extractall(tmp_extract_path) + + logger.info("Extracted ZIP to temporary directory") + + # Validate directory structure + pattern = re.compile(r"^pfs_target-\d{8}-\d{6}-" + re.escape(upload_id) + r"$") + extracted_dirs = [ + d for d in tmp_extract_path.iterdir() if d.is_dir() and pattern.match(d.name) + ] + + if len(extracted_dirs) == 0: + logger.error( + f"No directory matching pattern 'pfs_target-????????-??????-{upload_id}' found in ZIP" + ) + return 0, "WARNING" + elif len(extracted_dirs) > 1: + logger.error( + f"Multiple directories matching pattern found in ZIP: {extracted_dirs}" + ) + return len(extracted_dirs), "WARNING" + else: + # Move extracted directory to destination + extracted_dir = extracted_dirs[0] + final_dest = local_dir / extracted_dir.name + + if final_dest.exists(): + logger.info(f"Removing existing directory: {final_dest}") + shutil.rmtree(final_dest) + + shutil.move(str(extracted_dir), str(final_dest)) + logger.info(f"Moved extracted directory to {final_dest}") + + return 1, "success" + + +def _process_single_upload(upload_id, webapi_url, headers, verify_ssl, local_dir, force): + """Process a single upload_id download.""" + # Skip empty or invalid upload_id + if pd.isna(upload_id) or str(upload_id).strip() == "": + logger.warning("Skipping empty or invalid upload_id") + return "skipped", 0 + + # Check existing directory + try: + skip_transfer = _check_existing_directory(local_dir, upload_id, force) + except ValueError: + raise + + if skip_transfer: + return "skipped", 0 + + # Download and extract + logger.info(f"Downloading data for upload_id: {upload_id} from the Web API") + full_url = 
f"{webapi_url}{upload_id}" + logger.info(f"API endpoint: {full_url}") + + try: + zip_path = _download_zip_from_api(full_url, headers, verify_ssl, local_dir) + try: + n_transfer, status = _extract_and_validate_zip(zip_path, upload_id, local_dir, force) + return status, n_transfer + finally: + zip_path.unlink() + logger.info("Cleaned up temporary ZIP file") + + except requests.exceptions.HTTPError as e: + logger.error(f"HTTP error while downloading data for upload_id: {upload_id}") + logger.error(f"Status code: {e.response.status_code}, Message: {str(e)}") + return "FAILED", 0 + except requests.exceptions.RequestException as e: + logger.error(f"Network error while downloading data for upload_id: {upload_id}") + logger.error(str(e)) + return "FAILED", 0 + except zipfile.BadZipFile as e: + logger.error(f"Invalid ZIP file for upload_id: {upload_id}") + logger.error(str(e)) + return "FAILED", 0 + except Exception as e: + logger.error(f"Unexpected error while transferring data for upload_id: {upload_id}") + logger.error(str(e)) + return "FAILED", 0 + + +def transfer_data_from_uploader_via_webapi( + df, + config, + local_dir=Path("."), + force=False, +): + """ + Transfer data from the uploader server to local machine via Web API. + + This function downloads target data as ZIP archives from a Web API endpoint + using Bearer token authentication, extracts them, and validates the directory + structure matches the expected pattern: YYYYMMDD-HHMMSS-{upload_id}. + + Parameters + ---------- + df : pandas.DataFrame + DataFrame containing at least an 'upload_id' column with upload IDs to transfer. + config : dict + Configuration dictionary containing Web API credentials: + - config["webapi"]["url"]: Base URL of the Web API endpoint + - config["webapi"]["api_key"]: Bearer token for authentication + local_dir : Path, optional + Local directory where data should be downloaded. Defaults to Path("."). + force : bool, optional + If True, re-download even if directory already exists locally. + If False, skip transfer if directory exists. Defaults to False. + + Returns + ------- + None + The function logs transfer status and errors. It does not return a value, + but prints a status DataFrame showing results for each upload_id. + + Raises + ------ + ValueError + If multiple directories matching the same upload_id are found locally. + KeyError + If required config keys are missing. + + Notes + ----- + - Downloads are streamed to handle large ZIP files efficiently + - Temporary files are created in local_dir and cleaned up after extraction + - Status tracking mirrors transfer_data_from_uploader() for consistency: + * "success": 1 directory transferred successfully + * "WARNING": 0 or >1 directories in ZIP archive + * "FAILED": Error during HTTP request or extraction + * "skipped": Directory exists locally and force=False + - The function continues processing remaining upload_ids even if one fails + + Examples + -------- + >>> df = pd.DataFrame({'upload_id': ['abc123', 'def456']}) + >>> config = { + ... 'webapi': { + ... 'url': 'http://localhost:8000/get-upload/', + ... 'api_key': 'my-secret-token' + ... } + ... 
} + >>> transfer_data_from_uploader_via_webapi(df, config, local_dir=Path('./data')) + """ + # Create local directory if it doesn't exist + if not local_dir.exists(): + logger.info(f"Creating local directory: {local_dir}") + local_dir.mkdir(parents=True, exist_ok=True) + + # Setup Web API configuration + webapi_url, headers, verify_ssl = _setup_webapi_config(config) + + # Process each upload_id + status = [] + n_transfer = [] + + for upload_id in df["upload_id"]: + upload_status, upload_n_transfer = _process_single_upload( + upload_id, webapi_url, headers, verify_ssl, local_dir, force + ) + status.append(upload_status) + n_transfer.append(upload_n_transfer) + + # Generate status report + custom_status_dict = {"success": 0, "WARNING": 1, "skipped": 2, "FAILED": 3} + df_status = pd.DataFrame( + { + "upload_id": df["upload_id"], + "status": status, + "n_transfer": n_transfer, + } + ) + df_status_out = df_status.sort_values( + by=["status"], key=lambda x: x.map(custom_status_dict) + ) + logger.info(f"Transfer status: \n{df_status_out.to_string(index=False)}") + + # Check for actual issues (FAILED or WARNING), skipped is not an issue + has_failures = np.any( + (df_status["status"] == "FAILED") | (df_status["status"] == "WARNING") + ) + + if np.all(df_status["status"] == "success"): + logger.info("All data transfer is successful.") + elif not has_failures: + logger.info("All transfers completed (some skipped).") + else: + logger.error( + "There are some issues with data transfer. Please check the status." + ) + + def insert_targets_from_uploader( df_input_catalogs, config,
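As a usage note, the new helper can also be called directly from Python, mirroring what the `transfer-targets-api` command does. A minimal sketch, assuming the function is importable from `targetdb.utils` and that the catalog list CSV provides an `upload_id` column (file paths and the endpoint URL are placeholders):

```python
# Usage sketch only -- paths and the endpoint URL are placeholders; the config dict
# mirrors the [webapi] section documented in docs/reference/cli.md.
from pathlib import Path

import pandas as pd

from targetdb.utils import transfer_data_from_uploader_via_webapi

# Catalog list with at least an "upload_id" column.
df = pd.read_csv("input_catalogs.csv")

config = {
    "webapi": {
        "url": "https://example.com/get-upload/",  # base endpoint; the upload_id is appended
        "api_key": "",                              # empty string -> no Authorization header
        "verify_ssl": True,
    }
}

# Each upload is downloaded as a ZIP, extracted, validated against the
# pfs_target-YYYYMMDD-HHMMSS-<upload_id> naming pattern, and moved into local_dir.
transfer_data_from_uploader_via_webapi(df, config, local_dir=Path("./data"), force=False)
```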