Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions plexe/CODE_INDEX.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Code Index: plexe

> Generated on 2026-03-02 22:03:39
> Generated on 2026-03-03 00:06:47

Code structure and public interface documentation for the **plexe** package.

Expand Down Expand Up @@ -222,7 +222,7 @@ Helper functions for workflow.

**Functions:**
- `select_viable_model_types(data_layout: DataLayout, selected_frameworks: list[str] | None) -> list[str]` - Select viable model types using three-tier filtering.
- `evaluate_on_sample(spark: SparkSession, sample_uri: str, model_artifacts_path: Path, model_type: str, metric: str, target_columns: list[str], group_column: str | None) -> float` - Evaluate model on sample (fast).
- `evaluate_on_sample(spark: SparkSession, sample_uri: str, model_artifacts_path: Path, model_type: str, metric: str, target_columns: list[str], group_column: str | None, train_sample_uri: str | None) -> tuple[float, float | None]` - Evaluate model on validation sample, optionally also on training sample.
- `compute_metric_hardcoded(y_true, y_pred, metric_name: str) -> float` - Compute metric using hardcoded sklearn implementations.
- `compute_metric(y_true, y_pred, metric_name: str, group_ids) -> float` - Compute metric value.

Expand Down
13 changes: 12 additions & 1 deletion plexe/agents/hypothesiser.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,13 @@ def _summarize_node(node) -> str:
if not node:
return "No node"

summary = f" Performance: {node.performance:.4f}\n" if node.performance else " Performance: N/A\n"
if node.performance is not None:
summary = f" Val Performance: {node.performance:.4f}\n"
if node.train_performance is not None:
gap = node.train_performance - node.performance
summary += f" Train Performance: {node.train_performance:.4f} (train-val gap: {gap:+.4f})\n"
else:
summary = " Performance: N/A\n"

if node.plan:
summary += f" Features: {node.plan.features.strategy}\n"
Expand Down Expand Up @@ -236,6 +242,11 @@ def _summarize_history(history: list[dict]) -> str:
status = f"✓ {perf:.4f}" if success and perf else ("✗ FAILED" if not success else "pending")
summary += f" Solution {solution_id} ({stage}): {status}\n"

train_perf = entry.get("train_performance")
if success and perf and train_perf is not None:
gap = train_perf - perf
summary += f" Train/val gap: {gap:+.4f}\n"

if entry.get("error"):
summary += f" Error: {entry['error'][:80]}\n"

Expand Down
5 changes: 4 additions & 1 deletion plexe/agents/insight_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,10 @@ def _summarize_results(self, parent_perf: float | None) -> str:
if perf_delta is not None:
perf_pct = (perf_delta / parent_perf * 100) if parent_perf else 0
perf_str += f" ({perf_delta:+.4f}, {perf_pct:+.1f}%)"
summary += f" Performance: {perf_str}\n"
summary += f" Val Performance: {perf_str}\n"
if sol.train_performance is not None:
gap = sol.train_performance - sol.performance
summary += f" Train Performance: {sol.train_performance:.4f} (train-val gap: {gap:+.4f})\n"
else:
summary += f" Status: {status}\n"

Expand Down
77 changes: 47 additions & 30 deletions plexe/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import logging
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Any, TYPE_CHECKING

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -93,68 +93,85 @@ def evaluate_on_sample(
metric: str,
target_columns: list[str],
group_column: str | None = None,
) -> float:
train_sample_uri: str | None = None,
) -> tuple[float, float | None]:
"""
Evaluate model on sample (fast).
Evaluate model on validation sample, optionally also on training sample.

Args:
spark: SparkSession
sample_uri: Sample data URI
sample_uri: Validation sample data URI
model_artifacts_path: Path to model artifacts
model_type: "xgboost", "catboost", "lightgbm", "keras", or "pytorch"
metric: Metric name
target_columns: Target column names
group_column: Optional group column for ranking metrics (query_id, session_id)
train_sample_uri: Optional training sample URI (for train/val gap computation)

Returns:
Performance value
Tuple of (val_performance, train_performance). train_performance is None
when train_sample_uri is not provided.
"""
predictor = _load_predictor(model_artifacts_path, model_type)
val_performance = _evaluate_predictor(spark, predictor, sample_uri, metric, target_columns, group_column)
logger.info(f"Val sample performance ({metric}): {val_performance:.4f}")

