diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 80ff62f..e89a5ee 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -55,3 +55,30 @@ jobs: sleep 2 printf '*1\r\n$4\r\nPING\r\n' | nc 127.0.0.1 6391 | head -n 1 | grep PONG docker rm -f pomai-cache-ci + + + soak-short: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - run: cmake -S . -B build -DCMAKE_BUILD_TYPE=Release + - run: cmake --build build -j + - run: ./build/pomai_cache_server --port 6390 & + - run: sleep 2 + - run: python3 tests/soak/pomai_cache_soak.py --port 6390 --duration 120 + - run: pkill pomai_cache_server || true + + replay-smoke: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - run: cmake -S . -B build -DCMAKE_BUILD_TYPE=Release + - run: cmake --build build -j + - run: ./build/pomai_cache_server --port 6392 & + - run: sleep 2 + - run: ./build/pomai_cache_replay --trace traces/mini_hotset.trace --port 6392 --json out/replay_ci.json --csv out/replay_ci.csv + - uses: actions/upload-artifact@v4 + with: + name: replay-artifacts + path: out/replay_ci.* + - run: pkill pomai_cache_server || true diff --git a/.github/workflows/perf-nightly.yml b/.github/workflows/perf-nightly.yml new file mode 100644 index 0000000..9ad761b --- /dev/null +++ b/.github/workflows/perf-nightly.yml @@ -0,0 +1,22 @@ +name: perf-nightly +on: + workflow_dispatch: + schedule: + - cron: "0 3 * * *" + +jobs: + replay-perf: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - run: cmake -S . -B build -DCMAKE_BUILD_TYPE=Release + - run: cmake --build build -j + - run: ./build/pomai_cache_server --port 6393 & + - run: sleep 2 + - run: ./build/pomai_cache_replay --trace traces/mini_hotset.trace --port 6393 --json out/perf_mini.json --csv out/perf_mini.csv + - run: ./build/pomai_cache_replay --trace traces/ttlheavy.trace --port 6393 --json out/perf_ttl.json --csv out/perf_ttl.csv + - uses: actions/upload-artifact@v4 + with: + name: perf-nightly + path: out/perf_* + - run: pkill pomai_cache_server || true diff --git a/CMakeLists.txt b/CMakeLists.txt index 7f100af..4419b54 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -23,6 +23,9 @@ if(NOT WIN32) add_executable(pomai_cache_netbench bench/pomai_cache_netbench.cpp) target_link_libraries(pomai_cache_netbench PRIVATE pomai_cache_core) + + add_executable(pomai_cache_replay bench/pomai_cache_replay.cpp) + target_link_libraries(pomai_cache_replay PRIVATE pomai_cache_core) endif() add_executable(pomai_cache_bench bench/pomai_cache_bench.cpp) @@ -41,6 +44,14 @@ if(BUILD_TESTING) add_test(NAME test_engine COMMAND test_engine) add_test(NAME test_resp COMMAND test_resp) + add_executable(test_chaos tests/test_chaos.cpp) + target_link_libraries(test_chaos PRIVATE pomai_cache_core mini_catch_main) + add_test(NAME test_chaos COMMAND test_chaos) + + add_executable(test_canary tests/test_canary.cpp) + target_link_libraries(test_canary PRIVATE pomai_cache_core mini_catch_main) + add_test(NAME test_canary COMMAND test_canary) + if(NOT WIN32) add_executable(test_integration tests/test_integration.cpp) target_link_libraries(test_integration PRIVATE pomai_cache_core mini_catch_main) diff --git a/README.md b/README.md index cfc1867..f24c25b 100644 --- a/README.md +++ b/README.md @@ -109,3 +109,26 @@ Bench reports per workload and policy: - max concurrent connections enforced - slow-client protection via bounded output buffer - bounded per-tick TTL cleanup + + +## Trace replay example + +```bash +./build/pomai_cache_replay --trace traces/mini_hotset.trace --port 6379 --scale 2.0 --json out/replay_summary.json --csv out/replay_timeseries.csv +``` + +## Canary rollout example + +```bash +redis-cli -p 6379 CONFIG SET POLICY.CANARY_PCT 10 +redis-cli -p 6379 CONFIG SET PARAMS config/policy_params.json +redis-cli -p 6379 INFO +``` + +## Slowlog and diagnostics + +```bash +redis-cli -p 6379 SLOWLOG GET 10 +redis-cli -p 6379 DEBUG DUMPSTATS /tmp/pomai_dump.txt +redis-cli -p 6379 TRACE STREAM +``` diff --git a/bench/pomai_cache_replay.cpp b/bench/pomai_cache_replay.cpp new file mode 100644 index 0000000..f6ec1db --- /dev/null +++ b/bench/pomai_cache_replay.cpp @@ -0,0 +1,160 @@ +#include "pomai_cache/resp.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace { +struct TraceOp { std::uint64_t ts_ms{0}; std::string op; std::size_t key_hash{0}; std::size_t value_size{0}; }; + +bool extract_u64(const std::string &line, const std::string &key, std::uint64_t &out) { + std::regex re("\\\"" + key + "\\\"\\s*:\\s*([0-9]+)"); + std::smatch m; + if (!std::regex_search(line, m, re)) return false; + out = std::stoull(m[1].str()); + return true; +} + +bool extract_str(const std::string &line, const std::string &key, std::string &out) { + std::regex re("\\\"" + key + "\\\"\\s*:\\s*\\\"([^\\\"]*)\\\""); + std::smatch m; + if (!std::regex_search(line, m, re)) return false; + out = m[1].str(); + return true; +} + +std::string mkcmd(const TraceOp &op) { + std::string key = "k" + std::to_string(op.key_hash % 1000); + if (op.op == "GET") return "*2\r\n$3\r\nGET\r\n$" + std::to_string(key.size()) + "\r\n" + key + "\r\n"; + if (op.op == "DEL") return "*2\r\n$3\r\nDEL\r\n$" + std::to_string(key.size()) + "\r\n" + key + "\r\n"; + std::string value(op.value_size > 0 ? op.value_size : 16, 'x'); + return "*3\r\n$3\r\nSET\r\n$" + std::to_string(key.size()) + "\r\n" + key + "\r\n$" + std::to_string(value.size()) + "\r\n" + value + "\r\n"; +} + +std::string percentile(const std::vector &v, double p) { + if (v.empty()) return "0"; + std::vector s = v; + std::sort(s.begin(), s.end()); + std::size_t idx = static_cast(std::floor((s.size() - 1) * p)); + std::ostringstream os; + os << s[idx]; + return os.str(); +} + +std::string send_cmd(int fd, const std::string &cmd) { + send(fd, cmd.data(), cmd.size(), 0); + char buf[4096]; + ssize_t n = recv(fd, buf, sizeof(buf), 0); + if (n <= 0) return {}; + return std::string(buf, static_cast(n)); +} +} // namespace + +int main(int argc, char **argv) { + std::string trace_path = "traces/mini_hotset.trace"; + std::string out_json = "out/replay_summary.json"; + std::string out_csv = "out/replay_timeseries.csv"; + int port = 6379; + double scale = 1.0; + for (int i = 1; i < argc; ++i) { + std::string a = argv[i]; + if (a == "--trace" && i + 1 < argc) trace_path = argv[++i]; + else if (a == "--port" && i + 1 < argc) port = std::stoi(argv[++i]); + else if (a == "--scale" && i + 1 < argc) scale = std::stod(argv[++i]); + else if (a == "--json" && i + 1 < argc) out_json = argv[++i]; + else if (a == "--csv" && i + 1 < argc) out_csv = argv[++i]; + } + + std::ifstream in(trace_path); + if (!in.is_open()) { + std::cerr << "trace file not found\n"; + return 1; + } + std::vector ops; + for (std::string line; std::getline(in, line);) { + TraceOp op; + std::uint64_t v = 0; + extract_u64(line, "ts_ms", op.ts_ms); + extract_u64(line, "key_hash", v); op.key_hash = static_cast(v); + extract_u64(line, "value_size", v); op.value_size = static_cast(v); + extract_str(line, "op", op.op); + if (!op.op.empty()) ops.push_back(op); + } + + int fd = socket(AF_INET, SOCK_STREAM, 0); + sockaddr_in addr{}; + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr); + if (connect(fd, reinterpret_cast(&addr), sizeof(addr)) < 0) { + std::cerr << "connect failed\n"; + return 2; + } + + auto before = send_cmd(fd, "*1\r\n$4\r\nINFO\r\n"); + std::vector lats; + std::vector ts_rows; + std::uint64_t hits = 0; + std::uint64_t gets = 0; + const auto replay_start = std::chrono::steady_clock::now(); + std::uint64_t base_ts = ops.empty() ? 0 : ops.front().ts_ms; + + for (std::size_t i = 0; i < ops.size(); ++i) { + if (i > 0 && scale > 0.0) { + auto target_ms = static_cast((ops[i].ts_ms - base_ts) / scale); + auto now_ms = static_cast(std::chrono::duration_cast(std::chrono::steady_clock::now() - replay_start).count()); + if (target_ms > now_ms) std::this_thread::sleep_for(std::chrono::milliseconds(target_ms - now_ms)); + } + auto st = std::chrono::steady_clock::now(); + auto resp = send_cmd(fd, mkcmd(ops[i])); + auto en = std::chrono::steady_clock::now(); + double us = std::chrono::duration(en - st).count(); + lats.push_back(us); + if (ops[i].op == "GET") { + ++gets; + if (resp.rfind("$-1", 0) != 0) ++hits; + } + if (i % 50 == 0) { + ts_rows.push_back(std::to_string(i) + "," + std::to_string(us)); + } + } + auto after = send_cmd(fd, "*1\r\n$4\r\nINFO\r\n"); + close(fd); + + double seconds = std::chrono::duration(std::chrono::steady_clock::now() - replay_start).count(); + double ops_s = seconds > 0 ? static_cast(ops.size()) / seconds : 0.0; + double hit_rate = gets > 0 ? static_cast(hits) / static_cast(gets) : 0.0; + + std::ofstream jout(out_json); + jout << "{\n"; + jout << " \"trace\": \"" << trace_path << "\",\n"; + jout << " \"ops\": " << ops.size() << ",\n"; + jout << " \"ops_per_sec\": " << ops_s << ",\n"; + jout << " \"p50_us\": " << percentile(lats, 0.50) << ",\n"; + jout << " \"p95_us\": " << percentile(lats, 0.95) << ",\n"; + jout << " \"p99_us\": " << percentile(lats, 0.99) << ",\n"; + jout << " \"p999_us\": " << percentile(lats, 0.999) << ",\n"; + jout << " \"hit_rate\": " << hit_rate << "\n"; + jout << "}\n"; + + std::ofstream csv(out_csv); + csv << "op_index,latency_us\n"; + for (const auto &r : ts_rows) csv << r << "\n"; + + std::cout << "ops/s=" << ops_s << " p50=" << percentile(lats, 0.50) << " p95=" << percentile(lats, 0.95) + << " p99=" << percentile(lats, 0.99) << " p999=" << percentile(lats, 0.999) << " hit_rate=" << hit_rate << "\n"; + std::cout << "INFO_BEFORE\n" << before << "\nINFO_AFTER\n" << after << "\n"; + return 0; +} diff --git a/docs/CANARY_ROLLOUT.md b/docs/CANARY_ROLLOUT.md new file mode 100644 index 0000000..4acd64f --- /dev/null +++ b/docs/CANARY_ROLLOUT.md @@ -0,0 +1,7 @@ +# Canary rollout +1. Load existing params as control (LKG persists automatically). +2. Enable canary split: `CONFIG SET POLICY.CANARY_PCT 10`. +3. Reload params file with candidate values via `CONFIG SET PARAMS `. + +Server tracks control vs candidate hit-rate and p99 latency and auto-rolls back to LKG if guardrails are violated. +INFO includes canary fields and last rollback event. diff --git a/docs/PERF_TOOLING.md b/docs/PERF_TOOLING.md new file mode 100644 index 0000000..1ad9a8c --- /dev/null +++ b/docs/PERF_TOOLING.md @@ -0,0 +1,8 @@ +# Perf tooling +- `scripts/perf/perf_record.sh ` +- `scripts/perf/flamegraph.sh` +- `scripts/perf/tsan_build.sh` +- `scripts/perf/asan_build.sh` +- `scripts/perf/heap_profile.md` + +Nightly perf workflow replays reference traces and uploads summary artifacts. diff --git a/docs/SOAK_CHAOS.md b/docs/SOAK_CHAOS.md new file mode 100644 index 0000000..c194f01 --- /dev/null +++ b/docs/SOAK_CHAOS.md @@ -0,0 +1,5 @@ +# Soak + chaos +Run soak: +`python3 tests/soak/pomai_cache_soak.py --port 6379 --duration 180` + +Chaos coverage uses `tests/test_chaos.cpp` for churn with mixed set/get/del/expire ensuring no memory overflow. diff --git a/docs/TRACING.md b/docs/TRACING.md new file mode 100644 index 0000000..149b0d9 --- /dev/null +++ b/docs/TRACING.md @@ -0,0 +1,10 @@ +# Tracing +Tracing is off by default. + +Enable: +- `CONFIG SET TRACE.PATH /tmp/pomai.trace.jsonl` +- `CONFIG SET TRACE.SAMPLE_RATE 0.1` +- `CONFIG SET TRACE.ENABLED yes` + +Trace lines are JSONL with hashed keys, op type, value size, ttl class, owner, hit/miss and latency bucket. +Use `TRACE STREAM` for capped in-memory recent trace lines. diff --git a/include/pomai_cache/engine.hpp b/include/pomai_cache/engine.hpp index cce465a..81de5fc 100644 --- a/include/pomai_cache/engine.hpp +++ b/include/pomai_cache/engine.hpp @@ -4,6 +4,7 @@ #include #include +#include #include #include #include @@ -46,6 +47,10 @@ class Engine { std::string info() const; bool reload_params(const std::string &path, std::string *err = nullptr); + void set_canary_pct(std::uint64_t pct); + std::uint64_t canary_pct() const { return canary_pct_; } + bool rollback_to_lkg(std::string *err = nullptr); + bool dump_stats(const std::string &path, std::string *err = nullptr) const; const EngineStats &stats() const { return stats_; } std::size_t memory_used() const { return memory_used_; } @@ -70,6 +75,15 @@ class Engine { void evict_until_fit(); double owner_miss_cost(const std::string &owner) const; std::size_t bucket_for(std::size_t size) const; + bool is_canary_key(const std::string &key) const; + void maybe_evaluate_canary(); + static std::uint64_t p99_from_samples(const std::deque &samples); + + struct CohortStats { + std::uint64_t gets{0}; + std::uint64_t hits{0}; + std::deque latency_us; + }; EngineConfig cfg_; std::unique_ptr policy_; @@ -84,6 +98,18 @@ class Engine { std::size_t memory_used_{0}; std::size_t bucket_used_{0}; std::size_t expiration_backlog_{0}; + std::uint64_t canary_pct_{0}; + bool canary_active_{false}; + PolicyParams control_params_{}; + PolicyParams canary_params_{}; + std::string lkg_path_{".pomai_lkg_params.json"}; + CohortStats control_stats_; + CohortStats canary_stats_; + TimePoint canary_start_{Clock::now()}; + TimePoint last_guardrail_eval_{Clock::now()}; + std::uint64_t baseline_evictions_{0}; + std::uint64_t rollback_events_{0}; + std::string last_canary_event_{"none"}; }; std::unique_ptr make_policy_by_name(const std::string &mode); diff --git a/scripts/perf/asan_build.sh b/scripts/perf/asan_build.sh new file mode 100755 index 0000000..7f37962 --- /dev/null +++ b/scripts/perf/asan_build.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash +set -euo pipefail +cmake -S . -B build-asan -DCMAKE_BUILD_TYPE=Debug -DCMAKE_CXX_FLAGS='-fsanitize=address,undefined -fno-omit-frame-pointer' -DCMAKE_EXE_LINKER_FLAGS='-fsanitize=address,undefined' +cmake --build build-asan -j diff --git a/scripts/perf/flamegraph.sh b/scripts/perf/flamegraph.sh new file mode 100755 index 0000000..73dbb1d --- /dev/null +++ b/scripts/perf/flamegraph.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env bash +set -euo pipefail +perf script | stackcollapse-perf.pl | flamegraph.pl > flamegraph.svg diff --git a/scripts/perf/heap_profile.md b/scripts/perf/heap_profile.md new file mode 100644 index 0000000..df237c4 --- /dev/null +++ b/scripts/perf/heap_profile.md @@ -0,0 +1,7 @@ +# Heap profiling +Use jemalloc or mimalloc profiling if available. + +Example (jemalloc): +``` +MALLOC_CONF=prof:true,lg_prof_sample:19,prof_prefix:jeprof ./build/pomai_cache_server +``` diff --git a/scripts/perf/perf_record.sh b/scripts/perf/perf_record.sh new file mode 100755 index 0000000..0c08a9e --- /dev/null +++ b/scripts/perf/perf_record.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env bash +set -euo pipefail +perf record -F 99 -g -- "$@" diff --git a/scripts/perf/tsan_build.sh b/scripts/perf/tsan_build.sh new file mode 100755 index 0000000..c7e67ab --- /dev/null +++ b/scripts/perf/tsan_build.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash +set -euo pipefail +cmake -S . -B build-tsan -DCMAKE_BUILD_TYPE=Debug -DCMAKE_CXX_FLAGS='-fsanitize=thread -fno-omit-frame-pointer' -DCMAKE_EXE_LINKER_FLAGS='-fsanitize=thread' +cmake --build build-tsan -j diff --git a/src/engine/engine.cpp b/src/engine/engine.cpp index 307a6c6..8f5ad9a 100644 --- a/src/engine/engine.cpp +++ b/src/engine/engine.cpp @@ -3,6 +3,8 @@ #include #include #include +#include +#include #include #include @@ -41,11 +43,13 @@ Engine::Engine(EngineConfig cfg, std::unique_ptr policy) : cfg_(cfg), policy_(std::move(policy)) { owner_miss_cost_default_["default"] = 1.0; owner_miss_cost_default_["premium"] = 2.0; + control_params_ = policy_->params(); } bool Engine::set(const std::string &key, const std::vector &value, std::optional ttl_ms, std::string owner, std::string *err) { + const auto start = Clock::now(); tick(); if (key.empty() || key.size() > cfg_.max_key_len) { if (err) @@ -82,12 +86,19 @@ bool Engine::set(const std::string &key, const std::vector &value, } CandidateView cv{key, &candidate, owner_miss_cost(candidate.owner)}; + const auto original = policy_->params(); + if (is_canary_key(key) && canary_active_) + policy_->set_params(canary_params_); + else + policy_->set_params(control_params_); if (!policy_->should_admit(cv)) { + policy_->set_params(original); ++stats_.admissions_rejected; if (err) *err = "admission rejected"; return false; } + policy_->set_params(original); if (entries_.contains(key)) { owner_usage_[entries_[key].owner] -= entries_[key].size_bytes; @@ -108,19 +119,36 @@ bool Engine::set(const std::string &key, const std::vector &value, } evict_until_fit(); + const auto dur = std::chrono::duration_cast(Clock::now() - start).count(); + auto &cohort = is_canary_key(key) ? canary_stats_ : control_stats_; + cohort.latency_us.push_back(static_cast(std::max(0, dur))); + if (cohort.latency_us.size() > 2048) + cohort.latency_us.pop_front(); return true; } std::optional> Engine::get(const std::string &key) { + const auto start = Clock::now(); tick(); + auto &cohort = is_canary_key(key) ? canary_stats_ : control_stats_; + ++cohort.gets; if (!exists_and_not_expired(key)) { ++stats_.misses; + const auto dur = std::chrono::duration_cast(Clock::now() - start).count(); + cohort.latency_us.push_back(static_cast(std::max(0, dur))); + if (cohort.latency_us.size() > 2048) + cohort.latency_us.pop_front(); return std::nullopt; } auto &e = entries_[key]; e.last_access = Clock::now(); ++e.hit_count; ++stats_.hits; + ++cohort.hits; + const auto dur = std::chrono::duration_cast(Clock::now() - start).count(); + cohort.latency_us.push_back(static_cast(std::max(0, dur))); + if (cohort.latency_us.size() > 2048) + cohort.latency_us.pop_front(); policy_->on_access(key, e); return e.value; } @@ -200,6 +228,7 @@ void Engine::tick() { ++expiration_backlog_; snapshot.pop(); } + maybe_evaluate_canary(); } std::string Engine::info() const { @@ -216,6 +245,16 @@ std::string Engine::info() const { os << "evictions:" << stats_.evictions << "\n"; os << "expirations:" << stats_.expirations << "\n"; os << "admissions_rejected:" << stats_.admissions_rejected << "\n"; + os << "canary_enabled:" << (canary_active_ ? 1 : 0) << "\n"; + os << "canary_pct:" << canary_pct_ << "\n"; + const double control_hr = control_stats_.gets == 0 ? 0.0 : static_cast(control_stats_.hits) / static_cast(control_stats_.gets); + const double canary_hr = canary_stats_.gets == 0 ? 0.0 : static_cast(canary_stats_.hits) / static_cast(canary_stats_.gets); + os << "canary_control_hit_rate:" << control_hr << "\n"; + os << "canary_candidate_hit_rate:" << canary_hr << "\n"; + os << "canary_control_p99_us:" << p99_from_samples(control_stats_.latency_us) << "\n"; + os << "canary_candidate_p99_us:" << p99_from_samples(canary_stats_.latency_us) << "\n"; + os << "canary_rollback_events:" << rollback_events_ << "\n"; + os << "canary_last_event:" << last_canary_event_ << "\n"; std::vector> counts; counts.reserve(entries_.size()); @@ -286,12 +325,27 @@ bool Engine::reload_params(const std::string &path, std::string *err) { if (extract_string(text, "version", s)) p.version = s; - policy_->set_params(p); + if (canary_pct_ > 0) { + canary_params_ = p; + canary_active_ = true; + canary_start_ = Clock::now(); + baseline_evictions_ = stats_.evictions; + canary_stats_ = {}; + control_stats_ = {}; + last_canary_event_ = "canary_started:" + p.version; + } else { + control_params_ = p; + policy_->set_params(control_params_); + last_canary_event_ = "params_loaded:" + p.version; + std::ofstream out(lkg_path_); + if (out.is_open()) + out << text; + } return true; } void Engine::set_policy(std::unique_ptr policy) { - PolicyParams p = policy_->params(); + PolicyParams p = control_params_; policy_ = std::move(policy); policy_->set_params(p); } @@ -363,4 +417,105 @@ double Engine::memory_overhead_ratio() const { return static_cast(bucket_used_) / static_cast(memory_used_); } +void Engine::set_canary_pct(std::uint64_t pct) { + canary_pct_ = std::min(100, pct); + if (canary_pct_ == 0) + canary_active_ = false; +} + +bool Engine::rollback_to_lkg(std::string *err) { + std::ifstream in(lkg_path_); + if (!in.is_open()) { + if (err) + *err = "lkg file not found"; + return false; + } + std::stringstream ss; + ss << in.rdbuf(); + const std::string text = ss.str(); + PolicyParams p = control_params_; + double d; + std::uint64_t u; + std::string s; + if (extract_double(text, "w_miss", d)) p.w_miss = d; + if (extract_double(text, "w_reuse", d)) p.w_reuse = d; + if (extract_double(text, "w_mem", d)) p.w_mem = d; + if (extract_double(text, "w_risk", d)) p.w_risk = d; + if (extract_double(text, "admit_threshold", d)) p.admit_threshold = d; + if (extract_double(text, "evict_pressure", d)) p.evict_pressure = d; + if (extract_u64(text, "max_evictions_per_second", u)) p.max_evictions_per_second = u; + if (extract_u64(text, "max_admissions_per_second", u)) p.max_admissions_per_second = u; + if (extract_u64(text, "owner_cap_bytes", u)) p.owner_cap_bytes = static_cast(u); + if (extract_string(text, "version", s)) p.version = s; + control_params_ = p; + policy_->set_params(control_params_); + canary_active_ = false; + ++rollback_events_; + last_canary_event_ = "rollback_to_lkg:" + p.version; + return true; +} + +bool Engine::dump_stats(const std::string &path, std::string *err) const { + std::ofstream out(path); + if (!out.is_open()) { + if (err) + *err = "unable to open dump file"; + return false; + } + out << "config_hash:na\n"; + out << "policy_params_version:" << control_params_.version << "\n"; + out << "memory_used_bytes:" << memory_used_ << "\n"; + out << "memory_limit_bytes:" << cfg_.memory_limit_bytes << "\n"; + out << "owners:"; + std::vector> owners(owner_usage_.begin(), owner_usage_.end()); + std::sort(owners.begin(), owners.end(), [](const auto &a, const auto &b){ return a.first < b.first;}); + for (std::size_t i = 0; i < std::min(5, owners.size()); ++i) { + if (i) out << ","; + out << owners[i].first << ":" << owners[i].second; + } + out << "\n"; + out << "eviction_reasons:memory_pressure=" << stats_.evictions << ",expiry=" << stats_.expirations << "\n"; + return true; +} + +bool Engine::is_canary_key(const std::string &key) const { + if (!canary_active_ || canary_pct_ == 0) + return false; + const auto h = std::hash{}(key) % 100; + return h < canary_pct_; +} + +std::uint64_t Engine::p99_from_samples(const std::deque &samples) { + if (samples.empty()) + return 0; + std::vector sorted(samples.begin(), samples.end()); + std::sort(sorted.begin(), sorted.end()); + const std::size_t idx = (sorted.size() - 1) * 99 / 100; + return sorted[idx]; +} + +void Engine::maybe_evaluate_canary() { + if (!canary_active_) + return; + const auto now = Clock::now(); + if (std::chrono::duration_cast(now - last_guardrail_eval_).count() < 1) + return; + last_guardrail_eval_ = now; + if (std::chrono::duration_cast(now - canary_start_).count() < 5) + return; + const double control_hr = control_stats_.gets == 0 ? 1.0 : static_cast(control_stats_.hits) / static_cast(control_stats_.gets); + const double canary_hr = canary_stats_.gets == 0 ? control_hr : static_cast(canary_stats_.hits) / static_cast(canary_stats_.gets); + const auto control_p99 = static_cast(p99_from_samples(control_stats_.latency_us)); + const auto canary_p99 = static_cast(p99_from_samples(canary_stats_.latency_us)); + const auto evictions_delta = stats_.evictions - baseline_evictions_; + const bool latency_bad = control_p99 > 0 && canary_p99 > control_p99 * 1.5; + const bool hit_bad = canary_hr + 0.05 < control_hr; + const bool eviction_bad = evictions_delta > 1000; + if (latency_bad || hit_bad || eviction_bad) { + std::string ignored; + rollback_to_lkg(&ignored); + last_canary_event_ = "auto_rollback_guardrail"; + } +} + } // namespace pomai_cache diff --git a/src/server/server_main.cpp b/src/server/server_main.cpp index 47b4734..307983d 100644 --- a/src/server/server_main.cpp +++ b/src/server/server_main.cpp @@ -3,11 +3,15 @@ #include #include +#include #include #include +#include +#include #include #include #include +#include #include #include #include @@ -34,10 +38,21 @@ bool parse_u64(const std::string &s, std::uint64_t &out) { } } +std::string latency_bucket(std::uint64_t us) { + if (us < 100) + return "lt100us"; + if (us < 500) + return "lt500us"; + if (us < 1000) + return "lt1ms"; + if (us < 5000) + return "lt5ms"; + return "ge5ms"; +} + struct ClientState { pomai_cache::RespParser parser; std::string out; - std::size_t bytes_pending{0}; }; struct ServerStats { @@ -46,6 +61,19 @@ struct ServerStats { std::uint64_t request_count{0}; }; +struct TraceConfig { + bool enabled{false}; + std::string path{"trace/pomai_cache.trace.jsonl"}; + double sample_rate{0.0}; + std::uint64_t dropped{0}; +}; + +struct SlowEntry { + std::string cmd; + std::uint64_t latency_us{0}; + std::uint64_t timestamp_ms{0}; +}; + } // namespace int main(int argc, char **argv) { @@ -70,11 +98,20 @@ int main(int argc, char **argv) { } auto policy = pomai_cache::make_policy_by_name(policy_mode); - pomai_cache::Engine engine({memory_limit, 256, 1024 * 1024, 128}, - std::move(policy)); + pomai_cache::Engine engine({memory_limit, 256, 1024 * 1024, 128}, std::move(policy)); std::string reload_err; engine.reload_params(params_path, &reload_err); + TraceConfig trace_cfg; + std::ofstream trace_stream; + std::uint64_t rng_seed = 424242; + std::mt19937_64 rng(rng_seed); + std::uniform_real_distribution sample_dist(0.0, 1.0); + std::deque trace_ring; + std::deque slowlog; + constexpr std::size_t max_slowlog = 256; + constexpr std::size_t max_trace_ring = 512; + int server_fd = socket(AF_INET, SOCK_STREAM, 0); int one = 1; setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)); @@ -107,8 +144,7 @@ int main(int argc, char **argv) { FD_SET(fd, &readfds); if (!st.out.empty()) FD_SET(fd, &writefds); - if (fd > maxfd) - maxfd = fd; + maxfd = std::max(maxfd, fd); } timeval tv{0, 20000}; int n = select(maxfd + 1, &readfds, &writefds, nullptr, &tv); @@ -147,6 +183,11 @@ int main(int argc, char **argv) { break; ++processed; ++stats.request_count; + const auto op_start = pomai_cache::Clock::now(); + std::string first_key; + std::string op_name = cmd->empty() ? "UNKNOWN" : upper((*cmd)[0]); + bool hit = false; + if (cmd->size() == 1 && cmd->front() == "__MALFORMED__") { ++stats.rejected_requests; st.out += pomai_cache::resp_error("malformed RESP"); @@ -158,15 +199,14 @@ int main(int argc, char **argv) { continue; } - const auto c = upper((*cmd)[0]); - if (c == "PING") + if (op_name == "PING") st.out += pomai_cache::resp_simple("PONG"); - else if (c == "SET") { + else if (op_name == "SET") { if (cmd->size() < 3) { ++stats.rejected_requests; - st.out += pomai_cache::resp_error( - "SET key value [EX sec|PX ms] [OWNER name]"); + st.out += pomai_cache::resp_error("SET key value [EX sec|PX ms] [OWNER name]"); } else { + first_key = (*cmd)[1]; std::optional ttl_ms; std::string owner = "default"; bool valid = true; @@ -179,8 +219,9 @@ int main(int argc, char **argv) { } else if (opt == "PX") { valid = parse_u64((*cmd)[i + 1], ttl_tmp); ttl_ms = ttl_tmp; - } else if (opt == "OWNER") + } else if (opt == "OWNER") { owner = (*cmd)[i + 1]; + } if (!valid) break; } @@ -188,92 +229,126 @@ int main(int argc, char **argv) { ++stats.rejected_requests; st.out += pomai_cache::resp_error("invalid numeric argument"); } else { - std::vector val((*cmd)[2].begin(), - (*cmd)[2].end()); + std::vector val((*cmd)[2].begin(), (*cmd)[2].end()); std::string err; - if (engine.set((*cmd)[1], val, ttl_ms, owner, &err)) + if (engine.set((*cmd)[1], val, ttl_ms, owner, &err)) { st.out += pomai_cache::resp_simple("OK"); - else { + hit = true; + } else { ++stats.rejected_requests; st.out += pomai_cache::resp_error(err); } } } - } else if (c == "GET") { + } else if (op_name == "GET") { if (cmd->size() != 2) { ++stats.rejected_requests; st.out += pomai_cache::resp_error("GET key"); } else { + first_key = (*cmd)[1]; auto v = engine.get((*cmd)[1]); + hit = v.has_value(); if (!v) st.out += pomai_cache::resp_null(); else - st.out += - pomai_cache::resp_bulk(std::string(v->begin(), v->end())); + st.out += pomai_cache::resp_bulk(std::string(v->begin(), v->end())); } - } else if (c == "MGET") { + } else if (op_name == "MGET") { if (cmd->size() < 2) { ++stats.rejected_requests; st.out += pomai_cache::resp_error("MGET key [key...]"); } else { + first_key = (*cmd)[1]; std::vector keys(cmd->begin() + 1, cmd->end()); auto vals = engine.mget(keys); std::vector arr; arr.reserve(vals.size()); - for (auto &v : vals) - arr.push_back(v ? pomai_cache::resp_bulk( - std::string(v->begin(), v->end())) - : pomai_cache::resp_null()); + for (auto &v : vals) { + if (v) + hit = true; + arr.push_back(v ? pomai_cache::resp_bulk(std::string(v->begin(), v->end())) : pomai_cache::resp_null()); + } st.out += pomai_cache::resp_array(arr); } - } else if (c == "DEL") { + } else if (op_name == "DEL") { if (cmd->size() < 2) { ++stats.rejected_requests; st.out += pomai_cache::resp_error("DEL key [key...]"); } else { + first_key = (*cmd)[1]; std::vector keys(cmd->begin() + 1, cmd->end()); st.out += pomai_cache::resp_integer(engine.del(keys)); } - } else if (c == "EXPIRE") { + } else if (op_name == "EXPIRE") { if (cmd->size() != 3) { ++stats.rejected_requests; st.out += pomai_cache::resp_error("EXPIRE key seconds"); } else { + first_key = (*cmd)[1]; std::uint64_t ttl_s = 0; if (!parse_u64((*cmd)[2], ttl_s)) { ++stats.rejected_requests; st.out += pomai_cache::resp_error("invalid numeric argument"); } else { - st.out += pomai_cache::resp_integer( - engine.expire((*cmd)[1], ttl_s) ? 1 : 0); + hit = engine.expire((*cmd)[1], ttl_s); + st.out += pomai_cache::resp_integer(hit ? 1 : 0); } } - } else if (c == "TTL") { + } else if (op_name == "TTL") { if (cmd->size() != 2) { ++stats.rejected_requests; st.out += pomai_cache::resp_error("TTL key"); } else { + first_key = (*cmd)[1]; auto t = engine.ttl((*cmd)[1]); + hit = t.has_value(); st.out += pomai_cache::resp_integer(t ? *t : -2); } - } else if (c == "INFO") { + } else if (op_name == "INFO") { std::ostringstream info; info << engine.info(); info << "connected_clients:" << clients.size() << "\n"; info << "rejected_requests:" << stats.rejected_requests << "\n"; - const double avg_bytes = - stats.request_count == 0 - ? 0.0 - : static_cast(stats.total_request_bytes) / - static_cast(stats.request_count); + const double avg_bytes = stats.request_count == 0 ? 0.0 : static_cast(stats.total_request_bytes) / static_cast(stats.request_count); info << "avg_request_bytes:" << avg_bytes << "\n"; + info << "trace_enabled:" << (trace_cfg.enabled ? 1 : 0) << "\n"; + info << "trace_sample_rate:" << trace_cfg.sample_rate << "\n"; + info << "trace_dropped:" << trace_cfg.dropped << "\n"; st.out += pomai_cache::resp_bulk(info.str()); - } else if (c == "CONFIG") { + } else if (op_name == "SLOWLOG") { + if (cmd->size() >= 2 && upper((*cmd)[1]) == "RESET") { + slowlog.clear(); + st.out += pomai_cache::resp_simple("OK"); + } else if (cmd->size() >= 2 && upper((*cmd)[1]) == "GET") { + std::uint64_t nentries = 16; + if (cmd->size() == 3) + parse_u64((*cmd)[2], nentries); + nentries = std::min(nentries, max_slowlog); + std::vector arr; + for (std::size_t i = 0; i < std::min(nentries, slowlog.size()); ++i) { + const auto &e = slowlog[slowlog.size() - 1 - i]; + std::vector item{pomai_cache::resp_integer(static_cast(e.timestamp_ms)), pomai_cache::resp_integer(static_cast(e.latency_us)), pomai_cache::resp_bulk(e.cmd)}; + arr.push_back(pomai_cache::resp_array(item)); + } + st.out += pomai_cache::resp_array(arr); + } else { + st.out += pomai_cache::resp_error("SLOWLOG GET [N]|RESET"); + } + } else if (op_name == "TRACE" && cmd->size() == 2 && upper((*cmd)[1]) == "STREAM") { + std::vector arr; + for (const auto &line : trace_ring) + arr.push_back(pomai_cache::resp_bulk(line)); + st.out += pomai_cache::resp_array(arr); + } else if (op_name == "DEBUG" && cmd->size() == 3 && upper((*cmd)[1]) == "DUMPSTATS") { + std::string err; + if (engine.dump_stats((*cmd)[2], &err)) + st.out += pomai_cache::resp_simple("OK"); + else + st.out += pomai_cache::resp_error(err); + } else if (op_name == "CONFIG") { if (cmd->size() >= 2 && upper((*cmd)[1]) == "GET") { if (cmd->size() == 3 && upper((*cmd)[2]) == "POLICY") { - std::vector arr{ - pomai_cache::resp_bulk("policy"), - pomai_cache::resp_bulk(engine.policy().name())}; + std::vector arr{pomai_cache::resp_bulk("policy"), pomai_cache::resp_bulk(engine.policy().name())}; st.out += pomai_cache::resp_array(arr); } else { ++stats.rejected_requests; @@ -291,6 +366,34 @@ int main(int argc, char **argv) { } else { st.out += pomai_cache::resp_simple("OK"); } + } else if (cmd->size() == 4 && upper((*cmd)[2]) == "POLICY.CANARY_PCT") { + std::uint64_t pct = 0; + if (!parse_u64((*cmd)[3], pct)) { + st.out += pomai_cache::resp_error("invalid numeric argument"); + } else { + engine.set_canary_pct(pct); + st.out += pomai_cache::resp_simple("OK"); + } + } else if (cmd->size() == 4 && upper((*cmd)[2]) == "TRACE.ENABLED") { + trace_cfg.enabled = upper((*cmd)[3]) == "YES" || (*cmd)[3] == "1"; + if (trace_cfg.enabled && !trace_stream.is_open()) { + trace_stream.open(trace_cfg.path, std::ios::app); + } + st.out += pomai_cache::resp_simple("OK"); + } else if (cmd->size() == 4 && upper((*cmd)[2]) == "TRACE.PATH") { + trace_cfg.path = (*cmd)[3]; + if (trace_stream.is_open()) + trace_stream.close(); + if (trace_cfg.enabled) + trace_stream.open(trace_cfg.path, std::ios::app); + st.out += pomai_cache::resp_simple("OK"); + } else if (cmd->size() == 4 && upper((*cmd)[2]) == "TRACE.SAMPLE_RATE") { + try { + trace_cfg.sample_rate = std::clamp(std::stod((*cmd)[3]), 0.0, 1.0); + st.out += pomai_cache::resp_simple("OK"); + } catch (...) { + st.out += pomai_cache::resp_error("invalid sample rate"); + } } else { ++stats.rejected_requests; st.out += pomai_cache::resp_error("unsupported CONFIG SET"); @@ -303,6 +406,37 @@ int main(int argc, char **argv) { ++stats.rejected_requests; st.out += pomai_cache::resp_error("unknown command"); } + + const auto latency_us = static_cast(std::chrono::duration_cast(pomai_cache::Clock::now() - op_start).count()); + if (latency_us > 5000) { + slowlog.push_back({op_name, latency_us, static_cast(std::chrono::duration_cast(pomai_cache::Clock::now().time_since_epoch()).count())}); + if (slowlog.size() > max_slowlog) + slowlog.pop_front(); + } + + if (trace_cfg.enabled && sample_dist(rng) <= trace_cfg.sample_rate) { + if (!trace_stream.is_open()) + trace_stream.open(trace_cfg.path, std::ios::app); + if (trace_stream.is_open()) { + const auto now_ms = std::chrono::duration_cast(pomai_cache::Clock::now().time_since_epoch()).count(); + const std::string owner = cmd->size() > 1 ? "default" : "n/a"; + const std::size_t key_hash = first_key.empty() ? 0 : std::hash{}(first_key); + std::ostringstream line; + line << "{\"ts_ms\":" << now_ms << ",\"op\":\"" << op_name << "\",\"key_hash\":" << key_hash << ",\"value_size\":"; + if (op_name == "SET" && cmd->size() >= 3) + line << (*cmd)[2].size(); + else + line << 0; + line << ",\"ttl_class\":\"" << (op_name == "SET" && cmd->size() > 3 ? "with_ttl" : "none") << "\",\"owner\":\"" << owner << "\",\"result\":\"" << (hit ? "hit" : "miss") << "\",\"lat_bucket\":\"" << latency_bucket(latency_us) << "\",\"policy_version\":\"" << engine.policy().params().version << "\",\"rng_seed\":" << rng_seed << "}"; + trace_stream << line.str() << "\n"; + trace_ring.push_back(line.str()); + if (trace_ring.size() > max_trace_ring) + trace_ring.pop_front(); + } else { + ++trace_cfg.dropped; + } + } + if (st.out.size() > max_pending_out) { ++stats.rejected_requests; to_close.push_back(fd); @@ -310,9 +444,9 @@ int main(int argc, char **argv) { } } } + if (FD_ISSET(fd, &writefds) && !st.out.empty()) { - const std::size_t send_bytes = - std::min(st.out.size(), 8192); + const std::size_t send_bytes = std::min(st.out.size(), 8192); ssize_t w = send(fd, st.out.data(), send_bytes, 0); if (w <= 0) to_close.push_back(fd); @@ -320,9 +454,9 @@ int main(int argc, char **argv) { st.out.erase(0, static_cast(w)); } } + std::sort(to_close.begin(), to_close.end()); - to_close.erase(std::unique(to_close.begin(), to_close.end()), - to_close.end()); + to_close.erase(std::unique(to_close.begin(), to_close.end()), to_close.end()); for (int fd : to_close) { close(fd); clients.erase(fd); @@ -332,5 +466,7 @@ int main(int argc, char **argv) { for (auto &[fd, _] : clients) close(fd); close(server_fd); + if (trace_stream.is_open()) + trace_stream.close(); return 0; } diff --git a/tests/soak/pomai_cache_soak.py b/tests/soak/pomai_cache_soak.py new file mode 100755 index 0000000..75e0076 --- /dev/null +++ b/tests/soak/pomai_cache_soak.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python3 +import argparse, random, socket, time, json + +def cmd(*parts): + out=[f"*{len(parts)}\r\n".encode()] + for p in parts: + b=str(p).encode() + out.append(f"${len(b)}\r\n".encode()+b+b"\r\n") + return b"".join(out) + +def send(s,*parts): + s.sendall(cmd(*parts)) + return s.recv(4096) + +ap=argparse.ArgumentParser() +ap.add_argument('--port',type=int,default=6379) +ap.add_argument('--duration',type=int,default=180) +args=ap.parse_args() + +s=socket.create_connection(('127.0.0.1',args.port)) +start=time.time(); ops=0; hits=0 +samples=[] +while time.time()-start + +TEST_CASE("chaos churn does not crash", "[chaos]") { + pomai_cache::Engine e({1024 * 1024, 256, 1024, 32}, pomai_cache::make_policy_by_name("pomai_cost")); + std::mt19937_64 rng(42); + for (int i = 0; i < 20000; ++i) { + const auto key = std::string("k") + std::to_string(rng() % 2000); + if (rng() % 4 == 0) { + std::vector v(static_cast(rng() % 128 + 1), 'a'); + e.set(key, v, std::nullopt, "default"); + } else if (rng() % 4 == 1) { + e.get(key); + } else if (rng() % 4 == 2) { + e.expire(key, 1); + } else { + e.del({key}); + } + e.tick(); + } + REQUIRE(e.memory_used() <= 1024 * 1024); +} diff --git a/tools/redis_bench_compat b/tools/redis_bench_compat new file mode 100755 index 0000000..06f87f3 --- /dev/null +++ b/tools/redis_bench_compat @@ -0,0 +1,36 @@ +#!/usr/bin/env python3 +import argparse, csv, socket, time, random + +p=argparse.ArgumentParser() +p.add_argument('-n',type=int,default=1000) +p.add_argument('-c',type=int,default=1) +p.add_argument('-t',default='get,set') +p.add_argument('-d',type=int,default=16) +p.add_argument('-P',type=int,default=1) +p.add_argument('--csv',action='store_true') +p.add_argument('--port',type=int,default=6379) +a=p.parse_args() + +ops=a.t.split(',') +s=socket.create_connection(('127.0.0.1',a.port)) + +def send(parts): + wire=f"*{len(parts)}\r\n".encode() + for part in parts: + b=str(part).encode(); wire+=f"${len(b)}\r\n".encode()+b+b"\r\n" + s.sendall(wire); s.recv(4096) + +lat=[] +start=time.time() +for i in range(a.n): + op=ops[i%len(ops)].upper(); k=f"k{random.randint(0,999)}" + st=time.perf_counter_ns() + if op=='SET': send(['SET',k,'x'*a.d]) + else: send(['GET',k]) + lat.append((time.perf_counter_ns()-st)/1000) +secs=time.time()-start +row={'ops/s':a.n/max(secs,1e-9),'p50':sorted(lat)[int(0.5*len(lat))],'p95':sorted(lat)[int(0.95*len(lat))]} +if a.csv: + w=csv.DictWriter(open('/dev/stdout','w'),fieldnames=row.keys()); w.writeheader(); w.writerow(row) +else: + print(row) diff --git a/traces/mini_hotset.trace b/traces/mini_hotset.trace new file mode 100644 index 0000000..104ae70 --- /dev/null +++ b/traces/mini_hotset.trace @@ -0,0 +1,6 @@ +{"ts_ms":0,"op":"SET","key_hash":1,"value_size":32} +{"ts_ms":5,"op":"SET","key_hash":2,"value_size":32} +{"ts_ms":10,"op":"GET","key_hash":1,"value_size":0} +{"ts_ms":12,"op":"GET","key_hash":3,"value_size":0} +{"ts_ms":20,"op":"GET","key_hash":2,"value_size":0} +{"ts_ms":25,"op":"DEL","key_hash":2,"value_size":0} diff --git a/traces/ttlheavy.trace b/traces/ttlheavy.trace new file mode 100644 index 0000000..1da23b6 --- /dev/null +++ b/traces/ttlheavy.trace @@ -0,0 +1,6 @@ +{"ts_ms":0,"op":"SET","key_hash":100,"value_size":16} +{"ts_ms":1,"op":"SET","key_hash":101,"value_size":16} +{"ts_ms":2,"op":"SET","key_hash":102,"value_size":16} +{"ts_ms":5,"op":"GET","key_hash":100,"value_size":0} +{"ts_ms":10,"op":"GET","key_hash":101,"value_size":0} +{"ts_ms":15,"op":"GET","key_hash":103,"value_size":0}