diff --git a/Dockerfile b/Dockerfile index 05a9b7e3..cc2afc3a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,20 +3,47 @@ # base β†’ shared dependencies (no Spark provider) # pyspark β†’ local PySpark execution (DEFAULT) # databricksβ†’ remote Databricks Connect execution +# VARIANT=cpu (default) or VARIANT=gpu # # Usage: # docker build . # default: pyspark # docker build --target databricks . # databricks-connect +# docker build --build-arg VARIANT=gpu . # GPU-enabled PySpark image (amd64 only) + +# ============================================ +# Stage: base selection (cpu/gpu) +# ============================================ +ARG PYTHON_VERSION=3.12 +ARG VARIANT=cpu + +FROM python:${PYTHON_VERSION}-slim-bookworm AS base-cpu + +FROM nvidia/cuda:12.9.0-runtime-ubuntu24.04 AS base-gpu +ARG PYTHON_VERSION=3.12 +RUN apt-get update && apt-get install -y --no-install-recommends \ + software-properties-common \ + && add-apt-repository ppa:deadsnakes/ppa \ + && apt-get update && apt-get install -y --no-install-recommends \ + python${PYTHON_VERSION} \ + python${PYTHON_VERSION}-venv \ + python${PYTHON_VERSION}-dev \ + python3-pip \ + && ln -sf /usr/bin/python${PYTHON_VERSION} /usr/bin/python3 \ + && ln -sf /usr/bin/python3 /usr/bin/python \ + && rm -rf /var/lib/apt/lists/* \ + && rm -f /usr/lib/python${PYTHON_VERSION}/EXTERNALLY-MANAGED # ============================================ # Stage: base (shared across all variants) # ============================================ +FROM base-${VARIANT} AS base +ARG TARGETARCH +ARG VARIANT=cpu ARG PYTHON_VERSION=3.12 -FROM python:${PYTHON_VERSION}-slim-bookworm AS base +# System dependencies WORKDIR /code -# System dependencies RUN apt-get update && apt-get install -y \ build-essential \ gcc \ @@ -29,14 +56,17 @@ RUN curl https://sh.rustup.rs -sSf | bash -s -- -y ENV PATH="/root/.cargo/bin:${PATH}" # Python tooling -RUN pip install --no-cache-dir --upgrade pip && \ +RUN rm -rf /usr/lib/python3/dist-packages/*.dist-info 2>/dev/null; \ + 
pip install --no-cache-dir pip && \ pip install --no-cache-dir poetry && \ poetry config virtualenvs.create false # Install large stable dependencies before poetry to maximize build cache reuse. -# INSTALL_PYTORCH controls whether CPU-only PyTorch is installed. +# INSTALL_PYTORCH controls whether PyTorch is installed. ARG INSTALL_PYTORCH="true" -RUN if [ "$INSTALL_PYTORCH" = "true" ]; then \ +RUN if [ "$VARIANT" = "gpu" ] && [ "$INSTALL_PYTORCH" = "true" ]; then \ + pip install --no-cache-dir torch==2.7.1; \ + elif [ "$INSTALL_PYTORCH" = "true" ]; then \ pip install --no-cache-dir torch==2.7.1 \ --index-url https://download.pytorch.org/whl/cpu \ --extra-index-url https://pypi.org/simple; \ @@ -101,6 +131,10 @@ RUN mkdir -p /opt/spark-jars && \ # Spark configuration for local mode ARG PYTHON_VERSION=3.12 +# GPU variant (Ubuntu) may install to dist-packages. Symlink ensures stable SPARK_HOME. +RUN mkdir -p /usr/local/lib/python${PYTHON_VERSION}/site-packages && \ + ln -sf $(python3 -c "import pyspark; print(pyspark.__path__[0])") \ + /usr/local/lib/python${PYTHON_VERSION}/site-packages/pyspark 2>/dev/null || true ENV SPARK_HOME="/usr/local/lib/python${PYTHON_VERSION}/site-packages/pyspark" ENV PYSPARK_PYTHON="python3" ENV PYSPARK_DRIVER_PYTHON="python3" diff --git a/Makefile b/Makefile index 4c36144c..4557d82a 100644 --- a/Makefile +++ b/Makefile @@ -49,6 +49,7 @@ help: @echo "" @echo "πŸ—οΈ Building:" @echo " make build Build default image (PySpark)" + @echo " make build-gpu Build GPU variant (CUDA + GPU PyTorch, amd64)" @echo " make build-databricks Build Databricks variant" @echo "" @echo "🧹 Cleanup:" @@ -70,21 +71,23 @@ help: .PHONY: test-integration test-integration: @echo "πŸ§ͺ Running staged pytest integration suite..." 
+ @echo "Using DATALOADER_WORKERS=$${DATALOADER_WORKERS:-0}" @if [ -n "$(INTEGRATION_RUN_ID)" ]; then \ echo "Using integration run id: $(INTEGRATION_RUN_ID)"; \ - PLEXE_IT_RUN_ID="$(INTEGRATION_RUN_ID)" bash scripts/tests/run_integration_staged.sh; \ + DATALOADER_WORKERS="$${DATALOADER_WORKERS:-0}" PLEXE_IT_RUN_ID="$(INTEGRATION_RUN_ID)" bash scripts/tests/run_integration_staged.sh; \ else \ - bash scripts/tests/run_integration_staged.sh; \ + DATALOADER_WORKERS="$${DATALOADER_WORKERS:-0}" bash scripts/tests/run_integration_staged.sh; \ fi .PHONY: test-integration-verbose test-integration-verbose: @echo "πŸ§ͺ Running staged pytest integration suite (verbose)..." + @echo "Using DATALOADER_WORKERS=$${DATALOADER_WORKERS:-0}" @if [ -n "$(INTEGRATION_RUN_ID)" ]; then \ echo "Using integration run id: $(INTEGRATION_RUN_ID)"; \ - PLEXE_IT_RUN_ID="$(INTEGRATION_RUN_ID)" PLEXE_IT_VERBOSE=1 bash scripts/tests/run_integration_staged.sh; \ + DATALOADER_WORKERS="$${DATALOADER_WORKERS:-0}" PLEXE_IT_RUN_ID="$(INTEGRATION_RUN_ID)" PLEXE_IT_VERBOSE=1 bash scripts/tests/run_integration_staged.sh; \ else \ - PLEXE_IT_VERBOSE=1 bash scripts/tests/run_integration_staged.sh; \ + DATALOADER_WORKERS="$${DATALOADER_WORKERS:-0}" PLEXE_IT_VERBOSE=1 bash scripts/tests/run_integration_staged.sh; \ fi # Fast sanity check - 1 iteration, minimal config @@ -368,6 +371,17 @@ build: -f Dockerfile . @echo "βœ… Build complete: plexe:py$(PYTHON_VERSION)" +# Build GPU variant (NVIDIA CUDA + CUDA-enabled PyTorch, amd64 only) +.PHONY: build-gpu +build-gpu: + @echo "πŸ—οΈ Building GPU variant (Python $(PYTHON_VERSION), CUDA)..." + docker buildx build --platform linux/amd64 --output type=docker --provenance=false \ + --build-arg PYTHON_VERSION=$(PYTHON_VERSION) \ + --build-arg VARIANT=gpu \ + -t plexe:py$(PYTHON_VERSION)-gpu \ + -f Dockerfile . 
+ @echo "βœ… Build complete: plexe:py$(PYTHON_VERSION)-gpu" + # Build Databricks variant .PHONY: build-databricks diff --git a/config.yaml.template b/config.yaml.template index 8e103476..7bded73f 100644 --- a/config.yaml.template +++ b/config.yaml.template @@ -34,8 +34,8 @@ # Default epochs for neural network training (Keras, PyTorch) # Type: integer -# Default: 25 -# nn_default_epochs: 25 +# Default: 10 +# nn_default_epochs: 10 # Maximum epochs for neural network training (Keras, PyTorch) # Type: integer diff --git a/plexe/CODE_INDEX.md b/plexe/CODE_INDEX.md index c4001189..752a9420 100644 --- a/plexe/CODE_INDEX.md +++ b/plexe/CODE_INDEX.md @@ -1,6 +1,6 @@ # Code Index: plexe -> Generated on 2026-03-02 19:57:53 +> Generated on 2026-03-02 22:03:39 Code structure and public interface documentation for the **plexe** package. @@ -207,14 +207,14 @@ Local process runner - executes training in subprocess. **`LocalProcessRunner`** - Runs training in local subprocess. - `__init__(self, work_dir: str)` -- `run_training(self, template: str, model: Any, feature_pipeline: Pipeline, train_uri: str, val_uri: str, timeout: int, target_columns: list[str], optimizer: Any, loss: Any, epochs: int, batch_size: int, group_column: str | None) -> Path` - Execute training in subprocess. +- `run_training(self, template: str, model: Any, feature_pipeline: Pipeline, train_uri: str, val_uri: str, timeout: int, target_columns: list[str], task_type: str, optimizer: Any, loss: Any, epochs: int, batch_size: int, group_column: str | None, mixed_precision: bool, dataloader_workers: int) -> Path` - Execute training in subprocess. --- ## `execution/training/runner.py` Training runner abstract base class. **`TrainingRunner`** - Abstract base class for training execution environments. -- `run_training(self, template: str, model: Any, feature_pipeline: Pipeline, train_uri: str, val_uri: str, timeout: int, target_columns: list[str]) -> Path` - Execute model training and return path to artifacts. 
+- `run_training(self, template: str, model: Any, feature_pipeline: Pipeline, train_uri: str, val_uri: str, timeout: int, target_columns: list[str], task_type: str) -> Path` - Execute model training and return path to artifacts. --- ## `helpers.py` @@ -312,6 +312,9 @@ Simple dataclasses for model building workflow. **`DataLayout`** - Physical structure of dataset (not semantic meaning). +**`TaskType`** - Canonical ML task type determined during Phase 1. +- `is_classification(self) -> bool` - No description + **`Metric`** - Evaluation metric definition. **`BuildContext`** - Context passed through workflow phases. @@ -452,6 +455,7 @@ Standard Keras predictor - NO Plexe dependencies. **`KerasPredictor`** - Standalone Keras predictor. - `__init__(self, model_dir: str)` - `predict(self, x: pd.DataFrame) -> pd.DataFrame` - Make predictions on input DataFrame. +- `predict_proba(self, x: pd.DataFrame) -> pd.DataFrame` - Predict per-class probabilities on input DataFrame. --- ## `templates/inference/lightgbm_predictor.py` @@ -468,6 +472,7 @@ Standard PyTorch predictor - NO Plexe dependencies. **`PyTorchPredictor`** - Standalone PyTorch predictor. - `__init__(self, model_dir: str)` - `predict(self, x: pd.DataFrame) -> pd.DataFrame` - Make predictions on input DataFrame. +- `predict_proba(self, x: pd.DataFrame) -> pd.DataFrame` - Predict per-class probabilities on input DataFrame. --- ## `templates/inference/xgboost_predictor.py` @@ -489,37 +494,37 @@ Model card template generator. Hardcoded robust CatBoost training loop. **Functions:** -- `train_catboost(untrained_model_path: Path, train_uri: str, val_uri: str, output_dir: Path, target_column: str) -> dict` - Train CatBoost model directly (no Spark). +- `train_catboost(untrained_model_path: Path, train_uri: str, val_uri: str, output_dir: Path, target_column: str, task_type: str | None) -> dict` - Train CatBoost model directly (no Spark). 
- `main()` - No description --- ## `templates/training/train_keras.py` -Hardcoded robust Keras training loop. +Keras training template with streaming data loading, multi-GPU (MirroredStrategy), and mixed precision. **Functions:** -- `train_keras(untrained_model_path: Path, train_uri: str, val_uri: str, output_dir: Path, target_column: str, epochs: int, batch_size: int) -> dict` - Train Keras model directly. +- `train_keras(untrained_model_path: Path, train_uri: str, val_uri: str, output_dir: Path, target_column: str, epochs: int, batch_size: int, use_multi_gpu: bool, use_mixed_precision: bool, task_type: str | None) -> dict` - Train Keras model with streaming data, optional multi-GPU, and mixed precision. --- ## `templates/training/train_lightgbm.py` Hardcoded robust LightGBM training loop. **Functions:** -- `train_lightgbm(untrained_model_path: Path, train_uri: str, val_uri: str, output_dir: Path, target_column: str, group_column: str | None) -> dict` - Train LightGBM model directly (no Spark). +- `train_lightgbm(untrained_model_path: Path, train_uri: str, val_uri: str, output_dir: Path, target_column: str, group_column: str | None, task_type: str | None) -> dict` - Train LightGBM model directly (no Spark). - `main()` - No description --- ## `templates/training/train_pytorch.py` -Hardcoded robust PyTorch training loop. +PyTorch training template with streaming data loading, multi-GPU (DDP), and mixed precision. **Functions:** -- `train_pytorch(untrained_model_path: Path, train_uri: str, val_uri: str, output_dir: Path, target_column: str, epochs: int, batch_size: int) -> dict` - Train PyTorch model directly. +- `train_pytorch(untrained_model_path: Path, train_uri: str, val_uri: str, output_dir: Path, target_column: str, epochs: int, batch_size: int, num_workers: int, use_ddp: bool, use_mixed_precision: bool, task_type: str | None) -> dict` - Train PyTorch model with streaming data, optional DDP, and mixed precision. 
--- ## `templates/training/train_xgboost.py` Hardcoded robust XGBoost training loop. **Functions:** -- `train_xgboost(untrained_model_path: Path, train_uri: str, val_uri: str, output_dir: Path, target_column: str, group_column: str | None) -> dict` - Train XGBoost model directly (no Spark). +- `train_xgboost(untrained_model_path: Path, train_uri: str, val_uri: str, output_dir: Path, target_column: str, group_column: str | None, task_type: str | None) -> dict` - Train XGBoost model directly (no Spark). - `main()` - No description --- @@ -624,7 +629,7 @@ Utility functions for dashboard data loading. - `load_report(exp_path: Path, report_name: str) -> dict | None` - Load YAML report from DirNames.BUILD_DIR/reports/. - `load_code_file(file_path: Path) -> str | None` - Load Python code file. - `load_parquet_sample(uri: str, limit: int) -> pd.DataFrame | None` - Load first N rows from parquet file. -- `get_parquet_row_count(uri: str) -> int | None` - Get row count from parquet file. +- `get_parquet_row_count(uri: str) -> int | None` - Get row count from parquet metadata without reading data. - `load_json_file(file_path: Path) -> dict | None` - Load JSON file. --- @@ -636,6 +641,21 @@ LiteLLM model wrapper with retry logic and optional post-call hook. - `generate(self)` - Generate with automatic retries, header injection, and post-call hook. - `chat(self)` - Chat with automatic retries, header injection, and post-call hook. +--- +## `utils/parquet_dataset.py` +Streaming parquet data loading utilities for large-dataset training. + +**`ParquetIterableDataset`** - Streaming parquet dataset for PyTorch DataLoader. +- `__init__(self, uri: str, target_column: str, task_type: str)` +- `total_rows(self) -> int` - No description + +**Functions:** +- `get_parquet_row_count(uri: str) -> int` - Get total row count from parquet metadata without reading data. +- `get_dataset_size_bytes(uri: str) -> int` - Get dataset size in bytes for a local file or directory of parquet files. 
+- `parquet_batch_generator(uri: str, target_column: str, batch_size: int, task_type: str | None) -> Iterator[tuple[np.ndarray, np.ndarray]]` - Streaming parquet batch generator for Keras/TensorFlow. +- `get_parquet_feature_count(uri: str, target_column: str) -> int` - Get number of feature columns (total columns minus target). +- `get_steps_per_epoch(uri: str, batch_size: int) -> int` - Compute number of steps per epoch for a parquet dataset. + --- ## `utils/reporting.py` Utilities for saving agent reports to disk. diff --git a/plexe/config.py b/plexe/config.py index 0c45108b..db287eb2 100644 --- a/plexe/config.py +++ b/plexe/config.py @@ -309,12 +309,21 @@ class Config(BaseSettings): # Training settings training_timeout: int = Field(default=1800, description="Timeout for training runs (seconds)", gt=0) nn_default_epochs: int = Field( - default=25, description="Default epochs for neural network training (Keras, PyTorch)" + default=10, description="Default epochs for neural network training (Keras, PyTorch)" ) nn_max_epochs: int = Field(default=50, description="Maximum epochs for neural network training (Keras, PyTorch)") nn_default_batch_size: int = Field( default=32, description="Default batch size for neural network training (Keras, PyTorch)" ) + nn_training_timeout: int = Field( + default=14400, description="Timeout for neural network training on full dataset (seconds)", gt=0 + ) + mixed_precision: bool = Field( + default=True, description="Use mixed precision (FP16) when GPU available (auto-disabled on CPU)" + ) + dataloader_workers: int = Field( + default=4, description="Number of DataLoader worker processes for streaming data loading", ge=0 + ) # LLM settings (per agent role) statistical_analysis_llm: str = Field( diff --git a/plexe/execution/training/local_runner.py b/plexe/execution/training/local_runner.py index a7e3e92c..de7944eb 100644 --- a/plexe/execution/training/local_runner.py +++ b/plexe/execution/training/local_runner.py @@ -2,6 +2,7 @@ Local 
process runner - executes training in subprocess. """ +import inspect import json import logging import os @@ -21,14 +22,35 @@ logger = logging.getLogger(__name__) +def _detect_gpu_count() -> int: + """Detect number of available CUDA GPUs via PyTorch. Returns 0 if torch or CUDA unavailable.""" + try: + import torch + + if torch.cuda.is_available(): + return torch.cuda.device_count() + except Exception: + pass + return 0 + + +def _detect_tf_gpu_count() -> int: + """Detect number of available GPUs via TensorFlow. Returns 0 if TF or GPUs unavailable.""" + try: + import tensorflow as tf + + return len(tf.config.list_physical_devices("GPU")) + except Exception: + pass + return 0 + + class LocalProcessRunner(TrainingRunner): """ Runs training in local subprocess. - Suitable for: - - Development/testing - - Small datasets that fit on single machine - - When external compute (SageMaker, EMR) is not needed + Supports single-GPU, multi-GPU (DDP via torch.distributed.run for PyTorch, + MirroredStrategy for Keras), and CPU training. """ def __init__(self, work_dir: str = "/tmp/model_training"): @@ -50,11 +72,14 @@ def run_training( val_uri: str, timeout: int, target_columns: list[str], + task_type: str = "", optimizer: Any = None, loss: Any = None, epochs: int = None, batch_size: int = None, group_column: str | None = None, + mixed_precision: bool = False, + dataloader_workers: int = 0, ) -> Path: """ Execute training in subprocess. @@ -138,9 +163,15 @@ def run_training( torch.save(model, untrained_model_path) # Save optimizer and loss configs as JSON (mirror Keras pattern) + # Filter optimizer.defaults to only include params accepted by __init__, + # since defaults can contain internal state (e.g. decoupled_weight_decay) + # that causes TypeError when passed back to the constructor. 
+ init_params = set(inspect.signature(type(optimizer).__init__).parameters.keys()) + optimizer_config = {k: v for k, v in optimizer.defaults.items() if k in init_params} + training_config = { "optimizer_class": type(optimizer).__name__, - "optimizer_config": {k: v for k, v in optimizer.defaults.items()}, + "optimizer_config": optimizer_config, "loss_class": type(loss).__name__, } @@ -168,20 +199,46 @@ def run_training( # Note: Currently only single-target supported target_column = target_columns[0] if target_columns else "target" - cmd = [ - sys.executable, - str(template_script), - "--untrained-model", - str(untrained_model_path), - "--train-uri", - train_uri, - "--val-uri", - val_uri, - "--target-column", - target_column, - "--output", - str(output_dir), - ] + # Detect GPU availability for neural network templates (use framework-specific detection) + if "keras" in template: + gpu_count = _detect_tf_gpu_count() + elif "pytorch" in template: + gpu_count = _detect_gpu_count() + else: + gpu_count = 0 + use_torchrun = "pytorch" in template and gpu_count > 1 + + # Build command - use torch.distributed.run for multi-GPU PyTorch DDP + if use_torchrun: + cmd = [ + sys.executable, + "-m", + "torch.distributed.run", + "--nproc_per_node=auto", + "--standalone", + str(template_script), + ] + else: + cmd = [sys.executable, str(template_script)] + + cmd.extend( + [ + "--untrained-model", + str(untrained_model_path), + "--train-uri", + train_uri, + "--val-uri", + val_uri, + "--target-column", + target_column, + "--output", + str(output_dir), + ] + ) + + # Pass task type if provided (avoids re-inference in templates) + if task_type: + cmd.extend(["--task-type", task_type]) # Add neural network training params if provided (Keras and PyTorch) if "keras" in template or "pytorch" in template: @@ -190,6 +247,20 @@ def run_training( if batch_size is not None: cmd.extend(["--batch-size", str(batch_size)]) + # GPU flags + if "pytorch" in template: + if use_torchrun: + cmd.append("--ddp") 
+ if gpu_count > 0 and mixed_precision: + cmd.append("--mixed-precision") + if dataloader_workers > 0: + cmd.extend(["--num-workers", str(dataloader_workers)]) + elif "keras" in template: + if gpu_count > 1: + cmd.append("--multi-gpu") + if gpu_count > 0 and mixed_precision: + cmd.append("--mixed-precision") + # Add ranking-specific params if provided if group_column is not None: cmd.extend(["--group-column", group_column]) diff --git a/plexe/execution/training/runner.py b/plexe/execution/training/runner.py index 9fbd7381..374e8e75 100644 --- a/plexe/execution/training/runner.py +++ b/plexe/execution/training/runner.py @@ -28,6 +28,7 @@ def run_training( val_uri: str, timeout: int, target_columns: list[str], + task_type: str = "", ) -> Path: """ Execute model training and return path to artifacts. @@ -40,6 +41,7 @@ def run_training( val_uri: URI to validation data timeout: Maximum training time (seconds) target_columns: List of target column names + task_type: Canonical task type (e.g. "binary_classification", "regression") Returns: Path to directory containing: diff --git a/plexe/models.py b/plexe/models.py index 860c947f..8bae1dc5 100644 --- a/plexe/models.py +++ b/plexe/models.py @@ -32,6 +32,24 @@ class DataLayout(str, Enum): UNSUPPORTED = "unsupported" # Data structure not supported (videos, audio, multi-column, etc.) +class TaskType(str, Enum): + """ + Canonical ML task type determined during Phase 1. + + Single source of truth for task type across the entire pipeline: + training templates, predictors, evaluation, and validation. 
+ """ + + BINARY_CLASSIFICATION = "binary_classification" + MULTICLASS_CLASSIFICATION = "multiclass_classification" + REGRESSION = "regression" + RANKING = "learning_to_rank" + + @property + def is_classification(self) -> bool: + return self in (TaskType.BINARY_CLASSIFICATION, TaskType.MULTICLASS_CLASSIFICATION) + + # TODO(IMAGE_TEXT_SUPPORT): Add preprocessing config dataclasses here # See /IMAGE_TEXT_SUPPORT.md for implementation guide # Need: ImageAugmentationConfig, ImagePreprocessingConfig, TextPreprocessingConfig diff --git a/plexe/retrain.py b/plexe/retrain.py index b8431e59..18d7dd50 100644 --- a/plexe/retrain.py +++ b/plexe/retrain.py @@ -334,6 +334,7 @@ def reset_weights(module): val_uri=str(val_uri), timeout=config.training_timeout, target_columns=[target_column], + task_type=metadata.get("task_type", ""), **training_kwargs, ) diff --git a/plexe/templates/inference/keras_predictor.py b/plexe/templates/inference/keras_predictor.py index 20b44029..e3e4d904 100644 --- a/plexe/templates/inference/keras_predictor.py +++ b/plexe/templates/inference/keras_predictor.py @@ -5,6 +5,7 @@ Can be used standalone with just: keras, scikit-learn, pandas, cloudpickle. 
""" +import json import os from pathlib import Path @@ -29,11 +30,23 @@ def __init__(self, model_dir: str): Args: model_dir: Path to model package directory """ + import keras model_dir = Path(model_dir) artifacts_dir = model_dir / "artifacts" + # Load metadata for task_type-driven post-processing + metadata_path = artifacts_dir / "metadata.json" + if metadata_path.exists(): + with open(metadata_path) as f: + metadata = json.load(f) + raw_task_type = metadata.get("task_type", "") + else: + raw_task_type = "" + + self._task_type = raw_task_type + # Execute pipeline code (defines custom FunctionTransformer functions) code_path = model_dir / "src" / "pipeline.py" if code_path.exists(): @@ -65,23 +78,40 @@ def predict(self, x: pd.DataFrame) -> pd.DataFrame: # Keras predict returns probabilities/values raw_predictions = self.model.predict(x_transformed, verbose=0) - # For classification: argmax to get class - if len(raw_predictions.shape) > 1 and raw_predictions.shape[1] > 1: - # Multi-class classification + # Post-process based on task_type from metadata + if self._task_type == "binary_classification": + # Keras outputs probabilities β€” threshold at 0.5 + predictions = raw_predictions.squeeze() + predictions = (predictions > 0.5).astype(int) + elif self._task_type == "multiclass_classification": predictions = np.argmax(raw_predictions, axis=1) else: - # Binary classification or regression - if raw_predictions.shape[-1] == 1: - # Single output - squeeze to 1D - predictions = raw_predictions.squeeze() - # For binary classification, threshold at 0.5 - if predictions.max() <= 1.0 and predictions.min() >= 0.0: - predictions = (predictions > 0.5).astype(int) - else: - predictions = raw_predictions + # Regression or unknown: return raw values + predictions = raw_predictions.squeeze() return pd.DataFrame({"prediction": predictions}) + def predict_proba(self, x: pd.DataFrame) -> pd.DataFrame: + """ + Predict per-class probabilities on input DataFrame. 
+ + Returns raw model outputs (sigmoid/softmax values) without argmax. + """ + import numpy as np + + x_transformed = self.pipeline.transform(x) + raw_predictions = self.model.predict(x_transformed, verbose=0) + + probabilities = np.asarray(raw_predictions) + if probabilities.ndim == 1: + probabilities = probabilities.reshape(-1, 1) + + if probabilities.shape[1] == 1: + probabilities = np.column_stack([1 - probabilities[:, 0], probabilities[:, 0]]) + + columns = [f"proba_{i}" for i in range(probabilities.shape[1])] + return pd.DataFrame(probabilities, columns=columns) + # ============================================ # Example Usage diff --git a/plexe/templates/inference/pytorch_predictor.py b/plexe/templates/inference/pytorch_predictor.py index c07ccee7..0faad4cd 100644 --- a/plexe/templates/inference/pytorch_predictor.py +++ b/plexe/templates/inference/pytorch_predictor.py @@ -5,6 +5,7 @@ Can be used standalone with just: torch, scikit-learn, pandas, cloudpickle. """ +import json from pathlib import Path import cloudpickle @@ -28,9 +29,21 @@ def __init__(self, model_dir: str): Args: model_dir: Path to model package directory """ + model_dir = Path(model_dir) artifacts_dir = model_dir / "artifacts" + # Load metadata for task_type-driven post-processing + metadata_path = artifacts_dir / "metadata.json" + if metadata_path.exists(): + with open(metadata_path) as f: + metadata = json.load(f) + raw_task_type = metadata.get("task_type", "") + else: + raw_task_type = "" + + self._task_type = raw_task_type + # Execute pipeline code (defines custom FunctionTransformer functions) code_path = model_dir / "src" / "pipeline.py" if code_path.exists(): @@ -42,7 +55,7 @@ def __init__(self, model_dir: str): with open(model_class_path, "rb") as f: self.model = cloudpickle.load(f) - # Load trained state dict and apply + # Load trained state dict and apply (always on CPU for portable inference) state_dict_path = artifacts_dir / "model.pt" 
self.model.load_state_dict(torch.load(state_dict_path, weights_only=True, map_location="cpu")) self.model.eval() @@ -76,21 +89,52 @@ def predict(self, x: pd.DataFrame) -> pd.DataFrame: raw_predictions = raw_output.detach().cpu().numpy() - # Post-process based on output shape - if len(raw_predictions.shape) > 1 and raw_predictions.shape[1] > 1: - # Multi-class classification: argmax + # Post-process based on task_type from metadata + if self._task_type == "binary_classification": + # BCEWithLogitsLoss outputs raw logits β€” apply sigmoid + threshold + predictions = raw_predictions.squeeze() + sigmoid = 1.0 / (1.0 + np.exp(-predictions)) + predictions = (sigmoid > 0.5).astype(int) + elif self._task_type == "multiclass_classification": predictions = np.argmax(raw_predictions, axis=1) else: - # Single output: squeeze to 1D + # Regression or unknown: return raw values predictions = raw_predictions.squeeze() - # Binary classification: threshold at 0.5 if output looks like probabilities - # FIXME: This heuristic can misclassify regression outputs in [0, 1]; - # we'll address this in a follow-up PR with richer task metadata. - if predictions.ndim > 0 and predictions.max() <= 1.0 and predictions.min() >= 0.0: - predictions = (predictions > 0.5).astype(int) return pd.DataFrame({"prediction": predictions}) + def predict_proba(self, x: pd.DataFrame) -> pd.DataFrame: + """ + Predict per-class probabilities on input DataFrame. + + Applies sigmoid for single-logit binary models, otherwise softmax. + """ + # Transform features through pipeline + x_transformed = self.pipeline.transform(x) + + # Handle sparse matrix output (e.g. 
from OneHotEncoder, CountVectorizer) + if scipy.sparse.issparse(x_transformed): + x_transformed = x_transformed.toarray() + + x_tensor = torch.tensor(np.array(x_transformed, dtype=np.float32)) + + with torch.no_grad(): + raw_output = self.model(x_tensor) + + raw_output = raw_output.detach().cpu() + if raw_output.ndim == 1: + raw_output = raw_output.unsqueeze(1) + + if raw_output.shape[1] == 1: + proba_pos = torch.sigmoid(raw_output).squeeze(1) + probabilities = torch.stack([1 - proba_pos, proba_pos], dim=1) + else: + probabilities = torch.softmax(raw_output, dim=1) + + probabilities = probabilities.numpy() + columns = [f"proba_{i}" for i in range(probabilities.shape[1])] + return pd.DataFrame(probabilities, columns=columns) + # ============================================ # Example Usage diff --git a/plexe/templates/training/train_catboost.py b/plexe/templates/training/train_catboost.py index eba674d8..e9558d0d 100644 --- a/plexe/templates/training/train_catboost.py +++ b/plexe/templates/training/train_catboost.py @@ -34,6 +34,7 @@ def train_catboost( val_uri: str, output_dir: Path, target_column: str, + task_type: str | None = None, ) -> dict: """ Train CatBoost model directly (no Spark). 
@@ -158,6 +159,13 @@ def train_catboost( logger.info(f"LabelEncoder saved to {shorten(str(encoder_path), 30)}") # Step 8: Save Metadata + if not task_type: + if is_classification: + n_classes = len(np.unique(y_train)) + task_type = "multiclass_classification" if n_classes > 2 else "binary_classification" + else: + task_type = "regression" + metadata = { "model_type": "catboost", "training_mode": "direct", @@ -166,7 +174,7 @@ def train_catboost( "best_score": model.best_score_, # Nested dict - save as-is (JSON handles this fine) "n_features": X_train.shape[1], "target_column": target_column, - "task_type": "classification" if is_classification else "regression", + "task_type": task_type, "train_samples": len(X_train), "val_samples": len(X_val), } @@ -191,6 +199,7 @@ def main(): parser.add_argument("--val-uri", required=True, help="Validation data URI") parser.add_argument("--target-column", required=True, help="Target column name") parser.add_argument("--output", required=True, help="Output directory") + parser.add_argument("--task-type", required=False, default=None, help="Canonical task type") args = parser.parse_args() @@ -204,6 +213,7 @@ def main(): val_uri=args.val_uri, output_dir=Path(args.output), target_column=args.target_column, + task_type=args.task_type, ) logger.info("Training complete!") diff --git a/plexe/templates/training/train_keras.py b/plexe/templates/training/train_keras.py index 83cbb60b..9c6b46f7 100644 --- a/plexe/templates/training/train_keras.py +++ b/plexe/templates/training/train_keras.py @@ -1,7 +1,11 @@ """ -Hardcoded robust Keras training loop. +Keras training template with streaming data loading, multi-GPU (MirroredStrategy), and mixed precision. -Trains Keras 3 models directly with numpy arrays from parquet. 
+Supports: +- Streaming parquet data via tf.data.Dataset + generator (handles 100GB+ datasets) +- Single GPU, multi-GPU (MirroredStrategy), and CPU training +- Mixed precision (FP16) for faster training and lower memory usage +- EarlyStopping with best model restoration """ import argparse @@ -15,25 +19,93 @@ os.environ["KERAS_BACKEND"] = "tensorflow" import keras -import pandas as pd - +import numpy as np +import pyarrow.parquet as pq +import tensorflow as tf + +from plexe.utils.parquet_dataset import ( + get_parquet_feature_count, + get_parquet_row_count, +) from plexe.utils.s3 import download_s3_uri logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", stream=sys.stdout) logger = logging.getLogger(__name__) +_STREAMING_THRESHOLD_ROWS = 100_000 +_STREAMING_THRESHOLD_BYTES = 1_000_000_000 # 1 GB + + +def _create_tf_dataset( + uri: str, + target_column: str, + batch_size: int, + n_features: int, + total_rows: int, + task_type: str | None = None, +) -> tf.data.Dataset: + """Create a tf.data.Dataset from parquet files. + + For small datasets (< 100k rows and < 1GB): loads into memory via tensor slices (fast, no threads). + For large datasets: streams row-by-row via from_generator (memory-efficient). 
+ """ + import pandas as pd + + from plexe.utils.parquet_dataset import get_dataset_size_bytes + + y_np_dtype = np.int32 if task_type == "multiclass_classification" else np.float32 + y_tf_dtype = tf.int32 if task_type == "multiclass_classification" else tf.float32 + + dataset_bytes = get_dataset_size_bytes(uri) + if total_rows < _STREAMING_THRESHOLD_ROWS and dataset_bytes < _STREAMING_THRESHOLD_BYTES: + # Small dataset: load fully into memory (fast, avoids TF generator threading issues) + df = pd.read_parquet(uri) + feature_cols = [c for c in df.columns if c != target_column] + X = df[feature_cols].values.astype(np.float32) + y = df[target_column].values.astype(y_np_dtype) + dataset = tf.data.Dataset.from_tensor_slices((X, y)) + return dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE) + else: + # Large dataset: stream from parquet to avoid OOM + from plexe.utils.parquet_dataset import _resolve_parquet_files + + def row_generator(): + files = _resolve_parquet_files(uri) + for file_path in files: + parquet_file = pq.ParquetFile(file_path) + columns = [c for c in parquet_file.schema_arrow.names if c != target_column] + for batch in parquet_file.iter_batches(batch_size=4096, columns=columns + [target_column]): + batch_df = batch.to_pandas() + X_batch = batch_df[columns].values.astype(np.float32) + y_batch = batch_df[target_column].values.astype(y_np_dtype) + for i in range(len(X_batch)): + yield X_batch[i], y_batch[i] + + dataset = tf.data.Dataset.from_generator( + row_generator, + output_signature=( + tf.TensorSpec(shape=(n_features,), dtype=tf.float32), + tf.TensorSpec(shape=(), dtype=y_tf_dtype), + ), + ) + return dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE) + + def train_keras( untrained_model_path: Path, train_uri: str, val_uri: str, output_dir: Path, target_column: str, - epochs: int = 50, + epochs: int = 10, batch_size: int = 32, + use_multi_gpu: bool = False, + use_mixed_precision: bool = False, + task_type: str | None = None, ) -> dict: """ - Train 
Keras model directly. + Train Keras model with streaming data, optional multi-GPU, and mixed precision. Args: untrained_model_path: Path to .keras model file @@ -43,74 +115,109 @@ def train_keras( target_column: Target column name epochs: Number of epochs batch_size: Batch size - - Returns: - Training metadata + use_multi_gpu: Whether to use MirroredStrategy for multi-GPU + use_mixed_precision: Whether to use FP16 mixed precision """ output_dir.mkdir(parents=True, exist_ok=True) - # Load model - logger.info(f"Loading model from {untrained_model_path}...") - model = keras.models.load_model(untrained_model_path) - logger.info(f"Model loaded: {type(model).__name__}") - - # Load optimizer/loss config + # ============================================ + # Step 1: Setup GPU and distribution strategy + # ============================================ + gpus = tf.config.list_physical_devices("GPU") + gpu_count = len(gpus) + + if use_mixed_precision and gpu_count > 0: + keras.mixed_precision.set_global_policy("mixed_float16") + logger.info("Mixed precision (FP16) enabled") + else: + use_mixed_precision = False + + strategy = None + if use_multi_gpu and gpu_count > 1: + strategy = tf.distribute.MirroredStrategy() + logger.info(f"MirroredStrategy: {gpu_count} GPUs") + elif gpu_count > 0: + logger.info(f"Single GPU training ({gpu_count} GPU(s) available)") + else: + logger.info("Training on CPU") + + # ============================================ + # Step 2: Load model and compile (inside strategy scope if multi-GPU) + # ============================================ config_path = untrained_model_path.parent / "training_config.json" logger.info(f"Loading training config from {config_path}...") with open(config_path) as f: training_config = json.load(f) - # Recreate optimizer - optimizer_class = getattr(keras.optimizers, training_config["optimizer_class"]) - optimizer = optimizer_class.from_config(training_config["optimizer_config"]) - logger.info(f"Optimizer: 
{type(optimizer).__name__}") + def _load_and_compile(): + model = keras.models.load_model(untrained_model_path) + logger.info(f"Model loaded: {type(model).__name__}") - # Recreate loss - loss_class = getattr(keras.losses, training_config["loss_class"]) - loss = loss_class.from_config(training_config["loss_config"]) - logger.info(f"Loss: {type(loss).__name__}") + optimizer_class = getattr(keras.optimizers, training_config["optimizer_class"]) + optimizer = optimizer_class.from_config(training_config["optimizer_config"]) + loss_class = getattr(keras.losses, training_config["loss_class"]) + loss = loss_class.from_config(training_config["loss_config"]) + + metrics = None if task_type == "regression" else ["accuracy"] + logger.info(f"Optimizer: {type(optimizer).__name__}, Loss: {type(loss).__name__}, Metrics: {metrics}") + model.compile(optimizer=optimizer, loss=loss, metrics=metrics, jit_compile=False) + return model + + if strategy is not None: + with strategy.scope(): + model = _load_and_compile() + else: + model = _load_and_compile() - # Compile (jit_compile=False to avoid deadlocks on some systems) - logger.info("Compiling model...") - model.compile(optimizer=optimizer, loss=loss, metrics=["accuracy"], jit_compile=False) logger.info("Model compiled successfully") - # Download from S3 if needed + # ============================================ + # Step 3: Download from S3 if needed + # ============================================ if train_uri.startswith("s3://"): train_uri = download_s3_uri(train_uri) if val_uri.startswith("s3://"): val_uri = download_s3_uri(val_uri) - # Load training data - logger.info(f"Loading training data from {train_uri}...") - train_df = pd.read_parquet(train_uri) - logger.info(f"Training data shape: {train_df.shape}") - - X_train = train_df.drop(columns=[target_column]).values - y_train = train_df[target_column].values - - # Load validation data - logger.info(f"Loading validation data from {val_uri}...") - val_df = pd.read_parquet(val_uri) - 
logger.info(f"Validation data shape: {val_df.shape}") - - X_val = val_df.drop(columns=[target_column]).values - y_val = val_df[target_column].values - - # Train - logger.info(f"Training for {epochs} epochs, batch_size={batch_size}...") + # ============================================ + # Step 4: Create streaming data pipelines + # ============================================ + n_features = get_parquet_feature_count(train_uri, target_column) + train_rows = get_parquet_row_count(train_uri) + val_rows = get_parquet_row_count(val_uri) + logger.info(f"Training data: {train_rows} rows, {n_features} features (streaming)") + logger.info(f"Validation data: {val_rows} rows (streaming)") + + train_dataset = _create_tf_dataset(train_uri, target_column, batch_size, n_features, train_rows, task_type) + val_dataset = _create_tf_dataset(val_uri, target_column, batch_size, n_features, val_rows, task_type) + + # ============================================ + # Step 5: Train with EarlyStopping + # ============================================ + callbacks = [ + keras.callbacks.EarlyStopping(monitor="val_loss", patience=3, restore_best_weights=True, verbose=1), + ] + + logger.info(f"Training for up to {epochs} epochs, batch_size={batch_size}...") history = model.fit( - X_train, - y_train, - validation_data=(X_val, y_val), + train_dataset, + validation_data=val_dataset, epochs=epochs, - batch_size=batch_size, - verbose=2, # Per-epoch logging (verbose=1 progress bars don't work well in pipes) + callbacks=callbacks, + verbose=2, ) - logger.info("Training complete!") + # Determine if early stopping occurred + actual_epochs = len(history.history["loss"]) + early_stopped = actual_epochs < epochs + + logger.info( + f"Training complete! 
Ran {actual_epochs}/{epochs} epochs" + (" (early stopped)" if early_stopped else "") + ) - # Create artifacts/ subdirectory (aligned with final packaging structure) + # ============================================ + # Step 6: Save artifacts + # ============================================ artifacts_dir = output_dir / "artifacts" artifacts_dir.mkdir(exist_ok=True) @@ -124,24 +231,29 @@ def train_keras( history_data = {key: [float(v) for v in values] for key, values in history.history.items()} with open(history_path, "w") as f: json.dump(history_data, f, indent=2) - logger.info(f"History saved to {history_path}") - # FIXME: metadata should include optimizer_class, optimizer_config, loss_class, and - # loss_config so that retrain.py can reconstruct actual Keras objects. Without these, - # the Keras retrain path in retrain.py will crash (it passes strings where objects are expected). - # See the PyTorch training template for the correct pattern. - # Save metadata + # Save metadata (including optimizer/loss config for faithful retraining) metadata = { "model_type": "keras", "training_mode": "direct", - "epochs": epochs, + "task_type": task_type or "", + "epochs": actual_epochs, + "max_epochs": epochs, "batch_size": batch_size, - "n_features": X_train.shape[1], + "n_features": n_features, "target_column": target_column, - "train_samples": len(X_train), - "val_samples": len(X_val), + "train_samples": train_rows, + "val_samples": val_rows, "final_train_loss": float(history.history["loss"][-1]), "final_val_loss": float(history.history["val_loss"][-1]), + "optimizer_class": training_config["optimizer_class"], + "optimizer_config": training_config["optimizer_config"], + "loss_class": training_config["loss_class"], + "loss_config": training_config["loss_config"], + "gpu_count": gpu_count, + "mixed_precision": use_mixed_precision, + "distributed": use_multi_gpu and strategy is not None, + "early_stopped_epoch": actual_epochs if early_stopped else None, } metadata_path = 
artifacts_dir / "metadata.json" @@ -159,8 +271,11 @@ def train_keras( parser.add_argument("--val-uri", required=True, help="Validation data URI") parser.add_argument("--target-column", required=True, help="Target column name") parser.add_argument("--output", required=True, help="Output directory") - parser.add_argument("--epochs", type=int, default=5, help="Number of epochs") + parser.add_argument("--epochs", type=int, default=10, help="Number of epochs") parser.add_argument("--batch-size", type=int, default=32, help="Batch size") + parser.add_argument("--multi-gpu", action="store_true", help="Enable MirroredStrategy for multi-GPU") + parser.add_argument("--mixed-precision", action="store_true", help="Enable FP16 mixed precision") + parser.add_argument("--task-type", required=False, default=None, help="Canonical task type") args = parser.parse_args() @@ -172,6 +287,9 @@ def train_keras( target_column=args.target_column, epochs=args.epochs, batch_size=args.batch_size, + use_multi_gpu=args.multi_gpu, + use_mixed_precision=args.mixed_precision, + task_type=args.task_type, ) logger.info("Script complete!") diff --git a/plexe/templates/training/train_lightgbm.py b/plexe/templates/training/train_lightgbm.py index e8c771fb..af991bde 100644 --- a/plexe/templates/training/train_lightgbm.py +++ b/plexe/templates/training/train_lightgbm.py @@ -35,6 +35,7 @@ def train_lightgbm( output_dir: Path, target_column: str, group_column: str | None = None, + task_type: str | None = None, ) -> dict: """ Train LightGBM model directly (no Spark). 
@@ -201,12 +202,14 @@ def train_lightgbm( logger.info(f"LabelEncoder saved to {shorten(str(encoder_path), 30)}") # Step 8: Save Metadata - if isinstance(model, LGBMRanker): - task_type = "ranking" - elif isinstance(model, LGBMClassifier): - task_type = "classification" - else: - task_type = "regression" + if not task_type: + if isinstance(model, LGBMRanker): + task_type = "learning_to_rank" + elif isinstance(model, LGBMClassifier): + n_classes = len(np.unique(y_train)) + task_type = "multiclass_classification" if n_classes > 2 else "binary_classification" + else: + task_type = "regression" metadata = { "model_type": "lightgbm", @@ -249,6 +252,7 @@ def main(): "--group-column", required=False, default=None, help="Group column for ranking (query_id, session_id)" ) parser.add_argument("--output", required=True, help="Output directory") + parser.add_argument("--task-type", required=False, default=None, help="Canonical task type") args = parser.parse_args() @@ -259,6 +263,7 @@ def main(): output_dir=Path(args.output), target_column=args.target_column, group_column=args.group_column, + task_type=args.task_type, ) logger.info("Training complete!") diff --git a/plexe/templates/training/train_pytorch.py b/plexe/templates/training/train_pytorch.py index a6b10a00..52085273 100644 --- a/plexe/templates/training/train_pytorch.py +++ b/plexe/templates/training/train_pytorch.py @@ -1,149 +1,221 @@ """ -Hardcoded robust PyTorch training loop. +PyTorch training template with streaming data loading, multi-GPU (DDP), and mixed precision. -Trains PyTorch models directly with DataLoader-based batching. 
+Supports: +- Streaming parquet data via ParquetIterableDataset (handles 100GB+ datasets) +- Single GPU, multi-GPU (DDP via torchrun), and CPU training +- Mixed precision (FP16) for faster training and lower memory usage +- Best model checkpointing to disk (not memory) """ import argparse -import copy import inspect import json import logging +import multiprocessing as mp +import os import sys from pathlib import Path import cloudpickle -import pandas as pd import torch +import torch.distributed as dist import torch.nn as nn -from torch.utils.data import DataLoader, TensorDataset +from plexe.utils.parquet_dataset import ( + ParquetIterableDataset, + get_parquet_feature_count, + get_parquet_row_count, +) from plexe.utils.s3 import download_s3_uri logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", stream=sys.stdout) logger = logging.getLogger(__name__) +def _infer_task_type(loss_fn: nn.Module) -> str: + """Infer task type from loss function (legacy fallback when --task-type not provided).""" + if isinstance(loss_fn, nn.CrossEntropyLoss): + return "multiclass_classification" + elif isinstance(loss_fn, nn.BCEWithLogitsLoss): + return "binary_classification" + return "regression" + + +def _is_rank0(use_ddp: bool) -> bool: + """Check if this is rank 0 (or non-distributed).""" + if not use_ddp: + return True + return dist.get_rank() == 0 + + +def _resolve_num_workers(requested_workers: int) -> int: + """Resolve safe DataLoader worker count for the current runtime.""" + if requested_workers <= 0: + return 0 + + start_method = mp.get_start_method(allow_none=True) + if start_method is None: + start_method = mp.get_context().get_start_method() + + if sys.platform == "darwin" and start_method == "spawn": + logger.warning( + "Falling back DataLoader workers from %s to 0 on platform=%s start_method=%s", + requested_workers, + sys.platform, + start_method, + ) + return 0 + + return requested_workers + + def train_pytorch( 
untrained_model_path: Path, train_uri: str, val_uri: str, output_dir: Path, target_column: str, - epochs: int = 25, + epochs: int = 10, batch_size: int = 32, + num_workers: int = 0, + use_ddp: bool = False, + use_mixed_precision: bool = False, + task_type: str | None = None, ) -> dict: """ - Train PyTorch model directly. + Train PyTorch model with streaming data, optional DDP, and mixed precision. Args: untrained_model_path: Path to untrained model (pkl via torch.save) - train_uri: Training data parquet - val_uri: Validation data parquet + train_uri: Training data parquet (file or directory) + val_uri: Validation data parquet (file or directory) output_dir: Where to save outputs target_column: Target column name epochs: Number of training epochs batch_size: Batch size for DataLoader - - Returns: - Training metadata + num_workers: Number of DataLoader worker processes + use_ddp: Whether DDP is active (set by torchrun launcher) + use_mixed_precision: Whether to use FP16 mixed precision """ output_dir.mkdir(parents=True, exist_ok=True) - # Step 1: Load untrained model - logger.info(f"Loading untrained model from {untrained_model_path}...") - model = torch.load(untrained_model_path, weights_only=False) - logger.info(f"Model loaded: {type(model).__name__}") + # ============================================ + # Step 1: Setup device and distributed training + # ============================================ + if use_ddp: + if not torch.cuda.is_available(): + raise RuntimeError("DDP requires CUDA but no GPU is available. 
Remove --ddp to train on CPU.") + dist.init_process_group(backend="nccl") + local_rank = int(os.environ.get("LOCAL_RANK", 0)) + device = torch.device(f"cuda:{local_rank}") + torch.cuda.set_device(device) + gpu_count = dist.get_world_size() + logger.info(f"DDP initialized: rank {dist.get_rank()}/{gpu_count}, device {device}") + elif torch.cuda.is_available(): + device = torch.device("cuda:0") + gpu_count = torch.cuda.device_count() + logger.info(f"Single GPU training on {device} ({gpu_count} GPU(s) available)") + else: + device = torch.device("cpu") + gpu_count = 0 + use_mixed_precision = False # AMP requires CUDA + logger.info("Training on CPU") + + rank0 = _is_rank0(use_ddp) + + # ============================================ + # Step 2: Load untrained model + # ============================================ + if rank0: + logger.info(f"Loading untrained model from {untrained_model_path}...") + model = torch.load(untrained_model_path, weights_only=False, map_location="cpu") + model = model.to(device) + + if use_ddp: + model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank]) + + if rank0: + logger.info(f"Model loaded: {type(model).__name__}") - # Step 2: Load optimizer/loss config + # ============================================ + # Step 3: Load optimizer/loss config + # ============================================ config_path = untrained_model_path.parent / "training_config.json" - logger.info(f"Loading training config from {config_path}...") with open(config_path) as f: training_config = json.load(f) - # Recreate optimizer (needs model.parameters()) + # Recreate optimizer (needs model.parameters() — works with DDP wrapper) optimizer_class = getattr(torch.optim, training_config["optimizer_class"]) optimizer_config = training_config.get("optimizer_config", {}) - if optimizer_config: - signature = inspect.signature(optimizer_class.__init__) - params = signature.parameters - accepts_kwargs = any(param.kind == inspect.Parameter.VAR_KEYWORD for 
param in params.values()) - if not accepts_kwargs: - allowed = {name for name in params if name not in ("self", "params")} - filtered_config = {key: value for key, value in optimizer_config.items() if key in allowed} - dropped_keys = sorted(set(optimizer_config) - set(filtered_config)) - if dropped_keys: - logger.warning( - "Dropping unsupported optimizer args for %s: %s", - optimizer_class.__name__, - dropped_keys, - ) - optimizer_config = filtered_config optimizer = optimizer_class(model.parameters(), **optimizer_config) - logger.info(f"Optimizer: {type(optimizer).__name__}") # Recreate loss loss_class = getattr(nn, training_config["loss_class"]) loss_fn = loss_class() - logger.info(f"Loss: {type(loss_fn).__name__}") - # Step 3: Download from S3 if needed + if not task_type: + task_type = _infer_task_type(loss_fn) + if rank0: + logger.info(f"Optimizer: {type(optimizer).__name__}, Loss: {type(loss_fn).__name__}, Task: {task_type}") + + # ============================================ + # Step 4: Download from S3 if needed + # ============================================ if train_uri.startswith("s3://"): train_uri = download_s3_uri(train_uri) if val_uri.startswith("s3://"): val_uri = download_s3_uri(val_uri) - # Step 4: Load data and convert to tensors - logger.info(f"Loading training data from {train_uri}...") - train_df = pd.read_parquet(train_uri) - logger.info(f"Training data shape: {train_df.shape}") - - X_train = torch.tensor(train_df.drop(columns=[target_column]).values, dtype=torch.float32) - y_train_raw = train_df[target_column].values - - logger.info(f"Loading validation data from {val_uri}...") - val_df = pd.read_parquet(val_uri) - logger.info(f"Validation data shape: {val_df.shape}") - - X_val = torch.tensor(val_df.drop(columns=[target_column]).values, dtype=torch.float32) - y_val_raw = val_df[target_column].values - - # Determine task type from loss function and target values - is_classification = isinstance(loss_fn, nn.CrossEntropyLoss) - is_binary = 
isinstance(loss_fn, nn.BCEWithLogitsLoss) - - if is_classification: - # CrossEntropyLoss expects long targets - y_train = torch.tensor(y_train_raw, dtype=torch.long) - y_val = torch.tensor(y_val_raw, dtype=torch.long) - task_type = "classification" - elif is_binary: - y_train = torch.tensor(y_train_raw, dtype=torch.float32).unsqueeze(1) - y_val = torch.tensor(y_val_raw, dtype=torch.float32).unsqueeze(1) - task_type = "classification" - else: - # Regression - y_train = torch.tensor(y_train_raw, dtype=torch.float32).unsqueeze(1) - y_val = torch.tensor(y_val_raw, dtype=torch.float32).unsqueeze(1) - task_type = "regression" + # ============================================ + # Step 5: Create streaming DataLoaders + # ============================================ + train_dataset = ParquetIterableDataset(train_uri, target_column, task_type) + val_dataset = ParquetIterableDataset(val_uri, target_column, task_type) - # Step 5: Create DataLoaders - train_dataset = TensorDataset(X_train, y_train) - val_dataset = TensorDataset(X_val, y_val) + effective_num_workers = _resolve_num_workers(num_workers) - train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True) - val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False) + train_loader = torch.utils.data.DataLoader( + train_dataset, + batch_size=batch_size, + num_workers=effective_num_workers, + pin_memory=device.type == "cuda", + ) + val_loader = torch.utils.data.DataLoader( + val_dataset, + batch_size=batch_size, + num_workers=effective_num_workers, + pin_memory=device.type == "cuda", + ) - # Step 6: Training loop - logger.info(f"Training for {epochs} epochs, batch_size={batch_size}...") + n_features = get_parquet_feature_count(train_uri, target_column) + train_rows = get_parquet_row_count(train_uri) + val_rows = get_parquet_row_count(val_uri) - device = torch.device("cuda" if torch.cuda.is_available() else "cpu") - model = model.to(device) - logger.info(f"Training on device: {device}") + if 
rank0: + logger.info("Using ParquetIterableDataset for streaming data loading") + logger.info(f"Training data: {train_rows} rows, {n_features} features (streaming)") + logger.info(f"Validation data: {val_rows} rows (streaming)") + logger.info(f"DataLoader workers: requested={num_workers}, effective={effective_num_workers}") + + # ============================================ + # Step 6: Setup mixed precision + # ============================================ + scaler = torch.amp.GradScaler("cuda") if use_mixed_precision else None + autocast_ctx = torch.amp.autocast("cuda") if use_mixed_precision else torch.amp.autocast("cpu", enabled=False) + + if rank0 and use_mixed_precision: + logger.info("Mixed precision (FP16) enabled") + + # ============================================ + # Step 7: Training loop + # ============================================ + if rank0: + logger.info(f"Training for {epochs} epochs, batch_size={batch_size}...") history = {"train_loss": [], "val_loss": []} best_val_loss = float("inf") - best_model_state = None + best_checkpoint_path = None for epoch in range(epochs): # Train @@ -155,15 +227,22 @@ def train_pytorch( X_batch, y_batch = X_batch.to(device), y_batch.to(device) optimizer.zero_grad() - output = model(X_batch) - loss = loss_fn(output, y_batch) - loss.backward() - optimizer.step() + with autocast_ctx: + output = model(X_batch) + loss = loss_fn(output, y_batch) + + if scaler is not None: + scaler.scale(loss).backward() + scaler.step(optimizer) + scaler.update() + else: + loss.backward() + optimizer.step() train_loss_sum += loss.item() train_batches += 1 - avg_train_loss = train_loss_sum / train_batches + avg_train_loss = train_loss_sum / max(train_batches, 1) # Validate model.eval() @@ -173,78 +252,103 @@ def train_pytorch( with torch.no_grad(): for X_batch, y_batch in val_loader: X_batch, y_batch = X_batch.to(device), y_batch.to(device) - output = model(X_batch) - loss = loss_fn(output, y_batch) + with autocast_ctx: + output = 
model(X_batch) + loss = loss_fn(output, y_batch) val_loss_sum += loss.item() val_batches += 1 - avg_val_loss = val_loss_sum / val_batches + avg_val_loss = val_loss_sum / max(val_batches, 1) history["train_loss"].append(avg_train_loss) history["val_loss"].append(avg_val_loss) - # Track best model - if avg_val_loss < best_val_loss: + # Track best model — save checkpoint to disk instead of memory. + # Note: only rank 0 updates best_val_loss and saves checkpoints. Non-rank-0 + # processes retain best_val_loss=inf, which is intentional — only rank 0's + # history and artifacts are used downstream. + if avg_val_loss < best_val_loss and rank0: best_val_loss = avg_val_loss - best_model_state = copy.deepcopy(model.state_dict()) + # Get the underlying model (unwrap DDP if needed) + raw_model = model.module if use_ddp else model + best_checkpoint_path = output_dir / "_best_checkpoint.pt" + torch.save(raw_model.state_dict(), best_checkpoint_path) - if (epoch + 1) % 5 == 0 or epoch == 0: + if rank0 and ((epoch + 1) % 5 == 0 or epoch == 0): logger.info( f" Epoch {epoch + 1}/{epochs} - train_loss: {avg_train_loss:.4f}, val_loss: {avg_val_loss:.4f}" ) - logger.info("Training complete!") - - # Restore best model weights - if best_model_state is not None: - model.load_state_dict(best_model_state) - logger.info(f"Restored best model (val_loss: {best_val_loss:.4f})") - - # Step 7: Save artifacts - artifacts_dir = output_dir / "artifacts" - artifacts_dir.mkdir(exist_ok=True) - - # Save model state dict - model_path = artifacts_dir / "model.pt" - model_cpu = model.to("cpu") - torch.save(model_cpu.state_dict(), model_path) - logger.info(f"Model state dict saved to {model_path}") - - # Save model class definition via cloudpickle (needed to reconstruct at inference) - model_class_path = artifacts_dir / "model_class.pkl" - with open(model_class_path, "wb") as f: - cloudpickle.dump(model_cpu, f) - logger.info(f"Model class saved to {model_class_path}") - - # Save training history - 
history_path = artifacts_dir / "history.json" - with open(history_path, "w") as f: - json.dump(history, f, indent=2) - logger.info(f"History saved to {history_path}") - - # Save metadata (includes optimizer/loss config for faithful retraining) - metadata = { - "model_type": "pytorch", - "training_mode": "direct", - "epochs": epochs, - "batch_size": batch_size, - "best_val_loss": best_val_loss, - "n_features": X_train.shape[1], - "target_column": target_column, - "task_type": task_type, - "train_samples": len(X_train), - "val_samples": len(X_val), - "final_train_loss": history["train_loss"][-1], - "final_val_loss": history["val_loss"][-1], - "optimizer_class": type(optimizer).__name__, - "optimizer_config": {k: v for k, v in optimizer.defaults.items()}, - "loss_class": type(loss_fn).__name__, - } - - metadata_path = artifacts_dir / "metadata.json" - with open(metadata_path, "w") as f: - json.dump(metadata, f, indent=2) - logger.info(f"Metadata saved to {metadata_path}") + if rank0: + logger.info("Training complete!") + + # ============================================ + # Step 8: Save artifacts (rank 0 only) + # ============================================ + metadata = {} + if rank0: + artifacts_dir = output_dir / "artifacts" + artifacts_dir.mkdir(exist_ok=True) + + # Restore best model weights + raw_model = model.module if use_ddp else model + if best_checkpoint_path and best_checkpoint_path.exists(): + raw_model.load_state_dict(torch.load(best_checkpoint_path, weights_only=True, map_location=device)) + best_checkpoint_path.unlink() # Clean up temp checkpoint + logger.info(f"Restored best model (val_loss: {best_val_loss:.4f})") + + # Save model state dict + model_cpu = raw_model.to("cpu") + model_path = artifacts_dir / "model.pt" + torch.save(model_cpu.state_dict(), model_path) + logger.info(f"Model state dict saved to {model_path}") + + # Save model class definition via cloudpickle (needed to reconstruct at inference) + model_class_path = artifacts_dir / 
"model_class.pkl" + with open(model_class_path, "wb") as f: + cloudpickle.dump(model_cpu, f) + logger.info(f"Model class saved to {model_class_path}") + + # Save training history + history_path = artifacts_dir / "history.json" + with open(history_path, "w") as f: + json.dump(history, f, indent=2) + + init_params = set(inspect.signature(type(optimizer).__init__).parameters.keys()) + + # Save metadata + metadata = { + "model_type": "pytorch", + "training_mode": "direct", + "epochs": epochs, + "batch_size": batch_size, + "best_val_loss": best_val_loss, + "n_features": n_features, + "target_column": target_column, + "task_type": task_type, + "train_samples": train_rows, + "val_samples": val_rows, + "final_train_loss": history["train_loss"][-1], + "final_val_loss": history["val_loss"][-1], + "optimizer_class": type(optimizer).__name__, + "optimizer_config": {k: v for k, v in optimizer.defaults.items() if k in init_params}, + "loss_class": type(loss_fn).__name__, + "gpu_count": gpu_count, + "mixed_precision": use_mixed_precision, + "device": str(device), + "distributed": use_ddp, + } + + metadata_path = artifacts_dir / "metadata.json" + with open(metadata_path, "w") as f: + json.dump(metadata, f, indent=2) + logger.info(f"Metadata saved to {metadata_path}") + + # ============================================ + # Step 9: Cleanup distributed training + # ============================================ + if use_ddp: + dist.destroy_process_group() return metadata @@ -256,8 +360,12 @@ def train_pytorch( parser.add_argument("--val-uri", required=True, help="Validation data URI") parser.add_argument("--target-column", required=True, help="Target column name") parser.add_argument("--output", required=True, help="Output directory") - parser.add_argument("--epochs", type=int, default=25, help="Number of epochs") + parser.add_argument("--epochs", type=int, default=10, help="Number of epochs") parser.add_argument("--batch-size", type=int, default=32, help="Batch size") + 
parser.add_argument("--num-workers", type=int, default=0, help="DataLoader worker processes") + parser.add_argument("--ddp", action="store_true", help="Enable DDP (set by torchrun)") + parser.add_argument("--mixed-precision", action="store_true", help="Enable FP16 mixed precision") + parser.add_argument("--task-type", required=False, default=None, help="Canonical task type") args = parser.parse_args() @@ -269,6 +377,10 @@ def train_pytorch( target_column=args.target_column, epochs=args.epochs, batch_size=args.batch_size, + num_workers=args.num_workers, + use_ddp=args.ddp, + use_mixed_precision=args.mixed_precision, + task_type=args.task_type, ) logger.info("Script complete!") diff --git a/plexe/templates/training/train_xgboost.py b/plexe/templates/training/train_xgboost.py index 963fa37e..b81d8c26 100644 --- a/plexe/templates/training/train_xgboost.py +++ b/plexe/templates/training/train_xgboost.py @@ -35,6 +35,7 @@ def train_xgboost( output_dir: Path, target_column: str, group_column: str | None = None, + task_type: str | None = None, ) -> dict: """ Train XGBoost model directly (no Spark). 
@@ -205,12 +206,14 @@ def train_xgboost( logger.info(f"LabelEncoder saved to {shorten(str(encoder_path), 30)}") # Step 8: Save Metadata - if isinstance(model, XGBRanker): - task_type = "ranking" - elif isinstance(model, XGBClassifier): - task_type = "classification" - else: - task_type = "regression" + if not task_type: + if isinstance(model, XGBRanker): + task_type = "learning_to_rank" + elif isinstance(model, XGBClassifier): + n_classes = len(np.unique(y_train)) + task_type = "multiclass_classification" if n_classes > 2 else "binary_classification" + else: + task_type = "regression" metadata = { "model_type": "xgboost", @@ -252,6 +255,7 @@ def main(): "--group-column", required=False, default=None, help="Group column for ranking (query_id, session_id)" ) parser.add_argument("--output", required=True, help="Output directory") + parser.add_argument("--task-type", required=False, default=None, help="Canonical task type") args = parser.parse_args() @@ -266,6 +270,7 @@ def main(): output_dir=Path(args.output), target_column=args.target_column, group_column=args.group_column, + task_type=args.task_type, ) logger.info("Training complete!") diff --git a/plexe/tools/submission.py b/plexe/tools/submission.py index bca41320..36c41500 100644 --- a/plexe/tools/submission.py +++ b/plexe/tools/submission.py @@ -14,7 +14,7 @@ from smolagents import tool from plexe.constants import DirNames, ScratchKeys -from plexe.models import BuildContext, Metric, Hypothesis, UnifiedPlan +from plexe.models import BuildContext, Metric, Hypothesis, TaskType, UnifiedPlan from plexe.search.insight_store import InsightStore from plexe.utils.tracing import tool_span from plexe.validation.validators import validate_sklearn_pipeline, validate_pipeline_consistency @@ -210,7 +210,7 @@ def save_model(model: Any, optimizer: Any, loss: Any, epochs: int, batch_size: i model: Keras model instance (keras.Model) optimizer: Optimizer instance (keras.optimizers.Optimizer) loss: Loss instance (keras.losses.Loss) 
- epochs: Number of training epochs (e.g., 50) + epochs: Number of training epochs (e.g., 10) batch_size: Batch size for training (e.g., 32) Returns: @@ -259,7 +259,7 @@ def save_model(model: Any, optimizer: Any, loss: Any, epochs: int, batch_size: i model: PyTorch model instance (torch.nn.Module) optimizer: Optimizer instance (torch.optim.Optimizer) loss: Loss instance (torch.nn.Module) - epochs: Number of training epochs (e.g., 50) + epochs: Number of training epochs (e.g., 10) batch_size: Batch size for training (e.g., 32) Returns: @@ -750,6 +750,13 @@ def save_eda_report( if problematic_columns is None: problematic_columns = [] + # Validate task_type against canonical values + _valid_task_types = {t.value for t in TaskType} + if task_type not in _valid_task_types: + logger.warning( + f"Non-canonical task_type '{task_type}' submitted. " f"Valid values: {sorted(_valid_task_types)}" + ) + # Build structured report report = { "task_type": task_type, @@ -1443,6 +1450,13 @@ def register_core_metrics_report( """ from plexe.models import CoreMetricsReport + _valid_task_types = {t.value for t in TaskType} + if task_type not in _valid_task_types: + logger.warning( + f"Non-canonical task_type '{task_type}' in core metrics report. 
" + f"Valid values: {sorted(_valid_task_types)}" + ) + report = CoreMetricsReport( task_type=task_type, primary_metric_name=primary_metric_name, diff --git a/plexe/utils/dashboard/utils.py b/plexe/utils/dashboard/utils.py index e80ac729..f13d2eab 100644 --- a/plexe/utils/dashboard/utils.py +++ b/plexe/utils/dashboard/utils.py @@ -36,9 +36,11 @@ def load_parquet_sample(uri: str, limit: int = 10) -> pd.DataFrame | None: def get_parquet_row_count(uri: str) -> int | None: - """Get row count from parquet file.""" + """Get row count from parquet metadata without reading data.""" try: - return len(pd.read_parquet(uri)) + from plexe.utils.parquet_dataset import get_parquet_row_count as _count + + return _count(uri) except Exception: return None diff --git a/plexe/utils/parquet_dataset.py b/plexe/utils/parquet_dataset.py new file mode 100644 index 00000000..23d99fdc --- /dev/null +++ b/plexe/utils/parquet_dataset.py @@ -0,0 +1,212 @@ +""" +Streaming parquet data loading utilities for large-dataset training. + +Reads parquet files lazily via PyArrow row groups instead of loading +everything into memory. Supports PyTorch DataLoader and Keras tf.data +integration, including DDP rank sharding and DataLoader worker sharding. 
"""
Streaming parquet data loading utilities for large-dataset training.

Reads parquet files lazily via PyArrow row groups instead of loading
everything into memory. Supports PyTorch DataLoader and Keras tf.data
integration, including DDP rank sharding and DataLoader worker sharding.
"""

import logging
import math
import random
from pathlib import Path
from typing import Iterator

import numpy as np
import pyarrow.parquet as pq

try:
    import torch
    import torch.distributed as dist
    import torch.utils.data

    _IterableDatasetBase = torch.utils.data.IterableDataset
except ImportError:
    # PyTorch is optional here: the metadata helpers and the Keras batch
    # generator below only need PyArrow/NumPy.
    torch = None  # type: ignore[assignment]
    dist = None  # type: ignore[assignment]
    _IterableDatasetBase = object  # type: ignore[assignment,misc]

logger = logging.getLogger(__name__)

# Task types whose targets are yielded as integer class indices by
# ParquetIterableDataset (canonical name plus the legacy alias).
_CLASS_INDEX_TASK_TYPES = ("classification", "multiclass_classification")


def get_parquet_row_count(uri: str) -> int:
    """Get total row count from parquet metadata without reading data.

    Args:
        uri: Path to a parquet file or a directory of parquet files.

    Returns:
        Sum of ``num_rows`` over every resolved parquet file.

    Raises:
        FileNotFoundError: If the path does not exist or holds no parquet files.
    """
    # read_metadata only parses the footer; no row data is loaded.
    return sum(pq.read_metadata(f).num_rows for f in _resolve_parquet_files(uri))


def get_dataset_size_bytes(uri: str) -> int:
    """Get dataset size in bytes for a local file or directory of parquet files.

    Args:
        uri: Path to a file, or a directory searched recursively for ``*.parquet``.

    Returns:
        On-disk size in bytes; 0 when the path is neither a file nor a directory.
    """
    path = Path(uri)
    if path.is_file():
        return path.stat().st_size
    if path.is_dir():
        return sum(f.stat().st_size for f in path.rglob("*.parquet"))
    return 0


def _resolve_parquet_files(uri: str) -> list[str]:
    """Resolve a URI to a list of parquet file paths.

    Handles both single files and directories containing parquet files.
    Directory results are sorted so every process sees the same file order.

    Raises:
        FileNotFoundError: If the path does not exist, or is a directory
            without any ``*.parquet`` file.
    """
    path = Path(uri)
    if path.is_file():
        return [str(path)]
    if path.is_dir():
        files = sorted(str(f) for f in path.rglob("*.parquet"))
        if not files:
            raise FileNotFoundError(f"No parquet files found in {uri}")
        return files
    raise FileNotFoundError(f"Path does not exist: {uri}")


class ParquetIterableDataset(_IterableDatasetBase):
    """Streaming parquet dataset for PyTorch DataLoader.

    Reads parquet files row-group by row-group, yielding individual samples.
    Supports sharding across DDP ranks and DataLoader workers.

    Inherits from torch.utils.data.IterableDataset so PyTorch DataLoader
    knows to use the iterable protocol instead of map-style indexing.
    """

    def __init__(
        self,
        uri: str,
        target_column: str,
        task_type: str = "regression",
    ):
        """
        Args:
            uri: Path to parquet file or directory of parquet files
            target_column: Name of the target column
            task_type: One of "multiclass_classification", "binary_classification", "regression"
                (legacy: "classification", "binary" also accepted)
        """
        super().__init__()
        self._files = _resolve_parquet_files(uri)
        self._target_column = target_column
        self._task_type = task_type

        # Build index of (file_idx, row_group_idx) pairs from the footers only.
        self._row_group_index: list[tuple[int, int]] = []
        self._total_rows = 0
        for file_idx, file_path in enumerate(self._files):
            metadata = pq.read_metadata(file_path)
            self._row_group_index.extend((file_idx, rg_idx) for rg_idx in range(metadata.num_row_groups))
            self._total_rows += metadata.num_rows

    @property
    def total_rows(self) -> int:
        """Total number of rows across all files (before any sharding)."""
        return self._total_rows

    def _get_assigned_row_groups(self) -> list[tuple[int, int]]:
        """Determine which row groups this worker should process.

        Shards first by DDP rank, then by DataLoader worker.
        """
        rgs = list(self._row_group_index)

        # Shard across DDP ranks (balanced so all ranks get equal count).
        # Row groups are repeated cyclically to pad up to a multiple of
        # world_size, so some may be seen by more than one rank.
        if dist is not None and dist.is_available() and dist.is_initialized():
            rank = dist.get_rank()
            world_size = dist.get_world_size()
            if len(rgs) > 0:
                per_rank = math.ceil(len(rgs) / world_size)
                total = per_rank * world_size
                balanced = [rgs[i % len(rgs)] for i in range(total)]
                rgs = balanced[rank * per_rank : (rank + 1) * per_rank]

        # Shard across DataLoader workers (only meaningful when torch is present;
        # without the guard a missing torch surfaced as AttributeError on None).
        if torch is not None:
            worker_info = torch.utils.data.get_worker_info()
            if worker_info is not None:
                rgs = rgs[worker_info.id :: worker_info.num_workers]

        return rgs

    def __iter__(self) -> Iterator:
        """Yield ``(features, target)`` tensor pairs, one per row.

        Row groups assigned to this rank/worker are visited in random order;
        rows inside a row group keep their stored order.

        Raises:
            ImportError: If PyTorch is not installed.
        """
        if torch is None:
            raise ImportError("ParquetIterableDataset requires PyTorch to iterate; install torch")

        assigned_rgs = self._get_assigned_row_groups()
        random.shuffle(assigned_rgs)

        # Decided once per iteration instead of once per row.
        yields_class_index = self._task_type in _CLASS_INDEX_TASK_TYPES

        # Cache ParquetFile handles to avoid re-reading footers
        open_files: dict[int, pq.ParquetFile] = {}
        for file_idx, rg_idx in assigned_rgs:
            if file_idx not in open_files:
                open_files[file_idx] = pq.ParquetFile(self._files[file_idx])
            pf = open_files[file_idx]
            table = pf.read_row_group(rg_idx)
            df = table.to_pandas()

            feature_cols = [c for c in df.columns if c != self._target_column]
            X = df[feature_cols].values.astype(np.float32)
            y_raw = df[self._target_column].values

            # Yield individual samples for DataLoader batching
            for i in range(len(X)):
                x_tensor = torch.from_numpy(X[i].copy())
                if yields_class_index:
                    # CrossEntropyLoss expects scalar long targets -> batch becomes [batch]
                    y_tensor = torch.tensor(int(y_raw[i]), dtype=torch.long)
                else:
                    # BCEWithLogitsLoss / MSELoss expect [batch, 1] shape
                    # Yield [1] tensor so DataLoader collates to [batch, 1]
                    y_tensor = torch.tensor([float(y_raw[i])], dtype=torch.float32)
                yield x_tensor, y_tensor


def parquet_batch_generator(
    uri: str,
    target_column: str,
    batch_size: int = 1024,
    task_type: str | None = None,
) -> Iterator[tuple[np.ndarray, np.ndarray]]:
    """Streaming parquet batch generator for Keras/TensorFlow.

    Reads parquet file(s) in batches using PyArrow's iter_batches,
    yielding (X_batch, y_batch) numpy arrays suitable for
    tf.data.Dataset.from_generator().

    Args:
        uri: Path to parquet file or directory of parquet files
        target_column: Name of the target column
        batch_size: Number of rows per batch
        task_type: Canonical task type used to choose y dtype

    Yields:
        (features_array, target_array) tuples of numpy arrays
    """
    # NOTE(review): only the canonical "multiclass_classification" selects int64
    # here, whereas ParquetIterableDataset also treats legacy "classification" as
    # class indices. Left unchanged because the consumer's tf.data output
    # signature is not visible from this module -- confirm before aligning.
    y_dtype = np.int64 if task_type == "multiclass_classification" else np.float32

    for file_path in _resolve_parquet_files(uri):
        pf = pq.ParquetFile(file_path)
        columns = [c for c in pf.schema_arrow.names if c != target_column]

        for batch in pf.iter_batches(batch_size=batch_size, columns=columns + [target_column]):
            df = batch.to_pandas()
            X = df[columns].values.astype(np.float32)
            y = df[target_column].values.astype(y_dtype)
            yield X, y


def get_parquet_feature_count(uri: str, target_column: str) -> int:
    """Get number of feature columns (total columns minus target).

    Only the first resolved file's schema is inspected.
    """
    files = _resolve_parquet_files(uri)
    schema = pq.read_schema(files[0])
    return len([c for c in schema.names if c != target_column])


def get_steps_per_epoch(uri: str, batch_size: int) -> int:
    """Compute number of steps per epoch for a parquet dataset.

    Args:
        uri: Path to parquet file or directory of parquet files.
        batch_size: Rows per step; must be positive.

    Returns:
        ``ceil(total_rows / batch_size)``.

    Raises:
        ValueError: If ``batch_size`` is not positive.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size}")
    return math.ceil(get_parquet_row_count(uri) / batch_size)
mixed_precision=config.mixed_precision, + dataloader_workers=config.dataloader_workers, **training_kwargs, ) @@ -1650,15 +1653,23 @@ def retrain_on_full_dataset( retrain_kwargs["epochs"] = best_solution.epochs retrain_kwargs["batch_size"] = best_solution.batch_size + # Use longer timeout for neural network full-dataset training + retrain_timeout = ( + config.nn_training_timeout if best_solution.model_type in ("keras", "pytorch") else config.training_timeout + ) + final_artifacts_path = runner.run_training( template=f"train_{best_solution.model_type}", model=best_solution.model, # ← Untrained model object feature_pipeline=fitted_pipeline, train_uri=final_train_transformed, # ← FULL transformed data val_uri=final_val_transformed, - timeout=config.training_timeout, + timeout=retrain_timeout, target_columns=context.output_targets, + task_type=context.task_analysis.get("task_type", "") if context.task_analysis else "", group_column=context.group_column, + mixed_precision=config.mixed_precision, + dataloader_workers=config.dataloader_workers, **retrain_kwargs, ) diff --git a/pyproject.toml b/pyproject.toml index 134c5143..8f84ef1b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "plexe" -version = "1.3.6" +version = "1.4.0" description = "An agentic framework for building ML models from natural language" authors = [ "Marcello De Bernardi ", diff --git a/tests/CODE_INDEX.md b/tests/CODE_INDEX.md index 357e3b32..255f6752 100644 --- a/tests/CODE_INDEX.md +++ b/tests/CODE_INDEX.md @@ -1,9 +1,17 @@ # Code Index: tests -> Generated on 2026-03-02 19:57:53 +> Generated on 2026-03-02 22:03:39 Test suite structure and test case documentation. +## `conftest.py` +Shared test fixtures for plexe tests. + +**Functions:** +- `synthetic_parquet_classification(tmp_path)` - Create a 200-row binary classification parquet with 2 row groups. +- `synthetic_parquet_regression(tmp_path)` - Create a 200-row regression parquet with 2 row groups. 
+ +--- ## `integration/conftest.py` Shared fixtures and helpers for staged integration tests. @@ -67,6 +75,24 @@ Tests for user feedback integration in agents. - `test_hypothesiser_includes_feedback(self, mock_context, mock_config)` - HypothesiserAgent should include feedback in instructions. - `test_agent_without_feedback_works(self, mock_context, mock_config)` - Agents should work normally when no feedback is provided. +--- +## `unit/execution/training/test_local_runner.py` +Tests for LocalProcessRunner GPU detection and command construction. + +**`TestGPUDetection`** - Tests for framework GPU detection helpers. +- `test_no_torch(self)` - Returns 0 when torch is not importable. +- `test_no_cuda(self)` - Returns 0 when CUDA is not available. +- `test_with_cuda(self)` - Returns device count when CUDA is available. +- `test_tf_gpu_detection_no_tf(self)` - Returns 0 when tensorflow is not importable. + +**`TestCommandConstruction`** - Test that the runner builds the right command for different GPU configurations. +- `setup_method(self)` - No description +- `test_pytorch_no_gpu_uses_python(self)` - PyTorch with 0 GPUs should use the current Python launcher, no GPU flags. +- `test_pytorch_single_gpu_no_ddp(self)` - PyTorch with 1 GPU should use current Python (no DDP), but get --mixed-precision. +- `test_pytorch_multi_gpu_uses_distributed_run(self)` - PyTorch with >1 GPU should use torch.distributed.run with --ddp and --mixed-precision. +- `test_pytorch_num_workers_passed(self)` - PyTorch should pass --num-workers when dataloader_workers > 0. +- `test_pytorch_no_mixed_precision_when_disabled(self)` - PyTorch with GPU but mixed_precision=False should not get --mixed-precision. + --- ## `unit/search/test_evolutionary_policy_determinism.py` Determinism tests for EvolutionarySearchPolicy local RNG behavior. @@ -121,6 +147,16 @@ Unit tests for pipeline_runner feature name resolution. 
- `test_resolve_feature_names_falls_back_on_mismatch()` - Returns generic names when resolved names don't match output count. - `test_resolve_feature_names_falls_back_when_unavailable()` - Returns generic names when no get_feature_names_out is available. +--- +## `unit/templates/training/test_train_pytorch_worker_fallback.py` +Unit tests for PyTorch DataLoader worker fallback behavior. + +**Functions:** +- `test_resolve_num_workers_zero_is_unchanged() -> None` - Requested zero workers should remain zero. +- `test_resolve_num_workers_falls_back_on_darwin_spawn(monkeypatch) -> None` - On macOS spawn, requested workers should fall back to zero. +- `test_resolve_num_workers_uses_context_when_start_method_is_none(monkeypatch) -> None` - When get_start_method returns None, context start method should be used. +- `test_resolve_num_workers_kept_on_non_darwin_spawn(monkeypatch) -> None` - Spawn on non-macOS should keep the requested worker count. + --- ## `unit/test_config.py` Unit tests for config helpers. @@ -180,6 +216,33 @@ Unit tests for PyTorch model submission. **Functions:** - `test_save_model_pytorch(tmp_path)` - Test PyTorch model submission validation and context scratch storage. +--- +## `unit/utils/test_parquet_dataset.py` +Tests for streaming parquet data loading utilities. + +**`TestMetadataUtilities`** - Tests for parquet metadata helper functions. 
+- `test_get_parquet_row_count(self, synthetic_parquet_classification)` - No description +- `test_get_dataset_size_bytes_file(self, synthetic_parquet_classification)` - No description +- `test_get_dataset_size_bytes_directory(self, tmp_path, synthetic_parquet_classification)` - No description +- `test_get_dataset_size_bytes_nonexistent(self)` - No description +- `test_get_parquet_feature_count(self, synthetic_parquet_classification)` - No description +- `test_get_steps_per_epoch(self, synthetic_parquet_classification)` - No description + +**`TestParquetIterableDataset`** - Tests for streaming iterable dataset behavior. +- `test_yields_all_rows_classification(self, synthetic_parquet_classification)` - No description +- `test_yields_all_rows_regression(self, synthetic_parquet_regression)` - No description +- `test_yields_all_rows_binary(self, synthetic_parquet_classification)` - No description +- `test_directory_of_parquets(self, tmp_path, synthetic_parquet_classification)` - Test loading from a directory containing multiple parquet files. +- `test_total_rows_property(self, synthetic_parquet_classification)` - No description +- `test_ddp_sharding(self, synthetic_parquet_classification)` - Verify DDP sharding splits row groups across ranks. +- `test_feature_values_match_source(self, synthetic_parquet_classification)` - Verify streamed data matches the original parquet content. + +**`TestParquetBatchGenerator`** - Tests for Keras/TensorFlow parquet batch generator. +- `test_yields_all_rows(self, synthetic_parquet_classification)` - No description +- `test_batch_size_respected(self, synthetic_parquet_classification)` - No description +- `test_directory_input(self, tmp_path, synthetic_parquet_classification)` - Test generator with directory of parquet files. +- `test_values_match_source(self, synthetic_parquet_classification)` - Verify batched data matches original parquet content. + --- ## `unit/utils/test_reporting.py` Unit tests for reporting utilities. 
"""Shared test fixtures for plexe tests."""

import pytest


def _write_synthetic_parquet(path, build_target, n_rows=200, n_features=5, row_group_size=100, seed=42):
    """Write a synthetic feature/target parquet file and describe it.

    Args:
        path: Destination parquet path.
        build_target: Callable ``(X, rng) -> y`` producing the target column
            from the float32 feature matrix and the fixture-local RNG.
        n_rows: Number of rows to generate.
        n_features: Number of feature columns (named ``f0`` .. ``f{n-1}``).
        row_group_size: Rows per parquet row group (200 rows / 100 -> 2 groups).
        seed: Seed for the fixture-local random generator.

    Returns:
        Dict with ``path``, ``target_column``, ``n_rows`` and ``n_features``.
    """
    # Heavy optional dependencies are imported lazily so merely collecting
    # tests does not require them.
    import numpy as np
    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq

    # A private RandomState yields the same stream as np.random.seed(seed)
    # without mutating the process-global RNG that other tests may rely on.
    rng = np.random.RandomState(seed)

    X = rng.randn(n_rows, n_features).astype(np.float32)
    y = build_target(X, rng)

    df = pd.DataFrame(X, columns=[f"f{i}" for i in range(n_features)])
    df["target"] = y

    pq.write_table(pa.Table.from_pandas(df), path, row_group_size=row_group_size)

    return {
        "path": str(path),
        "target_column": "target",
        "n_rows": n_rows,
        "n_features": n_features,
    }


@pytest.fixture
def synthetic_parquet_classification(tmp_path):
    """Create a 200-row binary classification parquet with 2 row groups."""
    info = _write_synthetic_parquet(
        tmp_path / "classification.parquet",
        # Label is 1 when the first two features sum to a positive value.
        lambda X, rng: (X[:, 0] + X[:, 1] > 0).astype("int64"),
    )
    info["n_classes"] = 2
    info["task_type"] = "binary_classification"
    return info


@pytest.fixture
def synthetic_parquet_regression(tmp_path):
    """Create a 200-row regression parquet with 2 row groups."""
    info = _write_synthetic_parquet(
        tmp_path / "regression.parquet",
        # Linear signal in the first two features plus small Gaussian noise.
        lambda X, rng: (X[:, 0] * 2 + X[:, 1] + rng.randn(len(X)) * 0.1).astype("float32"),
    )
    info["task_type"] = "regression"
    return info
"""Tests for LocalProcessRunner GPU detection and command construction."""

import builtins
import sys
from unittest.mock import MagicMock, patch

import pytest

from plexe.execution.training.local_runner import (
    LocalProcessRunner,
    _detect_gpu_count,
    _detect_tf_gpu_count,
)

torch = pytest.importorskip("torch")
nn = torch.nn


def _block_import_of(module_name):
    """Return a patcher that makes importing ``module_name`` raise ImportError.

    Every other import is forwarded to the genuine ``builtins.__import__``,
    which is captured before the patch becomes active.
    """
    genuine_import = builtins.__import__

    def _guarded_import(name, *args, **kwargs):
        if name == module_name:
            raise ImportError("mocked")
        return genuine_import(name, *args, **kwargs)

    return patch("builtins.__import__", side_effect=_guarded_import)


class TestGPUDetection:
    """Tests for framework GPU detection helpers."""

    def test_no_torch(self):
        """Returns 0 when torch is not importable."""
        with _block_import_of("torch"):
            assert _detect_gpu_count() == 0

    def test_no_cuda(self):
        """Returns 0 when CUDA is not available."""
        with patch("torch.cuda.is_available", return_value=False):
            assert _detect_gpu_count() == 0

    def test_with_cuda(self):
        """Returns device count when CUDA is available."""
        cuda_present = patch("torch.cuda.is_available", return_value=True)
        four_devices = patch("torch.cuda.device_count", return_value=4)
        with cuda_present, four_devices:
            assert _detect_gpu_count() == 4

    def test_tf_gpu_detection_no_tf(self):
        """Returns 0 when tensorflow is not importable."""
        with _block_import_of("tensorflow"):
            assert _detect_tf_gpu_count() == 0


def _make_pytorch_model():
    """Create a simple PyTorch model for testing."""
    return nn.Linear(10, 1)


def _make_pytorch_optimizer(model):
    """Create an Adam optimizer over the given model's parameters."""
    return torch.optim.Adam(model.parameters(), lr=0.001)


def _make_pytorch_loss():
    """Create the MSE loss used by the command-construction tests."""
    return nn.MSELoss()


def _run_and_capture_cmd(runner, template, gpu_count, mixed_precision=True, dataloader_workers=0):
    """Run training with mocked subprocess and return the constructed command.

    Returns the argument list handed to the most recent ``subprocess.Popen``
    call, or ``None`` when the runner never launched a process.
    """
    model = _make_pytorch_model()
    optimizer = _make_pytorch_optimizer(model)
    loss = _make_pytorch_loss()

    launched_cmds = []

    def _fake_popen(cmd, **kwargs):
        launched_cmds.append(cmd)
        process = MagicMock()
        process.stdout = iter([])
        process.wait.return_value = 0
        return process

    # Only the detector matching the template's framework reports GPUs.
    torch_gpus = gpu_count if "pytorch" in template else 0
    tf_gpus = gpu_count if "keras" in template else 0

    gpu_patch = "plexe.execution.training.local_runner._detect_gpu_count"
    tf_gpu_patch = "plexe.execution.training.local_runner._detect_tf_gpu_count"

    with (
        patch(gpu_patch, return_value=torch_gpus),
        patch(tf_gpu_patch, return_value=tf_gpus),
        patch("subprocess.Popen", side_effect=_fake_popen),
        patch("torch.save"),
    ):
        try:
            runner.run_training(
                template=template,
                model=model,
                feature_pipeline=MagicMock(),
                train_uri="/tmp/train.parquet",
                val_uri="/tmp/val.parquet",
                timeout=300,
                target_columns=["target"],
                optimizer=optimizer,
                loss=loss,
                epochs=10,
                batch_size=32,
                mixed_precision=mixed_precision,
                dataloader_workers=dataloader_workers,
            )
        except Exception:
            pass  # We only care about the command

    return launched_cmds[-1] if launched_cmds else None


class TestCommandConstruction:
    """Test that the runner builds the right command for different GPU configurations."""

    def setup_method(self):
        self.runner = LocalProcessRunner(work_dir="/tmp/test_runner")

    def test_pytorch_no_gpu_uses_python(self):
        """PyTorch with 0 GPUs should use the current Python launcher, no GPU flags."""
        cmd = _run_and_capture_cmd(self.runner, "train_pytorch", gpu_count=0)
        assert cmd is not None
        assert cmd[0] == sys.executable
        for gpu_flag in ("--ddp", "--mixed-precision"):
            assert gpu_flag not in cmd

    def test_pytorch_single_gpu_no_ddp(self):
        """PyTorch with 1 GPU should use current Python (no DDP), but get --mixed-precision."""
        cmd = _run_and_capture_cmd(self.runner, "train_pytorch", gpu_count=1)
        assert cmd is not None
        assert cmd[0] == sys.executable
        assert "--ddp" not in cmd
        assert "--mixed-precision" in cmd

    def test_pytorch_multi_gpu_uses_distributed_run(self):
        """PyTorch with >1 GPU should use torch.distributed.run with --ddp and --mixed-precision."""
        cmd = _run_and_capture_cmd(self.runner, "train_pytorch", gpu_count=4)
        assert cmd is not None
        assert cmd[0] == sys.executable
        expected_tokens = (
            "-m",
            "torch.distributed.run",
            "--nproc_per_node=auto",
            "--standalone",
            "--ddp",
            "--mixed-precision",
        )
        for token in expected_tokens:
            assert token in cmd

    def test_pytorch_num_workers_passed(self):
        """PyTorch should pass --num-workers when dataloader_workers > 0."""
        cmd = _run_and_capture_cmd(self.runner, "train_pytorch", gpu_count=1, dataloader_workers=4)
        assert cmd is not None
        assert "--num-workers" in cmd
        flag_position = cmd.index("--num-workers")
        assert cmd[flag_position + 1] == "4"

    def test_pytorch_no_mixed_precision_when_disabled(self):
        """PyTorch with GPU but mixed_precision=False should not get --mixed-precision."""
        cmd = _run_and_capture_cmd(self.runner, "train_pytorch", gpu_count=1, mixed_precision=False)
        assert cmd is not None
        assert "--mixed-precision" not in cmd
"""Unit tests for PyTorch DataLoader worker fallback behavior."""

import pytest

pytest.importorskip("torch")

from plexe.templates.training import train_pytorch


def _pretend_runtime(monkeypatch, platform, start_method):
    """Patch the platform and multiprocessing start method seen by train_pytorch."""
    monkeypatch.setattr(train_pytorch.sys, "platform", platform)
    monkeypatch.setattr(train_pytorch.mp, "get_start_method", lambda allow_none=True: start_method)


def test_resolve_num_workers_zero_is_unchanged() -> None:
    """Requested zero workers should remain zero."""
    resolved = train_pytorch._resolve_num_workers(0)
    assert resolved == 0


def test_resolve_num_workers_falls_back_on_darwin_spawn(monkeypatch) -> None:
    """On macOS spawn, requested workers should fall back to zero."""
    _pretend_runtime(monkeypatch, "darwin", "spawn")
    assert train_pytorch._resolve_num_workers(4) == 0


def test_resolve_num_workers_uses_context_when_start_method_is_none(monkeypatch) -> None:
    """When get_start_method returns None, context start method should be used."""

    class _SpawnContext:
        @staticmethod
        def get_start_method() -> str:
            return "spawn"

    _pretend_runtime(monkeypatch, "darwin", None)
    monkeypatch.setattr(train_pytorch.mp, "get_context", lambda: _SpawnContext())
    assert train_pytorch._resolve_num_workers(2) == 0


def test_resolve_num_workers_kept_on_non_darwin_spawn(monkeypatch) -> None:
    """Spawn on non-macOS should keep the requested worker count."""
    _pretend_runtime(monkeypatch, "linux", "spawn")
    assert train_pytorch._resolve_num_workers(3) == 3
plexe.utils.parquet_dataset import ( + ParquetIterableDataset, + get_dataset_size_bytes, + get_parquet_feature_count, + get_parquet_row_count, + get_steps_per_epoch, + parquet_batch_generator, +) + +torch = pytest.importorskip("torch") + + +class TestMetadataUtilities: + """Tests for parquet metadata helper functions.""" + + def test_get_parquet_row_count(self, synthetic_parquet_classification): + count = get_parquet_row_count(synthetic_parquet_classification["path"]) + assert count == synthetic_parquet_classification["n_rows"] + + def test_get_dataset_size_bytes_file(self, synthetic_parquet_classification): + size = get_dataset_size_bytes(synthetic_parquet_classification["path"]) + assert size > 0 + + def test_get_dataset_size_bytes_directory(self, tmp_path, synthetic_parquet_classification): + # Create a directory with multiple parquet files + subdir = tmp_path / "multi" + subdir.mkdir() + df = pd.read_parquet(synthetic_parquet_classification["path"]) + for i in range(3): + pq.write_table(pa.Table.from_pandas(df), subdir / f"part_{i}.parquet") + + size = get_dataset_size_bytes(str(subdir)) + assert size > 0 + + def test_get_dataset_size_bytes_nonexistent(self): + size = get_dataset_size_bytes("/nonexistent/path") + assert size == 0 + + def test_get_parquet_feature_count(self, synthetic_parquet_classification): + count = get_parquet_feature_count( + synthetic_parquet_classification["path"], + synthetic_parquet_classification["target_column"], + ) + assert count == synthetic_parquet_classification["n_features"] + + def test_get_steps_per_epoch(self, synthetic_parquet_classification): + steps = get_steps_per_epoch(synthetic_parquet_classification["path"], batch_size=32) + expected = (synthetic_parquet_classification["n_rows"] + 31) // 32 # ceil division + assert steps == expected + + +class TestParquetIterableDataset: + """Tests for streaming iterable dataset behavior.""" + + def test_yields_all_rows_classification(self, synthetic_parquet_classification): + ds = 
ParquetIterableDataset(
+            synthetic_parquet_classification["path"],
+            target_column="target",
+            task_type="multiclass_classification",
+        )
+
+        rows = list(ds)
+        assert len(rows) == synthetic_parquet_classification["n_rows"]
+
+        # Check shapes and dtypes
+        x, y = rows[0]
+        assert x.shape == (synthetic_parquet_classification["n_features"],)
+        assert x.dtype == torch.float32
+        assert y.dtype == torch.long
+        assert y.ndim == 0  # scalar
+
+    def test_yields_all_rows_regression(self, synthetic_parquet_regression):
+        """Regression targets are streamed as float32 tensors of shape (1,)."""
+        ds = ParquetIterableDataset(
+            synthetic_parquet_regression["path"],
+            target_column="target",
+            task_type="regression",
+        )
+
+        rows = list(ds)
+        assert len(rows) == synthetic_parquet_regression["n_rows"]
+
+        x, y = rows[0]
+        assert x.dtype == torch.float32
+        assert y.dtype == torch.float32
+        assert y.shape == (1,)  # [1] tensor, collates to [batch, 1]
+
+    def test_yields_all_rows_binary(self, synthetic_parquet_classification):
+        """Binary targets are streamed as float32 tensors of shape (1,)."""
+        ds = ParquetIterableDataset(
+            synthetic_parquet_classification["path"],
+            target_column="target",
+            task_type="binary_classification",
+        )
+
+        rows = list(ds)
+        # The test name promises full coverage, so check the row count just like
+        # the multiclass and regression variants do.
+        assert len(rows) == synthetic_parquet_classification["n_rows"]
+
+        x, y = rows[0]
+        assert y.dtype == torch.float32
+        assert y.shape == (1,)  # [1] tensor, collates to [batch, 1]
+
+    def test_directory_of_parquets(self, tmp_path, synthetic_parquet_classification):
+        """Test loading from a directory containing multiple parquet files."""
+        subdir = tmp_path / "parts"
+        subdir.mkdir()
+        df = pd.read_parquet(synthetic_parquet_classification["path"])
+
+        # Split into 2 files
+        mid = len(df) // 2
+        pq.write_table(pa.Table.from_pandas(df.iloc[:mid]), subdir / "part_0.parquet", row_group_size=50)
+        pq.write_table(pa.Table.from_pandas(df.iloc[mid:]), subdir / "part_1.parquet", row_group_size=50)
+
+        ds = ParquetIterableDataset(str(subdir), target_column="target", task_type="multiclass_classification")
+        rows = list(ds)
+        assert len(rows) == len(df)
+
+    def test_total_rows_property(self, synthetic_parquet_classification):
+        """``total_rows`` reports the number of rows in the source parquet."""
+        ds = ParquetIterableDataset(
+            synthetic_parquet_classification["path"],
+            target_column="target",
+            task_type="multiclass_classification",
+        )
+        assert ds.total_rows == synthetic_parquet_classification["n_rows"]
+
+    def test_ddp_sharding(self, synthetic_parquet_classification):
+        """Verify DDP sharding splits row groups across ranks."""
+        path = synthetic_parquet_classification["path"]
+
+        all_rows_by_rank = {}
+        for rank in range(2):
+            # Simulate an initialized 2-process group without starting one.
+            with (
+                patch("torch.distributed.is_available", return_value=True),
+                patch("torch.distributed.is_initialized", return_value=True),
+                patch("torch.distributed.get_rank", return_value=rank),
+                patch("torch.distributed.get_world_size", return_value=2),
+            ):
+                ds = ParquetIterableDataset(path, target_column="target", task_type="multiclass_classification")
+                rows = list(ds)
+                all_rows_by_rank[rank] = rows
+
+        # Each rank gets a subset, together they cover all rows
+        total = len(all_rows_by_rank[0]) + len(all_rows_by_rank[1])
+        assert total == synthetic_parquet_classification["n_rows"]
+        # Neither rank may be left without data (this relies on the fixture
+        # file containing at least two row groups).
+        assert len(all_rows_by_rank[0]) > 0
+        assert len(all_rows_by_rank[1]) > 0
+
+    def test_feature_values_match_source(self, synthetic_parquet_classification):
+        """Verify streamed data matches the original parquet content."""
+        ds = ParquetIterableDataset(
+            synthetic_parquet_classification["path"],
+            target_column="target",
+            task_type="multiclass_classification",
+        )
+
+        df = pd.read_parquet(synthetic_parquet_classification["path"])
+        feature_cols = [c for c in df.columns if c != "target"]
+
+        streamed_x = []
+        streamed_y = []
+        for x, y in ds:
+            streamed_x.append(x.numpy())
+            streamed_y.append(y.numpy())
+
+        streamed_x = np.array(streamed_x)
+        streamed_y = np.array(streamed_y)
+
+        # Sort both by features to account for row-group shuffling.
+        # The expected features are cast to float32 first so both sides are
+        # ordered on identical values: sorting higher-precision source data
+        # against float32 streamed data can disagree when two source values
+        # round to the same float32, which would pair up the wrong rows.
+        expected_x = df[feature_cols].values.astype(np.float32)
+        expected_y = df["target"].values
+        streamed_order = np.lexsort(streamed_x.T)
+        expected_order = np.lexsort(expected_x.T)
+
+        np.testing.assert_allclose(streamed_x[streamed_order], expected_x[expected_order], atol=1e-6)
+        np.testing.assert_array_equal(streamed_y[streamed_order], expected_y[expected_order])
+
+
+class TestParquetBatchGenerator:
+    """Tests for Keras/TensorFlow parquet batch generator."""
+
+    def test_yields_all_rows(self, synthetic_parquet_classification):
+        """Batches are float32, have the right width, and cover every row."""
+        total_rows = 0
+        for X_batch, y_batch in parquet_batch_generator(
+            synthetic_parquet_classification["path"],
+            target_column="target",
+            batch_size=64,
+        ):
+            assert X_batch.dtype == np.float32
+            assert y_batch.dtype == np.float32
+            assert X_batch.shape[1] == synthetic_parquet_classification["n_features"]
+            total_rows += len(X_batch)
+
+        assert total_rows == synthetic_parquet_classification["n_rows"]
+
+    def test_batch_size_respected(self, synthetic_parquet_classification):
+        """Every batch but the last is exactly ``batch_size`` rows long."""
+        batch_size = 50
+        batches = list(
+            parquet_batch_generator(
+                synthetic_parquet_classification["path"],
+                target_column="target",
+                batch_size=batch_size,
+            )
+        )
+
+        # Guard against a vacuous pass: with no batches the loop below would
+        # never execute and the test would succeed without checking anything.
+        assert batches
+
+        # All batches except possibly the last should be exactly batch_size
+        for X_batch, _ in batches[:-1]:
+            assert len(X_batch) == batch_size
+
+        # The trailing batch may be short but must never exceed batch_size.
+        assert len(batches[-1][0]) <= batch_size
+
+    def test_directory_input(self, tmp_path, synthetic_parquet_classification):
+        """Test generator with directory of parquet files."""
+        subdir = tmp_path / "gen_parts"
+        subdir.mkdir()
+        df = pd.read_parquet(synthetic_parquet_classification["path"])
+        mid = len(df) // 2
+        pq.write_table(pa.Table.from_pandas(df.iloc[:mid]), subdir / "p0.parquet")
+        pq.write_table(pa.Table.from_pandas(df.iloc[mid:]), subdir / "p1.parquet")
+
+        total = sum(len(X) for X, _ in parquet_batch_generator(str(subdir), "target", batch_size=64))
+        assert total == len(df)
+
+    def test_values_match_source(self, synthetic_parquet_classification):
+        """Verify batched data matches original parquet content."""
+        df = pd.read_parquet(synthetic_parquet_classification["path"])
+        feature_cols = [c for c in df.columns if c != "target"]
+
+        all_x, all_y = [], []
+        for X_batch, y_batch in parquet_batch_generator(
+            synthetic_parquet_classification["path"],
+            target_column="target",
+            batch_size=1000,  # Single batch to get all data
+        ):
+            all_x.append(X_batch)
+            all_y.append(y_batch)
+
+        all_x = np.concatenate(all_x)
+        all_y = np.concatenate(all_y)
+
+        np.testing.assert_allclose(all_x, df[feature_cols].values, atol=1e-6)
+        np.testing.assert_allclose(all_y, df["target"].values.astype(np.float32), atol=1e-6)