# TODO: Computing secondary metrics (e.g. per-class breakdown, calibration) per solution during search.
train_performance = None
if train_sample_uri:
train_performance = _evaluate_predictor(
spark, predictor, train_sample_uri, metric, target_columns, group_column
)
gap = train_performance - val_performance
logger.info(f"Train sample performance ({metric}): {train_performance:.4f} (train-val gap: {gap:+.4f})")

logger.info(f"Evaluating on sample with metric: {metric}")

# Load Sample
sample_df = spark.read.parquet(sample_uri).toPandas()

# Extract group IDs if ranking task
group_ids = sample_df[group_column].values if group_column and group_column in sample_df.columns else None

# Use column names instead of positional indexing to handle target columns in any position
columns_to_drop = list(target_columns)
if group_column and group_column in sample_df.columns:
columns_to_drop.append(group_column)
return val_performance, train_performance

X_sample = sample_df.drop(columns=columns_to_drop)
y_sample = sample_df[target_columns[0]]

# Load Predictor
def _load_predictor(model_artifacts_path: Path, model_type: str) -> Any:
    """Load the appropriate predictor implementation for a model type.

    Args:
        model_artifacts_path: Path to the saved model artifacts directory.
        model_type: One of the ModelType values (xgboost, catboost,
            lightgbm, keras); any other value falls through to PyTorch.

    Returns:
        A predictor instance exposing a ``predict(X)`` method.
    """
    # Imports are deferred so only the selected framework is actually loaded.
    # Construct the predictor exactly once per branch — loading model
    # artifacts can be expensive, so avoid duplicate instantiation.
    if model_type == ModelType.XGBOOST:
        from plexe.templates.inference.xgboost_predictor import XGBoostPredictor

        return XGBoostPredictor(str(model_artifacts_path))
    elif model_type == ModelType.CATBOOST:
        from plexe.templates.inference.catboost_predictor import CatBoostPredictor

        return CatBoostPredictor(str(model_artifacts_path))
    elif model_type == ModelType.LIGHTGBM:
        from plexe.templates.inference.lightgbm_predictor import LightGBMPredictor

        return LightGBMPredictor(str(model_artifacts_path))
    elif model_type == ModelType.KERAS:
        from plexe.templates.inference.keras_predictor import KerasPredictor

        return KerasPredictor(str(model_artifacts_path))
    else:
        # Default: PyTorch predictor for any unrecognized model type.
        from plexe.templates.inference.pytorch_predictor import PyTorchPredictor

        return PyTorchPredictor(str(model_artifacts_path))

# Predict and compute metric on predictions
predictions = predictor.predict(X_sample)["prediction"].values
performance = compute_metric(y_sample, predictions, metric, group_ids=group_ids)

logger.info(f"Sample performance ({metric}): {performance:.4f}")
def _evaluate_predictor(
    spark: "SparkSession",
    predictor: Any,
    data_uri: str,
    metric: str,
    target_columns: list[str],
    group_column: str | None,
) -> float:
    """Run a predictor on a dataset and compute the requested metric.

    Args:
        spark: Active SparkSession used to read the parquet dataset.
        predictor: Loaded predictor exposing ``predict(X)`` that returns a
            frame with a "prediction" column.
        data_uri: Parquet URI of the dataset to evaluate on.
        metric: Metric name passed through to ``compute_metric``.
        target_columns: Target column names; only the first is scored.
        group_column: Optional group column for ranking metrics.

    Returns:
        The computed metric value.
    """
    df = spark.read.parquet(data_uri).toPandas()
    # Extract group IDs only when the column actually exists in the data.
    group_ids = df[group_column].values if group_column and group_column in df.columns else None

    # Drop by name so target/group columns can sit at any position.
    columns_to_drop = list(target_columns)
    if group_column and group_column in df.columns:
        columns_to_drop.append(group_column)

    X = df.drop(columns=columns_to_drop)
    y = df[target_columns[0]]
    predictions = predictor.predict(X)["prediction"].values
    return compute_metric(y, predictions, metric, group_ids=group_ids)


