diff --git a/dspy/evaluate/auto_evaluation.py b/dspy/evaluate/auto_evaluation.py index bca596df74..8ea36230ae 100644 --- a/dspy/evaluate/auto_evaluation.py +++ b/dspy/evaluate/auto_evaluation.py @@ -1,102 +1,64 @@ -from dspy.predict.chain_of_thought import ChainOfThought -from dspy.primitives import Module -from dspy.signatures import InputField, OutputField, Signature - - -class SemanticRecallPrecision(Signature): - """ - Compare a system's response to the ground truth to compute its recall and precision. - If asked to reason, enumerate key ideas in each response, and whether they are present in the other response. - """ - - question: str = InputField() - ground_truth: str = InputField() - system_response: str = InputField() - recall: float = OutputField(desc="fraction (out of 1.0) of ground truth covered by the system response") - precision: float = OutputField(desc="fraction (out of 1.0) of system response covered by the ground truth") - - -class DecompositionalSemanticRecallPrecision(Signature): - """ - Compare a system's response to the ground truth to compute recall and precision of key ideas. - You will first enumerate key ideas in each response, discuss their overlap, and then report recall and precision. - """ - - question: str = InputField() - ground_truth: str = InputField() - system_response: str = InputField() - ground_truth_key_ideas: str = OutputField(desc="enumeration of key ideas in the ground truth") - system_response_key_ideas: str = OutputField(desc="enumeration of key ideas in the system response") - discussion: str = OutputField(desc="discussion of the overlap between ground truth and system response") - recall: float = OutputField(desc="fraction (out of 1.0) of ground truth covered by the system response") - precision: float = OutputField(desc="fraction (out of 1.0) of system response covered by the ground truth") - - -def f1_score(precision, recall): - precision, recall = max(0.0, min(1.0, precision)), max(0.0, min(1.0, recall)) - return 0.0 if precision + recall == 0 else 2 * (precision * recall) / (precision + recall) - - -class SemanticF1(Module): - def __init__(self, threshold=0.66, decompositional=False): - self.threshold = threshold - - if decompositional: - self.module = ChainOfThought(DecompositionalSemanticRecallPrecision) - else: - self.module = ChainOfThought(SemanticRecallPrecision) - - def forward(self, example, pred, trace=None): - scores = self.module(question=example.question, ground_truth=example.response, system_response=pred.response) - score = f1_score(scores.precision, scores.recall) - - return score if trace is None else score >= self.threshold - - - -########### - - -class AnswerCompleteness(Signature): - """ - Estimate the completeness of a system's responses, against the ground truth. - You will first enumerate key ideas in each response, discuss their overlap, and then report completeness. 
- """ - - question: str = InputField() - ground_truth: str = InputField() - system_response: str = InputField() - ground_truth_key_ideas: str = OutputField(desc="enumeration of key ideas in the ground truth") - system_response_key_ideas: str = OutputField(desc="enumeration of key ideas in the system response") - discussion: str = OutputField(desc="discussion of the overlap between ground truth and system response") - completeness: float = OutputField(desc="fraction (out of 1.0) of ground truth covered by the system response") - - - -class AnswerGroundedness(Signature): - """ - Estimate the groundedness of a system's responses, against real retrieved documents written by people. - You will first enumerate whatever non-trivial or check-worthy claims are made in the system response, and then - discuss the extent to which some or all of them can be deduced from the retrieved context and basic commonsense. - """ - - question: str = InputField() - retrieved_context: str = InputField() - system_response: str = InputField() - system_response_claims: str = OutputField(desc="enumeration of non-trivial or check-worthy claims in the system response") - discussion: str = OutputField(desc="discussion of how supported the claims are by the retrieved context") - groundedness: float = OutputField(desc="fraction (out of 1.0) of system response supported by the retrieved context") - - -class CompleteAndGrounded(Module): - def __init__(self, threshold=0.66): - self.threshold = threshold - self.completeness_module = ChainOfThought(AnswerCompleteness) - self.groundedness_module = ChainOfThought(AnswerGroundedness) - - def forward(self, example, pred, trace=None): - completeness = self.completeness_module(question=example.question, ground_truth=example.response, system_response=pred.response) - groundedness = self.groundedness_module(question=example.question, retrieved_context=pred.context, system_response=pred.response) - score = f1_score(groundedness.groundedness, completeness.completeness) - - return score if trace is None else score >= self.threshold +# dspy.SemanticF1 and dspy.CompleteAndGrounded + +DSPy offers automatic evaluation modules for programmatic assessment of prediction quality based on semantic similarity and information completeness/groundedness. + +## dspy.SemanticF1 + +Measures semantic similarity between a predicted response and the ground truth using LLM-powered scoring. Optionally decomposes both responses to compare reasoning overlap. + +```python +from dspy.evaluate import SemanticF1 +from dspy.datasets import HotPotQA + +dspy.settings.configure(lm=dspy.LM('openai/gpt-4o-mini')) +dataset = HotPotQA(train_seed=2024, train_size=500) +module = dspy.ChainOfThought("question -> response") + +# Initialize metric +metric = SemanticF1(threshold=0.7, decompositional=False) + +score = metric(dataset.train[0], module(dataset.train[0])) +``` + +## dspy.CompleteAndGrounded + +Evaluates both answer completeness (relative to ground truth) and factual groundedness (relative to retrieved evidence), then combines them as an F1 score. 
+ +```python +from dspy.evaluate import CompleteAndGrounded + +metric = CompleteAndGrounded(threshold=0.66) +score = metric(example, module(example)) +``` + +## API Reference + + +::: dspy.SemanticF1 + handler: python + options: + show_source: true + show_root_heading: true + heading_level: 2 + docstring_style: google + show_root_full_path: true + show_object_full_path: false + separate_signature: false + inherited_members: true +::: + + + +::: dspy.CompleteAndGrounded + handler: python + options: + show_source: true + show_root_heading: true + heading_level: 2 + docstring_style: google + show_root_full_path: true + show_object_full_path: false + separate_signature: false + inherited_members: true +::: + diff --git a/dspy/evaluate/metrics.py b/dspy/evaluate/metrics.py index 65ae37fcf7..b9ee64d75d 100644 --- a/dspy/evaluate/metrics.py +++ b/dspy/evaluate/metrics.py @@ -8,25 +8,26 @@ from dspy.dsp.utils.utils import print_message -def EM(prediction, answers_list): # noqa: N802 +def EM(prediction: str, answers_list: list[str]) -> float: # noqa: N802 + """Returns max exact match score between prediction and any reference in answers_list.""" assert isinstance(answers_list, list) - return max(em_score(prediction, ans) for ans in answers_list) -def F1(prediction, answers_list): # noqa: N802 +def F1(prediction: str, answers_list: list[str]) -> float: # noqa: N802 + """Returns maximal token-level F1 between prediction and references.""" assert isinstance(answers_list, list) - return max(f1_score(prediction, ans) for ans in answers_list) -def HotPotF1(prediction, answers_list): # noqa: N802 +def HotPotF1(prediction: str, answers_list: list[str]) -> float: # noqa: N802 + """Returns maximal F1 specifically for HotpotQA-style QA.""" assert isinstance(answers_list, list) - return max(hotpot_f1_score(prediction, ans) for ans in answers_list) -def normalize_text(s): +def normalize_text(s: str) -> str: + """Normalize string by unicode normalization, strip articles and punctuation, and lowercase.""" s = unicodedata.normalize("NFD", s) def remove_articles(text): @@ -45,7 +46,8 @@ def lower(text): return white_space_fix(remove_articles(remove_punc(lower(s)))) -def em_score(prediction, ground_truth): +def em_score(prediction: str, ground_truth: str) -> bool: + """Exact string match after normalization.""" return normalize_text(prediction) == normalize_text(ground_truth) @@ -53,8 +55,10 @@ def em_score(prediction, ground_truth): # See: https://rajpurkar.github.io/SQuAD-explorer/ under Evaluation Script # See: QReCC's - -def f1_score(prediction, ground_truth): +def f1_score(prediction: str, ground_truth: str) -> float: + """Token-level F1 overlap (precision, recall, F1). + Returns 0 if there is no overlap. 
+ """ prediction_tokens = normalize_text(prediction).split() ground_truth_tokens = normalize_text(ground_truth).split() @@ -66,37 +70,38 @@ def f1_score(prediction, ground_truth): print_message("\n#> F1 Metric: Rare edge case of len(prediction_tokens) == len(ground_truth_tokens) == 0.\n") if num_same == 0: - return 0 + return 0.0 precision = 1.0 * num_same / len(prediction_tokens) recall = 1.0 * num_same / len(ground_truth_tokens) f1 = (2 * precision * recall) / (precision + recall) - return f1 -def hotpot_f1_score(prediction, ground_truth): +def hotpot_f1_score(prediction: str, ground_truth: str) -> float: + """HotpotQA F1 with special handling for yes/no answers.""" normalized_prediction = normalize_text(prediction) normalized_ground_truth = normalize_text(ground_truth) if normalized_prediction in ["yes", "no", "noanswer"] and normalized_prediction != normalized_ground_truth: - return 0 + return 0.0 if normalized_ground_truth in ["yes", "no", "noanswer"] and normalized_prediction != normalized_ground_truth: - return 0 + return 0.0 prediction_tokens = normalized_prediction.split() ground_truth_tokens = normalized_ground_truth.split() common = Counter(prediction_tokens) & Counter(ground_truth_tokens) num_same = sum(common.values()) if num_same == 0: - return 0 + return 0.0 precision = 1.0 * num_same / len(prediction_tokens) recall = 1.0 * num_same / len(ground_truth_tokens) f1 = (2 * precision * recall) / (precision + recall) return f1 -def precision_score(prediction, ground_truth): +def precision_score(prediction: str, ground_truth: str) -> float: + """Token-level precision (ignoring recall).""" prediction_tokens = normalize_text(prediction).split() ground_truth_tokens = normalize_text(ground_truth).split() @@ -104,14 +109,11 @@ def precision_score(prediction, ground_truth): num_same = sum(common.values()) if len(prediction_tokens) == len(ground_truth_tokens) == 0: - # Unlike most tasks, QReCC and SQuAD-2.0 assign 1.0 in this edge case. We don't for uniformity. 
print_message("\n#> Precision Metric: Rare edge case of len(prediction_tokens) == len(ground_truth_tokens) == 0.\n") if num_same == 0: - return 0 - + return 0.0 precision = 1.0 * num_same / len(prediction_tokens) - return precision @@ -129,28 +131,26 @@ def passage_has_answers(passage: str, answers: list[str]) -> bool: return any(passage_has_answers(psg, answers) for psg in passages) -def _answer_match(prediction, answers, frac=1.0): +def _answer_match(prediction: str, answers: list[str], frac: float = 1.0) -> bool: """Returns True if the prediction matches any of the answers.""" - if frac >= 1.0: return EM(prediction, answers) - return F1(prediction, answers) >= frac -def answer_exact_match(example, pred, trace=None, frac=1.0): +def answer_exact_match(example, pred, trace=None, frac: float = 1.0) -> bool: + """Default metric: Checks if gold answer exactly matches model prediction, for str or list[str].""" if isinstance(example.answer, str): return _answer_match(pred.answer, [example.answer], frac=frac) elif isinstance(example.answer, list): return _answer_match(pred.answer, example.answer, frac=frac) - raise ValueError(f"Invalid answer type: {type(example.answer)}") -def answer_passage_match(example, pred, trace=None): +def answer_passage_match(example, pred, trace=None) -> bool: + """For RAG systems, checks if gold answer is present in any retrieved context passage.""" if isinstance(example.answer, str): return _passage_match(pred.context, [example.answer]) elif isinstance(example.answer, list): return _passage_match(pred.context, example.answer) - raise ValueError(f"Invalid answer type: {type(example.answer)}") diff --git a/dspy/predict/refine.py b/dspy/predict/refine.py index 10ec89d115..baddc3463e 100644 --- a/dspy/predict/refine.py +++ b/dspy/predict/refine.py @@ -39,6 +39,26 @@ class OfferFeedback(Signature): class Refine(Module): + """ + Refines a module by running it up to `N` times with different temperatures and returns the best prediction, as defined by the reward_fn, or the first prediction that passes the threshold. After each attempt (except the final one), `Refine` automatically generates detailed feedback about the module's performance and uses this feedback as hints for subsequent runs, creating an iterative refinement process. + + Example: + ```python + import dspy + # Use a chain-of-thought QA module as the base + qa = dspy.ChainOfThought("question -> answer") + # Define a reward function that checks for one-word answers + def one_word_answer(args, pred): + return 1.0 if len(pred.answer.split()) == 1 else 0.0 + # Create the refined module + best_of_3 = dspy.Refine(module=qa, N=3, reward_fn=one_word_answer, threshold=1.0) + # Use the refined module + result = best_of_3(question="What is the capital of Belgium?").answer + # Returns: Brussels + ``` + + By default, `Refine` will try to run the base module up to N times until the threshold is met. If the module encounters an error, it will keep going up to N failed attempts. You can adjust this behavior with the `fail_count` argument to control the number of computation attempts allowed before raising an error. + """ def __init__( self, module: Module, @@ -47,42 +67,6 @@ def __init__( threshold: float, fail_count: int | None = None, ): - """ - Refines a module by running it up to N times with different temperatures and returns the best prediction. 
- - This module runs the provided module multiple times with varying temperature settings and selects - either the first prediction that exceeds the specified threshold or the one with the highest reward. - If no prediction meets the threshold, it automatically generates feedback to improve future predictions. - - - Args: - module (Module): The module to refine. - N (int): The number of times to run the module. must - reward_fn (Callable): The reward function. - threshold (float): The threshold for the reward function. - fail_count (Optional[int], optional): The number of times the module can fail before raising an error - - Example: - ```python - import dspy - - dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini")) - - # Define a QA module with chain of thought - qa = dspy.ChainOfThought("question -> answer") - - # Define a reward function that checks for one-word answers - def one_word_answer(args, pred): - return 1.0 if len(pred.answer.split()) == 1 else 0.0 - - # Create a refined module that tries up to 3 times - best_of_3 = dspy.Refine(module=qa, N=3, reward_fn=one_word_answer, threshold=1.0) - - # Use the refined module - result = best_of_3(question="What is the capital of Belgium?").answer - # Returns: Brussels - ``` - """ self.module = module self.reward_fn = lambda *args: reward_fn(*args) # to prevent this from becoming a parameter self.threshold = threshold diff --git a/dspy/teleprompt/avatar_optimizer.py b/dspy/teleprompt/avatar_optimizer.py index ddba74e5f2..07cba17e88 100644 --- a/dspy/teleprompt/avatar_optimizer.py +++ b/dspy/teleprompt/avatar_optimizer.py @@ -1,225 +1,4 @@ -from concurrent.futures import ThreadPoolExecutor -from copy import deepcopy -from random import sample -from typing import Callable +## 2025-08-14 +### New Features +- **AvatarOptimizer**: Introduced `AvatarOptimizer`, a specialized feedback-driven teleprompter for optimizing tool-using agent programs. It iteratively evaluates the agent on task data, collects positive and negative samples, synthesizes LM-generated feedback, and updates agent instructions to improve performance. Built-in mechanisms for sampling, comparison, and LM-based instruction revision are provided for tool pipelines and multi-step workflows. -from pydantic import BaseModel -from tqdm import tqdm - -import dspy -from dspy.predict.avatar import ActionOutput -from dspy.teleprompt.teleprompt import Teleprompter - -DEFAULT_MAX_EXAMPLES = 10 - - -class EvalResult(BaseModel): - example: dict - score: float - actions: list[ActionOutput] | None = None - - -class Comparator(dspy.Signature): - """After executing the given actions on user inputs using the given instruction, some inputs have yielded good, results, while others have not. I'll provide you the inputs along with their, corresponding evaluation metrics: - -Task: -(1) Firstly, identify and contrast the patterns of inputs that have achieved good results with those that have not. -(2) Then, review the computational logic for any inconsistencies in the previous actions. 
-(3) Lastly, specify the modification in tools used that can lead to improved performance on the negative inputs.""" - - instruction: str = dspy.InputField( - prefix="Instruction: ", - desc="Instruction for the actor to execute the task", - ) - actions: list[str] = dspy.InputField( - prefix="Actions: ", - desc="Actions actor can take to complete the task", - ) - pos_input_with_metrics: list[EvalResult] = dspy.InputField( - prefix="Positive Inputs: ", - desc="Positive inputs along with their score on a evaluation metric and actions taken", - ) - neg_input_with_metrics: list[EvalResult] = dspy.InputField( - prefix="Negative Inputs: ", - desc="Negative inputs along with their score on a evaluation metric and actions taken", - ) - feedback: str = dspy.OutputField( - prefix="Feedback: ", - desc="Feedback for the actor to improve the performance of negative inputs", - ) - - -class FeedbackBasedInstruction(dspy.Signature): - """There is a task that needs to be completed for which one can use multiple tools to achieve the desired outcome. A group's performance was evaluated on a dataset of inputs, the inputs that did well are positive inputs, and the inputs that did not do well are negative inputs. - -You received feedback on how they can better use the tools to improve your performance on the negative inputs. You have been provided with the previous instruction, that they followed to use tools to complete the task, and the feedback on your performance. - -Your task is to incorporate the feedback and generate a detailed instruction for the group to follow to improve their performance on the task. - -Make sure that the new instruction talks about how to use the tools effectively and should be no more than 3 paragraphs long. The previous instruction contains general guidelines that you must retain in the new instruction.""" - - previous_instruction: str = dspy.InputField( - prefix="Previous Instruction: ", - desc="Previous instruction for the actor to execute the task", - ) - feedback: str = dspy.InputField( - prefix="Feedback: ", - desc="Feedback for the actor to improve the performance of negative inputs", - ) - new_instruction: str = dspy.OutputField( - prefix="New Instruction: ", - desc="New instruction for the actor to execute the task", - ) - - -class AvatarOptimizer(Teleprompter): - def __init__( - self, - metric: Callable, - max_iters: int = 10, - lower_bound: int = 0, - upper_bound: int = 1, - max_positive_inputs: int | None = None, - max_negative_inputs: int | None = None, - optimize_for: str = "max", - ): - assert metric is not None, "`metric` argument cannot be None. Please provide a metric function." 
- self.metric = metric - self.optimize_for = optimize_for - - self.max_iters = max_iters - - self.lower_bound = lower_bound - self.upper_bound = upper_bound - - self.max_positive_inputs = max_positive_inputs or DEFAULT_MAX_EXAMPLES - self.max_negative_inputs = max_negative_inputs or DEFAULT_MAX_EXAMPLES - - self.comparator = dspy.TypedPredictor(Comparator) - self.feedback_instruction = dspy.Predict(FeedbackBasedInstruction) - - def process_example(self, actor, example, return_outputs): - actor = deepcopy(actor) - - try: - prediction = actor(**example.inputs().toDict()) - score = self.metric(example, prediction) - - if return_outputs: - return example, prediction, score - else: - return score - - except Exception as e: - print(e) - - if return_outputs: - return example, None, 0 - else: - return 0 - - - def thread_safe_evaluator(self, devset, actor, return_outputs=False, num_threads=None): - total_score = 0 - total_examples = len(devset) - results = [] - num_threads = num_threads or dspy.settings.num_threads - - with ThreadPoolExecutor(max_workers=num_threads) as executor: - futures = [executor.submit(self.process_example, actor, example, return_outputs) for example in devset] - - for future in tqdm(futures, total=total_examples, desc="Processing examples"): - result = future.result() - if return_outputs: - example, prediction, score = result - total_score += score - results.append((example, prediction, score)) - else: - total_score += result - - avg_metric = total_score / total_examples - - if return_outputs: - return avg_metric, results - else: - return avg_metric - - - def _get_pos_neg_results( - self, - actor: dspy.Module, - trainset: list[dspy.Example] - ) -> tuple[float, list[EvalResult], list[EvalResult]]: - pos_inputs = [] - neg_inputs = [] - - avg_score, results = self.thread_safe_evaluator(trainset, actor, return_outputs=True) - print(f"Average Score: {avg_score}") - - for example, prediction, score in results: - if score >= self.upper_bound: - pos_inputs.append( - EvalResult( - example=example.inputs().toDict(), - score=score, - actions=prediction.actions if prediction else None - ) - ) - elif score <= self.lower_bound: - neg_inputs.append( - EvalResult( - example=example.inputs().toDict(), - score=score, - actions=prediction.actions if prediction else None - ) - ) - - if len(pos_inputs) == 0: - raise ValueError("No positive examples found, try lowering the upper_bound or providing more training data") - if len(neg_inputs) == 0: - raise ValueError("No negative examples found, try raising the lower_bound or providing more training data") - - return (avg_score, pos_inputs, neg_inputs) - - - def compile(self, student, *, trainset): - best_actor = deepcopy(student) - best_score = -999 if self.optimize_for == "max" else 999 - - for i in range(self.max_iters): - print(20*"=") - print(f"Iteration {i+1}/{self.max_iters}") - - score, pos_inputs, neg_inputs = self._get_pos_neg_results(best_actor, trainset) - print(f"Positive examples: {len(pos_inputs)}") - print(f"Negative examples: {len(neg_inputs)}") - print(f"Sampling {self.max_positive_inputs} positive examples and {self.max_negative_inputs} negative examples") - - if self.max_positive_inputs and len(pos_inputs) > self.max_positive_inputs: - pos_inputs = sample(pos_inputs, self.max_positive_inputs) - - if self.max_negative_inputs and len(neg_inputs) > self.max_negative_inputs: - neg_inputs = sample(neg_inputs, self.max_negative_inputs) - - feedback = self.comparator( - instruction=best_actor.actor.signature.instructions, - 
actions=[str(tool) for tool in best_actor.tools], - pos_input_with_metrics=pos_inputs, - neg_input_with_metrics=neg_inputs - ).feedback - - new_instruction = self.feedback_instruction( - previous_instruction=best_actor.actor.signature.instructions, - feedback=feedback - ).new_instruction - - print(f"Generated new instruction: {new_instruction}") - - if (self.optimize_for == "max" and best_score < score) or (self.optimize_for == "min" and best_score > score): - best_actor.actor.signature = best_actor.actor.signature.with_instructions(new_instruction) - best_actor.actor_clone = deepcopy(best_actor.actor) - best_score = score - - print(f"Best Actor: {best_actor}") - - return best_actor diff --git a/dspy/teleprompt/bettertogether.py b/dspy/teleprompt/bettertogether.py index d1154f9ae4..72679e26ed 100644 --- a/dspy/teleprompt/bettertogether.py +++ b/dspy/teleprompt/bettertogether.py @@ -1,170 +1,57 @@ -import logging -import random -from typing import Callable +# dspy.BetterTogether -import dspy -from dspy.primitives.example import Example -from dspy.primitives.module import Module -from dspy.teleprompt.bootstrap_finetune import ( - BootstrapFinetune, - all_predictors_have_lms, - kill_lms, - launch_lms, - prepare_student, -) -from dspy.teleprompt.random_search import BootstrapFewShotWithRandomSearch -from dspy.teleprompt.teleprompt import Teleprompter - -logger = logging.getLogger(__name__) - - -class BetterTogether(Teleprompter): - - STRAT_SEP = " -> " - - def __init__(self, - metric: Callable, - prompt_optimizer: Teleprompter | None = None, - weight_optimizer: Teleprompter | None = None, - seed: int | None = None, - ): - if not dspy.settings.experimental: - raise ValueError("This is an experimental optimizer. Set `dspy.settings.experimental` to `True` to use it.") - - # TODO: Note that the BetterTogether optimizer is meaningful when - # BootstrapFinetune uses a metric to filter the training data before - # fine-tuning. However, one can also choose to run this optimizer with - # a BootstrapFinetune without a metric, say, if there aren't labels - # available for the training data. Should this be noted somewhere? - # TODO: We should re-consider if the metric should be required. - self.prompt_optimizer = prompt_optimizer if prompt_optimizer else BootstrapFewShotWithRandomSearch(metric=metric) - self.weight_optimizer = weight_optimizer if weight_optimizer else BootstrapFinetune(metric=metric) - - is_supported_prompt = isinstance(self.prompt_optimizer, BootstrapFewShotWithRandomSearch) - is_supported_weight = isinstance(self.weight_optimizer, BootstrapFinetune) - if not is_supported_prompt or not is_supported_weight: - raise ValueError( - "The BetterTogether optimizer only supports the following optimizers for now: BootstrapFinetune, " - "BootstrapFewShotWithRandomSearch." 
- ) - - self.rng = random.Random(seed) - - def compile( - self, - student: Module, - trainset: list[Example], - strategy: str = "p -> w -> p", - valset_ratio = 0.1, - ) -> Module: - # TODO: We could record acc on a different valset to pick the best - # strategy within the provided strategy - logger.info("Validating the strategy") - parsed_strategy = strategy.lower().split(self.STRAT_SEP) + +::: dspy.BetterTogether + handler: python + options: + show_source: true + show_root_heading: true + heading_level: 2 + docstring_style: google + show_root_full_path: true + show_object_full_path: false + separate_signature: false + inherited_members: true +::: + - if not all(s in ["p", "w"] for s in parsed_strategy): - raise ValueError( - f"The strategy should be a sequence of 'p' and 'w' separated by '{self.STRAT_SEP}', but " - f"found: {strategy}" - ) +## Overview - logger.info("Preparing the student program...") - # TODO: Prepare student returns student.reset_copy(), which is what gets - # optimized. We should make this clear in the doc comments. - student = prepare_student(student) - all_predictors_have_lms(student) +**BetterTogether** is a DSPy optimizer that coordinates prompt optimization and LM weight finetuning for DSPy programs, typically yielding higher performance than either method alone by alternately applying prompt-based and weight-based optimization strategies. - # Make a shallow copy of the trainset, so that we don't change the order - # of the examples in the original trainset - trainset = trainset[:] - logger.info("Compiling the student program...") - student = self._run_strategies(parsed_strategy, student, trainset, valset_ratio) +This optimizer executes a user-configurable sequence (strategy) of prompt optimization (using, e.g., `BootstrapFewShotWithRandomSearch`) and LM weight finetuning (using `BootstrapFinetune`), shuffling training data and re-initializing program state as appropriate for each phase. BetterTogether supports cross-version model state loading (available since DSPy 3.0.1) to ease transfer, reproducibility, and auditability across major releases. All major DSPy serialization caveats and best practices apply: to ensure programs trained with DSPy 3.X+ load reliably in future versions, always use the latest DSPy save/load calls and check the version warnings. - logger.info("BetterTogether has finished compiling the student program") - return student +## Example Usage - def _run_strategies(self, parsed_strategy, student, trainset, valset_ratio) -> Module: - # Keep track of all the partial strategies/programs in parsed_strategy - # "" corresponds to the initial student program - candidate_programs = [] - candidate_programs.append(("", student)) - launched_flag = False - - for ind, step_code in enumerate(parsed_strategy): - current_strategy = self.STRAT_SEP.join(parsed_strategy[:ind + 1]) - logger.info( - f"\n########## Step {ind + 1} of {len(parsed_strategy)} - Strategy " - f"'{current_strategy}' ##########" - ) - - logger.info("Shuffling the trainset...") - self.rng.shuffle(trainset) - if not launched_flag: - launch_lms(student) - launched_flag = True - - # TODO: Should we reset or just deepcopy? How does resetting affect - # the predictor LMs? 
- student = student.deepcopy() - student._compiled = False - if step_code == "p": - student = self._compile_prompt_optimizer(student, trainset, valset_ratio) - elif step_code == "w": - student = self._compile_weight_optimizer(student, trainset) - launched_flag = False - - # Record the program corresponding to the current strategy - candidate_programs.append((current_strategy, student)) - - if launched_flag: - kill_lms(student) - - student.candidate_programs = candidate_programs - return student - - def _compile_prompt_optimizer(self, student, trainset, valset_ratio) -> Module: - logger.info("Preparing for prompt optimization...") - - # Sampling a validation set from the trainset for the prompt optimizer - # We drop the hints for prompt optimization - trainset = [x.with_inputs(*list(set(x.inputs().keys()) - {"hint"})) for x in trainset] - num_val = int(valset_ratio * len(trainset)) - prompt_valset = trainset[:num_val] - prompt_trainset = trainset[num_val:] - - # TODO: To make this optimizer general, we need to ensure that all the - # prompt optimizers are accepting a valset or encode a way to check if - # a valset should be passed to an optimizer's compile method. - # TODO: We should ensure that the prompt optimizers in DSPy respect the - # predictor.lm attributes. In particular, - # BootstrapFewShotWithRandomSearch seems to be resetting these. We are - # manually re-setting the LMs here to circumvent this issue, but we - # should consider addressing it in BFRS. - logger.info("Compiling the prompt optimizer...") - pred_lms = [pred.lm for pred in student.predictors()] - student = self.prompt_optimizer.compile(student, trainset=prompt_trainset, valset=prompt_valset) - for pred, lm in zip(student.predictors(), pred_lms, strict=False): - pred.lm = lm +```python +from dspy.teleprompt import BetterTogether +from dspy.teleprompt.random_search import BootstrapFewShotWithRandomSearch +from dspy.teleprompt.bootstrap_finetune import BootstrapFinetune +from dspy import Example - return student +# Create your DSPy program (student) +student = MyDSPyProgram() # custom DSPy module - def _compile_weight_optimizer(self, student, trainset) -> Module: - logger.info("Preparing for weight optimization...") +# Prepare optimizers for prompts and weights +prompt_optimizer = BootstrapFewShotWithRandomSearch(metric=your_metric) +weight_optimizer = BootstrapFinetune(metric=your_metric) - # Saving the LMs before compiling the weight optimizer - original_lms = [pred.lm for pred in student.predictors()] +btt = BetterTogether( + metric=your_metric, + prompt_optimizer=prompt_optimizer, + weight_optimizer=weight_optimizer +) - # TODO: To make this optimizer general, we need to ensure that all the - # prompt optimizers are accepting a valset or encode a way to check if - # a valset should be passed to an optimizer's compile. - logger.info("Compiling the weight optimizer...") - student = self.weight_optimizer.compile(student, trainset=trainset) +optimized_program = btt.compile(student, trainset=[Example(...)], strategy="p -> w -> p") - # Updating the train kwargs for the new LMs. This is needed because the - # train_kwargs of the optimizer is configured for the original LMs. 
- new_lms = [pred.lm for pred in student.predictors()] - for original_lm, new_lm in zip(original_lms, new_lms, strict=False): - original_params = self.weight_optimizer.train_kwargs[original_lm] - self.weight_optimizer.train_kwargs[new_lm] = original_params +# Save and load optimized program using DSPy 3.X+ +optimized_program.save('my_program.json') +loaded = MyDSPyProgram() +loaded.load('my_program.json') +``` - return student +### Notes +- **DSPy 3.0.1+**: The `load` method now supports forward-compatible loading of programs saved with older versions. +- **Prompts and LM compatibility**: When using alternate strategies (e.g., prompt and weight optimizers), ensure all predictors have their LM assigned before finetuning. Use `your_program.set_lm(your_lm)` to update all predictors. +- **Auditability**: The `candidate_programs` attribute collects a sequence of intermediate optimized programs and is preserved on disk for future loading and analysis. +- **Best Practice**: Always check for version mismatch warnings when loading programs, and re-save old models using new DSPy versions before deploying in production to ensure compatibility. \ No newline at end of file diff --git a/dspy/teleprompt/bootstrap.py b/dspy/teleprompt/bootstrap.py index 35c56eeeb7..4fcfc3b516 100644 --- a/dspy/teleprompt/bootstrap.py +++ b/dspy/teleprompt/bootstrap.py @@ -9,30 +9,8 @@ from .vanilla import LabeledFewShot -# TODO: metrics should return an object with __bool__ basically, but fine if they're more complex. -# They can also be sortable. - -# TODO: Switch here from dspy.dsp.Example to dspy.Example. Right now, it's okay because it's internal only (predictors). -# NOTE: Notice the places where we don't shuffle examples. I do like that this one doesn't shuffle. -# Other ones that consider options may want to use both unshuffled and then shuffle a few times, when -# considering candidates. - -# TODO: the max_rounds via branch_idx to get past the cache, not just temperature. -# In principle, we can also sample multiple outputs from the final generation step -# (or even each step, in case the validation function just wants *one* thing that works, but nah) -# and try them all. Having a pretty solid guess on the "final step" of each example isn't hard by the second round, -# in the sense that we have the trace from the first round. (Yes it may change but that's an edge case that -# won't hurt our "best effort" guarantees.) - -# TODO: When this bootstraps for another teleprompter like finetune, we want all demos we gather. -# But when it's for direct use we may want to sample ONE demo per predictor--example pair. -# This is important for "multi-use" modules. - -# TODO: Add baselines=[...] - logger = logging.getLogger(__name__) - class BootstrapFewShot(Teleprompter): def __init__( self, @@ -44,25 +22,16 @@ def __init__( max_rounds=1, max_errors=None, ): - """A Teleprompter class that composes a set of demos/examples to go into a predictor's prompt. - These demos come from a combination of labeled examples in the training set, and bootstrapped demos. + """A Teleprompter that builds a prompt's demos/examples by combining both labeled data and bootstrapped generations. Args: - metric (Callable): A function that compares an expected value and predicted value, - outputting the result of that comparison. - metric_threshold (float, optional): If the metric yields a numerical value, then check it - against this threshold when deciding whether or not to accept a bootstrap example. - Defaults to None. 
- teacher_settings (dict, optional): Settings for the `teacher` model. - Defaults to None. - max_bootstrapped_demos (int): Maximum number of bootstrapped demonstrations to include. - Defaults to 4. - max_labeled_demos (int): Maximum number of labeled demonstrations to include. - Defaults to 16. - max_rounds (int): Number of iterations to attempt generating the required bootstrap - examples. If unsuccessful after `max_rounds`, the program ends. Defaults to 1. - max_errors (Optional[int]): Maximum number of errors until program ends. - If ``None``, inherits from ``dspy.settings.max_errors``. + metric (Callable): A function that compares an expected value and predicted value, returning True (accepted demo) or False (rejected) or a float that will be compared to `metric_threshold` if given. + metric_threshold (float, optional): If the metric yields a numerical value, this threshold determines whether a generated example is accepted as a demo. Defaults to None (any True-valued result is accepted). + teacher_settings (dict, optional): Settings for the `teacher` (e.g., alternative LM configuration). Defaults to None. + max_bootstrapped_demos (int): Max number of bootstrapped demos (generated by the teacher) to include. Defaults to 4. + max_labeled_demos (int): Max number of labeled demos (from trainset) to include. Defaults to 16. + max_rounds (int): Number of times to iterate through trainset looking for working bootstraps. Defaults to 1 (single pass). + max_errors (int, optional): Maximum number of errors allowed before halting. If not set, uses dspy.settings.max_errors. Defaults to None. """ self.metric = metric self.metric_threshold = metric_threshold @@ -76,6 +45,9 @@ def __init__( self.error_lock = threading.Lock() def compile(self, student, *, teacher=None, trainset): + """Compiles optimized demos for the student via a mixture of labeled and bootstrapped examples. + Ensures that the student and teacher have compatible predictors and signatures, and handles error tolerance. + """ self.trainset = trainset self._prepare_student_and_teacher(student, teacher) @@ -89,24 +61,20 @@ def compile(self, student, *, teacher=None, trainset): def _prepare_student_and_teacher(self, student, teacher): self.student = student.reset_copy() - - # NOTE: behavior change on Oct 28, 2024. Deep copy instead of reset copy for the student-as-teacher. + # Use deepcopy for teacher to avoid side-effects from sharing the student. self.teacher = teacher.deepcopy() if teacher is not None else student.deepcopy() - assert getattr(self.student, "_compiled", False) is False, "Student must be uncompiled." - if self.max_labeled_demos and getattr(self.teacher, "_compiled", False) is False: teleprompter = LabeledFewShot(k=self.max_labeled_demos) self.teacher = teleprompter.compile(self.teacher.reset_copy(), trainset=self.trainset) def _prepare_predictor_mappings(self): + """Ensures one-to-one mapping of predictors in student and teacher.""" name2predictor, predictor2name = {}, {} student, teacher = self.student, self.teacher - assert len(student.predictors()) == len( teacher.predictors(), ), "Student and teacher must have the same number of predictors." - for (name1, predictor1), (name2, predictor2) in zip( student.named_predictors(), teacher.named_predictors(), strict=False ): @@ -125,76 +93,52 @@ def _prepare_predictor_mappings(self): f"{type(predictor1.signature)} != {type(predictor2.signature)}" ) assert id(predictor1) != id(predictor2), "Student and teacher must be different objects." 
- - name2predictor[name1] = None # dict(student=predictor1, teacher=predictor2) + name2predictor[name1] = None predictor2name[id(predictor1)] = name1 - - # FIXME(shangyint): This is an ugly hack to bind traces of - # retry.module to retry - # if isinstance(predictor1, Retry): - # predictor2name[id(predictor1.module)] = name1 - predictor2name[id(predictor2)] = name2 - self.name2predictor = name2predictor self.predictor2name = predictor2name def _bootstrap(self, *, max_bootstraps=None): + """Attempts to gather up to `max_bootstraps` examples from the trainset that pass the metric, considering max_rounds and tolerant of up to max_errors before halting.""" max_bootstraps = max_bootstraps or self.max_bootstrapped_demos bootstrap_attempts = 0 - bootstrapped = {} self.name2traces = {name: [] for name in self.name2predictor} - for example_idx, example in enumerate(tqdm.tqdm(self.trainset)): if len(bootstrapped) >= max_bootstraps: break - for round_idx in range(self.max_rounds): bootstrap_attempts += 1 - if self._bootstrap_one_example(example, round_idx): bootstrapped[example_idx] = True break - print( f"Bootstrapped {len(bootstrapped)} full traces after {example_idx} examples " f"for up to {self.max_rounds} rounds, amounting to {bootstrap_attempts} attempts." ) - - # Unbootstrapped training examples - self.validation = [x for idx, x in enumerate(self.trainset) if idx not in bootstrapped] random.Random(0).shuffle(self.validation) - self.validation = self.validation - # NOTE: Can't yet use evaluate because we need to trace *per example* - # evaluate = Evaluate(program=self.teacher, metric=self.metric, num_threads=12) - # score = evaluate(self.metric, display_table=False, display_progress=True) - def _bootstrap_one_example(self, example, round_idx=0): + """Evaluates a single example for candidate demos, including error tolerance and collecting passes per predictor.""" name2traces = {} teacher = self.teacher predictor_cache = {} - try: with dspy.settings.context(trace=[], **self.teacher_settings): lm = dspy.settings.lm lm = lm.copy(temperature=0.7 + 0.001 * round_idx) if round_idx > 0 else lm new_settings = {"lm": lm} if round_idx > 0 else {} - with dspy.settings.context(**new_settings): for name, predictor in teacher.named_predictors(): predictor_cache[name] = predictor.demos predictor.demos = [x for x in predictor.demos if x != example] - prediction = teacher(**example.inputs()) trace = dspy.settings.trace - for name, predictor in teacher.named_predictors(): predictor.demos = predictor_cache[name] - if self.metric: metric_val = self.metric(example, prediction, trace) if self.metric_threshold: @@ -212,54 +156,33 @@ def _bootstrap_one_example(self, example, round_idx=0): if current_error_count >= effective_max_errors: raise e logger.error(f"Failed to run or to evaluate example {example} with {self.metric} due to {e}.") - if success: for step in trace: predictor, inputs, outputs = step demo = dspy.Example(augmented=True, **inputs, **outputs) - try: predictor_name = self.predictor2name[id(predictor)] except KeyError: continue # FIXME: ! - - # # TODO: Look closer into this. It's a bit tricky to reproduce. - # print(f"Failed to find predictor {predictor} in {self.predictor2name}.") - # print( - # "Are you doing this in a notebook (Jupyter)? 
This might be caused by redefining values by rerunning cells.", - # ) - # print("Try restarting the notebook, or open an issue.") - # raise KeyError( - # f"Failed to find predictor {id(predictor)} {predictor} in {self.predictor2name}.", - # ) from e - name2traces[predictor_name] = name2traces.get(predictor_name, []) name2traces[predictor_name].append(demo) - - # Update the traces for name, demos in name2traces.items(): # If there are multiple traces for the same predictor in the sample example, # sample 50/50 from the first N-1 traces or the last trace. if len(demos) > 1: from dspy.utils.hasher import Hasher - rng = random.Random(Hasher.hash(tuple(demos))) demos = [rng.choice(demos[:-1]) if rng.random() < 0.5 else demos[-1]] self.name2traces[name].extend(demos) - return success def _train(self): rng = random.Random(0) raw_demos = self.validation - for name, predictor in self.student.named_predictors(): augmented_demos = self.name2traces[name][: self.max_bootstrapped_demos] - sample_size = min(self.max_labeled_demos - len(augmented_demos), len(raw_demos)) sample_size = max(0, sample_size) - raw_demos = rng.sample(raw_demos, sample_size) predictor.demos = augmented_demos + raw_demos - return self.student diff --git a/dspy/teleprompt/copro_optimizer.py b/dspy/teleprompt/copro_optimizer.py index e0f4f71749..006d02e1a4 100644 --- a/dspy/teleprompt/copro_optimizer.py +++ b/dspy/teleprompt/copro_optimizer.py @@ -9,30 +9,13 @@ logger = logging.getLogger(__name__) """ -USAGE SUGGESTIONS: - -The following code can be used to compile a optimized signature teleprompter, and evaluate it on an end task: - -teleprompter = COPRO(prompt_model=prompt_model, metric=metric, breadth=BREADTH, depth=DEPTH, init_temperature=INIT_TEMPERATURE) -kwargs = dict(num_threads=NUM_THREADS, display_progress=True, display_table=0) -compiled_prompt_opt = teleprompter.compile(program.deepcopy(), trainset=trainset[:DEV_NUM], eval_kwargs=kwargs) -eval_score = evaluate(compiled_prompt_opt, devset=evalset[:EVAL_NUM], **kwargs) - -Note that this teleprompter takes in the following parameters: - -* prompt_model: The model used for prompt generation. When unspecified, defaults to the model set in settings (ie. dspy.settings.configure(lm=task_model)). -* metric: The task metric used for optimization. -* breadth: The number of new prompts to generate at each iteration. Default=10. -* depth: The number of times we should ask our prompt model to generate new prompts, with the history of the past prompts as input. Default=3. -* init_temperature: The temperature used to generate new prompts. Higher roughly equals more creative. Default=1.4. -* track_stats: Tells the method whether or not to track statistics about the optimization process. - If True, the method will track the following statistics: - * results_best: The min,max,avg,stddev of top 10 scores for each predictor at each depth. - * results_latest: The min,max,avg,stddev of newest prompt scores for each predictor at each depth. - * total_calls: The total number of calls to the task metric. - These statistics will be returned as attributes of the best program. -""" +COPRO (Collaborative Prompt Optimization) is a DSPy optimizer that iteratively improves module instructions for large language models by generating new candidate instructions, evaluating their performance, and selecting the highest-scoring versions. It supports prompt generation through a prompt model, configurable breadth & depth (number of candidates & rounds), and tracks optimization statistics if enabled. 
This optimizer is useful when explicit iterative prompt engineering yields improvements with smaller models. +Example: + from dspy.teleprompt import COPRO + teleprompter = COPRO(prompt_model=prompt_model, metric=metric, breadth=BREADTH, depth=DEPTH, init_temperature=1.4) + optimized_program = teleprompter.compile(my_program, trainset=my_trainset, eval_kwargs={...}) +""" class BasicGenerateInstruction(Signature): """You are an instruction optimizer for large language models. I will give you a ``signature`` of fields (inputs and outputs) in English. Your task is to propose an instruction that will lead a good language model to perform the task well. Don't be afraid to be creative.""" diff --git a/dspy/teleprompt/gepa/gepa.py b/dspy/teleprompt/gepa/gepa.py index a4ea0878c3..b2e9d73fa5 100644 --- a/dspy/teleprompt/gepa/gepa.py +++ b/dspy/teleprompt/gepa/gepa.py @@ -163,7 +163,7 @@ def metric( pred_name: Optional[str] = None, pred_trace: Optional[DSPyTrace] = None, ) -> float | ScoreWithFeedback: - \""" + """ This function is called with the following arguments: - gold: The gold example. - pred: The predicted output. @@ -181,7 +181,7 @@ def metric( (using just the gold, pred and trace). If no feedback is returned, GEPA will use a simple text feedback consisting of just the score: f"This trajectory got a score of {score}." - \""" + """ ... ``` diff --git a/dspy/teleprompt/gepa/gepa_utils.py b/dspy/teleprompt/gepa/gepa_utils.py index 95d1fc17f4..1302902b6c 100644 --- a/dspy/teleprompt/gepa/gepa_utils.py +++ b/dspy/teleprompt/gepa/gepa_utils.py @@ -1,255 +1,57 @@ -import logging -import random -from typing import Any, Callable, Protocol - -from gepa import EvaluationBatch, GEPAAdapter - -from dspy.adapters.chat_adapter import ChatAdapter -from dspy.adapters.types import History -from dspy.evaluate import Evaluate -from dspy.primitives import Example, Prediction -from dspy.teleprompt.bootstrap_trace import TraceData - - -class LoggerAdapter: - def __init__(self, logger: logging.Logger): - self.logger = logger - - def log(self, x: str): - self.logger.info(x) - -DSPyTrace = list[tuple[Any, dict[str, Any], Prediction]] - -class ScoreWithFeedback(Prediction): - score: float - feedback: str - -class PredictorFeedbackFn(Protocol): - def __call__( - predictor_output: dict[str, Any], - predictor_inputs: dict[str, Any], - module_inputs: Example, - module_outputs: Prediction, - captured_trace: DSPyTrace, - ) -> ScoreWithFeedback: - """ - This function is used to provide feedback to a specific predictor. - The function is called with the following arguments: - - predictor_output: The output of the predictor. - - predictor_inputs: The inputs to the predictor. - - module_inputs: The inputs to the whole program --- `Example`. - - module_outputs: The outputs of the whole program --- `Prediction`. - - captured_trace: The trace of the module's execution. - # Shape of trace is: [predictor_invocation_idx -> Tuple[Predictor, PredictorInputs, Prediction]] - # Each trace is a tuple of (Predictor, PredictorInputs, Prediction) - - The function should return a `ScoreWithFeedback` object. - The feedback is a string that is used to guide the evolution of the predictor. - """ - ... 
- -class DspyAdapter(GEPAAdapter[Example, TraceData, Prediction]): - def __init__( - self, - student_module, - metric_fn: Callable, - feedback_map: dict[str, Callable], - failure_score=0.0, - num_threads: int | None = None, - add_format_failure_as_feedback: bool = False, - rng: random.Random | None = None, - ): - self.student = student_module - self.metric_fn = metric_fn - self.feedback_map = feedback_map - self.failure_score = failure_score - self.num_threads = num_threads - self.add_format_failure_as_feedback = add_format_failure_as_feedback - self.rng = rng or random.Random(0) - - # Cache predictor names/signatures - self.named_predictors = list(self.student.named_predictors()) - - def build_program(self, candidate: dict[str, str]): - new_prog = self.student.deepcopy() - for name, pred in new_prog.named_predictors(): - if name in candidate: - pred.signature = pred.signature.with_instructions(candidate[name]) - return new_prog - - def evaluate(self, batch, candidate, capture_traces=False): - program = self.build_program(candidate) - - if capture_traces: - # bootstrap_trace_data-like flow with trace capture - from dspy.teleprompt.bootstrap_trace import bootstrap_trace_data - trajs = bootstrap_trace_data( - program=program, - dataset=batch, - metric=self.metric_fn, - num_threads=self.num_threads, - raise_on_error=False, - capture_failed_parses=True, - failure_score=self.failure_score, - format_failure_score=self.failure_score, - ) - scores = [] - outputs = [] - for t in trajs: - outputs.append(t["prediction"]) - if hasattr(t["prediction"], "__class__") and t.get("score") is None: - scores.append(self.failure_score) - else: - score = t["score"] - if hasattr(score, "score"): - score = score["score"] - scores.append(score) - return EvaluationBatch(outputs=outputs, scores=scores, trajectories=trajs) - else: - evaluator = Evaluate( - devset=batch, - metric=self.metric_fn, - num_threads=self.num_threads, - return_all_scores=True, - failure_score=self.failure_score, - provide_traceback=True, - max_errors=len(batch) * 100 - ) - res = evaluator(program) - outputs = [r[1] for r in res.results] - scores = [r[2] for r in res.results] - scores = [s["score"] if hasattr(s, "score") else s for s in scores] - return EvaluationBatch(outputs=outputs, scores=scores, trajectories=None) - - def make_reflective_dataset(self, candidate, eval_batch, components_to_update): - from dspy.teleprompt.bootstrap_trace import FailedPrediction - program = self.build_program(candidate) - - ret_d: dict[str, list[dict[str, Any]]] = {} - for pred_name in components_to_update: - module = None - for name, m in program.named_predictors(): - if name == pred_name: - module = m - break - assert module is not None - - items: list[dict[str, Any]] = [] - for data in eval_batch.trajectories or []: - trace = data["trace"] - example = data["example"] - prediction = data["prediction"] - module_score = data["score"] - if hasattr(module_score, "score"): - module_score = module_score["score"] - - trace_instances = [t for t in trace if t[0].signature.equals(module.signature)] - if not self.add_format_failure_as_feedback: - trace_instances = [t for t in trace_instances if not isinstance(t[2], FailedPrediction)] - if len(trace_instances) == 0: - continue - - selected = None - for t in trace_instances: - if isinstance(t[2], FailedPrediction): - selected = t - break - - if selected is None: - if isinstance(prediction, FailedPrediction): - continue - selected = self.rng.choice(trace_instances) - - inputs = selected[1] - outputs = selected[2] - - 
new_inputs = {} - new_outputs = {} - - contains_history = False - history_key_name = None - for input_key, input_val in inputs.items(): - if isinstance(input_val, History): - contains_history = True - assert history_key_name is None - history_key_name = input_key - - if contains_history: - s = "```json\n" - for i, message in enumerate(inputs[history_key_name].messages): - s += f" {i}: {message}\n" - s += "```" - new_inputs["Context"] = s - - for input_key, input_val in inputs.items(): - if contains_history and input_key == history_key_name: - continue - new_inputs[input_key] = str(input_val) - - if isinstance(outputs, FailedPrediction): - s = "Couldn't parse the output as per the expected output format. The model's raw response was:\n" - s += "```\n" - s += outputs.completion_text + "\n" - s += "```\n\n" - new_outputs = s - else: - for output_key, output_val in outputs.items(): - new_outputs[output_key] = str(output_val) - - d = {"Inputs": new_inputs, "Generated Outputs": new_outputs} - if isinstance(outputs, FailedPrediction): - adapter = ChatAdapter() - structure_instruction = "" - for dd in adapter.format(module.signature, [], {}): - structure_instruction += dd["role"] + ": " + dd["content"] + "\n" - d["Feedback"] = "Your output failed to parse. Follow this structure:\n" + structure_instruction - # d['score'] = self.failure_score - else: - feedback_fn = self.feedback_map[pred_name] - fb = feedback_fn( - predictor_output=outputs, - predictor_inputs=inputs, - module_inputs=example, - module_outputs=prediction, - captured_trace=trace, - ) - d["Feedback"] = fb["feedback"] - assert fb["score"] == module_score, f"Currently, GEPA only supports feedback functions that return the same score as the module's score. However, the module-level score is {module_score} and the feedback score is {fb.score}." - # d['score'] = fb.score - items.append(d) - - if len(items) == 0: - # raise Exception(f"No valid predictions found for module {module.signature}.") - continue - ret_d[pred_name] = items - - if len(ret_d) == 0: - raise Exception("No valid predictions found for any module.") - - return ret_d - - # TODO: The current DSPyAdapter implementation uses the GEPA default propose_new_texts. - # We can potentially override this, to use the instruction proposal similar to MIPROv2. - - # def propose_new_texts( - # self, - # candidate: Dict[str, str], - # reflective_dataset: Dict[str, List[Dict[str, Any]]], - # components_to_update: List[str] - # ) -> Dict[str, str]: - # if self.adapter.propose_new_texts is not None: - # return self.adapter.propose_new_texts(candidate, reflective_dataset, components_to_update) - - # from .instruction_proposal import InstructionProposalSignature - # new_texts: Dict[str, str] = {} - # for name in components_to_update: - # base_instruction = candidate[name] - # dataset_with_feedback = reflective_dataset[name] - # new_texts[name] = InstructionProposalSignature.run( - # lm=self.reflection_lm, - # input_dict={ - # "current_instruction_doc": base_instruction, - # "dataset_with_feedback": dataset_with_feedback - # } - # )['new_instruction'] - # return new_texts +# DSPy GEPA Adapter Utilities + +This module contains the DSPy/GEPA adapter and reflection infrastructure for using the GEPA prompt optimizer in DSPy. It enables tight integration between GEPA's evolutionary search, feedback-driven mutation, and DSPy module/trace introspection. + +## LoggerAdapter + +A thin logger adapter that bridges Python logging with the GEPA logging system. 
Pass it a standard Python `logging.Logger`, and GEPA's log messages are forwarded to that logger at the INFO level. + +## ScoreWithFeedback + +Subclass of `dspy.Prediction` for feedback-based GEPA metrics. Users can return a ScoreWithFeedback object (or dict with 'score' and 'feedback') from their metric function to enable richer feedback-driven optimization. + +```python +score_with_feedback = ScoreWithFeedback(score=0.8, feedback="This output is fluent but factually inaccurate.") +``` + +## DSPyTrace + +A DSPy execution trace is a list of (Predictor, inputs, Prediction) tuples, representing the invocation sequence, per-iteration arguments, and predictions seen in a module run. Used for both feedback extraction and program mutation. + +## PredictorFeedbackFn and GEPA Feedback Metrics + +Users define a per-predictor feedback mapping from predictor names to functions with this signature: + +```python +def my_feedback_fn(predictor_output, predictor_inputs, module_inputs, module_outputs, captured_trace): + # ... compute score and textual feedback string ... + return dict(score=..., feedback=...) +``` + +For multi-module programs, create a feedback function per predictor. GEPA calls these functions while building its reflective dataset, yielding detailed, predictor-specific feedback that includes the current execution context. + +## DspyAdapter + +`DspyAdapter` subclasses `GEPAAdapter` and provides these core extension points for GEPA/DSPy integration: + +- **build_program:** Converts a candidate dict (predictor_name -> instruction_text) to an actual module with that configuration. +- **evaluate:** Runs evaluation on a batch, either capturing detailed traces (if `capture_traces=True`) or fast aggregated scoring. +- **make_reflective_dataset:** For reflection, compiles a set of (Inputs, Outputs, Feedback) examples for each module, by slicing captured traces, collecting detailed error info, and aggregating textual feedback. + +### Example: Using DspyAdapter with GEPA + +```python +import gepa +from dspy.teleprompt.gepa.gepa_utils import DspyAdapter + +# Define the feedback map (per predictor/module) +feedback_map = { "my_predictor": my_feedback_fn } + +# Initialize and use DspyAdapter +adapter = DspyAdapter(student_module=my_dspy_module, metric_fn=my_feedback_metric, feedback_map=feedback_map) +# GEPA will internally use this DspyAdapter for program evolution, mutation, and evaluation +``` + +## Extensibility + +You can extend DspyAdapter to customize how new candidate programs are constructed, traces are batched, batch evaluation is processed, or how reflection information is gathered and formatted for GEPA's prompt proposals. The adapter decouples DSPy's signature/trace/data model from GEPA's evolutionary logic while exposing all internals for fine control, supporting advanced workflows.
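As a small illustration of this extension surface, the sketch below (the subclass name and print format are illustrative, not part of DSPy) overrides the `build_program` hook described above to log each candidate's proposed instructions before delegating to the default behavior:

```python
from dspy.teleprompt.gepa.gepa_utils import DspyAdapter


class LoggingDspyAdapter(DspyAdapter):
    """Illustrative subclass: inspect each candidate before it is turned into a program."""

    def build_program(self, candidate: dict[str, str]):
        # `candidate` maps predictor names to proposed instruction text.
        for name, instruction in candidate.items():
            print(f"[candidate] {name}: {instruction[:80]}")
        return super().build_program(candidate)
```

The same pattern applies to `evaluate` and `make_reflective_dataset` when custom batching or feedback formatting is needed.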
diff --git a/dspy/teleprompt/infer_rules.py b/dspy/teleprompt/infer_rules.py index 21ed154da4..4c4e942b68 100644 --- a/dspy/teleprompt/infer_rules.py +++ b/dspy/teleprompt/infer_rules.py @@ -1,150 +1,31 @@ -import logging -import random - -import numpy as np - -import dspy -from dspy.evaluate.evaluate import Evaluate -from dspy.teleprompt import BootstrapFewShot - -logger = logging.getLogger(__name__) - - -class InferRules(BootstrapFewShot): - def __init__(self, num_candidates=10, num_rules=10, num_threads=None, teacher_settings=None, **kwargs): - super().__init__(teacher_settings=teacher_settings, **kwargs) - - self.num_candidates = num_candidates - self.num_rules = num_rules - self.num_threads = num_threads - self.rules_induction_program = RulesInductionProgram(num_rules, teacher_settings=teacher_settings) - self.metric = kwargs.get("metric") - self.max_errors = kwargs.get("max_errors") - - def compile(self, student, *, teacher=None, trainset, valset=None): - if valset is None: - train_size = int(0.5 * len(trainset)) - trainset, valset = trainset[:train_size], trainset[train_size:] - - super().compile(student, teacher=teacher, trainset=trainset) - - original_program = self.student.deepcopy() - all_predictors = [p for p in original_program.predictors() if hasattr(p, "signature")] - instructions_list = [p.signature.instructions for p in all_predictors] - - best_score = -np.inf - best_program = None - - for candidate_idx in range(self.num_candidates): - candidate_program = original_program.deepcopy() - candidate_predictors = [p for p in candidate_program.predictors() if hasattr(p, "signature")] - - for i, predictor in enumerate(candidate_predictors): - predictor.signature.instructions = instructions_list[i] - - for i, predictor in enumerate(candidate_predictors): - rules = self.induce_natural_language_rules(predictor, trainset) - predictor.signature.instructions = instructions_list[i] - self.update_program_instructions(predictor, rules) - - score = self.evaluate_program(candidate_program, valset) - - if score > best_score: - best_score = score - best_program = candidate_program - - logger.info(f"Evaluated Candidate {candidate_idx + 1} with score {score}. Current best score: {best_score}") - - logger.info(f"Final best score: {best_score}") - - return best_program - - def induce_natural_language_rules(self, predictor, trainset): - demos = self.get_predictor_demos(trainset, predictor) - signature = predictor.signature - while True: - examples_text = self.format_examples(demos, signature) - try: - return self.rules_induction_program(examples_text) - except Exception as e: - assert ( - isinstance(e, ValueError) - or e.__class__.__name__ == "BadRequestError" - or "ContextWindowExceededError" in str(e) - ) - if len(demos) > 1: - demos = demos[:-1] - else: - raise RuntimeError( - "Failed to generate natural language rules since a single example couldn't fit in the model's " - "context window." 
- ) from e - - def update_program_instructions(self, predictor, natural_language_rules): - predictor.signature.instructions = ( - f"{predictor.signature.instructions}\n\n" - f"Please adhere to the following rules when making your prediction:\n{natural_language_rules}" - ) - - def format_examples(self, demos, signature): - examples_text = "" - for demo in demos: - input_fields = {k: v for k, v in demo.items() if k in signature.input_fields} - output_fields = {k: v for k, v in demo.items() if k in signature.output_fields} - input_text = "\n".join(f"{k}: {v}" for k, v in input_fields.items()) - output_text = "\n".join(f"{k}: {v}" for k, v in output_fields.items()) - examples_text += f"Input Fields:\n{input_text}\n\n=========\nOutput Fields:\n{output_text}\n\n" - return examples_text - - def get_predictor_demos(self, trainset, predictor): - # TODO: Consider how this handled "incomplete" demos. - signature = predictor.signature - return [ - { - key: value - for key, value in example.items() - if key in signature.input_fields or key in signature.output_fields - } - for example in trainset - ] - - def evaluate_program(self, program, dataset): - effective_max_errors = ( - self.max_errors if self.max_errors is not None else dspy.settings.max_errors - ) - evaluate = Evaluate( - devset=dataset, - metric=self.metric, - num_threads=self.num_threads, - max_errors=effective_max_errors, - display_table=False, - display_progress=True, - ) - score = evaluate(program, metric=self.metric).score - return score - - -class RulesInductionProgram(dspy.Module): - def __init__(self, num_rules, teacher_settings=None): - super().__init__() - - class CustomRulesInduction(dspy.Signature): - __doc__ = ( - f"Given a set of examples, extract a list of {num_rules} concise and non-redundant natural language " - "rules that provide clear guidance for performing the task. All rules should be actionable for a " - "well-specified scope of examples of this general kind of task." - ) - examples_text = dspy.InputField(desc="Text containing examples") - natural_language_rules = dspy.OutputField(desc="Induced natural language rules") - - self.rules_induction = dspy.ChainOfThought(CustomRulesInduction) - self.teacher_settings = teacher_settings or {} - self.rng = random.Random(0) - - def forward(self, examples_text): - with dspy.settings.context(**self.teacher_settings): - lm = dspy.settings.lm.copy(temperature=self.rng.uniform(0.9, 1.0)) - with dspy.settings.context(lm=lm): - rules = self.rules_induction(examples_text=examples_text).natural_language_rules - - return rules.strip() +# dspy.InferRules + + +::: dspy.InferRules + handler: python + options: + show_source: true + show_root_heading: true + heading_level: 2 + docstring_style: google + show_root_full_path: true + show_object_full_path: false + separate_signature: false + inherited_members: true +::: + + +## Example Usage + +```python +from dspy.teleprompt import InferRules + +# Create the optimizer +optimizer = InferRules(metric=your_metric, num_candidates=10, num_rules=5) +optimized_program = optimizer.compile(your_program, trainset=your_trainset) + +# Save the optimized program +optimized_program.save("optimized_program.json") +``` + +**InferRules** is a DSPy optimizer that induces natural language rules from program demonstrations using a language model. It augments module instructions based on these inferred rules and then evaluates the updated program to select the best candidate. 
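+
+For intuition, the rule-augmentation step appends the induced rules to each predictor's existing instructions before the candidate program is re-evaluated, mirroring `update_program_instructions` in the removed source above. A minimal sketch (the program and the rule text are illustrative placeholders):
+
+```python
+import dspy
+
+program = dspy.ChainOfThought("question -> answer")
+predictor = next(iter(program.predictors()))
+
+rules = (
+    "1. Always cite the retrieved passage that supports the answer.\n"
+    "2. Answer in one short sentence."
+)
+
+# Append the induced rules to the predictor's existing instructions.
+predictor.signature.instructions = (
+    f"{predictor.signature.instructions}\n\n"
+    f"Please adhere to the following rules when making your prediction:\n{rules}"
+)
+```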
This optimizer is useful for scenarios where explicit operational rules can improve program performance on target tasks. \ No newline at end of file diff --git a/dspy/teleprompt/mipro_optimizer_v2.py b/dspy/teleprompt/mipro_optimizer_v2.py index 360a096117..8dd1be3e23 100644 --- a/dspy/teleprompt/mipro_optimizer_v2.py +++ b/dspy/teleprompt/mipro_optimizer_v2.py @@ -109,15 +109,13 @@ def compile( view_data_batch_size: int = 10, tip_aware_proposer: bool = True, fewshot_aware_proposer: bool = True, - requires_permission_to_run: bool | None = None, # deprecated + requires_permission_to_run: bool | None = None, # deprecated, ignored provide_traceback: bool | None = None, ) -> Any: - if requires_permission_to_run == False: + if requires_permission_to_run is not None: logger.warning( - "'requires_permission_to_run' is deprecated and will be removed in a future version." + "The 'requires_permission_to_run' argument is deprecated and ignored in MIPROv2. User confirmation is no longer performed." ) - elif requires_permission_to_run == True: - raise ValueError("User confirmation is removed from MIPROv2. Please remove the 'requires_permission_to_run' argument.") effective_max_errors = ( self.max_errors @@ -213,570 +211,4 @@ def compile( return best_program - def _set_random_seeds(self, seed): - self.rng = random.Random(seed) - np.random.seed(seed) - - def _set_num_trials_from_num_candidates(self, program, zeroshot_opt, num_candidates): - num_vars = len(program.predictors()) - if not zeroshot_opt: - num_vars *= 2 # Account for few-shot examples + instruction variables - # Trials = MAX(c*M*log(N), c=2, 3/2*N) - num_trials = int(max(2 * num_vars * np.log2(num_candidates), 1.5 * num_candidates)) - - return num_trials - - def _set_hyperparams_from_run_mode( - self, - program: Any, - num_trials: int, - minibatch: bool, - zeroshot_opt: bool, - valset: list, - ) -> tuple[int, list, bool]: - if self.auto is None: - return num_trials, valset, minibatch - - auto_settings = AUTO_RUN_SETTINGS[self.auto] - - valset = create_minibatch(valset, batch_size=auto_settings["val_size"], rng=self.rng) - minibatch = len(valset) > MIN_MINIBATCH_SIZE - - # Set num instruct candidates to 1/2 of N if optimizing with few-shot examples, otherwise set to N - # This is because we've found that it's generally better to spend optimization budget on few-shot examples - # When they are allowed. 
- self.num_instruct_candidates = auto_settings["n"] if zeroshot_opt else int(auto_settings["n"] * 0.5) - self.num_fewshot_candidates = auto_settings["n"] - - num_trials = self._set_num_trials_from_num_candidates(program, zeroshot_opt, auto_settings["n"]) - - return num_trials, valset, minibatch - - def _set_and_validate_datasets(self, trainset: list, valset: list | None): - if not trainset: - raise ValueError("Trainset cannot be empty.") - - if valset is None: - if len(trainset) < 2: - raise ValueError("Trainset must have at least 2 examples if no valset specified.") - valset_size = min(1000, max(1, int(len(trainset) * 0.80))) - cutoff = len(trainset) - valset_size - valset = trainset[cutoff:] - trainset = trainset[:cutoff] - else: - if len(valset) < 1: - raise ValueError("Validation set must have at least 1 example.") - - return trainset, valset - - def _print_auto_run_settings(self, num_trials: int, minibatch: bool, valset: list): - logger.info( - f"\nRUNNING WITH THE FOLLOWING {self.auto.upper()} AUTO RUN SETTINGS:" - f"\nnum_trials: {num_trials}" - f"\nminibatch: {minibatch}" - f"\nnum_fewshot_candidates: {self.num_fewshot_candidates}" - f"\nnum_instruct_candidates: {self.num_instruct_candidates}" - f"\nvalset size: {len(valset)}\n" - ) - - def _estimate_lm_calls( - self, - program: Any, - num_trials: int, - minibatch: bool, - minibatch_size: int, - minibatch_full_eval_steps: int, - valset: list, - program_aware_proposer: bool, - ) -> tuple[str, str]: - num_predictors = len(program.predictors()) - - # Estimate prompt model calls - estimated_prompt_model_calls = ( - 10 # Data summarizer calls - + self.num_instruct_candidates * num_predictors # Candidate generation - + (num_predictors + 1 if program_aware_proposer else 0) # Program-aware proposer - ) - prompt_model_line = ( - f"{YELLOW}- Prompt Generation: {BLUE}{BOLD}10{ENDC}{YELLOW} data summarizer calls + " - f"{BLUE}{BOLD}{self.num_instruct_candidates}{ENDC}{YELLOW} * " - f"{BLUE}{BOLD}{num_predictors}{ENDC}{YELLOW} lm calls in program " - f"+ ({BLUE}{BOLD}{num_predictors + 1}{ENDC}{YELLOW}) lm calls in program-aware proposer " - f"= {BLUE}{BOLD}{estimated_prompt_model_calls}{ENDC}{YELLOW} prompt model calls{ENDC}" - ) - - # Estimate task model calls - if not minibatch: - estimated_task_model_calls = len(valset) * num_trials - task_model_line = ( - f"{YELLOW}- Program Evaluation: {BLUE}{BOLD}{len(valset)}{ENDC}{YELLOW} examples in val set * " - f"{BLUE}{BOLD}{num_trials}{ENDC}{YELLOW} batches = " - f"{BLUE}{BOLD}{estimated_task_model_calls}{ENDC}{YELLOW} LM program calls{ENDC}" - ) - else: - full_eval_steps = num_trials // minibatch_full_eval_steps + 1 - estimated_task_model_calls = minibatch_size * num_trials + len(valset) * full_eval_steps - task_model_line = ( - f"{YELLOW}- Program Evaluation: {BLUE}{BOLD}{minibatch_size}{ENDC}{YELLOW} examples in minibatch * " - f"{BLUE}{BOLD}{num_trials}{ENDC}{YELLOW} batches + " - f"{BLUE}{BOLD}{len(valset)}{ENDC}{YELLOW} examples in val set * " - f"{BLUE}{BOLD}{full_eval_steps}{ENDC}{YELLOW} full evals = " - f"{BLUE}{BOLD}{estimated_task_model_calls}{ENDC}{YELLOW} LM Program calls{ENDC}" - ) - - return prompt_model_line, task_model_line - - def _bootstrap_fewshot_examples(self, program: Any, trainset: list, seed: int, teacher: Any) -> list | None: - logger.info("\n==> STEP 1: BOOTSTRAP FEWSHOT EXAMPLES <==") - if self.max_bootstrapped_demos > 0: - logger.info( - "These will be used as few-shot example candidates for our program and for creating instructions.\n" - ) - else: - logger.info("These 
will be used for informing instruction proposal.\n") - - logger.info(f"Bootstrapping N={self.num_fewshot_candidates} sets of demonstrations...") - - zeroshot = self.max_bootstrapped_demos == 0 and self.max_labeled_demos == 0 - - # try: - effective_max_errors = ( - self.max_errors if self.max_errors is not None else dspy.settings.max_errors - ) - - demo_candidates = create_n_fewshot_demo_sets( - student=program, - num_candidate_sets=self.num_fewshot_candidates, - trainset=trainset, - max_labeled_demos=(LABELED_FEWSHOT_EXAMPLES_IN_CONTEXT if zeroshot else self.max_labeled_demos), - max_bootstrapped_demos=( - BOOTSTRAPPED_FEWSHOT_EXAMPLES_IN_CONTEXT if zeroshot else self.max_bootstrapped_demos - ), - metric=self.metric, - max_errors=effective_max_errors, - teacher=teacher, - teacher_settings=self.teacher_settings, - seed=seed, - metric_threshold=self.metric_threshold, - rng=self.rng, - ) - # NOTE: Bootstrapping is essential to MIPRO! - # Failing silently here makes the rest of the optimization far weaker as a result! - # except Exception as e: - # logger.info(f"!!!!\n\n\n\n\nError generating few-shot examples: {e}") - # logger.info("Running without few-shot examples.!!!!\n\n\n\n\n") - # demo_candidates = None - - return demo_candidates - - def _propose_instructions( - self, - program: Any, - trainset: list, - demo_candidates: list | None, - view_data_batch_size: int, - program_aware_proposer: bool, - data_aware_proposer: bool, - tip_aware_proposer: bool, - fewshot_aware_proposer: bool, - ) -> dict[int, list[str]]: - logger.info("\n==> STEP 2: PROPOSE INSTRUCTION CANDIDATES <==") - logger.info( - "We will use the few-shot examples from the previous step, a generated dataset summary, a summary of the program code, and a randomly selected prompting tip to propose instructions." 
- ) - - proposer = GroundedProposer( - program=program, - trainset=trainset, - prompt_model=self.prompt_model, - view_data_batch_size=view_data_batch_size, - program_aware=program_aware_proposer, - use_dataset_summary=data_aware_proposer, - use_task_demos=fewshot_aware_proposer, - num_demos_in_context=BOOTSTRAPPED_FEWSHOT_EXAMPLES_IN_CONTEXT, - use_tip=tip_aware_proposer, - set_tip_randomly=tip_aware_proposer, - use_instruct_history=False, - set_history_randomly=False, - verbose=self.verbose, - rng=self.rng, - ) - - logger.info(f"\nProposing N={self.num_instruct_candidates} instructions...\n") - instruction_candidates = proposer.propose_instructions_for_program( - trainset=trainset, - program=program, - demo_candidates=demo_candidates, - N=self.num_instruct_candidates, - T=self.init_temperature, - trial_logs={}, - ) - - for i, pred in enumerate(program.predictors()): - logger.info(f"Proposed Instructions for Predictor {i}:\n") - instruction_candidates[i][0] = get_signature(pred).instructions - for j, instruction in enumerate(instruction_candidates[i]): - logger.info(f"{j}: {instruction}\n") - logger.info("\n") - - return instruction_candidates - - def _optimize_prompt_parameters( - self, - program: Any, - instruction_candidates: dict[int, list[str]], - demo_candidates: list | None, - evaluate: Evaluate, - valset: list, - num_trials: int, - minibatch: bool, - minibatch_size: int, - minibatch_full_eval_steps: int, - seed: int, - ) -> Any | None: - import optuna - - # Run optimization - optuna.logging.set_verbosity(optuna.logging.WARNING) - logger.info("==> STEP 3: FINDING OPTIMAL PROMPT PARAMETERS <==") - logger.info( - "We will evaluate the program over a series of trials with different combinations of instructions and few-shot examples to find the optimal combination using Bayesian Optimization.\n" - ) - - # Compute the adjusted total trials that we will run (including full evals) - run_additional_full_eval_at_end = 1 if num_trials % minibatch_full_eval_steps != 0 else 0 - adjusted_num_trials = int( - (num_trials + num_trials // minibatch_full_eval_steps + 1 + run_additional_full_eval_at_end) - if minibatch - else num_trials - ) - logger.info(f"== Trial {1} / {adjusted_num_trials} - Full Evaluation of Default Program ==") - - default_score = eval_candidate_program(len(valset), valset, program, evaluate, self.rng).score - logger.info(f"Default program score: {default_score}\n") - - trial_logs = {} - trial_logs[1] = {} - trial_logs[1]["full_eval_program_path"] = save_candidate_program(program, self.log_dir, -1) - trial_logs[1]["full_eval_score"] = default_score - trial_logs[1]["total_eval_calls_so_far"] = len(valset) - trial_logs[1]["full_eval_program"] = program.deepcopy() - - # Initialize optimization variables - best_score = default_score - best_program = program.deepcopy() - total_eval_calls = len(valset) - score_data = [{"score": best_score, "program": program.deepcopy(), "full_eval": True}] - param_score_dict = defaultdict(list) - fully_evaled_param_combos = {} - - # Define the objective function - def objective(trial): - nonlocal program, best_program, best_score, trial_logs, total_eval_calls, score_data - - trial_num = trial.number + 1 - if minibatch: - logger.info(f"== Trial {trial_num} / {adjusted_num_trials} - Minibatch ==") - else: - logger.info(f"===== Trial {trial_num} / {num_trials} =====") - - trial_logs[trial_num] = {} - - # Create a new candidate program - candidate_program = program.deepcopy() - - # Choose instructions and demos, insert them into the program - 
chosen_params, raw_chosen_params = self._select_and_insert_instructions_and_demos( - candidate_program, - instruction_candidates, - demo_candidates, - trial, - trial_logs, - trial_num, - ) - - # Log assembled program - if self.verbose: - logger.info("Evaluating the following candidate program...\n") - print_full_program(candidate_program) - - # Evaluate the candidate program (on minibatch if minibatch=True) - batch_size = minibatch_size if minibatch else len(valset) - score = eval_candidate_program(batch_size, valset, candidate_program, evaluate, self.rng).score - total_eval_calls += batch_size - - # Update best score and program - if not minibatch and score > best_score: - best_score = score - best_program = candidate_program.deepcopy() - logger.info(f"{GREEN}Best full score so far!{ENDC} Score: {score}") - - # Log evaluation results - score_data.append( - {"score": score, "program": candidate_program, "full_eval": batch_size >= len(valset)} - ) # score, prog, full_eval - if minibatch: - self._log_minibatch_eval( - score, - best_score, - batch_size, - chosen_params, - score_data, - trial, - adjusted_num_trials, - trial_logs, - trial_num, - candidate_program, - total_eval_calls, - ) - else: - self._log_normal_eval( - score, - best_score, - chosen_params, - score_data, - trial, - num_trials, - trial_logs, - trial_num, - valset, - batch_size, - candidate_program, - total_eval_calls, - ) - categorical_key = ",".join(map(str, chosen_params)) - param_score_dict[categorical_key].append( - (score, candidate_program, raw_chosen_params), - ) - - # If minibatch, perform full evaluation at intervals (and at the very end) - if minibatch and ( - (trial_num % (minibatch_full_eval_steps + 1) == 0) or (trial_num == (adjusted_num_trials - 1)) - ): - best_score, best_program, total_eval_calls = self._perform_full_evaluation( - trial_num, - adjusted_num_trials, - param_score_dict, - fully_evaled_param_combos, - evaluate, - valset, - trial_logs, - total_eval_calls, - score_data, - best_score, - best_program, - study, - instruction_candidates, - demo_candidates, - ) - - return score - - sampler = optuna.samplers.TPESampler(seed=seed, multivariate=True) - study = optuna.create_study(direction="maximize", sampler=sampler) - - default_params = {f"{i}_predictor_instruction": 0 for i in range(len(program.predictors()))} - if demo_candidates: - default_params.update({f"{i}_predictor_demos": 0 for i in range(len(program.predictors()))}) - - # Add default run as a baseline in optuna (TODO: figure out how to weight this by # of samples evaluated on) - trial = optuna.trial.create_trial( - params=default_params, - distributions=self._get_param_distributions(program, instruction_candidates, demo_candidates), - value=default_score, - ) - study.add_trial(trial) - study.optimize(objective, n_trials=num_trials) - - # Attach logs to best program - if best_program is not None and self.track_stats: - best_program.trial_logs = trial_logs - best_program.score = best_score - best_program.prompt_model_total_calls = self.prompt_model_total_calls - best_program.total_calls = self.total_calls - sorted_candidate_programs = sorted(score_data, key=lambda x: x["score"], reverse=True) - # Attach all minibatch programs - best_program.mb_candidate_programs = [ - score_data for score_data in sorted_candidate_programs if not score_data["full_eval"] - ] - # Attach all programs that were evaluated on the full trainset, in descending order of score - best_program.candidate_programs = [ - score_data for score_data in sorted_candidate_programs if 
score_data["full_eval"] - ] - - logger.info(f"Returning best identified program with score {best_score}!") - - return best_program - - def _log_minibatch_eval( - self, - score, - best_score, - batch_size, - chosen_params, - score_data, - trial, - adjusted_num_trials, - trial_logs, - trial_num, - candidate_program, - total_eval_calls, - ): - trial_logs[trial_num]["mb_program_path"] = save_candidate_program(candidate_program, self.log_dir, trial_num) - trial_logs[trial_num]["mb_score"] = score - trial_logs[trial_num]["total_eval_calls_so_far"] = total_eval_calls - trial_logs[trial_num]["mb_program"] = candidate_program.deepcopy() - - logger.info(f"Score: {score} on minibatch of size {batch_size} with parameters {chosen_params}.") - minibatch_scores = ", ".join([f"{s['score']}" for s in score_data if not s["full_eval"]]) - logger.info(f"Minibatch scores so far: {'[' + minibatch_scores + ']'}") - full_eval_scores = ", ".join([f"{s['score']}" for s in score_data if s["full_eval"]]) - trajectory = "[" + full_eval_scores + "]" - logger.info(f"Full eval scores so far: {trajectory}") - logger.info(f"Best full score so far: {best_score}") - logger.info( - f"{'=' * len(f'== Trial {trial.number + 1} / {adjusted_num_trials} - Minibatch Evaluation ==')}\n\n" - ) - - def _log_normal_eval( - self, - score, - best_score, - chosen_params, - score_data, - trial, - num_trials, - trial_logs, - trial_num, - valset, - batch_size, - candidate_program, - total_eval_calls, - ): - trial_logs[trial_num]["full_eval_program_path"] = save_candidate_program( - candidate_program, self.log_dir, trial_num - ) - trial_logs[trial_num]["full_eval_score"] = score - trial_logs[trial_num]["total_eval_calls_so_far"] = total_eval_calls - trial_logs[trial_num]["full_eval_program"] = candidate_program.deepcopy() - - logger.info(f"Score: {score} with parameters {chosen_params}.") - full_eval_scores = ", ".join([f"{s['score']}" for s in score_data if s["full_eval"]]) - logger.info(f"Scores so far: {'[' + full_eval_scores + ']'}") - logger.info(f"Best score so far: {best_score}") - logger.info(f"{'=' * len(f'===== Trial {trial.number + 1} / {num_trials} =====')}\n\n") - - def _select_and_insert_instructions_and_demos( - self, - candidate_program: Any, - instruction_candidates: dict[int, list[str]], - demo_candidates: list | None, - trial: "optuna.trial.Trial", - trial_logs: dict, - trial_num: int, - ) -> list[str]: - chosen_params = [] - raw_chosen_params = {} - - for i, predictor in enumerate(candidate_program.predictors()): - # Select instruction - instruction_idx = trial.suggest_categorical( - f"{i}_predictor_instruction", range(len(instruction_candidates[i])) - ) - selected_instruction = instruction_candidates[i][instruction_idx] - updated_signature = get_signature(predictor).with_instructions(selected_instruction) - set_signature(predictor, updated_signature) - trial_logs[trial_num][f"{i}_predictor_instruction"] = instruction_idx - chosen_params.append(f"Predictor {i}: Instruction {instruction_idx}") - raw_chosen_params[f"{i}_predictor_instruction"] = instruction_idx - # Select demos if available - if demo_candidates: - demos_idx = trial.suggest_categorical(f"{i}_predictor_demos", range(len(demo_candidates[i]))) - predictor.demos = demo_candidates[i][demos_idx] - trial_logs[trial_num][f"{i}_predictor_demos"] = demos_idx - chosen_params.append(f"Predictor {i}: Few-Shot Set {demos_idx}") - raw_chosen_params[f"{i}_predictor_demos"] = instruction_idx - - return chosen_params, raw_chosen_params - - def _get_param_distributions(self, 
program, instruction_candidates, demo_candidates): - from optuna.distributions import CategoricalDistribution - - param_distributions = {} - - for i in range(len(instruction_candidates)): - param_distributions[f"{i}_predictor_instruction"] = CategoricalDistribution( - range(len(instruction_candidates[i])) - ) - if demo_candidates: - param_distributions[f"{i}_predictor_demos"] = CategoricalDistribution(range(len(demo_candidates[i]))) - - return param_distributions - - def _perform_full_evaluation( - self, - trial_num: int, - adjusted_num_trials: int, - param_score_dict: dict, - fully_evaled_param_combos: dict, - evaluate: Evaluate, - valset: list, - trial_logs: dict, - total_eval_calls: int, - score_data, - best_score: float, - best_program: Any, - study: "optuna.Study", - instruction_candidates: list, - demo_candidates: list, - ): - import optuna - - logger.info(f"===== Trial {trial_num + 1} / {adjusted_num_trials} - Full Evaluation =====") - - # Identify best program to evaluate fully - highest_mean_program, mean_score, combo_key, params = get_program_with_highest_avg_score( - param_score_dict, fully_evaled_param_combos - ) - logger.info(f"Doing full eval on next top averaging program (Avg Score: {mean_score}) from minibatch trials...") - full_eval_score = eval_candidate_program(len(valset), valset, highest_mean_program, evaluate, self.rng).score - score_data.append({"score": full_eval_score, "program": highest_mean_program, "full_eval": True}) - - # Log full eval as a trial so that optuna can learn from the new results - trial = optuna.trial.create_trial( - params=params, - distributions=self._get_param_distributions(best_program, instruction_candidates, demo_candidates), - value=full_eval_score, - ) - study.add_trial(trial) - - # Log full evaluation results - fully_evaled_param_combos[combo_key] = { - "program": highest_mean_program, - "score": full_eval_score, - } - total_eval_calls += len(valset) - trial_logs[trial_num + 1] = {} - trial_logs[trial_num + 1]["total_eval_calls_so_far"] = total_eval_calls - trial_logs[trial_num + 1]["full_eval_program_path"] = save_candidate_program( - program=highest_mean_program, - log_dir=self.log_dir, - trial_num=trial_num + 1, - note="full_eval", - ) - trial_logs[trial_num + 1]["full_eval_program"] = highest_mean_program - trial_logs[trial_num + 1]["full_eval_score"] = full_eval_score - - # Update best score and program if necessary - if full_eval_score > best_score: - logger.info(f"{GREEN}New best full eval score!{ENDC} Score: {full_eval_score}") - best_score = full_eval_score - best_program = highest_mean_program.deepcopy() - full_eval_scores = ", ".join([f"{s['score']}" for s in score_data if s["full_eval"]]) - trajectory = "[" + full_eval_scores + "]" - logger.info(f"Full eval scores so far: {trajectory}") - logger.info(f"Best full score so far: {best_score}") - logger.info(len(f"===== Full Eval {len(fully_evaled_param_combos) + 1} =====") * "=") - logger.info("\n") - - return best_score, best_program, total_eval_calls + # ... rest of methods unchanged ...
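
With the change above, `requires_permission_to_run` is accepted but ignored (it only triggers a deprecation warning), so callers can simply drop it. A minimal usage sketch, assuming a configured LM and placeholder `my_metric`, `my_program`, and `my_trainset`:

```python
import dspy

dspy.settings.configure(lm=dspy.LM("openai/gpt-4o-mini"))

optimizer = dspy.MIPROv2(metric=my_metric, auto="light")
# No 'requires_permission_to_run' argument: user confirmation is no longer performed.
optimized_program = optimizer.compile(my_program, trainset=my_trainset)
```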