From e85f6664342ec193eb94c7f03e48f732a3d774bd Mon Sep 17 00:00:00 2001 From: root Date: Fri, 3 Apr 2026 07:54:10 +0000 Subject: [PATCH] 20260402 add ai edited test Co-Authored-By: Claude Opus 4.6 --- .../ai_edited_test/config_and_utils/readme.md | 20 ++ .../config_and_utils/test_ai_config_logger.py | 153 ++++++++++ .../config_and_utils/test_ai_spec_utils.py | 120 ++++++++ .../config_and_utils/test_ai_timers.py | 175 +++++++++++ .../ai_edited_test/fusions/readme.md | 23 ++ .../fusions/test_ai_fused_bias_geglu.py | 271 ++++++++++++++++++ .../fusions/test_ai_fused_bias_gelu.py | 141 +++++++++ .../fusions/test_ai_fused_bias_swiglu.py | 198 +++++++++++++ .../fusions/test_ai_fused_softmax.py | 98 +++++++ .../fusions/test_ai_fused_swiglu_scale.py | 162 +++++++++++ 10 files changed, 1361 insertions(+) create mode 100644 tests/single_card_tests/ai_edited_test/config_and_utils/readme.md create mode 100644 tests/single_card_tests/ai_edited_test/config_and_utils/test_ai_config_logger.py create mode 100644 tests/single_card_tests/ai_edited_test/config_and_utils/test_ai_spec_utils.py create mode 100644 tests/single_card_tests/ai_edited_test/config_and_utils/test_ai_timers.py create mode 100644 tests/single_card_tests/ai_edited_test/fusions/readme.md create mode 100644 tests/single_card_tests/ai_edited_test/fusions/test_ai_fused_bias_geglu.py create mode 100644 tests/single_card_tests/ai_edited_test/fusions/test_ai_fused_bias_gelu.py create mode 100644 tests/single_card_tests/ai_edited_test/fusions/test_ai_fused_bias_swiglu.py create mode 100644 tests/single_card_tests/ai_edited_test/fusions/test_ai_fused_softmax.py create mode 100644 tests/single_card_tests/ai_edited_test/fusions/test_ai_fused_swiglu_scale.py diff --git a/tests/single_card_tests/ai_edited_test/config_and_utils/readme.md b/tests/single_card_tests/ai_edited_test/config_and_utils/readme.md new file mode 100644 index 000000000..76aed00c1 --- /dev/null +++ 
b/tests/single_card_tests/ai_edited_test/config_and_utils/readme.md @@ -0,0 +1,20 @@ +# Config and Utils Tests + +本目录包含 PaddleFleet 配置日志和工具模块的 AI 自动生成单元测试。 + +This directory contains AI-generated unit tests for PaddleFleet config logger and utility modules. + +## Tested Files + +| Source File | Test File | Coverage Target | +|-------------|-----------|----------------| +| `config_logger.py` | `test_ai_config_logger.py` | get_config_logger_path, JSONEncoderWithMcoreTypes, log_config_to_disk | +| `spec_utils.py` | `test_ai_spec_utils.py` | LayerSpec, import_layer, get_layer, build_layer | +| `timers.py` | `test_ai_timers.py` | _Timer, RuntimeTimer, Timers | + +## Test Categories + +- JSON encoding of Paddle types (dtype, nn.Module, dict, list) +- Disk logging with config paths and rank strings +- Dynamic layer import and instantiation +- Timer start/stop/reset/elapsed operations diff --git a/tests/single_card_tests/ai_edited_test/config_and_utils/test_ai_config_logger.py b/tests/single_card_tests/ai_edited_test/config_and_utils/test_ai_config_logger.py new file mode 100644 index 000000000..ca0d0577f --- /dev/null +++ b/tests/single_card_tests/ai_edited_test/config_and_utils/test_ai_config_logger.py @@ -0,0 +1,153 @@ +# Copyright (c) 2026 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import json +import os +import tempfile +import unittest +from types import SimpleNamespace +from unittest.mock import MagicMock, patch + +import paddle +from paddle import nn + + +class TestConfigLogger(unittest.TestCase): + """Tests for config_logger module.""" + + def test_get_config_logger_path(self): + from paddlefleet.config_logger import get_config_logger_path + + config = SimpleNamespace(config_logger_dir="/tmp/test_logger") + path = get_config_logger_path(config) + self.assertEqual(path, "/tmp/test_logger") + + def test_get_config_logger_path_default(self): + from paddlefleet.config_logger import get_config_logger_path + + config = SimpleNamespace() + path = get_config_logger_path(config) + self.assertEqual(path, "") + + def test_has_config_logger_enabled(self): + from paddlefleet.config_logger import has_config_logger_enabled + + config = SimpleNamespace(config_logger_dir="/tmp/test") + self.assertTrue(has_config_logger_enabled(config)) + + def test_has_config_logger_disabled(self): + from paddlefleet.config_logger import has_config_logger_enabled + + config = SimpleNamespace() + self.assertFalse(has_config_logger_enabled(config)) + + def test_get_path_count_first(self): + from paddlefleet.config_logger import ( + get_path_count, + ) + + count = get_path_count("/tmp/test_unique_3") + self.assertEqual(count, 0) + + def test_get_path_count_increments(self): + from paddlefleet.config_logger import get_path_count + + get_path_count("/tmp/test_unique_4") + count1 = get_path_count("/tmp/test_unique_4") + self.assertEqual(count1, 1) + + def test_get_path_with_count(self): + from paddlefleet.config_logger import get_path_with_count + + path = get_path_with_count("/tmp/test_unique_5") + self.assertTrue(path.endswith(".iter0")) + + def test_json_encoder_paddle_dtype(self): + from paddlefleet.config_logger import JSONEncoderWithMcoreTypes + + result = json.dumps(paddle.float32, cls=JSONEncoderWithMcoreTypes) + self.assertIn("float32", result) + + def 
test_json_encoder_nn_module(self): + from paddlefleet.config_logger import JSONEncoderWithMcoreTypes + + module = nn.Linear(10, 5) + result = json.dumps(module, cls=JSONEncoderWithMcoreTypes) + self.assertIsNotNone(result) + + def test_json_encoder_dict(self): + from paddlefleet.config_logger import JSONEncoderWithMcoreTypes + + data = {"a": 1, "b": paddle.float32} + result = json.dumps(data, cls=JSONEncoderWithMcoreTypes) + self.assertIn("float32", result) + + def test_json_encoder_list(self): + from paddlefleet.config_logger import JSONEncoderWithMcoreTypes + + data = [paddle.float16, paddle.float32] + result = json.dumps(data, cls=JSONEncoderWithMcoreTypes) + self.assertIsNotNone(result) + + def test_json_encoder_string_fallback(self): + from paddlefleet.config_logger import JSONEncoderWithMcoreTypes + + class CustomObj: + pass + + result = json.dumps(CustomObj(), cls=JSONEncoderWithMcoreTypes) + self.assertIsNotNone(result) + + @patch("paddlefleet.config_logger.parallel_state") + def test_log_config_to_disk(self, mock_ps): + from paddlefleet.config_logger import log_config_to_disk + + mock_ps.get_all_ranks = MagicMock(return_value="0_0") + with tempfile.TemporaryDirectory() as tmpdir: + config = SimpleNamespace(config_logger_dir=tmpdir) + data = {"key": "value", "dtype": paddle.float32} + log_config_to_disk(config, data, prefix="test", rank_str="0_0") + files = os.listdir(tmpdir) + self.assertTrue(any(f.startswith("test") for f in files)) + + @patch("paddlefleet.config_logger.parallel_state") + def test_log_config_to_disk_with_self(self, mock_ps): + from paddlefleet.config_logger import log_config_to_disk + + mock_ps.get_all_ranks = MagicMock(return_value="0_0") + with tempfile.TemporaryDirectory() as tmpdir: + config = SimpleNamespace(config_logger_dir=tmpdir) + m = nn.Linear(5, 3) + data = {"self": m, "extra": 42} + log_config_to_disk(config, data, prefix="layer", rank_str="0") + files = os.listdir(tmpdir) + self.assertTrue(len(files) > 0) + + 
@patch("paddlefleet.config_logger.parallel_state") + def test_log_config_to_disk_ordereddict(self, mock_ps): + from collections import OrderedDict + + from paddlefleet.config_logger import log_config_to_disk + + mock_ps.get_all_ranks = MagicMock(return_value="0_0") + with tempfile.TemporaryDirectory() as tmpdir: + config = SimpleNamespace(config_logger_dir=tmpdir) + data = OrderedDict([("a", 1), ("b", 2)]) + log_config_to_disk(config, data, prefix="od_test", rank_str="0") + files = os.listdir(tmpdir) + self.assertTrue(any(".pth" in f for f in files)) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/single_card_tests/ai_edited_test/config_and_utils/test_ai_spec_utils.py b/tests/single_card_tests/ai_edited_test/config_and_utils/test_ai_spec_utils.py new file mode 100644 index 000000000..a501855ee --- /dev/null +++ b/tests/single_card_tests/ai_edited_test/config_and_utils/test_ai_spec_utils.py @@ -0,0 +1,120 @@ +# Copyright (c) 2026 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import unittest + +import paddle + + +class TestLayerSpec(unittest.TestCase): + """Tests for LayerSpec dataclass.""" + + def test_repr_with_type(self): + from paddlefleet.spec_utils import LayerSpec + + spec = LayerSpec(layer=paddle.nn.Linear) + r = repr(spec) + self.assertIn("Linear", r) + + def test_default_kwargs(self): + from paddlefleet.spec_utils import LayerSpec + + spec = LayerSpec(layer=paddle.nn.Linear) + self.assertEqual(spec.extra_kwargs, {}) + + def test_with_extra_kwargs(self): + from paddlefleet.spec_utils import LayerSpec + + spec = LayerSpec(layer=paddle.nn.Linear, extra_kwargs={"bias": False}) + self.assertEqual(spec.extra_kwargs, {"bias": False}) + + +class TestImportLayer(unittest.TestCase): + """Tests for import_layer function.""" + + def test_valid_import(self): + from paddlefleet.spec_utils import import_layer + + layer = import_layer(("paddle.nn", "Linear")) + self.assertIs(layer, paddle.nn.Linear) + + def test_invalid_import(self): + from paddlefleet.spec_utils import import_layer + + layer = import_layer(("nonexistent_module_xyz", "FakeClass")) + self.assertIsNone(layer) + + +class TestGetLayer(unittest.TestCase): + """Tests for get_layer function.""" + + def test_with_type(self): + from paddlefleet.spec_utils import get_layer + + layer = get_layer(paddle.nn.Linear) + self.assertIs(layer, paddle.nn.Linear) + + def test_with_function(self): + from paddlefleet.spec_utils import get_layer + + def dummy_func(): + pass + + layer = get_layer(dummy_func) + self.assertIs(layer, dummy_func) + + def test_with_spec_type(self): + from paddlefleet.spec_utils import LayerSpec, get_layer + + spec = LayerSpec(layer=paddle.nn.Linear) + layer = get_layer(spec) + self.assertIs(layer, paddle.nn.Linear) + + def test_with_spec_tuple(self): + from paddlefleet.spec_utils import LayerSpec, get_layer + + spec = LayerSpec(layer=("paddle.nn", "Linear")) + layer = get_layer(spec) + self.assertIs(layer, paddle.nn.Linear) + + +class TestBuildLayer(unittest.TestCase): 
+ """Tests for build_layer function.""" + + def test_with_type(self): + from paddlefleet.spec_utils import build_layer + + layer = build_layer(paddle.nn.Linear, 10, 5) + self.assertIsInstance(layer, paddle.nn.Linear) + self.assertEqual(layer.weight.shape[1], 5) + + def test_with_spec_and_kwargs(self): + from paddlefleet.spec_utils import LayerSpec, build_layer + + spec = LayerSpec(layer=paddle.nn.ReLU, extra_kwargs={}) + layer = build_layer(spec) + self.assertIsInstance(layer, paddle.nn.ReLU) + + def test_with_function(self): + from paddlefleet.spec_utils import build_layer + + def my_func(x): + return x + + result = build_layer(my_func) + self.assertIs(result, my_func) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/single_card_tests/ai_edited_test/config_and_utils/test_ai_timers.py b/tests/single_card_tests/ai_edited_test/config_and_utils/test_ai_timers.py new file mode 100644 index 000000000..4c02edf5f --- /dev/null +++ b/tests/single_card_tests/ai_edited_test/config_and_utils/test_ai_timers.py @@ -0,0 +1,175 @@ +# Copyright (c) 2026 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import time +import unittest + + +class TestTimerBasic(unittest.TestCase): + """Tests for _Timer class.""" + + def test_timer_init(self): + from paddlefleet.timers import _Timer + + timer = _Timer("test") + self.assertEqual(timer.name, "test") + self.assertEqual(timer.elapsed_, 0.0) + self.assertFalse(timer.started_) + + def test_timer_start_stop(self): + from paddlefleet.timers import _Timer + + timer = _Timer("test") + timer.start() + self.assertTrue(timer.started_) + timer.stop() + self.assertFalse(timer.started_) + self.assertGreater(timer.elapsed_, 0.0) + + def test_timer_elapsed_resets(self): + from paddlefleet.timers import _Timer + + timer = _Timer("test") + timer.start() + time.sleep(0.01) + timer.stop() + elapsed = timer.elapsed(reset=True) + self.assertGreater(elapsed, 0.0) + self.assertEqual(timer.elapsed_, 0.0) + + def test_timer_elapsed_no_reset(self): + from paddlefleet.timers import _Timer + + timer = _Timer("test") + timer.start() + time.sleep(0.01) + timer.stop() + elapsed1 = timer.elapsed(reset=False) + elapsed2 = timer.elapsed(reset=True) + self.assertAlmostEqual(elapsed1, elapsed2, places=3) + + def test_timer_reset(self): + from paddlefleet.timers import _Timer + + timer = _Timer("test") + timer.start() + time.sleep(0.01) + timer.stop() + timer.reset() + self.assertEqual(timer.elapsed_, 0.0) + self.assertFalse(timer.started_) + + def test_timer_start_twice_raises(self): + from paddlefleet.timers import _Timer + + timer = _Timer("test") + timer.start() + with self.assertRaises(AssertionError): + timer.start() + + def test_timer_stop_without_start_raises(self): + from paddlefleet.timers import _Timer + + timer = _Timer("test") + with self.assertRaises(AssertionError): + timer.stop() + + +class TestRuntimeTimer(unittest.TestCase): + """Tests for RuntimeTimer class.""" + + def test_runtime_timer_init(self): + from paddlefleet.timers import RuntimeTimer + + rt = RuntimeTimer() + self.assertIsNotNone(rt.timer) + + def 
test_runtime_timer_start_stop_log(self): + from paddlefleet.timers import RuntimeTimer + + rt = RuntimeTimer() + rt.start("test_op") + time.sleep(0.01) + rt.stop() + rt.log() + + +class TestTimers(unittest.TestCase): + """Tests for Timers group class.""" + + def test_timers_init(self): + from paddlefleet.timers import Timers + + timers = Timers() + self.assertEqual(timers.timers, {}) + + def test_timers_call(self): + from paddlefleet.timers import Timers + + timers = Timers() + timer = timers("forward") + self.assertEqual(timer.name, "forward") + + def test_timers_call_reuse(self): + from paddlefleet.timers import Timers + + timers = Timers() + t1 = timers("forward") + t2 = timers("forward") + self.assertIs(t1, t2) + + def test_timers_log(self): + from paddlefleet.timers import Timers + + timers = Timers() + timers("forward").start() + time.sleep(0.01) + timers("forward").stop() + timers.log(names=["forward"]) + + def test_timers_info(self): + from paddlefleet.timers import Timers + + timers = Timers() + timers("forward").start() + time.sleep(0.01) + timers("forward").stop() + result = timers.info(names=["forward"]) + self.assertIn("forward", result) + self.assertGreater(result["forward"], 0.0) + + def test_timers_info_with_normalizer(self): + from paddlefleet.timers import Timers + + timers = Timers() + timers("forward").start() + time.sleep(0.01) + timers("forward").stop() + result = timers.info(names=["forward"], normalizer=2.0) + self.assertIn("forward", result) + + def test_timers_log_multiple_names(self): + from paddlefleet.timers import Timers + + timers = Timers() + timers("forward").start() + timers("backward").start() + time.sleep(0.01) + timers("forward").stop() + timers("backward").stop() + timers.log(names=["forward", "backward"]) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/single_card_tests/ai_edited_test/fusions/readme.md b/tests/single_card_tests/ai_edited_test/fusions/readme.md new file mode 100644 index 
000000000..8cf28c06a --- /dev/null +++ b/tests/single_card_tests/ai_edited_test/fusions/readme.md @@ -0,0 +1,23 @@ +# Fusions Module Tests + +本目录包含 PaddleFleet fusions 模块的 AI 自动生成单元测试,覆盖算子融合相关功能。 + +This directory contains AI-generated unit tests for the PaddleFleet fusions module, covering operator fusion functionality. + +## Tested Files + +| Source File | Test File | Coverage Target | +|-------------|-----------|----------------| +| `fused_swiglu_scale.py` | `test_ai_fused_swiglu_scale.py` | CPU/XPU fallback paths (forward + backward) | +| `fused_bias_gelu.py` | `test_ai_fused_bias_gelu.py` | bias_gelu, bias_gelu_back, GeLUFunction | +| `fused_bias_geglu.py` | `test_ai_fused_bias_geglu.py` | GEGLU, Quick-GEGLU, weighted variants | +| `fused_bias_swiglu.py` | `test_ai_fused_bias_swiglu.py` | SwiGLU, BiasSwiGLU, WeightedSwiGLU | +| `fused_softmax.py` | `test_ai_fused_softmax.py` | FusedScaleMaskSoftmax | + +## Test Categories + +- CPU fallback paths via mock `paddle.is_compiled_with_cuda()` +- Direct function calls (bias_gelu, geglu, quick_gelu, etc.) +- Autograd functions (PyLayer forward/backward) +- Shape and dtype verification +- Numerical correctness against reference implementations diff --git a/tests/single_card_tests/ai_edited_test/fusions/test_ai_fused_bias_geglu.py b/tests/single_card_tests/ai_edited_test/fusions/test_ai_fused_bias_geglu.py new file mode 100644 index 000000000..bf2e11679 --- /dev/null +++ b/tests/single_card_tests/ai_edited_test/fusions/test_ai_fused_bias_geglu.py @@ -0,0 +1,271 @@ +# Copyright (c) 2026 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +import numpy as np +import paddle + + +class TestGeGLU(unittest.TestCase): + """Tests for GEGLU activation functions.""" + + def setUp(self): + paddle.seed(42) + self.y = paddle.randn([4, 16], dtype=paddle.float32) + self.g = paddle.randn([4, 8], dtype=paddle.float32) + + def test_geglu_shape(self): + from paddlefleet.fusions.fused_bias_geglu import geglu + + out = geglu(self.y) + self.assertEqual(out.shape, [4, 8]) + + def test_geglu_dtype(self): + from paddlefleet.fusions.fused_bias_geglu import geglu + + out = geglu(self.y) + self.assertEqual(out.dtype, paddle.float32) + + def test_geglu_correctness(self): + from paddlefleet.fusions.fused_bias_geglu import geglu + + out = geglu(self.y) + y_1, y_2 = paddle.chunk(self.y, 2, -1) + expected = ( + y_1 + * 0.5 + * (1.0 + paddle.tanh(0.79788456 * y_1 * (1 + 0.044715 * y_1 * y_1))) + ) * y_2 + np.testing.assert_allclose(out.numpy(), expected.numpy(), rtol=1e-5) + + def test_bias_geglu_shape(self): + from paddlefleet.fusions.fused_bias_geglu import bias_geglu + + bias = paddle.randn([16], dtype=paddle.float32) + out = bias_geglu(bias, self.y) + self.assertEqual(out.shape, [4, 8]) + + def test_bias_geglu_correctness(self): + from paddlefleet.fusions.fused_bias_geglu import bias_geglu + + bias = paddle.randn([16], dtype=paddle.float32) + out = bias_geglu(bias, self.y) + y_shifted = self.y + bias + y_1, y_2 = paddle.chunk(y_shifted, 2, -1) + expected = ( + y_1 + * 0.5 + * (1.0 + paddle.tanh(0.79788456 * y_1 * (1 + 0.044715 * y_1 * y_1))) + ) * y_2 + np.testing.assert_allclose(out.numpy(), 
expected.numpy(), rtol=1e-5) + + def test_geglu_back_shape(self): + from paddlefleet.fusions.fused_bias_geglu import geglu_back + + out = geglu_back(self.g, self.y) + self.assertEqual(out.shape, [4, 16]) + + def test_bias_geglu_back_shape(self): + from paddlefleet.fusions.fused_bias_geglu import bias_geglu_back + + bias = paddle.randn([16], dtype=paddle.float32) + out = bias_geglu_back(self.g, self.y, bias) + self.assertEqual(out.shape, [4, 16]) + + def test_bias_geglu_impl_with_bias(self): + from paddlefleet.fusions.fused_bias_geglu import bias_geglu_impl + + input_t = paddle.randn([4, 16], dtype=paddle.float32) + bias = paddle.randn([16], dtype=paddle.float32) + out = bias_geglu_impl(input_t, bias) + self.assertEqual(out.shape, [4, 8]) + + def test_bias_geglu_impl_without_bias(self): + from paddlefleet.fusions.fused_bias_geglu import bias_geglu_impl + + input_t = paddle.randn([4, 16], dtype=paddle.float32) + out = bias_geglu_impl(input_t, None) + self.assertEqual(out.shape, [4, 8]) + + def test_bias_geglu_impl_3d_input(self): + from paddlefleet.fusions.fused_bias_geglu import bias_geglu_impl + + input_3d = paddle.randn([2, 4, 16], dtype=paddle.float32) + bias = paddle.randn([16], dtype=paddle.float32) + out = bias_geglu_impl(input_3d, bias) + self.assertEqual(out.shape, [2, 4, 8]) + + +class TestQuickGeGLU(unittest.TestCase): + """Tests for Quick-GEGLU activation functions.""" + + def setUp(self): + paddle.seed(42) + self.y = paddle.randn([4, 16], dtype=paddle.float32) + + def test_quick_gelu_shape(self): + from paddlefleet.fusions.fused_bias_geglu import quick_gelu + + out = quick_gelu(self.y) + self.assertEqual(out.shape, [4, 16]) + + def test_quick_gelu_correctness(self): + from paddlefleet.fusions.fused_bias_geglu import quick_gelu + + out = quick_gelu(self.y) + expected = self.y * paddle.sigmoid(1.702 * self.y) + np.testing.assert_allclose(out.numpy(), expected.numpy(), rtol=1e-5) + + def test_quick_geglu_shape(self): + from 
paddlefleet.fusions.fused_bias_geglu import quick_geglu + + out = quick_geglu(self.y) + self.assertEqual(out.shape, [4, 8]) + + def test_quick_geglu_with_offset(self): + from paddlefleet.fusions.fused_bias_geglu import quick_geglu, quick_gelu + + out = quick_geglu(self.y, linear_offset=0.5) + self.assertEqual(out.shape, [4, 8]) + y_1, y_2 = paddle.chunk(self.y, 2, -1) + expected = quick_gelu(y_1) * (y_2 + 0.5) + np.testing.assert_allclose(out.numpy(), expected.numpy(), rtol=1e-5) + + def test_quick_geglu_zero_offset(self): + from paddlefleet.fusions.fused_bias_geglu import quick_geglu, quick_gelu + + out = quick_geglu(self.y, linear_offset=0.0) + y_1, y_2 = paddle.chunk(self.y, 2, -1) + expected = quick_gelu(y_1) * y_2 + np.testing.assert_allclose(out.numpy(), expected.numpy(), rtol=1e-5) + + def test_quick_geglu_back_shape(self): + from paddlefleet.fusions.fused_bias_geglu import quick_geglu_back + + g = paddle.randn([4, 8], dtype=paddle.float32) + out = quick_geglu_back(g, self.y) + self.assertEqual(out.shape, [4, 16]) + + def test_quick_geglu_back_with_offset(self): + from paddlefleet.fusions.fused_bias_geglu import quick_geglu_back + + g = paddle.randn([4, 8], dtype=paddle.float32) + out = quick_geglu_back(g, self.y, linear_offset=0.5) + self.assertEqual(out.shape, [4, 16]) + + def test_weighted_quick_geglu_shape(self): + from paddlefleet.fusions.fused_bias_geglu import weighted_quick_geglu + + weights = paddle.randn([4, 1], dtype=paddle.float32) + out = weighted_quick_geglu(self.y, weights) + self.assertEqual(out.shape, [4, 8]) + + def test_weighted_quick_geglu_with_offset(self): + from paddlefleet.fusions.fused_bias_geglu import weighted_quick_geglu + + weights = paddle.randn([4, 1], dtype=paddle.float32) + out = weighted_quick_geglu(self.y, weights, linear_offset=0.3) + self.assertEqual(out.shape, [4, 8]) + + def test_weighted_quick_geglu_back_shape(self): + from paddlefleet.fusions.fused_bias_geglu import ( + weighted_quick_geglu_back, + ) + + g = 
paddle.randn([4, 8], dtype=paddle.float32) + weights = paddle.randn([4, 1], dtype=paddle.float32) + input_grad, weights_grad = weighted_quick_geglu_back(g, self.y, weights) + self.assertEqual(input_grad.shape, [4, 16]) + self.assertEqual(weights_grad.shape, [4, 1]) + + def test_weighted_bias_quick_geglu_shape(self): + from paddlefleet.fusions.fused_bias_geglu import ( + weighted_bias_quick_geglu, + ) + + bias = paddle.randn([4, 16], dtype=paddle.float32) + weights = paddle.randn([4, 1], dtype=paddle.float32) + out = weighted_bias_quick_geglu(self.y, bias, weights) + self.assertEqual(out.shape, [4, 8]) + + def test_weighted_bias_quick_geglu_back_shape(self): + from paddlefleet.fusions.fused_bias_geglu import ( + weighted_bias_quick_geglu_back, + ) + + g = paddle.randn([4, 8], dtype=paddle.float32) + bias = paddle.randn([4, 16], dtype=paddle.float32) + weights = paddle.randn([4, 1], dtype=paddle.float32) + input_grad, bias_grad, weights_grad = weighted_bias_quick_geglu_back( + g, self.y, bias, weights + ) + self.assertEqual(input_grad.shape, [4, 16]) + self.assertEqual(bias_grad.shape, [4, 16]) + self.assertEqual(weights_grad.shape, [4, 1]) + + def test_weighted_bias_quick_geglu_impl_2d(self): + from paddlefleet.fusions.fused_bias_geglu import ( + weighted_bias_quick_geglu_impl, + ) + + weights = paddle.randn([4, 1], dtype=paddle.float32) + out = weighted_bias_quick_geglu_impl(self.y, None, weights) + self.assertEqual(out.shape, [4, 8]) + + def test_weighted_bias_quick_geglu_impl_3d(self): + from paddlefleet.fusions.fused_bias_geglu import ( + weighted_bias_quick_geglu_impl, + ) + + y_3d = paddle.randn([2, 4, 16], dtype=paddle.float32) + weights = paddle.randn([8, 1], dtype=paddle.float32) + out = weighted_bias_quick_geglu_impl(y_3d, None, weights) + self.assertEqual(out.shape, [2, 4, 8]) + + def test_weighted_bias_quick_geglu_impl_with_bias(self): + from paddlefleet.fusions.fused_bias_geglu import ( + weighted_bias_quick_geglu_impl, + ) + + bias = paddle.randn([4, 
16], dtype=paddle.float32) + weights = paddle.randn([4, 1], dtype=paddle.float32) + out = weighted_bias_quick_geglu_impl(self.y, bias, weights) + self.assertEqual(out.shape, [4, 8]) + + def test_weighted_bias_quick_geglu_impl_with_clamp(self): + from paddlefleet.fusions.fused_bias_geglu import ( + weighted_bias_quick_geglu_impl, + ) + + weights = paddle.randn([4, 1], dtype=paddle.float32) + out = weighted_bias_quick_geglu_impl( + self.y, None, weights, clamp_value=1.0 + ) + self.assertEqual(out.shape, [4, 8]) + + def test_weighted_bias_quick_geglu_impl_with_offset(self): + from paddlefleet.fusions.fused_bias_geglu import ( + weighted_bias_quick_geglu_impl, + ) + + weights = paddle.randn([4, 1], dtype=paddle.float32) + out = weighted_bias_quick_geglu_impl( + self.y, None, weights, linear_offset=0.5 + ) + self.assertEqual(out.shape, [4, 8]) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/single_card_tests/ai_edited_test/fusions/test_ai_fused_bias_gelu.py b/tests/single_card_tests/ai_edited_test/fusions/test_ai_fused_bias_gelu.py new file mode 100644 index 000000000..d1e7db464 --- /dev/null +++ b/tests/single_card_tests/ai_edited_test/fusions/test_ai_fused_bias_gelu.py @@ -0,0 +1,141 @@ +# Copyright (c) 2026 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import unittest

