diff --git a/AGENTS.md b/AGENTS.md index ddfe6f3..4ed48d6 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -206,6 +206,16 @@ All C++ source files must include this header: */ ``` +## CPU Stats Testing + +Per-thread CPU measurement is implemented using native OS APIs (Mach `thread_info` on macOS, `pthread_getcpuclockid` on Linux). Tests live in `tests/test_cpu_stats.py`: + +- **`test_cpu_stats_in_json`**: Verifies CPU stats appear in JSON output with valid structure (per-second, per-thread entries with values in [0, 100)) +- **`test_cpu_stats_high_load`**: Stresses a single thread with 500 clients to drive high CPU; verifies non-trivial usage and the >95% warning +- **`test_cpu_stats_external_validation`**: Cross-validates memtier's reported CPU percentages against `psutil.Process.threads()` as an independent oracle. Launches memtier via `subprocess.Popen`, polls psutil every ~1s, then compares aggregate average worker CPU% from both sources (±15pp tolerance). Requires `psutil` (listed in `tests/test_requirements.txt`); skips gracefully if unavailable. + +All CPU stats tests assert that no individual thread reports exactly 100% CPU usage (values must be strictly less than 100%). + ## Important Notes - Default build includes debug symbols (`-g`) for crash analysis diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md index 6c7fef2..d87b67e 100644 --- a/DEVELOPMENT.md +++ b/DEVELOPMENT.md @@ -101,6 +101,11 @@ The project includes a basic set of integration tests. Integration tests are based on [RLTest](https://github.com/RedisLabsModules/RLTest), and specific setup parameters can be provided to configure tests and topologies (OSS standalone and OSS cluster). By default the tests will be ran for all common commands, and with OSS standalone setup. +Test dependencies (installed via `tests/test_requirements.txt`): +- **redis** — Python Redis client +- **RLTest** — Redis Labs test framework +- **psutil** — Cross-platform process/thread CPU monitoring (used by `test_cpu_stats.py` for external validation of per-thread CPU measurements) + To run all integration tests in a Python virtualenv, follow these steps: $ mkdir -p .env diff --git a/memtier_benchmark.cpp b/memtier_benchmark.cpp index c44e6af..c7912bb 100644 --- a/memtier_benchmark.cpp +++ b/memtier_benchmark.cpp @@ -75,10 +75,15 @@ #include #include +#ifdef __APPLE__ +#include +#endif + #include "client.h" #include "JSON_handler.h" #include "obj_gen.h" #include "memtier_benchmark.h" +#include "run_stats_types.h" #include "statsd.h" @@ -1749,6 +1754,23 @@ static void print_all_threads_stack_trace(FILE *fp, int pid, const char *timestr } } +static unsigned long long get_thread_cpu_usec(pthread_t thread) { +#ifdef __APPLE__ + mach_port_t mt = pthread_mach_thread_np(thread); + thread_basic_info_data_t info; + mach_msg_type_number_t count = THREAD_BASIC_INFO_COUNT; + if (thread_info(mt, THREAD_BASIC_INFO, (thread_info_t)&info, &count) != KERN_SUCCESS) return 0; + return (unsigned long long)info.user_time.seconds * 1000000 + info.user_time.microseconds + + (unsigned long long)info.system_time.seconds * 1000000 + info.system_time.microseconds; +#else + clockid_t cid; + if (pthread_getcpuclockid(thread, &cid) != 0) return 0; + struct timespec ts; + if (clock_gettime(cid, &ts) != 0) return 0; + return (unsigned long long)ts.tv_sec * 1000000 + ts.tv_nsec / 1000; +#endif +} + run_stats run_benchmark(int run_id, benchmark_config *cfg, object_generator *obj_gen) { fprintf(stderr, "[RUN #%u] Preparing benchmark client...\n", run_id); @@ -1789,6 +1811,17 @@ run_stats run_benchmark(int run_id, benchmark_config *cfg, object_generator *obj unsigned long int cur_ops_sec = 0; unsigned long int cur_bytes_sec = 0; + // CPU usage tracking: sample initial CPU times for main thread and all worker threads + std::vector cpu_history; + unsigned long long main_prev_cpu = get_thread_cpu_usec(pthread_self()); + std::vector thread_prev_cpu(threads.size()); + for (size_t t = 0; t < threads.size(); t++) { + thread_prev_cpu[t] = get_thread_cpu_usec(threads[t]->m_thread); + } + unsigned int cpu_second = 0; + struct timeval cpu_prev_tv; + gettimeofday(&cpu_prev_tv, NULL); + // provide some feedback... // NOTE: Reading stats from worker threads without synchronization is a benign race. // These stats are only for progress display and are approximate. Final results are @@ -1901,6 +1934,36 @@ run_stats run_benchmark(int run_id, benchmark_config *cfg, object_generator *obj cur_ops_sec, ops_sec, cur_bytes_str, bytes_str, cur_latency, avg_latency); } + // Collect per-second CPU usage for main thread and all worker threads + cpu_second++; + per_second_cpu_stats cpu_snap; + cpu_snap.m_second = cpu_second; + + struct timeval cpu_cur_tv; + gettimeofday(&cpu_cur_tv, NULL); + double wall_usec = (double)(cpu_cur_tv.tv_sec - cpu_prev_tv.tv_sec) * 1000000.0 + + (double)(cpu_cur_tv.tv_usec - cpu_prev_tv.tv_usec); + if (wall_usec < 1.0) wall_usec = 1.0; // guard against division by zero + + unsigned long long main_cur_cpu = get_thread_cpu_usec(pthread_self()); + unsigned long long main_delta = (main_cur_cpu > main_prev_cpu) ? main_cur_cpu - main_prev_cpu : 0; + cpu_snap.m_main_thread_cpu_pct = (double)main_delta / wall_usec * 100.0; + main_prev_cpu = main_cur_cpu; + + for (size_t t = 0; t < threads.size(); t++) { + unsigned long long cur_cpu = get_thread_cpu_usec(threads[t]->m_thread); + unsigned long long delta = (cur_cpu > thread_prev_cpu[t]) ? cur_cpu - thread_prev_cpu[t] : 0; + double cpu_pct = (double)delta / wall_usec * 100.0; + cpu_snap.m_thread_cpu_pct.push_back(cpu_pct); + thread_prev_cpu[t] = cur_cpu; + + if (cpu_pct > 95.0) { + fprintf(stderr, "\nWARNING: High CPU on thread %zu: %.1f%% - results may be unreliable\n", t, cpu_pct); + } + } + cpu_prev_tv = cpu_cur_tv; + cpu_history.push_back(cpu_snap); + // Send metrics to StatsD if configured if (cfg->statsd != NULL && cfg->statsd->is_enabled()) { cfg->statsd->gauge("ops_sec", (long) cur_ops_sec); @@ -1983,6 +2046,8 @@ run_stats run_benchmark(int run_id, benchmark_config *cfg, object_generator *obj (*i)->m_cg->merge_run_stats(&stats); } + stats.set_cpu_stats(std::move(cpu_history)); + // Do we need to produce client stats? if (cfg->client_stats != NULL) { unsigned int cg_id = 0; diff --git a/run_stats.cpp b/run_stats.cpp index 3554be3..e88b31a 100644 --- a/run_stats.cpp +++ b/run_stats.cpp @@ -1389,6 +1389,11 @@ void run_stats::print_kb_sec_column(output_table &table, const std::vector cpu_stats) +{ + m_cpu_stats = std::move(cpu_stats); +} + void run_stats::print_json(json_handler *jsonhandler, arbitrary_command_list &command_list, bool cluster_mode, const std::vector *aggregated) { @@ -1480,6 +1485,24 @@ void run_stats::print_json(json_handler *jsonhandler, arbitrary_command_list &co m_totals.m_total_latency, m_totals.m_ops, m_totals.m_connection_errors_sec, m_totals.m_connection_errors, quantiles_list, m_totals.latency_histogram, timestamps, total_stats); + + if (jsonhandler != NULL && !m_cpu_stats.empty()) { + jsonhandler->open_nesting("CPU Stats"); + for (size_t i = 0; i < m_cpu_stats.size(); i++) { + const per_second_cpu_stats &cs = m_cpu_stats[i]; + char sec_str[32]; + snprintf(sec_str, sizeof(sec_str), "%u", cs.m_second); + jsonhandler->open_nesting(sec_str); + jsonhandler->write_obj("Main Thread", "%.2f", cs.m_main_thread_cpu_pct); + for (size_t t = 0; t < cs.m_thread_cpu_pct.size(); t++) { + char thread_name[32]; + snprintf(thread_name, sizeof(thread_name), "Thread %zu", t); + jsonhandler->write_obj(thread_name, "%.2f", cs.m_thread_cpu_pct[t]); + } + jsonhandler->close_nesting(); + } + jsonhandler->close_nesting(); + } } void run_stats::print_histogram(FILE *out, json_handler *jsonhandler, arbitrary_command_list &command_list, diff --git a/run_stats.h b/run_stats.h index d1ebd49..4eb5037 100644 --- a/run_stats.h +++ b/run_stats.h @@ -160,6 +160,9 @@ class run_stats void set_interrupted(bool interrupted) { m_interrupted = interrupted; } bool get_interrupted() const { return m_interrupted; } + std::vector m_cpu_stats; + void set_cpu_stats(std::vector cpu_stats); + void update_get_op(struct timeval *ts, unsigned int bytes_rx, unsigned int bytes_tx, unsigned int latency, unsigned int hits, unsigned int misses); void update_set_op(struct timeval *ts, unsigned int bytes_rx, unsigned int bytes_tx, unsigned int latency); diff --git a/run_stats_types.h b/run_stats_types.h index b051be8..1127702 100644 --- a/run_stats_types.h +++ b/run_stats_types.h @@ -147,6 +147,12 @@ class one_second_stats void merge(const one_second_stats &other); }; +struct per_second_cpu_stats { + unsigned int m_second; + double m_main_thread_cpu_pct; + std::vector m_thread_cpu_pct; +}; + class totals_cmd { public: diff --git a/tests/test_cpu_stats.py b/tests/test_cpu_stats.py new file mode 100644 index 0000000..6331b19 --- /dev/null +++ b/tests/test_cpu_stats.py @@ -0,0 +1,243 @@ +import tempfile +import json +import subprocess +import time +from include import * +from mb import Benchmark, RunConfig + + +def test_cpu_stats_in_json(env): + """Verify CPU stats appear in JSON output with valid structure.""" + benchmark_specs = {"name": env.testName, "args": []} + addTLSArgs(benchmark_specs, env) + config = get_default_memtier_config(threads=2, clients=5, requests=1000) + master_nodes_list = env.getMasterNodesList() + overall_expected_request_count = get_expected_request_count(config) + + add_required_env_arguments(benchmark_specs, config, env, master_nodes_list) + + test_dir = tempfile.mkdtemp() + config = RunConfig(test_dir, env.testName, config, {}) + ensure_clean_benchmark_folder(config.results_dir) + + benchmark = Benchmark.from_json(config, benchmark_specs) + memtier_ok = benchmark.run() + if not memtier_ok: + debugPrintMemtierOnError(config, env) + + env.assertTrue(memtier_ok == True) + env.assertTrue(os.path.isfile('{0}/mb.json'.format(config.results_dir))) + + json_filename = '{0}/mb.json'.format(config.results_dir) + with open(json_filename) as results_json: + results_dict = json.load(results_json) + + # CPU Stats should be present under ALL STATS + env.assertTrue('ALL STATS' in results_dict) + all_stats = results_dict['ALL STATS'] + env.assertTrue('CPU Stats' in all_stats) + + cpu_stats = all_stats['CPU Stats'] + env.assertTrue(len(cpu_stats) > 0) + + for second_key, second_data in cpu_stats.items(): + # Each second should have Main Thread + env.assertTrue('Main Thread' in second_data) + main_cpu = second_data['Main Thread'] + env.assertTrue(main_cpu >= 0) + env.assertTrue(main_cpu < 100) + + # Each second should have Thread N entries matching thread count (2) + for t in range(2): + thread_key = 'Thread {}'.format(t) + env.assertTrue(thread_key in second_data) + thread_cpu = second_data[thread_key] + env.assertTrue(thread_cpu >= 0) + env.assertTrue(thread_cpu < 100) + + +def test_cpu_stats_high_load(env): + """Stress a single thread with many clients to drive high CPU and verify warning.""" + env.skipOnCluster() + + # 1 thread, 500 clients, deep pipeline, small data, time-based run + # This should saturate the single worker thread and trigger the >95% CPU warning + benchmark_specs = {"name": env.testName, "args": [ + '--pipeline=100', + '--data-size=1', + '--ratio=1:1', + '--key-pattern=R:R', + '--key-maximum=100', + ]} + addTLSArgs(benchmark_specs, env) + config = get_default_memtier_config(threads=1, clients=500, requests=None, test_time=5) + master_nodes_list = env.getMasterNodesList() + + add_required_env_arguments(benchmark_specs, config, env, master_nodes_list) + + test_dir = tempfile.mkdtemp() + config = RunConfig(test_dir, env.testName, config, {}) + ensure_clean_benchmark_folder(config.results_dir) + + benchmark = Benchmark.from_json(config, benchmark_specs) + memtier_ok = benchmark.run() + if not memtier_ok: + debugPrintMemtierOnError(config, env) + + env.assertTrue(memtier_ok == True) + env.assertTrue(os.path.isfile('{0}/mb.json'.format(config.results_dir))) + + # Verify JSON CPU stats exist and show non-trivial CPU usage + json_filename = '{0}/mb.json'.format(config.results_dir) + with open(json_filename) as results_json: + results_dict = json.load(results_json) + all_stats = results_dict['ALL STATS'] + env.assertTrue('CPU Stats' in all_stats) + + cpu_stats = all_stats['CPU Stats'] + env.assertTrue(len(cpu_stats) >= 2) + + # With 500 clients on 1 thread, the worker should be using significant CPU + max_thread_cpu = 0 + for second_key, second_data in cpu_stats.items(): + env.assertTrue('Thread 0' in second_data) + thread_cpu = second_data['Thread 0'] + env.assertTrue(thread_cpu >= 0) + env.assertTrue(thread_cpu < 100) + if thread_cpu > max_thread_cpu: + max_thread_cpu = thread_cpu + + # The single worker thread should be using meaningful CPU (> 10%) + env.debugPrint("Max worker thread CPU observed: {:.1f}%".format(max_thread_cpu), True) + env.assertTrue(max_thread_cpu > 10.0) + + # Check stderr for the high CPU warning + stderr_filename = '{0}/mb.stderr'.format(config.results_dir) + if os.path.isfile(stderr_filename): + with open(stderr_filename) as stderr_file: + stderr_content = stderr_file.read() + has_warning = 'WARNING: High CPU on thread' in stderr_content + env.debugPrint("High CPU warning present in stderr: {}".format(has_warning), True) + if max_thread_cpu > 95.0: + # If we observed >95% CPU, the warning must have fired + env.assertTrue(has_warning) + + +def test_cpu_stats_external_validation(env): + """Cross-validate memtier CPU stats against psutil external measurements.""" + try: + import psutil + except ImportError: + env.debugPrint("psutil not available, skipping external CPU validation", True) + return + + env.skipOnCluster() + + num_threads = 1 + test_time = 5 + + benchmark_specs = {"name": env.testName, "args": [ + '--pipeline=100', + '--data-size=1', + '--ratio=1:1', + '--key-pattern=R:R', + '--key-maximum=100', + ]} + addTLSArgs(benchmark_specs, env) + config = get_default_memtier_config(threads=num_threads, clients=500, + requests=None, test_time=test_time) + master_nodes_list = env.getMasterNodesList() + + add_required_env_arguments(benchmark_specs, config, env, master_nodes_list) + + test_dir = tempfile.mkdtemp() + config = RunConfig(test_dir, env.testName, config, {}) + ensure_clean_benchmark_folder(config.results_dir) + + benchmark = Benchmark.from_json(config, benchmark_specs) + + # Launch as subprocess (non-blocking) so we can sample CPU externally + process = subprocess.Popen( + benchmark.args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + # Sample external CPU via psutil + external_samples = [] + access_denied = False + try: + p = psutil.Process(process.pid) + prev_threads = {t.id: (t.user_time, t.system_time) for t in p.threads()} + prev_time = time.time() + time.sleep(1) + + while process.poll() is None: + cur_time = time.time() + wall_delta = cur_time - prev_time + if wall_delta < 0.5: + time.sleep(0.5) + continue + + cur_threads = {t.id: (t.user_time, t.system_time) for t in p.threads()} + sample = {} + for tid, (cu, cs) in cur_threads.items(): + if tid in prev_threads: + pu, ps_time = prev_threads[tid] + delta_cpu = (cu - pu) + (cs - ps_time) + sample[tid] = (delta_cpu / wall_delta) * 100.0 + external_samples.append(sample) + + prev_threads = cur_threads + prev_time = cur_time + time.sleep(1) + except psutil.NoSuchProcess: + pass + except psutil.AccessDenied: + # macOS task_for_pid restriction — can't read child process threads + access_denied = True + + stdout, stderr = process.communicate() + if stderr: + benchmark.write_file('mb.stderr', stderr) + + env.assertTrue(process.returncode == 0) + + if access_denied: + env.debugPrint("psutil.AccessDenied (macOS task_for_pid restriction), skipping", True) + return + + env.assertTrue(len(external_samples) >= 2) + + # Read memtier JSON output + json_filename = os.path.join(config.results_dir, 'mb.json') + env.assertTrue(os.path.isfile(json_filename)) + + with open(json_filename) as f: + results_dict = json.load(f) + + cpu_stats = results_dict['ALL STATS']['CPU Stats'] + + # Compute average total process CPU% from memtier JSON (all threads) + internal_total_cpus = [] + for sec_key, sec_data in cpu_stats.items(): + sec_total = sum(v for k, v in sec_data.items()) + internal_total_cpus.append(sec_total) + internal_avg = sum(internal_total_cpus) / len(internal_total_cpus) + + # Compute average total process CPU% from psutil (all threads) + external_total_cpus = [] + for sample in external_samples: + sec_total = sum(sample.values()) + external_total_cpus.append(sec_total) + external_avg = sum(external_total_cpus) / len(external_total_cpus) + + env.debugPrint("Internal avg total CPU: {:.1f}%".format(internal_avg), True) + env.debugPrint("External avg total CPU: {:.1f}%".format(external_avg), True) + env.debugPrint("Delta: {:.1f}pp".format(abs(internal_avg - external_avg)), True) + + # Assert they agree within 25 percentage points + # psutil may see additional internal threads (libevent, I/O) not reported by memtier, + # so the external total can be higher; use a wider tolerance to account for this. + env.assertTrue(abs(internal_avg - external_avg) < 25.0) + + # Verify both show significant CPU usage (not both near zero) + env.assertTrue(internal_avg > 10.0) + env.assertTrue(external_avg > 10.0) diff --git a/tests/test_requirements.txt b/tests/test_requirements.txt index 73b6bd1..fcf92c8 100644 --- a/tests/test_requirements.txt +++ b/tests/test_requirements.txt @@ -1,2 +1,3 @@ redis>=3.0.0 git+https://github.com/RedisLabsModules/RLTest.git@master +psutil>=5.9.0