Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,16 @@ All C++ source files must include this header:
*/
```

## CPU Stats Testing

Per-thread CPU measurement is implemented using native OS APIs (Mach `thread_info` on macOS, `pthread_getcpuclockid` on Linux). Tests live in `tests/test_cpu_stats.py`:

- **`test_cpu_stats_in_json`**: Verifies CPU stats appear in JSON output with valid structure (per-second, per-thread entries with values in [0, 100))
- **`test_cpu_stats_high_load`**: Stresses a single thread with 500 clients to drive high CPU; verifies non-trivial usage and the >95% warning
- **`test_cpu_stats_external_validation`**: Cross-validates memtier's reported CPU percentages against `psutil.Process.threads()` as an independent oracle. Launches memtier via `subprocess.Popen`, polls psutil every ~1s, then compares aggregate average worker CPU% from both sources (±15pp tolerance). Requires `psutil` (listed in `tests/test_requirements.txt`); skips gracefully if unavailable.

All CPU stats tests assert that no individual thread reports exactly 100% CPU usage (values must be strictly less than 100%).

## Important Notes

- Default build includes debug symbols (`-g`) for crash analysis
Expand Down
5 changes: 5 additions & 0 deletions DEVELOPMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ The project includes a basic set of integration tests.
Integration tests are based on [RLTest](https://github.com/RedisLabsModules/RLTest), and specific setup parameters can be provided
to configure tests and topologies (OSS standalone and OSS cluster). By default the tests will be ran for all common commands, and with OSS standalone setup.

Test dependencies (installed via `tests/test_requirements.txt`):
- **redis** — Python Redis client
- **RLTest** — Redis Labs test framework
- **psutil** — Cross-platform process/thread CPU monitoring (used by `test_cpu_stats.py` for external validation of per-thread CPU measurements)

To run all integration tests in a Python virtualenv, follow these steps:

$ mkdir -p .env
Expand Down
65 changes: 65 additions & 0 deletions memtier_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,15 @@
#include <atomic>
#include <algorithm>

#ifdef __APPLE__
#include <mach/mach.h>
#endif

#include "client.h"
#include "JSON_handler.h"
#include "obj_gen.h"
#include "memtier_benchmark.h"
#include "run_stats_types.h"
#include "statsd.h"


Expand Down Expand Up @@ -1749,6 +1754,23 @@ static void print_all_threads_stack_trace(FILE *fp, int pid, const char *timestr
}
}

static unsigned long long get_thread_cpu_usec(pthread_t thread) {
#ifdef __APPLE__
mach_port_t mt = pthread_mach_thread_np(thread);
thread_basic_info_data_t info;
mach_msg_type_number_t count = THREAD_BASIC_INFO_COUNT;
if (thread_info(mt, THREAD_BASIC_INFO, (thread_info_t)&info, &count) != KERN_SUCCESS) return 0;
return (unsigned long long)info.user_time.seconds * 1000000 + info.user_time.microseconds
+ (unsigned long long)info.system_time.seconds * 1000000 + info.system_time.microseconds;
#else
clockid_t cid;
if (pthread_getcpuclockid(thread, &cid) != 0) return 0;
struct timespec ts;
if (clock_gettime(cid, &ts) != 0) return 0;
return (unsigned long long)ts.tv_sec * 1000000 + ts.tv_nsec / 1000;
#endif
}

run_stats run_benchmark(int run_id, benchmark_config *cfg, object_generator *obj_gen)
{
fprintf(stderr, "[RUN #%u] Preparing benchmark client...\n", run_id);
Expand Down Expand Up @@ -1789,6 +1811,17 @@ run_stats run_benchmark(int run_id, benchmark_config *cfg, object_generator *obj
unsigned long int cur_ops_sec = 0;
unsigned long int cur_bytes_sec = 0;

// CPU usage tracking: sample initial CPU times for main thread and all worker threads
std::vector<per_second_cpu_stats> cpu_history;
unsigned long long main_prev_cpu = get_thread_cpu_usec(pthread_self());
std::vector<unsigned long long> thread_prev_cpu(threads.size());
for (size_t t = 0; t < threads.size(); t++) {
thread_prev_cpu[t] = get_thread_cpu_usec(threads[t]->m_thread);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uninitialized pthread handle used for CPU measurement

High Severity

The code reads threads[t]->m_thread immediately after calling start() without verifying that thread creation succeeded. If pthread_create fails, m_thread remains uninitialized and passing it to get_thread_cpu_usec causes undefined behavior (potential crash when calling pthread_mach_thread_np on macOS or pthread_getcpuclockid on Linux with garbage pthread_t values).

Additional Locations (1)

Fix in Cursor Fix in Web

}
unsigned int cpu_second = 0;
struct timeval cpu_prev_tv;
gettimeofday(&cpu_prev_tv, NULL);

// provide some feedback...
// NOTE: Reading stats from worker threads without synchronization is a benign race.
// These stats are only for progress display and are approximate. Final results are
Expand Down Expand Up @@ -1901,6 +1934,36 @@ run_stats run_benchmark(int run_id, benchmark_config *cfg, object_generator *obj
cur_ops_sec, ops_sec, cur_bytes_str, bytes_str, cur_latency, avg_latency);
}

// Collect per-second CPU usage for main thread and all worker threads
cpu_second++;
per_second_cpu_stats cpu_snap;
cpu_snap.m_second = cpu_second;

struct timeval cpu_cur_tv;
gettimeofday(&cpu_cur_tv, NULL);
double wall_usec = (double)(cpu_cur_tv.tv_sec - cpu_prev_tv.tv_sec) * 1000000.0
+ (double)(cpu_cur_tv.tv_usec - cpu_prev_tv.tv_usec);
if (wall_usec < 1.0) wall_usec = 1.0; // guard against division by zero
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wall clock adjustment causes incorrect CPU percentages

Low Severity

If the system clock is adjusted backwards (via NTP, daylight saving, or manual change) during benchmark execution, wall_usec becomes negative. The guard if (wall_usec < 1.0) wall_usec = 1.0 clamps it to 1 microsecond instead of handling the error condition properly, causing CPU deltas divided by 1.0 to produce wildly inflated percentages (potentially millions of percent) in the JSON output.

Fix in Cursor Fix in Web


unsigned long long main_cur_cpu = get_thread_cpu_usec(pthread_self());
unsigned long long main_delta = (main_cur_cpu > main_prev_cpu) ? main_cur_cpu - main_prev_cpu : 0;
cpu_snap.m_main_thread_cpu_pct = (double)main_delta / wall_usec * 100.0;
main_prev_cpu = main_cur_cpu;

for (size_t t = 0; t < threads.size(); t++) {
unsigned long long cur_cpu = get_thread_cpu_usec(threads[t]->m_thread);
unsigned long long delta = (cur_cpu > thread_prev_cpu[t]) ? cur_cpu - thread_prev_cpu[t] : 0;
double cpu_pct = (double)delta / wall_usec * 100.0;
cpu_snap.m_thread_cpu_pct.push_back(cpu_pct);
thread_prev_cpu[t] = cur_cpu;
Comment thread
cursor[bot] marked this conversation as resolved.

if (cpu_pct > 95.0) {
fprintf(stderr, "\nWARNING: High CPU on thread %zu: %.1f%% - results may be unreliable\n", t, cpu_pct);
}
}
cpu_prev_tv = cpu_cur_tv;
cpu_history.push_back(cpu_snap);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CPU stats break across thread restarts

Medium Severity

Per-thread CPU tracking keeps thread_prev_cpu[t] keyed only by thread index, but worker threads can be join()ed and restart()ed with a different pthread_t. After a restart, deltas are computed against the previous thread’s CPU clock (or 0 if calls fail), producing incorrect per-thread CPU% and potentially spurious/missed >95% warnings.

Fix in Cursor Fix in Web


// Send metrics to StatsD if configured
if (cfg->statsd != NULL && cfg->statsd->is_enabled()) {
cfg->statsd->gauge("ops_sec", (long) cur_ops_sec);
Expand Down Expand Up @@ -1983,6 +2046,8 @@ run_stats run_benchmark(int run_id, benchmark_config *cfg, object_generator *obj
(*i)->m_cg->merge_run_stats(&stats);
}

stats.set_cpu_stats(std::move(cpu_history));

// Do we need to produce client stats?
if (cfg->client_stats != NULL) {
unsigned int cg_id = 0;
Expand Down
23 changes: 23 additions & 0 deletions run_stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1389,6 +1389,11 @@ void run_stats::print_kb_sec_column(output_table &table, const std::vector<aggre
table.add_column(column);
}

void run_stats::set_cpu_stats(std::vector<per_second_cpu_stats> cpu_stats)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CPU stats missing from aggregated results

Low Severity

When run_count > 1, average.aggregate_average(all_stats) computes aggregated output, but aggregate_average() never propagates/merges m_cpu_stats, so “AGGREGATED AVERAGE RESULTS” JSON omits CPU Stats even though individual runs may include them.

Additional Locations (1)

Fix in Cursor Fix in Web

{
m_cpu_stats = std::move(cpu_stats);
}

void run_stats::print_json(json_handler *jsonhandler, arbitrary_command_list &command_list, bool cluster_mode,
const std::vector<aggregated_command_type_stats> *aggregated)
{
Expand Down Expand Up @@ -1480,6 +1485,24 @@ void run_stats::print_json(json_handler *jsonhandler, arbitrary_command_list &co
m_totals.m_total_latency, m_totals.m_ops, m_totals.m_connection_errors_sec,
m_totals.m_connection_errors, quantiles_list, m_totals.latency_histogram, timestamps,
total_stats);

if (jsonhandler != NULL && !m_cpu_stats.empty()) {
jsonhandler->open_nesting("CPU Stats");
for (size_t i = 0; i < m_cpu_stats.size(); i++) {
const per_second_cpu_stats &cs = m_cpu_stats[i];
char sec_str[32];
snprintf(sec_str, sizeof(sec_str), "%u", cs.m_second);
jsonhandler->open_nesting(sec_str);
jsonhandler->write_obj("Main Thread", "%.2f", cs.m_main_thread_cpu_pct);
for (size_t t = 0; t < cs.m_thread_cpu_pct.size(); t++) {
char thread_name[32];
snprintf(thread_name, sizeof(thread_name), "Thread %zu", t);
jsonhandler->write_obj(thread_name, "%.2f", cs.m_thread_cpu_pct[t]);
}
jsonhandler->close_nesting();
}
jsonhandler->close_nesting();
}
}

void run_stats::print_histogram(FILE *out, json_handler *jsonhandler, arbitrary_command_list &command_list,
Expand Down
3 changes: 3 additions & 0 deletions run_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ class run_stats
void set_interrupted(bool interrupted) { m_interrupted = interrupted; }
bool get_interrupted() const { return m_interrupted; }

std::vector<per_second_cpu_stats> m_cpu_stats;
void set_cpu_stats(std::vector<per_second_cpu_stats> cpu_stats);

void update_get_op(struct timeval *ts, unsigned int bytes_rx, unsigned int bytes_tx, unsigned int latency,
unsigned int hits, unsigned int misses);
void update_set_op(struct timeval *ts, unsigned int bytes_rx, unsigned int bytes_tx, unsigned int latency);
Expand Down
6 changes: 6 additions & 0 deletions run_stats_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ class one_second_stats
void merge(const one_second_stats &other);
};

struct per_second_cpu_stats {
unsigned int m_second;
double m_main_thread_cpu_pct;
std::vector<double> m_thread_cpu_pct;
};

class totals_cmd
{
public:
Expand Down
Loading
Loading