From e376084469d460e57810a0c47a4ef5c35019925a Mon Sep 17 00:00:00 2001 From: baominghelly <1508269885@qq.com> Date: Thu, 27 Nov 2025 13:36:25 +0800 Subject: [PATCH 1/4] Add test execution gateway for dynamic input operator test --- test/infinicore/framework/runner.py | 4 +- test/infinicore/framework/test_gateway.py | 308 ++++++++++++++++++++++ 2 files changed, 310 insertions(+), 2 deletions(-) create mode 100644 test/infinicore/framework/test_gateway.py diff --git a/test/infinicore/framework/runner.py b/test/infinicore/framework/runner.py index c0de4a7f9..b7550a8a1 100644 --- a/test/infinicore/framework/runner.py +++ b/test/infinicore/framework/runner.py @@ -9,13 +9,13 @@ class GenericTestRunner: """Generic test runner that handles the common execution flow""" - def __init__(self, operator_test_class): + def __init__(self, operator_test_class, args = None): """ Args: operator_test_class: A class that implements BaseOperatorTest interface """ self.operator_test = operator_test_class() - self.args = get_args() + self.args = args or get_args() def run(self): """Execute the complete test suite diff --git a/test/infinicore/framework/test_gateway.py b/test/infinicore/framework/test_gateway.py new file mode 100644 index 000000000..563a1fc21 --- /dev/null +++ b/test/infinicore/framework/test_gateway.py @@ -0,0 +1,308 @@ +import sys +import os +import json +import importlib +import inspect +import argparse +from typing import Any, Optional, Tuple, Union, Dict + +import infinicore +import torch + +current_dir = os.path.dirname(os.path.abspath(__file__)) +parent_dir = os.path.dirname(current_dir) +if parent_dir not in sys.path: + sys.path.insert(0, parent_dir) + +from framework.base import BaseOperatorTest, TestCase, TensorSpec +from framework.config import get_args +from framework.runner import GenericTestRunner + +class TestExecutionGateway: + """ + Test Execution Gateway + """ + + SUPPORTED_HARDWARE_FLAGS = [ + "cpu", "nvidia", "cambricon", "ascend", "iluvatar", + "metax", "moore", "kunlun", "hygon", "qy" + ] + + def run(self, json_file_path: str, config: Union[str, Dict[str, Any], argparse.Namespace, None] = None) -> Any: + print(f"šŸš€ Gateway: Start processing...") + + if not json_file_path or not os.path.exists(json_file_path): + raise FileNotFoundError(f"āŒ JSON file not found: {json_file_path}") + + # Normalize Config Override + override_dict = self._normalize_override_config(config) + + print(f"šŸ“„ Source: Loading {json_file_path}") + try: + op_name, test_case, final_args, op_funcs, op_paths = self._load(json_file_path, override_config=override_dict) + except Exception as e: + import traceback; traceback.print_exc() + raise RuntimeError(f"āŒ Failed to load configuration: {e}") from e + + # Identify active devices for cleaner logging + active_devices = [hw.upper() for hw in self.SUPPORTED_HARDWARE_FLAGS if getattr(final_args, hw, False)] + device_str = ", ".join(active_devices) if active_devices else "None" + + print(f"āš™ļø Ready to execute operator: '{op_name}'") + print(f" Targets: Torch -> {op_paths['torch']}") + print(f" Infini -> {op_paths['infinicore']}") + print(f" Target Devices: {device_str}") + print(f" Config: Bench={final_args.bench}, Prerun={final_args.num_prerun}, Iterations={final_args.num_iterations}") + print(f" Description: {test_case.description}") + + results = self._execute_tests(op_name, test_case, final_args, op_funcs) + + print(f"šŸ Gateway: Process finished.") + return results + + def _normalize_override_config(self, config): + override_dict = {} + if config: 
+ if isinstance(config, str): + if os.path.exists(config): + with open(config, 'r') as f: override_dict = json.load(f) + else: raise FileNotFoundError(f"āŒ Config file not found: {config}") + elif isinstance(config, argparse.Namespace): override_dict = vars(config) + elif isinstance(config, dict): override_dict = config + else: raise ValueError("āŒ Config must be file path, dict, or Namespace") + return override_dict + + def _load( + self, + json_file_path: str, + override_config: Dict[str, Any] + ) -> Tuple[str, TestCase, argparse.Namespace, Dict[str, Any], Dict[str, str]]: + # --- A. Read JSON --- + try: + with open(json_file_path, 'r', encoding='utf-8') as f: + data = json.load(f) + except json.JSONDecodeError: + raise ValueError(f"Invalid JSON format: {json_file_path}") + + # --- B. Extract Operator Info --- + op_name = data.get("operator") + if not op_name: + raise ValueError("JSON missing required 'operator' field.") + + # Load actual functions from strings + torch_op_str = data.get("torch_op") + infini_op_str = data.get("infinicore_op") + + if not torch_op_str or not infini_op_str: + raise ValueError("JSON must specify 'torch_op' and 'infinicore_op' function paths (e.g., 'torch.add').") + + op_funcs = { + "torch": self._load_function(torch_op_str), + "infinicore": self._load_function(infini_op_str) + } + + op_paths = { + "torch": torch_op_str, + "infinicore": infini_op_str + } + + # --- C. Construct Args --- + original_argv = sys.argv + sys.argv = [sys.argv[0]] + args = get_args() + sys.argv = original_argv + + json_args = data.get("args", {}) + for key, value in json_args.items(): + if hasattr(args, key): + setattr(args, key, value) + + target_device = data.get("device", "cpu").lower() + self._set_device_flags(args, target_device) + + # --- D. Construct TestCase --- + test_case = self._build_test_case(data, op_name) + + # --- E. Apply Override --- + if override_config: + for key, value in override_config.items(): + if value is not None: + setattr(args, key, value) + if 'device' in override_config and override_config['device']: + self._set_device_flags(args, override_config['device']) + + return op_name, test_case, args, op_funcs, op_paths + + def _execute_tests( + self, + op_name: str, + test_case: TestCase, + args: argparse.Namespace, + op_funcs: Dict[str, Any]): + """ + Constructs a DynamicOpTest class on the fly and runs it. + """ + cases_to_run = [test_case] + + torch_func = op_funcs["torch"] + infini_func = op_funcs["infinicore"] + + class DynamicOpTest(BaseOperatorTest): + def __init__(self): + super().__init__(op_name) + + def get_test_cases(self): + return cases_to_run + + def torch_operator(self, *args, **kwargs): + return torch_func(*args, **kwargs) + + def infinicore_operator(self, *args, **kwargs): + return infini_func(*args, **kwargs) + + runner = GenericTestRunner(DynamicOpTest, args) + try: runner.run_and_exit() + except SystemExit: pass + + return getattr(runner, "test_results", "Done") + + def _load_function(self, func_path: str) -> Any: + """ + Dynamically imports a module and retrieves a function object. + + Supports: + - "torch.add" -> module: torch, func: add + - "torch.nn.functional.adaptive_max_pool1d" -> module: torch.nn.functional, func: adaptive_max_pool1d + """ + if "." not in func_path: + raise ValueError(f"Invalid function path: '{func_path}'. 
Must be 'module.function'.") + + # Split from the right to separate module path and function name + module_name, func_name = func_path.rsplit(".", 1) + + try: + # Attempt to import the module part + module = importlib.import_module(module_name) + except ImportError as e: + raise ImportError(f"āŒ Could not import module '{module_name}' for function '{func_path}': {e}") + + try: + # Retrieve the function from the imported module + func = getattr(module, func_name) + except AttributeError: + raise AttributeError(f"āŒ Module '{module_name}' has no function named '{func_name}'") + + return func + + def _set_device_flags(self, args, target_device_str): + + for flag in self.SUPPORTED_HARDWARE_FLAGS: + if hasattr(args, flag): setattr(args, flag, False) + + d = target_device_str.lower() + if "cpu" in d: args.cpu = True + elif "cuda" in d or "nvidia" in d: args.nvidia = True + elif "npu" in d or "ascend" in d: args.ascend = True + elif "mlu" in d or "cambricon" in d: args.cambricon = True + elif "iluvatar" in d: args.iluvatar = True + elif "metax" in d or "maca" in d: args.metax = True + elif "musa" in d or "moore" in d: args.moore = True + elif "xpu" in d or "kunlun" in d: args.kunlun = True + elif "dcu" in d or "hygon" in d: args.hygon = True + elif "qy" in d: args.qy = True + else: + print(f"āš ļø Unknown device '{d}'. Fallback to CPU.") + args.cpu = True + + def _build_test_case(self, data, op_name): + # 1. Parse Inputs + inputs_list = [] + raw_inputs = data.get("inputs", []) + for idx, inp in enumerate(raw_inputs): + spec = self._parse_spec_from_dict(inp, f"in_{idx}") + inputs_list.append(spec) + + # 2. Parse Kwargs + raw_kwargs = data.get("kwargs", {}) + kwargs = {} + for k, v in raw_kwargs.items(): + if isinstance(v, dict) and "shape" in v and "dtype" in v: + kwargs[k] = self._parse_spec_from_dict(v, default_name=k) + else: + kwargs[k] = v + + # 3. Parse Output Spec + output_spec = None + raw_out_spec = data.get("output_spec") + if raw_out_spec: + output_spec = self._parse_spec_from_dict(raw_out_spec, "out_0") + + # 4. Parse Output Specs (Multiple) + output_specs = None + raw_out_specs = data.get("output_specs") + if raw_out_specs: + output_specs = [] + for idx, spec_data in enumerate(raw_out_specs): + spec = self._parse_spec_from_dict(spec_data, f"out_{idx}") + output_specs.append(spec) + + # 5. 
Determine output count
+        output_count = 1
+        if output_specs:
+            output_count = len(output_specs)
+        elif "output_count" in data:
+            output_count = int(data["output_count"])
+
+        comparison_target = data.get("comparison_target")
+        tolerance = data.get("tolerance", {"atol": 1e-3, "rtol": 1e-3})
+        description = data.get("description", f"Auto-test {op_name}")
+
+        return TestCase(
+            inputs=inputs_list,
+            kwargs=kwargs,
+            output_spec=output_spec,
+            output_specs=output_specs,
+            comparison_target=comparison_target,
+            tolerance=tolerance,
+            description=description,
+            output_count=output_count
+        )
+
+    def _parse_spec_from_dict(self, spec_dict: Dict, default_name: str):
+        """Helper to create TensorSpec from a dictionary definition"""
+        return TensorSpec.from_tensor(
+            shape=tuple(spec_dict["shape"]),
+            strides=tuple(spec_dict["strides"]) if spec_dict.get("strides") else None,
+            dtype=self._parse_dtype(spec_dict.get("dtype", "float32")),
+            name=spec_dict.get("name", default_name)
+        )
+
+    def _parse_dtype(self, dtype_str: str):
+        dtype_map = {
+            "float16": infinicore.float16, "float32": infinicore.float32,
+            "bfloat16": infinicore.bfloat16, "int32": infinicore.int32,
+            "int64": infinicore.int64, "bool": infinicore.bool,
+        }
+        return dtype_map.get(dtype_str, infinicore.float32)
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="Test Gateway")
+    parser.add_argument("file_path", type=str, help="Path to JSON config file")
+
+    # Overrides
+    parser.add_argument("--device", type=str, default=None)
+    parser.add_argument("--bench", type=str, choices=["host", "device", "both"], default=None)
+    parser.add_argument("--debug", action="store_true")
+    parser.add_argument("--num_prerun", type=int, default=None)
+    parser.add_argument("--num_iterations", type=int, default=None)
+    parser.add_argument("--verbose", action="store_true")
+
+    gateway_args = parser.parse_args()
+
+    override_dict = {
+        k: v for k, v in vars(gateway_args).items()
+        if k != "file_path" and v is not None and v is not False
+    }
+
+    gateway = TestExecutionGateway()
+    gateway.run(json_file_path=gateway_args.file_path, config=override_dict)

From 999b1b4eda1b92c0278ae5b0e6936fe0f983ad90 Mon Sep 17 00:00:00 2001
From: baominghelly <1508269885@qq.com>
Date: Thu, 27 Nov 2025 18:01:52 +0800
Subject: [PATCH 2/4] Fix bug where test results were not returned

---
 test/infinicore/framework/test_gateway.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/test/infinicore/framework/test_gateway.py b/test/infinicore/framework/test_gateway.py
index 563a1fc21..859838537 100644
--- a/test/infinicore/framework/test_gateway.py
+++ b/test/infinicore/framework/test_gateway.py
@@ -160,11 +160,11 @@ def torch_operator(self, *args, **kwargs):
         def infinicore_operator(self, *args, **kwargs):
             return infini_func(*args, **kwargs)
 
-        runner = GenericTestRunner(DynamicOpTest, args)
-        try: runner.run_and_exit()
-        except SystemExit: pass
+        generic_runner = GenericTestRunner(DynamicOpTest, args)
+
+        success, internal_runner = generic_runner.run()
 
-        return getattr(runner, "test_results", "Done")
+        return getattr(internal_runner, "test_results", "Done")

From 81a87486415aceea91a8203473c92f2df5b5c44f Mon Sep 17 00:00:00 2001
From: baominghelly <1508269885@qq.com>
Date: Wed, 3 Dec 2025 23:39:03 +0800
Subject: [PATCH 3/4] Rename class to TestCaseManager and add JSON result
 saving

---
 test/infinicore/framework/__init__.py | 1 +
 test/infinicore/framework/test_gateway.py | 308 -----
test/infinicore/framework/testcase_manager.py | 515 ++++++++++++++++++ test/infinicore/run_external_case.py | 84 +++ 4 files changed, 600 insertions(+), 308 deletions(-) delete mode 100644 test/infinicore/framework/test_gateway.py create mode 100644 test/infinicore/framework/testcase_manager.py create mode 100644 test/infinicore/run_external_case.py diff --git a/test/infinicore/framework/__init__.py b/test/infinicore/framework/__init__.py index 79c71b1c5..6d0936775 100644 --- a/test/infinicore/framework/__init__.py +++ b/test/infinicore/framework/__init__.py @@ -36,6 +36,7 @@ "TestConfig", "TestResult", "TestRunner", + "TestCaseManager", # Core functions "compare_results", "convert_infinicore_to_torch", diff --git a/test/infinicore/framework/test_gateway.py b/test/infinicore/framework/test_gateway.py deleted file mode 100644 index 859838537..000000000 --- a/test/infinicore/framework/test_gateway.py +++ /dev/null @@ -1,308 +0,0 @@ -import sys -import os -import json -import importlib -import inspect -import argparse -from typing import Any, Optional, Tuple, Union, Dict - -import infinicore -import torch - -current_dir = os.path.dirname(os.path.abspath(__file__)) -parent_dir = os.path.dirname(current_dir) -if parent_dir not in sys.path: - sys.path.insert(0, parent_dir) - -from framework.base import BaseOperatorTest, TestCase, TensorSpec -from framework.config import get_args -from framework.runner import GenericTestRunner - -class TestExecutionGateway: - """ - Test Execution Gateway - """ - - SUPPORTED_HARDWARE_FLAGS = [ - "cpu", "nvidia", "cambricon", "ascend", "iluvatar", - "metax", "moore", "kunlun", "hygon", "qy" - ] - - def run(self, json_file_path: str, config: Union[str, Dict[str, Any], argparse.Namespace, None] = None) -> Any: - print(f"šŸš€ Gateway: Start processing...") - - if not json_file_path or not os.path.exists(json_file_path): - raise FileNotFoundError(f"āŒ JSON file not found: {json_file_path}") - - # Normalize Config Override - override_dict = self._normalize_override_config(config) - - print(f"šŸ“„ Source: Loading {json_file_path}") - try: - op_name, test_case, final_args, op_funcs, op_paths = self._load(json_file_path, override_config=override_dict) - except Exception as e: - import traceback; traceback.print_exc() - raise RuntimeError(f"āŒ Failed to load configuration: {e}") from e - - # Identify active devices for cleaner logging - active_devices = [hw.upper() for hw in self.SUPPORTED_HARDWARE_FLAGS if getattr(final_args, hw, False)] - device_str = ", ".join(active_devices) if active_devices else "None" - - print(f"āš™ļø Ready to execute operator: '{op_name}'") - print(f" Targets: Torch -> {op_paths['torch']}") - print(f" Infini -> {op_paths['infinicore']}") - print(f" Target Devices: {device_str}") - print(f" Config: Bench={final_args.bench}, Prerun={final_args.num_prerun}, Iterations={final_args.num_iterations}") - print(f" Description: {test_case.description}") - - results = self._execute_tests(op_name, test_case, final_args, op_funcs) - - print(f"šŸ Gateway: Process finished.") - return results - - def _normalize_override_config(self, config): - override_dict = {} - if config: - if isinstance(config, str): - if os.path.exists(config): - with open(config, 'r') as f: override_dict = json.load(f) - else: raise FileNotFoundError(f"āŒ Config file not found: {config}") - elif isinstance(config, argparse.Namespace): override_dict = vars(config) - elif isinstance(config, dict): override_dict = config - else: raise ValueError("āŒ Config must be file path, dict, or 
Namespace") - return override_dict - - def _load( - self, - json_file_path: str, - override_config: Dict[str, Any] - ) -> Tuple[str, TestCase, argparse.Namespace, Dict[str, Any], Dict[str, str]]: - # --- A. Read JSON --- - try: - with open(json_file_path, 'r', encoding='utf-8') as f: - data = json.load(f) - except json.JSONDecodeError: - raise ValueError(f"Invalid JSON format: {json_file_path}") - - # --- B. Extract Operator Info --- - op_name = data.get("operator") - if not op_name: - raise ValueError("JSON missing required 'operator' field.") - - # Load actual functions from strings - torch_op_str = data.get("torch_op") - infini_op_str = data.get("infinicore_op") - - if not torch_op_str or not infini_op_str: - raise ValueError("JSON must specify 'torch_op' and 'infinicore_op' function paths (e.g., 'torch.add').") - - op_funcs = { - "torch": self._load_function(torch_op_str), - "infinicore": self._load_function(infini_op_str) - } - - op_paths = { - "torch": torch_op_str, - "infinicore": infini_op_str - } - - # --- C. Construct Args --- - original_argv = sys.argv - sys.argv = [sys.argv[0]] - args = get_args() - sys.argv = original_argv - - json_args = data.get("args", {}) - for key, value in json_args.items(): - if hasattr(args, key): - setattr(args, key, value) - - target_device = data.get("device", "cpu").lower() - self._set_device_flags(args, target_device) - - # --- D. Construct TestCase --- - test_case = self._build_test_case(data, op_name) - - # --- E. Apply Override --- - if override_config: - for key, value in override_config.items(): - if value is not None: - setattr(args, key, value) - if 'device' in override_config and override_config['device']: - self._set_device_flags(args, override_config['device']) - - return op_name, test_case, args, op_funcs, op_paths - - def _execute_tests( - self, - op_name: str, - test_case: TestCase, - args: argparse.Namespace, - op_funcs: Dict[str, Any]): - """ - Constructs a DynamicOpTest class on the fly and runs it. - """ - cases_to_run = [test_case] - - torch_func = op_funcs["torch"] - infini_func = op_funcs["infinicore"] - - class DynamicOpTest(BaseOperatorTest): - def __init__(self): - super().__init__(op_name) - - def get_test_cases(self): - return cases_to_run - - def torch_operator(self, *args, **kwargs): - return torch_func(*args, **kwargs) - - def infinicore_operator(self, *args, **kwargs): - return infini_func(*args, **kwargs) - - generic_runner = GenericTestRunner(DynamicOpTest, args) - - success, internal_runner = generic_runner.run() - - return getattr(internal_runner, "test_results", "Done") - - def _load_function(self, func_path: str) -> Any: - """ - Dynamically imports a module and retrieves a function object. - - Supports: - - "torch.add" -> module: torch, func: add - - "torch.nn.functional.adaptive_max_pool1d" -> module: torch.nn.functional, func: adaptive_max_pool1d - """ - if "." not in func_path: - raise ValueError(f"Invalid function path: '{func_path}'. 
Must be 'module.function'.") - - # Split from the right to separate module path and function name - module_name, func_name = func_path.rsplit(".", 1) - - try: - # Attempt to import the module part - module = importlib.import_module(module_name) - except ImportError as e: - raise ImportError(f"āŒ Could not import module '{module_name}' for function '{func_path}': {e}") - - try: - # Retrieve the function from the imported module - func = getattr(module, func_name) - except AttributeError: - raise AttributeError(f"āŒ Module '{module_name}' has no function named '{func_name}'") - - return func - - def _set_device_flags(self, args, target_device_str): - - for flag in self.SUPPORTED_HARDWARE_FLAGS: - if hasattr(args, flag): setattr(args, flag, False) - - d = target_device_str.lower() - if "cpu" in d: args.cpu = True - elif "cuda" in d or "nvidia" in d: args.nvidia = True - elif "npu" in d or "ascend" in d: args.ascend = True - elif "mlu" in d or "cambricon" in d: args.cambricon = True - elif "iluvatar" in d: args.iluvatar = True - elif "metax" in d or "maca" in d: args.metax = True - elif "musa" in d or "moore" in d: args.moore = True - elif "xpu" in d or "kunlun" in d: args.kunlun = True - elif "dcu" in d or "hygon" in d: args.hygon = True - elif "qy" in d: args.qy = True - else: - print(f"āš ļø Unknown device '{d}'. Fallback to CPU.") - args.cpu = True - - def _build_test_case(self, data, op_name): - # 1. Parse Inputs - inputs_list = [] - raw_inputs = data.get("inputs", []) - for idx, inp in enumerate(raw_inputs): - spec = self._parse_spec_from_dict(inp, f"in_{idx}") - inputs_list.append(spec) - - # 2. Parse Kwargs - raw_kwargs = data.get("kwargs", {}) - kwargs = {} - for k, v in raw_kwargs.items(): - if isinstance(v, dict) and "shape" in v and "dtype" in v: - kwargs[k] = self._parse_spec_from_dict(v, default_name=k) - else: - kwargs[k] = v - - # 3. Parse Output Spec - output_spec = None - raw_out_spec = data.get("output_spec") - if raw_out_spec: - output_spec = self._parse_spec_from_dict(raw_out_spec, "out_0") - - # 4. Parse Output Specs (Multiple) - output_specs = None - raw_out_specs = data.get("output_specs") - if raw_out_specs: - output_specs = [] - for idx, spec_data in enumerate(raw_out_specs): - spec = self._parse_spec_from_dict(spec_data, f"out_{idx}") - output_specs.append(spec) - - # 5. 
Determine output count - output_count = 1 - if output_specs: - output_count = len(output_specs) - elif "output_count" in data: - output_count = int(data["output_count"]) - - comparison_target = data.get("comparison_target") - tolerance = data.get("tolerance", {"atol": 1e-3, "rtol": 1e-3}) - description = data.get("description", f"Auto-test {op_name}") - - return TestCase( - inputs=inputs_list, - kwargs=kwargs, - output_spec=output_spec, - output_specs=output_specs, - comparison_target=comparison_target, - tolerance=tolerance, - description=description, - output_count=output_count - ) - - def _parse_spec_from_dict(self, spec_dict: Dict, default_name: str): - """Helper to create TensorSpec from a dictionary definition""" - return TensorSpec.from_tensor( - shape=tuple(spec_dict["shape"]), - strides=tuple(spec_dict["strides"]) if spec_dict.get("strides") else None, - dtype=self._parse_dtype(spec_dict.get("dtype", "float32")), - name=spec_dict.get("name", default_name) - ) - - def _parse_dtype(self, dtype_str: str): - dtype_map = { - "float16": infinicore.float16, "float32": infinicore.float32, - "bfloat16": infinicore.bfloat16, "int32": infinicore.int32, - "int64": infinicore.int64, "bool": infinicore.bool, - } - return dtype_map.get(dtype_str, infinicore.float32) - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Test Gateway") - parser.add_argument("file_path", type=str, help="Path to JSON config file") - - # Overrides - parser.add_argument("--device", type=str, default=None) - parser.add_argument("--bench", type=str, choices=["host", "device", "both"], default=None) - parser.add_argument("--debug", action="store_true") - parser.add_argument("--num_prerun", type=int, default=None) - parser.add_argument("--num_iterations", type=int, default=None) - parser.add_argument("--verbose", action="store_true") - - gateway_args = parser.parse_args() - - override_dict = { - k: v for k, v in vars(gateway_args).items() - if k != "file_path" and v is not None and v is not False - } - - gateway = TestExecutionGateway() - gateway.run(json_file_path=gateway_args.file_path, config=override_dict) diff --git a/test/infinicore/framework/testcase_manager.py b/test/infinicore/framework/testcase_manager.py new file mode 100644 index 000000000..9c59dac2e --- /dev/null +++ b/test/infinicore/framework/testcase_manager.py @@ -0,0 +1,515 @@ +import sys +import os +import json +import importlib +import argparse +from typing import Any, Optional, Tuple, Union, Dict, List +from dataclasses import is_dataclass + +import infinicore +import torch + +# Path adaptation +current_dir = os.path.dirname(os.path.abspath(__file__)) +parent_dir = os.path.dirname(current_dir) +if parent_dir not in sys.path: + sys.path.insert(0, parent_dir) + +from framework.base import BaseOperatorTest, TestCase, TensorSpec +from framework.config import get_args, get_supported_hardware_platforms +from framework.runner import GenericTestRunner +from framework.devices import InfiniDeviceEnum + + +class TestCaseManager: + """ + Test Case Manager (Strict Schema Version) + """ + + def __init__(self): + # Load supported hardware flags for CLI args (Strings) + self.supported_hw_flags = [ + item[0].lstrip("-") for item in get_supported_hardware_platforms() + ] + + def run( + self, + json_file_path: Optional[str] = None, + config: Union[str, Dict[str, Any], argparse.Namespace, None] = None, + save_path: str = None, + ) -> Any: + print(f"šŸš€ Test Case Manager: Start processing...") + override_dict = 
self._normalize_override_config(config) + + test_configs = [] + + # 1. Load Configurations + if json_file_path and os.path.exists(json_file_path): + print(f"šŸ“„ Source: Loading {json_file_path}") + test_configs = self._load(json_file_path, override_config=override_dict) + else: + # Fallback to default hardcoded case + ( + op_name, + test_cases, + final_args, + op_funcs, + op_paths, + ) = self._load_default_case(override_dict) + + test_configs.append( + { + "op_name": op_name, + "test_cases": test_cases, + "args": final_args, + "op_funcs": op_funcs, + "op_paths": op_paths, + "target_device": "cpu", + } + ) + + total_results = [] + + # 2. Execute & Collect Results + for idx, cfg in enumerate(test_configs): + op_name = cfg["op_name"] + test_cases = cfg["test_cases"] + n_cases = len(test_cases) + + print(f"\nšŸ”¹ Config {idx + 1}/{len(test_configs)}: {op_name} ({n_cases} cases)") + + # Execute + # results_list is a list of TestResult objects + results_list = self._execute_tests( + op_name, test_cases, cfg["args"], cfg["op_funcs"] + ) + + # Report + entry = self._prepare_report_entry( + op_name, + test_cases, + cfg["args"], + cfg["op_paths"], + cfg["target_device"], + results_list, + ) + total_results.append(entry) + + # 3. Save + if save_path: + self._save_all_results(save_path, total_results) + + return total_results + + def _load(self, json_file_path: str, override_config: Dict[str, Any]) -> List[Dict]: + try: + with open(json_file_path, "r", encoding="utf-8") as f: + data = json.load(f) + except json.JSONDecodeError: + raise ValueError(f"Invalid JSON: {json_file_path}") + + data_list = data if isinstance(data, list) else [data] + configs = [] + + for case_data in data_list: + op_name = case_data.get("operator") + if not op_name: + continue + + torch_op = case_data.get("torch_op") or self._discover_op_path( + op_name, + ["torch", "torch.nn.functional"] + ) + infini_op = case_data.get("infinicore_op") or self._discover_op_path( + op_name, + ["infinicore", "infinicore.nn.functional"] + ) + + # Load Functions + op_funcs = { + "torch": self._load_function(torch_op), + "infinicore": self._load_function(infini_op), + } + op_paths = { + "torch": torch_op, + "infinicore": infini_op, + } + + # Setup Args & Device + case_args = self._get_default_args() + self._merge_args(case_args, case_data.get("args", {})) + + if override_config: + self._merge_args(case_args, override_config) + + # Determine Target Device + if override_config and "device" in override_config: + target_device = override_config["device"] + else: + target_device = case_data.get("device", "cpu") + + self._set_device_flags(case_args, target_device) + + # Build Cases (Strict Mode) + test_cases = self._build_test_cases(case_data, op_name) + + configs.append( + { + "op_name": op_name, + "test_cases": test_cases, + "args": case_args, + "op_funcs": op_funcs, + "op_paths": op_paths, + "target_device": target_device, + } + ) + return configs + + def _build_test_cases(self, data: Dict, op_name: str) -> List[TestCase]: + """ + Parses 'cases' list from JSON. 
+ """ + cases_data = data.get("cases") + if not cases_data or not isinstance(cases_data, list): + raise ValueError(f"āŒ Config for '{op_name}' missing required 'cases' list.") + + base_desc = data.get("description", f"Auto-test {op_name}") + base_tol = data.get("tolerance", {"atol": 1e-3, "rtol": 1e-3}) + base_cmp = data.get("comparison_target", None) + + test_cases_list = [] + + for idx, sub in enumerate(cases_data): + full_desc = f"{base_desc} - {sub.get('description', f'Case_{idx}')}" + + # Parse inputs + raw_inputs = sub.get("inputs", []) + inputs = [ + self._parse_spec(inp, f"in_{i}") for i, inp in enumerate(raw_inputs) + ] + + # Parse kwargs + kwargs = {} + for k, v in sub.get("kwargs", {}).items(): + if isinstance(v, dict) and "shape" in v and "dtype" in v: + kwargs[k] = self._parse_spec(v, k) + else: + kwargs[k] = v + + # Parse outputs + out_spec = None + if "output_spec" in sub: + out_spec = self._parse_spec(sub["output_spec"], "out") + + out_specs = None + if "output_specs" in sub: + out_specs = [ + self._parse_spec(s, f"out_{i}") + for i, s in enumerate(sub["output_specs"]) + ] + + # Determine output count + out_count = len(out_specs) if out_specs else sub.get("output_count", 1) + + tc = TestCase( + inputs=inputs, + kwargs=kwargs, + output_spec=out_spec, + output_specs=out_specs, + comparison_target=base_cmp, + tolerance=sub.get("tolerance", base_tol), + description=full_desc, + output_count=out_count, + ) + test_cases_list.append(tc) + + return test_cases_list + + def _execute_tests(self, op_name, test_cases, args, op_funcs): + # Define dynamic test class + class DynamicOpTest(BaseOperatorTest): + def __init__(self): + super().__init__(op_name) + + def get_test_cases(self): + return test_cases + + def torch_operator(self, *a, **k): + return op_funcs["torch"](*a, **k) + + def infinicore_operator(self, *a, **k): + return op_funcs["infinicore"](*a, **k) + + runner = GenericTestRunner(DynamicOpTest, args) + _, internal_runner = runner.run() + + # Returns a list of TestResult objects + return getattr(internal_runner, "test_results", []) + + def _prepare_report_entry( + self, op_name, test_cases, args, op_paths, device, results_list + ): + """ + Separates 'cases' (static input) and 'execution_results' (dynamic output). + """ + # Map results by index + results_map = {} + if isinstance(results_list, list): + results_map = {i: res for i, res in enumerate(results_list)} + elif isinstance(results_list, dict): + results_map = results_list + else: + results_map = {0: results_list} + + processed_cases = [] + formatted_results = [] + + for idx, tc in enumerate(test_cases): + # 1. Reconstruct case dict (Static info ONLY) + case_data = { + "description": tc.description, + "inputs": [self._spec_to_dict(i) for i in tc.inputs], + "kwargs": { + k: ( + self._spec_to_dict(v) if isinstance(v, TensorSpec) else v + ) + for k, v in tc.kwargs.items() + }, + "comparison_target": tc.comparison_target, + "tolerance": tc.tolerance, + } + + if tc.output_spec: + case_data["output_spec"] = self._spec_to_dict(tc.output_spec) + + if hasattr(tc, "output_specs") and tc.output_specs: + case_data["output_specs"] = [ + self._spec_to_dict(s) for s in tc.output_specs + ] + + processed_cases.append(case_data) + + # 2. 
Extract Result + res = results_map.get(idx) + if res: + formatted_results.append(self._fmt_result(res)) + else: + formatted_results.append({"status": {"success": False, "error": "No result"}}) + + # Global Arguments + global_args = { + k: getattr(args, k) + for k in ["bench", "num_prerun", "num_iterations", "verbose", "debug"] + if hasattr(args, k) + } + + # Use tolerance from the first case as global tolerance display + global_tolerance = test_cases[0].tolerance if test_cases else {"atol": 1e-3, "rtol": 1e-3} + + return { + "operator": op_name, + "device": device, + "description": f"Test Report for {op_name}", + "torch_op": op_paths["torch"], + "infinicore_op": op_paths["infinicore"], + "tolerance": global_tolerance, + "args": global_args, + "cases": processed_cases, + "execution_results": formatted_results, + } + + def _save_all_results(self, save_path, total_results): + print(f"šŸ’¾ Saving to: {save_path}") + try: + with open(save_path, "w", encoding="utf-8") as f: + f.write("[\n") + + for i, entry in enumerate(total_results): + f.write(" {\n") + keys = list(entry.keys()) + + for j, key in enumerate(keys): + # āœ… Apply list compression to both 'cases' and 'execution_results' + if key in ["cases", "execution_results"] and isinstance(entry[key], list): + f.write(f' "{key}": [\n') + sub_list = entry[key] + for c_idx, c_item in enumerate(sub_list): + # Compress each item into one line + c_str = json.dumps(c_item, ensure_ascii=False) + comma = "," if c_idx < len(sub_list) - 1 else "" + f.write(f" {c_str}{comma}\n") + + list_comma = "," if j < len(keys) - 1 else "" + f.write(f" ]{list_comma}\n") + else: + # Standard compact formatting for other fields + k_str = json.dumps(key, ensure_ascii=False) + v_str = json.dumps(entry[key], ensure_ascii=False) + + comma = "," if j < len(keys) - 1 else "" + f.write(f" {k_str}: {v_str}{comma}\n") + + if i < len(total_results) - 1: + f.write(" },\n") + else: + f.write(" }\n") + + f.write("]\n") + print(f" āœ… Saved (Structure Matched).") + except Exception as e: + print(f" āŒ Save failed: {e}") + + # --- Helpers --- + + def _discover_op_path(self, op_name: str, candidates: List[str]) -> str: + """ + Attempts to find a valid function path by trying imports. + """ + for prefix in candidates: + full_path = f"{prefix}.{op_name}" + try: + self._load_function(full_path) + return full_path + except (ImportError, AttributeError, ValueError): + continue + raise ValueError( + f"āŒ Could not auto-discover function for operator '{op_name}' " + f"in candidates: {candidates}. Please specify 'torch_op'/'infinicore_op' explicitly." + ) + + def _parse_spec(self, d, name): + """ + Parses dict into TensorSpec. + """ + strides = tuple(d["strides"]) if d.get("strides") else None + + return TensorSpec.from_tensor( + tuple(d["shape"]), + strides, + getattr(infinicore, d.get("dtype", "float32"), infinicore.float32), + name=d.get("name", name), + ) + + def _spec_to_dict(self, s): + return { + "name": s.name, + "shape": list(s.shape) if s.shape else None, + "dtype": str(s.dtype).split(".")[-1], + # Add strides to output if present + "strides": list(s.strides) if s.strides else None, + } + + def _fmt_result(self, res): + """ + Format result with optimized Map lookup. 
+ """ + if not (is_dataclass(res) or hasattr(res, "success")): + return str(res) + + get_time = lambda k: round(getattr(res, k, 0.0), 4) + + # Build Map Locally + device_id_map = { + v: k + for k, v in vars(InfiniDeviceEnum).items() + if not k.startswith("_") + } + + raw_id = getattr(res, "device", None) + dev_str = device_id_map.get(raw_id, str(raw_id)) + + return { + "status": { + "success": getattr(res, "success", False), + "error": getattr(res, "error_message", ""), + }, + "perf_ms": { + "torch": { + "host": get_time("torch_host_time"), + "device": get_time("torch_device_time"), + }, + "infinicore": { + "host": get_time("infini_host_time"), + "device": get_time("infini_device_time"), + }, + }, + "device": dev_str, + } + + def _load_function(self, path): + if not path or "." not in path: + raise ValueError(f"Invalid path: {path}") + module_name, func_name = path.rsplit(".", 1) + module = importlib.import_module(module_name) + return getattr(module, func_name) + + def _get_default_args(self): + old_argv = sys.argv + sys.argv = [sys.argv[0]] + args = get_args() + sys.argv = old_argv + return args + + def _merge_args(self, args, overrides): + if not overrides: + return + + data = ( + vars(overrides) if isinstance(overrides, argparse.Namespace) else overrides + ) + for k, v in data.items(): + if v is not None: + setattr(args, k, v) + + def _set_device_flags(self, args, device_str): + # Reset existing flags + for flag in self.supported_hw_flags: + if hasattr(args, flag): + setattr(args, flag, False) + + d = str(device_str).lower() + + if hasattr(args, d): + setattr(args, d, True) + else: + args.cpu = True + print(f"āš ļø Device '{d}' -> CPU (Fallback)") + + def _normalize_override_config(self, config): + if isinstance(config, str) and os.path.exists(config): + with open(config) as f: + return json.load(f) + + if isinstance(config, argparse.Namespace): + return vars(config) + + return config or {} + + def _load_default_case(self, overrides): + args = self._get_default_args() + self._merge_args(args, overrides) + self._set_device_flags(args, "cpu") + + data = { + "description": "Default Add", + "cases": [ + { + "inputs": [{"shape": [13, 4, 4]}, {"shape": [13, 4, 4]}], + "output_spec": {"shape": [13, 4, 4]}, + } + ], + } + + op_name = "add" + test_cases = self._build_test_cases(data, op_name) + + op_funcs = { + "torch": self._load_function("torch.add"), + "infinicore": self._load_function("infinicore.add"), + } + op_paths = { + "torch": "torch.add", + "infinicore": "infinicore.add", + } + + return op_name, test_cases, args, op_funcs, op_paths diff --git a/test/infinicore/run_external_case.py b/test/infinicore/run_external_case.py new file mode 100644 index 000000000..5a7cf4a84 --- /dev/null +++ b/test/infinicore/run_external_case.py @@ -0,0 +1,84 @@ +import sys +import os +import argparse +import time + +# ============================================================================== +# šŸ› ļø Path Adaptation +# ============================================================================== +current_dir = os.path.dirname(os.path.abspath(__file__)) +if current_dir not in sys.path: + sys.path.append(current_dir) + +from framework.testcase_manager import TestCaseManager + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="External Test Case Runner for InfiniCore") + + # Optional file path (if None, uses default add case) + parser.add_argument("file_path", type=str, nargs="?", help="Path to JSON config file") + + # Overrides + parser.add_argument("--device", type=str, 
default=None, help="Override target device (e.g. cuda, cpu)")
+    parser.add_argument("--bench", type=str, choices=["host", "device", "both"], default=None, help="Override benchmark mode")
+    parser.add_argument("--debug", action="store_true", help="Enable debug mode")
+    parser.add_argument("--num_prerun", type=int, default=None, help="Override warmup iterations")
+    parser.add_argument("--num_iterations", type=int, default=None, help="Override measured iterations")
+    parser.add_argument("--verbose", action="store_true", help="Enable verbose logging")
+
+    # Save option
+    parser.add_argument(
+        "--save",
+        nargs="?",
+        const="AUTO",
+        default=None,
+        help="Path to save effective config JSON with results. If flag is used without value, generates 'result_<timestamp>.json'"
+    )
+
+    args = parser.parse_args()
+
+    # Handle automatic save path generation
+    final_save_path = args.save
+    if final_save_path == "AUTO":
+        timestamp = time.strftime("%Y%m%d_%H%M%S")
+        final_save_path = f"result_{timestamp}.json"
+
+    # Construct override dictionary
+    override_dict = {
+        k: v for k, v in vars(args).items()
+        if k not in ["file_path", "save"] and v is not None and v is not False
+    }
+
+    if override_dict:
+        print(f"⚔ CLI Overrides detected: {override_dict}")
+
+    # Run Manager
+    manager = TestCaseManager()
+    try:
+        results = manager.run(
+            json_file_path=args.file_path,
+            config=override_dict,
+            save_path=final_save_path
+        )
+
+        # Simple exit code logic based on results
+        success = True
+        if isinstance(results, list):
+
+            for entry in results:
+
+                exec_results = entry.get("execution_results", [])
+                for res in exec_results:
+
+                    status = res.get("status", {})
+                    if not status.get("success", False):
+                        success = False
+                        print(f"āŒ Failure detected: {status.get('error', 'Unknown error')}")
+                        break
+
+                if not success:
+                    sys.exit(1)
+
+    except Exception as e:
+        print(f"\nāŒ Execution Error: {e}")
+        sys.exit(1)

From 1d0504213fc79ace7599ce48683ceabcb4796967 Mon Sep 17 00:00:00 2001
From: baominghelly <1508269885@qq.com>
Date: Thu, 4 Dec 2025 00:28:42 +0800
Subject: [PATCH 4/4] Add main() entry point and move tolerance into test
 cases

---
 test/infinicore/framework/testcase_manager.py | 486 ++++----
 test/infinicore/run_external_case.py | 8 +-
 2 files changed, 173 insertions(+), 321 deletions(-)

diff --git a/test/infinicore/framework/testcase_manager.py b/test/infinicore/framework/testcase_manager.py
index 9c59dac2e..509688872 100644
--- a/test/infinicore/framework/testcase_manager.py
+++ b/test/infinicore/framework/testcase_manager.py
@@ -27,7 +27,6 @@ class TestCaseManager:
     """
 
     def __init__(self):
-        # Load supported hardware flags for CLI args (Strings)
        self.supported_hw_flags = [
             item[0].lstrip("-") for item in get_supported_hardware_platforms()
         ]
@@ -39,60 +38,30 @@ def run(
         save_path: str = None,
     ) -> Any:
         print(f"šŸš€ Test Case Manager: Start processing...")
-        override_dict = self._normalize_override_config(config)
+        overrides = self._normalize_override_config(config)
 
-        test_configs = []
-
-        # 1. Load Configurations
+        # 1. 
Unified Configuration Loading if json_file_path and os.path.exists(json_file_path): print(f"šŸ“„ Source: Loading {json_file_path}") - test_configs = self._load(json_file_path, override_config=override_dict) + test_configs = self._load(json_file_path, overrides) else: - # Fallback to default hardcoded case - ( - op_name, - test_cases, - final_args, - op_funcs, - op_paths, - ) = self._load_default_case(override_dict) - - test_configs.append( - { - "op_name": op_name, - "test_cases": test_cases, - "args": final_args, - "op_funcs": op_funcs, - "op_paths": op_paths, - "target_device": "cpu", - } - ) + test_configs = self._load_default_case(overrides) total_results = [] # 2. Execute & Collect Results for idx, cfg in enumerate(test_configs): op_name = cfg["op_name"] - test_cases = cfg["test_cases"] - n_cases = len(test_cases) - + n_cases = len(cfg["test_cases"]) print(f"\nšŸ”¹ Config {idx + 1}/{len(test_configs)}: {op_name} ({n_cases} cases)") # Execute - # results_list is a list of TestResult objects - results_list = self._execute_tests( - op_name, test_cases, cfg["args"], cfg["op_funcs"] + results = self._execute_tests( + op_name, cfg["test_cases"], cfg["args"], cfg["op_funcs"] ) # Report - entry = self._prepare_report_entry( - op_name, - test_cases, - cfg["args"], - cfg["op_paths"], - cfg["target_device"], - results_list, - ) + entry = self._prepare_report_entry(cfg, results) total_results.append(entry) # 3. Save @@ -101,7 +70,48 @@ def run( return total_results - def _load(self, json_file_path: str, override_config: Dict[str, Any]) -> List[Dict]: + def _create_exec_config(self, raw_data: Dict, overrides: Dict) -> Optional[Dict]: + """ + āœ… Core Simplification: Unified logic to build a config object from raw dict. + """ + op_name = raw_data.get("operator") + if not op_name: + return None + + # 1. Resolve Paths + t_op = raw_data.get("torch_op") or self._discover_op_path( + op_name, ["torch", "torch.nn.functional", "torch.special", "torch.fft"] + ) + i_op = raw_data.get("infinicore_op") or self._discover_op_path( + op_name, ["infinicore", "infinicore.nn.functional"] + ) + + # 2. Args & Device + args = self._get_default_args() + self._merge_args(args, raw_data.get("args", {})) + self._merge_args(args, overrides) + + dev_str = ( + overrides.get("device") + if overrides and "device" in overrides + else raw_data.get("device", "cpu") + ) + self._set_device_flags(args, dev_str) + + # 3. 
Build & Return + return { + "op_name": op_name, + "test_cases": self._build_test_cases(raw_data, op_name), + "args": args, + "op_funcs": { + "torch": self._load_function(t_op), + "infinicore": self._load_function(i_op), + }, + "op_paths": {"torch": t_op, "infinicore": i_op}, + "target_device": dev_str, + } + + def _load(self, json_file_path: str, overrides: Dict) -> List[Dict]: try: with open(json_file_path, "r", encoding="utf-8") as f: data = json.load(f) @@ -109,124 +119,81 @@ def _load(self, json_file_path: str, override_config: Dict[str, Any]) -> List[Di raise ValueError(f"Invalid JSON: {json_file_path}") data_list = data if isinstance(data, list) else [data] - configs = [] - - for case_data in data_list: - op_name = case_data.get("operator") - if not op_name: - continue - - torch_op = case_data.get("torch_op") or self._discover_op_path( - op_name, - ["torch", "torch.nn.functional"] - ) - infini_op = case_data.get("infinicore_op") or self._discover_op_path( - op_name, - ["infinicore", "infinicore.nn.functional"] - ) - - # Load Functions - op_funcs = { - "torch": self._load_function(torch_op), - "infinicore": self._load_function(infini_op), - } - op_paths = { - "torch": torch_op, - "infinicore": infini_op, - } - - # Setup Args & Device - case_args = self._get_default_args() - self._merge_args(case_args, case_data.get("args", {})) - - if override_config: - self._merge_args(case_args, override_config) - - # Determine Target Device - if override_config and "device" in override_config: - target_device = override_config["device"] - else: - target_device = case_data.get("device", "cpu") - - self._set_device_flags(case_args, target_device) - - # Build Cases (Strict Mode) - test_cases = self._build_test_cases(case_data, op_name) + # Use generator to filter None configs + return [ + cfg + for d in data_list + if (cfg := self._create_exec_config(d, overrides)) is not None + ] - configs.append( + def _load_default_case(self, overrides: Dict) -> List[Dict]: + # Construct raw dict and pass to unified creator + raw_data = { + "operator": "add", + "description": "Default Add", + "testcases": [ { - "op_name": op_name, - "test_cases": test_cases, - "args": case_args, - "op_funcs": op_funcs, - "op_paths": op_paths, - "target_device": target_device, + "inputs": [{"shape": [13, 4, 4]}, {"shape": [13, 4, 4]}], + "output_spec": {"shape": [13, 4, 4]}, } - ) - return configs + ], + } + return [self._create_exec_config(raw_data, overrides)] def _build_test_cases(self, data: Dict, op_name: str) -> List[TestCase]: - """ - Parses 'cases' list from JSON. 
- """ - cases_data = data.get("cases") + cases_data = data.get("testcases") if not cases_data or not isinstance(cases_data, list): - raise ValueError(f"āŒ Config for '{op_name}' missing required 'cases' list.") + raise ValueError(f"āŒ Config for '{op_name}' missing 'testcases' list.") base_desc = data.get("description", f"Auto-test {op_name}") - base_tol = data.get("tolerance", {"atol": 1e-3, "rtol": 1e-3}) - base_cmp = data.get("comparison_target", None) test_cases_list = [] - for idx, sub in enumerate(cases_data): - full_desc = f"{base_desc} - {sub.get('description', f'Case_{idx}')}" - - # Parse inputs - raw_inputs = sub.get("inputs", []) + # Compact list/dict comprehensions inputs = [ - self._parse_spec(inp, f"in_{i}") for i, inp in enumerate(raw_inputs) + self._parse_spec(inp, f"in_{i}") + for i, inp in enumerate(sub.get("inputs", [])) ] + + kwargs = { + k: ( + self._parse_spec(v, k) + if isinstance(v, dict) and "shape" in v + else v + ) + for k, v in sub.get("kwargs", {}).items() + } - # Parse kwargs - kwargs = {} - for k, v in sub.get("kwargs", {}).items(): - if isinstance(v, dict) and "shape" in v and "dtype" in v: - kwargs[k] = self._parse_spec(v, k) - else: - kwargs[k] = v - - # Parse outputs - out_spec = None - if "output_spec" in sub: - out_spec = self._parse_spec(sub["output_spec"], "out") - - out_specs = None - if "output_specs" in sub: - out_specs = [ - self._parse_spec(s, f"out_{i}") - for i, s in enumerate(sub["output_specs"]) - ] - - # Determine output count - out_count = len(out_specs) if out_specs else sub.get("output_count", 1) + out_spec = ( + self._parse_spec(sub["output_spec"], "out") + if "output_spec" in sub + else None + ) + + out_specs = ( + [self._parse_spec(s, f"out_{i}") for i, s in enumerate(sub["output_specs"])] + if "output_specs" in sub + else None + ) + + tol = sub.get("tolerance", {"atol": 1e-3, "rtol": 1e-3}) + cmp = sub.get("comparison_target", None) tc = TestCase( inputs=inputs, kwargs=kwargs, output_spec=out_spec, output_specs=out_specs, - comparison_target=base_cmp, - tolerance=sub.get("tolerance", base_tol), - description=full_desc, - output_count=out_count, + comparison_target=cmp, + tolerance=tol, + description=f"{base_desc} - {sub.get('description', f'Case_{idx}')}", + output_count=len(out_specs) if out_specs else sub.get("output_count", 1), ) test_cases_list.append(tc) return test_cases_list def _execute_tests(self, op_name, test_cases, args, op_funcs): - # Define dynamic test class class DynamicOpTest(BaseOperatorTest): def __init__(self): super().__init__(op_name) @@ -242,80 +209,55 @@ def infinicore_operator(self, *a, **k): runner = GenericTestRunner(DynamicOpTest, args) _, internal_runner = runner.run() - - # Returns a list of TestResult objects return getattr(internal_runner, "test_results", []) - def _prepare_report_entry( - self, op_name, test_cases, args, op_paths, device, results_list - ): - """ - Separates 'cases' (static input) and 'execution_results' (dynamic output). - """ + def _prepare_report_entry(self, cfg, results_list): # Map results by index - results_map = {} - if isinstance(results_list, list): - results_map = {i: res for i, res in enumerate(results_list)} - elif isinstance(results_list, dict): - results_map = results_list - else: - results_map = {0: results_list} - - processed_cases = [] - formatted_results = [] + res_map = ( + {i: r for i, r in enumerate(results_list)} + if isinstance(results_list, list) + else {0: results_list} + ) - for idx, tc in enumerate(test_cases): - # 1. 
Reconstruct case dict (Static info ONLY) - case_data = { + cases, results = [], [] + for idx, tc in enumerate(cfg["test_cases"]): + # 1. Static Info + cases.append({ "description": tc.description, "inputs": [self._spec_to_dict(i) for i in tc.inputs], "kwargs": { - k: ( - self._spec_to_dict(v) if isinstance(v, TensorSpec) else v - ) + k: (self._spec_to_dict(v) if isinstance(v, TensorSpec) else v) for k, v in tc.kwargs.items() }, "comparison_target": tc.comparison_target, "tolerance": tc.tolerance, - } - - if tc.output_spec: - case_data["output_spec"] = self._spec_to_dict(tc.output_spec) + **({"output_spec": self._spec_to_dict(tc.output_spec)} if tc.output_spec else {}), + **({"output_specs": [self._spec_to_dict(s) for s in tc.output_specs]} if tc.output_specs else {}), + **({"output_count": tc.output_count} if tc.output_count > 1 and not tc.output_specs else {}) + }) + + # 2. Dynamic Result + res = res_map.get(idx) + results.append( + self._fmt_result(res) if res else {"status": {"success": False, "error": "No result"}} + ) - if hasattr(tc, "output_specs") and tc.output_specs: - case_data["output_specs"] = [ - self._spec_to_dict(s) for s in tc.output_specs - ] - - processed_cases.append(case_data) - - # 2. Extract Result - res = results_map.get(idx) - if res: - formatted_results.append(self._fmt_result(res)) - else: - formatted_results.append({"status": {"success": False, "error": "No result"}}) - - # Global Arguments - global_args = { - k: getattr(args, k) + # Global Args + g_args = { + k: getattr(cfg["args"], k) for k in ["bench", "num_prerun", "num_iterations", "verbose", "debug"] - if hasattr(args, k) + if hasattr(cfg["args"], k) } - # Use tolerance from the first case as global tolerance display - global_tolerance = test_cases[0].tolerance if test_cases else {"atol": 1e-3, "rtol": 1e-3} - return { - "operator": op_name, - "device": device, - "description": f"Test Report for {op_name}", - "torch_op": op_paths["torch"], - "infinicore_op": op_paths["infinicore"], - "tolerance": global_tolerance, - "args": global_args, - "cases": processed_cases, - "execution_results": formatted_results, + "operator": cfg["op_name"], + "device": cfg["target_device"], + "description": f"Test Report for {cfg['op_name']}", + "torch_op": cfg["op_paths"]["torch"], + "infinicore_op": cfg["op_paths"]["infinicore"], + "args": g_args, + "testcases": cases, + "execution_results": results, } def _save_all_results(self, save_path, total_results): @@ -323,66 +265,42 @@ def _save_all_results(self, save_path, total_results): try: with open(save_path, "w", encoding="utf-8") as f: f.write("[\n") - for i, entry in enumerate(total_results): f.write(" {\n") keys = list(entry.keys()) - for j, key in enumerate(keys): - # āœ… Apply list compression to both 'cases' and 'execution_results' - if key in ["cases", "execution_results"] and isinstance(entry[key], list): + # Special handling for lists (cases/results) + if key in ["testcases", "execution_results"] and isinstance(entry[key], list): f.write(f' "{key}": [\n') - sub_list = entry[key] - for c_idx, c_item in enumerate(sub_list): - # Compress each item into one line - c_str = json.dumps(c_item, ensure_ascii=False) - comma = "," if c_idx < len(sub_list) - 1 else "" + for k_idx, item in enumerate(entry[key]): + c_str = json.dumps(item, ensure_ascii=False) + comma = "," if k_idx < len(entry[key]) - 1 else "" f.write(f" {c_str}{comma}\n") - - list_comma = "," if j < len(keys) - 1 else "" - f.write(f" ]{list_comma}\n") + f.write(f" ]{',' if j < len(keys) - 1 else ''}\n") else: - # 
Standard compact formatting for other fields k_str = json.dumps(key, ensure_ascii=False) v_str = json.dumps(entry[key], ensure_ascii=False) - - comma = "," if j < len(keys) - 1 else "" - f.write(f" {k_str}: {v_str}{comma}\n") - - if i < len(total_results) - 1: - f.write(" },\n") - else: - f.write(" }\n") - + f.write(f" {k_str}: {v_str}{',' if j < len(keys) - 1 else ''}\n") + f.write(f" }}{',' if i < len(total_results) - 1 else ''}\n") f.write("]\n") - print(f" āœ… Saved (Structure Matched).") + print(f" āœ… Saved.") except Exception as e: print(f" āŒ Save failed: {e}") # --- Helpers --- def _discover_op_path(self, op_name: str, candidates: List[str]) -> str: - """ - Attempts to find a valid function path by trying imports. - """ for prefix in candidates: - full_path = f"{prefix}.{op_name}" + path = f"{prefix}.{op_name}" try: - self._load_function(full_path) - return full_path + self._load_function(path) + return path except (ImportError, AttributeError, ValueError): continue - raise ValueError( - f"āŒ Could not auto-discover function for operator '{op_name}' " - f"in candidates: {candidates}. Please specify 'torch_op'/'infinicore_op' explicitly." - ) - + raise ValueError(f"āŒ Cannot find op '{op_name}' in {candidates}") + def _parse_spec(self, d, name): - """ - Parses dict into TensorSpec. - """ strides = tuple(d["strides"]) if d.get("strides") else None - return TensorSpec.from_tensor( tuple(d["shape"]), strides, @@ -395,28 +313,18 @@ def _spec_to_dict(self, s): "name": s.name, "shape": list(s.shape) if s.shape else None, "dtype": str(s.dtype).split(".")[-1], - # Add strides to output if present "strides": list(s.strides) if s.strides else None, } def _fmt_result(self, res): - """ - Format result with optimized Map lookup. - """ if not (is_dataclass(res) or hasattr(res, "success")): return str(res) - + get_time = lambda k: round(getattr(res, k, 0.0), 4) - - # Build Map Locally - device_id_map = { - v: k - for k, v in vars(InfiniDeviceEnum).items() - if not k.startswith("_") - } - - raw_id = getattr(res, "device", None) - dev_str = device_id_map.get(raw_id, str(raw_id)) + + # Build Map + dev_map = {v: k for k, v in vars(InfiniDeviceEnum).items() if not k.startswith("_")} + dev_str = dev_map.get(getattr(res, "device", None), str(getattr(res, "device", None))) return { "status": { @@ -424,92 +332,34 @@ def _fmt_result(self, res): "error": getattr(res, "error_message", ""), }, "perf_ms": { - "torch": { - "host": get_time("torch_host_time"), - "device": get_time("torch_device_time"), - }, - "infinicore": { - "host": get_time("infini_host_time"), - "device": get_time("infini_device_time"), - }, + "torch": {"host": get_time("torch_host_time"), "device": get_time("torch_device_time")}, + "infinicore": {"host": get_time("infini_host_time"), "device": get_time("infini_device_time")}, }, - "device": dev_str, + "dev": dev_str, } def _load_function(self, path): - if not path or "." not in path: - raise ValueError(f"Invalid path: {path}") - module_name, func_name = path.rsplit(".", 1) - module = importlib.import_module(module_name) - return getattr(module, func_name) + if not path or "." 
not in path: raise ValueError(f"Invalid path: {path}") + m, f = path.rsplit(".", 1) + return getattr(importlib.import_module(m), f) def _get_default_args(self): - old_argv = sys.argv - sys.argv = [sys.argv[0]] - args = get_args() - sys.argv = old_argv + old_argv = sys.argv; sys.argv = [sys.argv[0]]; args = get_args(); sys.argv = old_argv return args def _merge_args(self, args, overrides): - if not overrides: - return - - data = ( - vars(overrides) if isinstance(overrides, argparse.Namespace) else overrides - ) + if not overrides: return + data = vars(overrides) if isinstance(overrides, argparse.Namespace) else overrides for k, v in data.items(): - if v is not None: - setattr(args, k, v) - - def _set_device_flags(self, args, device_str): - # Reset existing flags - for flag in self.supported_hw_flags: - if hasattr(args, flag): - setattr(args, flag, False) + if v is not None: setattr(args, k, v) - d = str(device_str).lower() - - if hasattr(args, d): - setattr(args, d, True) - else: - args.cpu = True - print(f"āš ļø Device '{d}' -> CPU (Fallback)") + def _set_device_flags(self, args, dev_str): + for flag in self.supported_hw_flags: setattr(args, flag, False) + d = str(dev_str).lower() + if hasattr(args, d): setattr(args, d, True) + else: args.cpu = True; print(f"āš ļø Device '{d}' -> CPU") def _normalize_override_config(self, config): if isinstance(config, str) and os.path.exists(config): - with open(config) as f: - return json.load(f) - - if isinstance(config, argparse.Namespace): - return vars(config) - - return config or {} - - def _load_default_case(self, overrides): - args = self._get_default_args() - self._merge_args(args, overrides) - self._set_device_flags(args, "cpu") - - data = { - "description": "Default Add", - "cases": [ - { - "inputs": [{"shape": [13, 4, 4]}, {"shape": [13, 4, 4]}], - "output_spec": {"shape": [13, 4, 4]}, - } - ], - } - - op_name = "add" - test_cases = self._build_test_cases(data, op_name) - - op_funcs = { - "torch": self._load_function("torch.add"), - "infinicore": self._load_function("infinicore.add"), - } - op_paths = { - "torch": "torch.add", - "infinicore": "infinicore.add", - } - - return op_name, test_cases, args, op_funcs, op_paths + with open(config) as f: return json.load(f) + return vars(config) if isinstance(config, argparse.Namespace) else (config or {}) diff --git a/test/infinicore/run_external_case.py b/test/infinicore/run_external_case.py index 5a7cf4a84..a00ae140a 100644 --- a/test/infinicore/run_external_case.py +++ b/test/infinicore/run_external_case.py @@ -12,14 +12,14 @@ from framework.testcase_manager import TestCaseManager -if __name__ == "__main__": +def main(): parser = argparse.ArgumentParser(description="External Test Case Runner for InfiniCore") # Optional file path (if None, uses default add case) parser.add_argument("file_path", type=str, nargs="?", help="Path to JSON config file") # Overrides - parser.add_argument("--device", type=str, default=None, help="Override target device (e.g. cuda, cpu)") + parser.add_argument("--device", type=str, default=None, help="Override target device (e.g. 
nvidia, cpu)") parser.add_argument("--bench", type=str, choices=["host", "device", "both"], default=None, help="Override benchmark mode") parser.add_argument("--debug", action="store_true", help="Enable debug mode") parser.add_argument("--num_prerun", type=int, default=None, help="Override warmup iterations") @@ -64,7 +64,6 @@ # Simple exit code logic based on results success = True if isinstance(results, list): - for entry in results: exec_results = entry.get("execution_results", []) @@ -82,3 +81,6 @@ except Exception as e: print(f"\nāŒ Execution Error: {e}") sys.exit(1) + +if __name__ == "__main__": + main()
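
---

Usage sketch: the JSON schema consumed by TestCaseManager after PATCH 4/4 is
defined only implicitly by _create_exec_config, _build_test_cases, and
_parse_spec above, so a minimal end-to-end example follows. It assumes the
script is run from test/infinicore/ with infinicore installed; the file names,
shapes, dtype, and tolerance values are illustrative only, not part of the
series:

    # run_add_sketch.py -- hypothetical driver, not part of this patch series
    import json

    from framework.testcase_manager import TestCaseManager

    # Field names follow the parsing code above. "torch_op"/"infinicore_op"
    # are omitted on purpose: _discover_op_path resolves torch.add and
    # infinicore.add automatically for operator "add".
    config = {
        "operator": "add",
        "device": "cpu",
        "description": "Elementwise add",
        "args": {"bench": "both", "num_iterations": 100},
        "testcases": [
            {
                "description": "contiguous float32",
                "inputs": [
                    {"shape": [13, 4, 4], "dtype": "float32"},
                    {"shape": [13, 4, 4], "dtype": "float32"},
                ],
                "output_spec": {"shape": [13, 4, 4], "dtype": "float32"},
                "tolerance": {"atol": 1e-3, "rtol": 1e-3},
            }
        ],
    }

    # Write the config to disk, then drive the manager programmatically.
    with open("add_case.json", "w", encoding="utf-8") as f:
        json.dump(config, f, indent=2)

    # CLI equivalent:
    #   python run_external_case.py add_case.json --device cpu --save report.json
    TestCaseManager().run(json_file_path="add_case.json", save_path="report.json")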