HTTP WRITE support #24

Draft · wants to merge 9 commits into main

92 changes: 88 additions & 4 deletions extension/httpfs/httpfs.cpp
@@ -12,6 +12,7 @@
#include "duckdb/main/client_context.hpp"
#include "duckdb/main/database.hpp"
#include "duckdb/main/secret/secret_manager.hpp"
#include "duckdb/storage/buffer_manager.hpp"
#include "http_state.hpp"

#include <chrono>
@@ -55,6 +56,7 @@ unique_ptr<HTTPParams> HTTPFSUtil::InitializeParameters(optional_ptr<FileOpener>
info);
FileOpener::TryGetCurrentSetting(opener, "ca_cert_file", result->ca_cert_file, info);
FileOpener::TryGetCurrentSetting(opener, "hf_max_per_page", result->hf_max_per_page, info);
FileOpener::TryGetCurrentSetting(opener, "enable_http_write", result->enable_http_write, info);

// HTTP Secret lookups
KeyValueSecretReader settings_reader(*opener, info, "http");
@@ -145,7 +147,7 @@ unique_ptr<HTTPResponse> HTTPFileSystem::DeleteRequest(FileHandle &handle, strin
}

HTTPException HTTPFileSystem::GetHTTPError(FileHandle &, const HTTPResponse &response, const string &url) {
auto status_message = HTTPFSUtil::GetStatusMessage(response.status);
auto status_message = HTTPUtil::GetStatusMessage(response.status);
string error = "HTTP GET error on '" + url + "' (HTTP " + to_string(static_cast<int>(response.status)) + " " +
status_message + ")";
if (response.status == HTTPStatusCode::RangeNotSatisfiable_416) {
@@ -448,12 +450,75 @@ int64_t HTTPFileSystem::Read(FileHandle &handle, void *buffer, int64_t nr_bytes)
}

void HTTPFileSystem::Write(FileHandle &handle, void *buffer, int64_t nr_bytes, idx_t location) {
	throw NotImplementedException("Writing to HTTP files not implemented");
	auto &hfh = handle.Cast<HTTPFileHandle>();

	if (location != hfh.SeekPosition()) {
		throw NotImplementedException("Writing to HTTP Files must be sequential");
	}

	// Ensure the write buffer is allocated and sized
	if (hfh.write_buffer.empty() || hfh.current_buffer_len == 0) {
		hfh.write_buffer.resize(hfh.WRITE_BUFFER_LEN);
		hfh.current_buffer_len = hfh.WRITE_BUFFER_LEN;
		hfh.write_buffer_idx = 0;
	}

	idx_t remaining = nr_bytes;
	auto data = reinterpret_cast<const data_t *>(buffer);
	while (remaining > 0) {
		idx_t space_left = hfh.current_buffer_len - hfh.write_buffer_idx;
		idx_t to_write = std::min(remaining, space_left);
		if (to_write > 0) {
			memcpy(hfh.write_buffer.data() + hfh.write_buffer_idx, data, to_write);
			hfh.write_buffer_idx += to_write;
			data += to_write;
			remaining -= to_write;
		}
		// If buffer is full, flush it
		if (hfh.write_buffer_idx == hfh.current_buffer_len) {
			FlushBuffer(hfh);
		}
	}
}
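
For reviewers, a rough caller-side sketch of the new sequential write path. This is illustrative only: `WriteExample`, the URL, and the exact `OpenFile`/`Write` signatures are assumptions (they vary across DuckDB versions), not part of this diff.

```cpp
#include "duckdb/common/file_system.hpp"
#include <string>

using namespace duckdb;

// Hypothetical helper: sequential appends fill the handle's 1 MB write_buffer,
// and a POST is issued each time the buffer fills.
void WriteExample(FileSystem &fs) {
	auto handle = fs.OpenFile("http://localhost:8008/out.csv", FileFlags::FILE_FLAGS_WRITE);
	std::string row = "a,b,c\n";
	for (int i = 0; i < 3; i++) {
		// Writes must be sequential; any other location throws NotImplementedException.
		handle->Write((void *)row.data(), row.size());
	}
	// Close() flushes the remaining partial buffer via HTTPFileSystem::FlushBuffer.
	handle->Close();
}
```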

void HTTPFileSystem::FlushBuffer(HTTPFileHandle &hfh) {
	if (hfh.write_buffer_idx == 0) {
		return;
	}

	string path, proto_host_port;
	HTTPUtil::DecomposeURL(hfh.path, path, proto_host_port);

	HeaderMap header_map;
	hfh.AddHeaders(header_map);
	HTTPHeaders headers;
	for (const auto &kv : header_map) {
		headers.Insert(kv.first, kv.second);
	}

	auto &http_util = hfh.http_params.http_util;

	PostRequestInfo post_request(hfh.path, headers, hfh.http_params,
	                             const_data_ptr_cast(hfh.write_buffer.data()), hfh.write_buffer_idx);

	auto res = http_util.Request(post_request);
	if (!res->Success()) {
		throw HTTPException(*res, "Failed to write to file");
	}

	hfh.write_buffer_idx = 0;
}
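
Note that each flush issues an independent POST of at most WRITE_BUFFER_LEN bytes to the same URL, so the target server must treat successive POSTs as appends. A back-of-envelope sketch of the request count, assuming the 1 MB constant from this diff:

```cpp
#include <cstdint>

// Number of POSTs a sequential write of n bytes produces with a 1 MB buffer;
// the partial tail is flushed by Close().
uint64_t PostCount(uint64_t n, uint64_t buffer_len = 1000000) {
	return (n + buffer_len - 1) / buffer_len; // ceiling division
}
```

For example, `PostCount(2500000)` is 3: two full 1 MB POSTs plus a 500 kB tail flushed on Close().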

void HTTPFileHandle::Close() {
	auto &fs = (HTTPFileSystem &)file_system;
	if (flags.OpenForWriting()) {
		fs.FlushBuffer(*this);
	}
}

int64_t HTTPFileSystem::Write(FileHandle &handle, void *buffer, int64_t nr_bytes) {
	auto &hfh = handle.Cast<HTTPFileHandle>();
	Write(handle, buffer, nr_bytes, hfh.file_offset);
	Write(handle, buffer, nr_bytes, hfh.SeekPosition());
	return nr_bytes;
}

@@ -728,7 +793,26 @@ void HTTPFileHandle::StoreClient(unique_ptr<HTTPClient> client) {
client_cache.StoreClient(std::move(client));
}

ResponseWrapper::ResponseWrapper(HTTPResponse &res, string &original_url) {
	this->code = static_cast<int>(res.status);
	this->error = res.reason;
	for (auto &header : res.headers) {
		this->headers[header.first] = header.second;
	}
	this->http_url = res.url;
	this->body = res.body;
}

HTTPFileHandle::~HTTPFileHandle() {
	DUCKDB_LOG_FILE_SYSTEM_CLOSE((*this));
}

void HTTPFileSystem::Verify() {
	// TODO
}

void HTTPFileHandle::AddHeaders(HeaderMap &map) {
	// Add any necessary headers here. For now, this is a stub to resolve the linker error.
}

} // namespace duckdb
3 changes: 3 additions & 0 deletions extension/httpfs/httpfs_extension.cpp
@@ -38,6 +38,9 @@ static void LoadInternal(DatabaseInstance &instance) {
LogicalType::BOOLEAN, Value(false));
config.AddExtensionOption("ca_cert_file", "Path to a custom certificate file for self-signed certificates.",
LogicalType::VARCHAR, Value(""));
// Experimental HTTPFS write
config.AddExtensionOption("enable_http_write", "Enable HTTPFS POST write", LogicalType::BOOLEAN, Value(false));

// Global S3 config
config.AddExtensionOption("s3_region", "S3 Region", LogicalType::VARCHAR, Value("us-east-1"));
config.AddExtensionOption("s3_access_key_id", "S3 Access Key ID", LogicalType::VARCHAR);
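
A minimal sketch of opting in to the new setting from the C++ API. Assumptions: an in-memory database with httpfs loaded, and a hypothetical local endpoint; whether a given statement (e.g. COPY) reaches the HTTP write path depends on how the file is opened.

```cpp
#include "duckdb.hpp"

int main() {
	duckdb::DuckDB db(nullptr);
	duckdb::Connection con(db);
	// Opt in to the experimental POST-based write path (defaults to false).
	con.Query("SET enable_http_write = true;");
	// Illustrative: write a small result to a hypothetical HTTP endpoint.
	con.Query("COPY (SELECT 42 AS answer) TO 'http://localhost:8008/out.csv';");
	return 0;
}
```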
32 changes: 29 additions & 3 deletions extension/httpfs/include/httpfs.hpp
@@ -9,11 +9,27 @@
#include "duckdb/main/client_data.hpp"
#include "http_metadata_cache.hpp"
#include "httpfs_client.hpp"
#include "duckdb/common/http_util.hpp"

#include <mutex>

namespace duckdb {

class HTTPLogger;

using HeaderMap = case_insensitive_map_t<string>;

// avoid including httplib in header
struct ResponseWrapper {
public:
	explicit ResponseWrapper(HTTPResponse &res, string &original_url);
	int code;
	string error;
	HeaderMap headers;
	string http_url;
	string body;
};

class HTTPClientCache {
public:
//! Get a client from the client cache
@@ -68,16 +84,23 @@ class HTTPFileHandle : public FileHandle {
duckdb::unique_ptr<data_t[]> read_buffer;
constexpr static idx_t READ_BUFFER_LEN = 1000000;

void AddHeaders(HTTPHeaders &map);
// Write buffer
constexpr static idx_t WRITE_BUFFER_LEN = 1000000;
std::vector<data_t> write_buffer; // Use a vector instead of a fixed-size array
idx_t write_buffer_idx = 0; // Tracks the current index in the buffer
idx_t current_buffer_len;

shared_ptr<HTTPState> state;

void AddHeaders(HeaderMap &map);

// Get a Client to run requests over
unique_ptr<HTTPClient> GetClient();
// Return the client for re-use
void StoreClient(unique_ptr<HTTPClient> client);

public:
void Close() override {
}
void Close() override;

protected:
//! Create a new Client
@@ -91,6 +114,8 @@
};

class HTTPFileSystem : public FileSystem {
friend HTTPFileHandle;

public:
static bool TryParseLastModifiedTime(const string &timestamp, time_t &result);

@@ -163,6 +188,7 @@
// Global cache
mutex global_cache_lock;
duckdb::unique_ptr<HTTPMetadataCache> global_metadata_cache;
void FlushBuffer(HTTPFileHandle &hfh);
};

} // namespace duckdb
10 changes: 5 additions & 5 deletions extension/httpfs/include/httpfs_client.hpp
@@ -9,17 +9,17 @@ struct FileOpenerInfo;
class HTTPState;

struct HTTPFSParams : public HTTPParams {
HTTPFSParams(HTTPUtil &http_util) : HTTPParams(http_util) {
}
using HTTPParams::HTTPParams;

static constexpr bool DEFAULT_ENABLE_SERVER_CERT_VERIFICATION = false;
static constexpr uint64_t DEFAULT_HF_MAX_PER_PAGE = 0;
static constexpr bool DEFAULT_FORCE_DOWNLOAD = false;
static constexpr uint64_t DEFAULT_HF_MAX_PER_PAGE = 0;
static constexpr bool DEFAULT_ENABLE_SERVER_CERT_VERIFICATION = true;

bool force_download = DEFAULT_FORCE_DOWNLOAD;
bool enable_server_cert_verification = DEFAULT_ENABLE_SERVER_CERT_VERIFICATION;
idx_t hf_max_per_page = DEFAULT_HF_MAX_PER_PAGE;
bool enable_server_cert_verification = DEFAULT_ENABLE_SERVER_CERT_VERIFICATION;
string ca_cert_file;
bool enable_http_write = false;
string bearer_token;
shared_ptr<HTTPState> state;
};
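
The `using HTTPParams::HTTPParams;` line replaces the hand-written forwarding constructor with inherited constructors; the effect is the same. A minimal, self-contained illustration of the idiom (names here are invented for the example):

```cpp
struct Base {
	explicit Base(int util) : util(util) {
	}
	int util;
};

struct Derived : Base {
	using Base::Base; // inherits Base(int), replacing a forwarding constructor
	bool extra = false;
};

int main() {
	Derived d(42); // calls the inherited Base(int); extra keeps its default
	return d.util == 42 ? 0 : 1;
}
```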
7 changes: 3 additions & 4 deletions extension/httpfs/s3fs.cpp
@@ -1016,7 +1016,7 @@ HTTPException S3FileSystem::GetS3Error(S3AuthParams &s3_auth_params, const HTTPR
if (response.status == HTTPStatusCode::Forbidden_403) {
extra_text = GetS3AuthError(s3_auth_params);
}
auto status_message = HTTPFSUtil::GetStatusMessage(response.status);
auto status_message = HTTPUtil::GetStatusMessage(response.status);
throw HTTPException(response, "HTTP GET error reading '%s' in region '%s' (HTTP %d %s)%s", url,
s3_auth_params.region, response.status, status_message, extra_text);
}
@@ -1044,16 +1044,15 @@ string AWSListObjectV2::Request(string &path, HTTPParams &http_params, S3AuthPar
req_params += "&delimiter=%2F";
}

string listobjectv2_url = req_path + "?" + req_params;
string listobjectv2_url = parsed_url.http_proto + parsed_url.host + req_path + "?" + req_params;

auto header_map =
create_s3_header(req_path, req_params, parsed_url.host, "s3", "GET", s3_auth_params, "", "", "", "");

// Get requests use fresh connection
string full_host = parsed_url.http_proto + parsed_url.host;
std::stringstream response;
GetRequestInfo get_request(
full_host, listobjectv2_url, header_map, http_params,
listobjectv2_url, header_map, http_params,
[&](const HTTPResponse &response) {
if (static_cast<int>(response.status) >= 400) {
string trimmed_path = path;
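
On the AWSListObjectV2::Request hunk: GetRequestInfo now receives one fully-qualified URL instead of a separate host and path, so listobjectv2_url is built with the protocol and host up front. A small illustration with hypothetical values:

```cpp
#include <string>

int main() {
	// Hypothetical bucket endpoint; mirrors how listobjectv2_url is now composed.
	std::string http_proto = "https://";
	std::string host = "my-bucket.s3.amazonaws.com";
	std::string req_path = "/";
	std::string req_params = "list-type=2&prefix=data%2F&delimiter=%2F";
	std::string url = http_proto + host + req_path + "?" + req_params;
	// url == "https://my-bucket.s3.amazonaws.com/?list-type=2&prefix=data%2F&delimiter=%2F"
	return 0;
}
```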