Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 21 additions & 21 deletions packages/ucache_bench/client/UcacheBenchClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -716,10 +716,10 @@ UcacheBenchClient::WarmupResults UcacheBenchClient::warmup() {
std::string value = generateValue();

UcbSetRequest request;
request.key_ref() =
request.key() =
carbon::Keys<folly::IOBuf>(std::move(*folly::IOBuf::copyBuffer(key)));
request.value_ref() = *folly::IOBuf::copyBuffer(value);
request.exptime_ref() = 3600;
request.value() = *folly::IOBuf::copyBuffer(value);
request.exptime() = 3600;

if (FLAGS_enable_random_source_ip) {
uint8_t randomOctet = folly::Random::rand32(1, 255);
Expand All @@ -744,7 +744,7 @@ UcacheBenchClient::WarmupResults UcacheBenchClient::warmup() {
UcbSetReply result = co_await std::move(future);

localOps++;
if (*result.result_ref() == carbon::Result::STORED) {
if (*result.result() == carbon::Result::STORED) {
localSuccesses++;
} else {
localErrors++;
Expand Down Expand Up @@ -1050,7 +1050,7 @@ UcacheBenchClient::BenchmarkResults UcacheBenchClient::runBenchmark() {
std::string key = generateKey();

UcbGetRequest request;
request.key_ref() =
request.key() =
carbon::Keys<folly::IOBuf>(std::move(*folly::IOBuf::copyBuffer(key)));

if (FLAGS_enable_random_source_ip) {
Expand Down Expand Up @@ -1088,20 +1088,20 @@ UcacheBenchClient::BenchmarkResults UcacheBenchClient::runBenchmark() {
workerTotalOps[workerId]->fetch_add(1);
workerGetOps[workerId]->fetch_add(1);

if (*result.result_ref() == carbon::Result::FOUND) {
if (*result.result() == carbon::Result::FOUND) {
workerGetHits[workerId]->fetch_add(1);
} else if (*result.result_ref() == carbon::Result::NOTFOUND) {
} else if (*result.result() == carbon::Result::NOTFOUND) {
workerGetMisses[workerId]->fetch_add(1);

// SET on GET miss to simulate real cache warming behavior
// This matches the behavior from the old synchronous version
std::string value = generateValue();

UcbSetRequest setRequest;
setRequest.key_ref() = carbon::Keys<folly::IOBuf>(
setRequest.key() = carbon::Keys<folly::IOBuf>(
std::move(*folly::IOBuf::copyBuffer(key)));
setRequest.value_ref() = *folly::IOBuf::copyBuffer(value);
setRequest.exptime_ref() = 3600;
setRequest.value() = *folly::IOBuf::copyBuffer(value);
setRequest.exptime() = 3600;

if (FLAGS_enable_random_source_ip) {
uint8_t randomOctet = folly::Random::rand32(1, 255);
Expand All @@ -1126,7 +1126,7 @@ UcacheBenchClient::BenchmarkResults UcacheBenchClient::runBenchmark() {
UcbSetReply setResult = co_await std::move(setFuture);

workerSetOps[workerId]->fetch_add(1);
if (*setResult.result_ref() == carbon::Result::STORED) {
if (*setResult.result() == carbon::Result::STORED) {
workerSetSuccesses[workerId]->fetch_add(1);
} else {
workerSetErrors[workerId]->fetch_add(1);
Expand All @@ -1145,10 +1145,10 @@ UcacheBenchClient::BenchmarkResults UcacheBenchClient::runBenchmark() {
std::string value = generateValue();

UcbSetRequest request;
request.key_ref() =
request.key() =
carbon::Keys<folly::IOBuf>(std::move(*folly::IOBuf::copyBuffer(key)));
request.value_ref() = *folly::IOBuf::copyBuffer(value);
request.exptime_ref() = 3600;
request.value() = *folly::IOBuf::copyBuffer(value);
request.exptime() = 3600;

if (FLAGS_enable_random_source_ip) {
uint8_t randomOctet = folly::Random::rand32(1, 255);
Expand Down Expand Up @@ -1185,7 +1185,7 @@ UcacheBenchClient::BenchmarkResults UcacheBenchClient::runBenchmark() {
workerTotalOps[workerId]->fetch_add(1);
workerSetOps[workerId]->fetch_add(1);

if (*result.result_ref() == carbon::Result::STORED) {
if (*result.result() == carbon::Result::STORED) {
workerSetSuccesses[workerId]->fetch_add(1);
} else {
workerSetErrors[workerId]->fetch_add(1);
Expand Down Expand Up @@ -1430,7 +1430,7 @@ void UcacheBenchClient::sendUcbGetRequestSync(
const std::string& key,
const std::function<void(UcbGetReply&&)>& callback) {
UcbGetRequest request;
request.key_ref() =
request.key() =
carbon::Keys<folly::IOBuf>(std::move(*folly::IOBuf::copyBuffer(key)));

// Set random source IP if enabled for connection fanout
Expand Down Expand Up @@ -1463,7 +1463,7 @@ void UcacheBenchClient::sendUcbGetRequestSync(

if (!success) {
// Failed to send - populate error message (result defaults to UNKNOWN)
result.message_ref() = "Failed to send GET request to mcrouter";
result.message() = "Failed to send GET request to mcrouter";
callback(std::move(result));
return;
}
Expand All @@ -1480,10 +1480,10 @@ void UcacheBenchClient::sendUcbSetRequestSync(
const std::string& value,
const std::function<void(UcbSetReply&&)>& callback) {
UcbSetRequest request;
request.key_ref() =
request.key() =
carbon::Keys<folly::IOBuf>(std::move(*folly::IOBuf::copyBuffer(key)));
request.value_ref() = *folly::IOBuf::copyBuffer(value);
request.exptime_ref() = 3600; // 1 hour default TTL
request.value() = *folly::IOBuf::copyBuffer(value);
request.exptime() = 3600; // 1 hour default TTL

// Set random source IP if enabled for connection fanout
if (FLAGS_enable_random_source_ip) {
Expand Down Expand Up @@ -1514,7 +1514,7 @@ void UcacheBenchClient::sendUcbSetRequestSync(

if (!success) {
// Failed to send - populate error message (result defaults to UNKNOWN)
result.message_ref() = "Failed to send SET request to mcrouter";
result.message() = "Failed to send SET request to mcrouter";
callback(std::move(result));
return;
}
Expand Down
16 changes: 8 additions & 8 deletions packages/ucache_bench/server/UcacheBenchOnRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ void UcacheBenchOnRequest::onRequestThrift(
cb->result(std::move(reply));
} catch (const std::exception& ex) {
UcbGetReply reply;
reply.result_ref() = carbon::Result::REMOTE_ERROR;
reply.message_ref() = ex.what();
reply.result() = carbon::Result::REMOTE_ERROR;
reply.message() = ex.what();
cb->result(std::move(reply));
}
});
Expand All @@ -44,8 +44,8 @@ void UcacheBenchOnRequest::onRequestThrift(
cb->result(std::move(reply));
} catch (const std::exception& ex) {
UcbSetReply reply;
reply.result_ref() = carbon::Result::REMOTE_ERROR;
reply.message_ref() = ex.what();
reply.result() = carbon::Result::REMOTE_ERROR;
reply.message() = ex.what();
cb->result(std::move(reply));
}
});
Expand All @@ -61,8 +61,8 @@ void UcacheBenchOnRequest::onRequestThrift(
cb->result(std::move(reply));
} catch (const std::exception& ex) {
UcbDeleteReply reply;
reply.result_ref() = carbon::Result::REMOTE_ERROR;
reply.message_ref() = ex.what();
reply.result() = carbon::Result::REMOTE_ERROR;
reply.message() = ex.what();
cb->result(std::move(reply));
}
});
Expand All @@ -75,8 +75,8 @@ void UcacheBenchOnRequest::onRequestThrift(
ucacheBenchOnRequestCommon(
std::move(callback), std::move(request), [](auto&& cb, auto&& /* req */) {
facebook::memcache::McVersionReply reply;
reply.result_ref() = carbon::Result::FOUND;
reply.value_ref() =
reply.result() = carbon::Result::FOUND;
reply.value() =
*folly::IOBuf::copyBuffer("UcacheBench 1.0 (with Fiber support)");
cb->result(std::move(reply));
});
Expand Down
51 changes: 24 additions & 27 deletions packages/ucache_bench/server/UcacheBenchServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,24 +238,23 @@ void UcacheBenchServer::setupCacheLib() {
folly::SemiFuture<UcbGetReply> UcacheBenchServer::processUcbGet(
const UcbGetRequest& req) {
UcbGetReply reply;
reply.result_ref() = carbon::Result::NOTFOUND;
reply.result() = carbon::Result::NOTFOUND;

try {
// Extract key from Carbon Keys type - convert to string
std::string keyStr = req.key_ref()->fullKey().str();
std::string keyStr = req.key()->fullKey().str();

auto item = cache_->find(keyStr);
if (item) {
// Cache hit
reply.result_ref() = carbon::Result::FOUND;
reply.result() = carbon::Result::FOUND;

// Set the value as IOBuf
auto valueView = item->getMemory();
reply.value_ref() = *folly::IOBuf::copyBuffer(
reply.value() = *folly::IOBuf::copyBuffer(
reinterpret_cast<const char*>(valueView), item->getSize());

reply.flags_ref() =
req.flags_ref().has_value() ? req.flags_ref().value() : 0;
reply.flags() = req.flags().has_value() ? req.flags().value() : 0;

recordGet(true /* hit */);

Expand All @@ -264,7 +263,7 @@ folly::SemiFuture<UcbGetReply> UcacheBenchServer::processUcbGet(
}
} else {
// Cache miss
reply.result_ref() = carbon::Result::NOTFOUND;
reply.result() = carbon::Result::NOTFOUND;
recordGet(false /* miss */);

if (config_.verbose) {
Expand All @@ -273,8 +272,8 @@ folly::SemiFuture<UcbGetReply> UcacheBenchServer::processUcbGet(
}
} catch (const std::exception& ex) {
printf("Error processing get request: %s\n", ex.what());
reply.result_ref() = carbon::Result::REMOTE_ERROR;
reply.message_ref() = ex.what();
reply.result() = carbon::Result::REMOTE_ERROR;
reply.message() = ex.what();
}

return folly::makeSemiFuture(std::move(reply));
Expand All @@ -283,14 +282,14 @@ folly::SemiFuture<UcbGetReply> UcacheBenchServer::processUcbGet(
folly::SemiFuture<UcbSetReply> UcacheBenchServer::processUcbSet(
const UcbSetRequest& req) {
UcbSetReply reply;
reply.result_ref() = carbon::Result::NOTSTORED;
reply.result() = carbon::Result::NOTSTORED;

try {
// Extract key from Carbon Keys type - convert to string
std::string keyStr = req.key_ref()->fullKey().str();
std::string keyStr = req.key()->fullKey().str();

// Extract value from IOBuf (need to work with the const IOBuf)
const auto& valueIoBuf = req.value_ref();
const auto& valueIoBuf = req.value();
auto valueStr = valueIoBuf->to<std::string>();

// Create item
Expand All @@ -303,9 +302,8 @@ folly::SemiFuture<UcbSetReply> UcacheBenchServer::processUcbSet(
// It returns the old item handle (if replaced) or null (if new insertion)
cache_->insertOrReplace(item);

reply.result_ref() = carbon::Result::STORED;
reply.flags_ref() =
req.flags_ref().has_value() ? req.flags_ref().value() : 0;
reply.result() = carbon::Result::STORED;
reply.flags() = req.flags().has_value() ? req.flags().value() : 0;

recordSet();

Expand All @@ -320,13 +318,13 @@ folly::SemiFuture<UcbSetReply> UcacheBenchServer::processUcbSet(
valueStr.size(),
static_cast<uint32_t>(poolId_));
}
reply.result_ref() = carbon::Result::NOTSTORED;
reply.message_ref() = "allocate failed";
reply.result() = carbon::Result::NOTSTORED;
reply.message() = "allocate failed";
}
} catch (const std::exception& ex) {
printf("Error processing set request: %s\n", ex.what());
reply.result_ref() = carbon::Result::REMOTE_ERROR;
reply.message_ref() = ex.what();
reply.result() = carbon::Result::REMOTE_ERROR;
reply.message() = ex.what();
}

return folly::makeSemiFuture(std::move(reply));
Expand All @@ -335,34 +333,33 @@ folly::SemiFuture<UcbSetReply> UcacheBenchServer::processUcbSet(
folly::SemiFuture<UcbDeleteReply> UcacheBenchServer::processUcbDelete(
const UcbDeleteRequest& req) {
UcbDeleteReply reply;
reply.result_ref() = carbon::Result::NOTFOUND;
reply.result() = carbon::Result::NOTFOUND;

try {
// Extract key from Carbon Keys type - convert to string
std::string keyStr = req.key_ref()->fullKey().str();
std::string keyStr = req.key()->fullKey().str();

// Try to remove from cache
auto removeResult = cache_->remove(keyStr);
if (removeResult == CacheAllocator::RemoveRes::kSuccess) {
reply.result_ref() = carbon::Result::DELETED;
reply.flags_ref() =
req.flags_ref().has_value() ? req.flags_ref().value() : 0;
reply.result() = carbon::Result::DELETED;
reply.flags() = req.flags().has_value() ? req.flags().value() : 0;

recordDelete();

if (config_.verbose) {
printf("Deleted key: %s\n", keyStr.c_str());
}
} else {
reply.result_ref() = carbon::Result::NOTFOUND;
reply.result() = carbon::Result::NOTFOUND;
if (config_.verbose) {
printf("Key not found for deletion: %s\n", keyStr.c_str());
}
}
} catch (const std::exception& ex) {
printf("Error processing delete request: %s\n", ex.what());
reply.result_ref() = carbon::Result::REMOTE_ERROR;
reply.message_ref() = ex.what();
reply.result() = carbon::Result::REMOTE_ERROR;
reply.message() = ex.what();
}

return folly::makeSemiFuture(std::move(reply));
Expand Down
Loading