diff --git a/ais_bench/benchmark/cli/workers.py b/ais_bench/benchmark/cli/workers.py
index ca997164..689762a0 100644
--- a/ais_bench/benchmark/cli/workers.py
+++ b/ais_bench/benchmark/cli/workers.py
@@ -1,5 +1,7 @@
+import os
 import os.path as osp
 import copy
+import shutil
 from abc import ABC, abstractmethod
 from collections import defaultdict
 
@@ -8,12 +10,15 @@
 from ais_bench.benchmark.registry import PARTITIONERS, RUNNERS, build_from_cfg
 from ais_bench.benchmark.utils.config.run import get_config_type
 from ais_bench.benchmark.utils.logging.logger import AISLogger
+from ais_bench.benchmark.utils.logging.exceptions import PredictionInvalidException
+from ais_bench.benchmark.utils.logging.error_codes import TMAN_CODES
 from ais_bench.benchmark.partitioners import NaivePartitioner
 from ais_bench.benchmark.runners import LocalRunner
 from ais_bench.benchmark.tasks import OpenICLEvalTask, OpenICLApiInferTask, OpenICLInferTask
 from ais_bench.benchmark.summarizers import DefaultSummarizer, DefaultPerfSummarizer
 from ais_bench.benchmark.calculators import DefaultPerfMetricCalculator
 from ais_bench.benchmark.cli.utils import fill_model_path_if_datasets_need
+from ais_bench.benchmark.utils.file.file import load_jsonl, dump_jsonl
 
 logger = AISLogger()
 
@@ -108,6 +113,117 @@ def _update_tasks_cfg(self, tasks, cfg: ConfigDict):
             task.attack = cfg.attack
 
 
+class JudgeInfer(BaseWorker):
+    def update_cfg(self, cfg: ConfigDict) -> ConfigDict:
+        def get_task_type() -> str:
+            if cfg["models"][0]["attr"] == "service":
+                return get_config_type(OpenICLApiInferTask)
+            else:
+                return get_config_type(OpenICLInferTask)
+
+        new_cfg = dict(
+            judge_infer=dict(
+                partitioner=dict(type=get_config_type(NaivePartitioner)),
+                runner=dict(
+                    max_num_workers=self.args.max_num_workers,
+                    max_workers_per_gpu=self.args.max_workers_per_gpu,
+                    debug=self.args.debug,
+                    task=dict(type=get_task_type()),
+                    type=get_config_type(LocalRunner),
+                ),
+            ),
+        )
+
+        cfg.merge_from_dict(new_cfg)
+        if cfg.cli_args.debug:
+            cfg.judge_infer.runner.debug = True
+        cfg.judge_infer.partitioner["out_dir"] = osp.join(cfg["work_dir"], "predictions/")
+        return cfg
+
+    def do_work(self, cfg: ConfigDict):
+        partitioner = PARTITIONERS.build(cfg.judge_infer.partitioner)
+        logger.info("Starting inference tasks...")
+        tasks = partitioner(cfg)
+
+        # drop the tasks without a judge_infer_cfg
+        new_tasks = []
+        for task in tasks:
+            if task["datasets"][0][0].get("judge_infer_cfg"):
+                new_tasks.append(task)
+        tasks = new_tasks
+        if len(tasks) == 0:
+            return
+
+        # update tasks cfg before run
+        self._update_tasks_cfg(tasks, cfg)
+
+        if (
+            cfg.get("cli_args", {}).get("merge_ds", False)
+            or cfg.get("cli_args", {}).get("mode") == "perf"  # performance mode enables dataset merging by default
+        ):
+            logger.info("Merging datasets with the same model and inferencer...")
+            tasks = self._merge_datasets(tasks)
+
+        runner = RUNNERS.build(cfg.judge_infer.runner)
+        runner(tasks)
+        self._result_post_process(tasks, cfg)
+        logger.info("Inference tasks completed.")
+
+    def _merge_datasets(self, tasks):
+        # merge datasets with the same model, dataset type and inferencer
+        task_groups = defaultdict(list)
+        for task in tasks:
+            key = (
+                task["models"][0]["abbr"]  # same model
+                + "_"
+                + str(task['datasets'][0][0]['type'])  # same dataset type
+                + "_"
+                + str(task["datasets"][0][0]["infer_cfg"]["inferencer"])  # same inferencer with the same args
+            )
+            task_groups[key].append(task)
+        new_tasks = []
+        for key, task_group in task_groups.items():
+            new_task = copy.deepcopy(task_group[0])
+            if len(task_group) > 1:
+                for t in task_group[1:]:
+                    new_task["datasets"][0].extend(t["datasets"][0])
+            new_tasks.append(new_task)
+        return new_tasks
+
+    def _update_tasks_cfg(self, tasks, cfg: ConfigDict):
+        # update parameters to correct sub cfg
+        if hasattr(cfg, "attack"):
+            for task in tasks:
+                cfg.attack.dataset = task.datasets[0][0].abbr
+                task.attack = cfg.attack
+
+        # update judge cfgs to model cfgs and data
+        for task in tasks:
+            task["datasets"][0][0]["predictions_path"] = osp.join(cfg.judge_infer.partitioner.out_dir, task["models"][0]["abbr"], f'{task["datasets"][0][0]["abbr"]}.jsonl')
+            if not osp.exists(task["datasets"][0][0]["predictions_path"]):
+                raise PredictionInvalidException(TMAN_CODES.UNKNOWN_ERROR, f"Predictions path {task['datasets'][0][0]['predictions_path']} does not exist.")
+            task["datasets"][0][0]["abbr"] = f'{task["datasets"][0][0]["abbr"]}-{task["datasets"][0][0]["judge_infer_cfg"]["judge_model"]["abbr"]}'
+            model_abbr = task["models"][0]["abbr"]
+            task["models"][0] = task["datasets"][0][0]["judge_infer_cfg"].pop("judge_model")
+            task["models"][0]["abbr"] = model_abbr
+            task["datasets"][0][0]["type"] = task["datasets"][0][0]["judge_infer_cfg"].pop("judge_dataset_type")
+            task["datasets"][0][0]["reader_cfg"] = task["datasets"][0][0]["judge_infer_cfg"].pop("judge_reader_cfg")
+            task["datasets"][0][0]["infer_cfg"] = task["datasets"][0][0].pop("judge_infer_cfg")
+
+    def _result_post_process(self, tasks, cfg: ConfigDict):
+        # Reconstruct the judge infer predictions to normal predictions format
+        for task in tasks:
+            model_org_prediction_path = task["datasets"][0][0]["predictions_path"]
+            model_preds: dict = {item["uuid"]: item for item in load_jsonl(model_org_prediction_path)}
+            judge_org_prediction_path = osp.join(cfg.judge_infer.partitioner.out_dir, task["models"][0]["abbr"], f'{task["datasets"][0][0]["abbr"]}.jsonl')
+            judge_preds: list = load_jsonl(judge_org_prediction_path)
+            for i, pred in enumerate(judge_preds):
+                uuid = pred["gold"]
+                judge_preds[i]["id"] = model_preds[uuid]["id"]
+            os.remove(judge_org_prediction_path)
+            dump_jsonl(judge_preds, judge_org_prediction_path)
+
+
 class Eval(BaseWorker):
     def update_cfg(self, cfg: ConfigDict) -> None:
         new_cfg = dict(
@@ -138,7 +254,7 @@ def do_work(self, cfg: ConfigDict):
         logger.info("Starting evaluation tasks...")
         tasks = partitioner(cfg)
 
-        # update tasks cfg before run
+        # Update tasks cfg before run
         self._update_tasks_cfg(tasks, cfg)
 
         runner = RUNNERS.build(cfg.eval.runner)
@@ -148,11 +264,33 @@ def do_work(self, cfg: ConfigDict):
                 runner(task_part)
         else:
             runner(tasks)
+        self._result_post_process(tasks, cfg)
         logger.info("Evaluation tasks completed.")
 
     def _update_tasks_cfg(self, tasks, cfg: ConfigDict):
-        # update parameters to correct sub cfg
-        pass
+        # Rename datasets that carry a judge_infer_cfg to their judge-suffixed abbr
+        self.judge_result_paths = {}
+        for task in tasks:
+            if task["datasets"][0][0].get("judge_infer_cfg"):
+                new_dataset_abbr = f'{task["datasets"][0][0]["abbr"]}-{task["datasets"][0][0]["judge_infer_cfg"]["judge_model"]["abbr"]}'
+                org_dataset_abbr = task["datasets"][0][0]["abbr"]
+                self.judge_result_paths[new_dataset_abbr] = org_dataset_abbr
+                task["datasets"][0][0]["abbr"] = new_dataset_abbr
+                task["datasets"][0][0].pop("judge_infer_cfg")
+
+    def _result_post_process(self, tasks, cfg: ConfigDict):
+        # Copy the judge-infer eval result back to the original dataset name
+        for task in tasks:
+            if task["datasets"][0][0]["abbr"] in self.judge_result_paths.keys():
+                cur_results_path = osp.join(cfg.eval.partitioner.out_dir, task["models"][0]["abbr"], f'{task["datasets"][0][0]["abbr"]}.jsonl')
+                final_org_results_path = osp.join(cfg.eval.partitioner.out_dir, task["models"][0]["abbr"], f'{self.judge_result_paths[task["datasets"][0][0]["abbr"]]}.jsonl')
+                if os.path.exists(final_org_results_path):
+                    os.remove(final_org_results_path)
+
+                if os.path.exists(cur_results_path):
+                    # Copy the file at cur_results_path to final_org_results_path
+                    shutil.copy(cur_results_path, final_org_results_path)
 
 
 class AccViz(BaseWorker):
@@ -171,6 +309,7 @@ def update_cfg(self, cfg: ConfigDict) -> None:
     def do_work(self, cfg: ConfigDict) -> int:
         logger.info("Summarizing evaluation results...")
         summarizer_cfg = cfg.get("summarizer", {})
+        cfg = self._cfg_pre_process(cfg)
 
         # For subjective summarizer
         if summarizer_cfg.get("function", None):
@@ -203,6 +342,13 @@ def do_work(self, cfg: ConfigDict) -> int:
             summarizer = build_from_cfg(summarizer_cfg)
         summarizer.summarize(time_str=self.args.cfg_time_str)
 
+    def _cfg_pre_process(self, cfg: ConfigDict) -> ConfigDict:
+        for i, dataset in enumerate(cfg.datasets):
+            if dataset.get("judge_infer_cfg"):
+                cfg.datasets[i]["abbr"] = f'{cfg.datasets[i]["abbr"]}-{cfg.datasets[i]["judge_infer_cfg"]["judge_model"]["abbr"]}'
+                cfg.datasets[i].pop("judge_infer_cfg")
+        return cfg
+
 
 class PerfViz(BaseWorker):
     def update_cfg(self, cfg: ConfigDict) -> None:
@@ -233,9 +379,9 @@ def do_work(self, cfg: ConfigDict) -> int:
 
 
 WORK_FLOW = dict(
-    all=[Infer, Eval, AccViz],
+    all=[Infer, JudgeInfer, Eval, AccViz],
     infer=[Infer],
-    eval=[Eval, AccViz],
+    eval=[JudgeInfer, Eval, AccViz],
     viz=[AccViz],
     perf=[Infer, PerfViz],
     perf_viz=[PerfViz],
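Note: the pipeline change above boils down to running `JudgeInfer` between `Infer` and `Eval`; it re-infers each judge-enabled dataset with the judge model and then maps every judge prediction back to the sample it grades via the uuid stored in the judge record's `gold` field. A minimal standalone sketch of that remapping (field names follow the diff; the sample values are hypothetical):

```python
# Illustration only -- a pure-Python stand-in for the uuid -> id remapping that
# JudgeInfer._result_post_process performs on the prediction JSONL records.
def remap_judge_ids(model_preds, judge_preds):
    """Give each judge prediction the sample id of the model prediction it grades."""
    by_uuid = {item["uuid"]: item for item in model_preds}
    for pred in judge_preds:
        # The judge task stored the graded sample's uuid in its "gold" field.
        pred["id"] = by_uuid[pred["gold"]]["id"]
    return judge_preds


if __name__ == "__main__":
    model_preds = [{"uuid": "u-1", "id": 0, "prediction": "42"}]  # hypothetical record
    judge_preds = [{"gold": "u-1", "prediction": "A"}]            # hypothetical record
    print(remap_judge_ids(model_preds, judge_preds))
    # [{'gold': 'u-1', 'prediction': 'A', 'id': 0}]
```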
diff --git a/ais_bench/benchmark/configs/datasets/aime2025/aime2025_gen_0_shot_llmjudge.py b/ais_bench/benchmark/configs/datasets/aime2025/aime2025_gen_0_shot_llmjudge.py
new file mode 100644
index 00000000..7ece227c
--- /dev/null
+++ b/ais_bench/benchmark/configs/datasets/aime2025/aime2025_gen_0_shot_llmjudge.py
@@ -0,0 +1,118 @@
+from ais_bench.benchmark.openicl.icl_prompt_template import PromptTemplate
+from ais_bench.benchmark.openicl.icl_retriever import ZeroRetriever
+from ais_bench.benchmark.openicl.icl_inferencer import GenInferencer
+from ais_bench.benchmark.models import VLLMCustomAPIChat
+from ais_bench.benchmark.utils.postprocess.model_postprocessors import extract_non_reasoning_content
+from ais_bench.benchmark.datasets import (
+    Aime2025Dataset,
+    Aime2025JDGDataset,
+)
+from ais_bench.benchmark.datasets.utils.llm_judge import get_a_or_b, LLMJudgeCorrectEvaluator
+
+
+aime2025_reader_cfg = dict(input_columns=["question"], output_column="answer")
+
+
+aime2025_infer_cfg = dict(
+    prompt_template=dict(
+        type=PromptTemplate,
+        template=dict(
+            round=[
+                dict(
+                    role="HUMAN",
+                    prompt="{question}\nRemember to put your final answer within \\boxed{}.",
+                ),
+            ],
+        ),
+    ),
+    retriever=dict(type=ZeroRetriever),
+    inferencer=dict(type=GenInferencer),
+)
+
+GRADER_TEMPLATE = """
+    As a grading expert, please judge whether the final answers given by the candidates below are consistent with the standard answers, that is, whether the candidates answered correctly.
+
+    Here are some evaluation criteria:
+    1. Please refer to the given standard answer. You don't need to re-generate the answer to the question because the standard answer has been given. You only need to judge whether the candidate's answer is consistent with the standard answer according to the form of the question. Don't try to answer the original question. You can assume that the standard answer is definitely correct.
+    2. Because the candidate's answer may be different from the standard answer in the form of expression, before making a judgment, please understand the question and the standard answer first, and then judge whether the candidate's answer is correct, but be careful not to try to answer the original question.
+    3. Some answers may contain multiple items, such as multiple-choice questions, multiple-select questions, fill-in-the-blank questions, etc. As long as the answer is the same as the standard answer, it is enough. For multiple-select questions and multiple-blank fill-in-the-blank questions, the candidate needs to answer all the corresponding options or blanks correctly to be considered correct.
+    4. Some answers may be expressed in different ways, such as some answers may be a mathematical expression, some answers may be a textual description, as long as the meaning expressed is the same. And some formulas are expressed in different ways, but they are equivalent and correct.
+    5. If the prediction is given with \\boxed{}, please ignore the \\boxed{} and only judge whether the candidate's answer is consistent with the standard answer.
+    6. If the candidate's answer is semantically incomplete at the end, please judge it as inconsistent.
+
+    Please judge whether the following answers are consistent with the standard answer based on the above criteria. Grade the predicted answer of this new question as one of:
+    A: Means the answer is consistent with the standard answer.
+    B: Means the answer is inconsistent with the standard answer.
+    Just return the letter "A" or "B", with no text around it.
+
+    Here is your task. Simply reply with either "A" or "B". Don't apologize or correct yourself if there was a mistake; we are just trying to grade the answer.
+
+    <Original Question Begin>: \n{question}\n<Original Question End>\n\n
+    <Gold Target Begin>: \n{answer}\n<Gold Target End>\n\n
+    <Predicted Answer Begin>: \n{model_answer}\n<Predicted Answer End>\n\n
+
+    When judging the correctness of the candidate's answer, please return the letter "A" or "B" first, before any further reasoning:
+""".strip()
+
+aime2025_judge_infer_cfg = dict(
+    judge_reader_cfg=dict(input_columns=["question", "answer", "model_answer"], output_column="model_pred_uuid"),
+    judge_model=dict(
+        attr="service",
+        type=VLLMCustomAPIChat,
+        abbr="judge",  # appended to the dataset abbr
+        path="",
+        model="",
+        stream=True,
+        request_rate=0,
+        use_timestamp=False,
+        retry=2,
+        api_key="",
+        host_ip="localhost",
+        host_port=8080,
+        url="",
+        max_out_len=512,
+        batch_size=1,
+        trust_remote_code=False,
+        generation_kwargs=dict(
+            temperature=0.01,
+            ignore_eos=False,
+        ),
+        pred_postprocessor=dict(type=extract_non_reasoning_content),
+    ),
+    judge_dataset_type=Aime2025JDGDataset,
+    prompt_template=dict(
+        type=PromptTemplate,
+        template=dict(
+            begin=[
+                dict(
+                    role='SYSTEM',
+                    fallback_role='HUMAN',
+                    prompt="You are a helpful assistant who evaluates the correctness and quality of models' outputs.",
+                )
+            ],
+            round=[
+                dict(role='HUMAN', prompt=GRADER_TEMPLATE),
+            ],
+        ),
+    ),
+    retriever=dict(type=ZeroRetriever),
+    inferencer=dict(type=GenInferencer),
+)
+
+aime2025_eval_cfg = dict(
+    evaluator=dict(type=LLMJudgeCorrectEvaluator),
+    pred_postprocessor=dict(type=get_a_or_b),
+)
+
+aime2025_datasets = [
+    dict(
+        abbr="aime2025",
+        type=Aime2025Dataset,
+        path="ais_bench/datasets/aime2025/aime2025.jsonl",
+        reader_cfg=aime2025_reader_cfg,
+        infer_cfg=aime2025_infer_cfg,
+        judge_infer_cfg=aime2025_judge_infer_cfg,
+        eval_cfg=aime2025_eval_cfg,
+    )
+]
diff --git a/ais_bench/benchmark/datasets/aime2025.py b/ais_bench/benchmark/datasets/aime2025.py
index 6e67b07d..b6b13a1c 100644
--- a/ais_bench/benchmark/datasets/aime2025.py
+++ b/ais_bench/benchmark/datasets/aime2025.py
@@ -1,16 +1,15 @@
-import json 
+import json
 
 from datasets import Dataset
 
 from ais_bench.benchmark.registry import LOAD_DATASET
 from ais_bench.benchmark.datasets.utils.datasets import get_data_path
+from ais_bench.benchmark.datasets.utils.llm_judge import LLMJudgeDataset
 
-from .base import BaseDataset
-
+from ais_bench.benchmark.datasets.base import BaseDataset
 
 @LOAD_DATASET.register_module()
 class Aime2025Dataset(BaseDataset):
-
     @staticmethod
     def load(path, **kwargs):
         path = get_data_path(path)
@@ -20,3 +19,9 @@ def load(path, **kwargs):
             line = json.loads(line.strip())
             dataset.append(line)
         return Dataset.from_list(dataset)
+
+
+@LOAD_DATASET.register_module()
+class Aime2025JDGDataset(LLMJudgeDataset):
+    def _get_dataset_class(self):
+        return Aime2025Dataset
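Adding judge support for another dataset should follow the same pattern as `Aime2025JDGDataset` above: subclass `LLMJudgeDataset`, return the original dataset class, and reference the new class from `judge_dataset_type` in the config. A hypothetical sketch (the `MyDataset` import path is made up for illustration):

```python
from ais_bench.benchmark.registry import LOAD_DATASET
from ais_bench.benchmark.datasets.utils.llm_judge import LLMJudgeDataset
from my_project.datasets import MyDataset  # hypothetical original dataset class


@LOAD_DATASET.register_module()
class MyJDGDataset(LLMJudgeDataset):
    def _get_dataset_class(self):
        # The judge dataset wraps the original dataset and appends the
        # model_answer / model_pred_uuid columns read from the prediction file.
        return MyDataset
```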
diff --git a/ais_bench/benchmark/datasets/base.py b/ais_bench/benchmark/datasets/base.py
index de062a5d..57a34359 100644
--- a/ais_bench/benchmark/datasets/base.py
+++ b/ais_bench/benchmark/datasets/base.py
@@ -1,5 +1,5 @@
 from abc import abstractmethod
-from typing import List, Dict, Optional, Union
+from typing import List, Dict, Optional, Union, Type
 
 from datasets import Dataset, DatasetDict
 from datasets.utils.logging import disable_progress_bar
@@ -106,5 +106,64 @@ def test(self):
         return self.reader.dataset['test']
 
     @abstractmethod
-    def load(**kwargs) -> Union[Dataset, DatasetDict]:
+    def load(self, **kwargs) -> Union[Dataset, DatasetDict]:
         pass
+
+
+class BaseJDGDataset(BaseDataset):
+    def __init__(self,
+                 reader_cfg: Optional[Dict] = {},
+                 k: Union[int, List[int]] = 1,
+                 n: int = 1,
+                 **kwargs):
+        self.dataset_instance = self._init_org_datasets_instance(reader_cfg, k, n, **kwargs)
+        super().__init__(reader_cfg, k, n, **kwargs)
+
+    def load(self, predictions_path: str, **kwargs):
+        dataset_content = self.dataset_instance.dataset["test"]
+
+        # Load the inference results of the model under test (sorted by sample id)
+        predictions: list = self._load_from_predictions(predictions_path)
+
+        # Add a model_answer column to the dataset
+        if isinstance(dataset_content, Dataset):
+            dataset_list = []
+            for item in predictions:
+                item_dict = dataset_content[int(item["id"])]
+                self._modify_dataset_item(item_dict, item)
+                item_dict["model_pred_uuid"] = item["uuid"]  # later used as the gold field of the judge task
+                dataset_list.append(item_dict)
+        elif isinstance(dataset_content, DatasetDict):
+            dataset_list = []
+            for key in dataset_content:
+                for item in predictions:
+                    item_dict = dataset_content[key][int(item["id"])]
+                    self._modify_dataset_item(item_dict, item)
+                    item_dict["model_pred_uuid"] = item["uuid"]  # later used as the gold field of the judge task
+                    dataset_list.append(item_dict)
+        else:
+            raise ValueError(f"Unsupported dataset type: {type(dataset_content)}")
+
+        return Dataset.from_list(dataset_list)
+
+    @abstractmethod
+    def _load_from_predictions(self, prediction_path: str) -> List[dict]:
+        pass
+
+    @abstractmethod
+    def _get_dataset_class(self) -> Type[BaseDataset]:
+        return BaseDataset
+
+    def _modify_dataset_item(self, dataset_item, pred_item):
+        dataset_item["model_answer"] = pred_item["prediction"]
+
+    def _init_org_datasets_instance(
+            self,
+            reader_cfg: Optional[Dict] = {},
+            k: Union[int, List[int]] = 1,
+            n: int = 1,
+            **kwargs):
+        dataset_class = self._get_dataset_class()
+        return dataset_class(reader_cfg, k, n, **kwargs)
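For reference, this is roughly the join that `BaseJDGDataset.load` performs for a `Dataset`-style test split, emulated here without the class machinery (field names follow the diff; the sample rows are invented):

```python
from datasets import Dataset

test_split = Dataset.from_list([
    {"question": "1+1?", "answer": "2"},   # row id 0
    {"question": "2+2?", "answer": "4"},   # row id 1
])
# Prediction records as written by the infer stage (values invented).
predictions = [
    {"id": 0, "uuid": "u-0", "prediction": "2"},
    {"id": 1, "uuid": "u-1", "prediction": "5"},
]

rows = []
for pred in predictions:
    row = dict(test_split[int(pred["id"])])
    row["model_answer"] = pred["prediction"]   # what the judge model grades
    row["model_pred_uuid"] = pred["uuid"]      # carried through as the gold field
    rows.append(row)

judge_dataset = Dataset.from_list(rows)
print(judge_dataset.column_names)
# ['question', 'answer', 'model_answer', 'model_pred_uuid']
```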
diff --git a/ais_bench/benchmark/datasets/utils/datasets.py b/ais_bench/benchmark/datasets/utils/datasets.py
index 9f473d6a..6bb31701 100644
--- a/ais_bench/benchmark/datasets/utils/datasets.py
+++ b/ais_bench/benchmark/datasets/utils/datasets.py
@@ -69,7 +69,7 @@ def get_sample_data(data_list: list, sample_mode: str = "default", request_count
         data_list (list): Data list.
         sample_mode (str): Sample mode.
         request_count (int): Request count.
-    
+
     Raises:
         ValueError: If sample mode is not supported.
         ValueError: If request count is negative.
@@ -101,7 +101,7 @@ def get_sample_data(data_list: list, sample_mode: str = "default", request_count
         return shuffle_data
     else:
         raise ValueError(f"Sample mode: {sample_mode} is not supported!")
-    
+
 
 def get_meta_json(dataset_path, meta_path):
     ori_meta_path = meta_path
     if not meta_path:
@@ -389,7 +389,7 @@ def _to_float(text: str):
         return relative_change <= max_relative_change
     else:
         return prediction.lower() == target.lower()
-    
+
 
 def anls_compute(groundtruth, prediction):
     gt_answer = ' '.join(groundtruth.strip().lower().split())
     det_answer = ' '.join(prediction.strip().lower().split())
diff --git a/ais_bench/benchmark/datasets/utils/llm_judge.py b/ais_bench/benchmark/datasets/utils/llm_judge.py
new file mode 100644
index 00000000..8b6b18f0
--- /dev/null
+++ b/ais_bench/benchmark/datasets/utils/llm_judge.py
@@ -0,0 +1,56 @@
+import re
+import os
+
+from ais_bench.benchmark.utils.logging import AISLogger
+from ais_bench.benchmark.registry import (ICL_EVALUATORS, LOAD_DATASET,
+                                          TEXT_POSTPROCESSORS)
+from ais_bench.benchmark.openicl.icl_evaluator import BaseEvaluator
+from ais_bench.benchmark.datasets.base import BaseJDGDataset
+from ais_bench.benchmark.utils.file.file import load_jsonl
+
+logger = AISLogger()
+
+
+@TEXT_POSTPROCESSORS.register_module("get_a_or_b")
+def get_a_or_b(pred: str) -> str:
+    """Extract "A" or "B" from the end of the model reply, defaulting to "B"."""
+    match = re.search(r'[AB]', pred[-1:])
+    return match.group(0) if match else 'B'
+
+
+class LLMJudgeDataset(BaseJDGDataset):
+    def _load_from_predictions(self, prediction_path: str):
+        """Load the model predictions from a JSONL file and sort them by sample id.
+
+        Args:
+            prediction_path (str): The path to the prediction file.
+
+        Returns:
+            list: The prediction records sorted by id.
+        """
+        if os.path.exists(prediction_path):
+            preds = load_jsonl(prediction_path)
+            preds.sort(key=lambda x: x.get('id', 0))
+            return preds
+        raise FileNotFoundError(f"Prediction file does not exist: {prediction_path}")
+
+
+@ICL_EVALUATORS.register_module()
+class LLMJudgeCorrectEvaluator(BaseEvaluator):
+
+    def __init__(self):
+        super().__init__()
+
+    def score(self, predictions, references):
+        if len(predictions) != len(references):
+            return {'error': 'predictions and references have different lengths'}
+        correct = 0
+        count = 0
+        details = []
+        for i, j in zip(predictions, references):
+            detail = {'pred': i, 'answer': j, 'correct': False}
+            count += 1
+            if i == "A":
+                correct += 1
+                detail['correct'] = True
+            details.append(detail)
+        result = {'accuracy': 100 * correct / count, 'details': details}
+        return result
\ No newline at end of file
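A behavioural sketch of the postprocessor above and of the accuracy figure `LLMJudgeCorrectEvaluator.score` reports, restated standalone so it runs without `ais_bench` installed:

```python
import re


def get_a_or_b(pred: str) -> str:
    # Mirrors the postprocessor above: only the final character is inspected,
    # and anything that is not a trailing "A" or "B" falls back to "B".
    match = re.search(r'[AB]', pred[-1:])
    return match.group(0) if match else 'B'


graded = [get_a_or_b(p) for p in ["A", "The answer matches. A", "B", "no letter"]]
print(graded)                                  # ['A', 'A', 'B', 'B']
print(100 * graded.count("A") / len(graded))   # 50.0 -- reported as 'accuracy'
```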
diff --git a/ais_bench/benchmark/utils/file/file.py b/ais_bench/benchmark/utils/file/file.py
index d6bfde67..47f048dc 100644
--- a/ais_bench/benchmark/utils/file/file.py
+++ b/ais_bench/benchmark/utils/file/file.py
@@ -1,6 +1,8 @@
 from typing import List, Tuple, Union
 import os
 import json
+import mmap
+import orjson
 import fnmatch
 
 import tabulate
@@ -226,4 +228,31 @@ def check_mm_custom(path):
             return False
         if line["type"] not in ["image", "video", "audio"]:
             return False
-    return True
\ No newline at end of file
+    return True
+
+def load_jsonl(path: str) -> List[dict]:
+    """Load a JSONL file into a list of dictionaries.
+
+    Args:
+        path: Path to the JSONL file
+
+    Returns:
+        List of dictionaries, one per line of the JSONL file
+    """
+    preds = []
+    with open(path, "rb") as f:
+        # Memory-map the file and parse with orjson for fast line-by-line loading
+        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
+            for line in iter(mm.readline, b""):
+                preds.append(orjson.loads(line))
+    return preds
+
+def dump_jsonl(data: List[dict], path: str):
+    """Dump a list of dictionaries to a JSONL file.
+
+    Args:
+        data: List of dictionaries to be dumped
+        path: Path to the output JSONL file
+    """
+    with open(path, 'wb') as f:
+        for item in data:
+            f.write(orjson.dumps(item) + b'\n')
\ No newline at end of file
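A minimal round-trip check for the two new helpers (the import path follows the diff; `orjson` must be installed, and the output path here is arbitrary):

```python
import os
import tempfile

from ais_bench.benchmark.utils.file.file import dump_jsonl, load_jsonl

records = [{"id": 0, "uuid": "u-0", "prediction": "42"}]
path = os.path.join(tempfile.mkdtemp(), "preds.jsonl")
dump_jsonl(records, path)
assert load_jsonl(path) == records
```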