
[feat]support rollout data transfer with mooncake #591

Open

zxpdemonio wants to merge 5 commits into radixark:main from zxpdemonio:cruz/mooncake

Conversation

@zxpdemonio

  • Add a Mooncake backend for rollout data transfer
  • Use zero-copy put_from/get_into for better performance
  • Automatically clean up data after retrieval to free storage (see the usage sketch below)
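
For reviewers' orientation, a minimal usage sketch of the pluggable backend. The factory name get_data_transfer_backend and the put/get methods come from this PR's changelog; the argument names and call sites are assumptions, not the PR's exact code.

```python
# Hypothetical sketch only: get_data_transfer_backend and put/get come from
# this PR; args and rollout_batch are illustrative placeholders.
from miles.utils.data_transfer import get_data_transfer_backend

transfer_backend = get_data_transfer_backend(args)  # Ray object store by default, Mooncake if configured

ref = transfer_backend.put(rollout_batch)   # rollout side: store a batch, get back a reference
batch = transfer_backend.get(ref)           # trainer side: retrieve it; Mooncake schedules delayed cleanup
```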

XucSh and others added 5 commits February 12, 2026 15:30
Signed-off-by: Xuchun Shang <xuchun.shang@linux.alibaba.com>
Signed-off-by: Cruz Zhao <CruzZhao@linux.alibaba.com>
Signed-off-by: Cruz Zhao <CruzZhao@linux.alibaba.com>
Signed-off-by: Cruz Zhao <CruzZhao@linux.alibaba.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
Signed-off-by: Cruz Zhao <CruzZhao@linux.alibaba.com>
@gemini-code-assist
Contributor

Summary of Changes

Hello @zxpdemonio, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly upgrades the system's data transfer capabilities by integrating Mooncake as an alternative backend. This integration is designed to boost performance through zero-copy data operations and optimize storage usage with automatic cleanup. The changes provide a more flexible and efficient way to manage rollout data, particularly beneficial for large-scale distributed training environments.

Highlights

  • Mooncake Data Transfer Backend: Introduced a new MooncakeDataTransfer backend for efficient rollout data transfer, leveraging mooncake.store.MooncakeDistributedStore.
  • Zero-Copy Operations: Implemented zero-copy put_from and get_into methods within the Mooncake backend to enhance data transfer performance by avoiding unnecessary data copying.
  • Automatic Data Cleanup: Added an asynchronous, delayed cleanup mechanism for Mooncake, which automatically removes transferred data from storage after retrieval to free up resources.
  • Pluggable Data Transfer: Refactored data handling to support a pluggable data transfer backend, allowing users to choose between the default Ray object store and the new Mooncake implementation via a configuration argument (the interface is sketched below).
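
A hedged sketch of the abstraction these highlights describe. The class name DataTransferBackend and the put/get pair come from the changelog below; the exact signatures are assumptions.

```python
# Illustrative sketch of the abstraction, not the PR's exact code.
from abc import ABC, abstractmethod
from typing import Any


class DataTransferBackend(ABC):
    @abstractmethod
    def put(self, data: Any) -> Any:
        """Store data and return a backend-specific reference."""

    @abstractmethod
    def get(self, ref: Any) -> Any:
        """Retrieve data previously stored under ref."""
```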
Changelog
  • miles/backends/megatron_utils/actor.py
    • Imported get_data_transfer_backend to dynamically select the data transfer mechanism.
    • Initialized a transfer_backend instance based on configuration arguments.
    • Modified the get_rollout_data call to pass the newly initialized transfer_backend.
  • miles/backends/training_utils/data.py
    • Updated the get_rollout_data function signature to accept a transfer_backend argument.
    • Modified process_rollout_data to utilize the provided transfer_backend for data retrieval, replacing direct Ray calls.
    • Simplified multi-line tensor assignments and logging statements for improved readability.
  • miles/ray/rollout.py
    • Imported get_data_transfer_backend for backend selection.
    • Initialized a transfer_backend instance within the RolloutManager.
    • Replaced ray.put calls with self.transfer_backend.put for storing rollout data.
    • Refactored several multi-line function calls and assertions for conciseness.
  • miles/utils/data.py
    • Refactored multi-line logging and assertion statements for improved code formatting.
  • miles/utils/data_transfer.py
    • Added a new file defining DataTransferBackend abstract class and its implementations.
    • Implemented RayDataTransfer for standard Ray object store operations.
    • Implemented MooncakeDataTransfer with zero-copy put_from/get_into and an asynchronous cleanup thread.
    • Included MooncakeStoreConfig for loading Mooncake configuration from environment variables (a minimal sketch follows the changelog).
    • Provided a get_data_transfer_backend factory function to select the appropriate backend.
  • scripts/run-qwen3-0.6b.sh
    • Added a new shell script to configure and launch a Qwen3-0.6B model training job.
    • Included environment variable settings for Mooncake to demonstrate its integration.
    • Configured Ray job submission with various model, rollout, optimizer, and performance arguments.
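
As referenced above, a minimal sketch of env-driven configuration loading. The class name MooncakeStoreConfig comes from the changelog; the fields and MOONCAKE_* variable names are assumptions, not the PR's actual keys.

```python
# Hypothetical sketch: field names and MOONCAKE_* variable names are assumptions.
import os
from dataclasses import dataclass


@dataclass
class MooncakeStoreConfig:
    master_server_address: str
    local_buffer_size: int

    @classmethod
    def from_env(cls) -> "MooncakeStoreConfig":
        return cls(
            master_server_address=os.environ.get("MOONCAKE_MASTER", "localhost:50051"),
            local_buffer_size=int(os.environ.get("MOONCAKE_LOCAL_BUFFER_SIZE", str(1 << 30))),
        )
```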
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

| Feature | Command | Description |
| --- | --- | --- |
| Code Review | /gemini review | Performs a code review for the current pull request in its current state. |
| Pull Request Summary | /gemini summary | Provides a summary of the current pull request in its current state. |
| Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in pull request comments and review comments. |
| Help | /gemini help | Displays a list of available commands. |

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double-check it and use code with caution.

zxpdemonio changed the title from "support rollout data transfer with mooncake" to "[feat]support rollout data transfer with mooncake" on Feb 12, 2026

@gemini-code-assist (bot) left a comment


Code Review

This pull request introduces a data transfer abstraction layer with a new Mooncake backend that improves performance through zero-copy transfers and automatic data cleanup. However, it also introduces a significant security risk: it uses the pickle library for serialization and deserialization, which can lead to remote code execution (RCE) if the data source is untrusted or compromised, especially given that Mooncake is a distributed store. I have also identified a potential race condition in the cleanup thread management of the MooncakeDataTransfer class, an opportunity to make the cleanup worker more efficient, and a few places where type hints would improve clarity.

```python
serialized_data = bytes(buffer[:bytes_read])

# Deserialize and return
data = pickle.loads(serialized_data)
```

Severity: high (security)

The use of pickle.loads on data retrieved from an external distributed store (MooncakeDistributedStore) is insecure. pickle can be exploited to execute arbitrary code if the input data is maliciously crafted. Since Mooncake is a distributed storage system, an attacker who can write to the store or intercept network traffic could achieve Remote Code Execution (RCE) on the training actors.

To remediate this, consider using a safer serialization format like json for non-tensor data and safetensors for tensor data. If you must use pickle, you should implement a message authentication code (MAC), such as HMAC with a shared secret, to verify the integrity and authenticity of the data before calling pickle.loads.
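
A minimal sketch of the HMAC remediation, using only the standard library. The key name and its provisioning are assumptions; a real deployment needs a secure way to distribute the shared secret to all actors.

```python
# Sketch of HMAC-authenticated pickling (standard library only).
# SECRET_KEY provisioning is an assumption; distribute it securely in practice.
import hashlib
import hmac
import pickle

SECRET_KEY = b"shared-secret-provisioned-out-of-band"


def sign_and_serialize(obj) -> bytes:
    payload = pickle.dumps(obj)
    mac = hmac.new(SECRET_KEY, payload, hashlib.sha256).digest()
    return mac + payload  # 32-byte MAC prefix, then the pickle payload


def verify_and_deserialize(blob: bytes):
    mac, payload = blob[:32], blob[32:]
    expected = hmac.new(SECRET_KEY, payload, hashlib.sha256).digest()
    if not hmac.compare_digest(mac, expected):
        raise ValueError("MAC mismatch: refusing to unpickle untrusted data")
    return pickle.loads(payload)
```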

Comment on lines +327 to +328

```python
if self._cleanup_thread is None or not self._cleanup_thread.is_alive():
    self._start_cleanup_thread()
```

Severity: high

There is a potential race condition here. The check for the cleanup thread's existence and liveness is not atomic with the call to _start_cleanup_thread(). If multiple threads call get() concurrently and find that the cleanup thread is not running, they might all attempt to start it. This can lead to a RuntimeError: threads can only be started once.

You should use a threading.Lock to protect this block of code to ensure the thread is started only once. You'll need to add self._cleanup_thread_lock = threading.Lock() in the __init__ method.

Suggested change

```diff
-if self._cleanup_thread is None or not self._cleanup_thread.is_alive():
-    self._start_cleanup_thread()
+with self._cleanup_thread_lock:
+    if self._cleanup_thread is None or not self._cleanup_thread.is_alive():
+        self._start_cleanup_thread()
```
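
For completeness, a self-contained sketch of the pattern the suggestion relies on. The class and worker body are illustrative stand-ins, not the PR's code.

```python
# Illustrative sketch of race-free lazy thread start; names are stand-ins.
import threading
import time


class CleanupOwner:
    def __init__(self):
        self._cleanup_thread = None
        self._cleanup_thread_lock = threading.Lock()
        self._stop = threading.Event()

    def _worker(self):
        while not self._stop.is_set():
            time.sleep(0.5)  # stand-in for the real cleanup loop

    def ensure_cleanup_thread(self):
        # The lock makes check-then-start atomic: concurrent callers cannot
        # both observe a missing thread and both call Thread.start().
        with self._cleanup_thread_lock:
            if self._cleanup_thread is None or not self._cleanup_thread.is_alive():
                self._cleanup_thread = threading.Thread(target=self._worker, daemon=True)
                self._cleanup_thread.start()
```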



```diff
-def get_rollout_data(args: Namespace, rollout_data_ref: Box, parallel_state: ParallelState) -> RolloutBatch:
+def get_rollout_data(args: Namespace, rollout_data_ref, parallel_state: ParallelState, transfer_backend) -> RolloutBatch:
```

Severity: medium

The type hints for rollout_data_ref and transfer_backend are missing. Adding them would improve code clarity and maintainability. Based on the new data_transfer module, transfer_backend should be of type DataTransferBackend. The type of rollout_data_ref now depends on the backend, so Any would be appropriate.

You will need to add the following imports:

```python
from typing import Any

from miles.utils.data_transfer import DataTransferBackend
```

Suggested change

```diff
-def get_rollout_data(args: Namespace, rollout_data_ref, parallel_state: ParallelState, transfer_backend) -> RolloutBatch:
+def get_rollout_data(args: Namespace, rollout_data_ref: "Any", parallel_state: ParallelState, transfer_backend: "DataTransferBackend") -> RolloutBatch:
```



```diff
-def process_rollout_data(args, rollout_data_ref, dp_rank, dp_size):
+def process_rollout_data(args, rollout_data_ref, dp_rank, dp_size, transfer_backend):
```

Severity: medium

This function is missing type hints for its arguments. Adding them would improve readability and help static analysis tools. Using forward references in strings for types that are not imported is a good practice.

You will need to add the following imports:

```python
from argparse import Namespace

from miles.utils.data_transfer import DataTransferBackend
```

Suggested change

```diff
-def process_rollout_data(args, rollout_data_ref, dp_rank, dp_size, transfer_backend):
+def process_rollout_data(args: "Namespace", rollout_data_ref: list, dp_rank: int, dp_size: int, transfer_backend: "DataTransferBackend"):
```

Comment on lines +181 to +235
```python
def _cleanup_worker(self):
    """Background worker that periodically deletes keys."""
    while not self._cleanup_stop_event.is_set():
        try:
            # Collect keys ready for deletion
            current_time = time.time()
            keys_to_delete = []
            keys_not_ready = []

            # Process pending deletions
            while len(keys_to_delete) < self.cleanup_batch_size:
                try:
                    # Non-blocking get with timeout
                    key, deletion_time = self._pending_deletion.get(timeout=0.1)

                    if current_time >= deletion_time:
                        keys_to_delete.append(key)
                    else:
                        # Keep track of keys not ready yet
                        keys_not_ready.append((key, deletion_time))
                except queue.Empty:
                    break

            # Put back keys that are not ready yet
            for key, deletion_time in keys_not_ready:
                self._pending_deletion.put((key, deletion_time))

            # Batch delete keys
            if keys_to_delete:
                deleted_count = 0
                for key in keys_to_delete:
                    try:
                        result = self.store.remove(key)
                        if result == 0:
                            deleted_count += 1
                        else:
                            logger.warning(f"Failed to delete key {key}, error code: {result}")
                    except Exception as e:
                        logger.warning(f"Exception while deleting key {key}: {e}")

                if deleted_count > 0:
                    logger.debug(f"Deleted {deleted_count} keys from Mooncake store")

            # Sleep a bit to avoid busy waiting
            # If we have keys not ready, sleep until the earliest one is ready
            if keys_not_ready:
                earliest_time = min(dt for _, dt in keys_not_ready)
                sleep_time = min(0.5, max(0.1, earliest_time - current_time))
                time.sleep(sleep_time)
            else:
                time.sleep(0.5)

        except Exception as e:
            logger.error(f"Error in cleanup worker: {e}", exc_info=True)
            time.sleep(1.0)  # Sleep longer on error
```

Severity: medium

The _cleanup_worker uses a standard queue.Queue with a polling mechanism (get(timeout=0.1)) to find keys ready for deletion. This approach is inefficient as it involves busy-waiting when the queue is empty and re-queueing items that are not yet ready, consuming unnecessary CPU cycles.

A queue.PriorityQueue would be a more efficient data structure for this task. By storing items with their deletion time as the priority, the worker thread can block on get() to wait for the next item due for deletion and then sleep until its deletion time. This avoids polling and improves efficiency.
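
A hedged sketch of that approach. store.remove, the attribute names, and the logging follow the code above; the (deletion_time, key) ordering inside the PriorityQueue is an assumption about how items would be enqueued.

```python
# Sketch of a PriorityQueue-based worker; assumes self._pending_deletion is a
# queue.PriorityQueue holding (deletion_time, key) tuples, so get() always
# returns the earliest-due item first.
import logging
import queue
import time

logger = logging.getLogger(__name__)


def _cleanup_worker(self):
    while not self._cleanup_stop_event.is_set():
        try:
            deletion_time, key = self._pending_deletion.get(timeout=0.5)
        except queue.Empty:
            continue  # block on get() instead of polling and re-queueing
        delay = deletion_time - time.time()
        if delay > 0:
            # Sleep until the item is due, waking early if asked to stop.
            if self._cleanup_stop_event.wait(delay):
                return
        try:
            result = self.store.remove(key)
            if result != 0:
                logger.warning(f"Failed to delete key {key}, error code: {result}")
        except Exception as e:
            logger.warning(f"Exception while deleting key {key}: {e}")
```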
