Skip to content

Commit 4a27f39

Browse files
committed
feat(keyvalue): Filesystem backed KeyValueStore
Running any of the components with `--store-kv file` uses the file system instead of etcd for discovery. On a single node, or across nodes that share a folder, this allows running the frontend and backend without etcd.
1 parent 1ab2fe1 commit 4a27f39

File tree

34 files changed

+747
-192
lines changed

34 files changed

+747
-192
lines changed

Cargo.lock

Lines changed: 23 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

components/src/dynamo/frontend/main.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,12 @@ def parse_args():
225225
),
226226
help=f"Interval in seconds for polling custom backend metrics. Set to > 0 to enable polling (default: 0=disabled, suggested: 9.2s which is less than typical Prometheus scrape interval). Can be set via {CUSTOM_BACKEND_METRICS_POLLING_INTERVAL_ENV_VAR} env var.",
227227
)
228+
parser.add_argument(
229+
"--store-kv",
230+
type=str,
231+
default=os.environ.get("DYN_STORE_KV", "etcd"),
232+
help="Which key-value backend to use: etcd, mem, file. Etcd uses the ETCD_* env vars (e.g. ETCD_ENDPOINTS) for connection details. File uses root dir from env var DYN_FILE_KV or defaults to $TMP/dynamo_store_kv.",
233+
)
228234

229235
flags = parser.parse_args()
230236

@@ -252,8 +258,7 @@ async def async_main():
252258
os.environ["DYN_METRICS_PREFIX"] = flags.metrics_prefix
253259

254260
loop = asyncio.get_running_loop()
255-
256-
runtime = DistributedRuntime(loop, is_static)
261+
runtime = DistributedRuntime(loop, flags.store_kv, is_static)
257262

258263
def signal_handler():
259264
asyncio.create_task(graceful_shutdown(runtime))

components/src/dynamo/mocker/args.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,12 @@ def parse_args():
204204
default=False,
205205
help="Mark this as a decode worker which does not publish KV events and skips prefill cost estimation (default: False)",
206206
)
207+
parser.add_argument(
208+
"--store-kv",
209+
type=str,
210+
default=os.environ.get("DYN_STORE_KV", "etcd"),
211+
help="Which key-value backend to use: etcd, mem, file. Etcd uses the ETCD_* env vars (e.g. ETCD_ENDPOINTS) for connection details. File uses root dir from env var DYN_FILE_KV or defaults to $TMP/dynamo_store_kv.",
212+
)
207213

208214
args = parser.parse_args()
209215
validate_worker_type_args(args)

components/src/dynamo/mocker/main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ async def launch_workers(args, extra_engine_args_path):
7272
logger.info(f"Creating mocker worker {worker_id + 1}/{args.num_workers}")
7373

7474
# Create a separate DistributedRuntime for this worker (on same event loop)
75-
runtime = DistributedRuntime(loop, False)
75+
runtime = DistributedRuntime(loop, args.store_kv, False)
7676
runtimes.append(runtime)
7777

7878
# Create EntrypointArgs for this worker

components/src/dynamo/sglang/args.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,12 @@
9393
"default": None,
9494
"help": "Dump debug config to the specified file path. If not specified, the config will be dumped to stdout at INFO level.",
9595
},
96+
"store-kv": {
97+
"flags": ["--store-kv"],
98+
"type": str,
99+
"default": os.environ.get("DYN_STORE_KV", "etcd"),
100+
"help": "Which key-value backend to use: etcd, mem, file. Etcd uses the ETCD_* env vars (e.g. ETCD_ENDPOINTS) for connection details. File uses root dir from env var DYN_FILE_KV or defaults to $TMP/dynamo_store_kv.",
101+
},
96102
}
97103

98104

@@ -102,6 +108,7 @@ class DynamoArgs:
102108
component: str
103109
endpoint: str
104110
migration_limit: int
111+
store_kv: str
105112

106113
# tool and reasoning parser options
107114
tool_call_parser: Optional[str] = None
@@ -329,6 +336,7 @@ async def parse_args(args: list[str]) -> Config:
329336
component=parsed_component_name,
330337
endpoint=parsed_endpoint_name,
331338
migration_limit=parsed_args.migration_limit,
339+
store_kv=parsed_args.store_kv,
332340
tool_call_parser=tool_call_parser,
333341
reasoning_parser=reasoning_parser,
334342
custom_jinja_template=expanded_template_path,

