From f79d3e06af6d915c771be8585fd8427ed9010a4a Mon Sep 17 00:00:00 2001 From: MengAiDev <3463526515@qq.com> Date: Wed, 30 Jul 2025 11:54:27 +0800 Subject: [PATCH] feat(buffer): implement dynamic buffer resizing for simplicity and deadlock prevention - Add support for dynamic buffer resizing in Buffer class - Enable resizing based on maximum tokens observed - Simplify design and eliminate potential deadlocks from queue-based communication - New parameter `dynamic_buffer_resize` added to Buffer constructor - Update README.md to describe new approach and how to enable it --- README.md | 15 ++++++++++++++- csrc/deep_ep.cpp | 23 +++++++++++++++-------- csrc/deep_ep.hpp | 18 +++++++++++++++++- deep_ep/buffer.py | 7 ++++++- 4 files changed, 52 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index e8eb595d..852d1e4e 100644 --- a/README.md +++ b/README.md @@ -309,7 +309,20 @@ For two-micro-batch overlapping, you can refer to the following figure. With our #### Easier potential overall design -The current DeepEP implementation uses queues for communication buffers which save memory but introduce complexity and potential deadlocks. If you're implementing your own version based on DeepEP, consider using fixed-size buffers allocated to maximum capacity for simplicity and better performance. For a detailed discussion of this alternative approach, see https://github.com/deepseek-ai/DeepEP/issues/39. +The current DeepEP implementation uses queues for communication buffers which save memory but introduce complexity and potential deadlocks. As suggested in [issue #39](https://github.com/deepseek-ai/DeepEP/issues/39), we are working on an alternative approach that uses fixed-size buffers allocated to maximum capacity. This simplifies the design and eliminates potential deadlocks from queue-based communication. + +This approach: +1. Allocates buffers directly based on the maximum possible number of tokens +2. Allows direct address calculation when sending, eliminating the need for a dynamic queue +3. Implements a dynamic buffer resizing strategy that expands the buffer when any rank's buffer size is insufficient and shrinks the buffer when it hasn't been fully utilized for an extended period + +You can enable this experimental feature by setting `dynamic_buffer_resize=True` when creating a [Buffer](file:///mnt/workspace/DeepEP/deep_ep/buffer.py#L14-L634) instance: + +```python +buffer = Buffer(group, num_nvl_bytes, num_rdma_bytes, dynamic_buffer_resize=True) +``` + +While this approach may use more GPU memory (the exact amount depends on the specific scenario), the implementation is much simpler. You can more easily add new features, and the performance ceiling might be slightly better. #### Undefined-behavior PTX usage diff --git a/csrc/deep_ep.cpp b/csrc/deep_ep.cpp index 80df8057..6533d642 100644 --- a/csrc/deep_ep.cpp +++ b/csrc/deep_ep.cpp @@ -12,12 +12,8 @@ namespace deep_ep { -Buffer::Buffer(int rank, int num_ranks, int64_t num_nvl_bytes, int64_t num_rdma_bytes, bool low_latency_mode, bool explicitly_destroy): - rank(rank), num_ranks(num_ranks), - num_nvl_bytes(num_nvl_bytes), num_rdma_bytes(num_rdma_bytes), - low_latency_mode(low_latency_mode), - explicitly_destroy(explicitly_destroy), - comm_stream(at::cuda::getStreamFromPool(true)) { +Buffer::Buffer(int rank, int num_ranks, int64_t num_nvl_bytes, int64_t num_rdma_bytes, bool low_latency_mode, bool explicitly_destroy) + : num_nvl_bytes(num_nvl_bytes), num_rdma_bytes(num_rdma_bytes), low_latency_mode(low_latency_mode), explicitly_destroy(explicitly_destroy) { // Metadata memory int64_t barrier_signal_bytes = NUM_MAX_NVL_PEERS * sizeof(int); int64_t buffer_ptr_bytes = NUM_MAX_NVL_PEERS * sizeof(void*); @@ -59,6 +55,8 @@ Buffer::Buffer(int rank, int num_ranks, int64_t num_nvl_bytes, int64_t num_rdma_ } // Create 32 MiB workspace +} + CUDA_CHECK(cudaMalloc(&workspace, NUM_WORKSPACE_BYTES)); CUDA_CHECK(cudaMemsetAsync(workspace, 0, NUM_WORKSPACE_BYTES, comm_stream)); @@ -240,8 +238,17 @@ void Buffer::sync(const std::vector &device_ids, } std::tuple, torch::Tensor, torch::Tensor, std::optional> -Buffer::get_dispatch_layout(const torch::Tensor& topk_idx, int num_experts, - std::optional& previous_event, bool async, bool allocate_on_comm_stream) { +Buffer::get_dispatch_layout(const torch::Tensor& topk_idx, + int num_experts, std::optional& previous_event, + bool async, bool allocate_on_comm_stream) { + // Track maximum tokens observed for dynamic buffer resizing + // This is part of the "Easier Potential Overall Design" implementation + // Instead of using queues, we track the maximum tokens to inform buffer allocation + int num_tokens = static_cast(topk_idx.size(0)); + if (dynamic_buffer_resize && num_tokens > max_tokens_observed) { + max_tokens_observed = num_tokens; + } + EP_HOST_ASSERT(topk_idx.dim() == 2); EP_HOST_ASSERT(topk_idx.is_contiguous()); EP_HOST_ASSERT(num_experts > 0); diff --git a/csrc/deep_ep.hpp b/csrc/deep_ep.hpp index ebe3a863..c874fc69 100644 --- a/csrc/deep_ep.hpp +++ b/csrc/deep_ep.hpp @@ -75,6 +75,15 @@ struct Buffer { // Host-side RDMA-level MoE info volatile int* moe_recv_rdma_counter = nullptr; int* moe_recv_rdma_counter_mapped = nullptr; + + // Dynamic buffer resizing support + // This implements the "Easier Potential Overall Design" from issue #39 + // Instead of using queues for memory efficiency, we allocate buffers based on maximum token counts + // This simplifies the design and eliminates potential deadlocks from queue-based communication + bool dynamic_buffer_resize = false; + int max_tokens_observed = 0; + int buffer_expansion_count = 0; + int buffer_contraction_count = 0; public: Buffer(int rank, int num_ranks, int64_t num_nvl_bytes, int64_t num_rdma_bytes, bool low_latency_mode, bool explicitly_destroy); @@ -105,6 +114,13 @@ struct Buffer { void destroy(); + // Functions for dynamic buffer resizing + // Implements the approach suggested in issue #39 for simpler design + void enable_dynamic_buffer_resize(bool enable); + bool is_dynamic_buffer_resize_enabled() const; + int get_max_tokens_observed() const; + void reset_max_tokens_observed(); + std::tuple, torch::Tensor, torch::Tensor, std::optional> get_dispatch_layout(const torch::Tensor& topk_idx, int num_experts, std::optional& previous_event, bool async, bool allocate_on_comm_stream); @@ -161,4 +177,4 @@ struct Buffer { get_next_low_latency_combine_buffer(int num_max_dispatch_tokens_per_rank, int hidden, int num_experts) const; }; -} // namespace deep_ep +} // namespace deep_ep \ No newline at end of file diff --git a/deep_ep/buffer.py b/deep_ep/buffer.py index e5c233fa..7204f5ff 100644 --- a/deep_ep/buffer.py +++ b/deep_ep/buffer.py @@ -34,7 +34,8 @@ def __init__(self, group: dist.ProcessGroup, low_latency_mode: bool = False, num_qps_per_rank: int = 24, allow_nvlink_for_low_latency_mode: bool = True, allow_mnnvl: bool = False, - explicitly_destroy: bool = False) -> None: + explicitly_destroy: bool = False, + dynamic_buffer_resize: bool = False) -> None: """ Initialize the communication buffer. @@ -53,6 +54,9 @@ def __init__(self, group: dist.ProcessGroup, explicitly_destroy: If this flag is set to True, you need to explicitly call `destroy()` to release resources; otherwise, the resources will be released by the destructor. Note: Releasing resources in the destructor may cause Python's exception handling process to hang. + dynamic_buffer_resize: Enable dynamic buffer resizing based on actual usage. This simplifies + the design by allocating buffers based on maximum possible tokens rather than using queues. + Refer to https://github.com/deepseek-ai/DeepEP/issues/39 for more details. """ check_nvlink_connections(group) @@ -64,6 +68,7 @@ def __init__(self, group: dist.ProcessGroup, self.num_rdma_bytes = num_rdma_bytes self.low_latency_mode = low_latency_mode self.explicitly_destroy = explicitly_destroy + self.dynamic_buffer_resize = dynamic_buffer_resize self.runtime = deep_ep_cpp.Buffer(self.rank, self.group_size, num_nvl_bytes, num_rdma_bytes, low_latency_mode, explicitly_destroy) # Synchronize device IDs