diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..889e4cf --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "third_party/palloc"] + path = third_party/palloc + url = https://github.com/AutoCookies/palloc.git diff --git a/CMakeLists.txt b/CMakeLists.txt index 40ac88b..242129d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -7,11 +7,14 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON) include_directories(include third_party) +set(PA_OVERRIDE ON) +add_subdirectory(third_party/palloc) + add_library(pomai_cache_core src/engine/engine.cpp src/engine/ssd_store.cpp src/policy/policies.cpp - src/server/resp.cpp + src/server/http.cpp src/server/ai_cache.cpp src/metrics/info_metrics.cpp src/util/time.cpp @@ -22,6 +25,8 @@ add_library(pomai_cache_core src/ds/compression.cpp ) +target_link_libraries(pomai_cache_core PUBLIC palloc) + if(NOT WIN32) add_executable(pomai_cache_server src/server/server_main.cpp) target_link_libraries(pomai_cache_server PRIVATE pomai_cache_core uring) @@ -48,14 +53,14 @@ if(BUILD_TESTING) add_executable(test_engine tests/test_engine.cpp) target_link_libraries(test_engine PRIVATE pomai_cache_core mini_catch_main) - add_executable(test_resp tests/test_resp.cpp) - target_link_libraries(test_resp PRIVATE pomai_cache_core mini_catch_main) + add_executable(test_http tests/test_http.cpp) + target_link_libraries(test_http PRIVATE pomai_cache_core mini_catch_main) add_executable(test_ai_cache tests/test_ai_cache.cpp) target_link_libraries(test_ai_cache PRIVATE pomai_cache_core mini_catch_main) add_test(NAME test_engine COMMAND test_engine) - add_test(NAME test_resp COMMAND test_resp) + add_test(NAME test_http COMMAND test_http) add_test(NAME test_ai_cache COMMAND test_ai_cache) if(NOT WIN32) diff --git a/README.md b/README.md index 9e46f4b..adc7071 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,7 @@ # Pomai Cache v1 + + Redis-compatible (subset) local cache core with RAM+SSD tiering, bounded TTL cleanup, crash-safe append-only SSD segments, selectable eviction policy (`lru`, `lfu`, `pomai_cost`), and an AI artifact cache layer for embeddings/prompts/RAG/rerank/response reuse. ## Repo structure diff --git a/bench/pomai_cache_netbench.cpp b/bench/pomai_cache_netbench.cpp index 2a51dbb..d53cfef 100644 --- a/bench/pomai_cache_netbench.cpp +++ b/bench/pomai_cache_netbench.cpp @@ -1,5 +1,3 @@ -#include "pomai_cache/resp.hpp" - #include #include #include @@ -47,44 +45,40 @@ struct SharedStats { }; std::string make_cmd(const std::vector &args) { - std::string out = "*" + std::to_string(args.size()) + "\r\n"; - for (const auto &a : args) { - out += "$" + std::to_string(a.size()) + "\r\n" + a + "\r\n"; + if (args.empty()) return ""; + std::string cmd = args[0]; + if (cmd == "GET") { + return "GET /key/" + args[1] + " HTTP/1.1\r\n\r\n"; + } else if (cmd == "SET") { + std::string req = "POST /key/" + args[1]; + if (args.size() > 3 && args[3] == "PX") { + req += "?px=" + args[4]; + } + req += " HTTP/1.1\r\nContent-Length: " + std::to_string(args[2].size()) + "\r\n\r\n" + args[2]; + return req; + } else if (cmd == "INFO") { + return "GET /info HTTP/1.1\r\n\r\n"; } - return out; + return ""; } std::optional read_reply(int fd) { std::string out; - char c = 0; + char buf[4096]; while (true) { - ssize_t r = recv(fd, &c, 1, 0); - if (r <= 0) - return std::nullopt; - out.push_back(c); - if (out.size() >= 2 && out[out.size() - 2] == '\r' && - out[out.size() - 1] == '\n') { - if (out[0] == '+' || out[0] == '-' || out[0] == ':') - return out; - if (out[0] == '$') { - int len = std::stoi(out.substr(1, out.size() - 3)); - if (len < 0) + int r = recv(fd, buf, 4096, 0); + if (r <= 0) return std::nullopt; + out.append(buf, r); + if (out.find("\r\n\r\n") != std::string::npos) { + auto pos = out.find("Content-Length: "); + if (pos != std::string::npos) { + auto end = out.find("\r\n", pos); + int len = std::stoi(out.substr(pos + 16, end - pos - 16)); + auto header_end = out.find("\r\n\r\n") + 4; + if (out.size() >= header_end + len) { return out; - std::string payload(len + 2, '\0'); - ssize_t got = recv(fd, payload.data(), payload.size(), MSG_WAITALL); - if (got <= 0) - return std::nullopt; - out += payload; - return out; - } - if (out[0] == '*') { - int n = std::stoi(out.substr(1, out.size() - 3)); - for (int i = 0; i < n; ++i) { - auto child = read_reply(fd); - if (!child) - return std::nullopt; - out += *child; } + } else { return out; } } @@ -249,7 +243,7 @@ int main(int argc, char **argv) { ++shared.ops; if (expect_get[i]) { ++shared.get_ops; - if (rep->rfind("$-1", 0) != 0) + if (rep->find("200 OK") != std::string::npos) ++shared.get_hits; } else { ++shared.set_ops; @@ -272,13 +266,18 @@ int main(int argc, char **argv) { auto cmd = make_cmd({"INFO"}); send(infofd, cmd.data(), cmd.size(), 0); auto rep = read_reply(infofd); - if (rep && rep->size() > 0 && (*rep)[0] == '$') { - auto crlf = rep->find("\r\n"); - int len = std::stoi(rep->substr(1, crlf - 1)); - std::string body = rep->substr(crlf + 2, len); - parse_info(body, mem, evictions, admissions, ram_hits, ssd_hits, - ssd_read_mb, ssd_write_mb, ssd_bytes, fragmentation, - index_rebuild_ms); + if (rep) { + auto header_end = rep->find("\r\n\r\n"); + if (header_end != std::string::npos) { + std::string body = rep->substr(header_end + 4); + parse_info(body, mem, evictions, admissions, ram_hits, ssd_hits, + ssd_read_mb, ssd_write_mb, ssd_bytes, fragmentation, + index_rebuild_ms); + } else { + parse_info(*rep, mem, evictions, admissions, ram_hits, ssd_hits, + ssd_read_mb, ssd_write_mb, ssd_bytes, fragmentation, + index_rebuild_ms); + } } close(infofd); } diff --git a/include/pomai_cache/http.hpp b/include/pomai_cache/http.hpp new file mode 100644 index 0000000..1aa805a --- /dev/null +++ b/include/pomai_cache/http.hpp @@ -0,0 +1,45 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace pomai_cache { + +struct HttpRequest { + std::string method; + std::string path; + std::unordered_map query_params; + std::unordered_map headers; + std::string body; +}; + +class HttpParser { +public: + enum class State { + REQUEST_LINE, + HEADERS, + BODY, + COMPLETE, + ERROR + }; + + void feed(std::string_view data); + std::optional next_request(); + +private: + std::string buffer_; + std::string_view view_; + State state_{State::REQUEST_LINE}; + HttpRequest current_req_; + int expected_body_len_{0}; + + bool parse_request_line(); + bool parse_headers(); +}; + +std::string http_response(int status_code, const std::string& status_text, const std::string& body, const std::string& content_type = "text/plain"); + +} // namespace pomai_cache diff --git a/include/pomai_cache/replica.hpp b/include/pomai_cache/replica.hpp index 7a8e026..70ee5b6 100644 --- a/include/pomai_cache/replica.hpp +++ b/include/pomai_cache/replica.hpp @@ -6,7 +6,6 @@ #include #include #include "pomai_cache/engine_shard.hpp" -#include "pomai_cache/resp.hpp" /** * Inspired by DragonflyDB's `dfly::Replica` (src/server/replica.h). diff --git a/include/pomai_cache/resp.hpp b/include/pomai_cache/resp.hpp deleted file mode 100644 index 8fb430c..0000000 --- a/include/pomai_cache/resp.hpp +++ /dev/null @@ -1,43 +0,0 @@ -#include -#include -#include -#include - -namespace pomai_cache { - -/** - * Optimized Zero-Copy RESP Parser. - * Inspired by DragonflyDB's `facade::RedisParser` (src/facade/redis_parser.h). - */ -class RespParser { -public: - enum class State { - IDLE, - ARRAY_LEN, - BULK_LEN, - BULK_DATA, - SIMPLE_STR, - ERROR_STR, - INTEGER - }; - - void feed(std::string_view data); - std::optional> next_command(); - -private: - std::string buffer_; - std::string_view view_; - State state_{State::IDLE}; - int argc_{0}; - int bulk_len_{0}; - std::vector current_cmd_; -}; - -std::string resp_simple(const std::string &s); -std::string resp_error(const std::string &s); -std::string resp_integer(long long v); -std::string resp_bulk(const std::string &s); -std::string resp_null(); -std::string resp_array(const std::vector &items); - -} // namespace pomai_cache diff --git a/netbench_summary.json b/netbench_summary.json new file mode 100644 index 0000000..3d7693b --- /dev/null +++ b/netbench_summary.json @@ -0,0 +1,19 @@ +{ + "workload": "writeheavy", + "ops_per_sec": 560.8, + "p50_us": 1451.08, + "p95_us": 3815.46, + "p99_us": 11651.1, + "p999_us": 14622.2, + "hit_rate": 0.718412, + "ram_hits": 0, + "ssd_hits": 0, + "ssd_bytes": 0, + "ssd_read_mb": 0, + "ssd_write_mb": 0, + "ssd_index_rebuild_ms": 0, + "fragmentation_estimate": 0, + "memory_used_bytes": 0, + "evictions_per_sec": 0, + "admissions_rejected_per_sec": 0 +} diff --git a/netbench_test b/netbench_test new file mode 100755 index 0000000..25e308f Binary files /dev/null and b/netbench_test differ diff --git a/netbench_test.cpp b/netbench_test.cpp new file mode 100644 index 0000000..ff2ed46 --- /dev/null +++ b/netbench_test.cpp @@ -0,0 +1,33 @@ +#include +#include +#include +#include +#include +#include + +int main() { + int fd = socket(AF_INET, SOCK_STREAM, 0); + sockaddr_in addr{}; + addr.sin_family = AF_INET; + addr.sin_port = htons(6379); + inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr); + + if (connect(fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) { + std::cerr << "Connect failed\n"; + return 1; + } + + const char* req = "GET /info HTTP/1.1\r\n\r\n"; + send(fd, req, strlen(req), 0); + + char buf[4096]; + int r = recv(fd, buf, 4096, 0); + if (r > 0) { + std::cout << "Received " << r << " bytes: " << std::string(buf, r) << "\n"; + } else { + std::cout << "Receive failed or 0 bytes\n"; + } + + close(fd); + return 0; +} diff --git a/src/server/http.cpp b/src/server/http.cpp new file mode 100644 index 0000000..b8119d0 --- /dev/null +++ b/src/server/http.cpp @@ -0,0 +1,141 @@ +#include "pomai_cache/http.hpp" +#include +#include + +namespace pomai_cache { + +void HttpParser::feed(std::string_view data) { + if (view_.empty()) { + buffer_.assign(data); + view_ = buffer_; + } else { + size_t offset = view_.data() - buffer_.data(); + buffer_.append(data); + view_ = std::string_view(buffer_).substr(offset); + } +} + +bool HttpParser::parse_request_line() { + auto pos = view_.find("\r\n"); + if (pos == std::string_view::npos) return false; + + auto line = view_.substr(0, pos); + view_.remove_prefix(pos + 2); + + auto sp1 = line.find(' '); + if (sp1 == std::string_view::npos) { state_ = State::ERROR; return false; } + current_req_.method = std::string(line.substr(0, sp1)); + + auto sp2 = line.find(' ', sp1 + 1); + if (sp2 == std::string_view::npos) { state_ = State::ERROR; return false; } + + auto full_path = line.substr(sp1 + 1, sp2 - sp1 - 1); + auto q_pos = full_path.find('?'); + if (q_pos != std::string_view::npos) { + current_req_.path = std::string(full_path.substr(0, q_pos)); + auto query_str = full_path.substr(q_pos + 1); + + // Parse query params (simple parsing) + size_t start = 0; + while (start < query_str.size()) { + auto amp = query_str.find('&', start); + auto pair = query_str.substr(start, amp == std::string_view::npos ? std::string_view::npos : amp - start); + auto eq = pair.find('='); + if (eq != std::string_view::npos) { + current_req_.query_params[std::string(pair.substr(0, eq))] = std::string(pair.substr(eq + 1)); + } else { + current_req_.query_params[std::string(pair)] = ""; + } + if (amp == std::string_view::npos) break; + start = amp + 1; + } + } else { + current_req_.path = std::string(full_path); + } + + state_ = State::HEADERS; + return true; +} + +bool HttpParser::parse_headers() { + while (true) { + auto pos = view_.find("\r\n"); + if (pos == std::string_view::npos) return false; + + if (pos == 0) { + view_.remove_prefix(2); + auto it = current_req_.headers.find("Content-Length"); + if (it != current_req_.headers.end()) { + expected_body_len_ = std::stoi(it->second); + state_ = expected_body_len_ > 0 ? State::BODY : State::COMPLETE; + } else { + expected_body_len_ = 0; + state_ = State::COMPLETE; + } + return true; + } + + auto line = view_.substr(0, pos); + view_.remove_prefix(pos + 2); + + auto colon = line.find(':'); + if (colon != std::string_view::npos) { + auto key = std::string(line.substr(0, colon)); + auto val = line.substr(colon + 1); + while (!val.empty() && (val[0] == ' ' || val[0] == '\t')) val.remove_prefix(1); + current_req_.headers[key] = std::string(val); + } + } +} + +std::optional HttpParser::next_request() { + while (!view_.empty()) { + switch (state_) { + case State::REQUEST_LINE: + if (!parse_request_line()) return std::nullopt; + break; + case State::HEADERS: + if (!parse_headers()) return std::nullopt; + break; + case State::BODY: + if (view_.size() >= static_cast(expected_body_len_)) { + current_req_.body = std::string(view_.substr(0, expected_body_len_)); + view_.remove_prefix(expected_body_len_); + state_ = State::COMPLETE; + } else { + return std::nullopt; + } + break; + case State::COMPLETE: { + auto req = std::move(current_req_); + current_req_ = HttpRequest(); + state_ = State::REQUEST_LINE; + if (view_.empty()) buffer_.clear(); + return req; + } + case State::ERROR: + return std::nullopt; + } + } + if (state_ == State::COMPLETE) { + auto req = std::move(current_req_); + current_req_ = HttpRequest(); + state_ = State::REQUEST_LINE; + if (view_.empty()) buffer_.clear(); + return req; + } + return std::nullopt; +} + +std::string http_response(int status_code, const std::string& status_text, const std::string& body, const std::string& content_type) { + std::ostringstream oss; + oss << "HTTP/1.1 " << status_code << " " << status_text << "\r\n"; + oss << "Content-Length: " << body.size() << "\r\n"; + oss << "Content-Type: " << content_type << "\r\n"; + oss << "Connection: keep-alive\r\n"; + oss << "\r\n"; + oss << body; + return oss.str(); +} + +} // namespace pomai_cache diff --git a/src/server/resp.cpp b/src/server/resp.cpp deleted file mode 100644 index 604511f..0000000 --- a/src/server/resp.cpp +++ /dev/null @@ -1,113 +0,0 @@ -#include "pomai_cache/resp.hpp" -#include -#include - -namespace pomai_cache { - -void RespParser::feed(std::string_view data) { - if (view_.empty()) { - buffer_.assign(data); - view_ = buffer_; - } else { - // If view_ points into buffer_, we might need to preserve it - size_t offset = view_.data() - buffer_.data(); - buffer_.append(data); - view_ = std::string_view(buffer_).substr(offset); - } -} - -std::optional> RespParser::next_command() { - while (!view_.empty()) { - switch (state_) { - case State::IDLE: - if (view_[0] == '*') { - state_ = State::ARRAY_LEN; - view_.remove_prefix(1); - } else if (view_[0] == '$') { - // Special case for malformed single bulk string if test expects it - state_ = State::BULK_LEN; - view_.remove_prefix(1); - argc_ = 1; - current_cmd_.clear(); - } else { - auto pos = view_.find("\r\n"); - if (pos == std::string_view::npos) return std::nullopt; - view_.remove_prefix(pos + 2); - } - break; - - case State::ARRAY_LEN: { - auto pos = view_.find("\r\n"); - if (pos == std::string_view::npos) return std::nullopt; - auto len_str = view_.substr(0, pos); - if (std::from_chars(len_str.data(), len_str.data() + len_str.size(), argc_).ec != std::errc{}) { - argc_ = 0; - } - view_.remove_prefix(pos + 2); - current_cmd_.clear(); - if (argc_ <= 0) { - state_ = State::IDLE; - return std::nullopt; - } - current_cmd_.reserve(argc_); - state_ = State::BULK_LEN; - break; - } - - case State::BULK_LEN: { - if (view_.empty()) return std::nullopt; - if (view_[0] != '$') { - state_ = State::IDLE; - return std::nullopt; - } - view_.remove_prefix(1); - auto pos = view_.find("\r\n"); - if (pos == std::string_view::npos) return std::nullopt; - auto len_str = view_.substr(0, pos); - if (std::from_chars(len_str.data(), len_str.data() + len_str.size(), bulk_len_).ec != std::errc{} || bulk_len_ < 0) { - state_ = State::IDLE; - return std::nullopt; - } - view_.remove_prefix(pos + 2); - state_ = State::BULK_DATA; - break; - } - - case State::BULK_DATA: { - if (view_.size() < static_cast(bulk_len_) + 2) return std::nullopt; - current_cmd_.emplace_back(view_.substr(0, bulk_len_)); - view_.remove_prefix(bulk_len_ + 2); - if (static_cast(current_cmd_.size()) == argc_) { - state_ = State::IDLE; - auto res = std::move(current_cmd_); - if (view_.empty()) buffer_.clear(); - return res; - } - state_ = State::BULK_LEN; - break; - } - default: - state_ = State::IDLE; - break; - } - } - return std::nullopt; -} - -std::string resp_simple(const std::string &s) { return "+" + s + "\r\n"; } -std::string resp_error(const std::string &s) { return "-ERR " + s + "\r\n"; } -std::string resp_integer(long long v) { - return ":" + std::to_string(v) + "\r\n"; -} -std::string resp_bulk(const std::string &s) { - return "$" + std::to_string(s.size()) + "\r\n" + s + "\r\n"; -} -std::string resp_null() { return "$-1\r\n"; } -std::string resp_array(const std::vector &items) { - std::string out = "*" + std::to_string(items.size()) + "\r\n"; - for (const auto &i : items) - out += i; - return out; -} - -} // namespace pomai_cache diff --git a/src/server/server_main.cpp b/src/server/server_main.cpp index 54ec215..12677a7 100644 --- a/src/server/server_main.cpp +++ b/src/server/server_main.cpp @@ -1,7 +1,7 @@ #include "pomai_cache/ai_cache.hpp" #include "pomai_cache/engine.hpp" #include "pomai_cache/engine_shard.hpp" -#include "pomai_cache/resp.hpp" +#include "pomai_cache/http.hpp" #include #include @@ -33,7 +33,7 @@ bool parse_u64(std::string_view s, std::uint64_t &out) { } struct ClientState { - pomai_cache::RespParser parser; + pomai_cache::HttpParser parser; std::string out; std::string sending; int fd; @@ -62,7 +62,7 @@ class UringWorker { addr.sin_addr.s_addr = INADDR_ANY; addr.sin_port = htons(port_); if (bind(listen_fd, reinterpret_cast(&addr), sizeof(addr)) < 0) { - std::cerr << "Thread " << id_ << " bind failed\n"; + std::cerr << "Worker " << id_ << " bind failed\n"; return; } listen(listen_fd, 128); @@ -101,7 +101,6 @@ class UringWorker { if (fd == listen_fd) { int cfd = cqe->res; if (cfd >= 0) { - std::cout << "DEBUG: Accept on fd " << cfd << "\n"; auto client = std::make_unique(); client->fd = cfd; clients[cfd] = std::move(client); @@ -119,11 +118,9 @@ class UringWorker { close(fd); clients.erase(it); } else { - std::cout << "DEBUG: Received " << r << " bytes from fd " << fd << "\n"; st.parser.feed(std::string_view(st.buf, r)); - while (auto cmd = st.parser.next_command()) { - std::cout << "DEBUG: Command " << (cmd->empty() ? "" : std::string((*cmd)[0])) << "\n"; - handle_command(st, *cmd); + while (auto req = st.parser.next_request()) { + handle_http_request(st, *req); } if (!st.is_sending && !st.out.empty()) { st.is_sending = true; @@ -175,133 +172,118 @@ class UringWorker { } private: - void handle_command(ClientState& st, const std::vector& cmd) { - if (cmd.empty()) return; - std::string c = upper(cmd[0]); - - if (c == "PING") { - st.out += pomai_cache::resp_simple("PONG"); - return; + std::vector split_path(const std::string& path) { + std::vector parts; + std::size_t start = 0; + while (start < path.size()) { + if (path[start] == '/') { + start++; + continue; + } + auto end = path.find('/', start); + if (end == std::string::npos) { + parts.push_back(path.substr(start)); + break; + } + parts.push_back(path.substr(start, end - start)); + start = end + 1; } + return parts; + } - // Security (AUTH) - if (c == "AUTH") { - if (cmd.size() == 2 && cmd[1] == "pomai_admin") { // Simulating a simple admin password - st.out += pomai_cache::resp_simple("OK"); - } else { - st.out += pomai_cache::resp_error("invalid password"); - } + void handle_http_request(ClientState& st, const pomai_cache::HttpRequest& req) { + auto parts = split_path(req.path); + if (parts.empty()) { + st.out += pomai_cache::http_response(404, "Not Found", "Path missing"); return; } - if (c == "INFO") { + std::string base = parts[0]; + + // INFO + if (base == "info" && req.method == "GET") { auto shards = pomai_cache::ShardSet::instance().all_shards(); std::string combined; - for (auto* s : shards) - combined += s->engine().info(); - st.out += pomai_cache::resp_bulk(combined); + for (auto* s : shards) combined += s->engine().info(); + st.out += pomai_cache::http_response(200, "OK", combined); return; } - if (c == "CONFIG") { - if (cmd.size() >= 3 && upper(cmd[1]) == "GET") { - std::string param = upper(cmd[2]); - if (param == "POLICY") { - auto shards = pomai_cache::ShardSet::instance().all_shards(); - std::string name = shards.empty() ? "unknown" : shards[0]->engine().policy().name(); - st.out += pomai_cache::resp_array({ - pomai_cache::resp_bulk("POLICY"), - pomai_cache::resp_bulk(name) - }); - } else { - st.out += pomai_cache::resp_array({}); - } + // CONFIG + if (base == "config" && req.method == "GET") { + if (parts.size() >= 2 && parts[1] == "policy") { + auto shards = pomai_cache::ShardSet::instance().all_shards(); + std::string name = shards.empty() ? "unknown" : shards[0]->engine().policy().name(); + st.out += pomai_cache::http_response(200, "OK", name); } else { - st.out += pomai_cache::resp_error("CONFIG subcommand not supported"); + st.out += pomai_cache::http_response(400, "Bad Request", "CONFIG param not supported"); } return; } - // Key-based routing - if (c == "GET" || c == "SET" || c == "DEL" || c == "EXPIRE" || c == "TTL") { - if (cmd.size() < 2) { - st.out += pomai_cache::resp_error("wrong number of arguments"); - return; - } - std::string key(cmd[1]); + // KEY Ops: /key/ + if (base == "key" && parts.size() >= 2) { + std::string key = parts[1]; auto* shard = pomai_cache::ShardSet::instance().get_shard(key); if (!shard) { - st.out += pomai_cache::resp_error("no shards available"); + st.out += pomai_cache::http_response(503, "Service Unavailable", "No shards"); return; } auto& engine = shard->engine(); - if (c == "GET") { + if (req.method == "GET") { auto v = engine.get(key); - st.out += v ? pomai_cache::resp_bulk(std::string(v->begin(), v->end())) : pomai_cache::resp_null(); - } else if (c == "SET") { - if (cmd.size() < 3) { st.out += pomai_cache::resp_error("SET key value"); return; } - std::vector val(cmd[2].begin(), cmd[2].end()); + if (v) { + st.out += pomai_cache::http_response(200, "OK", std::string(v->begin(), v->end())); + } else { + st.out += pomai_cache::http_response(404, "Not Found", "Key not found"); + } + } else if (req.method == "POST") { + std::vector val(req.body.begin(), req.body.end()); std::optional ttl_ms; - for (std::size_t i = 3; i + 1 < cmd.size(); i += 2) { - std::string opt = upper(cmd[i]); - std::uint64_t tv = 0; - if (!parse_u64(cmd[i + 1], tv)) continue; - if (opt == "PX") ttl_ms = tv; - else if (opt == "EX") ttl_ms = tv * 1000; + + auto it = req.query_params.find("ex"); + if (it != req.query_params.end()) { + std::uint64_t v = 0; + if (parse_u64(it->second, v)) ttl_ms = v * 1000; + } + it = req.query_params.find("px"); + if (it != req.query_params.end()) { + std::uint64_t v = 0; + if (parse_u64(it->second, v)) ttl_ms = v; } + std::string set_err; if (engine.set(key, val, ttl_ms, "default", &set_err)) { - shard->journal().record(pomai_cache::OpCode::SET, cmd); - st.out += pomai_cache::resp_simple("OK"); + std::vector jcmd = {"SET", key, req.body}; + if (ttl_ms) { jcmd.push_back("PX"); jcmd.push_back(std::to_string(*ttl_ms)); } + shard->journal().record(pomai_cache::OpCode::SET, jcmd); + st.out += pomai_cache::http_response(200, "OK", "OK"); } else { - st.out += pomai_cache::resp_error(set_err); - } - } else if (c == "DEL") { - st.out += pomai_cache::resp_integer(engine.del({key})); - } else if (c == "EXPIRE") { - std::uint64_t ttl_s = 0; - if (cmd.size() == 3 && parse_u64(cmd[2], ttl_s)) { - st.out += pomai_cache::resp_integer(engine.expire(key, ttl_s)); - } else st.out += pomai_cache::resp_error("invalid expire"); - } else if (c == "TTL") { - auto t = engine.ttl(key); - st.out += pomai_cache::resp_integer(t ? *t : -2); - } - return; - } - - // Multi-key routing (MGET) - if (c == "MGET") { - if (cmd.size() < 2) st.out += pomai_cache::resp_error("MGET key [key...]"); - else { - std::vector arr; - for (size_t i = 1; i < cmd.size(); ++i) { - std::string key(cmd[i]); - auto* shard = pomai_cache::ShardSet::instance().get_shard(key); - if (!shard) arr.push_back(pomai_cache::resp_null()); - else { - auto v = shard->engine().get(key); - arr.push_back(v ? pomai_cache::resp_bulk(std::string(v->begin(), v->end())) : pomai_cache::resp_null()); - } + st.out += pomai_cache::http_response(400, "Bad Request", set_err); } - st.out += pomai_cache::resp_array(arr); + } else if (req.method == "DELETE") { + int d = engine.del({key}); + st.out += pomai_cache::http_response(200, "OK", std::to_string(d)); + } else { + st.out += pomai_cache::http_response(405, "Method Not Allowed", "Use GET, POST or DELETE"); } return; } - // AI commands routing - if (c.rfind("AI.", 0) == 0) { - if (c == "AI.STATS") { + // AI Operations + if (base == "ai" && parts.size() >= 2) { + std::string op = parts[1]; + + if (op == "stats" && req.method == "GET") { auto shards = pomai_cache::ShardSet::instance().all_shards(); std::string combined; - for (auto* s : shards) - combined += s->ai_cache().stats(); - st.out += pomai_cache::resp_bulk(combined); + for (auto* s : shards) combined += s->ai_cache().stats(); + st.out += pomai_cache::http_response(200, "OK", combined); return; } - - if (c == "AI.COST.REPORT") { + + if (op == "cost_report" && req.method == "GET") { auto shards = pomai_cache::ShardSet::instance().all_shards(); std::ostringstream os; double total_saved = 0; @@ -317,22 +299,26 @@ class UringWorker { os << "total_tokens_saved:" << total_tokens << "\n"; os << "total_latency_saved_ms:" << total_latency << "\n"; os << "total_hits:" << total_hits << "\n"; - st.out += pomai_cache::resp_bulk(os.str()); + st.out += pomai_cache::http_response(200, "OK", os.str()); return; } - if (c == "AI.BUDGET" && cmd.size() >= 2) { - double budget = std::stod(cmd[1]); - auto shards = pomai_cache::ShardSet::instance().all_shards(); - for (auto* s : shards) - s->ai_cache().set_budget(budget / static_cast(shards.size())); - st.out += pomai_cache::resp_simple("OK"); + if (op == "budget" && req.method == "POST") { + auto it = req.query_params.find("value"); + if (it != req.query_params.end()) { + double budget = std::stod(it->second); + auto shards = pomai_cache::ShardSet::instance().all_shards(); + for (auto* s : shards) s->ai_cache().set_budget(budget / static_cast(shards.size())); + st.out += pomai_cache::http_response(200, "OK", "OK"); + } else { + st.out += pomai_cache::http_response(400, "Bad Request", "Missing value"); + } return; } - if (c == "AI.INVALIDATE" && cmd.size() >= 3) { - std::string subcmd = upper(cmd[1]); - std::string arg(cmd[2]); + if (op == "invalidate" && req.method == "POST" && parts.size() >= 4) { + std::string subcmd = upper(parts[2]); + std::string arg = parts[3]; std::size_t total = 0; auto shards = pomai_cache::ShardSet::instance().all_shards(); for (auto* s : shards) { @@ -341,169 +327,108 @@ class UringWorker { else if (subcmd == "PREFIX") total += s->ai_cache().invalidate_prefix(arg); else if (subcmd == "CASCADE") total += s->ai_cache().invalidate_cascade(arg); } - st.out += pomai_cache::resp_integer(static_cast(total)); + st.out += pomai_cache::http_response(200, "OK", std::to_string(total)); return; } - if (c == "AI.SIM.PUT" && cmd.size() >= 4) { - std::string key(cmd[1]); - auto* shard = pomai_cache::ShardSet::instance().get_shard(key); - if (!shard) { st.out += pomai_cache::resp_error("no shard"); return; } - - const auto& vec_str = cmd[2]; - std::vector vec; - std::istringstream vss(vec_str); - float val; - while (vss >> val) { vec.push_back(val); if (vss.peek() == ',') vss.ignore(); } - - std::vector payload(cmd[3].begin(), cmd[3].end()); - std::string meta_json = cmd.size() >= 5 ? cmd[4] : "{\"artifact_type\":\"embedding\",\"owner\":\"vector\",\"schema_version\":\"v1\"}"; - std::string err; - if (shard->ai_cache().sim_put(key, vec, payload, meta_json, &err)) - st.out += pomai_cache::resp_simple("OK"); - else - st.out += pomai_cache::resp_error(err); - return; - } - - if (c == "AI.SIM.GET" && cmd.size() >= 2) { - const auto& vec_str = cmd[1]; - std::vector query; - std::istringstream vss(vec_str); - float val; - while (vss >> val) { query.push_back(val); if (vss.peek() == ',') vss.ignore(); } - - std::size_t top_k = 1; - float threshold = 0.9f; - for (std::size_t i = 2; i + 1 < cmd.size(); i += 2) { - std::string opt = upper(cmd[i]); - if (opt == "TOPK") top_k = std::stoull(cmd[i+1]); - else if (opt == "THRESHOLD") threshold = std::stof(cmd[i+1]); + if (op == "sim" && parts.size() >= 3) { + std::string subcmd = parts[2]; + + if (subcmd == "put" && req.method == "POST" && parts.size() >= 4) { + std::string key = parts[3]; + auto* shard = pomai_cache::ShardSet::instance().get_shard(key); + if (!shard) { st.out += pomai_cache::http_response(503, "Service Unavailable", "no shard"); return; } + + auto it = req.query_params.find("vec"); + if (it == req.query_params.end()) { st.out += pomai_cache::http_response(400, "Bad Request", "missing vec"); return; } + + std::vector vec; + std::istringstream vss(it->second); + float val; + while (vss >> val) { vec.push_back(val); if (vss.peek() == ',') vss.ignore(); } + + std::vector payload(req.body.begin(), req.body.end()); + + auto meta_it = req.query_params.find("meta"); + std::string meta_json = meta_it != req.query_params.end() ? meta_it->second : "{\"artifact_type\":\"embedding\",\"owner\":\"vector\",\"schema_version\":\"v1\"}"; + + std::string err; + if (shard->ai_cache().sim_put(key, vec, payload, meta_json, &err)) + st.out += pomai_cache::http_response(200, "OK", "OK"); + else + st.out += pomai_cache::http_response(400, "Bad Request", err); + return; } - - auto shards = pomai_cache::ShardSet::instance().all_shards(); - std::vector arr; - for (auto* s : shards) { - auto results = s->ai_cache().sim_get(query, top_k, threshold); - for (const auto& r : results) { - arr.push_back(pomai_cache::resp_bulk(r.key)); - arr.push_back(pomai_cache::resp_bulk(std::to_string(r.score))); - arr.push_back(pomai_cache::resp_bulk( - pomai_cache::AiArtifactCache::meta_to_json(r.value.meta))); - arr.push_back(pomai_cache::resp_bulk( - std::string(r.value.payload.begin(), r.value.payload.end()))); + + if (subcmd == "get" && req.method == "GET") { + auto it = req.query_params.find("vec"); + if (it == req.query_params.end()) { st.out += pomai_cache::http_response(400, "Bad Request", "missing vec"); return; } + std::vector query; + std::istringstream vss(it->second); + float val; + while (vss >> val) { query.push_back(val); if (vss.peek() == ',') vss.ignore(); } + + std::size_t top_k = 1; + float threshold = 0.9f; + auto kt = req.query_params.find("topk"); + if (kt != req.query_params.end()) top_k = std::stoull(kt->second); + auto tt = req.query_params.find("threshold"); + if (tt != req.query_params.end()) threshold = std::stof(tt->second); + + auto shards = pomai_cache::ShardSet::instance().all_shards(); + std::ostringstream arr; + for (auto* s : shards) { + auto results = s->ai_cache().sim_get(query, top_k, threshold); + for (const auto& r : results) { + arr << "key:" << r.key << " score:" << r.score << "\n"; + arr << "meta:" << pomai_cache::AiArtifactCache::meta_to_json(r.value.meta) << "\n"; + arr << "body:" << std::string(r.value.payload.begin(), r.value.payload.end()) << "\n"; + } } + st.out += pomai_cache::http_response(200, "OK", arr.str()); + return; } - st.out += pomai_cache::resp_array(arr); - return; - } - - if (c == "AI.STREAM.BEGIN" && cmd.size() >= 3) { - std::string key(cmd[1]); - auto* shard = pomai_cache::ShardSet::instance().get_shard(key); - if (!shard) { st.out += pomai_cache::resp_error("no shard"); return; } - std::string err; - if (shard->ai_cache().stream_begin(key, cmd[2], &err)) - st.out += pomai_cache::resp_simple("OK"); - else - st.out += pomai_cache::resp_error(err); - return; - } - - if (c == "AI.STREAM.APPEND" && cmd.size() >= 3) { - std::string key(cmd[1]); - auto* shard = pomai_cache::ShardSet::instance().get_shard(key); - if (!shard) { st.out += pomai_cache::resp_error("no shard"); return; } - std::vector chunk(cmd[2].begin(), cmd[2].end()); - std::string err; - if (shard->ai_cache().stream_append(key, chunk, &err)) - st.out += pomai_cache::resp_simple("OK"); - else - st.out += pomai_cache::resp_error(err); - return; } - - if (c == "AI.STREAM.END" && cmd.size() >= 2) { - std::string key(cmd[1]); + + if (op == "put" && req.method == "POST" && parts.size() >= 4) { + std::string type = parts[2]; + std::string key = parts[3]; + auto* shard = pomai_cache::ShardSet::instance().get_shard(key); - if (!shard) { st.out += pomai_cache::resp_error("no shard"); return; } + if (!shard) { st.out += pomai_cache::http_response(503, "Service Unavailable", "no shard"); return; } + + std::vector payload(req.body.begin(), req.body.end()); + auto it = req.query_params.find("meta"); + std::string meta = it != req.query_params.end() ? it->second : "{}"; + std::string err; - if (shard->ai_cache().stream_end(key, &err)) - st.out += pomai_cache::resp_simple("OK"); - else - st.out += pomai_cache::resp_error(err); + if (shard->ai_cache().put(type, key, meta, payload, &err)) { + std::vector jcmd = {"AI.PUT", type, key, meta, req.body}; + shard->journal().record(pomai_cache::OpCode::AI_PUT, jcmd); + st.out += pomai_cache::http_response(200, "OK", "OK"); + } else { + st.out += pomai_cache::http_response(400, "Bad Request", err); + } return; } - - if (c == "AI.STREAM.GET" && cmd.size() >= 2) { - std::string key(cmd[1]); + + if (op == "get" && req.method == "GET" && parts.size() >= 3) { + std::string key = parts[2]; auto* shard = pomai_cache::ShardSet::instance().get_shard(key); - if (!shard) { st.out += pomai_cache::resp_error("no shard"); return; } - auto v = shard->ai_cache().stream_get(key); - if (!v) st.out += pomai_cache::resp_null(); - else { - std::vector arr{ - pomai_cache::resp_bulk(pomai_cache::AiArtifactCache::meta_to_json(v->meta)), - pomai_cache::resp_bulk(std::string(v->payload.begin(), v->payload.end())) - }; - st.out += pomai_cache::resp_array(arr); + if (!shard) { st.out += pomai_cache::http_response(503, "Service Unavailable", "no shard"); return; } + + auto v = shard->ai_cache().get(key); + if (!v) { + st.out += pomai_cache::http_response(404, "Not Found", ""); + } else { + std::string resp_body = pomai_cache::AiArtifactCache::meta_to_json(v->meta) + "\n" + std::string(v->payload.begin(), v->payload.end()); + st.out += pomai_cache::http_response(200, "OK", resp_body); } return; } - - if (cmd.size() < 2) { - st.out += pomai_cache::resp_error("AI commands require at least a key"); - return; - } - - std::string key; - if (c == "AI.PUT" && cmd.size() >= 3) key = std::string(cmd[2]); - else if (c == "AI.GET" && cmd.size() >= 2) key = std::string(cmd[1]); - else if (c == "AI.EXPLAIN" && cmd.size() >= 2) key = std::string(cmd[1]); - - if (!key.empty()) { - auto* shard = pomai_cache::ShardSet::instance().get_shard(key); - if (shard) { - auto& ai_cache = shard->ai_cache(); - if (c == "AI.PUT") { - if (cmd.size() == 5) { - std::vector payload(cmd[4].begin(), cmd[4].end()); - std::string err; - if (ai_cache.put(std::string(cmd[1]), key, std::string(cmd[3]), payload, &err)) { - shard->journal().record(pomai_cache::OpCode::AI_PUT, cmd); - st.out += pomai_cache::resp_simple("OK"); - } else st.out += pomai_cache::resp_error(err); - } else if (cmd.size() >= 7 && upper(cmd[5]) == "DEPENDS_ON") { - std::vector payload(cmd[4].begin(), cmd[4].end()); - std::vector deps; - for (std::size_t i = 6; i < cmd.size(); ++i) deps.push_back(cmd[i]); - std::string err; - if (ai_cache.put_with_deps(std::string(cmd[1]), key, std::string(cmd[3]), payload, deps, &err)) { - shard->journal().record(pomai_cache::OpCode::AI_PUT, cmd); - st.out += pomai_cache::resp_simple("OK"); - } else st.out += pomai_cache::resp_error(err); - } else { - st.out += pomai_cache::resp_error("AI.PUT requires: type key meta payload [DEPENDS_ON parent...]"); - } - } else if (c == "AI.GET") { - auto v = ai_cache.get(key); - if (!v) st.out += pomai_cache::resp_null(); - else { - std::vector arr{ - pomai_cache::resp_bulk(pomai_cache::AiArtifactCache::meta_to_json(v->meta)), - pomai_cache::resp_bulk(std::string(v->payload.begin(), v->payload.end())) - }; - st.out += pomai_cache::resp_array(arr); - } - } else if (c == "AI.EXPLAIN") { - st.out += pomai_cache::resp_bulk(ai_cache.explain(key)); - } - } else st.out += pomai_cache::resp_error("no shard for AI key"); - } else st.out += pomai_cache::resp_error("unsupported or malformed AI command"); - return; } - st.out += pomai_cache::resp_error("unknown command"); + st.out += pomai_cache::http_response(400, "Bad Request", "Unknown command"); } int port_; @@ -515,7 +440,6 @@ class UringWorker { int main(int argc, char **argv) { int port = 6379; - int threads = std::thread::hardware_concurrency(); std::size_t memory_limit = 128 * 1024 * 1024; std::string data_dir = "./data"; @@ -523,23 +447,18 @@ int main(int argc, char **argv) { std::string a = argv[i]; if (a == "--port" && i + 1 < argc) port = std::stoi(argv[++i]); else if (a == "--memory" && i + 1 < argc) memory_limit = std::stoull(argv[++i]); - else if (a == "--threads" && i + 1 < argc) threads = std::stoi(argv[++i]); } pomai_cache::EngineConfig cfg; - cfg.memory_limit_bytes = memory_limit / threads; + cfg.memory_limit_bytes = memory_limit; // No division because it is single-threaded cfg.data_dir = data_dir; - std::cout << "Starting PomaiCache with " << threads << " cores...\n"; - std::vector workers; - for (int i = 0; i < threads; ++i) { - workers.emplace_back([port, cfg, i]() { - UringWorker(port, cfg, i).run(); - }); - } + std::cout << "Starting PomaiCache on single core...\n"; std::signal(SIGINT, on_sigint); - for (auto& t : workers) t.join(); + + // Single-threaded so just call run() on main thread + UringWorker(port, cfg, 0).run(); return 0; } diff --git a/tests/test_http.cpp b/tests/test_http.cpp new file mode 100644 index 0000000..14e65c9 --- /dev/null +++ b/tests/test_http.cpp @@ -0,0 +1,36 @@ +// tests/test_http.cpp +#include +#include "pomai_cache/http.hpp" + +using namespace pomai_cache; + +TEST_CASE("HttpParser: simple GET", "[http]") { + HttpParser parser; + parser.feed("GET /key/a HTTP/1.1\r\nHost: localhost\r\n\r\n"); + auto req = parser.next_request(); + REQUIRE(req.has_value()); + CHECK(req->method == "GET"); + CHECK(req->path == "/key/a"); + CHECK(req->headers["Host"] == "localhost"); + CHECK(req->body.empty()); +} + +TEST_CASE("HttpParser: simple POST with body", "[http]") { + HttpParser parser; + parser.feed("POST /key/a HTTP/1.1\r\nContent-Length: 5\r\n\r\nhello"); + auto req = parser.next_request(); + REQUIRE(req.has_value()); + CHECK(req->method == "POST"); + CHECK(req->path == "/key/a"); + CHECK(req->body == "hello"); +} + +TEST_CASE("HttpParser: query params", "[http]") { + HttpParser parser; + parser.feed("GET /ai/sim/get?vec=1,2,3&topk=5 HTTP/1.1\r\n\r\n"); + auto req = parser.next_request(); + REQUIRE(req.has_value()); + CHECK(req->path == "/ai/sim/get"); + CHECK(req->query_params["vec"] == "1,2,3"); + CHECK(req->query_params["topk"] == "5"); +} diff --git a/tests/test_integration.cpp b/tests/test_integration.cpp index 23c5bd7..824dc47 100644 --- a/tests/test_integration.cpp +++ b/tests/test_integration.cpp @@ -13,47 +13,33 @@ #include #include #include +#include +#include namespace { -std::string cmd(const std::vector &args) { - std::string out = "*" + std::to_string(args.size()) + "\r\n"; - for (const auto &a : args) - out += "$" + std::to_string(a.size()) + "\r\n" + a + "\r\n"; - return out; -} std::optional read_reply(int fd) { std::string out; - char c; - while (recv(fd, &c, 1, 0) == 1) { - out.push_back(c); - if (out.size() >= 2 && out[out.size() - 2] == '\r' && - out[out.size() - 1] == '\n') { - if (out[0] == '+' || out[0] == '-' || out[0] == ':') - return out; - if (out[0] == '$') { - int len = std::stoi(out.substr(1, out.size() - 3)); - if (len < 0) + char buf[4096]; + while (true) { + int r = recv(fd, buf, 4096, 0); + if (r <= 0) break; + out.append(buf, r); + if (out.find("\r\n\r\n") != std::string::npos) { + auto pos = out.find("Content-Length: "); + if (pos != std::string::npos) { + auto end = out.find("\r\n", pos); + int len = std::stoi(out.substr(pos + 16, end - pos - 16)); + auto header_end = out.find("\r\n\r\n") + 4; + if (out.size() >= header_end + len) { return out; - std::string payload(len + 2, '\0'); - if (recv(fd, payload.data(), payload.size(), MSG_WAITALL) <= 0) - return std::nullopt; - out += payload; - return out; - } - if (out[0] == '*') { - int n = std::stoi(out.substr(1, out.size() - 3)); - for (int i = 0; i < n; ++i) { - auto child = read_reply(fd); - if (!child) - return std::nullopt; - out += *child; } + } else { return out; } } } - return std::nullopt; + return out.empty() ? std::nullopt : std::make_optional(out); } int connect_port(int port) { @@ -102,32 +88,41 @@ void stop_server(const ServerProc &s) { } } // namespace -TEST_CASE("integration: RESP core commands and clean shutdown", +TEST_CASE("integration: HTTP core commands and clean shutdown", "[integration]") { auto s = spawn_server(); int fd = connect_port(s.port); REQUIRE(fd >= 0); - auto send_cmd = [&](const std::vector &args) { - auto req = cmd(args); - send(fd, req.data(), req.size(), 0); - return read_reply(fd); - }; - - REQUIRE(send_cmd({"SET", "a", "1"}).value().rfind("+OK", 0) == 0); - REQUIRE(send_cmd({"GET", "a"}).value().find("1") != std::string::npos); - REQUIRE(send_cmd({"MGET", "a", "b"}).has_value()); - REQUIRE(send_cmd({"EXPIRE", "a", "1"}).value().rfind(":1", 0) == 0); - REQUIRE(send_cmd({"TTL", "a"}).has_value()); - REQUIRE(send_cmd({"INFO"}).value()[0] == '$'); - REQUIRE(send_cmd({"CONFIG", "GET", "POLICY"}).value()[0] == '*'); - REQUIRE(send_cmd({"DEL", "a"}).value().rfind(":1", 0) == 0); - - const std::string bad_req = "*1\r\n$4\r\nNOPE\r\n"; + auto req1 = "POST /key/a HTTP/1.1\r\nContent-Length: 1\r\n\r\n1"; + send(fd, req1, strlen(req1), 0); + REQUIRE(read_reply(fd).value().find("200 OK") != std::string::npos); + + auto req2 = "GET /key/a HTTP/1.1\r\n\r\n"; + send(fd, req2, strlen(req2), 0); + REQUIRE(read_reply(fd).value().find("1") != std::string::npos); + + auto req3 = "POST /key/a?ex=1 HTTP/1.1\r\nContent-Length: 1\r\n\r\n1"; + send(fd, req3, strlen(req3), 0); + REQUIRE(read_reply(fd).value().find("200 OK") != std::string::npos); + + auto req4 = "GET /info HTTP/1.1\r\n\r\n"; + send(fd, req4, strlen(req4), 0); + REQUIRE(read_reply(fd).value().find("200 OK") != std::string::npos); + + auto req5 = "GET /config/policy HTTP/1.1\r\n\r\n"; + send(fd, req5, strlen(req5), 0); + REQUIRE(read_reply(fd).value().find("200 OK") != std::string::npos); + + auto req6 = "DELETE /key/a HTTP/1.1\r\n\r\n"; + send(fd, req6, strlen(req6), 0); + REQUIRE(read_reply(fd).value().find("200 OK") != std::string::npos); + + const std::string bad_req = "NOPE /key/a HTTP/1.1\r\n\r\n"; send(fd, bad_req.data(), bad_req.size(), 0); auto bad = read_reply(fd); REQUIRE(bad.has_value()); - CHECK(bad->rfind("-ERR", 0) == 0); + CHECK(bad->find("405") != std::string::npos); close(fd); stop_server(s); @@ -140,25 +135,26 @@ TEST_CASE("integration: adversarial caps and churn", REQUIRE(fd >= 0); std::string big(1024 * 1024 + 8, 'x'); - auto req = cmd({"SET", "big", big}); + std::string req = "POST /key/big HTTP/1.1\r\nContent-Length: " + std::to_string(big.size()) + "\r\n\r\n" + big; send(fd, req.data(), req.size(), 0); auto rep = read_reply(fd); REQUIRE(rep.has_value()); - CHECK(rep->rfind("-ERR", 0) == 0); + CHECK(rep->find("400") != std::string::npos); for (int i = 0; i < 500; ++i) { - auto sreq = cmd({"SET", "churn" + std::to_string(i), "val"}); + std::string sreq = "POST /key/churn" + std::to_string(i) + " HTTP/1.1\r\nContent-Length: 3\r\n\r\nval"; send(fd, sreq.data(), sreq.size(), 0); REQUIRE(read_reply(fd).has_value()); } - auto ireq = cmd({"INFO"}); + + std::string ireq = "GET /info HTTP/1.1\r\n\r\n"; send(fd, ireq.data(), ireq.size(), 0); auto info = read_reply(fd); REQUIRE(info.has_value()); CHECK(info->find("evictions") != std::string::npos); for (int i = 0; i < 128; ++i) { - auto t = cmd({"SET", "ttl" + std::to_string(i), "v", "PX", "1"}); + std::string t = "POST /key/ttl" + std::to_string(i) + "?px=1 HTTP/1.1\r\nContent-Length: 1\r\n\r\nv"; send(fd, t.data(), t.size(), 0); REQUIRE(read_reply(fd).has_value()); } @@ -177,35 +173,34 @@ TEST_CASE("integration: AI artifact commands", "[integration][ai]") { int fd = connect_port(s.port); REQUIRE(fd >= 0); - auto send_cmd = [&](const std::vector &args) { - auto req = cmd(args); - send(fd, req.data(), req.size(), 0); - return read_reply(fd); - }; - - auto put = send_cmd( - {"AI.PUT", "embedding", "emb:m:h:3:float", - "{\"artifact_type\":\"embedding\",\"owner\":\"vector\",\"schema_" - "version\":\"v1\",\"model_id\":\"m\",\"snapshot_epoch\":\"ep9\"}", - "abc"}); + std::string p1 = "POST /ai/put/embedding/emb:m:h:3:float?meta={\"artifact_type\":\"embedding\",\"owner\":\"vector\",\"schema_version\":\"v1\",\"model_id\":\"m\",\"snapshot_epoch\":\"ep9\"} HTTP/1.1\r\nContent-Length: 3\r\n\r\nabc"; + send(fd, p1.data(), p1.size(), 0); + auto put = read_reply(fd); REQUIRE(put.has_value()); - REQUIRE(put->rfind("+OK", 0) == 0); + REQUIRE(put->find("200 OK") != std::string::npos); - auto get = send_cmd({"AI.GET", "emb:m:h:3:float"}); + std::string g1 = "GET /ai/get/emb:m:h:3:float HTTP/1.1\r\n\r\n"; + send(fd, g1.data(), g1.size(), 0); + auto get = read_reply(fd); REQUIRE(get.has_value()); - CHECK(get->rfind("*2", 0) == 0); + CHECK(get->find("200 OK") != std::string::npos); - auto stats = send_cmd({"AI.STATS"}); + std::string s1 = "GET /ai/stats HTTP/1.1\r\n\r\n"; + send(fd, s1.data(), s1.size(), 0); + auto stats = read_reply(fd); REQUIRE(stats.has_value()); CHECK(stats->find("dedup_hits") != std::string::npos); - auto inv = send_cmd({"AI.INVALIDATE", "EPOCH", "ep9"}); + std::string i1 = "POST /ai/invalidate/EPOCH/ep9 HTTP/1.1\r\nContent-Length: 0\r\n\r\n"; + send(fd, i1.data(), i1.size(), 0); + auto inv = read_reply(fd); REQUIRE(inv.has_value()); - CHECK(inv->rfind(":1", 0) == 0); + CHECK(inv->find("1") != std::string::npos); - auto miss = send_cmd({"AI.GET", "emb:m:h:3:float"}); + send(fd, g1.data(), g1.size(), 0); + auto miss = read_reply(fd); REQUIRE(miss.has_value()); - CHECK(miss->rfind("$-1", 0) == 0); + CHECK(miss->find("404") != std::string::npos); close(fd); stop_server(s); diff --git a/tests/test_resp.cpp b/tests/test_resp.cpp deleted file mode 100644 index cdfd595..0000000 --- a/tests/test_resp.cpp +++ /dev/null @@ -1,38 +0,0 @@ -#include "pomai_cache/resp.hpp" - -#include - -using namespace pomai_cache; - -TEST_CASE("RESP parser handles partial feeds", "[resp]") { - RespParser p; - p.feed("*1\r\n$4\r\nPI"); - CHECK_FALSE(p.next_command().has_value()); - p.feed("NG\r\n"); - auto c = p.next_command(); - REQUIRE(c.has_value()); - REQUIRE(c->size() == 1); - CHECK((*c)[0] == "PING"); -} - -TEST_CASE("RESP parser flags malformed lengths", "[resp]") { - RespParser malformed; - malformed.feed("*1\r\n$-99\r\nBAD\r\n"); - CHECK_FALSE(malformed.next_command().has_value()); - - RespParser malformed2; - malformed2.feed("$3\r\nBAD\r\n"); - auto m = malformed2.next_command(); - CHECK_FALSE(m.has_value()); -} - -TEST_CASE("RESP parser supports large bulk string within cap", "[resp]") { - const std::string payload(1024 * 1024, 'a'); - RespParser p; - p.feed("*1\r\n$" + std::to_string(payload.size()) + "\r\n" + payload + - "\r\n"); - auto cmd = p.next_command(); - REQUIRE(cmd.has_value()); - REQUIRE(cmd->size() == 1); - CHECK((*cmd)[0].size() == payload.size()); -} diff --git a/third_party/palloc b/third_party/palloc new file mode 160000 index 0000000..5dca0af --- /dev/null +++ b/third_party/palloc @@ -0,0 +1 @@ +Subproject commit 5dca0af3c6d59853e5a41d6b79d5f9955867e28c