Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions components/src/dynamo/frontend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,12 @@ def parse_args():
),
help=f"Interval in seconds for polling custom backend metrics. Set to > 0 to enable polling (default: 0=disabled, suggested: 9.2s which is less than typical Prometheus scrape interval). Can be set via {CUSTOM_BACKEND_METRICS_POLLING_INTERVAL_ENV_VAR} env var.",
)
parser.add_argument(
"--store-kv",
type=str,
default=os.environ.get("DYN_STORE_KV", "etcd"),
help="Which key-value backend to use: etcd, mem, file. Etcd uses the ETCD_* env vars (e.g. ETCD_ENDPOINTS) for connection details. File uses root dir from env var DYN_FILE_KV or defaults to $TMPDIR/dynamo_store_kv.",
)

flags = parser.parse_args()

Expand Down Expand Up @@ -252,8 +258,7 @@ async def async_main():
os.environ["DYN_METRICS_PREFIX"] = flags.metrics_prefix

loop = asyncio.get_running_loop()

runtime = DistributedRuntime(loop, is_static)
runtime = DistributedRuntime(loop, flags.store_kv, is_static)

def signal_handler():
asyncio.create_task(graceful_shutdown(runtime))
Expand Down
6 changes: 6 additions & 0 deletions components/src/dynamo/mocker/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,12 @@ def parse_args():
default=False,
help="Mark this as a decode worker which does not publish KV events and skips prefill cost estimation (default: False)",
)
parser.add_argument(
"--store-kv",
type=str,
default=os.environ.get("DYN_STORE_KV", "etcd"),
help="Which key-value backend to use: etcd, mem, file. Etcd uses the ETCD_* env vars (e.g. ETCD_ENDPOINTS) for connection details. File uses root dir from env var DYN_FILE_KV or defaults to $TMPDIR/dynamo_store_kv.",
)

args = parser.parse_args()
validate_worker_type_args(args)
Expand Down
2 changes: 1 addition & 1 deletion components/src/dynamo/mocker/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async def launch_workers(args, extra_engine_args_path):
logger.info(f"Creating mocker worker {worker_id + 1}/{args.num_workers}")

# Create a separate DistributedRuntime for this worker (on same event loop)
runtime = DistributedRuntime(loop, False)
runtime = DistributedRuntime(loop, args.store_kv, False)
runtimes.append(runtime)

# Create EntrypointArgs for this worker
Expand Down
8 changes: 8 additions & 0 deletions components/src/dynamo/sglang/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@
"default": None,
"help": "Dump debug config to the specified file path. If not specified, the config will be dumped to stdout at INFO level.",
},
"store-kv": {
"flags": ["--store-kv"],
"type": str,
"default": os.environ.get("DYN_STORE_KV", "etcd"),
"help": "Which key-value backend to use: etcd, mem, file. Etcd uses the ETCD_* env vars (e.g. ETCD_ENDPOINTS) for connection details. File uses root dir from env var DYN_FILE_KV or defaults to $TMPDIR/dynamo_store_kv.",
},
}


Expand All @@ -102,6 +108,7 @@ class DynamoArgs:
component: str
endpoint: str
migration_limit: int
store_kv: str

