From 20169b3280f461f7fb898a2472fdc64cdfd50cc8 Mon Sep 17 00:00:00 2001 From: lmangani Date: Tue, 4 Mar 2025 16:58:50 +0000 Subject: [PATCH 1/8] HTTP WRITE support --- extension/httpfs/httpfs.cpp | 109 +++++++++++++++++++++++++- extension/httpfs/httpfs_extension.cpp | 3 + extension/httpfs/include/httpfs.hpp | 14 +++- extension_config.cmake | 2 +- 4 files changed, 123 insertions(+), 5 deletions(-) diff --git a/extension/httpfs/httpfs.cpp b/extension/httpfs/httpfs.cpp index 6e1f635..b93b9ce 100644 --- a/extension/httpfs/httpfs.cpp +++ b/extension/httpfs/httpfs.cpp @@ -12,6 +12,7 @@ #include "duckdb/main/client_context.hpp" #include "duckdb/main/database.hpp" #include "duckdb/main/secret/secret_manager.hpp" +#include "duckdb/storage/buffer_manager.hpp" #include "http_state.hpp" #include @@ -59,6 +60,7 @@ HTTPParams HTTPParams::ReadFrom(optional_ptr opener, optional_ptr(); + + // Check if HTTP write is enabled + if (!hfh.http_params.enable_http_write) { + throw NotImplementedException("Writing to HTTP files not implemented"); + } + + if (!buffer || nr_bytes <= 1) { + return; + } + + // Initialize the write buffer if it is not already done + if (hfh.write_buffer.empty()) { + hfh.write_buffer.resize(hfh.WRITE_BUFFER_LEN); + hfh.write_buffer_idx = 0; + } + + idx_t bytes_to_copy = nr_bytes; + idx_t buffer_offset = 0; + + // Accumulate data into the write buffer + while (bytes_to_copy > 0) { + idx_t space_in_buffer = hfh.WRITE_BUFFER_LEN - hfh.write_buffer_idx; + idx_t copy_amount = MinValue(space_in_buffer, bytes_to_copy); + + // Copy data to the write buffer + memcpy(hfh.write_buffer.data() + hfh.write_buffer_idx, (char *)buffer + buffer_offset, copy_amount); + hfh.write_buffer_idx += copy_amount; + bytes_to_copy -= copy_amount; + buffer_offset += copy_amount; + + // std::cout << "Write buffer idx after write: " << hfh.write_buffer_idx << std::endl; + + // If the buffer is full, send the data + if (hfh.write_buffer_idx == hfh.WRITE_BUFFER_LEN) { + // Perform the HTTP POST request + FlushBuffer(hfh); + } + } + + // Update the file offset + hfh.file_offset += nr_bytes; + + // std::cout << "Completed Write operation. Total bytes written: " << nr_bytes << std::endl; +} + +void HTTPFileSystem::FlushBuffer(HTTPFileHandle &hfh) { + // If no data in buffer, return + if (hfh.write_buffer_idx <= 1) { + return; + } + + // Prepare the URL and headers for the HTTP POST request + string path, proto_host_port; + ParseUrl(hfh.path, path, proto_host_port); + + HeaderMap header_map; + auto headers = InitializeHeaders(header_map, hfh.http_params); + + // Define the request lambda + std::function request([&]() { + auto client = GetClient(hfh.http_params, proto_host_port.c_str(), &hfh); + duckdb_httplib_openssl::Request req; + req.method = "POST"; + req.path = path; + req.headers = *headers; + req.headers.emplace("Content-Type", "application/octet-stream"); + + // Prepare the request body from the write buffer + req.body = std::string(reinterpret_cast(hfh.write_buffer.data()), hfh.write_buffer_idx); + + // std::cout << "Sending request with " << hfh.write_buffer_idx << " bytes of data" << std::endl; + + return client->send(req); + }); + + // Perform the HTTP POST request and handle retries + auto response = RunRequestWithRetry(request, hfh.path, "POST", hfh.http_params); + + // Check if the response was successful (HTTP 200-299 status code) + if (response->code < 200 || response->code >= 300) { + throw HTTPException(*response, "HTTP POST request failed to '%s' with status code: %d", hfh.path.c_str(), + response->code); + } + + // Reset the write buffer index after sending data + hfh.write_buffer_idx = 0; +} + +void HTTPFileHandle::Close() { + auto &fs = (HTTPFileSystem &)file_system; + if (flags.OpenForWriting()) { + fs.FlushBuffer(*this); + } } int64_t HTTPFileSystem::Write(FileHandle &handle, void *buffer, int64_t nr_bytes) { @@ -829,5 +924,15 @@ ResponseWrapper::ResponseWrapper(duckdb_httplib_openssl::Response &res, string & body = res.body; } -HTTPFileHandle::~HTTPFileHandle() = default; +HTTPFileHandle::~HTTPFileHandle() { + if (Exception::UncaughtException()) { + return; + } + + try { + Close(); + } catch (...) { // NOLINT + } +} + } // namespace duckdb diff --git a/extension/httpfs/httpfs_extension.cpp b/extension/httpfs/httpfs_extension.cpp index 536dce4..828eb5e 100644 --- a/extension/httpfs/httpfs_extension.cpp +++ b/extension/httpfs/httpfs_extension.cpp @@ -39,6 +39,9 @@ static void LoadInternal(DatabaseInstance &instance) { LogicalType::BOOLEAN, Value(false)); config.AddExtensionOption("ca_cert_file", "Path to a custom certificate file for self-signed certificates.", LogicalType::VARCHAR, Value("")); + // Experimental HTTPFS write + config.AddExtensionOption("enable_http_write", "Enable HTTPFS POST write", LogicalType::BOOLEAN, Value(false)); + // Global S3 config config.AddExtensionOption("s3_region", "S3 Region", LogicalType::VARCHAR, Value("us-east-1")); config.AddExtensionOption("s3_access_key_id", "S3 Access Key ID", LogicalType::VARCHAR); diff --git a/extension/httpfs/include/httpfs.hpp b/extension/httpfs/include/httpfs.hpp index 9dc9eda..161aabd 100644 --- a/extension/httpfs/include/httpfs.hpp +++ b/extension/httpfs/include/httpfs.hpp @@ -42,6 +42,7 @@ struct HTTPParams { static constexpr uint64_t DEFAULT_RETRY_WAIT_MS = 100; static constexpr float DEFAULT_RETRY_BACKOFF = 4; static constexpr bool DEFAULT_FORCE_DOWNLOAD = false; + static constexpr bool DEFAULT_ENABLE_HTTP_WRITE = false; static constexpr bool DEFAULT_KEEP_ALIVE = true; static constexpr bool DEFAULT_ENABLE_SERVER_CERT_VERIFICATION = false; static constexpr uint64_t DEFAULT_HF_MAX_PER_PAGE = 0; @@ -52,6 +53,7 @@ struct HTTPParams { uint64_t retry_wait_ms = DEFAULT_RETRY_WAIT_MS; float retry_backoff = DEFAULT_RETRY_BACKOFF; bool force_download = DEFAULT_FORCE_DOWNLOAD; + bool enable_http_write = DEFAULT_ENABLE_HTTP_WRITE; bool keep_alive = DEFAULT_KEEP_ALIVE; bool enable_server_cert_verification = DEFAULT_ENABLE_SERVER_CERT_VERIFICATION; idx_t hf_max_per_page = DEFAULT_HF_MAX_PER_PAGE; @@ -116,6 +118,12 @@ class HTTPFileHandle : public FileHandle { duckdb::unique_ptr read_buffer; constexpr static idx_t READ_BUFFER_LEN = 1000000; + // duckdb::unique_ptr write_buffer; + constexpr static idx_t WRITE_BUFFER_LEN = 1000000; + std::vector write_buffer; // Use a vector instead of a fixed-size array + idx_t write_buffer_idx = 0; // Tracks the current index in the buffer + idx_t current_buffer_len; + shared_ptr state; void AddHeaders(HeaderMap &map); @@ -126,8 +134,7 @@ class HTTPFileHandle : public FileHandle { void StoreClient(unique_ptr client); public: - void Close() override { - } + void Close() override; protected: //! Create a new Client @@ -139,6 +146,8 @@ class HTTPFileHandle : public FileHandle { }; class HTTPFileSystem : public FileSystem { + friend HTTPFileHandle; + public: static duckdb::unique_ptr GetClient(const HTTPParams &http_params, const char *proto_host_port, optional_ptr hfs); @@ -211,6 +220,7 @@ class HTTPFileSystem : public FileSystem { // Global cache mutex global_cache_lock; duckdb::unique_ptr global_metadata_cache; + void FlushBuffer(HTTPFileHandle &hfh); }; } // namespace duckdb diff --git a/extension_config.cmake b/extension_config.cmake index 0046a0b..9ccc06b 100644 --- a/extension_config.cmake +++ b/extension_config.cmake @@ -9,7 +9,7 @@ else () endif() duckdb_extension_load(httpfs - DONT_LINK +### DONT_LINK SOURCE_DIR ${CMAKE_CURRENT_LIST_DIR} INCLUDE_DIR ${CMAKE_CURRENT_LIST_DIR}/extension/httpfs/include ${LOAD_HTTPFS_TESTS} From fd72b85a7f5ec62cb79fc77723332b1ae84c230f Mon Sep 17 00:00:00 2001 From: qxip Date: Mon, 23 Jun 2025 14:54:27 +0200 Subject: [PATCH 2/8] resync --- extension/httpfs/httpfs.cpp | 133 +++++++++------------ extension/httpfs/include/httpfs.hpp | 38 +----- extension/httpfs/include/httpfs_client.hpp | 31 +++-- extension/httpfs/s3fs.cpp | 5 +- 4 files changed, 74 insertions(+), 133 deletions(-) diff --git a/extension/httpfs/httpfs.cpp b/extension/httpfs/httpfs.cpp index 9fdc362..0561f6a 100644 --- a/extension/httpfs/httpfs.cpp +++ b/extension/httpfs/httpfs.cpp @@ -22,6 +22,17 @@ namespace duckdb { +class HTTPFSUtil : public HTTPUtil { +public: + unique_ptr InitializeParameters(optional_ptr opener, + optional_ptr info) override; + unique_ptr InitializeClient(HTTPParams &http_params, const string &proto_host_port) override; + + string GetName() const override { + return "httpfs"; + } +}; + shared_ptr HTTPFSUtil::GetHTTPUtil(optional_ptr opener) { if (opener) { auto db = opener->TryGetDatabase(); @@ -54,9 +65,9 @@ unique_ptr HTTPFSUtil::InitializeParameters(optional_ptr FileOpener::TryGetCurrentSetting(opener, "http_keep_alive", result->keep_alive, info); FileOpener::TryGetCurrentSetting(opener, "enable_server_cert_verification", result->enable_server_cert_verification, info); - FileOpener::TryGetCurrentSetting(opener, "ca_cert_file", result.ca_cert_file, info); - FileOpener::TryGetCurrentSetting(opener, "hf_max_per_page", result.hf_max_per_page, info); - FileOpener::TryGetCurrentSetting(opener, "enable_http_write", result.enable_http_write, info); + FileOpener::TryGetCurrentSetting(opener, "ca_cert_file", result->ca_cert_file, info); + FileOpener::TryGetCurrentSetting(opener, "hf_max_per_page", result->hf_max_per_page, info); + FileOpener::TryGetCurrentSetting(opener, "enable_http_write", result->enable_http_write, info); // HTTP Secret lookups KeyValueSecretReader settings_reader(*opener, info, "http"); @@ -452,90 +463,48 @@ int64_t HTTPFileSystem::Read(FileHandle &handle, void *buffer, int64_t nr_bytes) void HTTPFileSystem::Write(FileHandle &handle, void *buffer, int64_t nr_bytes, idx_t location) { auto &hfh = handle.Cast(); - // Check if HTTP write is enabled - if (!hfh.http_params.enable_http_write) { - throw NotImplementedException("Writing to HTTP files not implemented"); - } - - if (!buffer || nr_bytes <= 1) { - return; - } - - // Initialize the write buffer if it is not already done - if (hfh.write_buffer.empty()) { - hfh.write_buffer.resize(hfh.WRITE_BUFFER_LEN); - hfh.write_buffer_idx = 0; + if (location != hfh.SeekPosition()) { + throw NotImplementedException("Writing to HTTP Files must be sequential"); } - idx_t bytes_to_copy = nr_bytes; - idx_t buffer_offset = 0; - - // Accumulate data into the write buffer - while (bytes_to_copy > 0) { - idx_t space_in_buffer = hfh.WRITE_BUFFER_LEN - hfh.write_buffer_idx; - idx_t copy_amount = MinValue(space_in_buffer, bytes_to_copy); - - // Copy data to the write buffer - memcpy(hfh.write_buffer.data() + hfh.write_buffer_idx, (char *)buffer + buffer_offset, copy_amount); - hfh.write_buffer_idx += copy_amount; - bytes_to_copy -= copy_amount; - buffer_offset += copy_amount; - - // std::cout << "Write buffer idx after write: " << hfh.write_buffer_idx << std::endl; - - // If the buffer is full, send the data - if (hfh.write_buffer_idx == hfh.WRITE_BUFFER_LEN) { - // Perform the HTTP POST request + // Adapted from Write logic, but to a buffer instead of disk + idx_t remaining = nr_bytes; + auto data = reinterpret_cast(buffer); + while (remaining > 0) { + idx_t to_write = std::min(remaining, hfh.current_buffer_len - hfh.write_buffer_idx); + if (to_write > 0) { + memcpy(hfh.write_buffer.data() + hfh.write_buffer_idx, data, to_write); + hfh.write_buffer_idx += to_write; + data += to_write; + remaining -= to_write; + } + if (remaining > 0) { FlushBuffer(hfh); } } - - // Update the file offset - hfh.file_offset += nr_bytes; - - // std::cout << "Completed Write operation. Total bytes written: " << nr_bytes << std::endl; } void HTTPFileSystem::FlushBuffer(HTTPFileHandle &hfh) { - // If no data in buffer, return - if (hfh.write_buffer_idx <= 1) { + if (hfh.write_buffer_idx == 0) { return; } - // Prepare the URL and headers for the HTTP POST request string path, proto_host_port; - ParseUrl(hfh.path, path, proto_host_port); - - HeaderMap header_map; - auto headers = InitializeHeaders(header_map, hfh.http_params); + HTTPUtil::DecomposeURL(hfh.path, path, proto_host_port); - // Define the request lambda - std::function request([&]() { - auto client = GetClient(hfh.http_params, proto_host_port.c_str(), &hfh); - duckdb_httplib_openssl::Request req; - req.method = "POST"; - req.path = path; - req.headers = *headers; - req.headers.emplace("Content-Type", "application/octet-stream"); + HTTPHeaders header_map; + hfh.AddHeaders(header_map); - // Prepare the request body from the write buffer - req.body = std::string(reinterpret_cast(hfh.write_buffer.data()), hfh.write_buffer_idx); - - // std::cout << "Sending request with " << hfh.write_buffer_idx << " bytes of data" << std::endl; - - return client->send(req); - }); + auto &http_util = hfh.http_params.http_util; - // Perform the HTTP POST request and handle retries - auto response = RunRequestWithRetry(request, hfh.path, "POST", hfh.http_params); + PostRequestInfo post_request(hfh.path, header_map, hfh.http_params, + const_data_ptr_cast(hfh.write_buffer.data()), hfh.write_buffer_idx); - // Check if the response was successful (HTTP 200-299 status code) - if (response->code < 200 || response->code >= 300) { - throw HTTPException(*response, "HTTP POST request failed to '%s' with status code: %d", hfh.path.c_str(), - response->code); + auto res = http_util.Request(post_request); + if (!res->Success()) { + throw HTTPException(hfh, *res, "Failed to write to file"); } - // Reset the write buffer index after sending data hfh.write_buffer_idx = 0; } @@ -548,7 +517,7 @@ void HTTPFileHandle::Close() { int64_t HTTPFileSystem::Write(FileHandle &handle, void *buffer, int64_t nr_bytes) { auto &hfh = handle.Cast(); - Write(handle, buffer, nr_bytes, hfh.file_offset); + Write(handle, buffer, nr_bytes, hfh.SeekPosition()); return nr_bytes; } @@ -823,18 +792,26 @@ void HTTPFileHandle::StoreClient(unique_ptr client) { client_cache.StoreClient(std::move(client)); } -ResponseWrapper::ResponseWrapper(duckdb_httplib_openssl::Response &res, string &original_url) { - code = res.status; - error = res.reason; - for (auto &h : res.headers) { - headers[h.first] = h.second; +ResponseWrapper::ResponseWrapper(HTTPResponse &res, string &original_url) { + this->code = res.status; + this->error = res.reason; + for (auto &header : res.headers) { + this->headers[header.first] = header.second; } - http_url = original_url; - body = res.body; + this->http_url = res.url; + this->body = res.body; } HTTPFileHandle::~HTTPFileHandle() { DUCKDB_LOG_FILE_SYSTEM_CLOSE((*this)); -}; +} + +string HTTPFileSystem::GetName() const { + return "httpfs"; +} + +void HTTPFileSystem::Verify() { + // TODO +} } // namespace duckdb diff --git a/extension/httpfs/include/httpfs.hpp b/extension/httpfs/include/httpfs.hpp index 5f2405f..40188b7 100644 --- a/extension/httpfs/include/httpfs.hpp +++ b/extension/httpfs/include/httpfs.hpp @@ -9,6 +9,7 @@ #include "duckdb/main/client_data.hpp" #include "http_metadata_cache.hpp" #include "httpfs_client.hpp" +#include "duckdb/common/http_util.hpp" #include @@ -21,7 +22,7 @@ using HeaderMap = case_insensitive_map_t; // avoid including httplib in header struct ResponseWrapper { public: - explicit ResponseWrapper(duckdb_httplib_openssl::Response &res, string &original_url); + explicit ResponseWrapper(HTTPResponse &res, string &original_url); int code; string error; HeaderMap headers; @@ -29,41 +30,6 @@ struct ResponseWrapper { string body; }; -struct HTTPParams { - - static constexpr uint64_t DEFAULT_TIMEOUT_SECONDS = 30; // 30 sec - static constexpr uint64_t DEFAULT_RETRIES = 3; - static constexpr uint64_t DEFAULT_RETRY_WAIT_MS = 100; - static constexpr float DEFAULT_RETRY_BACKOFF = 4; - static constexpr bool DEFAULT_FORCE_DOWNLOAD = false; - static constexpr bool DEFAULT_ENABLE_HTTP_WRITE = false; - static constexpr bool DEFAULT_KEEP_ALIVE = true; - static constexpr bool DEFAULT_ENABLE_SERVER_CERT_VERIFICATION = false; - static constexpr uint64_t DEFAULT_HF_MAX_PER_PAGE = 0; - - uint64_t timeout = DEFAULT_TIMEOUT_SECONDS; // seconds component of a timeout - uint64_t timeout_usec = 0; // usec component of a timeout - uint64_t retries = DEFAULT_RETRIES; - uint64_t retry_wait_ms = DEFAULT_RETRY_WAIT_MS; - float retry_backoff = DEFAULT_RETRY_BACKOFF; - bool force_download = DEFAULT_FORCE_DOWNLOAD; - bool enable_http_write = DEFAULT_ENABLE_HTTP_WRITE; - bool keep_alive = DEFAULT_KEEP_ALIVE; - bool enable_server_cert_verification = DEFAULT_ENABLE_SERVER_CERT_VERIFICATION; - idx_t hf_max_per_page = DEFAULT_HF_MAX_PER_PAGE; - - string ca_cert_file; - string http_proxy; - idx_t http_proxy_port; - string http_proxy_username; - string http_proxy_password; - string bearer_token; - unordered_map extra_headers; - - static HTTPParams ReadFrom(optional_ptr opener, optional_ptr info); -}; - - class HTTPClientCache { public: //! Get a client from the client cache diff --git a/extension/httpfs/include/httpfs_client.hpp b/extension/httpfs/include/httpfs_client.hpp index 1d7620c..283bcc0 100644 --- a/extension/httpfs/include/httpfs_client.hpp +++ b/extension/httpfs/include/httpfs_client.hpp @@ -9,31 +9,30 @@ struct FileOpenerInfo; class HTTPState; struct HTTPFSParams : public HTTPParams { - HTTPFSParams(HTTPUtil &http_util) : HTTPParams(http_util) { - } + using HTTPParams::HTTPParams; - static constexpr bool DEFAULT_ENABLE_SERVER_CERT_VERIFICATION = false; - static constexpr uint64_t DEFAULT_HF_MAX_PER_PAGE = 0; static constexpr bool DEFAULT_FORCE_DOWNLOAD = false; + static constexpr uint64_t DEFAULT_HF_MAX_PER_PAGE = 0; + static constexpr bool DEFAULT_ENABLE_SERVER_CERT_VERIFICATION = true; bool force_download = DEFAULT_FORCE_DOWNLOAD; - bool enable_server_cert_verification = DEFAULT_ENABLE_SERVER_CERT_VERIFICATION; idx_t hf_max_per_page = DEFAULT_HF_MAX_PER_PAGE; + bool enable_server_cert_verification = DEFAULT_ENABLE_SERVER_CERT_VERIFICATION; string ca_cert_file; - string bearer_token; - shared_ptr state; }; -class HTTPFSUtil : public HTTPUtil { +class HTTPClient { public: - unique_ptr InitializeParameters(optional_ptr opener, - optional_ptr info) override; - unique_ptr InitializeClient(HTTPParams &http_params, const string &proto_host_port) override; - - static unordered_map ParseGetParameters(const string &text); - static shared_ptr GetHTTPUtil(optional_ptr opener); - - string GetName() const override; + virtual ~HTTPClient() = default; + + virtual duckdb::unique_ptr Get(const string &url, HTTPHeaders &headers, idx_t file_offset, + char *buffer_out, idx_t buffer_out_len); + virtual duckdb::unique_ptr Head(const string &url, HTTPHeaders &headers); + virtual duckdb::unique_ptr Post(const string &url, HTTPHeaders &headers, const char *buffer_in, + idx_t buffer_in_len, string &result_p, string ¶ms_p); + virtual duckdb::unique_ptr Put(const string &url, HTTPHeaders &headers, const char *buffer_in, + idx_t buffer_in_len, const string ¶ms); + virtual duckdb::unique_ptr Delete(const string &url, HTTPHeaders &headers); }; } // namespace duckdb diff --git a/extension/httpfs/s3fs.cpp b/extension/httpfs/s3fs.cpp index 46069b3..9fabe77 100644 --- a/extension/httpfs/s3fs.cpp +++ b/extension/httpfs/s3fs.cpp @@ -1044,16 +1044,15 @@ string AWSListObjectV2::Request(string &path, HTTPParams &http_params, S3AuthPar req_params += "&delimiter=%2F"; } - string listobjectv2_url = req_path + "?" + req_params; + string listobjectv2_url = parsed_url.http_proto + parsed_url.host + req_path + "?" + req_params; auto header_map = create_s3_header(req_path, req_params, parsed_url.host, "s3", "GET", s3_auth_params, "", "", "", ""); // Get requests use fresh connection - string full_host = parsed_url.http_proto + parsed_url.host; std::stringstream response; GetRequestInfo get_request( - full_host, listobjectv2_url, header_map, http_params, + listobjectv2_url, header_map, http_params, [&](const HTTPResponse &response) { if (static_cast(response.status) >= 400) { string trimmed_path = path; From 2a5966e128837cad71530b9c68221d9ec0576116 Mon Sep 17 00:00:00 2001 From: qxip Date: Mon, 23 Jun 2025 22:52:42 +0200 Subject: [PATCH 3/8] resync --- extension/httpfs/httpfs.cpp | 24 ++++++++++++++-------- extension/httpfs/include/httpfs_client.hpp | 20 ++++++++---------- extension/httpfs/s3fs.cpp | 2 +- 3 files changed, 25 insertions(+), 21 deletions(-) diff --git a/extension/httpfs/httpfs.cpp b/extension/httpfs/httpfs.cpp index 0561f6a..1df016c 100644 --- a/extension/httpfs/httpfs.cpp +++ b/extension/httpfs/httpfs.cpp @@ -22,16 +22,22 @@ namespace duckdb { -class HTTPFSUtil : public HTTPUtil { -public: - unique_ptr InitializeParameters(optional_ptr opener, - optional_ptr info) override; - unique_ptr InitializeClient(HTTPParams &http_params, const string &proto_host_port) override; +string HTTPFSUtil::GetName() const { + return "httpfs"; +} - string GetName() const override { - return "httpfs"; +unordered_map HTTPFSUtil::ParseGetParameters(const string &text) { + unordered_map result; + auto pairs = StringUtil::Split(text, '&'); + for (const auto &pair : pairs) { + auto kv = StringUtil::Split(pair, '='); + if (kv.size() != 2) { + throw IOException("Error parsing GET parameters"); + } + result[kv[0]] = kv[1]; } -}; + return result; +} shared_ptr HTTPFSUtil::GetHTTPUtil(optional_ptr opener) { if (opener) { @@ -158,7 +164,7 @@ unique_ptr HTTPFileSystem::DeleteRequest(FileHandle &handle, strin } HTTPException HTTPFileSystem::GetHTTPError(FileHandle &, const HTTPResponse &response, const string &url) { - auto status_message = HTTPFSUtil::GetStatusMessage(response.status); + auto status_message = HTTPUtil::GetStatusMessage(response.status); string error = "HTTP GET error on '" + url + "' (HTTP " + to_string(static_cast(response.status)) + " " + status_message + ")"; if (response.status == HTTPStatusCode::RangeNotSatisfiable_416) { diff --git a/extension/httpfs/include/httpfs_client.hpp b/extension/httpfs/include/httpfs_client.hpp index 283bcc0..46d5916 100644 --- a/extension/httpfs/include/httpfs_client.hpp +++ b/extension/httpfs/include/httpfs_client.hpp @@ -21,18 +21,16 @@ struct HTTPFSParams : public HTTPParams { string ca_cert_file; }; -class HTTPClient { +class HTTPFSUtil : public HTTPUtil { public: - virtual ~HTTPClient() = default; - - virtual duckdb::unique_ptr Get(const string &url, HTTPHeaders &headers, idx_t file_offset, - char *buffer_out, idx_t buffer_out_len); - virtual duckdb::unique_ptr Head(const string &url, HTTPHeaders &headers); - virtual duckdb::unique_ptr Post(const string &url, HTTPHeaders &headers, const char *buffer_in, - idx_t buffer_in_len, string &result_p, string ¶ms_p); - virtual duckdb::unique_ptr Put(const string &url, HTTPHeaders &headers, const char *buffer_in, - idx_t buffer_in_len, const string ¶ms); - virtual duckdb::unique_ptr Delete(const string &url, HTTPHeaders &headers); + unique_ptr InitializeParameters(optional_ptr opener, + optional_ptr info) override; + unique_ptr InitializeClient(HTTPParams &http_params, const string &proto_host_port) override; + + static unordered_map ParseGetParameters(const string &text); + static shared_ptr GetHTTPUtil(optional_ptr opener); + + string GetName() const override; }; } // namespace duckdb diff --git a/extension/httpfs/s3fs.cpp b/extension/httpfs/s3fs.cpp index 9fabe77..bee76e4 100644 --- a/extension/httpfs/s3fs.cpp +++ b/extension/httpfs/s3fs.cpp @@ -1016,7 +1016,7 @@ HTTPException S3FileSystem::GetS3Error(S3AuthParams &s3_auth_params, const HTTPR if (response.status == HTTPStatusCode::Forbidden_403) { extra_text = GetS3AuthError(s3_auth_params); } - auto status_message = HTTPFSUtil::GetStatusMessage(response.status); + auto status_message = HTTPUtil::GetStatusMessage(response.status); throw HTTPException(response, "HTTP GET error reading '%s' in region '%s' (HTTP %d %s)%s", url, s3_auth_params.region, response.status, status_message, extra_text); } From 24d1b32f18c8a9a68de721d1be348d6893de49ae Mon Sep 17 00:00:00 2001 From: qxip Date: Mon, 23 Jun 2025 22:54:13 +0200 Subject: [PATCH 4/8] resync --- extension/httpfs/httpfs.cpp | 13 +++++-------- extension/httpfs/include/httpfs_client.hpp | 3 +++ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/extension/httpfs/httpfs.cpp b/extension/httpfs/httpfs.cpp index 1df016c..fb23dad 100644 --- a/extension/httpfs/httpfs.cpp +++ b/extension/httpfs/httpfs.cpp @@ -498,17 +498,18 @@ void HTTPFileSystem::FlushBuffer(HTTPFileHandle &hfh) { string path, proto_host_port; HTTPUtil::DecomposeURL(hfh.path, path, proto_host_port); - HTTPHeaders header_map; + HeaderMap header_map; hfh.AddHeaders(header_map); + HTTPHeaders headers(header_map); auto &http_util = hfh.http_params.http_util; - PostRequestInfo post_request(hfh.path, header_map, hfh.http_params, + PostRequestInfo post_request(hfh.path, headers, hfh.http_params, const_data_ptr_cast(hfh.write_buffer.data()), hfh.write_buffer_idx); auto res = http_util.Request(post_request); if (!res->Success()) { - throw HTTPException(hfh, *res, "Failed to write to file"); + throw HTTPException(*res, "Failed to write to file"); } hfh.write_buffer_idx = 0; @@ -799,7 +800,7 @@ void HTTPFileHandle::StoreClient(unique_ptr client) { } ResponseWrapper::ResponseWrapper(HTTPResponse &res, string &original_url) { - this->code = res.status; + this->code = static_cast(res.status); this->error = res.reason; for (auto &header : res.headers) { this->headers[header.first] = header.second; @@ -812,10 +813,6 @@ HTTPFileHandle::~HTTPFileHandle() { DUCKDB_LOG_FILE_SYSTEM_CLOSE((*this)); } -string HTTPFileSystem::GetName() const { - return "httpfs"; -} - void HTTPFileSystem::Verify() { // TODO } diff --git a/extension/httpfs/include/httpfs_client.hpp b/extension/httpfs/include/httpfs_client.hpp index 46d5916..99b140c 100644 --- a/extension/httpfs/include/httpfs_client.hpp +++ b/extension/httpfs/include/httpfs_client.hpp @@ -19,6 +19,9 @@ struct HTTPFSParams : public HTTPParams { idx_t hf_max_per_page = DEFAULT_HF_MAX_PER_PAGE; bool enable_server_cert_verification = DEFAULT_ENABLE_SERVER_CERT_VERIFICATION; string ca_cert_file; + bool enable_http_write = false; + string bearer_token; + shared_ptr state; }; class HTTPFSUtil : public HTTPUtil { From 9621c10c4ff49a9f9d953b3c091018e9b67b4677 Mon Sep 17 00:00:00 2001 From: qxip Date: Mon, 23 Jun 2025 22:58:31 +0200 Subject: [PATCH 5/8] resync --- extension/httpfs/httpfs.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/extension/httpfs/httpfs.cpp b/extension/httpfs/httpfs.cpp index fb23dad..42e9281 100644 --- a/extension/httpfs/httpfs.cpp +++ b/extension/httpfs/httpfs.cpp @@ -500,7 +500,11 @@ void HTTPFileSystem::FlushBuffer(HTTPFileHandle &hfh) { HeaderMap header_map; hfh.AddHeaders(header_map); - HTTPHeaders headers(header_map); + + HTTPHeaders headers; + for (const auto &kv : header_map) { + headers.Insert(kv.first, kv.second); + } auto &http_util = hfh.http_params.http_util; From cd4734fd55433a5f44020567dd8938792eefe97f Mon Sep 17 00:00:00 2001 From: qxip Date: Mon, 23 Jun 2025 23:01:10 +0200 Subject: [PATCH 6/8] resync --- extension/httpfs/httpfs.cpp | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/extension/httpfs/httpfs.cpp b/extension/httpfs/httpfs.cpp index 42e9281..fb7dde9 100644 --- a/extension/httpfs/httpfs.cpp +++ b/extension/httpfs/httpfs.cpp @@ -22,23 +22,6 @@ namespace duckdb { -string HTTPFSUtil::GetName() const { - return "httpfs"; -} - -unordered_map HTTPFSUtil::ParseGetParameters(const string &text) { - unordered_map result; - auto pairs = StringUtil::Split(text, '&'); - for (const auto &pair : pairs) { - auto kv = StringUtil::Split(pair, '='); - if (kv.size() != 2) { - throw IOException("Error parsing GET parameters"); - } - result[kv[0]] = kv[1]; - } - return result; -} - shared_ptr HTTPFSUtil::GetHTTPUtil(optional_ptr opener) { if (opener) { auto db = opener->TryGetDatabase(); From 6c381843391ee5bb308960ffa04349abadfedf84 Mon Sep 17 00:00:00 2001 From: qxip Date: Mon, 23 Jun 2025 23:04:47 +0200 Subject: [PATCH 7/8] resync --- extension/httpfs/httpfs.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/extension/httpfs/httpfs.cpp b/extension/httpfs/httpfs.cpp index fb7dde9..b63d740 100644 --- a/extension/httpfs/httpfs.cpp +++ b/extension/httpfs/httpfs.cpp @@ -804,4 +804,8 @@ void HTTPFileSystem::Verify() { // TODO } +void HTTPFileHandle::AddHeaders(HeaderMap &map) { + // Add any necessary headers here. For now, this is a stub to resolve the linker error. +} + } // namespace duckdb From 976ad18d0c5ec2344c1e41fb8d9f3755e408ee96 Mon Sep 17 00:00:00 2001 From: qxip Date: Mon, 23 Jun 2025 23:09:51 +0200 Subject: [PATCH 8/8] resync --- extension/httpfs/httpfs.cpp | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/extension/httpfs/httpfs.cpp b/extension/httpfs/httpfs.cpp index b63d740..4bac8a4 100644 --- a/extension/httpfs/httpfs.cpp +++ b/extension/httpfs/httpfs.cpp @@ -456,18 +456,26 @@ void HTTPFileSystem::Write(FileHandle &handle, void *buffer, int64_t nr_bytes, i throw NotImplementedException("Writing to HTTP Files must be sequential"); } - // Adapted from Write logic, but to a buffer instead of disk + // Ensure the write buffer is allocated and sized + if (hfh.write_buffer.empty() || hfh.current_buffer_len == 0) { + hfh.write_buffer.resize(hfh.WRITE_BUFFER_LEN); + hfh.current_buffer_len = hfh.WRITE_BUFFER_LEN; + hfh.write_buffer_idx = 0; + } + idx_t remaining = nr_bytes; auto data = reinterpret_cast(buffer); while (remaining > 0) { - idx_t to_write = std::min(remaining, hfh.current_buffer_len - hfh.write_buffer_idx); + idx_t space_left = hfh.current_buffer_len - hfh.write_buffer_idx; + idx_t to_write = std::min(remaining, space_left); if (to_write > 0) { memcpy(hfh.write_buffer.data() + hfh.write_buffer_idx, data, to_write); hfh.write_buffer_idx += to_write; data += to_write; remaining -= to_write; } - if (remaining > 0) { + // If buffer is full, flush it + if (hfh.write_buffer_idx == hfh.current_buffer_len) { FlushBuffer(hfh); } } @@ -483,7 +491,6 @@ void HTTPFileSystem::FlushBuffer(HTTPFileHandle &hfh) { HeaderMap header_map; hfh.AddHeaders(header_map); - HTTPHeaders headers; for (const auto &kv : header_map) { headers.Insert(kv.first, kv.second);