import numpy as np
import paddle


class TestBiasGelu(unittest.TestCase):
    """Tests for bias_gelu and bias_gelu_back direct functions."""

    def setUp(self):
        # Fixed seed so the randn fixtures are reproducible across runs.
        paddle.seed(42)
        self.bias = paddle.randn([8], dtype=paddle.float32)
        self.y = paddle.randn([4, 8], dtype=paddle.float32)
        self.g = paddle.randn([4, 8], dtype=paddle.float32)

    def test_bias_gelu_shape(self):
        from paddlefleet.fusions.fused_bias_gelu import bias_gelu

        # bias [8] broadcasts against y [4, 8]; output keeps y's shape.
        out = bias_gelu(self.bias, self.y)
        self.assertEqual(out.shape, [4, 8])

    def test_bias_gelu_dtype(self):
        from paddlefleet.fusions.fused_bias_gelu import bias_gelu

        out = bias_gelu(self.bias, self.y)
        self.assertEqual(out.dtype, paddle.float32)

    def test_bias_gelu_correctness(self):
        from paddlefleet.fusions.fused_bias_gelu import bias_gelu

        out = bias_gelu(self.bias, self.y)
        # Reference: tanh approximation of GELU on x = bias + y, where
        # 0.79788456 ~= sqrt(2/pi) and 0.044715 is the standard cubic
        # coefficient of the approximation.
        x = self.bias + self.y
        expected = (
            x
            * 0.5
            * (1.0 + paddle.tanh(0.79788456 * x * (1 + 0.044715 * x * x)))
        )
        np.testing.assert_allclose(out.numpy(), expected.numpy(), rtol=1e-5)

    def test_bias_gelu_negative_input(self):
        from paddlefleet.fusions.fused_bias_gelu import bias_gelu

        # x = bias + y = -3 everywhere; GELU of a negative input stays
        # strictly negative.
        bias = paddle.full([4], -2.0, dtype=paddle.float32)
        y = paddle.full([2, 4], -1.0, dtype=paddle.float32)
        out = bias_gelu(bias, y)
        self.assertEqual(out.shape, [2, 4])
        np.testing.assert_array_less(out.numpy(), 0.0)

    def test_bias_gelu_zero_input(self):
        from paddlefleet.fusions.fused_bias_gelu import bias_gelu

        # GELU(0) == 0, so an all-zero bias + input must produce zeros.
        bias = paddle.zeros([4], dtype=paddle.float32)
        y = paddle.zeros([2, 4], dtype=paddle.float32)
        out = bias_gelu(bias, y)
        np.testing.assert_allclose(out.numpy(), 0.0, atol=1e-6)

    def test_bias_gelu_back_shape(self):
        from paddlefleet.fusions.fused_bias_gelu import bias_gelu_back

        out = bias_gelu_back(self.g, self.bias, self.y)
        self.assertEqual(out.shape, [4, 8])

    def test_bias_gelu_back_correctness(self):
        from paddlefleet.fusions.fused_bias_gelu import bias_gelu_back

        out = bias_gelu_back(self.g, self.bias, self.y)
        # Reference: derivative of the tanh-approximated GELU at
        # x = bias + y, chained with the upstream gradient g.
        # 0.1070322243 == 3 * 0.044715 * sqrt(2/pi).
        x = self.bias + self.y
        tanh_out = paddle.tanh(0.79788456 * x * (1 + 0.044715 * x * x))
        ff = 0.5 * x * (
            (1 - tanh_out * tanh_out) * (0.79788456 + 0.1070322243 * x * x)
        ) + 0.5 * (1 + tanh_out)
        expected = ff * self.g
        np.testing.assert_allclose(out.numpy(), expected.numpy(), rtol=1e-5)

    def test_bias_gelu_back_zero_grad(self):
        from paddlefleet.fusions.fused_bias_gelu import bias_gelu_back

        # A zero upstream gradient must yield an all-zero input gradient.
        g_zero = paddle.zeros([4, 8], dtype=paddle.float32)
        out = bias_gelu_back(g_zero, self.bias, self.y)
        np.testing.assert_allclose(out.numpy(), 0.0, atol=1e-6)


