diff --git a/benchpress/config/jobs.yml b/benchpress/config/jobs.yml index fe163b37..fda31a4b 100644 --- a/benchpress/config/jobs.yml +++ b/benchpress/config/jobs.yml @@ -1045,6 +1045,7 @@ - '--timeout-buffer={timeout_buffer}' - '--disable-tls={disable_tls}' - '--smart-nanosleep={smart_nanosleep}' + - '--memory-file={memory_file}' - '--real' vars: - 'interface_name=eth0' @@ -1056,6 +1057,7 @@ - 'timeout_buffer=120' - 'disable_tls=0' - 'smart_nanosleep=0' + - 'memory_file=' client: args: - 'client' @@ -1123,6 +1125,7 @@ - '--client-wait-after-warmup={client_wait_after_warmup}' - '--disable-tls={disable_tls}' - '--smart-nanosleep={smart_nanosleep}' + - '--memory-file={memory_file}' - '--real' vars: - 'num_servers=0' @@ -1144,6 +1147,7 @@ - 'client_wait_after_warmup=5' - 'disable_tls=0' - 'smart_nanosleep=0' + - 'memory_file=' hooks: - hook: tao_instruction - hook: copymove @@ -1179,6 +1183,7 @@ - '--num-slow-threads={num_slow_threads}' - '--num-fast-threads={num_fast_threads}' - '--num-client-threads={num_client_threads}' + - '--memory-file={memory_file}' vars: - 'num_servers=0' - 'memsize=0' @@ -1196,6 +1201,7 @@ - 'num_slow_threads=0' - 'num_fast_threads=0' - 'num_client_threads=0' + - 'memory_file=' hooks: - hook: copymove options: @@ -1233,6 +1239,7 @@ - '--test-timeout-buffer={test_timeout_buffer}' - '--postprocessing-timeout-buffer={postprocessing_timeout_buffer}' - '--poll-interval={poll_interval}' + - '--memory-file={memory_file}' vars: - 'num_servers=0' - 'memsize=0.5' @@ -1254,6 +1261,7 @@ - 'test_timeout_buffer=0' - 'postprocessing_timeout_buffer=60' - 'poll_interval=0.2' + - 'memory_file=' hooks: - hook: copymove options: @@ -1290,6 +1298,7 @@ - '--conns-per-server-core={conns_per_server_core}' - '--stats-interval={stats_interval}' - '--disable-tls={disable_tls}' + - '--memory-file={memory_file}' - '--real' vars: - 'num_servers=0' @@ -1308,6 +1317,7 @@ - 'conns_per_server_core=85' - 'stats_interval=5000' - 'disable_tls=0' + - 'memory_file=' hooks: - hook: tao_instruction - hook: copymove diff --git a/packages/tao_bench/README.md b/packages/tao_bench/README.md index e2922793..8eba9e5a 100644 --- a/packages/tao_bench/README.md +++ b/packages/tao_bench/README.md @@ -313,6 +313,96 @@ the guide for the `tao_bench_autoscale` job. Just replace the job name. A good run of this workload should have about 75~80% of CPU utilization in the steady state, of which 25~30% should be in userspace. +## Memory file for fast warm restart + +TaoBench supports a memory file option that allows the server to persist its +cache state across runs. On the first run, TaoBench fills the cache from scratch +(cold start). On subsequent runs with the same memory file, the server reloads +the pre-filled cache instantly, skipping the lengthy warmup phase. + +This uses memcached's native `-e` flag, which mmaps a file for slab storage. +On graceful shutdown (SIGUSR1), memcached saves a `.meta` restart file. On the +next startup with the same memory file, memcached restores all cached items +from the file. + +### Usage + +Specify the `memory_file` parameter pointing to a path on a tmpfs filesystem +(e.g. `/dev/shm`): + +```bash +./benchpress_cli.py run tao_bench_autoscale -i '{"memory_file": "/dev/shm/tao_bench_mem"}' +``` + +On the first run, TaoBench will create per-instance memory files at +`/dev/shm/tao_bench_mem.0`, `/dev/shm/tao_bench_mem.1`, etc. When the run +completes, the server is shut down with SIGUSR1 which saves the restart +metadata (a small `.meta` file alongside each memory file). + +On subsequent runs with the same `memory_file` path, the server loads all +cached items from the memory files, drastically reducing warmup time. + +### /dev/shm sizing + +The memory files are stored in `/dev/shm` (tmpfs), which defaults to 50% of +system RAM. When using large `memsize` values, `/dev/shm` may be too small. +TaoBench automatically expands `/dev/shm` when needed (requires root), but +you can also do it manually: + +```bash +mount -o remount,size=800G /dev/shm +``` + +### Parameters + + - `memory_file`: Path prefix for memory files. Each server instance gets a + suffixed file (e.g. `/dev/shm/tao_bench_mem.0`). Default is empty (disabled). + +## Auto-warmup detection + +When enabled, TaoBench monitors server statistics during the warmup phase and +automatically signals clients to stop warmup early once the server reaches a +warmed-up state. The `warmup_time` parameter becomes the maximum warmup +duration rather than a fixed duration. + +### How it works + +The server opens a TCP control port (`port_number_start + 1000`, default 12211) +and monitors the server log output in real time. Warmup is considered complete +when: + +1. Hit ratio reaches >= 95% of `target_hit_ratio` (default 0.9, threshold 0.855) +2. QPS stabilizes (coefficient of variation < 5%) over a 2-minute rolling window + +When both conditions are met for all server instances, the control port responds +`READY` to client polls. Clients poll the control port every 5 seconds during +warmup, and terminate the warmup phase early when the server reports `READY`. + +### Usage + +Set `auto_warmup` to 1: + +```bash +./benchpress_cli.py run tao_bench_autoscale -i '{"auto_warmup": 1}' +``` + +This is most useful combined with the memory file option: + +```bash +# Second run with pre-warmed cache + auto-warmup detection +./benchpress_cli.py run tao_bench_autoscale -i '{"memory_file": "/dev/shm/tao_bench_mem", "auto_warmup": 1}' +``` + +The generated client instructions will automatically include the `control_port` +parameter so clients can poll the server's warmup status. + +### Parameters + + - `auto_warmup`: Set to 1 to enable auto-warmup detection. Default is 0 (disabled) + for `tao_bench_autoscale`, 1 (enabled) for `tao_bench_autoscale_v2_beta`. + - `target_hit_ratio`: Target hit ratio for warmup completion detection. Default + is 0.9. Warmup is considered complete when hit ratio reaches 95% of this value. + ## Advanced job: `tao_bench_custom` **NOTE**: We recommend using `tao_bench_autoscale` to start the server as this job diff --git a/packages/tao_bench/args_utils.py b/packages/tao_bench/args_utils.py index 67c4e787..891e907e 100644 --- a/packages/tao_bench/args_utils.py +++ b/packages/tao_bench/args_utils.py @@ -166,6 +166,16 @@ def add_common_server_args(server_parser: ArgumentParser) -> List[Tuple[str, str help="poll interval in seconds for process completion detection; " + "if > 0, use polling mechanism instead of fixed timeout", ) + server_parser.add_argument( + "--memory-file", + type=str, + nargs="?", + const="", + default="", + help="path to memory file for memcached -e flag. " + + "Enables persistent memory backing via mmap. On graceful shutdown (SIGUSR1), " + + "memcached saves state to this file for fast warm restart on subsequent runs.", + ) server_parser.add_argument("--real", action="store_true", help="for real") return get_opt_strings(server_parser) diff --git a/packages/tao_bench/run.py b/packages/tao_bench/run.py index 5cd39f39..06fd8394 100755 --- a/packages/tao_bench/run.py +++ b/packages/tao_bench/run.py @@ -9,6 +9,7 @@ import pathlib import re import shlex +import signal import subprocess import sys import threading @@ -62,6 +63,7 @@ def run_cmd( cmd: List[str], timeout=None, for_real=True, + graceful_signal=None, ) -> str: print(" ".join(cmd)) if for_real: @@ -72,18 +74,40 @@ def run_cmd( try: proc.wait(timeout=timeout) except subprocess.TimeoutExpired: - print(f"Process timeout expired, terminating process {proc.pid}...") - proc.terminate() - try: - # Give the process 5 seconds to terminate gracefully - proc.wait(timeout=5) - print(f"Process {proc.pid} terminated gracefully") - except subprocess.TimeoutExpired: - # If it still doesn't terminate, force kill it - print(f"Process {proc.pid} didn't respond to SIGTERM, force killing...") - proc.kill() - proc.wait() - print(f"Process {proc.pid} killed successfully") + if graceful_signal is not None: + print( + f"Process timeout expired, sending graceful signal " + f"{graceful_signal} to process {proc.pid}..." + ) + os.kill(proc.pid, graceful_signal) + try: + proc.wait(timeout=15) + print(f"Process {proc.pid} exited after graceful signal") + return + except subprocess.TimeoutExpired: + print( + f"Process {proc.pid} didn't exit after graceful signal, " + f"force killing..." + ) + proc.kill() + proc.wait() + print(f"Process {proc.pid} killed successfully") + else: + print(f"Process timeout expired, terminating process {proc.pid}...") + proc.terminate() + try: + # Give the process 5 seconds to terminate gracefully + proc.wait(timeout=5) + print(f"Process {proc.pid} terminated gracefully") + except subprocess.TimeoutExpired: + # If it still doesn't terminate, force kill it + print( + f"Process {proc.pid} didn't respond to SIGTERM, " + f"force killing..." + ) + proc.kill() + proc.wait() + print(f"Process {proc.pid} killed successfully") def profile_server(): @@ -221,6 +245,9 @@ def run_server(args): "-o", ",".join(extended_options), ] + if args.memory_file: + server_cmd += ["-e", args.memory_file] + if "DCPERF_PERF_RECORD" in os.environ and os.environ["DCPERF_PERF_RECORD"] == "1": profiler_wait_time = ( args.warmup_time + args.timeout_buffer + SERVER_PROFILING_DELAY @@ -236,7 +263,8 @@ def run_server(args): os.environ["LD_LIBRARY_PATH"] = os.path.join(TAO_BENCH_DIR, "build-deps/lib") timeout = args.warmup_time + args.test_time + args.timeout_buffer - run_cmd(server_cmd, timeout, args.real) + graceful_sig = signal.SIGUSR1 if args.memory_file else None + run_cmd(server_cmd, timeout, args.real, graceful_signal=graceful_sig) if "DCPERF_PERF_RECORD" in os.environ and os.environ["DCPERF_PERF_RECORD"] == "1": t_prof.cancel() diff --git a/packages/tao_bench/run_autoscale.py b/packages/tao_bench/run_autoscale.py index 03e8df7f..6ebfb695 100755 --- a/packages/tao_bench/run_autoscale.py +++ b/packages/tao_bench/run_autoscale.py @@ -10,6 +10,7 @@ import pathlib import re import shlex +import signal import socket import subprocess import sys @@ -73,13 +74,17 @@ def is_in_range(node_cpu_ranges, input_cpu_range): SERVER_CMD_OPTIONS = [] # To be initialized in init_parser() -def compose_server_cmd(args, cpu_core_range, memsize, port_number): +def compose_server_cmd(args, cpu_core_range, memsize, port_number, instance_index=0): server_args = { optstr: getattr(args, argkey) for optstr, argkey in SERVER_CMD_OPTIONS } server_args["--memsize"] = memsize server_args["--port-number"] = port_number + # Per-instance memory file: append instance index suffix + if args.memory_file: + server_args["--memory-file"] = f"{args.memory_file}.{instance_index}" + # Add custom thread parameters if specified if hasattr(args, "num_fast_threads") and args.num_fast_threads > 0: server_args["--num-fast-threads"] = args.num_fast_threads @@ -97,7 +102,7 @@ def compose_server_cmd(args, cpu_core_range, memsize, port_number): if isinstance(argval, bool): if argval: cmd.append(argname) - elif argval is not None and argval != 0: + elif argval is not None and argval != 0 and argval != "": cmd += [argname, str(argval)] if len(NUMA_NODES) > 1 and (args.bind_cpu > 0 or args.bind_mem > 0): @@ -282,10 +287,73 @@ def distribute_cores(n_parts): return core_ranges +def ensure_shm_capacity(required_gb): + """Expand /dev/shm if it is smaller than required_gb. + + Memory files are stored in /dev/shm (tmpfs). The default tmpfs size is + typically 50% of system RAM, which may be too small when multiple server + instances each need large memory files. This function remounts /dev/shm + with a larger size if needed. + """ + shm_path = "/dev/shm" + try: + stat = os.statvfs(shm_path) + shm_total_gb = (stat.f_blocks * stat.f_frsize) / (1024**3) + if shm_total_gb >= required_gb: + return + target_gb = int(required_gb * 1.1) # 10% headroom + print( + f"/dev/shm is {shm_total_gb:.0f}GB, need {required_gb:.0f}GB. " + f"Expanding to {target_gb}GB..." + ) + ret = subprocess.run( + ["mount", "-o", f"remount,size={target_gb}G", shm_path], + capture_output=True, + ) + if ret.returncode != 0: + print(f"WARNING: Failed to expand /dev/shm: {ret.stderr.decode().strip()}") + else: + print(f"/dev/shm expanded to {target_gb}GB") + except OSError as e: + print(f"WARNING: Could not check /dev/shm capacity: {e}") + + +def graceful_kill_pg(pid, use_sigusr1=False, grace_period=60): + """Kill a process group, optionally sending SIGUSR1 first for graceful shutdown.""" + try: + pgid = os.getpgid(pid) + except (ProcessLookupError, PermissionError): + return + if use_sigusr1: + try: + os.killpg(pgid, signal.SIGUSR1) + # Wait for graceful shutdown + deadline = time.time() + grace_period + while time.time() < deadline: + try: + os.kill(pid, 0) # Check if still alive + time.sleep(0.5) + except ProcessLookupError: + return # Process exited + # Still alive after grace period, force kill + print(f"Process group {pgid} didn't exit after SIGUSR1, force killing...") + except (ProcessLookupError, PermissionError): + return + try: + os.killpg(pgid, 9) + except (ProcessLookupError, PermissionError): + pass + + def run_server(args): # Create DiagnosisRecorder instance (automatically manages env var for subprocesses) recorder = DiagnosisRecorder.get_instance(root_dir=str(BENCHPRESS_ROOT)) + # If memory file is on /dev/shm, ensure tmpfs is large enough + if args.memory_file and args.memory_file.startswith("/dev/shm"): + required_gb = float(args.memsize) * args_utils.MEM_USAGE_FACTOR + ensure_shm_capacity(required_gb) + core_ranges = distribute_cores(args.num_servers) # memory size - split evenly for each server n_mem = float(args.memsize) @@ -298,7 +366,11 @@ def run_server(args): servers.append( [ compose_server_cmd( - args, core_ranges[i], mem_per_inst, args.port_number_start + i + args, + core_ranges[i], + mem_per_inst, + args.port_number_start + i, + instance_index=i, ), open(logpath, "w"), logpath, @@ -402,10 +474,7 @@ def run_server(args): ) for p in procs: if p.poll() is None: # Process still running - try: - os.killpg(os.getpgid(p.pid), 9) - except (ProcessLookupError, PermissionError): - pass + graceful_kill_pg(p.pid, use_sigusr1=bool(args.memory_file)) # Ensure all processes are collected and output is flushed for p in procs: @@ -424,10 +493,7 @@ def run_server(args): # Final cleanup to ensure process groups are terminated for p in procs: - try: - os.killpg(os.getpgid(p.pid), 9) - except (ProcessLookupError, PermissionError): - pass # Process already terminated or we don't have permission + graceful_kill_pg(p.pid, use_sigusr1=bool(args.memory_file)) else: # Original behavior with fixed timeout timeout = ( @@ -441,18 +507,11 @@ def run_server(args): try: (out, err) = p.communicate(timeout=timeout) except subprocess.TimeoutExpired: - # Kill the entire process group - try: - os.killpg(os.getpgid(p.pid), 9) - except ProcessLookupError: - pass # Process already terminated + graceful_kill_pg(p.pid, use_sigusr1=bool(args.memory_file)) (out, err) = p.communicate() finally: # Ensure cleanup even if process completed successfully - try: - os.killpg(os.getpgid(p.pid), 9) - except (ProcessLookupError, PermissionError): - pass # Process already terminated or we don't have permission + graceful_kill_pg(p.pid, use_sigusr1=bool(args.memory_file)) for server in servers: server[1].close() diff --git a/packages/tao_bench/run_standalone.py b/packages/tao_bench/run_standalone.py index 756f3bc8..b7e987a6 100755 --- a/packages/tao_bench/run_standalone.py +++ b/packages/tao_bench/run_standalone.py @@ -186,6 +186,10 @@ def launch_server(port_number_start=11211, bind_cpu=1, bind_mem=1): if hasattr(args, "poll_interval") and args.poll_interval > 0: script_args["--poll-interval"] = args.poll_interval + # Pass through memory file if specified + if hasattr(args, "memory_file") and args.memory_file: + script_args["--memory-file"] = args.memory_file + cmd = [f"{TAO_BENCH_DIR}/run_autoscale.py --real"] for argname, argval in script_args.items():