4 changes: 4 additions & 0 deletions autotest/config-h.yaml
@@ -19,6 +19,7 @@ tp_config:
DeepSeek-V3.1: 8
Qwen3-30B-A3B-Base: 2
Qwen2.5-32B-Instruct: 2
Kimi-K2-Instruct-0905: 16

turbomind_chat_model:
- internlm/Intern-S1
@@ -54,6 +55,7 @@ pytorch_chat_model:
- unsloth/gpt-oss-120b-BF16
- unsloth/gpt-oss-20b-BF16
- deepseek/DeepSeek-V3.1
- moonshotai/Kimi-K2-Instruct-0905

turbomind_vl_model:
- internlm/Intern-S1
@@ -129,6 +131,7 @@ pytorch_quatization:
- Qwen/Qwen3-30B-A3B-FP8
- Qwen/Qwen3-32B
- Qwen/Qwen3-32B-FP8
- moonshotai/Kimi-K2-Instruct-0905
no_kvint8:
- empty

@@ -167,3 +170,4 @@ evaluate_model:
- unsloth/gpt-oss-120b-BF16
- unsloth/gpt-oss-20b-BF16
- deepseek/DeepSeek-V3.1
- moonshotai/Kimi-K2-Instruct-0905
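
As a point of reference, below is a minimal sketch of how a per-device config such as autotest/config-h.yaml might be consulted to resolve a model's tensor-parallel degree. The helper name get_model_tp, the default of 1, and the fallback behaviour are illustrative assumptions; only the tp_config layout comes from the YAML above, and the lookup order mirrors the one used in conftest.py below.

import os

import yaml


def get_model_tp(model_name, device=''):
    """Hypothetical helper: resolve a model's TP degree from tp_config.

    Prefers autotest/config-<device>.yaml and falls back to
    autotest/config.yaml, mirroring conftest.py.
    """
    config_path = f'autotest/config-{device}.yaml' if device else 'autotest/config.yaml'
    if not os.path.exists(config_path):
        config_path = 'autotest/config.yaml'
    with open(config_path) as f:
        cfg = yaml.load(f.read(), Loader=yaml.SafeLoader)
    # With the entry added above: get_model_tp('Kimi-K2-Instruct-0905', 'h') -> 16
    return cfg.get('tp_config', {}).get(model_name, 1)
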
51 changes: 51 additions & 0 deletions autotest/conftest.py
@@ -2,11 +2,15 @@

import pytest
import yaml
from utils.proxy_distributed_utils import ProxyDistributedManager
from utils.ray_distributed_utils import RayLMDeployManager

cli_prompt_case_file = 'autotest/chat_prompt_case.yaml'
common_prompt_case_file = 'autotest/prompt_case.yaml'
config_file = 'autotest/config.yaml'

PROXY_PORT = 8000


@pytest.fixture(scope='session')
def config():
@@ -43,6 +47,53 @@ def common_case_config():
return case_config


@pytest.fixture(scope='session')
def shared_ray_manager():
master_addr = os.getenv('MASTER_ADDR', 'localhost')
device = os.environ.get('DEVICE', '')
if device:
device_config_path = f'autotest/config-{device}.yaml'
if os.path.exists(device_config_path):
config_path = device_config_path
else:
config_path = config_file
else:
config_path = config_file

with open(config_path) as f:
env_config = yaml.load(f.read(), Loader=yaml.SafeLoader)
log_dir = env_config.get('log_path', '/tmp/lmdeploy_test')

manager = RayLMDeployManager(master_addr=master_addr, api_port=PROXY_PORT, log_dir=log_dir, health_check=True)

manager.start_ray_cluster()

if manager.is_master:
print('🎯 Master node: Ray cluster started, waiting for worker nodes to join...')

yield manager

print(f'\n[Final Cleanup] Node {manager.node_rank} performing final resource cleanup...')
manager.cleanup(force=True)


@pytest.fixture(scope='session')
def shared_proxy_manager():
master_addr = os.getenv('MASTER_ADDR', 'localhost')

manager = ProxyDistributedManager()

if manager.is_master:
manager.start()
print(f'🎯 Master node: LMDeploy Proxy started on {master_addr}:{manager.proxy_port}')
print('⏳ Waiting for worker nodes to connect...')

yield manager

print(f'\n[Final Cleanup] Node {manager.node_rank} performing final resource cleanup...')
manager.cleanup()


def pytest_addoption(parser):
parser.addoption('--run_id', action='store', default='', help='github run_id')
parser.addoption('--device', action='store', default='', help='device config suffix')
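
For orientation, here is a minimal sketch (the marker, model path, and model_param are illustrative assumptions) of how a test consumes the session-scoped shared_ray_manager fixture defined above: the Ray cluster is started once per pytest session, the master node starts and tears down the API server per model, and worker nodes simply keep their Ray node alive, which is the same pattern the distributed tests below follow.

import pytest
from utils.ray_distributed_utils import ray_worker_node_wait


@pytest.mark.gpu_num_distributed_tp16  # assumption: reuse the marker added in this PR
def test_example_distributed(shared_ray_manager):
    manager = shared_ray_manager  # one shared Ray cluster per pytest session
    if manager.is_master:
        # Master starts the API server behind the proxy, runs its checks, then cleans up.
        manager.start_lmdeploy_api_server(model_path='/path/to/some-model',  # illustrative path
                                          model_param={'model': 'some-model'})
        try:
            pass  # issue requests against PROXY_PORT (8000) here
        finally:
            manager.cleanup(force=False)
    else:
        # Worker nodes only wait until the master has finished.
        ray_worker_node_wait(manager, timeout_minutes=60)
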
119 changes: 119 additions & 0 deletions autotest/evaluate/test_api_evaluate.py
@@ -1,6 +1,11 @@
import os
import time

import pytest
from utils.config_utils import get_evaluate_pytorch_model_list, get_evaluate_turbomind_model_list, get_workerid
from utils.evaluate_utils import restful_test
from utils.proxy_distributed_utils import ApiServerPerTest, proxy_worker_node_wait
from utils.ray_distributed_utils import ray_worker_node_wait
from utils.run_restful_chat import start_proxy_server, start_restful_api, stop_restful_api

DEFAULT_PORT = 23333
@@ -75,6 +80,92 @@ def prepare_environment_judge_evaluate(request, config, worker_id):
stop_restful_api(proxy_pid, proxy_process, request.param)


def _run_ray_distributed_test(
config,
run_id,
model_param,
worker_id,
test_type='infer',
manager=None, # ← New parameter: pass in shared manager
eval_config_name='default'):
"""Universal distributed test executor (using shared Ray cluster)"""
assert manager is not None, 'Manager instance must be provided'
if 'gpt' in model_param.get('model', '').lower():
eval_config_name = 'gpt'
preset_config = EVAL_CONFIGS.get(eval_config_name, {})

if manager.is_master:
model_name = model_param['model']
model_path = os.path.join(config['model_path'], model_name)

# Start API Server for current model (master node starts/stops, worker nodes verify)
manager.start_lmdeploy_api_server(model_path=model_path, model_param=model_param)

try:
print(f'🧪 Master node executing {test_type} test ({eval_config_name})...')
result, msg = restful_test(config,
run_id,
model_param,
worker_id=worker_id,
port=PROXY_PORT,
test_type=test_type,
**preset_config)
assert result, f'❌ {test_type} test failed: {msg}'
print(f'✅ {test_type} test passed')

finally:
# Clean up API Server for current model (worker nodes skip)
manager.cleanup(force=False)
else:
time.sleep(10)
ray_worker_node_wait(manager, timeout_minutes=4880)


def _run_proxy_distributed_test(config,
run_id,
model_param,
worker_id,
test_type='infer',
manager=None,
eval_config_name='default'):
assert manager is not None, 'Manager instance must be provided'

if 'gpt' in model_param.get('model', '').lower():
eval_config_name = 'gpt'

preset_config = EVAL_CONFIGS.get(eval_config_name, {})
model_name = model_param['model']
model_path = os.path.join(config['model_path'], model_name)

api_server = ApiServerPerTest(proxy_manager=manager, model_path=model_path, model_param=model_param)
api_server.start()

try:
if manager.is_master:
api_server.wait_until_ready()
print(f'🧪 Master node executing {test_type} test ({eval_config_name})...')

result, msg = restful_test(config,
run_id,
model_param,
worker_id=worker_id,
port=PROXY_PORT,
test_type=test_type,
**preset_config)
assert result, f'❌ {test_type} test failed: {msg}'
print(f'✅ {test_type} test passed')

else:
print(f'⏸️ Worker node {manager.node_rank} waiting for master to complete test...')
proxy_worker_node_wait(manager, timeout_minutes=4880)

finally:
api_server.cleanup()
if manager.is_master:
time.sleep(1)


def get_turbomind_model_list(tp_num):
model_list = get_evaluate_turbomind_model_list(tp_num, kvint_list=[4, 8])
new_model_list = []
@@ -220,6 +311,34 @@ def test_pytorch_restful_tp16(config, run_id, prepare_environment, worker_id):
assert result, msg


@pytest.mark.infer
@pytest.mark.pytorch
@pytest.mark.gpu_num_distributed_tp16
@pytest.mark.flaky(reruns=0)
@pytest.mark.parametrize('model_param', get_pytorch_model_list(tp_num=16))
def test_pytorch_restful_distributed_tp16(shared_ray_manager, config, run_id, model_param, worker_id):
_run_ray_distributed_test(config=config,
run_id=run_id,
model_param=model_param,
worker_id=worker_id,
test_type='infer',
manager=shared_ray_manager)


@pytest.mark.infer
@pytest.mark.pytorch
@pytest.mark.gpu_num_distributed_dpep16
@pytest.mark.flaky(reruns=0)
@pytest.mark.parametrize('model_param', get_pytorch_model_list(tp_num=16))
def test_pytorch_restful_distributed_dpep16(shared_proxy_manager, config, run_id, model_param, worker_id):
_run_proxy_distributed_test(config=config,
run_id=run_id,
model_param=model_param,
worker_id=worker_id,
test_type='infer',
manager=shared_proxy_manager)


@pytest.mark.eval
@pytest.mark.pytorch
@pytest.mark.gpu_num_1
110 changes: 103 additions & 7 deletions autotest/tools/restful/test_restful_chat_hf_pytorch_llm.py
@@ -1,19 +1,28 @@
import os
import time

import pytest
from utils.config_utils import get_torch_model_list, get_workerid
from utils.proxy_distributed_utils import ApiServerPerTest, proxy_worker_node_wait
from utils.ray_distributed_utils import ray_worker_node_wait
from utils.run_restful_chat import run_all_step, run_reasoning_case, run_tools_case, start_restful_api, stop_restful_api

DEFAULT_PORT = 23333
PROXY_PORT = 8000


@pytest.fixture(scope='function', autouse=True)
def prepare_environment(request, config, worker_id):
param = request.param
model = param['model']
model_path = config.get('model_path') + '/' + model

pid, startRes = start_restful_api(config, param, model, model_path, 'pytorch', worker_id)
yield
stop_restful_api(pid, startRes, param)
if hasattr(request, 'param'):
param = request.param
model = param['model']
model_path = config.get('model_path') + '/' + model

pid, startRes = start_restful_api(config, param, model, model_path, 'pytorch', worker_id)
yield
stop_restful_api(pid, startRes, param)
else:
yield


def getModelList(tp_num):
@@ -33,6 +42,65 @@ def getPrefixCacheModelList(tp_num):
} for item in get_torch_model_list(tp_num, exclude_dup=True)]


def _run_ray_distributed_test(
config,
model_param,
common_case_config,
worker_id,
manager=None, # ← New parameter: pass in shared manager
):
"""Universal distributed test executor (using shared Ray cluster)"""
assert manager is not None, 'Manager instance must be provided'

if manager.is_master:
model_name = model_param['model']
model_path = os.path.join(config['model_path'], model_name)

# Start API Server for current model (master node starts/stops, worker nodes verify)
manager.start_lmdeploy_api_server(model_path=model_path, model_param=model_param)

try:
run_all_step(config, common_case_config, worker_id=worker_id, port=PROXY_PORT)

finally:
# Clean up API Server for current model (worker nodes skip)
manager.cleanup(force=False)
else:
time.sleep(10)
ray_worker_node_wait(manager, timeout_minutes=4880)


def _run_proxy_distributed_test(
config,
model_param,
common_case_config,
worker_id,
manager=None, # ← New parameter: pass in shared manager
):
"""Universal distributed test executor (using shared Ray cluster)"""
assert manager is not None, 'Manager instance must be provided'
model_name = model_param['model']
model_path = os.path.join(config['model_path'], model_name)

api_server = ApiServerPerTest(proxy_manager=manager, model_path=model_path, model_param=model_param)
api_server.start()

try:

if manager.is_master:
api_server.wait_until_ready()

run_all_step(config, common_case_config, worker_id=worker_id, port=PROXY_PORT)

else:
print(f'⏸️ Worker node {manager.node_rank} waiting for master to complete test...')
proxy_worker_node_wait(manager, timeout_minutes=4880)
finally:
api_server.cleanup()
if manager.is_master:
time.sleep(1)


@pytest.mark.order(7)
@pytest.mark.usefixtures('common_case_config')
@pytest.mark.prefix_cache_test
@@ -111,6 +179,34 @@ def test_restful_chat_tp16(config, common_case_config, worker_id):
run_all_step(config, common_case_config, worker_id=worker_id, port=DEFAULT_PORT + get_workerid(worker_id))


@pytest.mark.order(7)
@pytest.mark.usefixtures('common_case_config')
@pytest.mark.restful_api_pytorch
@pytest.mark.flaky(reruns=0)
@pytest.mark.gpu_num_distributed_tp16
@pytest.mark.parametrize('model_param', getModelList(tp_num=16))
def test_restful_chat_distributed_tp16(shared_ray_manager, config, model_param, common_case_config, worker_id):
_run_ray_distributed_test(config=config,
model_param=model_param,
common_case_config=common_case_config,
worker_id=worker_id,
manager=shared_ray_manager)


@pytest.mark.order(7)
@pytest.mark.usefixtures('common_case_config')
@pytest.mark.restful_api_pytorch
@pytest.mark.flaky(reruns=0)
@pytest.mark.gpu_num_distributed_dpep16
@pytest.mark.parametrize('model_param', getModelList(tp_num=16))
def test_restful_chat_distributed_dpep16(shared_proxy_manager, config, model_param, common_case_config, worker_id):
_run_proxy_distributed_test(config=config,
model_param=model_param,
common_case_config=common_case_config,
worker_id=worker_id,
manager=shared_proxy_manager)
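

For completeness, a sketch (the master hostname, device suffix, and exact launch flow are assumptions) of how the new distributed tests might be selected on each node of a multi-node run: every node runs the same pytest selection, and the session fixtures decide master and worker roles from MASTER_ADDR, as read in conftest.py above.

import os
import sys

import pytest

# Hypothetical launcher, executed identically on every node of the cluster.
os.environ.setdefault('MASTER_ADDR', 'node-0.example')  # assumption: master hostname
os.environ.setdefault('DEVICE', 'h')  # so shared_ray_manager reads autotest/config-h.yaml
sys.exit(pytest.main([
    'autotest/tools/restful/test_restful_chat_hf_pytorch_llm.py',
    '-m', 'gpu_num_distributed_tp16',  # marker introduced in this change
]))
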


def getKvintModelList(tp_num, quant_policy):
return [{
'model': item,