def compute_metric_hardcoded(y_true, y_pred, metric_name: str) -> float:
Expand Down
5 changes: 4 additions & 1 deletion plexe/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,8 @@ class Solution:

# Execution Results
model_artifacts_path: Path | None = None
performance: float | None = None
performance: float | None = None # Validation set metric
train_performance: float | None = None # Training set metric (for overfitting/underfitting detection)
training_time: float | None = None

# Tree Structure
Expand Down Expand Up @@ -359,6 +360,7 @@ def to_dict(self) -> dict:
"parent_solution_id": self.parent.solution_id if self.parent else None,
"child_solution_ids": [c.solution_id for c in self.children],
"performance": self.performance,
"train_performance": self.train_performance,
"training_time": self.training_time,
"is_buggy": self.is_buggy,
"error": self.error,
Expand Down Expand Up @@ -398,6 +400,7 @@ def from_dict(d: dict, all_solutions: dict[int, "Solution"]) -> "Solution":
model_type=d["model_type"],
model_artifacts_path=Path(d["model_artifacts_path"]) if d.get("model_artifacts_path") else None,
performance=d.get("performance"),
train_performance=d.get("train_performance"),
training_time=d.get("training_time"),
parent=None, # Will be linked in second pass
children=[], # Will be linked in second pass
Expand Down
1 change: 1 addition & 0 deletions plexe/search/journal.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ def get_history(self, limit: int = 10) -> list[dict]:
"stage": _compute_stage(node),
"success": not node.is_buggy,
"performance": node.performance,
"train_performance": node.train_performance,
"error": node.error,
}

