Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Apps package."""
1 change: 1 addition & 0 deletions apps/dagster/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Dagster application for data engineering workflows."""
5 changes: 5 additions & 0 deletions apps/dagster/builds/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .assets import build_assets
from .partitions import build_partition_def
from .resources import LocalStorageResource

__all__ = ["build_assets", "build_partition_def", "LocalStorageResource"]
123 changes: 123 additions & 0 deletions apps/dagster/builds/assets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import os
from pathlib import Path
from typing import Any

import yaml
from dagster import AssetExecutionContext, Config, MaterializeResult, asset
from pydantic import create_model

from dcpy import lifecycle
from dcpy.lifecycle.builds.plan import recipe_from_yaml

from .partitions import build_partition_def
from .resources import LocalStorageResource


def create_config_class(product_name: str, recipe_path: Path) -> type[Config]:
    """Build a Dagster ``Config`` subclass whose fields mirror a recipe's vars.

    The recipe is loaded with ``render_templates=False`` and only the keys of
    its ``vars`` section are read. Each key becomes a required ``str`` field
    on a class named ``PlanRecipeConfig_<product_name>``.

    Args:
        product_name: Product name (e.g., 'edde'); used as the class-name suffix.
        recipe_path: Path to the recipe.yml file.

    Returns:
        A ``Config`` subclass with one required string field per template
        variable, or a field-less subclass when the recipe declares no vars.
    """
    class_name = f"PlanRecipeConfig_{product_name}"

    # Templates stay unrendered: only the variable names are needed here.
    unrendered = recipe_from_yaml(recipe_path, render_templates=False)
    var_names = list(unrendered.vars.keys()) if unrendered.vars else []

    if var_names:
        # ``(str, ...)`` declares a str field where ``...`` means "required".
        required_fields: dict[str, Any] = {name: (str, ...) for name in var_names}
        return create_model(class_name, __base__=Config, **required_fields)

    # Nothing to configure: a bare subclass is enough.
    return type(class_name, (Config,), {})


def make_plan_recipe_asset(product: lifecycle.asset_models.Product):
    """Create the ``plan_recipe_<product>`` Dagster asset for one product.

    The asset's config class is generated from the product's recipe (see
    ``create_config_class``), so every recipe template variable shows up as a
    required string field in the run config.

    Args:
        product: Product object with ``name`` and ``recipe_path`` attributes.

    Returns:
        A Dagster asset that exports its config values as environment
        variables, plans the recipe for the current partition key, writes
        ``recipe.lock.yml`` into the local build directory and uploads it.
    """
    config_class = create_config_class(product.name, product.recipe_path)

    @asset(
        name=f"plan_recipe_{product.name}",
        partitions_def=build_partition_def,
        group_name="build",
        tags={"product": product.name, "lifecycle_stage": "builds.plan"},
    )
    def _plan_recipe_asset(
        context: AssetExecutionContext,
        config: config_class,  # type: ignore
        local_storage: LocalStorageResource,
    ):
        """Plan recipe for product, resolving all template variables and versions."""
        from dcpy.lifecycle.builds.build import upload_build
        from dcpy.lifecycle.builds.plan import plan_recipe

        partition_key = context.partition_key

        # Field names come from the class-level ``model_fields`` mapping;
        # ``__fields__`` is the deprecated Pydantic v1 spelling and warns
        # under v2, which this block already relies on via ``model_dump``.
        template_vars = list(type(config).model_fields)

        # Config values are exported as environment variables before planning.
        # NOTE: this mutates the process-wide environment and is not undone
        # when the asset finishes.
        for var in template_vars:
            value = getattr(config, var)
            os.environ[var] = value
            context.log.info(f"Set {var}={value}")

        context.log.info(f"Planning recipe from {product.recipe_path}")

        # Plan the recipe (this will render templates and resolve versions)
        recipe = plan_recipe(product.recipe_path, version=partition_key)

        # Build directory for this product/partition (created by get_path)
        build_path = local_storage.get_path("builds", product.name, partition_key)

        # Write recipe.lock.yml to build directory
        lock_file = Path(build_path) / "recipe.lock.yml"
        with open(lock_file, "w", encoding="utf-8") as f:
            yaml.dump(recipe.model_dump(mode="json"), f)

        context.log.info(f"Wrote recipe.lock.yml to {lock_file}")

        # Upload build to S3 using existing functionality
        context.log.info(f"Uploading build from {build_path}")
        upload_build(build_path, recipe_lock_path=lock_file)

        return MaterializeResult(
            metadata={
                "recipe_path": str(product.recipe_path),
                "lock_file": str(lock_file),
                "build_path": str(build_path),
                "version": partition_key,
                "product": product.name,
                "template_vars": ", ".join(template_vars),
            }
        )

    return _plan_recipe_asset


