Skip to content

Commit 20169b3

Browse files
committed
HTTP WRITE support
1 parent 85ac466 commit 20169b3

File tree

4 files changed

+123
-5
lines changed

4 files changed

+123
-5
lines changed

extension/httpfs/httpfs.cpp

Lines changed: 107 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include "duckdb/main/client_context.hpp"
1313
#include "duckdb/main/database.hpp"
1414
#include "duckdb/main/secret/secret_manager.hpp"
15+
#include "duckdb/storage/buffer_manager.hpp"
1516
#include "http_state.hpp"
1617

1718
#include <chrono>
@@ -59,6 +60,7 @@ HTTPParams HTTPParams::ReadFrom(optional_ptr<FileOpener> opener, optional_ptr<Fi
5960
info);
6061
FileOpener::TryGetCurrentSetting(opener, "ca_cert_file", result.ca_cert_file, info);
6162
FileOpener::TryGetCurrentSetting(opener, "hf_max_per_page", result.hf_max_per_page, info);
63+
FileOpener::TryGetCurrentSetting(opener, "enable_http_write", result.enable_http_write, info);
6264

6365
// HTTP Secret lookups
6466
KeyValueSecretReader settings_reader(*opener, info, "http");
@@ -576,7 +578,100 @@ int64_t HTTPFileSystem::Read(FileHandle &handle, void *buffer, int64_t nr_bytes)
576578
}
577579