class TestGeLUFunction(unittest.TestCase):
    """Tests for GeLUFunction autograd layer."""

    def setUp(self):
        paddle.seed(42)
        self.input_t = paddle.randn([4, 8], dtype=paddle.float32)
        self.bias = paddle.randn([8], dtype=paddle.float32)

    def test_forward_shape(self):
        from paddlefleet.fusions.fused_bias_gelu import bias_gelu_impl

        out = bias_gelu_impl(self.input_t, self.bias)
        self.assertEqual(out.shape, [4, 8])

    def test_forward_backward(self):
        from paddlefleet.fusions.fused_bias_gelu import bias_gelu_back

        # NOTE(review): despite the name, this exercises only the backward
        # helper and is identical to test_backward_gradient_shape below.
        # The positional args here are (g, input, bias) whereas TestBiasGelu
        # passes (g, bias, y); since the two operands are simply summed
        # inside the reference math, the swap should not change the result —
        # confirm against bias_gelu_back's signature.
        g = paddle.randn([4, 8], dtype=paddle.float32)
        grad = bias_gelu_back(g, self.input_t, self.bias)
        self.assertEqual(grad.shape, [4, 8])

    def test_apply_method(self):
        from paddlefleet.fusions.fused_bias_gelu import GeLUFunction

        # Calling the autograd Function directly via .apply must match the
        # bias_gelu_impl entry point's output shape.
        out = GeLUFunction.apply(self.input_t, self.bias)
        self.assertEqual(out.shape, [4, 8])

    def test_backward_gradient_shape(self):
        from paddlefleet.fusions.fused_bias_gelu import bias_gelu_back

        g = paddle.randn([4, 8], dtype=paddle.float32)
        grad = bias_gelu_back(g, self.input_t, self.bias)
        self.assertEqual(grad.shape, [4, 8])

    def test_2d_input(self):
        from paddlefleet.fusions.fused_bias_gelu import bias_gelu_impl

        # Different 2-D geometry than setUp's fixtures.
        input_2d = paddle.randn([8, 16], dtype=paddle.float32)
        bias_2d = paddle.randn([16], dtype=paddle.float32)
        out = bias_gelu_impl(input_2d, bias_2d)
        self.assertEqual(out.shape, [8, 16])


