diff --git a/packages/cdn_bench/src/CMakeLists.txt b/packages/cdn_bench/src/CMakeLists.txt new file mode 100644 index 00000000..b0f65cec --- /dev/null +++ b/packages/cdn_bench/src/CMakeLists.txt @@ -0,0 +1,109 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. + +cmake_minimum_required(VERSION 3.10) +project(foss_revproxy CXX) + +set(CMAKE_CXX_STANDARD 20) +set(CMAKE_CXX_STANDARD_REQUIRED ON) + +find_package(proxygen REQUIRED) +find_package(folly REQUIRED) +find_package(gflags REQUIRED) +find_package(glog REQUIRED) +find_package(Threads REQUIRED) + +# Transitive dependencies needed by proxygen at link time +find_package(c-ares QUIET) +if(c-ares_FOUND AND TARGET c-ares::cares) + get_target_property(_CARES_LOC c-ares::cares IMPORTED_LOCATION_RELWITHDEBINFO) + if(_CARES_LOC) + get_filename_component(_CARES_DIR "${_CARES_LOC}" DIRECTORY) + link_directories(${_CARES_DIR}) + endif() +endif() + +# Source root for the bundled foss_revproxy code +set(SRC_ROOT ${CMAKE_CURRENT_SOURCE_DIR}/ti/foss_revproxy) + +# --- Libraries --- + +add_library(load_balancer + ${SRC_ROOT}/proxy/LoadBalancer.cpp + ${SRC_ROOT}/proxy/LoadBalancer.h +) +target_include_directories(load_balancer PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) +target_link_libraries(load_balancer PUBLIC Threads::Threads) + +add_library(content_handler + ${SRC_ROOT}/server/ContentHandler.cpp + ${SRC_ROOT}/server/ContentHandler.h +) +target_include_directories(content_handler PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) +target_link_libraries(content_handler + PUBLIC + proxygen::proxygen + proxygen::proxygencurl + Folly::folly + glog::glog +) + +add_library(proxy_handler + ${SRC_ROOT}/proxy/ProxyHandler.cpp + ${SRC_ROOT}/proxy/ProxyHandler.h +) +target_include_directories(proxy_handler PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) +target_link_libraries(proxy_handler + PUBLIC + load_balancer + proxygen::proxygen + proxygen::proxygencurl + Folly::folly + glog::glog +) + +# --- Binaries --- + +add_executable(traffic_client ${SRC_ROOT}/client/TrafficClient.cpp) +target_include_directories(traffic_client PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}) +target_link_libraries(traffic_client + PRIVATE + proxygen::proxygen + proxygen::proxygencurl + proxygen::proxygenhqloggerhelper + Folly::folly + gflags + glog::glog +) + +add_executable(proxy_server ${SRC_ROOT}/proxy/ProxyServer.cpp) +target_include_directories(proxy_server PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}) +target_link_libraries(proxy_server + PRIVATE + load_balancer + proxy_handler + proxygen::proxygen + proxygen::proxygencurl + Folly::folly + gflags + glog::glog +) + +add_executable(content_server ${SRC_ROOT}/server/ContentServer.cpp) +target_include_directories(content_server PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}) +target_link_libraries(content_server + PRIVATE + content_handler + proxygen::proxygen + proxygen::proxygencurl + Folly::folly + gflags + glog::glog +) + +install(TARGETS traffic_client proxy_server content_server + RUNTIME DESTINATION bin +) diff --git a/packages/cdn_bench/src/ti/foss_revproxy/client/TrafficClient.cpp b/packages/cdn_bench/src/ti/foss_revproxy/client/TrafficClient.cpp new file mode 100644 index 00000000..723df2e5 --- /dev/null +++ b/packages/cdn_bench/src/ti/foss_revproxy/client/TrafficClient.cpp @@ -0,0 +1,344 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "proxygen/httpserver/samples/hq/InsecureVerifierDangerousDoNotUseInProduction.h" +#include "proxygen/lib/http/coro/HTTPCoroSession.h" +#include "proxygen/lib/http/coro/HTTPFixedSource.h" +#include "proxygen/lib/http/coro/client/HTTPCoroConnector.h" +#include "proxygen/lib/http/coro/client/HTTPCoroSessionPool.h" + +using namespace proxygen; +using namespace proxygen::coro; + +// Constants +namespace { +constexpr size_t IO_BUFFER_SIZE = 4096; // Buffer size for reading responses +} // namespace + +// Configuration flags +DEFINE_string(target_host, "::1", "Target server hostname or IP"); +DEFINE_int32(target_port, 8081, "Target server port"); +DEFINE_bool(target_tls, false, "Use TLS to connect to target"); +DEFINE_bool(quic, false, "Use QUIC/HTTP3"); + +DEFINE_int32(target_rps, 10, "Target requests per second"); +DEFINE_int32(duration_sec, 10, "Duration to run traffic (seconds)"); +DEFINE_int32(max_requests, 0, "Max requests to send (0 = unlimited)"); + +DEFINE_int32(num_connections, 1, "Number of concurrent connections"); +DEFINE_int32( + streams_per_connection, + 1, + "Max concurrent streams per connection"); + +DEFINE_double( + reset_probability, + 0.0, + "Probability (0.0-1.0) of resetting requests"); + +// Test URLs to cycle through +static const std::vector TEST_URLS = { + "/index.html", + "/api/data.json", + "/app.js", + "/image.png", + "/api/users", + "/"}; + +namespace { +struct Metrics { + std::atomic requestsSent{0}; + std::atomic responsesReceived{0}; + std::atomic errors{0}; + std::atomic resets{0}; + std::chrono::steady_clock::time_point startTime; +}; +} // namespace + +std::string getRandomUrl() { + thread_local std::mt19937 rng(std::random_device{}()); + thread_local std::uniform_int_distribution dist( + 0, TEST_URLS.size() - 1); + return TEST_URLS[dist(rng)]; +} + +/** + * Sends a single HTTP request and awaits a response, potentially either + * resetting the connection or reading the full response body. Probability is + * controlled by reset_probability flag. + */ +folly::coro::Task sendSingleRequest( + HTTPCoroSessionPool& pool, + Metrics& metrics, + const uint64_t requestNum, + const double resetProb) { + std::string url = getRandomUrl(); + + // Get connection from pool + auto res = co_await co_awaitTry(pool.getSessionWithReservation()); + if (res.hasException()) { + XLOG(ERR) << "Req #" << requestNum + << " - Failed to get connection: " << res.exception().what(); + metrics.errors++; + co_return; + } + + // Create request + auto request = HTTPFixedSource::makeFixedRequest(url, HTTPMethod::GET); + request->msg_->getHeaders().set(HTTP_HEADER_HOST, FLAGS_target_host); + + XLOG(DBG4) << "Req #" << requestNum << " - Sending: " << url; + metrics.requestsSent++; + + // Send request + auto responseSource = co_await co_awaitTry(res->session->sendRequest( + std::move(request), std::move(res->reservation))); + + if (responseSource.hasException()) { + XLOG(ERR) << "Req #" << requestNum + << " - Send failed: " << responseSource.exception().what(); + metrics.errors++; + co_return; + } + + // Check if we should reset + thread_local std::mt19937 rng(std::random_device{}()); + std::uniform_real_distribution dist(0.0, 1.0); + if (resetProb > 0 && dist(rng) < resetProb) { + XLOG(INFO) << "Req #" << requestNum << " - Resetting"; + metrics.resets++; + // Just return without reading response (simulates reset) + co_return; + } + + // Read response + auto headerEvent = co_await co_awaitTry(responseSource->readHeaderEvent()); + if (headerEvent.hasException()) { + XLOG(ERR) << "Req #" << requestNum << " - Failed to read headers"; + metrics.errors++; + co_return; + } + + uint16_t status = headerEvent->headers->getStatusCode(); + XLOG(DBG4) << "Req #" << requestNum << " - Status: " << status; + + // Drain response body + if (!headerEvent->eom) { + while (true) { + auto bodyEvent = co_await responseSource->readBodyEvent(IO_BUFFER_SIZE); + if (bodyEvent.eom) { + break; + } + } + } + + metrics.responsesReceived++; +} + +/** + * Worker that sends requests at a controlled rate + */ +folly::coro::Task createTrafficWorkerTask( + HTTPCoroSessionPool& pool, + Metrics& metrics, + std::atomic& shouldStop, + const uint64_t workerNum) { + XLOG(INFO) << "Worker " << workerNum << " started"; + + uint64_t requestNum = 0; + auto sleepDuration = std::chrono::milliseconds( + 1000 * FLAGS_num_connections * FLAGS_streams_per_connection / + FLAGS_target_rps); + + while (!shouldStop.load()) { + // Launch concurrent streams + std::vector> tasks; + tasks.reserve(FLAGS_streams_per_connection); + for (int i = 0; i < FLAGS_streams_per_connection && !shouldStop.load(); + ++i) { + tasks.push_back(sendSingleRequest( + pool, + metrics, + workerNum * 1000000 + requestNum++, + FLAGS_reset_probability)); + } + + // Wait for all streams to complete + if (!tasks.empty()) { + co_await folly::coro::collectAllRange(std::move(tasks)); + } + + // Rate limiting + if (!shouldStop.load()) { + co_await folly::coro::sleep(sleepDuration); + } + + // Check max requests + if (FLAGS_max_requests > 0 && + metrics.requestsSent >= static_cast(FLAGS_max_requests)) { + break; + } + } + + XLOG(INFO) << "Worker " << workerNum << " stopped"; +} + +/** + * Run all workers and cleanup when complete + */ +folly::coro::Task runAllWorkers( + std::vector> tasks, + std::atomic* shouldStopPtr, + folly::EventBase* evbPtr) { + co_await folly::coro::collectAllRange(std::move(tasks)); + XLOG(INFO) << "All workers completed"; + shouldStopPtr->store(true); + evbPtr->terminateLoopSoon(); +} + +/** + * Get or create connection pool (singleton per EventBase) + */ +std::unique_ptr createConnectionPool( + folly::EventBase* evb) { + HTTPCoroSessionPool::PoolParams poolParams; + poolParams.maxConnections = FLAGS_num_connections * 2; + + if (FLAGS_quic) { + // QUIC/HTTP3 configuration + auto quicConnParams = + std::make_shared(); + + if (FLAGS_target_tls) { + // Configure TLS for QUIC with "h3" ALPN + auto fizzContext = HTTPCoroConnector::makeFizzClientContext( + HTTPCoroConnector::defaultQuicTLSParams()); + + // Use insecure verifier for test certificates + // WARNING: This is insecure and should only be used for testing! + auto insecureVerifier = std::make_shared< + proxygen::InsecureVerifierDangerousDoNotUseInProduction>(); + quicConnParams->fizzContextAndVerifier = { + std::move(fizzContext), std::move(insecureVerifier)}; + quicConnParams->serverName = FLAGS_target_host; + } + + return std::make_unique( + evb, + FLAGS_target_host, + FLAGS_target_port, + poolParams, + std::move(quicConnParams)); + } else if (FLAGS_target_tls) { + // TLS configuration (HTTP/1.1 or HTTP/2 over TCP) + auto connParams = HTTPCoroConnector::defaultConnectionParams(); + connParams.fizzContextAndVerifier = + HTTPCoroConnector::makeFizzClientContextAndVerifier( + HTTPCoroConnector::defaultTLSParams()); + + return std::make_unique( + evb, + FLAGS_target_host, + FLAGS_target_port, + poolParams, + std::move(connParams)); + } else { + // Plaintext configuration + return std::make_unique( + evb, FLAGS_target_host, FLAGS_target_port, poolParams); + } +} + +int main(int argc, char** argv) { + const folly::Init init(&argc, &argv); + ::gflags::ParseCommandLineFlags(&argc, &argv, false); + + XLOG(INFO) << "=== FOSS Revproxy Traffic Client ==="; + XLOG(INFO) << "Target: " << FLAGS_target_host << ":" << FLAGS_target_port; + XLOG(INFO) << "TLS: " << (FLAGS_target_tls ? "enabled" : "disabled"); + if (FLAGS_target_tls && FLAGS_quic) { + XLOG(INFO) << "QUIC/HTTP3: enabled"; + } + XLOG(INFO) << "Target RPS: " << FLAGS_target_rps; + XLOG(INFO) << "Duration: " << FLAGS_duration_sec << " seconds"; + XLOG(INFO) << "Connections: " << FLAGS_num_connections; + XLOG(INFO) << "Streams per connection: " << FLAGS_streams_per_connection; + XLOG(INFO) << "Reset probability: " << FLAGS_reset_probability; + + Metrics metrics; + metrics.startTime = std::chrono::steady_clock::now(); + + std::atomic shouldStop{false}; + folly::EventBase evb; + auto pool = createConnectionPool(&evb); + + // Schedule duration timer + evb.runAfterDelay( + [&shouldStop, &evb]() { + XLOG(INFO) << "Duration expired, stopping..."; + shouldStop.store(true); + evb.terminateLoopSoon(); + }, + FLAGS_duration_sec * 1000); // milliseconds + + // Launch workers + evb.runInEventBaseThread([poolPtr = pool.get(), + metricsPtr = &metrics, + shouldStopPtr = &shouldStop, + evbPtr = &evb]() { + // Create tasks + std::vector> tasks; + tasks.reserve(FLAGS_num_connections); + for (int i = 0; i < FLAGS_num_connections; ++i) { + tasks.push_back( + createTrafficWorkerTask(*poolPtr, *metricsPtr, *shouldStopPtr, i)); + } + + // Start all workers + folly::coro::co_withExecutor( + evbPtr, runAllWorkers(std::move(tasks), shouldStopPtr, evbPtr)) + .start(); + }); + + // Run event loop + XLOG(INFO) << "Starting event loop..."; + evb.loop(); + + // Print final statistics + auto elapsed = std::chrono::duration_cast( + std::chrono::steady_clock::now() - metrics.startTime); + + XLOG(INFO) << "=== Final Statistics ==="; + XLOG(INFO) << "Requests sent: " << metrics.requestsSent; + XLOG(INFO) << "Responses received: " << metrics.responsesReceived; + XLOG(INFO) << "Errors: " << metrics.errors; + XLOG(INFO) << "Resets: " << metrics.resets; + XLOG(INFO) << "Elapsed time: " << elapsed.count() << " ms"; + + if (elapsed.count() > 0) { + double actualRps = (metrics.requestsSent * 1000.0) / elapsed.count(); + XLOG(INFO) << "Actual RPS: " << actualRps; + } + + // Drain pool + pool->drain(); + + XLOG(INFO) << "=== Traffic Client Shutdown ==="; + return 0; +} diff --git a/packages/cdn_bench/src/ti/foss_revproxy/proxy/LoadBalancer.cpp b/packages/cdn_bench/src/ti/foss_revproxy/proxy/LoadBalancer.cpp new file mode 100644 index 00000000..076d48b1 --- /dev/null +++ b/packages/cdn_bench/src/ti/foss_revproxy/proxy/LoadBalancer.cpp @@ -0,0 +1,61 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "ti/foss_revproxy/proxy/LoadBalancer.h" +#include +#include +#include + +namespace ti { +namespace foss_revproxy { + +LoadBalancerStrategy parseLoadBalancerStrategy(const std::string& strategy) { + std::string lower = strategy; + std::transform(lower.begin(), lower.end(), lower.begin(), ::tolower); + + if (lower == "random") { + return LoadBalancerStrategy::RANDOM; + } else if (lower == "roundrobin" || lower == "round_robin") { + return LoadBalancerStrategy::ROUND_ROBIN; + } else { + throw std::invalid_argument( + "Invalid load balancer strategy: '" + strategy + + "'. Valid options: random, roundrobin"); + } +} + +const char* loadBalancerStrategyToString(LoadBalancerStrategy strategy) { + switch (strategy) { + case LoadBalancerStrategy::RANDOM: + return "random"; + case LoadBalancerStrategy::ROUND_ROBIN: + return "roundrobin"; + default: + return "unknown"; + } +} + +std::optional RandomLoadBalancer::selectBackend(size_t numBackends) { + if (numBackends == 0) { + return std::nullopt; + } + thread_local std::mt19937 rng(std::random_device{}()); + std::uniform_int_distribution dist(0, numBackends - 1); + return dist(rng); +} + +std::optional RoundRobinLoadBalancer::selectBackend( + size_t numBackends) { + if (numBackends == 0) { + return std::nullopt; + } + return counter_.fetch_add(1, std::memory_order_relaxed) % numBackends; +} + +} // namespace foss_revproxy +} // namespace ti diff --git a/packages/cdn_bench/src/ti/foss_revproxy/proxy/LoadBalancer.h b/packages/cdn_bench/src/ti/foss_revproxy/proxy/LoadBalancer.h new file mode 100644 index 00000000..ca03045c --- /dev/null +++ b/packages/cdn_bench/src/ti/foss_revproxy/proxy/LoadBalancer.h @@ -0,0 +1,98 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include +#include +#include +#include + +namespace ti { +namespace foss_revproxy { + +/** + * LoadBalancerStrategy - Enumeration of available load balancing algorithms + */ +enum class LoadBalancerStrategy { + RANDOM, + ROUND_ROBIN, +}; + +/** + * Convert string to LoadBalancerStrategy enum + * @throws std::invalid_argument if strategy string is invalid + */ +LoadBalancerStrategy parseLoadBalancerStrategy(const std::string& strategy); + +/** + * Convert LoadBalancerStrategy enum to string + */ +const char* loadBalancerStrategyToString(LoadBalancerStrategy strategy); + +/** + * LoadBalancer - Abstract interface for backend selection algorithms + * + * Implementations provide different strategies for selecting which backend + * to route requests to (random, round-robin, least-connections, etc.) + */ +class LoadBalancer { + public: + virtual ~LoadBalancer() = default; + + /** + * Select a backend index from the available backends + * @param numBackends Total number of available backends + * @return Index of selected backend (0 to numBackends-1), or std::nullopt if + * no backend available + */ + virtual std::optional selectBackend(size_t numBackends) = 0; + + /** + * Get the name of this load balancing algorithm + */ + virtual const char* getName() const = 0; +}; + +/** + * RandomLoadBalancer - Randomly selects backends + * + * Uses thread-safe random number generation to distribute requests + * uniformly across all available backends. + */ +class RandomLoadBalancer : public LoadBalancer { + public: + RandomLoadBalancer() = default; + + std::optional selectBackend(size_t numBackends) override; + const char* getName() const override { + return "Random"; + } +}; + +/** + * RoundRobinLoadBalancer - Cycles through backends sequentially + * + * Distributes requests evenly by selecting backends in order. + * Thread-safe using atomic counter. + */ +class RoundRobinLoadBalancer : public LoadBalancer { + public: + RoundRobinLoadBalancer() : counter_(0) {} + + std::optional selectBackend(size_t numBackends) override; + const char* getName() const override { + return "RoundRobin"; + } + + private: + std::atomic counter_; +}; + +} // namespace foss_revproxy +} // namespace ti diff --git a/packages/cdn_bench/src/ti/foss_revproxy/proxy/ProxyHandler.cpp b/packages/cdn_bench/src/ti/foss_revproxy/proxy/ProxyHandler.cpp new file mode 100644 index 00000000..4ea6f93a --- /dev/null +++ b/packages/cdn_bench/src/ti/foss_revproxy/proxy/ProxyHandler.cpp @@ -0,0 +1,219 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "ti/foss_revproxy/proxy/ProxyHandler.h" + +#include + +#include "proxygen/lib/http/coro/HTTPFixedSource.h" +#include "proxygen/lib/http/coro/HTTPHybridSource.h" + +using namespace proxygen; +using namespace proxygen::coro; + +namespace ti { +namespace foss_revproxy { + +// Connection pool configuration constants +namespace { +constexpr std::chrono::seconds DEFAULT_CONNECT_TIMEOUT{5}; +constexpr size_t MAX_CONNECTION_ATTEMPTS = 3; +} // namespace + +ProxyHandler::ProxyHandler( + std::vector backends, + std::shared_ptr loadBalancer, + ProxyConfig config, + std::shared_ptr metrics) + : backends_(std::move(backends)), + loadBalancer_(std::move(loadBalancer)), + config_(std::move(config)), + metrics_( + metrics ? std::move(metrics) : std::make_shared()) {} + +HTTPCoroSessionPool& ProxyHandler::getBackendPool( + folly::EventBase* evb, + size_t backendIdx) { + using PoolKey = std::pair; + // TODO(sunobrien): ensure thread safety, leaving this for now since I'm going + // to change this implementation anyways as part of the upcoming KR + static std::map> pools; + + auto key = std::make_pair(evb, backendIdx); + auto it = pools.find(key); + + if (it == pools.end()) { + // Create new pool for this (EventBase, backend) pair + if (backendIdx >= backends_.size()) { + throw std::out_of_range("Backend index out of range"); + } + + const auto& backend = backends_[backendIdx]; + bool useH2 = config_.backendH2; + + XLOG(INFO) << "Creating pool for backend " << backendIdx << ": " + << backend.host << ":" << backend.port + << (backend.tls ? " (TLS, ALPN-negotiated)" + : (useH2 ? " (plaintext, HTTP/2)" + : " (plaintext, HTTP/1.1)")); + + std::unique_ptr pool; + HTTPCoroSessionPool::PoolParams poolParams; + poolParams.connectTimeout = DEFAULT_CONNECT_TIMEOUT; + poolParams.maxConnectionAttempts = MAX_CONNECTION_ATTEMPTS; + + if (backend.tls) { + // TLS configuration + auto connParams = HTTPCoroConnector::defaultConnectionParams(); + connParams.serverName = backend.host; + connParams.fizzContextAndVerifier = + HTTPCoroConnector::makeFizzClientContextAndVerifier( + HTTPCoroConnector::defaultTLSParams()); + + pool = std::make_unique( + evb, backend.host, backend.port, poolParams, connParams); + } else { + auto connParams = HTTPCoroConnector::defaultConnectionParams(); + if (useH2) { + connParams.plaintextProtocol = "h2"; + } + + pool = std::make_unique( + evb, backend.host, backend.port, poolParams, connParams); + } + + it = pools.emplace(key, std::move(pool)).first; + } + + return *it->second; +} + +folly::coro::Task ProxyHandler::handleRequest( + folly::EventBase* evb, + HTTPSessionContextPtr /* ctx */, + HTTPSourceHolder requestSource) { + auto requestStart = std::chrono::steady_clock::now(); + + metrics_->requestsReceived++; + + auto headerEvent = co_await co_awaitTry(requestSource.readHeaderEvent()); + if (headerEvent.hasException()) { + XLOG(ERR) << "Failed to read request headers: " + << headerEvent.exception().what(); + recordFailure(requestStart); + co_return getDirectResponse(400, "Bad Request"); + } + + XLOG(DBG2) << "Received request: " << headerEvent->headers->getMethodString() + << " " << headerEvent->headers->getPath(); + // Check if we should send a direct response (for testing) + if (config_.enableDirectResponse && + headerEvent->headers->getPath() == "/direct") { + XLOG(INFO) << "Sending direct response for /direct"; + recordSuccess(requestStart, requestStart); + co_return getDirectResponse(200, "Direct response from proxy\n"); + } + + // Forward to backend + // For requests with no body (eom=true), pass an empty source to avoid hanging + co_return co_await forwardToBackend( + evb, + std::move(headerEvent->headers), + headerEvent->eom ? HTTPSourceHolder() : std::move(requestSource), + requestStart); +} + +folly::coro::Task ProxyHandler::forwardToBackend( + folly::EventBase* evb, + std::unique_ptr headers, + HTTPSourceHolder requestSource, + std::chrono::steady_clock::time_point requestStart) { + // Check if we have any backends configured + if (backends_.empty()) { + XLOG(ERR) << "No backends configured"; + recordFailure(requestStart); + co_return getDirectResponse(503, "No backends available\n"); + } + + // Select backend using load balancer + auto backendIndexOpt = loadBalancer_->selectBackend(backends_.size()); + if (!backendIndexOpt.has_value()) { + XLOG(ERR) << "Load balancer failed to select a backend"; + recordFailure(requestStart); + co_return getDirectResponse(503, "Backend selection failed\n"); + } + size_t backendIndex = backendIndexOpt.value(); + + XLOG(DBG2) << "Selected backend " << backendIndex << " of " + << backends_.size(); + + try { + // Get connection pool for this backend and EventBase + // getBackendPool() lazily creates pools per (EventBase, backend) pair + auto& pool = getBackendPool(evb, backendIndex); + + XLOG(DBG3) << "Getting connection from pool..."; + + auto backendStart = std::chrono::steady_clock::now(); + auto res = co_await co_awaitTry(pool.getSessionWithReservation()); + + if (res.hasException()) { + XLOG(ERR) << "Failed to connect to backend " << backendIndex << ": " + << res.exception().what(); + recordFailure(requestStart); + co_return getDirectResponse(503, "Backend connection failed\n"); + } + + XLOG(DBG3) << "Got connection, forwarding request to backend " + << backendIndex; + + // Forward the request to the backend + // HTTPHybridSource combines headers + body stream + auto response = co_await res->session->sendRequest( + new HTTPHybridSource(std::move(headers), std::move(requestSource)), + std::move(res->reservation)); + + recordSuccess(requestStart, backendStart); + + co_return std::move(response); + + } catch (const std::exception& ex) { + XLOG(ERR) << "Exception while getting connection: " << ex.what(); + recordFailure(requestStart); + co_return getDirectResponse(503, "Backend connection exception\n"); + } +} + +HTTPSourceHolder ProxyHandler::getDirectResponse( + int statusCode, + const std::string& body) { + return HTTPFixedSource::makeFixedResponse(statusCode, body); +} + +void ProxyHandler::recordFailure( + std::chrono::steady_clock::time_point requestStart) { + metrics_->requestsFailed++; + auto elapsed = std::chrono::duration_cast( + std::chrono::steady_clock::now() - requestStart); + metrics_->totalLatencyUs += elapsed.count(); +} + +void ProxyHandler::recordSuccess( + std::chrono::steady_clock::time_point requestStart, + std::chrono::steady_clock::time_point backendStart) { + metrics_->requestsSucceeded++; + auto backendElapsed = std::chrono::duration_cast( + std::chrono::steady_clock::now() - backendStart); + metrics_->backendLatencyUs += backendElapsed.count(); + auto totalElapsed = std::chrono::duration_cast( + std::chrono::steady_clock::now() - requestStart); + metrics_->totalLatencyUs += totalElapsed.count(); +} + +} // namespace foss_revproxy +} // namespace ti diff --git a/packages/cdn_bench/src/ti/foss_revproxy/proxy/ProxyHandler.h b/packages/cdn_bench/src/ti/foss_revproxy/proxy/ProxyHandler.h new file mode 100644 index 00000000..4bb305d7 --- /dev/null +++ b/packages/cdn_bench/src/ti/foss_revproxy/proxy/ProxyHandler.h @@ -0,0 +1,174 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "proxygen/lib/http/coro/HTTPCoroSession.h" +#include "proxygen/lib/http/coro/client/HTTPCoroSessionPool.h" +#include "ti/foss_revproxy/proxy/LoadBalancer.h" + +namespace ti { +namespace foss_revproxy { + +// Backend configuration structure +struct Backend { + std::string host; + uint16_t port; + bool tls; +}; + +// Configuration for proxy behavior +struct ProxyConfig { + bool enableDirectResponse{false}; + bool backendH2{false}; // Use HTTP/2 for plaintext backend connections +}; + +// Metrics collected by the proxy - this is the key benchmarking data +struct ProxyMetrics { + std::atomic requestsReceived{0}; + std::atomic requestsSucceeded{0}; + std::atomic requestsFailed{0}; + std::atomic retriesAttempted{0}; + std::atomic retriesSucceeded{0}; + std::atomic totalLatencyUs{ + 0}; // Total processing time in microseconds + std::atomic backendLatencyUs{0}; // Time waiting for backend + std::chrono::steady_clock::time_point startTime{ + std::chrono::steady_clock::now()}; + + void reset() { + requestsReceived = 0; + requestsSucceeded = 0; + requestsFailed = 0; + retriesAttempted = 0; + retriesSucceeded = 0; + totalLatencyUs = 0; + backendLatencyUs = 0; + startTime = std::chrono::steady_clock::now(); + } + + double getElapsedSeconds() const { + auto now = std::chrono::steady_clock::now(); + return std::chrono::duration(now - startTime).count(); + } + + double getSuccessRate() const { + uint64_t total = requestsSucceeded + requestsFailed; + return total > 0 ? (100.0 * requestsSucceeded / total) : 0.0; + } + + double getActualRPS() const { + double elapsed = getElapsedSeconds(); + return elapsed > 0 ? (requestsSucceeded + requestsFailed) / elapsed : 0.0; + } + + double getAvgLatencyMs() const { + uint64_t total = requestsSucceeded + requestsFailed; + return total > 0 ? (totalLatencyUs / total) / 1000.0 : 0.0; + } + + double getAvgBackendLatencyMs() const { + return requestsSucceeded > 0 + ? (backendLatencyUs / requestsSucceeded) / 1000.0 + : 0.0; + } +}; + +/** + * ProxyHandler - Core HTTP proxy request handler + */ +class ProxyHandler : public proxygen::coro::HTTPHandler { + public: + /** + * Create a proxy handler with full configuration + * @param backends List of backend servers to proxy to + * @param loadBalancer Algorithm for selecting backends + * @param config Proxy configuration (h2 support, retries, etc.) + * @param metrics Optional shared metrics object for tracking performance + */ + ProxyHandler( + std::vector backends, + std::shared_ptr loadBalancer, + ProxyConfig config, + std::shared_ptr metrics = nullptr); + + /** + * Handle an incoming HTTP request + * + * This is called by the HTTPServer framework for each request. + * Returns an HTTPSourceHolder representing the response. + * + * Gets backend connection pools via getBackendPool() function which + * lazily creates pools per (EventBase, backend) pair. + */ + folly::coro::Task handleRequest( + folly::EventBase* evb, + proxygen::coro::HTTPSessionContextPtr ctx, + proxygen::coro::HTTPSourceHolder requestSource) override; + + /** + * Get the metrics object for this handler + */ + std::shared_ptr getMetrics() const { + return metrics_; + } + + private: + std::vector backends_; + std::shared_ptr loadBalancer_; + ProxyConfig config_; + std::shared_ptr metrics_; + + /** + * Get or create connection pool for a specific backend and EventBase. + * + * Uses static pool storage with lazy initialization per (EventBase, backend) + * pair. Returns reference for direct use. + */ + proxygen::coro::HTTPCoroSessionPool& getBackendPool( + folly::EventBase* evb, + size_t backendIdx); + + /** + * Forward request to a backend server + */ + folly::coro::Task forwardToBackend( + folly::EventBase* evb, + std::unique_ptr headers, + proxygen::coro::HTTPSourceHolder requestSource, + std::chrono::steady_clock::time_point requestStart); + + /** + * Send a direct response without going to backend + */ + proxygen::coro::HTTPSourceHolder getDirectResponse( + int statusCode, + const std::string& body = ""); + + /** + * Record a failed request in metrics + */ + void recordFailure(std::chrono::steady_clock::time_point requestStart); + + /** + * Record a successful request in metrics + */ + void recordSuccess( + std::chrono::steady_clock::time_point requestStart, + std::chrono::steady_clock::time_point backendStart); +}; + +} // namespace foss_revproxy +} // namespace ti diff --git a/packages/cdn_bench/src/ti/foss_revproxy/proxy/ProxyServer.cpp b/packages/cdn_bench/src/ti/foss_revproxy/proxy/ProxyServer.cpp new file mode 100644 index 00000000..cb934853 --- /dev/null +++ b/packages/cdn_bench/src/ti/foss_revproxy/proxy/ProxyServer.cpp @@ -0,0 +1,315 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "proxygen/lib/http/coro/server/HTTPServer.h" +#include "ti/foss_revproxy/proxy/LoadBalancer.h" +#include "ti/foss_revproxy/proxy/ProxyHandler.h" + +// Server configuration +DEFINE_int32(port, 8081, "Port to listen on"); +DEFINE_string(cert, "", "Certificate file (for TLS/QUIC)"); +DEFINE_string(key, "", "Key file (for TLS/QUIC)"); +DEFINE_string(plaintext_proto, "", "Plaintext protocol (h1, h2, etc.)"); +DEFINE_bool(quic, false, "Enable QUIC/HTTP3 (requires cert/key)"); + +// Backend configuration +DEFINE_string( + backend_servers, + "", + "Comma-separated list of backend server addresses (e.g., '::1,::1,::1')"); +DEFINE_string( + backend_ports, + "", + "Comma-separated list of backend server ports (e.g., '8082,8083,8084')"); +DEFINE_bool(backend_tls, false, "Use TLS for backend connections"); +DEFINE_bool(backend_h2, false, "Use HTTP/2 for plaintext backend connections"); + +// Load balancing +DEFINE_string( + lb_algorithm, + "random", + "Load balancing algorithm: 'random' or 'roundrobin'"); + +// Feature flags +DEFINE_bool( + enable_direct_response, + false, + "Enable direct response mode (send responses without backend)"); + +// Metrics configuration +DEFINE_int32( + metrics_interval, + 5, + "Interval in seconds between metrics output (0 = disabled)"); +DEFINE_bool(metrics_summary, true, "Print final metrics summary on shutdown"); + +using namespace proxygen; +using namespace proxygen::coro; +using namespace ti::foss_revproxy; + +namespace { + +/** + * Parse comma-separated values into a vector + * Reports warnings for empty elements that are skipped + */ +std::vector parseCSV(const std::string& csv) { + std::vector result; + + if (csv.empty()) { + return result; + } + + std::stringstream ss(csv); + std::string item; + size_t position = 0; + bool foundEmptyElement = false; + + while (std::getline(ss, item, ',')) { + // Trim whitespace + item.erase(0, item.find_first_not_of(" \t")); + item.erase(item.find_last_not_of(" \t") + 1); + + if (!item.empty()) { + result.push_back(item); + } else { + foundEmptyElement = true; + XLOG(WARN) << "Skipping empty element at position " << position + << " in CSV: '" << csv << "'"; + } + position++; + } + + if (foundEmptyElement) { + XLOG(WARN) << "CSV input had empty elements that were skipped"; + } + + return result; +} + +/** + * Create load balancer based on configuration and return it + */ +std::shared_ptr createLoadBalancer() { + LoadBalancerStrategy strategy; + try { + strategy = parseLoadBalancerStrategy(FLAGS_lb_algorithm); + } catch (const std::invalid_argument& e) { + XLOG(WARN) << "Invalid load balancer algorithm '" << FLAGS_lb_algorithm + << "': " << e.what() << ". Defaulting to Random."; + strategy = LoadBalancerStrategy::RANDOM; + } + + std::shared_ptr loadBalancer; + switch (strategy) { + case LoadBalancerStrategy::ROUND_ROBIN: + XLOG(INFO) << "Using RoundRobin load balancing"; + loadBalancer = std::make_shared(); + break; + case LoadBalancerStrategy::RANDOM: + default: // default not needed, but makes linter and Devmate Reviewer happy + XLOG(INFO) << "Using Random load balancing"; + loadBalancer = std::make_shared(); + break; + } + + return loadBalancer; +} + +/** + * Configure backends from flags and return backend vector + */ +std::vector configureBackends() { + std::vector backends; + + if (FLAGS_backend_servers.empty() || FLAGS_backend_ports.empty()) { + XLOG(WARN) << "No backends configured. Proxy will return 503 for all " + "requests (unless direct response is enabled)."; + return backends; + } + + auto servers = parseCSV(FLAGS_backend_servers); + auto ports = parseCSV(FLAGS_backend_ports); + + if (servers.size() != ports.size()) { + XLOG(ERR) << "Number of backend servers (" << servers.size() + << ") does not match number of backend ports (" << ports.size() + << ")"; + throw std::invalid_argument("Mismatched backend servers and ports"); + } + + XLOG(INFO) << "Configuring " << servers.size() << " backend server(s):"; + + // Populate backends vector + for (size_t i = 0; i < servers.size(); ++i) { + // Parse and validate port number + int portNum = std::stoi(ports[i]); + if (portNum < 1 || portNum > 65535) { + XLOG(ERR) << "Invalid port number: " << portNum << " (must be 1-65535)"; + throw std::invalid_argument( + "Port must be in range 1-65535, got: " + std::to_string(portNum)); + } + uint16_t port = static_cast(portNum); + backends.push_back({servers[i], port, FLAGS_backend_tls}); + XLOG(INFO) << " Backend " << i << ": " << servers[i] << ":" << port + << (FLAGS_backend_tls ? " (TLS)" : " (plaintext)"); + } + + return backends; +} + +/** + * Configure TLS if cert/key are provided + */ +void configureTLS(HTTPServer::Config& config) { + if (!FLAGS_cert.empty()) { + auto tlsConfig = HTTPServer::getDefaultTLSConfig(); + try { + tlsConfig.setCertificate(FLAGS_cert, FLAGS_key, ""); + } catch (const std::exception& ex) { + XLOG(ERR) << "Invalid certificate or key file: " << ex.what(); + throw; + } + config.socketConfig.sslContextConfigs.emplace_back(std::move(tlsConfig)); + + if (FLAGS_quic) { + XLOG(INFO) << "Enabling QUIC/HTTP3 support"; + config.quicConfig = HTTPServer::QuicConfig(); + } + } else if (FLAGS_quic) { + XLOG(ERR) << "QUIC requires --cert and --key"; + throw std::invalid_argument("QUIC requires certificates"); + } +} + +} // namespace + +int main(int argc, char** argv) { + const folly::Init init(&argc, &argv); + ::gflags::ParseCommandLineFlags(&argc, &argv, false); + + XLOG(INFO) << "=== FOSS Revproxy Starting ==="; + XLOG(INFO) << "Listening on port " << FLAGS_port; + + if (!FLAGS_cert.empty()) { + XLOG(INFO) << "TLS enabled with cert: " << FLAGS_cert; + if (FLAGS_quic) { + XLOG(INFO) << "QUIC/HTTP3 enabled"; + } + } else { + XLOG(INFO) << "Running in plaintext mode"; + } + + HTTPServer::Config httpServerConfig; + // Bind to IPv6 wildcard (::) which accepts both IPv4 and IPv6 on dual-stack + httpServerConfig.socketConfig.bindAddress.setFromIpPort("::", FLAGS_port); + httpServerConfig.plaintextProtocol = FLAGS_plaintext_proto; + + configureTLS(httpServerConfig); + + auto backends = configureBackends(); + + auto loadBalancer = createLoadBalancer(); + + // Create shared metrics object + auto metrics = std::make_shared(); + + ProxyConfig proxyConfig{ + .enableDirectResponse = FLAGS_enable_direct_response, + .backendH2 = FLAGS_backend_h2}; + + if (FLAGS_backend_h2) { + XLOG(INFO) << "HTTP/2 enabled for backend connections"; + } + + auto handler = std::make_shared( + backends, loadBalancer, proxyConfig, metrics); + + XLOG(INFO) << "Backend configuration complete"; + XLOG(INFO) << "Starting HTTP server..."; + + // Start metrics reporting thread if interval > 0 + std::atomic stopMetrics{false}; + std::thread metricsThread; + + if (FLAGS_metrics_interval > 0) { + XLOG(INFO) << "Metrics reporting enabled every " << FLAGS_metrics_interval + << " seconds"; + metricsThread = std::thread([&metrics, &stopMetrics]() { + while (!stopMetrics) { + std::this_thread::sleep_for( + std::chrono::seconds(FLAGS_metrics_interval)); + if (stopMetrics) { + break; + } + + // Print periodic metrics + XLOG(INFO) << "=== Proxy Metrics ==="; + XLOG(INFO) << "Elapsed: " << std::fixed << std::setprecision(1) + << metrics->getElapsedSeconds() << "s"; + XLOG(INFO) << "Requests: " << metrics->requestsReceived + << " | Success: " << metrics->requestsSucceeded + << " | Failed: " << metrics->requestsFailed; + XLOG(INFO) << "RPS: " << std::fixed << std::setprecision(1) + << metrics->getActualRPS() + << " | Success Rate: " << std::setprecision(2) + << metrics->getSuccessRate() << "%"; + XLOG(INFO) << "Avg Latency: " << std::setprecision(2) + << metrics->getAvgLatencyMs() + << "ms | Backend: " << metrics->getAvgBackendLatencyMs() + << "ms"; + if (metrics->retriesAttempted > 0) { + XLOG(INFO) << "Retries: " << metrics->retriesAttempted + << " | Retries Succeeded: " << metrics->retriesSucceeded; + } + } + }); + } + + HTTPServer server(std::move(httpServerConfig), handler); + server.start(); + + // Stop metrics thread + stopMetrics = true; + if (metricsThread.joinable()) { + metricsThread.join(); + } + + // Print final summary + if (FLAGS_metrics_summary) { + XLOG(INFO) << "=== Final Proxy Statistics ==="; + XLOG(INFO) << "Total Elapsed: " << std::fixed << std::setprecision(2) + << metrics->getElapsedSeconds() << " seconds"; + XLOG(INFO) << "Requests Received: " << metrics->requestsReceived; + XLOG(INFO) << "Requests Succeeded: " << metrics->requestsSucceeded; + XLOG(INFO) << "Requests Failed: " << metrics->requestsFailed; + XLOG(INFO) << "Success Rate: " << std::setprecision(2) + << metrics->getSuccessRate() << "%"; + XLOG(INFO) << "Actual RPS: " << std::setprecision(1) + << metrics->getActualRPS(); + XLOG(INFO) << "Avg Total Latency: " << std::setprecision(3) + << metrics->getAvgLatencyMs() << " ms"; + XLOG(INFO) << "Avg Backend Latency: " << metrics->getAvgBackendLatencyMs() + << " ms"; + XLOG(INFO) << "Retries Attempted: " << metrics->retriesAttempted; + XLOG(INFO) << "Retries Succeeded: " << metrics->retriesSucceeded; + } + + XLOG(INFO) << "=== FOSS Revproxy Shutdown ==="; + + return 0; +} diff --git a/packages/cdn_bench/src/ti/foss_revproxy/server/ContentHandler.cpp b/packages/cdn_bench/src/ti/foss_revproxy/server/ContentHandler.cpp new file mode 100644 index 00000000..992dbb35 --- /dev/null +++ b/packages/cdn_bench/src/ti/foss_revproxy/server/ContentHandler.cpp @@ -0,0 +1,218 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "ti/foss_revproxy/server/ContentHandler.h" + +#include + +#include "proxygen/lib/http/coro/HTTPFixedSource.h" + +using namespace proxygen; +using namespace proxygen::coro; + +namespace ti { +namespace foss_revproxy { + +// Constants for content generation +namespace { +constexpr int NUM_HTML_VARIANTS = 3; +constexpr int NUM_JS_VARIANTS = 2; +constexpr int NUM_PNG_VARIANTS = 3; +constexpr size_t REQUEST_BODY_READ_SIZE = + 4096; // Buffer size for reading request bodies +} // namespace + +// Minimal valid PNG images (1x1 pixel, different colors) +static const std::vector RED_PNG = { + 0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A, 0x00, 0x00, 0x00, 0x0D, + 0x49, 0x48, 0x44, 0x52, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, + 0x08, 0x02, 0x00, 0x00, 0x00, 0x90, 0x77, 0x53, 0xDE, 0x00, 0x00, 0x00, + 0x0C, 0x49, 0x44, 0x41, 0x54, 0x08, 0xD7, 0x63, 0xF8, 0xCF, 0xC0, 0x00, + 0x00, 0x03, 0x01, 0x01, 0x00, 0x18, 0xDD, 0x8D, 0xB4, 0x00, 0x00, 0x00, + 0x00, 0x49, 0x45, 0x4E, 0x44, 0xAE, 0x42, 0x60, 0x82}; + +static const std::vector GREEN_PNG = { + 0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A, 0x00, 0x00, 0x00, 0x0D, + 0x49, 0x48, 0x44, 0x52, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, + 0x08, 0x02, 0x00, 0x00, 0x00, 0x90, 0x77, 0x53, 0xDE, 0x00, 0x00, 0x00, + 0x0C, 0x49, 0x44, 0x41, 0x54, 0x08, 0xD7, 0x63, 0x60, 0xF8, 0x0F, 0x00, + 0x00, 0x02, 0x01, 0x01, 0xE5, 0x27, 0xDE, 0xFC, 0x00, 0x00, 0x00, 0x00, + 0x49, 0x45, 0x4E, 0x44, 0xAE, 0x42, 0x60, 0x82}; + +static const std::vector BLUE_PNG = { + 0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A, 0x00, 0x00, 0x00, 0x0D, + 0x49, 0x48, 0x44, 0x52, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, + 0x08, 0x02, 0x00, 0x00, 0x00, 0x90, 0x77, 0x53, 0xDE, 0x00, 0x00, 0x00, + 0x0C, 0x49, 0x44, 0x41, 0x54, 0x08, 0xD7, 0x63, 0x60, 0x60, 0xF8, 0x0F, + 0x00, 0x00, 0x04, 0x01, 0x01, 0xE7, 0x87, 0xD8, 0x0A, 0x00, 0x00, 0x00, + 0x00, 0x49, 0x45, 0x4E, 0x44, 0xAE, 0x42, 0x60, 0x82}; + +ContentHandler::ContentHandler(double resetProbability) + : resetProbability_(resetProbability) {} + +folly::coro::Task ContentHandler::handleRequest( + folly::EventBase* evb, + HTTPSessionContextPtr ctx, + HTTPSourceHolder requestSource) { + // Read request headers + auto headerEvent = co_await co_awaitTry(requestSource.readHeaderEvent()); + if (headerEvent.hasException()) { + XLOG(ERR) << "Failed to read request headers"; + co_return HTTPFixedSource::makeFixedResponse(500, "Internal Server Error"); + } + + // Get path from headers + std::string path = headerEvent->headers->getPath(); + std::string method = headerEvent->headers->getMethodString(); + + if (!headerEvent->eom) { + // Drain request body + while (true) { + auto bodyEvent = + co_await requestSource.readBodyEvent(REQUEST_BODY_READ_SIZE); + if (bodyEvent.eom) { + break; + } + } + } + + // Check if we should reset this connection + thread_local std::mt19937 rng(std::random_device{}()); + std::uniform_real_distribution dist(0.0, 1.0); + if (resetProbability_ > 0 && dist(rng) < resetProbability_) { + XLOG(INFO) << "Randomly resetting connection"; + // Return error to simulate reset + co_return HTTPFixedSource::makeFixedResponse(503, "Connection Reset"); + } + + int reqNum = requestCounter_++; + XLOG(DBG2) << "Request #" << reqNum << ": " << method << " " << path; + + co_return generateResponse(path, reqNum); +} + +HTTPSourceHolder ContentHandler::generateResponse( + const std::string& path, + int requestNumber) { + // Route based on path + if (path == "/" || path == "/index.html") { + auto msg = std::make_unique(); + msg->setStatusCode(200); + msg->setStatusMessage("OK"); + msg->getHeaders().set(HTTP_HEADER_CONTENT_TYPE, "text/html; charset=utf-8"); + return HTTPFixedSource::makeFixedSource( + std::move(msg), + folly::IOBuf::copyBuffer(getHTMLContent(requestNumber))); + } + + if (path.find("/api/") == 0 || path.find(".json") != std::string::npos) { + auto msg = std::make_unique(); + msg->setStatusCode(200); + msg->setStatusMessage("OK"); + msg->getHeaders().set(HTTP_HEADER_CONTENT_TYPE, "application/json"); + return HTTPFixedSource::makeFixedSource( + std::move(msg), folly::IOBuf::copyBuffer(getJSONContent())); + } + + if (path.find(".js") != std::string::npos) { + auto msg = std::make_unique(); + msg->setStatusCode(200); + msg->setStatusMessage("OK"); + msg->getHeaders().set(HTTP_HEADER_CONTENT_TYPE, "application/javascript"); + return HTTPFixedSource::makeFixedSource( + std::move(msg), folly::IOBuf::copyBuffer(getJSContent(requestNumber))); + } + + if (path.find(".png") != std::string::npos || path.find("/image") == 0) { + auto imgData = getPNGImage(requestNumber); + auto msg = std::make_unique(); + msg->setStatusCode(200); + msg->setStatusMessage("OK"); + msg->getHeaders().set(HTTP_HEADER_CONTENT_TYPE, "image/png"); + return HTTPFixedSource::makeFixedSource( + std::move(msg), + folly::IOBuf::copyBuffer(imgData.data(), imgData.size())); + } + + // Default: return 404 + auto msg = std::make_unique(); + msg->setStatusCode(404); + msg->setStatusMessage("Not Found"); + msg->getHeaders().set(HTTP_HEADER_CONTENT_TYPE, "text/html; charset=utf-8"); + return HTTPFixedSource::makeFixedSource( + std::move(msg), + folly::IOBuf::copyBuffer( + "

