Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions packages/ucache_bench/client/UcacheBenchClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <folly/io/async/ScopedEventBaseThread.h>
#include <folly/portability/GFlags.h>
#include <mcrouter/McrouterFiberContext.h>
#include <mcrouter/lib/carbon/Result.h>
#include <mcrouter/lib/network/CpuController.h>

DECLARE_string(config);
Expand Down Expand Up @@ -748,6 +749,11 @@ UcacheBenchClient::WarmupResults UcacheBenchClient::warmup() {
localSuccesses++;
} else {
localErrors++;
if (FLAGS_verbose) {
printf(
"Warmup SET error: %s\n",
carbon::resultToString(*result.result_ref()));
}
}
co_return;
};
Expand Down Expand Up @@ -1130,9 +1136,19 @@ UcacheBenchClient::BenchmarkResults UcacheBenchClient::runBenchmark() {
workerSetSuccesses[workerId]->fetch_add(1);
} else {
workerSetErrors[workerId]->fetch_add(1);
if (FLAGS_verbose) {
printf(
"Benchmark SET error (on GET miss): %s\n",
carbon::resultToString(*setResult.result_ref()));
}
}
} else {
workerGetErrors[workerId]->fetch_add(1);
if (FLAGS_verbose) {
printf(
"Benchmark GET error: %s\n",
carbon::resultToString(*result.result_ref()));
}
}

co_return;
Expand Down Expand Up @@ -1189,6 +1205,11 @@ UcacheBenchClient::BenchmarkResults UcacheBenchClient::runBenchmark() {
workerSetSuccesses[workerId]->fetch_add(1);
} else {
workerSetErrors[workerId]->fetch_add(1);
if (FLAGS_verbose) {
printf(
"Benchmark SET error: %s\n",
carbon::resultToString(*result.result_ref()));
}
}

co_return;
Expand Down
11 changes: 11 additions & 0 deletions packages/ucache_bench/install_ucache_bench.sh
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,17 @@ else
echo " WARNING: ucachebench_client binary not found"
fi

# Copy affinitize NIC scripts for IRQ affinity tuning
AFFINITIZE_SRC="$BENCHPRESS_ROOT/packages/common/affinitize"
AFFINITIZE_DST="$UCACHE_BENCH_DIR/affinitize"
if [ -d "$AFFINITIZE_SRC" ]; then
echo "Copying affinitize NIC scripts..."
mkdir -p "$AFFINITIZE_DST"
cp -r "$AFFINITIZE_SRC/"* "$AFFINITIZE_DST/"
chmod +x "$AFFINITIZE_DST/affinitize_nic.py" 2>/dev/null || true
echo " Installed: $AFFINITIZE_DST/"
fi

echo ""
echo "=============================================="
echo "UCacheBench Installation Complete!"
Expand Down
156 changes: 154 additions & 2 deletions packages/ucache_bench/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

BENCHPRESS_ROOT: pathlib.Path = pathlib.Path(os.path.abspath(__file__)).parents[2]
UCACHE_BENCH_DIR: str = os.path.join(BENCHPRESS_ROOT, "benchmarks", "ucache_bench")
AFFINITIZE_NIC_SYSTEM_PATH: str = "/usr/local/bin/affinitize_nic"

# Constants
MEM_USAGE_FACTOR = 0.75 # to prevent OOM
Expand Down Expand Up @@ -158,6 +159,78 @@ def profile_server() -> None:
)


def get_affinitize_nic_path() -> str:
    """Locate the affinitize_nic script.

    Resolution order:
      1. System-wide install (AFFINITIZE_NIC_SYSTEM_PATH).
      2. Internal fbpkg: shared utility in packages/common/affinitize/.
      3. OSS fallback: copy placed in the benchmark dir by the install script.
    """
    candidates = [
        AFFINITIZE_NIC_SYSTEM_PATH,
        os.path.join(
            BENCHPRESS_ROOT, "packages", "common", "affinitize", "affinitize_nic.py"
        ),
    ]
    for candidate in candidates:
        if os.path.exists(candidate):
            return candidate
    # OSS: copied by install script to benchmark dir
    return os.path.join(UCACHE_BENCH_DIR, "affinitize", "affinitize_nic.py")


