Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 170 additions & 46 deletions agent/src/bench_tools.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Benchmark, profiling, and correctness test tools for the Agent framework.
// Benchmark, profiling, and correctness test tools for the Agent framework.

use crate::tools::{BenchmarkComparison, BenchmarkResult, FunctionProfile, ToolResult};
use serde::Deserialize;
Expand All @@ -22,12 +22,34 @@ const BENCHMARK_TIMEOUT_SECS: u64 = 600;

/// Default recall threshold for correctness tests.
const RECALL_THRESHOLD: f64 = 0.95;

/// Serde field-default for [`AntiCheatOutput::passed`].
///
/// When the benchmark binary's JSON omits the `passed` key we treat the
/// run as clean, so older binaries without anti-cheat support keep working.
fn anti_cheat_default_passed() -> bool {
    true
}

/// Anti-cheat verdict emitted by the benchmark binary alongside the result.
///
/// Every field is defaulted so that output from older benchmark binaries
/// (which may omit some or all of this section) still deserializes; an
/// absent verdict is treated as passing.
#[derive(Debug, Clone, Deserialize)]
struct AntiCheatOutput {
    // Defaults to `true` when the key is missing — absence of a verdict
    // must not invalidate an otherwise valid run.
    #[serde(default = "anti_cheat_default_passed")]
    passed: bool,
    // Human-readable detail for a failed verdict; empty when none was given.
    #[serde(default)]
    message: String,
}

impl Default for AntiCheatOutput {
fn default() -> Self {
Self {
passed: true,
message: String::new(),
}
}
}

/// Wrapper for the benchmark binary's JSON output: `{"benchmark": ..., "anti_cheat": ...}`.
#[derive(Debug, Deserialize)]
struct BenchmarkOutput {
benchmark: BenchmarkResult,
#[allow(dead_code)]
anti_cheat: serde_json::Value,
#[serde(default)]
anti_cheat: AntiCheatOutput,
}