Expand Down
27 changes: 22 additions & 5 deletions plexe/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,11 @@ def build_model(
logger.error("No search journal or good nodes available for fallback")
logger.error("Proceeding with failed model (will be marked as FAILED)")

# TODO(EVAL_REFINEMENT_LOOP): When verdict is CONDITIONAL_PASS or FAIL with actionable
# HIGH-priority recommendations, summarize evaluation findings into structured feedback,
# loop back to search_models() for 1-3 targeted iterations, then re-evaluate.
# This creates a closed loop: search -> evaluate -> refine -> re-evaluate.

# Phase 6: Package Final Deliverables
if start_phase <= 6:
with tracer.start_as_current_span("Phase 6: Package Final Model"):
Expand Down Expand Up @@ -1260,19 +1265,21 @@ def _execute_variant(
(src_dir / "__init__.py").write_text("# Auto-generated\n")

# Evaluate
performance = evaluate_on_sample(
performance, train_performance = evaluate_on_sample(
spark=spark,
sample_uri=variant_context.val_sample_uri,
model_artifacts_path=model_artifacts_path,
model_type=plan.model.model_type,
metric=variant_context.metric.name,
target_columns=variant_context.output_targets,
group_column=variant_context.group_column,
train_sample_uri=variant_context.train_sample_uri,
)

# Update solution
new_solution.model_artifacts_path = model_artifacts_path
new_solution.performance = float(performance)
new_solution.train_performance = float(train_performance) if train_performance is not None else None
new_solution.training_time = time.time() - start_time
new_solution.is_buggy = False

Expand Down Expand Up @@ -1405,6 +1412,17 @@ def search_models(

logger.info(f"Generated {len(plans)} bootstrap plan(s)")

# Synthetic hypothesis for insight extraction on bootstrap results
hypothesis = Hypothesis(
expand_solution_id=-1,
focus="both",
vary="bootstrap_initial_solutions",
num_variants=len(plans),
rationale="Initial diverse solutions to seed the search tree",
keep_from_parent=[],
expected_impact="Establish baseline performance range across strategies",
)

except Exception as e:
logger.error(f"Bootstrap planning failed: {e}")
continue # Skip this iteration
Expand Down Expand Up @@ -1506,10 +1524,9 @@ def search_models(
solution_id_counter += len(variant_ids)

# ============================================
# Step 2e: Extract Insights from Variants (skip in bootstrap mode)
# Step 2e: Extract Insights from Variants
# ============================================
if variant_solutions and expand_solution_id is not None:
# Only extract insights when we have a hypothesis to learn from
if variant_solutions:
try:
InsightExtractorAgent(
hypothesis=hypothesis,
Expand Down Expand Up @@ -1699,7 +1716,7 @@ def retrain_on_full_dataset(
# ============================================
logger.info("Evaluating final model on full validation set...")

final_val_performance = evaluate_on_sample(
final_val_performance, _ = evaluate_on_sample(
spark=spark,
sample_uri=context.val_uri, # ← FULL validation set
model_artifacts_path=final_artifacts_path,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "plexe"
version = "1.4.0"
version = "1.4.1"
description = "An agentic framework for building ML models from natural language"
authors = [
"Marcello De Bernardi <mdebernardi@plexe.ai>",
Expand Down
8 changes: 7 additions & 1 deletion tests/CODE_INDEX.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Code Index: tests

> Generated on 2026-03-02 22:03:39
> Generated on 2026-03-03 00:06:47

Test suite structure and test case documentation.

Expand Down Expand Up @@ -122,6 +122,8 @@ Unit tests for SearchJournal.
- `test_journal_get_history()` - Test history returns recent entries.
- `test_journal_improvement_trend_improving()` - Test improvement trend with steadily improving solutions.
- `test_journal_improvement_trend_insufficient_data()` - Test improvement trend with fewer than 2 successful solutions.
- `test_journal_get_history_includes_train_performance()` - get_history should include train_performance when set on a solution.
- `test_journal_get_history_train_performance_none()` - get_history should include train_performance=None when not set.

---
## `unit/search/test_tree_policy_determinism.py`
Expand Down Expand Up @@ -208,6 +210,10 @@ Unit tests for core model dataclasses.

**Functions:**
- `test_build_context_update_and_unknown_key()` - Update should set known fields and reject unknown keys.
- `test_solution_train_performance_defaults_to_none()` - New field should default to None for backward compatibility.
- `test_solution_to_dict_includes_train_performance()` - to_dict should serialize train_performance.
- `test_solution_from_dict_backward_compatible()` - Old checkpoints missing train_performance should deserialize cleanly.
- `test_solution_from_dict_with_train_performance()` - Checkpoints with train_performance should round-trip correctly.

---
## `unit/test_submission_pytorch.py`
Expand Down
25 changes: 25 additions & 0 deletions tests/unit/search/test_journal.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,28 @@ def test_journal_improvement_trend_insufficient_data():
trend = journal.get_improvement_trend()

assert trend == 0.0


# ============================================
# Train Performance in History Tests
# ============================================


def test_journal_get_history_includes_train_performance():
    """get_history should include train_performance when set on a solution."""
    journal = SearchJournal()
    solution = _make_solution(0, performance=0.85)
    solution.train_performance = 0.92
    journal.add_node(solution)

    # The most recent entry must carry the training-set metric through.
    entry = journal.get_history()[0]
    assert entry["train_performance"] == 0.92


def test_journal_get_history_train_performance_none():
    """get_history should include train_performance=None when not set."""
    journal = SearchJournal()
    journal.add_node(_make_solution(0, performance=0.85))

    # A solution without a training metric must still expose the key,
    # with an explicit None value.
    entry = journal.get_history()[0]
    assert entry["train_performance"] is None
Loading