diff --git a/run.log b/run.log new file mode 100644 index 0000000..f1e6fe9 --- /dev/null +++ b/run.log @@ -0,0 +1,122 @@ +============================= test session starts ============================== +platform linux -- Python 3.10.19, pytest-9.0.2, pluggy-1.6.0 -- /data/m00956180/.conda/envs/mjzkd/bin/python3.10 +cachedir: .pytest_cache +rootdir: /data/m00956180/runtime/ptofuzz +configfile: pyproject.toml +plugins: forked-1.6.0, repeat-0.9.4 +collecting ... collected 1 item + +src/fuzzer/generated_tests/test_fuzz_multi_kernel.py::TestMultiKernelFuzzing::test_fuzz_sequential_simple 2026-02-12 15:53:42.801 D | Created 1 basic blocks +2026-02-12 15:53:42.801 I | Identified 1 basic blocks +2026-02-12 15:53:42.801 D | Found 6 dependencies in block 0 +2026-02-12 15:53:42.801 I | Found 6 dependency edges +2026-02-12 15:53:42.801 I | Dependency types: RAW=6, WAR=0, WAW=0 +2026-02-12 15:53:42.801 D | Assigned declaration order to 8 statements +2026-02-12 15:53:42.801 D | Variable tile_a has 1 use statements +2026-02-12 15:53:42.801 D | Use at order 3 +2026-02-12 15:53:42.801 D | Lifetime for tile_a: [0, 3] space=1 size=65536 +2026-02-12 15:53:42.801 D | Variable tile_b has 1 use statements +2026-02-12 15:53:42.801 D | Use at order 2 +2026-02-12 15:53:42.801 D | Lifetime for tile_b: [1, 2] space=1 size=65536 +2026-02-12 15:53:42.801 D | Variable tmp_0 has 1 use statements +2026-02-12 15:53:42.801 D | Use at order 5 +2026-02-12 15:53:42.801 D | Lifetime for tmp_0: [2, 5] space=1 size=65536 +2026-02-12 15:53:42.801 D | Variable tmp_1 has 1 use statements +2026-02-12 15:53:42.801 D | Use at order 4 +2026-02-12 15:53:42.801 D | Lifetime for tmp_1: [3, 4] space=1 size=65536 +2026-02-12 15:53:42.801 D | Variable tmp_2 has 1 use statements +2026-02-12 15:53:42.801 D | Use at order 5 +2026-02-12 15:53:42.801 D | Lifetime for tmp_2: [4, 5] space=1 size=65536 +2026-02-12 15:53:42.801 D | Variable tmp_3 has 1 use statements +2026-02-12 15:53:42.801 D | Use at order 6 +2026-02-12 
15:53:42.801 D | Lifetime for tmp_3: [5, 6] space=1 size=65536 +2026-02-12 15:53:42.801 D | Variable tmp_1 can reuse tile_b (lifetime [3, 4] vs [1, 2]) +2026-02-12 15:53:42.801 D | Variable tmp_2 can reuse tile_a (lifetime [4, 5] vs [0, 3]) +2026-02-12 15:53:42.801 D | Variable tmp_3 cannot reuse tile_a due to overlap with existing user tmp_2 (lifetime [5, 6] vs [4, 5]) +2026-02-12 15:53:42.801 D | Variable tmp_3 can reuse tile_b (lifetime [5, 6] vs [1, 2]) +2026-02-12 15:53:42.801 D | Created 1 basic blocks +2026-02-12 15:53:42.801 I | Identified 1 basic blocks +2026-02-12 15:53:42.801 D | Found 8 dependencies in block 0 +2026-02-12 15:53:42.801 I | Found 8 dependency edges +2026-02-12 15:53:42.801 I | Dependency types: RAW=8, WAR=0, WAW=0 +2026-02-12 15:53:42.801 D | Assigned declaration order to 9 statements +2026-02-12 15:53:42.801 D | Variable tile_a has 1 use statements +2026-02-12 15:53:42.801 D | Use at order 2 +2026-02-12 15:53:42.801 D | Lifetime for tile_a: [0, 2] space=1 size=65536 +2026-02-12 15:53:42.801 D | Variable tile_b has 1 use statements +2026-02-12 15:53:42.801 D | Use at order 2 +2026-02-12 15:53:42.801 D | Lifetime for tile_b: [1, 2] space=1 size=65536 +2026-02-12 15:53:42.801 D | Variable tmp_0 has 2 use statements +2026-02-12 15:53:42.801 D | Use at order 3 +2026-02-12 15:53:42.801 D | Use at order 5 +2026-02-12 15:53:42.801 D | Lifetime for tmp_0: [2, 5] space=1 size=65536 +2026-02-12 15:53:42.801 D | Variable tmp_1 has 1 use statements +2026-02-12 15:53:42.801 D | Use at order 4 +2026-02-12 15:53:42.801 D | Lifetime for tmp_1: [3, 4] space=1 size=65536 +2026-02-12 15:53:42.801 D | Variable tmp_2 has 1 use statements +2026-02-12 15:53:42.801 D | Use at order 6 +2026-02-12 15:53:42.801 D | Lifetime for tmp_2: [4, 6] space=1 size=65536 +2026-02-12 15:53:42.801 D | Variable tmp_3 has 1 use statements +2026-02-12 15:53:42.801 D | Use at order 6 +2026-02-12 15:53:42.801 D | Lifetime for tmp_3: [5, 6] space=1 size=65536 +2026-02-12 15:53:42.801 
D | Variable tmp_4 has 1 use statements +2026-02-12 15:53:42.801 D | Use at order 7 +2026-02-12 15:53:42.801 D | Lifetime for tmp_4: [6, 7] space=1 size=65536 +2026-02-12 15:53:42.801 D | Variable tmp_1 can reuse tile_a (lifetime [3, 4] vs [0, 2]) +2026-02-12 15:53:42.801 D | Variable tmp_2 cannot reuse tile_a due to overlap with existing user tmp_1 (lifetime [4, 6] vs [3, 4]) +2026-02-12 15:53:42.801 D | Variable tmp_2 can reuse tile_b (lifetime [4, 6] vs [1, 2]) +2026-02-12 15:53:42.801 D | Variable tmp_3 can reuse tile_a (lifetime [5, 6] vs [0, 2]) +2026-02-12 15:53:42.801 D | Variable tmp_4 cannot reuse tile_a due to overlap with existing user tmp_3 (lifetime [6, 7] vs [5, 6]) +2026-02-12 15:53:42.802 D | Variable tmp_4 cannot reuse tile_b due to overlap with existing user tmp_2 (lifetime [6, 7] vs [4, 6]) +2026-02-12 15:53:42.802 D | Variable tmp_4 can reuse tmp_0 (lifetime [6, 7] vs [2, 5]) +2026-02-12 15:53:42.802 D | Created 1 basic blocks +2026-02-12 15:53:42.802 I | Identified 1 basic blocks +2026-02-12 15:53:42.802 D | Found 1 dependencies in block 0 +2026-02-12 15:53:42.802 I | Found 1 dependency edges +2026-02-12 15:53:42.802 I | Dependency types: RAW=1, WAR=0, WAW=0 +2026-02-12 15:53:42.802 D | Assigned declaration order to 3 statements +2026-02-12 15:53:42.802 W | No TileType variables found, skipping memory reuse +2026-02-12 15:53:42.803 D | Created 1 basic blocks +2026-02-12 15:53:42.803 D | Found 1 dependencies in block 0 +[INFO] ensure_device_set: DeviceRunner: device=9 set, streams created +[INFO] init_runtime_impl: Registering 2 kernel(s) in init_runtime_impl +[INFO] init_runtime_impl: Loaded orchestration function: BuildOrchestrator +[INFO] init_runtime_impl: === Calling Orchestration Function === +[INFO] init_runtime_impl: Runtime initialized. Ready for execution from Python. 
+[INFO] ensure_binaries_loaded: DeviceRunner: binaries loaded + +=== Initialize runtime args === + +=== launch_aicpu_kernel DynTileFwkKernelServerInit=== + +=== launch_aicpu_kernel DynTileFwkKernelServer=== + +=== launch_aicore_kernel=== + +=== rtStreamSynchronize stream_aicpu_=== + +=== rtStreamSynchronize stream_aicore_=== +[INFO] validate_runtime_impl: === Copying Results Back to Host === +[INFO] validate_runtime_impl: === Cleaning Up === +[INFO] validate_runtime_impl: Freed 1 device tensors +[INFO] validate_runtime_impl: === Finalize Complete === +PASSED + +=============================== warnings summary =============================== +../../.conda/envs/mjzkd/lib/python3.10/site-packages/torch_npu/utils/collect_env.py:58 +../../.conda/envs/mjzkd/lib/python3.10/site-packages/torch_npu/utils/collect_env.py:58 + /data/m00956180/.conda/envs/mjzkd/lib/python3.10/site-packages/torch_npu/utils/collect_env.py:58: UserWarning: Warning: The /usr/local/Ascend/cann-8.5.0 owner does not match the current owner. + warnings.warn(f"Warning: The {path} owner does not match the current owner.") + +../../.conda/envs/mjzkd/lib/python3.10/site-packages/torch_npu/utils/collect_env.py:58 +../../.conda/envs/mjzkd/lib/python3.10/site-packages/torch_npu/utils/collect_env.py:58 + /data/m00956180/.conda/envs/mjzkd/lib/python3.10/site-packages/torch_npu/utils/collect_env.py:58: UserWarning: Warning: The /usr/local/Ascend/cann-8.5.0/aarch64-linux/ascend_toolkit_install.info owner does not match the current owner. 
+ warnings.warn(f"Warning: The {path} owner does not match the current owner.") + +src/fuzzer/generated_tests/test_fuzz_multi_kernel.py:24 + /data/m00956180/runtime/ptofuzz/src/fuzzer/generated_tests/test_fuzz_multi_kernel.py:24: PytestCollectionWarning: cannot collect test class 'TestFuzzSequentialSimple' because it has a __init__ constructor (from: src/fuzzer/generated_tests/test_fuzz_multi_kernel.py) + class TestFuzzSequentialSimple(PTOTestCase): + +-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html +======================== 1 passed, 5 warnings in 12.83s ======================== +[INFO] finalize: DeviceRunner finalized diff --git a/src/fuzzer/README.md b/src/fuzzer/README.md new file mode 100644 index 0000000..3f0f784 --- /dev/null +++ b/src/fuzzer/README.md @@ -0,0 +1,566 @@ +# 多内核模糊测试框架 (Multi-Kernel Fuzzing Framework) + +这是一个用于生成和测试多内核 PyPTO 程序的自动化框架。该框架可以随机生成多个 InCore 内核函数,并通过 Orchestration 函数以不同的模式组合它们。 + +**注意**:`src/fuzzer` 是一个独立的框架,不依赖 `src/pto_test/fuzzing`。所有必要的代码都包含在此目录中。 + +--- + +## 目录 + +1. [快速开始](#快速开始) +2. [代码结构](#代码结构) +3. [核心概念](#核心概念) +4. [配置指南](#配置指南) +5. [算子规则](#算子规则) +6. [使用示例](#使用示例) +7. 
[更新日志](#更新日志) + +--- + +## 快速开始 + +### 基础示例 + +```bash +# 生成测试用例(使用默认配置) +python src/fuzzer/example_multi_kernel.py + +# 生成特定配置的测试用例 +python src/fuzzer/example_multi_kernel.py --config-index 0 + +# 设置误差容忍度 +python src/fuzzer/example_multi_kernel.py --atol 1e-3 --rtol 1e-3 + +# 运行测试(只生成代码) +pytest src/fuzzer/generated_tests/test_fuzz_multi_kernel.py -v --codegen-only + +# 查看生成的 C++ 代码 +pytest src/fuzzer/generated_tests/test_fuzz_multi_kernel.py -v --codegen-only --save-kernels --kernels-dir=/tmp/kernels +``` + +### 命令行参数 + +```bash +python src/fuzzer/example_multi_kernel.py [选项] + +选项: + --config-index N 指定配置索引(从0开始),不指定则使用所有配置 + --output PATH 输出文件路径(默认: src/fuzzer/generated_tests/test_fuzz_multi_kernel.py) + --atol FLOAT 绝对误差容忍度(默认: 1e-4) + --rtol FLOAT 相对误差容忍度(默认: 1e-4) +``` + +--- + +## 代码结构 + +### 目录结构 + +``` +src/fuzzer/ # 独立的模糊测试框架 +├── __init__.py # 外部接口 +├── example_multi_kernel.py # 使用示例脚本(主入口) +├── conftest.py # pytest 配置 +├── README.md # 本文档 +├── src/ # 内部实现 +│ ├── __init__.py +│ ├── fuzzer.py # OpFuzzer 核心逻辑 +│ ├── kernel_generator.py # InCore 内核生成器 +│ ├── orchestrator_generator.py # Orchestration 组合函数生成器 +│ └── multi_kernel_test_generator.py # 完整测试用例生成器 +└── generated_tests/ # 生成的测试文件目录 + └── test_fuzz_multi_kernel.py # 生成的测试文件 +``` + +### 核心模块说明 + +#### 1. fuzzer.py - OpFuzzer +操作符模糊生成器,负责: +- 定义所有支持的算子(二元、一元、标量、高级算子) +- 随机生成操作链 +- 处理数据约束(避免除零、正值约束等) +- 生成 NumPy/PyTorch 参考实现 + +**主要类**: +- `OpSpec`: 算子规格定义 +- `OpFuzzer`: 操作链生成器 + +#### 2. kernel_generator.py - KernelGenerator +内核生成器,负责: +- 生成单个 InCore 内核函数 +- 支持不同数量和维度的输入 +- 生成 PyPTO 代码和 PyTorch 参考实现 +- 处理形状对齐约束 + +#### 3. orchestrator_generator.py - OrchestratorGenerator +编排函数生成器,负责: +- 生成 Orchestration 函数 +- 支持三种组合模式:sequential、branching、mixed +- 管理内核间的数据流 + +#### 4. multi_kernel_test_generator.py - MultiKernelTestGenerator +测试用例生成器,负责: +- 生成完整的 PTOTestCase 类 +- 集成内核和编排函数 +- 生成 PyTorch 参考实现 +- 生成测试文件 + +--- + +## 核心概念 + +### 1. 
内核生成规则 + +每个 InCore 内核包含: +- **输入**: 1-3 个 tile 张量,支持不同维度 +- **操作链**: 3-10 个随机操作 +- **输出**: 1 个 tile 张量 + +**操作链生成规则**: +1. 从输入张量中随机选择操作数 +2. 随机选择一个操作符(根据权重) +3. 执行操作并生成中间结果 +4. 中间结果可以被后续操作使用 +5. 最后一个操作的结果作为内核输出 + +**示例**: +```python +@pl.function(type=pl.FunctionType.InCore) +def kernel_0(self, a: pl.Tensor[[128, 128], pl.FP32], + b: pl.Tensor[[64, 64], pl.FP32]) -> pl.Tensor[[128, 128], pl.FP32]: + tile_a = pl.load(a, offsets=[0, 0], shapes=[128, 128]) + tile_b = pl.load(b, offsets=[0, 0], shapes=[128, 128]) + tmp_0 = pl.add(tile_b, tile_a) # 操作1 + tmp_1 = pl.mul(tmp_0, tile_a) # 操作2 + tmp_2 = pl.sub(tmp_1, tile_b) # 操作3 + return tmp_2 +``` + +### 2. 内核组合模式 + +#### Sequential (顺序模式) +内核按顺序执行,每个内核的输出作为下一个内核的输入。 + +``` +input → kernel_0 → kernel_1 → kernel_2 → output +``` + +#### Branching (分支模式) +多个内核并行执行,使用 merge 内核合并结果。 + +``` +input → kernel_0 ↘ +input → kernel_1 → merge → output +input → kernel_2 ↗ +``` + +#### Mixed (混合模式) +结合顺序和分支执行。 + +``` +input → kernel_0 ↘ +input → kernel_1 → merge → kernel_2 → kernel_3 → output +``` + +### 3. 支持的算子 + +#### 基本算子(默认启用) +- **二元操作**: add, sub, mul, div, maximum, minimum +- **标量操作**: adds, subs, muls, divs +- **一元操作**: sqrt, rsqrt, exp, neg, recip, log, abs, relu + +#### 高级算子(需要启用) +- **行广播操作**: row_expand_add, row_expand_sub, row_expand_mul, row_expand_div +- **矩阵操作**: matmul + +详细算子规则请参考 [算子规则](#算子规则) 章节。 + +--- + +## 配置指南 + +### 配置结构 + +所有配置都在 `example_multi_kernel.py` 的 `all_configs` 列表中定义: + +```python +all_configs = [ + { + # 基本信息 + "name": "test_name", # 测试用例名称(必需) + "description": "测试描述", # 测试描述(可选) + + # 生成控制 + "num_instances": 1, # 从该配置生成的测试实例数量 + "seed": 42, # 随机种子 + + # 算子配置 + "enable_advanced_ops": False, # 是否启用高级算子 + + # 张量配置 + "tensor_init_type": "constant", # 张量初始化类型 + "shape": (128, 128), # 张量形状 + + # 内核配置 + "num_kernels": 3, # 内核数量 + "mode": "sequential", # 组合模式 + "num_ops_range": (3, 7), # 每个内核的操作数量范围 + "input_shapes_list": None, # 每个内核的输入形状列表(可选) + }, +] +``` + +### 配置字段详解 + +#### 1. 
基本信息 +- **name** (必需): 测试用例的名称 +- **description** (可选): 测试用例的描述 + +#### 2. 生成控制 +- **num_instances** (默认: 1): 从该配置生成的测试实例数量 + - 如果设置为 N > 1,将生成 N 个测试用例 + - 每个实例使用不同的随机种子:`seed + instance_index` + - 实例名称自动添加索引:`name_0`, `name_1`, ..., `name_N-1` + +- **seed** (默认: 42): 随机种子,用于可重现性 + +#### 3. 算子配置 +- **enable_advanced_ops** (默认: False): 是否启用高级算子 + - False: 只使用基本算子 + - True: 包含高级算子(row_expand, matmul 等) + +#### 4. 张量配置 +- **tensor_init_type** (默认: "constant"): 张量初始化类型 + - `"constant"`: 每个张量使用不同的常量值(2.0, 2.5, 3.0, ...) + - `"random"`: 使用 `torch.randn` 生成随机正态分布值 + - `"range"`: 使用 `torch.rand` 生成 [0, 1) 范围内的随机值 + - `"normal"`: 使用 `torch.randn` 生成标准正态分布值 + - `"ones"`: 所有元素初始化为 1.0 + - `"zeros"`: 所有元素初始化为 0.0 + +- **shape** (默认: (128, 128)): 张量的形状 + +#### 5. 内核配置 +- **num_kernels** (默认: 3): 生成的内核数量 + +- **mode** (默认: "sequential"): 内核组合模式 + - `"sequential"`: 顺序执行 + - `"branching"`: 分支执行 + - `"mixed"`: 混合模式 + +- **num_ops_range** (默认: (3, 7)): 每个内核包含的操作数量范围 + +- **input_shapes_list** (可选): 每个内核的输入形状列表 + - 如果为 None,则自动生成 + - 示例:`[[(128, 128), (128, 128)], [(128, 128)]]` + +### 配置示例 + +#### 示例 1: 简单顺序执行 +```python +{ + "name": "simple_sequential", + "num_instances": 1, + "seed": 42, + "enable_advanced_ops": False, + "num_kernels": 2, + "mode": "sequential", + "shape": (128, 128), + "num_ops_range": (3, 5), + "tensor_init_type": "constant", + "input_shapes_list": [ + [(128, 128), (128, 128)], # kernel_0: 2个输入 + ], + "description": "简单顺序执行:2个内核" +} +``` + +#### 示例 2: 生成多个随机测试实例 +```python +{ + "name": "random_tests", + "num_instances": 5, # 生成5个测试用例 + "seed": 100, # 将使用种子 100, 101, 102, 103, 104 + "enable_advanced_ops": False, + "num_kernels": 3, + "mode": "branching", + "shape": (128, 128), + "num_ops_range": (4, 8), + "tensor_init_type": "random", + "input_shapes_list": None, + "description": "随机分支测试:生成5个不同的测试实例" +} +``` + +--- + +## 算子规则 + +### 形状对齐约束 + +**重要**: 所有 tensor 创建和 reshape 操作必须满足 32 字节对齐约束。 + +**规则**: +- 形状的尾轴(最后一个维度,即列数)必须满足: + 1. 尾轴 = 1, 或者 + 2. 
(尾轴 × sizeof(datatype)) % 32 == 0 + +**FP32 类型的有效尾轴值**: +- 尾轴 = 1(总是有效) +- 尾轴 % 8 == 0(因为 8 × 4 = 32) +- 有效值: 1, 8, 16, 24, 32, 40, 48, 56, 64, ..., 128, ... + +**示例**: +```python +# ✓ 有效的形状 +pl.tensor.create([128, 1], pl.FP32) # 尾轴=1 +pl.tensor.create([128, 8], pl.FP32) # 8*4=32, 对齐 +pl.tensor.create([128, 128], pl.FP32) # 128*4=512, 对齐 + +# ✗ 无效的形状 +pl.tensor.create([128, 3], pl.FP32) # 3*4=12, 不对齐 +pl.tensor.create([128, 5], pl.FP32) # 5*4=20, 不对齐 +``` + +### 算子分类 + +#### 1. Block Element-wise Binary Operations +| 算子名 | 输入类型 | 输出类型 | 约束 | NumPy等价 | +|--------|----------|----------|------|-----------| +| `block.add` | `tile, tile` | `tile` | 支持广播 | `a + b` | +| `block.sub` | `tile, tile` | `tile` | 支持广播 | `a - b` | +| `block.mul` | `tile, tile` | `tile` | 支持广播 | `a * b` | +| `block.div` | `tile, tile` | `tile` | 避免除零 | `a / b` | +| `block.maximum` | `tile, tile` | `tile` | 支持广播 | `np.maximum(a, b)` | +| `block.minimum` | `tile, tile` | `tile` | 支持广播 | `np.minimum(a, b)` | + +#### 2. Block Scalar Operations +| 算子名 | 输入类型 | 输出类型 | NumPy等价 | +|--------|----------|----------|-----------| +| `block.adds` | `tile, scalar` | `tile` | `a + s` | +| `block.subs` | `tile, scalar` | `tile` | `a - s` | +| `block.muls` | `tile, scalar` | `tile` | `a * s` | +| `block.divs` | `tile, scalar` | `tile` | `a / s` | + +#### 3. Block Unary Operations +| 算子名 | 输入类型 | 输出类型 | 约束 | NumPy等价 | +|--------|----------|----------|------|-----------| +| `block.neg` | `tile` | `tile` | - | `-a` | +| `block.exp` | `tile` | `tile` | 建议范围 [-10, 10] | `np.exp(a)` | +| `block.recip` | `tile` | `tile` | 避免除零 | `1.0 / a` | +| `block.sqrt` | `tile` | `tile` | 输入 ≥ 0 | `np.sqrt(a)` | +| `block.rsqrt` | `tile` | `tile` | 输入 > 0 | `1.0 / np.sqrt(a)` | +| `block.log` | `tile` | `tile` | 输入 > 0 | `np.log(a)` | +| `block.abs` | `tile` | `tile` | - | `np.abs(a)` | +| `block.relu` | `tile` | `tile` | - | `np.maximum(0, a)` | + +#### 4. 
Block Row/Column Broadcast Operations (高级) +| 算子名 | 输入类型 | 输出类型 | 形状约束 | NumPy等价 | +|--------|----------|----------|----------|-----------| +| `block.row_expand_add` | `tile[M,N], tile[M,1]` | `tile[M,N]` | 第二个输入 [M,1] | `tile + row_vec` | +| `block.row_expand_sub` | `tile[M,N], tile[M,1]` | `tile[M,N]` | 第二个输入 [M,1] | `tile - row_vec` | +| `block.row_expand_mul` | `tile[M,N], tile[M,1]` | `tile[M,N]` | 第二个输入 [M,1] | `tile * row_vec` | +| `block.row_expand_div` | `tile[M,N], tile[M,1]` | `tile[M,N]` | 第二个输入 [M,1],避免除零 | `tile / row_vec` | + +#### 5. Block Matrix Operations (高级) +| 算子名 | 输入类型 | 输出类型 | 形状约束 | NumPy等价 | +|--------|----------|----------|----------|-----------| +| `block.matmul` | `tile, tile` | `tile` | `[M, K] @ [K, N] -> [M, N]` | `a @ b` | + +### 数据约束 + +1. **避免除零**: `div`, `divs`, `recip`, `row_expand_div` + - 确保分母绝对值 ≥ 0.01 + +2. **正值约束**: `sqrt`, `rsqrt`, `log` + - 确保输入 > 0 或使用 `abs(x) + 1e-6` + +3. **范围约束**: `exp` + - 建议输入范围 [-10, 10] 避免溢出 + +### 常见算子组合模式 + +#### Softmax 组件 +```python +# Step 1: Row max reduction +max_vals = pl.row_max(tile, tmp_tile) # [M,N] -> [M,1] + +# Step 2: Subtract max (数值稳定性) +centered = pl.row_expand_sub(tile, max_vals) + +# Step 3: Exponential +exp_vals = pl.exp(centered) + +# Step 4: Row sum +sum_vals = pl.row_sum(exp_vals, tmp_tile) + +# Step 5: Normalize +output = pl.row_expand_div(exp_vals, sum_vals) +``` + +#### ReLU 及变体 +```python +# ReLU +output = pl.relu(tile) + +# LeakyReLU (alpha=0.01) +neg_part = pl.muls(tile, 0.01) +output = pl.maximum(tile, neg_part) +``` + +--- + +## 使用示例 + +### 生成测试用例 + +```bash +# 使用默认配置生成所有测试用例 +python src/fuzzer/example_multi_kernel.py + +# 只生成第一个配置 +python src/fuzzer/example_multi_kernel.py --config-index 0 + +# 设置误差容忍度 +python src/fuzzer/example_multi_kernel.py --atol 1e-3 --rtol 1e-3 + +# 指定输出文件 +python src/fuzzer/example_multi_kernel.py --output my_tests.py + +# 组合使用 +python src/fuzzer/example_multi_kernel.py --config-index 1 --atol 1e-4 --rtol 1e-4 --output my_test.py +``` + 
+### 运行测试 + +```bash +# 运行所有测试 +pytest src/fuzzer/generated_tests/test_fuzz_multi_kernel.py -v + +# 只生成代码,不执行 +pytest src/fuzzer/generated_tests/test_fuzz_multi_kernel.py -v --codegen-only + +# 保存生成的 C++ 代码 +pytest src/fuzzer/generated_tests/test_fuzz_multi_kernel.py -v --codegen-only --save-kernels --kernels-dir=/tmp/kernels + +# 运行特定测试 +pytest src/fuzzer/generated_tests/test_fuzz_multi_kernel.py::TestMultiKernelFuzzing::test_fuzz_sequential_simple -v +``` + +### 生成的代码结构 + +```python +class TestFuzzSequentialSimple(PTOTestCase): + rows = 128 + cols = 128 + + def __init__(self): + super().__init__() + self.config.atol = 1e-4 + self.config.rtol = 1e-4 + + def get_name(self): + return "fuzz_sequential_simple" + + def define_tensors(self): + return [ + TensorSpec('a', [128, 128], DataType.FP32, init_value=2.0), + TensorSpec('b', [128, 128], DataType.FP32, init_value=2.5), + TensorSpec('output', [128, 128], DataType.FP32, is_output=True), + ] + + def get_program(self): + @pl.program + class Program: + @pl.function(type=pl.FunctionType.InCore) + def kernel_0(self, a, b): + # 内核实现 + pass + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self, a, b): + # 组合逻辑 + pass + + return Program + + def compute_expected(self, tensors, params=None): + # PyTorch 参考实现 + pass +``` + +--- + +## 更新日志 + +### 最新更新 + +#### 新增功能 +- 支持多种张量初始化类型:constant, random, range, normal, ones, zeros +- 支持从单个配置生成多个测试实例(通过 `num_instances` 字段) +- 新增 `--config-index` 命令行参数,可以指定只生成某个配置的测试用例 +- 新增 `--atol` 和 `--rtol` 命令行参数,支持设置误差容忍度 +- 将所有配置参数移至 `all_configs` 结构,统一管理 + +#### 重要变更 +- 将所有 golden 数据生成从 NumPy 替换为 PyTorch + - `_generate_numpy_reference` 重命名为 `_generate_torch_reference` + - `_get_numpy_operation` 重命名为 `_get_torch_operation` + - 所有中间计算使用 PyTorch 张量操作 + +- 简化命令行参数 + - 移除 `--num-cases`、`--seed`、`--enable-advanced-ops`、`--tensor-init` 参数 + - 所有配置现在通过 `all_configs` 结构管理 + - 保留 `--output`、`--config-index`、`--atol`、`--rtol` 参数 + +#### 修复 +- 修复生成的 golden.py 文件中缺少 torch 导入的问题 + +--- + 
+## 注意事项 + +1. **32字节对齐约束**: 所有 tensor 创建和 reshape 操作的形状必须满足32字节对齐 + - 形状尾轴(列数)必须是 1,或 `(cols * sizeof(dtype)) % 32 == 0` + - FP32 类型有效的列数: 1, 8, 16, 24, 32, 40, 48, 56, 64, ..., 128, ... + - Fuzzer 会自动验证并修正不对齐的形状 + +2. **张量形状**: 支持不同维度的输入张量,可以在配置中指定每个内核的输入形状 + +3. **数据类型**: 当前仅支持 FP32 类型 + +4. **操作约束**: 框架自动处理除零、负数开方等约束 + +5. **ISA 支持**: 确保添加的操作在目标硬件的 ISA 中有对应实现 + +6. **输入数量**: 每个内核支持 1-3 个输入张量,可以在配置中指定 + +--- + +## 扩展框架 + +### 添加新操作符 + +编辑 `fuzzer.py` 的 `OpFuzzer.__init__` 方法: + +```python +# 在 OpFuzzer.__init__ 中 +custom_ops = [ + OpSpec("block.custom_op", ["tile", "tile"], "tile", {}, + lambda a, b: custom_numpy_impl(a, b)), +] +self.ops = self.ops + custom_ops +``` + +### 添加新组合模式 + +在 `orchestrator_generator.py` 中添加新的生成方法。 + +--- + +## 参考文件 + +- [tests/test_cases/test_matmul.py](../../tests/test_cases/test_matmul.py): PTOTestCase 使用模式 +- [src/fuzzer/src/fuzzer.py](src/fuzzer.py): OpFuzzer 操作生成逻辑和操作符定义 +- [example_multi_kernel.py](example_multi_kernel.py): 配置示例 diff --git a/src/fuzzer/__init__.py b/src/fuzzer/__init__.py new file mode 100644 index 0000000..087858e --- /dev/null +++ b/src/fuzzer/__init__.py @@ -0,0 +1,39 @@ +""" +Multi-kernel fuzzing framework for PyPTO programs. + +This is the main entry point for the fuzzer framework. +External users should import from this module. 
+ +Example: + from fuzzer import OpFuzzer, MultiKernelTestGenerator + + # Create a test generator + generator = MultiKernelTestGenerator(seed=42) + + # Generate a test case + test_code = generator.generate_test_case( + test_name="my_fuzz", + num_kernels=3, + num_ops_range=(2, 5), + orchestration_mode="sequential" + ) +""" + +# Import from internal src module +from .src import ( + OpFuzzer, + OpSpec, + KernelGenerator, + OrchestratorGenerator, + MultiKernelTestGenerator, +) + +__all__ = [ + "OpFuzzer", + "OpSpec", + "KernelGenerator", + "OrchestratorGenerator", + "MultiKernelTestGenerator", +] + +__version__ = "1.0.0" diff --git a/src/fuzzer/conftest.py b/src/fuzzer/conftest.py new file mode 100644 index 0000000..1e9ef67 --- /dev/null +++ b/src/fuzzer/conftest.py @@ -0,0 +1,43 @@ +""" +pytest configuration for generated multi-kernel fuzz tests. + +This conftest imports all fixtures from the main tests/conftest.py +to ensure generated tests have access to the same CLI options and fixtures.
+""" + +import sys +from pathlib import Path + +# Add framework root to path. NOTE(review): this file is src/fuzzer/conftest.py, so four .parent hops land one directory above the repo root (three would reach it), and `from tests.conftest import` needs the repo root, not tests/, on sys.path - confirm. +_FRAMEWORK_ROOT = Path(__file__).parent.parent.parent.parent +_TESTS_DIR = _FRAMEWORK_ROOT / "tests" + +if str(_TESTS_DIR) not in sys.path: + sys.path.insert(0, str(_TESTS_DIR)) + +# Import all fixtures and configuration from main conftest +from tests.conftest import ( + pytest_addoption, + pytest_configure, + pytest_collection_modifyitems, + test_config, + test_runner, + optimization_strategy, + fuzz_count, + fuzz_seed, + tensor_shape, + STANDARD_SHAPES, +) + +__all__ = [ + 'pytest_addoption', + 'pytest_configure', + 'pytest_collection_modifyitems', + 'test_config', + 'test_runner', + 'optimization_strategy', + 'fuzz_count', + 'fuzz_seed', + 'tensor_shape', + 'STANDARD_SHAPES', +] diff --git a/src/fuzzer/example_multi_kernel.py b/src/fuzzer/example_multi_kernel.py new file mode 100644 index 0000000..20b0868 --- /dev/null +++ b/src/fuzzer/example_multi_kernel.py @@ -0,0 +1,308 @@ +""" +Usage example for the multi-kernel fuzzing framework. + +This script shows how to create test cases with the multi-kernel test generator. +The configs to generate, the output file and the tolerances are chosen via command-line options. + +Usage: + python example_multi_kernel.py --config-index 0 +""" + +import argparse +import sys +from pathlib import Path + +# Add the script directory to sys.path so the local `src` package can be imported +_SCRIPT_DIR = Path(__file__).parent +if str(_SCRIPT_DIR) not in sys.path: + sys.path.insert(0, str(_SCRIPT_DIR)) + +from src.multi_kernel_test_generator import MultiKernelTestGenerator + + +def main(): + """Parse CLI options, select and expand the configs, then write the generated test file.""" + parser = argparse.ArgumentParser( + description="生成多内核模糊测试用例", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +示例: + # 使用默认配置生成测试用例 + python example_multi_kernel.py + + # 指定配置索引(从0开始) + python example_multi_kernel.py --config-index 0 + + # 指定输出文件 + python example_multi_kernel.py --output custom_test.py + + # 设置误差容忍度 + python example_multi_kernel.py --atol 1e-3 --rtol 1e-3 + + # 组合使用 + python example_multi_kernel.py --config-index 1 --atol 1e-4 --rtol 1e-4 --output my_test.py + """ + ) + + parser.add_argument( + "--config-index", + type=int, + default=None, # option omitted -> every config in all_configs is used, as the help text states +
help="指定要使用的配置索引(从0开始),如果不指定则使用所有配置" + ) + + parser.add_argument( + "--output", + type=str, + default=None, + help="输出文件路径 (默认: src/fuzzer/generated_tests/test_fuzz_multi_kernel.py)" + ) + + parser.add_argument( + "--atol", + type=float, + default=1e-4, + help="绝对误差容忍度 (默认: 1e-4)" + ) + + parser.add_argument( + "--rtol", + type=float, + default=1e-4, + help="相对误差容忍度 (默认: 1e-4)" + ) + + args = parser.parse_args() + + # Resolve the output path (falls back to generated_tests/ next to this script) + if args.output: + output_path = args.output + else: + output_path = str(_SCRIPT_DIR / "generated_tests" / "test_fuzz_multi_kernel.py") + + # Test-case configurations + # Each config can yield several test instances (controlled by num_instances) + all_configs = [ + { + "name": "fuzz_sequential_simple", + "num_instances": 1, # generate 1 test case from this config + "seed": 4, + "enable_advanced_ops": False, + "num_kernels": 2, + "mode": "sequential", + "shape": (128, 128), + "num_ops_range": (3, 5), + "tensor_init_type": "constant", + "input_shapes_list": [ + [(128, 128), (128, 128)], # kernel_0: two inputs of the same shape + ], + "description": "简单顺序执行:2个内核,相同维度输入" + }, + # { + # "name": "fuzz_branching_parallel", + # "num_instances": 1, # generate 1 test case from this config + # "seed": 42, + # "enable_advanced_ops": False, + # "num_kernels": 3, + # "mode": "branching", + # "shape": (128, 128), + # "num_ops_range": (4, 6), + # "tensor_init_type": "random", + # "input_shapes_list": [ + # [(128, 128), (128, 128)], # kernel_0: 2个相同维度 + # [(128, 128), (128, 128)], # kernel_1: 2个相同维度 + # [(128, 128)], # kernel_2: 1个输入 + # ], + # "description": "分支并行执行:3个内核,相同维度输入" + # }, + # { + # "name": "fuzz_mixed_complex", + # "num_instances": 1, + # "seed": 100, + # "enable_advanced_ops": False, + # "num_kernels": 4, + # "mode": "mixed", + # "shape": (128, 128), + # "num_ops_range": (5, 8), + # "tensor_init_type": "range", + # "input_shapes_list": None, # 使用随机生成 + # "description": "混合模式:前2个并行,后2个顺序,随机输入" + # }, + # { + # "name": "fuzz_sequential_deep", + # "num_instances": 1, + # "seed": 200, + # "enable_advanced_ops": False, + # "num_kernels": 5, + # "mode": "sequential",
+ # "shape": (128, 128), + # "num_ops_range": (6, 10), + # "tensor_init_type": "normal", + # "input_shapes_list": None, # 使用随机生成 + # "description": "深度顺序执行:5个内核链式调用,随机输入" + # }, + # { + # "name": "fuzz_branching_wide", + # "num_instances": 1, + # "seed": 300, + # "enable_advanced_ops": False, + # "num_kernels": 4, + # "mode": "branching", + # "shape": (128, 128), + # "num_ops_range": (4, 7), + # "tensor_init_type": "ones", + # "input_shapes_list": [ + # [(128, 128), (128, 128), (128, 128)], # kernel_0: 3个相同维度 + # [(128, 128)], # kernel_1: 1个输入 + # [(128, 128), (128, 128)], # kernel_2: 2个相同维度 + # [(128, 128), (128, 128)], # kernel_3: 2个相同维度 + # ], + # "description": "宽分支执行:4个内核,统一维度输入" + # }, + ] + + # 根据 config_index 选择配置 + if args.config_index is not None: + if args.config_index < 0 or args.config_index >= len(all_configs): + print(f"错误: 配置索引 {args.config_index} 超出范围 (0-{len(all_configs)-1})") + return + selected_configs = [all_configs[args.config_index]] + else: + selected_configs = all_configs + + # 计算总测试用例数 + total_test_cases = sum(config.get("num_instances", 1) for config in selected_configs) + + print(f"多内核模糊测试生成器") + print(f"=" * 60) + print(f"配置数量: {len(selected_configs)}") + print(f"总测试用例数: {total_test_cases}") + print(f"输出文件: {output_path}") + print(f"绝对误差容忍度 (atol): {args.atol}") + print(f"相对误差容忍度 (rtol): {args.rtol}") + print(f"=" * 60) + print() + + print("将生成以下测试用例:") + print() + test_case_num = 1 + for config_idx, config in enumerate(selected_configs): + num_instances = config.get("num_instances", 1) + print(f"配置 {config_idx}: {config['name']}") + print(f" {config['description']}") + print(f" 实例数量: {num_instances}") + print(f" 随机种子: {config.get('seed', 42)}") + print(f" 启用高级算子: {'是' if config.get('enable_advanced_ops', False) else '否'}") + print(f" 张量初始化: {config.get('tensor_init_type', 'constant')}") + if num_instances > 1: + print(f" 将生成测试用例: {test_case_num} - {test_case_num + num_instances - 1}") + else: + print(f" 将生成测试用例: {test_case_num}") + 
test_case_num += num_instances + print() + + # 展开配置,为每个实例创建一个测试用例 + expanded_test_configs = [] + for config in selected_configs: + num_instances = config.get("num_instances", 1) + base_seed = config.get("seed", 42) + + for instance_idx in range(num_instances): + # 为每个实例创建一个配置副本 + test_config = config.copy() + + # 如果有多个实例,在名称后添加索引 + if num_instances > 1: + test_config["name"] = f"{config['name']}_{instance_idx}" + # 每个实例使用不同的种子 + test_config["seed"] = base_seed + instance_idx + + expanded_test_configs.append(test_config) + + # 生成测试文件 + print("正在生成测试文件...") + + # 为每个配置创建独立的生成器(使用各自的种子和配置) + all_test_cases = [] + for test_config in expanded_test_configs: + generator = MultiKernelTestGenerator( + seed=test_config.get("seed", 42), + enable_advanced_ops=test_config.get("enable_advanced_ops", False), + tensor_init_type=test_config.get("tensor_init_type", "constant") + ) + + test_code = generator.generate_test_case( + test_name=test_config["name"], + num_kernels=test_config.get("num_kernels", 3), + orchestration_mode=test_config.get("mode", "sequential"), + shape=test_config.get("shape", (128, 128)), + num_ops_range=test_config.get("num_ops_range", (3, 7)), + input_shapes_list=test_config.get("input_shapes_list"), + tensor_init_type=test_config.get("tensor_init_type"), + atol=args.atol, + rtol=args.rtol, + ) + all_test_cases.append(test_code) + + # 生成文件头部 + file_header = '''""" +自动生成的多内核模糊测试用例 + +该文件由 MultiKernelTestGenerator 自动生成。 +包含多个测试用例,每个测试用例包含多个 InCore 内核和一个 Orchestration 函数。 +""" + +import sys +from pathlib import Path +from typing import Any, List + +import torch +import pytest + +from pto_test.core.test_case import DataType, PTOTestCase, TensorSpec + +# 添加 pypto 到路径 +_FRAMEWORK_ROOT = Path(__file__).parent.parent.parent.parent +_PYPTO_ROOT = _FRAMEWORK_ROOT / "3rdparty" / "pypto" / "python" +if _PYPTO_ROOT.exists() and str(_PYPTO_ROOT) not in sys.path: + sys.path.insert(0, str(_PYPTO_ROOT)) + + +''' + + # 生成测试套件类 + test_suite = f''' +class TestMultiKernelFuzzing: 
+ """多内核模糊测试套件""" + +''' + + # 为每个测试用例添加测试方法 + for test_config in expanded_test_configs: + test_name = test_config["name"] + test_suite += f''' def test_{test_name}(self, test_runner): + """测试 {test_name}""" + test_case = Test{test_name.title().replace("_", "")}() + result = test_runner.run(test_case) + assert result.passed, f"测试失败: {{result.error}}" + +''' + + # 写入文件 + with open(output_path, 'w', encoding='utf-8') as f: + f.write(file_header) + f.write('\n\n'.join(all_test_cases)) + f.write('\n\n') + f.write(test_suite) + + print() + print(f"✓ 成功生成 {len(expanded_test_configs)} 个测试用例") + print(f"✓ 输出文件: {output_path}") + print() + print("运行测试:") + print(f" pytest {output_path}") + print() + + +if __name__ == "__main__": + main() diff --git a/src/fuzzer/generated_tests/test_fuzz_multi_kernel.py b/src/fuzzer/generated_tests/test_fuzz_multi_kernel.py new file mode 100644 index 0000000..b2fb972 --- /dev/null +++ b/src/fuzzer/generated_tests/test_fuzz_multi_kernel.py @@ -0,0 +1,139 @@ +""" +自动生成的多内核模糊测试用例 + +该文件由 MultiKernelTestGenerator 自动生成。 +包含多个测试用例,每个测试用例包含多个 InCore 内核和一个 Orchestration 函数。 +""" + +import sys +from pathlib import Path +from typing import Any, List + +import torch +import pytest + +from pto_test.core.test_case import DataType, PTOTestCase, TensorSpec + +# 添加 pypto 到路径 +_FRAMEWORK_ROOT = Path(__file__).parent.parent.parent.parent +_PYPTO_ROOT = _FRAMEWORK_ROOT / "3rdparty" / "pypto" / "python" +if _PYPTO_ROOT.exists() and str(_PYPTO_ROOT) not in sys.path: + sys.path.insert(0, str(_PYPTO_ROOT)) + + +class TestFuzzSequentialSimple(PTOTestCase): + """ + 测试用例: fuzz_sequential_simple + 组合模式: sequential + 内核数量: 2 + """ + + rows = 128 + cols = 128 + + def __init__(self): + super().__init__() + self.config.atol = 0.0001 + self.config.rtol = 0.0001 + + def get_name(self) -> str: + return 'fuzz_sequential_simple' + + def define_tensors(self) -> List[TensorSpec]: + return [ + TensorSpec('a', [128, 128], DataType.FP32, init_value=2.0), + TensorSpec('b', [128, 
128], DataType.FP32, init_value=2.5), + TensorSpec('output', [128, 128], DataType.FP32, is_output=True), + ] + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class FuzzSequentialSimpleProgram: + @pl.function(type=pl.FunctionType.InCore) + def kernel_0(self, a: pl.Tensor[[128, 128], pl.FP32], b: pl.Tensor[[128, 128], pl.FP32], output: pl.Tensor[[128, 128], pl.FP32]) -> pl.Tensor[[128, 128], pl.FP32]: + tile_a = pl.load(a, offsets=[0, 0], shapes=[128, 128]) + tile_b = pl.load(b, offsets=[0, 0], shapes=[128, 128]) + tmp_0 = pl.subs(tile_b, 1.0) + tmp_1 = pl.mul(tile_a, tile_a) + tmp_2 = pl.subs(tmp_1, 1.0) + tmp_3 = pl.add(tmp_0, tmp_2) + result = pl.store(tmp_3, offsets=[0, 0], shapes=[128, 128], output_tensor=output) + return result + + @pl.function(type=pl.FunctionType.InCore) + def kernel_1(self, a: pl.Tensor[[128, 128], pl.FP32], b: pl.Tensor[[128, 128], pl.FP32], output: pl.Tensor[[128, 128], pl.FP32]) -> pl.Tensor[[128, 128], pl.FP32]: + tile_a = pl.load(a, offsets=[0, 0], shapes=[128, 128]) + tile_b = pl.load(b, offsets=[0, 0], shapes=[128, 128]) + tmp_0 = pl.div(tile_a, tile_b) + tmp_1 = pl.muls(tmp_0, 0.5) + tmp_2 = pl.rsqrt(tmp_1) + tmp_3 = pl.exp(tmp_0) + tmp_4 = pl.add(tmp_2, tmp_3) + result = pl.store(tmp_4, offsets=[0, 0], shapes=[128, 128], output_tensor=output) + return result + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self, a: pl.Tensor[[128, 128], pl.FP32], b: pl.Tensor[[128, 128], pl.FP32]) -> pl.Tensor[[128, 128], pl.FP32]: + result_0 = self.kernel_0(a, b) + result_1 = self.kernel_1(result_0, b) + return result_1 + + return FuzzSequentialSimpleProgram + + def compute_expected(self, tensors, params=None): + """使用 Torch 计算期望输出""" + # 将 numpy 数组转换为 torch 张量(仅在输入边界) + torch_tensors = {name: torch.from_numpy(arr) for name, arr in tensors.items() if not name.endswith('output')} + + def _torch_kernel_0(a, b): + """Torch 实现: kernel_0""" + # 创建变量环境 + env = {} + env['tile_a'] = a.clone() + 
env['tile_b'] = b.clone() + + # 执行操作链 + env['tmp_0'] = env['tile_b'] - 1.0 + env['tmp_1'] = env['tile_a'] * env['tile_a'] + env['tmp_2'] = env['tmp_1'] - 1.0 + env['tmp_3'] = env['tmp_0'] + env['tmp_2'] + return env['tmp_3'] + + def _torch_kernel_1(a, b): + """Torch 实现: kernel_1""" + # 创建变量环境 + env = {} + env['tile_a'] = a.clone() + env['tile_b'] = b.clone() + + # 执行操作链 + env['tile_a'] = torch.where(torch.abs(env['tile_a']) < 0.01, torch.tensor(1.0), env['tile_a']) + env['tile_b'] = torch.where(torch.abs(env['tile_b']) < 0.01, torch.tensor(1.0), env['tile_b']) + env['tmp_0'] = env['tile_a'] / env['tile_b'] + env['tmp_1'] = env['tmp_0'] * 0.5 + env['tmp_1'] = torch.abs(env['tmp_1']) + 1e-6 + env['tmp_2'] = torch.rsqrt(env['tmp_1']) + env['tmp_3'] = torch.exp(torch.clamp(env['tmp_0'], -10, 10)) + env['tmp_4'] = env['tmp_2'] + env['tmp_3'] + return env['tmp_4'] + + + # 顺序执行模式 + result_0 = _torch_kernel_0(torch_tensors['a'], torch_tensors['b']) + result_1 = _torch_kernel_1(result_0, torch_tensors['b']) + # 将结果转换回 numpy 并写入输出 + tensors['output'][:] = result_1.numpy() + + + +class TestMultiKernelFuzzing: + """多内核模糊测试套件""" + + def test_fuzz_sequential_simple(self, test_runner): + """测试 fuzz_sequential_simple""" + test_case = TestFuzzSequentialSimple() + result = test_runner.run(test_case) + assert result.passed, f"测试失败: {result.error}" + diff --git a/src/fuzzer/src/__init__.py b/src/fuzzer/src/__init__.py new file mode 100644 index 0000000..5cefd5b --- /dev/null +++ b/src/fuzzer/src/__init__.py @@ -0,0 +1,16 @@ +""" +Internal implementation modules for the fuzzer framework. 
+""" + +from .fuzzer import OpFuzzer, OpSpec +from .kernel_generator import KernelGenerator +from .orchestrator_generator import OrchestratorGenerator +from .multi_kernel_test_generator import MultiKernelTestGenerator + +__all__ = [ + "OpFuzzer", + "OpSpec", + "KernelGenerator", + "OrchestratorGenerator", + "MultiKernelTestGenerator", +] diff --git a/src/fuzzer/src/fuzzer.py b/src/fuzzer/src/fuzzer.py new file mode 100644 index 0000000..06c02a9 --- /dev/null +++ b/src/fuzzer/src/fuzzer.py @@ -0,0 +1,543 @@ +""" +Operator fuzzer for generating random operator combinations. +""" + +import random +from dataclasses import dataclass +from typing import Any, Dict, List, Optional, Tuple + +import numpy as np # Used in lambda functions for op equivalents + + +# 数据类型字节大小 +DTYPE_SIZES = { + "FP32": 4, + "FP16": 2, + "INT8": 1, + "INT32": 4, +} + + +def is_shape_aligned(shape: Tuple[int, int], dtype: str = "FP32") -> bool: + """检查形状是否满足32字节对齐约束 + + Args: + shape: (rows, cols) 形状元组 + dtype: 数据类型 (默认 FP32) + + Returns: + True 如果形状满足对齐要求 + + 规则: + - 尾轴 (cols) 必须是 1, 或者 + - (尾轴 * sizeof(dtype)) 必须是 32 的倍数 + + 示例 (FP32, sizeof=4): + - (128, 1) ✓ 尾轴=1 + - (128, 8) ✓ 8*4=32, 对齐 + - (128, 16) ✓ 16*4=64, 对齐 + - (128, 32) ✓ 32*4=128, 对齐 + - (128, 5) ✗ 5*4=20, 不对齐 + """ + rows, cols = shape + dtype_size = DTYPE_SIZES.get(dtype, 4) + + # 尾轴是1,总是对齐 + if cols == 1: + return True + + # 检查 (尾轴 * sizeof(dtype)) 是否是32的倍数 + return (cols * dtype_size) % 32 == 0 + + +def get_aligned_shapes(dtype: str = "FP32", max_size: int = 128) -> List[Tuple[int, int]]: + """获取所有满足对齐约束的常用形状 + + Args: + dtype: 数据类型 (默认 FP32) + max_size: 最大维度大小 (默认 128,避免内存溢出) + + Returns: + 对齐的形状列表 + """ + dtype_size = DTYPE_SIZES.get(dtype, 4) + # 计算最小对齐的列数 (除了1) + min_aligned_cols = 32 // dtype_size # FP32: 8, FP16: 16, INT8: 32 + + aligned_shapes = [] + + # 常用行数 - 限制最大为 max_size + common_rows = [32, 64, 80, 96, 128] + common_rows = [r for r in common_rows if r <= max_size] + + # 对齐的列数: 1, min_aligned_cols, 
2*min_aligned_cols, ... + for rows in common_rows: + # 列数为1的情况 + aligned_shapes.append((rows, 1)) + + # 对齐的列数 + max_multiplier = max_size // min_aligned_cols + for multiplier in range(1, max_multiplier + 1): + cols = min_aligned_cols * multiplier + if cols <= max_size: + aligned_shapes.append((rows, cols)) + + return aligned_shapes + + +def generate_aligned_shape(rng, dtype: str = "FP32", max_size: int = 128) -> Tuple[int, int]: + """随机生成一个对齐的形状 + + Args: + rng: 随机数生成器 + dtype: 数据类型 + max_size: 最大维度大小 (默认 128,避免内存溢出) + + Returns: + 满足对齐约束的形状元组 + """ + aligned_shapes = get_aligned_shapes(dtype, max_size) + return rng.choice(aligned_shapes) if aligned_shapes else (128, 128) + + +@dataclass +class OpSpec: + """Operator specification for fuzzing. + + Attributes: + name: Operator name (e.g., "block.add") + input_types: List of input types (e.g., ["tile", "tile"]) + output_type: Output type (e.g., "tile") + constraints: Additional constraints (e.g., {"min_shape": [64, 64]}) + np_equivalent: NumPy equivalent function for golden reference + shape_transform: Optional callable that computes output shape from input shapes + param_generator: Optional callable that generates operator parameters + requires_params: Whether this operator requires parameters (default: False) + """ + name: str + input_types: List[str] + output_type: str + constraints: Dict[str, Any] + np_equivalent: Optional[Any] = None + shape_transform: Optional[Any] = None + param_generator: Optional[Any] = None + requires_params: bool = False + + def compute_output_shape(self, input_shapes: List[Tuple[int, int]], params: Optional[Dict[str, Any]] = None) -> Tuple[int, int]: + """Compute output shape from input shapes.""" + if self.shape_transform: + import inspect + sig = inspect.signature(self.shape_transform) + if len(sig.parameters) >= 2 and params is not None: + return self.shape_transform(input_shapes, params) + else: + return self.shape_transform(input_shapes) + return input_shapes[0] if input_shapes else 
(128, 128) + + def generate_params(self, input_shapes: List[Tuple[int, int]], rng) -> Dict[str, Any]: + """Generate operator parameters based on input shapes.""" + if self.param_generator and self.requires_params: + return self.param_generator(input_shapes, rng) + return {} + + +class OpFuzzer: + """Generates random operator combinations for fuzzing.""" + + # Block-level binary operators + BLOCK_BINARY_OPS = [ + OpSpec("block.add", ["tile", "tile"], "tile", {}, lambda a, b: a + b), + OpSpec("block.sub", ["tile", "tile"], "tile", {}, lambda a, b: a - b), + OpSpec("block.mul", ["tile", "tile"], "tile", {}, lambda a, b: a * b), + OpSpec("block.div", ["tile", "tile"], "tile", {"avoid_zero": True}, lambda a, b: a / b), + OpSpec("block.maximum", ["tile", "tile"], "tile", {}, lambda a, b: np.maximum(a, b)), + OpSpec("block.minimum", ["tile", "tile"], "tile", {}, lambda a, b: np.minimum(a, b)), + ] + + # Block-level scalar operators + BLOCK_SCALAR_OPS = [ + OpSpec("block.adds", ["tile", "scalar"], "tile", {}, lambda a, s: a + s), + OpSpec("block.subs", ["tile", "scalar"], "tile", {}, lambda a, s: a - s), + OpSpec("block.muls", ["tile", "scalar"], "tile", {}, lambda a, s: a * s), + OpSpec("block.divs", ["tile", "scalar"], "tile", {"avoid_zero": True}, lambda a, s: a / s), + ] + + # Block-level unary operators + BLOCK_UNARY_OPS = [ + OpSpec("block.sqrt", ["tile"], "tile", {"positive_only": True}, lambda a: np.sqrt(a)), + OpSpec("block.rsqrt", ["tile"], "tile", {"positive_only": True}, lambda a: 1.0 / np.sqrt(a)), + OpSpec("block.exp", ["tile"], "tile", {}, lambda a: np.exp(np.clip(a, -10, 10))), + OpSpec("block.neg", ["tile"], "tile", {}, lambda a: -a), + OpSpec("block.recip", ["tile"], "tile", {"avoid_zero": True}, lambda a: 1.0 / a), + OpSpec("block.log", ["tile"], "tile", {"positive_only": True}, lambda a: np.log(a)), + OpSpec("block.abs", ["tile"], "tile", {}, lambda a: np.abs(a)), + OpSpec("block.relu", ["tile"], "tile", {}, lambda a: np.maximum(0, a)), + ] + + # 
Block-level row expand operators (需要特殊的形状处理) + # 注意: 这些操作需要第二个输入是 [M, 1] 形状 + BLOCK_ROW_EXPAND_OPS = [ + OpSpec("block.row_expand_add", ["tile", "tile"], "tile", {"row_vec_required": True}, + lambda a, b: a + b), # b is [M,1], broadcasts to [M,N] + OpSpec("block.row_expand_sub", ["tile", "tile"], "tile", {"row_vec_required": True}, + lambda a, b: a - b), + OpSpec("block.row_expand_mul", ["tile", "tile"], "tile", {"row_vec_required": True}, + lambda a, b: a * b), + OpSpec("block.row_expand_div", ["tile", "tile"], "tile", {"row_vec_required": True, "avoid_zero": True}, + lambda a, b: a / b), + ] + + # Block-level reduction operators (改变形状) + # axis=1: 沿最后一个轴归约, [M,N] -> [M,1] + BLOCK_REDUCTION_OPS = [ + # 注意: row_sum, row_max, row_min 需要一个临时tile参数 + # 为了简化,这里先不包含它们,或者使用 sum/max/min with axis参数 + ] + + # Block-level matrix operators + BLOCK_MATRIX_OPS = [ + OpSpec("block.matmul", ["tile", "tile"], "tile", {"matmul_shape": True}, + lambda a, b: a @ b, + shape_transform=lambda shapes, params=None: (shapes[0][0], shapes[1][1]) if len(shapes) >= 2 else shapes[0]), + ] + + def __init__(self, seed: Optional[int] = None, enable_advanced_ops: bool = False): + """Initialize fuzzer with optional seed for reproducibility. + + Args: + seed: Random seed for reproducibility + enable_advanced_ops: Enable advanced operations like row_expand, matmul (default: False) + """ + self.rng = random.Random(seed) + # 基础操作符集合 + self.ops = self.BLOCK_BINARY_OPS + self.BLOCK_SCALAR_OPS + self.BLOCK_UNARY_OPS + + # 可选: 启用高级操作符 + if enable_advanced_ops: + self.ops = self.ops + self.BLOCK_ROW_EXPAND_OPS + self.BLOCK_MATRIX_OPS + + def generate_op_chain( + self, + num_ops: int = 5, + input_count: int = 2, + allow_scalars: bool = True, + track_shapes: bool = False, + default_shape: Tuple[int, int] = (128, 128), + ) -> List[Dict[str, Any]]: + """Generate a chain of operator calls. 
+ + All input tensors and intermediate results are guaranteed to contribute + to the final output through smart generation and post-processing. + """ + # Initialize available variables + available_tiles = [f"tile_{chr(97 + i)}" for i in range(input_count)] + available_scalars = ["1.0", "2.0", "0.5"] if allow_scalars else [] + + # Track which initial inputs have been used + initial_inputs = set(available_tiles) + used_inputs = set() + + # Track usage count for each variable + variable_usage_count = {tile: 0 for tile in available_tiles} + + # Shape tracking (optional) + variable_shapes = {} + if track_shapes: + for tile in available_tiles: + variable_shapes[tile] = default_shape + + operations = [] + + for i in range(num_ops): + # Calculate urgency for using unused inputs + unused_count = len(initial_inputs - used_inputs) + remaining_ops = num_ops - i + + # Dynamic priority + use_unused_priority = 0.7 + if unused_count > 0: + if unused_count >= remaining_ops: + use_unused_priority = 1.0 + elif remaining_ops > 0: + use_unused_priority = min(0.9, 0.7 + 0.3 * (unused_count / remaining_ops)) + + # Select eligible operators + eligible_ops = self._get_eligible_ops( + available_tiles, + available_scalars, + allow_scalars, + variable_shapes if track_shapes else None, + ) + + if not eligible_ops: + break + + # Prioritize binary ops if we need to use unused inputs + if unused_count > 0 and use_unused_priority >= 0.9: + binary_ops = [op for op in eligible_ops if sum(1 for t in op.input_types if t == "tile") >= 2] + if binary_ops: + eligible_ops = binary_ops + + op = self.rng.choice(eligible_ops) + + # Select inputs + inputs = [] + scalar_value = None + + for input_type in op.input_types: + if input_type == "tile": + candidate_tiles = available_tiles + + if track_shapes: + candidate_tiles = [ + t for t in candidate_tiles + if self._is_shape_compatible(op, t, variable_shapes) + ] + if not candidate_tiles: + continue + + # Smart selection: prioritize unused inputs + 
unused_initial_inputs = { + t for t in candidate_tiles + if t in initial_inputs and t not in used_inputs + } + + candidate_scores = [] + for t in candidate_tiles: + score = 0 + + if t in unused_initial_inputs: + score += 50 + if use_unused_priority >= 0.9: + score += 30 + + usage = variable_usage_count.get(t, 0) + score += max(0, 20 - usage * 5) + + if t.startswith("tmp_"): + score += 5 + + candidate_scores.append((t, score)) + + if candidate_scores: + max_score = max(score for _, score in candidate_scores) + + if max_score >= 40: + threshold = max(max_score * 0.6, 30) + top_candidates = [t for t, score in candidate_scores if score >= threshold] + + if top_candidates and self.rng.random() < 0.85: + candidate_tiles = top_candidates + else: + min_score_needed = max(max_score * 0.7, 10) + preferred = [t for t, score in candidate_scores if score >= min_score_needed] + if preferred and self.rng.random() < 0.75: + candidate_tiles = preferred + + selected_input = self.rng.choice(candidate_tiles) + inputs.append(selected_input) + + variable_usage_count[selected_input] = variable_usage_count.get(selected_input, 0) + 1 + + if selected_input in initial_inputs: + used_inputs.add(selected_input) + + elif input_type == "scalar": + if self.rng.random() < 0.5 and available_scalars: + scalar_value = self.rng.choice(available_scalars) + else: + scalar_value = f"{self.rng.uniform(0.1, 10.0):.2f}" + inputs.append(scalar_value) + + output = f"tmp_{i}" + + # Generate operator parameters if required + params = None + if op.requires_params: + input_shapes = [variable_shapes[inp] for inp in inputs if inp in variable_shapes] + if input_shapes: + params = op.generate_params(input_shapes, self.rng) + + op_dict = { + "op": op, + "inputs": inputs, + "output": output, + "scalar_value": scalar_value, + "params": params, + } + + # Compute output shape if tracking + if track_shapes: + input_shapes = [variable_shapes[inp] for inp in inputs if inp in variable_shapes] + output_shape = 
op.compute_output_shape(input_shapes, params) + op_dict["output_shape"] = output_shape + variable_shapes[output] = output_shape + + operations.append(op_dict) + available_tiles.append(output) + variable_usage_count[output] = 0 + + # Ensure all initial inputs are used + unused_inputs = initial_inputs - used_inputs + if unused_inputs: + add_op = next((op for op in self.BLOCK_BINARY_OPS if op.name == "block.add"), None) + + for unused_input in unused_inputs: + if operations: + current_final = operations[-1]["output"] + output = f"tmp_{len(operations)}" + + op_dict = { + "op": add_op, + "inputs": [unused_input, current_final], + "output": output, + "scalar_value": None, + "params": None, + } + + if track_shapes: + input_shapes = [ + variable_shapes.get(unused_input, default_shape), + variable_shapes.get(current_final, default_shape) + ] + output_shape = add_op.compute_output_shape(input_shapes) + op_dict["output_shape"] = output_shape + variable_shapes[output] = output_shape + + operations.append(op_dict) + available_tiles.append(output) + used_inputs.add(unused_input) + variable_usage_count[output] = 0 + variable_usage_count[unused_input] = variable_usage_count.get(unused_input, 0) + 1 + variable_usage_count[current_final] = variable_usage_count.get(current_final, 0) + 1 + + # Ensure all intermediate results contribute to the final output + if operations: + final_output = operations[-1]["output"] + unused_intermediates = [] + + for var_name, usage_count in variable_usage_count.items(): + if var_name.startswith("tmp_") and usage_count == 0 and var_name != final_output: + unused_intermediates.append(var_name) + + if unused_intermediates: + add_op = next((op for op in self.BLOCK_BINARY_OPS if op.name == "block.add"), None) + + for unused_var in unused_intermediates: + current_final = operations[-1]["output"] + output = f"tmp_{len(operations)}" + + op_dict = { + "op": add_op, + "inputs": [unused_var, current_final], + "output": output, + "scalar_value": None, + "params": 
None, + } + + if track_shapes: + input_shapes = [ + variable_shapes.get(unused_var, default_shape), + variable_shapes.get(current_final, default_shape) + ] + output_shape = add_op.compute_output_shape(input_shapes) + op_dict["output_shape"] = output_shape + variable_shapes[output] = output_shape + + operations.append(op_dict) + available_tiles.append(output) + variable_usage_count[output] = 0 + variable_usage_count[unused_var] = variable_usage_count.get(unused_var, 0) + 1 + variable_usage_count[current_final] = variable_usage_count.get(current_final, 0) + 1 + + return operations + + def _get_eligible_ops( + self, + available_tiles: List[str], + available_scalars: List[str], + allow_scalars: bool, + variable_shapes: Optional[Dict[str, Tuple[int, int]]] = None, + ) -> List[OpSpec]: + """Get operators that can be applied with current variables.""" + eligible = [] + + for op in self.ops: + tile_inputs = sum(1 for t in op.input_types if t == "tile") + scalar_inputs = sum(1 for t in op.input_types if t == "scalar") + + has_tiles = len(available_tiles) >= tile_inputs + has_scalars = (scalar_inputs == 0) or (allow_scalars and + (len(available_scalars) >= scalar_inputs or scalar_inputs > 0)) + + if has_tiles and has_scalars: + eligible.append(op) + + return eligible + + def _is_shape_compatible( + self, + op: OpSpec, + var: str, + variable_shapes: Dict[str, Tuple[int, int]] + ) -> bool: + """Check if a variable's shape is compatible with an operator.""" + if var not in variable_shapes: + return True + return True # All current ops are compatible with any shape + + def generate_numpy_reference( + self, + op_chain: List[Dict[str, Any]], + input_tensors: Dict[str, Any], + ) -> Any: + """Generate NumPy golden reference from operation chain.""" + import numpy as np + + # Create variable environment + env = {} + for name, tensor in input_tensors.items(): + env[f"tile_{name}"] = tensor.copy() + + # Execute operations + for op_dict in op_chain: + op = op_dict["op"] + inputs = 
op_dict["inputs"] + output = op_dict["output"] + params = op_dict.get("params") + + # Get input values + input_vals = [] + for inp in inputs: + if inp in env: + val = env[inp] + else: + val = float(inp) + input_vals.append(val) + + # Apply constraints + if "avoid_zero" in op.constraints and op.constraints["avoid_zero"]: + for i, val in enumerate(input_vals): + if isinstance(val, np.ndarray): + input_vals[i] = np.where(np.abs(val) < 0.01, 1.0, val) + + if "positive_only" in op.constraints and op.constraints["positive_only"]: + for i, val in enumerate(input_vals): + if isinstance(val, np.ndarray): + input_vals[i] = np.abs(val) + 1e-6 + + # Execute operation + if op.np_equivalent: + import inspect + sig = inspect.signature(op.np_equivalent) + if params and len(sig.parameters) > len(input_vals): + result = op.np_equivalent(*input_vals, params) + else: + result = op.np_equivalent(*input_vals) + env[output] = result + + # Return final result + if op_chain: + return env[op_chain[-1]["output"]] + else: + return input_tensors[list(input_tensors.keys())[0]] diff --git a/src/fuzzer/src/kernel_generator.py b/src/fuzzer/src/kernel_generator.py new file mode 100644 index 0000000..c58c9fb --- /dev/null +++ b/src/fuzzer/src/kernel_generator.py @@ -0,0 +1,225 @@ +""" +InCore 内核函数生成器 + +该模块负责生成 @pl.function(type=pl.FunctionType.InCore) 内核函数。 +每个内核包含一系列随机生成的算子操作链。 +""" + +import random +from typing import Any, Dict, List, Optional, Tuple + +from .fuzzer import OpFuzzer, is_shape_aligned, generate_aligned_shape + + +class KernelGenerator: + """生成 InCore 内核函数的生成器""" + + def __init__(self, seed: Optional[int] = None, enable_advanced_ops: bool = False): + """初始化内核生成器 + + Args: + seed: 随机种子,用于可重现性 + enable_advanced_ops: 启用高级算子(row_expand, matmul等) + """ + self.rng = random.Random(seed) + self.fuzzer = OpFuzzer(seed=seed, enable_advanced_ops=enable_advanced_ops) + + def generate_kernel( + self, + kernel_name: str, + num_inputs: int = 2, + num_ops: int = 5, + shape: Tuple[int, int] = (128, 
128), + allow_scalars: bool = True, + input_shapes: Optional[List[Tuple[int, int]]] = None, + output_shape: Optional[Tuple[int, int]] = None, + ) -> Dict[str, Any]: + """生成单个 InCore 内核 + + Args: + kernel_name: 内核函数名称 + num_inputs: 输入张量数量(如果未指定 input_shapes) + num_ops: 操作数量 + shape: 默认张量形状(如果未指定 input_shapes) + allow_scalars: 是否允许标量操作 + input_shapes: 每个输入的形状列表,如果指定则覆盖 num_inputs 和 shape + output_shape: 输出形状,如果指定则覆盖默认行为 + + Returns: + 包含内核信息的字典: + - name: 内核名称 + - inputs: 输入参数列表 [(name, shape), ...] + - output_shape: 输出形状 + - op_chain: 操作链 + - code: 生成的 PyPTO 代码 + """ + # 确定输入形状 + if input_shapes is not None: + actual_num_inputs = len(input_shapes) + actual_shapes = input_shapes + else: + actual_num_inputs = num_inputs + actual_shapes = [shape] * num_inputs + + # 验证所有形状是否满足对齐约束 + dtype = "FP32" # 当前仅支持 FP32 + for i, input_shape in enumerate(actual_shapes): + if not is_shape_aligned(input_shape, dtype): + # 如果形状不对齐,使用最接近的对齐形状 + print(f"Warning: Input shape {input_shape} is not 32-byte aligned. Regenerating aligned shape.") + actual_shapes[i] = generate_aligned_shape(self.rng, dtype) + + # 确定输出形状并验证对齐 + if output_shape is not None: + actual_output_shape = output_shape + if not is_shape_aligned(actual_output_shape, dtype): + print(f"Warning: Output shape {actual_output_shape} is not 32-byte aligned. Regenerating aligned shape.") + actual_output_shape = generate_aligned_shape(self.rng, dtype) + else: + actual_output_shape = actual_shapes[0] + + # 生成操作链 + op_chain = self.fuzzer.generate_op_chain( + num_ops=num_ops, + input_count=actual_num_inputs, + allow_scalars=allow_scalars, + track_shapes=False, + default_shape=actual_output_shape, + ) + + # 生成输入参数 + input_names = [chr(97 + i) for i in range(actual_num_inputs)] # a, b, c, ... 
+ inputs = [(name, actual_shapes[i]) for i, name in enumerate(input_names)] + + # 生成内核代码 + code = self._generate_kernel_code( + kernel_name=kernel_name, + inputs=inputs, + op_chain=op_chain, + output_shape=actual_output_shape, + ) + + return { + "name": kernel_name, + "inputs": inputs, + "output_shape": actual_output_shape, + "op_chain": op_chain, + "code": code, + } + + def _generate_kernel_code( + self, + kernel_name: str, + inputs: List[Tuple[str, Tuple[int, int]]], + op_chain: List[Dict[str, Any]], + output_shape: Tuple[int, int], + ) -> str: + """生成内核函数代码 + + Args: + kernel_name: 内核名称 + inputs: 输入参数列表 + op_chain: 操作链 + output_shape: 输出形状 + + Returns: + 生成的 PyPTO 代码字符串 + """ + rows, cols = output_shape + + # 生成函数签名 - 添加 output_tensor 参数 + params = [] + for name, (r, c) in inputs: + params.append(f"{name}: pl.Tensor[[{r}, {c}], pl.FP32]") + # 添加 output_tensor 参数 + params.append(f"output: pl.Tensor[[{rows}, {cols}], pl.FP32]") + + code_lines = [ + f" @pl.function(type=pl.FunctionType.InCore)", + f" def {kernel_name}(self, {', '.join(params)}) -> pl.Tensor[[{rows}, {cols}], pl.FP32]:", + ] + + # 加载输入张量 - 使用每个输入自己的实际形状 + for name, (r, c) in inputs: + code_lines.append(f" tile_{name} = pl.load({name}, offsets=[0, 0], shapes=[{r}, {c}])") + + # 生成操作链 + for op_dict in op_chain: + op = op_dict["op"] + inputs_str = ", ".join(op_dict["inputs"]) + output = op_dict["output"] + params = op_dict.get("params") + + # 去掉 block. 
前缀,直接使用 pl.xxx + op_name = op.name.replace("block.", "") + + if params: + params_str = ", ".join(f"{k}={v}" for k, v in params.items()) + code_lines.append(f" {output} = pl.{op_name}({inputs_str}, {params_str})") + else: + code_lines.append(f" {output} = pl.{op_name}({inputs_str})") + + # Store 结果并返回 + if op_chain: + last_output = op_chain[-1]["output"] + code_lines.append(f" result = pl.store({last_output}, offsets=[0, 0], shapes=[{rows}, {cols}], output_tensor=output)") + code_lines.append(f" return result") + else: + # 如果没有操作,直接 store 第一个输入 + first_input = inputs[0][0] + code_lines.append(f" result = pl.store(tile_{first_input}, offsets=[0, 0], shapes=[{rows}, {cols}], output_tensor=output)") + code_lines.append(f" return result") + + return "\n".join(code_lines) + + def generate_multiple_kernels( + self, + num_kernels: int = 3, + num_inputs_range: Tuple[int, int] = (2, 3), + num_ops_range: Tuple[int, int] = (3, 7), + shape: Tuple[int, int] = (128, 128), + input_shapes_list: Optional[List[List[Tuple[int, int]]]] = None, + output_shapes: Optional[List[Tuple[int, int]]] = None, + ) -> List[Dict[str, Any]]: + """生成多个 InCore 内核 + + Args: + num_kernels: 要生成的内核数量 + num_inputs_range: 输入数量范围 (min, max) + num_ops_range: 操作数量范围 (min, max) + shape: 默认张量形状 + input_shapes_list: 每个内核的输入形状列表,如果指定则覆盖其他参数 + 例如: [[(128,128), (64,64)], [(256,256)], ...] 
+ output_shapes: 每个内核的输出形状列表(可选) + + Returns: + 内核信息字典列表 + """ + kernels = [] + for i in range(num_kernels): + num_ops = self.rng.randint(*num_ops_range) + + # 确定输入形状 + if input_shapes_list and i < len(input_shapes_list): + kernel_input_shapes = input_shapes_list[i] + kernel_output_shape = output_shapes[i] if output_shapes and i < len(output_shapes) else None + kernel = self.generate_kernel( + kernel_name=f"kernel_{i}", + num_ops=num_ops, + shape=shape, + input_shapes=kernel_input_shapes, + output_shape=kernel_output_shape, + ) + else: + num_inputs = self.rng.randint(*num_inputs_range) + kernel_output_shape = output_shapes[i] if output_shapes and i < len(output_shapes) else None + kernel = self.generate_kernel( + kernel_name=f"kernel_{i}", + num_inputs=num_inputs, + num_ops=num_ops, + shape=shape, + output_shape=kernel_output_shape, + ) + kernels.append(kernel) + + return kernels diff --git a/src/fuzzer/src/multi_kernel_test_generator.py b/src/fuzzer/src/multi_kernel_test_generator.py new file mode 100644 index 0000000..8afe447 --- /dev/null +++ b/src/fuzzer/src/multi_kernel_test_generator.py @@ -0,0 +1,770 @@ +""" +多内核测试用例生成器 + +该模块负责生成完整的测试用例,包括: +- 多个 InCore 内核 +- Orchestration 组合函数 +- NumPy 参考实现 +- PTOTestCase 测试类 +""" + +import sys +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +from .fuzzer import OpFuzzer + +from .kernel_generator import KernelGenerator +from .orchestrator_generator import OrchestratorGenerator + + +class MultiKernelTestGenerator: + """生成多内核测试用例的生成器""" + + def __init__( + self, + seed: Optional[int] = None, + enable_advanced_ops: bool = False, + tensor_init_type: str = "constant" + ): + """初始化测试生成器 + + Args: + seed: 随机种子,用于可重现性 + enable_advanced_ops: 启用高级算子(row_expand, matmul等) + tensor_init_type: 张量初始化类型,可选值: + - "constant": 常量初始化(默认) + - "random": 随机初始化 + - "range": 范围初始化(0到1之间) + - "normal": 正态分布初始化 + """ + self.seed = seed + self.enable_advanced_ops = enable_advanced_ops + self.tensor_init_type = 
tensor_init_type + self.kernel_gen = KernelGenerator(seed=seed, enable_advanced_ops=enable_advanced_ops) + self.orch_gen = OrchestratorGenerator(seed=seed) + self.fuzzer = OpFuzzer(seed=seed, enable_advanced_ops=enable_advanced_ops) + + def _generate_tensor_init_value(self, tensor_index: int, init_type: str = None) -> str: + """生成张量初始化值的代码 + + Args: + tensor_index: 张量索引(用于生成不同的常量值) + init_type: 初始化类型,如果为None则使用self.tensor_init_type + + Returns: + 初始化值的代码字符串 + """ + if init_type is None: + init_type = self.tensor_init_type + + if init_type == "constant": + # 常量初始化:每个张量使用不同的常量 + init_val = 2.0 + tensor_index * 0.5 + return f"init_value={init_val}" + elif init_type == "random": + # 随机初始化:使用lambda函数生成随机数 + return "init_value=lambda shape: torch.randn(shape, dtype=torch.float32).numpy()" + elif init_type == "range": + # 范围初始化:0到1之间的均匀分布 + return "init_value=lambda shape: torch.rand(shape, dtype=torch.float32).numpy()" + elif init_type == "normal": + # 正态分布初始化:均值0,标准差1 + return "init_value=lambda shape: torch.randn(shape, dtype=torch.float32).numpy()" + elif init_type == "ones": + # 全1初始化 + return "init_value=1.0" + elif init_type == "zeros": + # 全0初始化(不推荐用于输入,可能导致除零) + return "init_value=0.0" + else: + # 默认使用常量 + init_val = 2.0 + tensor_index * 0.5 + return f"init_value={init_val}" + + def _compute_output_shapes_for_sequential( + self, + num_kernels: int, + default_shape: Tuple[int, int], + input_shapes_list: Optional[List[List[Tuple[int, int]]]], + mode: str, + ) -> List[Tuple[int, int]]: + """计算顺序模式下每个内核的输出形状,确保形状兼容性 + + Args: + num_kernels: 内核数量 + default_shape: 默认形状 + input_shapes_list: 输入形状列表 + mode: 组合模式 + + Returns: + 每个内核的输出形状列表 + """ + output_shapes = [] + + if mode == "sequential": + # 顺序模式:kernel_i 的输出必须匹配 kernel_{i+1} 的第一个输入 + for i in range(num_kernels): + if i == num_kernels - 1: + # 最后一个内核:输出形状使用其第一个输入的形状 + if input_shapes_list and i < len(input_shapes_list): + output_shapes.append(input_shapes_list[i][0]) + else: + output_shapes.append(default_shape) + 
else: + # 非最后一个内核:输出形状必须匹配下一个内核的第一个输入 + if input_shapes_list and i + 1 < len(input_shapes_list): + next_kernel_first_input = input_shapes_list[i + 1][0] + output_shapes.append(next_kernel_first_input) + else: + output_shapes.append(default_shape) + + elif mode == "branching": + # 分支模式:所有内核必须有相同的输出形状(用于合并) + # 使用第一个内核的第一个输入形状作为统一输出形状 + if input_shapes_list and len(input_shapes_list) > 0: + unified_output_shape = input_shapes_list[0][0] + else: + unified_output_shape = default_shape + + for i in range(num_kernels): + output_shapes.append(unified_output_shape) + + elif mode == "mixed": + # 混合模式:前半部分并行,后半部分顺序 + mid = num_kernels // 2 + + # 并行部分:所有内核使用相同的输出形状 + if input_shapes_list and len(input_shapes_list) > 0: + parallel_output_shape = input_shapes_list[0][0] + else: + parallel_output_shape = default_shape + + for i in range(num_kernels): + if i < mid: + # 并行部分:统一输出形状 + output_shapes.append(parallel_output_shape) + elif i == mid: + # 第一个顺序内核:输出形状匹配下一个内核的第一个输入(如果有) + if i == num_kernels - 1: + # 如果是最后一个,使用其第一个输入的形状 + if input_shapes_list and i < len(input_shapes_list): + output_shapes.append(input_shapes_list[i][0]) + else: + output_shapes.append(default_shape) + else: + # 匹配下一个内核的第一个输入 + if input_shapes_list and i + 1 < len(input_shapes_list): + output_shapes.append(input_shapes_list[i + 1][0]) + else: + output_shapes.append(default_shape) + else: + # 后续顺序内核 + if i == num_kernels - 1: + # 最后一个内核 + if input_shapes_list and i < len(input_shapes_list): + output_shapes.append(input_shapes_list[i][0]) + else: + output_shapes.append(default_shape) + else: + # 匹配下一个内核的第一个输入 + if input_shapes_list and i + 1 < len(input_shapes_list): + output_shapes.append(input_shapes_list[i + 1][0]) + else: + output_shapes.append(default_shape) + + return output_shapes + + def _regenerate_kernel_code_with_unified_shapes( + self, + kernel: Dict[str, Any], + input_shapes_map: Dict[str, Tuple[int, int]], + ) -> str: + """使用统一的输入形状重新生成 kernel 代码 + + Args: + kernel: 内核信息字典 + input_shapes_map: 
统一的输入形状映射 + + Returns: + 重新生成的 kernel 代码 + """ + kernel_name = kernel["name"] + output_shape = kernel["output_shape"] + op_chain = kernel["op_chain"] + rows, cols = output_shape + + # 使用统一的输入形状生成函数签名 + params = [] + for inp_name, _ in kernel["inputs"]: + unified_shape = input_shapes_map[inp_name] + params.append(f"{inp_name}: pl.Tensor[[{unified_shape[0]}, {unified_shape[1]}], pl.FP32]") + # 添加 output_tensor 参数 + params.append(f"output: pl.Tensor[[{rows}, {cols}], pl.FP32]") + + code_lines = [ + f" @pl.function(type=pl.FunctionType.InCore)", + f" def {kernel_name}(self, {', '.join(params)}) -> pl.Tensor[[{rows}, {cols}], pl.FP32]:", + ] + + # 加载输入张量 - 使用每个输入的实际定义形状 + for inp_name, _ in kernel["inputs"]: + inp_shape = input_shapes_map[inp_name] + code_lines.append(f" tile_{inp_name} = pl.load({inp_name}, offsets=[0, 0], shapes=[{inp_shape[0]}, {inp_shape[1]}])") + + # 生成操作链 + for op_dict in op_chain: + op = op_dict["op"] + inputs_str = ", ".join(op_dict["inputs"]) + output = op_dict["output"] + params_dict = op_dict.get("params") + + # 去掉 block. 
前缀,直接使用 pl.xxx + op_name = op.name.replace("block.", "") + + if params_dict: + params_str = ", ".join(f"{k}={v}" for k, v in params_dict.items()) + code_lines.append(f" {output} = pl.{op_name}({inputs_str}, {params_str})") + else: + code_lines.append(f" {output} = pl.{op_name}({inputs_str})") + + # Store 结果并返回 + if op_chain: + last_output = op_chain[-1]["output"] + code_lines.append(f" result = pl.store({last_output}, offsets=[0, 0], shapes=[{rows}, {cols}], output_tensor=output)") + code_lines.append(f" return result") + else: + # 如果没有操作,直接 store 第一个输入 + first_input = kernel["inputs"][0][0] + code_lines.append(f" result = pl.store(tile_{first_input}, offsets=[0, 0], shapes=[{rows}, {cols}], output_tensor=output)") + code_lines.append(f" return result") + + return "\n".join(code_lines) + + def generate_test_case( + self, + test_name: str, + num_kernels: int = 3, + orchestration_mode: str = "sequential", + shape: Tuple[int, int] = (128, 128), + num_ops_range: Tuple[int, int] = (3, 7), + input_shapes_list: Optional[List[List[Tuple[int, int]]]] = None, + tensor_init_type: Optional[str] = None, + atol: float = 1e-5, + rtol: float = 1e-5, + ) -> str: + """生成完整的测试用例代码 + + Args: + test_name: 测试用例名称 + num_kernels: 内核数量 + orchestration_mode: 组合模式 ("sequential", "branching", "mixed") + shape: 张量形状 + num_ops_range: 每个内核的操作数量范围 + input_shapes_list: 每个内核的输入形状列表(可选) + tensor_init_type: 张量初始化类型(可选,如果不指定则使用全局配置) + atol: 绝对误差容忍度 + rtol: 相对误差容忍度 + + Returns: + 完整的测试用例代码字符串 + """ + # 对于 sequential、branching 和 mixed 模式,计算输出形状以确保兼容性 + if orchestration_mode in ["sequential", "branching", "mixed"]: + output_shapes = self._compute_output_shapes_for_sequential( + num_kernels, shape, input_shapes_list, orchestration_mode + ) + else: + output_shapes = None + + # 生成多个内核 + kernels = self.kernel_gen.generate_multiple_kernels( + num_kernels=num_kernels, + num_inputs_range=(2, 3), + num_ops_range=num_ops_range, + shape=shape, + input_shapes_list=input_shapes_list, + output_shapes=output_shapes, + 
) + + # 生成 Orchestration 函数 + if orchestration_mode == "sequential": + orch_info = self.orch_gen.generate_sequential(kernels, shape) + elif orchestration_mode == "branching": + orch_info = self.orch_gen.generate_branching(kernels, shape) + elif orchestration_mode == "mixed": + orch_info = self.orch_gen.generate_mixed(kernels, shape) + else: + raise ValueError(f"未知的组合模式: {orchestration_mode}") + + # 生成 Torch 参考实现 + torch_code = self._generate_torch_reference(kernels, orch_info) + + # 生成完整的测试类 + test_code = self._generate_test_class( + test_name=test_name, + kernels=kernels, + orch_info=orch_info, + torch_code=torch_code, + shape=shape, + tensor_init_type=tensor_init_type, + atol=atol, + rtol=rtol, + ) + + return test_code + + def _generate_torch_reference( + self, + kernels: List[Dict[str, Any]], + orch_info: Dict[str, Any], + ) -> str: + """生成 Torch 参考实现代码 + + Args: + kernels: 内核信息列表 + orch_info: Orchestration 信息 + + Returns: + Torch 参考实现代码字符串 + """ + code_lines = [] + + # 为每个内核生成 Torch 函数 + for kernel in kernels: + kernel_name = kernel["name"] + input_names = [inp[0] for inp in kernel["inputs"]] + op_chain = kernel["op_chain"] + + # 嵌套函数不需要 self 参数 + code_lines.append(f" def _torch_{kernel_name}({', '.join(input_names)}):") + code_lines.append(f" \"\"\"Torch 实现: {kernel_name}\"\"\"") + + # 生成 Torch 操作 + code_lines.append(f" # 创建变量环境") + code_lines.append(f" env = {{}}") + for name in input_names: + code_lines.append(f" env['tile_{name}'] = {name}.clone()") + + code_lines.append(f"") + code_lines.append(f" # 执行操作链") + for op_dict in op_chain: + op = op_dict["op"] + inputs = op_dict["inputs"] + output = op_dict["output"] + + # 获取输入值 + input_vals = [] + for inp in inputs: + if inp.startswith("tile_") or inp.startswith("tmp_"): + input_vals.append(f"env['{inp}']") + else: + input_vals.append(inp) + + # 应用约束 + if "avoid_zero" in op.constraints and op.constraints["avoid_zero"]: + for i, inp in enumerate(inputs): + if inp.startswith("tile_") or inp.startswith("tmp_"): + 
code_lines.append(f" env['{inp}'] = torch.where(torch.abs(env['{inp}']) < 0.01, torch.tensor(1.0), env['{inp}'])") + + if "positive_only" in op.constraints and op.constraints["positive_only"]: + for i, inp in enumerate(inputs): + if inp.startswith("tile_") or inp.startswith("tmp_"): + code_lines.append(f" env['{inp}'] = torch.abs(env['{inp}']) + 1e-6") + + # 生成操作 + if op.np_equivalent: + torch_expr = self._get_torch_operation(op.name, input_vals) + code_lines.append(f" env['{output}'] = {torch_expr}") + + code_lines.append(f" return env['{op_chain[-1]['output']}']") + code_lines.append(f"") + + return "\n".join(code_lines) + + def _get_torch_operation(self, op_name: str, input_vals: List[str]) -> str: + """将 PyPTO 操作名转换为 Torch 操作表达式 + + Args: + op_name: PyPTO 操作名 (如 "block.add") + input_vals: 输入值列表 + + Returns: + Torch 操作表达式字符串 + """ + # 根据操作类型生成表达式 + # 二元操作 + if op_name == "block.add": + return f"{input_vals[0]} + {input_vals[1]}" + elif op_name == "block.sub": + return f"{input_vals[0]} - {input_vals[1]}" + elif op_name == "block.mul": + return f"{input_vals[0]} * {input_vals[1]}" + elif op_name == "block.div": + return f"{input_vals[0]} / {input_vals[1]}" + elif op_name == "block.maximum": + return f"torch.maximum({input_vals[0]}, {input_vals[1]})" + elif op_name == "block.minimum": + return f"torch.minimum({input_vals[0]}, {input_vals[1]})" + # 标量操作 + elif op_name == "block.adds": + return f"{input_vals[0]} + {input_vals[1]}" + elif op_name == "block.subs": + return f"{input_vals[0]} - {input_vals[1]}" + elif op_name == "block.muls": + return f"{input_vals[0]} * {input_vals[1]}" + elif op_name == "block.divs": + return f"{input_vals[0]} / {input_vals[1]}" + # 一元操作 + elif op_name == "block.sqrt": + return f"torch.sqrt({input_vals[0]})" + elif op_name == "block.rsqrt": + return f"torch.rsqrt({input_vals[0]})" + elif op_name == "block.exp": + return f"torch.exp(torch.clamp({input_vals[0]}, -10, 10))" + elif op_name == "block.neg": + return f"-{input_vals[0]}" + 
elif op_name == "block.recip": + return f"torch.reciprocal({input_vals[0]})" + elif op_name == "block.log": + return f"torch.log({input_vals[0]})" + elif op_name == "block.abs": + return f"torch.abs({input_vals[0]})" + elif op_name == "block.relu": + return f"torch.relu({input_vals[0]})" + # Row expand 操作 + elif op_name == "block.row_expand_add": + return f"{input_vals[0]} + {input_vals[1]}" # Broadcasting + elif op_name == "block.row_expand_sub": + return f"{input_vals[0]} - {input_vals[1]}" + elif op_name == "block.row_expand_mul": + return f"{input_vals[0]} * {input_vals[1]}" + elif op_name == "block.row_expand_div": + return f"{input_vals[0]} / {input_vals[1]}" + # 矩阵操作 + elif op_name == "block.matmul": + return f"torch.matmul({input_vals[0]}, {input_vals[1]})" + else: + return f"# 未知操作: {op_name}" + + def _generate_test_class( + self, + test_name: str, + kernels: List[Dict[str, Any]], + orch_info: Dict[str, Any], + torch_code: str, + shape: Tuple[int, int], + tensor_init_type: Optional[str] = None, + atol: float = 1e-5, + rtol: float = 1e-5, + ) -> str: + """生成完整的测试类代码 + + Args: + test_name: 测试名称 + kernels: 内核信息列表 + orch_info: Orchestration 信息 + torch_code: Torch 参考实现代码 + shape: 张量形状 + tensor_init_type: 张量初始化类型(可选,如果不指定则使用全局配置) + atol: 绝对误差容忍度 + rtol: 相对误差容忍度 + + Returns: + 完整的测试类代码 + """ + rows, cols = shape + class_name = f"Test{test_name.replace('_', ' ').title().replace(' ', '')}" + + # 收集所有输入及其实际形状 + input_shapes_map = {} # {input_name: shape} + for kernel in kernels: + for inp_name, inp_shape in kernel["inputs"]: + if inp_name not in input_shapes_map: + input_shapes_map[inp_name] = inp_shape + # 如果同一个输入在不同内核中有不同形状,使用较大的形状 + elif inp_shape != input_shapes_map[inp_name]: + existing_size = input_shapes_map[inp_name][0] * input_shapes_map[inp_name][1] + new_size = inp_shape[0] * inp_shape[1] + if new_size > existing_size: + input_shapes_map[inp_name] = inp_shape + + input_list = sorted(input_shapes_map.keys()) + + # 输出形状使用最后一个内核的输出形状 + output_shape = 
kernels[-1]["output_shape"] if kernels else shape + + # 生成头部 + code_lines = [ + f"class {class_name}(PTOTestCase):", + f" \"\"\"", + f" 测试用例: {test_name}", + f" 组合模式: {orch_info['mode']}", + f" 内核数量: {len(kernels)}", + f" \"\"\"", + f"", + f" rows = {rows}", + f" cols = {cols}", + f"", + f" def __init__(self):", + f" super().__init__()", + f" self.config.atol = {atol}", + f" self.config.rtol = {rtol}", + f"", + f" def get_name(self) -> str:", + f" return '{test_name}'", + f"", + f" def define_tensors(self) -> List[TensorSpec]:", + f" return [", + ] + + # 定义输入张量 - 使用实际形状和配置的初始化类型 + for idx, inp_name in enumerate(input_list): + inp_shape = input_shapes_map[inp_name] + init_code = self._generate_tensor_init_value(idx, tensor_init_type) + code_lines.append(f" TensorSpec('{inp_name}', [{inp_shape[0]}, {inp_shape[1]}], DataType.FP32, {init_code}),") + + # 定义输出张量 - 使用实际输出形状 + code_lines.append(f" TensorSpec('output', [{output_shape[0]}, {output_shape[1]}], DataType.FP32, is_output=True),") + code_lines.append(f" ]") + code_lines.append(f"") + + # 生成 PyPTO 程序 + code_lines.append(f" def get_program(self) -> Any:") + code_lines.append(f" import pypto.language as pl") + code_lines.append(f"") + code_lines.append(f" @pl.program") + code_lines.append(f" class {test_name.replace('_', ' ').title().replace(' ', '')}Program:") + + # 添加所有内核(需要额外缩进) + for kernel in kernels: + # 使用统一的输入形状重新生成 kernel 代码 + regenerated_code = self._regenerate_kernel_code_with_unified_shapes(kernel, input_shapes_map) + # 为内核代码添加额外的8个空格缩进(4个用于get_program方法,4个用于@pl.program类) + kernel_lines = regenerated_code.split("\n") + for line in kernel_lines: + code_lines.append(f" {line}") + code_lines.append(f"") + + # 添加合并内核(如果需要) + if orch_info.get("needs_merge_kernel", False): + merge_code = self.orch_gen.generate_merge_kernel(shape) + merge_lines = merge_code.split("\n") + for line in merge_lines: + code_lines.append(f" {line}") + code_lines.append(f"") + + # 添加 Orchestration 函数 + orch_lines = 
orch_info["code"].split("\n") + for line in orch_lines: + code_lines.append(f" {line}") + code_lines.append(f"") + + code_lines.append(f" return {test_name.replace('_', ' ').title().replace(' ', '')}Program") + code_lines.append(f"") + + # 添加 Torch 参考实现 + code_lines.append(f" def compute_expected(self, tensors, params=None):") + code_lines.append(f" \"\"\"使用 Torch 计算期望输出\"\"\"") + code_lines.append(f" # 将 numpy 数组转换为 torch 张量(仅在输入边界)") + code_lines.append(f" torch_tensors = {{name: torch.from_numpy(arr) for name, arr in tensors.items() if not name.endswith('output')}}") + code_lines.append(f"") + # torch_code 包含嵌套函数定义,需要添加到 compute_expected 内部,所以需要额外缩进 + torch_lines = torch_code.split('\n') + for line in torch_lines: + if line.strip(): # 跳过空行 + code_lines.append(f" {line}") # 添加额外的4个空格缩进 + else: + code_lines.append(line) + code_lines.append(f"") + + # 根据组合模式生成计算逻辑 + if orch_info["mode"] == "sequential": + code_lines.append(f" # 顺序执行模式") + result_var = None + for i, kernel in enumerate(kernels): + kernel_name = kernel["name"] + kernel_inputs = [inp[0] for inp in kernel["inputs"]] + + if i > 0 and result_var: + # 第一个输入使用前一个结果(变量名) + kernel_inputs[0] = result_var + # 构建参数列表:第一个是变量,其他从 torch_tensors 获取 + inputs_parts = [kernel_inputs[0]] + for inp in kernel_inputs[1:]: + inputs_parts.append(f"torch_tensors['{inp}']") + inputs_str = ", ".join(inputs_parts) + else: + # 第一个内核,所有输入都从 torch_tensors 获取 + inputs_str = ", ".join([f"torch_tensors['{inp}']" for inp in kernel_inputs]) + + result_var = f"result_{i}" + # 调用嵌套函数不需要 self + code_lines.append(f" {result_var} = _torch_{kernel_name}({inputs_str})") + + code_lines.append(f" # 将结果转换回 numpy 并写入输出") + code_lines.append(f" tensors['output'][:] = {result_var}.numpy()") + + elif orch_info["mode"] == "branching": + code_lines.append(f" # 分支执行模式") + branch_results = [] + for i, kernel in enumerate(kernels): + kernel_name = kernel["name"] + kernel_inputs = [inp[0] for inp in kernel["inputs"]] + result_var = f"branch_{i}" + 
branch_results.append(result_var) + + inputs_str = ", ".join([f"torch_tensors['{inp}']" for inp in kernel_inputs]) + # 调用嵌套函数不需要 self + code_lines.append(f" {result_var} = _torch_{kernel_name}({inputs_str})") + + # 合并结果 + if len(branch_results) == 1: + code_lines.append(f" # 将结果转换回 numpy 并写入输出") + code_lines.append(f" tensors['output'][:] = {branch_results[0]}.numpy()") + else: + merged = branch_results[0] + for i in range(1, len(branch_results)): + new_merged = f"merged_{i}" + code_lines.append(f" {new_merged} = {merged} + {branch_results[i]}") + merged = new_merged + code_lines.append(f" # 将结果转换回 numpy 并写入输出") + code_lines.append(f" tensors['output'][:] = {merged}.numpy()") + + elif orch_info["mode"] == "mixed": + code_lines.append(f" # 混合执行模式") + mid = len(kernels) // 2 + parallel_kernels = kernels[:mid] + sequential_kernels = kernels[mid:] + + # 并行部分 + branch_results = [] + for i, kernel in enumerate(parallel_kernels): + kernel_name = kernel["name"] + kernel_inputs = [inp[0] for inp in kernel["inputs"]] + result_var = f"parallel_{i}" + branch_results.append(result_var) + + inputs_str = ", ".join([f"torch_tensors['{inp}']" for inp in kernel_inputs]) + # 调用嵌套函数不需要 self + code_lines.append(f" {result_var} = _torch_{kernel_name}({inputs_str})") + + # 合并并行结果 + if len(branch_results) > 1: + merged = branch_results[0] + for i in range(1, len(branch_results)): + new_merged = f"merged_parallel_{i}" + code_lines.append(f" {new_merged} = {merged} + {branch_results[i]}") + merged = new_merged + current_result = merged + else: + current_result = branch_results[0] + + # 顺序部分 + for i, kernel in enumerate(sequential_kernels): + kernel_name = kernel["name"] + kernel_inputs = [inp[0] for inp in kernel["inputs"]] + kernel_inputs[0] = current_result + + result_var = f"sequential_{i}" + # 第一个输入是变量,其他是张量 + inputs_parts = [kernel_inputs[0]] + for inp in kernel_inputs[1:]: + inputs_parts.append(f"torch_tensors['{inp}']") + inputs_str = ", ".join(inputs_parts) + # 调用嵌套函数不需要 self + 
code_lines.append(f" {result_var} = _torch_{kernel_name}({inputs_str})") + current_result = result_var + + code_lines.append(f" # 将结果转换回 numpy 并写入输出") + code_lines.append(f" tensors['output'][:] = {current_result}.numpy()") + + code_lines.append(f"") + + return "\n".join(code_lines) + + def generate_test_file( + self, + output_path: str, + test_configs: List[Dict[str, Any]], + ) -> None: + """生成完整的测试文件 + + Args: + output_path: 输出文件路径 + test_configs: 测试配置列表,每个配置包含: + - name: 测试名称 + - num_kernels: 内核数量 + - mode: 组合模式 + - shape: 张量形状 + - num_ops_range: 操作数量范围 + - tensor_init_type: 张量初始化类型(可选) + """ + # 生成文件头 + header = '''""" +自动生成的多内核模糊测试用例 + +该文件由 MultiKernelTestGenerator 自动生成。 +包含多个测试用例,每个测试用例包含多个 InCore 内核和一个 Orchestration 函数。 +""" + +import sys +from pathlib import Path +from typing import Any, List + +import torch +import pytest + +from pto_test.core.test_case import DataType, PTOTestCase, TensorSpec + +# 添加 pypto 到路径 +_FRAMEWORK_ROOT = Path(__file__).parent.parent.parent.parent +_PYPTO_ROOT = _FRAMEWORK_ROOT / "3rdparty" / "pypto" / "python" +if _PYPTO_ROOT.exists() and str(_PYPTO_ROOT) not in sys.path: + sys.path.insert(0, str(_PYPTO_ROOT)) + + +''' + + # 生成所有测试用例 + test_cases = [] + for config in test_configs: + test_code = self.generate_test_case( + test_name=config["name"], + num_kernels=config.get("num_kernels", 3), + orchestration_mode=config.get("mode", "sequential"), + shape=config.get("shape", (128, 128)), + num_ops_range=config.get("num_ops_range", (3, 7)), + input_shapes_list=config.get("input_shapes_list"), + tensor_init_type=config.get("tensor_init_type"), + ) + test_cases.append(test_code) + + # 生成测试套件 + test_suite = ''' + +class TestMultiKernelFuzzing: + """多内核模糊测试套件""" + +''' + + for config in test_configs: + test_name = config["name"] + class_name = f"Test{test_name.replace('_', ' ').title().replace(' ', '')}" + test_suite += f''' def test_{test_name}(self, test_runner): + """测试 {test_name}""" + test_case = {class_name}() + result = 
test_runner.run(test_case) + assert result.passed, f"测试失败: {{result.error}}" + +''' + + # 组合完整文件 + full_content = header + "\n\n".join(test_cases) + test_suite + + # 写入文件 + output_file = Path(output_path) + output_file.parent.mkdir(parents=True, exist_ok=True) + output_file.write_text(full_content, encoding="utf-8") + + print(f"测试文件已生成: {output_path}") diff --git a/src/fuzzer/src/orchestrator_generator.py b/src/fuzzer/src/orchestrator_generator.py new file mode 100644 index 0000000..dddb1ad --- /dev/null +++ b/src/fuzzer/src/orchestrator_generator.py @@ -0,0 +1,303 @@ +""" +Orchestration 组合函数生成器 + +该模块负责生成 @pl.function(type=pl.FunctionType.Orchestration) 函数, +用于组合多个 InCore 内核。支持三种组合模式: +- Sequential: 顺序执行内核 +- Branching: 分支执行内核 +- Mixed: 混合模式 +""" + +import random +from typing import Any, Dict, List, Optional, Tuple + +from .fuzzer import is_shape_aligned + + +class OrchestratorGenerator: + """生成 Orchestration 组合函数的生成器""" + + def __init__(self, seed: Optional[int] = None): + """初始化组合函数生成器 + + Args: + seed: 随机种子,用于可重现性 + """ + self.rng = random.Random(seed) + + def generate_sequential( + self, + kernels: List[Dict[str, Any]], + shape: Tuple[int, int] = (128, 128), + ) -> Dict[str, Any]: + """生成顺序执行模式的 Orchestration 函数 + + 在顺序模式中,每个内核的输出作为下一个内核的输入。 + + Args: + kernels: 内核信息列表 + shape: 张量形状 + + Returns: + 包含组合函数信息的字典 + """ + if not kernels: + raise ValueError("至少需要一个内核") + + # 收集所有需要的输入及其形状 + input_shapes_map = {} # {input_name: shape} + for kernel in kernels: + for inp_name, inp_shape in kernel["inputs"]: + if inp_name not in input_shapes_map: + input_shapes_map[inp_name] = inp_shape + # 如果同一个输入在不同内核中有不同形状,使用较大的形状 + elif inp_shape != input_shapes_map[inp_name]: + existing_size = input_shapes_map[inp_name][0] * input_shapes_map[inp_name][1] + new_size = inp_shape[0] * inp_shape[1] + if new_size > existing_size: + input_shapes_map[inp_name] = inp_shape + + # 生成函数签名 + input_params = sorted(input_shapes_map.keys()) + params = [] + for name in input_params: + inp_shape = 
input_shapes_map[name] + params.append(f"{name}: pl.Tensor[[{inp_shape[0]}, {inp_shape[1]}], pl.FP32]") + + # 输出形状使用最后一个内核的输出形状 + output_shape = kernels[-1]["output_shape"] + rows, cols = output_shape + + code_lines = [ + " @pl.function(type=pl.FunctionType.Orchestration)", + f" def orchestrator(self, {', '.join(params)}) -> pl.Tensor[[{rows}, {cols}], pl.FP32]:", + ] + + # 顺序调用内核 - 不需要显式创建 tensor + result_var = None + for i, kernel in enumerate(kernels): + kernel_name = kernel["name"] + kernel_inputs = [inp[0] for inp in kernel["inputs"]] + + # 如果不是第一个内核,使用前一个内核的输出 + if i > 0 and result_var: + # 替换第一个输入为前一个内核的输出 + kernel_inputs[0] = result_var + + # 调用 InCore 函数,框架会自动处理输出 tensor + result_var = f"result_{i}" + inputs_str = ", ".join(kernel_inputs) + code_lines.append(f" {result_var} = self.{kernel_name}({inputs_str})") + + # 返回最后一个结果 + code_lines.append(f" return {result_var}") + + return { + "mode": "sequential", + "code": "\n".join(code_lines), + "inputs": input_params, + "output_shape": output_shape, + } + + def generate_branching( + self, + kernels: List[Dict[str, Any]], + shape: Tuple[int, int] = (128, 128), + ) -> Dict[str, Any]: + """生成分支执行模式的 Orchestration 函数 + + 在分支模式中,多个内核并行执行,然后合并结果。 + + Args: + kernels: 内核信息列表 + shape: 张量形状 + + Returns: + 包含组合函数信息的字典 + """ + if not kernels: + raise ValueError("至少需要一个内核") + + # 收集所有需要的输入及其形状 + input_shapes_map = {} # {input_name: shape} + for kernel in kernels: + for inp_name, inp_shape in kernel["inputs"]: + if inp_name not in input_shapes_map: + input_shapes_map[inp_name] = inp_shape + # 如果同一个输入在不同内核中有不同形状,使用较大的形状 + elif inp_shape != input_shapes_map[inp_name]: + existing_size = input_shapes_map[inp_name][0] * input_shapes_map[inp_name][1] + new_size = inp_shape[0] * inp_shape[1] + if new_size > existing_size: + input_shapes_map[inp_name] = inp_shape + + # 生成函数签名 + input_params = sorted(input_shapes_map.keys()) + params = [] + for name in input_params: + inp_shape = input_shapes_map[name] + params.append(f"{name}: 
pl.Tensor[[{inp_shape[0]}, {inp_shape[1]}], pl.FP32]") + + # 输出形状:在分支模式中,所有分支应该有相同的输出形状 + # 使用第一个内核的输出形状 + output_shape = kernels[0]["output_shape"] + rows, cols = output_shape + + code_lines = [ + " @pl.function(type=pl.FunctionType.Orchestration)", + f" def orchestrator(self, {', '.join(params)}) -> pl.Tensor[[{rows}, {cols}], pl.FP32]:", + ] + + # 并行执行所有内核 - 不需要显式创建 tensor + result_vars = [] + for i, kernel in enumerate(kernels): + kernel_name = kernel["name"] + kernel_inputs = [inp[0] for inp in kernel["inputs"]] + result_var = f"branch_{i}" + result_vars.append(result_var) + + inputs_str = ", ".join(kernel_inputs) + code_lines.append(f" {result_var} = self.{kernel_name}({inputs_str})") + + # 合并所有分支结果 + if len(result_vars) == 1: + code_lines.append(f" return {result_vars[0]}") + else: + # 使用 add 操作合并结果 + code_lines.append(f" # 合并所有分支结果") + merged = result_vars[0] + for i in range(1, len(result_vars)): + new_merged = f"merged_{i}" + code_lines.append(f" {new_merged} = self.merge_results({merged}, {result_vars[i]})") + merged = new_merged + code_lines.append(f" return {merged}") + + return { + "mode": "branching", + "code": "\n".join(code_lines), + "inputs": input_params, + "output_shape": output_shape, + "needs_merge_kernel": len(result_vars) > 1, + } + + def generate_mixed( + self, + kernels: List[Dict[str, Any]], + shape: Tuple[int, int] = (128, 128), + ) -> Dict[str, Any]: + """生成混合模式的 Orchestration 函数 + + 混合模式结合了顺序和分支执行。 + + Args: + kernels: 内核信息列表 + shape: 张量形状 + + Returns: + 包含组合函数信息的字典 + """ + if len(kernels) < 2: + # 如果内核数量少于2,使用顺序模式 + return self.generate_sequential(kernels, shape) + + # 收集所有需要的输入及其形状 + input_shapes_map = {} # {input_name: shape} + for kernel in kernels: + for inp_name, inp_shape in kernel["inputs"]: + if inp_name not in input_shapes_map: + input_shapes_map[inp_name] = inp_shape + # 如果同一个输入在不同内核中有不同形状,使用较大的形状 + elif inp_shape != input_shapes_map[inp_name]: + existing_size = input_shapes_map[inp_name][0] * input_shapes_map[inp_name][1] + 
new_size = inp_shape[0] * inp_shape[1] + if new_size > existing_size: + input_shapes_map[inp_name] = inp_shape + + # 生成函数签名 + input_params = sorted(input_shapes_map.keys()) + params = [] + for name in input_params: + inp_shape = input_shapes_map[name] + params.append(f"{name}: pl.Tensor[[{inp_shape[0]}, {inp_shape[1]}], pl.FP32]") + + # 输出形状使用最后一个内核的输出形状 + output_shape = kernels[-1]["output_shape"] + rows, cols = output_shape + + code_lines = [ + " @pl.function(type=pl.FunctionType.Orchestration)", + f" def orchestrator(self, {', '.join(params)}) -> pl.Tensor[[{rows}, {cols}], pl.FP32]:", + ] + + # 将内核分成两组:前半部分并行,后半部分顺序 + mid = len(kernels) // 2 + parallel_kernels = kernels[:mid] + sequential_kernels = kernels[mid:] + + # 并行执行前半部分 - 不需要显式创建 tensor + branch_results = [] + for i, kernel in enumerate(parallel_kernels): + kernel_name = kernel["name"] + kernel_inputs = [inp[0] for inp in kernel["inputs"]] + result_var = f"parallel_{i}" + branch_results.append(result_var) + + inputs_str = ", ".join(kernel_inputs) + code_lines.append(f" {result_var} = self.{kernel_name}({inputs_str})") + + # 合并并行结果 + if len(branch_results) > 1: + code_lines.append(f" # 合并并行结果") + merged = branch_results[0] + for i in range(1, len(branch_results)): + new_merged = f"merged_parallel_{i}" + code_lines.append(f" {new_merged} = self.merge_results({merged}, {branch_results[i]})") + merged = new_merged + current_result = merged + else: + current_result = branch_results[0] + + # 顺序执行后半部分 + for i, kernel in enumerate(sequential_kernels): + kernel_name = kernel["name"] + kernel_inputs = [inp[0] for inp in kernel["inputs"]] + + # 使用前一个结果作为第一个输入 + kernel_inputs[0] = current_result + + result_var = f"sequential_{i}" + inputs_str = ", ".join(kernel_inputs) + code_lines.append(f" {result_var} = self.{kernel_name}({inputs_str})") + current_result = result_var + + # 返回最终结果 + code_lines.append(f" return {current_result}") + + return { + "mode": "mixed", + "code": "\n".join(code_lines), + "inputs": 
input_params, + "output_shape": output_shape, + "needs_merge_kernel": len(branch_results) > 1, + } + + def generate_merge_kernel(self, shape: Tuple[int, int] = (128, 128)) -> str: + """生成用于合并结果的辅助内核 + + Args: + shape: 张量形状 + + Returns: + 合并内核的代码字符串 + """ + rows, cols = shape + code = f""" @pl.function(type=pl.FunctionType.InCore) + def merge_results(self, a: pl.Tensor[[{rows}, {cols}], pl.FP32], + b: pl.Tensor[[{rows}, {cols}], pl.FP32], + output: pl.Tensor[[{rows}, {cols}], pl.FP32]) -> pl.Tensor[[{rows}, {cols}], pl.FP32]: + tile_a = pl.load(a, offsets=[0, 0], shapes=[{rows}, {cols}]) + tile_b = pl.load(b, offsets=[0, 0], shapes=[{rows}, {cols}]) + result_tile = pl.add(tile_a, tile_b) + result = pl.store(result_tile, offsets=[0, 0], shapes=[{rows}, {cols}], output_tensor=output) + return result""" + return code diff --git a/src/pto_test/codegen/golden_generator.py b/src/pto_test/codegen/golden_generator.py index 1d8442b..dd0b84d 100644 --- a/src/pto_test/codegen/golden_generator.py +++ b/src/pto_test/codegen/golden_generator.py @@ -66,6 +66,7 @@ def generate(self, test_case: "PTOTestCase") -> str: '"""', "", "import numpy as np", + "import torch", "", f"__outputs__ = {output_names!r}", f"TENSOR_ORDER = {tensor_order!r}", @@ -274,6 +275,7 @@ def generate_with_callback( '"""', "", "import numpy as np", + "import torch", "", f"__outputs__ = {output_names!r}", f"TENSOR_ORDER = {tensor_order!r}", diff --git a/tests/test_cases/test_expand.py b/tests/test_cases/test_expand.py new file mode 100644 index 0000000..4e14344 --- /dev/null +++ b/tests/test_cases/test_expand.py @@ -0,0 +1,404 @@ +""" +Tests for row_expand_div operation using PyPTO frontend. + +This test demonstrates the row_expand_div operation which expands a row vector +and performs element-wise division with a matrix. 
+""" + +import sys +from pathlib import Path +from typing import Any, List + +import numpy as np +import pytest + +from pto_test.core import environment +from pto_test.core.test_case import DataType, PTOTestCase, TensorSpec + +# Add pypto to path +_PYPTO_PYTHON = environment.get_pypto_python_path() +if _PYPTO_PYTHON is not None and _PYPTO_PYTHON.exists() and str(_PYPTO_PYTHON) not in sys.path: + sys.path.insert(0, str(_PYPTO_PYTHON)) + + +class TestRowExpandDivBase(PTOTestCase): + """Base test case for row_expand_div operation. + + This operation takes a matrix and a column vector, and divides each row + of the matrix by the corresponding scalar value from the column vector. + + For example: + - Matrix a: [[6, 8], [12, 16]] + - Column vector b: [[2], [4]] + - Result c: [[6/2, 8/2], [12/4, 16/4]] = [[3, 4], [3, 4]] + + Note: PyPTO requires shape dimensions to be compile-time constants in type + annotations, so each shape needs its own subclass with get_program() method. + """ + + # Subclasses must define these + ROWS = 128 + COLS = 128 + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.rows = self.ROWS + self.cols = self.COLS + + def get_name(self) -> str: + return f"row_expand_div_{self.rows}x{self.cols}" + + def define_tensors(self) -> List[TensorSpec]: + return [ + # Matrix to be divided (random values) + TensorSpec("a", [self.rows, self.cols], DataType.FP32, + init_value=lambda shape: np.random.rand(*shape).astype(np.float32)), + # Column vector (divisor) - shape is [rows, 1] (random values, avoid division by zero) + TensorSpec("b", [self.rows, 1], DataType.FP32, + init_value=lambda shape: (np.random.rand(*shape) + 0.1).astype(np.float32)), + # Output tensor + TensorSpec("c", [self.rows, self.cols], DataType.FP32, is_output=True), + ] + + def compute_expected(self, tensors, params=None): + """Compute expected output: each row of a divided by corresponding scalar in b.""" + # Broadcasting: a[rows, cols] / b[rows, 1] -> c[rows, cols] + 
tensors["c"][:] = tensors["a"] / tensors["b"] + + +# Generate test classes for different shapes +class TestRowExpandDiv_32x32(TestRowExpandDivBase): + ROWS = 32 + COLS = 32 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class RowExpandDivProgram: + @pl.function + def row_expand_div( + self, + a: pl.Tensor[[32, 32], pl.FP32], + b: pl.Tensor[[1, 32], pl.FP32], + c: pl.Tensor[[32, 32], pl.FP32], + ) -> pl.Tensor[[32, 32], pl.FP32]: + tile_a = pl.load(a, offsets=[0, 0], shapes=[32, 32]) + tile_b = pl.load(b, offsets=[0, 0], shapes=[1, 32]) + + tile_b_reshaped = pl.reshape(tile_b, [32, 1]) + + tile_c = pl.row_expand_div(tile_a, tile_b_reshaped) + + out_c = pl.store(tile_c, offsets=[0, 0], shapes=[32, 32], output_tensor=c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator( + self, + a: pl.Tensor[[32, 32], pl.FP32], + b: pl.Tensor[[1, 32], pl.FP32] + ) -> pl.Tensor[[32, 32], pl.FP32]: + out_c = self.row_expand_div(a, b) + return out_c + + return RowExpandDivProgram + + +class TestRowExpandDiv_64x64(TestRowExpandDivBase): + ROWS = 64 + COLS = 64 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class RowExpandDivProgram: + @pl.function + def row_expand_div( + self, + a: pl.Tensor[[64, 64], pl.FP32], + b: pl.Tensor[[64, 1], pl.FP32], + c: pl.Tensor[[64, 64], pl.FP32], + ) -> pl.Tensor[[64, 64], pl.FP32]: + tile_a = pl.load(a, offsets=[0, 0], shapes=[64, 64]) + tile_b = pl.load(b, offsets=[0, 0], shapes=[64, 1]) + tile_c = pl.row_expand_div(tile_a, tile_b) + out_c = pl.store(tile_c, offsets=[0, 0], shapes=[64, 64], output_tensor=c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator( + self, + a: pl.Tensor[[64, 64], pl.FP32], + b: pl.Tensor[[64, 1], pl.FP32] + ) -> pl.Tensor[[64, 64], pl.FP32]: + out_c = self.row_expand_div(a, b) + return out_c + + return RowExpandDivProgram + + +class TestRowExpandDiv_128x128(TestRowExpandDivBase): + 
ROWS = 128 + COLS = 128 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class RowExpandDivProgram: + @pl.function + def row_expand_div( + self, + a: pl.Tensor[[128, 128], pl.FP32], + b: pl.Tensor[[128, 1], pl.FP32], + c: pl.Tensor[[128, 128], pl.FP32], + ) -> pl.Tensor[[128, 128], pl.FP32]: + tile_a = pl.load(a, offsets=[0, 0], shapes=[128, 128]) + tile_b = pl.load(b, offsets=[0, 0], shapes=[128, 1]) + tile_c = pl.row_expand_div(tile_a, tile_b) + out_c = pl.store(tile_c, offsets=[0, 0], shapes=[128, 128], output_tensor=c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator( + self, + a: pl.Tensor[[128, 128], pl.FP32], + b: pl.Tensor[[128, 1], pl.FP32] + ) -> pl.Tensor[[128, 128], pl.FP32]: + out_c = self.row_expand_div(a, b) + return out_c + + return RowExpandDivProgram + + +class TestRowExpandDiv_128x64(TestRowExpandDivBase): + ROWS = 128 + COLS = 64 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class RowExpandDivProgram: + @pl.function + def row_expand_div( + self, + a: pl.Tensor[[128, 64], pl.FP32], + b: pl.Tensor[[128, 1], pl.FP32], + c: pl.Tensor[[128, 64], pl.FP32], + ) -> pl.Tensor[[128, 64], pl.FP32]: + tile_a = pl.load(a, offsets=[0, 0], shapes=[128, 64]) + tile_b = pl.load(b, offsets=[0, 0], shapes=[128, 1]) + tile_c = pl.row_expand_div(tile_a, tile_b) + out_c = pl.store(tile_c, offsets=[0, 0], shapes=[128, 64], output_tensor=c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator( + self, + a: pl.Tensor[[128, 64], pl.FP32], + b: pl.Tensor[[128, 1], pl.FP32] + ) -> pl.Tensor[[128, 64], pl.FP32]: + out_c = self.row_expand_div(a, b) + return out_c + + return RowExpandDivProgram + + +class TestRowExpandDiv_64x128(TestRowExpandDivBase): + ROWS = 64 + COLS = 128 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class RowExpandDivProgram: + @pl.function + def row_expand_div( + self, + a: 
pl.Tensor[[64, 128], pl.FP32], + b: pl.Tensor[[64, 1], pl.FP32], + c: pl.Tensor[[64, 128], pl.FP32], + ) -> pl.Tensor[[64, 128], pl.FP32]: + tile_a = pl.load(a, offsets=[0, 0], shapes=[64, 128]) + tile_b = pl.load(b, offsets=[0, 0], shapes=[64, 1]) + tile_c = pl.row_expand_div(tile_a, tile_b) + out_c = pl.store(tile_c, offsets=[0, 0], shapes=[64, 128], output_tensor=c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator( + self, + a: pl.Tensor[[64, 128], pl.FP32], + b: pl.Tensor[[64, 1], pl.FP32] + ) -> pl.Tensor[[64, 128], pl.FP32]: + out_c = self.row_expand_div(a, b) + return out_c + + return RowExpandDivProgram + + +class TestRowExpandDiv_96x96(TestRowExpandDivBase): + ROWS = 96 + COLS = 96 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class RowExpandDivProgram: + @pl.function + def row_expand_div( + self, + a: pl.Tensor[[96, 96], pl.FP32], + b: pl.Tensor[[96, 1], pl.FP32], + c: pl.Tensor[[96, 96], pl.FP32], + ) -> pl.Tensor[[96, 96], pl.FP32]: + tile_a = pl.load(a, offsets=[0, 0], shapes=[96, 96]) + tile_b = pl.load(b, offsets=[0, 0], shapes=[96, 1]) + tile_c = pl.row_expand_div(tile_a, tile_b) + out_c = pl.store(tile_c, offsets=[0, 0], shapes=[96, 96], output_tensor=c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator( + self, + a: pl.Tensor[[96, 96], pl.FP32], + b: pl.Tensor[[96, 1], pl.FP32] + ) -> pl.Tensor[[96, 96], pl.FP32]: + out_c = self.row_expand_div(a, b) + return out_c + + return RowExpandDivProgram + + +class TestRowExpandDiv_80x96(TestRowExpandDivBase): + ROWS = 80 + COLS = 96 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class RowExpandDivProgram: + @pl.function + def row_expand_div( + self, + a: pl.Tensor[[80, 96], pl.FP32], + b: pl.Tensor[[80, 1], pl.FP32], + c: pl.Tensor[[80, 96], pl.FP32], + ) -> pl.Tensor[[80, 96], pl.FP32]: + tile_a = pl.load(a, offsets=[0, 0], shapes=[80, 96]) + tile_b = pl.load(b, 
offsets=[0, 0], shapes=[80, 1]) + tile_c = pl.row_expand_div(tile_a, tile_b) + out_c = pl.store(tile_c, offsets=[0, 0], shapes=[80, 96], output_tensor=c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator( + self, + a: pl.Tensor[[80, 96], pl.FP32], + b: pl.Tensor[[80, 1], pl.FP32] + ) -> pl.Tensor[[80, 96], pl.FP32]: + out_c = self.row_expand_div(a, b) + return out_c + + return RowExpandDivProgram + + +class TestRowExpandDiv_96x80(TestRowExpandDivBase): + ROWS = 96 + COLS = 80 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class RowExpandDivProgram: + @pl.function + def row_expand_div( + self, + a: pl.Tensor[[96, 80], pl.FP32], + b: pl.Tensor[[96, 1], pl.FP32], + c: pl.Tensor[[96, 80], pl.FP32], + ) -> pl.Tensor[[96, 80], pl.FP32]: + tile_a = pl.load(a, offsets=[0, 0], shapes=[96, 80]) + tile_b = pl.load(b, offsets=[0, 0], shapes=[96, 1]) + tile_c = pl.row_expand_div(tile_a, tile_b) + out_c = pl.store(tile_c, offsets=[0, 0], shapes=[96, 80], output_tensor=c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator( + self, + a: pl.Tensor[[96, 80], pl.FP32], + b: pl.Tensor[[96, 1], pl.FP32] + ) -> pl.Tensor[[96, 80], pl.FP32]: + out_c = self.row_expand_div(a, b) + return out_c + + return RowExpandDivProgram + + +# ============================================================================= +# pytest test functions +# ============================================================================= + + +def test_row_expand_div_32x32(test_runner): + """Test 32x32 shape.""" + test_case = TestRowExpandDiv_32x32() + result = test_runner.run(test_case) + assert result.passed, f"Test failed: {result.error}" + + +def test_row_expand_div_64x64(test_runner): + """Test 64x64 shape.""" + test_case = TestRowExpandDiv_64x64() + result = test_runner.run(test_case) + assert result.passed, f"Test failed: {result.error}" + + +def test_row_expand_div_128x128(test_runner): + """Test 
128x128 shape.""" + test_case = TestRowExpandDiv_128x128() + result = test_runner.run(test_case) + assert result.passed, f"Test failed: {result.error}" + + +def test_row_expand_div_128x64(test_runner): + """Test 128x64 shape.""" + test_case = TestRowExpandDiv_128x64() + result = test_runner.run(test_case) + assert result.passed, f"Test failed: {result.error}" + + +def test_row_expand_div_64x128(test_runner): + """Test 64x128 shape.""" + test_case = TestRowExpandDiv_64x128() + result = test_runner.run(test_case) + assert result.passed, f"Test failed: {result.error}" + + +def test_row_expand_div_96x96(test_runner): + """Test 96x96 shape.""" + test_case = TestRowExpandDiv_96x96() + result = test_runner.run(test_case) + assert result.passed, f"Test failed: {result.error}" + + +def test_row_expand_div_80x96(test_runner): + """Test 80x96 shape.""" + test_case = TestRowExpandDiv_80x96() + result = test_runner.run(test_case) + assert result.passed, f"Test failed: {result.error}" + + +def test_row_expand_div_96x80(test_runner): + """Test 96x80 shape.""" + test_case = TestRowExpandDiv_96x80() + result = test_runner.run(test_case) + assert result.passed, f"Test failed: {result.error}"