def affinitize_nic(args: argparse.Namespace) -> None:
    """Configure NIC IRQ affinity to distribute interrupts across CPUs.

    This prevents IRQ processing from bottlenecking on a few cores.
    Ported from TaoBench's NIC affinity tuning.

    Steps:
    1. Set the number of NIC combined channels using ethtool
    2. Redistribute IRQ affinity across CPUs using affinitize_nic.py

    Both steps are best-effort: failures are printed and ignored, since the
    server can still run (just less efficiently) with default IRQ affinity.
    """
    n_cores = len(os.sched_getaffinity(0))
    n_channels = int(n_cores * args.nic_channel_ratio)

    if n_channels <= 0:
        print(
            f"Skipping NIC affinity: n_channels={n_channels} "
            f"(cores={n_cores}, ratio={args.nic_channel_ratio})"
        )
        return

    # Step 1: Set NIC channel count
    try:
        cmd = ["ethtool", "-L", args.interface_name, "combined", str(n_channels)]
        print(f"Setting NIC channels: {' '.join(cmd)}")
        # text=True decodes stdout/stderr so we can surface them on failure.
        subprocess.run(cmd, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as e:
        # str(e) alone only reports the command and exit code; include the
        # captured stderr so the actual ethtool error is visible.
        print(f"Failed to set NIC channels to {n_channels}: {e}: {e.stderr}")
    except (FileNotFoundError, OSError) as e:
        print(f"Failed to set NIC channels to {n_channels}: {e}")

    # Step 2: Set IRQ affinity
    try:
        cmd = [
            get_affinitize_nic_path(),
            "-f",
            "-a",
            "--xps",
        ]
        if args.hard_binding:
            # Pin IRQs to the first n_channels CPUs explicitly.
            cmd += [
                "--cpu",
                " ".join(str(x) for x in range(n_channels)),
            ]
        else:
            # Spread IRQs across all NUMA nodes, capped at n_channels CPUs.
            cmd += [
                "-A",
                "all-nodes",
                "--max-cpus",
                str(n_channels),
            ]
        print(f"Setting NIC IRQ affinity: {' '.join(cmd)}")
        subprocess.run(cmd, check=True, capture_output=True, text=True)
    except subprocess.CalledProcessError as e:
        # Include captured stderr for the same reason as above.
        print(f"Failed to set NIC IRQ affinity: {e}: {e.stderr}")
    except (FileNotFoundError, OSError) as e:
        print(f"Failed to set NIC IRQ affinity: {e}")


def run_server(args: argparse.Namespace) -> None:
"""Run the UcacheBench server.

Expand All @@ -177,6 +250,10 @@ def run_server(args: argparse.Namespace) -> None:
hash_power = calculate_hash_power(memory_mb)
print(f"Auto-calculated hash_power={hash_power} for {memory_mb}MB memory")

# Configure NIC IRQ affinity before starting server
if args.interface_name != "lo" and args.nic_channel_ratio > 0:
affinitize_nic(args)

print(
f"Starting UcacheBench server with {args.memory_mb}MB memory on port {args.port}"
)
Expand Down Expand Up @@ -235,6 +312,10 @@ def run_server(args: argparse.Namespace) -> None:
server_cmd.append(
f"--rpc_num_cpu_worker_threads={args.rpc_num_cpu_worker_threads}"
)
if args.rpc_socket_max_reads_per_event != 1:
server_cmd.append(
f"--rpc_socket_max_reads_per_event={args.rpc_socket_max_reads_per_event}"
)

# CPU pinning configuration
if args.cpu_pinning_enabled:
Expand Down Expand Up @@ -269,6 +350,18 @@ def run_server(args: argparse.Namespace) -> None:
if args.verbose:
server_cmd.append("--verbose=true")

# Fiber configuration
if args.enable_fibers:
server_cmd.append("--enable_fibers=true")
if args.fiber_stack_size != 65536:
server_cmd.append(f"--fiber_stack_size={args.fiber_stack_size}")
if args.fiber_max_pool_size != 1000:
server_cmd.append(f"--fiber_max_pool_size={args.fiber_max_pool_size}")
if args.fiber_pool_resize_period_ms != 1000:
server_cmd.append(
f"--fiber_pool_resize_period_ms={args.fiber_pool_resize_period_ms}"
)

if "DCPERF_PERF_RECORD" in os.environ and os.environ["DCPERF_PERF_RECORD"] == "1":
delay = args.perf_record_delay
print(f"DCPERF_PERF_RECORD=1, will start perf record after {delay}s delay")
Expand Down Expand Up @@ -461,6 +554,12 @@ def init_parser() -> argparse.ArgumentParser:
default=1,
help="Number of CPU worker threads for ThriftServer",
)
server_parser.add_argument(
"--rpc-socket-max-reads-per-event",
type=int,
default=1,
help="Max reads per socket per event loop iteration (production uses 1, ThriftServer default is 16)",
)

# CPU pinning configuration
server_parser.add_argument(
Expand Down Expand Up @@ -500,6 +599,27 @@ def init_parser() -> argparse.ArgumentParser:
help="Reduce IO thread count to match non-IRQ CPU count (set to non-zero to enable)",
)

# NIC IRQ affinity configuration (ported from TaoBench)
server_parser.add_argument(
"--nic-channel-ratio",
type=float,
default=0.0,
help="Ratio of NIC channels to logical cores (0.0 = disabled, 0.5 = TaoBench default). "
"Sets ethtool combined channels and redistributes IRQ affinity.",
)
server_parser.add_argument(
"--interface-name",
type=str,
default="eth0",
help="Network interface name for NIC IRQ affinity tuning",
)
server_parser.add_argument(
"--hard-binding",
type=int,
default=0,
help="Hard bind NIC channels to specific CPU cores (set to non-zero to enable)",
)

# Navy (hybrid mode) configuration
server_parser.add_argument(
"--navy-cache-path",
Expand Down Expand Up @@ -564,6 +684,32 @@ def init_parser() -> argparse.ArgumentParser:
help="Timeout in seconds for waiting for clients (0 = no timeout)",
)

# Fiber configuration
server_parser.add_argument(
"--enable-fibers",
type=int,
default=0,
help="Enable fiber-based request processing (set to non-zero to enable)",
)
server_parser.add_argument(
"--fiber-stack-size",
type=int,
default=65536,
help="Stack size for IO thread fibers in bytes",
)
server_parser.add_argument(
"--fiber-max-pool-size",
type=int,
default=1000,
help="Maximum number of preallocated free fibers to keep around",
)
server_parser.add_argument(
"--fiber-pool-resize-period-ms",
type=int,
default=1000,
help="Period in ms for resizing the fiber pool (0 = disabled)",
)

# Profiling configuration
server_parser.add_argument(
"--perf-record-delay",
Expand All @@ -574,7 +720,10 @@ def init_parser() -> argparse.ArgumentParser:
)

server_parser.add_argument(
"--verbose", action="store_true", help="Enable verbose logging"
"--verbose",
type=int,
default=0,
help="Enable verbose logging (set to non-zero to enable)",
)
server_parser.add_argument(
"--real", action="store_true", help="Actually run the command"
Expand Down Expand Up @@ -731,7 +880,10 @@ def init_parser() -> argparse.ArgumentParser:
)

client_parser.add_argument(
"--verbose", action="store_true", help="Enable verbose logging"
"--verbose",
type=int,
default=0,
help="Enable verbose logging (set to non-zero to enable)",
)
client_parser.add_argument(
"--real", action="store_true", help="Actually run the command"
Expand Down
15 changes: 12 additions & 3 deletions packages/ucache_bench/server/UcacheBenchRpcServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ DEFINE_uint32(
1,
"Number of CPU worker threads for ThriftServer. "
"Production ucache uses 1. These handle CPU-bound work separate from IO");
DEFINE_uint32(
rpc_socket_max_reads_per_event,
1,
"Max reads per socket per event loop iteration. "
"Production ucache uses 1. ThriftServer default is 16. "
"Higher values let a single connection deliver more requests per epoll wakeup");

// CPU pinning configuration flags
DEFINE_bool(
Expand Down Expand Up @@ -163,7 +169,8 @@ apache::thrift::ThriftServer& UcacheBenchRpcServer::addThriftServer() {
// Prevent single connection from monopolizing an IO thread's event loop.
// Without this, a few hot connections can starve others, limiting
// multi-client scalability.
thriftServer_->setSocketMaxReadsPerEvent(1);
thriftServer_->setSocketMaxReadsPerEvent(
FLAGS_rpc_socket_max_reads_per_event);

// Disable timeouts — let clients control timing, same as production ucache.
thriftServer_->setQueueTimeout(std::chrono::milliseconds(0));
Expand All @@ -176,8 +183,10 @@ apache::thrift::ThriftServer& UcacheBenchRpcServer::addThriftServer() {
thriftServer_->disableActiveRequestsTracking();

XLOG(INFO) << "ThriftServer configured with "
<< FLAGS_rpc_num_cpu_worker_threads << " CPU worker threads and "
<< numAcceptorThreads << " acceptor threads";
<< FLAGS_rpc_num_cpu_worker_threads << " CPU worker threads, "
<< numAcceptorThreads << " acceptor threads, "
<< "socketMaxReadsPerEvent="
<< FLAGS_rpc_socket_max_reads_per_event;

return *thriftServer_;
}
Expand Down
5 changes: 5 additions & 0 deletions packages/ucache_bench/server/UcacheBenchServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ void UcacheBenchServer::setupCacheLib() {
cacheConfig.setAccessConfig(
{config_.hash_power, config_.hashtable_lock_power});

// Configure number of CacheLib shards if specified
if (config_.cachelib_num_shards > 0) {
cacheConfig.setNumShards(config_.cachelib_num_shards);
}

// Generate alloc sizes (factor 1.25, min allocation size)
// This provides a good distribution of allocation classes for cache items
// Max alloc size increased to 64KB to support production traffic distribution
Expand Down
Loading