diff --git a/tests/test_cases/test_broadcast.py b/tests/test_cases/test_broadcast.py new file mode 100644 index 0000000..9f5db3e --- /dev/null +++ b/tests/test_cases/test_broadcast.py @@ -0,0 +1,391 @@ +""" +Tests for broadcast operations using PyPTO frontend. + +Tests tile-level broadcast operations: +- row_expand_sub: Row-wise broadcast subtraction +- row_expand_mul: Row-wise broadcast multiplication +- row_expand_div: Row-wise broadcast division + +These tests use the simplified pattern where orchestration is auto-generated. +Each operation has both 64x64 and 128x128 test cases. +""" + +import sys +from pathlib import Path +from typing import Any, List + +import numpy as np + +from pto_test.core import environment +from pto_test.core.test_case import DataType, PTOTestCase, TensorSpec + +# Add pypto to path +_PYPTO_PYTHON = environment.get_pypto_python_path() +if _PYPTO_PYTHON is not None and _PYPTO_PYTHON.exists() and str(_PYPTO_PYTHON) not in sys.path: + sys.path.insert(0, str(_PYPTO_PYTHON)) + + +# ============================================================================= +# Row-wise broadcast subtraction +# ============================================================================= + + +class TestTileRowExpandSub(PTOTestCase): + """Base class for tile row-wise broadcast subtraction tests.""" + + ROWS = 128 # Override in subclasses + COLS = 128 # Override in subclasses + + def get_name(self) -> str: + return f"tile_row_expand_sub_{self.ROWS}x{self.COLS}" + + def define_tensors(self) -> List[TensorSpec]: + return [ + TensorSpec( + "a", + [self.ROWS, self.COLS], + DataType.FP32, + init_value=lambda shape: np.random.randn(*shape), + ), + TensorSpec( + "row_vec", + [self.ROWS, 1], + DataType.FP32, + init_value=lambda shape: np.random.randn(*shape), + ), + TensorSpec("c", [self.ROWS, self.COLS], DataType.FP32, is_output=True), + ] + + def get_program(self) -> Any: + raise NotImplementedError("Subclasses must implement get_program() with their specific shape") 
+ + def compute_expected(self, tensors, params=None): + tensors["c"][:] = tensors["a"] - tensors["row_vec"] + + +class TestTileRowExpandSub64x64(TestTileRowExpandSub): + """64x64 tile row-wise broadcast subtraction test.""" + + ROWS = 64 + COLS = 64 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileRowExpandSubProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_row_expand_sub( + self, + a: pl.Tensor[[64, 64], pl.FP32], + row_vec: pl.Tensor[[64, 1], pl.FP32], + c: pl.Tensor[[64, 64], pl.FP32], + ) -> pl.Tensor[[64, 64], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [64, 64]) + tile_row = pl.op.block.load(row_vec, [0, 0], [64, 1]) + tile_c = pl.op.block.row_expand_sub(tile_a, tile_row) + out_c = pl.op.block.store(tile_c, [0, 0], [64, 64], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator( + self, a: pl.Tensor[[64, 64], pl.FP32], row_vec: pl.Tensor[[64, 1], pl.FP32] + ) -> pl.Tensor[[64, 64], pl.FP32]: + out_c = self.tile_row_expand_sub(a, row_vec) + return out_c + + return TileRowExpandSubProgram + + +class TestTileRowExpandSub128x128(TestTileRowExpandSub): + """128x128 tile row-wise broadcast subtraction test.""" + + ROWS = 128 + COLS = 128 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileRowExpandSubProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_row_expand_sub( + self, + a: pl.Tensor[[128, 128], pl.FP32], + row_vec: pl.Tensor[[128, 1], pl.FP32], + c: pl.Tensor[[128, 128], pl.FP32], + ) -> pl.Tensor[[128, 128], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [128, 128]) + tile_row = pl.op.block.load(row_vec, [0, 0], [128, 1]) + tile_c = pl.op.block.row_expand_sub(tile_a, tile_row) + out_c = pl.op.block.store(tile_c, [0, 0], [128, 128], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator( + self, a: pl.Tensor[[128, 128], pl.FP32], row_vec: pl.Tensor[[128, 1], pl.FP32] + ) -> 
pl.Tensor[[128, 128], pl.FP32]: + out_c = self.tile_row_expand_sub(a, row_vec) + return out_c + + return TileRowExpandSubProgram + + +# ============================================================================= +# Row-wise broadcast multiplication +# ============================================================================= + + +class TestTileRowExpandMul(PTOTestCase): + """Base class for tile row-wise broadcast multiplication tests.""" + + ROWS = 128 # Override in subclasses + COLS = 128 # Override in subclasses + + def get_name(self) -> str: + return f"tile_row_expand_mul_{self.ROWS}x{self.COLS}" + + def define_tensors(self) -> List[TensorSpec]: + return [ + TensorSpec( + "a", + [self.ROWS, self.COLS], + DataType.FP32, + init_value=lambda shape: np.random.randn(*shape), + ), + TensorSpec( + "row_vec", + [self.ROWS, 1], + DataType.FP32, + init_value=lambda shape: np.random.randn(*shape), + ), + TensorSpec("c", [self.ROWS, self.COLS], DataType.FP32, is_output=True), + ] + + def get_program(self) -> Any: + raise NotImplementedError("Subclasses must implement get_program() with their specific shape") + + def compute_expected(self, tensors, params=None): + tensors["c"][:] = tensors["a"] * tensors["row_vec"] + + +class TestTileRowExpandMul64x64(TestTileRowExpandMul): + """64x64 tile row-wise broadcast multiplication test.""" + + ROWS = 64 + COLS = 64 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileRowExpandMulProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_row_expand_mul( + self, + a: pl.Tensor[[64, 64], pl.FP32], + row_vec: pl.Tensor[[64, 1], pl.FP32], + c: pl.Tensor[[64, 64], pl.FP32], + ) -> pl.Tensor[[64, 64], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [64, 64]) + tile_row = pl.op.block.load(row_vec, [0, 0], [64, 1]) + tile_c = pl.op.block.row_expand_mul(tile_a, tile_row) + out_c = pl.op.block.store(tile_c, [0, 0], [64, 64], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) 
+ def orchestrator( + self, a: pl.Tensor[[64, 64], pl.FP32], row_vec: pl.Tensor[[64, 1], pl.FP32] + ) -> pl.Tensor[[64, 64], pl.FP32]: + out_c = self.tile_row_expand_mul(a, row_vec) + return out_c + + return TileRowExpandMulProgram + + +class TestTileRowExpandMul128x128(TestTileRowExpandMul): + """128x128 tile row-wise broadcast multiplication test.""" + + ROWS = 128 + COLS = 128 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileRowExpandMulProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_row_expand_mul( + self, + a: pl.Tensor[[128, 128], pl.FP32], + row_vec: pl.Tensor[[128, 1], pl.FP32], + c: pl.Tensor[[128, 128], pl.FP32], + ) -> pl.Tensor[[128, 128], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [128, 128]) + tile_row = pl.op.block.load(row_vec, [0, 0], [128, 1]) + tile_c = pl.op.block.row_expand_mul(tile_a, tile_row) + out_c = pl.op.block.store(tile_c, [0, 0], [128, 128], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator( + self, a: pl.Tensor[[128, 128], pl.FP32], row_vec: pl.Tensor[[128, 1], pl.FP32] + ) -> pl.Tensor[[128, 128], pl.FP32]: + out_c = self.tile_row_expand_mul(a, row_vec) + return out_c + + return TileRowExpandMulProgram + + +# ============================================================================= +# Row-wise broadcast division +# ============================================================================= + + +class TestTileRowExpandDiv(PTOTestCase): + """Base class for tile row-wise broadcast division tests.""" + + ROWS = 128 # Override in subclasses + COLS = 128 # Override in subclasses + + def get_name(self) -> str: + return f"tile_row_expand_div_{self.ROWS}x{self.COLS}" + + def define_tensors(self) -> List[TensorSpec]: + return [ + TensorSpec( + "a", + [self.ROWS, self.COLS], + DataType.FP32, + init_value=lambda shape: np.random.randn(*shape), + ), + TensorSpec( + "row_vec", + [self.ROWS, 1], + DataType.FP32, + init_value=lambda shape: 
np.random.randn(*shape) + 1.0, # Shifts the mean to 1.0 only; near-zero divisors remain possible (consider np.abs(...) + 1.0) + ), + TensorSpec("c", [self.ROWS, self.COLS], DataType.FP32, is_output=True), + ] + + def get_program(self) -> Any: + raise NotImplementedError("Subclasses must implement get_program() with their specific shape") + + def compute_expected(self, tensors, params=None): + tensors["c"][:] = tensors["a"] / tensors["row_vec"] + + +class TestTileRowExpandDiv64x64(TestTileRowExpandDiv): + """64x64 tile row-wise broadcast division test.""" + + ROWS = 64 + COLS = 64 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileRowExpandDivProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_row_expand_div( + self, + a: pl.Tensor[[64, 64], pl.FP32], + row_vec: pl.Tensor[[64, 1], pl.FP32], + c: pl.Tensor[[64, 64], pl.FP32], + ) -> pl.Tensor[[64, 64], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [64, 64]) + tile_row = pl.op.block.load(row_vec, [0, 0], [64, 1]) + tile_c = pl.op.block.row_expand_div(tile_a, tile_row) + out_c = pl.op.block.store(tile_c, [0, 0], [64, 64], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator( + self, a: pl.Tensor[[64, 64], pl.FP32], row_vec: pl.Tensor[[64, 1], pl.FP32] + ) -> pl.Tensor[[64, 64], pl.FP32]: + out_c = self.tile_row_expand_div(a, row_vec) + return out_c + + return TileRowExpandDivProgram + + +class TestTileRowExpandDiv128x128(TestTileRowExpandDiv): + """128x128 tile row-wise broadcast division test.""" + + ROWS = 128 + COLS = 128 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileRowExpandDivProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_row_expand_div( + self, + a: pl.Tensor[[128, 128], pl.FP32], + row_vec: pl.Tensor[[128, 1], pl.FP32], + c: pl.Tensor[[128, 128], pl.FP32], + ) -> pl.Tensor[[128, 128], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [128, 128]) + tile_row = pl.op.block.load(row_vec, [0, 0], [128, 1]) + tile_c = 
pl.op.block.row_expand_div(tile_a, tile_row) + out_c = pl.op.block.store(tile_c, [0, 0], [128, 128], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator( + self, a: pl.Tensor[[128, 128], pl.FP32], row_vec: pl.Tensor[[128, 1], pl.FP32] + ) -> pl.Tensor[[128, 128], pl.FP32]: + out_c = self.tile_row_expand_div(a, row_vec) + return out_c + + return TileRowExpandDivProgram + + +# ============================================================================= +# pytest test functions +# ============================================================================= + + +class TestBroadcastOperations: + """Test suite for broadcast operations.""" + + # Row-wise broadcast subtraction + def test_tile_row_expand_sub_64x64(self, test_runner): + """Test tile row-wise broadcast subtraction with 64x64 shape.""" + test_case = TestTileRowExpandSub64x64() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 64x64: {result.error}" + + def test_tile_row_expand_sub_128x128(self, test_runner): + """Test tile row-wise broadcast subtraction with 128x128 shape.""" + test_case = TestTileRowExpandSub128x128() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 128x128: {result.error}" + + # Row-wise broadcast multiplication + def test_tile_row_expand_mul_64x64(self, test_runner): + """Test tile row-wise broadcast multiplication with 64x64 shape.""" + test_case = TestTileRowExpandMul64x64() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 64x64: {result.error}" + + def test_tile_row_expand_mul_128x128(self, test_runner): + """Test tile row-wise broadcast multiplication with 128x128 shape.""" + test_case = TestTileRowExpandMul128x128() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 128x128: {result.error}" + + # Row-wise broadcast division + def test_tile_row_expand_div_64x64(self, test_runner): + """Test tile row-wise broadcast division with 
64x64 shape.""" + test_case = TestTileRowExpandDiv64x64() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 64x64: {result.error}" + + def test_tile_row_expand_div_128x128(self, test_runner): + """Test tile row-wise broadcast division with 128x128 shape.""" + test_case = TestTileRowExpandDiv128x128() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 128x128: {result.error}" diff --git a/tests/test_cases/test_elementwise.py b/tests/test_cases/test_elementwise.py index e31dfc0..56a67d2 100644 --- a/tests/test_cases/test_elementwise.py +++ b/tests/test_cases/test_elementwise.py @@ -1,8 +1,12 @@ """ Tests for elementwise operations using PyPTO frontend. -Tests tile-level binary operations like add, sub, mul, div. +Tests tile-level operations including: +- Binary tile-tile operations: add, sub, mul, div, maximum +- Scalar operations: adds, subs, muls, divs + These tests use the simplified pattern where orchestration is auto-generated. +Each operation has both 64x64 and 128x128 test cases. """ import sys @@ -10,7 +14,6 @@ from typing import Any, List import numpy as np -import pytest from pto_test.core import environment from pto_test.core.test_case import DataType, PTOTestCase, TensorSpec @@ -22,53 +25,93 @@ class TestTileAdd(PTOTestCase): - """Test case for tile element-wise addition. - - This test case demonstrates the simplified pattern: - - Just implement incore function in get_program() and compute_expected() - - Orchestration function will be auto-generated + """Base class for tile element-wise addition tests. Note: PyPTO requires shape dimensions to be compile-time constants in type - annotations. The shape is fixed at 128x128 for this test case. + annotations. Each shape needs its own test class with hardcoded dimensions. + This is a limitation of PyPTO's type system. 
""" - ROWS = 128 - COLS = 128 - - def __init__(self, rows: int = 128, cols: int = 128, **kwargs): - super().__init__(**kwargs) - self.rows = rows - self.cols = cols + ROWS = 128 # Override in subclasses + COLS = 128 # Override in subclasses def get_name(self) -> str: - return f"tile_add_{self.rows}x{self.cols}" + return f"tile_add_{self.ROWS}x{self.COLS}" def define_tensors(self) -> List[TensorSpec]: return [ - TensorSpec("a", [self.rows, self.cols], DataType.FP32, init_value=2.0), - TensorSpec("b", [self.rows, self.cols], DataType.FP32, init_value=3.0), - TensorSpec("c", [self.rows, self.cols], DataType.FP32, is_output=True), + TensorSpec("a", [self.ROWS, self.COLS], DataType.FP32, init_value=2.0), + TensorSpec("b", [self.ROWS, self.COLS], DataType.FP32, init_value=3.0), + TensorSpec("c", [self.ROWS, self.COLS], DataType.FP32, is_output=True), ] def get_program(self) -> Any: import pypto.language as pl - # PyPTO parser requires constant shape dimensions in type annotations. - # Use literal values throughout. + # PyPTO requires compile-time constant shapes in type annotations. + # Subclasses must override this method with their specific shape. 
+ raise NotImplementedError("Subclasses must implement get_program() with their specific shape") + + def compute_expected(self, tensors, params=None): + tensors["c"][:] = tensors["a"] + tensors["b"] + + +class TestTileAdd64x64(TestTileAdd): + """64x64 tile addition test.""" + + ROWS = 64 + COLS = 64 + + def get_program(self) -> Any: + import pypto.language as pl @pl.program class TileAddProgram: - @pl.function + @pl.function(type=pl.FunctionType.InCore) + def tile_add( + self, + a: pl.Tensor[[64, 64], pl.FP32], + b: pl.Tensor[[64, 64], pl.FP32], + c: pl.Tensor[[64, 64], pl.FP32], + ) -> pl.Tensor[[64, 64], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [64, 64]) + tile_b = pl.op.block.load(b, [0, 0], [64, 64]) + tile_c = pl.op.block.add(tile_a, tile_b) + out_c = pl.op.block.store(tile_c, [0, 0], [64, 64], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator( + self, a: pl.Tensor[[64, 64], pl.FP32], b: pl.Tensor[[64, 64], pl.FP32] + ) -> pl.Tensor[[64, 64], pl.FP32]: + out_c = self.tile_add(a, b) + return out_c + + return TileAddProgram + + +class TestTileAdd128x128(TestTileAdd): + """128x128 tile addition test.""" + + ROWS = 128 + COLS = 128 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileAddProgram: + @pl.function(type=pl.FunctionType.InCore) def tile_add( self, a: pl.Tensor[[128, 128], pl.FP32], b: pl.Tensor[[128, 128], pl.FP32], c: pl.Tensor[[128, 128], pl.FP32], ) -> pl.Tensor[[128, 128], pl.FP32]: - tile_a = pl.op.block.load(a, 0, 0, 128, 128) - tile_b = pl.op.block.load(b, 0, 0, 128, 128) + tile_a = pl.op.block.load(a, [0, 0], [128, 128]) + tile_b = pl.op.block.load(b, [0, 0], [128, 128]) tile_c = pl.op.block.add(tile_a, tile_b) - out_c = pl.op.block.store(tile_c, 0, 0, 128, 128, c) + out_c = pl.op.block.store(tile_c, [0, 0], [128, 128], c) return out_c @pl.function(type=pl.FunctionType.Orchestration) @@ -80,56 +123,98 @@ def orchestrator( return TileAddProgram - def 
compute_expected(self, tensors, params=None): - tensors["c"][:] = tensors["a"] + tensors["b"] - class TestTileMul(PTOTestCase): - """Test case for tile element-wise multiplication.""" + """Base class for tile element-wise multiplication tests.""" - def __init__(self, rows: int = 128, cols: int = 128, **kwargs): - super().__init__(**kwargs) - self.rows = rows - self.cols = cols + ROWS = 128 # Override in subclasses + COLS = 128 # Override in subclasses def get_name(self) -> str: - return f"tile_mul_{self.rows}x{self.cols}" + return f"tile_mul_{self.ROWS}x{self.COLS}" def define_tensors(self) -> List[TensorSpec]: return [ # Method 1: Use Callable to generate random data (different on each run) TensorSpec( "a", - [self.rows, self.cols], + [self.ROWS, self.COLS], DataType.FP32, init_value=lambda shape: np.random.randn(*shape), ), # Method 2: Use scalar value (recommended - simple and serializable) - TensorSpec("b", [self.rows, self.cols], DataType.FP32, init_value=3.0), + TensorSpec("b", [self.ROWS, self.COLS], DataType.FP32, init_value=3.0), # For other methods, see TestCustomArrayInit class examples: # - Small arrays can use np.array([[...]]) # - Identity matrix: np.eye(n) # - Diagonal matrix: np.diag([...]) # Output tensor: automatically zero-initialized - TensorSpec("c", [self.rows, self.cols], DataType.FP32, is_output=True), + TensorSpec("c", [self.ROWS, self.COLS], DataType.FP32, is_output=True), ] + def get_program(self) -> Any: + raise NotImplementedError("Subclasses must implement get_program() with their specific shape") + + def compute_expected(self, tensors, params=None): + tensors["c"][:] = tensors["a"] * tensors["b"] + + +class TestTileMul64x64(TestTileMul): + """64x64 tile multiplication test.""" + + ROWS = 64 + COLS = 64 + def get_program(self) -> Any: import pypto.language as pl @pl.program class TileMulProgram: - @pl.function + @pl.function(type=pl.FunctionType.InCore) + def tile_mul( + self, + a: pl.Tensor[[64, 64], pl.FP32], + b: pl.Tensor[[64, 64], 
pl.FP32], + c: pl.Tensor[[64, 64], pl.FP32], + ) -> pl.Tensor[[64, 64], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [64, 64]) + tile_b = pl.op.block.load(b, [0, 0], [64, 64]) + tile_c = pl.op.block.mul(tile_a, tile_b) + out_c = pl.op.block.store(tile_c, [0, 0], [64, 64], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator( + self, a: pl.Tensor[[64, 64], pl.FP32], b: pl.Tensor[[64, 64], pl.FP32] + ) -> pl.Tensor[[64, 64], pl.FP32]: + out_c = self.tile_mul(a, b) + return out_c + + return TileMulProgram + + +class TestTileMul128x128(TestTileMul): + """128x128 tile multiplication test.""" + + ROWS = 128 + COLS = 128 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileMulProgram: + @pl.function(type=pl.FunctionType.InCore) def tile_mul( self, a: pl.Tensor[[128, 128], pl.FP32], b: pl.Tensor[[128, 128], pl.FP32], c: pl.Tensor[[128, 128], pl.FP32], ) -> pl.Tensor[[128, 128], pl.FP32]: - tile_a = pl.op.block.load(a, 0, 0, 128, 128) - tile_b = pl.op.block.load(b, 0, 0, 128, 128) + tile_a = pl.op.block.load(a, [0, 0], [128, 128]) + tile_b = pl.op.block.load(b, [0, 0], [128, 128]) tile_c = pl.op.block.mul(tile_a, tile_b) - out_c = pl.op.block.store(tile_c, 0, 0, 128, 128, c) + out_c = pl.op.block.store(tile_c, [0, 0], [128, 128], c) return out_c @pl.function(type=pl.FunctionType.Orchestration) @@ -141,11 +226,624 @@ def orchestrator( return TileMulProgram + +class TestTileSub(PTOTestCase): + """Base class for tile element-wise subtraction tests.""" + + ROWS = 128 # Override in subclasses + COLS = 128 # Override in subclasses + + def get_name(self) -> str: + return f"tile_sub_{self.ROWS}x{self.COLS}" + + def define_tensors(self) -> List[TensorSpec]: + return [ + TensorSpec("a", [self.ROWS, self.COLS], DataType.FP32, init_value=5.0), + TensorSpec("b", [self.ROWS, self.COLS], DataType.FP32, init_value=2.0), + TensorSpec("c", [self.ROWS, self.COLS], DataType.FP32, is_output=True), + ] + + def 
get_program(self) -> Any: + raise NotImplementedError("Subclasses must implement get_program() with their specific shape") + def compute_expected(self, tensors, params=None): - tensors["c"][:] = tensors["a"] * tensors["b"] + tensors["c"][:] = tensors["a"] - tensors["b"] + + +class TestTileSub64x64(TestTileSub): + """64x64 tile subtraction test.""" + + ROWS = 64 + COLS = 64 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileSubProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_sub( + self, + a: pl.Tensor[[64, 64], pl.FP32], + b: pl.Tensor[[64, 64], pl.FP32], + c: pl.Tensor[[64, 64], pl.FP32], + ) -> pl.Tensor[[64, 64], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [64, 64]) + tile_b = pl.op.block.load(b, [0, 0], [64, 64]) + tile_c = pl.op.block.sub(tile_a, tile_b) + out_c = pl.op.block.store(tile_c, [0, 0], [64, 64], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator( + self, a: pl.Tensor[[64, 64], pl.FP32], b: pl.Tensor[[64, 64], pl.FP32] + ) -> pl.Tensor[[64, 64], pl.FP32]: + out_c = self.tile_sub(a, b) + return out_c + + return TileSubProgram + + +class TestTileSub128x128(TestTileSub): + """128x128 tile subtraction test.""" + + ROWS = 128 + COLS = 128 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileSubProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_sub( + self, + a: pl.Tensor[[128, 128], pl.FP32], + b: pl.Tensor[[128, 128], pl.FP32], + c: pl.Tensor[[128, 128], pl.FP32], + ) -> pl.Tensor[[128, 128], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [128, 128]) + tile_b = pl.op.block.load(b, [0, 0], [128, 128]) + tile_c = pl.op.block.sub(tile_a, tile_b) + out_c = pl.op.block.store(tile_c, [0, 0], [128, 128], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator( + self, a: pl.Tensor[[128, 128], pl.FP32], b: pl.Tensor[[128, 128], pl.FP32] + ) -> pl.Tensor[[128, 128], pl.FP32]: + 
out_c = self.tile_sub(a, b) + return out_c + + return TileSubProgram + + +class TestTileDiv(PTOTestCase): + """Base class for tile element-wise division tests.""" + + ROWS = 128 # Override in subclasses + COLS = 128 # Override in subclasses + + def get_name(self) -> str: + return f"tile_div_{self.ROWS}x{self.COLS}" + + def define_tensors(self) -> List[TensorSpec]: + return [ + TensorSpec("a", [self.ROWS, self.COLS], DataType.FP32, init_value=6.0), + TensorSpec("b", [self.ROWS, self.COLS], DataType.FP32, init_value=2.0), + TensorSpec("c", [self.ROWS, self.COLS], DataType.FP32, is_output=True), + ] + + def get_program(self) -> Any: + raise NotImplementedError("Subclasses must implement get_program() with their specific shape") + + def compute_expected(self, tensors, params=None): + tensors["c"][:] = tensors["a"] / tensors["b"] + + +class TestTileDiv64x64(TestTileDiv): + """64x64 tile division test.""" + + ROWS = 64 + COLS = 64 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileDivProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_div( + self, + a: pl.Tensor[[64, 64], pl.FP32], + b: pl.Tensor[[64, 64], pl.FP32], + c: pl.Tensor[[64, 64], pl.FP32], + ) -> pl.Tensor[[64, 64], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [64, 64]) + tile_b = pl.op.block.load(b, [0, 0], [64, 64]) + tile_c = pl.op.block.div(tile_a, tile_b) + out_c = pl.op.block.store(tile_c, [0, 0], [64, 64], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator( + self, a: pl.Tensor[[64, 64], pl.FP32], b: pl.Tensor[[64, 64], pl.FP32] + ) -> pl.Tensor[[64, 64], pl.FP32]: + out_c = self.tile_div(a, b) + return out_c + + return TileDivProgram + + +class TestTileDiv128x128(TestTileDiv): + """128x128 tile division test.""" + + ROWS = 128 + COLS = 128 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileDivProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_div( + self, 
+ a: pl.Tensor[[128, 128], pl.FP32], + b: pl.Tensor[[128, 128], pl.FP32], + c: pl.Tensor[[128, 128], pl.FP32], + ) -> pl.Tensor[[128, 128], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [128, 128]) + tile_b = pl.op.block.load(b, [0, 0], [128, 128]) + tile_c = pl.op.block.div(tile_a, tile_b) + out_c = pl.op.block.store(tile_c, [0, 0], [128, 128], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator( + self, a: pl.Tensor[[128, 128], pl.FP32], b: pl.Tensor[[128, 128], pl.FP32] + ) -> pl.Tensor[[128, 128], pl.FP32]: + out_c = self.tile_div(a, b) + return out_c + + return TileDivProgram + + +class TestTileMaximum(PTOTestCase): + """Base class for tile element-wise maximum tests.""" + + ROWS = 128 # Override in subclasses + COLS = 128 # Override in subclasses + + def get_name(self) -> str: + return f"tile_maximum_{self.ROWS}x{self.COLS}" + + def define_tensors(self) -> List[TensorSpec]: + return [ + TensorSpec( + "a", + [self.ROWS, self.COLS], + DataType.FP32, + init_value=lambda shape: np.random.randn(*shape), + ), + TensorSpec( + "b", + [self.ROWS, self.COLS], + DataType.FP32, + init_value=lambda shape: np.random.randn(*shape), + ), + TensorSpec("c", [self.ROWS, self.COLS], DataType.FP32, is_output=True), + ] + + def get_program(self) -> Any: + raise NotImplementedError("Subclasses must implement get_program() with their specific shape") + + def compute_expected(self, tensors, params=None): + tensors["c"][:] = np.maximum(tensors["a"], tensors["b"]) + + +class TestTileMaximum64x64(TestTileMaximum): + """64x64 tile maximum test.""" + + ROWS = 64 + COLS = 64 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileMaximumProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_maximum( + self, + a: pl.Tensor[[64, 64], pl.FP32], + b: pl.Tensor[[64, 64], pl.FP32], + c: pl.Tensor[[64, 64], pl.FP32], + ) -> pl.Tensor[[64, 64], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [64, 64]) + tile_b = 
pl.op.block.load(b, [0, 0], [64, 64]) + tile_c = pl.op.block.maximum(tile_a, tile_b) + out_c = pl.op.block.store(tile_c, [0, 0], [64, 64], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator( + self, a: pl.Tensor[[64, 64], pl.FP32], b: pl.Tensor[[64, 64], pl.FP32] + ) -> pl.Tensor[[64, 64], pl.FP32]: + out_c = self.tile_maximum(a, b) + return out_c + + return TileMaximumProgram + + +class TestTileMaximum128x128(TestTileMaximum): + """128x128 tile maximum test.""" + + ROWS = 128 + COLS = 128 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileMaximumProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_maximum( + self, + a: pl.Tensor[[128, 128], pl.FP32], + b: pl.Tensor[[128, 128], pl.FP32], + c: pl.Tensor[[128, 128], pl.FP32], + ) -> pl.Tensor[[128, 128], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [128, 128]) + tile_b = pl.op.block.load(b, [0, 0], [128, 128]) + tile_c = pl.op.block.maximum(tile_a, tile_b) + out_c = pl.op.block.store(tile_c, [0, 0], [128, 128], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator( + self, a: pl.Tensor[[128, 128], pl.FP32], b: pl.Tensor[[128, 128], pl.FP32] + ) -> pl.Tensor[[128, 128], pl.FP32]: + out_c = self.tile_maximum(a, b) + return out_c + + return TileMaximumProgram + + +# ============================================================================= +# Scalar operations +# ============================================================================= + + +class TestTileAdds(PTOTestCase): + """Base class for tile-scalar addition tests.""" + + ROWS = 128 # Override in subclasses + COLS = 128 # Override in subclasses + + def get_name(self) -> str: + return f"tile_adds_{self.ROWS}x{self.COLS}" + + def define_tensors(self) -> List[TensorSpec]: + return [ + TensorSpec("a", [self.ROWS, self.COLS], DataType.FP32, init_value=3.0), + TensorSpec("c", [self.ROWS, self.COLS], DataType.FP32, is_output=True), + ] 
+ + def get_program(self) -> Any: + raise NotImplementedError("Subclasses must implement get_program() with their specific shape") + + def compute_expected(self, tensors, params=None): + tensors["c"][:] = tensors["a"] + 2.0 + + +class TestTileAdds64x64(TestTileAdds): + """64x64 tile-scalar addition test.""" + + ROWS = 64 + COLS = 64 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileAddsProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_adds( + self, + a: pl.Tensor[[64, 64], pl.FP32], + c: pl.Tensor[[64, 64], pl.FP32], + ) -> pl.Tensor[[64, 64], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [64, 64]) + tile_c = pl.op.block.adds(tile_a, 2.0) + out_c = pl.op.block.store(tile_c, [0, 0], [64, 64], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self, a: pl.Tensor[[64, 64], pl.FP32]) -> pl.Tensor[[64, 64], pl.FP32]: + out_c = self.tile_adds(a) + return out_c + + return TileAddsProgram + + +class TestTileAdds128x128(TestTileAdds): + """128x128 tile-scalar addition test.""" + + ROWS = 128 + COLS = 128 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileAddsProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_adds( + self, + a: pl.Tensor[[128, 128], pl.FP32], + c: pl.Tensor[[128, 128], pl.FP32], + ) -> pl.Tensor[[128, 128], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [128, 128]) + tile_c = pl.op.block.adds(tile_a, 2.0) + out_c = pl.op.block.store(tile_c, [0, 0], [128, 128], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self, a: pl.Tensor[[128, 128], pl.FP32]) -> pl.Tensor[[128, 128], pl.FP32]: + out_c = self.tile_adds(a) + return out_c + + return TileAddsProgram + + +class TestTileSubs(PTOTestCase): + """Base class for tile-scalar subtraction tests.""" + ROWS = 128 # Override in subclasses + COLS = 128 # Override in subclasses + + def get_name(self) -> str: + return 
f"tile_subs_{self.ROWS}x{self.COLS}" + + def define_tensors(self) -> List[TensorSpec]: + return [ + TensorSpec("a", [self.ROWS, self.COLS], DataType.FP32, init_value=5.0), + TensorSpec("c", [self.ROWS, self.COLS], DataType.FP32, is_output=True), + ] + + def get_program(self) -> Any: + raise NotImplementedError("Subclasses must implement get_program() with their specific shape") + + def compute_expected(self, tensors, params=None): + tensors["c"][:] = tensors["a"] - 2.0 + + +class TestTileSubs64x64(TestTileSubs): + """64x64 tile-scalar subtraction test.""" + + ROWS = 64 + COLS = 64 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileSubsProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_subs( + self, + a: pl.Tensor[[64, 64], pl.FP32], + c: pl.Tensor[[64, 64], pl.FP32], + ) -> pl.Tensor[[64, 64], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [64, 64]) + tile_c = pl.op.block.subs(tile_a, 2.0) + out_c = pl.op.block.store(tile_c, [0, 0], [64, 64], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self, a: pl.Tensor[[64, 64], pl.FP32]) -> pl.Tensor[[64, 64], pl.FP32]: + out_c = self.tile_subs(a) + return out_c + + return TileSubsProgram + + +class TestTileSubs128x128(TestTileSubs): + """128x128 tile-scalar subtraction test.""" + + ROWS = 128 + COLS = 128 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileSubsProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_subs( + self, + a: pl.Tensor[[128, 128], pl.FP32], + c: pl.Tensor[[128, 128], pl.FP32], + ) -> pl.Tensor[[128, 128], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [128, 128]) + tile_c = pl.op.block.subs(tile_a, 2.0) + out_c = pl.op.block.store(tile_c, [0, 0], [128, 128], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self, a: pl.Tensor[[128, 128], pl.FP32]) -> pl.Tensor[[128, 128], pl.FP32]: + out_c = self.tile_subs(a) + 
return out_c + + return TileSubsProgram + + +class TestTileMuls(PTOTestCase): + """Base class for tile-scalar multiplication tests.""" + + ROWS = 128 # Override in subclasses + COLS = 128 # Override in subclasses + + def get_name(self) -> str: + return f"tile_muls_{self.ROWS}x{self.COLS}" + + def define_tensors(self) -> List[TensorSpec]: + return [ + TensorSpec("a", [self.ROWS, self.COLS], DataType.FP32, init_value=3.0), + TensorSpec("c", [self.ROWS, self.COLS], DataType.FP32, is_output=True), + ] + + def get_program(self) -> Any: + raise NotImplementedError("Subclasses must implement get_program() with their specific shape") + + def compute_expected(self, tensors, params=None): + tensors["c"][:] = tensors["a"] * 2.0 -class TestTileAddWithPTOAS(TestTileAdd): + +class TestTileMuls64x64(TestTileMuls): + """64x64 tile-scalar multiplication test.""" + + ROWS = 64 + COLS = 64 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileMulsProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_muls( + self, + a: pl.Tensor[[64, 64], pl.FP32], + c: pl.Tensor[[64, 64], pl.FP32], + ) -> pl.Tensor[[64, 64], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [64, 64]) + tile_c = pl.op.block.muls(tile_a, 2.0) + out_c = pl.op.block.store(tile_c, [0, 0], [64, 64], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self, a: pl.Tensor[[64, 64], pl.FP32]) -> pl.Tensor[[64, 64], pl.FP32]: + out_c = self.tile_muls(a) + return out_c + + return TileMulsProgram + + +class TestTileMuls128x128(TestTileMuls): + """128x128 tile-scalar multiplication test.""" + + ROWS = 128 + COLS = 128 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileMulsProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_muls( + self, + a: pl.Tensor[[128, 128], pl.FP32], + c: pl.Tensor[[128, 128], pl.FP32], + ) -> pl.Tensor[[128, 128], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [128, 
128]) + tile_c = pl.op.block.muls(tile_a, 2.0) + out_c = pl.op.block.store(tile_c, [0, 0], [128, 128], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self, a: pl.Tensor[[128, 128], pl.FP32]) -> pl.Tensor[[128, 128], pl.FP32]: + out_c = self.tile_muls(a) + return out_c + + return TileMulsProgram + + +class TestTileDivs(PTOTestCase): + """Base class for tile-scalar division tests.""" + + ROWS = 128 # Override in subclasses + COLS = 128 # Override in subclasses + + def get_name(self) -> str: + return f"tile_divs_{self.ROWS}x{self.COLS}" + + def define_tensors(self) -> List[TensorSpec]: + return [ + TensorSpec("a", [self.ROWS, self.COLS], DataType.FP32, init_value=6.0), + TensorSpec("c", [self.ROWS, self.COLS], DataType.FP32, is_output=True), + ] + + def get_program(self) -> Any: + raise NotImplementedError("Subclasses must implement get_program() with their specific shape") + + def compute_expected(self, tensors, params=None): + tensors["c"][:] = tensors["a"] / 2.0 + + +class TestTileDivs64x64(TestTileDivs): + """64x64 tile-scalar division test.""" + + ROWS = 64 + COLS = 64 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileDivsProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_divs( + self, + a: pl.Tensor[[64, 64], pl.FP32], + c: pl.Tensor[[64, 64], pl.FP32], + ) -> pl.Tensor[[64, 64], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [64, 64]) + tile_c = pl.op.block.divs(tile_a, 2.0) + out_c = pl.op.block.store(tile_c, [0, 0], [64, 64], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self, a: pl.Tensor[[64, 64], pl.FP32]) -> pl.Tensor[[64, 64], pl.FP32]: + out_c = self.tile_divs(a) + return out_c + + return TileDivsProgram + + +class TestTileDivs128x128(TestTileDivs): + """128x128 tile-scalar division test.""" + + ROWS = 128 + COLS = 128 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class 
TileDivsProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_divs( + self, + a: pl.Tensor[[128, 128], pl.FP32], + c: pl.Tensor[[128, 128], pl.FP32], + ) -> pl.Tensor[[128, 128], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [128, 128]) + tile_c = pl.op.block.divs(tile_a, 2.0) + out_c = pl.op.block.store(tile_c, [0, 0], [128, 128], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self, a: pl.Tensor[[128, 128], pl.FP32]) -> pl.Tensor[[128, 128], pl.FP32]: + out_c = self.tile_divs(a) + return out_c + + return TileDivsProgram + + +class TestTileAddWithPTOAS(TestTileAdd128x128): """Test tile add with PTOAS optimization strategy. This demonstrates how to use a custom optimization strategy. @@ -157,7 +855,7 @@ def get_strategy(self): return OptimizationStrategy.PTOAS def get_name(self) -> str: - return f"tile_add_ptoas_{self.rows}x{self.cols}" + return f"tile_add_ptoas_{self.ROWS}x{self.COLS}" class TestCustomArrayInit(PTOTestCase): @@ -204,22 +902,119 @@ def compute_expected(self, tensors, params=None): class TestElementwiseOperations: """Test suite for elementwise operations.""" - @pytest.mark.parametrize("rows,cols", [(64, 64), (128, 128)]) - def test_tile_add_shapes(self, test_runner, rows, cols): - """Test tile addition with various shapes.""" - test_case = TestTileAdd(rows=rows, cols=cols) + # Binary tile-tile operations + def test_tile_add_64x64(self, test_runner): + """Test tile addition with 64x64 shape.""" + test_case = TestTileAdd64x64() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 64x64: {result.error}" + + def test_tile_add_128x128(self, test_runner): + """Test tile addition with 128x128 shape.""" + test_case = TestTileAdd128x128() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 128x128: {result.error}" + + def test_tile_sub_64x64(self, test_runner): + """Test tile subtraction with 64x64 shape.""" + test_case = TestTileSub64x64() + result 
= test_runner.run(test_case) + assert result.passed, f"Test failed for 64x64: {result.error}" + + def test_tile_sub_128x128(self, test_runner): + """Test tile subtraction with 128x128 shape.""" + test_case = TestTileSub128x128() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 128x128: {result.error}" + + def test_tile_mul_64x64(self, test_runner): + """Test tile multiplication with 64x64 shape.""" + test_case = TestTileMul64x64() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 64x64: {result.error}" + + def test_tile_mul_128x128(self, test_runner): + """Test tile multiplication with 128x128 shape.""" + test_case = TestTileMul128x128() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 128x128: {result.error}" + + def test_tile_div_64x64(self, test_runner): + """Test tile division with 64x64 shape.""" + test_case = TestTileDiv64x64() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 64x64: {result.error}" + + def test_tile_div_128x128(self, test_runner): + """Test tile division with 128x128 shape.""" + test_case = TestTileDiv128x128() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 128x128: {result.error}" + + def test_tile_maximum_64x64(self, test_runner): + """Test tile element-wise maximum with 64x64 shape.""" + test_case = TestTileMaximum64x64() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 64x64: {result.error}" + + def test_tile_maximum_128x128(self, test_runner): + """Test tile element-wise maximum with 128x128 shape.""" + test_case = TestTileMaximum128x128() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 128x128: {result.error}" + + # Scalar operations + def test_tile_adds_64x64(self, test_runner): + """Test tile-scalar addition with 64x64 shape.""" + test_case = TestTileAdds64x64() + result = test_runner.run(test_case) + assert 
result.passed, f"Test failed for 64x64: {result.error}" + + def test_tile_adds_128x128(self, test_runner): + """Test tile-scalar addition with 128x128 shape.""" + test_case = TestTileAdds128x128() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 128x128: {result.error}" + + def test_tile_subs_64x64(self, test_runner): + """Test tile-scalar subtraction with 64x64 shape.""" + test_case = TestTileSubs64x64() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 64x64: {result.error}" + + def test_tile_subs_128x128(self, test_runner): + """Test tile-scalar subtraction with 128x128 shape.""" + test_case = TestTileSubs128x128() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 128x128: {result.error}" + + def test_tile_muls_64x64(self, test_runner): + """Test tile-scalar multiplication with 64x64 shape.""" + test_case = TestTileMuls64x64() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 64x64: {result.error}" + + def test_tile_muls_128x128(self, test_runner): + """Test tile-scalar multiplication with 128x128 shape.""" + test_case = TestTileMuls128x128() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 128x128: {result.error}" + + def test_tile_divs_64x64(self, test_runner): + """Test tile-scalar division with 64x64 shape.""" + test_case = TestTileDivs64x64() result = test_runner.run(test_case) - assert result.passed, f"Test failed for {rows}x{cols}: {result.error}" + assert result.passed, f"Test failed for 64x64: {result.error}" - @pytest.mark.parametrize("rows,cols", [(64, 64), (128, 128)]) - def test_tile_mul_shapes(self, test_runner, rows, cols): - """Test tile multiplication with various shapes.""" - test_case = TestTileMul(rows=rows, cols=cols) + def test_tile_divs_128x128(self, test_runner): + """Test tile-scalar division with 128x128 shape.""" + test_case = TestTileDivs128x128() result = test_runner.run(test_case) - 
assert result.passed, f"Test failed for {rows}x{cols}: {result.error}" + assert result.passed, f"Test failed for 128x128: {result.error}" + # Optimization strategy test def test_tile_add_ptoas_strategy(self, test_runner): """Test tile addition with PTOAS optimization strategy.""" - test_case = TestTileAddWithPTOAS(rows=128, cols=128) + test_case = TestTileAddWithPTOAS() result = test_runner.run(test_case) assert result.passed, f"Test failed: {result.error}" diff --git a/tests/test_cases/test_matmul.py b/tests/test_cases/test_matmul.py index eedc115..9508061 100644 --- a/tests/test_cases/test_matmul.py +++ b/tests/test_cases/test_matmul.py @@ -42,13 +42,13 @@ def matmul( b: pl.Tensor[[64, 64], pl.FP32], c: pl.Tensor[[64, 64], pl.FP32], ) -> pl.Tensor[[64, 64], pl.FP32]: - tile_a_l1 = pl.op.block.load(a, 0, 0, 64, 64, target_memory=2) - tile_b_l1 = pl.op.block.load(b, 0, 0, 64, 64, target_memory=2) + tile_a_l1 = pl.op.block.load(a, [0, 0], [64, 64], target_memory=2) + tile_b_l1 = pl.op.block.load(b, [0, 0], [64, 64], target_memory=2) tile_a_l0a = pl.op.block.move(tile_a_l1, target_memory=3) tile_b_l0b = pl.op.block.move(tile_b_l1, target_memory=4) tile_c_l0c = pl.op.block.matmul(tile_a_l0a, tile_b_l0b) # store can support l0c -> GM directly - out_c = pl.op.block.l0c_store(tile_c_l0c, 0, 0, 64, 64, c) + out_c = pl.op.block.l0c_store(tile_c_l0c, [0, 0], [64, 64], c) return out_c @pl.function(type=pl.FunctionType.Orchestration) diff --git a/tests/test_cases/test_memory.py b/tests/test_cases/test_memory.py new file mode 100644 index 0000000..68f716a --- /dev/null +++ b/tests/test_cases/test_memory.py @@ -0,0 +1,237 @@ +""" +Tests for memory operations using PyPTO frontend. + +Tests tile-level memory operations: +- load + store: Basic memory copy +- full: Create constant-filled tiles + +These tests use the simplified pattern where orchestration is auto-generated. +Each operation has both 64x64 and 128x128 test cases. 
+ +Note: Operations like get_block_idx, create_tile, alloc, and move are auxiliary +operations that are already used in other tests (e.g., reduction tests use create_tile, +matmul tests use move for L0A/L0B transfers). +""" + +import sys +from typing import Any, List + +import numpy as np + +from pto_test.core import environment +from pto_test.core.test_case import DataType, PTOTestCase, TensorSpec + +# Add pypto to path +_PYPTO_PYTHON = environment.get_pypto_python_path() +if _PYPTO_PYTHON is not None and _PYPTO_PYTHON.exists() and str(_PYPTO_PYTHON) not in sys.path: + sys.path.insert(0, str(_PYPTO_PYTHON)) + + +# ============================================================================= +# Load + Store: Basic memory copy +# ============================================================================= + + +class TestTileLoadStore(PTOTestCase): + """Base class for tile load + store tests (memory copy).""" + + ROWS = 128 # Override in subclasses + COLS = 128 # Override in subclasses + + def get_name(self) -> str: + return f"tile_load_store_{self.ROWS}x{self.COLS}" + + def define_tensors(self) -> List[TensorSpec]: + return [ + TensorSpec( + "a", + [self.ROWS, self.COLS], + DataType.FP32, + init_value=lambda shape: np.random.randn(*shape), + ), + TensorSpec("c", [self.ROWS, self.COLS], DataType.FP32, is_output=True), + ] + + def get_program(self) -> Any: + raise NotImplementedError("Subclasses must implement get_program() with their specific shape") + + def compute_expected(self, tensors, params=None): + del params # Unused + tensors["c"][:] = tensors["a"] + + +class TestTileLoadStore64x64(TestTileLoadStore): + """64x64 tile load + store test.""" + + ROWS = 64 + COLS = 64 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileLoadStoreProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_load_store( + self, + a: pl.Tensor[[64, 64], pl.FP32], + c: pl.Tensor[[64, 64], pl.FP32], + ) -> pl.Tensor[[64, 64], pl.FP32]: + 
tile_a = pl.op.block.load(a, [0, 0], [64, 64]) + out_c = pl.op.block.store(tile_a, [0, 0], [64, 64], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self, a: pl.Tensor[[64, 64], pl.FP32]) -> pl.Tensor[[64, 64], pl.FP32]: + out_c = self.tile_load_store(a) + return out_c + + return TileLoadStoreProgram + + +class TestTileLoadStore128x128(TestTileLoadStore): + """128x128 tile load + store test.""" + + ROWS = 128 + COLS = 128 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileLoadStoreProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_load_store( + self, + a: pl.Tensor[[128, 128], pl.FP32], + c: pl.Tensor[[128, 128], pl.FP32], + ) -> pl.Tensor[[128, 128], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [128, 128]) + out_c = pl.op.block.store(tile_a, [0, 0], [128, 128], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self, a: pl.Tensor[[128, 128], pl.FP32]) -> pl.Tensor[[128, 128], pl.FP32]: + out_c = self.tile_load_store(a) + return out_c + + return TileLoadStoreProgram + + +# ============================================================================= +# Full: Create constant-filled tiles +# ============================================================================= + + +class TestTileFull(PTOTestCase): + """Base class for tile full tests (constant initialization).""" + + ROWS = 128 # Override in subclasses + COLS = 128 # Override in subclasses + FILL_VALUE = 3.14 # Constant value to fill + + def get_name(self) -> str: + return f"tile_full_{self.ROWS}x{self.COLS}" + + def define_tensors(self) -> List[TensorSpec]: + return [ + TensorSpec("c", [self.ROWS, self.COLS], DataType.FP32, is_output=True), + ] + + def get_program(self) -> Any: + raise NotImplementedError("Subclasses must implement get_program() with their specific shape") + + def compute_expected(self, tensors, params=None): + del params # Unused + tensors["c"][:] = 3.14 
# Constant fill value + + +class TestTileFull64x64(TestTileFull): + """64x64 tile full test.""" + + ROWS = 64 + COLS = 64 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileFullProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_full( + self, + c: pl.Tensor[[64, 64], pl.FP32], + ) -> pl.Tensor[[64, 64], pl.FP32]: + tile_c = pl.op.block.full([64, 64], dtype=pl.FP32, value=3.14) + out_c = pl.op.block.store(tile_c, [0, 0], [64, 64], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self) -> pl.Tensor[[64, 64], pl.FP32]: + out_c = self.tile_full() + return out_c + + return TileFullProgram + + +class TestTileFull128x128(TestTileFull): + """128x128 tile full test.""" + + ROWS = 128 + COLS = 128 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileFullProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_full( + self, + c: pl.Tensor[[128, 128], pl.FP32], + ) -> pl.Tensor[[128, 128], pl.FP32]: + tile_c = pl.op.block.full([128, 128], dtype=pl.FP32, value=3.14) + out_c = pl.op.block.store(tile_c, [0, 0], [128, 128], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self) -> pl.Tensor[[128, 128], pl.FP32]: + out_c = self.tile_full() + return out_c + + return TileFullProgram + + +# ============================================================================= +# pytest test functions +# ============================================================================= + + +class TestMemoryOperations: + """Test suite for memory operations.""" + + # Load + Store + def test_tile_load_store_64x64(self, test_runner): + """Test tile load + store with 64x64 shape.""" + test_case = TestTileLoadStore64x64() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 64x64: {result.error}" + + def test_tile_load_store_128x128(self, test_runner): + """Test tile load + store with 128x128 
shape.""" + test_case = TestTileLoadStore128x128() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 128x128: {result.error}" + + # Full + def test_tile_full_64x64(self, test_runner): + """Test tile full with 64x64 shape.""" + test_case = TestTileFull64x64() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 64x64: {result.error}" + + def test_tile_full_128x128(self, test_runner): + """Test tile full with 128x128 shape.""" + test_case = TestTileFull128x128() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 128x128: {result.error}" diff --git a/tests/test_cases/test_reduction.py b/tests/test_cases/test_reduction.py new file mode 100644 index 0000000..e64eaf6 --- /dev/null +++ b/tests/test_cases/test_reduction.py @@ -0,0 +1,255 @@ +""" +Tests for reduction operations using PyPTO frontend. + +Tests tile-level reduction operations: +- row_max: Row-wise maximum reduction +- row_sum: Row-wise sum reduction + +These tests use the simplified pattern where orchestration is auto-generated. +Each operation has both 64x64 and 128x128 test cases. 
+""" + +import sys +from pathlib import Path +from typing import Any, List + +import numpy as np + +from pto_test.core import environment +from pto_test.core.test_case import DataType, PTOTestCase, TensorSpec + +# Add pypto to path +_PYPTO_PYTHON = environment.get_pypto_python_path() +if _PYPTO_PYTHON is not None and _PYPTO_PYTHON.exists() and str(_PYPTO_PYTHON) not in sys.path: + sys.path.insert(0, str(_PYPTO_PYTHON)) + + +# ============================================================================= +# Row-wise max reduction +# ============================================================================= + + +class TestTileRowMax(PTOTestCase): + """Base class for tile row-wise max reduction tests.""" + + ROWS = 128 # Override in subclasses + COLS = 128 # Override in subclasses + + def get_name(self) -> str: + return f"tile_row_max_{self.ROWS}x{self.COLS}" + + def define_tensors(self) -> List[TensorSpec]: + return [ + TensorSpec( + "a", + [self.ROWS, self.COLS], + DataType.FP32, + init_value=lambda shape: np.random.randn(*shape), + ), + TensorSpec("c", [self.ROWS, 1], DataType.FP32, is_output=True), + ] + + def get_program(self) -> Any: + raise NotImplementedError("Subclasses must implement get_program() with their specific shape") + + def compute_expected(self, tensors, params=None): + tensors["c"][:] = np.max(tensors["a"], axis=1, keepdims=True) + + +class TestTileRowMax64x64(TestTileRowMax): + """64x64 tile row-wise max reduction test.""" + + ROWS = 64 + COLS = 64 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileRowMaxProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_row_max( + self, + a: pl.Tensor[[64, 64], pl.FP32], + c: pl.Tensor[[64, 1], pl.FP32], + ) -> pl.Tensor[[64, 1], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [64, 64]) + tmp_tile: pl.Tile[[64, 64], pl.FP32] = pl.op.create_tile( + [64, 64], dtype=pl.FP32, target_memory=1 + ) + tile_c = pl.op.block.row_max(tile_a, tmp_tile) + out_c = 
pl.op.block.store(tile_c, [0, 0], [64, 1], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self, a: pl.Tensor[[64, 64], pl.FP32]) -> pl.Tensor[[64, 1], pl.FP32]: + out_c = self.tile_row_max(a) + return out_c + + return TileRowMaxProgram + + +class TestTileRowMax128x128(TestTileRowMax): + """128x128 tile row-wise max reduction test.""" + + ROWS = 128 + COLS = 128 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileRowMaxProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_row_max( + self, + a: pl.Tensor[[128, 128], pl.FP32], + c: pl.Tensor[[128, 1], pl.FP32], + ) -> pl.Tensor[[128, 1], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [128, 128]) + tmp_tile: pl.Tile[[128, 128], pl.FP32] = pl.op.create_tile( + [128, 128], dtype=pl.FP32, target_memory=1 + ) + tile_c = pl.op.block.row_max(tile_a, tmp_tile) + out_c = pl.op.block.store(tile_c, [0, 0], [128, 1], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self, a: pl.Tensor[[128, 128], pl.FP32]) -> pl.Tensor[[128, 1], pl.FP32]: + out_c = self.tile_row_max(a) + return out_c + + return TileRowMaxProgram + + +# ============================================================================= +# Row-wise sum reduction +# ============================================================================= + + +class TestTileRowSum(PTOTestCase): + """Base class for tile row-wise sum reduction tests.""" + + ROWS = 128 # Override in subclasses + COLS = 128 # Override in subclasses + + def get_name(self) -> str: + return f"tile_row_sum_{self.ROWS}x{self.COLS}" + + def define_tensors(self) -> List[TensorSpec]: + return [ + TensorSpec( + "a", + [self.ROWS, self.COLS], + DataType.FP32, + init_value=lambda shape: np.random.randn(*shape), + ), + TensorSpec("c", [self.ROWS, 1], DataType.FP32, is_output=True), + ] + + def get_program(self) -> Any: + raise NotImplementedError("Subclasses must implement 
get_program() with their specific shape") + + def compute_expected(self, tensors, params=None): + tensors["c"][:] = np.sum(tensors["a"], axis=1, keepdims=True) + + +class TestTileRowSum64x64(TestTileRowSum): + """64x64 tile row-wise sum reduction test.""" + + ROWS = 64 + COLS = 64 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileRowSumProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_row_sum( + self, + a: pl.Tensor[[64, 64], pl.FP32], + c: pl.Tensor[[64, 1], pl.FP32], + ) -> pl.Tensor[[64, 1], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [64, 64]) + tmp_tile: pl.Tile[[64, 64], pl.FP32] = pl.op.create_tile( + [64, 64], dtype=pl.FP32, target_memory=1 + ) + tile_c = pl.op.block.row_sum(tile_a, tmp_tile) + out_c = pl.op.block.store(tile_c, [0, 0], [64, 1], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self, a: pl.Tensor[[64, 64], pl.FP32]) -> pl.Tensor[[64, 1], pl.FP32]: + out_c = self.tile_row_sum(a) + return out_c + + return TileRowSumProgram + + +class TestTileRowSum128x128(TestTileRowSum): + """128x128 tile row-wise sum reduction test.""" + + ROWS = 128 + COLS = 128 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileRowSumProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_row_sum( + self, + a: pl.Tensor[[128, 128], pl.FP32], + c: pl.Tensor[[128, 1], pl.FP32], + ) -> pl.Tensor[[128, 1], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [128, 128]) + tmp_tile: pl.Tile[[128, 128], pl.FP32] = pl.op.create_tile( + [128, 128], dtype=pl.FP32, target_memory=1 + ) + tile_c = pl.op.block.row_sum(tile_a, tmp_tile) + out_c = pl.op.block.store(tile_c, [0, 0], [128, 1], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self, a: pl.Tensor[[128, 128], pl.FP32]) -> pl.Tensor[[128, 1], pl.FP32]: + out_c = self.tile_row_sum(a) + return out_c + + return TileRowSumProgram + + +# 
============================================================================= +# pytest test functions +# ============================================================================= + + +class TestReductionOperations: + """Test suite for reduction operations.""" + + # Row-wise max reduction + def test_tile_row_max_64x64(self, test_runner): + """Test tile row-wise max reduction with 64x64 shape.""" + test_case = TestTileRowMax64x64() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 64x64: {result.error}" + + def test_tile_row_max_128x128(self, test_runner): + """Test tile row-wise max reduction with 128x128 shape.""" + test_case = TestTileRowMax128x128() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 128x128: {result.error}" + + # Row-wise sum reduction + def test_tile_row_sum_64x64(self, test_runner): + """Test tile row-wise sum reduction with 64x64 shape.""" + test_case = TestTileRowSum64x64() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 64x64: {result.error}" + + def test_tile_row_sum_128x128(self, test_runner): + """Test tile row-wise sum reduction with 128x128 shape.""" + test_case = TestTileRowSum128x128() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 128x128: {result.error}" diff --git a/tests/test_cases/test_unary.py b/tests/test_cases/test_unary.py new file mode 100644 index 0000000..c1c181f --- /dev/null +++ b/tests/test_cases/test_unary.py @@ -0,0 +1,647 @@ +""" +Tests for unary operations using PyPTO frontend. + +Tests tile-level unary operations: +- log: Natural logarithm +- abs: Absolute value +- relu: ReLU activation (max(0, x)) +- exp: Exponential +- sqrt: Square root +- neg: Negation + +These tests use the simplified pattern where orchestration is auto-generated. +Each operation has both 64x64 and 128x128 test cases. 
+""" + +import sys +from pathlib import Path +from typing import Any, List + +import numpy as np + +from pto_test.core import environment +from pto_test.core.test_case import DataType, PTOTestCase, TensorSpec + +# Add pypto to path +_PYPTO_PYTHON = environment.get_pypto_python_path() +if _PYPTO_PYTHON is not None and _PYPTO_PYTHON.exists() and str(_PYPTO_PYTHON) not in sys.path: + sys.path.insert(0, str(_PYPTO_PYTHON)) + + +# ============================================================================= +# Natural logarithm +# ============================================================================= + + +class TestTileLog(PTOTestCase): + """Base class for tile natural logarithm tests.""" + + ROWS = 128 # Override in subclasses + COLS = 128 # Override in subclasses + + def get_name(self) -> str: + return f"tile_log_{self.ROWS}x{self.COLS}" + + def define_tensors(self) -> List[TensorSpec]: + return [ + TensorSpec("a", [self.ROWS, self.COLS], DataType.FP32, init_value=2.718), + TensorSpec("c", [self.ROWS, self.COLS], DataType.FP32, is_output=True), + ] + + def get_program(self) -> Any: + raise NotImplementedError("Subclasses must implement get_program() with their specific shape") + + def compute_expected(self, tensors, params=None): + tensors["c"][:] = np.log(tensors["a"]) + + +class TestTileLog64x64(TestTileLog): + """64x64 tile natural logarithm test.""" + + ROWS = 64 + COLS = 64 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileLogProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_log( + self, + a: pl.Tensor[[64, 64], pl.FP32], + c: pl.Tensor[[64, 64], pl.FP32], + ) -> pl.Tensor[[64, 64], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [64, 64]) + tile_c = pl.op.block.log(tile_a) + out_c = pl.op.block.store(tile_c, [0, 0], [64, 64], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self, a: pl.Tensor[[64, 64], pl.FP32]) -> pl.Tensor[[64, 64], pl.FP32]: + out_c = 
self.tile_log(a) + return out_c + + return TileLogProgram + + +class TestTileLog128x128(TestTileLog): + """128x128 tile natural logarithm test.""" + + ROWS = 128 + COLS = 128 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileLogProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_log( + self, + a: pl.Tensor[[128, 128], pl.FP32], + c: pl.Tensor[[128, 128], pl.FP32], + ) -> pl.Tensor[[128, 128], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [128, 128]) + tile_c = pl.op.block.log(tile_a) + out_c = pl.op.block.store(tile_c, [0, 0], [128, 128], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self, a: pl.Tensor[[128, 128], pl.FP32]) -> pl.Tensor[[128, 128], pl.FP32]: + out_c = self.tile_log(a) + return out_c + + return TileLogProgram + + +# ============================================================================= +# Absolute value +# ============================================================================= + + +class TestTileAbs(PTOTestCase): + """Base class for tile absolute value tests.""" + + ROWS = 128 # Override in subclasses + COLS = 128 # Override in subclasses + + def get_name(self) -> str: + return f"tile_abs_{self.ROWS}x{self.COLS}" + + def define_tensors(self) -> List[TensorSpec]: + return [ + TensorSpec( + "a", + [self.ROWS, self.COLS], + DataType.FP32, + init_value=lambda shape: np.random.randn(*shape) * 2 - 1, + ), + TensorSpec("c", [self.ROWS, self.COLS], DataType.FP32, is_output=True), + ] + + def get_program(self) -> Any: + raise NotImplementedError("Subclasses must implement get_program() with their specific shape") + + def compute_expected(self, tensors, params=None): + tensors["c"][:] = np.abs(tensors["a"]) + + +class TestTileAbs64x64(TestTileAbs): + """64x64 tile absolute value test.""" + + ROWS = 64 + COLS = 64 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileAbsProgram: + 
@pl.function(type=pl.FunctionType.InCore) + def tile_abs( + self, + a: pl.Tensor[[64, 64], pl.FP32], + c: pl.Tensor[[64, 64], pl.FP32], + ) -> pl.Tensor[[64, 64], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [64, 64]) + tile_c = pl.op.block.abs(tile_a) + out_c = pl.op.block.store(tile_c, [0, 0], [64, 64], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self, a: pl.Tensor[[64, 64], pl.FP32]) -> pl.Tensor[[64, 64], pl.FP32]: + out_c = self.tile_abs(a) + return out_c + + return TileAbsProgram + + +class TestTileAbs128x128(TestTileAbs): + """128x128 tile absolute value test.""" + + ROWS = 128 + COLS = 128 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileAbsProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_abs( + self, + a: pl.Tensor[[128, 128], pl.FP32], + c: pl.Tensor[[128, 128], pl.FP32], + ) -> pl.Tensor[[128, 128], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [128, 128]) + tile_c = pl.op.block.abs(tile_a) + out_c = pl.op.block.store(tile_c, [0, 0], [128, 128], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self, a: pl.Tensor[[128, 128], pl.FP32]) -> pl.Tensor[[128, 128], pl.FP32]: + out_c = self.tile_abs(a) + return out_c + + return TileAbsProgram + + +# ============================================================================= +# ReLU activation +# ============================================================================= + + +class TestTileRelu(PTOTestCase): + """Base class for tile ReLU activation tests.""" + + ROWS = 128 # Override in subclasses + COLS = 128 # Override in subclasses + + def get_name(self) -> str: + return f"tile_relu_{self.ROWS}x{self.COLS}" + + def define_tensors(self) -> List[TensorSpec]: + return [ + TensorSpec( + "a", + [self.ROWS, self.COLS], + DataType.FP32, + init_value=lambda shape: np.random.randn(*shape) * 2, + ), + TensorSpec("c", [self.ROWS, self.COLS], DataType.FP32, 
is_output=True), + ] + + def get_program(self) -> Any: + raise NotImplementedError("Subclasses must implement get_program() with their specific shape") + + def compute_expected(self, tensors, params=None): + tensors["c"][:] = np.maximum(0, tensors["a"]) + + +class TestTileRelu64x64(TestTileRelu): + """64x64 tile ReLU activation test.""" + + ROWS = 64 + COLS = 64 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileReluProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_relu( + self, + a: pl.Tensor[[64, 64], pl.FP32], + c: pl.Tensor[[64, 64], pl.FP32], + ) -> pl.Tensor[[64, 64], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [64, 64]) + tile_c = pl.op.block.relu(tile_a) + out_c = pl.op.block.store(tile_c, [0, 0], [64, 64], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self, a: pl.Tensor[[64, 64], pl.FP32]) -> pl.Tensor[[64, 64], pl.FP32]: + out_c = self.tile_relu(a) + return out_c + + return TileReluProgram + + +class TestTileRelu128x128(TestTileRelu): + """128x128 tile ReLU activation test.""" + + ROWS = 128 + COLS = 128 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileReluProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_relu( + self, + a: pl.Tensor[[128, 128], pl.FP32], + c: pl.Tensor[[128, 128], pl.FP32], + ) -> pl.Tensor[[128, 128], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [128, 128]) + tile_c = pl.op.block.relu(tile_a) + out_c = pl.op.block.store(tile_c, [0, 0], [128, 128], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self, a: pl.Tensor[[128, 128], pl.FP32]) -> pl.Tensor[[128, 128], pl.FP32]: + out_c = self.tile_relu(a) + return out_c + + return TileReluProgram + + +# ============================================================================= +# Exponential +# ============================================================================= + + +class 
TestTileExp(PTOTestCase): + """Base class for tile exponential tests.""" + + ROWS = 128 # Override in subclasses + COLS = 128 # Override in subclasses + + def get_name(self) -> str: + return f"tile_exp_{self.ROWS}x{self.COLS}" + + def define_tensors(self) -> List[TensorSpec]: + return [ + TensorSpec("a", [self.ROWS, self.COLS], DataType.FP32, init_value=1.0), + TensorSpec("c", [self.ROWS, self.COLS], DataType.FP32, is_output=True), + ] + + def get_program(self) -> Any: + raise NotImplementedError("Subclasses must implement get_program() with their specific shape") + + def compute_expected(self, tensors, params=None): + tensors["c"][:] = np.exp(tensors["a"]) + + +class TestTileExp64x64(TestTileExp): + """64x64 tile exponential test.""" + + ROWS = 64 + COLS = 64 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileExpProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_exp( + self, + a: pl.Tensor[[64, 64], pl.FP32], + c: pl.Tensor[[64, 64], pl.FP32], + ) -> pl.Tensor[[64, 64], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [64, 64]) + tile_c = pl.op.block.exp(tile_a) + out_c = pl.op.block.store(tile_c, [0, 0], [64, 64], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self, a: pl.Tensor[[64, 64], pl.FP32]) -> pl.Tensor[[64, 64], pl.FP32]: + out_c = self.tile_exp(a) + return out_c + + return TileExpProgram + + +class TestTileExp128x128(TestTileExp): + """128x128 tile exponential test.""" + + ROWS = 128 + COLS = 128 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileExpProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_exp( + self, + a: pl.Tensor[[128, 128], pl.FP32], + c: pl.Tensor[[128, 128], pl.FP32], + ) -> pl.Tensor[[128, 128], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [128, 128]) + tile_c = pl.op.block.exp(tile_a) + out_c = pl.op.block.store(tile_c, [0, 0], [128, 128], c) + return out_c + + 
@pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self, a: pl.Tensor[[128, 128], pl.FP32]) -> pl.Tensor[[128, 128], pl.FP32]: + out_c = self.tile_exp(a) + return out_c + + return TileExpProgram + + +# ============================================================================= +# Square root +# ============================================================================= + + +class TestTileSqrt(PTOTestCase): + """Base class for tile square root tests.""" + + ROWS = 128 # Override in subclasses + COLS = 128 # Override in subclasses + + def get_name(self) -> str: + return f"tile_sqrt_{self.ROWS}x{self.COLS}" + + def define_tensors(self) -> List[TensorSpec]: + return [ + TensorSpec("a", [self.ROWS, self.COLS], DataType.FP32, init_value=4.0), + TensorSpec("c", [self.ROWS, self.COLS], DataType.FP32, is_output=True), + ] + + def get_program(self) -> Any: + raise NotImplementedError("Subclasses must implement get_program() with their specific shape") + + def compute_expected(self, tensors, params=None): + tensors["c"][:] = np.sqrt(tensors["a"]) + + +class TestTileSqrt64x64(TestTileSqrt): + """64x64 tile square root test.""" + + ROWS = 64 + COLS = 64 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileSqrtProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_sqrt( + self, + a: pl.Tensor[[64, 64], pl.FP32], + c: pl.Tensor[[64, 64], pl.FP32], + ) -> pl.Tensor[[64, 64], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [64, 64]) + tile_c = pl.op.block.sqrt(tile_a) + out_c = pl.op.block.store(tile_c, [0, 0], [64, 64], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self, a: pl.Tensor[[64, 64], pl.FP32]) -> pl.Tensor[[64, 64], pl.FP32]: + out_c = self.tile_sqrt(a) + return out_c + + return TileSqrtProgram + + +class TestTileSqrt128x128(TestTileSqrt): + """128x128 tile square root test.""" + + ROWS = 128 + COLS = 128 + + def get_program(self) -> Any: + import 
pypto.language as pl + + @pl.program + class TileSqrtProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_sqrt( + self, + a: pl.Tensor[[128, 128], pl.FP32], + c: pl.Tensor[[128, 128], pl.FP32], + ) -> pl.Tensor[[128, 128], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [128, 128]) + tile_c = pl.op.block.sqrt(tile_a) + out_c = pl.op.block.store(tile_c, [0, 0], [128, 128], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self, a: pl.Tensor[[128, 128], pl.FP32]) -> pl.Tensor[[128, 128], pl.FP32]: + out_c = self.tile_sqrt(a) + return out_c + + return TileSqrtProgram + + +# ============================================================================= +# Negation +# ============================================================================= + + +class TestTileNeg(PTOTestCase): + """Base class for tile negation tests.""" + + ROWS = 128 # Override in subclasses + COLS = 128 # Override in subclasses + + def get_name(self) -> str: + return f"tile_neg_{self.ROWS}x{self.COLS}" + + def define_tensors(self) -> List[TensorSpec]: + return [ + TensorSpec("a", [self.ROWS, self.COLS], DataType.FP32, init_value=3.5), + TensorSpec("c", [self.ROWS, self.COLS], DataType.FP32, is_output=True), + ] + + def get_program(self) -> Any: + raise NotImplementedError("Subclasses must implement get_program() with their specific shape") + + def compute_expected(self, tensors, params=None): + tensors["c"][:] = -tensors["a"] + + +class TestTileNeg64x64(TestTileNeg): + """64x64 tile negation test.""" + + ROWS = 64 + COLS = 64 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileNegProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_neg( + self, + a: pl.Tensor[[64, 64], pl.FP32], + c: pl.Tensor[[64, 64], pl.FP32], + ) -> pl.Tensor[[64, 64], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [64, 64]) + tile_c = pl.op.block.neg(tile_a) + out_c = pl.op.block.store(tile_c, [0, 0], [64, 64], c) + return 
out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self, a: pl.Tensor[[64, 64], pl.FP32]) -> pl.Tensor[[64, 64], pl.FP32]: + out_c = self.tile_neg(a) + return out_c + + return TileNegProgram + + +class TestTileNeg128x128(TestTileNeg): + """128x128 tile negation test.""" + + ROWS = 128 + COLS = 128 + + def get_program(self) -> Any: + import pypto.language as pl + + @pl.program + class TileNegProgram: + @pl.function(type=pl.FunctionType.InCore) + def tile_neg( + self, + a: pl.Tensor[[128, 128], pl.FP32], + c: pl.Tensor[[128, 128], pl.FP32], + ) -> pl.Tensor[[128, 128], pl.FP32]: + tile_a = pl.op.block.load(a, [0, 0], [128, 128]) + tile_c = pl.op.block.neg(tile_a) + out_c = pl.op.block.store(tile_c, [0, 0], [128, 128], c) + return out_c + + @pl.function(type=pl.FunctionType.Orchestration) + def orchestrator(self, a: pl.Tensor[[128, 128], pl.FP32]) -> pl.Tensor[[128, 128], pl.FP32]: + out_c = self.tile_neg(a) + return out_c + + return TileNegProgram + + +# ============================================================================= +# pytest test functions +# ============================================================================= + + +class TestUnaryOperations: + """Test suite for unary operations.""" + + # Natural logarithm + def test_tile_log_64x64(self, test_runner): + """Test tile natural logarithm with 64x64 shape.""" + test_case = TestTileLog64x64() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 64x64: {result.error}" + + def test_tile_log_128x128(self, test_runner): + """Test tile natural logarithm with 128x128 shape.""" + test_case = TestTileLog128x128() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 128x128: {result.error}" + + # Absolute value + def test_tile_abs_64x64(self, test_runner): + """Test tile absolute value with 64x64 shape.""" + test_case = TestTileAbs64x64() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 64x64: 
{result.error}" + + def test_tile_abs_128x128(self, test_runner): + """Test tile absolute value with 128x128 shape.""" + test_case = TestTileAbs128x128() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 128x128: {result.error}" + + # ReLU activation + def test_tile_relu_64x64(self, test_runner): + """Test tile ReLU activation with 64x64 shape.""" + test_case = TestTileRelu64x64() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 64x64: {result.error}" + + def test_tile_relu_128x128(self, test_runner): + """Test tile ReLU activation with 128x128 shape.""" + test_case = TestTileRelu128x128() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 128x128: {result.error}" + + # Exponential + def test_tile_exp_64x64(self, test_runner): + """Test tile exponential with 64x64 shape.""" + test_case = TestTileExp64x64() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 64x64: {result.error}" + + def test_tile_exp_128x128(self, test_runner): + """Test tile exponential with 128x128 shape.""" + test_case = TestTileExp128x128() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 128x128: {result.error}" + + # Square root + def test_tile_sqrt_64x64(self, test_runner): + """Test tile square root with 64x64 shape.""" + test_case = TestTileSqrt64x64() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 64x64: {result.error}" + + def test_tile_sqrt_128x128(self, test_runner): + """Test tile square root with 128x128 shape.""" + test_case = TestTileSqrt128x128() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 128x128: {result.error}" + + # Negation + def test_tile_neg_64x64(self, test_runner): + """Test tile negation with 64x64 shape.""" + test_case = TestTileNeg64x64() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 64x64: {result.error}" + + def 
test_tile_neg_128x128(self, test_runner): + """Test tile negation with 128x128 shape.""" + test_case = TestTileNeg128x128() + result = test_runner.run(test_case) + assert result.passed, f"Test failed for 128x128: {result.error}"