From 19087910624288dfce08425c0a9270d6b91b93c2 Mon Sep 17 00:00:00 2001 From: Wei Su Date: Wed, 25 Mar 2026 13:40:03 -0700 Subject: [PATCH 1/2] Add memory file option (-e) for fast warm restart (#540) Summary: Pull Request resolved: https://github.com/facebookresearch/DCPerf/pull/540 Expose memcached's native `-e` memory file option in TaoBench. When specified, memcached mmaps the given file for slab storage. On graceful shutdown (SIGUSR1), state is saved to the file. On subsequent runs with the same file, cache data is pre-loaded, drastically reducing warmup time. Changes: - Add `--memory-file` argument to server args - Append `-e ` to memcached command when memory file specified - Use SIGUSR1 for graceful shutdown (60s grace period) when memory file is in use - Per-instance memory files in autoscale mode (suffix `.0`, `.1`, etc.) - Auto-expand `/dev/shm` when tmpfs is smaller than memsize - Add `memory_file` variable to all TaoBench job configurations - Update README with memory file documentation Reviewed By: gandhijayneel Differential Revision: D97244738 --- benchpress/config/jobs.yml | 10 +++ packages/tao_bench/README.md | 90 +++++++++++++++++++++++++ packages/tao_bench/args_utils.py | 10 +++ packages/tao_bench/run.py | 54 +++++++++++---- packages/tao_bench/run_autoscale.py | 99 ++++++++++++++++++++++------ packages/tao_bench/run_standalone.py | 4 ++ 6 files changed, 234 insertions(+), 33 deletions(-) diff --git a/benchpress/config/jobs.yml b/benchpress/config/jobs.yml index fe163b37..fda31a4b 100644 --- a/benchpress/config/jobs.yml +++ b/benchpress/config/jobs.yml @@ -1045,6 +1045,7 @@ - '--timeout-buffer={timeout_buffer}' - '--disable-tls={disable_tls}' - '--smart-nanosleep={smart_nanosleep}' + - '--memory-file={memory_file}' - '--real' vars: - 'interface_name=eth0' @@ -1056,6 +1057,7 @@ - 'timeout_buffer=120' - 'disable_tls=0' - 'smart_nanosleep=0' + - 'memory_file=' client: args: - 'client' @@ -1123,6 +1125,7 @@ - '--client-wait-after-warmup={client_wait_after_warmup}' - '--disable-tls={disable_tls}' - '--smart-nanosleep={smart_nanosleep}' + - '--memory-file={memory_file}' - '--real' vars: - 'num_servers=0' @@ -1144,6 +1147,7 @@ - 'client_wait_after_warmup=5' - 'disable_tls=0' - 'smart_nanosleep=0' + - 'memory_file=' hooks: - hook: tao_instruction - hook: copymove @@ -1179,6 +1183,7 @@ - '--num-slow-threads={num_slow_threads}' - '--num-fast-threads={num_fast_threads}' - '--num-client-threads={num_client_threads}' + - '--memory-file={memory_file}' vars: - 'num_servers=0' - 'memsize=0' @@ -1196,6 +1201,7 @@ - 'num_slow_threads=0' - 'num_fast_threads=0' - 'num_client_threads=0' + - 'memory_file=' hooks: - hook: copymove options: @@ -1233,6 +1239,7 @@ - '--test-timeout-buffer={test_timeout_buffer}' - '--postprocessing-timeout-buffer={postprocessing_timeout_buffer}' - '--poll-interval={poll_interval}' + - '--memory-file={memory_file}' vars: - 'num_servers=0' - 'memsize=0.5' @@ -1254,6 +1261,7 @@ - 'test_timeout_buffer=0' - 'postprocessing_timeout_buffer=60' - 'poll_interval=0.2' + - 'memory_file=' hooks: - hook: copymove options: @@ -1290,6 +1298,7 @@ - '--conns-per-server-core={conns_per_server_core}' - '--stats-interval={stats_interval}' - '--disable-tls={disable_tls}' + - '--memory-file={memory_file}' - '--real' vars: - 'num_servers=0' @@ -1308,6 +1317,7 @@ - 'conns_per_server_core=85' - 'stats_interval=5000' - 'disable_tls=0' + - 'memory_file=' hooks: - hook: tao_instruction - hook: copymove diff --git a/packages/tao_bench/README.md b/packages/tao_bench/README.md index e2922793..8eba9e5a 100644 --- a/packages/tao_bench/README.md +++ b/packages/tao_bench/README.md @@ -313,6 +313,96 @@ the guide for the `tao_bench_autoscale` job. Just replace the job name. A good run of this workload should have about 75~80% of CPU utilization in the steady state, of which 25~30% should be in userspace. +## Memory file for fast warm restart + +TaoBench supports a memory file option that allows the server to persist its +cache state across runs. On the first run, TaoBench fills the cache from scratch +(cold start). On subsequent runs with the same memory file, the server reloads +the pre-filled cache instantly, skipping the lengthy warmup phase. + +This uses memcached's native `-e` flag, which mmaps a file for slab storage. +On graceful shutdown (SIGUSR1), memcached saves a `.meta` restart file. On the +next startup with the same memory file, memcached restores all cached items +from the file. + +### Usage + +Specify the `memory_file` parameter pointing to a path on a tmpfs filesystem +(e.g. `/dev/shm`): + +```bash +./benchpress_cli.py run tao_bench_autoscale -i '{"memory_file": "/dev/shm/tao_bench_mem"}' +``` + +On the first run, TaoBench will create per-instance memory files at +`/dev/shm/tao_bench_mem.0`, `/dev/shm/tao_bench_mem.1`, etc. When the run +completes, the server is shut down with SIGUSR1 which saves the restart +metadata (a small `.meta` file alongside each memory file). + +On subsequent runs with the same `memory_file` path, the server loads all +cached items from the memory files, drastically reducing warmup time. + +### /dev/shm sizing + +The memory files are stored in `/dev/shm` (tmpfs), which defaults to 50% of +system RAM. When using large `memsize` values, `/dev/shm` may be too small. +TaoBench automatically expands `/dev/shm` when needed (requires root), but +you can also do it manually: + +```bash +mount -o remount,size=800G /dev/shm +``` + +### Parameters + + - `memory_file`: Path prefix for memory files. Each server instance gets a + suffixed file (e.g. `/dev/shm/tao_bench_mem.0`). Default is empty (disabled). + +## Auto-warmup detection + +When enabled, TaoBench monitors server statistics during the warmup phase and +automatically signals clients to stop warmup early once the server reaches a +warmed-up state. The `warmup_time` parameter becomes the maximum warmup +duration rather than a fixed duration. + +### How it works + +The server opens a TCP control port (`port_number_start + 1000`, default 12211) +and monitors the server log output in real time. Warmup is considered complete +when: + +1. Hit ratio reaches >= 95% of `target_hit_ratio` (default 0.9, threshold 0.855) +2. QPS stabilizes (coefficient of variation < 5%) over a 2-minute rolling window + +When both conditions are met for all server instances, the control port responds +`READY` to client polls. Clients poll the control port every 5 seconds during +warmup, and terminate the warmup phase early when the server reports `READY`. + +### Usage + +Set `auto_warmup` to 1: + +```bash +./benchpress_cli.py run tao_bench_autoscale -i '{"auto_warmup": 1}' +``` + +This is most useful combined with the memory file option: + +```bash +# Second run with pre-warmed cache + auto-warmup detection +./benchpress_cli.py run tao_bench_autoscale -i '{"memory_file": "/dev/shm/tao_bench_mem", "auto_warmup": 1}' +``` + +The generated client instructions will automatically include the `control_port` +parameter so clients can poll the server's warmup status. + +### Parameters + + - `auto_warmup`: Set to 1 to enable auto-warmup detection. Default is 0 (disabled) + for `tao_bench_autoscale`, 1 (enabled) for `tao_bench_autoscale_v2_beta`. + - `target_hit_ratio`: Target hit ratio for warmup completion detection. Default + is 0.9. Warmup is considered complete when hit ratio reaches 95% of this value. + ## Advanced job: `tao_bench_custom` **NOTE**: We recommend using `tao_bench_autoscale` to start the server as this job diff --git a/packages/tao_bench/args_utils.py b/packages/tao_bench/args_utils.py index 67c4e787..891e907e 100644 --- a/packages/tao_bench/args_utils.py +++ b/packages/tao_bench/args_utils.py @@ -166,6 +166,16 @@ def add_common_server_args(server_parser: ArgumentParser) -> List[Tuple[str, str help="poll interval in seconds for process completion detection; " + "if > 0, use polling mechanism instead of fixed timeout", ) + server_parser.add_argument( + "--memory-file", + type=str, + nargs="?", + const="", + default="", + help="path to memory file for memcached -e flag. " + + "Enables persistent memory backing via mmap. On graceful shutdown (SIGUSR1), " + + "memcached saves state to this file for fast warm restart on subsequent runs.", + ) server_parser.add_argument("--real", action="store_true", help="for real") return get_opt_strings(server_parser) diff --git a/packages/tao_bench/run.py b/packages/tao_bench/run.py index 5cd39f39..06fd8394 100755 --- a/packages/tao_bench/run.py +++ b/packages/tao_bench/run.py @@ -9,6 +9,7 @@ import pathlib import re import shlex +import signal import subprocess import sys import threading @@ -62,6 +63,7 @@ def run_cmd( cmd: List[str], timeout=None, for_real=True, + graceful_signal=None, ) -> str: print(" ".join(cmd)) if for_real: @@ -72,18 +74,40 @@ def run_cmd( try: proc.wait(timeout=timeout) except subprocess.TimeoutExpired: - print(f"Process timeout expired, terminating process {proc.pid}...") - proc.terminate() - try: - # Give the process 5 seconds to terminate gracefully - proc.wait(timeout=5) - print(f"Process {proc.pid} terminated gracefully") - except subprocess.TimeoutExpired: - # If it still doesn't terminate, force kill it - print(f"Process {proc.pid} didn't respond to SIGTERM, force killing...") - proc.kill() - proc.wait() - print(f"Process {proc.pid} killed successfully") + if graceful_signal is not None: + print( + f"Process timeout expired, sending graceful signal " + f"{graceful_signal} to process {proc.pid}..." + ) + os.kill(proc.pid, graceful_signal) + try: + proc.wait(timeout=15) + print(f"Process {proc.pid} exited after graceful signal") + return + except subprocess.TimeoutExpired: + print( + f"Process {proc.pid} didn't exit after graceful signal, " + f"force killing..." + ) + proc.kill() + proc.wait() + print(f"Process {proc.pid} killed successfully") + else: + print(f"Process timeout expired, terminating process {proc.pid}...") + proc.terminate() + try: + # Give the process 5 seconds to terminate gracefully + proc.wait(timeout=5) + print(f"Process {proc.pid} terminated gracefully") + except subprocess.TimeoutExpired: + # If it still doesn't terminate, force kill it + print( + f"Process {proc.pid} didn't respond to SIGTERM, " + f"force killing..." + ) + proc.kill() + proc.wait() + print(f"Process {proc.pid} killed successfully") def profile_server(): @@ -221,6 +245,9 @@ def run_server(args): "-o", ",".join(extended_options), ] + if args.memory_file: + server_cmd += ["-e", args.memory_file] + if "DCPERF_PERF_RECORD" in os.environ and os.environ["DCPERF_PERF_RECORD"] == "1": profiler_wait_time = ( args.warmup_time + args.timeout_buffer + SERVER_PROFILING_DELAY @@ -236,7 +263,8 @@ def run_server(args): os.environ["LD_LIBRARY_PATH"] = os.path.join(TAO_BENCH_DIR, "build-deps/lib") timeout = args.warmup_time + args.test_time + args.timeout_buffer - run_cmd(server_cmd, timeout, args.real) + graceful_sig = signal.SIGUSR1 if args.memory_file else None + run_cmd(server_cmd, timeout, args.real, graceful_signal=graceful_sig) if "DCPERF_PERF_RECORD" in os.environ and os.environ["DCPERF_PERF_RECORD"] == "1": t_prof.cancel() diff --git a/packages/tao_bench/run_autoscale.py b/packages/tao_bench/run_autoscale.py index 03e8df7f..6ebfb695 100755 --- a/packages/tao_bench/run_autoscale.py +++ b/packages/tao_bench/run_autoscale.py @@ -10,6 +10,7 @@ import pathlib import re import shlex +import signal import socket import subprocess import sys @@ -73,13 +74,17 @@ def is_in_range(node_cpu_ranges, input_cpu_range): SERVER_CMD_OPTIONS = [] # To be initialized in init_parser() -def compose_server_cmd(args, cpu_core_range, memsize, port_number): +def compose_server_cmd(args, cpu_core_range, memsize, port_number, instance_index=0): server_args = { optstr: getattr(args, argkey) for optstr, argkey in SERVER_CMD_OPTIONS } server_args["--memsize"] = memsize server_args["--port-number"] = port_number + # Per-instance memory file: append instance index suffix + if args.memory_file: + server_args["--memory-file"] = f"{args.memory_file}.{instance_index}" + # Add custom thread parameters if specified if hasattr(args, "num_fast_threads") and args.num_fast_threads > 0: server_args["--num-fast-threads"] = args.num_fast_threads @@ -97,7 +102,7 @@ def compose_server_cmd(args, cpu_core_range, memsize, port_number): if isinstance(argval, bool): if argval: cmd.append(argname) - elif argval is not None and argval != 0: + elif argval is not None and argval != 0 and argval != "": cmd += [argname, str(argval)] if len(NUMA_NODES) > 1 and (args.bind_cpu > 0 or args.bind_mem > 0): @@ -282,10 +287,73 @@ def distribute_cores(n_parts): return core_ranges +def ensure_shm_capacity(required_gb): + """Expand /dev/shm if it is smaller than required_gb. + + Memory files are stored in /dev/shm (tmpfs). The default tmpfs size is + typically 50% of system RAM, which may be too small when multiple server + instances each need large memory files. This function remounts /dev/shm + with a larger size if needed. + """ + shm_path = "/dev/shm" + try: + stat = os.statvfs(shm_path) + shm_total_gb = (stat.f_blocks * stat.f_frsize) / (1024**3) + if shm_total_gb >= required_gb: + return + target_gb = int(required_gb * 1.1) # 10% headroom + print( + f"/dev/shm is {shm_total_gb:.0f}GB, need {required_gb:.0f}GB. " + f"Expanding to {target_gb}GB..." + ) + ret = subprocess.run( + ["mount", "-o", f"remount,size={target_gb}G", shm_path], + capture_output=True, + ) + if ret.returncode != 0: + print(f"WARNING: Failed to expand /dev/shm: {ret.stderr.decode().strip()}") + else: + print(f"/dev/shm expanded to {target_gb}GB") + except OSError as e: + print(f"WARNING: Could not check /dev/shm capacity: {e}") + + +def graceful_kill_pg(pid, use_sigusr1=False, grace_period=60): + """Kill a process group, optionally sending SIGUSR1 first for graceful shutdown.""" + try: + pgid = os.getpgid(pid) + except (ProcessLookupError, PermissionError): + return + if use_sigusr1: + try: + os.killpg(pgid, signal.SIGUSR1) + # Wait for graceful shutdown + deadline = time.time() + grace_period + while time.time() < deadline: + try: + os.kill(pid, 0) # Check if still alive + time.sleep(0.5) + except ProcessLookupError: + return # Process exited + # Still alive after grace period, force kill + print(f"Process group {pgid} didn't exit after SIGUSR1, force killing...") + except (ProcessLookupError, PermissionError): + return + try: + os.killpg(pgid, 9) + except (ProcessLookupError, PermissionError): + pass + + def run_server(args): # Create DiagnosisRecorder instance (automatically manages env var for subprocesses) recorder = DiagnosisRecorder.get_instance(root_dir=str(BENCHPRESS_ROOT)) + # If memory file is on /dev/shm, ensure tmpfs is large enough + if args.memory_file and args.memory_file.startswith("/dev/shm"): + required_gb = float(args.memsize) * args_utils.MEM_USAGE_FACTOR + ensure_shm_capacity(required_gb) + core_ranges = distribute_cores(args.num_servers) # memory size - split evenly for each server n_mem = float(args.memsize) @@ -298,7 +366,11 @@ def run_server(args): servers.append( [ compose_server_cmd( - args, core_ranges[i], mem_per_inst, args.port_number_start + i + args, + core_ranges[i], + mem_per_inst, + args.port_number_start + i, + instance_index=i, ), open(logpath, "w"), logpath, @@ -402,10 +474,7 @@ def run_server(args): ) for p in procs: if p.poll() is None: # Process still running - try: - os.killpg(os.getpgid(p.pid), 9) - except (ProcessLookupError, PermissionError): - pass + graceful_kill_pg(p.pid, use_sigusr1=bool(args.memory_file)) # Ensure all processes are collected and output is flushed for p in procs: @@ -424,10 +493,7 @@ def run_server(args): # Final cleanup to ensure process groups are terminated for p in procs: - try: - os.killpg(os.getpgid(p.pid), 9) - except (ProcessLookupError, PermissionError): - pass # Process already terminated or we don't have permission + graceful_kill_pg(p.pid, use_sigusr1=bool(args.memory_file)) else: # Original behavior with fixed timeout timeout = ( @@ -441,18 +507,11 @@ def run_server(args): try: (out, err) = p.communicate(timeout=timeout) except subprocess.TimeoutExpired: - # Kill the entire process group - try: - os.killpg(os.getpgid(p.pid), 9) - except ProcessLookupError: - pass # Process already terminated + graceful_kill_pg(p.pid, use_sigusr1=bool(args.memory_file)) (out, err) = p.communicate() finally: # Ensure cleanup even if process completed successfully - try: - os.killpg(os.getpgid(p.pid), 9) - except (ProcessLookupError, PermissionError): - pass # Process already terminated or we don't have permission + graceful_kill_pg(p.pid, use_sigusr1=bool(args.memory_file)) for server in servers: server[1].close() diff --git a/packages/tao_bench/run_standalone.py b/packages/tao_bench/run_standalone.py index 756f3bc8..b7e987a6 100755 --- a/packages/tao_bench/run_standalone.py +++ b/packages/tao_bench/run_standalone.py @@ -186,6 +186,10 @@ def launch_server(port_number_start=11211, bind_cpu=1, bind_mem=1): if hasattr(args, "poll_interval") and args.poll_interval > 0: script_args["--poll-interval"] = args.poll_interval + # Pass through memory file if specified + if hasattr(args, "memory_file") and args.memory_file: + script_args["--memory-file"] = args.memory_file + cmd = [f"{TAO_BENCH_DIR}/run_autoscale.py --real"] for argname, argval in script_args.items(): From a8cb3df004547934cb9341dbc097e0374bb3d7b5 Mon Sep 17 00:00:00 2001 From: Wei Su Date: Wed, 25 Mar 2026 13:53:31 -0700 Subject: [PATCH 2/2] Auto-stop warmup when server is already warmed up (#541) Summary: Pull Request resolved: https://github.com/facebookresearch/DCPerf/pull/541 Add auto-warmup detection that monitors server stats during warmup and signals clients to stop early when the server is warmed up. The server also terminates early once warmup is detected, instead of waiting for the full warmup_time. The server monitors hit_rate and QPS stability via background log tailing threads. When all server instances reach hit_rate >= 95% of target (default 0.855) and QPS stabilizes (CV < 5% over 2 minutes), a TCP control server responds READY to client polls, causing clients to terminate warmup early and proceed to the test phase. Changes: - New warmup_monitor.py: WarmupMonitor, WarmupControlServer, LogTailer classes - Add --auto-warmup and --target-hit-ratio server args - Add --control-port client arg - Server-side: start warmup monitors and control server in run_autoscale.py - Server-side: dynamic wait replaces fixed warmup_time sleep when auto-warmup enabled - Client-side: poll control port during warmup, stop early on READY - IPv6 dual-stack support for control port (AF_INET6 + IPV6_V6ONLY=0) - Opt-in for tao_bench_autoscale (auto_warmup=0) - Default on for tao_bench_autoscale_v2_beta (auto_warmup=1) - Update README with auto-warmup documentation Reviewed By: gandhijayneel Differential Revision: D97244665 --- benchpress/config/jobs.yml | 8 ++ packages/tao_bench/args_utils.py | 24 ++++ packages/tao_bench/run.py | 48 ++++++- packages/tao_bench/run_autoscale.py | 100 ++++++++++++++- packages/tao_bench/run_standalone.py | 6 + packages/tao_bench/warmup_monitor.py | 182 +++++++++++++++++++++++++++ 6 files changed, 359 insertions(+), 9 deletions(-) create mode 100644 packages/tao_bench/warmup_monitor.py diff --git a/benchpress/config/jobs.yml b/benchpress/config/jobs.yml index fda31a4b..ed6f884b 100644 --- a/benchpress/config/jobs.yml +++ b/benchpress/config/jobs.yml @@ -1075,6 +1075,7 @@ - '--wait-after-warmup={wait_after_warmup}' - '--disable-tls={disable_tls}' - '--client-id={client_id}' + - '--control-port={control_port}' - '--real' vars: - 'server_hostname' @@ -1089,6 +1090,7 @@ - 'sanity=0' - 'disable_tls=0' - 'client_id=0' + - 'control_port=0' hooks: - hook: copymove options: @@ -1126,6 +1128,7 @@ - '--disable-tls={disable_tls}' - '--smart-nanosleep={smart_nanosleep}' - '--memory-file={memory_file}' + - '--auto-warmup={auto_warmup}' - '--real' vars: - 'num_servers=0' @@ -1148,6 +1151,7 @@ - 'disable_tls=0' - 'smart_nanosleep=0' - 'memory_file=' + - 'auto_warmup=0' hooks: - hook: tao_instruction - hook: copymove @@ -1184,6 +1188,7 @@ - '--num-fast-threads={num_fast_threads}' - '--num-client-threads={num_client_threads}' - '--memory-file={memory_file}' + - '--auto-warmup={auto_warmup}' vars: - 'num_servers=0' - 'memsize=0' @@ -1202,6 +1207,7 @@ - 'num_fast_threads=0' - 'num_client_threads=0' - 'memory_file=' + - 'auto_warmup=0' hooks: - hook: copymove options: @@ -1299,6 +1305,7 @@ - '--stats-interval={stats_interval}' - '--disable-tls={disable_tls}' - '--memory-file={memory_file}' + - '--auto-warmup={auto_warmup}' - '--real' vars: - 'num_servers=0' @@ -1318,6 +1325,7 @@ - 'stats_interval=5000' - 'disable_tls=0' - 'memory_file=' + - 'auto_warmup=1' hooks: - hook: tao_instruction - hook: copymove diff --git a/packages/tao_bench/args_utils.py b/packages/tao_bench/args_utils.py index 891e907e..1a71f780 100644 --- a/packages/tao_bench/args_utils.py +++ b/packages/tao_bench/args_utils.py @@ -176,6 +176,22 @@ def add_common_server_args(server_parser: ArgumentParser) -> List[Tuple[str, str + "Enables persistent memory backing via mmap. On graceful shutdown (SIGUSR1), " + "memcached saves state to this file for fast warm restart on subsequent runs.", ) + server_parser.add_argument( + "--auto-warmup", + type=int, + default=0, + help="enable auto-warmup detection. When enabled, monitors server stats " + + "to detect when warmup is complete (hit ratio and QPS stability) and " + + "signals clients via TCP control port to stop warmup early. " + + "warmup_time becomes the maximum warmup duration.", + ) + server_parser.add_argument( + "--target-hit-ratio", + type=float, + default=0.9, + help="target hit ratio for auto-warmup detection. Warmup is considered " + + "complete when hit ratio reaches 95%% of this value.", + ) server_parser.add_argument("--real", action="store_true", help="for real") return get_opt_strings(server_parser) @@ -260,6 +276,14 @@ def add_common_client_args(client_parser: ArgumentParser) -> List[Tuple[str, str default=30, help="extra time buffer for test phase timeout in seconds", ) + client_parser.add_argument( + "--control-port", + type=int, + default=0, + help="TCP port to poll on server for warmup completion signal. " + + "When > 0, client polls this port during warmup and stops early " + + "when server reports warmed up. 0 = disabled (use full warmup time).", + ) client_parser.add_argument("--real", action="store_true", help="for real") return get_opt_strings(client_parser) diff --git a/packages/tao_bench/run.py b/packages/tao_bench/run.py index 06fd8394..3b1b9102 100755 --- a/packages/tao_bench/run.py +++ b/packages/tao_bench/run.py @@ -17,6 +17,7 @@ from typing import List import args_utils +from warmup_monitor import poll_control_port sys.path.insert(0, str(pathlib.Path(__file__).parents[1] / "common")) @@ -81,7 +82,7 @@ def run_cmd( ) os.kill(proc.pid, graceful_signal) try: - proc.wait(timeout=15) + proc.wait(timeout=60) print(f"Process {proc.pid} exited after graceful signal") return except subprocess.TimeoutExpired: @@ -350,10 +351,47 @@ def run_client(args): breakdown_utils.log_preprocessing_warmup_start(TAO_BENCH_DIR, "") print("warm up phase ...") - cmd = get_client_cmd(args, n_seconds=args.warmup_time) - run_cmd( - cmd, timeout=args.warmup_time + args.warmup_timeout_buffer, for_real=args.real - ) + if args.control_port > 0 and args.warmup_time > 0: + # Auto-warmup: poll control port and terminate warmup early when server + # reports READY + cmd = get_client_cmd(args, n_seconds=args.warmup_time) + print(" ".join(cmd)) + if args.real: + warmup_proc = subprocess.Popen(cmd, stderr=subprocess.STDOUT) + warmup_deadline = ( + time.time() + args.warmup_time + args.warmup_timeout_buffer + ) + poll_interval = 5 + while time.time() < warmup_deadline: + if warmup_proc.poll() is not None: + # Warmup process exited on its own + break + if poll_control_port(args.server_hostname, args.control_port): + print("[AutoWarmup] Server reports READY, stopping warmup early") + warmup_proc.terminate() + try: + warmup_proc.wait(timeout=5) + except subprocess.TimeoutExpired: + warmup_proc.kill() + warmup_proc.wait() + break + time.sleep(poll_interval) + else: + # Timeout reached + print("[AutoWarmup] Max warmup time reached, proceeding to test") + warmup_proc.terminate() + try: + warmup_proc.wait(timeout=5) + except subprocess.TimeoutExpired: + warmup_proc.kill() + warmup_proc.wait() + else: + cmd = get_client_cmd(args, n_seconds=args.warmup_time) + run_cmd( + cmd, + timeout=args.warmup_time + args.warmup_timeout_buffer, + for_real=args.real, + ) if args.real and args.wait_after_warmup > 0: time.sleep(args.wait_after_warmup) diff --git a/packages/tao_bench/run_autoscale.py b/packages/tao_bench/run_autoscale.py index 6ebfb695..4431e4b6 100755 --- a/packages/tao_bench/run_autoscale.py +++ b/packages/tao_bench/run_autoscale.py @@ -19,6 +19,7 @@ from parser import TaoBenchParser import args_utils +from warmup_monitor import LogTailer, WarmupControlServer, WarmupMonitor # Add parent directory to path to import diagnosis_utils sys.path.insert(0, str(pathlib.Path(__file__).parents[1] / "common")) @@ -182,6 +183,8 @@ def gen_client_instructions(args, to_file=True): client_args["disable_tls"] = 1 if hasattr(args, "num_client_threads") and args.num_client_threads > 0: client_args["num_threads"] = args.num_client_threads + if hasattr(args, "auto_warmup") and args.auto_warmup > 0: + client_args["control_port"] = args.port_number_start + 1000 clients[c] += ( " ".join( [ @@ -218,6 +221,8 @@ def gen_client_instructions(args, to_file=True): client_args["disable_tls"] = 1 if hasattr(args, "num_client_threads") and args.num_client_threads > 0: client_args["num_threads"] = args.num_client_threads + if hasattr(args, "auto_warmup") and args.auto_warmup > 0: + client_args["control_port"] = args.port_number_start + 1000 clients[i] += ( " ".join( [ @@ -393,6 +398,18 @@ def run_server(args): if match: latency = match.group(1) + # Set up warmup monitoring if auto-warmup is enabled + warmup_monitors = [] + log_tailers = [] + control_server = None + if args.auto_warmup > 0: + control_port = args.port_number_start + 1000 + for i in range(args.num_servers): + monitor = WarmupMonitor(target_hit_ratio=args.target_hit_ratio) + warmup_monitors.append(monitor) + control_server = WarmupControlServer(control_port, warmup_monitors) + control_server.start() + # let's spawn servers procs = [] for server in servers: @@ -405,14 +422,83 @@ def run_server(args): start_new_session=True, # Create new process group ) procs.append(p) + + # Start log tailers for warmup monitoring + if args.auto_warmup > 0: + for i in range(args.num_servers): + tailer = LogTailer(servers[i][2], warmup_monitors[i], instance_id=i) + tailer.start() + log_tailers.append(tailer) + # wait for servers to finish - add extra time to make sure # post-processing will finish - if args.poll_interval > 0: + max_warmup_time = args_utils.get_warmup_time(args) + + # When auto-warmup is enabled, wait for warmup completion instead of + # fixed warmup_time. This allows the server to finish earlier on warm + # restarts where the cache is already pre-loaded. + if args.auto_warmup > 0 and warmup_monitors: + warmup_start = time.time() + print( + f"Auto-warmup enabled: waiting up to {max_warmup_time}s for " + f"warmup completion..." + ) + while time.time() - warmup_start < max_warmup_time: + if all(m.is_warmed_up for m in warmup_monitors): + actual_warmup = time.time() - warmup_start + print( + f"Auto-warmup: all instances warmed up after " + f"{actual_warmup:.0f}s (max was {max_warmup_time}s)" + ) + break + time.sleep(5) + else: + print( + f"Auto-warmup: max warmup time {max_warmup_time}s reached, " + f"proceeding to test phase" + ) + remaining = args.test_time + print(f"Waiting {remaining}s for test phase...") + time.sleep(remaining) + + # Send SIGUSR1 to all process groups in parallel, then wait + if args.memory_file: + for p in procs: + try: + os.killpg(os.getpgid(p.pid), signal.SIGUSR1) + except (ProcessLookupError, PermissionError): + pass + # Wait up to 60s for all to exit + deadline = time.time() + 60 + while time.time() < deadline: + if all(p.poll() is not None for p in procs): + break + time.sleep(1) + + # Force kill any remaining + for p in procs: + if p.poll() is None: + try: + os.killpg(os.getpgid(p.pid), 9) + except (ProcessLookupError, PermissionError): + pass + + # Collect processes and flush output + for p in procs: + try: + p.communicate(timeout=1) + except subprocess.TimeoutExpired: + pass + for server in servers: + try: + server[1].flush() + os.fsync(server[1].fileno()) + except (OSError, ValueError): + pass + elif args.poll_interval > 0: # Use intelligent process polling instead of fixed timeout # First wait for base timeout (warmup + test + timeout_buffer) - base_timeout = ( - args_utils.get_warmup_time(args) + args.test_time + args.timeout_buffer - ) + base_timeout = max_warmup_time + args.test_time + args.timeout_buffer print(f"Waiting {base_timeout}s for processes to complete normally...") time.sleep(base_timeout) @@ -515,6 +601,12 @@ def run_server(args): for server in servers: server[1].close() + # Clean up warmup monitoring + for tailer in log_tailers: + tailer.stop() + if control_server: + control_server.stop() + # Initialize diagnosis recorder for detailed logging recorder = DiagnosisRecorder.get_instance(root_dir=str(BENCHPRESS_ROOT)) diff --git a/packages/tao_bench/run_standalone.py b/packages/tao_bench/run_standalone.py index b7e987a6..e250fb57 100755 --- a/packages/tao_bench/run_standalone.py +++ b/packages/tao_bench/run_standalone.py @@ -190,6 +190,12 @@ def launch_server(port_number_start=11211, bind_cpu=1, bind_mem=1): if hasattr(args, "memory_file") and args.memory_file: script_args["--memory-file"] = args.memory_file + # Pass through auto-warmup if specified + if hasattr(args, "auto_warmup") and args.auto_warmup > 0: + script_args["--auto-warmup"] = args.auto_warmup + if hasattr(args, "target_hit_ratio") and args.target_hit_ratio != 0.9: + script_args["--target-hit-ratio"] = args.target_hit_ratio + cmd = [f"{TAO_BENCH_DIR}/run_autoscale.py --real"] for argname, argval in script_args.items(): diff --git a/packages/tao_bench/warmup_monitor.py b/packages/tao_bench/warmup_monitor.py new file mode 100644 index 00000000..bc6e48c8 --- /dev/null +++ b/packages/tao_bench/warmup_monitor.py @@ -0,0 +1,182 @@ +#!/usr/bin/env python3 +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +import os +import socket +import threading +import time +from parser import TaoBenchServerSnapshot + + +class WarmupMonitor: + """Monitors server stats output to detect when warmup is complete. + + Warmup is considered complete when: + 1. Hit ratio >= hit_ratio_factor * target_hit_ratio + 2. QPS is stable (coefficient of variation < stability_threshold) + over a rolling window of stability_window data points + """ + + def __init__( + self, + target_hit_ratio=0.9, + stability_window=24, + stability_threshold=0.05, + hit_ratio_factor=0.95, + ): + self.target_hit_ratio = target_hit_ratio + self.hit_threshold = hit_ratio_factor * target_hit_ratio + self.stability_window = stability_window # 24 * 5s = 2 minutes + self.stability_threshold = stability_threshold + self.qps_history = [] + self.is_warmed_up = False + self._lock = threading.Lock() + + def process_line(self, line): + """Process a server stats line. Returns True if warmup just completed.""" + snapshot = TaoBenchServerSnapshot(line) + if not snapshot.valid: + return False + + hit_rate = snapshot.get("hit_rate") + total_qps = snapshot.get("fast_qps") + snapshot.get("slow_qps") + + with self._lock: + if self.is_warmed_up: + return False + + self.qps_history.append(total_qps) + if len(self.qps_history) > self.stability_window: + self.qps_history.pop(0) + + if hit_rate >= self.hit_threshold and self._is_qps_stable(): + self.is_warmed_up = True + print( + f"[WarmupMonitor] Warmup complete: hit_rate={hit_rate:.4f} " + f"(threshold={self.hit_threshold:.4f}), " + f"QPS stable over {len(self.qps_history)} data points" + ) + return True + return False + + def _is_qps_stable(self): + if len(self.qps_history) < self.stability_window: + return False + mean_qps = sum(self.qps_history) / len(self.qps_history) + if mean_qps == 0: + return False + variance = sum((x - mean_qps) ** 2 for x in self.qps_history) / len( + self.qps_history + ) + cv = (variance**0.5) / mean_qps + return cv < self.stability_threshold + + +class WarmupControlServer: + """TCP server that responds to client warmup status polls. + + Clients connect and receive either "READY\\n" or "WAITING\\n". + """ + + def __init__(self, port, monitors): + self.port = port + self.monitors = monitors + self.server_socket = None + self._thread = None + self.running = False + + def start(self): + self.server_socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.server_socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) + self.server_socket.settimeout(1.0) + self.server_socket.bind(("::", self.port)) + self.server_socket.listen(32) + self.running = True + self._thread = threading.Thread(target=self._handle_connections, daemon=True) + self._thread.start() + print(f"[WarmupControlServer] Listening on port {self.port}") + + def _handle_connections(self): + while self.running: + try: + conn, addr = self.server_socket.accept() + try: + if all(m.is_warmed_up for m in self.monitors): + conn.sendall(b"READY\n") + else: + conn.sendall(b"WAITING\n") + finally: + conn.close() + except socket.timeout: + continue + except OSError: + break + + def stop(self): + self.running = False + if self.server_socket: + try: + self.server_socket.close() + except OSError: + pass + if self._thread: + self._thread.join(timeout=5) + print("[WarmupControlServer] Stopped") + + +class LogTailer: + """Background thread that tails a log file and feeds lines to a WarmupMonitor.""" + + def __init__(self, log_path, monitor, instance_id=0): + self.log_path = log_path + self.monitor = monitor + self.instance_id = instance_id + self._thread = None + self.running = False + + def start(self): + self.running = True + self._thread = threading.Thread(target=self._tail, daemon=True) + self._thread.start() + + def _tail(self): + # Wait for file to appear + while self.running and not os.path.exists(self.log_path): + time.sleep(0.5) + if not self.running: + return + + with open(self.log_path, "r") as f: + while self.running: + line = f.readline() + if line: + if self.monitor.process_line(line): + print( + f"[LogTailer] Server instance {self.instance_id} " + f"warmup complete" + ) + else: + # No new data yet + if self.monitor.is_warmed_up: + # Stop tailing once warmed up + return + time.sleep(0.5) + + def stop(self): + self.running = False + if self._thread: + self._thread.join(timeout=5) + + +def poll_control_port(hostname, port, timeout=5): + """Poll the warmup control server. Returns True if server reports READY.""" + try: + with socket.create_connection((hostname, port), timeout=timeout) as s: + data = s.recv(64).decode().strip() + return data == "READY" + except OSError: + return False