578580
//! Buffered write for HTTP(S) targets.
//!
//! Incoming bytes are appended to the handle's in-memory write buffer. Every time the buffer
//! holds HTTPFileHandle::WRITE_BUFFER_LEN bytes it is sent through FlushBuffer() and reused.
//! Bytes below that threshold stay buffered until FlushBuffer() runs again
//! (HTTPFileHandle::Close() triggers it for handles opened for writing).
//!
//! \param handle   Must be an HTTPFileHandle.
//! \param buffer   Source bytes; a null pointer makes the call a no-op.
//! \param nr_bytes Number of bytes to take from buffer; values <= 0 make the call a no-op.
//! \param location Not used by this implementation: data is appended in call order.
//! \throws NotImplementedException when the "enable_http_write" setting is off.
//! \throws Anything FlushBuffer() throws when a full buffer cannot be delivered.
void HTTPFileSystem::Write(FileHandle &handle, void *buffer, int64_t nr_bytes, idx_t location) {
    auto &hfh = handle.Cast<HTTPFileHandle>();

    // HTTP writes are opt-in; keep the historical error when the setting is disabled
    if (!hfh.http_params.enable_http_write) {
        throw NotImplementedException("Writing to HTTP files not implemented");
    }

    // Only a missing source or a non-positive length is a no-op.
    // A single-byte write is valid payload and must not be dropped.
    if (!buffer || nr_bytes <= 0) {
        return;
    }

    // Lazily allocate the staging buffer on the first write to this handle
    if (hfh.write_buffer.empty()) {
        hfh.write_buffer.resize(hfh.WRITE_BUFFER_LEN);
        hfh.write_buffer_idx = 0;
    }

    const auto *source = static_cast<const data_t *>(buffer);
    idx_t bytes_to_copy = static_cast<idx_t>(nr_bytes);
    idx_t buffer_offset = 0;

    // Stage the input chunk by chunk, flushing whenever the staging buffer fills up
    while (bytes_to_copy > 0) {
        const idx_t space_in_buffer = hfh.WRITE_BUFFER_LEN - hfh.write_buffer_idx;
        const idx_t copy_amount = MinValue<idx_t>(space_in_buffer, bytes_to_copy);

        memcpy(hfh.write_buffer.data() + hfh.write_buffer_idx, source + buffer_offset, copy_amount);
        hfh.write_buffer_idx += copy_amount;
        bytes_to_copy -= copy_amount;
        buffer_offset += copy_amount;

        // A full buffer is sent immediately; FlushBuffer() resets write_buffer_idx on success
        if (hfh.write_buffer_idx == hfh.WRITE_BUFFER_LEN) {
            FlushBuffer(hfh);
        }
    }

    // Account for the accepted bytes in the handle's logical position
    hfh.file_offset += nr_bytes;
}
626+
627+
//! Sends the bytes currently staged in the handle's write buffer as the body of one HTTP POST
//! to the handle's URL, then marks the buffer as empty.
//!
//! Does nothing when no bytes are staged. The request goes through RunRequestWithRetry().
//! If the final response is not a 2xx status an HTTPException is thrown and write_buffer_idx
//! is left untouched, so the staged bytes are still in the buffer.
//!
//! NOTE(review): every flush is a separate POST to the same URL; whether the endpoint appends
//! successive bodies or replaces the resource is server-defined - confirm for the target server.
//!
//! \param hfh Handle whose write buffer should be sent.
//! \throws HTTPException when the response status is outside 200-299.
void HTTPFileSystem::FlushBuffer(HTTPFileHandle &hfh) {
    // Only an empty buffer is skipped; a buffer holding a single byte is still real data
    if (hfh.write_buffer_idx == 0) {
        return;
    }

    // Split the handle's URL into the request path and the scheme/host/port part
    string path, proto_host_port;
    ParseUrl(hfh.path, path, proto_host_port);

    HeaderMap header_map;
    auto headers = InitializeHeaders(header_map, hfh.http_params);

    // The request is wrapped in a callable so the retry helper can re-issue it from scratch
    std::function<duckdb_httplib_openssl::Result(void)> request([&]() {
        auto client = GetClient(hfh.http_params, proto_host_port.c_str(), &hfh);
        duckdb_httplib_openssl::Request req;
        req.method = "POST";
        req.path = path;
        req.headers = *headers;
        req.headers.emplace("Content-Type", "application/octet-stream");

        // Body = exactly the staged bytes, not the whole (pre-sized) buffer
        req.body = std::string(reinterpret_cast<const char *>(hfh.write_buffer.data()), hfh.write_buffer_idx);

        return client->send(req);
    });

    auto response = RunRequestWithRetry(request, hfh.path, "POST", hfh.http_params);

    // Anything outside 2xx counts as a failed upload
    if (response->code < 200 || response->code >= 300) {
        throw HTTPException(*response, "HTTP POST request failed to '%s' with status code: %d", hfh.path.c_str(),
                            response->code);
    }

    // Success: the staging buffer can be reused from the start
    hfh.write_buffer_idx = 0;
}
669+
670+
void HTTPFileHandle::Close() {
671+
auto &fs = (HTTPFileSystem &)file_system;
672+
if (flags.OpenForWriting()) {
673+
fs.FlushBuffer(*this);
674+
}
580675
}
581676

582677
int64_t HTTPFileSystem::Write(FileHandle &handle, void *buffer, int64_t nr_bytes) {
@@ -829,5 +924,15 @@ ResponseWrapper::ResponseWrapper(duckdb_httplib_openssl::Response &res, string &
829924
body = res.body;
830925
}
831926

832-
HTTPFileHandle::~HTTPFileHandle() = default;
927+
//! Best-effort Close() on destruction.
//!
//! Nothing is attempted while another exception is already in flight. Any exception raised by
//! Close() is swallowed so the destructor never throws - which also means a failed final
//! flush is not reported to the caller from here.
HTTPFileHandle::~HTTPFileHandle() {
    if (!Exception::UncaughtException()) {
        try {
            Close();
        } catch (...) { // NOLINT
            // intentionally ignored: a destructor must not let exceptions escape
        }
    }
}
937+
833938
} // namespace duckdb

extension/httpfs/httpfs_extension.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ static void LoadInternal(DatabaseInstance &instance) {
3939
LogicalType::BOOLEAN, Value(false));
4040
config.AddExtensionOption("ca_cert_file", "Path to a custom certificate file for self-signed certificates.",
4141
LogicalType::VARCHAR, Value(""));
42+
// Experimental HTTPFS write
43+
config.AddExtensionOption("enable_http_write", "Enable HTTPFS POST write", LogicalType::BOOLEAN, Value(false));
44+
4245
// Global S3 config
4346
config.AddExtensionOption("s3_region", "S3 Region", LogicalType::VARCHAR, Value("us-east-1"));
4447
config.AddExtensionOption("s3_access_key_id", "S3 Access Key ID", LogicalType::VARCHAR);