if __name__ == "__main__":
    unittest.main()
diff --git a/tests/single_card_tests/ai_edited_test/fusions/test_ai_fused_bias_swiglu.py b/tests/single_card_tests/ai_edited_test/fusions/test_ai_fused_bias_swiglu.py
new file mode 100644
index 000000000..d738820af
--- /dev/null
+++ b/tests/single_card_tests/ai_edited_test/fusions/test_ai_fused_bias_swiglu.py
@@ -0,0 +1,198 @@
+# Copyright (c) 2026 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
import unittest

import numpy as np
import paddle


class TestSwiGLU(unittest.TestCase):
    """Tests for SwiGLU activation functions."""

    def setUp(self):
        # Fixed seed for reproducible fixtures. SwiGLU splits the last
        # dimension in half (gate / value), so [4, 16] inputs yield
        # [4, 8] outputs throughout these tests.
        paddle.seed(42)
        self.y = paddle.randn([4, 16], dtype=paddle.float32)

    def test_swiglu_shape(self):
        from paddlefleet.fusions.fused_bias_swiglu import swiglu

        out = swiglu(self.y)
        self.assertEqual(out.shape, [4, 8])

    def test_bias_swiglu_shape(self):
        from paddlefleet.fusions.fused_bias_swiglu import bias_swiglu

        bias = paddle.randn([16], dtype=paddle.float32)
        out = bias_swiglu(self.y, bias)
        self.assertEqual(out.shape, [4, 8])

    def test_bias_swiglu_correctness(self):
        import paddle.nn.functional as F

        from paddlefleet.fusions.fused_bias_swiglu import bias_swiglu

        # Reference: paddle's own F.swiglu applied after the bias add.
        bias = paddle.randn([16], dtype=paddle.float32)
        out = bias_swiglu(self.y, bias)
        expected = F.swiglu(self.y + bias)
        np.testing.assert_allclose(out.numpy(), expected.numpy(), rtol=1e-5)

    def test_weighted_swiglu_shape(self):
        from paddlefleet.fusions.fused_bias_swiglu import weighted_swiglu

        # weights carry one scalar per row: shape [4, 1].
        weights = paddle.randn([4, 1], dtype=paddle.float32)
        out = weighted_swiglu(self.y, weights)
        self.assertEqual(out.shape, [4, 8])

    def test_weighted_swiglu_preserves_dtype(self):
        from paddlefleet.fusions.fused_bias_swiglu import weighted_swiglu

        # Output dtype must follow the (fp16) input even when the weights
        # are fp32.
        y_f16 = self.y.cast(paddle.float16)
        weights = paddle.randn([4, 1], dtype=paddle.float32)
        out = weighted_swiglu(y_f16, weights)
        self.assertEqual(out.dtype, paddle.float16)

    def test_bias_swiglu_back_shape(self):
        from paddlefleet.fusions.fused_bias_swiglu import bias_swiglu_back

        # Backward maps the halved upstream gradient [4, 8] back to the
        # full input width [4, 16].
        g = paddle.randn([4, 8], dtype=paddle.float32)
        bias = paddle.randn([16], dtype=paddle.float32)
        out = bias_swiglu_back(g, self.y, bias)
        self.assertEqual(out.shape, [4, 16])

    def test_weighted_swiglu_back_shape(self):
        from paddlefleet.fusions.fused_bias_swiglu import weighted_swiglu_back

        g = paddle.randn([4, 8], dtype=paddle.float32)
        weights = paddle.randn([4, 1], dtype=paddle.float32)
        input_grad, weights_grad = weighted_swiglu_back(g, self.y, weights)
        self.assertEqual(input_grad.shape, [4, 16])
        self.assertEqual(weights_grad.shape, [4, 1])


