From 331e6997c870081fbfafa3da1343cc54eed23d17 Mon Sep 17 00:00:00 2001 From: zhupengyang Date: Thu, 4 Sep 2025 19:37:06 +0800 Subject: [PATCH] [xpu] support ep --- custom_ops/xpu_ops/src/ops/moe_topk_select.cc | 26 +- custom_ops/xpu_ops/src/ops/pybind/pybind.cc | 2 +- fastdeploy/input/ernie4_5_processor.py | 2 +- fastdeploy/input/text_processor.py | 2 +- .../layers/backends/xpu/__init__.py | 12 +- .../layers/backends/xpu/moe/__init__.py | 16 + .../layers/backends/xpu/moe/ep.py | 415 +++++++++++++ .../layers/backends/xpu/moe/fused_moe.py | 550 ++++++++++++++++++ .../backends/xpu/quantization/weight_only.py | 14 +- fastdeploy/model_executor/layers/moe/ep.py | 109 +++- .../layers/moe/fused_moe_backend_base.py | 21 +- .../layers/moe/fused_moe_xpu_backend.py | 258 -------- fastdeploy/model_executor/layers/moe/moe.py | 2 +- .../layers/quantization/weight_only.py | 33 +- fastdeploy/output/token_processor.py | 2 +- fastdeploy/worker/xpu_model_runner.py | 27 +- fastdeploy/worker/xpu_worker.py | 14 +- scripts/run_ci_xpu.sh | 3 + 18 files changed, 1179 insertions(+), 329 deletions(-) create mode 100644 fastdeploy/model_executor/layers/backends/xpu/moe/__init__.py create mode 100644 fastdeploy/model_executor/layers/backends/xpu/moe/ep.py create mode 100644 fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py delete mode 100644 fastdeploy/model_executor/layers/moe/fused_moe_xpu_backend.py diff --git a/custom_ops/xpu_ops/src/ops/moe_topk_select.cc b/custom_ops/xpu_ops/src/ops/moe_topk_select.cc index 7f39e4482c..1ed1cd569f 100644 --- a/custom_ops/xpu_ops/src/ops/moe_topk_select.cc +++ b/custom_ops/xpu_ops/src/ops/moe_topk_select.cc @@ -43,18 +43,20 @@ std::vector MoeTopkSelect( int32_t* block_statistic = nullptr; const float* bias_data = bias.get_ptr() != nullptr ? bias.get_ptr()->data() : nullptr; - int ret = infer_ops::moe_softmax_topk_norm_fusion( - xpu_ctx->x_context(), - gating_logits.data(), - topk_weights.mutable_data(), - topk_ids.mutable_data(), - block_statistic, - token_num, - expert_num, - moe_topk, - 0, - bias_data); - PD_CHECK(ret == 0); + if (token_num > 0) { + int ret = infer_ops::moe_softmax_topk_norm_fusion( + xpu_ctx->x_context(), + gating_logits.data(), + topk_weights.mutable_data(), + topk_ids.mutable_data(), + block_statistic, + token_num, + expert_num, + moe_topk, + 0, + bias_data); + PD_CHECK(ret == 0); + } return {topk_ids, topk_weights}; } diff --git a/custom_ops/xpu_ops/src/ops/pybind/pybind.cc b/custom_ops/xpu_ops/src/ops/pybind/pybind.cc index de81c49bfc..e8eea990b2 100644 --- a/custom_ops/xpu_ops/src/ops/pybind/pybind.cc +++ b/custom_ops/xpu_ops/src/ops/pybind/pybind.cc @@ -416,7 +416,7 @@ PYBIND11_MODULE(fastdeploy_ops, m) { py::arg("bias"), py::arg("weight_dtype"), py::arg("arch"), - py::arg("group_size")); + py::arg("group_size")=-1); m.def("ep_moe_expert_combine", &MoeEPCombine, diff --git a/fastdeploy/input/ernie4_5_processor.py b/fastdeploy/input/ernie4_5_processor.py index 01289a3338..f364ecba11 100644 --- a/fastdeploy/input/ernie4_5_processor.py +++ b/fastdeploy/input/ernie4_5_processor.py @@ -265,7 +265,7 @@ def process_response(self, response_dict, **kwargs): if tool_call_info.tools_called: response_dict.outputs.tool_calls = tool_call_info.tool_calls response_dict.outputs.text = tool_call_info.content - data_processor_logger.info(f"req_id:{req_id}, token)ids: {token_ids}") + data_processor_logger.info(f"req_id:{req_id}, token_ids: {token_ids}") if response_dict.outputs.text == "" and response_dict.outputs.reasoning_content == "": return None return response_dict diff --git a/fastdeploy/input/text_processor.py b/fastdeploy/input/text_processor.py index 5e620cd3d7..97aac5cf6f 100644 --- a/fastdeploy/input/text_processor.py +++ b/fastdeploy/input/text_processor.py @@ -377,7 +377,7 @@ def process_response(self, response_dict, **kwargs): if tool_call_info.tools_called: response_dict.outputs.tool_calls = tool_call_info.tool_calls response_dict.outputs.text = tool_call_info.content - data_processor_logger.info(f"req_id:{req_id}, token)ids: {token_ids}") + data_processor_logger.info(f"req_id:{req_id}, token_ids: {token_ids}") return response_dict diff --git a/fastdeploy/model_executor/layers/backends/xpu/__init__.py b/fastdeploy/model_executor/layers/backends/xpu/__init__.py index d528ebe073..ac530637e5 100644 --- a/fastdeploy/model_executor/layers/backends/xpu/__init__.py +++ b/fastdeploy/model_executor/layers/backends/xpu/__init__.py @@ -16,6 +16,16 @@ xpu backend methods """ +from .moe.fused_moe import ( + XPUMoEMethod, + XPUWeightOnlyMoeEpMethod, + XPUWeightOnlyMoEMethod, +) from .quantization.weight_only import XPUWeightOnlyLinearMethod -__all__ = ["XPUWeightOnlyLinearMethod"] +__all__ = [ + "XPUWeightOnlyLinearMethod", + "XPUMoEMethod", + "XPUWeightOnlyMoEMethod", + "XPUWeightOnlyMoeEpMethod", +] diff --git a/fastdeploy/model_executor/layers/backends/xpu/moe/__init__.py b/fastdeploy/model_executor/layers/backends/xpu/moe/__init__.py new file mode 100644 index 0000000000..6dd6afd479 --- /dev/null +++ b/fastdeploy/model_executor/layers/backends/xpu/moe/__init__.py @@ -0,0 +1,16 @@ +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +xpu fused moe methods +""" diff --git a/fastdeploy/model_executor/layers/backends/xpu/moe/ep.py b/fastdeploy/model_executor/layers/backends/xpu/moe/ep.py new file mode 100644 index 0000000000..c2ec2b9ae9 --- /dev/null +++ b/fastdeploy/model_executor/layers/backends/xpu/moe/ep.py @@ -0,0 +1,415 @@ +""" +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" + +from abc import abstractmethod + +import deep_ep +import paddle +from paddle import nn +from paddleformers.utils.log import logger + +import fastdeploy +from fastdeploy.config import MoEPhase +from fastdeploy.model_executor.layers.moe.ep import DeepEPEngineBase, EPRunner +from fastdeploy.utils import singleton + + +@singleton +class DeepEPEngine(DeepEPEngineBase): + """ + A wrapper class for DeepEP engine. + """ + + def __init__( + self, + num_max_dispatch_tokens_per_rank: int, + hidden: int, + num_experts: int, + ep_size: int, + ep_rank: int, + splitwise_role: str, + moe_phase: MoEPhase, + async_finish: bool = False, + group=None, + ): + """ + Initialize the DeepEP engine. + Args: + group: The MPI group object. + ep_size: The number of ranks. + rank_id: The rank id. + num_max_dispatch_tokens_per_rank: The maximum number of tokens per rank to dispatch. + hidden: The hidden dimension of the model. + num_experts: The number of experts. + """ + super().__init__( + num_max_dispatch_tokens_per_rank, + hidden, + num_experts, + ep_size, + ep_rank, + splitwise_role, + moe_phase, + async_finish, + group, + ) + + def init_deepep_engine(self): + if self.splitwise_role == "mixed" or self.moe_phase.phase == "prefill": + self.deepep_engine = deep_ep.Buffer( + self.group, + int(1e9), + 0, + num_experts=self.num_experts, + low_latency_mode=False, + num_qps_per_rank=1, + ) + elif self.moe_phase.phase == "decode": + logger.info("Initializing Low Latency Buffer") + self.get_low_latency_buffer() + else: + raise ValueError(f"Unknown generation phase {self.moe_phase}") + + def get_low_latency_buffer(self): + """ + Get the DeepEP buffer. + Args: + group: The MPI group object. + num_max_dispatch_tokens_per_rank: The maximum number of tokens per rank to dispatch. + hidden: The hidden dimension of the model. + """ + # NOTES: the low-latency mode will consume much more space than the normal mode + # So we recommend that `num_max_dispatch_tokens_per_rank` + # (the actual batch size in the decoding engine) should be less than 256 + num_rdma_bytes = deep_ep.Buffer.get_low_latency_rdma_size_hint( + self.num_max_dispatch_tokens_per_rank, + self.hidden, + self.ep_size, + self.num_experts, + ) + # Allocate a buffer if not existed or not enough buffer size + if ( + self.deepep_engine is None + or self.deepep_engine.group != self.group + or not self.deepep_engine.low_latency_mode + or self.deepep_engine.num_rdma_bytes < num_rdma_bytes + ): + # NOTES: for best performance, the QP number **must** be equal to the number of the local experts + assert self.num_experts % self.ep_size == 0 + self.deepep_engine = deep_ep.Buffer( + self.group, + 0, + num_rdma_bytes, + self.num_experts, + low_latency_mode=True, + num_qps_per_rank=self.num_experts // self.num_ranks, + ) + + def low_latency_dispatch( + self, + hidden_states: paddle.Tensor, + topk_idx: paddle.Tensor, + expertwise_scale, + use_fp8: bool = False, + ): + """ + Args: + hidden_states: [token_num, hidden] 'bfloat16/int8' + topk_idx: [token_num, num_topk] 'int64' + + Returns: + recv_hidden_states: [num_local_experts, + num_max_dispatch_tokens_per_rank * ep_size, hidden] + ep_size * num_local_experts = num_experts + recv_count: [num_local_experts] + recv_count: a tensor shaped `[num_local_experts]` with type `torch.int`, indicating how many tokens each + expert receive. As mentioned before, all not tokens are valid in `recv_x`. + handle: the communication handle to be used in the `low_latency_combine` function. + event: the event after executing the kernel (valid only if `async_finish` is set). + hook: the receiving hook function (valid only if `return_recv_hook` is set). + """ + moe_in_w4a8_scale = None + ( + packed_recv_x, + recv_expert_count, + handle, + dispatch_hook, + valid_token_num, + ) = self.deepep_engine.low_latency_dispatch( + hidden_states, + moe_in_w4a8_scale, + topk_idx, + self.num_max_dispatch_tokens_per_rank, + self.num_experts, + use_fp8=use_fp8, + async_finish=False, + return_recv_hook=True, + ) + + return packed_recv_x, recv_expert_count, handle, dispatch_hook, valid_token_num + + def low_latency_combine( + self, + hidden_states: paddle.Tensor, + topk_idx: paddle.Tensor, + topk_weights: paddle.Tensor, + handle, + ): + """ + + Return: + combined_hidden_states: [num_tokens, hidden] + """ + combined_hidden_states, combine_hook = self.deepep_engine.low_latency_combine( + hidden_states, + topk_idx, + topk_weights, + handle, + async_finish=False, + return_recv_hook=True, + ) + return combined_hidden_states, combine_hook + + def clean_low_latency_buffer(self): + """ + clean_low_latency_buffer + """ + pass + + def barrier_all(self): + """ + barrier_all + """ + self.deepep_engine.barrier_all() + + +class XPUEPRunner(EPRunner): + """ + EPRunnerBase + """ + + def __init__( + self, + top_k: int, + hidden: int, + num_experts: int, + splitwise_role: str, + moe_phase: MoEPhase, + num_max_dispatch_tokens_per_rank: int = 1, + ep_size: int = 1, + ep_rank: int = 0, + redundant_experts_num: int = 0, + ep_group=None, + ): + super().__init__( + top_k, + hidden, + num_experts, + splitwise_role, + moe_phase, + num_max_dispatch_tokens_per_rank, + ep_size, + ep_rank, + redundant_experts_num, + ep_group, + ) + + def init_ep_engine(self): + self.ep_engine = DeepEPEngine( + num_max_dispatch_tokens_per_rank=self.num_max_dispatch_tokens_per_rank, + hidden=self.hidden, + num_experts=self.num_experts + self.redundant_experts_num, + ep_size=self.ep_size, + ep_rank=self.ep_rank, + splitwise_role=self.splitwise_role, + moe_phase=self.moe_phase, + group=self.ep_group, + ) + + def moe_select(self, layer: nn.Layer, gate_out: paddle.Tensor): + """ + moe_select + """ + if layer.redundant_table_manger is not None: + ( + ep_rank_to_expert_id_list, + expert_id_to_ep_rank_array, + expert_in_rank_num_list, + tokens_per_expert_stats_list, + ) = layer.redundant_table_manger.get_ep_rank_to_expert_id_list_by_layer(layer.layer_idx) + + topk_idx, topk_weights = fastdeploy.model_executor.ops.xpu.moe_redundant_topk_select( + gating_logits=gate_out, + expert_id_to_ep_rank_array=expert_id_to_ep_rank_array, + expert_in_rank_num_list=expert_in_rank_num_list, + tokens_per_expert_stats_list=tokens_per_expert_stats_list, + bias=layer.gate_correction_bias, + moe_topk=self.top_k, + apply_norm_weight=True, # apply_norm_weight + enable_softmax_top_k_fused=False, + redundant_ep_rank_num_plus_one=layer.fd_config.model_config.redundant_experts_num + 1, + ) + else: + topk_idx, topk_weights = fastdeploy.model_executor.ops.xpu.moe_topk_select( + gate_out, + layer.gate_correction_bias, + self.top_k, + True, # apply_norm_weight, + ) + return topk_idx, topk_weights + + @abstractmethod + def dispatch(self, *args, **kwargs): + """ + dispatch + """ + raise NotImplementedError + + @abstractmethod + def combine(self, *args, **kwargs): + """ + combine + """ + raise NotImplementedError + + +class XPUEPPrefillRunner(XPUEPRunner): + """ + EPPrefillRunner + """ + + def __init__( + self, + top_k: int, + hidden: int, + num_experts: int, + splitwise_role: str, + num_max_dispatch_tokens_per_rank: int, + ep_size: int = 1, + ep_rank: int = 0, + redundant_experts_num: int = 0, + ep_group=None, + moe_phase: MoEPhase = MoEPhase("prefill"), + ): + super().__init__( + top_k, + hidden, + num_experts, + splitwise_role, + moe_phase, + num_max_dispatch_tokens_per_rank=num_max_dispatch_tokens_per_rank, + ep_size=ep_size, + ep_rank=ep_rank, + redundant_experts_num=redundant_experts_num, + ep_group=ep_group, + ) + + def dispatch( + self, + x: paddle.Tensor, + topk_idx: paddle.Tensor, + topk_weights: paddle.Tensor, + *args, + **kwargs, + ): + self.num_combined_tokens = x.shape[0] + x_scale_tensor = kwargs.get("x_scale_tensor", None) + dispatch_args = { + "x": (x, x_scale_tensor) if x_scale_tensor is not None else x, + "topk_idx": topk_idx, + "topk_weights": topk_weights, + } + return self.ep_engine.deepep_engine.dispatch(**dispatch_args) + + def combine( + self, + tmp_ffn_out: paddle.Tensor, + handle: tuple, + recv_topk_weights: paddle.Tensor, + ): + combine_args = { + "x": tmp_ffn_out, + "topk_weights": recv_topk_weights, + "num_combined_tokens": self.num_combined_tokens, + } + fused_moe_out, _, _ = self.ep_engine.deepep_engine.combine(**combine_args) + + return fused_moe_out + + +class XPUEPDecoderRunner(XPUEPRunner): + """ + EPDecoderRunner + """ + + def __init__( + self, + top_k: int, + hidden: int, + num_experts: int, + splitwise_role: str, + num_max_dispatch_tokens_per_rank: int, + ep_size: int = 1, + ep_rank: int = 0, + redundant_experts_num: int = 0, + ep_group=None, + moe_phase: MoEPhase = MoEPhase("decode"), + ): + super().__init__( + top_k, + hidden, + num_experts, + splitwise_role, + moe_phase, + num_max_dispatch_tokens_per_rank, + ep_size=ep_size, + ep_rank=ep_rank, + redundant_experts_num=redundant_experts_num, + ep_group=ep_group, + ) + + def dispatch( + self, + x: paddle.Tensor, + topk_idx: paddle.Tensor, + topk_weights: paddle.Tensor, + *args, + **kwargs, + ): + expertwise_scale = kwargs.get("expertwise_scale", None) + use_fp8 = expertwise_scale is not None + + ( + recv_hidden_states, + recv_expert_count, + handle, + dispatch_hook, + valid_token_num, + ) = self.ep_engine.low_latency_dispatch(x, topk_idx, expertwise_scale, use_fp8) + # no need to call dispatch_hook here, because it has already been done in xDeepEP + # if dispatch_hook is not None: + # dispatch_hook() + + return recv_hidden_states, recv_expert_count, handle, valid_token_num + + def combine(self, ffn_out, topk_idx, topk_weights, handle): + combined_hidden_states, combine_hook = self.ep_engine.low_latency_combine( + ffn_out, topk_idx, topk_weights, handle + ) + if combine_hook is not None: + combine_hook() + + return combined_hidden_states diff --git a/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py b/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py new file mode 100644 index 0000000000..3c571697fc --- /dev/null +++ b/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py @@ -0,0 +1,550 @@ +""" +# Copyright (c) 2024 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" + +import paddle +from paddle import nn + +from fastdeploy.model_executor.layers.moe.fused_moe_backend_base import ( + UnquantizedFusedMoEMethod, +) +from fastdeploy.model_executor.layers.quantization.quant_base import QuantMethodBase +from fastdeploy.model_executor.layers.quantization.weight_only import WeightOnlyConfig +from fastdeploy.model_executor.ops.xpu import ( + ep_moe_expert_combine, + ep_moe_expert_dispatch, + moe_expert_ffn, + weight_quantize_xpu, +) + + +class XPUMoEMethod(UnquantizedFusedMoEMethod): + """ + XPU MOE + """ + + def process_loaded_weights(self, layer: nn.Layer, state_dict): + up_gate_proj_weights, down_proj_weights, _, _ = layer.extract_moe_ffn_weights(state_dict) + for weights in [up_gate_proj_weights, down_proj_weights]: + for idx, weight in enumerate(weights): + weights[idx] = weight.transpose([1, 0]) + stacked_up_gate_proj_weights = paddle.stack(up_gate_proj_weights, axis=0) + stacked_down_proj_weights = paddle.stack(down_proj_weights, axis=0) + + layer.up_gate_proj_weight.set_value(stacked_up_gate_proj_weights) + layer.down_proj_weight.set_value(stacked_down_proj_weights) + + def apply_tp( + self, + layer: nn.Layer, + x: paddle.Tensor, + gate: nn.Layer, + ) -> paddle.Tensor: + """ + Paddle Cutlass compute Fused MoE. + """ + from fastdeploy.model_executor.ops.xpu import xpu_moe_layer + + fused_moe_out = xpu_moe_layer( + x, + gate.weight.transpose([1, 0]), + layer.gate_correction_bias, + layer.up_gate_proj_weight, + layer.down_proj_weight, + None, # up_gate_proj bias + None, # down_proj bias + None, # up_gate_proj scale + None, # down_proj scale + None, # up_gate_proj_in_scale + "", # moe_quant_type + layer.top_k, + False, # moe group, used in deepseek + ) + if layer.tp_size > 1: + from fastdeploy.distributed.communication import ( + tensor_model_parallel_all_reduce, + ) + + tensor_model_parallel_all_reduce(fused_moe_out) + + return fused_moe_out + + def apply_ep_prefill( + self, + layer: nn.Layer, + x: paddle.Tensor, + gate: nn.Layer, + ) -> paddle.Tensor: + """ + Apply the EP prefill method. + """ + raise NotImplementedError + + def apply_ep_decode( + self, + layer: nn.Layer, + x: paddle.Tensor, + gate: nn.Layer, + ) -> paddle.Tensor: + """ + Apply the EP decoder method. + """ + raise NotImplementedError + + +class XPUWeightOnlyMoEMethod(QuantMethodBase): + """ + XPU Fused MoE Method. + """ + + def __init__( + self, + quant_config: WeightOnlyConfig, + ) -> None: + super().__init__() + self.quant_config = quant_config + self.moe_quant_type = self.quant_config.algo + self.added_weight_attrs = ["up_gate_proj_weight", "down_proj_weight"] + self.added_scale_attrs = [ + "up_gate_proj_weight_scale", + "down_proj_weight_scale", + ] + + def create_weights(self, layer: nn.Layer, **extra_weight_attrs): + """ + Paddle cutlass create weight process. + """ + self.default_dtype = "float32" + self.weight_dtype = "int8" + + if self.moe_quant_type in ["weight_only_int4", "w4a8"]: + self.up_gate_proj_weight_shape = [ + layer.num_local_experts, + layer.moe_intermediate_size * 2, + layer.hidden_size // 2, + ] + else: + self.up_gate_proj_weight_shape = [ + layer.num_local_experts, + layer.moe_intermediate_size * 2, + layer.hidden_size, + ] + if self.moe_quant_type in ["weight_only_int4", "w4a8"]: + self.down_proj_weight_shape = [ + layer.num_local_experts, + layer.hidden_size, + layer.moe_intermediate_size // 2, + ] + else: + self.down_proj_weight_shape = [ + layer.num_local_experts, + layer.hidden_size, + layer.moe_intermediate_size, + ] + + setattr( + layer, + self.added_weight_attrs[0], + layer.create_parameter( + shape=self.up_gate_proj_weight_shape, + dtype=self.weight_dtype, + default_initializer=paddle.nn.initializer.Constant(0), + ), + ) + setattr( + layer, + self.added_weight_attrs[1], + layer.create_parameter( + shape=self.down_proj_weight_shape, + dtype=self.weight_dtype, + default_initializer=paddle.nn.initializer.Constant(0), + ), + ) + # weight_scale + setattr( + layer, + self.added_scale_attrs[0], + layer.create_parameter( + shape=[layer.num_local_experts, layer.moe_intermediate_size * 2], + dtype=self.default_dtype, + default_initializer=paddle.nn.initializer.Constant(0), + ), + ) + setattr( + layer, + self.added_scale_attrs[1], + layer.create_parameter( + shape=[layer.num_local_experts, layer.hidden_size], + dtype=self.default_dtype, + default_initializer=paddle.nn.initializer.Constant(0), + ), + ) + + def process_loaded_weights(self, layer: nn.Layer, state_dict): + """ + Paddle xpu load weight process. + """ + up_gate_proj_weights, down_proj_weights, _, _ = layer.extract_moe_ffn_weights(state_dict) + assert len(up_gate_proj_weights) == layer.num_local_experts + assert len(down_proj_weights) == layer.num_local_experts + assert up_gate_proj_weights[0].shape == [ + layer.hidden_size, + layer.moe_intermediate_size * 2, + ] + assert down_proj_weights[0].shape == [ + layer.moe_intermediate_size, + layer.hidden_size, + ] + + for idx, weight_tensor in enumerate([up_gate_proj_weights, down_proj_weights]): + weight_name = self.added_weight_attrs[idx] + scale_name = self.added_scale_attrs[idx] + + weight_list = [] + weight_scale_list = [] + for i in range(layer.num_local_experts): + quant_weight, scale = weight_quantize_xpu( + weight_tensor[i], self.moe_quant_type, -1, -1 + ) # weight is [k,n] + weight_list.append(quant_weight.transpose([1, 0])) # transpose weight to [n,k] + weight_scale_list.append(scale) + quanted_weight = paddle.stack(weight_list, axis=0) + getattr(layer, weight_name).set_value(quanted_weight) + + quanted_weight_scale = paddle.stack(weight_scale_list, axis=0) + getattr(layer, scale_name).set_value(quanted_weight_scale) + + def apply( + self, + layer: nn.Layer, + x: paddle.Tensor, + gate: nn.Layer, + ) -> paddle.Tensor: + """ + XPU compute Fused MoE. + """ + from fastdeploy.model_executor.ops.xpu import xpu_moe_layer + + fused_moe_out = xpu_moe_layer( + x, + gate.weight.transpose([1, 0]), + layer.gate_correction_bias, + layer.up_gate_proj_weight, + layer.down_proj_weight, + None, # up_gate_proj bias + None, # down_proj bias + (layer.up_gate_proj_weight_scale if hasattr(layer, "up_gate_proj_weight_scale") else None), + (layer.down_proj_weight_scale if hasattr(layer, "down_proj_weight_scale") else None), + (layer.down_proj_in_scale if hasattr(layer, "down_proj_in_scale") else None), + self.moe_quant_type, + layer.top_k, + False, # moe group, used in deepseek + ) + if layer.tp_size > 1: + from fastdeploy.distributed.communication import ( + tensor_model_parallel_all_reduce, + ) + + tensor_model_parallel_all_reduce(fused_moe_out) + + return fused_moe_out + + +class XPUWeightOnlyMoeEpMethod(XPUMoEMethod): + """ + XPU Fused MoE EP Method. + """ + + def __init__( + self, + quant_config: WeightOnlyConfig, + ) -> None: + super().__init__(quant_config) + self.moe_quant_type = self.quant_config.algo + self.weight_dtype = "int8" + self.scale_dtype = "float32" + + def import_backend_ep_runner(self) -> None: + from .ep import XPUEPDecoderRunner, XPUEPPrefillRunner + + self.EPPrefillRunner = XPUEPPrefillRunner + self.EPDecoderRunner = XPUEPDecoderRunner + + def create_weights(self, layer: nn.Layer, **extra_weight_attrs): + """ + create weight process. + """ + if self.moe_quant_type in ["weight_only_int8"]: + self.up_gate_proj_weight_shape = [ + layer.num_local_experts, + layer.moe_intermediate_size * 2, + layer.hidden_size, + ] + self.down_proj_weight_shape = [ + layer.num_local_experts, + layer.hidden_size, + layer.moe_intermediate_size, + ] + elif self.moe_quant_type in ["weight_only_int4"]: + self.up_gate_proj_weight_shape = [ + layer.num_local_experts, + layer.moe_intermediate_size * 2, + layer.hidden_size // 2, + ] + self.down_proj_weight_shape = [ + layer.num_local_experts, + layer.hidden_size, + layer.moe_intermediate // 2, + ] + else: + raise ValueError(f"Unsupported moe quant type: {self.moe_quant_type}") + + self.up_gate_proj_scale_shape = [ + layer.num_local_experts, + layer.moe_intermediate_size * 2, + ] + self.down_proj_scale_shape = [ + layer.num_local_experts, + layer.hidden_size, + ] + + setattr( + layer, + self.added_weight_attrs[0], + layer.create_parameter( + shape=self.up_gate_proj_weight_shape, + dtype=self.weight_dtype, + default_initializer=paddle.nn.initializer.Constant(0), + ), + ) + setattr( + layer, + self.added_weight_attrs[1], + layer.create_parameter( + shape=self.down_proj_weight_shape, + dtype=self.weight_dtype, + default_initializer=paddle.nn.initializer.Constant(0), + ), + ) + setattr( + layer, + self.added_scale_attrs[0], + layer.create_parameter( + shape=self.up_gate_proj_scale_shape, + dtype=self.scale_dtype, + default_initializer=paddle.nn.initializer.Constant(0), + ), + ) + setattr( + layer, + self.added_scale_attrs[1], + layer.create_parameter( + shape=self.down_proj_scale_shape, + dtype=self.scale_dtype, + default_initializer=paddle.nn.initializer.Constant(0), + ), + ) + + def process_loaded_weights(self, layer: nn.Layer, state_dict): + """ + Paddle xpu load weight process. + """ + up_gate_proj_weights, down_proj_weights, _, _ = layer.extract_moe_ffn_weights(state_dict) + assert len(up_gate_proj_weights) == layer.num_local_experts + assert len(down_proj_weights) == layer.num_local_experts + assert up_gate_proj_weights[0].shape == [ + layer.hidden_size, + layer.moe_intermediate_size * 2, + ] + assert down_proj_weights[0].shape == [ + layer.moe_intermediate_size, + layer.hidden_size, + ] + + for idx, weight_tensor in enumerate([up_gate_proj_weights, down_proj_weights]): + weight_name = self.added_weight_attrs[idx] + scale_name = self.added_scale_attrs[idx] + + weight_list = [] + weight_scale_list = [] + for i in range(layer.num_local_experts): + quant_weight, scale = weight_quantize_xpu( + weight_tensor[i], self.moe_quant_type, -1, -1 + ) # quant_weight is [k,n] + # transpose quant_weight to [n,k] + weight_list.append(quant_weight.transpose([1, 0])) + weight_scale_list.append(scale) + + quanted_weight = paddle.stack(weight_list, axis=0) + getattr(layer, weight_name).set_value(quanted_weight) + quanted_weight_scale = paddle.stack(weight_scale_list, axis=0) + getattr(layer, scale_name).set_value(quanted_weight_scale) + + def apply_ep_prefill( + self, + layer: nn.Layer, + x: paddle.Tensor, + gate: nn.Layer, + ) -> paddle.Tensor: + """ + Apply the EP prefill method. + """ + gate_out = gate(x.cast("float32")) + # 1. Select topk experts and weights + topk_idx, topk_weights = self.ep_prefill_runner.moe_select(layer, gate_out) + # 2. Dynamic compute blockwise quantization scales + # x, x_scale_tensor = fastdeploy.model_executor.ops.xpu.per_token_quant(x) + x_scale_tensor = None + # 3. EP Dispatch + ( + recv_x, + recv_x_scales, + recv_topk_idx, + recv_topk_weights, + recv_num_tokens_per_expert_list, + _, + ) = self.ep_prefill_runner.dispatch(x, topk_idx, topk_weights, x_scale_tensor=x_scale_tensor) + + token_num_per_expert = recv_num_tokens_per_expert_list.numpy().tolist() + token_all_num = sum(token_num_per_expert) + + # 4. Compute ffn + if token_all_num > 0: + moe_dispatch_scale = None + ( + permute_input, + permute_indices_per_token, + token_num_lod, + dst_weights, + ffn1_act_scale_per_token, + ) = ep_moe_expert_dispatch( + recv_x, + recv_topk_idx, + recv_topk_weights, + moe_dispatch_scale, + token_num_per_expert, + token_all_num, + self.moe_quant_type, + ) + + moe_ffn1_scale = None + moe_ffn2_scale = None + ffn_out = moe_expert_ffn( + permute_input, + token_num_lod, + getattr(layer, self.added_weight_attrs[0]), + getattr(layer, self.added_weight_attrs[1]), + None, + None, + moe_ffn1_scale, + moe_ffn2_scale, + getattr(layer, self.added_scale_attrs[0]), + getattr(layer, self.added_scale_attrs[1]), + None, + None, + self.moe_quant_type, + -1, + token_all_num, + ) + + # prmt back per rank + recv_topk_weights_bf16 = recv_topk_weights.astype("bfloat16") + tmp_ffn_out = ep_moe_expert_combine( + ffn_out, + permute_indices_per_token, + recv_topk_weights_bf16, + permute_indices_per_token.shape[0], + ffn_out.shape[0], + ffn_out.shape[1], + permute_indices_per_token.shape[1], + ) + + else: + tmp_ffn_out = paddle.empty(recv_x.shape, "bfloat16") + + # 5. EP combine + handle = None + return self.ep_prefill_runner.combine(tmp_ffn_out, handle, recv_topk_weights) + + def compute_ffn( + self, + layer: nn.Layer, + permute_input, + token_nums_per_expert, + valid_token_num=-1, + extra_ffn1_in_scale=None, + ): + """ + Calculate moe + """ + # ffn1_in_scale = extra_ffn1_in_scale + moe_ffn1_scale = None + moe_ffn2_scale = None + + ffn_out = moe_expert_ffn( + permute_input, + token_nums_per_expert, + getattr(layer, self.added_weight_attrs[0]), + getattr(layer, self.added_weight_attrs[1]), + None, + None, + moe_ffn1_scale, + moe_ffn2_scale, + getattr(layer, self.added_scale_attrs[0]), + getattr(layer, self.added_scale_attrs[1]), + None, + None, + self.moe_quant_type, + -1, + valid_token_num, + ) + return ffn_out + + def apply_ep_decode( + self, + layer: nn.Layer, + x: paddle.Tensor, + gate: nn.Layer, + ) -> paddle.Tensor: + """ + Apply the EP decoder method. + """ + gate_out = gate(x.cast("float32")) + + # 1. Select topk experts and weights + topk_idx, topk_weights = self.ep_decoder_runner.moe_select(layer, gate_out) + + # 2. EP Dispatch + expertwise_scale = None + use_fp8 = False + ( + permute_input, + token_nums_per_expert, + handle, + valid_token_num, + ) = self.ep_decoder_runner.dispatch( + x, topk_idx, topk_weights, expertwise_scale=expertwise_scale, use_fp8=use_fp8 + ) + + # 3. Compute ffn + ffn_out = self.compute_ffn( + layer, + permute_input, + token_nums_per_expert, + valid_token_num, + ) + + # 4. EP combine + return self.ep_decoder_runner.combine(ffn_out, topk_idx, topk_weights, handle) diff --git a/fastdeploy/model_executor/layers/backends/xpu/quantization/weight_only.py b/fastdeploy/model_executor/layers/backends/xpu/quantization/weight_only.py index b010f958f0..fa95561d4d 100644 --- a/fastdeploy/model_executor/layers/backends/xpu/quantization/weight_only.py +++ b/fastdeploy/model_executor/layers/backends/xpu/quantization/weight_only.py @@ -34,6 +34,7 @@ def __init__( quant_config: WeightOnlyConfig, ) -> None: super().__init__(quant_config) + self.quant_config.weight_only_linear_arch = -1 def create_weights(self, layer: nn.Layer, **extra_weight_attrs) -> None: """ @@ -61,6 +62,17 @@ def process_loaded_weights(self, layer: nn.Layer, weight: paddle.Tensor) -> None """ loaded_weights using xpu special quantization """ - quanted_weight_tensor, weight_scale_tensor = weight_quantize_xpu(weight, self.quant_config.algo, -1, -1) + k, n = weight.shape + quanted_weight_tensors = [] + weight_scale_tensors = [] + offset = 30720 + for i in range(0, n, offset): + end_n = min(i + offset, n) + weight_i = weight[:, i:end_n] + quanted_weight_tensor, weight_scale_tensor = weight_quantize_xpu(weight_i, self.quant_config.algo, -1, -1) + quanted_weight_tensors.append(quanted_weight_tensor) + weight_scale_tensors.append(weight_scale_tensor) + quanted_weight_tensor = paddle.concat(quanted_weight_tensors, axis=1) + weight_scale_tensor = paddle.concat(weight_scale_tensors, axis=0) layer.weight.set_value(paddle.transpose(quanted_weight_tensor, [1, 0])) layer.weight_scale.set_value(weight_scale_tensor) diff --git a/fastdeploy/model_executor/layers/moe/ep.py b/fastdeploy/model_executor/layers/moe/ep.py index 7b3bcecadd..43101508d0 100644 --- a/fastdeploy/model_executor/layers/moe/ep.py +++ b/fastdeploy/model_executor/layers/moe/ep.py @@ -20,11 +20,13 @@ from paddle import nn from paddleformers.utils.log import logger -try: - from paddle.distributed.communication import deep_ep -except: - logger.warning("import deep_ep Failed!") +from fastdeploy.platforms import current_platform +if current_platform.is_cuda(): + try: + from paddle.distributed.communication import deep_ep + except: + logger.warning("import deep_ep Failed!") import fastdeploy from fastdeploy.config import MoEPhase @@ -32,8 +34,7 @@ from fastdeploy.utils import singleton -@singleton -class DeepEPEngine: +class DeepEPEngineBase: """ A wrapper class for DeepEP engine. """ @@ -60,28 +61,76 @@ def __init__( hidden: The hidden dimension of the model. num_experts: The number of experts. """ + self.num_max_dispatch_tokens_per_rank = num_max_dispatch_tokens_per_rank + self.hidden = hidden + self.num_experts = num_experts + self.ep_size = ep_size + self.rank_id = ep_rank + self.splitwise_role = splitwise_role + self.moe_phase = moe_phase + self.async_finish = async_finish # TODO(@wufeisheng): Support configurable EP size​ if group is None: group = paddle.distributed.new_group(range(ep_size)) self.group = group - self.ep_size = ep_size - self.rank_id = ep_rank - self.hidden = hidden - self.num_experts = num_experts self.num_local_experts = num_experts // ep_size - self.async_finish = async_finish - self.deepep_engine = None + self.init_deepep_engine() + + @abstractmethod + def init_deepep_engine(self): + raise NotImplementedError + + +@singleton +class DeepEPEngine(DeepEPEngineBase): + """ + A wrapper class for DeepEP engine. + """ + + def __init__( + self, + num_max_dispatch_tokens_per_rank: int, + hidden: int, + num_experts: int, + ep_size: int, + ep_rank: int, + splitwise_role: str, + moe_phase: MoEPhase, + async_finish: bool = False, + group=None, + ): + """ + Initialize the DeepEP engine. + Args: + group: The MPI group object. + ep_size: The number of ranks. + rank_id: The rank id. + num_max_dispatch_tokens_per_rank: The maximum number of tokens per rank to dispatch. + hidden: The hidden dimension of the model. + num_experts: The number of experts. + """ + super().__init__( + num_max_dispatch_tokens_per_rank, + hidden, + num_experts, + ep_size, + ep_rank, + splitwise_role, + moe_phase, + async_finish, + group, + ) + def init_deepep_engine(self): from paddle.base.core import Config self.ep_config = Config(24, 6, 256) - self.num_max_dispatch_tokens_per_rank = num_max_dispatch_tokens_per_rank # In mixed EP mode on a single node, we dynamically switch between # high throughput and low latency modes. - if splitwise_role == "mixed": + if self.splitwise_role == "mixed": self.deepep_engine = deep_ep.Buffer( self.group, int(2e9), @@ -92,10 +141,10 @@ def __init__( # In disaggregated mode on multiple nodes, we either use # high throughput mode or low latency mode. else: - if moe_phase.phase == "decode": + if self.moe_phase.phase == "decode": logger.info("Initializing Low Latency Buffer") self.get_low_latency_buffer() - elif moe_phase.phase == "prefill": + elif self.moe_phase.phase == "prefill": self.deepep_engine = deep_ep.Buffer( self.group, int(5e8), @@ -104,7 +153,7 @@ def __init__( num_qps_per_rank=1, ) else: - raise ValueError(f"Unknown generation phase {moe_phase}") + raise ValueError(f"Unknown generation phase {self.moe_phase}") def get_low_latency_buffer(self): """ @@ -255,17 +304,27 @@ def __init__( ep_group=None, ): self.top_k = top_k + self.hidden = hidden self.num_experts = num_experts + self.splitwise_role = splitwise_role + self.moe_phase = moe_phase + self.num_max_dispatch_tokens_per_rank = num_max_dispatch_tokens_per_rank + self.ep_size = ep_size + self.ep_rank = ep_rank self.redundant_experts_num = redundant_experts_num + self.ep_group = ep_group + self.init_ep_engine() + + def init_ep_engine(self): self.ep_engine = DeepEPEngine( - num_max_dispatch_tokens_per_rank=num_max_dispatch_tokens_per_rank, - hidden=hidden, - num_experts=num_experts + redundant_experts_num, - ep_size=ep_size, - ep_rank=ep_rank, - splitwise_role=splitwise_role, - moe_phase=moe_phase, - group=ep_group, + num_max_dispatch_tokens_per_rank=self.num_max_dispatch_tokens_per_rank, + hidden=self.hidden, + num_experts=self.num_experts + self.redundant_experts_num, + ep_size=self.ep_size, + ep_rank=self.ep_rank, + splitwise_role=self.splitwise_role, + moe_phase=self.moe_phase, + group=self.ep_group, ) def moe_select(self, layer: nn.Layer, gate_out: paddle.Tensor): diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py b/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py index 2f6dff80d6..1ca102448b 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py @@ -41,15 +41,20 @@ def __init__(self, quant_config): ] self.pack_num = 1 + def import_backend_ep_runner(self) -> None: + from .ep import EPDecoderRunner, EPPrefillRunner + + self.EPPrefillRunner = EPPrefillRunner + self.EPDecoderRunner = EPDecoderRunner + def init_ep(self, layer: nn.Layer) -> None: """ Init EP related module """ + self.import_backend_ep_runner() if layer.ep_size > 1: if layer.fd_config.parallel_config.splitwise_role == "mixed": - from .ep import EPDecoderRunner, EPPrefillRunner - - self.ep_prefill_runner = EPPrefillRunner( + self.ep_prefill_runner = self.EPPrefillRunner( layer.top_k, layer.hidden_size, layer.num_experts, @@ -60,7 +65,7 @@ def init_ep(self, layer: nn.Layer) -> None: layer.fd_config.model_config.redundant_experts_num, ep_group=layer.fd_config.parallel_config.ep_group, ) - self.ep_decoder_runner = EPDecoderRunner( + self.ep_decoder_runner = self.EPDecoderRunner( layer.top_k, layer.hidden_size, layer.num_experts, @@ -73,9 +78,7 @@ def init_ep(self, layer: nn.Layer) -> None: ) else: if layer.fd_config.parallel_config.moe_phase.phase == "prefill": - from .ep import EPPrefillRunner - - self.ep_prefill_runner = EPPrefillRunner( + self.ep_prefill_runner = self.EPPrefillRunner( layer.top_k, layer.hidden_size, layer.num_experts, @@ -87,9 +90,7 @@ def init_ep(self, layer: nn.Layer) -> None: ep_group=layer.fd_config.parallel_config.ep_group, ) else: - from .ep import EPDecoderRunner - - self.ep_decoder_runner = EPDecoderRunner( + self.ep_decoder_runner = self.EPDecoderRunner( layer.top_k, layer.hidden_size, layer.num_experts, diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_xpu_backend.py b/fastdeploy/model_executor/layers/moe/fused_moe_xpu_backend.py deleted file mode 100644 index b83cce96d2..0000000000 --- a/fastdeploy/model_executor/layers/moe/fused_moe_xpu_backend.py +++ /dev/null @@ -1,258 +0,0 @@ -""" -# Copyright (c) 2024 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -""" - -import paddle -from paddle import nn - -from fastdeploy.model_executor.layers.moe.fused_moe_backend_base import ( - UnquantizedFusedMoEMethod, -) -from fastdeploy.model_executor.layers.quantization.quant_base import QuantMethodBase -from fastdeploy.model_executor.layers.quantization.weight_only import WeightOnlyConfig -from fastdeploy.model_executor.ops.xpu import weight_quantize_xpu - - -class XPUMoEMethod(UnquantizedFusedMoEMethod): - """ - XPU MOE - """ - - def process_loaded_weights(self, layer: nn.Layer, state_dict): - - up_gate_proj_weights, down_proj_weights, _, _ = layer.extract_moe_ffn_weights(state_dict) - for weights in [up_gate_proj_weights, down_proj_weights]: - for idx, weight in enumerate(weights): - weights[idx] = weight.transpose([1, 0]) - stacked_up_gate_proj_weights = paddle.stack(up_gate_proj_weights, axis=0) - stacked_down_proj_weights = paddle.stack(down_proj_weights, axis=0) - - layer.up_gate_proj_weight.set_value(stacked_up_gate_proj_weights) - layer.down_proj_weight.set_value(stacked_down_proj_weights) - - def apply_tp( - self, - layer: nn.Layer, - x: paddle.Tensor, - gate: nn.Layer, - ) -> paddle.Tensor: - """ - Paddle Cutlass compute Fused MoE. - """ - from fastdeploy.model_executor.ops.xpu import xpu_moe_layer - - fused_moe_out = xpu_moe_layer( - x, - gate.weight.transpose([1, 0]), - layer.gate_correction_bias, - layer.up_gate_proj_weight, - layer.down_proj_weight, - None, # up_gate_proj bias - None, # down_proj bias - None, # up_gate_proj scale - None, # down_proj scale - None, # up_gate_proj_in_scale - "", # moe_quant_type - layer.top_k, - False, # moe group, used in deepseek - ) - if layer.tp_size > 1: - from fastdeploy.distributed.communication import ( - tensor_model_parallel_all_reduce, - ) - - tensor_model_parallel_all_reduce(fused_moe_out) - - return fused_moe_out - - def apply_ep_prefill( - self, - layer: nn.Layer, - x: paddle.Tensor, - gate: nn.Layer, - ) -> paddle.Tensor: - """ - Apply the EP prefill method. - """ - raise NotImplementedError - - def apply_ep_decode( - self, - layer: nn.Layer, - x: paddle.Tensor, - gate: nn.Layer, - ) -> paddle.Tensor: - """ - Apply the EP decoder method. - """ - raise NotImplementedError - - -class XPUWeightOnlyMoEMethod(QuantMethodBase): - """ - XPU Fused MoE Method. - """ - - def __init__( - self, - quant_config: WeightOnlyConfig, - ) -> None: - super().__init__() - self.quant_config = quant_config - self.moe_quant_type = self.quant_config.algo - self.added_weight_attrs = ["up_gate_proj_weight", "down_proj_weight"] - self.added_scale_attrs = [ - "up_gate_proj_weight_scale", - "down_proj_weight_scale", - ] - - def create_weights(self, layer: nn.Layer, **extra_weight_attrs): - """ - Paddle cutlass create weight process. - """ - self.default_dtype = "float32" - self.weight_dtype = "int8" - - if self.moe_quant_type in ["weight_only_int4", "w4a8"]: - self.up_gate_proj_weight_shape = [ - layer.num_local_experts, - layer.moe_intermediate_size * 2, - layer.hidden_size // 2, - ] - else: - self.up_gate_proj_weight_shape = [ - layer.num_local_experts, - layer.moe_intermediate_size * 2, - layer.hidden_size, - ] - if self.moe_quant_type in ["weight_only_int4", "w4a8"]: - self.down_proj_weight_shape = [ - layer.num_local_experts, - layer.hidden_size, - layer.moe_intermediate_size // 2, - ] - else: - self.down_proj_weight_shape = [ - layer.num_local_experts, - layer.hidden_size, - layer.moe_intermediate_size, - ] - - setattr( - layer, - self.added_weight_attrs[0], - layer.create_parameter( - shape=self.up_gate_proj_weight_shape, - dtype=self.weight_dtype, - default_initializer=paddle.nn.initializer.Constant(0), - ), - ) - setattr( - layer, - self.added_weight_attrs[1], - layer.create_parameter( - shape=self.down_proj_weight_shape, - dtype=self.weight_dtype, - default_initializer=paddle.nn.initializer.Constant(0), - ), - ) - # weight_scale - setattr( - layer, - self.added_scale_attrs[0], - layer.create_parameter( - shape=[layer.num_local_experts, layer.moe_intermediate_size * 2], - dtype=self.default_dtype, - default_initializer=paddle.nn.initializer.Constant(0), - ), - ) - setattr( - layer, - self.added_scale_attrs[1], - layer.create_parameter( - shape=[layer.num_local_experts, layer.hidden_size], - dtype=self.default_dtype, - default_initializer=paddle.nn.initializer.Constant(0), - ), - ) - - def process_loaded_weights(self, layer: nn.Layer, state_dict): - """ - Paddle xpu load weight process. - """ - up_gate_proj_weights, down_proj_weights, _, _ = layer.extract_moe_ffn_weights(state_dict) - assert len(up_gate_proj_weights) == layer.num_local_experts - assert len(down_proj_weights) == layer.num_local_experts - assert up_gate_proj_weights[0].shape == [ - layer.hidden_size, - layer.moe_intermediate_size * 2, - ] - assert down_proj_weights[0].shape == [ - layer.moe_intermediate_size, - layer.hidden_size, - ] - - for idx, weight_tensor in enumerate([up_gate_proj_weights, down_proj_weights]): - weight_name = self.added_weight_attrs[idx] - scale_name = self.added_scale_attrs[idx] - - weight_list = [] - weight_scale_list = [] - for i in range(layer.num_local_experts): - quant_weight, scale = weight_quantize_xpu( - weight_tensor[i], self.moe_quant_type, -1, -1 - ) # weight is [k,n] - weight_list.append(quant_weight.transpose([1, 0])) # transpose weight to [n,k] - weight_scale_list.append(scale) - quanted_weight = paddle.stack(weight_list, axis=0) - getattr(layer, weight_name).set_value(quanted_weight) - - quanted_weight_scale = paddle.stack(weight_scale_list, axis=0) - getattr(layer, scale_name).set_value(quanted_weight_scale) - - def apply( - self, - layer: nn.Layer, - x: paddle.Tensor, - gate: nn.Layer, - ) -> paddle.Tensor: - """ - XPU compute Fused MoE. - """ - from fastdeploy.model_executor.ops.xpu import xpu_moe_layer - - fused_moe_out = xpu_moe_layer( - x, - gate.weight.transpose([1, 0]), - layer.gate_correction_bias, - layer.up_gate_proj_weight, - layer.down_proj_weight, - None, # up_gate_proj bias - None, # down_proj bias - (layer.up_gate_proj_weight_scale if hasattr(layer, "up_gate_proj_weight_scale") else None), - (layer.down_proj_weight_scale if hasattr(layer, "down_proj_weight_scale") else None), - (layer.down_proj_in_scale if hasattr(layer, "down_proj_in_scale") else None), - self.moe_quant_type, - layer.top_k, - False, # moe group, used in deepseek - ) - if layer.tp_size > 1: - from fastdeploy.distributed.communication import ( - tensor_model_parallel_all_reduce, - ) - - tensor_model_parallel_all_reduce(fused_moe_out) - - return fused_moe_out diff --git a/fastdeploy/model_executor/layers/moe/moe.py b/fastdeploy/model_executor/layers/moe/moe.py index 0e9a51cab4..d7b4876df3 100644 --- a/fastdeploy/model_executor/layers/moe/moe.py +++ b/fastdeploy/model_executor/layers/moe/moe.py @@ -43,7 +43,7 @@ def get_moe_method(): return CutlassMoEMethod(None) elif current_platform.is_xpu(): - from .fused_moe_xpu_backend import XPUMoEMethod + from fastdeploy.model_executor.layers.backends import XPUMoEMethod return XPUMoEMethod(None) elif current_platform.is_gcu(): diff --git a/fastdeploy/model_executor/layers/quantization/weight_only.py b/fastdeploy/model_executor/layers/quantization/weight_only.py index b8e929b4c9..ddf02adbb0 100644 --- a/fastdeploy/model_executor/layers/quantization/weight_only.py +++ b/fastdeploy/model_executor/layers/quantization/weight_only.py @@ -19,7 +19,7 @@ from typing import Optional import paddle -from paddle.nn.quant import weight_only_linear, weight_quantize +from paddle.nn.quant import weight_quantize from fastdeploy import envs from fastdeploy.model_executor.layers.linear import ( @@ -30,6 +30,13 @@ from fastdeploy.model_executor.utils import TensorTracker, free_tensor, set_weight_attrs from fastdeploy.platforms import current_platform +if current_platform.is_xpu(): + from fastdeploy.model_executor.ops.xpu import ( + weight_only_linear_xpu as weight_only_linear, + ) +else: + from paddle.nn.quant import weight_only_linear + from ..moe import FusedMoE from ..utils import get_tensor from .quant_base import QuantConfigBase, QuantMethodBase @@ -70,16 +77,24 @@ def from_config(cls, config: dict) -> "WeightOnlyConfig": def get_quant_method(self, layer) -> Optional[QuantMethodBase]: if current_platform.is_xpu(): - from fastdeploy.model_executor.layers.backends import ( - XPUWeightOnlyLinearMethod, - ) - from fastdeploy.model_executor.layers.moe.fused_moe_xpu_backend import ( - XPUWeightOnlyMoEMethod, - ) - if isinstance(layer, FusedMoE): - return XPUWeightOnlyMoEMethod(self) + if layer.ep_size > 1: + from fastdeploy.model_executor.layers.backends import ( + XPUWeightOnlyMoeEpMethod, + ) + + return XPUWeightOnlyMoeEpMethod(self) + else: + from fastdeploy.model_executor.layers.backends import ( + XPUWeightOnlyMoEMethod, + ) + + return XPUWeightOnlyMoEMethod(self) else: + from fastdeploy.model_executor.layers.backends import ( + XPUWeightOnlyLinearMethod, + ) + return XPUWeightOnlyLinearMethod(self) elif current_platform.is_gcu(): from fastdeploy.model_executor.layers.backends import ( diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py index 8245e56571..09af37a0bc 100644 --- a/fastdeploy/output/token_processor.py +++ b/fastdeploy/output/token_processor.py @@ -146,7 +146,7 @@ def process_sampling_results(self): """ if current_platform.is_xpu(): - from fastdeploy.model_executor.ops.xpu import get_output + from fastdeploy.model_executor.ops.xpu import get_output, get_output_ep elif current_platform.is_iluvatar(): from fastdeploy.model_executor.ops.iluvatar import get_output elif current_platform.is_gcu(): diff --git a/fastdeploy/worker/xpu_model_runner.py b/fastdeploy/worker/xpu_model_runner.py index 09ec0ee1a3..b3bbf1fbfe 100644 --- a/fastdeploy/worker/xpu_model_runner.py +++ b/fastdeploy/worker/xpu_model_runner.py @@ -872,16 +872,36 @@ def _dummy_run( self._dummy_prefill_inputs(num_tokens, batch_size) while True: - self.execute_model(None, True) + self.execute_model(is_dummy_run=True) if int((self.share_inputs["seq_lens_this_time"] > 0).sum()) == 0: break + def _set_debug_level( + self, debug_level: int = 0x1, model_forward_batch: Optional[List[Request]] = None, is_dummy_run: bool = False + ) -> None: + """ + Set debug level for XPU: 0x1, 0xA1, 0x1B1 + """ + request_num = 0 if model_forward_batch is None else len(model_forward_batch) + if debug_level == 0 or request_num == 0 or is_dummy_run: + paddle.device.xpu.set_debug_level(0) + return + + if self.parallel_config.use_ep: + request_num = paddle.to_tensor(request_num, dtype="int32") + paddle.distributed.all_reduce(request_num, group=self.parallel_config.ep_group) + logger.info(f"local_rank: {self.local_rank}, request_num: {request_num.item()}") + if request_num.item() > 0: + paddle.device.xpu.set_debug_level(debug_level) + else: + paddle.device.xpu.set_debug_level(debug_level) + def execute_model( self, model_forward_batch: Optional[List[Request]] = None, - is_dummy_run: bool = False, num_running_requests: int = None, + is_dummy_run: bool = False, ) -> Optional[ModelRunnerOutput]: """ The Entrance of model execute. @@ -892,6 +912,9 @@ class at the server level, which is too granular for ModelRunner. num_running_requests: batch_size intermediate_tensors: """ + # 0. set debug level + # self._set_debug_level(0x1, model_forward_batch, is_dummy_run) + # 1. Prepare inputs of model and decoder. self._prepare_inputs(is_dummy_run=is_dummy_run) diff --git a/fastdeploy/worker/xpu_worker.py b/fastdeploy/worker/xpu_worker.py index 9de95aa877..66d0d9cb94 100644 --- a/fastdeploy/worker/xpu_worker.py +++ b/fastdeploy/worker/xpu_worker.py @@ -110,7 +110,10 @@ def determine_available_memory(self) -> int: ) self.model_runner.prepare_profile() - self.model_runner.profile_run() + if self.parallel_config.use_ep: + logger.warning("EP mode does not support profile run.") + else: + self.model_runner.profile_run() set_random_seed(self.fd_config.model_config.seed) total_available_memory = int(total_memory * self.cache_config.gpu_memory_utilization) @@ -118,6 +121,8 @@ def determine_available_memory(self) -> int: available_kv_cache_memory = total_available_memory - used_memory model_block_memory_used = self.cal_theortical_kvcache() available_kv_cache_memory += model_block_memory_used * self.parallel_config.total_block_num + if self.parallel_config.use_ep: + available_kv_cache_memory = int(available_kv_cache_memory * 0.6) self.model_runner.clear_block_table() @@ -147,14 +152,11 @@ def initialize_cache(self, num_gpu_blocks: int) -> None: def execute_model( self, model_forward_batch: Optional[List[Request]] = None, - is_dummy_run: bool = False, num_running_requests: Optional[int] = None, + is_dummy_run: bool = False, ) -> Optional[ModelRunnerOutput]: """ """ - - output = self.model_runner.execute_model(model_forward_batch) - - return output + return self.model_runner.execute_model(model_forward_batch, num_running_requests, is_dummy_run) def exist_prefill(self): """ diff --git a/scripts/run_ci_xpu.sh b/scripts/run_ci_xpu.sh index a240d1acab..2fe85e49b5 100644 --- a/scripts/run_ci_xpu.sh +++ b/scripts/run_ci_xpu.sh @@ -66,6 +66,9 @@ while true; do echo -e "\n服务启动超时:经过 $((TIMEOUT/60)) 分钟服务仍未启动!" cat server.log cat log/workerlog.0 + cat log/workerlog.1 + cat log/workerlog.2 + cat log/workerlog.3 exit 1 fi