From 9c38a91da16999e20293be81bce83fda4b005b4c Mon Sep 17 00:00:00 2001 From: Alex Richey Date: Wed, 29 Apr 2026 09:56:31 -0400 Subject: [PATCH 1/3] Enable templating in recipes --- dcpy/lifecycle/builds/plan.py | 45 +++++++++++++++++++++++++++++---- dcpy/models/lifecycle/builds.py | 16 ++++++++++++ products/edde/recipe.yml | 19 +++++++++++--- 3 files changed, 72 insertions(+), 8 deletions(-) diff --git a/dcpy/lifecycle/builds/plan.py b/dcpy/lifecycle/builds/plan.py index f71eb8c465..068b8211e9 100644 --- a/dcpy/lifecycle/builds/plan.py +++ b/dcpy/lifecycle/builds/plan.py @@ -5,6 +5,7 @@ import pandas as pd import typer import yaml +from jinja2 import Environment, StrictUndefined, TemplateSyntaxError, UndefinedError from dcpy.connectors.edm import publishing, recipes from dcpy.lifecycle.builds.connector import get_recipes_default_connector @@ -196,12 +197,46 @@ def _apply_recipe_defaults(recipe: Recipe): def recipe_from_yaml(path: Path) -> Recipe: - """Import a recipe file from yaml, and validate schema.""" + """Import a recipe file from yaml, and validate schema. + + Supports Jinja2 template variables that are rendered from environment variables. + For security, only environment variables with the BUILD_ENV_ prefix are exposed + to templates to prevent accidental exposure of secrets. + Non-templated recipes are processed normally without errors. + """ with open(path, "r", encoding="utf-8") as f: - s = yaml.safe_load(f) - recipe = Recipe(**s) - _apply_recipe_defaults(recipe) - return recipe + raw_content = f.read() + + # Filter environment variables to only BUILD_ENV_* for security + # This prevents secrets from being accidentally exposed in recipes + build_env_vars = { + key: value for key, value in os.environ.items() if key.startswith("BUILD_ENV_") + } + + # Attempt to render Jinja2 templates from BUILD_ENV_* environment variables + # Use StrictUndefined to catch missing variables + rendered_content = raw_content + try: + jinja_env = Environment(undefined=StrictUndefined) + template = jinja_env.from_string(raw_content) + rendered_content = template.render(**build_env_vars) + except UndefinedError as e: + # Missing template variable - provide clear error + raise ValueError( + f"Recipe template requires BUILD_ENV_* environment variable that is not set: {e}. " + f"Only variables starting with BUILD_ENV_ are available to templates." + ) from e + except TemplateSyntaxError as e: + # Invalid Jinja2 syntax - provide clear error + raise ValueError( + f"Recipe contains invalid Jinja2 template syntax at line {e.lineno}: {e.message}" + ) from e + + # Parse the rendered YAML + s = yaml.safe_load(rendered_content) + recipe = Recipe(**s) + _apply_recipe_defaults(recipe) + return recipe def generate_lock_file(recipe_file: Path, recipe: Recipe) -> Path: diff --git a/dcpy/models/lifecycle/builds.py b/dcpy/models/lifecycle/builds.py index 168ab0e8ec..9537bb09e7 100644 --- a/dcpy/models/lifecycle/builds.py +++ b/dcpy/models/lifecycle/builds.py @@ -111,10 +111,26 @@ def check_resolvable(self) -> Self: return self +class CommandType(StrEnum): + """Type of command execution.""" + + shell = "shell" # Execute as shell command + python = "python" # Import and execute as Python module + + +class BuildCommand(BaseModel, extra="forbid"): + """A build command to execute during the build stage.""" + + name: str + run: str + command_type: CommandType = CommandType.shell + + class StageConfig(BaseModel, extra="forbid", arbitrary_types_allowed=True): destination: str | None = None destination_key: str | None = None connector_args: list[StageConfigValue] = [] + commands: list[BuildCommand] = [] def get_connector_args_dict(self) -> dict[str, Any]: return {a.name: a.value for a in self.connector_args or []} diff --git a/products/edde/recipe.yml b/products/edde/recipe.yml index fe1db5833f..dc088f56b3 100644 --- a/products/edde/recipe.yml +++ b/products/edde/recipe.yml @@ -1,7 +1,11 @@ name: EDDE -version: 2025 +version: {{ BUILD_ENV_EDDE_VERSION }} vars: - VERSION_PREV: 2023 + BUILD_ENV_EDDE_VERSION_PREV: {{ BUILD_ENV_EDDE_VERSION_PREV }} + BUILD_ENV_EDDE_VERSION: {{ BUILD_ENV_EDDE_VERSION }} + BUILD_ENV_EDDE_CENSUS_BASE_YEAR: {{ BUILD_ENV_EDDE_CENSUS_BASE_YEAR }} + BUILD_ENV_EDDE_ACS_START_YEAR: {{ BUILD_ENV_EDDE_ACS_START_YEAR }} + BUILD_ENV_EDDE_ACS_END_YEAR: {{ BUILD_ENV_EDDE_ACS_END_YEAR }} product: db-eddt inputs: dataset_defaults: @@ -65,10 +69,19 @@ inputs: stage_config: builds.build: + commands: + - name: All Indicators + run: python3 -m external_review.external_review_collate all + - name: Census Base + run: python3 -m external_review.collate_save_census {{ BUILD_ENV_EDDE_CENSUS_BASE_YEAR }} + - name: ACS Start + run: python3 -m external_review.collate_save_census {{ BUILD_ENV_EDDE_ACS_START_YEAR }} + - name: ACS End + run: python3 -m external_review.collate_save_census {{ BUILD_ENV_EDDE_ACS_END_YEAR }} destination: edm.publishing.builds destination_key: db-eddt connector_args: - name: acl value: public-read - name: build_note - value: ar-eddt-2025_1 + value: ar-eddt-2026 From 4ce25b74ea5163f0177aac3e6f46d4ea234c7f92 Mon Sep 17 00:00:00 2001 From: Alex Richey Date: Wed, 29 Apr 2026 09:55:08 -0400 Subject: [PATCH 2/3] Add init to ingest with helper to get ingest dir --- dcpy/lifecycle/ingest/__init__.py | 27 +++++++++++++++++++++++++++ example.env | 1 + 2 files changed, 28 insertions(+) diff --git a/dcpy/lifecycle/ingest/__init__.py b/dcpy/lifecycle/ingest/__init__.py index e69de29bb2..f0e9940854 100644 --- a/dcpy/lifecycle/ingest/__init__.py +++ b/dcpy/lifecycle/ingest/__init__.py @@ -0,0 +1,27 @@ +"""Ingest lifecycle module.""" +from pathlib import Path + +from dcpy.configuration import INGEST_DEF_DIR + + +def get_template_directory() -> Path: + """Get the ingest templates directory and assert it exists. + + Returns: + Path to the ingest templates directory + + Raises: + FileNotFoundError: If the templates directory doesn't exist + """ + template_dir = INGEST_DEF_DIR + + if not template_dir.exists(): + raise FileNotFoundError( + f"Templates directory not found at {template_dir}. " + f"Set TEMPLATE_DIR environment variable to the correct path." + ) + + return template_dir + + +__all__ = ["get_template_directory"] diff --git a/example.env b/example.env index aba280d0c3..c744343706 100644 --- a/example.env +++ b/example.env @@ -19,6 +19,7 @@ PUBLISHING_BUCKET= PUBLISHING_BUCKET_ROOT_FOLDER= # dcpy assets/dependencies +PROJECT_ROOT_PATH= PRODUCT_METADATA_REPO_PATH= TEMPLATE_DIR= From 3b417265cbc9c6602007e2e2e4dd3b26ec566ed5 Mon Sep 17 00:00:00 2001 From: Alex Richey Date: Wed, 29 Apr 2026 10:54:23 -0400 Subject: [PATCH 3/3] Implement recipe planning for all assets --- apps/__init__.py | 1 + apps/dagster/__init__.py | 1 + apps/dagster/builds/__init__.py | 5 ++ apps/dagster/builds/assets.py | 123 +++++++++++++++++++++++++++++ apps/dagster/builds/definitions.py | 11 +++ apps/dagster/builds/partitions.py | 3 + apps/dagster/builds/resources.py | 22 ++++++ apps/dagster/definitions.py | 20 +++++ apps/dagster/ingest/__init__.py | 6 ++ apps/dagster/ingest/assets.py | 34 ++------ apps/dagster/workspace.yaml | 1 + dcpy/configuration.py | 6 ++ dcpy/connectors/edm/builds.py | 5 +- dcpy/lifecycle/__init__.py | 41 ++++++++++ dcpy/lifecycle/asset_models.py | 25 ++++++ dcpy/lifecycle/builds/build.py | 27 ++++--- dcpy/lifecycle/builds/plan.py | 92 +++++++++++++++------ dcpy/lifecycle/ingest/__init__.py | 28 ++++++- example.env | 1 + 19 files changed, 386 insertions(+), 66 deletions(-) create mode 100644 apps/__init__.py create mode 100644 apps/dagster/__init__.py create mode 100644 apps/dagster/builds/__init__.py create mode 100644 apps/dagster/builds/assets.py create mode 100644 apps/dagster/builds/definitions.py create mode 100644 apps/dagster/builds/partitions.py create mode 100644 apps/dagster/builds/resources.py create mode 100644 apps/dagster/definitions.py create mode 100644 dcpy/lifecycle/asset_models.py diff --git a/apps/__init__.py b/apps/__init__.py new file mode 100644 index 0000000000..32cc1640f5 --- /dev/null +++ b/apps/__init__.py @@ -0,0 +1 @@ +"""Apps package.""" diff --git a/apps/dagster/__init__.py b/apps/dagster/__init__.py new file mode 100644 index 0000000000..17f70a8497 --- /dev/null +++ b/apps/dagster/__init__.py @@ -0,0 +1 @@ +"""Dagster application for data engineering workflows.""" diff --git a/apps/dagster/builds/__init__.py b/apps/dagster/builds/__init__.py new file mode 100644 index 0000000000..6a0e08aedc --- /dev/null +++ b/apps/dagster/builds/__init__.py @@ -0,0 +1,5 @@ +from .assets import build_assets +from .partitions import build_partition_def +from .resources import LocalStorageResource + +__all__ = ["build_assets", "build_partition_def", "LocalStorageResource"] diff --git a/apps/dagster/builds/assets.py b/apps/dagster/builds/assets.py new file mode 100644 index 0000000000..8b6f17075a --- /dev/null +++ b/apps/dagster/builds/assets.py @@ -0,0 +1,123 @@ +import os +from pathlib import Path +from typing import Any + +import yaml +from dagster import AssetExecutionContext, Config, MaterializeResult, asset +from pydantic import create_model + +from dcpy import lifecycle +from dcpy.lifecycle.builds.plan import recipe_from_yaml + +from .partitions import build_partition_def +from .resources import LocalStorageResource + + +def create_config_class(product_name: str, recipe_path: Path) -> type[Config]: + """Dynamically create a Config class based on recipe template vars. + + Parses the recipe without rendering templates to extract the vars section keys. + + Args: + product_name: Product name (e.g., 'edde') + recipe_path: Path to the recipe.yml file + + Returns: + A Pydantic Config class with fields for each template variable + """ + # Parse recipe without rendering to get template variable names + recipe = recipe_from_yaml(recipe_path, render_templates=False) + template_vars = list(recipe.vars.keys()) if recipe.vars else [] + + if not template_vars: + # No template vars, create minimal config + return type(f"PlanRecipeConfig_{product_name}", (Config,), {}) + + # Create field definitions: {var_name: (str, ...)} where ... means required + field_definitions: dict[str, Any] = {var: (str, ...) for var in template_vars} + + # Dynamically create Config class + config_class = create_model( + f"PlanRecipeConfig_{product_name}", + __base__=Config, + **field_definitions, + ) + + return config_class + + +def make_plan_recipe_asset(product: lifecycle.asset_models.Product): + """Create a plan recipe asset for a specific product. + + Args: + product: Product object with name and path attributes + + Returns: + A Dagster asset function + """ + config_class = create_config_class(product.name, product.recipe_path) + + @asset( + name=f"plan_recipe_{product.name}", + partitions_def=build_partition_def, + group_name="build", + tags={"product": product.name, "lifecycle_stage": "builds.plan"}, + ) + def _plan_recipe_asset( + context: AssetExecutionContext, + config: config_class, # type: ignore + local_storage: LocalStorageResource, + ): + """Plan recipe for product, resolving all template variables and versions.""" + from dcpy.lifecycle.builds.build import upload_build + from dcpy.lifecycle.builds.plan import plan_recipe + + partition_key = context.partition_key + + # Set environment variables from config + # Get all template vars from the config object + template_vars = [field for field in config.__fields__.keys()] + for var in template_vars: + value = getattr(config, var) + os.environ[var] = value + context.log.info(f"Set {var}={value}") + + context.log.info(f"Planning recipe from {product.recipe_path}") + + # Plan the recipe (this will render templates and resolve versions) + recipe = plan_recipe(product.recipe_path, version=partition_key) + + # Create temp build directory + build_path = local_storage.get_path("builds", product.name, partition_key) + + # Write recipe.lock.yml to build directory + lock_file = Path(build_path) / "recipe.lock.yml" + with open(lock_file, "w", encoding="utf-8") as f: + yaml.dump(recipe.model_dump(mode="json"), f) + + context.log.info(f"Wrote recipe.lock.yml to {lock_file}") + + # Upload build to S3 using existing functionality + context.log.info(f"Uploading build from {build_path}") + upload_build(build_path, recipe_lock_path=lock_file) + + return MaterializeResult( + metadata={ + "recipe_path": str(product.recipe_path), + "lock_file": str(lock_file), + "build_path": str(build_path), + "version": partition_key, + "product": product.name, + "template_vars": ", ".join(template_vars), + } + ) + + return _plan_recipe_asset + + +# Generate assets for all products +products = lifecycle.list_products() +plan_recipe_assets = [make_plan_recipe_asset(product) for product in products] + +# Export all assets +build_assets = plan_recipe_assets diff --git a/apps/dagster/builds/definitions.py b/apps/dagster/builds/definitions.py new file mode 100644 index 0000000000..b2ac90a893 --- /dev/null +++ b/apps/dagster/builds/definitions.py @@ -0,0 +1,11 @@ +from dagster import Definitions + +from .assets import build_assets +from .resources import LocalStorageResource + +defs = Definitions( + assets=build_assets, + resources={ + "local_storage": LocalStorageResource(base_path="/tmp/dagster-builds"), + }, +) diff --git a/apps/dagster/builds/partitions.py b/apps/dagster/builds/partitions.py new file mode 100644 index 0000000000..30240434fd --- /dev/null +++ b/apps/dagster/builds/partitions.py @@ -0,0 +1,3 @@ +from dagster import DynamicPartitionsDefinition + +build_partition_def = DynamicPartitionsDefinition(name="build_version") diff --git a/apps/dagster/builds/resources.py b/apps/dagster/builds/resources.py new file mode 100644 index 0000000000..2ccb65530d --- /dev/null +++ b/apps/dagster/builds/resources.py @@ -0,0 +1,22 @@ +from pathlib import Path + +from dagster import ConfigurableResource + + +class LocalStorageResource(ConfigurableResource): + """Resource for managing local storage paths for build operations.""" + + base_path: str = "/tmp/dagster-builds" + + def get_path(self, *parts: str) -> Path: + """Get a path under the base storage directory. + + Args: + *parts: Path components to join under base_path + + Returns: + Path object for the requested location + """ + path = Path(self.base_path).joinpath(*parts) + path.mkdir(parents=True, exist_ok=True) + return path diff --git a/apps/dagster/definitions.py b/apps/dagster/definitions.py new file mode 100644 index 0000000000..796b24a3c9 --- /dev/null +++ b/apps/dagster/definitions.py @@ -0,0 +1,20 @@ +"""Dagster definitions for build and ingest workflows.""" + +from builds import LocalStorageResource as BuildLocalStorageResource +from builds import build_assets +from dagster import Definitions +from ingest import ingest_assets + +# Combine all assets +all_assets = [*build_assets, *ingest_assets] + +# Define resources +resources = { + "local_storage": BuildLocalStorageResource(base_path="/tmp/dagster-builds"), +} + +# Create Definitions object +defs = Definitions( + assets=all_assets, + resources=resources, +) diff --git a/apps/dagster/ingest/__init__.py b/apps/dagster/ingest/__init__.py index a1e8ae64af..e204b94f9d 100644 --- a/apps/dagster/ingest/__init__.py +++ b/apps/dagster/ingest/__init__.py @@ -1 +1,7 @@ """Dagster ingest package for NYC Planning data engineering.""" + +from .assets import ingest_assets +from .partitions import ingest_partition_def +from .resources import LocalStorageResource + +__all__ = ["ingest_assets", "ingest_partition_def", "LocalStorageResource"] diff --git a/apps/dagster/ingest/assets.py b/apps/dagster/ingest/assets.py index fe8a360072..fac1bb8547 100644 --- a/apps/dagster/ingest/assets.py +++ b/apps/dagster/ingest/assets.py @@ -1,8 +1,9 @@ -from pathlib import Path from typing import List, Optional from dagster import AssetExecutionContext, Config, MaterializeResult, asset +from dcpy.lifecycle.ingest import get_template_directory, list_ingest_templates + from .partitions import ingest_partition_def from .resources import LocalStorageResource @@ -22,31 +23,6 @@ class IngestConfig(Config): overwrite_okay: bool = True # Allow overwriting existing versions -# Mount point for ingest templates in container -INGEST_TEMPLATES_PATH = Path("/app/repos/data-engineering/ingest_templates") - - -def get_ingest_template_ids() -> List[str]: - """Get all ingest template IDs from ingest_templates directory""" - if not INGEST_TEMPLATES_PATH.exists(): - raise FileNotFoundError( - f"Templates directory not found at {INGEST_TEMPLATES_PATH}. " - "Ensure the data-engineering repo is mounted correctly at /app/repos/data-engineering" - ) - - templates = [] - for file in INGEST_TEMPLATES_PATH.glob("*.yml"): - templates.append(file.stem) - - if not templates: - raise ValueError( - f"No template files (*.yml) found in {INGEST_TEMPLATES_PATH}. " - "Check that ingest_templates/ contains .yml files." - ) - - return sorted(templates) - - def make_ingest_asset(template_id: str): """Create a Dagster asset for an ingest template""" @@ -79,7 +55,7 @@ def _ingest_asset( latest=config.latest, push=config.push, output_csv=config.output_csv, - definition_dir=INGEST_TEMPLATES_PATH, + definition_dir=get_template_directory(), overwrite_okay=config.overwrite_okay, ) @@ -97,5 +73,5 @@ def _ingest_asset( # Create ingest assets for all templates -ingest_template_ids = get_ingest_template_ids() -ingest_assets = [make_ingest_asset(template_id) for template_id in ingest_template_ids] +ingest_templates = list_ingest_templates() +ingest_assets = [make_ingest_asset(template.name) for template in ingest_templates] diff --git a/apps/dagster/workspace.yaml b/apps/dagster/workspace.yaml index 58b583a482..3c185858c2 100644 --- a/apps/dagster/workspace.yaml +++ b/apps/dagster/workspace.yaml @@ -1,2 +1,3 @@ load_from: - python_module: ingest.definitions + - python_module: builds.definitions diff --git a/dcpy/configuration.py b/dcpy/configuration.py index 679d313715..75ba41df9a 100644 --- a/dcpy/configuration.py +++ b/dcpy/configuration.py @@ -36,8 +36,14 @@ PRODUCT_METADATA_REPO_PATH = env.get("PRODUCT_METADATA_REPO_PATH") +# Project root directory (assumes dcpy is at PROJECT_ROOT/dcpy) +PROJECT_ROOT_PATH = Path(__file__).parent.parent + INGEST_DEF_DIR = Path(env.get("TEMPLATE_DIR", "./ingest_templates")) +# Products directory for build recipes +PRODUCTS_DIR = Path(env.get("PRODUCTS_DIR", PROJECT_ROOT_PATH / "products")) + SFTP_HOST = env.get("SFTP_HOST") SFTP_USER = env.get("SFTP_USER") SFTP_PORT = str(env.get("SFTP_PORT", "22")) diff --git a/dcpy/connectors/edm/builds.py b/dcpy/connectors/edm/builds.py index ba42cee925..b519db7d72 100644 --- a/dcpy/connectors/edm/builds.py +++ b/dcpy/connectors/edm/builds.py @@ -122,6 +122,9 @@ def _upload_build( def push_versioned(self, key: str, version: str, **kwargs) -> dict: # For builds, the "version" is the build name/ID + logger.info( + f"Pushing version with connectors.edm.builds: {key}, version={version}" + ) connector_args = kwargs["connector_args"] acl = ( s3.string_as_acl(connector_args["acl"]) @@ -129,13 +132,13 @@ def push_versioned(self, key: str, version: str, **kwargs) -> dict: else None ) - logger.info(f"Pushing build for product: {key}, build: {version}") result = self._upload_build( build_dir=kwargs["build_path"], product=key, acl=acl, build_name=version, ) + logger.info(f"Pushed build. Result: {result}") return asdict(result) def _pull( diff --git a/dcpy/lifecycle/__init__.py b/dcpy/lifecycle/__init__.py index e69de29bb2..a6cff80858 100644 --- a/dcpy/lifecycle/__init__.py +++ b/dcpy/lifecycle/__init__.py @@ -0,0 +1,41 @@ +"""Lifecycle module for data engineering operations.""" + +from typing import List + +from dcpy.configuration import PRODUCTS_DIR +from dcpy.lifecycle.asset_models import Product + + +def list_products() -> List[Product]: + """List all available products. + + Returns: + List of Product objects with name and path attributes + + Raises: + FileNotFoundError: If the products directory doesn't exist + """ + products_dir = PRODUCTS_DIR + + if not products_dir.exists(): + raise FileNotFoundError( + f"Products directory not found at {products_dir}. " + f"Set PRODUCTS_DIR environment variable to the correct path." + ) + + products = [] + for product_dir in sorted(products_dir.iterdir()): + if product_dir.is_dir(): + recipe_file = product_dir / "recipe.yml" + if recipe_file.exists(): + products.append( + Product( + name=product_dir.name, + path=product_dir, + ) + ) + + return products + + +__all__ = ["list_products"] diff --git a/dcpy/lifecycle/asset_models.py b/dcpy/lifecycle/asset_models.py new file mode 100644 index 0000000000..2ae969d06d --- /dev/null +++ b/dcpy/lifecycle/asset_models.py @@ -0,0 +1,25 @@ +"""Data models for lifecycle assets (products, templates, etc.).""" + +from dataclasses import dataclass +from pathlib import Path + + +@dataclass +class Product: + """Represents a product in the products directory.""" + + name: str + path: Path + + @property + def recipe_path(self) -> Path: + """Path to the product's recipe.yml file.""" + return self.path / "recipe.yml" + + +@dataclass +class IngestTemplate: + """Represents an ingest template.""" + + name: str + path: Path diff --git a/dcpy/lifecycle/builds/build.py b/dcpy/lifecycle/builds/build.py index fa851356c5..fe7861ea51 100644 --- a/dcpy/lifecycle/builds/build.py +++ b/dcpy/lifecycle/builds/build.py @@ -122,16 +122,7 @@ def _export( export(recipe_lock_path) -@app.command("upload") -def _upload_build( - build_path: Path, - recipe_lock_path: Path = typer.Option( - None, - "--recipe-path", - "-r", - help="Path of recipe lock file to use", - ), -): +def upload_build(build_path: Path, recipe_lock_path: Path | None = None) -> dict: """Upload a build to the destination configured in the recipe.""" recipe = plan.recipe_from_yaml( recipe_lock_path or (Path(build_path).parent / "recipe.lock.yml") @@ -144,9 +135,25 @@ def _upload_build( connector_key = stage_config.destination_key or recipe.product result = connectors[stage_config.destination].push( + version=recipe.version, build_path=build_path, key=connector_key, connector_args=stage_config.get_connector_args_dict(), # TODO: eventually also pass the metadata from the build stage output, which would allow us to skip passing the build path ) + return result + + +@app.command("upload") +def _upload_build( + build_path: Path, + recipe_lock_path: Path = typer.Option( + None, + "--recipe-path", + "-r", + help="Path of recipe lock file to use", + ), +): + """Upload a build to the destination configured in the recipe.""" + result = upload_build(build_path, recipe_lock_path) typer.echo(result) diff --git a/dcpy/lifecycle/builds/plan.py b/dcpy/lifecycle/builds/plan.py index 068b8211e9..6ed8ab26c9 100644 --- a/dcpy/lifecycle/builds/plan.py +++ b/dcpy/lifecycle/builds/plan.py @@ -5,7 +5,13 @@ import pandas as pd import typer import yaml -from jinja2 import Environment, StrictUndefined, TemplateSyntaxError, UndefinedError +from jinja2 import ( + Environment, + StrictUndefined, + TemplateSyntaxError, + Undefined, + UndefinedError, +) from dcpy.connectors.edm import publishing, recipes from dcpy.lifecycle.builds.connector import get_recipes_default_connector @@ -26,6 +32,19 @@ ] +class PreserveUndefined(Undefined): + """Jinja2 Undefined that preserves template syntax as YAML-safe strings. + + This allows parsing recipes with Jinja2 templates without rendering them. + Templates like {{ VAR }} are preserved as strings in the parsed recipe. + """ + + def __str__(self): + # Return the template as a single-quoted string so YAML parses it as a string literal + # This prevents YAML from trying to interpret {{ }} as a flow mapping + return f"'{{{{ {self._undefined_name} }}}}'" + + def resolve_version(recipe: Recipe) -> str: match recipe.version_strategy: case None: @@ -196,41 +215,64 @@ def _apply_recipe_defaults(recipe: Recipe): ds.load_engine = ds.load_engine or recipe.inputs.dataset_defaults.load_engine -def recipe_from_yaml(path: Path) -> Recipe: +def recipe_from_yaml(path: Path, render_templates: bool = True) -> Recipe: """Import a recipe file from yaml, and validate schema. Supports Jinja2 template variables that are rendered from environment variables. For security, only environment variables with the BUILD_ENV_ prefix are exposed to templates to prevent accidental exposure of secrets. Non-templated recipes are processed normally without errors. + + Args: + path: Path to the recipe.yml file + render_templates: If True, render Jinja2 templates from BUILD_ENV_* vars. + If False, preserve templates as strings (for DAG generation). + + Returns: + Recipe object with schema validated """ with open(path, "r", encoding="utf-8") as f: raw_content = f.read() - # Filter environment variables to only BUILD_ENV_* for security - # This prevents secrets from being accidentally exposed in recipes - build_env_vars = { - key: value for key, value in os.environ.items() if key.startswith("BUILD_ENV_") - } - - # Attempt to render Jinja2 templates from BUILD_ENV_* environment variables - # Use StrictUndefined to catch missing variables + # Render Jinja2 templates rendered_content = raw_content - try: - jinja_env = Environment(undefined=StrictUndefined) - template = jinja_env.from_string(raw_content) - rendered_content = template.render(**build_env_vars) - except UndefinedError as e: - # Missing template variable - provide clear error - raise ValueError( - f"Recipe template requires BUILD_ENV_* environment variable that is not set: {e}. " - f"Only variables starting with BUILD_ENV_ are available to templates." - ) from e - except TemplateSyntaxError as e: - # Invalid Jinja2 syntax - provide clear error - raise ValueError( - f"Recipe contains invalid Jinja2 template syntax at line {e.lineno}: {e.message}" - ) from e + if render_templates: + # Filter environment variables to only BUILD_ENV_* for security + # This prevents secrets from being accidentally exposed in recipes + build_env_vars = { + key: value + for key, value in os.environ.items() + if key.startswith("BUILD_ENV_") + } + + # Use StrictUndefined to catch missing variables + try: + jinja_env = Environment(undefined=StrictUndefined) + template = jinja_env.from_string(raw_content) + rendered_content = template.render(**build_env_vars) + except UndefinedError as e: + # Missing template variable - provide clear error + raise ValueError( + f"Recipe template requires BUILD_ENV_* environment variable that is not set: {e}. " + f"Only variables starting with BUILD_ENV_ are available to templates." + ) from e + except TemplateSyntaxError as e: + # Invalid Jinja2 syntax - provide clear error + raise ValueError( + f"Recipe contains invalid Jinja2 template syntax at line {e.lineno}: {e.message}" + ) from e + else: + # Preserve templates as strings for DAG generation + # Use PreserveUndefined to keep {{ VAR }} syntax intact + try: + jinja_env = Environment(undefined=PreserveUndefined) + template = jinja_env.from_string(raw_content) + rendered_content = template.render() # No variables provided + except TemplateSyntaxError as e: + # Invalid Jinja2 syntax - provide clear error + raise ValueError( + f"Recipe contains invalid Jinja2 template syntax at line {e.lineno}: {e.message}" + ) from e # Parse the rendered YAML s = yaml.safe_load(rendered_content) diff --git a/dcpy/lifecycle/ingest/__init__.py b/dcpy/lifecycle/ingest/__init__.py index f0e9940854..ab421c3c73 100644 --- a/dcpy/lifecycle/ingest/__init__.py +++ b/dcpy/lifecycle/ingest/__init__.py @@ -1,7 +1,10 @@ """Ingest lifecycle module.""" + from pathlib import Path +from typing import List from dcpy.configuration import INGEST_DEF_DIR +from dcpy.lifecycle.asset_models import IngestTemplate def get_template_directory() -> Path: @@ -24,4 +27,27 @@ def get_template_directory() -> Path: return template_dir -__all__ = ["get_template_directory"] +def list_ingest_templates() -> List[IngestTemplate]: + """List all available ingest templates. + + Returns: + List of IngestTemplate objects with name and path attributes + + Raises: + FileNotFoundError: If the templates directory doesn't exist + """ + template_dir = get_template_directory() + + templates = [] + for template_file in sorted(template_dir.glob("*.yml")): + templates.append( + IngestTemplate( + name=template_file.stem, + path=template_file, + ) + ) + + return templates + + +__all__ = ["get_template_directory", "list_ingest_templates"] diff --git a/example.env b/example.env index c744343706..0af20ddc26 100644 --- a/example.env +++ b/example.env @@ -22,6 +22,7 @@ PUBLISHING_BUCKET_ROOT_FOLDER= PROJECT_ROOT_PATH= PRODUCT_METADATA_REPO_PATH= TEMPLATE_DIR= +PRODUCTS_DIR= # Github GHP_TOKEN=