Skip to content

Conversation

@SandishKumarHN
Copy link

@SandishKumarHN SandishKumarHN commented Nov 29, 2025

Purpose

  • Add support for timeouts when calling vllm.v1.executor.multiproc_executor.MultiprocExecutor.collective_rpc and when awaiting futures returned by non-blocking RPCs.
  • Treat a provided timeout as an overall deadline across response MessageQueues when collecting responses.
  • Add FutureWrapper.result(timeout=...) and propagate the timeout into FutureWrapper.wait_for_response(get_response, timeout=...) so user code can wait on the Future with an overall timeout.
  • This change makes debugging and handling stuck RPCs easier by allowing bounded waits for remote workers.
  • Related issue: [Feature]: rpc timeout #29390

Test Plan

I validated the new behavior locally in a lightweight, standalone script that emulates the FutureWrapper and get_response semantics (no vLLM or C extensions required).

Quick manual verification steps you can run locally:

Small standalone test script

import time
from collections import deque
from concurrent.futures import TimeoutError

# Minimal FutureWrapper reproduction (same semantics as in multiproc_executor.py)
from concurrent.futures import Future, InvalidStateError
from contextlib import suppress

class FutureWrapper(Future):
    def __init__(self, futures_queue, aggregate=lambda x: x):
        self.futures_queue = futures_queue
        self.aggregate = aggregate
        super().__init__()

    def result(self, timeout=None):
        deadline = None if timeout is None else time.monotonic() + timeout
        while not self.done():
            future, get_response = self.futures_queue.pop()
            remaining = None if deadline is None else (deadline - time.monotonic())
            if remaining is not None and remaining <= 0:
                raise TimeoutError("timeout while waiting for pending RPC futures")
            future.wait_for_response(get_response, timeout=remaining)
        return super().result()

    def wait_for_response(self, get_response, timeout=None):
        try:
            try:
                response = self.aggregate(get_response(timeout=timeout))
            except TypeError:
                response = self.aggregate(get_response())
            with suppress(InvalidStateError):
                self.set_result(response)
        except Exception as e:
            with suppress(InvalidStateError):
                self.set_exception(e)

# Test helpers
futures_q = deque()
fut = FutureWrapper(futures_q)
# Simulate a get_response that sleeps longer than the timeout
def slow_response(timeout=None):
    time.sleep(0.2)
    return "ok"

futures_q.appendleft((fut, slow_response))

try:
    fut.result(timeout=0.05)
    print("Unexpected success")
except TimeoutError:
    print("Timeout raised as expected")

# Now a fast response
futures_q.appendleft((FutureWrapper(futures_q), lambda timeout=None: "fast"))
print("Fast response OK")

Test Result

  • Lightweight standalone validation: I ran a standalone script that reproduces the FutureWrapper semantics — it demonstrated the timeout path and success path as expected.

  • Integration test note: Running the full repository integration tests (e.g., test_collective_rpc.py)


Essential Elements of an Effective PR Description Checklist
  • The purpose of the PR, such as "Fix some issue (link existing issues this PR will resolve)".
  • The test plan, such as providing test command.
  • The test results, such as pasting the results comparison before and after, or e2e results
  • (Optional) The necessary documentation update, such as updating supported_models.md and examples for a new model.
  • (Optional) Release notes update. If your change is user facing, please update the release notes draft in the Google Doc.

@chatgpt-codex-connector
Copy link

Codex usage limits have been reached for code reviews. Please check with the admins of this repo to increase the limits by adding credits.

@github-actions
Copy link

👋 Hi! Thank you for contributing to the vLLM project.

💬 Join our developer Slack at https://slack.vllm.ai to discuss your PR in #pr-reviews, coordinate on features in #feat- channels, or join special interest groups in #sig- channels.

Just a reminder: PRs would not trigger full CI run by default. Instead, it would only run fastcheck CI which starts running only a small and essential subset of CI tests to quickly catch errors.

You ask your reviewers to trigger select CI tests on top of fastcheck CI.

Once the PR is approved and ready to go, your PR reviewer(s) can run CI to test the changes comprehensively before merging.

To run CI, PR reviewers can either: Add ready label to the PR or enable auto-merge.

If you have any questions, please reach out to us on Slack at https://slack.vllm.ai.

🚀

@mergify mergify bot added the v1 label Nov 29, 2025
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces timeout support for MultiprocExecutor.collective_rpc and FutureWrapper, which is a valuable addition for handling stuck RPCs and improving debuggability. The implementation correctly propagates timeouts for both blocking and non-blocking calls. I've found a critical issue where a negative timeout could be passed to the underlying ZMQ socket, causing it to wait indefinitely instead of timing out. I've provided a suggestion to fix this by explicitly checking for non-positive timeout values before making the call. Otherwise, the changes look good and align with the goal of the pull request.

…pper (squashed)

Treat a provided timeout as an overall deadline across response MessageQueues when collecting responses; implement timeout-aware FutureWrapper.result(timeout) and FutureWrapper.wait_for_response(get_response, timeout).

Fixes: vllm-project#29390
Signed-off-by: sandishkumarhn <sanysandish@gmail.com>
@SandishKumarHN SandishKumarHN changed the title Add timeout support to MultiprocExecutor.collective_rpc and FutureWrapper (fixes #29390) [Feature][#29390]: Add timeout support to MultiprocExecutor.collective_rpc and FutureWrapper Nov 29, 2025
@SandishKumarHN SandishKumarHN mentioned this pull request Nov 29, 2025
1 task
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant