Refactor S3 Multi part upload to separate class #79


Open · wants to merge 3 commits into base: main
3 changes: 2 additions & 1 deletion CMakeLists.txt
@@ -24,9 +24,10 @@
     extension/httpfs/http_state.cpp
     extension/httpfs/crypto.cpp
     extension/httpfs/hash_functions.cpp
+    extension/httpfs/s3_multi_part_upload.cpp
     extension/httpfs/create_secret_functions.cpp
     extension/httpfs/httpfs_extension.cpp
-    ${EXTRA_SOURCES} )
+    ${EXTRA_SOURCES})

 set(PARAMETERS "-warnings")
 build_loadable_extension(
29 changes: 17 additions & 12 deletions extension/httpfs/httpfs.cpp
@@ -96,23 +96,21 @@ void HTTPClientCache::StoreClient(unique_ptr<HTTPClient> client) {
     clients.push_back(std::move(client));
 }

-unique_ptr<HTTPResponse> HTTPFileSystem::PostRequest(FileHandle &handle, string url, HTTPHeaders header_map,
+unique_ptr<HTTPResponse> HTTPFileSystem::PostRequest(HTTPInput &input, string url, HTTPHeaders header_map,
                                                      string &buffer_out, char *buffer_in, idx_t buffer_in_len,
                                                      string params) {
-    auto &hfh = handle.Cast<HTTPFileHandle>();
-    auto &http_util = hfh.http_params.http_util;
-    PostRequestInfo post_request(url, header_map, hfh.http_params, const_data_ptr_cast(buffer_in), buffer_in_len);
+    auto &http_util = input.http_params.http_util;
+    PostRequestInfo post_request(url, header_map, input.http_params, const_data_ptr_cast(buffer_in), buffer_in_len);
     auto result = http_util.Request(post_request);
     buffer_out = std::move(post_request.buffer_out);
     return result;
 }

-unique_ptr<HTTPResponse> HTTPFileSystem::PutRequest(FileHandle &handle, string url, HTTPHeaders header_map,
+unique_ptr<HTTPResponse> HTTPFileSystem::PutRequest(HTTPInput &input, string url, HTTPHeaders header_map,
                                                     char *buffer_in, idx_t buffer_in_len, string params) {
-    auto &hfh = handle.Cast<HTTPFileHandle>();
-    auto &http_util = hfh.http_params.http_util;
+    auto &http_util = input.http_params.http_util;
     string content_type = "application/octet-stream";
-    PutRequestInfo put_request(url, header_map, hfh.http_params, (const_data_ptr_t)buffer_in, buffer_in_len,
+    PutRequestInfo put_request(url, header_map, input.http_params, (const_data_ptr_t)buffer_in, buffer_in_len,
                                content_type);
     return http_util.Request(put_request);
 }
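
The practical effect of the new signatures: issuing a POST or PUT no longer requires a live FileHandle, only an HTTPInput carrying the request parameters. A minimal caller sketch (UploadPart here is hypothetical, not part of this PR; error handling elided):

    // Hypothetical caller: a detached upload task that holds only the HTTPInput,
    // not the file handle that created it.
    void UploadPart(HTTPFileSystem &fs, HTTPInput &input, const string &url, char *data, idx_t len) {
        HTTPHeaders headers;
        // http_params is resolved from the input, so no handle cast is needed
        auto response = fs.PutRequest(input, url, headers, data, len);
        // ... inspect *response and retry or raise as appropriate
    }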
@@ -272,10 +270,11 @@ void TimestampToTimeT(timestamp_t timestamp, time_t &result) {
     result = mktime(&tm);
 }

-HTTPFileHandle::HTTPFileHandle(FileSystem &fs, const OpenFileInfo &file, FileOpenFlags flags,
-                               unique_ptr<HTTPParams> params_p)
-    : FileHandle(fs, file.path, flags), params(std::move(params_p)), http_params(params->Cast<HTTPFSParams>()),
-      flags(flags), length(0), force_full_download(false), buffer_available(0), buffer_idx(0), file_offset(0), buffer_start(0), buffer_end(0) {
+HTTPInput::HTTPInput(unique_ptr<HTTPParams> params_p) : params(std::move(params_p)), http_params(params->Cast<HTTPFSParams>()) {}
+
+HTTPFileHandle::HTTPFileHandle(FileSystem &fs, const OpenFileInfo &file, FileOpenFlags flags, shared_ptr<HTTPInput> input_p)
+    : FileHandle(fs, file.path, flags), http_input(std::move(input_p)), http_params(http_input->http_params),
+      flags(flags), length(0), force_full_download(false), buffer_available(0), buffer_idx(0), file_offset(0), buffer_start(0), buffer_end(0) {
     // check if the handle has extended properties that can be set directly in the handle
     // if we have these properties we don't need to do a head request to obtain them later
     if (file.extended_info) {
@@ -303,6 +302,12 @@ HTTPFileHandle::HTTPFileHandle(FileSystem &fs, const OpenFileInfo &file, FileOpe
         }
     }
 }

+HTTPFileHandle::HTTPFileHandle(FileSystem &fs, const OpenFileInfo &file, FileOpenFlags flags,
+                               unique_ptr<HTTPParams> params_p) :
+    HTTPFileHandle(fs, file, flags, make_shared_ptr<HTTPInput>(std::move(params_p))) {
+}
+
 unique_ptr<HTTPFileHandle> HTTPFileSystem::CreateHandle(const OpenFileInfo &file, FileOpenFlags flags,
                                                         optional_ptr<FileOpener> opener) {
     D_ASSERT(flags.Compression() == FileCompressionType::UNCOMPRESSED);
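
Note that the original unique_ptr<HTTPParams> constructor is kept and now simply wraps its params into a fresh HTTPInput before delegating, so existing call sites compile unchanged. Making the input a shared_ptr rather than handle-owned is what lets the new S3MultiPartUpload (below) keep issuing requests independently of the handle. A small sketch of the sharing (the helper is illustrative, not from this PR):

    // Illustrative only: both objects share ownership of the same HTTPInput,
    // so requests remain possible even if the file handle is destroyed first.
    void ShareInput(HTTPFileHandle &handle, shared_ptr<HTTPInput> &upload_input) {
        upload_input = handle.http_input; // bumps the refcount; no copy of the params
    }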
27 changes: 24 additions & 3 deletions extension/httpfs/include/httpfs.hpp
@@ -28,19 +28,40 @@ class HTTPClientCache {
     mutex lock;
 };

+class HTTPInput {
+public:
+    HTTPInput(unique_ptr<HTTPParams> params_p);
+    virtual ~HTTPInput() = default;
+
+    unique_ptr<HTTPParams> params;
+    HTTPFSParams &http_params;
+
+    template <class TARGET>
+    TARGET &Cast() {
+        DynamicCastCheck<TARGET>(this);
+        return reinterpret_cast<TARGET &>(*this);
+    }
+    template <class TARGET>
+    const TARGET &Cast() const {
+        DynamicCastCheck<TARGET>(this);
+        return reinterpret_cast<const TARGET &>(*this);
+    }
+};
+
 class HTTPFileSystem;

 class HTTPFileHandle : public FileHandle {
 public:
     HTTPFileHandle(FileSystem &fs, const OpenFileInfo &file, FileOpenFlags flags, unique_ptr<HTTPParams> params);
+    HTTPFileHandle(FileSystem &fs, const OpenFileInfo &file, FileOpenFlags flags, shared_ptr<HTTPInput> input);
     ~HTTPFileHandle() override;
     // This two-phase construction allows subclasses more flexible setup.
     virtual void Initialize(optional_ptr<FileOpener> opener);

     // We keep an http client stored for connection reuse with keep-alive headers
     HTTPClientCache client_cache;

-    unique_ptr<HTTPParams> params;
+    shared_ptr<HTTPInput> http_input;
     HTTPFSParams &http_params;

     // File handle info
@@ -106,10 +127,10 @@ class HTTPFileSystem : public FileSystem {
     // Get Request without a range (i.e., downloads full file)
     virtual duckdb::unique_ptr<HTTPResponse> GetRequest(FileHandle &handle, string url, HTTPHeaders header_map);
     // Post Request that can handle variable sized responses without a content-length header (needed for s3 multipart)
-    virtual duckdb::unique_ptr<HTTPResponse> PostRequest(FileHandle &handle, string url, HTTPHeaders header_map,
+    virtual duckdb::unique_ptr<HTTPResponse> PostRequest(HTTPInput &input, string url, HTTPHeaders header_map,
                                                          string &result, char *buffer_in, idx_t buffer_in_len,
                                                          string params = "");
-    virtual duckdb::unique_ptr<HTTPResponse> PutRequest(FileHandle &handle, string url, HTTPHeaders header_map,
+    virtual duckdb::unique_ptr<HTTPResponse> PutRequest(HTTPInput &input, string url, HTTPHeaders header_map,
                                                         char *buffer_in, idx_t buffer_in_len, string params = "");

     virtual duckdb::unique_ptr<HTTPResponse> DeleteRequest(FileHandle &handle, string url, HTTPHeaders header_map);
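
The Cast<>/DynamicCastCheck pair mirrors the pattern DuckDB uses for other polymorphic state objects, and is what lets an S3-specific override recover its derived input (S3HTTPInput, declared in s3fs.hpp below). A plausible shape for such an override, as a sketch only, since the override bodies are not part of this diff:

    // Sketch: an S3 override recovering its derived input.
    unique_ptr<HTTPResponse> S3FileSystem::PutRequest(HTTPInput &input, string s3_url, HTTPHeaders header_map,
                                                      char *buffer_in, idx_t buffer_in_len, string http_params) {
        // DynamicCastCheck guards the downcast in debug builds
        auto &s3_input = input.Cast<S3HTTPInput>();
        // ... sign the request with s3_input.auth_params, then issue it via the base class
        return HTTPFileSystem::PutRequest(input, s3_url, header_map, buffer_in, buffer_in_len, http_params);
    }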
84 changes: 84 additions & 0 deletions extension/httpfs/include/s3_multi_part_upload.hpp
@@ -0,0 +1,84 @@
#pragma once

#include "s3fs.hpp"
#include "duckdb/execution/task_error_manager.hpp"

namespace duckdb {

// Holds the buffered data for 1 part of an S3 Multipart upload
class S3WriteBuffer {
public:
explicit S3WriteBuffer(idx_t buffer_start, size_t buffer_size, BufferHandle buffer_p)
: idx(0), buffer_start(buffer_start), buffer(std::move(buffer_p)) {
buffer_end = buffer_start + buffer_size;
part_no = buffer_start / buffer_size;
uploading = false;
}

void *Ptr() {
return buffer.Ptr();
}

// The S3 multipart part number. Note that internally we start at 0 but AWS S3 starts at 1
idx_t part_no;

idx_t idx;
idx_t buffer_start;
idx_t buffer_end;
BufferHandle buffer;
atomic<bool> uploading;
};

class S3MultiPartUpload : public enable_shared_from_this<S3MultiPartUpload> {
public:
S3MultiPartUpload(S3FileSystem &s3fs_p, S3FileHandle &file_handle_p);

public:
static shared_ptr<S3MultiPartUpload> Initialize(S3FileHandle &file_handle);

idx_t GetPartSize() const {
return part_size;
}

shared_ptr<S3WriteBuffer> GetBuffer(idx_t write_buffer_idx);
static void FlushBuffer(shared_ptr<S3MultiPartUpload> upload_state, shared_ptr<S3WriteBuffer> write_buffer);
static void UploadBuffer(shared_ptr<S3MultiPartUpload> upload_state, shared_ptr<S3WriteBuffer> write_buffer);
void Finalize();

private:
string InitializeMultipartUpload();
void NotifyUploadsInProgress();
void FlushAllBuffers();
void FinalizeMultipartUpload();

private:
S3FileSystem &s3fs;
shared_ptr<HTTPInput> http_input;
string path;
S3ConfigParams config_params;
string multipart_upload_id;
idx_t part_size;

//! Write buffers for this file
mutex write_buffers_lock;
unordered_map<idx_t, shared_ptr<S3WriteBuffer>> write_buffers;

//! Synchronization for upload threads
mutex uploads_in_progress_lock;
std::condition_variable uploads_in_progress_cv;
std::condition_variable final_flush_cv;
uint16_t uploads_in_progress;

//! Etags are stored for each part
mutex part_etags_lock;
unordered_map<idx_t, string> part_etags;

//! Info for upload
atomic<uint16_t> parts_uploaded;
atomic<bool> upload_finalized;

//! Error handling in upload threads
TaskErrorManager error_manager;
};

} // namespace duckdb
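
Taken together, the public surface implies the following write-path lifecycle. This is a sketch inferred from the names; variable names are illustrative, the real driver lives in the S3FileSystem write/sync code, and the mapping to S3 REST calls is an assumption:

    // Hypothetical driver, illustrating the intended call order.
    auto upload = S3MultiPartUpload::Initialize(s3_file_handle); // starts the multipart upload
    auto write_buffer = upload->GetBuffer(write_offset / upload->GetPartSize());
    // e.g. with a 10 MiB part size, a write at offset 20 MiB lands in internal
    // part_no 2, which goes out as AWS part number 3 (AWS numbering starts at 1)
    memcpy(write_buffer->Ptr(), data, nr_bytes);                 // stage bytes into the part
    S3MultiPartUpload::FlushBuffer(upload, write_buffer);        // async part upload; etag recorded
    upload->Finalize();                                          // drains in-flight parts, then completes the upload

Passing the shared_ptr<S3MultiPartUpload> into the static FlushBuffer/UploadBuffer (together with enable_shared_from_this) presumably keeps the upload state alive on worker threads even if the originating file handle is closed first; errors raised on those threads are collected in the TaskErrorManager instead of the ad-hoc exception_ptr the handle used to carry.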
92 changes: 14 additions & 78 deletions extension/httpfs/include/s3fs.hpp
@@ -78,92 +78,39 @@ struct S3ConfigParams {
     static S3ConfigParams ReadFrom(optional_ptr<FileOpener> opener);
 };

-class S3FileSystem;
-
-// Holds the buffered data for 1 part of an S3 Multipart upload
-class S3WriteBuffer {
+class S3HTTPInput : public HTTPInput {
 public:
-    explicit S3WriteBuffer(idx_t buffer_start, size_t buffer_size, BufferHandle buffer_p)
-        : idx(0), buffer_start(buffer_start), buffer(std::move(buffer_p)) {
-        buffer_end = buffer_start + buffer_size;
-        part_no = buffer_start / buffer_size;
-        uploading = false;
-    }
+    S3HTTPInput(unique_ptr<HTTPParams> params, const S3AuthParams &auth_params_p, const S3ConfigParams &config_params_p);
+    ~S3HTTPInput() override;

-    void *Ptr() {
-        return buffer.Ptr();
-    }
+    S3AuthParams auth_params;
+    S3ConfigParams config_params;
+};

-    // The S3 multipart part number. Note that internally we start at 0 but AWS S3 starts at 1
-    idx_t part_no;
+class S3FileSystem;

-    idx_t idx;
-    idx_t buffer_start;
-    idx_t buffer_end;
-    BufferHandle buffer;
-    atomic<bool> uploading;
-};
+class S3MultiPartUpload;

 class S3FileHandle : public HTTPFileHandle {
     friend class S3FileSystem;

 public:
     S3FileHandle(FileSystem &fs, const OpenFileInfo &file, FileOpenFlags flags, unique_ptr<HTTPParams> http_params_p,
-                 const S3AuthParams &auth_params_p, const S3ConfigParams &config_params_p)
-        : HTTPFileHandle(fs, file, flags, std::move(http_params_p)), auth_params(auth_params_p),
-          config_params(config_params_p), uploads_in_progress(0), parts_uploaded(0), upload_finalized(false),
-          uploader_has_error(false), upload_exception(nullptr) {
-        if (flags.OpenForReading() && flags.OpenForWriting()) {
-            throw NotImplementedException("Cannot open an HTTP file for both reading and writing");
-        } else if (flags.OpenForAppending()) {
-            throw NotImplementedException("Cannot open an HTTP file for appending");
-        }
-    }
+                 const S3AuthParams &auth_params_p, const S3ConfigParams &config_params_p);
+    ~S3FileHandle() override;

-    S3AuthParams auth_params;
+    S3AuthParams &auth_params;
     const S3ConfigParams config_params;
+    shared_ptr<S3MultiPartUpload> multi_part_upload;

 public:
     void Close() override;
     void Initialize(optional_ptr<FileOpener> opener) override;

-    shared_ptr<S3WriteBuffer> GetBuffer(uint16_t write_buffer_idx);
-    void FinalizeUpload();
-
 protected:
-    string multipart_upload_id;
-    size_t part_size;
-
-    //! Write buffers for this file
-    mutex write_buffers_lock;
-    unordered_map<uint16_t, shared_ptr<S3WriteBuffer>> write_buffers;
-
-    //! Synchronization for upload threads
-    mutex uploads_in_progress_lock;
-    std::condition_variable uploads_in_progress_cv;
-    std::condition_variable final_flush_cv;
-    uint16_t uploads_in_progress;
-
-    //! Etags are stored for each part
-    mutex part_etags_lock;
-    unordered_map<uint16_t, string> part_etags;
-
-    //! Info for upload
-    atomic<uint16_t> parts_uploaded;
-    bool upload_finalized = true;
-
-    //! Error handling in upload threads
-    atomic<bool> uploader_has_error {false};
-    std::exception_ptr upload_exception;
-
     unique_ptr<HTTPClient> CreateClient() override;
-
-    //! Rethrow IO Exception originating from an upload thread
-    void RethrowIOError() {
-        if (uploader_has_error) {
-            std::rethrow_exception(upload_exception);
-        }
-    }
 };

 class S3FileSystem : public HTTPFileSystem {
@@ -180,10 +127,10 @@ class S3FileSystem : public HTTPFileSystem {
     duckdb::unique_ptr<HTTPResponse> GetRangeRequest(FileHandle &handle, string s3_url, HTTPHeaders header_map,
                                                      idx_t file_offset, char *buffer_out,
                                                      idx_t buffer_out_len) override;
-    duckdb::unique_ptr<HTTPResponse> PostRequest(FileHandle &handle, string s3_url, HTTPHeaders header_map,
+    duckdb::unique_ptr<HTTPResponse> PostRequest(HTTPInput &input, string s3_url, HTTPHeaders header_map,
                                                  string &buffer_out, char *buffer_in, idx_t buffer_in_len,
                                                  string http_params = "") override;
-    duckdb::unique_ptr<HTTPResponse> PutRequest(FileHandle &handle, string s3_url, HTTPHeaders header_map,
+    duckdb::unique_ptr<HTTPResponse> PutRequest(HTTPInput &input, string s3_url, HTTPHeaders header_map,
                                                 char *buffer_in, idx_t buffer_in_len, string http_params = "") override;
     duckdb::unique_ptr<HTTPResponse> DeleteRequest(FileHandle &handle, string s3_url, HTTPHeaders header_map) override;

@@ -196,21 +143,12 @@ class S3FileSystem : public HTTPFileSystem {
     void FileSync(FileHandle &handle) override;
     void Write(FileHandle &handle, void *buffer, int64_t nr_bytes, idx_t location) override;

-    string InitializeMultipartUpload(S3FileHandle &file_handle);
-    void FinalizeMultipartUpload(S3FileHandle &file_handle);
-
-    void FlushAllBuffers(S3FileHandle &handle);
-
     void ReadQueryParams(const string &url_query_param, S3AuthParams &params);
     static ParsedS3Url S3UrlParse(string url, S3AuthParams &params);

     static string UrlEncode(const string &input, bool encode_slash = false);
     static string UrlDecode(string input);

-    // Uploads the contents of write_buffer to S3.
-    // Note: caller is responsible to not call this method twice on the same buffer
-    static void UploadBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuffer> write_buffer);
-
     vector<OpenFileInfo> Glob(const string &glob_pattern, FileOpener *opener = nullptr) override;
     bool ListFiles(const string &directory, const std::function<void(const string &, bool)> &callback,
                    FileOpener *opener = nullptr) override;
@@ -228,11 +166,9 @@ class S3FileSystem : public HTTPFileSystem {
     static HTTPException GetS3Error(S3AuthParams &s3_auth_params, const HTTPResponse &response, const string &url);

 protected:
-    static void NotifyUploadsInProgress(S3FileHandle &file_handle);
     duckdb::unique_ptr<HTTPFileHandle> CreateHandle(const OpenFileInfo &file, FileOpenFlags flags,
                                                     optional_ptr<FileOpener> opener) override;

-    void FlushBuffer(S3FileHandle &handle, shared_ptr<S3WriteBuffer> write_buffer);
     string GetPayloadHash(char *buffer, idx_t buffer_len);

     HTTPException GetHTTPError(FileHandle &, const HTTPResponse &response, const string &url) override;
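
Net effect on the handle: every piece of multipart state (write buffers, etags, upload synchronization, error capture) leaves S3FileHandle, which now only carries a shared_ptr<S3MultiPartUpload>. A plausible shape for the delegation in the write path, as a sketch only, since the actual Write body is not shown in this diff:

    // Sketch, not the PR's exact code.
    void S3FileSystem::Write(FileHandle &handle, void *buffer, int64_t nr_bytes, idx_t location) {
        auto &s3fh = handle.Cast<S3FileHandle>();
        if (!s3fh.multi_part_upload) {
            s3fh.multi_part_upload = S3MultiPartUpload::Initialize(s3fh); // start lazily on first write
        }
        // ... stage [location, location + nr_bytes) into S3WriteBuffers and flush full parts
    }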