diff --git a/benchpress/config/jobs.yml b/benchpress/config/jobs.yml index fe163b37..ed6f884b 100644 --- a/benchpress/config/jobs.yml +++ b/benchpress/config/jobs.yml @@ -1045,6 +1045,7 @@ - '--timeout-buffer={timeout_buffer}' - '--disable-tls={disable_tls}' - '--smart-nanosleep={smart_nanosleep}' + - '--memory-file={memory_file}' - '--real' vars: - 'interface_name=eth0' @@ -1056,6 +1057,7 @@ - 'timeout_buffer=120' - 'disable_tls=0' - 'smart_nanosleep=0' + - 'memory_file=' client: args: - 'client' @@ -1073,6 +1075,7 @@ - '--wait-after-warmup={wait_after_warmup}' - '--disable-tls={disable_tls}' - '--client-id={client_id}' + - '--control-port={control_port}' - '--real' vars: - 'server_hostname' @@ -1087,6 +1090,7 @@ - 'sanity=0' - 'disable_tls=0' - 'client_id=0' + - 'control_port=0' hooks: - hook: copymove options: @@ -1123,6 +1127,8 @@ - '--client-wait-after-warmup={client_wait_after_warmup}' - '--disable-tls={disable_tls}' - '--smart-nanosleep={smart_nanosleep}' + - '--memory-file={memory_file}' + - '--auto-warmup={auto_warmup}' - '--real' vars: - 'num_servers=0' @@ -1144,6 +1150,8 @@ - 'client_wait_after_warmup=5' - 'disable_tls=0' - 'smart_nanosleep=0' + - 'memory_file=' + - 'auto_warmup=0' hooks: - hook: tao_instruction - hook: copymove @@ -1179,6 +1187,8 @@ - '--num-slow-threads={num_slow_threads}' - '--num-fast-threads={num_fast_threads}' - '--num-client-threads={num_client_threads}' + - '--memory-file={memory_file}' + - '--auto-warmup={auto_warmup}' vars: - 'num_servers=0' - 'memsize=0' @@ -1196,6 +1206,8 @@ - 'num_slow_threads=0' - 'num_fast_threads=0' - 'num_client_threads=0' + - 'memory_file=' + - 'auto_warmup=0' hooks: - hook: copymove options: @@ -1233,6 +1245,7 @@ - '--test-timeout-buffer={test_timeout_buffer}' - '--postprocessing-timeout-buffer={postprocessing_timeout_buffer}' - '--poll-interval={poll_interval}' + - '--memory-file={memory_file}' vars: - 'num_servers=0' - 'memsize=0.5' @@ -1254,6 +1267,7 @@ - 'test_timeout_buffer=0' - 
'postprocessing_timeout_buffer=60' - 'poll_interval=0.2' + - 'memory_file=' hooks: - hook: copymove options: @@ -1290,6 +1304,8 @@ - '--conns-per-server-core={conns_per_server_core}' - '--stats-interval={stats_interval}' - '--disable-tls={disable_tls}' + - '--memory-file={memory_file}' + - '--auto-warmup={auto_warmup}' - '--real' vars: - 'num_servers=0' @@ -1308,6 +1324,8 @@ - 'conns_per_server_core=85' - 'stats_interval=5000' - 'disable_tls=0' + - 'memory_file=' + - 'auto_warmup=1' hooks: - hook: tao_instruction - hook: copymove diff --git a/packages/tao_bench/README.md b/packages/tao_bench/README.md index e2922793..8eba9e5a 100644 --- a/packages/tao_bench/README.md +++ b/packages/tao_bench/README.md @@ -313,6 +313,96 @@ the guide for the `tao_bench_autoscale` job. Just replace the job name. A good run of this workload should have about 75~80% of CPU utilization in the steady state, of which 25~30% should be in userspace. +## Memory file for fast warm restart + +TaoBench supports a memory file option that allows the server to persist its +cache state across runs. On the first run, TaoBench fills the cache from scratch +(cold start). On subsequent runs with the same memory file, the server reloads +the pre-filled cache instantly, skipping the lengthy warmup phase. + +This uses memcached's native `-e` flag, which mmaps a file for slab storage. +On graceful shutdown (SIGUSR1), memcached saves a `.meta` restart file. On the +next startup with the same memory file, memcached restores all cached items +from the file. + +### Usage + +Specify the `memory_file` parameter pointing to a path on a tmpfs filesystem +(e.g. `/dev/shm`): + +```bash +./benchpress_cli.py run tao_bench_autoscale -i '{"memory_file": "/dev/shm/tao_bench_mem"}' +``` + +On the first run, TaoBench will create per-instance memory files at +`/dev/shm/tao_bench_mem.0`, `/dev/shm/tao_bench_mem.1`, etc. 
When the run +completes, the server is shut down with SIGUSR1 which saves the restart +metadata (a small `.meta` file alongside each memory file). + +On subsequent runs with the same `memory_file` path, the server loads all +cached items from the memory files, drastically reducing warmup time. + +### /dev/shm sizing + +The memory files are stored in `/dev/shm` (tmpfs), which defaults to 50% of +system RAM. When using large `memsize` values, `/dev/shm` may be too small. +TaoBench automatically expands `/dev/shm` when needed (requires root), but +you can also do it manually: + +```bash +mount -o remount,size=800G /dev/shm +``` + +### Parameters + + - `memory_file`: Path prefix for memory files. Each server instance gets a + suffixed file (e.g. `/dev/shm/tao_bench_mem.0`). Default is empty (disabled). + +## Auto-warmup detection + +When enabled, TaoBench monitors server statistics during the warmup phase and +automatically signals clients to stop warmup early once the server reaches a +warmed-up state. The `warmup_time` parameter becomes the maximum warmup +duration rather than a fixed duration. + +### How it works + +The server opens a TCP control port (`port_number_start + 1000`, default 12211) +and monitors the server log output in real time. Warmup is considered complete +when: + +1. Hit ratio reaches >= 95% of `target_hit_ratio` (default 0.9, threshold 0.855) +2. QPS stabilizes (coefficient of variation < 5%) over a 2-minute rolling window + +When both conditions are met for all server instances, the control port responds +`READY` to client polls. Clients poll the control port every 5 seconds during +warmup, and terminate the warmup phase early when the server reports `READY`. 
+ +### Usage + +Set `auto_warmup` to 1: + +```bash +./benchpress_cli.py run tao_bench_autoscale -i '{"auto_warmup": 1}' +``` + +This is most useful combined with the memory file option: + +```bash +# Second run with pre-warmed cache + auto-warmup detection +./benchpress_cli.py run tao_bench_autoscale -i '{"memory_file": "/dev/shm/tao_bench_mem", "auto_warmup": 1}' +``` + +The generated client instructions will automatically include the `control_port` +parameter so clients can poll the server's warmup status. + +### Parameters + + - `auto_warmup`: Set to 1 to enable auto-warmup detection. Default is 0 (disabled) + for `tao_bench_autoscale`, 1 (enabled) for `tao_bench_autoscale_v2_beta`. + - `target_hit_ratio`: Target hit ratio for warmup completion detection. Default + is 0.9. Warmup is considered complete when hit ratio reaches 95% of this value. + ## Advanced job: `tao_bench_custom` **NOTE**: We recommend using `tao_bench_autoscale` to start the server as this job diff --git a/packages/tao_bench/args_utils.py b/packages/tao_bench/args_utils.py index 67c4e787..1a71f780 100644 --- a/packages/tao_bench/args_utils.py +++ b/packages/tao_bench/args_utils.py @@ -166,6 +166,32 @@ def add_common_server_args(server_parser: ArgumentParser) -> List[Tuple[str, str help="poll interval in seconds for process completion detection; " + "if > 0, use polling mechanism instead of fixed timeout", ) + server_parser.add_argument( + "--memory-file", + type=str, + nargs="?", + const="", + default="", + help="path to memory file for memcached -e flag. " + + "Enables persistent memory backing via mmap. On graceful shutdown (SIGUSR1), " + + "memcached saves state to this file for fast warm restart on subsequent runs.", + ) + server_parser.add_argument( + "--auto-warmup", + type=int, + default=0, + help="enable auto-warmup detection. 
When enabled, monitors server stats " + + "to detect when warmup is complete (hit ratio and QPS stability) and " + + "signals clients via TCP control port to stop warmup early. " + + "warmup_time becomes the maximum warmup duration.", + ) + server_parser.add_argument( + "--target-hit-ratio", + type=float, + default=0.9, + help="target hit ratio for auto-warmup detection. Warmup is considered " + + "complete when hit ratio reaches 95%% of this value.", + ) server_parser.add_argument("--real", action="store_true", help="for real") return get_opt_strings(server_parser) @@ -250,6 +276,14 @@ def add_common_client_args(client_parser: ArgumentParser) -> List[Tuple[str, str default=30, help="extra time buffer for test phase timeout in seconds", ) + client_parser.add_argument( + "--control-port", + type=int, + default=0, + help="TCP port to poll on server for warmup completion signal. " + + "When > 0, client polls this port during warmup and stops early " + + "when server reports warmed up. 0 = disabled (use full warmup time).", + ) client_parser.add_argument("--real", action="store_true", help="for real") return get_opt_strings(client_parser) diff --git a/packages/tao_bench/run.py b/packages/tao_bench/run.py index 5cd39f39..3b1b9102 100755 --- a/packages/tao_bench/run.py +++ b/packages/tao_bench/run.py @@ -9,6 +9,7 @@ import pathlib import re import shlex +import signal import subprocess import sys import threading @@ -16,6 +17,7 @@ from typing import List import args_utils +from warmup_monitor import poll_control_port sys.path.insert(0, str(pathlib.Path(__file__).parents[1] / "common")) @@ -62,6 +64,7 @@ def run_cmd( cmd: List[str], timeout=None, for_real=True, + graceful_signal=None, ) -> str: print(" ".join(cmd)) if for_real: @@ -72,18 +75,40 @@ def run_cmd( try: proc.wait(timeout=timeout) except subprocess.TimeoutExpired: - print(f"Process timeout expired, terminating process {proc.pid}...") - proc.terminate() - try: - # Give the process 5 seconds to terminate 
gracefully - proc.wait(timeout=5) - print(f"Process {proc.pid} terminated gracefully") - except subprocess.TimeoutExpired: - # If it still doesn't terminate, force kill it - print(f"Process {proc.pid} didn't respond to SIGTERM, force killing...") - proc.kill() - proc.wait() - print(f"Process {proc.pid} killed successfully") + if graceful_signal is not None: + print( + f"Process timeout expired, sending graceful signal " + f"{graceful_signal} to process {proc.pid}..." + ) + os.kill(proc.pid, graceful_signal) + try: + proc.wait(timeout=60) + print(f"Process {proc.pid} exited after graceful signal") + return + except subprocess.TimeoutExpired: + print( + f"Process {proc.pid} didn't exit after graceful signal, " + f"force killing..." + ) + proc.kill() + proc.wait() + print(f"Process {proc.pid} killed successfully") + else: + print(f"Process timeout expired, terminating process {proc.pid}...") + proc.terminate() + try: + # Give the process 5 seconds to terminate gracefully + proc.wait(timeout=5) + print(f"Process {proc.pid} terminated gracefully") + except subprocess.TimeoutExpired: + # If it still doesn't terminate, force kill it + print( + f"Process {proc.pid} didn't respond to SIGTERM, " + f"force killing..." 
+ ) + proc.kill() + proc.wait() + print(f"Process {proc.pid} killed successfully") def profile_server(): @@ -221,6 +246,9 @@ def run_server(args): "-o", ",".join(extended_options), ] + if args.memory_file: + server_cmd += ["-e", args.memory_file] + if "DCPERF_PERF_RECORD" in os.environ and os.environ["DCPERF_PERF_RECORD"] == "1": profiler_wait_time = ( args.warmup_time + args.timeout_buffer + SERVER_PROFILING_DELAY @@ -236,7 +264,8 @@ def run_server(args): os.environ["LD_LIBRARY_PATH"] = os.path.join(TAO_BENCH_DIR, "build-deps/lib") timeout = args.warmup_time + args.test_time + args.timeout_buffer - run_cmd(server_cmd, timeout, args.real) + graceful_sig = signal.SIGUSR1 if args.memory_file else None + run_cmd(server_cmd, timeout, args.real, graceful_signal=graceful_sig) if "DCPERF_PERF_RECORD" in os.environ and os.environ["DCPERF_PERF_RECORD"] == "1": t_prof.cancel() @@ -322,10 +351,47 @@ def run_client(args): breakdown_utils.log_preprocessing_warmup_start(TAO_BENCH_DIR, "") print("warm up phase ...") - cmd = get_client_cmd(args, n_seconds=args.warmup_time) - run_cmd( - cmd, timeout=args.warmup_time + args.warmup_timeout_buffer, for_real=args.real - ) + if args.control_port > 0 and args.warmup_time > 0: + # Auto-warmup: poll control port and terminate warmup early when server + # reports READY + cmd = get_client_cmd(args, n_seconds=args.warmup_time) + print(" ".join(cmd)) + if args.real: + warmup_proc = subprocess.Popen(cmd, stderr=subprocess.STDOUT) + warmup_deadline = ( + time.time() + args.warmup_time + args.warmup_timeout_buffer + ) + poll_interval = 5 + while time.time() < warmup_deadline: + if warmup_proc.poll() is not None: + # Warmup process exited on its own + break + if poll_control_port(args.server_hostname, args.control_port): + print("[AutoWarmup] Server reports READY, stopping warmup early") + warmup_proc.terminate() + try: + warmup_proc.wait(timeout=5) + except subprocess.TimeoutExpired: + warmup_proc.kill() + warmup_proc.wait() + break + 
time.sleep(poll_interval) + else: + # Timeout reached + print("[AutoWarmup] Max warmup time reached, proceeding to test") + warmup_proc.terminate() + try: + warmup_proc.wait(timeout=5) + except subprocess.TimeoutExpired: + warmup_proc.kill() + warmup_proc.wait() + else: + cmd = get_client_cmd(args, n_seconds=args.warmup_time) + run_cmd( + cmd, + timeout=args.warmup_time + args.warmup_timeout_buffer, + for_real=args.real, + ) if args.real and args.wait_after_warmup > 0: time.sleep(args.wait_after_warmup) diff --git a/packages/tao_bench/run_autoscale.py b/packages/tao_bench/run_autoscale.py index 03e8df7f..4431e4b6 100755 --- a/packages/tao_bench/run_autoscale.py +++ b/packages/tao_bench/run_autoscale.py @@ -10,6 +10,7 @@ import pathlib import re import shlex +import signal import socket import subprocess import sys @@ -18,6 +19,7 @@ from parser import TaoBenchParser import args_utils +from warmup_monitor import LogTailer, WarmupControlServer, WarmupMonitor # Add parent directory to path to import diagnosis_utils sys.path.insert(0, str(pathlib.Path(__file__).parents[1] / "common")) @@ -73,13 +75,17 @@ def is_in_range(node_cpu_ranges, input_cpu_range): SERVER_CMD_OPTIONS = [] # To be initialized in init_parser() -def compose_server_cmd(args, cpu_core_range, memsize, port_number): +def compose_server_cmd(args, cpu_core_range, memsize, port_number, instance_index=0): server_args = { optstr: getattr(args, argkey) for optstr, argkey in SERVER_CMD_OPTIONS } server_args["--memsize"] = memsize server_args["--port-number"] = port_number + # Per-instance memory file: append instance index suffix + if args.memory_file: + server_args["--memory-file"] = f"{args.memory_file}.{instance_index}" + # Add custom thread parameters if specified if hasattr(args, "num_fast_threads") and args.num_fast_threads > 0: server_args["--num-fast-threads"] = args.num_fast_threads @@ -97,7 +103,7 @@ def compose_server_cmd(args, cpu_core_range, memsize, port_number): if isinstance(argval, bool): if 
def ensure_shm_capacity(required_gb):
    """Expand /dev/shm if it is smaller than required_gb.

    Memory files are stored in /dev/shm (tmpfs). The default tmpfs size is
    typically 50% of system RAM, which may be too small when multiple server
    instances each need large memory files. This function remounts /dev/shm
    with a larger size if needed.

    Args:
        required_gb: Minimum total size of /dev/shm, in GiB.

    Returns:
        None. Failures are reported as warnings on stdout and never raised,
        so a missing or non-resizable /dev/shm does not abort the run.
    """
    # Function-scope import so this block does not depend on the file's
    # top-level import list.
    import math

    shm_path = "/dev/shm"
    try:
        stat = os.statvfs(shm_path)
        shm_total_gb = (stat.f_blocks * stat.f_frsize) / (1024**3)
        if shm_total_gb >= required_gb:
            return
        # 10% headroom, rounded UP. Truncating could request less than
        # required_gb, or "size=0G" for sub-1GB requirements, which tmpfs
        # interprets as "no limit" rather than "tiny".
        target_gb = max(1, math.ceil(required_gb * 1.1))
        print(
            f"/dev/shm is {shm_total_gb:.0f}GB, need {required_gb:.0f}GB. "
            f"Expanding to {target_gb}GB..."
        )
        ret = subprocess.run(
            ["mount", "-o", f"remount,size={target_gb}G", shm_path],
            capture_output=True,
        )
        if ret.returncode != 0:
            stderr_text = ret.stderr.decode(errors="replace").strip()
            print(f"WARNING: Failed to expand /dev/shm: {stderr_text}")
        else:
            print(f"/dev/shm expanded to {target_gb}GB")
    except OSError as e:
        print(f"WARNING: Could not check /dev/shm capacity: {e}")


def _pid_has_exited(pid):
    """Return True once `pid` has terminated, even if it is still unreaped.

    `os.kill(pid, 0)` keeps succeeding for a zombie, so for our own children
    it cannot tell "still running" from "exited, waiting to be reaped".
    `os.waitid` with WNOWAIT can, and it leaves the child waitable so the
    owning `subprocess.Popen` still collects the real exit status later.

    Raises:
        PermissionError: if the fallback existence probe is not permitted.
    """
    if hasattr(os, "waitid"):
        try:
            state = os.waitid(
                os.P_PID, pid, os.WEXITED | os.WNOHANG | os.WNOWAIT
            )
            return state is not None
        except ChildProcessError:
            # Not a child of this process: fall back to the existence probe.
            pass
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return True
    return False


def graceful_kill_pg(pid, use_sigusr1=False, grace_period=60, poll_interval=0.5):
    """Kill a process group, optionally sending SIGUSR1 first for graceful shutdown.

    Args:
        pid: PID of a process whose process group should be terminated.
        use_sigusr1: When true, send SIGUSR1 to the group first and wait for
            `pid` to exit before resorting to SIGKILL.
        grace_period: Maximum number of seconds to wait after SIGUSR1.
        poll_interval: Seconds between exit checks during the grace period.

    Returns:
        None. A missing process/group or insufficient permission is treated
        as "nothing to do".

    Note:
        Only `pid` itself is watched during the grace period; other members
        of the group are not tracked once it has exited.
    """
    try:
        pgid = os.getpgid(pid)
    except (ProcessLookupError, PermissionError):
        return
    if use_sigusr1:
        try:
            os.killpg(pgid, signal.SIGUSR1)
            # Wait for graceful shutdown
            deadline = time.monotonic() + grace_period
            while time.monotonic() < deadline:
                if _pid_has_exited(pid):
                    return  # Process exited
                time.sleep(poll_interval)
            # Still alive after grace period, force kill
            print(f"Process group {pgid} didn't exit after SIGUSR1, force killing...")
        except (ProcessLookupError, PermissionError):
            return
    try:
        os.killpg(pgid, signal.SIGKILL)
    except (ProcessLookupError, PermissionError):
        pass
if match: latency = match.group(1) + # Set up warmup monitoring if auto-warmup is enabled + warmup_monitors = [] + log_tailers = [] + control_server = None + if args.auto_warmup > 0: + control_port = args.port_number_start + 1000 + for i in range(args.num_servers): + monitor = WarmupMonitor(target_hit_ratio=args.target_hit_ratio) + warmup_monitors.append(monitor) + control_server = WarmupControlServer(control_port, warmup_monitors) + control_server.start() + # let's spawn servers procs = [] for server in servers: @@ -333,14 +422,83 @@ def run_server(args): start_new_session=True, # Create new process group ) procs.append(p) + + # Start log tailers for warmup monitoring + if args.auto_warmup > 0: + for i in range(args.num_servers): + tailer = LogTailer(servers[i][2], warmup_monitors[i], instance_id=i) + tailer.start() + log_tailers.append(tailer) + # wait for servers to finish - add extra time to make sure # post-processing will finish - if args.poll_interval > 0: + max_warmup_time = args_utils.get_warmup_time(args) + + # When auto-warmup is enabled, wait for warmup completion instead of + # fixed warmup_time. This allows the server to finish earlier on warm + # restarts where the cache is already pre-loaded. + if args.auto_warmup > 0 and warmup_monitors: + warmup_start = time.time() + print( + f"Auto-warmup enabled: waiting up to {max_warmup_time}s for " + f"warmup completion..." 
+ ) + while time.time() - warmup_start < max_warmup_time: + if all(m.is_warmed_up for m in warmup_monitors): + actual_warmup = time.time() - warmup_start + print( + f"Auto-warmup: all instances warmed up after " + f"{actual_warmup:.0f}s (max was {max_warmup_time}s)" + ) + break + time.sleep(5) + else: + print( + f"Auto-warmup: max warmup time {max_warmup_time}s reached, " + f"proceeding to test phase" + ) + remaining = args.test_time + print(f"Waiting {remaining}s for test phase...") + time.sleep(remaining) + + # Send SIGUSR1 to all process groups in parallel, then wait + if args.memory_file: + for p in procs: + try: + os.killpg(os.getpgid(p.pid), signal.SIGUSR1) + except (ProcessLookupError, PermissionError): + pass + # Wait up to 60s for all to exit + deadline = time.time() + 60 + while time.time() < deadline: + if all(p.poll() is not None for p in procs): + break + time.sleep(1) + + # Force kill any remaining + for p in procs: + if p.poll() is None: + try: + os.killpg(os.getpgid(p.pid), 9) + except (ProcessLookupError, PermissionError): + pass + + # Collect processes and flush output + for p in procs: + try: + p.communicate(timeout=1) + except subprocess.TimeoutExpired: + pass + for server in servers: + try: + server[1].flush() + os.fsync(server[1].fileno()) + except (OSError, ValueError): + pass + elif args.poll_interval > 0: # Use intelligent process polling instead of fixed timeout # First wait for base timeout (warmup + test + timeout_buffer) - base_timeout = ( - args_utils.get_warmup_time(args) + args.test_time + args.timeout_buffer - ) + base_timeout = max_warmup_time + args.test_time + args.timeout_buffer print(f"Waiting {base_timeout}s for processes to complete normally...") time.sleep(base_timeout) @@ -402,10 +560,7 @@ def run_server(args): ) for p in procs: if p.poll() is None: # Process still running - try: - os.killpg(os.getpgid(p.pid), 9) - except (ProcessLookupError, PermissionError): - pass + graceful_kill_pg(p.pid, 
use_sigusr1=bool(args.memory_file)) # Ensure all processes are collected and output is flushed for p in procs: @@ -424,10 +579,7 @@ def run_server(args): # Final cleanup to ensure process groups are terminated for p in procs: - try: - os.killpg(os.getpgid(p.pid), 9) - except (ProcessLookupError, PermissionError): - pass # Process already terminated or we don't have permission + graceful_kill_pg(p.pid, use_sigusr1=bool(args.memory_file)) else: # Original behavior with fixed timeout timeout = ( @@ -441,21 +593,20 @@ def run_server(args): try: (out, err) = p.communicate(timeout=timeout) except subprocess.TimeoutExpired: - # Kill the entire process group - try: - os.killpg(os.getpgid(p.pid), 9) - except ProcessLookupError: - pass # Process already terminated + graceful_kill_pg(p.pid, use_sigusr1=bool(args.memory_file)) (out, err) = p.communicate() finally: # Ensure cleanup even if process completed successfully - try: - os.killpg(os.getpgid(p.pid), 9) - except (ProcessLookupError, PermissionError): - pass # Process already terminated or we don't have permission + graceful_kill_pg(p.pid, use_sigusr1=bool(args.memory_file)) for server in servers: server[1].close() + # Clean up warmup monitoring + for tailer in log_tailers: + tailer.stop() + if control_server: + control_server.stop() + # Initialize diagnosis recorder for detailed logging recorder = DiagnosisRecorder.get_instance(root_dir=str(BENCHPRESS_ROOT)) diff --git a/packages/tao_bench/run_standalone.py b/packages/tao_bench/run_standalone.py index 756f3bc8..e250fb57 100755 --- a/packages/tao_bench/run_standalone.py +++ b/packages/tao_bench/run_standalone.py @@ -186,6 +186,16 @@ def launch_server(port_number_start=11211, bind_cpu=1, bind_mem=1): if hasattr(args, "poll_interval") and args.poll_interval > 0: script_args["--poll-interval"] = args.poll_interval + # Pass through memory file if specified + if hasattr(args, "memory_file") and args.memory_file: + script_args["--memory-file"] = args.memory_file + + # Pass 
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.