/// Timeout (seconds) for the server to become ready after startup.
Expand All @@ -51,7 +73,8 @@ fn next_round_number(dir: &Path, prefix: &str) -> u32 {
let name = entry.file_name().to_string_lossy().to_string();
// Parse "prefix_NNN.ext" -> NNN
if name.starts_with(prefix) {
if let Some(num_part) = name.strip_prefix(prefix).and_then(|s| s.split('.').next()) {
if let Some(num_part) = name.strip_prefix(prefix).and_then(|s| s.split('.').next())
{
if let Ok(n) = num_part.parse::<u32>() {
max = max.max(n);
}
Expand Down Expand Up @@ -109,6 +132,14 @@ fn build_comparison(prev: &BenchmarkResult, curr: &BenchmarkResult) -> Benchmark
}
}

/// Invalidate the benchmark score when the anti-cheat verdict failed.
///
/// A failing verdict zeroes the QPS and marks the recall check as failed,
/// so a suspicious run can never score. A passing verdict leaves the
/// result untouched.
fn apply_anti_cheat_guard(benchmark: &mut BenchmarkResult, anti_cheat: &AntiCheatOutput) {
    if !anti_cheat.passed {
        benchmark.qps = 0.0;
        benchmark.recall_passed = false;
    }
}

/// Save profiling results (flamegraph + report) to profiling/ with round numbers.
/// Returns (round_number, flamegraph_path, report_path).
fn save_profiling_results(
Expand All @@ -133,7 +164,10 @@ fn save_profiling_results(
let report_path = dir.join(format!("report_{:03}.txt", round));
let mut report = String::new();
report.push_str(&format!("Profiling Report #{:03}\n", round));
report.push_str(&format!("Date: {}\n\n", chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC")));
report.push_str(&format!(
"Date: {}\n\n",
chrono::Utc::now().format("%Y-%m-%d %H:%M:%S UTC")
));
report.push_str("Top Functions:\n");
for f in top_functions {
report.push_str(&format!(" {:>6.2}% {}\n", f.percentage, f.function));
Expand Down Expand Up @@ -245,11 +279,7 @@ async fn try_cargo_build_inner(work_dir: &Path, profiling: bool) -> Result<(), S
cmd.env("CARGO_PROFILE_RELEASE_CODEGEN_UNITS", "16");
}

let result = timeout(
Duration::from_secs(BUILD_TIMEOUT_SECS),
cmd.output(),
)
.await;
let result = timeout(Duration::from_secs(BUILD_TIMEOUT_SECS), cmd.output()).await;

match result {
Ok(Ok(output)) => {
Expand Down Expand Up @@ -290,7 +320,11 @@ pub async fn build_project_tool(work_dir: &Path) -> ToolResult {
///
/// Launches `target/release/<binary>` in `work_dir` with the `PORT` environment
/// variable set to the given port.
async fn start_server(work_dir: &Path, port: u16, cpu_cores: Option<&str>) -> Result<tokio::process::Child, String> {
async fn start_server(
work_dir: &Path,
port: u16,
cpu_cores: Option<&str>,
) -> Result<tokio::process::Child, String> {
// Dynamically detect binary name from Cargo.toml
let binary_name = detect_binary_name(work_dir)?;
let binary = work_dir.join(format!("target/release/{}", binary_name));
Expand All @@ -310,13 +344,9 @@ async fn start_server(work_dir: &Path, port: u16, cpu_cores: Option<&str>) -> Re
)
})?;

let abs_work_dir = work_dir.canonicalize().map_err(|e| {
format!(
"Failed to resolve work_dir '{}': {}",
work_dir.display(),
e
)
})?;
let abs_work_dir = work_dir
.canonicalize()
.map_err(|e| format!("Failed to resolve work_dir '{}': {}", work_dir.display(), e))?;

// Use taskset to pin the server to specific CPU cores.
// This prevents the model's code from consuming all CPUs on the machine.
Expand Down Expand Up @@ -467,7 +497,6 @@ fn find_base_vectors(data_dir: &Path, work_dir: &Path) -> Result<PathBuf, String
Ok(merged_path)
}


/// Run the benchmark client against the vector database service.
///
/// Launches the benchmark binary with the given concurrency and warmup settings,
Expand Down Expand Up @@ -515,7 +544,9 @@ pub async fn run_benchmark(
};

// 4. Wait for server readiness
if let Err(e) = wait_for_server_ready(port, SERVER_READY_TIMEOUT_SECS, SERVER_POLL_INTERVAL_MS).await {
if let Err(e) =
wait_for_server_ready(port, SERVER_READY_TIMEOUT_SECS, SERVER_POLL_INTERVAL_MS).await
{
kill_server(&mut child).await;
return ToolResult::Error {
message: format!("Server not ready: {}", e),
Expand Down Expand Up @@ -617,6 +648,17 @@ pub async fn run_benchmark(
match serde_json::from_str::<BenchmarkOutput>(&stdout) {
Ok(output) => {
let mut bench = output.benchmark;
if !output.anti_cheat.passed {
eprintln!(
"[benchmark] Anti-cheat failed. Invalidating score (QPS=0). Detail: {}",
if output.anti_cheat.message.is_empty() {
"SUSPICIOUS benchmark output"
} else {
output.anti_cheat.message.as_str()
}
);
}
apply_anti_cheat_guard(&mut bench, &output.anti_cheat);
// Add comparison with previous run
if let Some(prev) = load_previous_benchmark(work_dir) {
bench.comparison = Some(build_comparison(&prev, &bench));
Expand All @@ -635,7 +677,6 @@ pub async fn run_benchmark(
}
}


/// Run performance profiling on the skeleton server process.
///
/// **Unlike `run_benchmark` and `run_correctness_test`, this function does NOT
Expand All @@ -646,7 +687,11 @@ pub async fn run_benchmark(
/// Manages the full server lifecycle: kill leftover processes on the port,
/// build the project, start the server, wait for readiness, run `perf record`,
/// generate flamegraph, extract top functions, and finally kill the server.
pub async fn run_profiling(work_dir: &Path, config: &BenchConfig, _duration: Option<u64>) -> ToolResult {
pub async fn run_profiling(
work_dir: &Path,
config: &BenchConfig,
_duration: Option<u64>,
) -> ToolResult {
let perf_data = work_dir.join("perf.data");
let flamegraph_svg = work_dir.join("flamegraph.svg");

Expand Down Expand Up @@ -675,7 +720,9 @@ pub async fn run_profiling(work_dir: &Path, config: &BenchConfig, _duration: Opt
};

// 4. Wait for server readiness
if let Err(e) = wait_for_server_ready(port, SERVER_READY_TIMEOUT_SECS, SERVER_POLL_INTERVAL_MS).await {
if let Err(e) =
wait_for_server_ready(port, SERVER_READY_TIMEOUT_SECS, SERVER_POLL_INTERVAL_MS).await
{
kill_server(&mut child).await;
return ToolResult::Error {
message: format!("Server not ready: {}", e),
Expand All @@ -698,7 +745,9 @@ pub async fn run_profiling(work_dir: &Path, config: &BenchConfig, _duration: Opt
Ok(p) => p,
Err(e) => {
kill_server(&mut child).await;
return ToolResult::Error { message: format!("Profiling needs real data: {}", e) };
return ToolResult::Error {
message: format!("Profiling needs real data: {}", e),
};
}
};
let query_vectors = config.data_dir.join("query_vectors.json");
Expand Down Expand Up @@ -730,10 +779,13 @@ pub async fn run_profiling(work_dir: &Path, config: &BenchConfig, _duration: Opt
let mut perf_child = match Command::new("perf")
.args([
"record",
"-F", "99",
"-p", &pid.to_string(),
"-F",
"99",
"-p",
&pid.to_string(),
"-g",
"-o", perf_data.to_str().unwrap_or("perf.data"),
"-o",
perf_data.to_str().unwrap_or("perf.data"),
])
.current_dir(work_dir)
.stdout(std::process::Stdio::piped())
Expand All @@ -753,13 +805,20 @@ pub async fn run_profiling(work_dir: &Path, config: &BenchConfig, _duration: Opt
let bench_result = timeout(
Duration::from_secs(BENCHMARK_TIMEOUT_SECS),
Command::new(benchmark_bin.to_str().unwrap_or_default())
.arg("--server-url").arg(format!("http://127.0.0.1:{}", port))
.arg("--concurrency").arg("4")
.arg("--warmup").arg("100")
.arg("--max-queries").arg("1000")
.arg("--base-vectors").arg(base_vectors.to_str().unwrap_or_default())
.arg("--query-vectors").arg(query_vectors.to_str().unwrap_or_default())
.arg("--ground-truth").arg(ground_truth.to_str().unwrap_or_default())
.arg("--server-url")
.arg(format!("http://127.0.0.1:{}", port))
.arg("--concurrency")
.arg("4")
.arg("--warmup")
.arg("100")
.arg("--max-queries")
.arg("1000")
.arg("--base-vectors")
.arg(base_vectors.to_str().unwrap_or_default())
.arg("--query-vectors")
.arg(query_vectors.to_str().unwrap_or_default())
.arg("--ground-truth")
.arg(ground_truth.to_str().unwrap_or_default())
.current_dir(work_dir)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
Expand All @@ -772,14 +831,18 @@ pub async fn run_profiling(work_dir: &Path, config: &BenchConfig, _duration: Opt
{
if let Some(perf_pid) = perf_child.id() {
// SIGINT (2) tells perf to flush and exit cleanly
unsafe { libc::kill(perf_pid as i32, libc::SIGINT); }
unsafe {
libc::kill(perf_pid as i32, libc::SIGINT);
}
}
}
// Wait for perf to finish writing
let perf_wait = timeout(Duration::from_secs(10), perf_child.wait()).await;
match perf_wait {
Ok(Ok(_)) => {} // perf exited
_ => { let _ = perf_child.kill().await; } // force kill if stuck
_ => {
let _ = perf_child.kill().await;
} // force kill if stuck
}

// Log benchmark result (informational, not the main output)
Expand All @@ -789,13 +852,17 @@ pub async fn run_profiling(work_dir: &Path, config: &BenchConfig, _duration: Opt
eprintln!("[profiling] Benchmark client completed successfully during profiling.");
} else {
let stderr = String::from_utf8_lossy(&output.stderr);
eprintln!("[profiling] Benchmark client exited with code {}: {}",
eprintln!(
"[profiling] Benchmark client exited with code {}: {}",
output.status.code().unwrap_or(-1),
&stderr[..stderr.len().min(500)]);
&stderr[..stderr.len().min(500)]
);
}
}
Ok(Err(e)) => eprintln!("[profiling] Benchmark client failed to execute: {}", e),
Err(_) => eprintln!("[profiling] Benchmark client timed out (profiling data should still be valid)."),
Err(_) => eprintln!(
"[profiling] Benchmark client timed out (profiling data should still be valid)."
),
}

// Check perf.data was produced
Expand Down Expand Up @@ -918,12 +985,15 @@ fn parse_perf_report(report: &str) -> Vec<FunctionProfile> {
}

// Sort by percentage descending, take top 10
functions.sort_by(|a, b| b.percentage.partial_cmp(&a.percentage).unwrap_or(std::cmp::Ordering::Equal));
functions.sort_by(|a, b| {
b.percentage
.partial_cmp(&a.percentage)
.unwrap_or(std::cmp::Ordering::Equal)
});
functions.truncate(10);
functions
}


/// Run a correctness test by executing the benchmark with a small query subset.
///
/// Runs the benchmark client in a lightweight mode and checks whether the recall
Expand Down Expand Up @@ -960,7 +1030,9 @@ pub async fn run_correctness_test(work_dir: &Path, config: &BenchConfig) -> Tool
};

// 4. Wait for server readiness
if let Err(e) = wait_for_server_ready(port, SERVER_READY_TIMEOUT_SECS, SERVER_POLL_INTERVAL_MS).await {
if let Err(e) =
wait_for_server_ready(port, SERVER_READY_TIMEOUT_SECS, SERVER_POLL_INTERVAL_MS).await
{
kill_server(&mut child).await;
return ToolResult::Error {
message: format!("Server not ready: {}", e),
Expand Down Expand Up @@ -1280,6 +1352,61 @@ mod tests {
assert!((output.benchmark.qps - 1500.5).abs() < f64::EPSILON);
assert_eq!(output.benchmark.total_queries, 10000);
assert!(output.benchmark.recall_passed);
assert!(output.anti_cheat.passed);
}

#[test]
fn test_apply_anti_cheat_guard_invalidates_score() {
    // A healthy-looking result...
    let mut bench = BenchmarkResult {
        qps: 1500.5,
        duration_secs: 6.66,
        total_queries: 10000,
        avg_latency_ms: 2.5,
        p50_latency_ms: 2.0,
        p95_latency_ms: 5.0,
        p99_latency_ms: 10.0,
        recall: 0.98,
        recall_threshold: 0.95,
        recall_passed: true,
        concurrency: 4,
        comparison: None,
    };
    // ...paired with a failing anti-cheat verdict...
    let verdict = AntiCheatOutput {
        passed: false,
        message: String::from("SUSPICIOUS"),
    };

    apply_anti_cheat_guard(&mut bench, &verdict);

    // ...must have its score wiped out.
    assert_eq!(bench.qps, 0.0);
    assert!(!bench.recall_passed);
}

#[test]
fn test_apply_anti_cheat_guard_keeps_clean_result() {
    // A healthy result with a passing verdict must survive the guard intact.
    let mut bench = BenchmarkResult {
        qps: 1500.5,
        duration_secs: 6.66,
        total_queries: 10000,
        avg_latency_ms: 2.5,
        p50_latency_ms: 2.0,
        p95_latency_ms: 5.0,
        p99_latency_ms: 10.0,
        recall: 0.98,
        recall_threshold: 0.95,
        recall_passed: true,
        concurrency: 4,
        comparison: None,
    };
    let verdict = AntiCheatOutput {
        passed: true,
        message: String::from("OK"),
    };

    apply_anti_cheat_guard(&mut bench, &verdict);

    assert_eq!(bench.qps, 1500.5);
    assert!(bench.recall_passed);
}

// ─── detect_binary_name tests ───────────────────────────────────────────
Expand Down Expand Up @@ -1550,10 +1677,7 @@ fn main() {
// ─── helper ──────────────────────────────────────────────────────────────

fn tempdir() -> std::path::PathBuf {
let dir = std::env::temp_dir().join(format!(
"bench_tools_test_{}",
uuid::Uuid::new_v4()
));
let dir = std::env::temp_dir().join(format!("bench_tools_test_{}", uuid::Uuid::new_v4()));
std::fs::create_dir_all(&dir).unwrap();
dir
}
Expand Down
Loading