# Discover every product once at import time and create one plan-recipe
# asset for each of them.
products = lifecycle.list_products()
plan_recipe_assets = list(map(make_plan_recipe_asset, products))

# Public export of this module (imported by the Definitions modules).
build_assets = plan_recipe_assets
11 changes: 11 additions & 0 deletions apps/dagster/builds/definitions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from dagster import Definitions

from .assets import build_assets
from .resources import LocalStorageResource

# Definitions for the builds package: the generated plan-recipe assets plus
# the "local_storage" resource those assets take as a parameter.
defs = Definitions(
    assets=build_assets,
    resources={
        "local_storage": LocalStorageResource(base_path="/tmp/dagster-builds"),
    },
)
3 changes: 3 additions & 0 deletions apps/dagster/builds/partitions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from dagster import DynamicPartitionsDefinition

# Dynamic partition set named "build_version". The plan-recipe assets use it
# as their partitions_def and pass the partition key to plan_recipe as the
# version.
build_partition_def = DynamicPartitionsDefinition(name="build_version")
22 changes: 22 additions & 0 deletions apps/dagster/builds/resources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from pathlib import Path

from dagster import ConfigurableResource


class LocalStorageResource(ConfigurableResource):
    """Resource for managing local storage paths for build operations."""

    # Root directory under which every requested path is created.
    base_path: str = "/tmp/dagster-builds"

    def get_path(self, *parts: str) -> Path:
        """Return ``base_path`` joined with ``parts``, creating it as a directory.

        Args:
            *parts: Path components appended, in order, to ``base_path``.

        Returns:
            The resulting ``Path``. The directory and any missing parents are
            created before returning; an existing directory is reused.
        """
        target = Path(self.base_path, *parts)
        target.mkdir(parents=True, exist_ok=True)
        return target
20 changes: 20 additions & 0 deletions apps/dagster/definitions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""Dagster definitions for build and ingest workflows."""

from builds import LocalStorageResource as BuildLocalStorageResource
from builds import build_assets
from dagster import Definitions
from ingest import ingest_assets

# Every asset exposed by this module: build assets first, then ingest assets.
all_assets = list(build_assets) + list(ingest_assets)

# Resources handed to Dagster, keyed by resource name.
resources = dict(
    local_storage=BuildLocalStorageResource(base_path="/tmp/dagster-builds"),
)

# Definitions object combining the assets and resources above.
defs = Definitions(assets=all_assets, resources=resources)
6 changes: 6 additions & 0 deletions apps/dagster/ingest/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,7 @@
"""Dagster ingest package for NYC Planning data engineering."""

from .assets import ingest_assets
from .partitions import ingest_partition_def
from .resources import LocalStorageResource

__all__ = ["ingest_assets", "ingest_partition_def", "LocalStorageResource"]
34 changes: 5 additions & 29 deletions apps/dagster/ingest/assets.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from pathlib import Path
from typing import List, Optional

from dagster import AssetExecutionContext, Config, MaterializeResult, asset

from dcpy.lifecycle.ingest import get_template_directory, list_ingest_templates

from .partitions import ingest_partition_def
from .resources import LocalStorageResource

Expand All @@ -22,31 +23,6 @@ class IngestConfig(Config):
overwrite_okay: bool = True # Allow overwriting existing versions


# Mount point for ingest templates in container
INGEST_TEMPLATES_PATH = Path("/app/repos/data-engineering/ingest_templates")


def get_ingest_template_ids() -> List[str]:
    """Return the sorted stems of every ``*.yml`` file in INGEST_TEMPLATES_PATH.

    Returns:
        Template ids (file names without the ``.yml`` suffix), sorted.

    Raises:
        FileNotFoundError: If INGEST_TEMPLATES_PATH does not exist.
        ValueError: If the directory contains no ``*.yml`` files.
    """
    if not INGEST_TEMPLATES_PATH.exists():
        raise FileNotFoundError(
            f"Templates directory not found at {INGEST_TEMPLATES_PATH}. "
            "Ensure the data-engineering repo is mounted correctly at /app/repos/data-engineering"
        )

    template_ids = sorted(yml.stem for yml in INGEST_TEMPLATES_PATH.glob("*.yml"))
    if template_ids:
        return template_ids

    # An existing but empty directory is treated as a configuration error.
    raise ValueError(
        f"No template files (*.yml) found in {INGEST_TEMPLATES_PATH}. "
        "Check that ingest_templates/ contains .yml files."
    )


def make_ingest_asset(template_id: str):
"""Create a Dagster asset for an ingest template"""

Expand Down Expand Up @@ -79,7 +55,7 @@ def _ingest_asset(
latest=config.latest,
push=config.push,
output_csv=config.output_csv,
definition_dir=INGEST_TEMPLATES_PATH,
definition_dir=get_template_directory(),
overwrite_okay=config.overwrite_okay,
)

Expand All @@ -97,5 +73,5 @@ def _ingest_asset(


# Create ingest assets for all templates
ingest_template_ids = get_ingest_template_ids()
ingest_assets = [make_ingest_asset(template_id) for template_id in ingest_template_ids]
ingest_templates = list_ingest_templates()
ingest_assets = [make_ingest_asset(template.name) for template in ingest_templates]
1 change: 1 addition & 0 deletions apps/dagster/workspace.yaml
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
load_from:
- python_module: ingest.definitions
- python_module: builds.definitions
6 changes: 6 additions & 0 deletions dcpy/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,14 @@

PRODUCT_METADATA_REPO_PATH = env.get("PRODUCT_METADATA_REPO_PATH")

# Project root directory (assumes dcpy is at PROJECT_ROOT/dcpy)
PROJECT_ROOT_PATH = Path(__file__).parent.parent

INGEST_DEF_DIR = Path(env.get("TEMPLATE_DIR", "./ingest_templates"))

# Products directory for build recipes
PRODUCTS_DIR = Path(env.get("PRODUCTS_DIR", PROJECT_ROOT_PATH / "products"))

SFTP_HOST = env.get("SFTP_HOST")
SFTP_USER = env.get("SFTP_USER")
SFTP_PORT = str(env.get("SFTP_PORT", "22"))
Expand Down
5 changes: 4 additions & 1 deletion dcpy/connectors/edm/builds.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,20 +122,23 @@ def _upload_build(

def push_versioned(self, key: str, version: str, **kwargs) -> dict:
    """Upload a build directory for product ``key`` under build name ``version``.

    Args:
        key: Product name; forwarded to ``_upload_build`` as ``product``.
        version: Build name/ID; forwarded to ``_upload_build`` as ``build_name``.
        **kwargs: Must contain ``connector_args`` (a mapping whose optional
            ``acl`` entry is converted with ``s3.string_as_acl``) and
            ``build_path`` (forwarded as ``build_dir``).

    Returns:
        The upload result converted to a plain dict with ``asdict``.

    Raises:
        KeyError: If ``connector_args`` or ``build_path`` is missing from
            ``kwargs``.
    """
    # For builds, the "version" is the build name/ID
    logger.info(
        f"Pushing version with connectors.edm.builds: {key}, version={version}"
    )
    connector_args = kwargs["connector_args"]
    # A missing or falsy "acl" entry means no explicit ACL is passed on.
    acl = (
        s3.string_as_acl(connector_args["acl"])
        if connector_args.get("acl")
        else None
    )

    logger.info(f"Pushing build for product: {key}, build: {version}")
    result = self._upload_build(
        build_dir=kwargs["build_path"],
        product=key,
        acl=acl,
        build_name=version,
    )
    logger.info(f"Pushed build. Result: {result}")
    return asdict(result)

def _pull(
Expand Down
41 changes: 41 additions & 0 deletions dcpy/lifecycle/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Lifecycle module for data engineering operations."""

