diff --git a/.gitignore b/.gitignore index 21839782df..9e2bf24566 100644 --- a/.gitignore +++ b/.gitignore @@ -16,7 +16,11 @@ example/benchmark example/redirect !example/redirect.* example/ssecli +!example/ssecli.* +example/ssecli-stream +!example/ssecli-stream.* example/ssesvr +!example/ssesvr.* example/upload !example/upload.* example/one_time_request diff --git a/README-stream.md b/README-stream.md new file mode 100644 index 0000000000..ce02e1c705 --- /dev/null +++ b/README-stream.md @@ -0,0 +1,315 @@ +# cpp-httplib Streaming API + +This document describes the streaming extensions for cpp-httplib, providing an iterator-style API for handling HTTP responses incrementally with **true socket-level streaming**. + +> **Important Notes**: +> +> - **No Keep-Alive**: Each `stream::Get()` call uses a dedicated connection that is closed after the response is fully read. For connection reuse, use `Client::Get()`. +> - **Single iteration only**: The `next()` method can only iterate through the body once. +> - **Result is not thread-safe**: While `stream::Get()` can be called from multiple threads simultaneously, the returned `stream::Result` must be used from a single thread only. + +## Overview + +The streaming API allows you to process HTTP response bodies chunk by chunk using an iterator-style pattern. Data is read directly from the network socket, enabling low-memory processing of large responses. This is particularly useful for: + +- **LLM/AI streaming responses** (e.g., ChatGPT, Claude, Ollama) +- **Server-Sent Events (SSE)** +- **Large file downloads** with progress tracking +- **Reverse proxy implementations** + +## Quick Start + +```cpp +#include "httplib.h" + +int main() { + httplib::Client cli("http://localhost:8080"); + + // Get streaming response + auto result = httplib::stream::Get(cli, "/stream"); + + if (result) { + // Process response body in chunks + while (result.next()) { + std::cout.write(result.data(), result.size()); + } + } + + return 0; +} +``` + +## API Layers + +cpp-httplib provides multiple API layers for different use cases: + +```text +┌─────────────────────────────────────────────┐ +│ SSEClient (planned) │ ← SSE-specific, parsed events +│ - on_message(), on_event() │ +│ - Auto-reconnect, Last-Event-ID │ +├─────────────────────────────────────────────┤ +│ stream::Get() / stream::Result │ ← Iterator-based streaming +│ - while (result.next()) { ... } │ +├─────────────────────────────────────────────┤ +│ open_stream() / StreamHandle │ ← General-purpose streaming +│ - handle.read(buf, len) │ +├─────────────────────────────────────────────┤ +│ Client::Get() │ ← Traditional, full buffering +└─────────────────────────────────────────────┘ +``` + +| Use Case | Recommended API | +|----------|----------------| +| SSE with auto-reconnect | SSEClient (planned) or `ssecli-stream.cc` example | +| LLM streaming (JSON Lines) | `stream::Get()` | +| Large file download | `stream::Get()` or `open_stream()` | +| Reverse proxy | `open_stream()` | +| Small responses with Keep-Alive | `Client::Get()` | + +## API Reference + +### Low-Level API: `StreamHandle` + +The `StreamHandle` struct provides direct control over streaming responses. It takes ownership of the socket connection and reads data directly from the network. + +> **Note:** When using `open_stream()`, the connection is dedicated to streaming and **Keep-Alive is not supported**. For Keep-Alive connections, use `client.Get()` instead. + +```cpp +// Open a stream (takes ownership of socket) +httplib::Client cli("http://localhost:8080"); +auto handle = cli.open_stream("/path"); + +// Check validity +if (handle.is_valid()) { + // Access response headers immediately + int status = handle.response->status; + auto content_type = handle.response->get_header_value("Content-Type"); + + // Read body incrementally + char buf[4096]; + ssize_t n; + while ((n = handle.read(buf, sizeof(buf))) > 0) { + process(buf, n); + } +} +``` + +#### StreamHandle Members + +| Member | Type | Description | +|--------|------|-------------| +| `response` | `std::unique_ptr` | HTTP response with headers | +| `error` | `Error` | Error code if request failed | +| `is_valid()` | `bool` | Returns true if response is valid | +| `read(buf, len)` | `ssize_t` | Read up to `len` bytes directly from socket | +| `get_read_error()` | `Error` | Get the last read error | +| `has_read_error()` | `bool` | Check if a read error occurred | + +### High-Level API: `stream::Get()` and `stream::Result` + +The `httplib.h` header provides a more ergonomic iterator-style API. + +```cpp +#include "httplib.h" + +httplib::Client cli("http://localhost:8080"); + +// Simple GET +auto result = httplib::stream::Get(cli, "/path"); + +// GET with custom headers +httplib::Headers headers = {{"Authorization", "Bearer token"}}; +auto result = httplib::stream::Get(cli, "/path", headers); + +// Process the response +if (result) { + while (result.next()) { + process(result.data(), result.size()); + } +} + +// Or read entire body at once +auto result2 = httplib::stream::Get(cli, "/path"); +if (result2) { + std::string body = result2.read_all(); +} +``` + +#### stream::Result Members + +| Member | Type | Description | +|--------|------|-------------| +| `operator bool()` | `bool` | Returns true if response is valid | +| `is_valid()` | `bool` | Same as `operator bool()` | +| `status()` | `int` | HTTP status code | +| `headers()` | `const Headers&` | Response headers | +| `get_header_value(key, def)` | `std::string` | Get header value (with optional default) | +| `has_header(key)` | `bool` | Check if header exists | +| `next()` | `bool` | Read next chunk, returns false when done | +| `data()` | `const char*` | Pointer to current chunk data | +| `size()` | `size_t` | Size of current chunk | +| `read_all()` | `std::string` | Read entire remaining body into string | +| `error()` | `Error` | Get the connection/request error | +| `read_error()` | `Error` | Get the last read error | +| `has_read_error()` | `bool` | Check if a read error occurred | + +## Usage Examples + +### Example 1: SSE (Server-Sent Events) Client + +```cpp +#include "httplib.h" +#include + +int main() { + httplib::Client cli("http://localhost:1234"); + + auto result = httplib::stream::Get(cli, "/events"); + if (!result) { return 1; } + + while (result.next()) { + std::cout.write(result.data(), result.size()); + std::cout.flush(); + } + + return 0; +} +``` + +For a complete SSE client with auto-reconnection and event parsing, see `example/ssecli-stream.cc`. + +### Example 2: LLM Streaming Response + +```cpp +#include "httplib.h" +#include + +int main() { + httplib::Client cli("http://localhost:11434"); // Ollama + + auto result = httplib::stream::Get(cli, "/api/generate"); + + if (result && result.status() == 200) { + while (result.next()) { + std::cout.write(result.data(), result.size()); + std::cout.flush(); + } + } + + // Check for connection errors + if (result.read_error() != httplib::Error::Success) { + std::cerr << "Connection lost\n"; + } + + return 0; +} +``` + +### Example 3: Large File Download with Progress + +```cpp +#include "httplib.h" +#include +#include + +int main() { + httplib::Client cli("http://example.com"); + auto result = httplib::stream::Get(cli, "/large-file.zip"); + + if (!result || result.status() != 200) { + std::cerr << "Download failed\n"; + return 1; + } + + std::ofstream file("download.zip", std::ios::binary); + size_t total = 0; + + while (result.next()) { + file.write(result.data(), result.size()); + total += result.size(); + std::cout << "\rDownloaded: " << (total / 1024) << " KB" << std::flush; + } + + std::cout << "\nComplete!\n"; + return 0; +} +``` + +### Example 4: Reverse Proxy Streaming + +```cpp +#include "httplib.h" + +httplib::Server svr; + +svr.Get("/proxy/(.*)", [](const httplib::Request& req, httplib::Response& res) { + httplib::Client upstream("http://backend:8080"); + auto handle = upstream.open_stream("/" + req.matches[1].str()); + + if (!handle.is_valid()) { + res.status = 502; + return; + } + + res.status = handle.response->status; + res.set_chunked_content_provider( + handle.response->get_header_value("Content-Type"), + [handle = std::move(handle)](size_t, httplib::DataSink& sink) mutable { + char buf[8192]; + auto n = handle.read(buf, sizeof(buf)); + if (n > 0) { + sink.write(buf, static_cast(n)); + return true; + } + sink.done(); + return true; + } + ); +}); + +svr.listen("0.0.0.0", 3000); +``` + +## Comparison with Existing APIs + +| Feature | `Client::Get()` | `open_stream()` | `stream::Get()` | +|---------|----------------|-----------------|----------------| +| Headers available | After complete | Immediately | Immediately | +| Body reading | All at once | Direct from socket | Iterator-based | +| Memory usage | Full body in RAM | Minimal (controlled) | Minimal (controlled) | +| Keep-Alive support | ✅ Yes | ❌ No | ❌ No | +| Compression | Auto-handled | Auto-handled | Auto-handled | +| Best for | Small responses, Keep-Alive | Low-level streaming | Easy streaming | + +## Features + +- **True socket-level streaming**: Data is read directly from the network socket +- **Low memory footprint**: Only the current chunk is held in memory +- **Compression support**: Automatic decompression for gzip, brotli, and zstd +- **Chunked transfer**: Full support for chunked transfer encoding +- **SSL/TLS support**: Works with HTTPS connections + +## Important Notes + +### Keep-Alive Behavior + +The streaming API (`stream::Get()` / `open_stream()`) takes ownership of the socket connection for the duration of the stream. This means: + +- **Keep-Alive is not supported** for streaming connections +- The socket is closed when `StreamHandle` is destroyed +- For Keep-Alive scenarios, use the standard `client.Get()` API instead + +```cpp +// Use for streaming (no Keep-Alive) +auto result = httplib::stream::Get(cli, "/large-stream"); +while (result.next()) { /* ... */ } + +// Use for Keep-Alive connections +auto res = cli.Get("/api/data"); // Connection can be reused +``` + +## Related + +- [Issue #2269](https://github.com/yhirose/cpp-httplib/issues/2269) - Original feature request +- [example/ssecli-stream.cc](./example/ssecli-stream.cc) - SSE client with auto-reconnection diff --git a/README.md b/README.md index edecbb6a42..01eca3ea39 100644 --- a/README.md +++ b/README.md @@ -1188,6 +1188,31 @@ std::string decoded_component = httplib::decode_uri_component(encoded_component) Use `encode_uri()` for full URLs and `encode_uri_component()` for individual query parameters or path segments. +Streaming API +------------- + +Process large responses without loading everything into memory. + +```c++ +httplib::Client cli("localhost", 8080); + +auto result = httplib::stream::Get(cli, "/large-file"); +if (result) { + while (result.next()) { + process(result.data(), result.size()); // Process each chunk as it arrives + } +} + +// Or read the entire body at once +auto result2 = httplib::stream::Get(cli, "/file"); +if (result2) { + std::string body = result2.read_all(); +} +``` + +All HTTP methods are supported: `stream::Get`, `Post`, `Put`, `Patch`, `Delete`, `Head`, `Options`. + +See [README-stream.md](README-stream.md) for more details. Split httplib.h into .h and .cc ------------------------------- diff --git a/example/Makefile b/example/Makefile index 3082b88014..0d3c46d080 100644 --- a/example/Makefile +++ b/example/Makefile @@ -18,7 +18,7 @@ ZLIB_SUPPORT = -DCPPHTTPLIB_ZLIB_SUPPORT -lz BROTLI_DIR = $(PREFIX)/opt/brotli BROTLI_SUPPORT = -DCPPHTTPLIB_BROTLI_SUPPORT -I$(BROTLI_DIR)/include -L$(BROTLI_DIR)/lib -lbrotlicommon -lbrotlienc -lbrotlidec -all: server client hello simplecli simplesvr upload redirect ssesvr ssecli benchmark one_time_request server_and_client accept_header +all: server client hello simplecli simplesvr upload redirect ssesvr ssecli ssecli-stream benchmark one_time_request server_and_client accept_header server : server.cc ../httplib.h Makefile $(CXX) -o server $(CXXFLAGS) server.cc $(OPENSSL_SUPPORT) $(ZLIB_SUPPORT) $(BROTLI_SUPPORT) @@ -47,6 +47,9 @@ ssesvr : ssesvr.cc ../httplib.h Makefile ssecli : ssecli.cc ../httplib.h Makefile $(CXX) -o ssecli $(CXXFLAGS) ssecli.cc $(OPENSSL_SUPPORT) $(ZLIB_SUPPORT) $(BROTLI_SUPPORT) +ssecli-stream : ssecli-stream.cc ../httplib.h ../httplib.h Makefile + $(CXX) -o ssecli-stream $(CXXFLAGS) ssecli-stream.cc $(OPENSSL_SUPPORT) $(ZLIB_SUPPORT) $(BROTLI_SUPPORT) + benchmark : benchmark.cc ../httplib.h Makefile $(CXX) -o benchmark $(CXXFLAGS) benchmark.cc $(OPENSSL_SUPPORT) $(ZLIB_SUPPORT) $(BROTLI_SUPPORT) @@ -64,4 +67,4 @@ pem: openssl req -new -key key.pem | openssl x509 -days 3650 -req -signkey key.pem > cert.pem clean: - rm server client hello simplecli simplesvr upload redirect ssesvr ssecli benchmark one_time_request server_and_client accept_header *.pem + rm server client hello simplecli simplesvr upload redirect ssesvr ssecli ssecli-stream benchmark one_time_request server_and_client accept_header *.pem diff --git a/example/ssecli-stream.cc b/example/ssecli-stream.cc new file mode 100644 index 0000000000..0dd279ad37 --- /dev/null +++ b/example/ssecli-stream.cc @@ -0,0 +1,234 @@ +// +// ssecli-stream.cc +// +// Copyright (c) 2025 Yuji Hirose. All rights reserved. +// MIT License +// +// SSE (Server-Sent Events) client example using Streaming API +// with automatic reconnection support (similar to JavaScript's EventSource) +// + +#include + +#include +#include +#include +#include + +//------------------------------------------------------------------------------ +// SSE Event Parser +//------------------------------------------------------------------------------ +// Parses SSE events from the stream according to the SSE specification. +// SSE format: +// event: (optional, defaults to "message") +// data: (can have multiple lines) +// id: (optional, used for reconnection) +// retry: (optional, reconnection interval) +// (signals end of event) +// +struct SSEEvent { + std::string event = "message"; // Event type (default: "message") + std::string data; // Event payload + std::string id; // Event ID for Last-Event-ID header + + void clear() { + event = "message"; + data.clear(); + id.clear(); + } +}; + +// Parse a single SSE field line (e.g., "data: hello") +// Returns true if this line ends an event (blank line) +bool parse_sse_line(const std::string &line, SSEEvent &event, int &retry_ms) { + // Blank line signals end of event + if (line.empty() || line == "\r") { return true; } + + // Find the colon separator + auto colon_pos = line.find(':'); + if (colon_pos == std::string::npos) { + // Line with no colon is treated as field name with empty value + return false; + } + + std::string field = line.substr(0, colon_pos); + std::string value; + + // Value starts after colon, skip optional single space + if (colon_pos + 1 < line.size()) { + size_t value_start = colon_pos + 1; + if (line[value_start] == ' ') { value_start++; } + value = line.substr(value_start); + // Remove trailing \r if present + if (!value.empty() && value.back() == '\r') { value.pop_back(); } + } + + // Handle known fields + if (field == "event") { + event.event = value; + } else if (field == "data") { + // Multiple data lines are concatenated with newlines + if (!event.data.empty()) { event.data += "\n"; } + event.data += value; + } else if (field == "id") { + // Empty id is valid (clears the last event ID) + event.id = value; + } else if (field == "retry") { + // Parse retry interval in milliseconds + try { + retry_ms = std::stoi(value); + } catch (...) { + // Invalid retry value, ignore + } + } + // Unknown fields are ignored per SSE spec + + return false; +} + +//------------------------------------------------------------------------------ +// Main - SSE Client with Auto-Reconnection +//------------------------------------------------------------------------------ +int main(void) { + // Configuration + const std::string host = "http://localhost:1234"; + const std::string path = "/event1"; + + httplib::Client cli(host); + + // State for reconnection (persists across connections) + std::string last_event_id; // Sent as Last-Event-ID header on reconnect + int retry_ms = 3000; // Reconnection delay (server can override via retry:) + int connection_count = 0; + + std::cout << "SSE Client starting...\n"; + std::cout << "Target: " << host << path << "\n"; + std::cout << "Press Ctrl+C to exit\n\n"; + + //---------------------------------------------------------------------------- + // Main reconnection loop + // This mimics JavaScript's EventSource behavior: + // - Automatically reconnects on connection failure + // - Sends Last-Event-ID header to resume from last received event + // - Respects server's retry interval + //---------------------------------------------------------------------------- + while (true) { + connection_count++; + std::cout << "[Connection #" << connection_count << "] Connecting...\n"; + + // Build headers, including Last-Event-ID if we have one + httplib::Headers headers; + if (!last_event_id.empty()) { + headers.emplace("Last-Event-ID", last_event_id); + std::cout << "[Connection #" << connection_count + << "] Resuming from event ID: " << last_event_id << "\n"; + } + + // Open streaming connection + auto result = httplib::stream::Get(cli, path, headers); + + //-------------------------------------------------------------------------- + // Connection error handling + //-------------------------------------------------------------------------- + if (!result) { + std::cerr << "[Connection #" << connection_count + << "] Failed: " << httplib::to_string(result.error()) << "\n"; + std::cerr << "Reconnecting in " << retry_ms << "ms...\n\n"; + std::this_thread::sleep_for(std::chrono::milliseconds(retry_ms)); + continue; + } + + if (result.status() != 200) { + std::cerr << "[Connection #" << connection_count + << "] HTTP error: " << result.status() << "\n"; + + // For certain errors, don't reconnect + if (result.status() == 204 || // No Content - server wants us to stop + result.status() == 404 || // Not Found + result.status() == 401 || // Unauthorized + result.status() == 403) { // Forbidden + std::cerr << "Permanent error, not reconnecting.\n"; + return 1; + } + + std::cerr << "Reconnecting in " << retry_ms << "ms...\n\n"; + std::this_thread::sleep_for(std::chrono::milliseconds(retry_ms)); + continue; + } + + // Verify Content-Type (optional but recommended) + auto content_type = result.get_header_value("Content-Type"); + if (content_type.find("text/event-stream") == std::string::npos) { + std::cerr << "[Warning] Content-Type is not text/event-stream: " + << content_type << "\n"; + } + + std::cout << "[Connection #" << connection_count << "] Connected!\n\n"; + + //-------------------------------------------------------------------------- + // Event receiving loop + // Reads chunks from the stream and parses SSE events + //-------------------------------------------------------------------------- + std::string buffer; + SSEEvent current_event; + int event_count = 0; + + // Read data from stream using httplib::stream API + while (result.next()) { + buffer.append(result.data(), result.size()); + + // Process complete lines in the buffer + size_t line_start = 0; + size_t newline_pos; + + while ((newline_pos = buffer.find('\n', line_start)) != + std::string::npos) { + std::string line = buffer.substr(line_start, newline_pos - line_start); + line_start = newline_pos + 1; + + // Parse the line and check if event is complete + bool event_complete = parse_sse_line(line, current_event, retry_ms); + + if (event_complete && !current_event.data.empty()) { + // Event received - process it + event_count++; + + std::cout << "--- Event #" << event_count << " ---\n"; + std::cout << "Type: " << current_event.event << "\n"; + std::cout << "Data: " << current_event.data << "\n"; + if (!current_event.id.empty()) { + std::cout << "ID: " << current_event.id << "\n"; + } + std::cout << "\n"; + + // Update last_event_id for reconnection + // Note: Empty id clears the last event ID per SSE spec + if (!current_event.id.empty()) { last_event_id = current_event.id; } + + current_event.clear(); + } + } + + // Keep unprocessed data in buffer + buffer.erase(0, line_start); + } + + //-------------------------------------------------------------------------- + // Connection ended - check why + //-------------------------------------------------------------------------- + if (result.read_error() != httplib::Error::Success) { + std::cerr << "\n[Connection #" << connection_count + << "] Error: " << httplib::to_string(result.read_error()) + << "\n"; + } else { + std::cout << "\n[Connection #" << connection_count + << "] Stream ended normally\n"; + } + + std::cout << "Received " << event_count << " events in this connection\n"; + std::cout << "Reconnecting in " << retry_ms << "ms...\n\n"; + std::this_thread::sleep_for(std::chrono::milliseconds(retry_ms)); + } + + return 0; +} diff --git a/httplib.h b/httplib.h index a235fd8ed4..f63f76cc0e 100644 --- a/httplib.h +++ b/httplib.h @@ -969,6 +969,7 @@ bool set_socket_opt_impl(socket_t sock, int level, int optname, bool set_socket_opt(socket_t sock, int level, int optname, int opt); bool set_socket_opt_time(socket_t sock, int level, int optname, time_t sec, time_t usec); +int close_socket(socket_t sock); } // namespace detail @@ -1391,6 +1392,87 @@ class Result { #endif }; +struct ClientConnection { + socket_t sock = INVALID_SOCKET; +#ifdef CPPHTTPLIB_OPENSSL_SUPPORT + SSL *ssl = nullptr; +#endif + + bool is_open() const { return sock != INVALID_SOCKET; } + + ClientConnection() = default; + + ~ClientConnection() { +#ifdef CPPHTTPLIB_OPENSSL_SUPPORT + if (ssl) { + SSL_free(ssl); + ssl = nullptr; + } +#endif + if (sock != INVALID_SOCKET) { + detail::close_socket(sock); + sock = INVALID_SOCKET; + } + } + + ClientConnection(const ClientConnection &) = delete; + ClientConnection &operator=(const ClientConnection &) = delete; + + ClientConnection(ClientConnection &&other) noexcept + : sock(other.sock) +#ifdef CPPHTTPLIB_OPENSSL_SUPPORT + , + ssl(other.ssl) +#endif + { + other.sock = INVALID_SOCKET; +#ifdef CPPHTTPLIB_OPENSSL_SUPPORT + other.ssl = nullptr; +#endif + } + + ClientConnection &operator=(ClientConnection &&other) noexcept { + if (this != &other) { + sock = other.sock; +#ifdef CPPHTTPLIB_OPENSSL_SUPPORT + ssl = other.ssl; +#endif + other.sock = INVALID_SOCKET; +#ifdef CPPHTTPLIB_OPENSSL_SUPPORT + other.ssl = nullptr; +#endif + } + return *this; + } +}; + +namespace detail { + +struct ChunkedDecoder; + +struct BodyReader { + Stream *stream = nullptr; + size_t content_length = 0; + size_t bytes_read = 0; + bool chunked = false; + bool eof = false; + std::unique_ptr chunked_decoder; + Error last_error = Error::Success; + + ssize_t read(char *buf, size_t len); + bool has_error() const { return last_error != Error::Success; } +}; + +inline ssize_t read_body_content(Stream *stream, BodyReader &br, char *buf, + size_t len) { + (void)stream; + return br.read(buf, len); +} + +class decompressor; + +} // namespace detail + class ClientImpl { public: explicit ClientImpl(const std::string &host); @@ -1405,6 +1487,43 @@ class ClientImpl { virtual bool is_valid() const; + struct StreamHandle { + std::unique_ptr response; + Error error = Error::Success; + + StreamHandle() = default; + StreamHandle(const StreamHandle &) = delete; + StreamHandle &operator=(const StreamHandle &) = delete; + StreamHandle(StreamHandle &&) = default; + StreamHandle &operator=(StreamHandle &&) = default; + ~StreamHandle() = default; + + bool is_valid() const { + return response != nullptr && error == Error::Success; + } + + ssize_t read(char *buf, size_t len); + void parse_trailers_if_needed(); + Error get_read_error() const { return body_reader_.last_error; } + bool has_read_error() const { return body_reader_.has_error(); } + + bool trailers_parsed_ = false; + + private: + friend class ClientImpl; + + ssize_t read_with_decompression(char *buf, size_t len); + + std::unique_ptr connection_; + std::unique_ptr socket_stream_; + Stream *stream_ = nullptr; + detail::BodyReader body_reader_; + + std::unique_ptr decompressor_; + std::string decompress_buffer_; + size_t decompress_offset_ = 0; + }; + // clang-format off Result Get(const std::string &path, DownloadProgress progress = nullptr); Result Get(const std::string &path, ContentReceiver content_receiver, DownloadProgress progress = nullptr); @@ -1498,6 +1617,15 @@ class ClientImpl { Result Options(const std::string &path, const Headers &headers); // clang-format on + // Streaming API: Open a stream for reading response body incrementally + // Socket ownership is transferred to StreamHandle for true streaming + // Supports all HTTP methods (GET, POST, PUT, PATCH, DELETE, etc.) + StreamHandle open_stream(const std::string &method, const std::string &path, + const Params ¶ms = {}, + const Headers &headers = {}, + const std::string &body = {}, + const std::string &content_type = {}); + bool send(Request &req, Response &res, Error &error); Result send(const Request &req); @@ -1593,6 +1721,7 @@ class ClientImpl { }; virtual bool create_and_connect_socket(Socket &socket, Error &error); + virtual bool ensure_socket_connection(Socket &socket, Error &error); // All of: // shutdown_ssl @@ -1718,6 +1847,8 @@ class ClientImpl { Response &res) const; bool write_request(Stream &strm, Request &req, bool close_connection, Error &error); + void prepare_default_headers(Request &r, bool for_stream, + const std::string &ct); bool redirect(Request &req, Response &res, Error &error); bool create_redirect_client(const std::string &scheme, const std::string &host, int port, Request &req, @@ -1748,6 +1879,8 @@ class ClientImpl { std::chrono::time_point start_time, std::function callback); virtual bool is_ssl() const; + + void transfer_socket_ownership_to_handle(StreamHandle &handle); }; class Client { @@ -1866,6 +1999,16 @@ class Client { Result Options(const std::string &path, const Headers &headers); // clang-format on + // Streaming API: Open a stream for reading response body incrementally + // Socket ownership is transferred to StreamHandle for true streaming + // Supports all HTTP methods (GET, POST, PUT, PATCH, DELETE, etc.) + ClientImpl::StreamHandle open_stream(const std::string &method, + const std::string &path, + const Params ¶ms = {}, + const Headers &headers = {}, + const std::string &body = {}, + const std::string &content_type = {}); + bool send(Request &req, Response &res, Error &error); Result send(const Request &req); @@ -2028,6 +2171,7 @@ class SSLClient final : public ClientImpl { private: bool create_and_connect_socket(Socket &socket, Error &error) override; + bool ensure_socket_connection(Socket &socket, Error &error) override; void shutdown_ssl(Socket &socket, bool shutdown_gracefully) override; void shutdown_ssl_impl(Socket &socket, bool shutdown_gracefully); @@ -2670,6 +2814,25 @@ class stream_line_reader { std::string growable_buffer_; }; +bool parse_trailers(stream_line_reader &line_reader, Headers &dest, + const Headers &src_headers); + +struct ChunkedDecoder { + Stream &strm; + size_t chunk_remaining = 0; + bool finished = false; + char line_buf[64]; + size_t last_chunk_total = 0; + size_t last_chunk_offset = 0; + + explicit ChunkedDecoder(Stream &s); + + ssize_t read_payload(char *buf, size_t len, size_t &out_chunk_offset, + size_t &out_chunk_total); + + bool parse_trailers_into(Headers &dest, const Headers &src_headers); +}; + class mmap { public: mmap(const char *path); @@ -2959,6 +3122,149 @@ inline std::string file_extension(const std::string &path) { inline bool is_space_or_tab(char c) { return c == ' ' || c == '\t'; } +template +inline bool parse_header(const char *beg, const char *end, T fn); + +template +inline bool parse_header(const char *beg, const char *end, T fn) { + // Skip trailing spaces and tabs. + while (beg < end && is_space_or_tab(end[-1])) { + end--; + } + + auto p = beg; + while (p < end && *p != ':') { + p++; + } + + auto name = std::string(beg, p); + if (!detail::fields::is_field_name(name)) { return false; } + + if (p == end) { return false; } + + auto key_end = p; + + if (*p++ != ':') { return false; } + + while (p < end && is_space_or_tab(*p)) { + p++; + } + + if (p <= end) { + auto key_len = key_end - beg; + if (!key_len) { return false; } + + auto key = std::string(beg, key_end); + auto val = std::string(p, end); + + if (!detail::fields::is_field_value(val)) { return false; } + + if (case_ignore::equal(key, "Location") || + case_ignore::equal(key, "Referer")) { + fn(key, val); + } else { + fn(key, decode_path_component(val)); + } + + return true; + } + + return false; +} + +inline bool parse_trailers(stream_line_reader &line_reader, Headers &dest, + const Headers &src_headers) { + // NOTE: In RFC 9112, '7.1 Chunked Transfer Coding' mentions "The chunked + // transfer coding is complete when a chunk with a chunk-size of zero is + // received, possibly followed by a trailer section, and finally terminated by + // an empty line". https://www.rfc-editor.org/rfc/rfc9112.html#section-7.1 + // + // In '7.1.3. Decoding Chunked', however, the pseudo-code in the section + // doesn't care for the existence of the final CRLF. In other words, it seems + // to be ok whether the final CRLF exists or not in the chunked data. + // https://www.rfc-editor.org/rfc/rfc9112.html#section-7.1.3 + // + // According to the reference code in RFC 9112, cpp-httplib now allows + // chunked transfer coding data without the final CRLF. + + // RFC 7230 Section 4.1.2 - Headers prohibited in trailers + thread_local case_ignore::unordered_set prohibited_trailers = { + "transfer-encoding", + "content-length", + "host", + "authorization", + "www-authenticate", + "proxy-authenticate", + "proxy-authorization", + "cookie", + "set-cookie", + "cache-control", + "expect", + "max-forwards", + "pragma", + "range", + "te", + "age", + "expires", + "date", + "location", + "retry-after", + "vary", + "warning", + "content-encoding", + "content-type", + "content-range", + "trailer"}; + + case_ignore::unordered_set declared_trailers; + auto trailer_header = get_header_value(src_headers, "Trailer", "", 0); + if (trailer_header && std::strlen(trailer_header)) { + auto len = std::strlen(trailer_header); + split(trailer_header, trailer_header + len, ',', + [&](const char *b, const char *e) { + const char *kbeg = b; + const char *kend = e; + while (kbeg < kend && (*kbeg == ' ' || *kbeg == '\t')) { + ++kbeg; + } + while (kend > kbeg && (kend[-1] == ' ' || kend[-1] == '\t')) { + --kend; + } + std::string key(kbeg, static_cast(kend - kbeg)); + if (!key.empty() && + prohibited_trailers.find(key) == prohibited_trailers.end()) { + declared_trailers.insert(key); + } + }); + } + + size_t trailer_header_count = 0; + while (strcmp(line_reader.ptr(), "\r\n") != 0) { + if (line_reader.size() > CPPHTTPLIB_HEADER_MAX_LENGTH) { return false; } + if (trailer_header_count >= CPPHTTPLIB_HEADER_MAX_COUNT) { return false; } + + constexpr auto line_terminator_len = 2; + auto line_beg = line_reader.ptr(); + auto line_end = + line_reader.ptr() + line_reader.size() - line_terminator_len; + + if (!parse_header(line_beg, line_end, + [&](const std::string &key, const std::string &val) { + if (declared_trailers.find(key) != + declared_trailers.end()) { + dest.emplace(key, val); + trailer_header_count++; + } + })) { + return false; + } + + if (!line_reader.getline()) { return false; } + } + + return true; +} + inline std::pair trim(const char *b, const char *e, size_t left, size_t right) { while (b + left < e && is_space_or_tab(b[left])) { @@ -4381,6 +4687,7 @@ inline EncodingType encoding_type(const Request &req, const Response &res) { return EncodingType::None; } +#ifdef CPPHTTPLIB_ZLIB_SUPPORT inline bool nocompressor::compress(const char *data, size_t data_length, bool /*last*/, Callback callback) { @@ -4388,7 +4695,6 @@ inline bool nocompressor::compress(const char *data, size_t data_length, return callback(data, data_length); } -#ifdef CPPHTTPLIB_ZLIB_SUPPORT inline gzip_compressor::gzip_compressor() { std::memset(&strm_, 0, sizeof(strm_)); strm_.zalloc = Z_NULL; @@ -4642,6 +4948,27 @@ inline bool zstd_decompressor::decompress(const char *data, size_t data_length, } #endif +inline std::unique_ptr +create_decompressor(const std::string &encoding) { + std::unique_ptr decompressor; + + if (encoding == "gzip" || encoding == "deflate") { +#ifdef CPPHTTPLIB_ZLIB_SUPPORT + decompressor = detail::make_unique(); +#endif + } else if (encoding.find("br") != std::string::npos) { +#ifdef CPPHTTPLIB_BROTLI_SUPPORT + decompressor = detail::make_unique(); +#endif + } else if (encoding == "zstd" || encoding.find("zstd") != std::string::npos) { +#ifdef CPPHTTPLIB_ZSTD_SUPPORT + decompressor = detail::make_unique(); +#endif + } + + return decompressor; +} + inline bool is_prohibited_header_name(const std::string &name) { using udl::operator""_t; @@ -4678,53 +5005,6 @@ inline const char *get_header_value(const Headers &headers, return def; } -template -inline bool parse_header(const char *beg, const char *end, T fn) { - // Skip trailing spaces and tabs. - while (beg < end && is_space_or_tab(end[-1])) { - end--; - } - - auto p = beg; - while (p < end && *p != ':') { - p++; - } - - auto name = std::string(beg, p); - if (!detail::fields::is_field_name(name)) { return false; } - - if (p == end) { return false; } - - auto key_end = p; - - if (*p++ != ':') { return false; } - - while (p < end && is_space_or_tab(*p)) { - p++; - } - - if (p <= end) { - auto key_len = key_end - beg; - if (!key_len) { return false; } - - auto key = std::string(beg, key_end); - auto val = std::string(p, end); - - if (!detail::fields::is_field_value(val)) { return false; } - - if (case_ignore::equal(key, "Location") || - case_ignore::equal(key, "Referer")) { - fn(key, val); - } else { - fn(key, decode_path_component(val)); - } - - return true; - } - - return false; -} - inline bool read_headers(Stream &strm, Headers &headers) { const auto bufsiz = 2048; char buf[bufsiz]; @@ -4776,10 +5056,18 @@ inline bool read_content_with_length(Stream &strm, size_t len, ContentReceiverWithProgress out) { char buf[CPPHTTPLIB_RECV_BUFSIZ]; + detail::BodyReader br; + br.stream = &strm; + br.content_length = len; + br.chunked = false; + br.bytes_read = 0; + br.last_error = Error::Success; + size_t r = 0; while (r < len) { auto read_len = static_cast(len - r); - auto n = strm.read(buf, (std::min)(read_len, CPPHTTPLIB_RECV_BUFSIZ)); + auto to_read = (std::min)(read_len, CPPHTTPLIB_RECV_BUFSIZ); + auto n = detail::read_body_content(&strm, br, buf, to_read); if (n <= 0) { return false; } if (!out(buf, static_cast(n), r, len)) { return false; } @@ -4839,161 +5127,57 @@ template inline ReadContentResult read_content_chunked(Stream &strm, T &x, size_t payload_max_length, ContentReceiverWithProgress out) { - const auto bufsiz = 16; - char buf[bufsiz]; - - stream_line_reader line_reader(strm, buf, bufsiz); + detail::ChunkedDecoder dec(strm); - if (!line_reader.getline()) { return ReadContentResult::Error; } - - unsigned long chunk_len; + char buf[CPPHTTPLIB_RECV_BUFSIZ]; size_t total_len = 0; - while (true) { - char *end_ptr; - - chunk_len = std::strtoul(line_reader.ptr(), &end_ptr, 16); - if (end_ptr == line_reader.ptr()) { return ReadContentResult::Error; } - if (chunk_len == ULONG_MAX) { return ReadContentResult::Error; } + for (;;) { + size_t chunk_offset = 0; + size_t chunk_total = 0; + auto n = dec.read_payload(buf, sizeof(buf), chunk_offset, chunk_total); + if (n < 0) { return ReadContentResult::Error; } - if (chunk_len == 0) { break; } + if (n == 0) { + if (!dec.parse_trailers_into(x.trailers, x.headers)) { + return ReadContentResult::Error; + } + return ReadContentResult::Success; + } - // Check if adding this chunk would exceed the payload limit if (total_len > payload_max_length || - payload_max_length - total_len < chunk_len) { + payload_max_length - total_len < static_cast(n)) { return ReadContentResult::PayloadTooLarge; } - total_len += chunk_len; - - if (!read_content_with_length(strm, chunk_len, nullptr, out)) { - return ReadContentResult::Error; - } - - if (!line_reader.getline()) { return ReadContentResult::Error; } - - if (strcmp(line_reader.ptr(), "\r\n") != 0) { + if (!out(buf, static_cast(n), chunk_offset, chunk_total)) { return ReadContentResult::Error; } - if (!line_reader.getline()) { return ReadContentResult::Error; } + total_len += static_cast(n); } +} - assert(chunk_len == 0); +inline bool is_chunked_transfer_encoding(const Headers &headers) { + return case_ignore::equal( + get_header_value(headers, "Transfer-Encoding", "", 0), "chunked"); +} - // NOTE: In RFC 9112, '7.1 Chunked Transfer Coding' mentions "The chunked - // transfer coding is complete when a chunk with a chunk-size of zero is - // received, possibly followed by a trailer section, and finally terminated by - // an empty line". https://www.rfc-editor.org/rfc/rfc9112.html#section-7.1 - // - // In '7.1.3. Decoding Chunked', however, the pseudo-code in the section - // does't care for the existence of the final CRLF. In other words, it seems - // to be ok whether the final CRLF exists or not in the chunked data. - // https://www.rfc-editor.org/rfc/rfc9112.html#section-7.1.3 - // - // According to the reference code in RFC 9112, cpp-httplib now allows - // chunked transfer coding data without the final CRLF. - if (!line_reader.getline()) { return ReadContentResult::Success; } +template +bool prepare_content_receiver(T &x, int &status, + ContentReceiverWithProgress receiver, + bool decompress, U callback) { + if (decompress) { + std::string encoding = x.get_header_value("Content-Encoding"); + std::unique_ptr decompressor; - // RFC 7230 Section 4.1.2 - Headers prohibited in trailers - thread_local case_ignore::unordered_set prohibited_trailers = { - // Message framing - "transfer-encoding", "content-length", - - // Routing - "host", - - // Authentication - "authorization", "www-authenticate", "proxy-authenticate", - "proxy-authorization", "cookie", "set-cookie", - - // Request modifiers - "cache-control", "expect", "max-forwards", "pragma", "range", "te", - - // Response control - "age", "expires", "date", "location", "retry-after", "vary", "warning", - - // Payload processing - "content-encoding", "content-type", "content-range", "trailer"}; - - // Parse declared trailer headers once for performance - case_ignore::unordered_set declared_trailers; - if (has_header(x.headers, "Trailer")) { - auto trailer_header = get_header_value(x.headers, "Trailer", "", 0); - auto len = std::strlen(trailer_header); - - split(trailer_header, trailer_header + len, ',', - [&](const char *b, const char *e) { - std::string key(b, e); - if (prohibited_trailers.find(key) == prohibited_trailers.end()) { - declared_trailers.insert(key); - } - }); - } - - size_t trailer_header_count = 0; - while (strcmp(line_reader.ptr(), "\r\n") != 0) { - if (line_reader.size() > CPPHTTPLIB_HEADER_MAX_LENGTH) { - return ReadContentResult::Error; - } - - // Check trailer header count limit - if (trailer_header_count >= CPPHTTPLIB_HEADER_MAX_COUNT) { - return ReadContentResult::Error; - } - - // Exclude line terminator - constexpr auto line_terminator_len = 2; - auto end = line_reader.ptr() + line_reader.size() - line_terminator_len; - - parse_header(line_reader.ptr(), end, - [&](const std::string &key, const std::string &val) { - if (declared_trailers.find(key) != declared_trailers.end()) { - x.trailers.emplace(key, val); - trailer_header_count++; - } - }); - - if (!line_reader.getline()) { return ReadContentResult::Error; } - } - - return ReadContentResult::Success; -} - -inline bool is_chunked_transfer_encoding(const Headers &headers) { - return case_ignore::equal( - get_header_value(headers, "Transfer-Encoding", "", 0), "chunked"); -} - -template -bool prepare_content_receiver(T &x, int &status, - ContentReceiverWithProgress receiver, - bool decompress, U callback) { - if (decompress) { - std::string encoding = x.get_header_value("Content-Encoding"); - std::unique_ptr decompressor; - - if (encoding == "gzip" || encoding == "deflate") { -#ifdef CPPHTTPLIB_ZLIB_SUPPORT - decompressor = detail::make_unique(); -#else - status = StatusCode::UnsupportedMediaType_415; - return false; -#endif - } else if (encoding.find("br") != std::string::npos) { -#ifdef CPPHTTPLIB_BROTLI_SUPPORT - decompressor = detail::make_unique(); -#else - status = StatusCode::UnsupportedMediaType_415; - return false; -#endif - } else if (encoding == "zstd") { -#ifdef CPPHTTPLIB_ZSTD_SUPPORT - decompressor = detail::make_unique(); -#else - status = StatusCode::UnsupportedMediaType_415; - return false; -#endif + if (!encoding.empty()) { + decompressor = detail::create_decompressor(encoding); + if (!decompressor) { + // Unsupported encoding or no support compiled in + status = StatusCode::UnsupportedMediaType_415; + return false; + } } if (decompressor) { @@ -7069,6 +7253,64 @@ inline ssize_t Stream::write(const std::string &s) { return write(s.data(), s.size()); } +// BodyReader implementation +inline ssize_t detail::BodyReader::read(char *buf, size_t len) { + if (!stream) { + last_error = Error::Connection; + return -1; + } + if (eof) { return 0; } + + if (!chunked) { + // Content-Length based reading + if (bytes_read >= content_length) { + eof = true; + return 0; + } + + auto remaining = content_length - bytes_read; + auto to_read = (std::min)(len, remaining); + auto n = stream->read(buf, to_read); + + if (n < 0) { + last_error = Error::Read; + eof = true; + return n; + } + if (n == 0) { + // Unexpected EOF before content_length + last_error = Error::Read; + eof = true; + return 0; + } + + bytes_read += static_cast(n); + if (bytes_read >= content_length) { eof = true; } + return n; + } + + // Chunked transfer encoding: delegate to shared decoder instance. + if (!chunked_decoder) { chunked_decoder.reset(new ChunkedDecoder(*stream)); } + + size_t chunk_offset = 0; + size_t chunk_total = 0; + auto n = chunked_decoder->read_payload(buf, len, chunk_offset, chunk_total); + if (n < 0) { + last_error = Error::Read; + eof = true; + return n; + } + + if (n == 0) { + // Final chunk observed. Leave trailer parsing to the caller (StreamHandle). + eof = true; + return 0; + } + + bytes_read += static_cast(n); + return n; +} + namespace detail { inline void calc_actual_timeout(time_t max_timeout_msec, time_t duration_msec, @@ -7936,10 +8178,39 @@ inline bool Server::read_content_core( // RFC 7230 Section 3.3.3: If this is a request message and none of the above // are true (no Transfer-Encoding and no Content-Length), then the message // body length is zero (no message body is present). + // + // For non-SSL builds, peek into the socket to detect clients that send a + // body without a Content-Length header (raw HTTP over TCP). If there is + // pending data that exceeds the configured payload limit, treat this as an + // oversized request and fail early (causing connection close). For SSL + // builds we cannot reliably peek the decrypted application bytes, so keep + // the original behaviour. +#if !defined(CPPHTTPLIB_OPENSSL_SUPPORT) && !defined(_WIN32) + if (!req.has_header("Content-Length") && + !detail::is_chunked_transfer_encoding(req.headers)) { + socket_t s = strm.socket(); + if (s != INVALID_SOCKET) { + // Peek up to payload_max_length_ + 1 bytes. If more than + // payload_max_length_ bytes are pending, reject the request. + size_t to_peek = + (payload_max_length_ > 0) + ? (std::min)(payload_max_length_ + 1, static_cast(4096)) + : 1; + std::vector peekbuf(to_peek); + ssize_t n = ::recv(s, peekbuf.data(), to_peek, MSG_PEEK); + if (n > 0 && static_cast(n) > payload_max_length_) { + // Indicate failure so connection will be closed. + return false; + } + } + return true; + } +#else if (!req.has_header("Content-Length") && !detail::is_chunked_transfer_encoding(req.headers)) { return true; } +#endif if (!detail::read_content(strm, req, payload_max_length_, res.status, nullptr, out, true)) { @@ -8757,6 +9028,26 @@ inline bool ClientImpl::create_and_connect_socket(Socket &socket, return true; } +inline bool ClientImpl::ensure_socket_connection(Socket &socket, Error &error) { + return create_and_connect_socket(socket, error); +} + +#ifdef CPPHTTPLIB_OPENSSL_SUPPORT +inline bool SSLClient::ensure_socket_connection(Socket &socket, Error &error) { + if (!ClientImpl::ensure_socket_connection(socket, error)) { return false; } + + if (!proxy_host_.empty() && proxy_port_ != -1) { return true; } + + if (!initialize_ssl(socket, error)) { + shutdown_socket(socket); + close_socket(socket); + return false; + } + + return true; +} +#endif + inline void ClientImpl::shutdown_ssl(Socket & /*socket*/, bool /*shutdown_gracefully*/) { // If there are any requests in flight from threads other than us, then it's @@ -8869,7 +9160,7 @@ inline bool ClientImpl::send_(Request &req, Response &res, Error &error) { } if (!is_alive) { - if (!create_and_connect_socket(socket_, error)) { + if (!ensure_socket_connection(socket_, error)) { output_error_log(error, &req); return false; } @@ -8887,9 +9178,11 @@ inline bool ClientImpl::send_(Request &req, Response &res, Error &error) { } } - if (!scli.initialize_ssl(socket_, error)) { - output_error_log(error, &req); - return false; + if (!proxy_host_.empty() && proxy_port_ != -1) { + if (!scli.initialize_ssl(socket_, error)) { + output_error_log(error, &req); + return false; + } } } #endif @@ -8962,6 +9255,342 @@ inline Result ClientImpl::send_(Request &&req) { #endif } +inline void ClientImpl::prepare_default_headers(Request &r, bool for_stream, + const std::string &ct) { + (void)for_stream; + for (const auto &header : default_headers_) { + if (!r.has_header(header.first)) { r.headers.insert(header); } + } + + if (!r.has_header("Host")) { + if (address_family_ == AF_UNIX) { + r.headers.emplace("Host", "localhost"); + } else { + r.headers.emplace("Host", host_and_port_); + } + } + + if (!r.has_header("Accept")) { r.headers.emplace("Accept", "*/*"); } + + if (!r.content_receiver) { + if (!r.has_header("Accept-Encoding")) { + std::string accept_encoding; +#ifdef CPPHTTPLIB_BROTLI_SUPPORT + accept_encoding = "br"; +#endif +#ifdef CPPHTTPLIB_ZLIB_SUPPORT + if (!accept_encoding.empty()) { accept_encoding += ", "; } + accept_encoding += "gzip, deflate"; +#endif +#ifdef CPPHTTPLIB_ZSTD_SUPPORT + if (!accept_encoding.empty()) { accept_encoding += ", "; } + accept_encoding += "zstd"; +#endif + r.set_header("Accept-Encoding", accept_encoding); + } + +#ifndef CPPHTTPLIB_NO_DEFAULT_USER_AGENT + if (!r.has_header("User-Agent")) { + auto agent = std::string("cpp-httplib/") + CPPHTTPLIB_VERSION; + r.set_header("User-Agent", agent); + } +#endif + } + + if (!r.body.empty()) { + if (!ct.empty() && !r.has_header("Content-Type")) { + r.headers.emplace("Content-Type", ct); + } + if (!r.has_header("Content-Length")) { + r.headers.emplace("Content-Length", std::to_string(r.body.size())); + } + } +} + +inline ClientImpl::StreamHandle +ClientImpl::open_stream(const std::string &method, const std::string &path, + const Params ¶ms, const Headers &headers, + const std::string &body, + const std::string &content_type) { + StreamHandle handle; + handle.response = detail::make_unique(); + handle.error = Error::Success; + + auto query_path = params.empty() ? path : append_query_params(path, params); + handle.connection_ = detail::make_unique(); + + { + std::lock_guard guard(socket_mutex_); + + auto is_alive = false; + if (socket_.is_open()) { + is_alive = detail::is_socket_alive(socket_.sock); +#ifdef CPPHTTPLIB_OPENSSL_SUPPORT + if (is_alive && is_ssl()) { + if (detail::is_ssl_peer_could_be_closed(socket_.ssl, socket_.sock)) { + is_alive = false; + } + } +#endif + if (!is_alive) { + shutdown_ssl(socket_, false); + shutdown_socket(socket_); + close_socket(socket_); + } + } + + if (!is_alive) { + if (!ensure_socket_connection(socket_, handle.error)) { + handle.response.reset(); + return handle; + } + +#ifdef CPPHTTPLIB_OPENSSL_SUPPORT + if (is_ssl()) { + auto &scli = static_cast(*this); + if (!proxy_host_.empty() && proxy_port_ != -1) { + if (!scli.initialize_ssl(socket_, handle.error)) { + handle.response.reset(); + return handle; + } + } + } +#endif + } + + transfer_socket_ownership_to_handle(handle); + } + +#ifdef CPPHTTPLIB_OPENSSL_SUPPORT + if (is_ssl() && handle.connection_->ssl) { + handle.socket_stream_ = detail::make_unique( + handle.connection_->sock, handle.connection_->ssl, read_timeout_sec_, + read_timeout_usec_, write_timeout_sec_, write_timeout_usec_); + } else { + handle.socket_stream_ = detail::make_unique( + handle.connection_->sock, read_timeout_sec_, read_timeout_usec_, + write_timeout_sec_, write_timeout_usec_); + } +#else + handle.socket_stream_ = detail::make_unique( + handle.connection_->sock, read_timeout_sec_, read_timeout_usec_, + write_timeout_sec_, write_timeout_usec_); +#endif + handle.stream_ = handle.socket_stream_.get(); + + Request req; + req.method = method; + req.path = query_path; + req.headers = headers; + req.body = body; + + prepare_default_headers(req, true, content_type); + + auto &strm = *handle.stream_; + if (detail::write_request_line(strm, req.method, req.path) < 0) { + handle.error = Error::Write; + handle.response.reset(); + return handle; + } + + if (!detail::write_headers(strm, req.headers)) { + handle.error = Error::Write; + handle.response.reset(); + return handle; + } + + if (!body.empty()) { + if (strm.write(body.data(), body.size()) < 0) { + handle.error = Error::Write; + handle.response.reset(); + return handle; + } + } + + if (!read_response_line(strm, req, *handle.response) || + !detail::read_headers(strm, handle.response->headers)) { + handle.error = Error::Read; + handle.response.reset(); + return handle; + } + + handle.body_reader_.stream = handle.stream_; + + auto content_length_str = handle.response->get_header_value("Content-Length"); + if (!content_length_str.empty()) { + handle.body_reader_.content_length = + static_cast(std::stoull(content_length_str)); + } + + auto transfer_encoding = + handle.response->get_header_value("Transfer-Encoding"); + handle.body_reader_.chunked = (transfer_encoding == "chunked"); + + auto content_encoding = handle.response->get_header_value("Content-Encoding"); + if (!content_encoding.empty()) { + handle.decompressor_ = detail::create_decompressor(content_encoding); + } + + return handle; +} + +inline ssize_t ClientImpl::StreamHandle::read(char *buf, size_t len) { + if (!is_valid() || !response) { return -1; } + + if (decompressor_) { return read_with_decompression(buf, len); } + auto n = detail::read_body_content(stream_, body_reader_, buf, len); + + if (n <= 0 && body_reader_.chunked && !trailers_parsed_ && stream_) { + trailers_parsed_ = true; + if (body_reader_.chunked_decoder) { + if (!body_reader_.chunked_decoder->parse_trailers_into( + response->trailers, response->headers)) { + return n; + } + } else { + detail::ChunkedDecoder dec(*stream_); + if (!dec.parse_trailers_into(response->trailers, response->headers)) { + return n; + } + } + } + + return n; +} + +inline ssize_t ClientImpl::StreamHandle::read_with_decompression(char *buf, + size_t len) { + if (decompress_offset_ < decompress_buffer_.size()) { + auto available = decompress_buffer_.size() - decompress_offset_; + auto to_copy = (std::min)(len, available); + std::memcpy(buf, decompress_buffer_.data() + decompress_offset_, to_copy); + decompress_offset_ += to_copy; + return static_cast(to_copy); + } + + decompress_buffer_.clear(); + decompress_offset_ = 0; + + constexpr size_t kDecompressionBufferSize = 8192; + char compressed_buf[kDecompressionBufferSize]; + + while (true) { + auto n = detail::read_body_content(stream_, body_reader_, compressed_buf, + sizeof(compressed_buf)); + + if (n <= 0) { return n; } + + bool decompress_ok = + decompressor_->decompress(compressed_buf, static_cast(n), + [this](const char *data, size_t data_len) { + decompress_buffer_.append(data, data_len); + return true; + }); + + if (!decompress_ok) { + body_reader_.last_error = Error::Read; + return -1; + } + + if (!decompress_buffer_.empty()) { break; } + } + + auto to_copy = (std::min)(len, decompress_buffer_.size()); + std::memcpy(buf, decompress_buffer_.data(), to_copy); + decompress_offset_ = to_copy; + return static_cast(to_copy); +} + +inline void ClientImpl::StreamHandle::parse_trailers_if_needed() { + if (!response || !stream_ || !body_reader_.chunked || trailers_parsed_) { + return; + } + + trailers_parsed_ = true; + + const auto bufsiz = 128; + char line_buf[bufsiz]; + detail::stream_line_reader line_reader(*stream_, line_buf, bufsiz); + + if (!line_reader.getline()) { return; } + + if (!detail::parse_trailers(line_reader, response->trailers, + response->headers)) { + return; + } +} + +// Inline method implementations for `ChunkedDecoder`. +namespace detail { + +inline ChunkedDecoder::ChunkedDecoder(Stream &s) : strm(s) {} + +inline ssize_t ChunkedDecoder::read_payload(char *buf, size_t len, + size_t &out_chunk_offset, + size_t &out_chunk_total) { + if (finished) { return 0; } + + if (chunk_remaining == 0) { + stream_line_reader lr(strm, line_buf, sizeof(line_buf)); + if (!lr.getline()) { return -1; } + + char *endptr = nullptr; + unsigned long chunk_len = std::strtoul(lr.ptr(), &endptr, 16); + if (endptr == lr.ptr()) { return -1; } + if (chunk_len == ULONG_MAX) { return -1; } + + if (chunk_len == 0) { + chunk_remaining = 0; + finished = true; + out_chunk_offset = 0; + out_chunk_total = 0; + return 0; + } + + chunk_remaining = static_cast(chunk_len); + last_chunk_total = chunk_remaining; + last_chunk_offset = 0; + } + + auto to_read = (std::min)(chunk_remaining, len); + auto n = strm.read(buf, to_read); + if (n <= 0) { return -1; } + + auto offset_before = last_chunk_offset; + last_chunk_offset += static_cast(n); + chunk_remaining -= static_cast(n); + + out_chunk_offset = offset_before; + out_chunk_total = last_chunk_total; + + if (chunk_remaining == 0) { + stream_line_reader lr(strm, line_buf, sizeof(line_buf)); + if (!lr.getline()) { return -1; } + if (std::strcmp(lr.ptr(), "\r\n") != 0) { return -1; } + } + + return n; +} + +inline bool ChunkedDecoder::parse_trailers_into(Headers &dest, + const Headers &src_headers) { + stream_line_reader lr(strm, line_buf, sizeof(line_buf)); + if (!lr.getline()) { return false; } + return parse_trailers(lr, dest, src_headers); +} + +} // namespace detail + +inline void +ClientImpl::transfer_socket_ownership_to_handle(StreamHandle &handle) { + handle.connection_->sock = socket_.sock; +#ifdef CPPHTTPLIB_OPENSSL_SUPPORT + handle.connection_->ssl = socket_.ssl; + socket_.ssl = nullptr; +#endif + socket_.sock = INVALID_SOCKET; +} + inline bool ClientImpl::handle_request(Stream &strm, Request &req, Response &res, bool close_connection, Error &error) { @@ -9264,42 +9893,11 @@ inline bool ClientImpl::write_request(Stream &strm, Request &req, } } - if (!req.has_header("Host")) { - // For Unix socket connections, use "localhost" as Host header (similar to - // curl behavior) - if (address_family_ == AF_UNIX) { - req.set_header("Host", "localhost"); - } else { - req.set_header("Host", host_and_port_); - } + std::string ct_for_defaults; + if (!req.has_header("Content-Type") && !req.body.empty()) { + ct_for_defaults = "text/plain"; } - - if (!req.has_header("Accept")) { req.set_header("Accept", "*/*"); } - - if (!req.content_receiver) { - if (!req.has_header("Accept-Encoding")) { - std::string accept_encoding; -#ifdef CPPHTTPLIB_BROTLI_SUPPORT - accept_encoding = "br"; -#endif -#ifdef CPPHTTPLIB_ZLIB_SUPPORT - if (!accept_encoding.empty()) { accept_encoding += ", "; } - accept_encoding += "gzip, deflate"; -#endif -#ifdef CPPHTTPLIB_ZSTD_SUPPORT - if (!accept_encoding.empty()) { accept_encoding += ", "; } - accept_encoding += "zstd"; -#endif - req.set_header("Accept-Encoding", accept_encoding); - } - -#ifndef CPPHTTPLIB_NO_DEFAULT_USER_AGENT - if (!req.has_header("User-Agent")) { - auto agent = std::string("cpp-httplib/") + CPPHTTPLIB_VERSION; - req.set_header("User-Agent", agent); - } -#endif - }; + prepare_default_headers(req, false, ct_for_defaults); if (req.body.empty()) { if (req.content_provider_) { @@ -9315,15 +9913,6 @@ inline bool ClientImpl::write_request(Stream &strm, Request &req, req.set_header("Content-Length", "0"); } } - } else { - if (!req.has_header("Content-Type")) { - req.set_header("Content-Type", "text/plain"); - } - - if (!req.has_header("Content-Length")) { - auto length = std::to_string(req.body.size()); - req.set_header("Content-Length", length); - } } if (!basic_auth_password_.empty() || !basic_auth_username_.empty()) { @@ -11276,7 +11865,7 @@ inline bool SSLClient::connect_with_proxy( close_socket(socket); // Create a new socket for the authenticated CONNECT request - if (!create_and_connect_socket(socket, error)) { + if (!ensure_socket_connection(socket, error)) { success = false; output_error_log(error, nullptr); return false; @@ -12174,6 +12763,13 @@ inline Result Client::Options(const std::string &path, const Headers &headers) { return cli_->Options(path, headers); } +inline ClientImpl::StreamHandle +Client::open_stream(const std::string &method, const std::string &path, + const Params ¶ms, const Headers &headers, + const std::string &body, const std::string &content_type) { + return cli_->open_stream(method, path, params, headers, body, content_type); +} + inline bool Client::send(Request &req, Response &res, Error &error) { return cli_->send(req, res, error); } @@ -12333,6 +12929,375 @@ inline SSL_CTX *Client::ssl_context() const { // ---------------------------------------------------------------------------- +/* + * C++11/14/17 Streaming API + * + * This section provides iterator-style streaming functionality for C++11/14/17. + * For C++20 and later, a coroutine-based API with range-for syntax is + * available. + * + * Usage: + * httplib::Client cli("example.com"); + * auto result = httplib::stream::Get(cli, "/large-file"); + * if (result) { + * while (result.next()) { + * process(result.data(), result.size()); + * } + * } + */ + +namespace stream { + +class Result { +public: + Result() : chunk_size_(8192) {} + + explicit Result(ClientImpl::StreamHandle &&handle, size_t chunk_size = 8192) + : handle_(std::move(handle)), chunk_size_(chunk_size) {} + + Result(Result &&other) noexcept + : handle_(std::move(other.handle_)), buffer_(std::move(other.buffer_)), + current_size_(other.current_size_), chunk_size_(other.chunk_size_), + finished_(other.finished_) { + other.current_size_ = 0; + other.finished_ = true; + } + + Result &operator=(Result &&other) noexcept { + if (this != &other) { + handle_ = std::move(other.handle_); + buffer_ = std::move(other.buffer_); + current_size_ = other.current_size_; + chunk_size_ = other.chunk_size_; + finished_ = other.finished_; + other.current_size_ = 0; + other.finished_ = true; + } + return *this; + } + + Result(const Result &) = delete; + Result &operator=(const Result &) = delete; + + // Check if the result is valid (connection succeeded and response received) + bool is_valid() const { return handle_.is_valid(); } + explicit operator bool() const { return is_valid(); } + + // Response status code + int status() const { + return handle_.response ? handle_.response->status : -1; + } + + // Response headers + const Headers &headers() const { + static const Headers empty_headers; + return handle_.response ? handle_.response->headers : empty_headers; + } + + std::string get_header_value(const std::string &key, + const char *def = "") const { + return handle_.response ? handle_.response->get_header_value(key, def) + : def; + } + + bool has_header(const std::string &key) const { + return handle_.response ? handle_.response->has_header(key) : false; + } + + // Error information + Error error() const { return handle_.error; } + Error read_error() const { return handle_.get_read_error(); } + bool has_read_error() const { return handle_.has_read_error(); } + + // Streaming iteration API + // Call next() to read the next chunk, then access data via data()/size() + // Returns true if data was read, false when stream is exhausted + bool next() { + if (!handle_.is_valid() || finished_) { return false; } + + if (buffer_.size() < chunk_size_) { buffer_.resize(chunk_size_); } + + ssize_t n = handle_.read(&buffer_[0], chunk_size_); + if (n > 0) { + current_size_ = static_cast(n); + return true; + } + + current_size_ = 0; + finished_ = true; + return false; + } + + // Pointer to current chunk data (valid after next() returns true) + const char *data() const { return buffer_.data(); } + + // Size of current chunk (valid after next() returns true) + size_t size() const { return current_size_; } + + // Convenience method: read all remaining data into a string + std::string read_all() { + std::string result; + while (next()) { + result.append(data(), size()); + } + return result; + } + +private: + ClientImpl::StreamHandle handle_; + std::string buffer_; + size_t current_size_ = 0; + size_t chunk_size_; + bool finished_ = false; +}; + +// GET +template +inline Result Get(ClientType &cli, const std::string &path, + size_t chunk_size = 8192) { + return Result{cli.open_stream("GET", path), chunk_size}; +} + +template +inline Result Get(ClientType &cli, const std::string &path, + const Headers &headers, size_t chunk_size = 8192) { + return Result{cli.open_stream("GET", path, {}, headers), chunk_size}; +} + +template +inline Result Get(ClientType &cli, const std::string &path, + const Params ¶ms, size_t chunk_size = 8192) { + return Result{cli.open_stream("GET", path, params), chunk_size}; +} + +template +inline Result Get(ClientType &cli, const std::string &path, + const Params ¶ms, const Headers &headers, + size_t chunk_size = 8192) { + return Result{cli.open_stream("GET", path, params, headers), chunk_size}; +} + +// POST +template +inline Result Post(ClientType &cli, const std::string &path, + const std::string &body, const std::string &content_type, + size_t chunk_size = 8192) { + return Result{cli.open_stream("POST", path, {}, {}, body, content_type), + chunk_size}; +} + +template +inline Result Post(ClientType &cli, const std::string &path, + const Headers &headers, const std::string &body, + const std::string &content_type, size_t chunk_size = 8192) { + return Result{cli.open_stream("POST", path, {}, headers, body, content_type), + chunk_size}; +} + +template +inline Result Post(ClientType &cli, const std::string &path, + const Params ¶ms, const std::string &body, + const std::string &content_type, size_t chunk_size = 8192) { + return Result{cli.open_stream("POST", path, params, {}, body, content_type), + chunk_size}; +} + +template +inline Result Post(ClientType &cli, const std::string &path, + const Params ¶ms, const Headers &headers, + const std::string &body, const std::string &content_type, + size_t chunk_size = 8192) { + return Result{ + cli.open_stream("POST", path, params, headers, body, content_type), + chunk_size}; +} + +// PUT +template +inline Result Put(ClientType &cli, const std::string &path, + const std::string &body, const std::string &content_type, + size_t chunk_size = 8192) { + return Result{cli.open_stream("PUT", path, {}, {}, body, content_type), + chunk_size}; +} + +template +inline Result Put(ClientType &cli, const std::string &path, + const Headers &headers, const std::string &body, + const std::string &content_type, size_t chunk_size = 8192) { + return Result{cli.open_stream("PUT", path, {}, headers, body, content_type), + chunk_size}; +} + +template +inline Result Put(ClientType &cli, const std::string &path, + const Params ¶ms, const std::string &body, + const std::string &content_type, size_t chunk_size = 8192) { + return Result{cli.open_stream("PUT", path, params, {}, body, content_type), + chunk_size}; +} + +template +inline Result Put(ClientType &cli, const std::string &path, + const Params ¶ms, const Headers &headers, + const std::string &body, const std::string &content_type, + size_t chunk_size = 8192) { + return Result{ + cli.open_stream("PUT", path, params, headers, body, content_type), + chunk_size}; +} + +// PATCH +template +inline Result Patch(ClientType &cli, const std::string &path, + const std::string &body, const std::string &content_type, + size_t chunk_size = 8192) { + return Result{cli.open_stream("PATCH", path, {}, {}, body, content_type), + chunk_size}; +} + +template +inline Result Patch(ClientType &cli, const std::string &path, + const Headers &headers, const std::string &body, + const std::string &content_type, size_t chunk_size = 8192) { + return Result{cli.open_stream("PATCH", path, {}, headers, body, content_type), + chunk_size}; +} + +template +inline Result Patch(ClientType &cli, const std::string &path, + const Params ¶ms, const std::string &body, + const std::string &content_type, size_t chunk_size = 8192) { + return Result{cli.open_stream("PATCH", path, params, {}, body, content_type), + chunk_size}; +} + +template +inline Result Patch(ClientType &cli, const std::string &path, + const Params ¶ms, const Headers &headers, + const std::string &body, const std::string &content_type, + size_t chunk_size = 8192) { + return Result{ + cli.open_stream("PATCH", path, params, headers, body, content_type), + chunk_size}; +} + +// DELETE +template +inline Result Delete(ClientType &cli, const std::string &path, + size_t chunk_size = 8192) { + return Result{cli.open_stream("DELETE", path), chunk_size}; +} + +template +inline Result Delete(ClientType &cli, const std::string &path, + const Headers &headers, size_t chunk_size = 8192) { + return Result{cli.open_stream("DELETE", path, {}, headers), chunk_size}; +} + +template +inline Result Delete(ClientType &cli, const std::string &path, + const std::string &body, const std::string &content_type, + size_t chunk_size = 8192) { + return Result{cli.open_stream("DELETE", path, {}, {}, body, content_type), + chunk_size}; +} + +template +inline Result Delete(ClientType &cli, const std::string &path, + const Headers &headers, const std::string &body, + const std::string &content_type, + size_t chunk_size = 8192) { + return Result{ + cli.open_stream("DELETE", path, {}, headers, body, content_type), + chunk_size}; +} + +template +inline Result Delete(ClientType &cli, const std::string &path, + const Params ¶ms, size_t chunk_size = 8192) { + return Result{cli.open_stream("DELETE", path, params), chunk_size}; +} + +template +inline Result Delete(ClientType &cli, const std::string &path, + const Params ¶ms, const Headers &headers, + size_t chunk_size = 8192) { + return Result{cli.open_stream("DELETE", path, params, headers), chunk_size}; +} + +template +inline Result Delete(ClientType &cli, const std::string &path, + const Params ¶ms, const std::string &body, + const std::string &content_type, + size_t chunk_size = 8192) { + return Result{cli.open_stream("DELETE", path, params, {}, body, content_type), + chunk_size}; +} + +template +inline Result Delete(ClientType &cli, const std::string &path, + const Params ¶ms, const Headers &headers, + const std::string &body, const std::string &content_type, + size_t chunk_size = 8192) { + return Result{ + cli.open_stream("DELETE", path, params, headers, body, content_type), + chunk_size}; +} + +// HEAD +template +inline Result Head(ClientType &cli, const std::string &path, + size_t chunk_size = 8192) { + return Result{cli.open_stream("HEAD", path), chunk_size}; +} + +template +inline Result Head(ClientType &cli, const std::string &path, + const Headers &headers, size_t chunk_size = 8192) { + return Result{cli.open_stream("HEAD", path, {}, headers), chunk_size}; +} + +template +inline Result Head(ClientType &cli, const std::string &path, + const Params ¶ms, size_t chunk_size = 8192) { + return Result{cli.open_stream("HEAD", path, params), chunk_size}; +} + +template +inline Result Head(ClientType &cli, const std::string &path, + const Params ¶ms, const Headers &headers, + size_t chunk_size = 8192) { + return Result{cli.open_stream("HEAD", path, params, headers), chunk_size}; +} + +// OPTIONS +template +inline Result Options(ClientType &cli, const std::string &path, + size_t chunk_size = 8192) { + return Result{cli.open_stream("OPTIONS", path), chunk_size}; +} + +template +inline Result Options(ClientType &cli, const std::string &path, + const Headers &headers, size_t chunk_size = 8192) { + return Result{cli.open_stream("OPTIONS", path, {}, headers), chunk_size}; +} + +template +inline Result Options(ClientType &cli, const std::string &path, + const Params ¶ms, size_t chunk_size = 8192) { + return Result{cli.open_stream("OPTIONS", path, params), chunk_size}; +} + +template +inline Result Options(ClientType &cli, const std::string &path, + const Params ¶ms, const Headers &headers, + size_t chunk_size = 8192) { + return Result{cli.open_stream("OPTIONS", path, params, headers), chunk_size}; +} + +} // namespace stream } // namespace httplib #endif // CPPHTTPLIB_HTTPLIB_H diff --git a/test/test.cc b/test/test.cc index 76713ad0cf..3282936a65 100644 --- a/test/test.cc +++ b/test/test.cc @@ -3198,6 +3198,33 @@ class ServerTest : public ::testing::Test { return true; }); }) + .Get("/streamed-chunked-with-prohibited-trailer", + [&](const Request & /*req*/, Response &res) { + auto i = new int(0); + // Declare both a prohibited trailer (Content-Length) and an + // allowed one + res.set_header("Trailer", "Content-Length, X-Allowed"); + + res.set_chunked_content_provider( + "text/plain", + [i](size_t /*offset*/, DataSink &sink) { + switch (*i) { + case 0: sink.os << "123"; break; + case 1: sink.os << "456"; break; + case 2: sink.os << "789"; break; + case 3: { + sink.done_with_trailer( + {{"Content-Length", "5"}, {"X-Allowed", "yes"}}); + } break; + } + (*i)++; + return true; + }, + [i](bool success) { + EXPECT_TRUE(success); + delete i; + }); + }) .Get("/streamed-chunked2", [&](const Request & /*req*/, Response &res) { auto i = new int(0); @@ -11686,3 +11713,754 @@ TEST(ServerRequestParsingTest, RequestWithoutContentLengthOrTransferEncoding) { &resp)); EXPECT_TRUE(resp.find("HTTP/1.1 200 OK") == 0); } + +//============================================================================== +// open_stream() Tests +//============================================================================== + +inline std::string read_all(ClientImpl::StreamHandle &handle) { + std::string result; + char buf[8192]; + ssize_t n; + while ((n = handle.read(buf, sizeof(buf))) > 0) { + result.append(buf, static_cast(n)); + } + return result; +} + +// Mock stream for unit tests +class MockStream : public Stream { +public: + std::string data; + size_t pos = 0; + ssize_t error_after = -1; // -1 = no error + + explicit MockStream(const std::string &d, ssize_t err = -1) + : data(d), error_after(err) {} + bool is_readable() const override { return true; } + bool wait_readable() const override { return true; } + bool wait_writable() const override { return true; } + ssize_t read(char *ptr, size_t size) override { + if (error_after >= 0 && pos >= static_cast(error_after)) return -1; + if (pos >= data.size()) return 0; + size_t limit = + error_after >= 0 ? static_cast(error_after) : data.size(); + size_t to_read = std::min(size, std::min(data.size() - pos, limit - pos)); + std::memcpy(ptr, data.data() + pos, to_read); + pos += to_read; + return static_cast(to_read); + } + ssize_t write(const char *, size_t) override { return -1; } + void get_remote_ip_and_port(std::string &ip, int &port) const override { + ip = "127.0.0.1"; + port = 0; + } + void get_local_ip_and_port(std::string &ip, int &port) const override { + ip = "127.0.0.1"; + port = 0; + } + socket_t socket() const override { return INVALID_SOCKET; } + time_t duration() const override { return 0; } +}; + +TEST(StreamHandleTest, Basic) { + ClientImpl::StreamHandle handle; + EXPECT_FALSE(handle.is_valid()); + handle.response = detail::make_unique(); + handle.error = Error::Connection; + EXPECT_FALSE(handle.is_valid()); + handle.error = Error::Success; + EXPECT_TRUE(handle.is_valid()); +} + +TEST(BodyReaderTest, Basic) { + MockStream stream("Hello, World!"); + detail::BodyReader reader; + reader.stream = &stream; + reader.content_length = 13; + char buf[32]; + EXPECT_EQ(13, reader.read(buf, sizeof(buf))); + EXPECT_EQ(0, reader.read(buf, sizeof(buf))); + EXPECT_TRUE(reader.eof); +} + +TEST(BodyReaderTest, NoStream) { + detail::BodyReader reader; + char buf[32]; + EXPECT_EQ(-1, reader.read(buf, sizeof(buf))); + EXPECT_EQ(Error::Connection, reader.last_error); +} + +TEST(BodyReaderTest, Error) { + MockStream stream("Hello, World!", 5); + detail::BodyReader reader; + reader.stream = &stream; + reader.content_length = 13; + char buf[32]; + EXPECT_EQ(5, reader.read(buf, sizeof(buf))); + EXPECT_EQ(-1, reader.read(buf, sizeof(buf))); + EXPECT_EQ(Error::Read, reader.last_error); +} + +// Memory buffer mode removed: StreamHandle reads only from socket streams. +// Mock-based StreamHandle tests relying on private internals are removed. + +class OpenStreamTest : public ::testing::Test { +protected: + void SetUp() override { + svr_.Get("/hello", [](const Request &, Response &res) { + res.set_content("Hello World!", "text/plain"); + }); + svr_.Get("/large", [](const Request &, Response &res) { + res.set_content(std::string(10000, 'X'), "text/plain"); + }); + svr_.Get("/chunked", [](const Request &, Response &res) { + res.set_chunked_content_provider("text/plain", + [](size_t offset, DataSink &sink) { + if (offset < 15) { + sink.write("chunk", 5); + return true; + } + sink.done(); + return true; + }); + }); + svr_.Get("/compressible", [](const Request &, Response &res) { + res.set_chunked_content_provider("text/plain", [](size_t offset, + DataSink &sink) { + if (offset < 100 * 1024) { + std::string chunk(std::min(size_t(8192), 100 * 1024 - offset), 'A'); + sink.write(chunk.data(), chunk.size()); + return true; + } + sink.done(); + return true; + }); + }); + svr_.Get("/streamed-chunked-with-prohibited-trailer", + [](const Request & /*req*/, Response &res) { + auto i = new int(0); + res.set_header("Trailer", "Content-Length, X-Allowed"); + res.set_chunked_content_provider( + "text/plain", + [i](size_t /*offset*/, DataSink &sink) { + switch (*i) { + case 0: sink.os << "123"; break; + case 1: sink.os << "456"; break; + case 2: sink.os << "789"; break; + case 3: { + sink.done_with_trailer( + {{"Content-Length", "5"}, {"X-Allowed", "yes"}}); + } break; + } + (*i)++; + return true; + }, + [i](bool success) { + EXPECT_TRUE(success); + delete i; + }); + }); + // Echo headers endpoint for header-related tests + svr_.Get("/echo-headers", [](const Request &req, Response &res) { + std::string body; + for (const auto &h : req.headers) { + body.append(h.first); + body.push_back(':'); + body.append(h.second); + body.push_back('\n'); + } + res.set_content(body, "text/plain"); + }); + svr_.Post("/echo-headers", [](const Request &req, Response &res) { + std::string body; + for (const auto &h : req.headers) { + body.append(h.first); + body.push_back(':'); + body.append(h.second); + body.push_back('\n'); + } + res.set_content(body, "text/plain"); + }); + thread_ = std::thread([this]() { svr_.listen("127.0.0.1", 8787); }); + svr_.wait_until_ready(); + } + void TearDown() override { + svr_.stop(); + if (thread_.joinable()) thread_.join(); + } + Server svr_; + std::thread thread_; +}; + +TEST_F(OpenStreamTest, Basic) { + Client cli("127.0.0.1", 8787); + auto handle = cli.open_stream("GET", "/hello"); + EXPECT_TRUE(handle.is_valid()); + EXPECT_EQ("Hello World!", read_all(handle)); +} + +TEST_F(OpenStreamTest, SmallBuffer) { + Client cli("127.0.0.1", 8787); + auto handle = cli.open_stream("GET", "/hello"); + std::string result; + char buf[4]; + ssize_t n; + while ((n = handle.read(buf, sizeof(buf))) > 0) + result.append(buf, static_cast(n)); + EXPECT_EQ("Hello World!", result); +} + +TEST_F(OpenStreamTest, DefaultHeaders) { + Client cli("127.0.0.1", 8787); + + // open_stream GET should include Host, User-Agent and Accept-Encoding + { + auto handle = cli.open_stream("GET", "/echo-headers"); + ASSERT_TRUE(handle.is_valid()); + auto body = read_all(handle); + EXPECT_NE(body.find("Host:127.0.0.1:8787"), std::string::npos); + EXPECT_NE(body.find("User-Agent:cpp-httplib/" CPPHTTPLIB_VERSION), + std::string::npos); + EXPECT_NE(body.find("Accept-Encoding:"), std::string::npos); + } + + // open_stream POST with body and no explicit content_type should NOT add + // text/plain Content-Type (behavior differs from non-streaming path), but + // should include Content-Length + { + auto handle = cli.open_stream("POST", "/echo-headers", {}, {}, "hello", ""); + ASSERT_TRUE(handle.is_valid()); + auto body = read_all(handle); + EXPECT_EQ(body.find("Content-Type: text/plain"), std::string::npos); + EXPECT_NE(body.find("Content-Length:5"), std::string::npos); + } + + // open_stream POST with explicit Content-Type should preserve it + { + auto handle = cli.open_stream("POST", "/echo-headers", {}, + {{"Content-Type", "application/custom"}}, + "{}", "application/custom"); + ASSERT_TRUE(handle.is_valid()); + auto body = read_all(handle); + EXPECT_NE(body.find("Content-Type:application/custom"), std::string::npos); + } + + // User-specified User-Agent must not be overwritten for stream API + { + auto handle = cli.open_stream("GET", "/echo-headers", {}, + {{"User-Agent", "MyAgent/1.2"}}); + ASSERT_TRUE(handle.is_valid()); + auto body = read_all(handle); + EXPECT_NE(body.find("User-Agent:MyAgent/1.2"), std::string::npos); + } +} + +TEST_F(OpenStreamTest, Large) { + Client cli("127.0.0.1", 8787); + auto handle = cli.open_stream("GET", "/large"); + EXPECT_EQ(10000u, read_all(handle).size()); +} + +TEST_F(OpenStreamTest, ConnectionError) { + Client cli("127.0.0.1", 9999); + auto handle = cli.open_stream("GET", "/hello"); + EXPECT_FALSE(handle.is_valid()); +} + +TEST_F(OpenStreamTest, Chunked) { + Client cli("127.0.0.1", 8787); + auto handle = cli.open_stream("GET", "/chunked"); + EXPECT_TRUE(handle.response && handle.response->get_header_value( + "Transfer-Encoding") == "chunked"); + EXPECT_EQ("chunkchunkchunk", read_all(handle)); +} + +TEST_F(OpenStreamTest, ProhibitedTrailersAreIgnored_Stream) { + Client cli("127.0.0.1", 8787); + auto handle = + cli.open_stream("GET", "/streamed-chunked-with-prohibited-trailer"); + ASSERT_TRUE(handle.is_valid()); + + // Consume body to allow trailers to be received/parsed + auto body = read_all(handle); + + // Explicitly parse trailers (ensure trailers are available for assertion) + handle.parse_trailers_if_needed(); + EXPECT_EQ(std::string("123456789"), body); + + // The response should include a Trailer header declaring both names + ASSERT_TRUE(handle.response); + EXPECT_TRUE(handle.response->has_header("Trailer")); + EXPECT_EQ(std::string("Content-Length, X-Allowed"), + handle.response->get_header_value("Trailer")); + + // Prohibited trailer must not be present + EXPECT_FALSE(handle.response->has_trailer("Content-Length")); + // Allowed trailer should be present + EXPECT_TRUE(handle.response->has_trailer("X-Allowed")); + EXPECT_EQ(std::string("yes"), + handle.response->get_trailer_value("X-Allowed")); + + // Verify trailers are NOT present as regular headers + EXPECT_EQ(std::string(""), + handle.response->get_header_value("Content-Length")); + EXPECT_EQ(std::string(""), handle.response->get_header_value("X-Allowed")); +} + +#ifdef CPPHTTPLIB_ZLIB_SUPPORT +TEST_F(OpenStreamTest, Gzip) { + Client cli("127.0.0.1", 8787); + auto handle = cli.open_stream("GET", "/compressible", {}, + {{"Accept-Encoding", "gzip"}}); + EXPECT_EQ("gzip", handle.response->get_header_value("Content-Encoding")); + EXPECT_EQ(100u * 1024u, read_all(handle).size()); +} +#endif + +#ifdef CPPHTTPLIB_BROTLI_SUPPORT +TEST_F(OpenStreamTest, Brotli) { + Client cli("127.0.0.1", 8787); + auto handle = + cli.open_stream("GET", "/compressible", {}, {{"Accept-Encoding", "br"}}); + EXPECT_EQ("br", handle.response->get_header_value("Content-Encoding")); + EXPECT_EQ(100u * 1024u, read_all(handle).size()); +} +#endif + +#ifdef CPPHTTPLIB_ZSTD_SUPPORT +TEST_F(OpenStreamTest, Zstd) { + Client cli("127.0.0.1", 8787); + auto handle = cli.open_stream("GET", "/compressible", {}, + {{"Accept-Encoding", "zstd"}}); + EXPECT_EQ("zstd", handle.response->get_header_value("Content-Encoding")); + EXPECT_EQ(100u * 1024u, read_all(handle).size()); +} +#endif + +#ifdef CPPHTTPLIB_OPENSSL_SUPPORT +class SSLOpenStreamTest : public ::testing::Test { +protected: + SSLOpenStreamTest() : svr_("cert.pem", "key.pem") {} + void SetUp() override { + svr_.Get("/hello", [](const Request &, Response &res) { + res.set_content("Hello SSL World!", "text/plain"); + }); + svr_.Get("/chunked", [](const Request &, Response &res) { + res.set_chunked_content_provider("text/plain", + [](size_t offset, DataSink &sink) { + if (offset < 15) { + sink.write("chunk", 5); + return true; + } + sink.done(); + return true; + }); + }); + svr_.Post("/echo", [](const Request &req, Response &res) { + res.set_content(req.body, req.get_header_value("Content-Type")); + }); + svr_.Post("/chunked-response", [](const Request &req, Response &res) { + std::string body = req.body; + res.set_chunked_content_provider( + "text/plain", [body](size_t offset, DataSink &sink) { + if (offset < body.size()) { + sink.write(body.data() + offset, body.size() - offset); + } + sink.done(); + return true; + }); + }); + thread_ = std::thread([this]() { svr_.listen("127.0.0.1", 8788); }); + svr_.wait_until_ready(); + } + void TearDown() override { + svr_.stop(); + if (thread_.joinable()) thread_.join(); + } + SSLServer svr_; + std::thread thread_; +}; + +TEST_F(SSLOpenStreamTest, Basic) { + SSLClient cli("127.0.0.1", 8788); + cli.enable_server_certificate_verification(false); + auto handle = cli.open_stream("GET", "/hello"); + ASSERT_TRUE(handle.is_valid()); + EXPECT_EQ("Hello SSL World!", read_all(handle)); +} + +TEST_F(SSLOpenStreamTest, Chunked) { + SSLClient cli("127.0.0.1", 8788); + cli.enable_server_certificate_verification(false); + + auto handle = cli.open_stream("GET", "/chunked"); + + ASSERT_TRUE(handle.is_valid()) << "Error: " << static_cast(handle.error); + EXPECT_TRUE(handle.response && handle.response->get_header_value( + "Transfer-Encoding") == "chunked"); + + auto body = read_all(handle); + EXPECT_EQ("chunkchunkchunk", body); +} + +TEST_F(SSLOpenStreamTest, Post) { + SSLClient cli("127.0.0.1", 8788); + cli.enable_server_certificate_verification(false); + + auto handle = + cli.open_stream("POST", "/echo", {}, {}, "Hello SSL POST", "text/plain"); + + ASSERT_TRUE(handle.is_valid()) << "Error: " << static_cast(handle.error); + EXPECT_EQ(200, handle.response->status); + + auto body = read_all(handle); + EXPECT_EQ("Hello SSL POST", body); +} + +TEST_F(SSLOpenStreamTest, PostChunked) { + SSLClient cli("127.0.0.1", 8788); + cli.enable_server_certificate_verification(false); + + auto handle = cli.open_stream("POST", "/chunked-response", {}, {}, + "Chunked SSL Data", "text/plain"); + + ASSERT_TRUE(handle.is_valid()); + EXPECT_EQ(200, handle.response->status); + + auto body = read_all(handle); + EXPECT_EQ("Chunked SSL Data", body); +} +#endif // CPPHTTPLIB_OPENSSL_SUPPORT + +//============================================================================== +// Parity Tests: ensure streaming and non-streaming APIs produce identical +// results for various scenarios. +//============================================================================== + +TEST(ParityTest, GetVsOpenStream) { + Server svr; + + const std::string path = "/parity"; + const std::string content = "Parity test content: hello world"; + + svr.Get(path, [&](const Request & /*req*/, Response &res) { + res.set_content(content, "text/plain"); + }); + + auto t = std::thread([&]() { svr.listen(HOST, PORT); }); + auto se = detail::scope_exit([&] { + svr.stop(); + t.join(); + ASSERT_FALSE(svr.is_running()); + }); + + svr.wait_until_ready(); + + Client cli(HOST, PORT); + + // Non-stream path + auto r1 = cli.Get(path); + ASSERT_TRUE(r1); + EXPECT_EQ(StatusCode::OK_200, r1->status); + + // Stream path + auto h = cli.open_stream("GET", path); + ASSERT_TRUE(h.is_valid()); + + EXPECT_EQ(r1->body, read_all(h)); +} + +// Helper to compress data with provided compressor type T +template +static std::string compress_payload_for_parity(const std::string &in) { + std::string out; + Compressor compressor; + bool ok = compressor.compress(in.data(), in.size(), /*last=*/true, + [&](const char *data, size_t n) { + out.append(data, n); + return true; + }); + EXPECT_TRUE(ok); + return out; +} + +// Helper function for compression parity tests +template +static void test_compression_parity(const std::string &original, + const std::string &path, + const std::string &encoding) { + const std::string compressed = + compress_payload_for_parity(original); + + Server svr; + + svr.Get(path, [&](const Request & /*req*/, Response &res) { + res.set_content(compressed, "application/octet-stream"); + res.set_header("Content-Encoding", encoding); + }); + + auto t = std::thread([&] { svr.listen("localhost", 1234); }); + auto se = detail::scope_exit([&] { + svr.stop(); + t.join(); + ASSERT_FALSE(svr.is_running()); + }); + + svr.wait_until_ready(); + + Client cli("localhost", 1234); + + // Non-streaming + { + auto res = cli.Get(path); + ASSERT_TRUE(res); + EXPECT_EQ(StatusCode::OK_200, res->status); + EXPECT_EQ(original, res->body); + } + + // Streaming + { + auto h = cli.open_stream("GET", path); + ASSERT_TRUE(h.is_valid()); + EXPECT_EQ(original, read_all(h)); + } +} + +#ifdef CPPHTTPLIB_ZLIB_SUPPORT +TEST(ParityTest, Gzip) { + test_compression_parity( + "The quick brown fox jumps over the lazy dog", "/parity-gzip", "gzip"); +} +#endif + +#ifdef CPPHTTPLIB_BROTLI_SUPPORT +TEST(ParityTest, Brotli) { + test_compression_parity( + "Hello, brotli parity test payload", "/parity-br", "br"); +} +#endif + +#ifdef CPPHTTPLIB_ZSTD_SUPPORT +TEST(ParityTest, Zstd) { + test_compression_parity( + "Zstandard parity test payload", "/parity-zstd", "zstd"); +} +#endif + +//============================================================================== +// New Stream API Tests +//============================================================================== + +inline std::string read_body(httplib::stream::Result &result) { + std::string body; + while (result.next()) { + body.append(result.data(), result.size()); + } + return body; +} + +TEST(ClientConnectionTest, Basic) { + httplib::ClientConnection conn; + EXPECT_FALSE(conn.is_open()); + conn.sock = 1; + EXPECT_TRUE(conn.is_open()); + httplib::ClientConnection conn2(std::move(conn)); + EXPECT_EQ(INVALID_SOCKET, conn.sock); + conn2.sock = INVALID_SOCKET; +} + +// Unified test server for all stream::* tests +class StreamApiTest : public ::testing::Test { +protected: + void SetUp() override { + svr_.Get("/hello", [](const httplib::Request &, httplib::Response &res) { + res.set_content("Hello World!", "text/plain"); + }); + svr_.Get("/echo-params", + [](const httplib::Request &req, httplib::Response &res) { + std::string r; + for (const auto &p : req.params) { + if (!r.empty()) r += "&"; + r += p.first + "=" + p.second; + } + res.set_content(r, "text/plain"); + }); + svr_.Post("/echo", [](const httplib::Request &req, httplib::Response &res) { + res.set_content(req.body, req.get_header_value("Content-Type")); + }); + svr_.Post("/echo-headers", + [](const httplib::Request &req, httplib::Response &res) { + std::string r; + for (const auto &h : req.headers) + r += h.first + ": " + h.second + "\n"; + res.set_content(r, "text/plain"); + }); + svr_.Post("/echo-params", + [](const httplib::Request &req, httplib::Response &res) { + std::string r = "params:"; + for (const auto &p : req.params) + r += p.first + "=" + p.second + ";"; + res.set_content(r + " body:" + req.body, "text/plain"); + }); + svr_.Post("/large", [](const httplib::Request &, httplib::Response &res) { + res.set_content(std::string(100 * 1024, 'X'), "application/octet-stream"); + }); + svr_.Put("/echo", [](const httplib::Request &req, httplib::Response &res) { + res.set_content("PUT:" + req.body, "text/plain"); + }); + svr_.Patch("/echo", + [](const httplib::Request &req, httplib::Response &res) { + res.set_content("PATCH:" + req.body, "text/plain"); + }); + svr_.Delete( + "/resource", [](const httplib::Request &req, httplib::Response &res) { + res.set_content(req.body.empty() ? "Deleted" : "Deleted:" + req.body, + "text/plain"); + }); + svr_.Get("/head-test", + [](const httplib::Request &, httplib::Response &res) { + res.set_content("body for HEAD", "text/plain"); + }); + svr_.Options("/options", + [](const httplib::Request &, httplib::Response &res) { + res.set_header("Allow", "GET, POST, PUT, DELETE, OPTIONS"); + }); + thread_ = std::thread([this]() { svr_.listen("localhost", 8790); }); + svr_.wait_until_ready(); + } + void TearDown() override { + svr_.stop(); + if (thread_.joinable()) thread_.join(); + } + httplib::Server svr_; + std::thread thread_; +}; + +// stream::Get tests +TEST_F(StreamApiTest, GetBasic) { + httplib::Client cli("localhost", 8790); + auto result = httplib::stream::Get(cli, "/hello"); + ASSERT_TRUE(result.is_valid()); + EXPECT_EQ(200, result.status()); + EXPECT_EQ("Hello World!", read_body(result)); +} + +TEST_F(StreamApiTest, GetWithParams) { + httplib::Client cli("localhost", 8790); + httplib::Params params{{"foo", "bar"}}; + auto result = httplib::stream::Get(cli, "/echo-params", params); + ASSERT_TRUE(result.is_valid()); + EXPECT_TRUE(read_body(result).find("foo=bar") != std::string::npos); +} + +TEST_F(StreamApiTest, GetConnectionError) { + httplib::Client cli("localhost", 9999); + EXPECT_FALSE(httplib::stream::Get(cli, "/hello").is_valid()); +} + +TEST_F(StreamApiTest, Get404) { + httplib::Client cli("localhost", 8790); + auto result = httplib::stream::Get(cli, "/nonexistent"); + EXPECT_TRUE(result.is_valid()); + EXPECT_EQ(404, result.status()); +} + +// stream::Post tests +TEST_F(StreamApiTest, PostBasic) { + httplib::Client cli("localhost", 8790); + auto result = httplib::stream::Post(cli, "/echo", R"({"key":"value"})", + "application/json"); + ASSERT_TRUE(result.is_valid()); + EXPECT_EQ("application/json", result.get_header_value("Content-Type")); + EXPECT_EQ(R"({"key":"value"})", read_body(result)); +} + +TEST_F(StreamApiTest, PostWithHeaders) { + httplib::Client cli("localhost", 8790); + httplib::Headers headers{{"X-Custom", "value"}}; + auto result = httplib::stream::Post(cli, "/echo-headers", headers, "body", + "text/plain"); + EXPECT_TRUE(read_body(result).find("X-Custom: value") != std::string::npos); +} + +TEST_F(StreamApiTest, PostWithParams) { + httplib::Client cli("localhost", 8790); + httplib::Params params{{"k", "v"}}; + auto result = + httplib::stream::Post(cli, "/echo-params", params, "data", "text/plain"); + auto body = read_body(result); + EXPECT_TRUE(body.find("k=v") != std::string::npos); + EXPECT_TRUE(body.find("body:data") != std::string::npos); +} + +TEST_F(StreamApiTest, PostLarge) { + httplib::Client cli("localhost", 8790); + auto result = httplib::stream::Post(cli, "/large", "", "text/plain"); + size_t total = 0; + while (result.next()) { + total += result.size(); + } + EXPECT_EQ(100u * 1024u, total); +} + +// stream::Put/Patch tests +TEST_F(StreamApiTest, PutAndPatch) { + httplib::Client cli("localhost", 8790); + auto put = httplib::stream::Put(cli, "/echo", "test", "text/plain"); + EXPECT_EQ("PUT:test", read_body(put)); + auto patch = httplib::stream::Patch(cli, "/echo", "test", "text/plain"); + EXPECT_EQ("PATCH:test", read_body(patch)); +} + +// stream::Delete tests +TEST_F(StreamApiTest, Delete) { + httplib::Client cli("localhost", 8790); + auto del1 = httplib::stream::Delete(cli, "/resource"); + EXPECT_EQ("Deleted", read_body(del1)); + auto del2 = httplib::stream::Delete(cli, "/resource", "data", "text/plain"); + EXPECT_EQ("Deleted:data", read_body(del2)); +} + +// stream::Head/Options tests +TEST_F(StreamApiTest, HeadAndOptions) { + httplib::Client cli("localhost", 8790); + auto head = httplib::stream::Head(cli, "/head-test"); + EXPECT_TRUE(head.is_valid()); + EXPECT_FALSE(head.get_header_value("Content-Length").empty()); + + auto opts = httplib::stream::Options(cli, "/options"); + EXPECT_EQ("GET, POST, PUT, DELETE, OPTIONS", opts.get_header_value("Allow")); +} + +// SSL stream::* tests +#ifdef CPPHTTPLIB_OPENSSL_SUPPORT +class SSLStreamApiTest : public ::testing::Test { +protected: + void SetUp() override { + svr_.Get("/hello", [](const httplib::Request &, httplib::Response &res) { + res.set_content("Hello SSL!", "text/plain"); + }); + svr_.Post("/echo", [](const httplib::Request &req, httplib::Response &res) { + res.set_content(req.body, "text/plain"); + }); + thread_ = std::thread([this]() { svr_.listen("127.0.0.1", 8803); }); + svr_.wait_until_ready(); + } + void TearDown() override { + svr_.stop(); + if (thread_.joinable()) thread_.join(); + } + httplib::SSLServer svr_{"cert.pem", "key.pem"}; + std::thread thread_; +}; + +TEST_F(SSLStreamApiTest, GetAndPost) { + httplib::SSLClient cli("127.0.0.1", 8803); + cli.enable_server_certificate_verification(false); + auto get = httplib::stream::Get(cli, "/hello"); + EXPECT_EQ("Hello SSL!", read_body(get)); + auto post = httplib::stream::Post(cli, "/echo", "test", "text/plain"); + EXPECT_EQ("test", read_body(post)); +} +#endif \ No newline at end of file