Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "third_party/palloc"]
path = third_party/palloc
url = https://github.com/AutoCookies/palloc.git
13 changes: 9 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON)

include_directories(include third_party)

set(PA_OVERRIDE ON)
add_subdirectory(third_party/palloc)

add_library(pomai_cache_core
src/engine/engine.cpp
src/engine/ssd_store.cpp
src/policy/policies.cpp
src/server/resp.cpp
src/server/http.cpp
src/server/ai_cache.cpp
src/metrics/info_metrics.cpp
src/util/time.cpp
Expand All @@ -22,6 +25,8 @@ add_library(pomai_cache_core
src/ds/compression.cpp
)

target_link_libraries(pomai_cache_core PUBLIC palloc)

if(NOT WIN32)
add_executable(pomai_cache_server src/server/server_main.cpp)
target_link_libraries(pomai_cache_server PRIVATE pomai_cache_core uring)
Expand All @@ -48,14 +53,14 @@ if(BUILD_TESTING)
add_executable(test_engine tests/test_engine.cpp)
target_link_libraries(test_engine PRIVATE pomai_cache_core mini_catch_main)

add_executable(test_resp tests/test_resp.cpp)
target_link_libraries(test_resp PRIVATE pomai_cache_core mini_catch_main)
add_executable(test_http tests/test_http.cpp)
target_link_libraries(test_http PRIVATE pomai_cache_core mini_catch_main)

add_executable(test_ai_cache tests/test_ai_cache.cpp)
target_link_libraries(test_ai_cache PRIVATE pomai_cache_core mini_catch_main)

add_test(NAME test_engine COMMAND test_engine)
add_test(NAME test_resp COMMAND test_resp)
add_test(NAME test_http COMMAND test_http)
add_test(NAME test_ai_cache COMMAND test_ai_cache)

if(NOT WIN32)
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Pomai Cache v1

<img src="./assets/logo.png"/>

Redis-compatible (subset) local cache core with RAM+SSD tiering, bounded TTL cleanup, crash-safe append-only SSD segments, selectable eviction policy (`lru`, `lfu`, `pomai_cost`), and an AI artifact cache layer for embeddings/prompts/RAG/rerank/response reuse.

## Repo structure
Expand Down
79 changes: 39 additions & 40 deletions bench/pomai_cache_netbench.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#include "pomai_cache/resp.hpp"

#include <algorithm>
#include <arpa/inet.h>
#include <atomic>
Expand Down Expand Up @@ -47,44 +45,40 @@ struct SharedStats {
};

std::string make_cmd(const std::vector<std::string> &args) {
std::string out = "*" + std::to_string(args.size()) + "\r\n";
for (const auto &a : args) {
out += "$" + std::to_string(a.size()) + "\r\n" + a + "\r\n";
if (args.empty()) return "";
std::string cmd = args[0];
if (cmd == "GET") {
return "GET /key/" + args[1] + " HTTP/1.1\r\n\r\n";
} else if (cmd == "SET") {
std::string req = "POST /key/" + args[1];
if (args.size() > 3 && args[3] == "PX") {
req += "?px=" + args[4];
}
req += " HTTP/1.1\r\nContent-Length: " + std::to_string(args[2].size()) + "\r\n\r\n" + args[2];
return req;
} else if (cmd == "INFO") {
return "GET /info HTTP/1.1\r\n\r\n";
}
return out;
return "";
}

std::optional<std::string> read_reply(int fd) {
std::string out;
char c = 0;
char buf[4096];
while (true) {
ssize_t r = recv(fd, &c, 1, 0);
if (r <= 0)
return std::nullopt;
out.push_back(c);
if (out.size() >= 2 && out[out.size() - 2] == '\r' &&
out[out.size() - 1] == '\n') {
if (out[0] == '+' || out[0] == '-' || out[0] == ':')
return out;
if (out[0] == '$') {
int len = std::stoi(out.substr(1, out.size() - 3));
if (len < 0)
int r = recv(fd, buf, 4096, 0);
if (r <= 0) return std::nullopt;
out.append(buf, r);
if (out.find("\r\n\r\n") != std::string::npos) {
auto pos = out.find("Content-Length: ");
if (pos != std::string::npos) {
auto end = out.find("\r\n", pos);
int len = std::stoi(out.substr(pos + 16, end - pos - 16));
auto header_end = out.find("\r\n\r\n") + 4;
if (out.size() >= header_end + len) {
return out;
std::string payload(len + 2, '\0');
ssize_t got = recv(fd, payload.data(), payload.size(), MSG_WAITALL);
if (got <= 0)
return std::nullopt;
out += payload;
return out;
}
if (out[0] == '*') {
int n = std::stoi(out.substr(1, out.size() - 3));
for (int i = 0; i < n; ++i) {
auto child = read_reply(fd);
if (!child)
return std::nullopt;
out += *child;
}
} else {
return out;
}
}
Expand Down Expand Up @@ -249,7 +243,7 @@ int main(int argc, char **argv) {
++shared.ops;
if (expect_get[i]) {
++shared.get_ops;
if (rep->rfind("$-1", 0) != 0)
if (rep->find("200 OK") != std::string::npos)
++shared.get_hits;
} else {
++shared.set_ops;
Expand All @@ -272,13 +266,18 @@ int main(int argc, char **argv) {
auto cmd = make_cmd({"INFO"});
send(infofd, cmd.data(), cmd.size(), 0);
auto rep = read_reply(infofd);
if (rep && rep->size() > 0 && (*rep)[0] == '$') {
auto crlf = rep->find("\r\n");
int len = std::stoi(rep->substr(1, crlf - 1));
std::string body = rep->substr(crlf + 2, len);
parse_info(body, mem, evictions, admissions, ram_hits, ssd_hits,
ssd_read_mb, ssd_write_mb, ssd_bytes, fragmentation,
index_rebuild_ms);
if (rep) {
auto header_end = rep->find("\r\n\r\n");
if (header_end != std::string::npos) {
std::string body = rep->substr(header_end + 4);
parse_info(body, mem, evictions, admissions, ram_hits, ssd_hits,
ssd_read_mb, ssd_write_mb, ssd_bytes, fragmentation,
index_rebuild_ms);
} else {
parse_info(*rep, mem, evictions, admissions, ram_hits, ssd_hits,
ssd_read_mb, ssd_write_mb, ssd_bytes, fragmentation,
index_rebuild_ms);
}
}
close(infofd);
}
Expand Down
45 changes: 45 additions & 0 deletions include/pomai_cache/http.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#pragma once

#include <string_view>
#include <string>
#include <vector>
#include <optional>
#include <unordered_map>

namespace pomai_cache {

struct HttpRequest {
std::string method;
std::string path;
std::unordered_map<std::string, std::string> query_params;
std::unordered_map<std::string, std::string> headers;
std::string body;
};

class HttpParser {
public:
enum class State {
REQUEST_LINE,
HEADERS,
BODY,
COMPLETE,
ERROR
};

void feed(std::string_view data);
std::optional<HttpRequest> next_request();

private:
std::string buffer_;
std::string_view view_;
State state_{State::REQUEST_LINE};
HttpRequest current_req_;
int expected_body_len_{0};

bool parse_request_line();
bool parse_headers();
};

std::string http_response(int status_code, const std::string& status_text, const std::string& body, const std::string& content_type = "text/plain");

} // namespace pomai_cache
1 change: 0 additions & 1 deletion include/pomai_cache/replica.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
#include <vector>
#include <iostream>
#include "pomai_cache/engine_shard.hpp"
#include "pomai_cache/resp.hpp"

/**
* Inspired by DragonflyDB's `dfly::Replica` (src/server/replica.h).
Expand Down
43 changes: 0 additions & 43 deletions include/pomai_cache/resp.hpp

This file was deleted.

19 changes: 19 additions & 0 deletions netbench_summary.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"workload": "writeheavy",
"ops_per_sec": 560.8,
"p50_us": 1451.08,
"p95_us": 3815.46,
"p99_us": 11651.1,
"p999_us": 14622.2,
"hit_rate": 0.718412,
"ram_hits": 0,
"ssd_hits": 0,
"ssd_bytes": 0,
"ssd_read_mb": 0,
"ssd_write_mb": 0,
"ssd_index_rebuild_ms": 0,
"fragmentation_estimate": 0,
"memory_used_bytes": 0,
"evictions_per_sec": 0,
"admissions_rejected_per_sec": 0
}
Binary file added netbench_test
Binary file not shown.
33 changes: 33 additions & 0 deletions netbench_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <cstring>

int main() {
int fd = socket(AF_INET, SOCK_STREAM, 0);
sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_port = htons(6379);
inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);

if (connect(fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
std::cerr << "Connect failed\n";
return 1;
}

const char* req = "GET /info HTTP/1.1\r\n\r\n";
send(fd, req, strlen(req), 0);

char buf[4096];
int r = recv(fd, buf, 4096, 0);
if (r > 0) {
std::cout << "Received " << r << " bytes: " << std::string(buf, r) << "\n";
} else {
std::cout << "Receive failed or 0 bytes\n";
}

close(fd);
return 0;
}
Loading
Loading