import os
import socket
import threading
import time
from collections import deque


class WarmupMonitor:
    """Monitors server stats output to detect when warmup is complete.

    Warmup is considered complete when:
    1. Hit ratio >= hit_ratio_factor * target_hit_ratio
    2. QPS is stable (coefficient of variation < stability_threshold)
       over a rolling window of stability_window data points

    Once `is_warmed_up` becomes True it stays True and further lines are
    ignored.
    """

    def __init__(
        self,
        target_hit_ratio=0.9,
        stability_window=24,
        stability_threshold=0.05,
        hit_ratio_factor=0.95,
    ):
        self.target_hit_ratio = target_hit_ratio
        self.hit_threshold = hit_ratio_factor * target_hit_ratio
        # Number of samples in the rolling window (24 samples is 2 minutes
        # if the server emits one stats line every 5 s; interval assumed).
        self.stability_window = stability_window
        self.stability_threshold = stability_threshold
        # Bounded deque: the oldest sample is evicted automatically.
        self.qps_history = deque(maxlen=max(stability_window, 0))
        self.is_warmed_up = False
        self._lock = threading.Lock()

    def process_line(self, line):
        """Process a server stats line. Returns True if warmup just completed.

        Lines that do not parse into a valid snapshot are ignored and yield
        False, as do all lines received after warmup has completed.
        """
        # Imported lazily so that importing this module only for
        # poll_control_port (as the client runner does) does not load the
        # stats parser.
        from parser import TaoBenchServerSnapshot

        snapshot = TaoBenchServerSnapshot(line)
        if not snapshot.valid:
            return False

        hit_rate = snapshot.get("hit_rate")
        total_qps = snapshot.get("fast_qps") + snapshot.get("slow_qps")

        with self._lock:
            if self.is_warmed_up:
                return False

            self.qps_history.append(total_qps)

            if hit_rate >= self.hit_threshold and self._is_qps_stable():
                self.is_warmed_up = True
                print(
                    f"[WarmupMonitor] Warmup complete: hit_rate={hit_rate:.4f} "
                    f"(threshold={self.hit_threshold:.4f}), "
                    f"QPS stable over {len(self.qps_history)} data points"
                )
                return True
        return False

    def _is_qps_stable(self):
        """Return True when the window is full and its QPS CV is below threshold."""
        samples = len(self.qps_history)
        if samples == 0 or samples < self.stability_window:
            return False
        mean_qps = sum(self.qps_history) / samples
        if mean_qps == 0:
            # CV is undefined for an idle server; never call that "stable".
            return False
        variance = sum((x - mean_qps) ** 2 for x in self.qps_history) / samples
        cv = (variance**0.5) / mean_qps
        return cv < self.stability_threshold