class TestBiasSwiGLUImpl(unittest.TestCase):
    """Tests for bias_swiglu_impl function."""

    def setUp(self):
        paddle.seed(42)

    def test_2d_input_with_bias(self):
        from paddlefleet.fusions.fused_bias_swiglu import bias_swiglu_impl

        input_t = paddle.randn([4, 16], dtype=paddle.float32)
        bias = paddle.randn([16], dtype=paddle.float32)
        out = bias_swiglu_impl(input_t, bias)
        self.assertEqual(out.shape, [4, 8])

    def test_2d_input_without_bias(self):
        from paddlefleet.fusions.fused_bias_swiglu import bias_swiglu_impl

        # bias=None must be accepted and skip the bias add.
        input_t = paddle.randn([4, 16], dtype=paddle.float32)
        out = bias_swiglu_impl(input_t, None)
        self.assertEqual(out.shape, [4, 8])

    def test_3d_input_with_bias(self):
        from paddlefleet.fusions.fused_bias_swiglu import bias_swiglu_impl

        # 3-D activations keep their leading dims; only the last is halved.
        input_3d = paddle.randn([2, 4, 16], dtype=paddle.float32)
        bias = paddle.randn([16], dtype=paddle.float32)
        out = bias_swiglu_impl(input_3d, bias)
        self.assertEqual(out.shape, [2, 4, 8])

    def test_3d_input_without_bias(self):
        from paddlefleet.fusions.fused_bias_swiglu import bias_swiglu_impl

        input_3d = paddle.randn([2, 4, 16], dtype=paddle.float32)
        out = bias_swiglu_impl(input_3d, None)
        self.assertEqual(out.shape, [2, 4, 8])

    def test_with_fp8_input_store(self):
        from paddlefleet.fusions.fused_bias_swiglu import bias_swiglu_impl

        # fp8_input_store presumably changes only how the saved activation
        # is stored for backward; the forward shape must be unaffected.
        input_t = paddle.randn([4, 16], dtype=paddle.float32)
        bias = paddle.randn([16], dtype=paddle.float32)
        out = bias_swiglu_impl(input_t, bias, fp8_input_store=True)
        self.assertEqual(out.shape, [4, 8])

    def test_with_cpu_offload(self):
        from paddlefleet.fusions.fused_bias_swiglu import bias_swiglu_impl

        # Same contract with CPU offload of the saved input enabled.
        input_t = paddle.randn([4, 16], dtype=paddle.float32)
        bias = paddle.randn([16], dtype=paddle.float32)
        out = bias_swiglu_impl(input_t, bias, cpu_offload_input=True)
        self.assertEqual(out.shape, [4, 8])