extension/httpfs/include/httpfs.hpp

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ struct HTTPParams {
4242
static constexpr uint64_t DEFAULT_RETRY_WAIT_MS = 100;
4343
static constexpr float DEFAULT_RETRY_BACKOFF = 4;
4444
static constexpr bool DEFAULT_FORCE_DOWNLOAD = false;
45+
static constexpr bool DEFAULT_ENABLE_HTTP_WRITE = false;
4546
static constexpr bool DEFAULT_KEEP_ALIVE = true;
4647
static constexpr bool DEFAULT_ENABLE_SERVER_CERT_VERIFICATION = false;
4748
static constexpr uint64_t DEFAULT_HF_MAX_PER_PAGE = 0;
@@ -52,6 +53,7 @@ struct HTTPParams {
5253
uint64_t retry_wait_ms = DEFAULT_RETRY_WAIT_MS;
5354
float retry_backoff = DEFAULT_RETRY_BACKOFF;
5455
bool force_download = DEFAULT_FORCE_DOWNLOAD;
56+
bool enable_http_write = DEFAULT_ENABLE_HTTP_WRITE;
5557
bool keep_alive = DEFAULT_KEEP_ALIVE;
5658
bool enable_server_cert_verification = DEFAULT_ENABLE_SERVER_CERT_VERIFICATION;
5759
idx_t hf_max_per_page = DEFAULT_HF_MAX_PER_PAGE;
@@ -116,6 +118,12 @@ class HTTPFileHandle : public FileHandle {
116118
duckdb::unique_ptr<data_t[]> read_buffer;
117119
constexpr static idx_t READ_BUFFER_LEN = 1000000;
118120

121+
// duckdb::unique_ptr<data_t[]> write_buffer;
122+
constexpr static idx_t WRITE_BUFFER_LEN = 1000000;
123+
std::vector<data_t> write_buffer; // Use a vector instead of a fixed-size array
124+
idx_t write_buffer_idx = 0; // Tracks the current index in the buffer
125+
idx_t current_buffer_len;
126+
119127
shared_ptr<HTTPState> state;
120128

121129
void AddHeaders(HeaderMap &map);
@@ -126,8 +134,7 @@ class HTTPFileHandle : public FileHandle {
126134
void StoreClient(unique_ptr<duckdb_httplib_openssl::Client> client);
127135

128136
public:
129-
void Close() override {
130-
}
137+
void Close() override;
131138

132139
protected:
133140
//! Create a new Client
@@ -139,6 +146,8 @@ class HTTPFileHandle : public FileHandle {
139146
};
140147

141148
class HTTPFileSystem : public FileSystem {
149+
friend HTTPFileHandle;
150+
142151
public:
143152
static duckdb::unique_ptr<duckdb_httplib_openssl::Client>
144153
GetClient(const HTTPParams &http_params, const char *proto_host_port, optional_ptr<HTTPFileHandle> hfs);
@@ -211,6 +220,7 @@ class HTTPFileSystem : public FileSystem {
211220
// Global cache
212221
mutex global_cache_lock;
213222
duckdb::unique_ptr<HTTPMetadataCache> global_metadata_cache;
223+
void FlushBuffer(HTTPFileHandle &hfh);
214224
};
215225

216226
} // namespace duckdb

extension_config.cmake

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ else ()
99
endif()
1010

1111
duckdb_extension_load(httpfs
12-
DONT_LINK
12+
### DONT_LINK
1313
SOURCE_DIR ${CMAKE_CURRENT_LIST_DIR}
1414
INCLUDE_DIR ${CMAKE_CURRENT_LIST_DIR}/extension/httpfs/include
1515
${LOAD_HTTPFS_TESTS}

0 commit comments

Comments
 (0)