class WarmupControlServer:
    """TCP server that responds to client warmup status polls.

    Clients connect and receive either "READY\\n" or "WAITING\\n", after
    which the connection is closed. READY is sent only when every monitor
    reports `is_warmed_up`.
    """

    def __init__(self, port, monitors):
        self.port = port
        self.monitors = monitors
        self.server_socket = None
        self._thread = None
        self.running = False

    def start(self):
        """Bind the dual-stack listening socket and start the accept thread.

        Raises:
            OSError: if the socket cannot be configured, bound or put into
                listening mode; the socket is closed before re-raising.
        """
        sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
        try:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # Clear V6ONLY so IPv4 clients can connect to the "::" listener.
            sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
            # Short accept timeout lets the loop notice `running` going False.
            sock.settimeout(1.0)
            sock.bind(("::", self.port))
            sock.listen(32)
        except OSError:
            sock.close()
            raise
        self.server_socket = sock
        self.running = True
        self._thread = threading.Thread(target=self._handle_connections, daemon=True)
        self._thread.start()
        print(f"[WarmupControlServer] Listening on port {self.port}")

    def _handle_connections(self):
        """Accept loop: answer each poll with the current warmup state."""
        while self.running:
            try:
                conn, addr = self.server_socket.accept()
            except socket.timeout:
                continue
            except OSError:
                # Listening socket was closed (see stop()); end the loop.
                break
            # A failure while answering one client (reset, broken pipe) must
            # not take down the listener for every later poll.
            try:
                if all(m.is_warmed_up for m in self.monitors):
                    conn.sendall(b"READY\n")
                else:
                    conn.sendall(b"WAITING\n")
            except OSError as e:
                print(f"[WarmupControlServer] Failed to answer {addr}: {e}")
            finally:
                conn.close()

    def stop(self):
        """Stop accepting polls, close the listener and join the thread."""
        self.running = False
        if self.server_socket:
            try:
                self.server_socket.close()
            except OSError:
                pass
        if self._thread:
            self._thread.join(timeout=5)
        print("[WarmupControlServer] Stopped")