class TestWeightedBiasSwiGLUImpl(unittest.TestCase):
    """Tests for weighted_bias_swiglu_impl function."""

    def setUp(self):
        paddle.seed(42)

    def test_2d_input_no_bias(self):
        from paddlefleet.fusions.fused_bias_swiglu import (
            weighted_bias_swiglu_impl,
        )

        input_t = paddle.randn([4, 16], dtype=paddle.float32)
        weights = paddle.randn([4, 1], dtype=paddle.float32)
        out = weighted_bias_swiglu_impl(input_t, None, weights)
        self.assertEqual(out.shape, [4, 8])

    def test_3d_input_no_bias(self):
        from paddlefleet.fusions.fused_bias_swiglu import (
            weighted_bias_swiglu_impl,
        )

        # weights are [8, 1] == [2*4, 1]; presumably the impl flattens the
        # leading dims into rows before weighting — confirm against the
        # implementation.
        input_3d = paddle.randn([2, 4, 16], dtype=paddle.float32)
        weights = paddle.randn([8, 1], dtype=paddle.float32)
        out = weighted_bias_swiglu_impl(input_3d, None, weights)
        self.assertEqual(out.shape, [2, 4, 8])

    def test_with_bias_raises(self):
        from paddlefleet.fusions.fused_bias_swiglu import (
            weighted_bias_swiglu_impl,
        )

        # A non-None bias is rejected by the weighted variant.
        input_t = paddle.randn([4, 16], dtype=paddle.float32)
        bias = paddle.randn([16], dtype=paddle.float32)
        weights = paddle.randn([4, 1], dtype=paddle.float32)
        with self.assertRaises(NotImplementedError):
            weighted_bias_swiglu_impl(input_t, bias, weights)

    def test_with_fp8_input_store(self):
        from paddlefleet.fusions.fused_bias_swiglu import (
            weighted_bias_swiglu_impl,
        )

        input_t = paddle.randn([4, 16], dtype=paddle.float32)
        weights = paddle.randn([4, 1], dtype=paddle.float32)
        out = weighted_bias_swiglu_impl(
            input_t, None, weights, fp8_input_store=True
        )
        self.assertEqual(out.shape, [4, 8])

    def test_forward_backward_2d(self):
        from paddlefleet.fusions.fused_bias_swiglu import weighted_swiglu_back

        # Gradient shapes: input grad restores the full width, weights grad
        # matches the per-row weights.
        input_t = paddle.randn([4, 16], dtype=paddle.float32)
        weights = paddle.randn([4, 1], dtype=paddle.float32)
        g = paddle.randn([4, 8], dtype=paddle.float32)
        input_grad, weights_grad = weighted_swiglu_back(g, input_t, weights)
        self.assertEqual(input_grad.shape, [4, 16])
        self.assertEqual(weights_grad.shape, [4, 1])