404 Not Found

")); +} + +std::string ContentHandler::getHTMLContent(int request_num) { + int variant = request_num % NUM_HTML_VARIANTS; + return fmt::format( + "\n\n\n" + "FOSS Revproxy Test Server - Page {}\n" + "\n" + "\n\n" + "

FOSS Revproxy Test Server

\n" + "

This is test page variant {}

\n" + "

Request counter: {}

\n" + "\n" + "\n", + variant, + variant, + requestCounter_.load()); +} + +std::string ContentHandler::getJSContent(int request_num) { + switch (request_num % NUM_JS_VARIANTS) { + case 0: + return R"( +console.log('FOSS Revproxy Test - Script A'); +function testFunction() { + return 'Hello from server script A'; +} +)"; + case 1: + default: // unnecessary but the linter wants it + return R"( +console.log('FOSS Revproxy Test - Script B'); +function testFunction() { + return 'Hello from server script B'; +} +)"; + } +} + +std::string ContentHandler::getJSONContent() { + return fmt::format( + "{{\n" + " \"server\": \"foss_revproxy_test\",\n" + " \"requestCount\": {},\n" + " \"timestamp\": {},\n" + " \"data\": [\n" + " {{\"id\": 1, \"value\": \"test1\"}},\n" + " {{\"id\": 2, \"value\": \"test2\"}},\n" + " {{\"id\": 3, \"value\": \"test3\"}}\n" + " ]\n" + "}}", + requestCounter_.load(), + time(nullptr)); +} + +std::vector ContentHandler::getPNGImage(int variant) { + switch (variant % NUM_PNG_VARIANTS) { + case 0: + return RED_PNG; + case 1: + return GREEN_PNG; + case 2: + default: // unnecessary but the linter wants it + return BLUE_PNG; + } +} +} // namespace foss_revproxy +} // namespace ti diff --git a/packages/cdn_bench/src/ti/foss_revproxy/server/ContentHandler.h b/packages/cdn_bench/src/ti/foss_revproxy/server/ContentHandler.h new file mode 100644 index 00000000..a85c173f --- /dev/null +++ b/packages/cdn_bench/src/ti/foss_revproxy/server/ContentHandler.h @@ -0,0 +1,56 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include +#include +#include + +#include "proxygen/lib/http/coro/HTTPCoroSession.h" + +namespace ti { +namespace foss_revproxy { + +/** + * ContentHandler - HTTP server that serves various content types + * + * Features: + * - Serves HTML, JavaScript, JSON + * - Serves embedded images (PNG) + * - Can randomly reset connections for testing + * - Cycles through different responses + */ +class ContentHandler : public proxygen::coro::HTTPHandler { + public: + explicit ContentHandler(double resetProbability = 0.0); + + // HTTPHandler interface + folly::coro::Task handleRequest( + folly::EventBase* evb, + proxygen::coro::HTTPSessionContextPtr ctx, + proxygen::coro::HTTPSourceHolder requestSource) override; + + private: + // Content generation methods + std::string getHTMLContent(int variant); + std::string getJSContent(int variant); + std::string getJSONContent(); + std::vector getPNGImage(int variant); + + // Get appropriate content based on URL path + proxygen::coro::HTTPSourceHolder generateResponse( + const std::string& path, + int requestNumber); + + double resetProbability_; + std::atomic requestCounter_{0}; +}; + +} // namespace foss_revproxy +} // namespace ti diff --git a/packages/cdn_bench/src/ti/foss_revproxy/server/ContentServer.cpp b/packages/cdn_bench/src/ti/foss_revproxy/server/ContentServer.cpp new file mode 100644 index 00000000..c0bef3d0 --- /dev/null +++ b/packages/cdn_bench/src/ti/foss_revproxy/server/ContentServer.cpp @@ -0,0 +1,83 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include +#include +#include + +#include "proxygen/lib/http/coro/server/HTTPServer.h" +#include "ti/foss_revproxy/server/ContentHandler.h" + +using namespace proxygen::coro; +using namespace ti::foss_revproxy; + +// Configuration flags +DEFINE_int32(port, 8082, "Port to listen on"); +DEFINE_string(cert, "", "TLS certificate file (optional)"); +DEFINE_string(key, "", "TLS key file (optional)"); +DEFINE_bool(quic, false, "Enable QUIC/HTTP3 (requires cert/key)"); +DEFINE_string(plaintext_proto, "", "Plaintext protocol (h2 or http/1.1)"); +DEFINE_double( + reset_probability, + 0.0, + "Probability (0.0-1.0) of randomly resetting connections"); + +int main(int argc, char** argv) { + const folly::Init init(&argc, &argv); + ::gflags::ParseCommandLineFlags(&argc, &argv, false); + + XLOG(INFO) << "=== FOSS Revproxy Content Server ==="; + XLOG(INFO) << "Listening on port " << FLAGS_port; + XLOG(INFO) << "Reset probability: " << FLAGS_reset_probability; + + // Create HTTP server configuration + HTTPServer::Config config; + config.socketConfig.bindAddress.setFromLocalPort(FLAGS_port); + config.plaintextProtocol = FLAGS_plaintext_proto; + + // Configure TLS if provided + if (!FLAGS_cert.empty() && !FLAGS_key.empty()) { + XLOG(INFO) << "TLS enabled"; + if (FLAGS_quic) { + XLOG(INFO) << "QUIC/HTTP3 enabled"; + } + + auto tlsConfig = HTTPServer::getDefaultTLSConfig(); + try { + tlsConfig.setCertificate(FLAGS_cert, FLAGS_key, ""); + } catch (const std::exception& ex) { + XLOG(ERR) << "Failed to load TLS certificate: " << ex.what(); + return 1; + } + + config.socketConfig.sslContextConfigs.emplace_back(std::move(tlsConfig)); + + if (FLAGS_quic) { + XLOG(INFO) << "Enabling QUIC/HTTP3 support"; + config.quicConfig = HTTPServer::QuicConfig(); + } + } else if (FLAGS_quic) { + XLOG(ERR) << "QUIC requires --cert and --key"; + return 1; + } else { + XLOG(INFO) << "Running in plaintext mode"; + } + + // Create content handler + auto handler = std::make_shared(FLAGS_reset_probability); + + XLOG(INFO) << "Starting HTTP server..."; + XLOG(INFO) << "Content types: HTML, JavaScript, JSON, PNG images"; + + // Create and start server + HTTPServer server(std::move(config), handler); + server.start(); + + XLOG(INFO) << "=== Content Server Shutdown ==="; + return 0; +}