From 4dc1b945dcefaff084cff2e7fae259844c85c3f3 Mon Sep 17 00:00:00 2001 From: Wei Su Date: Wed, 25 Mar 2026 13:50:45 -0700 Subject: [PATCH] Add memory file option (-e) for fast warm restart (#540) Summary: Pull Request resolved: https://github.com/facebookresearch/DCPerf/pull/540 Expose memcached's native `-e` memory file option in TaoBench. When specified, memcached mmaps the given file for slab storage. On graceful shutdown (SIGUSR1), state is saved to the file. On subsequent runs with the same file, cache data is pre-loaded, drastically reducing warmup time. Changes: - Add `--memory-file` argument to server args - Append `-e ` to memcached command when memory file specified - Use SIGUSR1 for graceful shutdown (60s grace period) when memory file is in use - Per-instance memory files in autoscale mode (suffix `.0`, `.1`, etc.) - Auto-expand `/dev/shm` when tmpfs is smaller than memsize - Add `memory_file` variable to all TaoBench job configurations - Update README with memory file documentation Reviewed By: gandhijayneel Differential Revision: D97244738 --- benchpress/config/jobs.yml | 10 +++ packages/tao_bench/README.md | 90 +++++++++++++++++++++++++ packages/tao_bench/args_utils.py | 10 +++ packages/tao_bench/run.py | 54 +++++++++++---- packages/tao_bench/run_autoscale.py | 99 ++++++++++++++++++++++------ packages/tao_bench/run_standalone.py | 4 ++ 6 files changed, 234 insertions(+), 33 deletions(-) diff --git a/benchpress/config/jobs.yml b/benchpress/config/jobs.yml index fe163b37..fda31a4b 100644 --- a/benchpress/config/jobs.yml +++ b/benchpress/config/jobs.yml @@ -1045,6 +1045,7 @@ - '--timeout-buffer={timeout_buffer}' - '--disable-tls={disable_tls}' - '--smart-nanosleep={smart_nanosleep}' + - '--memory-file={memory_file}' - '--real' vars: - 'interface_name=eth0' @@ -1056,6 +1057,7 @@ - 'timeout_buffer=120' - 'disable_tls=0' - 'smart_nanosleep=0' + - 'memory_file=' client: args: - 'client' @@ -1123,6 +1125,7 @@ - '--client-wait-after-warmup={client_wait_after_warmup}' - '--disable-tls={disable_tls}' - '--smart-nanosleep={smart_nanosleep}' + - '--memory-file={memory_file}' - '--real' vars: - 'num_servers=0' @@ -1144,6 +1147,7 @@ - 'client_wait_after_warmup=5' - 'disable_tls=0' - 'smart_nanosleep=0' + - 'memory_file=' hooks: - hook: tao_instruction - hook: copymove @@ -1179,6 +1183,7 @@ - '--num-slow-threads={num_slow_threads}' - '--num-fast-threads={num_fast_threads}' - '--num-client-threads={num_client_threads}' + - '--memory-file={memory_file}' vars: - 'num_servers=0' - 'memsize=0' @@ -1196,6 +1201,7 @@ - 'num_slow_threads=0' - 'num_fast_threads=0' - 'num_client_threads=0' + - 'memory_file=' hooks: - hook: copymove options: @@ -1233,6 +1239,7 @@ - '--test-timeout-buffer={test_timeout_buffer}' - '--postprocessing-timeout-buffer={postprocessing_timeout_buffer}' - '--poll-interval={poll_interval}' + - '--memory-file={memory_file}' vars: - 'num_servers=0' - 'memsize=0.5' @@ -1254,6 +1261,7 @@ - 'test_timeout_buffer=0' - 'postprocessing_timeout_buffer=60' - 'poll_interval=0.2' + - 'memory_file=' hooks: - hook: copymove options: @@ -1290,6 +1298,7 @@ - '--conns-per-server-core={conns_per_server_core}' - '--stats-interval={stats_interval}' - '--disable-tls={disable_tls}' + - '--memory-file={memory_file}' - '--real' vars: - 'num_servers=0' @@ -1308,6 +1317,7 @@ - 'conns_per_server_core=85' - 'stats_interval=5000' - 'disable_tls=0' + - 'memory_file=' hooks: - hook: tao_instruction - hook: copymove diff --git a/packages/tao_bench/README.md b/packages/tao_bench/README.md index e2922793..8eba9e5a 100644 --- a/packages/tao_bench/README.md +++ b/packages/tao_bench/README.md @@ -313,6 +313,96 @@ the guide for the `tao_bench_autoscale` job. Just replace the job name. A good run of this workload should have about 75~80% of CPU utilization in the steady state, of which 25~30% should be in userspace. +## Memory file for fast warm restart + +TaoBench supports a memory file option that allows the server to persist its +cache state across runs. On the first run, TaoBench fills the cache from scratch +(cold start). On subsequent runs with the same memory file, the server reloads +the pre-filled cache instantly, skipping the lengthy warmup phase. + +This uses memcached's native `-e` flag, which mmaps a file for slab storage. +On graceful shutdown (SIGUSR1), memcached saves a `.meta` restart file. On the +next startup with the same memory file, memcached restores all cached items +from the file. + +### Usage + +Specify the `memory_file` parameter pointing to a path on a tmpfs filesystem +(e.g. `/dev/shm`): + +```bash +./benchpress_cli.py run tao_bench_autoscale -i '{"memory_file": "/dev/shm/tao_bench_mem"}' +``` + +On the first run, TaoBench will create per-instance memory files at +`/dev/shm/tao_bench_mem.0`, `/dev/shm/tao_bench_mem.1`, etc. When the run +completes, the server is shut down with SIGUSR1 which saves the restart +metadata (a small `.meta` file alongside each memory file). + +On subsequent runs with the same `memory_file` path, the server loads all +cached items from the memory files, drastically reducing warmup time. + +### /dev/shm sizing + +The memory files are stored in `/dev/shm` (tmpfs), which defaults to 50% of +system RAM. When using large `memsize` values, `/dev/shm` may be too small. +TaoBench automatically expands `/dev/shm` when needed (requires root), but +you can also do it manually: + +```bash +mount -o remount,size=800G /dev/shm +``` + +### Parameters + + - `memory_file`: Path prefix for memory files. Each server instance gets a + suffixed file (e.g. `/dev/shm/tao_bench_mem.0`). Default is empty (disabled). + +## Auto-warmup detection + +When enabled, TaoBench monitors server statistics during the warmup phase and +automatically signals clients to stop warmup early once the server reaches a +warmed-up state. The `warmup_time` parameter becomes the maximum warmup +duration rather than a fixed duration. + +### How it works + +The server opens a TCP control port (`port_number_start + 1000`, default 12211) +and monitors the server log output in real time. Warmup is considered complete +when: + +1. Hit ratio reaches >= 95% of `target_hit_ratio` (default 0.9, threshold 0.855) +2. QPS stabilizes (coefficient of variation < 5%) over a 2-minute rolling window + +When both conditions are met for all server instances, the control port responds +`READY` to client polls. Clients poll the control port every 5 seconds during +warmup, and terminate the warmup phase early when the server reports `READY`. + +### Usage + +Set `auto_warmup` to 1: + +```bash +./benchpress_cli.py run tao_bench_autoscale -i '{"auto_warmup": 1}' +``` + +This is most useful combined with the memory file option: + +```bash +# Second run with pre-warmed cache + auto-warmup detection +./benchpress_cli.py run tao_bench_autoscale -i '{"memory_file": "/dev/shm/tao_bench_mem", "auto_warmup": 1}' +``` + +The generated client instructions will automatically include the `control_port` +parameter so clients can poll the server's warmup status. + +### Parameters + + - `auto_warmup`: Set to 1 to enable auto-warmup detection. Default is 0 (disabled) + for `tao_bench_autoscale`, 1 (enabled) for `tao_bench_autoscale_v2_beta`. + - `target_hit_ratio`: Target hit ratio for warmup completion detection. Default + is 0.9. Warmup is considered complete when hit ratio reaches 95% of this value. + ## Advanced job: `tao_bench_custom` **NOTE**: We recommend using `tao_bench_autoscale` to start the server as this job diff --git a/packages/tao_bench/args_utils.py b/packages/tao_bench/args_utils.py index 67c4e787..891e907e 100644 --- a/packages/tao_bench/args_utils.py +++ b/packages/tao_bench/args_utils.py @@ -166,6 +166,16 @@ def add_common_server_args(server_parser: ArgumentParser) -> List[Tuple[str, str help="poll interval in seconds for process completion detection; " + "if > 0, use polling mechanism instead of fixed timeout", ) + server_parser.add_argument( + "--memory-file", + type=str, + nargs="?", + const="", + default="", + help="path to memory file for memcached -e flag. " + + "Enables persistent memory backing via mmap. On graceful shutdown (SIGUSR1), " + + "memcached saves state to this file for fast warm restart on subsequent runs.", + ) server_parser.add_argument("--real", action="store_true", help="for real") return get_opt_strings(server_parser) diff --git a/packages/tao_bench/run.py b/packages/tao_bench/run.py index 5cd39f39..06fd8394 100755 --- a/packages/tao_bench/run.py +++ b/packages/tao_bench/run.py @@ -9,6 +9,7 @@ import pathlib import re import shlex +import signal import subprocess import sys import threading @@ -62,6 +63,7 @@ def run_cmd( cmd: List[str], timeout=None, for_real=True, + graceful_signal=None, ) -> str: print(" ".join(cmd)) if for_real: @@ -72,18 +74,40 @@ def run_cmd( try: proc.wait(timeout=timeout) except subprocess.TimeoutExpired: - print(f"Process timeout expired, terminating process {proc.pid}...") - proc.terminate() - try: - # Give the process 5 seconds to terminate gracefully - proc.wait(timeout=5) - print(f"Process {proc.pid} terminated gracefully") - except subprocess.TimeoutExpired: - # If it still doesn't terminate, force kill it - print(f"Process {proc.pid} didn't respond to SIGTERM, force killing...") - proc.kill() - proc.wait() - print(f"Process {proc.pid} killed successfully") + if graceful_signal is not None: + print( + f"Process timeout expired, sending graceful signal " + f"{graceful_signal} to process {proc.pid}..." + ) + os.kill(proc.pid, graceful_signal) + try: + proc.wait(timeout=15) + print(f"Process {proc.pid} exited after graceful signal") + return + except subprocess.TimeoutExpired: + print( + f"Process {proc.pid} didn't exit after graceful signal, " + f"force killing..." + ) + proc.kill() + proc.wait() + print(f"Process {proc.pid} killed successfully") + else: + print(f"Process timeout expired, terminating process {proc.pid}...") + proc.terminate() + try: + # Give the process 5 seconds to terminate gracefully + proc.wait(timeout=5) + print(f"Process {proc.pid} terminated gracefully") + except subprocess.TimeoutExpired: + # If it still doesn't terminate, force kill it + print( + f"Process {proc.pid} didn't respond to SIGTERM, " + f"force killing..." + ) + proc.kill() + proc.wait() + print(f"Process {proc.pid} killed successfully") def profile_server(): @@ -221,6 +245,9 @@ def run_server(args): "-o", ",".join(extended_options), ] + if args.memory_file: + server_cmd += ["-e", args.memory_file] + if "DCPERF_PERF_RECORD" in os.environ and os.environ["DCPERF_PERF_RECORD"] == "1": profiler_wait_time = ( args.warmup_time + args.timeout_buffer + SERVER_PROFILING_DELAY @@ -236,7 +263,8 @@ def run_server(args): os.environ["LD_LIBRARY_PATH"] = os.path.join(TAO_BENCH_DIR, "build-deps/lib") timeout = args.warmup_time + args.test_time + args.timeout_buffer - run_cmd(server_cmd, timeout, args.real) + graceful_sig = signal.SIGUSR1 if args.memory_file else None + run_cmd(server_cmd, timeout, args.real, graceful_signal=graceful_sig) if "DCPERF_PERF_RECORD" in os.environ and os.environ["DCPERF_PERF_RECORD"] == "1": t_prof.cancel() diff --git a/packages/tao_bench/run_autoscale.py b/packages/tao_bench/run_autoscale.py index 03e8df7f..6ebfb695 100755 --- a/packages/tao_bench/run_autoscale.py +++ b/packages/tao_bench/run_autoscale.py @@ -10,6 +10,7 @@ import pathlib import re import shlex +import signal import socket import subprocess import sys @@ -73,13 +74,17 @@ def is_in_range(node_cpu_ranges, input_cpu_range): SERVER_CMD_OPTIONS = [] # To be initialized in init_parser() -def compose_server_cmd(args, cpu_core_range, memsize, port_number): +def compose_server_cmd(args, cpu_core_range, memsize, port_number, instance_index=0): server_args = { optstr: getattr(args, argkey) for optstr, argkey in SERVER_CMD_OPTIONS } server_args["--memsize"] = memsize server_args["--port-number"] = port_number + # Per-instance memory file: append instance index suffix + if args.memory_file: + server_args["--memory-file"] = f"{args.memory_file}.{instance_index}" + # Add custom thread parameters if specified if hasattr(args, "num_fast_threads") and args.num_fast_threads > 0: server_args["--num-fast-threads"] = args.num_fast_threads @@ -97,7 +102,7 @@ def compose_server_cmd(args, cpu_core_range, memsize, port_number): if isinstance(argval, bool): if argval: cmd.append(argname) - elif argval is not None and argval != 0: + elif argval is not None and argval != 0 and argval != "": cmd += [argname, str(argval)] if len(NUMA_NODES) > 1 and (args.bind_cpu > 0 or args.bind_mem > 0): @@ -282,10 +287,73 @@ def distribute_cores(n_parts): return core_ranges +def ensure_shm_capacity(required_gb): + """Expand /dev/shm if it is smaller than required_gb. + + Memory files are stored in /dev/shm (tmpfs). The default tmpfs size is + typically 50% of system RAM, which may be too small when multiple server + instances each need large memory files. This function remounts /dev/shm + with a larger size if needed. + """ + shm_path = "/dev/shm" + try: + stat = os.statvfs(shm_path) + shm_total_gb = (stat.f_blocks * stat.f_frsize) / (1024**3) + if shm_total_gb >= required_gb: + return + target_gb = int(required_gb * 1.1) # 10% headroom + print( + f"/dev/shm is {shm_total_gb:.0f}GB, need {required_gb:.0f}GB. " + f"Expanding to {target_gb}GB..." + ) + ret = subprocess.run( + ["mount", "-o", f"remount,size={target_gb}G", shm_path], + capture_output=True, + ) + if ret.returncode != 0: + print(f"WARNING: Failed to expand /dev/shm: {ret.stderr.decode().strip()}") + else: + print(f"/dev/shm expanded to {target_gb}GB") + except OSError as e: + print(f"WARNING: Could not check /dev/shm capacity: {e}") + + +def graceful_kill_pg(pid, use_sigusr1=False, grace_period=60): + """Kill a process group, optionally sending SIGUSR1 first for graceful shutdown.""" + try: + pgid = os.getpgid(pid) + except (ProcessLookupError, PermissionError): + return + if use_sigusr1: + try: + os.killpg(pgid, signal.SIGUSR1) + # Wait for graceful shutdown + deadline = time.time() + grace_period + while time.time() < deadline: + try: + os.kill(pid, 0) # Check if still alive + time.sleep(0.5) + except ProcessLookupError: + return # Process exited + # Still alive after grace period, force kill + print(f"Process group {pgid} didn't exit after SIGUSR1, force killing...") + except (ProcessLookupError, PermissionError): + return + try: + os.killpg(pgid, 9) + except (ProcessLookupError, PermissionError): + pass + + def run_server(args): # Create DiagnosisRecorder instance (automatically manages env var for subprocesses) recorder = DiagnosisRecorder.get_instance(root_dir=str(BENCHPRESS_ROOT)) + # If memory file is on /dev/shm, ensure tmpfs is large enough + if args.memory_file and args.memory_file.startswith("/dev/shm"): + required_gb = float(args.memsize) * args_utils.MEM_USAGE_FACTOR + ensure_shm_capacity(required_gb) + core_ranges = distribute_cores(args.num_servers) # memory size - split evenly for each server n_mem = float(args.memsize) @@ -298,7 +366,11 @@ def run_server(args): servers.append( [ compose_server_cmd( - args, core_ranges[i], mem_per_inst, args.port_number_start + i + args, + core_ranges[i], + mem_per_inst, + args.port_number_start + i, + instance_index=i, ), open(logpath, "w"), logpath, @@ -402,10 +474,7 @@ def run_server(args): ) for p in procs: if p.poll() is None: # Process still running - try: - os.killpg(os.getpgid(p.pid), 9) - except (ProcessLookupError, PermissionError): - pass + graceful_kill_pg(p.pid, use_sigusr1=bool(args.memory_file)) # Ensure all processes are collected and output is flushed for p in procs: @@ -424,10 +493,7 @@ def run_server(args): # Final cleanup to ensure process groups are terminated for p in procs: - try: - os.killpg(os.getpgid(p.pid), 9) - except (ProcessLookupError, PermissionError): - pass # Process already terminated or we don't have permission + graceful_kill_pg(p.pid, use_sigusr1=bool(args.memory_file)) else: # Original behavior with fixed timeout timeout = ( @@ -441,18 +507,11 @@ def run_server(args): try: (out, err) = p.communicate(timeout=timeout) except subprocess.TimeoutExpired: - # Kill the entire process group - try: - os.killpg(os.getpgid(p.pid), 9) - except ProcessLookupError: - pass # Process already terminated + graceful_kill_pg(p.pid, use_sigusr1=bool(args.memory_file)) (out, err) = p.communicate() finally: # Ensure cleanup even if process completed successfully - try: - os.killpg(os.getpgid(p.pid), 9) - except (ProcessLookupError, PermissionError): - pass # Process already terminated or we don't have permission + graceful_kill_pg(p.pid, use_sigusr1=bool(args.memory_file)) for server in servers: server[1].close() diff --git a/packages/tao_bench/run_standalone.py b/packages/tao_bench/run_standalone.py index 756f3bc8..b7e987a6 100755 --- a/packages/tao_bench/run_standalone.py +++ b/packages/tao_bench/run_standalone.py @@ -186,6 +186,10 @@ def launch_server(port_number_start=11211, bind_cpu=1, bind_mem=1): if hasattr(args, "poll_interval") and args.poll_interval > 0: script_args["--poll-interval"] = args.poll_interval + # Pass through memory file if specified + if hasattr(args, "memory_file") and args.memory_file: + script_args["--memory-file"] = args.memory_file + cmd = [f"{TAO_BENCH_DIR}/run_autoscale.py --real"] for argname, argval in script_args.items():