components/src/dynamo/sglang/main.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
from dynamo.common.config_dump import dump_config
1313
from dynamo.llm import ModelInput, ModelType
14-
from dynamo.runtime import DistributedRuntime, dynamo_worker
14+
from dynamo.runtime import DistributedRuntime
1515
from dynamo.runtime.logging import configure_dynamo_logging
1616
from dynamo.sglang.args import Config, DisaggregationMode, parse_args
1717
from dynamo.sglang.health_check import (
@@ -33,9 +33,12 @@
3333
configure_dynamo_logging()
3434

3535

36-
@dynamo_worker(static=False)
37-
async def worker(runtime: DistributedRuntime):
36+
async def worker():
37+
config = await parse_args(sys.argv[1:])
38+
dump_config(config.dynamo_args.dump_config_to, config)
39+
3840
loop = asyncio.get_running_loop()
41+
runtime = DistributedRuntime(loop, config.dynamo_args.store_kv, False)
3942

4043
def signal_handler():
4144
asyncio.create_task(graceful_shutdown(runtime))
@@ -45,9 +48,6 @@ def signal_handler():
4548

4649
logging.info("Signal handlers will trigger a graceful shutdown of the runtime")
4750

48-
config = await parse_args(sys.argv[1:])
49-
dump_config(config.dynamo_args.dump_config_to, config)
50-
5151
if config.dynamo_args.embedding_worker:
5252
await init_embedding(runtime, config)
5353
elif config.dynamo_args.multimodal_processor:

components/src/dynamo/trtllm/main.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
from dynamo.common.config_dump import dump_config
4040
from dynamo.common.utils.prometheus import register_engine_metrics_callback
4141
from dynamo.llm import ModelInput, ModelRuntimeConfig, ModelType, register_llm
42-
from dynamo.runtime import DistributedRuntime, dynamo_worker
42+
from dynamo.runtime import DistributedRuntime
4343
from dynamo.runtime.logging import configure_dynamo_logging
4444
from dynamo.trtllm.engine import TensorRTLLMEngine, get_llm_engine
4545
from dynamo.trtllm.health_check import TrtllmHealthCheckPayload
@@ -102,11 +102,13 @@ async def get_engine_runtime_config(
102102
return runtime_config
103103

104104

105-
@dynamo_worker(static=False)
106-
async def worker(runtime: DistributedRuntime):
107-
# Set up signal handler for graceful shutdown
105+
async def worker():
106+
config = cmd_line_args()
107+
108108
loop = asyncio.get_running_loop()
109+
runtime = DistributedRuntime(loop, config.store_kv, False)
109110

111+
# Set up signal handler for graceful shutdown
110112
def signal_handler():
111113
# Schedule the shutdown coroutine instead of calling it directly
112114
asyncio.create_task(graceful_shutdown(runtime))
@@ -116,7 +118,6 @@ def signal_handler():
116118

117119
logging.info("Signal handlers set up for graceful shutdown")
118120

119-
config = cmd_line_args()
120121
await init(runtime, config)
121122

122123

components/src/dynamo/trtllm/utils/trtllm_utils.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ def __init__(self) -> None:
5858
self.tool_call_parser: Optional[str] = None
5959
self.dump_config_to: Optional[str] = None
6060
self.custom_jinja_template: Optional[str] = None
61+
self.store_kv: str = ""
6162

6263
def __str__(self) -> str:
6364
return (
@@ -88,7 +89,8 @@ def __str__(self) -> str:
8889
f"reasoning_parser={self.reasoning_parser}, "
8990
f"tool_call_parser={self.tool_call_parser}, "
9091
f"dump_config_to={self.dump_config_to},"
91-
f"custom_jinja_template={self.custom_jinja_template}"
92+
f"custom_jinja_template={self.custom_jinja_template}, "
93+
f"store_kv={self.store_kv}"
9294
)
9395

9496

@@ -278,6 +280,12 @@ def cmd_line_args():
278280
default=None,
279281
help="Path to a custom Jinja template file to override the model's default chat template. This template will take precedence over any template found in the model repository.",
280282
)
283+
parser.add_argument(
284+
"--store-kv",
285+
type=str,
286+
default=os.environ.get("DYN_STORE_KV", "etcd"),
287+
help="Which key-value backend to use: etcd, mem, file. Etcd uses the ETCD_* env vars (e.g. ETCD_ENDPOINTS) for connection details. File uses root dir from env var DYN_FILE_KV or defaults to $TMP/dynamo_store_kv.",
288+
)
281289

282290
args = parser.parse_args()
283291

@@ -337,6 +345,7 @@ def cmd_line_args():
337345
config.reasoning_parser = args.dyn_reasoning_parser
338346
config.tool_call_parser = args.dyn_tool_call_parser
339347
config.dump_config_to = args.dump_config_to
348+
config.store_kv = args.store_kv
340349

341350
# Handle custom jinja template path expansion (environment variables and home directory)
342351
if args.custom_jinja_template:

components/src/dynamo/vllm/args.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ class Config:
5050
kv_port: Optional[int] = None
5151
port_range: DynamoPortRange
5252
custom_jinja_template: Optional[str] = None
53+
store_kv: str
5354

5455
# mirror vLLM
5556
model: str
@@ -188,6 +189,12 @@ def parse_args() -> Config:
188189
"'USER: <image> please describe the image ASSISTANT:'."
189190
),
190191
)
192+
parser.add_argument(
193+
"--store-kv",
194+
type=str,
195+
default=os.environ.get("DYN_STORE_KV", "etcd"),
196+
help="Which key-value backend to use: etcd, mem, file. Etcd uses the ETCD_* env vars (e.g. ETCD_ENDPOINTS) for connection details. File uses root dir from env var DYN_FILE_KV or defaults to $TMP/dynamo_store_kv.",
197+
)
191198
add_config_dump_args(parser)
192199

193200
parser = AsyncEngineArgs.add_cli_args(parser)

components/src/dynamo/vllm/main.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
fetch_llm,
2727
register_llm,
2828
)
29-
from dynamo.runtime import DistributedRuntime, dynamo_worker
29+
from dynamo.runtime import DistributedRuntime
3030
from dynamo.runtime.logging import configure_dynamo_logging
3131
from dynamo.vllm.multimodal_handlers import (
3232
EncodeWorkerHandler,
@@ -71,16 +71,16 @@ async def graceful_shutdown(runtime):
7171
logging.info("DistributedRuntime shutdown complete")
7272

7373

74-
@dynamo_worker(static=False)
75-
async def worker(runtime: DistributedRuntime):
74+
async def worker():
7675
config = parse_args()
7776

77+
loop = asyncio.get_running_loop()
78+
runtime = DistributedRuntime(loop, config.store_kv, False)
79+
7880
await configure_ports(runtime, config)
7981
overwrite_args(config)
8082

8183
# Set up signal handler for graceful shutdown
82-
loop = asyncio.get_running_loop()
83-
8484
def signal_handler():
8585
asyncio.create_task(graceful_shutdown(runtime))
8686

0 commit comments

Comments
 (0)