# tool and reasoning parser options
tool_call_parser: Optional[str] = None
Expand Down Expand Up @@ -329,6 +336,7 @@ async def parse_args(args: list[str]) -> Config:
component=parsed_component_name,
endpoint=parsed_endpoint_name,
migration_limit=parsed_args.migration_limit,
store_kv=parsed_args.store_kv,
tool_call_parser=tool_call_parser,
reasoning_parser=reasoning_parser,
custom_jinja_template=expanded_template_path,
Expand Down
12 changes: 6 additions & 6 deletions components/src/dynamo/sglang/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from dynamo.common.config_dump import dump_config
from dynamo.llm import ModelInput, ModelType
from dynamo.runtime import DistributedRuntime, dynamo_worker
from dynamo.runtime import DistributedRuntime
from dynamo.runtime.logging import configure_dynamo_logging
from dynamo.sglang.args import Config, DisaggregationMode, parse_args
from dynamo.sglang.health_check import (
Expand All @@ -33,9 +33,12 @@
configure_dynamo_logging()


@dynamo_worker(static=False)
async def worker(runtime: DistributedRuntime):
async def worker():
config = await parse_args(sys.argv[1:])
dump_config(config.dynamo_args.dump_config_to, config)

loop = asyncio.get_running_loop()
runtime = DistributedRuntime(loop, config.dynamo_args.store_kv, False)

def signal_handler():
asyncio.create_task(graceful_shutdown(runtime))
Expand All @@ -45,9 +48,6 @@ def signal_handler():

logging.info("Signal handlers will trigger a graceful shutdown of the runtime")

config = await parse_args(sys.argv[1:])
dump_config(config.dynamo_args.dump_config_to, config)

if config.dynamo_args.embedding_worker:
await init_embedding(runtime, config)
elif config.dynamo_args.multimodal_processor:
Expand Down
11 changes: 6 additions & 5 deletions components/src/dynamo/trtllm/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from dynamo.common.config_dump import dump_config
from dynamo.common.utils.prometheus import register_engine_metrics_callback
from dynamo.llm import ModelInput, ModelRuntimeConfig, ModelType, register_llm
from dynamo.runtime import DistributedRuntime, dynamo_worker
from dynamo.runtime import DistributedRuntime
from dynamo.runtime.logging import configure_dynamo_logging
from dynamo.trtllm.engine import TensorRTLLMEngine, get_llm_engine
from dynamo.trtllm.health_check import TrtllmHealthCheckPayload
Expand Down Expand Up @@ -102,11 +102,13 @@ async def get_engine_runtime_config(
return runtime_config


@dynamo_worker(static=False)
async def worker(runtime: DistributedRuntime):
# Set up signal handler for graceful shutdown
async def worker():
config = cmd_line_args()

loop = asyncio.get_running_loop()
runtime = DistributedRuntime(loop, config.store_kv, False)

# Set up signal handler for graceful shutdown
def signal_handler():
# Schedule the shutdown coroutine instead of calling it directly
asyncio.create_task(graceful_shutdown(runtime))
Expand All @@ -116,7 +118,6 @@ def signal_handler():

logging.info("Signal handlers set up for graceful shutdown")

config = cmd_line_args()
await init(runtime, config)


Expand Down
13 changes: 11 additions & 2 deletions components/src/dynamo/trtllm/utils/trtllm_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def __init__(self) -> None:
self.tool_call_parser: Optional[str] = None
self.dump_config_to: Optional[str] = None
self.custom_jinja_template: Optional[str] = None
self.store_kv: str = ""

def __str__(self) -> str:
return (
Expand Down Expand Up @@ -87,8 +88,9 @@ def __str__(self) -> str:
f"max_file_size_mb={self.max_file_size_mb}, "
f"reasoning_parser={self.reasoning_parser}, "
f"tool_call_parser={self.tool_call_parser}, "
f"dump_config_to={self.dump_config_to},"
f"custom_jinja_template={self.custom_jinja_template}"
f"dump_config_to={self.dump_config_to}, "
f"custom_jinja_template={self.custom_jinja_template}, "
f"store_kv={self.store_kv}"
)


Expand Down Expand Up @@ -278,6 +280,12 @@ def cmd_line_args():
default=None,
help="Path to a custom Jinja template file to override the model's default chat template. This template will take precedence over any template found in the model repository.",
)
parser.add_argument(
"--store-kv",
type=str,
default=os.environ.get("DYN_STORE_KV", "etcd"),
help="Which key-value backend to use: etcd, mem, file. Etcd uses the ETCD_* env vars (e.g. ETCD_ENDPOINTS) for connection details. File uses root dir from env var DYN_FILE_KV or defaults to $TMPDIR/dynamo_store_kv.",
)

args = parser.parse_args()

Expand Down Expand Up @@ -337,6 +345,7 @@ def cmd_line_args():
config.reasoning_parser = args.dyn_reasoning_parser
config.tool_call_parser = args.dyn_tool_call_parser
config.dump_config_to = args.dump_config_to
config.store_kv = args.store_kv

# Handle custom jinja template path expansion (environment variables and home directory)
if args.custom_jinja_template:
Expand Down
8 changes: 8 additions & 0 deletions components/src/dynamo/vllm/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class Config:
migration_limit: int = 0
kv_port: Optional[int] = None
custom_jinja_template: Optional[str] = None
store_kv: str

# mirror vLLM
model: str
Expand Down Expand Up @@ -164,6 +165,12 @@ def parse_args() -> Config:
"'USER: <image> please describe the image ASSISTANT:'."
),
)
parser.add_argument(
"--store-kv",
type=str,
default=os.environ.get("DYN_STORE_KV", "etcd"),
help="Which key-value backend to use: etcd, mem, file. Etcd uses the ETCD_* env vars (e.g. ETCD_ENDPOINTS) for connection details. File uses root dir from env var DYN_FILE_KV or defaults to $TMPDIR/dynamo_store_kv.",
)
add_config_dump_args(parser)