from typing import List

from dcpy.configuration import PRODUCTS_DIR
from dcpy.lifecycle.asset_models import Product


def list_products() -> List[Product]:
    """Collect every product directory found under ``PRODUCTS_DIR``.

    A sub-directory counts as a product when it contains a ``recipe.yml``
    file. Results follow the sorted order of the directory entries.

    Returns:
        One ``Product`` per matching directory (``name`` is the directory
        name, ``path`` the directory itself); empty if none qualifies.

    Raises:
        FileNotFoundError: If the products directory doesn't exist.
    """
    root = PRODUCTS_DIR

    if not root.exists():
        raise FileNotFoundError(
            f"Products directory not found at {root}. "
            f"Set PRODUCTS_DIR environment variable to the correct path."
        )

    return [
        Product(name=entry.name, path=entry)
        for entry in sorted(root.iterdir())
        if entry.is_dir() and (entry / "recipe.yml").exists()
    ]


__all__ = ["list_products"]
25 changes: 25 additions & 0 deletions dcpy/lifecycle/asset_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"""Data models for lifecycle assets (products, templates, etc.)."""

from dataclasses import dataclass
from pathlib import Path


@dataclass
class Product:
    """A product: a named directory that holds a ``recipe.yml`` file."""

    # Product identifier.
    name: str
    # Directory belonging to the product.
    path: Path

    @property
    def recipe_path(self) -> Path:
        """Location of ``recipe.yml`` inside the product directory."""
        return self.path.joinpath("recipe.yml")


@dataclass
class IngestTemplate:
    """Represents an ingest template."""

    # Template name; the Dagster ingest assets are created from this value.
    name: str
    # Path associated with the template -- presumably its definition file;
    # TODO confirm against list_ingest_templates.
    path: Path
Loading
Loading