From 25a31d5ee7a2a0a52085c9e219069bf12409a9b5 Mon Sep 17 00:00:00 2001 From: littlegy <787321726@qq.com> Date: Mon, 24 Nov 2025 17:46:37 +0800 Subject: [PATCH 1/5] TEST: update distributed test --- autotest/config-h.yaml | 7 + autotest/conftest.py | 33 ++ autotest/evaluate/test_api_evaluate.py | 74 +++++ .../test_restful_chat_hf_pytorch_llm.py | 45 +++ .../test_restful_chat_hf_turbomind_llm.py | 45 +++ autotest/utils/distributed_utils.py | 302 ++++++++++++++++++ 6 files changed, 506 insertions(+) create mode 100644 autotest/utils/distributed_utils.py diff --git a/autotest/config-h.yaml b/autotest/config-h.yaml index afe6fde07b..0a7a9bfb4e 100644 --- a/autotest/config-h.yaml +++ b/autotest/config-h.yaml @@ -19,6 +19,7 @@ tp_config: DeepSeek-V3.1: 8 Qwen3-30B-A3B-Base: 2 Qwen2.5-32B-Instruct: 2 + Kimi-K2-Instruct-0905: 16 turbomind_chat_model: - internlm/Intern-S1 @@ -36,6 +37,7 @@ turbomind_chat_model: - Qwen/Qwen3-32B-FP8 - openai/gpt-oss-120b - openai/gpt-oss-20b + - moonshotai/Kimi-K2-Instruct-0905 pytorch_chat_model: - internlm/Intern-S1 @@ -54,6 +56,7 @@ pytorch_chat_model: - unsloth/gpt-oss-120b-BF16 - unsloth/gpt-oss-20b-BF16 - deepseek/DeepSeek-V3.1 + - moonshotai/Kimi-K2-Instruct-0905 turbomind_vl_model: - internlm/Intern-S1 @@ -89,6 +92,7 @@ turbomind_quatization: - Qwen/Qwen3-32B-FP8 - openai/gpt-oss-120b - openai/gpt-oss-20b + - moonshotai/Kimi-K2-Instruct-0905 gptq: - empty no_kvint4: @@ -107,6 +111,7 @@ turbomind_quatization: - Qwen/Qwen3-32B-FP8 - openai/gpt-oss-120b - openai/gpt-oss-20b + - moonshotai/Kimi-K2-Instruct-0905 no_kvint8: - empty @@ -129,6 +134,7 @@ pytorch_quatization: - Qwen/Qwen3-30B-A3B-FP8 - Qwen/Qwen3-32B - Qwen/Qwen3-32B-FP8 + - moonshotai/Kimi-K2-Instruct-0905 no_kvint8: - empty @@ -167,3 +173,4 @@ evaluate_model: - unsloth/gpt-oss-120b-BF16 - unsloth/gpt-oss-20b-BF16 - deepseek/DeepSeek-V3.1 + - moonshotai/Kimi-K2-Instruct-0905 diff --git a/autotest/conftest.py b/autotest/conftest.py index e4c23c13be..618dd3a87a 100644 --- a/autotest/conftest.py +++ b/autotest/conftest.py @@ -2,11 +2,14 @@ import pytest import yaml +from utils.distributed_utils import RayLMDeployManager cli_prompt_case_file = 'autotest/chat_prompt_case.yaml' common_prompt_case_file = 'autotest/prompt_case.yaml' config_file = 'autotest/config.yaml' +PROXY_PORT = 8000 + @pytest.fixture(scope='session') def config(): @@ -43,6 +46,36 @@ def common_case_config(): return case_config +@pytest.fixture(scope='session') +def shared_ray_manager(): + master_addr = os.getenv('MASTER_ADDR', 'localhost') + device = os.environ.get('DEVICE', '') + if device: + device_config_path = f'autotest/config-{device}.yaml' + if os.path.exists(device_config_path): + config_path = device_config_path + else: + config_path = config_file + else: + config_path = config_file + + with open(config_path) as f: + env_config = yaml.load(f.read(), Loader=yaml.SafeLoader) + log_dir = env_config.get('log_path', '/tmp/lmdeploy_test') + + manager = RayLMDeployManager(master_addr=master_addr, api_port=PROXY_PORT, log_dir=log_dir, health_check=True) + + manager.start_ray_cluster() + + if manager.is_master: + print('๐ŸŽฏ Master node: Ray cluster started, waiting for worker nodes to join...') + + yield manager + + print(f'\n[Final Cleanup] Node {manager.node_rank} performing final resource cleanup...') + manager.cleanup(force=True) + + def pytest_addoption(parser): parser.addoption('--run_id', action='store', default='', help='github run_id') parser.addoption('--device', action='store', default='', help='device config suffix') diff 
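For orientation, the session-scoped `shared_ray_manager` fixture above is what the distributed tests added later in this series consume. A minimal sketch of the intended usage pattern, with a hypothetical test name and model entry (the real tests below follow the same master/worker split and use the same fixture and helper names):

```python
import os

import pytest
from utils.distributed_utils import worker_node_wait


# Hypothetical consumer of the shared_ray_manager fixture; the model entry is
# illustrative (Kimi-K2-Instruct-0905 is configured with tp 16 in config-h.yaml).
@pytest.mark.parametrize('model_param', [{'model': 'moonshotai/Kimi-K2-Instruct-0905', 'tp_num': 16}])
def test_distributed_sketch(shared_ray_manager, config, model_param, worker_id):
    manager = shared_ray_manager
    if manager.is_master:
        # Only rank 0 launches the api_server and runs the actual assertions.
        model_path = os.path.join(config['model_path'], model_param['model'])
        manager.start_lmdeploy_api_server(model_path=model_path, model_param=model_param)
        try:
            pass  # run_all_step / restful_test against the shared port goes here
        finally:
            # force=False stops only the api_server; the Ray cluster stays up
            # for the next parametrized model.
            manager.cleanup(force=False)
    else:
        # Worker nodes idle until the Ray head node disappears.
        worker_node_wait(manager, timeout_minutes=60)
```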
--git a/autotest/evaluate/test_api_evaluate.py b/autotest/evaluate/test_api_evaluate.py index 05bc3c1f2d..8e68c741fc 100644 --- a/autotest/evaluate/test_api_evaluate.py +++ b/autotest/evaluate/test_api_evaluate.py @@ -1,5 +1,9 @@ +import os +import time + import pytest from utils.config_utils import get_evaluate_pytorch_model_list, get_evaluate_turbomind_model_list, get_workerid +from utils.distributed_utils import worker_node_wait from utils.evaluate_utils import restful_test from utils.run_restful_chat import start_proxy_server, start_restful_api, stop_restful_api @@ -75,6 +79,48 @@ def prepare_environment_judge_evaluate(request, config, worker_id): stop_restful_api(proxy_pid, proxy_process, request.param) +def _run_distributed_test( + config, + run_id, + model_param, + worker_id, + test_type='infer', + manager=None, # โ† New parameter: pass in shared manager + eval_config_name='default'): + """Universal distributed test executor (using shared Ray cluster)""" + assert manager is not None, 'Manager instance must be provided' + if 'gpt' in model_param.get('model', '').lower(): + eval_config_name = 'gpt' + preset_config = EVAL_CONFIGS.get(eval_config_name, {}) + + if manager.is_master: + model_name = model_param['model'] + model_path = os.path.join(config['model_path'], model_name) + preset_config = EVAL_CONFIGS.get(eval_config_name, {}) + + # Start API Server for current model (master node starts/stops, worker nodes verify) + manager.start_lmdeploy_api_server(model_path=model_path, model_param=model_param) + + try: + print(f'๐Ÿงช Master node executing {test_type} test ({eval_config_name})...') + result, msg = restful_test(config, + run_id, + model_param, + worker_id=worker_id, + port=PROXY_PORT, + test_type=test_type, + **preset_config) + assert result, f'โŒ {test_type} test failed: {msg}' + print(f'โœ… {test_type} test passed') + + finally: + # Clean up API Server for current model (worker nodes skip) + manager.cleanup(force=False) + else: + time.sleep(10) + worker_node_wait(manager, timeout_minutes=4880) + + def get_turbomind_model_list(tp_num): model_list = get_evaluate_turbomind_model_list(tp_num, kvint_list=[4, 8]) new_model_list = [] @@ -165,6 +211,20 @@ def test_turbomind_restful_tp8(config, run_id, prepare_environment, worker_id): assert result, msg +@pytest.mark.infer +@pytest.mark.turbomind +@pytest.mark.gpu_num_16 +@pytest.mark.flaky(reruns=0) +@pytest.mark.parametrize('model_param', get_turbomind_model_list(tp_num=16)) +def test_turbomind_restful_distributed_tp16(shared_ray_manager, config, run_id, model_param, worker_id): + _run_distributed_test(config=config, + run_id=run_id, + model_param=model_param, + worker_id=worker_id, + test_type='infer', + manager=shared_ray_manager) + + @pytest.mark.infer @pytest.mark.pytorch @pytest.mark.gpu_num_1 @@ -220,6 +280,20 @@ def test_pytorch_restful_tp16(config, run_id, prepare_environment, worker_id): assert result, msg +@pytest.mark.infer +@pytest.mark.pytorch +@pytest.mark.gpu_num_16 +@pytest.mark.flaky(reruns=0) +@pytest.mark.parametrize('model_param', get_pytorch_model_list(tp_num=16)) +def test_pytorch_restful_distributed_tp16(shared_ray_manager, config, run_id, model_param, worker_id): + _run_distributed_test(config=config, + run_id=run_id, + model_param=model_param, + worker_id=worker_id, + test_type='infer', + manager=shared_ray_manager) + + @pytest.mark.eval @pytest.mark.pytorch @pytest.mark.gpu_num_1 diff --git a/autotest/tools/restful/test_restful_chat_hf_pytorch_llm.py 
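The distributed tp16 tests above are intended to run the same pytest invocation on every node, with master/worker roles decided by the environment variables that `RayLMDeployManager` reads (NODE_RANK, NODE_COUNT, MASTER_ADDR) and that `conftest.py` uses to pick the device config (DEVICE). A hedged per-node launcher sketch; the address, marker expression and run id are illustrative values only:

```python
import os
import subprocess

# Same command on every node; NODE_RANK decides master vs. worker behaviour.
env = dict(
    os.environ,
    MASTER_ADDR='10.0.0.1',   # illustrative address of the rank-0 node
    NODE_RANK='0',            # 0 on the master, 1..N-1 on the workers
    NODE_COUNT='2',           # total number of nodes in the job
    DEVICE='h',               # selects autotest/config-h.yaml in conftest.py
)

subprocess.run(
    ['pytest', 'autotest/evaluate/test_api_evaluate.py',
     '-m', 'infer and gpu_num_16', '--run_id', 'local-run'],
    env=env,
    check=False,
)
```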
b/autotest/tools/restful/test_restful_chat_hf_pytorch_llm.py index 4c6b14ab6b..2100602e6a 100644 --- a/autotest/tools/restful/test_restful_chat_hf_pytorch_llm.py +++ b/autotest/tools/restful/test_restful_chat_hf_pytorch_llm.py @@ -1,8 +1,13 @@ +import os +import time + import pytest from utils.config_utils import get_torch_model_list, get_workerid +from utils.distributed_utils import worker_node_wait from utils.run_restful_chat import run_all_step, run_reasoning_case, run_tools_case, start_restful_api, stop_restful_api DEFAULT_PORT = 23333 +PROXY_PORT = 8000 @pytest.fixture(scope='function', autouse=True) @@ -33,6 +38,34 @@ def getPrefixCacheModelList(tp_num): } for item in get_torch_model_list(tp_num, exclude_dup=True)] +def _run_distributed_test( + config, + model_param, + common_case_config, + worker_id, + manager=None, # โ† New parameter: pass in shared manager +): + """Universal distributed test executor (using shared Ray cluster)""" + assert manager is not None, 'Manager instance must be provided' + + if manager.is_master: + model_name = model_param['model'] + model_path = os.path.join(config['model_path'], model_name) + + # Start API Server for current model (master node starts/stops, worker nodes verify) + manager.start_lmdeploy_api_server(model_path=model_path, model_param=model_param) + + try: + run_all_step(config, common_case_config, worker_id=worker_id, port=PROXY_PORT) + + finally: + # Clean up API Server for current model (worker nodes skip) + manager.cleanup(force=False) + else: + time.sleep(10) + worker_node_wait(manager, timeout_minutes=4880) + + @pytest.mark.order(7) @pytest.mark.usefixtures('common_case_config') @pytest.mark.prefix_cache_test @@ -111,6 +144,18 @@ def test_restful_chat_tp16(config, common_case_config, worker_id): run_all_step(config, common_case_config, worker_id=worker_id, port=DEFAULT_PORT + get_workerid(worker_id)) +@pytest.mark.order(7) +@pytest.mark.usefixtures('common_case_config') +@pytest.mark.restful_api_pytorch +@pytest.mark.parametrize('model_param', getModelList(tp_num=16)) +def test_restful_chat_distributed_tp16(shared_ray_manager, config, model_param, common_case_config, worker_id): + _run_distributed_test(config=config, + model_param=model_param, + common_case_config=common_case_config, + worker_id=worker_id, + manager=shared_ray_manager) + + def getKvintModelList(tp_num, quant_policy): return [{ 'model': item, diff --git a/autotest/tools/restful/test_restful_chat_hf_turbomind_llm.py b/autotest/tools/restful/test_restful_chat_hf_turbomind_llm.py index 2fbbff10fe..30794b90a4 100644 --- a/autotest/tools/restful/test_restful_chat_hf_turbomind_llm.py +++ b/autotest/tools/restful/test_restful_chat_hf_turbomind_llm.py @@ -1,9 +1,14 @@ +import os +import time + import pytest from utils.config_utils import get_communicator_list, get_turbomind_model_list, get_workerid +from utils.distributed_utils import worker_node_wait from utils.run_restful_chat import (run_all_step, run_reasoning_case, run_tools_case, start_restful_api, stop_restful_api, test_logprobs) DEFAULT_PORT = 23333 +PROXY_PORT = 8000 @pytest.fixture(scope='function', autouse=True) @@ -41,6 +46,34 @@ def getPrefixCacheModelList(tp_num): return model_list +def _run_distributed_test( + config, + model_param, + common_case_config, + worker_id, + manager=None, # โ† New parameter: pass in shared manager +): + """Universal distributed test executor (using shared Ray cluster)""" + assert manager is not None, 'Manager instance must be provided' + + if manager.is_master: + model_name = 
model_param['model'] + model_path = os.path.join(config['model_path'], model_name) + + # Start API Server for current model (master node starts/stops, worker nodes verify) + manager.start_lmdeploy_api_server(model_path=model_path, model_param=model_param) + + try: + run_all_step(config, common_case_config, worker_id=worker_id, port=PROXY_PORT) + + finally: + # Clean up API Server for current model (worker nodes skip) + manager.cleanup(force=False) + else: + time.sleep(10) + worker_node_wait(manager, timeout_minutes=4880) + + @pytest.mark.order(7) @pytest.mark.usefixtures('common_case_config') @pytest.mark.prefix_cache_test @@ -102,6 +135,18 @@ def test_restful_chat_tp8(config, common_case_config, worker_id): run_all_step(config, common_case_config, worker_id=worker_id, port=DEFAULT_PORT + get_workerid(worker_id)) +@pytest.mark.order(7) +@pytest.mark.usefixtures('common_case_config') +@pytest.mark.restful_api +@pytest.mark.parametrize('model_param', getModelList(tp_num=16)) +def test_restful_chat_distributed_tp16(shared_ray_manager, config, model_param, common_case_config, worker_id): + _run_distributed_test(config=config, + model_param=model_param, + common_case_config=common_case_config, + worker_id=worker_id, + manager=shared_ray_manager) + + def getKvintModelList(tp_num, quant_policy): model_list = [] for communicator in get_communicator_list(tp_num): diff --git a/autotest/utils/distributed_utils.py b/autotest/utils/distributed_utils.py new file mode 100644 index 0000000000..c77d644175 --- /dev/null +++ b/autotest/utils/distributed_utils.py @@ -0,0 +1,302 @@ +# utils/distributed_utils.py +import os +import subprocess +import time +import random +import socket +import requests +from typing import Dict, Any +from time import time as time_time + +# Default constants +LM_DEPLOY_API_PORT = 8000 +RAY_PORT = 6379 +HEALTH_CHECK_TIMEOUT = 30 +CONNECTION_CHECK_TIMEOUT = 5 +WORKER_WAIT_INTERVAL = 30 + + +def wait_for_model_service_ready( + host: str, + api_port: int, + model_name: str, + timeout_seconds: int = 1000, +) -> bool: + """ + Wait for LMDeploy API Server to be ready and verify basic functionality. + No longer checks multi-node registration (API Server is a single-point service). 
+ """ + print(f"โณ Waiting for LMDeploy API Server to be ready (Model: {model_name}), Timeout: {timeout_seconds}s") + + start_time = time_time() + check_count = 0 + last_progress_print = 0 + progress_print_interval = 30 + + # Random initial delay to avoid multiple clients requesting simultaneously + time.sleep(random.uniform(1, 5)) + + while time_time() - start_time < timeout_seconds: + check_count += 1 + current_time = time_time() + + try: + # Check if port is open + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.settimeout(5) + if sock.connect_ex((host, api_port)) != 0: + if current_time - last_progress_print >= progress_print_interval: + print(f"๐Ÿ”Œ Check {check_count}: API port {api_port} not ready") + last_progress_print = current_time + time.sleep(10) + continue + + # Verify service functionality + if verify_service_functionality(host, api_port, model_name, check_count): + print("โœ… LMDeploy API Server is fully ready!") + return True + + except Exception as e: + if current_time - last_progress_print >= progress_print_interval: + print(f"๐Ÿ”ง Check {check_count}: Exception - {e}") + last_progress_print = current_time + + sleep_time = 10 + random.uniform(-2, 2) + time.sleep(sleep_time) + + print(f"โŒ LMDeploy API Server startup timed out ({timeout_seconds} seconds)") + return False + + +def verify_service_functionality(host: str, api_port: int, model_name: str, check_count: int) -> bool: + """Verify that the API Server can respond to basic requests""" + try: + test_data = { + "model": model_name, + "messages": [{"role": "user", "content": "hi"}], + "max_tokens": 5, + "stream": False + } + + resp = requests.post( + f"http://{host}:{api_port}/v1/chat/completions", + json=test_data, + timeout=15 + ) + + if resp.status_code == 200: + print(f"โœ… Check {check_count}: Service functionality normal (received valid response)") + return True + elif resp.status_code == 400: + print(f"โœ… Check {check_count}: Service framework activated (received 400)") + return True + else: + print(f"๐Ÿ”ง Check {check_count}: Service test failed, status code: {resp.status_code}") + return False + + except requests.exceptions.RequestException as e: + print(f"๐Ÿ”ง Check {check_count}: Service test exception - {e}") + return False + + + +class RayLMDeployManager: + def __init__( + self, + master_addr: str, + ray_port: int = RAY_PORT, + api_port: int = LM_DEPLOY_API_PORT, + log_dir: str = ".", + health_check: bool = True, + ): + self.master_addr = master_addr + self.ray_port = ray_port + self.api_port = api_port + self.log_dir = log_dir + self.health_check = health_check + self._cleaned = False + + # Determine if this is the master node (via environment variable NODE_RANK) + self.node_rank = int(os.getenv("NODE_RANK", "0")) + self.is_master = (self.node_rank == 0) + + os.makedirs(self.log_dir, exist_ok=True) + print(f"๐Ÿ“ Node {self.node_rank} log directory: {self.log_dir}") + + # Print cluster information + self.node_count = int(os.getenv("NODE_COUNT", "1")) + self.job_id = os.getenv("JOB_ID", "unknown") + print(f"๐ŸŽฏ Node {self.node_rank} cluster information:") + print(f" - Total nodes: {self.node_count}") + print(f" - Role: {'Master node' if self.is_master else 'Worker node'}") + print(f" - Master address: {self.master_addr}") + print(f" - Ray port: {self.ray_port}") + print(f" - API port: {self.api_port}") + print(f" - Job ID: {self.job_id}") + + def start_ray_cluster(self): + """Start or join Ray cluster""" + if self.is_master: + cmd = ["ray", "start", "--head", "--port", 
str(self.ray_port)] + print(f"๐Ÿš€ Master node starting Ray cluster (Port: {self.ray_port})") + else: + cmd = ["ray", "start", "--address", f"{self.master_addr}:{self.ray_port}"] + print(f"๐Ÿ”Œ Worker node {self.node_rank} joining Ray cluster: {self.master_addr}:{self.ray_port}") + + try: + result = subprocess.run(cmd, capture_output=True, text=True, check=True) + print("โœ… Ray started successfully") + except subprocess.CalledProcessError as e: + print(f"๐Ÿ’ฅ Ray startup failed: {e.stderr}") + raise + + def start_lmdeploy_api_server(self, model_path: str, model_param: dict): + """ + Master node: Start LMDeploy API Server and wait for it to be ready. + Worker nodes: Do not start the service, only verify that the master node's API Server is ready. + """ + if self.is_master: + # === Master node logic: Start service === + log_path = os.path.join(self.log_dir, "lmdeploy_api.log") + tp = model_param.get('tp_num', 1) + backend = model_param.get('backend', 'turbomind') + communicator = model_param.get('communicator', 'nccl') + quant_policy = model_param.get('quant_policy', 0) + + with open(log_path, "w") as log_file: + cmd = [ + "lmdeploy", "serve", "api_server", + model_path, + "--server-port", str(self.api_port), + "--tp", str(tp), + "--backend", backend, + "--communicator", communicator + ] + + if backend == "turbomind": + cmd.extend(["--quant-policy", str(quant_policy)]) + print(f"๐Ÿš€ Master node starting LMDeploy API Server: {' '.join(cmd)}") + self._api_process = subprocess.Popen(cmd, stdout=log_file, stderr=log_file) + print(f"๐Ÿ“ API Server log: {log_path}") + + # Wait for service to be ready + if self.health_check: + ready = wait_for_model_service_ready( + host=self.master_addr, + api_port=self.api_port, + model_name=model_path, + timeout_seconds=1000 + ) + if not ready: + print("โŒ API Server failed to be ready, terminating process") + self._api_process.terminate() + try: + self._api_process.wait(timeout=10) + except subprocess.TimeoutExpired: + self._api_process.kill() + raise RuntimeError("LMDeploy API Server failed to start") + else: + # === Worker node logic: Only verify that the master node service is ready === + print(f"๐Ÿ” Worker node {self.node_rank} is verifying that the master node ({self.master_addr}:{self.api_port}) API Server is ready...") + if self.health_check: + ready = wait_for_model_service_ready( + host=self.master_addr, + api_port=self.api_port, + model_name=model_path, + timeout_seconds=1000 + ) + if not ready: + raise RuntimeError( + f"Worker node {self.node_rank}: Master node API Server not ready within 1000 seconds, cannot continue" + ) + else: + print("โš ๏ธ health_check=False, skipping API Server readiness check (not recommended)") + + def cleanup(self, force: bool = True): + """Clean up resources + + Args: + force (bool): + - False: Only stop LMDeploy API Server (used after individual test completion) + - True: Stop API Server + Ray cluster (used for final cleanup at session end) + """ + if self._cleaned and force: + # Note: If this is just an intermediate cleanup with force=False, we shouldn't skip due to _cleaned + # So only skip when force=True and already cleaned + return + + print(f"๐Ÿงน Node {self.node_rank} cleaning resources... 
(force={force})") + + # Stop API Server (master node only) + if hasattr(self, '_api_process') and self._api_process.poll() is None: + self._api_process.terminate() + try: + self._api_process.wait(timeout=10) + except subprocess.TimeoutExpired: + self._api_process.kill() + print("โœ… LMDeploy API Server stopped") + # Note: We don't clear the _api_process attribute here so it can be checked later + + # Stop Ray (only when force=True) + if force: + try: + subprocess.run(["ray", "stop", "--force"], check=False, capture_output=True) + print("โœ… Ray cluster stopped") + except Exception as e: + print(f"โš ๏ธ Ray stop exception: {e}") + self._cleaned = True # Only mark as "fully cleaned" when force=True + + def get_cluster_info(self) -> Dict[str, Any]: + return { + "node_rank": self.node_rank, + "node_count": self.node_count, + "master_addr": self.master_addr, + "ray_port": self.ray_port, + "api_port": self.api_port, + "is_master": self.is_master, + "job_id": self.job_id, + } + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.cleanup() + + +def worker_node_wait(manager: RayLMDeployManager, timeout_minutes: int = 60): + """ + Worker node waits for Ray master node (Head Node) to be alive (by detecting GCS service port) + """ + if manager.is_master: + return + + print(f"โธ๏ธ Worker node {manager.node_rank} entering wait mode...") + max_checks = (timeout_minutes * 60) // WORKER_WAIT_INTERVAL + consecutive_failures = 0 + max_consecutive_failures = 3 + + for i in range(max_checks): + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.settimeout(CONNECTION_CHECK_TIMEOUT) + if sock.connect_ex((manager.master_addr, RAY_PORT)) == 0: + consecutive_failures = 0 + else: + consecutive_failures += 1 + except Exception: + consecutive_failures += 1 + + if consecutive_failures >= max_consecutive_failures: + print("๐Ÿ“ก Ray master node GCS service unreachable, worker node exiting") + break + + if i % 4 == 0: + elapsed = (i * WORKER_WAIT_INTERVAL) // 60 + print(f"โณ Worker node {manager.node_rank} waiting... 
Running for {elapsed} minutes") + + time.sleep(WORKER_WAIT_INTERVAL) + else: + print(f"โฐ Worker node {manager.node_rank} wait timeout ({timeout_minutes} minutes)") + + manager.cleanup() \ No newline at end of file From 3f6f989dd466b6c6da451ee6ab22158f05c25028 Mon Sep 17 00:00:00 2001 From: littlegy <787321726@qq.com> Date: Tue, 25 Nov 2025 14:04:32 +0800 Subject: [PATCH 2/5] add proxy test --- autotest/conftest.py | 35 +- autotest/evaluate/test_api_evaluate.py | 102 ++++- .../test_restful_chat_hf_pytorch_llm.py | 74 ++- .../test_restful_chat_hf_turbomind_llm.py | 76 ++- autotest/utils/proxy_distributed_utils.py | 431 ++++++++++++++++++ ...uted_utils.py => ray_distributed_utils.py} | 196 ++++---- 6 files changed, 766 insertions(+), 148 deletions(-) create mode 100644 autotest/utils/proxy_distributed_utils.py rename autotest/utils/{distributed_utils.py => ray_distributed_utils.py} (54%) diff --git a/autotest/conftest.py b/autotest/conftest.py index 618dd3a87a..aa8bced205 100644 --- a/autotest/conftest.py +++ b/autotest/conftest.py @@ -2,7 +2,8 @@ import pytest import yaml -from utils.distributed_utils import RayLMDeployManager +from utils.proxy_distributed_utils import ProxyDistributedManager +from utils.ray_distributed_utils import RayLMDeployManager cli_prompt_case_file = 'autotest/chat_prompt_case.yaml' common_prompt_case_file = 'autotest/prompt_case.yaml' @@ -76,6 +77,38 @@ def shared_ray_manager(): manager.cleanup(force=True) +@pytest.fixture(scope='session') +def shared_proxy_manager(): + master_addr = os.getenv('MASTER_ADDR', 'localhost') + device = os.environ.get('DEVICE', '') + + # Load device-specific or default config + if device: + device_config_path = f'autotest/config-{device}.yaml' + if os.path.exists(device_config_path): + config_path = device_config_path + else: + config_path = config_file + else: + config_path = config_file + + with open(config_path) as f: + env_config = yaml.load(f, Loader=yaml.SafeLoader) + log_dir = env_config.get('log_path', '/tmp/lmdeploy_test') + + # Initialize manager (master starts proxy automatically) + manager = ProxyDistributedManager(health_check=True, proxy_port=PROXY_PORT, log_dir=log_dir) + + if manager.is_master: + print(f'๐ŸŽฏ Master node: LMDeploy Proxy started on {master_addr}:{PROXY_PORT}') + print('โณ Waiting for worker nodes to connect (they will register when starting api_server)...') + + yield manager + + print(f'\n[Final Cleanup] Node {manager.node_rank} performing final resource cleanup...') + manager.cleanup(force=True) + + def pytest_addoption(parser): parser.addoption('--run_id', action='store', default='', help='github run_id') parser.addoption('--device', action='store', default='', help='device config suffix') diff --git a/autotest/evaluate/test_api_evaluate.py b/autotest/evaluate/test_api_evaluate.py index 8e68c741fc..b05627485a 100644 --- a/autotest/evaluate/test_api_evaluate.py +++ b/autotest/evaluate/test_api_evaluate.py @@ -3,8 +3,9 @@ import pytest from utils.config_utils import get_evaluate_pytorch_model_list, get_evaluate_turbomind_model_list, get_workerid -from utils.distributed_utils import worker_node_wait from utils.evaluate_utils import restful_test +from utils.proxy_distributed_utils import proxy_worker_node_wait +from utils.ray_distributed_utils import ray_worker_node_wait from utils.run_restful_chat import start_proxy_server, start_restful_api, stop_restful_api DEFAULT_PORT = 23333 @@ -79,7 +80,7 @@ def prepare_environment_judge_evaluate(request, config, worker_id): stop_restful_api(proxy_pid, 
proxy_process, request.param) -def _run_distributed_test( +def _run_ray_distributed_test( config, run_id, model_param, @@ -118,7 +119,50 @@ def _run_distributed_test( manager.cleanup(force=False) else: time.sleep(10) - worker_node_wait(manager, timeout_minutes=4880) + ray_worker_node_wait(manager, timeout_minutes=4880) + + +def _run_proxy_distributed_test(config, + run_id, + model_param, + worker_id, + test_type='infer', + manager=None, + eval_config_name='default'): + + assert manager is not None, 'Manager instance must be provided' + + # Adjust eval config for GPT-style models + if 'gpt' in model_param.get('model', '').lower(): + eval_config_name = 'gpt' + + preset_config = EVAL_CONFIGS.get(eval_config_name, {}) + model_name = model_param['model'] + model_path = os.path.join(config['model_path'], model_name) + + manager.start_lmdeploy_api_server_async(model_path=model_path, model_param=model_param) + + if manager.is_master: + try: + + print(f'๐Ÿงช Master node executing {test_type} test ({eval_config_name})...') + result, msg = restful_test(config, + run_id, + model_param, + worker_id=worker_id, + port=manager.proxy_port, + test_type=test_type, + **preset_config) + assert result, f'โŒ {test_type} test failed: {msg}' + print(f'โœ… {test_type} test passed') + + finally: + print('๐Ÿงน Master node cleaning up API servers after test...') + manager.cleanup(force=False) + + else: + print(f'โธ๏ธ Worker node {manager.node_rank} waiting for master to complete test...') + proxy_worker_node_wait(manager, timeout_minutes=4880) def get_turbomind_model_list(tp_num): @@ -217,12 +261,26 @@ def test_turbomind_restful_tp8(config, run_id, prepare_environment, worker_id): @pytest.mark.flaky(reruns=0) @pytest.mark.parametrize('model_param', get_turbomind_model_list(tp_num=16)) def test_turbomind_restful_distributed_tp16(shared_ray_manager, config, run_id, model_param, worker_id): - _run_distributed_test(config=config, - run_id=run_id, - model_param=model_param, - worker_id=worker_id, - test_type='infer', - manager=shared_ray_manager) + _run_ray_distributed_test(config=config, + run_id=run_id, + model_param=model_param, + worker_id=worker_id, + test_type='infer', + manager=shared_ray_manager) + + +@pytest.mark.infer +@pytest.mark.turbomind +@pytest.mark.gpu_num_16 +@pytest.mark.flaky(reruns=0) +@pytest.mark.parametrize('model_param', get_turbomind_model_list(tp_num=16)) +def test_turbomind_restful_distributed_dpep16(shared_proxy_manager, config, run_id, model_param, worker_id): + _run_proxy_distributed_test(config=config, + run_id=run_id, + model_param=model_param, + worker_id=worker_id, + test_type='infer', + manager=shared_proxy_manager) @pytest.mark.infer @@ -286,12 +344,26 @@ def test_pytorch_restful_tp16(config, run_id, prepare_environment, worker_id): @pytest.mark.flaky(reruns=0) @pytest.mark.parametrize('model_param', get_pytorch_model_list(tp_num=16)) def test_pytorch_restful_distributed_tp16(shared_ray_manager, config, run_id, model_param, worker_id): - _run_distributed_test(config=config, - run_id=run_id, - model_param=model_param, - worker_id=worker_id, - test_type='infer', - manager=shared_ray_manager) + _run_ray_distributed_test(config=config, + run_id=run_id, + model_param=model_param, + worker_id=worker_id, + test_type='infer', + manager=shared_ray_manager) + + +@pytest.mark.infer +@pytest.mark.pytorch +@pytest.mark.gpu_num_16 +@pytest.mark.flaky(reruns=0) +@pytest.mark.parametrize('model_param', get_pytorch_model_list(tp_num=16)) +def 
test_pytorch_restful_distributed_dpep16(shared_proxy_manager, config, run_id, model_param, worker_id): + _run_proxy_distributed_test(config=config, + run_id=run_id, + model_param=model_param, + worker_id=worker_id, + test_type='infer', + manager=shared_proxy_manager) @pytest.mark.eval diff --git a/autotest/tools/restful/test_restful_chat_hf_pytorch_llm.py b/autotest/tools/restful/test_restful_chat_hf_pytorch_llm.py index 2100602e6a..04831cfb31 100644 --- a/autotest/tools/restful/test_restful_chat_hf_pytorch_llm.py +++ b/autotest/tools/restful/test_restful_chat_hf_pytorch_llm.py @@ -3,7 +3,8 @@ import pytest from utils.config_utils import get_torch_model_list, get_workerid -from utils.distributed_utils import worker_node_wait +from utils.proxy_distributed_utils import proxy_worker_node_wait +from utils.ray_distributed_utils import ray_worker_node_wait from utils.run_restful_chat import run_all_step, run_reasoning_case, run_tools_case, start_restful_api, stop_restful_api DEFAULT_PORT = 23333 @@ -12,13 +13,16 @@ @pytest.fixture(scope='function', autouse=True) def prepare_environment(request, config, worker_id): - param = request.param - model = param['model'] - model_path = config.get('model_path') + '/' + model - - pid, startRes = start_restful_api(config, param, model, model_path, 'pytorch', worker_id) - yield - stop_restful_api(pid, startRes, param) + if hasattr(request, 'param'): + param = request.param + model = param['model'] + model_path = config.get('model_path') + '/' + model + + pid, startRes = start_restful_api(config, param, model, model_path, 'pytorch', worker_id) + yield + stop_restful_api(pid, startRes, param) + else: + yield def getModelList(tp_num): @@ -38,7 +42,7 @@ def getPrefixCacheModelList(tp_num): } for item in get_torch_model_list(tp_num, exclude_dup=True)] -def _run_distributed_test( +def _run_ray_distributed_test( config, model_param, common_case_config, @@ -63,7 +67,35 @@ def _run_distributed_test( manager.cleanup(force=False) else: time.sleep(10) - worker_node_wait(manager, timeout_minutes=4880) + ray_worker_node_wait(manager, timeout_minutes=4880) + + +def _run_proxy_distributed_test( + config, + model_param, + common_case_config, + worker_id, + manager=None, # โ† New parameter: pass in shared manager +): + """Universal distributed test executor (using shared Ray cluster)""" + assert manager is not None, 'Manager instance must be provided' + model_name = model_param['model'] + model_path = os.path.join(config['model_path'], model_name) + + # Start API Server for current model (master node starts/stops, worker nodes verify) + manager.start_lmdeploy_api_server_async(model_path=model_path, model_param=model_param) + + if manager.is_master: + + try: + run_all_step(config, common_case_config, worker_id=worker_id, port=PROXY_PORT) + + finally: + # Clean up API Server for current model (worker nodes skip) + manager.cleanup(force=False) + else: + time.sleep(10) + proxy_worker_node_wait(manager, timeout_minutes=4880) @pytest.mark.order(7) @@ -149,11 +181,23 @@ def test_restful_chat_tp16(config, common_case_config, worker_id): @pytest.mark.restful_api_pytorch @pytest.mark.parametrize('model_param', getModelList(tp_num=16)) def test_restful_chat_distributed_tp16(shared_ray_manager, config, model_param, common_case_config, worker_id): - _run_distributed_test(config=config, - model_param=model_param, - common_case_config=common_case_config, - worker_id=worker_id, - manager=shared_ray_manager) + _run_ray_distributed_test(config=config, + model_param=model_param, + 
common_case_config=common_case_config, + worker_id=worker_id, + manager=shared_ray_manager) + + +@pytest.mark.order(7) +@pytest.mark.usefixtures('common_case_config') +@pytest.mark.restful_api_pytorch +@pytest.mark.parametrize('model_param', getModelList(tp_num=16)) +def test_restful_chat_distributed_dpep16(shared_proxy_manager, config, model_param, common_case_config, worker_id): + _run_proxy_distributed_test(config=config, + model_param=model_param, + common_case_config=common_case_config, + worker_id=worker_id, + manager=shared_proxy_manager) def getKvintModelList(tp_num, quant_policy): diff --git a/autotest/tools/restful/test_restful_chat_hf_turbomind_llm.py b/autotest/tools/restful/test_restful_chat_hf_turbomind_llm.py index 30794b90a4..fcd197be16 100644 --- a/autotest/tools/restful/test_restful_chat_hf_turbomind_llm.py +++ b/autotest/tools/restful/test_restful_chat_hf_turbomind_llm.py @@ -3,7 +3,8 @@ import pytest from utils.config_utils import get_communicator_list, get_turbomind_model_list, get_workerid -from utils.distributed_utils import worker_node_wait +from utils.proxy_distributed_utils import proxy_worker_node_wait +from utils.ray_distributed_utils import ray_worker_node_wait from utils.run_restful_chat import (run_all_step, run_reasoning_case, run_tools_case, start_restful_api, stop_restful_api, test_logprobs) @@ -13,13 +14,16 @@ @pytest.fixture(scope='function', autouse=True) def prepare_environment(request, config, worker_id): - param = request.param - model = param['model'] - model_path = config.get('model_path') + '/' + model - - pid, startRes = start_restful_api(config, param, model, model_path, 'turbomind', worker_id) - yield - stop_restful_api(pid, startRes, param) + if hasattr(request, 'param'): + param = request.param + model = param['model'] + model_path = config.get('model_path') + '/' + model + + pid, startRes = start_restful_api(config, param, model, model_path, 'pytorch', worker_id) + yield + stop_restful_api(pid, startRes, param) + else: + yield def getModelList(tp_num): @@ -46,7 +50,7 @@ def getPrefixCacheModelList(tp_num): return model_list -def _run_distributed_test( +def _run_ray_distributed_test( config, model_param, common_case_config, @@ -71,7 +75,35 @@ def _run_distributed_test( manager.cleanup(force=False) else: time.sleep(10) - worker_node_wait(manager, timeout_minutes=4880) + ray_worker_node_wait(manager, timeout_minutes=4880) + + +def _run_proxy_distributed_test( + config, + model_param, + common_case_config, + worker_id, + manager=None, # โ† New parameter: pass in shared manager +): + """Universal distributed test executor (using shared Ray cluster)""" + assert manager is not None, 'Manager instance must be provided' + model_name = model_param['model'] + model_path = os.path.join(config['model_path'], model_name) + + # Start API Server for current model (master node starts/stops, worker nodes verify) + manager.start_lmdeploy_api_server_async(model_path=model_path, model_param=model_param) + + if manager.is_master: + + try: + run_all_step(config, common_case_config, worker_id=worker_id, port=PROXY_PORT) + + finally: + # Clean up API Server for current model (worker nodes skip) + manager.cleanup(force=False) + else: + time.sleep(10) + proxy_worker_node_wait(manager, timeout_minutes=4880) @pytest.mark.order(7) @@ -138,13 +170,27 @@ def test_restful_chat_tp8(config, common_case_config, worker_id): @pytest.mark.order(7) @pytest.mark.usefixtures('common_case_config') @pytest.mark.restful_api +@pytest.mark.gpu_num_distributed_16 
@pytest.mark.parametrize('model_param', getModelList(tp_num=16)) def test_restful_chat_distributed_tp16(shared_ray_manager, config, model_param, common_case_config, worker_id): - _run_distributed_test(config=config, - model_param=model_param, - common_case_config=common_case_config, - worker_id=worker_id, - manager=shared_ray_manager) + _run_ray_distributed_test(config=config, + model_param=model_param, + common_case_config=common_case_config, + worker_id=worker_id, + manager=shared_ray_manager) + + +@pytest.mark.order(7) +@pytest.mark.usefixtures('common_case_config') +@pytest.mark.restful_api +@pytest.mark.gpu_num_distributed_16 +@pytest.mark.parametrize('model_param', getModelList(tp_num=16)) +def test_restful_chat_distributed_dpep16(shared_proxy_manager, config, model_param, common_case_config, worker_id): + _run_proxy_distributed_test(config=config, + model_param=model_param, + common_case_config=common_case_config, + worker_id=worker_id, + manager=shared_proxy_manager) def getKvintModelList(tp_num, quant_policy): diff --git a/autotest/utils/proxy_distributed_utils.py b/autotest/utils/proxy_distributed_utils.py new file mode 100644 index 0000000000..103ea96bb6 --- /dev/null +++ b/autotest/utils/proxy_distributed_utils.py @@ -0,0 +1,431 @@ +import os +import random +import socket +import subprocess +import time +from time import time as time_time +from typing import Any, Dict, Tuple + +import requests + +# Default constants +LM_DEPLOY_PROXY_PORT = 8000 +HEALTH_CHECK_TIMEOUT = 30 +CONNECTION_CHECK_TIMEOUT = 5 +WORKER_WAIT_INTERVAL = 30 + + +def wait_for_model_service_ready(host: str, + proxy_port: int, + model_name: str, + timeout_seconds: int = 1500, + expected_nodes: int = None) -> bool: + """Wait for LM Deploy Proxy + backend workers to be fully ready, ensuring + all nodes are registered. + + Check all nodes' readiness status through /nodes/status API. 
+ """ + if expected_nodes: + print(f'โณ Waiting for model service to be fully ready (Model: {model_name}), ' + f'expected nodes: {expected_nodes}, timeout: {timeout_seconds}s') + else: + print(f'โณ Waiting for model service to be fully ready (Model: {model_name}), ' + f'timeout: {timeout_seconds}s') + + start_time = time_time() + check_count = 0 + last_progress_print = 0 + progress_print_interval = 30 + + initial_delay = random.uniform(1, 5) + time.sleep(initial_delay) + + while time_time() - start_time < timeout_seconds: + check_count += 1 + current_time = time_time() + + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.settimeout(5) + if sock.connect_ex((host, proxy_port)) != 0: + if current_time - last_progress_print >= progress_print_interval: + print(f'๐Ÿ”Œ Check {check_count}: proxy port not ready') + last_progress_print = current_time + time.sleep(10) + continue + + if expected_nodes: + nodes_ready, ready_nodes = check_nodes_status(host, proxy_port, model_name, expected_nodes, check_count, + current_time, last_progress_print, + progress_print_interval) + if not nodes_ready: + if ready_nodes is not None and current_time - last_progress_print >= progress_print_interval: + last_progress_print = current_time + continue + + service_ready = verify_service_functionality(host, proxy_port, model_name, check_count) + if service_ready: + if expected_nodes: + print(f'โœ… All {expected_nodes} nodes are ready and service is functional!') + else: + print('โœ… Model service is fully ready!') + return True + + except requests.exceptions.RequestException as e: + if current_time - last_progress_print >= progress_print_interval: + print(f'๐Ÿ”ง Check {check_count}: Request exception - {e}') + last_progress_print = current_time + except Exception as e: + if current_time - last_progress_print >= progress_print_interval: + print(f'๐Ÿ”ง Check {check_count}: Unknown exception - {e}') + last_progress_print = current_time + + sleep_time = 10 + random.uniform(-2, 2) + time.sleep(sleep_time) + + print(f'โŒ Model service startup timed out ({timeout_seconds} seconds)') + return False + + +def check_nodes_status(host: str, proxy_port: int, model_name: str, expected_nodes: int, check_count: int, + current_time: float, last_progress_print: float, + progress_print_interval: int) -> Tuple[bool, int]: + try: + nodes_url = f'http://{host}:{proxy_port}/nodes/status' + resp = requests.get(nodes_url, timeout=10) + + if resp.status_code != 200: + if current_time - last_progress_print >= progress_print_interval: + print(f'๐Ÿ”ง Check {check_count}: Failed to get node status, status code: {resp.status_code}') + return False, 0 + + nodes_data = resp.json() + ready_nodes = 0 + total_nodes = len(nodes_data) + + for node_url, node_info in nodes_data.items(): + models = node_info.get('models', []) + if model_name in models: + ready_nodes += 1 + + should_print = current_time - last_progress_print >= progress_print_interval + + if should_print: + print(f'๐Ÿ“Š Check {check_count}: Node readiness progress: {ready_nodes}/{expected_nodes} ' + f'(Total nodes: {total_nodes})') + for node_url, node_info in nodes_data.items(): + models = node_info.get('models', []) + basename = os.path.basename(model_name) + if model_name in models: + print(f' โœ… Node {node_url} registered model {basename}') + else: + print(f' โณ Node {node_url} has not registered target model') + + if ready_nodes >= expected_nodes: + if should_print: + print(f'๐ŸŽฏ All {expected_nodes} nodes have registered the target model') + return True, 
ready_nodes + else: + if should_print: + print(f'โณ Waiting for more nodes to register... ({ready_nodes}/{expected_nodes})') + return False, ready_nodes + + except Exception as e: + if current_time - last_progress_print >= progress_print_interval: + print(f'๐Ÿ”ง Check {check_count}: Exception getting node status - {e}') + return False, 0 + + +def verify_service_functionality(host: str, proxy_port: int, model_name: str, check_count: int) -> bool: + try: + test_data = { + 'model': model_name, + 'messages': [{ + 'role': 'user', + 'content': 'hi' + }], + 'max_tokens': 5, + 'stream': False + } + + resp = requests.post(f'http://{host}:{proxy_port}/v1/chat/completions', json=test_data, timeout=15) + + if resp.status_code == 200: + print(f'โœ… Check {check_count}: Service functionality OK (received valid response)') + return True + elif resp.status_code == 400: + print(f'โœ… Check {check_count}: Service framework activated (received 400)') + return True + else: + print(f'๐Ÿ”ง Check {check_count}: Service functionality test failed, status code: {resp.status_code}') + return False + + except requests.exceptions.RequestException as e: + print(f'๐Ÿ”ง Check {check_count}: Service functionality test exception - {e}') + return False + + +class ProxyDistributedManager: + + def __init__(self, health_check: bool = True, proxy_port: int = None, log_dir: str = '.'): + self.health_check = health_check + self.proxy_port = proxy_port or LM_DEPLOY_PROXY_PORT + self.log_dir = log_dir + self._cleaned = False + + self._lmdeploy_proxy_process = None + self._local_lmdeploy_process = None + + self.node_rank = int(os.getenv('NODE_RANK', '0')) + self.is_master = (self.node_rank == 0) + + os.makedirs(self.log_dir, exist_ok=True) + + role = 'master' if self.is_master else 'worker' + timestamp = time.strftime('%Y%m%d_%H%M%S') + log_filename = f'lmdeploy_{role}_rank{self.node_rank}_{timestamp}.log' + self._lmdeploy_log_path = os.path.join(self.log_dir, log_filename) + + self._setup_from_env() + self._setup_distributed_cluster() + + print(f'๐Ÿ“ Node {self.node_rank} LMDeploy log path: {self._lmdeploy_log_path}') + + def _setup_from_env(self): + self.node_count = int(os.getenv('NODE_COUNT', '1')) + self.master_addr = os.getenv('MASTER_ADDR', 'localhost') + self.proc_per_node = int(os.getenv('PROC_PER_NODE', '1')) + self.job_id = os.getenv('JOB_ID', 'unknown') + self.total_gpus = self.node_count * self.proc_per_node + + print(f'๐ŸŽฏ Node {self.node_rank} distributed environment info:') + print(f' - Nodes: {self.node_count} nodes ร— {self.proc_per_node} GPUs = {self.total_gpus} GPUs') + print(f" - Current: Rank {self.node_rank} ({'Master node' if self.is_master else 'Worker node'})") + print(f' - Master address: {self.master_addr}') + print(f' - Proxy port: {self.proxy_port}') + print(f' - Job ID: {self.job_id}') + + def _setup_distributed_cluster(self): + if self.is_master: + self._start_lmdeploy_proxy() + if self.health_check: + self._basic_health_check() + + def _start_lmdeploy_proxy(self): + print(f'๐Ÿš€ Master node starting lmdeploy proxy (port: {self.proxy_port})...') + env = os.environ.copy() + self._lmdeploy_proxy_process = subprocess.Popen([ + 'lmdeploy', + 'serve', + 'proxy', + '--server-name', + self.master_addr, + '--server-port', + str(self.proxy_port), + '--routing-strategy', + 'min_expected_latency', + '--serving-strategy', + 'Hybrid', + ], + env=env) + time.sleep(10) + + if self._check_service_health(self.proxy_port): + print('โœ… lmdeploy proxy started successfully') + else: + print('โš ๏ธ lmdeploy proxy may 
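As a standalone illustration, the readiness logic above boils down to polling the proxy's `/nodes/status` endpoint and counting which nodes have registered the target model. The sketch below uses placeholder host, port and model values and assumes the same response shape that `check_nodes_status` parses (a mapping of node URL to a dict with a `models` list):

```python
import requests

PROXY_URL = 'http://127.0.0.1:8000'                  # placeholder: MASTER_ADDR + proxy_port
MODEL_PATH = '/nvme/models/Kimi-K2-Instruct-0905'    # placeholder model path

resp = requests.get(f'{PROXY_URL}/nodes/status', timeout=10)
resp.raise_for_status()

# Count the nodes whose 'models' list already contains the target model.
ready = [url for url, info in resp.json().items() if MODEL_PATH in info.get('models', [])]
print(f'{len(ready)} node(s) have registered {MODEL_PATH}: {ready}')
```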
have issues starting') + + def start_lmdeploy_api_server_async(self, + model_path: str, + model_param: dict, + start_timeout: int = 1500) -> Tuple[int, subprocess.Popen]: + total_gpus_per_node = self.proc_per_node + total_nodes = self.node_count + + ep = total_gpus_per_node * total_nodes + dp = total_gpus_per_node * total_nodes + + backend = model_param.get('backend', 'turbomind') + communicator = model_param.get('communicator', 'nccl') + quant_policy = model_param.get('quant_policy', 0) + + full_command = [ + 'lmdeploy', 'serve', 'api_server', model_path, '--backend', backend, '--tp', + str(1), '--ep', + str(ep), '--dp', + str(dp), '--proxy-url', f'http://{self.master_addr}:{self.proxy_port}', '--nnodes', + str(total_nodes), '--node-rank', + str(self.node_rank), '--communicator', communicator + ] + + if backend == 'turbomind': + full_command.extend(['--quant-policy', str(quant_policy)]) + + cmd = ' '.join(full_command) + print(f'๐ŸŽฏ Node {self.node_rank} start command: {cmd}') + + env = os.environ.copy() + env.update({ + 'DEEPEP_MAX_BATCH_SIZE': '256', + }) + + if dp > 1: + env.update({ + 'LMDEPLOY_DP_MASTER_ADDR': self.master_addr, + 'LMDEPLOY_DP_MASTER_PORT': '29555', + }) + + log_file = open(self._lmdeploy_log_path, 'w') + + try: + self._local_lmdeploy_process = subprocess.Popen(full_command, + stdout=log_file, + stderr=log_file, + env=env, + text=True, + encoding='utf-8') + pid = self._local_lmdeploy_process.pid + print(f'๐Ÿš€ Node {self.node_rank} started lmdeploy api_server (PID: {pid}), log: {self._lmdeploy_log_path}') + + if self.health_check: + expected_nodes = self.node_count + ready = wait_for_model_service_ready(host=self.master_addr, + proxy_port=self.proxy_port, + model_name=model_path, + timeout_seconds=start_timeout, + expected_nodes=expected_nodes) + if not ready: + print(f'โŒ Node {self.node_rank}: Model service could not be ready within timeout, ' + f'terminating local process') + self._local_lmdeploy_process.terminate() + try: + self._local_lmdeploy_process.wait(timeout=10) + except subprocess.TimeoutExpired: + self._local_lmdeploy_process.kill() + log_file.close() + return 0, self._local_lmdeploy_process + + log_file.close() + return pid, self._local_lmdeploy_process + + except Exception as e: + print(f'๐Ÿ’ฅ Node {self.node_rank} failed to start lmdeploy api_server: {e}') + log_file.close() + raise + + def is_lmdeploy_running(self): + return self._local_lmdeploy_process is not None and self._local_lmdeploy_process.poll() is None + + def _basic_health_check(self): + print(f'๐Ÿ” Node {self.node_rank} performing basic health check...') + if self.is_master: + ok = self._check_service_health(self.proxy_port) + status = 'โœ… lmdeploy proxy service healthy' if ok else 'โš ๏ธ lmdeploy proxy service may have issues' + else: + ok = self._check_connection_to_master(self.proxy_port) + status = 'โœ… Connection to master node normal' if ok else 'โš ๏ธ Connection to master node may have issues' + print(status) + + def _check_service_health(self, port: int) -> bool: + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.settimeout(HEALTH_CHECK_TIMEOUT) + return sock.connect_ex((self.master_addr, port)) == 0 + except Exception: + return False + + def _check_connection_to_master(self, port: int = None) -> bool: + p = port or self.proxy_port + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.settimeout(CONNECTION_CHECK_TIMEOUT) + return sock.connect_ex((self.master_addr, p)) == 0 + except Exception: + return False + + def 
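To make the sizing in `start_lmdeploy_api_server_async` concrete: with an assumed 2 nodes x 8 GPUs job, every node launches its own api_server with `--tp 1` and `--dp`/`--ep` equal to the total GPU count, registering against the master's proxy. A hedged reconstruction of the command rank 1 would build (model path, address and backend are illustrative):

```python
# Illustrative only: mirrors the list built above for NODE_COUNT=2,
# PROC_PER_NODE=8, NODE_RANK=1, backend='pytorch' (so no --quant-policy flag).
cmd = [
    'lmdeploy', 'serve', 'api_server', '/nvme/models/Kimi-K2-Instruct-0905',
    '--backend', 'pytorch',
    '--tp', '1',
    '--ep', '16',
    '--dp', '16',
    '--proxy-url', 'http://10.0.0.1:8000',
    '--nnodes', '2',
    '--node-rank', '1',
    '--communicator', 'nccl',
]
print(' '.join(cmd))
```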
get_cluster_info(self) -> Dict[str, Any]: + return { + 'node_rank': self.node_rank, + 'node_count': self.node_count, + 'master_addr': self.master_addr, + 'proc_per_node': self.proc_per_node, + 'total_gpus': self.total_gpus, + 'job_id': self.job_id, + 'is_master': self.is_master, + 'proxy_port': self.proxy_port + } + + def cleanup(self, force: bool = True): + """Clean up resources. + + Args: + force (bool): + - False: Only stop LMDeploy API Server (used after individual test completion) + - True: Stop API Server + Proxy (if master) and mark as fully cleaned (session end) + """ + if self._cleaned and force: + return + + print(f'๐Ÿงน Node {self.node_rank} cleaning resources... (force={force})') + + # --- Stop local LMDeploy API Server (all nodes) --- + if hasattr(self, '_local_lmdeploy_process') and self._local_lmdeploy_process is not None: + if self._local_lmdeploy_process.poll() is None: + try: + self._local_lmdeploy_process.terminate() + self._local_lmdeploy_process.wait(timeout=10) + print(f'โœ… Node {self.node_rank}: LMDeploy API Server stopped') + except subprocess.TimeoutExpired: + print(f'โš ๏ธ Node {self.node_rank}: API Server stop timeout, forcing kill') + self._local_lmdeploy_process.kill() + + # --- Stop LMDeploy Proxy (master node only, only when force=True) --- + if force and self.is_master: + if hasattr(self, '_lmdeploy_proxy_process') and self._lmdeploy_proxy_process is not None: + if self._lmdeploy_proxy_process.poll() is None: + try: + self._lmdeploy_proxy_process.terminate() + self._lmdeploy_proxy_process.wait(timeout=10) + print('โœ… LMDeploy Proxy stopped') + except subprocess.TimeoutExpired: + print('โš ๏ธ LMDeploy Proxy stop timeout, forcing kill') + self._lmdeploy_proxy_process.kill() + + # Mark as fully cleaned only on final cleanup + if force: + self._cleaned = True + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.cleanup(force=True) + + +def proxy_worker_node_wait(manager: ProxyDistributedManager, timeout_minutes: int = 60): + print(f'โธ๏ธ Worker node {manager.node_rank} entering monitoring mode...') + + max_checks = (timeout_minutes * 60) // WORKER_WAIT_INTERVAL + consecutive_failures = 0 + max_consecutive_failures = 3 + + for i in range(max_checks): + if not manager._check_connection_to_master(): + consecutive_failures += 1 + print(f'โš ๏ธ Master node connection failed ({consecutive_failures}/{max_consecutive_failures})') + if consecutive_failures >= max_consecutive_failures: + print('๐Ÿ“ก Master node service stopped, worker node exiting') + break + else: + consecutive_failures = 0 + + if i % 4 == 0: + elapsed = (i * WORKER_WAIT_INTERVAL) // 60 + print(f'โณ Worker node {manager.node_rank} monitoring... 
Running for {elapsed} minutes') + + time.sleep(WORKER_WAIT_INTERVAL) + else: + print(f'โฐ Worker node {manager.node_rank} monitoring timed out ({timeout_minutes} minutes)') + + manager.cleanup(force=False) # Worker node only cleans up its own API Server when exiting + print(f'โœ… Worker node {manager.node_rank} completed waiting') diff --git a/autotest/utils/distributed_utils.py b/autotest/utils/ray_distributed_utils.py similarity index 54% rename from autotest/utils/distributed_utils.py rename to autotest/utils/ray_distributed_utils.py index c77d644175..71506f7024 100644 --- a/autotest/utils/distributed_utils.py +++ b/autotest/utils/ray_distributed_utils.py @@ -1,12 +1,12 @@ -# utils/distributed_utils.py import os -import subprocess -import time import random import socket -import requests -from typing import Dict, Any +import subprocess +import time from time import time as time_time +from typing import Any, Dict + +import requests # Default constants LM_DEPLOY_API_PORT = 8000 @@ -22,12 +22,12 @@ def wait_for_model_service_ready( model_name: str, timeout_seconds: int = 1000, ) -> bool: - """ - Wait for LMDeploy API Server to be ready and verify basic functionality. + """Wait for LMDeploy API Server to be ready and verify basic functionality. + No longer checks multi-node registration (API Server is a single-point service). """ - print(f"โณ Waiting for LMDeploy API Server to be ready (Model: {model_name}), Timeout: {timeout_seconds}s") - + print(f'โณ Waiting for LMDeploy API Server to be ready (Model: {model_name}), Timeout: {timeout_seconds}s') + start_time = time_time() check_count = 0 last_progress_print = 0 @@ -46,67 +46,66 @@ def wait_for_model_service_ready( sock.settimeout(5) if sock.connect_ex((host, api_port)) != 0: if current_time - last_progress_print >= progress_print_interval: - print(f"๐Ÿ”Œ Check {check_count}: API port {api_port} not ready") + print(f'๐Ÿ”Œ Check {check_count}: API port {api_port} not ready') last_progress_print = current_time time.sleep(10) continue # Verify service functionality if verify_service_functionality(host, api_port, model_name, check_count): - print("โœ… LMDeploy API Server is fully ready!") + print('โœ… LMDeploy API Server is fully ready!') return True except Exception as e: if current_time - last_progress_print >= progress_print_interval: - print(f"๐Ÿ”ง Check {check_count}: Exception - {e}") + print(f'๐Ÿ”ง Check {check_count}: Exception - {e}') last_progress_print = current_time sleep_time = 10 + random.uniform(-2, 2) time.sleep(sleep_time) - print(f"โŒ LMDeploy API Server startup timed out ({timeout_seconds} seconds)") + print(f'โŒ LMDeploy API Server startup timed out ({timeout_seconds} seconds)') return False def verify_service_functionality(host: str, api_port: int, model_name: str, check_count: int) -> bool: - """Verify that the API Server can respond to basic requests""" + """Verify that the API Server can respond to basic requests.""" try: test_data = { - "model": model_name, - "messages": [{"role": "user", "content": "hi"}], - "max_tokens": 5, - "stream": False + 'model': model_name, + 'messages': [{ + 'role': 'user', + 'content': 'hi' + }], + 'max_tokens': 5, + 'stream': False } - resp = requests.post( - f"http://{host}:{api_port}/v1/chat/completions", - json=test_data, - timeout=15 - ) + resp = requests.post(f'http://{host}:{api_port}/v1/chat/completions', json=test_data, timeout=15) if resp.status_code == 200: - print(f"โœ… Check {check_count}: Service functionality normal (received valid response)") + print(f'โœ… Check 
{check_count}: Service functionality normal (received valid response)') return True elif resp.status_code == 400: - print(f"โœ… Check {check_count}: Service framework activated (received 400)") + print(f'โœ… Check {check_count}: Service framework activated (received 400)') return True else: - print(f"๐Ÿ”ง Check {check_count}: Service test failed, status code: {resp.status_code}") + print(f'๐Ÿ”ง Check {check_count}: Service test failed, status code: {resp.status_code}') return False except requests.exceptions.RequestException as e: - print(f"๐Ÿ”ง Check {check_count}: Service test exception - {e}") + print(f'๐Ÿ”ง Check {check_count}: Service test exception - {e}') return False - class RayLMDeployManager: + def __init__( self, master_addr: str, ray_port: int = RAY_PORT, api_port: int = LM_DEPLOY_API_PORT, - log_dir: str = ".", + log_dir: str = '.', health_check: bool = True, ): self.master_addr = master_addr @@ -117,37 +116,37 @@ def __init__( self._cleaned = False # Determine if this is the master node (via environment variable NODE_RANK) - self.node_rank = int(os.getenv("NODE_RANK", "0")) + self.node_rank = int(os.getenv('NODE_RANK', '0')) self.is_master = (self.node_rank == 0) os.makedirs(self.log_dir, exist_ok=True) - print(f"๐Ÿ“ Node {self.node_rank} log directory: {self.log_dir}") + print(f'๐Ÿ“ Node {self.node_rank} log directory: {self.log_dir}') # Print cluster information - self.node_count = int(os.getenv("NODE_COUNT", "1")) - self.job_id = os.getenv("JOB_ID", "unknown") - print(f"๐ŸŽฏ Node {self.node_rank} cluster information:") - print(f" - Total nodes: {self.node_count}") + self.node_count = int(os.getenv('NODE_COUNT', '1')) + self.job_id = os.getenv('JOB_ID', 'unknown') + print(f'๐ŸŽฏ Node {self.node_rank} cluster information:') + print(f' - Total nodes: {self.node_count}') print(f" - Role: {'Master node' if self.is_master else 'Worker node'}") - print(f" - Master address: {self.master_addr}") - print(f" - Ray port: {self.ray_port}") - print(f" - API port: {self.api_port}") - print(f" - Job ID: {self.job_id}") + print(f' - Master address: {self.master_addr}') + print(f' - Ray port: {self.ray_port}') + print(f' - API port: {self.api_port}') + print(f' - Job ID: {self.job_id}') def start_ray_cluster(self): - """Start or join Ray cluster""" + """Start or join Ray cluster.""" if self.is_master: - cmd = ["ray", "start", "--head", "--port", str(self.ray_port)] - print(f"๐Ÿš€ Master node starting Ray cluster (Port: {self.ray_port})") + cmd = ['ray', 'start', '--head', '--port', str(self.ray_port)] + print(f'๐Ÿš€ Master node starting Ray cluster (Port: {self.ray_port})') else: - cmd = ["ray", "start", "--address", f"{self.master_addr}:{self.ray_port}"] - print(f"๐Ÿ”Œ Worker node {self.node_rank} joining Ray cluster: {self.master_addr}:{self.ray_port}") + cmd = ['ray', 'start', '--address', f'{self.master_addr}:{self.ray_port}'] + print(f'๐Ÿ”Œ Worker node {self.node_rank} joining Ray cluster: {self.master_addr}:{self.ray_port}') try: - result = subprocess.run(cmd, capture_output=True, text=True, check=True) - print("โœ… Ray started successfully") + subprocess.run(cmd, capture_output=True, text=True, check=True) + print('โœ… Ray started successfully') except subprocess.CalledProcessError as e: - print(f"๐Ÿ’ฅ Ray startup failed: {e.stderr}") + print(f'๐Ÿ’ฅ Ray startup failed: {e.stderr}') raise def start_lmdeploy_api_server(self, model_path: str, model_param: dict): @@ -157,66 +156,60 @@ def start_lmdeploy_api_server(self, model_path: str, model_param: dict): """ if self.is_master: # === 
Master node logic: Start service === - log_path = os.path.join(self.log_dir, "lmdeploy_api.log") + timestamp = time.strftime('%Y%m%d_%H%M%S') + log_path = os.path.join(self.log_dir, f'lmdeploy_api_{timestamp}.log') tp = model_param.get('tp_num', 1) backend = model_param.get('backend', 'turbomind') communicator = model_param.get('communicator', 'nccl') quant_policy = model_param.get('quant_policy', 0) - with open(log_path, "w") as log_file: + with open(log_path, 'w') as log_file: cmd = [ - "lmdeploy", "serve", "api_server", - model_path, - "--server-port", str(self.api_port), - "--tp", str(tp), - "--backend", backend, - "--communicator", communicator + 'lmdeploy', 'serve', 'api_server', model_path, '--server-port', + str(self.api_port), '--tp', + str(tp), '--backend', backend, '--communicator', communicator ] - - if backend == "turbomind": - cmd.extend(["--quant-policy", str(quant_policy)]) + + if backend == 'turbomind': + cmd.extend(['--quant-policy', str(quant_policy)]) print(f"๐Ÿš€ Master node starting LMDeploy API Server: {' '.join(cmd)}") self._api_process = subprocess.Popen(cmd, stdout=log_file, stderr=log_file) - print(f"๐Ÿ“ API Server log: {log_path}") + print(f'๐Ÿ“ API Server log: {log_path}') # Wait for service to be ready if self.health_check: - ready = wait_for_model_service_ready( - host=self.master_addr, - api_port=self.api_port, - model_name=model_path, - timeout_seconds=1000 - ) + ready = wait_for_model_service_ready(host=self.master_addr, + api_port=self.api_port, + model_name=model_path, + timeout_seconds=1000) if not ready: - print("โŒ API Server failed to be ready, terminating process") + print('โŒ API Server failed to be ready, terminating process') self._api_process.terminate() try: self._api_process.wait(timeout=10) except subprocess.TimeoutExpired: self._api_process.kill() - raise RuntimeError("LMDeploy API Server failed to start") + raise RuntimeError('LMDeploy API Server failed to start') else: # === Worker node logic: Only verify that the master node service is ready === - print(f"๐Ÿ” Worker node {self.node_rank} is verifying that the master node ({self.master_addr}:{self.api_port}) API Server is ready...") + print(f'๐Ÿ” Worker node {self.node_rank} is verifying that the master node ' + f'({self.master_addr}:{self.api_port}) API Server is ready...') if self.health_check: - ready = wait_for_model_service_ready( - host=self.master_addr, - api_port=self.api_port, - model_name=model_path, - timeout_seconds=1000 - ) + ready = wait_for_model_service_ready(host=self.master_addr, + api_port=self.api_port, + model_name=model_path, + timeout_seconds=1000) if not ready: - raise RuntimeError( - f"Worker node {self.node_rank}: Master node API Server not ready within 1000 seconds, cannot continue" - ) + raise RuntimeError(f'Worker node {self.node_rank}: Master node API Server not ready ' + f'within 1000 seconds, cannot continue') else: - print("โš ๏ธ health_check=False, skipping API Server readiness check (not recommended)") + print('โš ๏ธ health_check=False, skipping API Server readiness check (not recommended)') def cleanup(self, force: bool = True): - """Clean up resources - + """Clean up resources. 
+ Args: - force (bool): + force (bool): - False: Only stop LMDeploy API Server (used after individual test completion) - True: Stop API Server + Ray cluster (used for final cleanup at session end) """ @@ -225,7 +218,7 @@ def cleanup(self, force: bool = True): # So only skip when force=True and already cleaned return - print(f"๐Ÿงน Node {self.node_rank} cleaning resources... (force={force})") + print(f'๐Ÿงน Node {self.node_rank} cleaning resources... (force={force})') # Stop API Server (master node only) if hasattr(self, '_api_process') and self._api_process.poll() is None: @@ -234,27 +227,27 @@ def cleanup(self, force: bool = True): self._api_process.wait(timeout=10) except subprocess.TimeoutExpired: self._api_process.kill() - print("โœ… LMDeploy API Server stopped") + print('โœ… LMDeploy API Server stopped') # Note: We don't clear the _api_process attribute here so it can be checked later # Stop Ray (only when force=True) if force: try: - subprocess.run(["ray", "stop", "--force"], check=False, capture_output=True) - print("โœ… Ray cluster stopped") + subprocess.run(['ray', 'stop', '--force'], check=False, capture_output=True) + print('โœ… Ray cluster stopped') except Exception as e: - print(f"โš ๏ธ Ray stop exception: {e}") + print(f'โš ๏ธ Ray stop exception: {e}') self._cleaned = True # Only mark as "fully cleaned" when force=True def get_cluster_info(self) -> Dict[str, Any]: return { - "node_rank": self.node_rank, - "node_count": self.node_count, - "master_addr": self.master_addr, - "ray_port": self.ray_port, - "api_port": self.api_port, - "is_master": self.is_master, - "job_id": self.job_id, + 'node_rank': self.node_rank, + 'node_count': self.node_count, + 'master_addr': self.master_addr, + 'ray_port': self.ray_port, + 'api_port': self.api_port, + 'is_master': self.is_master, + 'job_id': self.job_id, } def __enter__(self): @@ -264,14 +257,13 @@ def __exit__(self, exc_type, exc_val, exc_tb): self.cleanup() -def worker_node_wait(manager: RayLMDeployManager, timeout_minutes: int = 60): - """ - Worker node waits for Ray master node (Head Node) to be alive (by detecting GCS service port) - """ +def ray_worker_node_wait(manager: RayLMDeployManager, timeout_minutes: int = 60): + """Worker node waits for Ray master node (Head Node) to be alive (by + detecting GCS service port)""" if manager.is_master: return - print(f"โธ๏ธ Worker node {manager.node_rank} entering wait mode...") + print(f'โธ๏ธ Worker node {manager.node_rank} entering wait mode...') max_checks = (timeout_minutes * 60) // WORKER_WAIT_INTERVAL consecutive_failures = 0 max_consecutive_failures = 3 @@ -288,15 +280,15 @@ def worker_node_wait(manager: RayLMDeployManager, timeout_minutes: int = 60): consecutive_failures += 1 if consecutive_failures >= max_consecutive_failures: - print("๐Ÿ“ก Ray master node GCS service unreachable, worker node exiting") + print('๐Ÿ“ก Ray master node GCS service unreachable, worker node exiting') break if i % 4 == 0: elapsed = (i * WORKER_WAIT_INTERVAL) // 60 - print(f"โณ Worker node {manager.node_rank} waiting... Running for {elapsed} minutes") + print(f'โณ Worker node {manager.node_rank} waiting... 
Running for {elapsed} minutes') time.sleep(WORKER_WAIT_INTERVAL) else: - print(f"โฐ Worker node {manager.node_rank} wait timeout ({timeout_minutes} minutes)") + print(f'โฐ Worker node {manager.node_rank} wait timeout ({timeout_minutes} minutes)') - manager.cleanup() \ No newline at end of file + manager.cleanup() From b9ca337215f3764d7871c0c46e06f112cb6fade1 Mon Sep 17 00:00:00 2001 From: littlegy <787321726@qq.com> Date: Fri, 28 Nov 2025 10:44:20 +0800 Subject: [PATCH 3/5] update deep test --- autotest/conftest.py | 25 +- autotest/evaluate/test_api_evaluate.py | 52 +- .../test_restful_chat_hf_pytorch_llm.py | 29 +- .../test_restful_chat_hf_turbomind_llm.py | 29 +- autotest/utils/evaluate_utils.py | 9 +- autotest/utils/proxy_distributed_utils.py | 544 +++++++----------- autotest/utils/run_restful_chat.py | 3 +- 7 files changed, 288 insertions(+), 403 deletions(-) diff --git a/autotest/conftest.py b/autotest/conftest.py index aa8bced205..640ad59e77 100644 --- a/autotest/conftest.py +++ b/autotest/conftest.py @@ -80,33 +80,18 @@ def shared_ray_manager(): @pytest.fixture(scope='session') def shared_proxy_manager(): master_addr = os.getenv('MASTER_ADDR', 'localhost') - device = os.environ.get('DEVICE', '') - - # Load device-specific or default config - if device: - device_config_path = f'autotest/config-{device}.yaml' - if os.path.exists(device_config_path): - config_path = device_config_path - else: - config_path = config_file - else: - config_path = config_file - with open(config_path) as f: - env_config = yaml.load(f, Loader=yaml.SafeLoader) - log_dir = env_config.get('log_path', '/tmp/lmdeploy_test') - - # Initialize manager (master starts proxy automatically) - manager = ProxyDistributedManager(health_check=True, proxy_port=PROXY_PORT, log_dir=log_dir) + manager = ProxyDistributedManager() if manager.is_master: - print(f'๐ŸŽฏ Master node: LMDeploy Proxy started on {master_addr}:{PROXY_PORT}') - print('โณ Waiting for worker nodes to connect (they will register when starting api_server)...') + manager.start() + print(f'๐ŸŽฏ Master node: LMDeploy Proxy started on {master_addr}:{manager.proxy_port}') + print('โณ Waiting for worker nodes to connect...') yield manager print(f'\n[Final Cleanup] Node {manager.node_rank} performing final resource cleanup...') - manager.cleanup(force=True) + manager.cleanup() def pytest_addoption(parser): diff --git a/autotest/evaluate/test_api_evaluate.py b/autotest/evaluate/test_api_evaluate.py index b05627485a..05ac42df30 100644 --- a/autotest/evaluate/test_api_evaluate.py +++ b/autotest/evaluate/test_api_evaluate.py @@ -4,7 +4,7 @@ import pytest from utils.config_utils import get_evaluate_pytorch_model_list, get_evaluate_turbomind_model_list, get_workerid from utils.evaluate_utils import restful_test -from utils.proxy_distributed_utils import proxy_worker_node_wait +from utils.proxy_distributed_utils import ApiServerPerTest, proxy_worker_node_wait from utils.ray_distributed_utils import ray_worker_node_wait from utils.run_restful_chat import start_proxy_server, start_restful_api, stop_restful_api @@ -129,10 +129,9 @@ def _run_proxy_distributed_test(config, test_type='infer', manager=None, eval_config_name='default'): - assert manager is not None, 'Manager instance must be provided' - # Adjust eval config for GPT-style models + # ็‰นๆฎŠๆจกๅž‹ไฝฟ็”จไธ“็”จ่ฏ„ไผฐ้…็ฝฎ if 'gpt' in model_param.get('model', '').lower(): eval_config_name = 'gpt' @@ -140,29 +139,36 @@ def _run_proxy_distributed_test(config, model_name = model_param['model'] model_path = 
os.path.join(config['model_path'], model_name) - manager.start_lmdeploy_api_server_async(model_path=model_path, model_param=model_param) - - if manager.is_master: - try: + # ๅฏๅŠจๆœฌๆต‹่ฏ•ไธ“ๅฑž็š„ API Server๏ผˆๆฏไธช่Š‚็‚น้ƒฝๅฏๅŠจ่‡ชๅทฑ็š„ๅฎžไพ‹๏ผ‰ + api_server = ApiServerPerTest(proxy_manager=manager, model_path=model_path, model_param=model_param) + api_server.start() + try: + if manager.is_master: + # Master ็ญ‰ๅพ…ๆ‰€ๆœ‰ๅฎžไพ‹ๆณจๅ†ŒๅฎŒๆˆ + api_server.wait_until_ready() print(f'๐Ÿงช Master node executing {test_type} test ({eval_config_name})...') + result, msg = restful_test(config, run_id, model_param, worker_id=worker_id, - port=manager.proxy_port, + port=PROXY_PORT, test_type=test_type, **preset_config) assert result, f'โŒ {test_type} test failed: {msg}' print(f'โœ… {test_type} test passed') - finally: - print('๐Ÿงน Master node cleaning up API servers after test...') - manager.cleanup(force=False) + else: + # Worker ่Š‚็‚น่ฟ›ๅ…ฅ็ญ‰ๅพ…ๆจกๅผ๏ผŒ็›‘ๆŽง master proxy ๆ˜ฏๅฆ้€€ๅ‡บ + print(f'โธ๏ธ Worker node {manager.node_rank} waiting for master to complete test...') + proxy_worker_node_wait(manager, timeout_minutes=4880) - else: - print(f'โธ๏ธ Worker node {manager.node_rank} waiting for master to complete test...') - proxy_worker_node_wait(manager, timeout_minutes=4880) + finally: + # ๆฏไธช่Š‚็‚นๆธ…็†่‡ชๅทฑ็š„ API Server ่ฟ›็จ‹ + api_server.cleanup() + if manager.is_master: + time.sleep(1) # ็ป™ workers ไธ€็‚นๆ—ถ้—ดๆ„Ÿ็Ÿฅ proxy ๅ…ณ้—ญ def get_turbomind_model_list(tp_num): @@ -257,7 +263,7 @@ def test_turbomind_restful_tp8(config, run_id, prepare_environment, worker_id): @pytest.mark.infer @pytest.mark.turbomind -@pytest.mark.gpu_num_16 +@pytest.mark.gpu_num_distributed_tp16 @pytest.mark.flaky(reruns=0) @pytest.mark.parametrize('model_param', get_turbomind_model_list(tp_num=16)) def test_turbomind_restful_distributed_tp16(shared_ray_manager, config, run_id, model_param, worker_id): @@ -271,7 +277,7 @@ def test_turbomind_restful_distributed_tp16(shared_ray_manager, config, run_id, @pytest.mark.infer @pytest.mark.turbomind -@pytest.mark.gpu_num_16 +@pytest.mark.gpu_num_distributed_dpep16 @pytest.mark.flaky(reruns=0) @pytest.mark.parametrize('model_param', get_turbomind_model_list(tp_num=16)) def test_turbomind_restful_distributed_dpep16(shared_proxy_manager, config, run_id, model_param, worker_id): @@ -340,7 +346,7 @@ def test_pytorch_restful_tp16(config, run_id, prepare_environment, worker_id): @pytest.mark.infer @pytest.mark.pytorch -@pytest.mark.gpu_num_16 +@pytest.mark.gpu_num_distributed_tp16 @pytest.mark.flaky(reruns=0) @pytest.mark.parametrize('model_param', get_pytorch_model_list(tp_num=16)) def test_pytorch_restful_distributed_tp16(shared_ray_manager, config, run_id, model_param, worker_id): @@ -354,7 +360,7 @@ def test_pytorch_restful_distributed_tp16(shared_ray_manager, config, run_id, mo @pytest.mark.infer @pytest.mark.pytorch -@pytest.mark.gpu_num_16 +@pytest.mark.gpu_num_distributed_dpep16 @pytest.mark.flaky(reruns=0) @pytest.mark.parametrize('model_param', get_pytorch_model_list(tp_num=16)) def test_pytorch_restful_distributed_dpep16(shared_proxy_manager, config, run_id, model_param, worker_id): @@ -459,3 +465,13 @@ def test_turbomind_judgeeval_tp4(config, run_id, prepare_environment_judge_evalu def test_turbomind_judgeeval_tp8(config, run_id, prepare_environment_judge_evaluate, worker_id): result, msg = run_test(config, run_id, prepare_environment_judge_evaluate, worker_id, 'eval') assert result, msg + + +@pytest.mark.eval +@pytest.mark.turbomind 
+@pytest.mark.gpu_num_16 +@pytest.mark.flaky(reruns=0) +@pytest.mark.parametrize('prepare_environment_judge_evaluate', get_turbomind_model_list(tp_num=16), indirect=True) +def test_turbomind_judgeeval_tp16(config, run_id, prepare_environment_judge_evaluate, worker_id): + result, msg = run_test(config, run_id, prepare_environment_judge_evaluate, worker_id, 'eval') + assert result, msg diff --git a/autotest/tools/restful/test_restful_chat_hf_pytorch_llm.py b/autotest/tools/restful/test_restful_chat_hf_pytorch_llm.py index 04831cfb31..b3afb611ad 100644 --- a/autotest/tools/restful/test_restful_chat_hf_pytorch_llm.py +++ b/autotest/tools/restful/test_restful_chat_hf_pytorch_llm.py @@ -3,7 +3,7 @@ import pytest from utils.config_utils import get_torch_model_list, get_workerid -from utils.proxy_distributed_utils import proxy_worker_node_wait +from utils.proxy_distributed_utils import ApiServerPerTest, proxy_worker_node_wait from utils.ray_distributed_utils import ray_worker_node_wait from utils.run_restful_chat import run_all_step, run_reasoning_case, run_tools_case, start_restful_api, stop_restful_api @@ -82,20 +82,23 @@ def _run_proxy_distributed_test( model_name = model_param['model'] model_path = os.path.join(config['model_path'], model_name) - # Start API Server for current model (master node starts/stops, worker nodes verify) - manager.start_lmdeploy_api_server_async(model_path=model_path, model_param=model_param) + api_server = ApiServerPerTest(proxy_manager=manager, model_path=model_path, model_param=model_param) + api_server.start() - if manager.is_master: + try: + + if manager.is_master: + api_server.wait_until_ready() - try: run_all_step(config, common_case_config, worker_id=worker_id, port=PROXY_PORT) - finally: - # Clean up API Server for current model (worker nodes skip) - manager.cleanup(force=False) - else: - time.sleep(10) - proxy_worker_node_wait(manager, timeout_minutes=4880) + else: + print(f'โธ๏ธ Worker node {manager.node_rank} waiting for master to complete test...') + proxy_worker_node_wait(manager, timeout_minutes=4880) + finally: + api_server.cleanup() + if manager.is_master: + time.sleep(1) @pytest.mark.order(7) @@ -179,6 +182,8 @@ def test_restful_chat_tp16(config, common_case_config, worker_id): @pytest.mark.order(7) @pytest.mark.usefixtures('common_case_config') @pytest.mark.restful_api_pytorch +@pytest.mark.flaky(reruns=0) +@pytest.mark.gpu_num_distributed_tp16 @pytest.mark.parametrize('model_param', getModelList(tp_num=16)) def test_restful_chat_distributed_tp16(shared_ray_manager, config, model_param, common_case_config, worker_id): _run_ray_distributed_test(config=config, @@ -191,6 +196,8 @@ def test_restful_chat_distributed_tp16(shared_ray_manager, config, model_param, @pytest.mark.order(7) @pytest.mark.usefixtures('common_case_config') @pytest.mark.restful_api_pytorch +@pytest.mark.flaky(reruns=0) +@pytest.mark.gpu_num_distributed_dpep16 @pytest.mark.parametrize('model_param', getModelList(tp_num=16)) def test_restful_chat_distributed_dpep16(shared_proxy_manager, config, model_param, common_case_config, worker_id): _run_proxy_distributed_test(config=config, diff --git a/autotest/tools/restful/test_restful_chat_hf_turbomind_llm.py b/autotest/tools/restful/test_restful_chat_hf_turbomind_llm.py index fcd197be16..78b482ec72 100644 --- a/autotest/tools/restful/test_restful_chat_hf_turbomind_llm.py +++ b/autotest/tools/restful/test_restful_chat_hf_turbomind_llm.py @@ -3,7 +3,7 @@ import pytest from utils.config_utils import get_communicator_list, 
get_turbomind_model_list, get_workerid -from utils.proxy_distributed_utils import proxy_worker_node_wait +from utils.proxy_distributed_utils import ApiServerPerTest, proxy_worker_node_wait from utils.ray_distributed_utils import ray_worker_node_wait from utils.run_restful_chat import (run_all_step, run_reasoning_case, run_tools_case, start_restful_api, stop_restful_api, test_logprobs) @@ -90,20 +90,23 @@ def _run_proxy_distributed_test( model_name = model_param['model'] model_path = os.path.join(config['model_path'], model_name) - # Start API Server for current model (master node starts/stops, worker nodes verify) - manager.start_lmdeploy_api_server_async(model_path=model_path, model_param=model_param) + api_server = ApiServerPerTest(proxy_manager=manager, model_path=model_path, model_param=model_param) + api_server.start() - if manager.is_master: + try: + + if manager.is_master: + api_server.wait_until_ready() - try: run_all_step(config, common_case_config, worker_id=worker_id, port=PROXY_PORT) - finally: - # Clean up API Server for current model (worker nodes skip) - manager.cleanup(force=False) - else: - time.sleep(10) - proxy_worker_node_wait(manager, timeout_minutes=4880) + else: + print(f'โธ๏ธ Worker node {manager.node_rank} waiting for master to complete test...') + proxy_worker_node_wait(manager, timeout_minutes=4880) + finally: + api_server.cleanup() + if manager.is_master: + time.sleep(1) @pytest.mark.order(7) @@ -170,6 +173,7 @@ def test_restful_chat_tp8(config, common_case_config, worker_id): @pytest.mark.order(7) @pytest.mark.usefixtures('common_case_config') @pytest.mark.restful_api +@pytest.mark.flaky(reruns=0) @pytest.mark.gpu_num_distributed_16 @pytest.mark.parametrize('model_param', getModelList(tp_num=16)) def test_restful_chat_distributed_tp16(shared_ray_manager, config, model_param, common_case_config, worker_id): @@ -183,7 +187,8 @@ def test_restful_chat_distributed_tp16(shared_ray_manager, config, model_param, @pytest.mark.order(7) @pytest.mark.usefixtures('common_case_config') @pytest.mark.restful_api -@pytest.mark.gpu_num_distributed_16 +@pytest.mark.flaky(reruns=0) +@pytest.mark.gpu_num_distributed_dpep16 @pytest.mark.parametrize('model_param', getModelList(tp_num=16)) def test_restful_chat_distributed_dpep16(shared_proxy_manager, config, model_param, common_case_config, worker_id): _run_proxy_distributed_test(config=config, diff --git a/autotest/utils/evaluate_utils.py b/autotest/utils/evaluate_utils.py index 596dbc6b32..ced9355289 100644 --- a/autotest/utils/evaluate_utils.py +++ b/autotest/utils/evaluate_utils.py @@ -106,6 +106,9 @@ def restful_test(config, run_id, prepare_environment, worker_id='gw0', port=DEFA f"wk_{backend_type}_{model_name.replace('/', '_')}_{communicator}_{quant_policy}") os.makedirs(work_dir, exist_ok=True) + master_addr = os.getenv('MASTER_ADDR', '127.0.0.1') + test_url = f'http://{master_addr}:{port}/v1' + try: temp_config_file = f"temp_{backend_type}_{summary_model_name.replace('/', '_')}_{communicator}.py" @@ -119,12 +122,12 @@ def restful_test(config, run_id, prepare_environment, worker_id='gw0', port=DEFA cfg.MODEL_NAME = summary_model_name cfg.MODEL_PATH = model_path - cfg.API_BASE = f'http://127.0.0.1:{port}/v1' # noqa: E231 + cfg.API_BASE = test_url # noqa: E231 if cfg.models and len(cfg.models) > 0: model_cfg = cfg.models[0] model_cfg['abbr'] = f'{summary_model_name}-lmdeploy-api' - model_cfg['openai_api_base'] = f'http://127.0.0.1:{port}/v1' # noqa: E231 + model_cfg['openai_api_base'] = test_url # noqa: E231 model_cfg['path'] 
= model_path for key, value in kwargs.items(): @@ -142,7 +145,7 @@ def restful_test(config, run_id, prepare_environment, worker_id='gw0', port=DEFA cfg = Config.fromfile(temp_config_path) print(f'Using existing temp config file: {temp_config_path}') - cfg.JUDGE_API_BASE = f'http://127.0.0.1:{port}/v1' + cfg.JUDGE_API_BASE = test_url cfg.JUDGE_MODEL_PATH = os.path.join(model_base_path, 'Qwen/Qwen2.5-32B-Instruct') if hasattr(cfg, 'judge_cfg'): diff --git a/autotest/utils/proxy_distributed_utils.py b/autotest/utils/proxy_distributed_utils.py index 103ea96bb6..af69c50dfd 100644 --- a/autotest/utils/proxy_distributed_utils.py +++ b/autotest/utils/proxy_distributed_utils.py @@ -3,31 +3,108 @@ import socket import subprocess import time -from time import time as time_time from typing import Any, Dict, Tuple import requests -# Default constants -LM_DEPLOY_PROXY_PORT = 8000 -HEALTH_CHECK_TIMEOUT = 30 -CONNECTION_CHECK_TIMEOUT = 5 -WORKER_WAIT_INTERVAL = 30 +time_time = time.time + +DEFAULT_PROXY_PORT = 8000 +WORKER_WAIT_INTERVAL = 15 # seconds + + +def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool: + """Check if a port is open.""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.settimeout(timeout) + try: + s.connect((host, port)) + return True + except (socket.timeout, ConnectionRefusedError, OSError): + return False + + +def verify_service_functionality(host: str, proxy_port: int, model_name: str, check_count: int) -> bool: + try: + url = f'http://{host}:{proxy_port}/v1/chat/completions' + payload = { + 'model': model_name, + 'messages': [{ + 'role': 'user', + 'content': 'hi' + }], + 'max_tokens': 5, + 'stream': False + } + resp = requests.post(url, json=payload, timeout=15) + if resp.status_code in (200, 400): + return True + else: + print(f'๐Ÿ”ง Check {check_count}: Service returned status {resp.status_code}') + return False + except Exception as e: + print(f'๐Ÿ”ง Check {check_count}: Failed to verify service functionality - {e}') + return False + + +def check_nodes_status(host: str, proxy_port: int, model_name: str, expected_instances: int, check_count: int, + current_time: float, last_progress_print: float, + progress_print_interval: int) -> Tuple[bool, int]: + try: + nodes_url = f'http://{host}:{proxy_port}/nodes/status' + resp = requests.get(nodes_url, timeout=10) + + if resp.status_code != 200: + if current_time - last_progress_print >= progress_print_interval: + print(f'๐Ÿ”ง Check {check_count}: Failed to get node status, status code: {resp.status_code}') + return False, 0 + + nodes_data = resp.json() + ready_instances = 0 + total_instances = len(nodes_data) + + for node_info in nodes_data.values(): + models = node_info.get('models', []) + if model_name in models: + ready_instances += 1 + + should_print = current_time - last_progress_print >= progress_print_interval + + if should_print: + basename = os.path.basename(model_name) + print(f'๐Ÿ“Š Check {check_count}: Model registration progress: ' + f'{ready_instances}/{expected_instances} instances ready ' + f'(Total reported: {total_instances})') + for node_url, node_info in nodes_data.items(): + models = node_info.get('models', []) + if model_name in models: + print(f' โœ… Instance {node_url} registered model {basename}') + else: + print(f' โณ Instance {node_url} has not registered target model') + + if ready_instances >= expected_instances: + if should_print: + print(f'๐ŸŽฏ All {expected_instances} API server instances have registered the target model') + return True, ready_instances + else: + if 
should_print: + print(f'โณ Waiting for more instances to register... ({ready_instances}/{expected_instances})') + return False, ready_instances + + except Exception as e: + if current_time - last_progress_print >= progress_print_interval: + print(f'๐Ÿ”ง Check {check_count}: Exception getting node status - {e}') + return False, 0 def wait_for_model_service_ready(host: str, proxy_port: int, model_name: str, - timeout_seconds: int = 1500, - expected_nodes: int = None) -> bool: - """Wait for LM Deploy Proxy + backend workers to be fully ready, ensuring - all nodes are registered. - - Check all nodes' readiness status through /nodes/status API. - """ - if expected_nodes: + timeout_seconds: int = 2000, + expected_instances: int = None) -> bool: + if expected_instances: print(f'โณ Waiting for model service to be fully ready (Model: {model_name}), ' - f'expected nodes: {expected_nodes}, timeout: {timeout_seconds}s') + f'expected instances: {expected_instances}, timeout: {timeout_seconds}s') else: print(f'โณ Waiting for model service to be fully ready (Model: {model_name}), ' f'timeout: {timeout_seconds}s') @@ -54,19 +131,20 @@ def wait_for_model_service_ready(host: str, time.sleep(10) continue - if expected_nodes: - nodes_ready, ready_nodes = check_nodes_status(host, proxy_port, model_name, expected_nodes, check_count, - current_time, last_progress_print, - progress_print_interval) - if not nodes_ready: - if ready_nodes is not None and current_time - last_progress_print >= progress_print_interval: + if expected_instances: + instances_ready, ready_count = check_nodes_status(host, proxy_port, model_name, expected_instances, + check_count, current_time, last_progress_print, + progress_print_interval) + if not instances_ready: + if ready_count is not None and current_time - last_progress_print >= progress_print_interval: last_progress_print = current_time + time.sleep(10) continue service_ready = verify_service_functionality(host, proxy_port, model_name, check_count) if service_ready: - if expected_nodes: - print(f'โœ… All {expected_nodes} nodes are ready and service is functional!') + if expected_instances: + print(f'โœ… All {expected_instances} API server instances are ready and service is functional!') else: print('โœ… Model service is fully ready!') return True @@ -87,345 +165,135 @@ def wait_for_model_service_ready(host: str, return False -def check_nodes_status(host: str, proxy_port: int, model_name: str, expected_nodes: int, check_count: int, - current_time: float, last_progress_print: float, - progress_print_interval: int) -> Tuple[bool, int]: - try: - nodes_url = f'http://{host}:{proxy_port}/nodes/status' - resp = requests.get(nodes_url, timeout=10) +def proxy_worker_node_wait(manager, timeout_minutes: int = 120): + """Worker node waits by periodically checking if the master's proxy service + is still alive. If the proxy becomes unreachable for several consecutive + checks, assume master has finished. 
- if resp.status_code != 200: - if current_time - last_progress_print >= progress_print_interval: - print(f'๐Ÿ”ง Check {check_count}: Failed to get node status, status code: {resp.status_code}') - return False, 0 - - nodes_data = resp.json() - ready_nodes = 0 - total_nodes = len(nodes_data) - - for node_url, node_info in nodes_data.items(): - models = node_info.get('models', []) - if model_name in models: - ready_nodes += 1 - - should_print = current_time - last_progress_print >= progress_print_interval + Args: + manager: ProxyDistributedManager instance + timeout_minutes: Maximum time to wait before giving up (default: 120 minutes) + """ + print(f'โธ๏ธ Worker node {manager.node_rank} entering monitoring mode...') - if should_print: - print(f'๐Ÿ“Š Check {check_count}: Node readiness progress: {ready_nodes}/{expected_nodes} ' - f'(Total nodes: {total_nodes})') - for node_url, node_info in nodes_data.items(): - models = node_info.get('models', []) - basename = os.path.basename(model_name) - if model_name in models: - print(f' โœ… Node {node_url} registered model {basename}') - else: - print(f' โณ Node {node_url} has not registered target model') + max_checks = (timeout_minutes * 60) // WORKER_WAIT_INTERVAL + consecutive_failures = 0 + max_consecutive_failures = 3 - if ready_nodes >= expected_nodes: - if should_print: - print(f'๐ŸŽฏ All {expected_nodes} nodes have registered the target model') - return True, ready_nodes + for i in range(max_checks): + if not is_port_open(manager.master_addr, manager.proxy_port, timeout=2.0): + consecutive_failures += 1 + print(f'โš ๏ธ Proxy connection to master failed ({consecutive_failures}/{max_consecutive_failures})') + if consecutive_failures >= max_consecutive_failures: + print('๐Ÿ“ก Master proxy service stopped, worker node exiting') + break else: - if should_print: - print(f'โณ Waiting for more nodes to register... ({ready_nodes}/{expected_nodes})') - return False, ready_nodes - - except Exception as e: - if current_time - last_progress_print >= progress_print_interval: - print(f'๐Ÿ”ง Check {check_count}: Exception getting node status - {e}') - return False, 0 - - -def verify_service_functionality(host: str, proxy_port: int, model_name: str, check_count: int) -> bool: - try: - test_data = { - 'model': model_name, - 'messages': [{ - 'role': 'user', - 'content': 'hi' - }], - 'max_tokens': 5, - 'stream': False - } + consecutive_failures = 0 - resp = requests.post(f'http://{host}:{proxy_port}/v1/chat/completions', json=test_data, timeout=15) + if i % 4 == 0: + elapsed = (i * WORKER_WAIT_INTERVAL) // 60 + print(f'โณ Worker node {manager.node_rank} monitoring... 
Running for {elapsed} minutes') - if resp.status_code == 200: - print(f'โœ… Check {check_count}: Service functionality OK (received valid response)') - return True - elif resp.status_code == 400: - print(f'โœ… Check {check_count}: Service framework activated (received 400)') - return True - else: - print(f'๐Ÿ”ง Check {check_count}: Service functionality test failed, status code: {resp.status_code}') - return False + time.sleep(WORKER_WAIT_INTERVAL) + else: + print(f'โฐ Worker node {manager.node_rank} monitoring timed out ({timeout_minutes} minutes)') - except requests.exceptions.RequestException as e: - print(f'๐Ÿ”ง Check {check_count}: Service functionality test exception - {e}') - return False + print(f'โœ… Worker node {manager.node_rank} completed waiting') class ProxyDistributedManager: - def __init__(self, health_check: bool = True, proxy_port: int = None, log_dir: str = '.'): - self.health_check = health_check - self.proxy_port = proxy_port or LM_DEPLOY_PROXY_PORT - self.log_dir = log_dir - self._cleaned = False - - self._lmdeploy_proxy_process = None - self._local_lmdeploy_process = None - + def __init__(self): + self.master_addr = os.getenv('MASTER_ADDR', 'localhost') self.node_rank = int(os.getenv('NODE_RANK', '0')) - self.is_master = (self.node_rank == 0) + self.proxy_port = int(os.getenv('PROXY_PORT', str(DEFAULT_PROXY_PORT))) - os.makedirs(self.log_dir, exist_ok=True) - - role = 'master' if self.is_master else 'worker' - timestamp = time.strftime('%Y%m%d_%H%M%S') - log_filename = f'lmdeploy_{role}_rank{self.node_rank}_{timestamp}.log' - self._lmdeploy_log_path = os.path.join(self.log_dir, log_filename) - - self._setup_from_env() - self._setup_distributed_cluster() + self.is_master = (self.node_rank == 0) + self.proxy_process = None - print(f'๐Ÿ“ Node {self.node_rank} LMDeploy log path: {self._lmdeploy_log_path}') + def start(self): + if not self.is_master: + return - def _setup_from_env(self): - self.node_count = int(os.getenv('NODE_COUNT', '1')) - self.master_addr = os.getenv('MASTER_ADDR', 'localhost') - self.proc_per_node = int(os.getenv('PROC_PER_NODE', '1')) - self.job_id = os.getenv('JOB_ID', 'unknown') - self.total_gpus = self.node_count * self.proc_per_node - - print(f'๐ŸŽฏ Node {self.node_rank} distributed environment info:') - print(f' - Nodes: {self.node_count} nodes ร— {self.proc_per_node} GPUs = {self.total_gpus} GPUs') - print(f" - Current: Rank {self.node_rank} ({'Master node' if self.is_master else 'Worker node'})") - print(f' - Master address: {self.master_addr}') - print(f' - Proxy port: {self.proxy_port}') - print(f' - Job ID: {self.job_id}') - - def _setup_distributed_cluster(self): - if self.is_master: - self._start_lmdeploy_proxy() - if self.health_check: - self._basic_health_check() - - def _start_lmdeploy_proxy(self): - print(f'๐Ÿš€ Master node starting lmdeploy proxy (port: {self.proxy_port})...') - env = os.environ.copy() - self._lmdeploy_proxy_process = subprocess.Popen([ - 'lmdeploy', - 'serve', - 'proxy', - '--server-name', - self.master_addr, - '--server-port', - str(self.proxy_port), - '--routing-strategy', - 'min_expected_latency', - '--serving-strategy', - 'Hybrid', - ], - env=env) - time.sleep(10) - - if self._check_service_health(self.proxy_port): - print('โœ… lmdeploy proxy started successfully') - else: - print('โš ๏ธ lmdeploy proxy may have issues starting') - - def start_lmdeploy_api_server_async(self, - model_path: str, - model_param: dict, - start_timeout: int = 1500) -> Tuple[int, subprocess.Popen]: - total_gpus_per_node = 
self.proc_per_node - total_nodes = self.node_count - - ep = total_gpus_per_node * total_nodes - dp = total_gpus_per_node * total_nodes - - backend = model_param.get('backend', 'turbomind') - communicator = model_param.get('communicator', 'nccl') - quant_policy = model_param.get('quant_policy', 0) - - full_command = [ - 'lmdeploy', 'serve', 'api_server', model_path, '--backend', backend, '--tp', - str(1), '--ep', - str(ep), '--dp', - str(dp), '--proxy-url', f'http://{self.master_addr}:{self.proxy_port}', '--nnodes', - str(total_nodes), '--node-rank', - str(self.node_rank), '--communicator', communicator + cmd = [ + 'lmdeploy', 'serve', 'proxy', '--server-name', self.master_addr, '--server-port', + str(self.proxy_port), '--routing-strategy', 'min_expected_latency', '--serving-strategy', 'Hybrid' ] + print(f"[Proxy] Starting: {' '.join(cmd)}") + self.proxy_process = subprocess.Popen(cmd) - if backend == 'turbomind': - full_command.extend(['--quant-policy', str(quant_policy)]) + time.sleep(5) - cmd = ' '.join(full_command) - print(f'๐ŸŽฏ Node {self.node_rank} start command: {cmd}') + def cleanup(self): + if self.proxy_process and self.proxy_process.poll() is None: + print('[Proxy] Terminating proxy process...') + self.proxy_process.terminate() + try: + self.proxy_process.wait(timeout=10) + except subprocess.TimeoutExpired: + self.proxy_process.kill() - env = os.environ.copy() - env.update({ - 'DEEPEP_MAX_BATCH_SIZE': '256', - }) - if dp > 1: - env.update({ - 'LMDEPLOY_DP_MASTER_ADDR': self.master_addr, - 'LMDEPLOY_DP_MASTER_PORT': '29555', - }) +class ApiServerPerTest: - log_file = open(self._lmdeploy_log_path, 'w') + def __init__(self, proxy_manager: ProxyDistributedManager, model_path: str, model_param: Dict[str, Any]): + self.proxy_manager = proxy_manager + self.model_path = model_path + self.model_param = model_param or {} - try: - self._local_lmdeploy_process = subprocess.Popen(full_command, - stdout=log_file, - stderr=log_file, - env=env, - text=True, - encoding='utf-8') - pid = self._local_lmdeploy_process.pid - print(f'๐Ÿš€ Node {self.node_rank} started lmdeploy api_server (PID: {pid}), log: {self._lmdeploy_log_path}') - - if self.health_check: - expected_nodes = self.node_count - ready = wait_for_model_service_ready(host=self.master_addr, - proxy_port=self.proxy_port, - model_name=model_path, - timeout_seconds=start_timeout, - expected_nodes=expected_nodes) - if not ready: - print(f'โŒ Node {self.node_rank}: Model service could not be ready within timeout, ' - f'terminating local process') - self._local_lmdeploy_process.terminate() - try: - self._local_lmdeploy_process.wait(timeout=10) - except subprocess.TimeoutExpired: - self._local_lmdeploy_process.kill() - log_file.close() - return 0, self._local_lmdeploy_process - - log_file.close() - return pid, self._local_lmdeploy_process - - except Exception as e: - print(f'๐Ÿ’ฅ Node {self.node_rank} failed to start lmdeploy api_server: {e}') - log_file.close() - raise - - def is_lmdeploy_running(self): - return self._local_lmdeploy_process is not None and self._local_lmdeploy_process.poll() is None - - def _basic_health_check(self): - print(f'๐Ÿ” Node {self.node_rank} performing basic health check...') - if self.is_master: - ok = self._check_service_health(self.proxy_port) - status = 'โœ… lmdeploy proxy service healthy' if ok else 'โš ๏ธ lmdeploy proxy service may have issues' - else: - ok = self._check_connection_to_master(self.proxy_port) - status = 'โœ… Connection to master node normal' if ok else 'โš ๏ธ Connection to master node may 
have issues' - print(status) - - def _check_service_health(self, port: int) -> bool: - try: - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: - sock.settimeout(HEALTH_CHECK_TIMEOUT) - return sock.connect_ex((self.master_addr, port)) == 0 - except Exception: - return False + self.master_addr = proxy_manager.master_addr + self.proxy_port = proxy_manager.proxy_port + self.node_rank = int(os.getenv('NODE_RANK', '0')) + self.node_count = int(os.getenv('NODE_COUNT', '1')) + self.proc_per_node = int(os.getenv('PROC_PER_NODE', '1')) - def _check_connection_to_master(self, port: int = None) -> bool: - p = port or self.proxy_port - try: - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: - sock.settimeout(CONNECTION_CHECK_TIMEOUT) - return sock.connect_ex((self.master_addr, p)) == 0 - except Exception: - return False + self.backend = self.model_param.get('backend', 'turbomind') + self.communicator = self.model_param.get('communicator', 'nccl') + self.quant_policy = self.model_param.get('quant_policy', 0) + self.tp = int(self.model_param.get('tp', 1)) + self.ep = self.node_count * self.proc_per_node + self.dp = self.node_count * self.proc_per_node + self.max_batch_size = int(self.model_param.get('max_batch_size', 128)) - def get_cluster_info(self) -> Dict[str, Any]: - return { - 'node_rank': self.node_rank, - 'node_count': self.node_count, - 'master_addr': self.master_addr, - 'proc_per_node': self.proc_per_node, - 'total_gpus': self.total_gpus, - 'job_id': self.job_id, - 'is_master': self.is_master, - 'proxy_port': self.proxy_port - } + self.expected_instances = self.node_count * self.proc_per_node + self.is_master = (self.node_rank == 0) + self.api_process = None + + def start(self): + proxy_url = f'http://{self.master_addr}:{self.proxy_port}' + cmd = [ + 'lmdeploy', 'serve', 'api_server', self.model_path, '--backend', + str(self.backend), '--tp', + str(self.tp), '--ep', + str(self.ep), '--dp', + str(self.dp), '--proxy-url', proxy_url, '--nnodes', + str(self.node_count), '--node-rank', + str(self.node_rank), '--communicator', + str(self.communicator), '--max-batch-size', + str(self.max_batch_size) + ] + if self.quant_policy != 0: + cmd += ['--quant-policy', str(self.quant_policy)] - def cleanup(self, force: bool = True): - """Clean up resources. + print(f"[API Server] Starting: {' '.join(cmd)}") + self.api_process = subprocess.Popen(cmd) - Args: - force (bool): - - False: Only stop LMDeploy API Server (used after individual test completion) - - True: Stop API Server + Proxy (if master) and mark as fully cleaned (session end) - """ - if self._cleaned and force: + def wait_until_ready(self): + if not self.is_master: return - - print(f'๐Ÿงน Node {self.node_rank} cleaning resources... 
(force={force})') - - # --- Stop local LMDeploy API Server (all nodes) --- - if hasattr(self, '_local_lmdeploy_process') and self._local_lmdeploy_process is not None: - if self._local_lmdeploy_process.poll() is None: - try: - self._local_lmdeploy_process.terminate() - self._local_lmdeploy_process.wait(timeout=10) - print(f'โœ… Node {self.node_rank}: LMDeploy API Server stopped') - except subprocess.TimeoutExpired: - print(f'โš ๏ธ Node {self.node_rank}: API Server stop timeout, forcing kill') - self._local_lmdeploy_process.kill() - - # --- Stop LMDeploy Proxy (master node only, only when force=True) --- - if force and self.is_master: - if hasattr(self, '_lmdeploy_proxy_process') and self._lmdeploy_proxy_process is not None: - if self._lmdeploy_proxy_process.poll() is None: - try: - self._lmdeploy_proxy_process.terminate() - self._lmdeploy_proxy_process.wait(timeout=10) - print('โœ… LMDeploy Proxy stopped') - except subprocess.TimeoutExpired: - print('โš ๏ธ LMDeploy Proxy stop timeout, forcing kill') - self._lmdeploy_proxy_process.kill() - - # Mark as fully cleaned only on final cleanup - if force: - self._cleaned = True - - def __enter__(self): - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - self.cleanup(force=True) - - -def proxy_worker_node_wait(manager: ProxyDistributedManager, timeout_minutes: int = 60): - print(f'โธ๏ธ Worker node {manager.node_rank} entering monitoring mode...') - - max_checks = (timeout_minutes * 60) // WORKER_WAIT_INTERVAL - consecutive_failures = 0 - max_consecutive_failures = 3 - - for i in range(max_checks): - if not manager._check_connection_to_master(): - consecutive_failures += 1 - print(f'โš ๏ธ Master node connection failed ({consecutive_failures}/{max_consecutive_failures})') - if consecutive_failures >= max_consecutive_failures: - print('๐Ÿ“ก Master node service stopped, worker node exiting') - break - else: - consecutive_failures = 0 - - if i % 4 == 0: - elapsed = (i * WORKER_WAIT_INTERVAL) // 60 - print(f'โณ Worker node {manager.node_rank} monitoring... 
Running for {elapsed} minutes') - - time.sleep(WORKER_WAIT_INTERVAL) - else: - print(f'โฐ Worker node {manager.node_rank} monitoring timed out ({timeout_minutes} minutes)') - - manager.cleanup(force=False) # Worker node only cleans up its own API Server when exiting - print(f'โœ… Worker node {manager.node_rank} completed waiting') + success = wait_for_model_service_ready(host=self.master_addr, + proxy_port=self.proxy_port, + model_name=self.model_path, + timeout_seconds=2000, + expected_instances=self.expected_instances) + if not success: + raise RuntimeError(f'API Server failed to register model: {self.model_path}') + + def cleanup(self): + if self.api_process and self.api_process.poll() is None: + print(f'[API Server] Terminating for model: {self.model_path}') + self.api_process.terminate() + try: + self.api_process.wait(timeout=15) + except subprocess.TimeoutExpired: + self.api_process.kill() diff --git a/autotest/utils/run_restful_chat.py b/autotest/utils/run_restful_chat.py index 4f088774f5..99851ded89 100644 --- a/autotest/utils/run_restful_chat.py +++ b/autotest/utils/run_restful_chat.py @@ -16,7 +16,8 @@ from lmdeploy.serve.openai.api_client import APIClient -BASE_HTTP_URL = 'http://localhost' +MASTER_ADDR = os.getenv('MASTER_ADDR', 'localhost') +BASE_HTTP_URL = f'http://{MASTER_ADDR}' DEFAULT_PORT = 23333 PROXY_PORT = 8000 From 3b26e4280c78fe14b02f0971792382135565e333 Mon Sep 17 00:00:00 2001 From: littlegy <787321726@qq.com> Date: Fri, 28 Nov 2025 11:48:32 +0800 Subject: [PATCH 4/5] rm turbomind test --- autotest/config-h.yaml | 3 - autotest/evaluate/test_api_evaluate.py | 45 +-------- .../test_restful_chat_hf_turbomind_llm.py | 92 ------------------- 3 files changed, 1 insertion(+), 139 deletions(-) diff --git a/autotest/config-h.yaml b/autotest/config-h.yaml index 0a7a9bfb4e..2192597498 100644 --- a/autotest/config-h.yaml +++ b/autotest/config-h.yaml @@ -37,7 +37,6 @@ turbomind_chat_model: - Qwen/Qwen3-32B-FP8 - openai/gpt-oss-120b - openai/gpt-oss-20b - - moonshotai/Kimi-K2-Instruct-0905 pytorch_chat_model: - internlm/Intern-S1 @@ -92,7 +91,6 @@ turbomind_quatization: - Qwen/Qwen3-32B-FP8 - openai/gpt-oss-120b - openai/gpt-oss-20b - - moonshotai/Kimi-K2-Instruct-0905 gptq: - empty no_kvint4: @@ -111,7 +109,6 @@ turbomind_quatization: - Qwen/Qwen3-32B-FP8 - openai/gpt-oss-120b - openai/gpt-oss-20b - - moonshotai/Kimi-K2-Instruct-0905 no_kvint8: - empty diff --git a/autotest/evaluate/test_api_evaluate.py b/autotest/evaluate/test_api_evaluate.py index 05ac42df30..7f15f29385 100644 --- a/autotest/evaluate/test_api_evaluate.py +++ b/autotest/evaluate/test_api_evaluate.py @@ -131,7 +131,6 @@ def _run_proxy_distributed_test(config, eval_config_name='default'): assert manager is not None, 'Manager instance must be provided' - # ็‰นๆฎŠๆจกๅž‹ไฝฟ็”จไธ“็”จ่ฏ„ไผฐ้…็ฝฎ if 'gpt' in model_param.get('model', '').lower(): eval_config_name = 'gpt' @@ -139,13 +138,11 @@ def _run_proxy_distributed_test(config, model_name = model_param['model'] model_path = os.path.join(config['model_path'], model_name) - # ๅฏๅŠจๆœฌๆต‹่ฏ•ไธ“ๅฑž็š„ API Server๏ผˆๆฏไธช่Š‚็‚น้ƒฝๅฏๅŠจ่‡ชๅทฑ็š„ๅฎžไพ‹๏ผ‰ api_server = ApiServerPerTest(proxy_manager=manager, model_path=model_path, model_param=model_param) api_server.start() try: if manager.is_master: - # Master ็ญ‰ๅพ…ๆ‰€ๆœ‰ๅฎžไพ‹ๆณจๅ†ŒๅฎŒๆˆ api_server.wait_until_ready() print(f'๐Ÿงช Master node executing {test_type} test ({eval_config_name})...') @@ -160,15 +157,13 @@ def _run_proxy_distributed_test(config, print(f'โœ… {test_type} test passed') else: - # Worker 
่Š‚็‚น่ฟ›ๅ…ฅ็ญ‰ๅพ…ๆจกๅผ๏ผŒ็›‘ๆŽง master proxy ๆ˜ฏๅฆ้€€ๅ‡บ print(f'โธ๏ธ Worker node {manager.node_rank} waiting for master to complete test...') proxy_worker_node_wait(manager, timeout_minutes=4880) finally: - # ๆฏไธช่Š‚็‚นๆธ…็†่‡ชๅทฑ็š„ API Server ่ฟ›็จ‹ api_server.cleanup() if manager.is_master: - time.sleep(1) # ็ป™ workers ไธ€็‚นๆ—ถ้—ดๆ„Ÿ็Ÿฅ proxy ๅ…ณ้—ญ + time.sleep(1) def get_turbomind_model_list(tp_num): @@ -261,34 +256,6 @@ def test_turbomind_restful_tp8(config, run_id, prepare_environment, worker_id): assert result, msg -@pytest.mark.infer -@pytest.mark.turbomind -@pytest.mark.gpu_num_distributed_tp16 -@pytest.mark.flaky(reruns=0) -@pytest.mark.parametrize('model_param', get_turbomind_model_list(tp_num=16)) -def test_turbomind_restful_distributed_tp16(shared_ray_manager, config, run_id, model_param, worker_id): - _run_ray_distributed_test(config=config, - run_id=run_id, - model_param=model_param, - worker_id=worker_id, - test_type='infer', - manager=shared_ray_manager) - - -@pytest.mark.infer -@pytest.mark.turbomind -@pytest.mark.gpu_num_distributed_dpep16 -@pytest.mark.flaky(reruns=0) -@pytest.mark.parametrize('model_param', get_turbomind_model_list(tp_num=16)) -def test_turbomind_restful_distributed_dpep16(shared_proxy_manager, config, run_id, model_param, worker_id): - _run_proxy_distributed_test(config=config, - run_id=run_id, - model_param=model_param, - worker_id=worker_id, - test_type='infer', - manager=shared_proxy_manager) - - @pytest.mark.infer @pytest.mark.pytorch @pytest.mark.gpu_num_1 @@ -465,13 +432,3 @@ def test_turbomind_judgeeval_tp4(config, run_id, prepare_environment_judge_evalu def test_turbomind_judgeeval_tp8(config, run_id, prepare_environment_judge_evaluate, worker_id): result, msg = run_test(config, run_id, prepare_environment_judge_evaluate, worker_id, 'eval') assert result, msg - - -@pytest.mark.eval -@pytest.mark.turbomind -@pytest.mark.gpu_num_16 -@pytest.mark.flaky(reruns=0) -@pytest.mark.parametrize('prepare_environment_judge_evaluate', get_turbomind_model_list(tp_num=16), indirect=True) -def test_turbomind_judgeeval_tp16(config, run_id, prepare_environment_judge_evaluate, worker_id): - result, msg = run_test(config, run_id, prepare_environment_judge_evaluate, worker_id, 'eval') - assert result, msg diff --git a/autotest/tools/restful/test_restful_chat_hf_turbomind_llm.py b/autotest/tools/restful/test_restful_chat_hf_turbomind_llm.py index 78b482ec72..4d2bd1d831 100644 --- a/autotest/tools/restful/test_restful_chat_hf_turbomind_llm.py +++ b/autotest/tools/restful/test_restful_chat_hf_turbomind_llm.py @@ -1,10 +1,5 @@ -import os -import time - import pytest from utils.config_utils import get_communicator_list, get_turbomind_model_list, get_workerid -from utils.proxy_distributed_utils import ApiServerPerTest, proxy_worker_node_wait -from utils.ray_distributed_utils import ray_worker_node_wait from utils.run_restful_chat import (run_all_step, run_reasoning_case, run_tools_case, start_restful_api, stop_restful_api, test_logprobs) @@ -50,65 +45,6 @@ def getPrefixCacheModelList(tp_num): return model_list -def _run_ray_distributed_test( - config, - model_param, - common_case_config, - worker_id, - manager=None, # โ† New parameter: pass in shared manager -): - """Universal distributed test executor (using shared Ray cluster)""" - assert manager is not None, 'Manager instance must be provided' - - if manager.is_master: - model_name = model_param['model'] - model_path = os.path.join(config['model_path'], model_name) - - # Start API Server for current 
model (master node starts/stops, worker nodes verify) - manager.start_lmdeploy_api_server(model_path=model_path, model_param=model_param) - - try: - run_all_step(config, common_case_config, worker_id=worker_id, port=PROXY_PORT) - - finally: - # Clean up API Server for current model (worker nodes skip) - manager.cleanup(force=False) - else: - time.sleep(10) - ray_worker_node_wait(manager, timeout_minutes=4880) - - -def _run_proxy_distributed_test( - config, - model_param, - common_case_config, - worker_id, - manager=None, # โ† New parameter: pass in shared manager -): - """Universal distributed test executor (using shared Ray cluster)""" - assert manager is not None, 'Manager instance must be provided' - model_name = model_param['model'] - model_path = os.path.join(config['model_path'], model_name) - - api_server = ApiServerPerTest(proxy_manager=manager, model_path=model_path, model_param=model_param) - api_server.start() - - try: - - if manager.is_master: - api_server.wait_until_ready() - - run_all_step(config, common_case_config, worker_id=worker_id, port=PROXY_PORT) - - else: - print(f'โธ๏ธ Worker node {manager.node_rank} waiting for master to complete test...') - proxy_worker_node_wait(manager, timeout_minutes=4880) - finally: - api_server.cleanup() - if manager.is_master: - time.sleep(1) - - @pytest.mark.order(7) @pytest.mark.usefixtures('common_case_config') @pytest.mark.prefix_cache_test @@ -170,34 +106,6 @@ def test_restful_chat_tp8(config, common_case_config, worker_id): run_all_step(config, common_case_config, worker_id=worker_id, port=DEFAULT_PORT + get_workerid(worker_id)) -@pytest.mark.order(7) -@pytest.mark.usefixtures('common_case_config') -@pytest.mark.restful_api -@pytest.mark.flaky(reruns=0) -@pytest.mark.gpu_num_distributed_16 -@pytest.mark.parametrize('model_param', getModelList(tp_num=16)) -def test_restful_chat_distributed_tp16(shared_ray_manager, config, model_param, common_case_config, worker_id): - _run_ray_distributed_test(config=config, - model_param=model_param, - common_case_config=common_case_config, - worker_id=worker_id, - manager=shared_ray_manager) - - -@pytest.mark.order(7) -@pytest.mark.usefixtures('common_case_config') -@pytest.mark.restful_api -@pytest.mark.flaky(reruns=0) -@pytest.mark.gpu_num_distributed_dpep16 -@pytest.mark.parametrize('model_param', getModelList(tp_num=16)) -def test_restful_chat_distributed_dpep16(shared_proxy_manager, config, model_param, common_case_config, worker_id): - _run_proxy_distributed_test(config=config, - model_param=model_param, - common_case_config=common_case_config, - worker_id=worker_id, - manager=shared_proxy_manager) - - def getKvintModelList(tp_num, quant_policy): model_list = [] for communicator in get_communicator_list(tp_num): From beace57ded42ac5834770ded397d03a4973d55fb Mon Sep 17 00:00:00 2001 From: littlegy <787321726@qq.com> Date: Mon, 1 Dec 2025 19:01:33 +0800 Subject: [PATCH 5/5] update backend --- autotest/tools/restful/test_restful_chat_hf_turbomind_llm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autotest/tools/restful/test_restful_chat_hf_turbomind_llm.py b/autotest/tools/restful/test_restful_chat_hf_turbomind_llm.py index 4d2bd1d831..f009b702f4 100644 --- a/autotest/tools/restful/test_restful_chat_hf_turbomind_llm.py +++ b/autotest/tools/restful/test_restful_chat_hf_turbomind_llm.py @@ -14,7 +14,7 @@ def prepare_environment(request, config, worker_id): model = param['model'] model_path = config.get('model_path') + '/' + model - pid, startRes = start_restful_api(config, 
param, model, model_path, 'pytorch', worker_id)
+        pid, startRes = start_restful_api(config, param, model, model_path, 'turbomind', worker_id)
         yield
         stop_restful_api(pid, startRes, param)
     else:
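
For reference, a minimal sketch of how the pieces introduced in this series fit together on each node, mirroring the _run_proxy_distributed_test helpers above. It is written as a plain script rather than the pytest fixtures the suite actually uses, and the model path and model_param values are placeholders, not anything from the patch.

from utils.proxy_distributed_utils import ApiServerPerTest, ProxyDistributedManager, proxy_worker_node_wait

# Reads MASTER_ADDR / NODE_RANK / PROXY_PORT from the environment.
manager = ProxyDistributedManager()
if manager.is_master:
    manager.start()  # master starts `lmdeploy serve proxy`

# Every node launches its own api_server and registers it with the proxy.
server = ApiServerPerTest(proxy_manager=manager,
                          model_path='/models/placeholder-model',  # placeholder path
                          model_param={'backend': 'pytorch', 'communicator': 'nccl'})
server.start()
try:
    if manager.is_master:
        # Blocks until NODE_COUNT * PROC_PER_NODE instances have registered the model.
        server.wait_until_ready()
        # ... run the actual requests or evaluation against the proxy port here ...
    else:
        # Workers poll the proxy port and exit once it goes away.
        proxy_worker_node_wait(manager, timeout_minutes=60)
finally:
    server.cleanup()       # each node tears down its own api_server
    if manager.is_master:
        manager.cleanup()  # master stops the proxy at session end

The split keeps orchestration on the master, which alone drives traffic through the proxy during the test, while workers only watch the proxy port for liveness, so a finished or crashed master releases the worker nodes automatically.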