parser = AsyncEngineArgs.add_cli_args(parser)
Expand Down Expand Up @@ -233,6 +240,7 @@ def parse_args() -> Config:
config.multimodal_worker = args.multimodal_worker
config.multimodal_encode_prefill_worker = args.multimodal_encode_prefill_worker
config.mm_prompt_template = args.mm_prompt_template
config.store_kv = args.store_kv

# Validate custom Jinja template file exists if provided
if config.custom_jinja_template is not None:
Expand Down
10 changes: 5 additions & 5 deletions components/src/dynamo/vllm/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
fetch_llm,
register_llm,
)
from dynamo.runtime import DistributedRuntime, dynamo_worker
from dynamo.runtime import DistributedRuntime
from dynamo.runtime.logging import configure_dynamo_logging
from dynamo.vllm.multimodal_handlers import (
EncodeWorkerHandler,
Expand Down Expand Up @@ -70,16 +70,16 @@ async def graceful_shutdown(runtime):
logging.info("DistributedRuntime shutdown complete")


@dynamo_worker(static=False)
async def worker(runtime: DistributedRuntime):
async def worker():
config = parse_args()

loop = asyncio.get_running_loop()
runtime = DistributedRuntime(loop, config.store_kv, False)

await configure_ports(config)
overwrite_args(config)

# Set up signal handler for graceful shutdown
loop = asyncio.get_running_loop()

def signal_handler():
asyncio.create_task(graceful_shutdown(runtime))

Expand Down
2 changes: 1 addition & 1 deletion examples/custom_backend/cancellation/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async def main():
return

loop = asyncio.get_running_loop()
runtime = DistributedRuntime(loop, True)
runtime = DistributedRuntime(loop, "mem", True)

# Connect to middle server or direct server based on argument
if use_middle_server:
Expand Down
2 changes: 1 addition & 1 deletion examples/custom_backend/cancellation/middle_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async def generate(self, request, context):
async def main():
"""Start the middle server"""
loop = asyncio.get_running_loop()
runtime = DistributedRuntime(loop, True)
runtime = DistributedRuntime(loop, "mem", True)

# Create middle server handler
handler = MiddleServer(runtime)
Expand Down
2 changes: 1 addition & 1 deletion examples/custom_backend/cancellation/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async def generate(self, request, context):
async def main():
"""Start the demo server"""
loop = asyncio.get_running_loop()
runtime = DistributedRuntime(loop, True)
runtime = DistributedRuntime(loop, "mem", True)

# Create server component
component = runtime.namespace("demo").component("server")
Expand Down
2 changes: 1 addition & 1 deletion examples/custom_backend/nim/mock_nim_frontend.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ async def async_main():

# Create DistributedRuntime - similar to frontend/main.py line 246
is_static = True # Use static mode (no etcd)
runtime = DistributedRuntime(loop, is_static) # type: ignore[call-arg]
runtime = DistributedRuntime(loop, "mem", is_static) # type: ignore[call-arg]

# Setup signal handlers for graceful shutdown
def signal_handler():
Expand Down
6 changes: 6 additions & 0 deletions launch/dynamo-run/src/flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ pub struct Flags {
#[arg(long, default_value = "false")]
pub static_worker: bool,

/// Which key-value backend to use: etcd, mem, file.
/// Etcd uses the ETCD_* env vars (e.g. ETCD_ENDPOINTS) for connection details.
/// File uses root dir from env var DYN_FILE_KV or defaults to $TMPDIR/dynamo_store_kv.
#[arg(long, default_value = "etcd")]
pub store_kv: String,

/// Everything after a `--`.
/// These are the command line arguments to the python engine when using `pystr` or `pytok`.
#[arg(index = 2, last = true, hide = true, allow_hyphen_values = true)]
Expand Down
Loading
Loading