class LogTailer:
    """Background thread that tails a log file and feeds lines to a WarmupMonitor."""

    def __init__(self, log_path, monitor, instance_id=0):
        self.log_path = log_path
        self.monitor = monitor
        self.instance_id = instance_id
        self._thread = None
        self.running = False

    def start(self):
        """Start tailing in a daemon thread."""
        self.running = True
        self._thread = threading.Thread(target=self._tail, daemon=True)
        self._thread.start()

    def _tail(self):
        """Follow the log until stopped, or until warmed up and caught up."""
        # Wait for file to appear
        while self.running and not os.path.exists(self.log_path):
            time.sleep(0.5)
        if not self.running:
            return

        # Undecodable bytes are replaced rather than allowed to kill the
        # tailing thread.
        with open(self.log_path, "r", errors="replace") as f:
            # Text read so far that has not been terminated by a newline.
            # The writer may flush mid-line; only whole lines are parsed.
            pending = ""
            while self.running:
                chunk = f.readline()
                if chunk:
                    pending += chunk
                    if not pending.endswith("\n"):
                        continue
                    line, pending = pending, ""
                    if self.monitor.process_line(line):
                        print(
                            f"[LogTailer] Server instance {self.instance_id} "
                            f"warmup complete"
                        )
                else:
                    # No new data yet
                    if self.monitor.is_warmed_up:
                        # Stop tailing once warmed up
                        return
                    time.sleep(0.5)

    def stop(self):
        """Ask the tailing thread to finish and wait briefly for it."""
        self.running = False
        if self._thread:
            self._thread.join(timeout=5)


def poll_control_port(hostname, port, timeout=5):
    """Poll the warmup control server. Returns True if server reports READY.

    Any connection, timeout or read error is treated as "not ready" and
    yields False. The reply is compared as bytes, so an unexpected
    non-UTF-8 answer cannot raise.
    """
    reply = b""
    try:
        with socket.create_connection((hostname, port), timeout=timeout) as s:
            # Read up to the first newline, EOF, or 64 bytes: a single recv
            # is not guaranteed to return the whole status word.
            while b"\n" not in reply and len(reply) < 64:
                chunk = s.recv(64 - len(reply))
                if not chunk:
                    break
                reply += chunk
    except OSError:
        return False
    return reply.split(b"\n", 1)[0].strip() == b"READY"