Skip to content

Commit d87fc7e

Browse files
Add timeout support to MultiprocExecutor.collective_rpc and FutureWrapper (squashed)
Treat a provided timeout as an overall deadline across response MessageQueues when collecting responses; implement timeout-aware FutureWrapper.result(timeout) and FutureWrapper.wait_for_response(get_response, timeout). Fixes: #29390 Signed-off-by: sandishkumarhn <sanysandish@gmail.com>
1 parent fecae12 commit d87fc7e

File tree

1 file changed

+26
-9
lines changed

1 file changed

+26
-9
lines changed

vllm/v1/executor/multiproc_executor.py

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,17 +71,29 @@ def __init__(
7171
super().__init__()
7272

7373
def result(self, timeout=None):
74-
if timeout is not None:
75-
raise RuntimeError("timeout not implemented")
74+
# Support optional timeout for awaiting the result of this future.
75+
deadline = None if timeout is None else time.monotonic() + timeout
76+
7677
# Drain any futures ahead of us in the queue.
7778
while not self.done():
7879
future, get_response = self.futures_queue.pop()
79-
future.wait_for_response(get_response)
80+
81+
# Compute remaining time for this drain step.
82+
remaining = None if deadline is None else (deadline - time.monotonic())
83+
if remaining is not None and remaining <= 0:
84+
raise TimeoutError("timeout while waiting for pending RPC futures")
85+
future.wait_for_response(get_response, timeout=remaining)
86+
8087
return super().result()
8188

82-
def wait_for_response(self, get_response: Callable):
89+
def wait_for_response(self, get_response: Callable, timeout: float | None = None):
8390
try:
84-
response = self.aggregate(get_response())
91+
# Try calling get_response with a timeout parameter if it accepts one.
92+
try:
93+
response = self.aggregate(get_response(timeout=timeout))
94+
except TypeError:
95+
# Fallback for callables that don't accept timeout.
96+
response = self.aggregate(get_response())
8597
with suppress(InvalidStateError):
8698
self.set_result(response)
8799
except Exception as e:
@@ -328,12 +340,17 @@ def collective_rpc( # type: ignore[override]
328340

329341
shutdown_event = self.shutdown_event
330342

331-
def get_response():
343+
def get_response(timeout: float | None = None):
332344
responses = []
345+
start = time.monotonic()
333346
for mq in response_mqs:
334-
dequeue_timeout = (
335-
None if deadline is None else (deadline - time.monotonic())
336-
)
347+
# If a specific timeout was provided to this call, treat it as
348+
# an overall timeout for collecting responses from all MQs.
349+
if timeout is not None:
350+
elapsed = time.monotonic() - start
351+
dequeue_timeout = None if timeout is None else (timeout - elapsed)
352+
else:
353+
dequeue_timeout = None if deadline is None else (deadline - time.monotonic())
337354
try:
338355
status, result = mq.dequeue(
339356
timeout=dequeue_timeout, cancel=shutdown_event

0 commit comments

Comments
 (0)