if __name__ == "__main__":
    unittest.main()
diff --git a/tests/single_card_tests/ai_edited_test/fusions/test_ai_fused_softmax.py b/tests/single_card_tests/ai_edited_test/fusions/test_ai_fused_softmax.py
new file mode 100644
index 000000000..e2c07a5e5
--- /dev/null
+++ b/tests/single_card_tests/ai_edited_test/fusions/test_ai_fused_softmax.py
@@ -0,0 +1,98 @@
+# Copyright (c) 2026 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
import unittest

import numpy as np
import paddle


class TestFusedScaleMaskSoftmax(unittest.TestCase):
    """Tests for FusedScaleMaskSoftmax."""

    def test_fused_scale_mask_softmax_with_scale(self):
        from paddlefleet.fusions.fused_softmax import FusedScaleMaskSoftmax
        from paddlefleet.transformer.enums import AttnMaskType

        # scale=0.125 exercises the pre-softmax scaling path; the identity
        # mask_func leaves the scores untouched.
        layer = FusedScaleMaskSoftmax(
            input_in_fp16=False,
            input_in_bf16=False,
            attn_mask_type=AttnMaskType.padding,
            scaled_masked_softmax_fusion=True,
            mask_func=lambda x, m: x,
            softmax_in_fp32=True,
            scale=0.125,
        )
        # Scores shaped [batch, heads, query_len, key_len].
        x = paddle.randn([2, 4, 8, 16], dtype=paddle.float32)
        out = layer(x, mask=None)
        self.assertEqual(out.shape, [2, 4, 8, 16])

    def test_fused_scale_mask_softmax_with_mask(self):
        from paddlefleet.fusions.fused_softmax import FusedScaleMaskSoftmax
        from paddlefleet.transformer.enums import AttnMaskType

        layer = FusedScaleMaskSoftmax(
            input_in_fp16=False,
            input_in_bf16=False,
            attn_mask_type=AttnMaskType.padding,
            scaled_masked_softmax_fusion=True,
            mask_func=lambda x, m: x * m.cast(x.dtype),
            softmax_in_fp32=True,
            scale=None,
        )
        x = paddle.randn([2, 4, 8, 16], dtype=paddle.float32)
        # NOTE(review): the mask holds additive-style -1e9 entries but the
        # mask_func above multiplies the scores by it; this test only pins
        # the output shape, so the unusual masking semantics are harmless
        # here — confirm the intended mask_func contract.
        mask = (paddle.triu(paddle.ones([8, 16]), diagonal=1) * -1e9).cast(
            paddle.float32
        )
        out = layer(x, mask=mask)
        self.assertEqual(out.shape, [2, 4, 8, 16])

    def test_fused_scale_mask_softmax_no_scale(self):
        from paddlefleet.fusions.fused_softmax import FusedScaleMaskSoftmax
        from paddlefleet.transformer.enums import AttnMaskType

        layer = FusedScaleMaskSoftmax(
            input_in_fp16=False,
            input_in_bf16=False,
            attn_mask_type=AttnMaskType.padding,
            scaled_masked_softmax_fusion=True,
            mask_func=lambda x, m: x,
            softmax_in_fp32=True,
            scale=None,
        )
        x = paddle.randn([2, 4, 8, 16], dtype=paddle.float32)
        out = layer(x, mask=None)
        self.assertEqual(out.shape, [2, 4, 8, 16])
        # Each softmax row must sum to 1 along the key dimension.
        np.testing.assert_allclose(out.sum(axis=-1).numpy(), 1.0, atol=1e-5)

    def test_fused_scale_mask_softmax_causal_mask_auto_gen(self):
        from paddlefleet.fusions.fused_softmax import FusedScaleMaskSoftmax
        from paddlefleet.transformer.enums import AttnMaskType

        # mask=None with AttnMaskType.causal: per the test name, the layer
        # presumably generates its own causal mask internally (hence the
        # square 8x8 score matrix) — confirm against the implementation.
        layer = FusedScaleMaskSoftmax(
            input_in_fp16=False,
            input_in_bf16=False,
            attn_mask_type=AttnMaskType.causal,
            scaled_masked_softmax_fusion=True,
            mask_func=lambda x, m: x * m.cast(x.dtype),
            softmax_in_fp32=True,
            scale=None,
        )
        x = paddle.randn([2, 4, 8, 8], dtype=paddle.float32)
        out = layer(x, mask=None)
        self.assertEqual(out.shape, [2, 4, 8, 8])


if __name__ == "__main__":
    unittest.main()
diff --git a/tests/single_card_tests/ai_edited_test/fusions/test_ai_fused_swiglu_scale.py b/tests/single_card_tests/ai_edited_test/fusions/test_ai_fused_swiglu_scale.py
new file mode 100644
index 000000000..9777845f3
--- /dev/null
+++ b/tests/single_card_tests/ai_edited_test/fusions/test_ai_fused_swiglu_scale.py
@@ -0,0 +1,162 @@
+# Copyright (c) 2026 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
import unittest
from unittest.mock import patch

import numpy as np
import paddle


