diff --git a/Makefile b/Makefile index 4557d82a..59731544 100644 --- a/Makefile +++ b/Makefile @@ -45,6 +45,7 @@ help: @echo "" @echo "📊 Example Datasets:" @echo " make run-titanic Run on Titanic dataset (medium)" + @echo " make run-titanic-proba Run Titanic with probability-focused intent" @echo " make run-house-prices Run on House Prices dataset (regression)" @echo "" @echo "🏗️ Building:" @@ -331,6 +332,32 @@ run-titanic: build --spark-mode local \ --enable-final-evaluation +# Spaceship Titanic dataset with probability-focused objective +.PHONY: run-titanic-proba +run-titanic-proba: build + @echo "📊 Running on Spaceship Titanic dataset (probability-focused)..." + $(eval TIMESTAMP := $(shell date +%Y%m%d_%H%M%S)) + docker run --rm \ + --add-host=host.docker.internal:host-gateway \ + $(CONFIG_MOUNT) \ + $(CONFIG_ENV) \ + -v $(PWD)/examples/datasets:/data:ro \ + -v $(PWD)/workdir:/workdir \ + -e OPENAI_API_KEY=$(OPENAI_API_KEY) \ + -e ANTHROPIC_API_KEY=$(ANTHROPIC_API_KEY) \ + -e SPARK_LOCAL_CORES=4 \ + -e SPARK_DRIVER_MEMORY=4g \ + plexe:py$(PYTHON_VERSION) \ + python -m plexe.main \ + --train-dataset-uri /data/spaceship-titanic/train.parquet \ + --user-id dev_user \ + --intent "predict each passenger's probability of being transported; optimize probability quality and ranking" \ + --experiment-id titanic_proba \ + --max-iterations 5 \ + --work-dir /workdir/titanic_proba/$(TIMESTAMP) \ + --spark-mode local \ + --enable-final-evaluation + # House Prices dataset (regression) .PHONY: run-house-prices run-house-prices: build diff --git a/plexe/CODE_INDEX.md b/plexe/CODE_INDEX.md index 2211af1d..f3ee5380 100644 --- a/plexe/CODE_INDEX.md +++ b/plexe/CODE_INDEX.md @@ -1,6 +1,6 @@ # Code Index: plexe -> Generated on 2026-03-03 00:06:47 +> Generated on 2026-03-03 05:08:33 Code structure and public interface documentation for the **plexe** package. @@ -222,6 +222,8 @@ Helper functions for workflow. **Functions:** - `select_viable_model_types(data_layout: DataLayout, selected_frameworks: list[str] | None) -> list[str]` - Select viable model types using three-tier filtering. +- `metric_requires_probabilities(metric_name: str) -> bool` - Return True when a metric requires probability scores instead of hard labels. +- `normalize_probability_predictions(y_true: np.ndarray, y_pred_proba: Any, metric_name: str) -> np.ndarray` - Normalize probability predictions for sklearn metric compatibility. - `evaluate_on_sample(spark: SparkSession, sample_uri: str, model_artifacts_path: Path, model_type: str, metric: str, target_columns: list[str], group_column: str | None, train_sample_uri: str | None) -> tuple[float, float | None]` - Evaluate model on validation sample, optionally also on training sample. - `compute_metric_hardcoded(y_true, y_pred, metric_name: str) -> float` - Compute metric using hardcoded sklearn implementations. - `compute_metric(y_true, y_pred, metric_name: str, group_ids) -> float` - Compute metric value. @@ -393,7 +395,12 @@ Insight store for accumulating learnings from search. Search journal for tracking model search tree. **`SearchJournal`** - Tracks solution search tree. -- `__init__(self, baseline: Baseline | None)` +- `__init__(self, baseline: Baseline | None, optimization_direction: str)` +- `optimization_direction(self) -> str` - Metric optimization direction, constrained to {'higher', 'lower'}. +- `optimization_direction(self, value: str) -> None` - Validate and set optimization direction. +- `selection_score(self, value: float) -> float` - Normalize a metric value so larger always means better. +- `is_better(self, candidate: float, reference: float | None) -> bool` - Compare two metric values using the configured optimization direction. +- `sort_key(self, node: Solution) -> float` - Direction-aware sort key for solution nodes. - `add_node(self, node: Solution) -> None` - Add a solution to the journal. - `draft_nodes(self) -> list[Solution]` - Get all root nodes (bootstrap solutions without parents). - `buggy_nodes(self) -> list[Solution]` - Get all buggy nodes that could be debugged. @@ -447,6 +454,7 @@ Standard CatBoost predictor - NO Plexe dependencies. **`CatBoostPredictor`** - Standalone CatBoost predictor. - `__init__(self, model_dir: str)` - `predict(self, x: pd.DataFrame) -> pd.DataFrame` - Make predictions on input DataFrame. +- `predict_proba(self, x: pd.DataFrame) -> pd.DataFrame` - Predict per-class probabilities on input DataFrame. --- ## `templates/inference/keras_predictor.py` @@ -464,6 +472,7 @@ Standard LightGBM predictor - NO Plexe dependencies. **`LightGBMPredictor`** - Standalone LightGBM predictor. - `__init__(self, model_dir: str)` - `predict(self, x: pd.DataFrame) -> pd.DataFrame` - Make predictions on input DataFrame. +- `predict_proba(self, x: pd.DataFrame) -> pd.DataFrame` - Predict per-class probabilities on input DataFrame. --- ## `templates/inference/pytorch_predictor.py` @@ -481,6 +490,7 @@ Standard XGBoost predictor - NO Plexe dependencies. **`XGBoostPredictor`** - Standalone XGBoost predictor. - `__init__(self, model_dir: str)` - `predict(self, x: pd.DataFrame) -> pd.DataFrame` - Make predictions on input DataFrame. +- `predict_proba(self, x: pd.DataFrame) -> pd.DataFrame` - Predict per-class probabilities on input DataFrame. --- ## `templates/packaging/model_card_template.py` @@ -539,7 +549,7 @@ Submission tools for agents. - `get_register_statistical_profile_tool(context: BuildContext)` - Factory: Returns statistical profile submission tool. - `get_register_layout_tool(context: BuildContext)` - Factory: Returns layout detection submission tool. - `get_register_eda_report_tool(context: BuildContext)` - Factory: Returns EDA report submission tool. -- `get_save_split_uris_tool(context: BuildContext)` - Factory: Returns split URI submission tool. +- `get_save_split_uris_tool(context: BuildContext, spark: Any | None, expected_ratios: dict[str, float] | None)` - Factory: Returns split URI submission tool. - `get_save_sample_uris_tool(context: BuildContext)` - Factory: Returns sample URIs submission tool. - `get_save_metric_implementation_fn(context: BuildContext)` - Factory: Returns metric implementation submission function. - `get_validate_baseline_predictor_tool(context: BuildContext, val_sample_df)` - Factory: Returns baseline predictor validation tool. @@ -694,6 +704,7 @@ OpenTelemetry tracing decorators for agents and tools. Validation functions for pipelines, models, and other agent outputs. **Functions:** +- `canonicalize_split_ratios(split_ratios: dict[str, float] | None) -> dict[str, float]` - Normalize split ratio key aliases to canonical names. - `validate_sklearn_pipeline(pipeline: Pipeline, sample_df: pd.DataFrame, target_columns: list[str]) -> tuple[bool, str]` - Validate that an sklearn Pipeline is well-formed and functional. - `validate_pipeline_consistency(pipeline: Pipeline, train_sample: pd.DataFrame, val_sample: pd.DataFrame, target_columns: list[str]) -> tuple[bool, str]` - Validate pipeline produces consistent output shape on train/val samples. - `validate_xgboost_params(params: dict[str, Any]) -> tuple[bool, str]` - Validate XGBoost hyperparameters. diff --git a/plexe/agents/baseline_builder.py b/plexe/agents/baseline_builder.py index 580ccfcf..0954b348 100644 --- a/plexe/agents/baseline_builder.py +++ b/plexe/agents/baseline_builder.py @@ -17,6 +17,7 @@ from plexe.config import Config, get_routing_for_model from plexe.constants import DirNames +from plexe.helpers import compute_metric, metric_requires_probabilities, normalize_probability_predictions from plexe.models import BuildContext, Baseline from plexe.utils.tracing import agent_span from plexe.tools.submission import ( @@ -54,6 +55,13 @@ def _build_agent(self, val_sample_df) -> CodeAgent: from plexe.agents.utils import format_user_feedback_for_prompt feedback_section = format_user_feedback_for_prompt(self.context.scratch.get("_user_feedback")) + requires_proba = metric_requires_probabilities(self.context.metric.name) + proba_requirement = ( + "- Because the selected primary metric requires probabilities, your class MUST also implement\n" + " `predict_proba(self, x: pd.DataFrame) -> np.ndarray | pd.DataFrame` that returns per-sample scores.\n" + if requires_proba + else "" + ) # Get routing configuration for this agent's model api_base, headers = get_routing_for_model(self.config.routing_config, self.llm_model) @@ -128,6 +136,7 @@ def _build_agent(self, val_sample_df) -> CodeAgent: "## CRITICAL:\n" "- Use task_analysis['output_targets'] to identify target column(s)\n" "- Predictor must have standard .predict(X) -> array interface\n" + f"{proba_requirement}" ), model=PlexeLiteLLMModel( model_id=self.llm_model, @@ -164,6 +173,11 @@ def run(self) -> Baseline: task_type = self.context.task_analysis.get("task_type", "unknown") target_columns = self.context.output_targets metric_name = self.context.metric.name + proba_note = ( + "Primary metric requires probabilities, so baseline must implement predict_proba(X)." + if metric_requires_probabilities(metric_name) + else "Primary metric uses label/value predictions." + ) task = ( f"Create a simple baseline predictor for this ML task.\n\n" @@ -171,6 +185,7 @@ def run(self) -> Baseline: f"Task Type: {task_type}\n" f"Target Column(s): {target_columns}\n" f"Metric: {metric_name}\n\n" + f"{proba_note}\n\n" f"Build ONE simple baseline (heuristics preferred) that makes sense for this ML task. " f"Register it and evaluate performance using the tools provided." ) @@ -224,8 +239,6 @@ def _evaluate_performance(self, val_sample_df) -> float: Returns: Performance metric value """ - from plexe.helpers import compute_metric - # Separate features from target target_cols = self.context.output_targets feature_cols = [col for col in val_sample_df.columns if col not in target_cols] @@ -233,11 +246,21 @@ def _evaluate_performance(self, val_sample_df) -> float: X_val = val_sample_df[feature_cols] y_val = val_sample_df[target_cols[0]] - # Make predictions - y_pred = self.context.baseline_predictor.predict(X_val) + if metric_requires_probabilities(self.context.metric.name): + if not hasattr(self.context.baseline_predictor, "predict_proba") or not callable( + self.context.baseline_predictor.predict_proba + ): + raise ValueError( + f"Metric '{self.context.metric.name}' requires probabilities but baseline predictor " + "does not implement predict_proba()." + ) + raw_proba = self.context.baseline_predictor.predict_proba(X_val) + y_pred_input = normalize_probability_predictions(y_val.values, raw_proba, self.context.metric.name) + else: + y_pred_input = self.context.baseline_predictor.predict(X_val) # Compute metric - performance = compute_metric(y_true=y_val.values, y_pred=y_pred, metric_name=self.context.metric.name) + performance = compute_metric(y_true=y_val.values, y_pred=y_pred_input, metric_name=self.context.metric.name) logger.info(f"Baseline performance: {self.context.metric.name}={performance:.4f}") diff --git a/plexe/agents/dataset_splitter.py b/plexe/agents/dataset_splitter.py index 18fc551a..5d4837a2 100644 --- a/plexe/agents/dataset_splitter.py +++ b/plexe/agents/dataset_splitter.py @@ -53,10 +53,12 @@ def __init__(self, spark: SparkSession, dataset_uri: str, context: BuildContext, self.config = config self.llm_model = config.dataset_splitting_llm - def _build_agent(self) -> CodeAgent: + def _build_agent(self, split_ratios: dict[str, float]) -> CodeAgent: """Build CodeAgent with splitting tool.""" # Get routing configuration for this agent's model api_base, headers = get_routing_for_model(self.config.routing_config, self.llm_model) + # TODO(splitter-prompts): Make split instructions conditional on requested split mode. + # 2-way modes should not instruct writing test.parquet or passing test_uri. return CodeAgent( name="DatasetSplitter", @@ -142,7 +144,7 @@ def _build_agent(self) -> CodeAgent: extra_headers=headers, ), verbosity_level=self.config.agent_verbosity_level, - tools=[get_save_split_uris_tool(self.context)], + tools=[get_save_split_uris_tool(self.context, self.spark, split_ratios)], add_base_tools=False, additional_authorized_imports=self.config.allowed_base_imports + [ @@ -189,7 +191,7 @@ def run(self, split_ratios: dict[str, float], output_dir: str | Path) -> tuple[s output_dir_str = str(output_dir) # Build agent - agent = self._build_agent() + agent = self._build_agent(split_ratios) # Build task prompt (use string version of output_dir) task = self._build_task_prompt(split_ratios, output_dir_str) @@ -263,6 +265,8 @@ def _build_task_prompt(self, split_ratios: dict[str, float], output_dir: str) -> prompt += ( "\n" + # TODO(splitter-prompts): Make this task prompt explicitly 2-way vs 3-way. + # Current wording always asks for train/val/test outputs, which can induce accidental holdouts. "Based on the task type and data characteristics, choose the appropriate splitting strategy:\n" "- Classification → Stratified split (preserve class balance)\n" "- Forecasting future events/values → Chronological split (train on past, test on future)\n" diff --git a/plexe/agents/model_evaluator.py b/plexe/agents/model_evaluator.py index 735a93d3..d0f15412 100644 --- a/plexe/agents/model_evaluator.py +++ b/plexe/agents/model_evaluator.py @@ -95,8 +95,11 @@ def _build_agent(self, phase_name: str, phase_prompt: str, tools: list) -> CodeA "- primary_metric_name: Name of the primary optimization metric (string)\\n" "- output_targets: list[str] (target column names to exclude from features)\\n\\n" "PREDICTOR INTERFACE:\\n" - "The predictor's predict() function takes a pandas DataFrame (features only, no target)\\n" - "and returns a pandas DataFrame with a 'prediction' column.\\n\\n" + "- predict(X): input is features-only pandas DataFrame (no target columns), returns DataFrame with 'prediction'\\n" + "- predict_proba(X): input is features-only pandas DataFrame; classification only; returns per-class probabilities\\n\\n" + "CRITICAL METRIC RULE:\\n" + "If a metric is probability-based (roc_auc, roc_auc_ovr, roc_auc_ovo, log_loss),\\n" + "you MUST compute it from predict_proba() outputs, not from thresholded labels.\\n\\n" "Example usage:\\n" "```python\\n" "# Prepare features (drop target columns using output_targets)\\n" @@ -106,6 +109,7 @@ def _build_agent(self, phase_name: str, phase_prompt: str, tools: list) -> CodeA "# Generate predictions (returns DataFrame with 'prediction' column)\\n" "predictions_df = predictor.predict(X_test)\\n" "y_pred = predictions_df['prediction'].values\\n" + "# For probability metrics, use predictor.predict_proba(X_test)\\n" "```\\n\\n" "## YOUR MISSION:\\n" f"{phase_prompt}\\n\\n" @@ -329,6 +333,10 @@ def _get_phase_1_prompt(task: str, primary_metric_name: str) -> str: f"3. Compute primary metric + 4-6 additional relevant metrics\\n" f"4. ENCOURAGED: Compute 95% confidence intervals using bootstrap (1000 samples)\\n" f"5. Interpret results - what do these numbers tell us?\\n\\n" + f"If computing probability-based metrics (roc_auc, roc_auc_ovr, roc_auc_ovo, log_loss):\\n" + f"- Use predictor.predict_proba(X_test), never thresholded class labels\\n" + f"- Binary: use positive-class scores\\n" + f"- Multiclass: use full per-class probability matrix\\n\\n" f"Register using:\\n" f"register_core_metrics_report(\\n" f" task_type='...', # your detected type\\n" diff --git a/plexe/helpers.py b/plexe/helpers.py index 2c9c203d..be7398fd 100644 --- a/plexe/helpers.py +++ b/plexe/helpers.py @@ -28,6 +28,13 @@ logger = logging.getLogger(__name__) +PROBABILITY_METRICS = { + StandardMetric.ROC_AUC.value, + StandardMetric.ROC_AUC_OVR.value, + StandardMetric.ROC_AUC_OVO.value, + StandardMetric.LOG_LOSS.value, +} + def select_viable_model_types(data_layout: DataLayout, selected_frameworks: list[str] | None = None) -> list[str]: """ @@ -85,6 +92,52 @@ def select_viable_model_types(data_layout: DataLayout, selected_frameworks: list return viable +def metric_requires_probabilities(metric_name: str) -> bool: + """Return True when a metric requires probability scores instead of hard labels.""" + return metric_name.lower().strip() in PROBABILITY_METRICS + + +def normalize_probability_predictions(y_true: np.ndarray, y_pred_proba: Any, metric_name: str) -> np.ndarray: + """ + Normalize probability predictions for sklearn metric compatibility. + + - Binary metrics use positive-class scores (1D array). + - Multiclass metrics use full per-class probability matrix (2D array). + """ + probabilities = y_pred_proba.values if hasattr(y_pred_proba, "values") else np.asarray(y_pred_proba) + metric = metric_name.lower().strip() + n_classes = len(np.unique(y_true)) + + if probabilities.ndim == 1: + if n_classes > 2: + raise ValueError(f"Metric '{metric_name}' requires per-class probabilities for multiclass tasks.") + return probabilities + + if probabilities.ndim != 2: + raise ValueError(f"Expected probability outputs to be 1D or 2D, got shape {probabilities.shape}") + + original_n_cols = probabilities.shape[1] + if probabilities.shape[1] == 1: + probabilities = np.column_stack([1 - probabilities[:, 0], probabilities[:, 0]]) + + is_multiclass = probabilities.shape[1] > 2 or n_classes > 2 + if not is_multiclass: + return probabilities[:, 1] + + if probabilities.shape[1] != n_classes and metric in PROBABILITY_METRICS: + reported_n_cols = ( + original_n_cols if original_n_cols == 1 and probabilities.shape[1] == 2 else probabilities.shape[1] + ) + column_label = "column" if reported_n_cols == 1 else "columns" + raise ValueError( + f"Probability outputs have {reported_n_cols} {column_label} but validation labels contain {n_classes} " + f"distinct classes for metric '{metric_name}'. For multiclass tasks, predictor.predict_proba " + f"must return one probability column per class." + ) + + return probabilities + + def evaluate_on_sample( spark: SparkSession, sample_uri: str, @@ -170,8 +223,18 @@ def _evaluate_predictor( X = df.drop(columns=columns_to_drop) y = df[target_columns[0]] - predictions = predictor.predict(X)["prediction"].values - return compute_metric(y, predictions, metric, group_ids=group_ids) + if metric_requires_probabilities(metric): + if not hasattr(predictor, "predict_proba") or not callable(predictor.predict_proba): + raise ValueError( + f"Metric '{metric}' requires probability scores, but predictor {type(predictor).__name__} " + "does not implement predict_proba()." + ) + raw_probabilities = predictor.predict_proba(X) + predictions = normalize_probability_predictions(y.values, raw_probabilities, metric) + else: + predictions = predictor.predict(X)["prediction"].values + + return compute_metric(y.values, predictions, metric, group_ids=group_ids) def compute_metric_hardcoded(y_true, y_pred, metric_name: str) -> float: diff --git a/plexe/search/evolutionary_search_policy.py b/plexe/search/evolutionary_search_policy.py index 9a09d173..4fb13d5d 100644 --- a/plexe/search/evolutionary_search_policy.py +++ b/plexe/search/evolutionary_search_policy.py @@ -100,9 +100,9 @@ def _calculate_recent_progress(self, journal: SearchJournal, window: int = 5) -> return -0.3 # Slight negative - recent period mostly buggy but we had good solutions before # Calculate trend slope using linear regression - performances = [n.performance for n in good_recent] - x = np.arange(len(performances)) - slope = np.polyfit(x, performances, 1)[0] if len(performances) > 1 else 0.0 + scores = [journal.selection_score(n.performance) for n in good_recent] + x = np.arange(len(scores)) + slope = np.polyfit(x, scores, 1)[0] if len(scores) > 1 else 0.0 # Normalize slope to [-1, 1] range return float(np.clip(slope * 10, -1.0, 1.0)) # Scale for typical performance ranges @@ -112,7 +112,7 @@ def _calculate_stagnation(self, journal: SearchJournal, window: int = 3) -> floa if not journal.good_nodes: return 0.0 # No stagnation if no good solutions yet - best_performance = journal.best_performance + best_score = journal.selection_score(journal.best_performance) recent_nodes = journal.nodes[-window:] if len(journal.nodes) >= window else journal.nodes # Safety check: ensure we have nodes to analyze @@ -125,7 +125,7 @@ def _calculate_stagnation(self, journal: SearchJournal, window: int = 3) -> floa for node in reversed(recent_nodes): if not node.is_buggy and node.performance is not None: - if node.performance >= best_performance - threshold: + if journal.selection_score(node.performance) >= best_score - threshold: improvements += 1 # High stagnation = few improvements in recent window @@ -203,7 +203,7 @@ def _exploit_action(self, journal: SearchJournal, iteration: int, max_iterations temp = max(0.2, (1 - progress) ** 1.5) # Focus on top-k performers - sorted_nodes = sorted(good_nodes, key=lambda n: n.performance, reverse=True) + sorted_nodes = sorted(good_nodes, key=journal.sort_key, reverse=True) top_k = sorted_nodes[:k] if len(top_k) == 1 or temp < 0.25: @@ -211,9 +211,9 @@ def _exploit_action(self, journal: SearchJournal, iteration: int, max_iterations logger.info(f"Action: EXPLOIT (greedy) - solution {selected.solution_id} (perf={selected.performance:.4f})") else: # Softmax selection among top-k - perfs = np.array([n.performance for n in top_k]) + scores = np.array([journal.selection_score(n.performance) for n in top_k]) # Numerical stability: subtract max before exp - exp_probs = np.exp((perfs / temp) - np.max(perfs / temp)) + exp_probs = np.exp((scores / temp) - np.max(scores / temp)) probs = exp_probs / np.sum(exp_probs) selected = self._np_rng.choice(top_k, p=probs) logger.info( @@ -253,7 +253,7 @@ def _mutate_action(self, journal: SearchJournal) -> Solution | None: return self._explore_action(journal) # Prefer solutions with medium performance for mutation (not best, not worst) - sorted_nodes = sorted(good_nodes, key=lambda n: n.performance, reverse=True) + sorted_nodes = sorted(good_nodes, key=journal.sort_key, reverse=True) mid_range_start = max(0, len(sorted_nodes) // 4) mid_range_end = min(len(sorted_nodes), 3 * len(sorted_nodes) // 4) mid_range = sorted_nodes[mid_range_start:mid_range_end] if mid_range_end > mid_range_start else sorted_nodes @@ -273,13 +273,18 @@ def should_stop(self, journal: SearchJournal, iteration: int, max_iterations: in # Early stopping logic (only after halfway point to allow for exploration) if iteration > max_iterations * 0.4: stagnation = self._calculate_stagnation(journal, window=5) + baseline = journal.baseline_performance + best = journal.best_performance + + if journal.optimization_direction == "higher": + has_good_performance = best > baseline * 1.1 if baseline > 0 else best > 0 + has_exceptional_performance = best > baseline * 1.5 if baseline > 0 else False + else: + has_good_performance = best < baseline * 0.9 if baseline > 0 else best >= 0 + has_exceptional_performance = best < baseline * 0.5 if baseline > 0 else False # Stop if highly stagnant AND we have a good solution (>10% improvement over baseline) - if ( - stagnation > 0.8 - and journal.best_performance > journal.baseline_performance * 1.1 - and len(journal.good_nodes) >= 2 - ): + if stagnation > 0.8 and has_good_performance and len(journal.good_nodes) >= 2: logger.info( f"Early stopping: High stagnation ({stagnation:.3f}) with good performance " @@ -288,7 +293,7 @@ def should_stop(self, journal: SearchJournal, iteration: int, max_iterations: in return True # Stop if we have exceptional performance (>50% improvement) and some stagnation - if stagnation > 0.6 and journal.best_performance > journal.baseline_performance * 1.5: + if stagnation > 0.6 and has_exceptional_performance: logger.info( f"Early stopping: Exceptional performance ({journal.best_performance:.4f}) " diff --git a/plexe/search/journal.py b/plexe/search/journal.py index 1cbf2534..d374301c 100644 --- a/plexe/search/journal.py +++ b/plexe/search/journal.py @@ -32,20 +32,56 @@ class SearchJournal: - Improve nodes (enhancements) """ - def __init__(self, baseline: Baseline | None = None): + def __init__(self, baseline: Baseline | None = None, optimization_direction: str = "higher"): """ Initialize journal. Args: baseline: Baseline model for comparison + optimization_direction: Metric optimization direction ("higher" or "lower") """ self.baseline = baseline self.baseline_performance = baseline.performance if baseline else 0.0 + self.optimization_direction = optimization_direction self.nodes: list[Solution] = [] self.successful_attempts = 0 self.failed_attempts = 0 + @property + def optimization_direction(self) -> str: + """Metric optimization direction, constrained to {'higher', 'lower'}.""" + return self._optimization_direction + + @optimization_direction.setter + def optimization_direction(self, value: str) -> None: + """Validate and set optimization direction.""" + if value not in {"higher", "lower"}: + raise ValueError(f"optimization_direction must be 'higher' or 'lower', got: {value}") + self._optimization_direction = value + + def selection_score(self, value: float) -> float: + """Normalize a metric value so larger always means better.""" + return value if self.optimization_direction == "higher" else -value + + def is_better(self, candidate: float, reference: float | None) -> bool: + """ + Compare two metric values using the configured optimization direction. + + Args: + candidate: Candidate performance value + reference: Reference performance value (or None) + """ + if reference is None: + return True + return self.selection_score(candidate) > self.selection_score(reference) + + def sort_key(self, node: Solution) -> float: + """Direction-aware sort key for solution nodes.""" + if node.performance is None: + return float("-inf") + return self.selection_score(node.performance) + # ============================================ # Adding Nodes # ============================================ @@ -97,7 +133,12 @@ def best_node(self) -> Solution | None: good = self.good_nodes if not good: return None - return max(good, key=lambda n: n.performance) + + best = good[0] + for candidate in good[1:]: + if self.is_better(candidate.performance, best.performance): + best = candidate + return best @property def best_performance(self) -> float: @@ -186,11 +227,9 @@ def summarize(self) -> str: best = self.best_node if best: - improvement = ( - (best.performance - self.baseline_performance) / self.baseline_performance * 100 - if self.baseline_performance > 0 - else 0 - ) + score_delta = self.selection_score(best.performance) - self.selection_score(self.baseline_performance) + baseline_scale = abs(self.baseline_performance) + improvement = (score_delta / baseline_scale * 100) if baseline_scale > 0 else 0 summary += ( f" Best: {best.performance:.4f} ({improvement:+.1f}% vs baseline) [solution {best.solution_id}]\n" ) @@ -216,10 +255,10 @@ def get_improvement_trend(self, window: int = 5) -> float: # Look at last N successful attempts recent = successful[-window:] if len(successful) > window else successful - performances = [n.performance for n in recent] + scores = [self.selection_score(n.performance) for n in recent] # Calculate average delta - deltas = [performances[i + 1] - performances[i] for i in range(len(performances) - 1)] + deltas = [scores[i + 1] - scores[i] for i in range(len(scores) - 1)] return sum(deltas) / len(deltas) if deltas else 0.0 @@ -258,7 +297,7 @@ def get_successful_improvements(self, limit: int = 5) -> list[Solution]: child_nodes = [n for n in self.nodes if n.parent is not None and not n.is_buggy and n.performance is not None] # Sort by performance - child_nodes.sort(key=lambda n: n.performance, reverse=True) + child_nodes.sort(key=self.sort_key, reverse=True) return child_nodes[:limit] @@ -271,6 +310,7 @@ def to_dict(self) -> dict: return { "baseline": self.baseline.to_dict() if self.baseline else None, "baseline_performance": self.baseline_performance, + "optimization_direction": self.optimization_direction, "nodes": [node.to_dict() for node in self.nodes], "successful_attempts": self.successful_attempts, "failed_attempts": self.failed_attempts, @@ -289,7 +329,10 @@ def from_dict(d: dict) -> "SearchJournal": baseline = Baseline.from_dict(d["baseline"]) if d.get("baseline") else None # Create journal - journal = SearchJournal(baseline=baseline) + journal = SearchJournal( + baseline=baseline, + optimization_direction=d.get("optimization_direction", "higher"), + ) journal.baseline_performance = d.get("baseline_performance", 0.0) journal.successful_attempts = d.get("successful_attempts", 0) journal.failed_attempts = d.get("failed_attempts", 0) diff --git a/plexe/search/tree_policy.py b/plexe/search/tree_policy.py index 988c674d..49860ead 100644 --- a/plexe/search/tree_policy.py +++ b/plexe/search/tree_policy.py @@ -64,7 +64,7 @@ def decide_next_solution( k = max(1, round(self.num_drafts * (1 - progress))) temp = max(0.3, (1 - progress) ** 2) - sorted_nodes = sorted(journal.good_nodes, key=lambda n: n.performance, reverse=True) + sorted_nodes = sorted(journal.good_nodes, key=journal.sort_key, reverse=True) top_k = sorted_nodes[:k] # Greedy if k=1 or low temperature @@ -73,8 +73,8 @@ def decide_next_solution( return top_k[0] # Softmax sampling - perfs = np.array([n.performance for n in top_k]) - probs = np.exp((perfs / temp) - np.max(perfs / temp)) + scores = np.array([journal.selection_score(n.performance) for n in top_k]) + probs = np.exp((scores / temp) - np.max(scores / temp)) probs /= probs.sum() selected = self._np_rng.choice(top_k, p=probs) diff --git a/plexe/templates/inference/catboost_predictor.py b/plexe/templates/inference/catboost_predictor.py index 51797d5d..addd56af 100644 --- a/plexe/templates/inference/catboost_predictor.py +++ b/plexe/templates/inference/catboost_predictor.py @@ -6,8 +6,10 @@ """ from pathlib import Path +import json import joblib +import numpy as np import pandas as pd from catboost import CatBoostClassifier, CatBoostRegressor @@ -29,6 +31,14 @@ def __init__(self, model_dir: str): model_dir = Path(model_dir) artifacts_dir = model_dir / "artifacts" + metadata_path = artifacts_dir / "metadata.json" + if metadata_path.exists(): + with open(metadata_path) as f: + metadata = json.load(f) + self._task_type = metadata.get("task_type", "") + else: + self._task_type = "" + # Execute pipeline code (defines custom FunctionTransformer functions) code_path = model_dir / "src" / "pipeline.py" if code_path.exists(): @@ -71,6 +81,29 @@ def predict(self, x: pd.DataFrame) -> pd.DataFrame: return pd.DataFrame({"prediction": predictions}) + def predict_proba(self, x: pd.DataFrame) -> pd.DataFrame: + """ + Predict per-class probabilities on input DataFrame. + + Returns: + DataFrame with probability columns named proba_0..proba_n. + """ + if self._task_type and self._task_type not in {"binary_classification", "multiclass_classification"}: + raise ValueError( + f"predict_proba() is only valid for classification tasks, got task_type='{self._task_type or 'unknown'}'" + ) + if not isinstance(self.model, CatBoostClassifier): + raise ValueError(f"Model type {type(self.model).__name__} does not support predict_proba().") + + probabilities = np.asarray(self.model.predict_proba(self.pipeline.transform(x))) + if probabilities.ndim == 1: + probabilities = probabilities.reshape(-1, 1) + if probabilities.shape[1] == 1: + probabilities = np.column_stack([1 - probabilities[:, 0], probabilities[:, 0]]) + + columns = [f"proba_{i}" for i in range(probabilities.shape[1])] + return pd.DataFrame(probabilities, columns=columns) + # ============================================ # Example Usage diff --git a/plexe/templates/inference/keras_predictor.py b/plexe/templates/inference/keras_predictor.py index e3e4d904..7e3274f4 100644 --- a/plexe/templates/inference/keras_predictor.py +++ b/plexe/templates/inference/keras_predictor.py @@ -42,8 +42,12 @@ def __init__(self, model_dir: str): with open(metadata_path) as f: metadata = json.load(f) raw_task_type = metadata.get("task_type", "") + self._loss_class = metadata.get("loss_class", "") + self._loss_config = metadata.get("loss_config", {}) or {} else: raw_task_type = "" + self._loss_class = "" + self._loss_config = {} self._task_type = raw_task_type @@ -60,6 +64,75 @@ def __init__(self, model_dir: str): with open(artifacts_dir / "pipeline.pkl", "rb") as f: self.pipeline = cloudpickle.load(f) + def _uses_logits_output(self, task_type: str | None = None) -> bool: + """Return True when model outputs are logits based on training loss metadata.""" + effective_task_type = task_type or self._task_type + from_logits = self._loss_config.get("from_logits") + if effective_task_type == "binary_classification" and self._loss_class == "BinaryCrossentropy": + return bool(from_logits) + if effective_task_type == "multiclass_classification" and self._loss_class in { + "SparseCategoricalCrossentropy", + "CategoricalCrossentropy", + }: + return bool(from_logits) + return False + + def _probabilities_from_raw(self, raw_predictions): + """Convert raw model outputs into probability arrays.""" + import numpy as np + + probabilities = np.asarray(raw_predictions) + if probabilities.ndim == 1: + probabilities = probabilities.reshape(-1, 1) + if not np.isfinite(probabilities).all(): + raise ValueError("Keras model outputs contain NaN/Inf values; cannot compute probabilities.") + task_type = self._task_type + if not task_type: + task_type = "binary_classification" if probabilities.shape[1] <= 2 else "multiclass_classification" + uses_logits = self._uses_logits_output(task_type) + + # Legacy model metadata may omit loss_config.from_logits. + # If outputs are clearly outside probability bounds, treat them as logits. + if ( + not uses_logits + and not self._loss_config + and task_type + in { + "binary_classification", + "multiclass_classification", + } + ): + if probabilities.min() < 0.0 or probabilities.max() > 1.0: + uses_logits = True + + if task_type == "binary_classification": + if probabilities.shape[1] == 1: + positive = probabilities[:, 0] + if uses_logits: + positive = 1.0 / (1.0 + np.exp(-positive)) + probabilities = np.column_stack([1 - positive, positive]) + return probabilities + + if probabilities.shape[1] == 2: + if uses_logits: + shifted = probabilities - np.max(probabilities, axis=1, keepdims=True) + exp_values = np.exp(shifted) + probabilities = exp_values / np.sum(exp_values, axis=1, keepdims=True) + return probabilities + + raise ValueError(f"Binary classification expects 1 or 2 outputs, got shape {probabilities.shape}") + + if task_type == "multiclass_classification": + if uses_logits: + shifted = probabilities - np.max(probabilities, axis=1, keepdims=True) + exp_values = np.exp(shifted) + probabilities = exp_values / np.sum(exp_values, axis=1, keepdims=True) + return probabilities + + raise ValueError( + f"predict_proba() is only valid for classification tasks, got task_type='{self._task_type or 'unknown'}'" + ) + def predict(self, x: pd.DataFrame) -> pd.DataFrame: """ Make predictions on input DataFrame. @@ -80,11 +153,11 @@ def predict(self, x: pd.DataFrame) -> pd.DataFrame: # Post-process based on task_type from metadata if self._task_type == "binary_classification": - # Keras outputs probabilities — threshold at 0.5 - predictions = raw_predictions.squeeze() - predictions = (predictions > 0.5).astype(int) + probabilities = self._probabilities_from_raw(raw_predictions) + predictions = (probabilities[:, 1] > 0.5).astype(int) elif self._task_type == "multiclass_classification": - predictions = np.argmax(raw_predictions, axis=1) + probabilities = self._probabilities_from_raw(raw_predictions) + predictions = np.argmax(probabilities, axis=1) else: # Regression or unknown: return raw values predictions = raw_predictions.squeeze() @@ -95,19 +168,12 @@ def predict_proba(self, x: pd.DataFrame) -> pd.DataFrame: """ Predict per-class probabilities on input DataFrame. - Returns raw model outputs (sigmoid/softmax values) without argmax. + Returns per-class probabilities with columns proba_0..proba_n. """ - import numpy as np - x_transformed = self.pipeline.transform(x) raw_predictions = self.model.predict(x_transformed, verbose=0) - probabilities = np.asarray(raw_predictions) - if probabilities.ndim == 1: - probabilities = probabilities.reshape(-1, 1) - - if probabilities.shape[1] == 1: - probabilities = np.column_stack([1 - probabilities[:, 0], probabilities[:, 0]]) + probabilities = self._probabilities_from_raw(raw_predictions) columns = [f"proba_{i}" for i in range(probabilities.shape[1])] return pd.DataFrame(probabilities, columns=columns) diff --git a/plexe/templates/inference/lightgbm_predictor.py b/plexe/templates/inference/lightgbm_predictor.py index fb503c82..3e787913 100644 --- a/plexe/templates/inference/lightgbm_predictor.py +++ b/plexe/templates/inference/lightgbm_predictor.py @@ -6,8 +6,10 @@ """ from pathlib import Path +import json import joblib +import numpy as np import pandas as pd @@ -28,6 +30,14 @@ def __init__(self, model_dir: str): model_dir = Path(model_dir) artifacts_dir = model_dir / "artifacts" + metadata_path = artifacts_dir / "metadata.json" + if metadata_path.exists(): + with open(metadata_path) as f: + metadata = json.load(f) + self._task_type = metadata.get("task_type", "") + else: + self._task_type = "" + # Execute pipeline code (defines custom FunctionTransformer functions) code_path = model_dir / "src" / "pipeline.py" if code_path.exists(): @@ -61,6 +71,29 @@ def predict(self, x: pd.DataFrame) -> pd.DataFrame: return pd.DataFrame({"prediction": predictions}) + def predict_proba(self, x: pd.DataFrame) -> pd.DataFrame: + """ + Predict per-class probabilities on input DataFrame. + + Returns: + DataFrame with probability columns named proba_0..proba_n. + """ + if self._task_type and self._task_type not in {"binary_classification", "multiclass_classification"}: + raise ValueError( + f"predict_proba() is only valid for classification tasks, got task_type='{self._task_type or 'unknown'}'" + ) + if not hasattr(self.model, "predict_proba"): + raise ValueError(f"Model type {type(self.model).__name__} does not support predict_proba().") + + probabilities = np.asarray(self.model.predict_proba(self.pipeline.transform(x))) + if probabilities.ndim == 1: + probabilities = probabilities.reshape(-1, 1) + if probabilities.shape[1] == 1: + probabilities = np.column_stack([1 - probabilities[:, 0], probabilities[:, 0]]) + + columns = [f"proba_{i}" for i in range(probabilities.shape[1])] + return pd.DataFrame(probabilities, columns=columns) + # ============================================ # Example Usage diff --git a/plexe/templates/inference/pytorch_predictor.py b/plexe/templates/inference/pytorch_predictor.py index 0faad4cd..71ce858b 100644 --- a/plexe/templates/inference/pytorch_predictor.py +++ b/plexe/templates/inference/pytorch_predictor.py @@ -109,6 +109,11 @@ def predict_proba(self, x: pd.DataFrame) -> pd.DataFrame: Applies sigmoid for single-logit binary models, otherwise softmax. """ + if self._task_type and self._task_type not in {"binary_classification", "multiclass_classification"}: + raise ValueError( + f"predict_proba() is only valid for classification tasks, got task_type='{self._task_type or 'unknown'}'" + ) + # Transform features through pipeline x_transformed = self.pipeline.transform(x) diff --git a/plexe/templates/inference/xgboost_predictor.py b/plexe/templates/inference/xgboost_predictor.py index 02dd7880..029d707b 100644 --- a/plexe/templates/inference/xgboost_predictor.py +++ b/plexe/templates/inference/xgboost_predictor.py @@ -6,8 +6,10 @@ """ from pathlib import Path +import json import joblib +import numpy as np import pandas as pd @@ -28,6 +30,14 @@ def __init__(self, model_dir: str): model_dir = Path(model_dir) artifacts_dir = model_dir / "artifacts" + metadata_path = artifacts_dir / "metadata.json" + if metadata_path.exists(): + with open(metadata_path) as f: + metadata = json.load(f) + self._task_type = metadata.get("task_type", "") + else: + self._task_type = "" + # Execute pipeline code (defines custom FunctionTransformer functions) code_path = model_dir / "src" / "pipeline.py" if code_path.exists(): @@ -61,6 +71,29 @@ def predict(self, x: pd.DataFrame) -> pd.DataFrame: return pd.DataFrame({"prediction": predictions}) + def predict_proba(self, x: pd.DataFrame) -> pd.DataFrame: + """ + Predict per-class probabilities on input DataFrame. + + Returns: + DataFrame with probability columns named proba_0..proba_n. + """ + if self._task_type and self._task_type not in {"binary_classification", "multiclass_classification"}: + raise ValueError( + f"predict_proba() is only valid for classification tasks, got task_type='{self._task_type or 'unknown'}'" + ) + if not hasattr(self.model, "predict_proba"): + raise ValueError(f"Model type {type(self.model).__name__} does not support predict_proba().") + + probabilities = np.asarray(self.model.predict_proba(self.pipeline.transform(x))) + if probabilities.ndim == 1: + probabilities = probabilities.reshape(-1, 1) + if probabilities.shape[1] == 1: + probabilities = np.column_stack([1 - probabilities[:, 0], probabilities[:, 0]]) + + columns = [f"proba_{i}" for i in range(probabilities.shape[1])] + return pd.DataFrame(probabilities, columns=columns) + # ============================================ # Example Usage diff --git a/plexe/tools/submission.py b/plexe/tools/submission.py index 36c41500..81e0d8ce 100644 --- a/plexe/tools/submission.py +++ b/plexe/tools/submission.py @@ -17,7 +17,12 @@ from plexe.models import BuildContext, Metric, Hypothesis, TaskType, UnifiedPlan from plexe.search.insight_store import InsightStore from plexe.utils.tracing import tool_span -from plexe.validation.validators import validate_sklearn_pipeline, validate_pipeline_consistency +from plexe.validation.validators import ( + canonicalize_split_ratios, + validate_dataset_splits, + validate_pipeline_consistency, + validate_sklearn_pipeline, +) logger = logging.getLogger(__name__) @@ -787,16 +792,22 @@ def save_eda_report( return save_eda_report -def get_save_split_uris_tool(context: BuildContext): +def get_save_split_uris_tool( + context: BuildContext, spark: Any | None = None, expected_ratios: dict[str, float] | None = None +): """ Factory: Returns split URI submission tool. Args: context: Build context for storing result + spark: Optional SparkSession for immediate split validation + expected_ratios: Optional expected split ratios (train/val/test) Returns: Configured tool """ + normalized_expected = canonicalize_split_ratios(expected_ratios) + expects_test_split = normalized_expected.get("test", 0.0) > 0 @tool @tool_span @@ -820,6 +831,19 @@ def save_split_uris(train_uri: str, val_uri: str, test_uri: str = None) -> str: Returns: Confirmation message """ + if expects_test_split and not test_uri: + raise ValueError("A non-empty test split is required for this run. Provide test_uri.") + + if spark is not None and normalized_expected: + is_valid, error_msg = validate_dataset_splits( + spark=spark, + train_uri=train_uri, + val_uri=val_uri, + test_uri=test_uri, + expected_ratios=normalized_expected, + ) + if not is_valid: + raise ValueError(f"Split validation failed: {error_msg}") # Save URIs to context scratch context.scratch["_train_uri"] = train_uri @@ -960,7 +984,11 @@ def validate_baseline_predictor(predictor: Any, name: str, description: str) -> ValueError: If validation or metric computation fails """ import numpy as np - from plexe.helpers import compute_metric + from plexe.helpers import ( + compute_metric, + metric_requires_probabilities, + normalize_probability_predictions, + ) # Check class name matches template if type(predictor).__name__ != "HeuristicBaselinePredictor": @@ -974,18 +1002,37 @@ def validate_baseline_predictor(predictor: Any, name: str, description: str) -> logger.error(error_msg) raise ValueError(error_msg) + requires_proba = metric_requires_probabilities(context.metric.name) + if requires_proba and (not hasattr(predictor, "predict_proba") or not callable(predictor.predict_proba)): + error_msg = ( + f"Primary metric '{context.metric.name}' requires probability scores. " + "Baseline predictor must implement callable .predict_proba()" + ) + logger.error(error_msg) + raise ValueError(error_msg) + # Test on small validation sample try: X_test = val_sample_df.drop(columns=context.output_targets, errors="ignore").head(10) - predictions = predictor.predict(X_test) - - if not isinstance(predictions, list | tuple | np.ndarray | pd.Series): - error_msg = f"predict() must return array-like, got {type(predictions)}" + predictions = predictor.predict_proba(X_test) if requires_proba else predictor.predict(X_test) + + expected_types = ( + (list, tuple, np.ndarray, pd.Series, pd.DataFrame) + if requires_proba + else ( + list, + tuple, + np.ndarray, + pd.Series, + ) + ) + if not isinstance(predictions, expected_types): + error_msg = f"{'predict_proba' if requires_proba else 'predict'}() must return array-like, got {type(predictions)}" logger.error(error_msg) raise ValueError(error_msg) if len(predictions) != len(X_test): - error_msg = f"predict() returned {len(predictions)} predictions for {len(X_test)} samples" + error_msg = f"{'predict_proba' if requires_proba else 'predict'}() returned {len(predictions)} predictions for {len(X_test)} samples" logger.error(error_msg) raise ValueError(error_msg) @@ -998,10 +1045,14 @@ def validate_baseline_predictor(predictor: Any, name: str, description: str) -> try: X_val = val_sample_df.drop(columns=context.output_targets, errors="ignore") y_val = val_sample_df[context.output_targets[0]] - y_pred = predictor.predict(X_val) + if requires_proba: + raw_proba = predictor.predict_proba(X_val) + y_pred_input = normalize_probability_predictions(y_val.values, raw_proba, context.metric.name) + else: + y_pred_input = predictor.predict(X_val) # This is where squared= errors would happen - agent can now see them! - performance = compute_metric(y_true=y_val.values, y_pred=y_pred, metric_name=context.metric.name) + performance = compute_metric(y_true=y_val.values, y_pred=y_pred_input, metric_name=context.metric.name) logger.info(f"Baseline performance: {context.metric.name}={performance:.4f}") @@ -1120,7 +1171,11 @@ def evaluate_baseline_performance() -> str: Returns: String with performance metric value """ - from plexe.helpers import compute_metric + from plexe.helpers import ( + compute_metric, + metric_requires_probabilities, + normalize_probability_predictions, + ) # Check prerequisites if context.baseline_predictor is None: @@ -1147,12 +1202,23 @@ def evaluate_baseline_performance() -> str: else None ) - # Make predictions (standard array interface) - y_pred = context.baseline_predictor.predict(X_val) + requires_proba = metric_requires_probabilities(context.metric.name) + if requires_proba: + if not hasattr(context.baseline_predictor, "predict_proba") or not callable( + context.baseline_predictor.predict_proba + ): + raise ValueError( + f"Metric '{context.metric.name}' requires probability scores but baseline predictor " + "does not implement predict_proba()." + ) + raw_proba = context.baseline_predictor.predict_proba(X_val) + y_pred_input = normalize_probability_predictions(y_val.values, raw_proba, context.metric.name) + else: + y_pred_input = context.baseline_predictor.predict(X_val) # Compute metric (pass group_ids for ranking metrics) performance = compute_metric( - y_true=y_val.values, y_pred=y_pred, metric_name=context.metric.name, group_ids=group_ids + y_true=y_val.values, y_pred=y_pred_input, metric_name=context.metric.name, group_ids=group_ids ) # Save performance diff --git a/plexe/validation/validators.py b/plexe/validation/validators.py index c50805c1..1de237d6 100644 --- a/plexe/validation/validators.py +++ b/plexe/validation/validators.py @@ -16,6 +16,43 @@ logger = logging.getLogger(__name__) +# ============================================ +# Dataset Split Validation +# ============================================ + + +def canonicalize_split_ratios(split_ratios: dict[str, float] | None) -> dict[str, float]: + """ + Normalize split ratio key aliases to canonical names. + + Canonical keys are: train, val, test. + Unknown keys and non-numeric values are ignored. + """ + if not split_ratios: + return {} + + alias_map = { + "train": "train", + "val": "val", + "valid": "val", + "validation": "val", + "test": "test", + } + + normalized: dict[str, float] = {} + for key, value in split_ratios.items(): + if not isinstance(value, int | float): + continue + + canonical_key = alias_map.get(str(key).strip().lower()) + if canonical_key is None: + continue + + normalized[canonical_key] = float(value) + + return normalized + + # ============================================ # Pipeline Validation # ============================================ @@ -342,6 +379,9 @@ def validate_dataset_splits( Returns: (is_valid, error_message) """ + normalized_expected = canonicalize_split_ratios(expected_ratios) + expects_test_split = normalized_expected.get("test", 0.0) > 0 + # Validate row counts (existence check implicit - count() fails if dataset doesn't exist) try: train_count = spark.read.parquet(train_uri).count() @@ -356,16 +396,28 @@ def validate_dataset_splits( except Exception as e: return False, f"Failed to read split datasets: {e}" + if train_count == 0: + return False, "Train split is empty" + if val_count == 0: + return False, "Validation split is empty" + if expects_test_split: + if not test_uri: + return False, "Test split expected but test URI was not provided" + if test_count == 0: + return False, "Test split is empty but final evaluation requires a non-empty test split" + logger.info(f"Split sizes: train={train_count}, val={val_count}, test={test_count}, total={total}") # Check ratios are within reasonable tolerance (10%) actual_ratios = {"train": train_count / total, "val": val_count / total, "test": test_count / total} + # TODO(split-validation): Escalate severe ratio drift to hard failure (not warning-only), + # so agent retries before finishing Phase 2. # Only check splits that exist in actual_ratios (ignore extra keys like "rationale") for split_name in actual_ratios.keys(): - if split_name not in expected_ratios: + if split_name not in normalized_expected: continue - expected = expected_ratios[split_name] + expected = normalized_expected[split_name] actual = actual_ratios[split_name] diff = abs(actual - expected) diff --git a/plexe/workflow.py b/plexe/workflow.py index 6076f114..88a709d8 100644 --- a/plexe/workflow.py +++ b/plexe/workflow.py @@ -63,6 +63,7 @@ from plexe.templates.features.pipeline_runner import transform_dataset_via_spark from plexe.templates.packaging.model_card_template import generate_model_card from plexe.helpers import evaluate_on_sample, select_viable_model_types +from plexe.validation.validators import canonicalize_split_ratios logger = logging.getLogger(__name__) @@ -178,6 +179,8 @@ def build_model( context = BuildContext.from_dict(checkpoint_data["context"]) if checkpoint_data.get("search_journal"): journal = SearchJournal.from_dict(checkpoint_data["search_journal"]) + if context.metric: + journal.optimization_direction = context.metric.optimization_direction context.scratch["_search_journal"] = journal logger.info(f"Restored SearchJournal with {len(journal.nodes)} solutions") if checkpoint_data.get("insight_store"): @@ -437,7 +440,7 @@ def build_model( if valid_alternatives: # Sort by performance and pick the best alternative - valid_alternatives.sort(key=lambda s: s.performance, reverse=True) + valid_alternatives.sort(key=journal.sort_key, reverse=True) fallback_solution = valid_alternatives[0] logger.info(f"Found {len(valid_alternatives)} valid alternatives") @@ -915,6 +918,16 @@ def prepare_data( else: # Default fallback split_ratios = {"train": 0.7, "val": 0.15, "test": 0.15} + + split_ratios = canonicalize_split_ratios(split_ratios) + if not {"train", "val", "test"}.issubset(split_ratios): + logger.warning( + "Recommended split ratios are missing one of train/val/test (%s); " + "falling back to default 70/15/15 for final evaluation.", + split_ratios, + ) + split_ratios = {"train": 0.7, "val": 0.15, "test": 0.15} + logger.info("Creating train/val/test splits from single dataset (final evaluation enabled)") else: # 2-way split: train/val only @@ -1342,9 +1355,13 @@ def search_models( # Use restored journal/insight_store if resuming, otherwise create fresh if restored_journal: journal = restored_journal + journal.optimization_direction = context.metric.optimization_direction logger.info(f"Using restored SearchJournal with {len(journal.nodes)} existing solutions") else: - journal = SearchJournal(baseline=context.heuristic_baseline) + journal = SearchJournal( + baseline=context.heuristic_baseline, + optimization_direction=context.metric.optimization_direction, + ) if restored_insight_store: insight_store = restored_insight_store @@ -1786,6 +1803,8 @@ def evaluate_final( if context.test_uri: logger.info(f"Loading test sample from {context.test_uri}") test_df_spark = spark.read.parquet(context.test_uri) + # TODO(evaluation-guard): Fail fast with a clear message when test split is empty, + # instead of letting downstream evaluator phases fail indirectly. # Sample for evaluation (20k-50k rows) sample_size = min(50000, test_df_spark.count()) test_sample_df = test_df_spark.limit(sample_size).toPandas() diff --git a/pyproject.toml b/pyproject.toml index b2bf899e..aee2a460 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "plexe" -version = "1.4.2" +version = "1.4.3" description = "An agentic framework for building ML models from natural language" authors = [ "Marcello De Bernardi ", diff --git a/tests/CODE_INDEX.md b/tests/CODE_INDEX.md index 50861deb..66b0f26a 100644 --- a/tests/CODE_INDEX.md +++ b/tests/CODE_INDEX.md @@ -1,6 +1,6 @@ # Code Index: tests -> Generated on 2026-03-03 00:06:47 +> Generated on 2026-03-03 05:08:33 Test suite structure and test case documentation. @@ -75,6 +75,14 @@ Tests for user feedback integration in agents. - `test_hypothesiser_includes_feedback(self, mock_context, mock_config)` - HypothesiserAgent should include feedback in instructions. - `test_agent_without_feedback_works(self, mock_context, mock_config)` - Agents should work normally when no feedback is provided. +--- +## `unit/agents/test_model_evaluator_prompt.py` +Prompt-level tests for ModelEvaluatorAgent probability guidance. + +**Functions:** +- `test_phase_1_prompt_includes_probability_metric_guidance()` - No description +- `test_build_agent_instructions_document_predict_proba_interface()` - No description + --- ## `unit/execution/training/test_local_runner.py` Tests for LocalProcessRunner GPU detection and command construction. @@ -99,6 +107,8 @@ Determinism tests for EvolutionarySearchPolicy local RNG behavior. **Functions:** - `test_evolutionary_policy_determinism(monkeypatch, tmp_path)` - No description +- `test_evolutionary_exploit_respects_lower_metric_direction(tmp_path)` - No description +- `test_should_stop_lower_metric_without_baseline_can_early_stop()` - No description --- ## `unit/search/test_insight_store.py` @@ -117,6 +127,7 @@ Unit tests for SearchJournal. - `test_journal_add_successful_node()` - Test recording a successful solution. - `test_journal_add_buggy_node()` - Test recording a failed attempt. - `test_journal_best_node_tracks_best()` - Test best_node returns the highest performing solution. +- `test_journal_best_node_respects_lower_direction()` - best_node should select smallest metric when optimization is lower. - `test_journal_failure_rate()` - Test failure rate computation. - `test_journal_failure_rate_empty()` - Test failure rate on empty journal. - `test_journal_get_history()` - Test history returns recent entries. @@ -124,6 +135,9 @@ Unit tests for SearchJournal. - `test_journal_improvement_trend_insufficient_data()` - Test improvement trend with fewer than 2 successful solutions. - `test_journal_get_history_includes_train_performance()` - get_history should include train_performance when set on a solution. - `test_journal_get_history_train_performance_none()` - get_history should include train_performance=None when not set. +- `test_journal_serialization_preserves_optimization_direction()` - to_dict/from_dict should preserve optimization_direction. +- `test_journal_from_dict_defaults_optimization_direction_to_higher()` - Older checkpoints without optimization_direction should default to higher. +- `test_journal_optimization_direction_setter_validates_values()` - No description --- ## `unit/search/test_tree_policy_determinism.py` @@ -131,6 +145,7 @@ Determinism tests for TreeSearchPolicy local RNG behavior. **Functions:** - `test_tree_policy_determinism(monkeypatch, tmp_path)` - No description +- `test_tree_policy_respects_lower_metric_direction(tmp_path)` - No description --- ## `unit/templates/features/test_pipeline_runner.py` @@ -159,6 +174,26 @@ Unit tests for PyTorch DataLoader worker fallback behavior. - `test_resolve_num_workers_uses_context_when_start_method_is_none(monkeypatch) -> None` - When get_start_method returns None, context start method should be used. - `test_resolve_num_workers_kept_on_non_darwin_spawn(monkeypatch) -> None` - Spawn on non-macOS should keep the requested worker count. +--- +## `unit/test_baseline_probability_validation.py` +Unit tests for baseline probability validation behavior. + +**Functions:** +- `test_validate_baseline_predictor_requires_predict_proba_for_probability_metrics(tmp_path)` - No description +- `test_validate_baseline_predictor_accepts_predict_proba_for_probability_metrics(tmp_path)` - No description + +--- +## `unit/test_catboost_predictor.py` +Unit tests for CatBoost predictor template. + +**`DummyPipeline`** - Minimal pipeline stub for tests. +- `transform(self, x)` - No description + +**Functions:** +- `test_catboost_predictor_predict_proba_classification() -> None` - No description +- `test_catboost_predictor_predict_proba_allows_missing_task_metadata() -> None` - No description +- `test_catboost_predictor_predict_proba_raises_for_regression() -> None` - No description + --- ## `unit/test_config.py` Unit tests for config helpers. @@ -182,6 +217,16 @@ Unit tests for workflow helper functions. - `test_select_viable_model_types_defaults_image()` - Default model types intersect with IMAGE_PATH. - `test_select_viable_model_types_no_intersection()` - No compatible frameworks should raise ValueError. - `test_compute_metric_map_grouped()` - MAP should compute per-group and average. +- `test_metric_requires_probabilities()` - No description +- `test_normalize_probability_predictions_binary_matrix_to_positive_scores()` - No description +- `test_normalize_probability_predictions_multiclass_keeps_matrix()` - No description +- `test_normalize_probability_predictions_multiclass_raises_on_1d()` - No description +- `test_normalize_probability_predictions_multiclass_raises_on_extra_columns()` - No description +- `test_normalize_probability_predictions_multiclass_raises_on_single_column_matrix()` - No description +- `test_normalize_probability_predictions_raises_when_validation_missing_class()` - No description +- `test_evaluate_predictor_uses_predict_for_label_metrics()` - No description +- `test_evaluate_predictor_uses_predict_proba_for_probability_metrics()` - No description +- `test_evaluate_predictor_raises_when_probability_metric_missing_predict_proba()` - No description --- ## `unit/test_imports.py` @@ -190,6 +235,26 @@ Test that all production modules can be imported without errors. **Functions:** - `test_all_modules_importable()` - Import all production modules in the plexe/ package to catch import errors. +--- +## `unit/test_keras_predictor.py` +Unit tests for Keras predictor template semantics. + +**`DummyPipeline`** - Minimal pipeline stub for tests. +- `transform(self, x)` - No description + +**`DummyModel`** - Minimal model stub for tests. +- `__init__(self, output)` +- `predict(self, x, verbose)` - No description + +**Functions:** +- `test_keras_probabilities_from_binary_logits() -> None` - No description +- `test_keras_probabilities_from_binary_two_logit_output() -> None` - No description +- `test_keras_probabilities_from_multiclass_logits() -> None` - No description +- `test_keras_probabilities_infer_logits_when_loss_config_missing() -> None` - No description +- `test_keras_predict_proba_raises_for_regression() -> None` - No description +- `test_keras_predict_proba_allows_missing_task_metadata() -> None` - No description +- `test_keras_predict_proba_raises_on_non_finite_outputs() -> None` - No description + --- ## `unit/test_lightgbm_predictor.py` Unit tests for LightGBM predictor template. @@ -197,12 +262,18 @@ Unit tests for LightGBM predictor template. **`DummyModel`** - Minimal model stub with a predict method. - `predict(self, x)` - No description +**`DummyClassificationModel`** - Minimal model stub with predict_proba for classification. +- `predict_proba(self, x)` - No description + **`DummyPipeline`** - Minimal pipeline stub with a transform method. - `transform(self, x)` - No description **Functions:** - `test_lightgbm_predictor_basic(tmp_path: Path) -> None` - No description - `test_lightgbm_predictor_label_encoder(tmp_path: Path) -> None` - No description +- `test_lightgbm_predictor_predict_proba_classification(tmp_path: Path) -> None` - No description +- `test_lightgbm_predictor_predict_proba_without_metadata(tmp_path: Path) -> None` - No description +- `test_lightgbm_predictor_predict_proba_raises_for_regression(tmp_path: Path) -> None` - No description --- ## `unit/test_models.py` @@ -215,6 +286,21 @@ Unit tests for core model dataclasses. - `test_solution_from_dict_backward_compatible()` - Old checkpoints missing train_performance should deserialize cleanly. - `test_solution_from_dict_with_train_performance()` - Checkpoints with train_performance should round-trip correctly. +--- +## `unit/test_pytorch_predictor.py` +Unit tests for PyTorch predictor template semantics. + +**`DummyPipeline`** - Minimal pipeline stub for tests. +- `transform(self, x)` - No description + +**`DummyModel`** - Minimal callable model stub for tests. +- `__init__(self, outputs)` + +**Functions:** +- `test_pytorch_predict_proba_binary_classification() -> None` - No description +- `test_pytorch_predict_proba_allows_missing_task_metadata() -> None` - No description +- `test_pytorch_predict_proba_raises_for_regression() -> None` - No description + --- ## `unit/test_submission_pytorch.py` Unit tests for PyTorch model submission. @@ -222,6 +308,30 @@ Unit tests for PyTorch model submission. **Functions:** - `test_save_model_pytorch(tmp_path)` - Test PyTorch model submission validation and context scratch storage. +--- +## `unit/test_submission_split_validation.py` +Unit tests for split URI submission validation. + +**Functions:** +- `test_save_split_uris_requires_test_when_expected(tmp_path)` - No description +- `test_save_split_uris_canonicalizes_validation_key(tmp_path)` - No description + +--- +## `unit/test_xgboost_predictor.py` +Unit tests for XGBoost predictor template. + +**`DummyModel`** - Minimal predictor stub for tests. +- `predict(self, x)` - No description +- `predict_proba(self, x)` - No description + +**`DummyPipeline`** - Minimal pipeline stub for tests. +- `transform(self, x)` - No description + +**Functions:** +- `test_xgboost_predictor_predict_proba_classification(tmp_path: Path) -> None` - No description +- `test_xgboost_predictor_predict_proba_without_metadata(tmp_path: Path) -> None` - No description +- `test_xgboost_predictor_predict_proba_raises_for_regression(tmp_path: Path) -> None` - No description + --- ## `unit/utils/test_parquet_dataset.py` Tests for streaming parquet data loading utilities. @@ -279,6 +389,8 @@ Unit tests for validation functions. - `test_validate_model_definition_unknown_type()` - Test unknown model type fails validation. - `test_validate_metric_function_object_success()` - Callable with correct signature should pass. - `test_validate_metric_function_object_bad_signature()` - Callable with wrong arg names should fail. +- `test_canonicalize_split_ratios_maps_validation_alias()` - No description +- `test_validate_dataset_splits_fails_when_expected_test_is_empty()` - No description --- ## `unit/workflow/test_checkpoint_resume_feedback.py` diff --git a/tests/unit/agents/test_model_evaluator_prompt.py b/tests/unit/agents/test_model_evaluator_prompt.py new file mode 100644 index 00000000..ad19bc53 --- /dev/null +++ b/tests/unit/agents/test_model_evaluator_prompt.py @@ -0,0 +1,23 @@ +"""Prompt-level tests for ModelEvaluatorAgent probability guidance.""" + +from __future__ import annotations + +import inspect + +from plexe.agents.model_evaluator import ModelEvaluatorAgent + + +def test_phase_1_prompt_includes_probability_metric_guidance(): + prompt = ModelEvaluatorAgent._get_phase_1_prompt("Predict churn", "roc_auc") + + assert "predict_proba" in prompt + assert "roc_auc" in prompt + assert "Binary: use positive-class scores" in prompt + assert "Multiclass: use full per-class probability matrix" in prompt + + +def test_build_agent_instructions_document_predict_proba_interface(): + source = inspect.getsource(ModelEvaluatorAgent._build_agent) + + assert "predict_proba" in source + assert "probability-based" in source diff --git a/tests/unit/search/test_evolutionary_policy_determinism.py b/tests/unit/search/test_evolutionary_policy_determinism.py index 93bfc448..7978685b 100644 --- a/tests/unit/search/test_evolutionary_policy_determinism.py +++ b/tests/unit/search/test_evolutionary_policy_determinism.py @@ -56,3 +56,27 @@ def _fail(*args, **kwargs): assert (selected_a is None) == (selected_b is None) if selected_a is not None: assert selected_a.solution_id == selected_b.solution_id + + +def test_evolutionary_exploit_respects_lower_metric_direction(tmp_path): + journal = SearchJournal(optimization_direction="lower") + for idx, perf in enumerate([0.9, 0.3, 0.2], start=1): + journal.add_node(_make_solution(idx, performance=perf)) + + policy = EvolutionarySearchPolicy(num_drafts=2, seed=456) + selected = policy._exploit_action(journal, iteration=9, max_iterations=10) + + assert selected is not None + assert selected.solution_id == 3 + + +def test_should_stop_lower_metric_without_baseline_can_early_stop(): + journal = SearchJournal(optimization_direction="lower") + journal.baseline_performance = 0.0 + for idx, perf in enumerate([0.9, 0.8, 0.7], start=1): + journal.add_node(_make_solution(idx, performance=perf)) + + policy = EvolutionarySearchPolicy(num_drafts=2, seed=456) + policy._calculate_stagnation = MagicMock(return_value=0.9) + + assert policy.should_stop(journal, iteration=5, max_iterations=10) diff --git a/tests/unit/search/test_journal.py b/tests/unit/search/test_journal.py index b76264c9..15af68fa 100644 --- a/tests/unit/search/test_journal.py +++ b/tests/unit/search/test_journal.py @@ -102,6 +102,22 @@ def test_journal_best_node_tracks_best(): assert journal.best_performance == 0.90 +def test_journal_best_node_respects_lower_direction(): + """best_node should select smallest metric when optimization is lower.""" + journal = SearchJournal(optimization_direction="lower") + + sol1 = _make_solution(0, performance=0.40) + sol2 = _make_solution(1, performance=0.25) + sol3 = _make_solution(2, performance=0.31) + + journal.add_node(sol1) + journal.add_node(sol2) + journal.add_node(sol3) + + assert journal.best_node == sol2 + assert journal.best_performance == 0.25 + + # ============================================ # Failure Rate Tests # ============================================ @@ -195,3 +211,31 @@ def test_journal_get_history_train_performance_none(): history = journal.get_history() assert history[0]["train_performance"] is None + + +def test_journal_serialization_preserves_optimization_direction(): + """to_dict/from_dict should preserve optimization_direction.""" + journal = SearchJournal(optimization_direction="lower") + journal.add_node(_make_solution(0, performance=0.3)) + + restored = SearchJournal.from_dict(journal.to_dict()) + assert restored.optimization_direction == "lower" + assert restored.best_performance == pytest.approx(0.3) + + +def test_journal_from_dict_defaults_optimization_direction_to_higher(): + """Older checkpoints without optimization_direction should default to higher.""" + journal = SearchJournal() + journal.add_node(_make_solution(0, performance=0.3)) + payload = journal.to_dict() + payload.pop("optimization_direction") + + restored = SearchJournal.from_dict(payload) + assert restored.optimization_direction == "higher" + + +def test_journal_optimization_direction_setter_validates_values(): + journal = SearchJournal() + + with pytest.raises(ValueError, match="optimization_direction must be 'higher' or 'lower'"): + journal.optimization_direction = "maximize" diff --git a/tests/unit/search/test_tree_policy_determinism.py b/tests/unit/search/test_tree_policy_determinism.py index 1e876f1c..98d73401 100644 --- a/tests/unit/search/test_tree_policy_determinism.py +++ b/tests/unit/search/test_tree_policy_determinism.py @@ -55,3 +55,17 @@ def _fail(*args, **kwargs): assert selected_a is not None assert selected_b is not None assert selected_a.solution_id == selected_b.solution_id + + +def test_tree_policy_respects_lower_metric_direction(tmp_path): + journal = SearchJournal(optimization_direction="lower") + for idx, perf in enumerate([0.9, 0.2, 0.5], start=1): + journal.add_node(_make_solution(idx, performance=perf)) + + context = _make_context(tmp_path) + policy = TreeSearchPolicy(num_drafts=2, debug_prob=0.0, seed=123) + + selected = policy.decide_next_solution(journal, context, iteration=9, max_iterations=10) + + assert selected is not None + assert selected.solution_id == 2 diff --git a/tests/unit/test_baseline_probability_validation.py b/tests/unit/test_baseline_probability_validation.py new file mode 100644 index 00000000..85c795db --- /dev/null +++ b/tests/unit/test_baseline_probability_validation.py @@ -0,0 +1,63 @@ +"""Unit tests for baseline probability validation behavior.""" + +from __future__ import annotations + +import numpy as np +import pandas as pd +import pytest + +from plexe.models import BuildContext, Metric +from plexe.tools.submission import get_validate_baseline_predictor_tool + + +def _make_context(tmp_path) -> BuildContext: + context = BuildContext( + user_id="user", + experiment_id="exp", + dataset_uri="/tmp/dataset.parquet", + work_dir=tmp_path, + intent="predict churn", + ) + context.output_targets = ["target"] + return context + + +def test_validate_baseline_predictor_requires_predict_proba_for_probability_metrics(tmp_path): + context = _make_context(tmp_path) + context.metric = Metric(name="roc_auc", optimization_direction="higher") + val_df = pd.DataFrame({"feature": [1, 2, 3, 4], "target": [0, 1, 1, 0]}) + + class HeuristicBaselinePredictor: + def predict(self, x): + return np.zeros(len(x), dtype=int) + + validate_tool = get_validate_baseline_predictor_tool(context, val_df) + predictor = HeuristicBaselinePredictor() + + with pytest.raises(ValueError, match="requires probability scores"): + validate_tool(predictor, "baseline", "missing predict_proba") + + +def test_validate_baseline_predictor_accepts_predict_proba_for_probability_metrics(tmp_path): + context = _make_context(tmp_path) + context.metric = Metric(name="roc_auc", optimization_direction="higher") + val_df = pd.DataFrame({"feature": [1, 2, 3, 4], "target": [0, 1, 1, 0]}) + + class HeuristicBaselinePredictor: + def predict(self, x): + return np.zeros(len(x), dtype=int) + + def predict_proba(self, x): + return pd.DataFrame( + { + "proba_0": np.array([0.9, 0.2, 0.1, 0.8]), + "proba_1": np.array([0.1, 0.8, 0.9, 0.2]), + } + ) + + validate_tool = get_validate_baseline_predictor_tool(context, val_df) + predictor = HeuristicBaselinePredictor() + message = validate_tool(predictor, "baseline", "has predict_proba") + + assert "validated" in message.lower() + assert context.baseline_performance is not None diff --git a/tests/unit/test_catboost_predictor.py b/tests/unit/test_catboost_predictor.py new file mode 100644 index 00000000..65f07125 --- /dev/null +++ b/tests/unit/test_catboost_predictor.py @@ -0,0 +1,62 @@ +"""Unit tests for CatBoost predictor template.""" + +from __future__ import annotations + +import numpy as np +import pandas as pd +import pytest + +catboost = pytest.importorskip("catboost") + +from plexe.templates.inference.catboost_predictor import CatBoostPredictor + + +class DummyPipeline: + """Minimal pipeline stub for tests.""" + + def transform(self, x): + return x + + +def test_catboost_predictor_predict_proba_classification() -> None: + model = catboost.CatBoostClassifier(iterations=5, verbose=False) + X_train = pd.DataFrame({"f1": [0.0, 0.1, 0.9, 1.0], "f2": [0.0, 0.2, 0.8, 1.0]}) + y_train = np.array([0, 0, 1, 1]) + model.fit(X_train, y_train) + + predictor = CatBoostPredictor.__new__(CatBoostPredictor) + predictor._task_type = "binary_classification" + predictor.model = model + predictor.pipeline = DummyPipeline() + + probabilities = predictor.predict_proba(pd.DataFrame({"f1": [0.05, 0.95], "f2": [0.05, 0.95]})) + + assert list(probabilities.columns) == ["proba_0", "proba_1"] + assert len(probabilities) == 2 + + +def test_catboost_predictor_predict_proba_allows_missing_task_metadata() -> None: + model = catboost.CatBoostClassifier(iterations=5, verbose=False) + X_train = pd.DataFrame({"f1": [0.0, 0.1, 0.9, 1.0], "f2": [0.0, 0.2, 0.8, 1.0]}) + y_train = np.array([0, 0, 1, 1]) + model.fit(X_train, y_train) + + predictor = CatBoostPredictor.__new__(CatBoostPredictor) + predictor._task_type = "" + predictor.model = model + predictor.pipeline = DummyPipeline() + + probabilities = predictor.predict_proba(pd.DataFrame({"f1": [0.05, 0.95], "f2": [0.05, 0.95]})) + + assert list(probabilities.columns) == ["proba_0", "proba_1"] + assert len(probabilities) == 2 + + +def test_catboost_predictor_predict_proba_raises_for_regression() -> None: + predictor = CatBoostPredictor.__new__(CatBoostPredictor) + predictor._task_type = "regression" + predictor.model = object() + predictor.pipeline = DummyPipeline() + + with pytest.raises(ValueError, match="only valid for classification"): + predictor.predict_proba(pd.DataFrame({"f1": [0.1], "f2": [0.2]})) diff --git a/tests/unit/test_helpers.py b/tests/unit/test_helpers.py index 3902f00c..1118653b 100644 --- a/tests/unit/test_helpers.py +++ b/tests/unit/test_helpers.py @@ -3,11 +3,18 @@ """ import numpy as np +import pandas as pd import pytest from sklearn.metrics import accuracy_score from plexe.config import DEFAULT_MODEL_TYPES, ModelType, detect_installed_frameworks -from plexe.helpers import compute_metric, select_viable_model_types +from plexe.helpers import ( + _evaluate_predictor, + compute_metric, + metric_requires_probabilities, + normalize_probability_predictions, + select_viable_model_types, +) from plexe.models import DataLayout @@ -85,3 +92,174 @@ def test_compute_metric_map_grouped(): result = compute_metric(y_true, y_pred, "map", group_ids=group_ids) assert result == pytest.approx(0.75) + + +def test_metric_requires_probabilities(): + assert metric_requires_probabilities("roc_auc") + assert metric_requires_probabilities("log_loss") + assert not metric_requires_probabilities("accuracy") + + +def test_normalize_probability_predictions_binary_matrix_to_positive_scores(): + y_true = np.array([0, 1, 1, 0]) + probs = np.array([[0.8, 0.2], [0.1, 0.9], [0.2, 0.8], [0.9, 0.1]]) + + normalized = normalize_probability_predictions(y_true, probs, "roc_auc") + + assert normalized.shape == (4,) + assert np.allclose(normalized, np.array([0.2, 0.9, 0.8, 0.1])) + + +def test_normalize_probability_predictions_multiclass_keeps_matrix(): + y_true = np.array([0, 1, 2]) + probs = np.array( + [ + [0.8, 0.1, 0.1], + [0.1, 0.7, 0.2], + [0.1, 0.2, 0.7], + ] + ) + + normalized = normalize_probability_predictions(y_true, probs, "roc_auc_ovr") + + assert normalized.shape == (3, 3) + assert np.allclose(normalized, probs) + + +def test_normalize_probability_predictions_multiclass_raises_on_1d(): + y_true = np.array([0, 1, 2]) + probs = np.array([0.2, 0.6, 0.4]) + + with pytest.raises(ValueError, match="per-class probabilities"): + normalize_probability_predictions(y_true, probs, "log_loss") + + +def test_normalize_probability_predictions_multiclass_raises_on_extra_columns(): + y_true = np.array([0, 1, 2]) + probs = np.array( + [ + [0.7, 0.1, 0.1, 0.1], + [0.1, 0.7, 0.1, 0.1], + [0.1, 0.1, 0.7, 0.1], + ] + ) + + with pytest.raises(ValueError, match="Probability outputs have 4 columns"): + normalize_probability_predictions(y_true, probs, "roc_auc_ovr") + + +def test_normalize_probability_predictions_multiclass_raises_on_single_column_matrix(): + y_true = np.array([0, 1, 2]) + probs = np.array([[0.2], [0.5], [0.3]]) + + with pytest.raises(ValueError, match="Probability outputs have 1 column"): + normalize_probability_predictions(y_true, probs, "log_loss") + + +def test_normalize_probability_predictions_raises_when_validation_missing_class(): + y_true = np.array([0, 1, 1, 0]) + probs = np.array( + [ + [0.8, 0.1, 0.1], + [0.1, 0.7, 0.2], + [0.2, 0.6, 0.2], + [0.7, 0.2, 0.1], + ] + ) + + with pytest.raises(ValueError, match="validation labels contain 2 distinct classes"): + normalize_probability_predictions(y_true, probs, "log_loss") + + +class _DummySparkDataFrame: + def __init__(self, pdf: pd.DataFrame): + self._pdf = pdf + + def toPandas(self) -> pd.DataFrame: + return self._pdf + + +class _DummySparkReader: + def __init__(self, pdf: pd.DataFrame): + self._pdf = pdf + + def parquet(self, _uri: str) -> _DummySparkDataFrame: + return _DummySparkDataFrame(self._pdf) + + +class _DummySpark: + def __init__(self, pdf: pd.DataFrame): + self.read = _DummySparkReader(pdf) + + +class _PredictorWithProba: + def __init__(self): + self.predict_calls = 0 + self.predict_proba_calls = 0 + + def predict(self, x: pd.DataFrame) -> pd.DataFrame: + self.predict_calls += 1 + return pd.DataFrame({"prediction": np.zeros(len(x), dtype=int)}) + + def predict_proba(self, x: pd.DataFrame) -> pd.DataFrame: + self.predict_proba_calls += 1 + return pd.DataFrame( + { + "proba_0": np.array([0.9, 0.2, 0.1, 0.8]), + "proba_1": np.array([0.1, 0.8, 0.9, 0.2]), + } + ) + + +def test_evaluate_predictor_uses_predict_for_label_metrics(): + predictor = _PredictorWithProba() + df = pd.DataFrame({"feature": [1, 2, 3, 4], "target": [0, 1, 0, 1]}) + + score = _evaluate_predictor( + spark=_DummySpark(df), + predictor=predictor, + data_uri="unused", + metric="accuracy", + target_columns=["target"], + group_column=None, + ) + + assert isinstance(score, float) + assert predictor.predict_calls == 1 + assert predictor.predict_proba_calls == 0 + + +def test_evaluate_predictor_uses_predict_proba_for_probability_metrics(): + predictor = _PredictorWithProba() + df = pd.DataFrame({"feature": [1, 2, 3, 4], "target": [0, 1, 1, 0]}) + + score = _evaluate_predictor( + spark=_DummySpark(df), + predictor=predictor, + data_uri="unused", + metric="roc_auc", + target_columns=["target"], + group_column=None, + ) + + assert isinstance(score, float) + assert predictor.predict_calls == 0 + assert predictor.predict_proba_calls == 1 + + +def test_evaluate_predictor_raises_when_probability_metric_missing_predict_proba(): + class _PredictOnly: + def predict(self, x: pd.DataFrame) -> pd.DataFrame: + return pd.DataFrame({"prediction": np.zeros(len(x), dtype=int)}) + + df = pd.DataFrame({"feature": [1, 2, 3, 4], "target": [0, 1, 1, 0]}) + + with pytest.raises(ValueError, match="does not implement predict_proba"): + _evaluate_predictor( + spark=_DummySpark(df), + predictor=_PredictOnly(), + data_uri="unused", + metric="roc_auc", + target_columns=["target"], + group_column=None, + ) diff --git a/tests/unit/test_keras_predictor.py b/tests/unit/test_keras_predictor.py new file mode 100644 index 00000000..67f7b27d --- /dev/null +++ b/tests/unit/test_keras_predictor.py @@ -0,0 +1,119 @@ +"""Unit tests for Keras predictor template semantics.""" + +from __future__ import annotations + +import numpy as np +import pandas as pd +import pytest + +from plexe.templates.inference.keras_predictor import KerasPredictor + + +class DummyPipeline: + """Minimal pipeline stub for tests.""" + + def transform(self, x): + return x + + +class DummyModel: + """Minimal model stub for tests.""" + + def __init__(self, output): + self._output = output + + def predict(self, x, verbose=0): + return self._output + + +def test_keras_probabilities_from_binary_logits() -> None: + predictor = KerasPredictor.__new__(KerasPredictor) + predictor._task_type = "binary_classification" + predictor._loss_class = "BinaryCrossentropy" + predictor._loss_config = {"from_logits": True} + + probs = predictor._probabilities_from_raw(np.array([[-2.0], [0.0], [2.0]])) + + assert probs.shape == (3, 2) + assert np.allclose(probs[:, 0] + probs[:, 1], np.ones(3)) + assert probs[0, 1] < probs[1, 1] < probs[2, 1] + + +def test_keras_probabilities_from_binary_two_logit_output() -> None: + predictor = KerasPredictor.__new__(KerasPredictor) + predictor._task_type = "binary_classification" + predictor._loss_class = "BinaryCrossentropy" + predictor._loss_config = {"from_logits": True} + + probs = predictor._probabilities_from_raw(np.array([[2.0, -2.0], [-2.0, 2.0]])) + + assert probs.shape == (2, 2) + assert np.isclose(probs[0].sum(), 1.0) + assert np.isclose(probs[1].sum(), 1.0) + assert probs[0, 0] > probs[0, 1] + assert probs[1, 1] > probs[1, 0] + + +def test_keras_probabilities_from_multiclass_logits() -> None: + predictor = KerasPredictor.__new__(KerasPredictor) + predictor._task_type = "multiclass_classification" + predictor._loss_class = "SparseCategoricalCrossentropy" + predictor._loss_config = {"from_logits": True} + + probs = predictor._probabilities_from_raw(np.array([[1.0, 2.0, 3.0]])) + + assert probs.shape == (1, 3) + assert np.isclose(probs.sum(), 1.0) + assert np.argmax(probs, axis=1)[0] == 2 + + +def test_keras_probabilities_infer_logits_when_loss_config_missing() -> None: + predictor = KerasPredictor.__new__(KerasPredictor) + predictor._task_type = "binary_classification" + predictor._loss_class = "BinaryCrossentropy" + predictor._loss_config = {} + + probs = predictor._probabilities_from_raw(np.array([[-2.0], [2.0]])) + + assert probs.shape == (2, 2) + assert np.allclose(probs[:, 0] + probs[:, 1], np.ones(2)) + assert probs[0, 1] < probs[1, 1] + + +def test_keras_predict_proba_raises_for_regression() -> None: + predictor = KerasPredictor.__new__(KerasPredictor) + predictor._task_type = "regression" + predictor._loss_class = "MeanSquaredError" + predictor._loss_config = {} + predictor.pipeline = DummyPipeline() + predictor.model = DummyModel(np.array([[0.5], [0.7]])) + + with pytest.raises(ValueError, match="only valid for classification"): + predictor.predict_proba(pd.DataFrame({"f1": [1.0, 2.0]})) + + +def test_keras_predict_proba_allows_missing_task_metadata() -> None: + predictor = KerasPredictor.__new__(KerasPredictor) + predictor._task_type = "" + predictor._loss_class = "" + predictor._loss_config = {} + predictor.pipeline = DummyPipeline() + predictor.model = DummyModel(np.array([[-2.0], [2.0]])) + + probabilities = predictor.predict_proba(pd.DataFrame({"f1": [1.0, 2.0]})) + + assert list(probabilities.columns) == ["proba_0", "proba_1"] + assert len(probabilities) == 2 + assert probabilities.iloc[0]["proba_1"] < probabilities.iloc[1]["proba_1"] + + +def test_keras_predict_proba_raises_on_non_finite_outputs() -> None: + predictor = KerasPredictor.__new__(KerasPredictor) + predictor._task_type = "" + predictor._loss_class = "" + predictor._loss_config = {} + predictor.pipeline = DummyPipeline() + predictor.model = DummyModel(np.array([[np.nan], [np.inf]])) + + with pytest.raises(ValueError, match="contain NaN/Inf"): + predictor.predict_proba(pd.DataFrame({"f1": [1.0, 2.0]})) diff --git a/tests/unit/test_lightgbm_predictor.py b/tests/unit/test_lightgbm_predictor.py index ede2dad7..b9d835ae 100644 --- a/tests/unit/test_lightgbm_predictor.py +++ b/tests/unit/test_lightgbm_predictor.py @@ -9,6 +9,7 @@ import joblib import numpy as np import pandas as pd +import pytest from sklearn.preprocessing import LabelEncoder from plexe.templates.inference.lightgbm_predictor import LightGBMPredictor @@ -21,6 +22,13 @@ def predict(self, x): return np.zeros(len(x), dtype=int) +class DummyClassificationModel(DummyModel): + """Minimal model stub with predict_proba for classification.""" + + def predict_proba(self, x): + return np.tile(np.array([[0.7, 0.3]]), (len(x), 1)) + + class DummyPipeline: """Minimal pipeline stub with a transform method.""" @@ -43,6 +51,12 @@ def _write_artifacts(base_dir: Path, with_encoder: bool = False) -> Path: return artifacts_dir +def _write_metadata(base_dir: Path, task_type: str) -> None: + artifacts_dir = base_dir / "artifacts" + metadata_path = artifacts_dir / "metadata.json" + metadata_path.write_text(f'{{"task_type": "{task_type}"}}', encoding="utf-8") + + def test_lightgbm_predictor_basic(tmp_path: Path) -> None: _write_artifacts(tmp_path) @@ -64,3 +78,41 @@ def test_lightgbm_predictor_label_encoder(tmp_path: Path) -> None: predictions = predictor.predict(input_df)["prediction"].tolist() assert predictions == ["no", "no"] + + +def test_lightgbm_predictor_predict_proba_classification(tmp_path: Path) -> None: + artifacts_dir = _write_artifacts(tmp_path) + joblib.dump(DummyClassificationModel(), artifacts_dir / "model.pkl") + _write_metadata(tmp_path, "binary_classification") + + predictor = LightGBMPredictor(str(tmp_path)) + input_df = pd.DataFrame({"a": [1, 2], "b": [3, 4]}) + + probabilities = predictor.predict_proba(input_df) + + assert list(probabilities.columns) == ["proba_0", "proba_1"] + assert len(probabilities) == 2 + + +def test_lightgbm_predictor_predict_proba_without_metadata(tmp_path: Path) -> None: + artifacts_dir = _write_artifacts(tmp_path) + joblib.dump(DummyClassificationModel(), artifacts_dir / "model.pkl") + + predictor = LightGBMPredictor(str(tmp_path)) + input_df = pd.DataFrame({"a": [1, 2], "b": [3, 4]}) + + probabilities = predictor.predict_proba(input_df) + + assert list(probabilities.columns) == ["proba_0", "proba_1"] + assert len(probabilities) == 2 + + +def test_lightgbm_predictor_predict_proba_raises_for_regression(tmp_path: Path) -> None: + _write_artifacts(tmp_path) + _write_metadata(tmp_path, "regression") + + predictor = LightGBMPredictor(str(tmp_path)) + input_df = pd.DataFrame({"a": [1, 2], "b": [3, 4]}) + + with pytest.raises(ValueError, match="only valid for classification"): + predictor.predict_proba(input_df) diff --git a/tests/unit/test_pytorch_predictor.py b/tests/unit/test_pytorch_predictor.py new file mode 100644 index 00000000..a7522325 --- /dev/null +++ b/tests/unit/test_pytorch_predictor.py @@ -0,0 +1,62 @@ +"""Unit tests for PyTorch predictor template semantics.""" + +from __future__ import annotations + +import pandas as pd +import pytest + +torch = pytest.importorskip("torch") + +from plexe.templates.inference.pytorch_predictor import PyTorchPredictor + + +class DummyPipeline: + """Minimal pipeline stub for tests.""" + + def transform(self, x): + return x.values + + +class DummyModel: + """Minimal callable model stub for tests.""" + + def __init__(self, outputs): + self._outputs = outputs + + def __call__(self, x_tensor): + return self._outputs + + +def test_pytorch_predict_proba_binary_classification() -> None: + predictor = PyTorchPredictor.__new__(PyTorchPredictor) + predictor._task_type = "binary_classification" + predictor.pipeline = DummyPipeline() + predictor.model = DummyModel(torch.tensor([[-2.0], [2.0]], dtype=torch.float32)) + + probabilities = predictor.predict_proba(pd.DataFrame({"f1": [0.0, 1.0]})) + + assert list(probabilities.columns) == ["proba_0", "proba_1"] + assert len(probabilities) == 2 + assert probabilities.iloc[0]["proba_1"] < probabilities.iloc[1]["proba_1"] + + +def test_pytorch_predict_proba_allows_missing_task_metadata() -> None: + predictor = PyTorchPredictor.__new__(PyTorchPredictor) + predictor._task_type = "" + predictor.pipeline = DummyPipeline() + predictor.model = DummyModel(torch.tensor([[-2.0], [2.0]], dtype=torch.float32)) + + probabilities = predictor.predict_proba(pd.DataFrame({"f1": [0.0, 1.0]})) + + assert list(probabilities.columns) == ["proba_0", "proba_1"] + assert len(probabilities) == 2 + + +def test_pytorch_predict_proba_raises_for_regression() -> None: + predictor = PyTorchPredictor.__new__(PyTorchPredictor) + predictor._task_type = "regression" + predictor.pipeline = DummyPipeline() + predictor.model = DummyModel(torch.tensor([[0.2], [0.8]], dtype=torch.float32)) + + with pytest.raises(ValueError, match="only valid for classification"): + predictor.predict_proba(pd.DataFrame({"f1": [0.0, 1.0]})) diff --git a/tests/unit/test_submission_split_validation.py b/tests/unit/test_submission_split_validation.py new file mode 100644 index 00000000..5af9eb6a --- /dev/null +++ b/tests/unit/test_submission_split_validation.py @@ -0,0 +1,73 @@ +"""Unit tests for split URI submission validation.""" + +from __future__ import annotations + +import pytest + +from plexe.models import BuildContext +from plexe.tools.submission import get_save_split_uris_tool + + +class _DummySparkFrame: + def __init__(self, count: int): + self._count = count + + def count(self) -> int: + return self._count + + +class _DummySparkReader: + def __init__(self, counts: dict[str, int]): + self._counts = counts + + def parquet(self, uri: str) -> _DummySparkFrame: + if uri not in self._counts: + raise ValueError(f"Unknown URI: {uri}") + return _DummySparkFrame(self._counts[uri]) + + +class _DummySpark: + def __init__(self, counts: dict[str, int]): + self.read = _DummySparkReader(counts) + + +def _make_context(tmp_path) -> BuildContext: + return BuildContext( + user_id="user", + experiment_id="exp", + dataset_uri="dataset.parquet", + work_dir=tmp_path, + intent="predict transported", + ) + + +def test_save_split_uris_requires_test_when_expected(tmp_path): + context = _make_context(tmp_path) + spark = _DummySpark({"train_uri": 80, "val_uri": 20}) + + save_split_uris = get_save_split_uris_tool( + context=context, + spark=spark, + expected_ratios={"train": 0.7, "val": 0.15, "test": 0.15}, + ) + + with pytest.raises(ValueError, match="non-empty test split is required"): + save_split_uris(train_uri="train_uri", val_uri="val_uri") + + +def test_save_split_uris_canonicalizes_validation_key(tmp_path): + context = _make_context(tmp_path) + spark = _DummySpark({"train_uri": 80, "val_uri": 20}) + + save_split_uris = get_save_split_uris_tool( + context=context, + spark=spark, + expected_ratios={"train": 0.8, "validation": 0.2}, + ) + + message = save_split_uris(train_uri="train_uri", val_uri="val_uri") + + assert "saved successfully" in message.lower() + assert context.scratch["_train_uri"] == "train_uri" + assert context.scratch["_val_uri"] == "val_uri" + assert context.scratch["_test_uri"] is None diff --git a/tests/unit/test_xgboost_predictor.py b/tests/unit/test_xgboost_predictor.py new file mode 100644 index 00000000..d2880464 --- /dev/null +++ b/tests/unit/test_xgboost_predictor.py @@ -0,0 +1,73 @@ +"""Unit tests for XGBoost predictor template.""" + +from __future__ import annotations + +from pathlib import Path + +import joblib +import numpy as np +import pandas as pd +import pytest + +from plexe.templates.inference.xgboost_predictor import XGBoostPredictor + + +class DummyModel: + """Minimal predictor stub for tests.""" + + def predict(self, x): + return np.zeros(len(x), dtype=int) + + def predict_proba(self, x): + return np.tile(np.array([[0.6, 0.4]]), (len(x), 1)) + + +class DummyPipeline: + """Minimal pipeline stub for tests.""" + + def transform(self, x): + return x + + +def _write_artifacts(base_dir: Path, task_type: str) -> None: + artifacts_dir = base_dir / "artifacts" + artifacts_dir.mkdir(parents=True, exist_ok=True) + + joblib.dump(DummyModel(), artifacts_dir / "model.pkl") + joblib.dump(DummyPipeline(), artifacts_dir / "pipeline.pkl") + (artifacts_dir / "metadata.json").write_text(f'{{"task_type": "{task_type}"}}', encoding="utf-8") + + +def test_xgboost_predictor_predict_proba_classification(tmp_path: Path) -> None: + _write_artifacts(tmp_path, "binary_classification") + + predictor = XGBoostPredictor(str(tmp_path)) + input_df = pd.DataFrame({"a": [1, 2], "b": [3, 4]}) + + probabilities = predictor.predict_proba(input_df) + + assert list(probabilities.columns) == ["proba_0", "proba_1"] + assert len(probabilities) == 2 + + +def test_xgboost_predictor_predict_proba_without_metadata(tmp_path: Path) -> None: + _write_artifacts(tmp_path, "binary_classification") + (tmp_path / "artifacts" / "metadata.json").unlink() + + predictor = XGBoostPredictor(str(tmp_path)) + input_df = pd.DataFrame({"a": [1, 2], "b": [3, 4]}) + + probabilities = predictor.predict_proba(input_df) + + assert list(probabilities.columns) == ["proba_0", "proba_1"] + assert len(probabilities) == 2 + + +def test_xgboost_predictor_predict_proba_raises_for_regression(tmp_path: Path) -> None: + _write_artifacts(tmp_path, "regression") + + predictor = XGBoostPredictor(str(tmp_path)) + input_df = pd.DataFrame({"a": [1, 2], "b": [3, 4]}) + + with pytest.raises(ValueError, match="only valid for classification"): + predictor.predict_proba(input_df) diff --git a/tests/unit/validation/test_validators.py b/tests/unit/validation/test_validators.py index 985f79a9..e20c03e4 100644 --- a/tests/unit/validation/test_validators.py +++ b/tests/unit/validation/test_validators.py @@ -7,6 +7,8 @@ from sklearn.preprocessing import StandardScaler from plexe.validation.validators import ( + canonicalize_split_ratios, + validate_dataset_splits, validate_sklearn_pipeline, validate_xgboost_params, validate_model_definition, @@ -127,3 +129,47 @@ def metric_fn(a, b): assert not is_valid assert "Arguments must be named" in error + + +class _DummySparkFrame: + def __init__(self, count: int): + self._count = count + + def count(self) -> int: + return self._count + + +class _DummySparkReader: + def __init__(self, counts: dict[str, int]): + self._counts = counts + + def parquet(self, uri: str) -> _DummySparkFrame: + if uri not in self._counts: + raise ValueError(f"Unknown URI: {uri}") + return _DummySparkFrame(self._counts[uri]) + + +class _DummySpark: + def __init__(self, counts: dict[str, int]): + self.read = _DummySparkReader(counts) + + +def test_canonicalize_split_ratios_maps_validation_alias(): + ratios = canonicalize_split_ratios({"train": 0.8, "validation": 0.2, "note": "ignored"}) + + assert ratios == {"train": 0.8, "val": 0.2} + + +def test_validate_dataset_splits_fails_when_expected_test_is_empty(): + spark = _DummySpark({"train_uri": 80, "val_uri": 20, "test_uri": 0}) + + is_valid, error = validate_dataset_splits( + spark=spark, + train_uri="train_uri", + val_uri="val_uri", + test_uri="test_uri", + expected_ratios={"train": 0.7, "val": 0.15, "test": 0.15}, + ) + + assert not is_valid + assert "Test split is empty" in error