ML Pipelines Template

Overview

This is a production-ready template for building Kubeflow Pipelines (KFP) workflows with Python. It provides a structured, scalable architecture for ML pipelines with containerized task execution, type-safe configuration, and comprehensive testing.

Key Features

  • Kubeflow Pipelines Integration: Build, compile, and deploy KFP workflows
  • Task-Based Architecture: Modular ML tasks (feature engineering, training, evaluation, inference, export)
  • Environment Management: Multi-environment support (dev, prod) with isolated configurations
  • Modern Python Tooling: Built with uv and Ruff
  • Type Safety: Full type hints with Pyright and Pydantic validation
  • CI/CD Ready: GitHub Actions workflows for testing, linting, and Docker builds

Prerequisites

  • ๐Ÿ Python 3.10+ - Programming language
  • ๐Ÿ“ฆ uv - Fast Python package installer and resolver
  • ๐Ÿณ Docker - Container platform (for builds)
  • โ˜ธ๏ธ Kubeflow Pipelines - ML workflow orchestration platform

Tip: quick install of uv:

# macOS/Linux
curl -LsSf https://astral.sh/uv/install.sh | sh

# Windows
powershell -c "irm https://astral.sh/uv/install.ps1 | iex"

Getting Started

1๏ธโƒฃ Install Dependencies

uv sync

2๏ธโƒฃ Run Tests

uv run nox -s test

3๏ธโƒฃ Compile a Pipeline

uv run nox -s compile_pipeline -- \
  --env dev \
  --pipeline_name sample-pipeline \
  --tag test \
  --model_type sample

๐Ÿ“ Project Structure

.
├── const/                          # Shared enumerations
│   ├── environment.py              # Environment enum (dev, prod)
│   ├── model_type.py               # Model type enum (sample, ...)
│   └── task.py                     # Task enum (feature_engineering, training, ...)
├── environments/                   # Environment-specific settings
│   ├── dev.py                      # Development environment config
│   ├── prod.py                     # Production environment config
│   └── settings.py                 # Settings loader
├── pipelines/                      # KFP pipeline definitions
│   ├── components.py               # KFP container components
│   ├── graphs/                     # Pipeline graph definitions
│   │   └── sample.py               # Sample pipeline graph
│   ├── main.py                     # Pipeline compiler & uploader
│   └── settings.py                 # Pipeline compilation settings
├── tasks/                          # ML task implementations
│   ├── base.py                     # BaseTask protocol
│   ├── feature_engineering/        # Feature engineering task
│   ├── training/                   # Model training task
│   ├── evaluation/                 # Model evaluation task
│   ├── inference/                  # Inference task
│   └── export/                     # Export task
├── tests/                          # Test suite (mirrors src structure)
├── main.py                         # Task executor (runs inside KFP containers)
├── noxfile.py                      # Task automation with Nox
├── pyproject.toml                  # Project dependencies & metadata
├── pytest.ini                      # Pytest configuration
└── ruff.toml                       # Ruff linter configuration

Key Files:

  • main.py - Entry point for task execution in containers
  • noxfile.py - Development task automation (test, lint, fmt, compile_pipeline)
  • pyproject.toml - Project configuration and dependencies
  • CLAUDE.md - Architecture guide for Claude Code
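The BaseTask protocol in tasks/base.py is not reproduced in this README. As a rough sketch of what such a structural protocol might look like (the exact members are an assumption; only a run() method is implied by the task examples below):

```python
from typing import Protocol, TypeVar, runtime_checkable

# Covariant type variable, mirroring the T_co exported by tasks.base
T_co = TypeVar("T_co", covariant=True)


@runtime_checkable
class BaseTask(Protocol):
    """Any class with a zero-argument run() method satisfies this protocol."""

    def run(self) -> None: ...


class EchoTask:
    """Satisfies BaseTask structurally, without inheriting from it."""

    def run(self) -> None:
        print("echo")


task: BaseTask = EchoTask()
task.run()  # prints "echo"
```

Because the protocol is structural, task classes never need to subclass a common base; they only need the right method shape.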

๐Ÿ› ๏ธ Development Commands

Testing

# Run all tests
uv run nox -s test

# Run specific test file
uv run pytest tests/path/to/test__file.py

# Run with JUnit XML output
uv run nox -s test -- --junitxml=results.xml

Code Quality

# Format code
uv run nox -s fmt

# Run all linters
uv run nox -s lint -- --pyright --ruff

# Run individual linters
uv run nox -s lint -- --pyright
uv run nox -s lint -- --ruff

Pipeline Development

# Compile and upload pipeline
uv run nox -s compile_pipeline -- \
  --env <dev|prod> \
  --pipeline_name <name> \
  --tag <tag> \
  --model_type <sample|...>

๐Ÿ—๏ธ Architecture Overview

This project uses a dual-mode architecture:

  1. Pipeline Compilation Mode (pipelines/main.py): Compiles KFP pipeline definitions to YAML and uploads to Kubeflow
  2. Task Execution Mode (main.py): Runs individual tasks inside KFP containers

How It Works

  1. ๐Ÿ“ Define tasks in tasks/<task_name>/ with settings and run logic
  2. ๐Ÿ”— Create pipeline graphs in pipelines/graphs/ that chain tasks together
  3. ๐Ÿ“‹ Register components: tasks in main.py task_maps and pipelines in pipelines/main.py pipeline_types
  4. ๐Ÿ“ฆ Compile pipeline with compile_pipeline - generates KFP YAML and uploads to registry
  5. โ–ถ๏ธ Execute: KFP runs pipeline - each component executes main.py with task-specific arguments in containers

Adding New Pipelines

Step-by-Step Guide

1๏ธโƒฃ Define Model Type

Add your model type to const/model_type.py:

class ModelType(StrEnum):
    """Enumeration for different Model Types."""

    SAMPLE = "sample"
    YOUR_MODEL = "your_model"  # ← Add this

2๏ธโƒฃ Create Pipeline Graph

Create a new file pipelines/graphs/your_model.py:

from __future__ import annotations

from typing import TYPE_CHECKING

from kfp import dsl
from pipelines.components import (
    evaluation,
    export,
    feature_engineering,
    inference,
    training,
)

if TYPE_CHECKING:
    from kfp.dsl.graph_component import GraphComponent
    from pipelines.settings import PipelineCompileArgs


def get_pipeline(args: PipelineCompileArgs) -> GraphComponent:
    """Get your model pipeline.

    Args:
        args (PipelineCompileArgs): Pipeline arguments for compilation.

    Returns:
        GraphComponent: Pipeline Graph Component.
    """

    @dsl.pipeline(name=args.pipeline_name)
    def pipeline_def(execution_date: str) -> None:
        fe_task = feature_engineering(
            image=args.image,
            execution_date=execution_date,
            model_type=args.model_type,
        ).set_display_name("Feature Engineering")

        training_task = (
            training(
                image=args.image,
                execution_date=execution_date,
                model_type=args.model_type,
            )
            .after(fe_task)
            .set_display_name("Train Model")
        )
        # Add more tasks...

    return pipeline_def

3๏ธโƒฃ Implement Tasks

Create task implementations in tasks/<task_name>/run.py:

from logging import getLogger

from tasks.base import T_co
from tasks.training.settings import TrainingSettings

logger = getLogger(__name__)


class TrainingTask:
    """Training Task."""

    def __init__(
        self,
        *args: T_co,
        **kwargs: T_co,
    ) -> None:
        """Initialize the Training Task."""
        self.settings = TrainingSettings()

    def run(self) -> None:
        """Run the Training Task."""
        logger.info("settings=%s", self.settings)
        # Your training logic here

4๏ธโƒฃ Register Components

Register tasks in main.py:

task_maps: dict[ModelType, dict[Task, type[BaseTask]]] = {
    ModelType.SAMPLE: {
        Task.FEATURE_ENGINEERING: FeatureEngineeringTask,
        Task.TRAINING: TrainingTask,
        # ...
    },
    ModelType.YOUR_MODEL: {  # ← Add this
        Task.TRAINING: YourTrainingTask,
        # ...
    },
}

Register pipeline in pipelines/main.py:

from pipelines.graphs import sample, your_model

pipeline_types = {
    ModelType.SAMPLE: sample.get_pipeline,
    ModelType.YOUR_MODEL: your_model.get_pipeline,  # ← Add this
}

5๏ธโƒฃ Compile & Deploy

uv run nox -s compile_pipeline -- \
  --env dev \
  --pipeline_name your-model-pipeline \
  --tag v1.0.0 \
  --model_type your_model

Tip: See CLAUDE.md for detailed architecture patterns and development guidelines.


Related Resources

Official Documentation

Kubeflow Pipelines

Python Libraries

  • Pydantic - Data validation using Python type annotations
  • Pydantic Settings - Settings management from environment variables

๐Ÿค Contributing

We welcome contributions! Please follow these steps:

Development Workflow

  1. ๐Ÿด Fork the repository
  2. ๐Ÿ“ฅ Clone your fork:
    git clone https://github.com/YOUR_USERNAME/ml-pipelines.git
    cd ml-pipelines
  3. ๐ŸŒฟ Create a feature branch:
    git checkout -b feature/amazing-feature
  4. ๐Ÿ“ฆ Install dependencies:
    uv sync
  5. โœ๏ธ Make your changes with tests
  6. ๐ŸŽจ Format code:
    uv run nox -s fmt
  7. ๐Ÿ” Lint code:
    uv run nox -s lint -- --pyright --ruff
  8. โœ… Test changes:
    uv run nox -s test
  9. ๐Ÿ’พ Commit your changes:
    git commit -m 'Add amazing feature'
  10. ๐Ÿ“ค Push to your branch:
    git push origin feature/amazing-feature
  11. ๐Ÿ“ฎ Submit a pull request

Code Standards

  • Maintain 75%+ test coverage (enforced by pytest)
  • Follow Ruff formatting and linting rules (ruff.toml)
  • Pass Pyright type checking (pyrightconfig.json)
  • Write clear commit messages
  • Add tests for new features
  • Update documentation as needed

Testing Naming Convention

Test files must follow the test__*.py format (note the double underscore):

  • ✅ test__base.py
  • ✅ test__training.py
  • ❌ test_base.py (single underscore - won't be discovered)

License

This project is licensed under the terms specified in the LICENSE file.