class TestFusedSwigluScaleCPUFallback(unittest.TestCase):
    """Tests for fused_swiglu_scale CPU/XPU fallback paths."""

    def setUp(self):
        # x [4, 16] -> swiglu output [4, 8]; scale holds one value per row.
        paddle.seed(42)
        self.x = paddle.randn([4, 16], dtype=paddle.float32)
        self.scale = paddle.randn([4], dtype=paddle.float32)
        self.out_grad = paddle.randn([4, 8], dtype=paddle.float32)

    @patch("paddle.is_compiled_with_cuda", return_value=False)
    def test_forward_cpu_fallback_shape(self, mock_cuda):
        from paddlefleet.fusions.fused_swiglu_scale import (
            fused_swiglu_scale_forward,
        )

        # Patching is_compiled_with_cuda to False forces the non-CUDA
        # (CPU/XPU fallback) code path regardless of the local build.
        out = fused_swiglu_scale_forward(self.x, self.scale)
        self.assertEqual(out.shape, [4, 8])

    @patch("paddle.is_compiled_with_cuda", return_value=False)
    def test_forward_cpu_fallback_dtype(self, mock_cuda):
        from paddlefleet.fusions.fused_swiglu_scale import (
            fused_swiglu_scale_forward,
        )

        out = fused_swiglu_scale_forward(self.x, self.scale)
        self.assertEqual(out.dtype, paddle.float32)

    @patch("paddle.is_compiled_with_cuda", return_value=False)
    def test_forward_cpu_fallback_correctness(self, mock_cuda):
        import paddle.nn.functional as F

        from paddlefleet.fusions.fused_swiglu_scale import (
            fused_swiglu_scale_forward,
        )

        out = fused_swiglu_scale_forward(self.x, self.scale)
        # Reference: plain swiglu followed by a per-row multiplicative
        # scale (scale broadcast over the last dimension).
        ref = F.swiglu(self.x)
        scale_exp = self.scale.cast(self.x.dtype).unsqueeze(-1)
        ref_out = ref * scale_exp
        np.testing.assert_allclose(out.numpy(), ref_out.numpy(), rtol=1e-5)

    @patch("paddle.is_compiled_with_cuda", return_value=False)
    def test_backward_cpu_fallback_shape(self, mock_cuda):
        from paddlefleet.fusions.fused_swiglu_scale import (
            fused_swiglu_scale_backward,
        )

        d_x, d_scale = fused_swiglu_scale_backward(
            self.x, self.scale, self.out_grad
        )
        # Gradients mirror their respective inputs' shapes in the 2-D case.
        self.assertEqual(d_x.shape, self.x.shape)
        self.assertEqual(d_scale.shape, self.scale.shape)

    @patch("paddle.is_compiled_with_cuda", return_value=False)
    def test_backward_cpu_fallback_dtype(self, mock_cuda):
        from paddlefleet.fusions.fused_swiglu_scale import (
            fused_swiglu_scale_backward,
        )

        d_x, d_scale = fused_swiglu_scale_backward(
            self.x, self.scale, self.out_grad
        )
        self.assertEqual(d_x.dtype, paddle.float32)
        self.assertEqual(d_scale.dtype, self.scale.dtype)

    @patch("paddle.is_compiled_with_cuda", return_value=False)
    def test_backward_cpu_fallback_d_scale_correctness(self, mock_cuda):
        from paddlefleet.fusions.fused_swiglu_scale import (
            fused_swiglu_scale_backward,
        )

        d_x, d_scale = fused_swiglu_scale_backward(
            self.x, self.scale, self.out_grad
        )
        # Reference: since out = swiglu(x) * scale, d_scale is the per-row
        # sum of out_grad * swiglu(x). swiglu is recomputed by hand here:
        # silu(gate) * value on the two halves of the last dimension, with
        # the reduction accumulated in fp32 and cast back to scale's dtype.
        hidden = self.x.shape[-1] // 2
        gate = self.x[..., :hidden]
        val = self.x[..., hidden:]
        sig = paddle.sigmoid(gate).cast(self.x.dtype)
        silu = gate * sig
        swiglu_val = silu * val
        ref_d_scale = paddle.sum(
            self.out_grad.cast(paddle.float32)
            * swiglu_val.cast(paddle.float32),
            axis=-1,
        ).cast(self.scale.dtype)
        np.testing.assert_allclose(
            d_scale.numpy(), ref_d_scale.numpy(), rtol=1e-5
        )

    @patch("paddle.is_compiled_with_cuda", return_value=False)
    def test_forward_3d_input(self, mock_cuda):
        from paddlefleet.fusions.fused_swiglu_scale import (
            fused_swiglu_scale_forward,
        )

        # scale [2] against x [2, 4, 16]: presumably one scale per leading
        # (batch) dim, broadcast over the middle dim — confirm against the
        # implementation.
        x_3d = paddle.randn([2, 4, 16], dtype=paddle.float32)
        scale_1d = paddle.randn([2], dtype=paddle.float32)
        out = fused_swiglu_scale_forward(x_3d, scale_1d)
        self.assertEqual(out.shape, [2, 4, 8])

    @patch("paddle.is_compiled_with_cuda", return_value=False)
    def test_backward_3d_input(self, mock_cuda):
        from paddlefleet.fusions.fused_swiglu_scale import (
            fused_swiglu_scale_backward,
        )

        # NOTE(review): scale goes in as [2] but d_scale is expected as
        # [2, 4] (one gradient per row rather than per batch) — asymmetric
        # with the forward test above; confirm this matches the fallback
        # implementation's contract.
        x_3d = paddle.randn([2, 4, 16], dtype=paddle.float32)
        scale_1d = paddle.randn([2], dtype=paddle.float32)
        out_grad_3d = paddle.randn([2, 4, 8], dtype=paddle.float32)
        d_x, d_scale = fused_swiglu_scale_backward(x_3d, scale_1d, out_grad_3d)
        self.assertEqual(d_x.shape, [2, 4, 16])
        self.assertEqual(d_scale.shape, [2, 4])

    @patch("paddle.is_compiled_with_cuda", return_value=False)
    def test_forward_scale_scalar(self, mock_cuda):
        from paddlefleet.fusions.fused_swiglu_scale import (
            fused_swiglu_scale_forward,
        )

        # A single-element scale [1] — presumably broadcast across all
        # rows; only the output shape is pinned here.
        scale_scalar = paddle.to_tensor([1.5], dtype=paddle.float32)
        out = fused_swiglu_scale_forward(self.x, scale_scalar)
        self.assertEqual(out.shape, [4, 8])

    @patch("paddle.is_compiled_with_cuda", return_value=False)
    def test_backward_with_float16(self, mock_cuda):
        from paddlefleet.fusions.fused_swiglu_scale import (
            fused_swiglu_scale_backward,
        )

        # Both gradients must come back in the inputs' fp16 dtype.
        x_f16 = paddle.randn([4, 16], dtype=paddle.float16)
        scale_f16 = paddle.randn([4], dtype=paddle.float16)
        out_grad_f16 = paddle.randn([4, 8], dtype=paddle.float16)
        d_x, d_scale = fused_swiglu_scale_backward(
            x_f16, scale_f16, out_grad_f16
        )
        self.assertEqual(d_x.dtype, paddle.float16)
        self.assertEqual(d_scale.dtype, paddle.float16)


if __name__ == "__main__":
    unittest.main()